diff --git a/README.md b/README.md index 909e40e..2093f1d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Operations are applied concurrently across multiple tables * [VACUUM all tables](docs/Vacuum.md) ([example notebook](examples/vacuum_multiple_tables.py)) * OPTIMIZE with z-order on tables having specified columns * Detect tables having too many small files ([example notebook](examples/detect_small_files.py)) - * Visualise quantity of data written per table per period + * Visualise quantity of data written per table per period ([example notebook](examples/table_freshness.py)) + * Check delta protocol versions ([example notebook](examples/check_delta_protocol_version.py)) * **Governance** * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py)) * Text Analysis with MosaicML and Databricks MLflow ([example notebook](examples/text_analysis_mosaicml_mlflow.py)) diff --git a/examples/check_delta_protocol_version.py b/examples/check_delta_protocol_version.py new file mode 100644 index 0000000..3c866ba --- /dev/null +++ b/examples/check_delta_protocol_version.py @@ -0,0 +1,88 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Check delta protocol version +# MAGIC +# MAGIC This notebook will check the delta read and write protocol versions of multiple tables. +# MAGIC +# MAGIC Feature compatibility between delta lake versions is managed through [read protocol](https://docs.delta.io/latest/versioning.html#read-protocol) and [write protocol](https://docs.delta.io/latest/versioning.html#write-protocol). +# MAGIC +# MAGIC Check out the [feature by protocol version table](https://docs.delta.io/latest/versioning.html#features-by-protocol-version) for more details. 
+ +# COMMAND ---------- + +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +dx.from_tables(from_tables)\ + .with_sql("SHOW TBLPROPERTIES {full_table_name}")\ + .apply()\ + .filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"')\ + .display() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Show delta feature compatibility + +# COMMAND ---------- + +from pyspark.sql.functions import col, expr + +result = (dx.from_tables(from_tables)\ + .with_sql("SHOW TBLPROPERTIES {full_table_name}") + .apply() + .filter('key = "delta.minReaderVersion" OR key = "delta.minWriterVersion"') + .withColumn("value", col("value").cast("int")) + .groupBy("table_catalog", "table_schema", "table_name") + .pivot("key", ["delta.minWriterVersion", "delta.minReaderVersion"]) + .min("value") + .withColumnRenamed("delta.minReaderVersion", "minReaderVersion") + .withColumnRenamed("delta.minWriterVersion", "minWriterVersion") + .withColumn("supports_basic_functionality", expr("minWriterVersion >= 2 AND minReaderVersion >= 1")) + .withColumn("supports_check_constraint", expr("minWriterVersion >= 3 AND minReaderVersion >= 1")) + .withColumn("supports_change_data_feed", expr("minWriterVersion >= 4 AND minReaderVersion >= 1")) + .withColumn("supports_generated_columns", expr("minWriterVersion >= 4 AND minReaderVersion >= 1")) + .withColumn("supports_column_mapping", expr("minWriterVersion >= 5 AND minReaderVersion >= 2")) + .withColumn("supports_table_features_read", expr("minWriterVersion >= 7 AND minReaderVersion >= 1")) + .withColumn("supports_table_features_write", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) + .withColumn("supports_deletion_vectors", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) + 
.withColumn("supports_timestamp_without_timezone", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) + .withColumn("supports_iceberg_compatibility_v1", expr("minWriterVersion >= 7 AND minReaderVersion >= 2")) + .withColumn("supports_v2_checkpoints", expr("minWriterVersion >= 7 AND minReaderVersion >= 3")) +) + +result.display() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Update protocol version +# MAGIC +# MAGIC You can update the table protocol read and write versions by uncommenting the following snippet. +# MAGIC +# MAGIC !!! BE CAREFUL !!! +# MAGIC +# MAGIC Upgrading a reader or writer version might impact older DBR version's ability to read or write the tables. Check [this page](https://docs.databricks.com/en/delta/feature-compatibility.html#what-delta-lake-features-require-databricks-runtime-upgrades) for more details. + +# COMMAND ---------- + +# (dx.from_tables(from_tables) +# .with_sql("ALTER TABLE {full_table_name} SET TBLPROPERTIES('delta.minWriterVersion' = '5', 'delta.minReaderVersion' = '2')") +# .apply() +# ) + +# COMMAND ---------- + + diff --git a/examples/slide_deck_examples.py b/examples/slide_deck_examples.py new file mode 100644 index 0000000..6566be1 --- /dev/null +++ b/examples/slide_deck_examples.py @@ -0,0 +1,98 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # General examples +# MAGIC +# MAGIC This notebook contains a list of examples of what is possible to do with DiscoverX. + +# COMMAND ---------- + +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which are the biggest 10 tables in the "sample_data_discoverx" catalog? 
+# MAGIC +# MAGIC + +# COMMAND ---------- + +from pyspark.sql.functions import col + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE DETAIL {full_table_name}") + .apply() + .orderBy(col("sizeInBytes").desc()) + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables have the most daily transactions? + +# COMMAND ---------- + +from pyspark.sql.functions import window + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE HISTORY {full_table_name}") + .apply() + .groupBy("table_catalog", "table_schema", "table_name", window("timestamp", "1 day")) + .count() + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables have too many small files? + +# COMMAND ---------- + +from pyspark.sql.functions import col, lit + +(dx + .from_tables("sample_data_discoverx.*.*") + .with_sql("DESCRIBE DETAIL {full_table_name}") + .apply() + .withColumn("average_file_size", col("sizeInBytes") / col("numFiles")) + .withColumn("has_many_small_files", + (col("average_file_size") < 10000000) & (col("numFiles") > 100)) + .orderBy("average_file_size") + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables contain email addresses? + +# COMMAND ---------- + +result = (dx + .from_tables("sample_data_discoverx.*.*") + .scan() # Returns a Discovery object +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Which tables contain the email address "example@databricks.com"? 
+ +# COMMAND ---------- + +result.search("example@databricks.com").display() diff --git a/examples/table_freshness.py b/examples/table_freshness.py new file mode 100644 index 0000000..b808d71 --- /dev/null +++ b/examples/table_freshness.py @@ -0,0 +1,103 @@ +# Databricks notebook source +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +dbutils.widgets.text("from_tables", "sample_data_discoverx.*.*") +from_tables = dbutils.widgets.get("from_tables") +dbutils.widgets.text("time_span", "1 day") +time_span = dbutils.widgets.get("time_span") + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Number of delta table versions per time period + +# COMMAND ---------- + +from pyspark.sql.functions import window, count + +(dx + .from_tables(from_tables) + .with_sql("DESCRIBE HISTORY {full_table_name}") + .apply() + .groupBy("table_catalog", "table_schema", "table_name", window("timestamp", time_span)) + .agg(count("*").alias("delta_versions_count")) + .display() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Number of processed rows + +# COMMAND ---------- + +sql_template = f""" + WITH metrics AS ( + SELECT timestamp, operation, explode(operationMetrics) AS (metric, value) + FROM ( + DESCRIBE HISTORY {{full_table_name}} + ) + ), + + metrics_window AS ( + SELECT window(timestamp, '{time_span}') AS time_window, metric, sum(value) as total_rows + FROM metrics + WHERE metric IN ( + -- Written + "numCopiedRows", + "numUpdatedRows", + "numOutputRows", + -- Deleted + "numDeletedRows", + "numTargetRowsDeleted" + ) + GROUP BY 1, 2 + ), + + metrics_pivot AS ( + SELECT * + FROM metrics_window + PIVOT (sum(total_rows) as total_rows + FOR (metric) IN ( + -- Written + "numCopiedRows", + "numUpdatedRows", + "numOutputRows", + + -- Deleted + "numDeletedRows", + "numTargetRowsDeleted" + ) + ) + ) + + SELECT + time_window, + -- Written rows include copied, updated and added rows + (COALESCE(numCopiedRows, 0) + 
COALESCE(numUpdatedRows, 0) + COALESCE(numOutputRows, 0)) AS totNumWrittenRows, + -- Deleted rows from both delete and merge operations + (COALESCE(numDeletedRows, 0) + COALESCE(numTargetRowsDeleted, 0)) AS totNumDeletedRows + FROM metrics_pivot +""" + +processed_rows = (dx + .from_tables(from_tables) + .with_sql(sql_template) + .apply() +).toPandas() + +# COMMAND ---------- + +display(processed_rows) + +# COMMAND ---------- + +