Skip to content

Commit 1dbbad9

Browse files
authored
Feat/csv expectations (#478)
* add new CSV checkpoint for generic use on a csv * alter how csv tests run * read as all strings from csv for some operations * add extra test * make a generic function for building the read csv string * fix old operation tests * add new test for overlapping ranges * update to BIGINT * add extra BIGINT test cases
1 parent 57fbacd commit 1dbbad9

12 files changed

Lines changed: 583 additions & 2 deletions

File tree

digital_land/cli.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,49 @@ def expectations_run_dataset_checkpoint(
462462
run_dataset_checkpoint(dataset, file_path, output_dir, config, organisations)
463463

464464

465+
@cli.command(
    "expectations-csv-checkpoint",
    short_help="runs data quality expectations against a CSV file using duckdb",
)
@click.option(
    "--dataset",
    type=click.STRING,
    help="the dataset name for logging purposes",
    required=True,
)
@click.option(
    "--file-path",
    type=click.Path(),
    help="path to the CSV file to run expectations against",
    required=True,
)
@click.option(
    "--log-dir",
    type=click.Path(),
    help="directory to store expectation logs",
    required=True,
)
@click.option(
    "--rules",
    type=click.STRING,
    help="JSON string containing the list of expectation rules",
    required=True,
)
def expectations_run_csv_checkpoint(
    dataset,
    file_path,
    log_dir,
    rules,
):
    """Run expectation rules against a CSV file; logs go to <log-dir>/expectation."""
    import json

    from digital_land.expectations.commands import run_csv_checkpoint

    # Parse the rules up front so a malformed --rules value produces a clear
    # CLI error instead of an unhandled JSONDecodeError traceback.
    try:
        parsed_rules = json.loads(rules)
    except json.JSONDecodeError as err:
        raise click.BadParameter(
            f"--rules is not valid JSON: {err}", param_hint="--rules"
        )

    output_dir = Path(log_dir) / "expectation"
    run_csv_checkpoint(dataset, file_path, output_dir, parsed_rules)
506+
507+
465508
@cli.command("retire-endpoints-and-sources")
466509
@config_collections_dir
467510
@click.argument("csv-path", nargs=1, type=click.Path())
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import json
2+
import duckdb
3+
from pathlib import Path
4+
5+
from .base import BaseCheckpoint
6+
from ..log import ExpectationLog
7+
from ..operations.csv import (
8+
count_rows,
9+
check_unique,
10+
check_no_shared_values,
11+
check_no_overlapping_ranges,
12+
)
13+
14+
15+
class CsvCheckpoint(BaseCheckpoint):
    """Runs a set of expectation operations against a single CSV file using duckdb."""

    def __init__(self, dataset, file_path):
        self.dataset = dataset
        self.file_path = Path(file_path)
        self.log = ExpectationLog(dataset=dataset)
        # Populated by load(); initialised here so calling run() before
        # load() is a harmless no-op rather than an AttributeError.
        self.expectations = []

    def operation_factory(self, operation_string: str):
        """Map an operation name from a rule onto its callable.

        Raises:
            ValueError: if operation_string is not a known operation.
        """
        operation_map = {
            "count_rows": count_rows,
            "check_unique": check_unique,
            "check_no_shared_values": check_no_shared_values,
            "check_no_overlapping_ranges": check_no_overlapping_ranges,
        }
        if operation_string not in operation_map:
            raise ValueError(
                f"Unknown operation: '{operation_string}'. Must be one of {list(operation_map.keys())}."
            )
        return operation_map[operation_string]

    def load(self, rules):
        """Translate raw rule dicts into expectation dicts ready to run.

        "parameters" may be given as a JSON string or an already-parsed
        mapping; it defaults to {} when absent so parameterless operations
        don't need an explicit empty value.
        """
        self.expectations = []
        for rule in rules:
            parameters = rule.get("parameters", {})
            if isinstance(parameters, str):
                parameters = json.loads(parameters)
            expectation = {
                "operation": self.operation_factory(rule["operation"]),
                "name": rule["name"],
                "description": rule.get("description", ""),
                "dataset": self.dataset,
                "severity": rule.get("severity", ""),
                "responsibility": rule.get("responsibility", ""),
                "parameters": parameters,
            }
            self.expectations.append(expectation)

    def run_expectation(self, conn, expectation) -> tuple:
        """Execute one expectation, returning (passed, message, details)."""
        params = expectation["parameters"]
        passed, msg, details = expectation["operation"](
            conn=conn, file_path=self.file_path, **params
        )
        return passed, msg, details

    def run(self):
        """Run every loaded expectation against the CSV and record each result."""
        with duckdb.connect() as conn:
            for expectation in self.expectations:
                passed, message, details = self.run_expectation(conn, expectation)
                self.log.add(
                    {
                        "organisation": "",
                        "name": expectation["name"],
                        "passed": passed,
                        "message": message,
                        "details": details,
                        "description": expectation["description"],
                        "severity": expectation["severity"],
                        "responsibility": expectation["responsibility"],
                        "operation": expectation["operation"].__name__,
                        "parameters": expectation["parameters"],
                    }
                )

    def save(self, output_dir: Path):
        """Persist the accumulated expectation log as parquet under output_dir."""
        self.log.save_parquet(output_dir)

digital_land/expectations/checkpoints/dataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from .base import BaseCheckpoint
99
from ..log import ExpectationLog
10-
from ..operation import (
10+
from ..operations.dataset import (
1111
count_lpa_boundary,
1212
count_deleted_entities,
1313
duplicate_geometry_check,

digital_land/expectations/commands.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .checkpoints.dataset import DatasetCheckpoint
2+
from .checkpoints.csv import CsvCheckpoint
23

34
from digital_land.configuration.main import Config
45
from digital_land.organisation import Organisation
@@ -29,3 +30,18 @@ def run_dataset_checkpoint(
2930
# TODO add failure on critical error back in
3031
if act_on_critical_error:
3132
checkpoint.act_on_critical_error()
33+
34+
35+
def run_csv_checkpoint(
    dataset,
    file_path,
    output_dir,
    rules,
):
    """
    Execute a set of expectation rules over a CSV file (via duckdb) and
    write the resulting expectation log to output_dir.
    """
    # Build the checkpoint, load its rules, execute them, then persist the log.
    csv_checkpoint = CsvCheckpoint(dataset, file_path)
    csv_checkpoint.load(rules)
    csv_checkpoint.run()
    csv_checkpoint.save(output_dir)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .dataset import ( # noqa: F401
2+
count_lpa_boundary,
3+
count_deleted_entities,
4+
check_columns,
5+
duplicate_geometry_check,
6+
)
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from pathlib import Path
2+
3+
4+
def _read_csv(file_path: Path) -> str:
5+
return f"read_csv_auto('{str(file_path)}',all_varchar=true,delim=',',quote='\"',escape='\"')"
6+
7+
8+
def count_rows(
    conn, file_path: Path, expected: int, comparison_rule: str = "greater_than"
):
    """
    Counts the number of rows in the CSV and compares against an expected value.

    Args:
        conn: duckdb connection
        file_path: path to the CSV file
        expected: the expected row count
        comparison_rule: how to compare actual vs expected; one of
            equals_to, not_equal_to, greater_than, greater_than_or_equal_to,
            less_than, less_than_or_equal_to

    Returns:
        (passed, message, details) tuple.

    Raises:
        ValueError: if comparison_rule is not recognised (raised before the
            CSV is scanned).
    """
    # Comparators are lazy callables so an invalid comparison_rule fails
    # fast, before the (potentially expensive) CSV scan runs.
    comparison_rules = {
        "equals_to": lambda actual: actual == expected,
        "not_equal_to": lambda actual: actual != expected,
        "greater_than": lambda actual: actual > expected,
        "greater_than_or_equal_to": lambda actual: actual >= expected,
        "less_than": lambda actual: actual < expected,
        "less_than_or_equal_to": lambda actual: actual <= expected,
    }
    if comparison_rule not in comparison_rules:
        raise ValueError(
            f"Invalid comparison_rule: '{comparison_rule}'. Must be one of {list(comparison_rules.keys())}."
        )

    result = conn.execute(f"SELECT COUNT(*) FROM {_read_csv(file_path)}").fetchone()
    actual = result[0]

    passed = comparison_rules[comparison_rule](actual)
    message = f"there were {actual} rows found"
    details = {
        "actual": actual,
        "expected": expected,
    }

    return passed, message, details
46+
47+
def check_unique(conn, file_path: Path, field: str):
    """
    Checks that all values in a given field are unique.

    Args:
        conn: duckdb connection
        file_path: path to the CSV file
        field: the column name to check for uniqueness

    Returns:
        (passed, message, details) tuple; details lists each duplicated
        value with its occurrence count.
    """
    # Double any embedded double-quotes so the column name is safe inside
    # a quoted SQL identifier.
    ident = field.replace('"', '""')
    result = conn.execute(
        f'SELECT "{ident}", COUNT(*) as cnt FROM {_read_csv(file_path)} GROUP BY "{ident}" HAVING cnt > 1'
    ).fetchall()

    duplicates = [{"value": row[0], "count": row[1]} for row in result]

    if len(duplicates) == 0:
        passed = True
        message = f"all values in '{field}' are unique"
    else:
        passed = False
        message = f"there were {len(duplicates)} duplicate values in '{field}'"

    details = {
        "field": field,
        "duplicates": duplicates,
    }

    return passed, message, details
76+
77+
def check_no_shared_values(conn, file_path: Path, field_1: str, field_2: str):
    """
    Checks that no value appears in both field_1 and field_2.

    Empty strings and NULLs are ignored (an empty cell in both columns
    does not count as a shared value).

    Args:
        conn: duckdb connection
        file_path: path to the CSV file
        field_1: the first column name
        field_2: the second column name

    Returns:
        (passed, message, details) tuple; details lists the shared values.
    """
    # Double any embedded double-quotes so the column names are safe
    # inside quoted SQL identifiers.
    ident_1 = field_1.replace('"', '""')
    ident_2 = field_2.replace('"', '""')
    result = conn.execute(
        f"""
        SELECT DISTINCT a."{ident_1}" as value
        FROM {_read_csv(file_path)} a
        WHERE a."{ident_1}" IN (SELECT "{ident_2}" FROM {_read_csv(file_path)})
        AND a."{ident_1}" IS NOT NULL AND a."{ident_1}" != ''
        """
    ).fetchall()

    shared_values = [row[0] for row in result]

    if len(shared_values) == 0:
        passed = True
        message = f"no shared values between '{field_1}' and '{field_2}'"
    else:
        passed = False
        message = f"there were {len(shared_values)} shared values between '{field_1}' and '{field_2}'"

    details = {
        "field_1": field_1,
        "field_2": field_2,
        "shared_values": shared_values,
    }

    return passed, message, details
113+
114+
def check_no_overlapping_ranges(conn, file_path: Path, min_field: str, max_field: str):
    """
    Checks that no ranges overlap between rows.

    Two ranges [a_min, a_max] and [b_min, b_max] overlap if:
        a_min <= b_max AND a_max >= b_min

    Range bounds are compared numerically (CAST to BIGINT); the raw
    (string) cell values are reported in details.

    Args:
        conn: duckdb connection
        file_path: path to the CSV file
        min_field: the column name for the range minimum
        max_field: the column name for the range maximum

    Returns:
        (passed, message, details) tuple; details lists each overlapping
        pair of ranges.
    """
    # Double any embedded double-quotes so the column names are safe
    # inside quoted SQL identifiers.
    min_ident = min_field.replace('"', '""')
    max_ident = max_field.replace('"', '""')
    # Pair rows on a synthetic row number rather than on
    # a.min < b.min: joining on the minimum value would silently skip
    # pairs of rows whose minimums are EQUAL, which always overlap.
    result = conn.execute(
        f"""
        WITH ranges AS (
            SELECT
                "{min_ident}" AS range_min,
                "{max_ident}" AS range_max,
                row_number() OVER () AS rn
            FROM {_read_csv(file_path)}
        )
        SELECT
            a.range_min as a_min,
            a.range_max as a_max,
            b.range_min as b_min,
            b.range_max as b_max
        FROM ranges a
        JOIN ranges b
        ON a.rn < b.rn
        WHERE CAST(a.range_min AS BIGINT) <= CAST(b.range_max AS BIGINT)
        AND CAST(a.range_max AS BIGINT) >= CAST(b.range_min AS BIGINT)
        """
    ).fetchall()

    overlaps = [
        {"range_1": [row[0], row[1]], "range_2": [row[2], row[3]]} for row in result
    ]

    if len(overlaps) == 0:
        passed = True
        message = f"no overlapping ranges found between '{min_field}' and '{max_field}'"
    else:
        passed = False
        message = f"there were {len(overlaps)} overlapping ranges found"

    details = {
        "min_field": min_field,
        "max_field": max_field,
        "overlaps": overlaps,
    }

    return passed, message, details
File renamed without changes.

0 commit comments

Comments
 (0)