diff --git a/README.md b/README.md index efc6ef3..fe6d0a4 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ pip install -U sqlplate ## :fork_and_knife: Usage +### Generate SQL template + Start passing option parameters before generate the Delta ETL SQL statement that will use on the Azure Databricks service. @@ -67,7 +69,7 @@ print(statement.strip().strip('\n')) The result SQL statement: -```text +```sql MERGE INTO catalog-name.schema-name.table-name AS target USING ( WITH change_query AS ( @@ -108,6 +110,56 @@ WHEN NOT MATCHED THEN INSERT ; ``` +### Data Quality + +This package handles generating SQL statements only. For the data quality part, you can +use the quality template. + +> [!IMPORTANT] +> This feature is not supported yet!!! + +```python +from sqlplate import SQLPlate + +statement: str = ( + SQLPlate.format('databricks') + .template('quality.check') + .option('catalog', 'catalog-name') + .option('schema', 'schema-name') + .option('table', 'table-name') + .option('filter', "load_date >= to_timestamp('20250201', 'yyyyMMdd')") + .option('unique', ['pk_col']) + .option('notnull', ['col01', 'col02']) + .option("contain", [("col01", ["A", "B", "C"])]) + .option("validate", [("col03", "> 10000")]) + .load() +) +print(statement.strip().strip('\n')) +``` + +The result SQL statement: + +```sql +WITH source AS ( + SELECT + * + FROM + catalog-name.schema-name.table-name + WHERE load_date >= to_timestamp('20250201', 'yyyyMMdd') +) +, records AS ( + SELECT COUNT(1) AS table_records + FROM source +) +SELECT + (SELECT table_records FROM records) AS table_records + , ((SELECT COUNT( DISTINCT pk_col ) FROM source) = (SELECT table_records FROM records)) AS unique_pk_col + , (SELECT COUNT(1) FROM source WHERE col01 IS NULL) = 0 AS notnull_col01 + , (SELECT COUNT(1) FROM source WHERE col02 IS NULL) = 0 AS notnull_col02 + , (SELECT COUNT(1) FROM source WHERE col01 NOT IN ['A', 'B', 'C']) = 0 AS contain_col01 + , ((SELECT COUNT(1) FROM source WHERE col03 > 10000) = (SELECT table_records FROM records)) AS validate_col03 +``` + ## :chains: Support 
Systems | System | Progress Status | System Integration Test | Remark | diff --git a/src/sqlplate/sqlity.py b/src/sqlplate/sqlity.py deleted file mode 100644 index 76d6ad7..0000000 --- a/src/sqlplate/sqlity.py +++ /dev/null @@ -1,10 +0,0 @@ -# ------------------------------------------------------------------------------ -# Copyright (c) 2022 Korawich Anuttra. All rights reserved. -# Licensed under the MIT License. See LICENSE in the project root for -# license information. -# ------------------------------------------------------------------------------ -from __future__ import annotations - - -class SQLity: - """A SQLity object for render data quality report by Jinja template.""" diff --git a/src/sqlplate/sqlplate.py b/src/sqlplate/sqlplate.py index 9ab5f33..b2f3565 100644 --- a/src/sqlplate/sqlplate.py +++ b/src/sqlplate/sqlplate.py @@ -6,7 +6,7 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Iterator, Optional, Callable +from typing import Any, Iterator, Optional, Callable, Literal from jinja2 import Template @@ -83,6 +83,9 @@ def template(self, name: str) -> 'SQLPlate': ) return self + def quality(self, mode: Literal["pushdown", "memory"]) -> 'SQLPlate': + return self + def option(self, key: str, value: Any) -> 'SQLPlate': """Pass an option key-value pair before generate template.""" self._option[key] = value diff --git a/templates/databricks/quality.check.sql b/templates/databricks/quality.check.sql new file mode 100644 index 0000000..37de899 --- /dev/null +++ b/templates/databricks/quality.check.sql @@ -0,0 +1,37 @@ +{% extends "base.jinja" %} + +{% block statement %} +WITH source AS ( + SELECT + * + FROM + {{ catalog }}.{{ schema }}.{{ table }} + {%+ if filter %}WHERE {{ filter }}{% endif +%} +) +, records AS ( + SELECT COUNT(1) AS table_records + FROM source +) +SELECT + (SELECT table_records FROM records) AS table_records + {%+ if unique -%} + {%- for col in unique -%} + , ((SELECT COUNT( DISTINCT {{ col }} ) FROM 
source) = (SELECT table_records FROM records)) AS unique_{{ col }} + {%- endfor -%} + {%- endif +%} + {%+ if notnull -%} + {%- for col in notnull -%} + , (SELECT COUNT(1) FROM source WHERE {{ col }} IS NULL) = 0 AS notnull_{{ col }} + {%- endfor -%} + {%- endif +%} + {%+ if contain -%} + {%- for col in contain -%} + , (SELECT COUNT(1) FROM source WHERE {{ col[0] }} NOT IN {{ col[1] }}) = 0 AS contain_{{ col[0] }} + {%- endfor -%} + {%- endif +%} + {%+ if validate -%} + {%- for col in validate -%} + , ((SELECT COUNT(1) FROM source WHERE {{ col[0] }} {{ col[1] }}) = (SELECT table_records FROM records)) AS validate_{{ col[0] }} + {%- endfor -%} + {%- endif +%} +{% endblock statement %} diff --git a/templates/databricks/quality.metrix.sql b/templates/databricks/quality.metrix.sql new file mode 100644 index 0000000..5a21b34 --- /dev/null +++ b/templates/databricks/quality.metrix.sql @@ -0,0 +1,11 @@ +{% extends "base.jinja" %} + +{% block statement %} +WITH source AS ( + SELECT + * + FROM {{ catalog }}.{{ schema }}.{{ table }} + {%+ if filter %}WHERE {{ filter }}{% endif +%} +) +SELECT +{% endblock statement %} diff --git a/tests/test_databricks.py b/tests/test_databricks.py index d1c1ee1..41aec71 100644 --- a/tests/test_databricks.py +++ b/tests/test_databricks.py @@ -277,3 +277,40 @@ def test_sql_full_dump(template_path): FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS sub_query ; """).strip('\n') + + +def test_quality_check(template_path): + statement: str = ( + SQLPlate.format('databricks', path=template_path) + .template('quality.check') + .option('catalog', 'catalog-name') + .option('schema', 'schema-name') + .option('table', 'table-name') + .option('filter', "load_date >= to_timestamp('20250201', 'yyyyMMdd')") + .option('unique', ['pk_col']) + .option('notnull', ['col01', 'col02']) + .option( + "contain", + [("col01", ["A", "B", "C"])], + ) + .option( + "validate", + [("col03", "> 10000")], + ) + .load() + ) + print(statement) + + +def 
test_quality_metrix(template_path): + statement: str = ( + SQLPlate.format('databricks', path=template_path) + .template('quality.metrix') + .option('catalog', 'catalog-name') + .option('schema', 'schema-name') + .option('table', 'table-name') + .option('filter', "load_date >= to_timestamp('20250201', 'yyyyMMdd')") + .option("metrix", ["col1", "col2", "col3"]) + .load() + ) + print(statement)