Skip to content

Commit 4ed3035

Browse files
committed
Used DuckDB for datatype validation
1 parent 6818887 commit 4ed3035

3 files changed

Lines changed: 444 additions & 272 deletions

File tree

digital_land/expectations/operations/csv.py

Lines changed: 101 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,9 @@
22
import pandas as pd
33

44
from digital_land.expectations.operations.datatype_validators import (
5-
_is_valid_address_value,
6-
_is_valid_curie_list_value,
7-
_is_valid_curie_value,
8-
_is_valid_datetime_value,
9-
_is_valid_decimal_value,
10-
_is_valid_flag_value,
11-
_is_valid_hash_value,
12-
_is_valid_integer_value,
13-
_is_valid_json_value,
14-
_is_valid_latitude_value,
15-
_is_valid_longitude_value,
165
_is_valid_multipolygon_value,
176
_is_valid_pattern_value,
187
_is_valid_point_value,
19-
_is_valid_reference_value,
20-
_is_valid_url_value,
218
)
229

2310

@@ -316,16 +303,8 @@ def check_allowed_values(conn, file_path: Path, field: str, allowed_values: list
316303
field: the column name to validate
317304
allowed_values: allowed values for the field
318305
"""
319-
cleaned_allowed_values = [
320-
str(value).strip().replace("'", "''")
321-
for value in (allowed_values or [])
322-
if str(value).strip() != ""
323-
]
324-
325-
if not cleaned_allowed_values:
326-
raise ValueError("allowed_values must contain at least one non-empty value")
327306

328-
allowed_values_sql = ",".join("'" + value + "'" for value in cleaned_allowed_values)
307+
allowed_values_sql = ",".join("'" + value + "'" for value in allowed_values)
329308

330309
result = conn.execute(
331310
f"""
@@ -349,7 +328,7 @@ def check_allowed_values(conn, file_path: Path, field: str, allowed_values: list
349328

350329
details = {
351330
"field": field,
352-
"allowed_values": sorted({value for value in cleaned_allowed_values}),
331+
"allowed_values": sorted({value for value in allowed_values}),
353332
"invalid_values": invalid_values,
354333
"invalid_rows": invalid_rows,
355334
}
@@ -641,66 +620,128 @@ def check_field_is_within_range_by_dataset_org(
641620
return passed, message, details
642621

643622

644-
def check_values_have_the_correct_datatype(file_path, field_datatype, conn=None):
623+
def check_values_have_the_correct_datatype(conn, file_path, field_datatype):
645624
"""
646625
Validates that CSV column values have correct datatypes.
647626
627+
Uses DuckDB queries for datatypes: integer, decimal, flag, latitude, longitude, hash, curie, curie-list, json, url, date, datetime.
628+
629+
Uses Python validators for complex datatypes: pattern, multipolygon, point.
630+
648631
Args:
649632
file_path: path to the CSV file to validate
650633
field_datatype: dict mapping column name to datatype string
651-
conn: duckdb connection not used but required by caller
652634
"""
653-
validators = {
654-
"address": _is_valid_address_value,
655-
"curie-list": _is_valid_curie_list_value,
656-
"curie": _is_valid_curie_value,
657-
"date": _is_valid_datetime_value,
658-
"datetime": _is_valid_datetime_value,
659-
"decimal": _is_valid_decimal_value,
660-
"flag": _is_valid_flag_value,
661-
"hash": _is_valid_hash_value,
662-
"integer": _is_valid_integer_value,
663-
"json": _is_valid_json_value,
664-
"latitude": _is_valid_latitude_value,
665-
"longitude": _is_valid_longitude_value,
666-
"multipolygon": _is_valid_multipolygon_value,
635+
636+
def _get_sql_validation_condition(datatype: str, field_name: str) -> str:
637+
field_ref = f"TRIM(COALESCE(\"{field_name}\", ''))"
638+
639+
conditions = {
640+
"integer": f"{field_ref} != '' AND NOT (TRY_CAST({field_ref} AS DOUBLE) IS NOT NULL AND TRY_CAST({field_ref} AS DOUBLE) = TRY_CAST({field_ref} AS BIGINT))",
641+
"decimal": f"{field_ref} != '' AND TRY_CAST({field_ref} AS DECIMAL) IS NULL",
642+
"flag": f"{field_ref} != '' AND LOWER({field_ref}) NOT IN ('yes', 'no', 'true', 'false')",
643+
"latitude": f"{field_ref} != '' AND (TRY_CAST({field_ref} AS DOUBLE) IS NULL OR TRY_CAST({field_ref} AS DOUBLE) < -90 OR TRY_CAST({field_ref} AS DOUBLE) > 90)",
644+
"longitude": f"{field_ref} != '' AND (TRY_CAST({field_ref} AS DOUBLE) IS NULL OR TRY_CAST({field_ref} AS DOUBLE) < -180 OR TRY_CAST({field_ref} AS DOUBLE) > 180)",
645+
"hash": f"{field_ref} != '' AND NOT (REGEXP_MATCHES({field_ref}, '^([a-z]+:)?[0-9a-fA-F]+$'))",
646+
"curie": f"{field_ref} != '' AND NOT (REGEXP_MATCHES({field_ref}, '^[a-z0-9-]+:[^\\s:][^\\s]*$'))",
647+
"curie-list": f"{field_ref} != '' AND NOT (REGEXP_MATCHES({field_ref}, '^([a-z0-9-]+:[^\\s:][^\\s]*(;[a-z0-9-]+:[^\\s:][^\\s]*)*)?$'))",
648+
"json": f"{field_ref} != '' AND TRY(json_extract({field_ref}, '$')) IS NULL",
649+
"url": f"{field_ref} != '' AND NOT (REGEXP_MATCHES({field_ref}, '^[a-zA-Z][a-zA-Z0-9+.-]*://[^\\s/:?#]+(?::[0-9]+)?(?:[/?#][^\\s]*)?$'))",
650+
"date": f"{field_ref} != '' AND TRY_CAST({field_ref} AS DATE) IS NULL",
651+
"datetime": f"{field_ref} != '' AND TRY_CAST({field_ref} AS TIMESTAMP) IS NULL",
652+
}
653+
654+
return conditions.get(datatype, "FALSE")
655+
656+
# Python validators for complex datatypes that can't be easily expressed in SQL
657+
python_validators = {
667658
"pattern": _is_valid_pattern_value,
659+
"multipolygon": _is_valid_multipolygon_value,
668660
"point": _is_valid_point_value,
669-
"reference": _is_valid_reference_value,
670-
"url": _is_valid_url_value,
671661
}
672662

673-
df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
674-
675-
if df.empty or len(df.columns) == 0:
676-
return True, "no invalid values found", {"invalid_rows": []}
663+
sql_validators = {
664+
"integer",
665+
"decimal",
666+
"flag",
667+
"latitude",
668+
"longitude",
669+
"hash",
670+
"curie",
671+
"curie-list",
672+
"json",
673+
"url",
674+
"date",
675+
"datetime",
676+
}
677677

678-
applicable_fields = [
679-
(field, field_datatype.get(field), validators[field_datatype.get(field)])
680-
for field in df.columns
681-
if field in field_datatype and field_datatype.get(field) in validators
682-
]
678+
fields_for_sql = []
679+
fields_for_python = []
683680

684-
if not applicable_fields:
685-
return True, "no invalid values found", {"invalid_rows": []}
681+
for field in field_datatype:
682+
datatype = field_datatype.get(field)
683+
if datatype in sql_validators:
684+
fields_for_sql.append((field, datatype))
685+
elif datatype in python_validators:
686+
fields_for_python.append((field, datatype, python_validators[datatype]))
686687

687688
invalid_values = []
688-
for line_number, (_, row) in enumerate(df.iterrows(), start=2):
689-
for field, datatype, validator in applicable_fields:
690-
value = str(row.get(field, "")).strip()
691-
if not value:
692-
continue
693689

694-
if not validator(value):
690+
# SQL validation: query invalid rows for each field
691+
if fields_for_sql:
692+
for field, datatype in fields_for_sql:
693+
condition = _get_sql_validation_condition(datatype, field)
694+
695+
result = conn.execute(
696+
f"""
697+
WITH source_rows AS (
698+
SELECT
699+
ROW_NUMBER() OVER () + 1 AS line_number,
700+
*
701+
FROM {_read_csv(file_path)}
702+
)
703+
SELECT
704+
line_number,
705+
TRIM(COALESCE("{field}", '')) AS value
706+
FROM source_rows
707+
WHERE {condition}
708+
"""
709+
).fetchall()
710+
711+
for row in result:
695712
invalid_values.append(
696713
{
697-
"line_number": line_number,
714+
"line_number": row[0],
698715
"field": field,
699716
"datatype": datatype,
700-
"value": value,
717+
"value": row[1],
701718
}
702719
)
703720

721+
if fields_for_python:
722+
df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
723+
724+
if df.empty or len(df.columns) == 0:
725+
pass
726+
else:
727+
for line_number, (_, row) in enumerate(df.iterrows(), start=2):
728+
for field, datatype, validator in fields_for_python:
729+
if field not in df.columns:
730+
continue
731+
value = str(row.get(field, "")).strip()
732+
if not value:
733+
continue
734+
735+
if not validator(value):
736+
invalid_values.append(
737+
{
738+
"line_number": line_number,
739+
"field": field,
740+
"datatype": datatype,
741+
"value": value,
742+
}
743+
)
744+
704745
if len(invalid_values) == 0:
705746
passed = True
706747
message = "all values have valid datatypes"

0 commit comments

Comments (0)