From 93c6e1480ee32c6683ccf1359a29ade37f884d0a Mon Sep 17 00:00:00 2001 From: "erni.durdevic@databricks.com" Date: Fri, 10 Nov 2023 23:02:14 +0000 Subject: [PATCH 1/7] Added slide deck examples --- examples/slide_deck_examples.py | 91 +++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 examples/slide_deck_examples.py diff --git a/examples/slide_deck_examples.py b/examples/slide_deck_examples.py new file mode 100644 index 0000000..7df154a --- /dev/null +++ b/examples/slide_deck_examples.py @@ -0,0 +1,91 @@ +# Databricks notebook source +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") + + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which are the biggest 10 tables in the "sample_data_discoverx" catalog? +# MAGIC +# MAGIC + +# COMMAND ---------- + +from pyspark.sql.functions import col + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE DETAIL {full_table_name}") + .apply() + .orderBy(col("sizeInBytes").desc()) + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables have the most daily transactions? + +# COMMAND ---------- + +from pyspark.sql.functions import window + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE HISTORY {full_table_name}") + .apply() + .groupBy("table_catalog", "table_schema", "table_name", window("timestamp", "1 day")) + .count() + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables have too many small files? 
+ +# COMMAND ---------- + +from pyspark.sql.functions import col, lit + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE DETAIL {full_table_name}") + .apply() + .withColumn("average_file_size", col("sizeInBytes") / col("numFiles")) + .withColumn("has_many_small_files", + (col("average_file_size") < 10000000) & (col("numFiles") > 100)) + .orderBy("average_file_size") + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables contain email addresses? + +# COMMAND ---------- + +result = (dx + .from_tables("sample_data_discoverx.*.*") + .scan() +) + +# COMMAND ---------- + +result.search("erni@databricks.com").display() + +# COMMAND ---------- + + From a36098b8f65f34c488b4478c1838e8fbd379fe5b Mon Sep 17 00:00:00 2001 From: "erni.durdevic@databricks.com" Date: Fri, 10 Nov 2023 23:14:33 +0000 Subject: [PATCH 2/7] Updated scan and search examples --- examples/slide_deck_examples.py | 37 ++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/examples/slide_deck_examples.py b/examples/slide_deck_examples.py index 7df154a..963312d 100644 --- a/examples/slide_deck_examples.py +++ b/examples/slide_deck_examples.py @@ -79,13 +79,48 @@ result = (dx .from_tables("sample_data_discoverx.*.*") - .scan() + .scan() # Returns a Discovery object ) +# Ideally, this should look like this: +# (dx +# .from_tables("sample_data_discoverx.*.*") +# .scan() # This should return a DataExplorerActions object +# .apply() # This should return a Dataframe +# ) + +# and optionally +# classes = (dx +# .from_tables("sample_data_discoverx.*.*") +# .scan() # This should return a DataExplorerActions object +# .apply() # This should return a Dataframe +# ) +# dx.apply_tags(classes) + + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables contain the email address “erni@databricks.com”? 
+ # COMMAND ---------- result.search("erni@databricks.com").display() +# Ideally this should look like +# (dx +# .from_tables("sample_data_discoverx.*.*") +# .search("erni@databricks.com") # This should return a DataExplorerActions object +# .apply() # This should return a Dataframe +# ) + +# and optionally +# (dx +# .from_tables("sample_data_discoverx.*.*") +# .search("erni@databricks.com", in_columns_tagged_with=["email"]) # This should return a DataExplorerActions object +# .apply() # This should return a Dataframe +# ) + # COMMAND ---------- From 9771273c07e3a138acbbfc271ee7a2283a32f7a7 Mon Sep 17 00:00:00 2001 From: edurdevic Date: Sun, 31 Dec 2023 14:29:36 +0000 Subject: [PATCH 3/7] Updated slide deck examples --- examples/slide_deck_examples.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/examples/slide_deck_examples.py b/examples/slide_deck_examples.py index 963312d..9026ea8 100644 --- a/examples/slide_deck_examples.py +++ b/examples/slide_deck_examples.py @@ -6,7 +6,6 @@ dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") from_tables = dbutils.widgets.get("from_tables") - # COMMAND ---------- from discoverx import DX @@ -82,22 +81,6 @@ .scan() # Returns a Discovery object ) -# Ideally, this should look like this: -# (dx -# .from_tables("sample_data_discoverx.*.*") -# .scan() # This should return a DataExplorerActions object -# .apply() # This should return a Dataframe -# ) - -# and optionally -# classes = (dx -# .from_tables("sample_data_discoverx.*.*") -# .scan() # This should return a DataExplorerActions object -# .apply() # This should return a Dataframe -# ) -# dx.apply_tags(classes) - - # COMMAND ---------- # MAGIC %md @@ -107,20 +90,6 @@ result.search("erni@databricks.com").display() -# Ideally this should look like -# (dx -# .from_tables("sample_data_discoverx.*.*") -# .search("erni@databricks.com") # This should return a DataExplorerActions object -# .apply() # This should return a Dataframe -# ) - 
-# and optionally -# (dx -# .from_tables("sample_data_discoverx.*.*") -# .search("erni@databricks.com", in_columns_tagged_with=["email"]) # This should return a DataExplorerActions object -# .apply() # This should return a Dataframe -# ) - # COMMAND ---------- From 3b4fa07069ad98aa218d02e555db40ffffb45343 Mon Sep 17 00:00:00 2001 From: edurdevic Date: Sun, 31 Dec 2023 15:04:16 +0000 Subject: [PATCH 4/7] Added table freshness --- examples/table_freshness.py | 103 ++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 examples/table_freshness.py diff --git a/examples/table_freshness.py b/examples/table_freshness.py new file mode 100644 index 0000000..b808d71 --- /dev/null +++ b/examples/table_freshness.py @@ -0,0 +1,103 @@ +# Databricks notebook source +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") +dbutils.widgets.text("time_span", "1 day") +time_span = dbutils.widgets.get("time_span") + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Number of delta table versions per time period + +# COMMAND ---------- + +from pyspark.sql.functions import window, count + +(dx + .from_tables(from_tables) + .with_sql("DESCRIBE HISTORY {full_table_name}") + .apply() + .groupBy("table_catalog", "table_schema", "table_name", window("timestamp", time_span)) + .agg(count("*").alias("delta_versions_count")) + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Number of processed rows + +# COMMAND ---------- + +sql_template = f""" + WITH metrics AS ( + SELECT timestamp, operation, explode(operationMetrics) AS (metric, value) + FROM ( + DESCRIBE HISTORY {{full_table_name}} + ) + ), + + metrics_window AS ( + SELECT window(timestamp, '{time_span}') AS time_window, metric, sum(value) as total_rows + FROM metrics + WHERE metric IN ( + -- Written + 
"numCopiedRows", + "numUpdatedRows", + "numOutputRows", + -- Deleted + "numDeletedRows", + "numTargetRowsDeleted" + ) + GROUP BY 1, 2 + ), + + metrics_pivot AS ( + SELECT * + FROM metrics_window + PIVOT (sum(total_rows) as total_rows + FOR (metric) IN ( + -- Written + "numCopiedRows", + "numUpdatedRows", + "numOutputRows", + + -- Deleted + "numDeletedRows", + "numTargetRowsDeleted" + ) + ) + ) + + SELECT + time_window, + -- Written rows include copied, updated and added rows + (COALESCE(numCopiedRows, 0) + COALESCE(numUpdatedRows, 0) + COALESCE(numOutputRows, 0)) AS totNumWrittenRows, + -- Deleted rows from both delete and merge operations + (COALESCE(numDeletedRows, 0) + COALESCE(numTargetRowsDeleted, 0)) AS totNumDeletedRows + FROM metrics_pivot +""" + +processed_rows = (dx + .from_tables(from_tables) + .with_sql(sql_template) + .apply() +).toPandas() + +# COMMAND ---------- + +display(processed_rows) + +# COMMAND ---------- + + From d5abb7d4a60bbd5498de37df48b081ffe455a04d Mon Sep 17 00:00:00 2001 From: edurdevic Date: Sun, 31 Dec 2023 15:54:23 +0000 Subject: [PATCH 5/7] Added delta protocol version check --- examples/check_delta_protocol_version.py | 90 ++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 examples/check_delta_protocol_version.py diff --git a/examples/check_delta_protocol_version.py b/examples/check_delta_protocol_version.py new file mode 100644 index 0000000..29e4a7f --- /dev/null +++ b/examples/check_delta_protocol_version.py @@ -0,0 +1,90 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Check delta protocol version +# MAGIC +# MAGIC This notebook will check the delta read and write protocol versions of multiple tables. +# MAGIC +# MAGIC Feature compatibility between delta lake versions is managed through [read protocol](https://docs.delta.io/latest/versioning.html#read-protocol) and [write protocol](https://docs.delta.io/latest/versioning.html#write-protocol). 
+# MAGIC +# MAGIC Check out the [feature by protocol version table](https://docs.delta.io/latest/versioning.html#features-by-protocol-version) for more details. + +# COMMAND ---------- + +# %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") + + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + + +# COMMAND ---------- + +dx.from_tables(from_tables)\ + .with_sql("SHOW TBLPROPERTIES {full_table_name}")\ + .apply()\ + .filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"')\ + .display() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Show delta feature compatibility + +# COMMAND ---------- + +from pyspark.sql.functions import col, expr + +result = (dx.from_tables(from_tables)\ + .with_sql("SHOW TBLPROPERTIES {full_table_name}") + .apply() + .filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"') + .withColumn("value", col("value").cast("int")) + .groupBy("table_catalog", "table_schema", "table_name") + .pivot("key", ["delta.minWriterVersion", "delta.minReaderVersion"]) + .min("value") + .withColumnRenamed("delta.minReaderVersion", "minReaderVersion") + .withColumnRenamed("delta.minWriterVersion", "minWriterVersion") + .withColumn("supports_basic_functionality", expr("minWriterVersion >= 2 AND minReaderVersion >= 1")) + .withColumn("supports_check_constraint", expr("minWriterVersion >= 3 AND minReaderVersion >= 1")) + .withColumn("supports_change_data_feed", expr("minWriterVersion >= 4 AND minReaderVersion >= 1")) + .withColumn("supports_generated_columns", expr("minWriterVersion >= 4 AND minReaderVersion >= 1")) + .withColumn("supports_column_mapping", expr("minWriterVersion >= 5 AND minReaderVersion >= 2")) + .withColumn("supports_table_features_read", expr("minWriterVersion >= 7 AND minReaderVersion >= 1")) + .withColumn("supports_table_features_write", expr("minWriterVersion >= 7 AND 
minReaderVersion >= 3")) + .withColumn("supports_deletion_vectors", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) + .withColumn("supports_timestamp_without_timezone", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) + .withColumn("supports_iceberg_compatibilty_v1", expr("minWriterVersion >= 7 AND minReaderVersion >= 2")) + .withColumn("supports_v2_checkpoints", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) +) + +result.display() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Update protocol version +# MAGIC +# MAGIC You can update the table protocol read and write versions by uncommenting the following snippet. +# MAGIC +# MAGIC !!! BE CAREFUL !!! +# MAGIC +# MAGIC Upgrading a reader or writer version might impact older DBR versions' ability to read or write the tables. Check [this page](https://docs.databricks.com/en/delta/feature-compatibility.html#what-delta-lake-features-require-databricks-runtime-upgrades) for more details. + +# COMMAND ---------- + +# (dx.from_tables(from_tables) +# .with_sql("ALTER TABLE {full_table_name} SET TBLPROPERTIES('delta.minWriterVersion' = '5', 'delta.minReaderVersion' = '2')") +# .apply() +# ) + +# COMMAND ---------- + + From 68da83a9f19951901b03bec14a098dedcecfe71d Mon Sep 17 00:00:00 2001 From: edurdevic Date: Sun, 31 Dec 2023 15:58:27 +0000 Subject: [PATCH 6/7] Updated readme --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 909e40e..2093f1d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Operations are applied concurrently across multiple tables * [VACUUM all tables](docs/Vacuum.md) ([example notebook](examples/vacuum_multiple_tables.py)) * OPTIMIZE with z-order on tables having specified columns * Detect tables having too many small files ([example notebook](examples/detect_small_files.py)) - * Visualise quantity of data written per table per period + * Visualise quantity of data written per table per period ([example 
notebook](examples/table_freshness.py)) + * Check delta protocol versions ([example notebook](examples/check_delta_protocol_version.py)) * **Governance** * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py)) * Text Analysis with MosaicML and Databricks MLflow ([example notebook](examples/text_analysis_mosaicml_mlflow.py)) From eb3c3226f254170176b7071714094c6838ae3d69 Mon Sep 17 00:00:00 2001 From: edurdevic Date: Mon, 8 Jan 2024 21:00:36 +0000 Subject: [PATCH 7/7] Updated notebooks based on feedback --- examples/check_delta_protocol_version.py | 4 +--- examples/slide_deck_examples.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/check_delta_protocol_version.py b/examples/check_delta_protocol_version.py index 29e4a7f..3c866ba 100644 --- a/examples/check_delta_protocol_version.py +++ b/examples/check_delta_protocol_version.py @@ -10,21 +10,19 @@ # COMMAND ---------- -# %pip install dbl-discoverx +# MAGIC %pip install dbl-discoverx # COMMAND ---------- dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") from_tables = dbutils.widgets.get("from_tables") - # COMMAND ---------- from discoverx import DX dx = DX() - # COMMAND ---------- dx.from_tables(from_tables)\ diff --git a/examples/slide_deck_examples.py b/examples/slide_deck_examples.py index 9026ea8..6566be1 100644 --- a/examples/slide_deck_examples.py +++ b/examples/slide_deck_examples.py @@ -1,4 +1,11 @@ # Databricks notebook source +# MAGIC %md +# MAGIC # General examples +# MAGIC +# MAGIC This notebook contains a list of examples of what is possible to do with DiscoverX. + +# COMMAND ---------- + # MAGIC %pip install dbl-discoverx # COMMAND ---------- @@ -84,12 +91,8 @@ # COMMAND ---------- # MAGIC %md -# MAGIC ## Which tables contain the email address “erni@databricks.com”? +# MAGIC ## Which tables contain the email address "example@databricks.com"? 
# COMMAND ---------- -result.search("erni@databricks.com").display() - -# COMMAND ---------- - - +result.search("example@databricks.com").display()