-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path08.DeltaLiveMedallion.py
More file actions
46 lines (38 loc) · 1.36 KB
/
08.DeltaLiveMedallion.py
File metadata and controls
46 lines (38 loc) · 1.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# Databricks notebook source
import dlt
from pyspark.sql.functions import *
# COMMAND ----------
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
# COMMAND ----------
@dlt.table(
comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
return (spark.read.format("json").load(json_path))
# COMMAND ----------
@dlt.table(
comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
return (
dlt.read("clickstream_raw")
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_title", "click_count", "previous_page_title")
)
# COMMAND ----------
@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
dlt.read("clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)