diff --git a/README.md b/README.md index b196f0c..69e6897 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,11 @@ The properties available in table_info are * **Maintenance** * [VACUUM all tables](docs/Vacuum.md) ([example notebook](examples/vacuum_multiple_tables.py)) * Detect tables having too many small files ([example notebook](examples/detect_small_files.py)) + * Delta housekeeping analysis ([example notebook](examples/exec_delta_housekeeping.py)) which provides: + * stats (size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE) + * recommendations on tables that need to be OPTIMIZED/VACUUM'ed + * are tables OPTIMIZED/VACUUM'ed often enough + * tables that have small files / tables for which ZORDER is not being effective * Deep clone a catalog ([example notebook](examples/deep_clone_schema.py)) * **Governance** * PII detection with Presidio ([example notebook](examples/pii_detection_presidio.py)) @@ -91,7 +96,7 @@ from discoverx import DX dx = DX(locale="US") ``` -You can now run operations across multiple tables. +You can now run operations across multiple tables. ## Available functionality @@ -128,4 +133,3 @@ After a `with_sql` or `unpivot_string_columns` command, you can apply the follow Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects. Any issues discovered through the use of this project should be filed as GitHub Issues on the Repo. They will be reviewed as time permits, but there are no formal SLAs for support. 
- diff --git a/discoverx/delta_housekeeping.py b/discoverx/delta_housekeeping.py new file mode 100644 index 0000000..45daa11 --- /dev/null +++ b/discoverx/delta_housekeeping.py @@ -0,0 +1,450 @@ +from typing import Iterable, Callable +from datetime import datetime, timezone +import pandas as pd + +from discoverx.table_info import TableInfo + +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.window import Window +import pyspark.sql.types as T +import pyspark.sql.functions as F + + +class DeltaHousekeeping: + + def __init__(self, spark: SparkSession) -> None: + self._spark = spark + self.empty_schema = T.StructType([ + T.StructField("catalog", T.StringType()), + T.StructField("database", T.StringType()), + T.StructField("tableName", T.StringType()), + ]) + + @staticmethod + def _process_describe_history( + describe_detail_df: DataFrame, describe_history_df: DataFrame + ) -> pd.DataFrame: + """ + processes the DESCRIBE HISTORY result of potentially several tables in different schemas/catalogs + Provides + - table stats (size and number of files) + - timestamp for last & second last OPTIMIZE + - stats of OPTIMIZE (including ZORDER) + - timestamp for last & second last VACUUM + + returns a pandas DataFrame, and converts Spark internal dfs to pandas as soon as they are manageable + the reason being that DESCRIBE HISTORY / DESCRIBE DETAIL cannot be cached + """ + if not "operation" in describe_history_df.columns: + return describe_detail_df.toPandas() + + # window over operation + operation_order = ( + describe_history_df + .filter(F.col("operation").isin(["OPTIMIZE", "VACUUM END"])) + .withColumn("operation_order", F.row_number().over( + Window.partitionBy(["catalog", "database", "tableName", "operation"]).orderBy(F.col("timestamp").desc()) + )) + ) + + if operation_order.isEmpty(): + return describe_detail_df.toPandas() + + operation_order = operation_order.toPandas() + + # max & 2nd timestamp of OPTIMIZE into output + out = 
describe_detail_df.toPandas().merge( + operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 1)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': 'max_optimize_timestamp'}), + how="outer", on=["catalog", "database", "tableName"] + ) + out = out.merge( + operation_order[(operation_order.operation == "OPTIMIZE") & (operation_order.operation_order == 2)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': '2nd_optimize_timestamp'}), + how="outer", on=["catalog", "database", "tableName"] + ) + # max timestamp of VACUUM into output + out = out.merge( + operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 1)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': 'max_vacuum_timestamp'}), + how="outer", on=["catalog", "database", "tableName"] + ) + out = out.merge( + operation_order[(operation_order.operation == "VACUUM END") & (operation_order.operation_order == 2)] + .loc[:, ["catalog", "database", "tableName", "timestamp"]] + .rename(columns={'timestamp': '2nd_vacuum_timestamp'}), + how="outer", on=["catalog", "database", "tableName"] + ) + # summary of table metrics + table_metrics_1 = ( + operation_order[(operation_order['operation'] == 'OPTIMIZE') & (operation_order['operation_order'] == 1)] + .loc[:, ['catalog', 'database', 'tableName', 'min_file_size', 'p50_file_size', 'max_file_size', 'z_order_by']] + ) + + # write to output + out = out.merge( + table_metrics_1, + how="outer", on=["catalog", "database", "tableName"] + ) + + return out + + @staticmethod + def save_as_table( + result: DataFrame, + housekeeping_table_name: str, + ): + """ + Static method to store intermediate results of the scan operation into Delta + Would make sense only if using map_chunked from the `DataExplorer` object + (otherwise tables are written one by one into Delta with overhead) + 
""" + ( + result + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .saveAsTable(housekeeping_table_name) + ) + + def get_describe_detail(self, table_info: TableInfo): + dd = self._spark.sql(f""" + DESCRIBE DETAIL {table_info.catalog}.{table_info.schema}.{table_info.table}; + """) + dd = ( + dd + .withColumn("split", F.split(F.col('name'), '\.')) + .withColumn("catalog", F.col("split").getItem(0)) + .withColumn("database", F.col("split").getItem(1)) + .withColumn("tableName", F.col("split").getItem(2)) + .select([ + F.col("catalog"), + F.col("database"), + F.col("tableName"), + F.col("numFiles").alias("number_of_files"), + F.col("sizeInBytes").alias("bytes"), + ]) + ) + return dd + + @staticmethod + def get_describe_history_statement(table_info: TableInfo): + return f""" + SELECT + '{table_info.catalog}' AS catalog, + '{table_info.schema}' AS database, + '{table_info.table}' AS tableName, + operation, + timestamp, + operationMetrics.minFileSize AS min_file_size, + operationMetrics.p50FileSize AS p50_file_size, + operationMetrics.maxFileSize AS max_file_size, + operationParameters.zOrderBy AS z_order_by + FROM (DESCRIBE HISTORY {table_info.catalog}.{table_info.schema}.{table_info.table}) + WHERE operation in ('OPTIMIZE', 'VACUUM END') + """ + + def scan( + self, + table_info: TableInfo, + ) -> pd.DataFrame: + """ + Scans a table_info to fetch Delta stats + - DESCRIBE DETAIL + - DESCRIBE HISTORY + """ + try: + # runs a describe detail per table, figures out if exception + dd = self.get_describe_detail(table_info) + + # prepares a DESCRIBE HISTORY statement per table (will be run outside the try-catch) + statement = self.get_describe_history_statement(table_info) + + return self._process_describe_history( + dd, + self._spark.sql(statement), + ) + + except Exception as e: + errors_df = self._spark.createDataFrame( + [(table_info.catalog or "", table_info.schema, table_info.table, str(e))], + ["catalog", "database", "tableName", "error"] + 
) + return errors_df.toPandas() + + +class DeltaHousekeepingActions: + """ + Processes the output of the `DeltaHousekeeping` object to provide recommendations + - tables that need to be OPTIMIZED/VACUUM'ed + - are tables OPTIMIZED/VACUUM'ed often enough + - tables that have small files / tables for which ZORDER is not being effective + """ + + def __init__( + self, + mapped_pd_dfs: Iterable[pd.DataFrame], + spark: SparkSession = None, + min_table_size_optimize: int = 128*1024*1024, # i.e. 128 MB + min_days_not_optimized: int = 7, # in days + min_days_not_vacuumed: int = 31, # in days + max_optimize_freq: int = 2, # in days - e.g. 2 means that a daily run would be flagged + max_vacuum_freq: int = 2, # in days - e.g. 2 means that a daily run would be flagged + small_file_threshold: int = 32*1024*1024, # i.e. 32 MB + min_number_of_files_for_zorder: int = 8, + stats: pd.DataFrame = None, # for testability only + ) -> None: + if spark is None: + spark = SparkSession.builder.getOrCreate() + self._spark = spark + + if stats is None: + self._mapped_pd_dfs = mapped_pd_dfs + stats = pd.concat(self._mapped_pd_dfs) + self._stats_df: DataFrame = self._spark.createDataFrame(stats) + for column in [col_name for col_name in self._stats_df.columns if 'timestamp' in col_name]: + column_type = [dtype for col_name, dtype in self._stats_df.dtypes if col_name == column][0] + if column_type == 'timestamp': + continue + self._stats_df = self._stats_df.withColumn( + column, + F.when(F.isnan(F.col(column)), None).otherwise(F.col(column)) + ) + self._stats_rec: DataFrame = None + + self.min_table_size_optimize = min_table_size_optimize + self.min_days_not_optimized = min_days_not_optimized + self.min_days_not_vacuumed = min_days_not_vacuumed + self.max_optimize_freq = max_optimize_freq + self.max_vacuum_freq = max_vacuum_freq + self.small_file_threshold = small_file_threshold + self.min_number_of_files_for_zorder = min_number_of_files_for_zorder + self.reason_col_suffix = "_reason" + 
self.recomendations_dict = { + "not_optimized": { + "legend": "Tables that have never been OPTIMIZED and would benefit from it", + "col_name": "rec_not_optimized" + }, + "not_vacuumed": { + "legend": "Tables that have never been VACUUM'ed", + "col_name": "rec_not_vacuumed" + }, + "not_optimized_last_days": { + "legend": "Tables that are not OPTIMIZED often enough", + "col_name": "rec_not_optimized_last_days" + }, + "not_vacuumed_last_days": { + "legend": "Tables that are not VACUUM'ed often enough", + "col_name": "rec_not_vacuumed_last_days" + }, + "optimized_too_freq": { + "legend": "Tables that are OPTIMIZED too often", + "col_name": "rec_optimized_too_freq" + }, + "vacuumed_too_freq": { + "legend": "Tables that are VACUUM'ed too often", + "col_name": "rec_vacuumed_too_freq" + }, + "do_not_need_optimize": { + "legend": "Tables that are too small to be OPTIMIZED", + "col_name": "rec_do_not_need_optimize" + }, + "to_analyze": { + "legend": "Tables that need more analysis -small_files", + "col_name": "rec_to_analyze" + }, + "zorder_not_effective": { + "legend": "Tables for which ZORDER is not being effective", + "col_name": "rec_zorder_not_effective" + }, + } + + def _need_optimize(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["not_optimized"] + return stats_rec.withColumn( + conf_dict["col_name"], + F.when( + F.col("max_optimize_timestamp").isNull() & + F.col("bytes").isNotNull() & (F.col("bytes").astype("int") > F.lit(self.min_table_size_optimize)), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _optimize_not_needed(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["do_not_need_optimize"] + return stats_rec.withColumn( + conf_dict["col_name"], + F.when( + F.col("max_optimize_timestamp").isNotNull() & + F.col("bytes").isNotNull() & + (F.col("bytes").astype("int") < F.lit(self.min_table_size_optimize)), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _not_optimized_last_days(self, 
stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["not_optimized_last_days"] + return stats_rec.withColumn( + "optimize_lag", + F.date_diff(F.lit(datetime.today()), F.col("max_optimize_timestamp")) + ).withColumn( + conf_dict["col_name"], + F.when( + F.col("optimize_lag") > F.lit(self.min_days_not_optimized), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _optimized_too_frequently(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["optimized_too_freq"] + return stats_rec.withColumn( + "optimize_freq", + F.when( + F.col("max_optimize_timestamp").isNotNull() & F.col("2nd_optimize_timestamp").isNotNull(), + F.date_diff(F.col("max_optimize_timestamp"), F.col("2nd_optimize_timestamp")) + ) + ).withColumn( + conf_dict["col_name"], + F.when( + F.col("optimize_freq") < F.lit(self.max_optimize_freq), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _never_vacuumed(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["not_vacuumed"] + return stats_rec.withColumn( + conf_dict["col_name"], + F.when( + F.col("max_vacuum_timestamp").isNull(), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _not_vacuumed_last_days(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["not_vacuumed_last_days"] + return stats_rec.withColumn( + "vacuum_lag", + F.date_diff(F.lit(datetime.today()), F.col("max_vacuum_timestamp")) + ).withColumn( + conf_dict["col_name"], + F.when( + F.col("vacuum_lag") > F.lit(self.min_days_not_vacuumed), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _vacuumed_too_frequently(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["vacuumed_too_freq"] + return stats_rec.withColumn( + "vacuum_freq", + F.when( + F.col("max_vacuum_timestamp").isNotNull() & F.col("2nd_vacuum_timestamp").isNotNull(), + F.date_diff(F.col("max_vacuum_timestamp"), F.col("2nd_vacuum_timestamp")) + ) + ).withColumn( + 
conf_dict["col_name"], + F.when( + F.col("vacuum_freq") < F.lit(self.max_vacuum_freq), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _analyze_these_tables(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["to_analyze"] + return stats_rec.withColumn( + conf_dict["col_name"], + F.when( + F.col("max_optimize_timestamp").isNotNull() & + F.col("p50_file_size").isNotNull() & (F.col("number_of_files") > F.lit(1)) & + (F.col("p50_file_size").astype("int") < F.lit(self.small_file_threshold)), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def _zorder_not_effective(self, stats_rec: DataFrame) -> DataFrame: + conf_dict = self.recomendations_dict["zorder_not_effective"] + return stats_rec.withColumn( + "z_order_by_clean", + F.when( + F.col("max_optimize_timestamp").isNull() | + F.col("p50_file_size").isNull() | (F.col("z_order_by") == "[]"), + None + ).otherwise( + F.regexp_replace( + F.regexp_replace( + F.regexp_replace( + F.col("z_order_by"), "\\[", "" + ), "\\]", "" + ), '"', "" + ) + ) + ).withColumn( + "z_order_by_array", F.split(F.col("z_order_by_clean"), ",") + ).withColumn( + conf_dict["col_name"], + F.when( + (F.size(F.col("z_order_by_array")) > 0) & + (F.col("number_of_files") < F.lit(self.min_number_of_files_for_zorder)), + F.lit(True) + ).otherwise(F.lit(False)) + ) + + def display(self) -> None: + """Executes the Delta housekeeping analysis and displays a sample of results""" + return self.apply().display() + + def apply(self) -> DataFrame: + """Displays recommendations in a DataFrame format""" + return self.generate_recommendations() + + def generate_recommendations(self) -> DataFrame: + """ + Generates Delta Housekeeping recommendations as a list of dictionaries (internal use + unit tests only) + A dict per recommendation where: + - The key is the legend of the recommendation + - The value is a pandas df with the affected tables + """ + if self._stats_rec is None: + stats_rec = self._need_optimize(self._stats_df) + 
stats_rec = self._never_vacuumed(stats_rec) + stats_rec = self._not_optimized_last_days(stats_rec) + stats_rec = self._not_vacuumed_last_days(stats_rec) + stats_rec = self._optimized_too_frequently(stats_rec) + stats_rec = self._vacuumed_too_frequently(stats_rec) + stats_rec = self._optimize_not_needed(stats_rec) + stats_rec = self._analyze_these_tables(stats_rec) + stats_rec = self._zorder_not_effective(stats_rec) + self._stats_rec = stats_rec + + return self._stats_rec + + def _explain(self) -> Iterable[dict]: + stats = self.generate_recommendations() + out = [] + for _, item in self.recomendations_dict.items(): + legend = None + col_name = None + for k, v in item.items(): + if k == "legend": + legend = v + elif k == "col_name": + col_name = v + + out.append({ + legend: stats.filter(F.col(col_name)) + }) + + return out + + def explain(self) -> None: + from databricks.sdk.runtime import display + + for item in self._explain(): + for legend, df in item.items(): + display(legend) + display(df) diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 22d85e3..d252f52 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -1,7 +1,8 @@ import concurrent.futures import copy import re -from typing import Optional, List +import pandas as pd +from typing import Optional, List, Callable, Iterable, Any from discoverx import logging from discoverx.common import helper from discoverx.discovery import Discovery @@ -11,6 +12,7 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import lit from discoverx.table_info import InfoFetcher, TableInfo +from discoverx.delta_housekeeping import DeltaHousekeeping, DeltaHousekeepingActions logger = logging.Logging() @@ -182,7 +184,7 @@ def scan( discover.scan(rules=rules, sample_size=sample_size, what_if=what_if) return discover - def map(self, f) -> list[any]: + def map(self, f: Callable[[TableInfo], Any]) -> list[Any]: """Runs a function for each table in the data explorer Args: @@ -214,6 
+216,14 @@ def map(self, f) -> list[any]: return res + def delta_housekeeping(self) -> pd.DataFrame: + """ + Gathers stats and recommendations on Delta Housekeeping + """ + dh = DeltaHousekeeping(self._spark) + dfs_pd: Iterable[pd.DataFrame] = self.map(dh.scan) + return DeltaHousekeepingActions(dfs_pd, spark=self._spark) + class DataExplorerActions: def __init__( diff --git a/examples/exec_delta_housekeeping.py b/examples/exec_delta_housekeeping.py new file mode 100644 index 0000000..bf41e4c --- /dev/null +++ b/examples/exec_delta_housekeeping.py @@ -0,0 +1,65 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Run Delta Housekeeping across multiple tables +# MAGIC Analysis that provides stats on Delta tables / recommendations for improvements, including: +# MAGIC - stats (size of tables and number of files, timestamps of latest OPTIMIZE & VACUUM operations, stats of OPTIMIZE) +# MAGIC - recommendations on tables that need to be OPTIMIZED/VACUUM'ed +# MAGIC - are tables OPTIMIZED/VACUUM'ed often enough +# MAGIC - tables that have small files / tables for which ZORDER is not being effective +# MAGIC + +# COMMAND ---------- + +# MAGIC %pip install dbl-discoverx + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Declare Variables + +# COMMAND ---------- + +dbutils.widgets.text("catalogs", "*", "Catalogs") +dbutils.widgets.text("schemas", "*", "Schemas") +dbutils.widgets.text("tables", "*", "Tables") + +# COMMAND ---------- + +catalogs = dbutils.widgets.get("catalogs") +schemas = dbutils.widgets.get("schemas") +tables = dbutils.widgets.get("tables") +from_table_statement = ".".join([catalogs, schemas, tables]) + +# COMMAND ---------- + +from discoverx import DX + +dx = DX() + +# COMMAND ---------- + +# DBTITLE 1,Run the discoverx DeltaHousekeeping operation -generates an output object on which you can run operations +output = ( + dx.from_tables(from_table_statement) + .delta_housekeeping() +) + +# COMMAND ---------- + +# DBTITLE 1,apply() operation generates a 
spark dataframe with recommendations +result = output.apply() +result.select("catalog", "database", "tableName", *[c for c in result.columns if c.startswith("rec_")]).display() + +# COMMAND ---------- + +# DBTITLE 1,display() runs apply and displays the full result (including stats per table) +output.display() + +# COMMAND ---------- + +# DBTITLE 1,explain() outputs the DeltaHousekeeping recommendations in HTML format +output.explain() + +# COMMAND ---------- + + diff --git a/tests/unit/data/delta_housekeeping/dd_click_sales.csv b/tests/unit/data/delta_housekeeping/dd_click_sales.csv new file mode 100644 index 0000000..4ffb5a3 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dd_click_sales.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes +lorenzorubi,default,click_sales,6,326068799 diff --git a/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv new file mode 100644 index 0000000..70fc5a1 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dd_housekeeping_summary.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes +lorenzorubi,default,housekeeping_summary,1,192917 diff --git a/tests/unit/data/delta_housekeeping/dh_click_sales.csv b/tests/unit/data/delta_housekeeping/dh_click_sales.csv new file mode 100644 index 0000000..f35d6b6 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dh_click_sales.csv @@ -0,0 +1,4 @@ +catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,click_sales,VACUUM END,2023-12-06T16:40:28Z,null,null,null,null +lorenzorubi,default,click_sales,VACUUM END,2023-12-05T01:19:47Z,null,null,null,null +lorenzorubi,default,click_sales,VACUUM END,2023-11-25T04:03:41Z,null,null,null,null diff --git a/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv new file mode 100644 index 
0000000..d1ee36e --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dh_housekeeping_summary.csv @@ -0,0 +1,25 @@ +catalog,database,tableName,operation,timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:50:14Z,192917,192917,192917,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T05:21:22Z,184203,184203,184203,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:37:19Z,176955,176955,176955,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T04:10:26Z,168560,168560,168560,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T03:11:02Z,161710,161710,161710,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:44:41Z,154166,154166,154166,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T02:18:54Z,145990,145990,145990,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:42:12Z,137677,137677,137677,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T01:09:19Z,130864,130864,130864,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:53:33Z,123702,123702,123702,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:43:44Z,118806,118806,118806,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:28:00Z,111983,111983,111983,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-05T00:14:21Z,104790,104790,104790,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:47:02Z,97314,97314,97314,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T23:18:17Z,91509,91509,91509,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T22:14:48Z,84152,84152,84152,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:57:53Z,76464,76464,76464,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:30:49Z,67498,67498,67498,[] 
+lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T21:18:59Z,59412,59412,59412,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:30:48Z,51173,51173,51173,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T20:12:59Z,42346,42346,42346,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:35:05Z,34463,34463,34463,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:30:46Z,28604,28604,28604,[] +lorenzorubi,default,housekeeping_summary,OPTIMIZE,2023-12-04T19:06:51Z,8412,17592,17592,[] diff --git a/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv new file mode 100644 index 0000000..2616a7c --- /dev/null +++ b/tests/unit/data/delta_housekeeping/dhk_pandas_result.csv @@ -0,0 +1,20 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error +lorenzorubi,default,housekeeping_summary_v3,1,3787,null,null,null,null,null,null,null,null,null +lorenzorubi,maxmind_geo,gold_ipv6,1,4907069,null,null,null,null,null,null,null,null,null +lorenzorubi,default,click_sales,6,326068799,null,null,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,null,null,null,null,null +lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,null,null,192917,192917,192917,["a"],null +lorenzorubi,default,housekeeping_summary_v2,3,12326,2023-12-18T11:25:35Z,null,null,null,5273,5273,5273,[],null +lorenzorubi,maxmind_geo,raw_locations,1,6933,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,customer,1,61897021,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,nation,1,3007,null,null,null,null,null,null,null,null,null +lorenzorubi,maxmind_geo,raw_ipv6,1,1783720,null,null,null,null,null,null,null,null,null 
+lorenzorubi,maxmind_geo,gold_ipv4,1,7220024,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,enriched_orders,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`enriched_orders` does not support DESCRIBE DETAIL. ; line 2 pos 20 +lorenzorubi,default,click_sales_history,1,7710,null,null,null,null,null,null,null,null,null +lorenzorubi,tpch,orders,2406,317120666,null,null,null,null,null,null,null,null,null +lorenzorubi,default,complete_data,6,326060019,null,null,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,null,null,null,null,null +lorenzorubi,maxmind_geo,raw_ipv4,1,3115269,null,null,null,null,null,null,null,null,null +lorenzorubi,gcp_cost_analysis,sku_prices,1,835,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,daily_totalorders_by_nation,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_totalorders_by_nation` does not support DESCRIBE DETAIL. ; line 2 pos 20 +lorenzorubi,gcp_cost_analysis,project_ids,2,1774,null,null,null,null,null,null,null,null,null +lorenzorubi,dais_dlt_2023,daily_2nd_high_orderprice,null,null,null,null,null,null,null,null,null,null,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_2nd_high_orderprice` does not support DESCRIBE DETAIL. 
; line 2 pos 20 diff --git a/tests/unit/data/delta_housekeeping/expected_do_not_need_optimize.csv b/tests/unit/data/delta_housekeeping/expected_do_not_need_optimize.csv new file mode 100644 index 0000000..ee17ad5 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_do_not_need_optimize.csv @@ -0,0 +1,3 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason,rec_misc,rec_misc_reason +lorenzorubi,default,housekeeping_summary,1.0,192917.0,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917.0,192917.0,192917.0,[],,True, | Tables that are not OPTIMIZED often enough | Tables that are OPTIMIZED too often | Tables that are too small to be OPTIMIZED,True,The table has never been VACUUM'ed | | ,False, | +lorenzorubi,default,housekeeping_summary_v2,3.0,12326.0,2023-12-18T11:25:35Z,,,,5273.0,5273.0,5273.0,[],,True, | Tables that are not OPTIMIZED often enough | | Tables that are too small to be OPTIMIZED,True,The table has never been VACUUM'ed | | ,True,Tables that need more analysis -small_files | diff --git a/tests/unit/data/delta_housekeeping/expected_need_analysis.csv b/tests/unit/data/delta_housekeeping/expected_need_analysis.csv new file mode 100644 index 0000000..2defd2a --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_need_analysis.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason,rec_misc,rec_misc_reason +lorenzorubi,default,housekeeping_summary_v2,3.0,12326.0,2023-12-18T11:25:35Z,,,,5273.0,5273.0,5273.0,[],,True, | Tables that are not OPTIMIZED often enough | | ,True,The table has never been VACUUM'ed | | 
,True,Tables that need more analysis -small_files diff --git a/tests/unit/data/delta_housekeeping/expected_need_optimize.csv b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv new file mode 100644 index 0000000..9766cf0 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_need_optimize.csv @@ -0,0 +1,4 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason +lorenzorubi,default,click_sales,6.0,326068799.0,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it +lorenzorubi,tpch,orders,2406.0,317120666.0,,,,,,,,,,True,The table has not been OPTIMIZED and would benefit from it +lorenzorubi,default,complete_data,6.0,326060019.0,,,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it diff --git a/tests/unit/data/delta_housekeeping/expected_need_vacuum.csv b/tests/unit/data/delta_housekeeping/expected_need_vacuum.csv new file mode 100644 index 0000000..27c04ab --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_need_vacuum.csv @@ -0,0 +1,18 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason +lorenzorubi,default,housekeeping_summary_v3,1.0,3787.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,maxmind_geo,gold_ipv6,1.0,4907069.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,default,housekeeping_summary,1.0,192917.0,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917.0,192917.0,192917.0,[],,False,,True,The table has never been VACUUM'ed 
+lorenzorubi,default,housekeeping_summary_v2,3.0,12326.0,2023-12-18T11:25:35Z,,,,5273.0,5273.0,5273.0,[],,False,,True,The table has never been VACUUM'ed +lorenzorubi,maxmind_geo,raw_locations,1.0,6933.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,tpch,customer,1.0,61897021.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,tpch,nation,1.0,3007.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,maxmind_geo,raw_ipv6,1.0,1783720.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,maxmind_geo,gold_ipv4,1.0,7220024.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,dais_dlt_2023,enriched_orders,,,,,,,,,,,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`enriched_orders` does not support DESCRIBE DETAIL. ; line 2 pos 20,False,,True,The table has never been VACUUM'ed +lorenzorubi,default,click_sales_history,1.0,7710.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,tpch,orders,2406.0,317120666.0,,,,,,,,,,True,The table has not been OPTIMIZED and would benefit from it,True,The table has never been VACUUM'ed +lorenzorubi,maxmind_geo,raw_ipv4,1.0,3115269.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,gcp_cost_analysis,sku_prices,1.0,835.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,dais_dlt_2023,daily_totalorders_by_nation,,,,,,,,,,,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_totalorders_by_nation` does not support DESCRIBE DETAIL. ; line 2 pos 20,False,,True,The table has never been VACUUM'ed +lorenzorubi,gcp_cost_analysis,project_ids,2.0,1774.0,,,,,,,,,,False,,True,The table has never been VACUUM'ed +lorenzorubi,dais_dlt_2023,daily_2nd_high_orderprice,,,,,,,,,,,[UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION] The view `lorenzorubi`.`dais_dlt_2023`.`daily_2nd_high_orderprice` does not support DESCRIBE DETAIL. 
; line 2 pos 20,False,,True,The table has never been VACUUM'ed diff --git a/tests/unit/data/delta_housekeeping/expected_not_optimized_last_days.csv b/tests/unit/data/delta_housekeeping/expected_not_optimized_last_days.csv new file mode 100644 index 0000000..f6b4724 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_not_optimized_last_days.csv @@ -0,0 +1,3 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason +lorenzorubi,default,housekeeping_summary,1.0,192917.0,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917.0,192917.0,192917.0,[],,True, | Tables that are not OPTIMIZED often enough,True,The table has never been VACUUM'ed +lorenzorubi,default,housekeeping_summary_v2,3.0,12326.0,2023-12-18T11:25:35Z,,,,5273.0,5273.0,5273.0,[],,True, | Tables that are not OPTIMIZED often enough,True,The table has never been VACUUM'ed diff --git a/tests/unit/data/delta_housekeeping/expected_not_vacuumed_last_days.csv b/tests/unit/data/delta_housekeeping/expected_not_vacuumed_last_days.csv new file mode 100644 index 0000000..fa34f67 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_not_vacuumed_last_days.csv @@ -0,0 +1,3 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason +lorenzorubi,default,click_sales,6.0,326068799.0,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it | ,True, | Tables that are not VACUUM'ed often enough +lorenzorubi,default,complete_data,6.0,326060019.0,,,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it | 
,True, | Tables that are not VACUUM'ed often enough diff --git a/tests/unit/data/delta_housekeeping/expected_optimized_too_freq.csv b/tests/unit/data/delta_housekeeping/expected_optimized_too_freq.csv new file mode 100644 index 0000000..656b2e0 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_optimized_too_freq.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason,rec_misc,rec_misc_reason +lorenzorubi,default,housekeeping_summary,1.0,192917.0,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917.0,192917.0,192917.0,[],,True, | Tables that are not OPTIMIZED often enough | Tables that are OPTIMIZED too often | ,True,The table has never been VACUUM'ed | | ,False, | diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv new file mode 100644 index 0000000..569c1e8 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_pdh_click_sales.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by +lorenzorubi,default,click_sales,6,326068799,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,, diff --git a/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv new file mode 100644 index 0000000..af564ba --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_pdh_housekeeping_summary.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by 
+lorenzorubi,default,housekeeping_summary,1,192917,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917,192917,192917,[] diff --git a/tests/unit/data/delta_housekeeping/expected_vacuumed_too_freq.csv b/tests/unit/data/delta_housekeeping/expected_vacuumed_too_freq.csv new file mode 100644 index 0000000..ac87c76 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_vacuumed_too_freq.csv @@ -0,0 +1,3 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason,rec_misc,rec_misc_reason +lorenzorubi,default,click_sales,6.0,326068799.0,,,2023-12-06T16:40:28Z,2023-12-05T01:19:47Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it | | | ,True, | Tables that are not VACUUM'ed often enough | Tables that are VACUUM'ed too often,False, | +lorenzorubi,default,complete_data,6.0,326060019.0,,,2023-12-06T16:40:36Z,2023-12-05T01:19:25Z,,,,,,True,The table has not been OPTIMIZED and would benefit from it | | | ,True, | Tables that are not VACUUM'ed often enough | Tables that are VACUUM'ed too often,False, | diff --git a/tests/unit/data/delta_housekeeping/expected_zorder_not_effective.csv b/tests/unit/data/delta_housekeeping/expected_zorder_not_effective.csv new file mode 100644 index 0000000..cd0da45 --- /dev/null +++ b/tests/unit/data/delta_housekeeping/expected_zorder_not_effective.csv @@ -0,0 +1,2 @@ +catalog,database,tableName,number_of_files,bytes,max_optimize_timestamp,2nd_optimize_timestamp,max_vacuum_timestamp,2nd_vacuum_timestamp,min_file_size,p50_file_size,max_file_size,z_order_by,error,rec_optimize,rec_optimize_reason,rec_vacuum,rec_vacuum_reason,rec_misc,rec_misc_reason +lorenzorubi,default,housekeeping_summary,1.0,192917.0,2023-12-05T05:50:14Z,2023-12-05T05:21:22Z,,,192917.0,192917.0,192917.0,"[""a""]",,True, | Tables that are not OPTIMIZED often 
enough | Tables that are OPTIMIZED too often | Tables that are too small to be OPTIMIZED,True,The table has never been VACUUM'ed | | ,True, | Tables for which ZORDER is not being effective diff --git a/tests/unit/delta_housekeeping_actions_test.py b/tests/unit/delta_housekeeping_actions_test.py new file mode 100644 index 0000000..0ab54fc --- /dev/null +++ b/tests/unit/delta_housekeeping_actions_test.py @@ -0,0 +1,316 @@ +import pytest +import pandas as pd +import datetime +import discoverx.delta_housekeeping as mut +from discoverx.delta_housekeeping import DeltaHousekeepingActions +from pathlib import Path + + +def _resolve_file_path(request, relative_path): + module_path = Path(request.module.__file__) + test_file_path = module_path.parent / relative_path + return pd.read_csv(str(test_file_path.resolve())) + + +@pytest.fixture() +def housekeeping_stats(request): + return _resolve_file_path(request, "data/delta_housekeeping/dhk_pandas_result.csv") + + +@pytest.fixture() +def expected_need_optimize(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_need_optimize.csv") + + +@pytest.fixture() +def expected_need_vacuum(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_need_vacuum.csv") + + +@pytest.fixture() +def expected_not_optimized_last_days(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_not_optimized_last_days.csv") + + +@pytest.fixture() +def expected_not_vacuumed_last_days(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_not_vacuumed_last_days.csv") + + +@pytest.fixture() +def expected_need_analysis(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_need_analysis.csv") + + +@pytest.fixture() +def expected_optimized_too_freq(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_optimized_too_freq.csv") + + +@pytest.fixture() +def expected_vacuumed_too_freq(request): + return 
_resolve_file_path(request, "data/delta_housekeeping/expected_vacuumed_too_freq.csv") + + +@pytest.fixture() +def expected_do_not_need_optimize(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_do_not_need_optimize.csv") + + +@pytest.fixture() +def expected_zorder_not_effective(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_zorder_not_effective.csv") + + +@pytest.fixture() +def patch_datetime_now(monkeypatch): + class mydatetime: + @classmethod + def now(cls, tzinfo): + return datetime.datetime(2024, 1, 28, 12, 0, 0).replace(tzinfo=tzinfo) + + @classmethod + def today(cls): + return datetime.datetime(2024, 1, 28, 12, 0, 0) + + monkeypatch.setattr(mut, 'datetime', mydatetime) + + +def test_apply_need_optimize(housekeeping_stats, expected_need_optimize, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_optimized"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 3 + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_need_optimize.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_need_optimize(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + min_table_size_optimize=1024*1024*1024*1024 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_optimized"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 0 + + +def test_apply_need_vacuum(housekeeping_stats, expected_need_vacuum, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_vacuumed"]["col_name"] 
+ need_vacuum_df = res.loc[res[col_name], :] + assert need_vacuum_df.shape[0] == 17 + pd.testing.assert_frame_equal( + need_vacuum_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_need_vacuum.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_apply_not_optimized_last_days(housekeeping_stats, expected_not_optimized_last_days, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_optimized_last_days"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 2 + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_not_optimized_last_days.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_not_optimized_last_days(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + min_days_not_optimized=1e60 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_optimized_last_days"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 0 + + +def test_apply_not_vacuumed_last_days(housekeeping_stats, expected_not_vacuumed_last_days, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_vacuumed_last_days"]["col_name"] + need_vacuum_df = res.loc[res[col_name], :] + assert need_vacuum_df.shape[0] == 2 + pd.testing.assert_frame_equal( + need_vacuum_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_not_vacuumed_last_days.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_not_vacuumed_last_days(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + 
None, + stats=housekeeping_stats, + min_days_not_vacuumed=1e60 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["not_vacuumed_last_days"]["col_name"] + need_vacuum_df = res.loc[res[col_name], :] + assert need_vacuum_df.shape[0] == 0 + + +def test_apply_optimized_too_freq(housekeeping_stats, expected_optimized_too_freq, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["optimized_too_freq"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 1 + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_optimized_too_freq.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_optimized_too_freq(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + max_optimize_freq=0 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["optimized_too_freq"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 0 + + +def test_apply_vacuumed_too_freq(housekeeping_stats, expected_vacuumed_too_freq, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["vacuumed_too_freq"]["col_name"] + need_vacuum_df = res.loc[res[col_name], :] + assert need_vacuum_df.shape[0] == 2 + pd.testing.assert_frame_equal( + need_vacuum_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_vacuumed_too_freq.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_vacuumed_too_freq(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + max_vacuum_freq=0 + ) 
+ res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["vacuumed_too_freq"]["col_name"] + need_vacuum_df = res.loc[res[col_name], :] + assert need_vacuum_df.shape[0] == 0 + + +def test_apply_do_not_need_optimize(housekeeping_stats, expected_do_not_need_optimize, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["do_not_need_optimize"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 2 + pd.testing.assert_frame_equal( + need_optimize_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_do_not_need_optimize.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_do_not_need_optimize(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + min_table_size_optimize=0 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["do_not_need_optimize"]["col_name"] + need_optimize_df = res.loc[res[col_name], :] + assert need_optimize_df.shape[0] == 0 + + +def test_apply_analyze_tables(housekeeping_stats, expected_need_analysis, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["to_analyze"]["col_name"] + need_analysis_df = res.loc[res[col_name], :] + assert need_analysis_df.shape[0] == 1 + pd.testing.assert_frame_equal( + need_analysis_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_need_analysis.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_analyze_tables(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + small_file_threshold=0 + ) + res = dha.generate_recommendations().toPandas() + col_name = 
dha.recomendations_dict["to_analyze"]["col_name"] + need_analysis_df = res.loc[res[col_name], :] + assert need_analysis_df.shape[0] == 0 + + +def test_apply_zorder_not_effective(housekeeping_stats, expected_zorder_not_effective, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["zorder_not_effective"]["col_name"] + need_analysis_df = res.loc[res[col_name], :] + assert need_analysis_df.shape[0] == 1 + pd.testing.assert_frame_equal( + need_analysis_df.reset_index().loc[:, ["catalog", "database", "tableName"]], + expected_zorder_not_effective.loc[:, ["catalog", "database", "tableName"]], + ) + + +def test_empty_apply_zorder_not_effective(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + min_number_of_files_for_zorder=0 + ) + res = dha.generate_recommendations().toPandas() + col_name = dha.recomendations_dict["zorder_not_effective"]["col_name"] + need_analysis_df = res.loc[res[col_name], :] + assert need_analysis_df.shape[0] == 0 + + +def test_explain(housekeeping_stats, patch_datetime_now): + dha = DeltaHousekeepingActions( + None, + stats=housekeeping_stats, + ) + res = dha._explain() + assert len(res) == 9 diff --git a/tests/unit/delta_housekeeping_test.py b/tests/unit/delta_housekeeping_test.py new file mode 100644 index 0000000..63aea52 --- /dev/null +++ b/tests/unit/delta_housekeeping_test.py @@ -0,0 +1,98 @@ +import pytest +import pandas as pd +from discoverx.delta_housekeeping import DeltaHousekeeping +from pathlib import Path +import pyspark.sql.functions as F + + +def _resolve_file_path(request, relative_path): + module_path = Path(request.module.__file__) + test_file_path = module_path.parent / relative_path + return pd.read_csv( + str(test_file_path.resolve()), + dtype={ + "max_optimize_timestamp": "str", + "2nd_optimize_timestamp": "str", + "max_vacuum_timestamp": 
"str", + "2nd_vacuum_timestamp": "str", + } + ) + + +@pytest.fixture() +def dh_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/dh_click_sales.csv") + + +@pytest.fixture() +def dd_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/dd_click_sales.csv") + + +@pytest.fixture() +def expected_pdh_click_sales(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_click_sales.csv") + + +@pytest.fixture() +def dh_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/dh_housekeeping_summary.csv") + + +@pytest.fixture() +def dd_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/dd_housekeeping_summary.csv") + + +@pytest.fixture() +def expected_pdh_housekeeping_summary(request): + return _resolve_file_path(request, "data/delta_housekeeping/expected_pdh_housekeeping_summary.csv") + + +def test_process_describe_history_no_optimize(spark, dh_click_sales, dd_click_sales, expected_pdh_click_sales): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame(dh_click_sales) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + pd.testing.assert_frame_equal( + out.reset_index(), + expected_pdh_click_sales.reset_index(), + ) + + +def test_process_describe_history_no_vacuum( + spark, dh_housekeeping_summary, dd_housekeeping_summary, expected_pdh_housekeeping_summary +): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_housekeeping_summary) + describe_history_df = spark.createDataFrame(dh_housekeeping_summary) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + pd.testing.assert_frame_equal( + out.reset_index(), + expected_pdh_housekeeping_summary.reset_index(), + ) + + +def test_process_describe_history_no_operation(spark, dd_click_sales): + dh = 
DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame([], "string") + out = dh._process_describe_history(describe_detail_df, describe_history_df) + # output should be equal to DESCRIBE DETAIL + pd.testing.assert_frame_equal( + out.reset_index(), + dd_click_sales.reset_index(), + ) + + +def test_process_describe_history_empty_history(spark, dd_click_sales, dh_click_sales): + dh = DeltaHousekeeping(spark) + describe_detail_df = spark.createDataFrame(dd_click_sales) + describe_history_df = spark.createDataFrame(dh_click_sales) + describe_history_df = describe_history_df.withColumn("operation", F.lit("NOOP")) + out = dh._process_describe_history(describe_detail_df, describe_history_df) + # output should be equal to DESCRIBE DETAIL + pd.testing.assert_frame_equal( + out.reset_index(), + dd_click_sales.reset_index(), + ) \ No newline at end of file diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py index ac6d518..a59f361 100644 --- a/tests/unit/explorer_test.py +++ b/tests/unit/explorer_test.py @@ -1,9 +1,8 @@ +import pandas import pytest from discoverx.explorer import DataExplorer, DataExplorerActions, InfoFetcher, TableInfo -# # Sample test data -# sample_table_info = TableInfo("catalog1", "schema1", "table1", []) @pytest.fixture() def info_fetcher(spark): return InfoFetcher(spark=spark, information_schema="default") @@ -71,8 +70,8 @@ def test_map(spark, info_fetcher): assert len(result) == 1 assert result[0].table == "tb_1" assert result[0].schema == "default" - assert result[0].catalog == None - assert result[0].tags == None + assert result[0].catalog is None + assert result[0].tags is None def test_map_with_tags(spark, info_fetcher): @@ -81,7 +80,7 @@ def test_map_with_tags(spark, info_fetcher): assert len(result) == 1 assert result[0].table == "tb_1" assert result[0].schema == "default" - assert result[0].catalog == None + assert result[0].catalog is None assert 
len(result[0].tags.table_tags) == 1 @@ -93,7 +92,7 @@ def test_map_with_source_data_formats(spark, info_fetcher): assert len(result) == 1 assert result[0].table == "tb_1" assert result[0].schema == "default" - assert result[0].catalog == None + assert result[0].catalog is None data_explorer = DataExplorer("*.default.tb_1", spark, info_fetcher).with_data_source_formats( data_source_formats=["CSV"] @@ -106,3 +105,14 @@ def test_no_tables_matching_filter(spark, info_fetcher): data_explorer = DataExplorer("some_catalog.default.non_existent_table", spark, info_fetcher) with pytest.raises(ValueError): data_explorer.map(lambda table_info: table_info) + + +def test_delta_housekeeeping_call(spark, info_fetcher): + data_explorer = DataExplorer("*.default.*", spark, info_fetcher) + result: pandas.DataFrame = data_explorer.delta_housekeeping()._stats_df.toPandas() + print(result['tableName'].count()) + assert result['tableName'].count() == 3 + for res in result['tableName'].tolist(): + assert res in ["tb_all_types", "tb_1", "tb_2"] + for col in result.columns: + assert col in ["catalog", "database", "tableName", "error"]