From cfa4b4021386f3cc9cc1e6b774fb7343bc4eafc1 Mon Sep 17 00:00:00 2001
From: "erni.durdevic@databricks.com"
Date: Tue, 20 Jun 2023 04:22:57 +0000
Subject: [PATCH] Added table freshness

---
 discoverx/dx.py             |  27 +++++++
 discoverx/scanner.py        | 110 +++++++++++++++++++++++++++++++++++-
 notebooks/interaction_ui.py |  78 ++++++++++++++++++++++++
 3 files changed, 213 insertions(+), 2 deletions(-)

diff --git a/discoverx/dx.py b/discoverx/dx.py
index 772d9fc..55080cf 100644
--- a/discoverx/dx.py
+++ b/discoverx/dx.py
@@ -137,6 +137,33 @@ def scan_result(self):
 
         return self.scanner.scan_result.df
 
+    def table_freshness(
+        self,
+        from_tables="*.*.*",
+        what_if: bool = False,
+    ):
+        """Scans the history of the matching Delta tables to measure data freshness
+
+        Args:
+            from_tables (str, optional): The tables to be scanned in format "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
+            what_if (bool, optional): Whether to run the scan in what-if mode and print the SQL commands instead of executing them. Defaults to False.
+
+        Returns:
+            ScanResult: Daily Delta operation metrics (rows, bytes, files, and timings) per table over the last 90 days.
+        """
+        catalogs, schemas, tables = Msql.validate_from_components(from_tables)
+
+        self.scanner = Scanner(
+            self.spark,
+            self.rules,
+            catalogs=catalogs,
+            schemas=schemas,
+            tables=tables,
+            what_if=what_if,
+        )
+
+        return self.scanner.scan_history()
+
     def _classify(self, classification_threshold: float):
         """Classifies the columns in the lakehouse
 
diff --git a/discoverx/scanner.py b/discoverx/scanner.py
index d23e125..9d776d5 100644
--- a/discoverx/scanner.py
+++ b/discoverx/scanner.py
@@ -191,13 +191,119 @@ def scan(self):
 
         logger.debug("Launching lakehouse scanning task\n")
 
+        self.scan_result = self.do_scan(self.scan_table)
+
+
+    def scan_table_history(self, table):
+
+        try:
+            if self.what_if:
+                logger.friendly(f"SQL that would be executed for '{table.catalog}.{table.schema}.{table.table}'")
+            else:
+                logger.friendly(f"Checking history for table '{table.catalog}.{table.schema}.{table.table}'")
+
+            frequency = "1 day"
+            # Build the table history metrics SQL
+            # TODO: Add more metrics https://docs.delta.io/latest/delta-utility.html#operation-metrics-keys
+            sql = f"""
+            WITH
+
+            all_dates AS (
+                SELECT explode(sequence(to_date(date_sub(now(), 90)), to_date(now()), interval {frequency})) AS date
+            ),
+
+            table_metrics AS (
+                SELECT w.start, w.end, *
+                FROM
+                (
+                    SELECT *
+                    FROM
+                    (
+                        SELECT window(timestamp, '{frequency}') AS w, metric, sum(value) AS value
+                        FROM (
+                            SELECT timestamp, explode(operationMetrics) AS (metric, value)
+                            FROM (
+                                DESCRIBE HISTORY {table.catalog}.{table.schema}.{table.table}
+                            )
+                        )
+                        GROUP BY window(timestamp, '{frequency}'), metric
+                    )
+                    PIVOT (
+                        SUM(value) AS val
+                        FOR (metric) IN (
+
+                            'numAddedBytes' AS numAddedBytes,
+                            'numOutputBytes' AS numOutputBytes,
+                            'numRemovedBytes' AS numRemovedBytes,
+
+                            'numFiles' AS numFiles,
+                            'numAddedFiles' AS numAddedFiles,
+                            'numAddedChangeFiles' AS numAddedChangeFiles,
+                            'numRemovedFiles' AS numRemovedFiles,
+
+                            'numOutputRows' AS numOutputRows,
+                            'numCopiedRows' AS numCopiedRows,
+                            'numDeletedRows' AS numDeletedRows,
+
+                            'numDeletionVectorsAdded' AS numDeletionVectorsAdded,
+                            'numDeletionVectorsRemoved' AS numDeletionVectorsRemoved,
+
+                            'executionTimeMs' AS executionTimeMs,
+                            'scanTimeMs' AS scanTimeMs,
+                            'rewriteTimeMs' AS rewriteTimeMs
+                        )
+                    )
+                )
+            )
+
+
+            SELECT
+                '{table.catalog}' AS table_catalog,
+                '{table.schema}' AS table_schema,
+                '{table.table}' AS table_name,
+                *
+            FROM all_dates
+            LEFT OUTER JOIN table_metrics
+            ON date = to_date(w.end)
+            """
+
+            if self.what_if:
+                logger.friendly(sql)
+            else:
+                # Execute the SQL and return the result as a pandas DataFrame
+                return self.spark.sql(sql).toPandas()
        except Exception as e:
            logger.error(f"Error while scanning table history '{table.catalog}.{table.schema}.{table.table}': {e}")
            return None
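Reviewer note, not part of the patch: a minimal usage sketch for the new API. It assumes a configured DX instance as in the notebook below; the column names come from the pivoted DESCRIBE HISTORY metrics and the all_dates join above.

from discoverx import DX

dx = DX()

# Scan the Delta history of the matching tables: 90-day lookback, 1-day windows
result = dx.table_freshness(from_tables="discoverx*.*.*")

# result.df is a pandas DataFrame with one row per table per daily window;
# windows without operations carry null metrics thanks to the all_dates join.
print(result.df[["table_catalog", "table_schema", "table_name", "date", "numOutputRows", "numOutputBytes"]].head())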
+
+    def scan_history(self):
+
+        logger.friendly("""Ok, I'm going to scan your lakehouse data history.""")
+        text = f"""
+                This is what you asked for:
+
+                    catalogs ({self.content.n_catalogs}) = {self.catalogs}
+                    schemas ({self.content.n_schemas}) = {self.schemas}
+                    tables ({self.content.n_tables}) = {self.tables}
+
+                This may take a while, so please be patient. I'll let you know when I'm done.
+                ...
+                """
+
+        logger.friendly(strip_margin(text))
+
+        logger.debug("Launching lakehouse history scanning task\n")
+
+        return self.do_scan(self.scan_table_history)
+
+
+    def do_scan(self, f):
         if len(self.content.table_list) == 0:
             raise Exception("No tables found matching your filters")
 
         dfs = []
 
         with concurrent.futures.ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
             # Submit tasks to the thread pool
-            futures = [executor.submit(self.scan_table, table) for table in self.content.table_list]
+            futures = [executor.submit(f, table) for table in self.content.table_list]
 
             # Process completed tasks
             for future in concurrent.futures.as_completed(futures):
@@ -208,7 +314,7 @@ def scan(self):
 
         logger.debug("Finished lakehouse scanning task")
 
         if dfs:
-            self.scan_result = ScanResult(df=pd.concat(dfs))
+            return ScanResult(df=pd.concat(dfs))
         else:
             raise Exception("No tables were scanned successfully.")
 
diff --git a/notebooks/interaction_ui.py b/notebooks/interaction_ui.py
index c91fb2f..4e169df 100644
--- a/notebooks/interaction_ui.py
+++ b/notebooks/interaction_ui.py
@@ -44,6 +44,14 @@
 
 # COMMAND ----------
 
+freshness = dx.table_freshness(from_tables="discoverx*.*.*", what_if=False)
+
+# COMMAND ----------
+
+display(freshness.df)
+
+# COMMAND ----------
+
 # MAGIC %md
 # MAGIC ### Scan
 # MAGIC This section demonstrates a typical DiscoverX workflow which consists of the following steps:
@@ -174,3 +182,73 @@
 
 help(DX)
 
 # COMMAND ----------
+
+# MAGIC %sql
+# MAGIC
+# MAGIC WITH
+# MAGIC
+# MAGIC all_dates AS (
+# MAGIC   SELECT explode(sequence(to_date(date_sub(now(), 90)), to_date(now()), interval 1 day)) AS date
+# MAGIC ),
+# MAGIC
+# MAGIC table_metrics AS (
+# MAGIC   SELECT w.start, w.end, *
+# MAGIC   FROM
+# MAGIC   (
+# MAGIC     SELECT *
+# MAGIC     FROM
+# MAGIC     (
+# MAGIC       SELECT window(timestamp, '1 day') AS w, metric, sum(value) AS value
+# MAGIC       FROM (
+# MAGIC         SELECT timestamp, explode(operationMetrics) AS (metric, value)
+# MAGIC         FROM (
+# MAGIC           DESCRIBE HISTORY discoverx_sample.sample_datasets.cyber_data
+# MAGIC         )
+# MAGIC       )
+# MAGIC       GROUP BY window(timestamp, '1 day'), metric
+# MAGIC     )
+# MAGIC     PIVOT (
+# MAGIC       SUM(value) AS val
+# MAGIC       FOR (metric) IN (
+# MAGIC         -- TODO: add more metrics https://docs.delta.io/latest/delta-utility.html#operation-metrics-keys
+# MAGIC         'numAddedBytes' AS numAddedBytes,
+# MAGIC         'numOutputBytes' AS numOutputBytes,
+# MAGIC         'numRemovedBytes' AS numRemovedBytes,
+# MAGIC
+# MAGIC         'numFiles' AS numFiles,
+# MAGIC         'numAddedFiles' AS numAddedFiles,
+# MAGIC         'numAddedChangeFiles' AS numAddedChangeFiles,
+# MAGIC         'numRemovedFiles' AS numRemovedFiles,
+# MAGIC
+# MAGIC         'numOutputRows' AS numOutputRows,
+# MAGIC         'numCopiedRows' AS numCopiedRows,
+# MAGIC         'numDeletedRows' AS numDeletedRows,
+# MAGIC
+# MAGIC         'numDeletionVectorsAdded' AS numDeletionVectorsAdded,
+# MAGIC         'numDeletionVectorsRemoved' AS numDeletionVectorsRemoved,
+# MAGIC
+# MAGIC         'executionTimeMs' AS executionTimeMs,
+# MAGIC         'scanTimeMs' AS scanTimeMs,
+# MAGIC         'rewriteTimeMs' AS rewriteTimeMs
+# MAGIC       )
+# MAGIC     )
+# MAGIC   )
+# MAGIC )
+# MAGIC
+# MAGIC SELECT 'cyber_data' AS table_name, *
+# MAGIC FROM all_dates
+# MAGIC LEFT OUTER JOIN table_metrics
+# MAGIC ON date = to_date(w.end)
+
+# COMMAND ----------
+
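For reference, the same per-window aggregation can be sketched with the PySpark DataFrame API. This is an illustrative equivalent of the SQL cell above (single table, 1-day windows, a subset of the pivoted metrics); `spark` is the notebook's active session and `display` is the Databricks helper.

from pyspark.sql import functions as F

# Delta table history as a DataFrame
history = spark.sql("DESCRIBE HISTORY discoverx_sample.sample_datasets.cyber_data")

per_window = (
    history
    # operationMetrics is a map<string,string>: explode it into (metric, value) rows
    .select("timestamp", F.explode("operationMetrics").alias("metric", "value"))
    # sum each metric over 1-day windows, mirroring the SQL above
    .groupBy(F.window("timestamp", "1 day").alias("w"), "metric")
    .agg(F.sum("value").alias("value"))
    # pivot a subset of the operation metrics into columns
    .groupBy("w")
    .pivot("metric", ["numOutputRows", "numOutputBytes", "numAddedFiles"])
    .sum("value")
)

display(per_window.orderBy("w"))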
+dx._msql(
+    "SELECT * FROM (DESCRIBE HISTORY *.*.*)",
+    what_if=True)
+
+# COMMAND ----------
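Downstream, the freshness output lends itself to a simple staleness check. A sketch under assumed conventions, not part of the patch: the 7-day threshold is illustrative, `dx` is the notebook's DX instance, and `end` is the window-end column produced by the scan.

import pandas as pd

STALENESS_DAYS = 7  # illustrative threshold, not part of this patch

freshness = dx.table_freshness(from_tables="discoverx*.*.*").df

# Keep only windows where rows were actually written
writes = freshness[freshness["numOutputRows"].notna()]
last_write = writes.groupby(["table_catalog", "table_schema", "table_name"])["end"].max()

cutoff = pd.Timestamp.now() - pd.Timedelta(days=STALENESS_DAYS)
stale = last_write[last_write < cutoff]
print(f"{len(stale)} table(s) with no writes in the last {STALENESS_DAYS} days")
print(stale)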