diff --git a/.gitignore b/.gitignore index 9cb740a8..6dda51b8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ __pycache__/ *.py[cod] - +.history/ .cache .coverage build diff --git a/digital_land/expectations/checkpoints/csv.py b/digital_land/expectations/checkpoints/csv.py index 08e47d2d..b69ead33 100644 --- a/digital_land/expectations/checkpoints/csv.py +++ b/digital_land/expectations/checkpoints/csv.py @@ -9,6 +9,25 @@ check_unique, check_no_shared_values, check_no_overlapping_ranges, + check_fields_are_within_range, + check_field_is_within_range_by_dataset_org, + check_allowed_values, + check_no_blank_rows, + expect_column_to_be_integer, + expect_column_to_be_decimal, + expect_column_to_be_flag, + expect_column_to_be_latitude, + expect_column_to_be_longitude, + expect_column_to_be_hash, + expect_column_to_be_curie, + expect_column_to_be_curie_list, + expect_column_to_be_json, + expect_column_to_be_url, + expect_column_to_be_date, + expect_column_to_be_datetime, + expect_column_to_be_pattern, + expect_column_to_be_multipolygon, + expect_column_to_be_point, ) @@ -24,6 +43,25 @@ def operation_factory(self, operation_string: str): "check_unique": check_unique, "check_no_shared_values": check_no_shared_values, "check_no_overlapping_ranges": check_no_overlapping_ranges, + "check_fields_are_within_range": check_fields_are_within_range, + "check_field_is_within_range_by_dataset_org": check_field_is_within_range_by_dataset_org, + "check_allowed_values": check_allowed_values, + "check_no_blank_rows": check_no_blank_rows, + "expect_column_to_be_integer": expect_column_to_be_integer, + "expect_column_to_be_decimal": expect_column_to_be_decimal, + "expect_column_to_be_flag": expect_column_to_be_flag, + "expect_column_to_be_latitude": expect_column_to_be_latitude, + "expect_column_to_be_longitude": expect_column_to_be_longitude, + "expect_column_to_be_hash": expect_column_to_be_hash, + "expect_column_to_be_curie": expect_column_to_be_curie, + 
"expect_column_to_be_curie_list": expect_column_to_be_curie_list, + "expect_column_to_be_json": expect_column_to_be_json, + "expect_column_to_be_url": expect_column_to_be_url, + "expect_column_to_be_date": expect_column_to_be_date, + "expect_column_to_be_datetime": expect_column_to_be_datetime, + "expect_column_to_be_pattern": expect_column_to_be_pattern, + "expect_column_to_be_multipolygon": expect_column_to_be_multipolygon, + "expect_column_to_be_point": expect_column_to_be_point, } if operation_string not in operation_map: raise ValueError( diff --git a/digital_land/expectations/operations/csv.py b/digital_land/expectations/operations/csv.py index 5e6267b9..72f1d2e4 100644 --- a/digital_land/expectations/operations/csv.py +++ b/digital_land/expectations/operations/csv.py @@ -1,10 +1,144 @@ from pathlib import Path +import re +import pandas as pd + +from digital_land.expectations.operations.datatype_validators import ( + _is_valid_multipolygon_value, + _is_valid_point_value, +) def _read_csv(file_path: Path) -> str: return f"read_csv_auto('{str(file_path)}',all_varchar=true,delim=',',quote='\"',escape='\"')" +def _get_csv_columns(conn, file_path: Path) -> list: + """Get column names from CSV file.""" + return [ + col[0] + for col in conn.execute( + f"SELECT * FROM {_read_csv(file_path)} LIMIT 0" + ).description + ] + + +def _sql_string(value) -> str: + cleaned = str(value).strip().replace("'", "''") + return f"'{cleaned}'" + + +def _build_field_condition(field_name: str, spec) -> str: + if isinstance(spec, dict): + op = str(spec.get("op", spec.get("operation", ""))).strip().lower() + value = spec.get("value") + if not op: + raise ValueError( + f"Condition for '{field_name}' must include 'op' when using dict format" + ) + else: + op = "=" + value = spec + + if op in ("=", "=="): + return f'"{field_name}" = {_sql_string(value)}' + if op in ("!=", "<>"): + return f'"{field_name}" != {_sql_string(value)}' + if op in ("in", "not in"): + if not isinstance(value, (list, 
tuple, set)) or len(value) == 0: + raise ValueError( + f"Condition for '{field_name}' with op '{op}' must use a non-empty list" + ) + values_sql = ", ".join(_sql_string(item) for item in value) + return f'"{field_name}" {op.upper()} ({values_sql})' + + raise ValueError( + f"Unsupported operator '{op}' for field '{field_name}'. Supported: =, !=, in, not in" + ) + + +def _build_filter_clause(filter_spec, file_columns: list, name: str) -> str: + """Build SQL clause that keeps rows matching structured conditions.""" + if filter_spec is None: + groups = [] + elif isinstance(filter_spec, dict): + groups = [filter_spec] + elif isinstance(filter_spec, list): + groups = filter_spec + else: + raise ValueError(f"{name} must be a dict, list of dicts, or None") + + if not groups: + return "" + + clauses = [] + for group in groups: + if not isinstance(group, dict) or not group: + raise ValueError("Each condition group must be a non-empty dict") + + parts = [] + for field_name, spec in group.items(): + if field_name not in file_columns: + raise ValueError( + f"Column '{field_name}' not found in file. 
Available columns: {file_columns}" + ) + parts.append(_build_field_condition(field_name, spec)) + + clauses.append(f"({' AND '.join(parts)})") + + return f" AND ({' OR '.join(clauses)})" + + +def _normalize_fields_for_validation(field_spec, file_columns: list) -> list: + """Normalize a field spec into a list of column names to validate.""" + if isinstance(field_spec, str): + fields = [item.strip() for item in field_spec.split(",") if item.strip()] + elif isinstance(field_spec, (list, tuple, set)): + fields = [str(item).strip() for item in field_spec if str(item).strip()] + else: + raise ValueError( + "field must be a string, comma-separated string, or list of strings" + ) + + if not fields: + raise ValueError("field must include at least one column name") + + seen = set() + normalized_fields = [] + for field_name in fields: + if field_name not in seen: + seen.add(field_name) + normalized_fields.append(field_name) + + missing_fields = [ + field_name for field_name in normalized_fields if field_name not in file_columns + ] + if missing_fields: + raise ValueError( + f"Column(s) {missing_fields} not found in file. 
Available columns: {file_columns}" + ) + + return normalized_fields + + +def _build_range_invalid_rows( + result: list, + validating_multiple_fields: bool, +) -> list: + """Format query rows into expectation invalid_rows shape.""" + out_of_range_rows = [] + + for row in result: + field_name = row[1] + + invalid_row = {"line_number": row[0], "value": row[2]} + if validating_multiple_fields: + invalid_row["field"] = field_name + + out_of_range_rows.append(invalid_row) + + return out_of_range_rows + + def count_rows( conn, file_path: Path, expected: int, comparison_rule: str = "greater_than" ): @@ -157,3 +291,743 @@ def check_no_overlapping_ranges(conn, file_path: Path, min_field: str, max_field } return passed, message, details + + +def check_allowed_values(conn, file_path: Path, field: str, allowed_values: list): + """ + Checks that a field contains only values from an allowed set. + + Args: + conn: duckdb connection + file_path: path to the CSV file + field: the column name to validate + allowed_values: allowed values for the field + """ + + allowed_values_sql = ",".join("'" + value + "'" for value in allowed_values) + + result = conn.execute( + f""" + SELECT + ROW_NUMBER() OVER () + 1 AS line_number, + TRIM(COALESCE("{field}", '')) AS value + FROM {_read_csv(file_path)} + WHERE TRIM(COALESCE("{field}", '')) NOT IN ({allowed_values_sql}) + """ + ).fetchall() + + invalid_rows = [{"line_number": row[0], "value": row[1]} for row in result] + invalid_values = sorted({row["value"] for row in invalid_rows}) + + if len(invalid_rows) == 0: + passed = True + message = f"all values in '{field}' are allowed" + else: + passed = False + message = f"there were {len(invalid_rows)} invalid values in '{field}'" + + details = { + "field": field, + "allowed_values": sorted({value for value in allowed_values}), + "invalid_values": invalid_values, + "invalid_rows": invalid_rows, + } + + return passed, message, details + + +def check_no_blank_rows(conn, file_path: Path): + """ + Checks 
that the CSV does not contain fully blank rows. + + A row is considered blank when every column is empty after trimming whitespace. + + Args: + conn: duckdb connection + file_path: path to the CSV file + """ + file_columns = _get_csv_columns(conn, file_path) + if not file_columns: + return True, "no blank rows found", {"invalid_rows": []} + + blank_conditions = " AND ".join( + f"TRIM(COALESCE(\"{column_name}\", '')) = ''" for column_name in file_columns + ) + + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT + ROW_NUMBER() OVER () + 1 AS line_number, + * + FROM {_read_csv(file_path)} + ) + SELECT + line_number + FROM source_rows + WHERE {blank_conditions} + ORDER BY line_number + """ + ).fetchall() + + invalid_rows = [{"line_number": row[0]} for row in result] + + if len(invalid_rows) == 0: + passed = True + message = "no blank rows found" + else: + passed = False + message = f"there were {len(invalid_rows)} blank rows found" + + details = { + "invalid_rows": invalid_rows, + } + return passed, message, details + + +def check_fields_are_within_range( + conn, + file_path: Path, + field: str, + external_file: Path, + min_field: str, + max_field: str, + rules: dict = None, +): + """ + Check that one or more lookup fields are within ranges from an external file. + + Args: + conn: duckdb connection + file_path: path to the CSV file containing fields to validate + field: column name(s) to validate. + You can pass a single name ("entity") or a comma-separated list + ("entity, end-entity"). All specified fields must be within range. + external_file: path to the CSV file containing valid ranges + min_field: the column name for the range minimum + max_field: the column name for the range maximum + rules: optional dict controlling subset selection on lookup rows. + Supported keys: + - lookup_rules: dict or list[dict] of structured conditions. + Fields in one dict are AND'ed; multiple dicts are OR'ed. 
+ Examples: + {"lookup_rules": {"prefix": "conservationarea"}} + {"lookup_rules": {"organisation": {"op": "in", "value": ["orgA", "orgB"]}}} + Use operators like != and not in when you want to exclude rows. + """ + file_columns = _get_csv_columns(conn, file_path) + rules = rules or {} + if not isinstance(rules, dict): + raise ValueError("rules must be a dictionary or None") + + lookup_clause = _build_filter_clause( + rules.get("lookup_rules"), + file_columns, + "rules.lookup_rules", + ) + + fields_to_validate = _normalize_fields_for_validation(field, file_columns) + validating_multiple_fields = len(fields_to_validate) > 1 + lookup_values_sql = ",\n ".join( + f'({i}, {_sql_string(field_name)}, TRY_CAST(src."{field_name}" AS BIGINT))' + for i, field_name in enumerate(fields_to_validate) + ) + + result = conn.execute( + f""" + WITH ranges AS ( + SELECT + TRY_CAST("{min_field}" AS BIGINT) AS min_value, + TRY_CAST("{max_field}" AS BIGINT) AS max_value + FROM {_read_csv(external_file)} + WHERE TRY_CAST("{min_field}" AS BIGINT) IS NOT NULL + AND TRY_CAST("{max_field}" AS BIGINT) IS NOT NULL + ), + source_rows AS ( + SELECT + ROW_NUMBER() OVER () + 1 AS line_number, + * + FROM {_read_csv(file_path)} + ), + lookup_rows AS ( + SELECT + src.line_number, + fields.field_order, + fields.field_name, + fields.value + FROM source_rows src + CROSS JOIN LATERAL ( + VALUES + {lookup_values_sql} + ) AS fields(field_order, field_name, value) + WHERE fields.value IS NOT NULL{lookup_clause} + ) + SELECT + line_number, + field_name, + value + FROM lookup_rows l + WHERE NOT EXISTS ( + SELECT 1 + FROM ranges r + WHERE l.value BETWEEN r.min_value AND r.max_value + ) + ORDER BY field_order, line_number + """ + ).fetchall() + + out_of_range_rows = _build_range_invalid_rows( + result=result, + validating_multiple_fields=validating_multiple_fields, + ) + + if len(out_of_range_rows) == 0: + passed = True + message = f"all values in '{field}' are within allowed ranges" + else: + passed = False + 
message = f"there were {len(out_of_range_rows)} out-of-range rows found" + + details = {"invalid_rows": out_of_range_rows} + return passed, message, details + + +def check_field_is_within_range_by_dataset_org( + conn, + file_path: Path, + field: str, + external_file: Path, + min_field: str, + max_field: str, + lookup_dataset_field: str, + range_dataset_field: str, + rules: dict = None, +): + """ + Check field values are within ranges matched by dataset field and organisation. + + Matching is fixed to two keys: + 1. lookup_dataset_field -> range_dataset_field + 2. organisation -> organisation + + Args: + conn: duckdb connection + file_path: path to the CSV file containing fields to validate + field: single column name to validate (for example: "entity"). + external_file: path to the CSV file containing valid ranges + min_field: the column name for the range minimum + max_field: the column name for the range maximum + lookup_dataset_field: dataset column name in file_path + range_dataset_field: dataset column name in external_file + rules: optional dict controlling subset selection on lookup rows. + Supported keys: + - lookup_rules: dict or list[dict] of structured conditions. + Fields in one dict are AND'ed; multiple dicts are OR'ed. + Examples: + {"lookup_rules": {"prefix": "conservationarea"}} + {"lookup_rules": {"organisation": {"op": "in", "value": ["orgA", "orgB"]}}} + Use operators like != and not in when you want to exclude rows. 
+ """ + file_columns = _get_csv_columns(conn, file_path) + rules = rules or {} + if not isinstance(rules, dict): + raise ValueError("rules must be a dictionary or None") + + lookup_clause = _build_filter_clause( + rules.get("lookup_rules"), + file_columns, + "rules.lookup_rules", + ) + + fields_to_validate = _normalize_fields_for_validation(field, file_columns) + if len(fields_to_validate) != 1: + raise ValueError("field must be a single column name") + field_name = fields_to_validate[0] + + lookup_dataset_name = str(lookup_dataset_field).strip() + range_dataset_name = str(range_dataset_field).strip() + lookup_match_columns = [lookup_dataset_name, "organisation"] + + lookup_dataset_col = f'"{lookup_dataset_name}"' + range_dataset_col = f'"{range_dataset_name}"' + min_col = f'"{min_field}"' + max_col = f'"{max_field}"' + value_col = f'"{field_name}"' + + result = conn.execute( + f""" + WITH ranges AS ( + SELECT + TRY_CAST({min_col} AS BIGINT) AS min_value, + TRY_CAST({max_col} AS BIGINT) AS max_value, + TRIM(COALESCE({range_dataset_col}, '')) AS range_key_0, + TRIM(COALESCE("organisation", '')) AS range_key_1 + FROM {_read_csv(external_file)} + WHERE TRY_CAST({min_col} AS BIGINT) IS NOT NULL + AND TRY_CAST({max_col} AS BIGINT) IS NOT NULL + AND TRIM(COALESCE({range_dataset_col}, '')) != '' + AND TRIM(COALESCE("organisation", '')) != '' + ), + source_rows AS ( + SELECT + ROW_NUMBER() OVER () + 1 AS line_number, + * + FROM {_read_csv(file_path)} + ), + lookup_rows AS ( + SELECT + src.line_number, + TRY_CAST(src.{value_col} AS BIGINT) AS value, + TRIM(COALESCE(src.{lookup_dataset_col}, '')) AS lookup_key_0, + TRIM(COALESCE(src."organisation", '')) AS lookup_key_1 + FROM source_rows src + WHERE TRY_CAST(src.{value_col} AS BIGINT) IS NOT NULL + AND TRIM(COALESCE(src.{lookup_dataset_col}, '')) != '' + AND TRIM(COALESCE(src."organisation", '')) != ''{lookup_clause} + ) + SELECT + line_number, + value, + lookup_key_0, + lookup_key_1 + FROM lookup_rows l + WHERE NOT EXISTS ( 
+ SELECT 1 + FROM ranges r + WHERE l.value BETWEEN r.min_value AND r.max_value + AND l.lookup_key_0 = r.range_key_0 + AND l.lookup_key_1 = r.range_key_1 + ) + ORDER BY line_number + """ + ).fetchall() + + out_of_range_rows = [] + for row in result: + invalid_row = {"line_number": row[0], field_name: row[1]} + for i, col_name in enumerate(lookup_match_columns): + invalid_row[col_name] = row[i + 2] + out_of_range_rows.append(invalid_row) + + if len(out_of_range_rows) == 0: + passed = True + message = f"all values in '{field}' are within allowed ranges" + else: + passed = False + message = f"there were {len(out_of_range_rows)} out-of-range rows found" + + details = {"invalid_rows": out_of_range_rows} + return passed, message, details + + +def expect_column_to_be_integer(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND NOT ( + TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) IS NOT NULL + AND TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) = TRY_CAST(TRIM(COALESCE("{field}", '')) AS BIGINT) + ) + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "integer", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'integer'" + if passed + else f"there were {len(invalid_rows)} invalid 'integer' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_decimal(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", 
'')) != '' + AND TRY_CAST(TRIM(COALESCE("{field}", '')) AS DECIMAL) IS NULL + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "decimal", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'decimal'" + if passed + else f"there were {len(invalid_rows)} invalid 'decimal' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_flag(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND LOWER(TRIM(COALESCE("{field}", ''))) NOT IN ('yes', 'no', 'true', 'false') + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "flag", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'flag'" + if passed + else f"there were {len(invalid_rows)} invalid 'flag' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_latitude(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND ( + TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) IS NULL + OR TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) < -90 + OR TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) > 90 + ) + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "latitude", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 
+ message = ( + f"all values in '{field}' have datatype 'latitude'" + if passed + else f"there were {len(invalid_rows)} invalid 'latitude' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_longitude(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND ( + TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) IS NULL + OR TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) < -180 + OR TRY_CAST(TRIM(COALESCE("{field}", '')) AS DOUBLE) > 180 + ) + """ + ).fetchall() + invalid_rows = [ + { + "line_number": row[0], + "field": field, + "datatype": "longitude", + "value": row[1], + } + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'longitude'" + if passed + else f"there were {len(invalid_rows)} invalid 'longitude' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_hash(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND NOT (REGEXP_MATCHES(TRIM(COALESCE("{field}", '')), '^([a-z]+:)?[0-9a-fA-F]+$')) + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "hash", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'hash'" + if passed + else f"there were {len(invalid_rows)} invalid 'hash' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + 
+def expect_column_to_be_curie(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND NOT (REGEXP_MATCHES(TRIM(COALESCE("{field}", '')), '^[a-z0-9-]+:[^\\s:][^\\s]*$')) + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "curie", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'curie'" + if passed + else f"there were {len(invalid_rows)} invalid 'curie' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_curie_list(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND NOT (REGEXP_MATCHES(TRIM(COALESCE("{field}", '')), '^([a-z0-9-]+:[^\\s:][^\\s]*(;[a-z0-9-]+:[^\\s:][^\\s]*)*)?$')) + """ + ).fetchall() + invalid_rows = [ + { + "line_number": row[0], + "field": field, + "datatype": "curie-list", + "value": row[1], + } + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'curie-list'" + if passed + else f"there were {len(invalid_rows)} invalid 'curie-list' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_json(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + 
WHERE TRIM(COALESCE("{field}", '')) != '' + AND TRY(json_extract(TRIM(COALESCE("{field}", '')), '$')) IS NULL + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "json", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'json'" + if passed + else f"there were {len(invalid_rows)} invalid 'json' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_url(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND NOT (REGEXP_MATCHES(TRIM(COALESCE("{field}", '')), '^[a-zA-Z][a-zA-Z0-9+.-]*://[^\\s/:?#]+(?::[0-9]+)?(?:[/?#][^\\s]*)?$')) + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "url", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'url'" + if passed + else f"there were {len(invalid_rows)} invalid 'url' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_date(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND TRY_CAST(TRIM(COALESCE("{field}", '')) AS DATE) IS NULL + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "date", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'date'" + if 
passed + else f"there were {len(invalid_rows)} invalid 'date' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_datetime(conn, file_path: Path, field: str): + result = conn.execute( + f""" + WITH source_rows AS ( + SELECT ROW_NUMBER() OVER () + 1 AS line_number, * + FROM {_read_csv(file_path)} + ) + SELECT line_number, TRIM(COALESCE("{field}", '')) AS value + FROM source_rows + WHERE TRIM(COALESCE("{field}", '')) != '' + AND TRY_CAST(TRIM(COALESCE("{field}", '')) AS TIMESTAMP) IS NULL + """ + ).fetchall() + invalid_rows = [ + {"line_number": row[0], "field": field, "datatype": "datetime", "value": row[1]} + for row in result + ] + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'datetime'" + if passed + else f"there were {len(invalid_rows)} invalid 'datetime' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_pattern(conn, file_path: Path, field: str): + invalid_rows = [] + df = pd.read_csv(file_path, dtype=str, keep_default_na=False) + if not df.empty and len(df.columns) > 0 and field in df.columns: + for line_number, (_, row) in enumerate(df.iterrows(), start=2): + value = str(row.get(field, "")).strip() + if not value: + continue + try: + re.compile(value) + except re.error: + invalid_rows.append( + { + "line_number": line_number, + "field": field, + "datatype": "pattern", + "value": value, + } + ) + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'pattern'" + if passed + else f"there were {len(invalid_rows)} invalid 'pattern' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_multipolygon(conn, file_path: Path, field: str): + invalid_rows = [] + df = pd.read_csv(file_path, dtype=str, keep_default_na=False) + if not df.empty and len(df.columns) > 0 and field in df.columns: + for line_number, (_, row) 
in enumerate(df.iterrows(), start=2): + value = str(row.get(field, "")).strip() + if not value: + continue + if not _is_valid_multipolygon_value(value): + invalid_rows.append( + { + "line_number": line_number, + "field": field, + "datatype": "multipolygon", + "value": value, + } + ) + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'multipolygon'" + if passed + else f"there were {len(invalid_rows)} invalid 'multipolygon' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} + + +def expect_column_to_be_point(conn, file_path: Path, field: str): + invalid_rows = [] + df = pd.read_csv(file_path, dtype=str, keep_default_na=False) + if not df.empty and len(df.columns) > 0 and field in df.columns: + for line_number, (_, row) in enumerate(df.iterrows(), start=2): + value = str(row.get(field, "")).strip() + if not value: + continue + if not _is_valid_point_value(value): + invalid_rows.append( + { + "line_number": line_number, + "field": field, + "datatype": "point", + "value": value, + } + ) + passed = len(invalid_rows) == 0 + message = ( + f"all values in '{field}' have datatype 'point'" + if passed + else f"there were {len(invalid_rows)} invalid 'point' value(s) in '{field}'" + ) + return passed, message, {"invalid_rows": invalid_rows} diff --git a/digital_land/expectations/operations/datatype_validators.py b/digital_land/expectations/operations/datatype_validators.py new file mode 100644 index 00000000..ce9eedd2 --- /dev/null +++ b/digital_land/expectations/operations/datatype_validators.py @@ -0,0 +1,73 @@ +import json + +import shapely.errors +import shapely.wkt +from shapely.geometry import GeometryCollection, MultiPolygon, Point, Polygon, shape + + +def _is_valid_multipolygon_value(value): + candidate = (value or "").strip() + if not candidate: + return False + + try: + geometry = shapely.wkt.loads(candidate) + except shapely.errors.WKTReadingError: + try: + geojson = json.loads(candidate) + 
geometry = shape(geojson) + except Exception: + return False + + if not isinstance(geometry, (Polygon, MultiPolygon, GeometryCollection)): + return False + + # Shapely normal validity check where available. + is_valid = getattr(geometry, "is_valid", True) + return bool(is_valid) + + +def _is_valid_point_value(value): + candidate = value + if candidate is None: + return False + + # Try WKT first. + try: + point = shapely.wkt.loads( + candidate if isinstance(candidate, str) else str(candidate) + ) + if not isinstance(point, Point): + return False + return bool(getattr(point, "is_valid", True)) + except shapely.errors.WKTReadingError: + pass + except Exception: + return False + + # Fallback to coordinate pair. + try: + if isinstance(candidate, (list, tuple)) and len(candidate) == 2: + x_raw, y_raw = candidate[0], candidate[1] + elif isinstance(candidate, str): + text = candidate.strip() + if not text: + return False + + if text.startswith("["): + coords = json.loads(text) + if not isinstance(coords, list) or len(coords) != 2: + return False + x_raw, y_raw = coords[0], coords[1] + else: + parts = [p.strip() for p in text.split(",")] + if len(parts) != 2: + return False + x_raw, y_raw = parts[0], parts[1] + else: + return False + + point = Point(float(x_raw), float(y_raw)) + return bool(getattr(point, "is_valid", True)) + except Exception: + return False diff --git a/tests/integration/expectations/operations/test_csv.py b/tests/integration/expectations/operations/test_csv.py index 77a9677c..166b5140 100644 --- a/tests/integration/expectations/operations/test_csv.py +++ b/tests/integration/expectations/operations/test_csv.py @@ -7,6 +7,25 @@ check_unique, check_no_shared_values, check_no_overlapping_ranges, + check_allowed_values, + check_no_blank_rows, + check_fields_are_within_range, + check_field_is_within_range_by_dataset_org, + expect_column_to_be_decimal, + expect_column_to_be_flag, + expect_column_to_be_latitude, + expect_column_to_be_longitude, + 
expect_column_to_be_hash, + expect_column_to_be_curie, + expect_column_to_be_curie_list, + expect_column_to_be_json, + expect_column_to_be_url, + expect_column_to_be_date, + expect_column_to_be_datetime, + expect_column_to_be_pattern, + expect_column_to_be_point, + expect_column_to_be_integer, + expect_column_to_be_multipolygon, ) @@ -201,3 +220,732 @@ def test_check_no_overlapping_ranges_adjacent_fails(tmp_path): ) assert passed is False assert len(details["overlaps"]) == 1 + + +def test_check_field_is_within_ranges_fails(tmp_path): + lookup_file = tmp_path / "lookup.csv" + with open(lookup_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity"]) + writer.writerow(["150"]) + writer.writerow(["999"]) + + organisation_file = tmp_path / "entity-organisation.csv" + with open(organisation_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + writer.writerow(["300", "400"]) + + conn = duckdb.connect() + passed, message, details = check_fields_are_within_range( + conn, + file_path=lookup_file, + external_file=organisation_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + ) + + assert passed is False + assert "out-of-range" in message + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["line_number"] == 3 + assert details["invalid_rows"][0]["value"] == 999 + + +def test_check_field_is_within_ranges_ignores_org(tmp_path): + lookup_file = tmp_path / "lookup.csv" + with open(lookup_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity"]) + writer.writerow(["150"]) + writer.writerow(["250"]) + + organisation_file = tmp_path / "entity-organisation.csv" + with open(organisation_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + writer.writerow(["300", "400"]) + + conn = 
duckdb.connect() + # Test without match_fields - simple range check + passed, message, details = check_fields_are_within_range( + conn, + file_path=lookup_file, + external_file=organisation_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["line_number"] == 3 + assert details["invalid_rows"][0]["value"] == 250 + + +def test_check_allowed_values_fails_for_old_entity_status(tmp_path): + file_path = tmp_path / "old-entity.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["old-entity", "status", "entity"]) + writer.writerow(["1001", "301", "2001"]) + writer.writerow(["1002", "410", "2002"]) + writer.writerow(["1003", "302", "2003"]) + + conn = duckdb.connect() + passed, message, details = check_allowed_values( + conn, + file_path=file_path, + field="status", + allowed_values=["301", "410"], + ) + + assert passed is False + assert "invalid values" in message + assert details["invalid_values"] == ["302"] + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["value"] == "302" + + +def test_check_allowed_values_passes_for_old_entity_status(tmp_path): + file_path = tmp_path / "old-entity.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["old-entity", "status", "entity"]) + writer.writerow(["1001", "301", "2001"]) + writer.writerow(["1002", "410", "2002"]) + + conn = duckdb.connect() + passed, message, details = check_allowed_values( + conn, + file_path=file_path, + field="status", + allowed_values=["301", "410"], + ) + + assert passed is True + assert details["invalid_rows"] == [] + + +def test_check_no_blank_rows_passes(tmp_path): + file_path = tmp_path / "no-blank-rows.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "name", "reference"]) + writer.writerow(["1", 
"foo", "ref1"]) + writer.writerow(["2", "bar", "ref2"]) + + conn = duckdb.connect() + passed, message, details = check_no_blank_rows(conn, file_path=file_path) + + assert passed is True + assert details["invalid_rows"] == [] + + +def test_check_no_blank_rows_fails(tmp_path): + file_path = tmp_path / "has-blank-rows.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "name", "reference"]) + writer.writerow(["1", "foo", "ref1"]) + writer.writerow(["", "", ""]) + writer.writerow([" ", "", " "]) + writer.writerow(["2", "bar", "ref2"]) + + conn = duckdb.connect() + passed, message, details = check_no_blank_rows(conn, file_path=file_path) + + assert passed is False + assert "blank rows" in message + assert len(details["invalid_rows"]) == 2 + assert details["invalid_rows"][0]["line_number"] == 3 + assert details["invalid_rows"][1]["line_number"] == 4 + + +def test_check_field_is_within_ranges_by_dataset_org_matches_prefix_and_organisation_fails( + tmp_path, +): + file_path = tmp_path / "lookup.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "prefix", "organisation", "reference"]) + writer.writerow(["150", "dataset-a", "org-1", "ok-ref"]) + writer.writerow(["250", "dataset-a", "org-1", "bad-ref"]) + writer.writerow(["999", "dataset-a", "org-2", "other-org-ref"]) + + external_file = tmp_path / "ranges.csv" + with open(external_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["dataset", "organisation", "entity-minimum", "entity-maximum"]) + writer.writerow(["dataset-a", "org-1", "100", "200"]) + writer.writerow(["dataset-a", "org-2", "900", "1000"]) + + conn = duckdb.connect() + passed, message, details = check_field_is_within_range_by_dataset_org( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + lookup_dataset_field="prefix", + 
range_dataset_field="dataset",
+    )
+
+    assert passed is False
+    assert "out-of-range" in message
+    assert len(details["invalid_rows"]) == 1
+    assert details["invalid_rows"][0]["line_number"] == 3
+    assert details["invalid_rows"][0]["entity"] == 250
+    assert details["invalid_rows"][0]["prefix"] == "dataset-a"
+    assert details["invalid_rows"][0]["organisation"] == "org-1"
+
+
+def test_check_field_is_within_ranges_by_dataset_org_matches_prefix_and_organisation_passes(
+    tmp_path,
+):
+    file_path = tmp_path / "lookup.csv"
+    with open(file_path, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["entity", "prefix", "organisation", "reference"])
+        writer.writerow(["150", "dataset-a", "org-1", "ok-ref"])
+        writer.writerow(["950", "dataset-a", "org-2", "ok-ref-2"])
+
+    external_file = tmp_path / "ranges.csv"
+    with open(external_file, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["dataset", "organisation", "entity-minimum", "entity-maximum"])
+        writer.writerow(["dataset-a", "org-1", "100", "200"])
+        writer.writerow(["dataset-a", "org-2", "900", "1000"])
+
+    conn = duckdb.connect()
+    passed, message, details = check_field_is_within_range_by_dataset_org(
+        conn,
+        file_path=file_path,
+        external_file=external_file,
+        min_field="entity-minimum",
+        max_field="entity-maximum",
+        field="entity",
+        lookup_dataset_field="prefix",
+        range_dataset_field="dataset",
+    )
+
+    assert passed is True
+    assert details["invalid_rows"] == []
+
+
+def test_check_field_is_within_ranges_by_dataset_org_supports_custom_column_names(
+    tmp_path,
+):
+    file_path = tmp_path / "lookup_custom.csv"
+    with open(file_path, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["entity_value", "dataset_key", "organisation", "ref_code"])
+        writer.writerow(["55", "dataset-x", "org-a", "ok-ref"])
+        writer.writerow(["250", "dataset-x", "org-a", "bad-ref"])
+
+    external_file = tmp_path / "ranges_custom.csv"
+    with open(external_file, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(
+            ["dataset_name",
"organisation", "entity-minimum", "entity-maximum"] + ) + writer.writerow(["dataset-x", "org-a", "50", "100"]) + + conn = duckdb.connect() + passed, message, details = check_field_is_within_range_by_dataset_org( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity_value", + lookup_dataset_field="dataset_key", + range_dataset_field="dataset_name", + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["line_number"] == 3 + assert details["invalid_rows"][0]["entity_value"] == 250 + assert details["invalid_rows"][0]["dataset_key"] == "dataset-x" + assert details["invalid_rows"][0]["organisation"] == "org-a" + + +def test_check_field_is_within_ranges_filters_rows_with_lookup_rules(tmp_path): + """Test filtering rows with lookup_rules during validation.""" + file_path = tmp_path / "lookup.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "status"]) + writer.writerow(["150", "active"]) + writer.writerow(["250", "active"]) + writer.writerow(["350", "inactive"]) + + external_file = tmp_path / "ranges.csv" + with open(external_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + + conn = duckdb.connect() + passed, message, details = check_fields_are_within_range( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + rules={"lookup_rules": {"status": "active"}}, + ) + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["value"] == 250 + assert details["invalid_rows"][0]["line_number"] == 3 + + +def test_check_field_is_within_ranges_lookup_rules_operator_eq_shape(tmp_path): + file_path = tmp_path / "lookup.csv" + with open(file_path, "w", newline="") as f: + 
writer = csv.writer(f) + writer.writerow(["entity", "prefix"]) + writer.writerow(["150", "conservationarea"]) + writer.writerow(["350", "other"]) + + external_file = tmp_path / "ranges.csv" + with open(external_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + + conn = duckdb.connect() + passed, message, details = check_fields_are_within_range( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + rules={"lookup_rules": {"prefix": {"op": "=", "value": "conservationarea"}}}, + ) + + assert passed is True + assert details["invalid_rows"] == [] + + +def test_check_field_is_within_ranges_lookup_rules_exact_match(tmp_path): + file_path = tmp_path / "lookup.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "prefix"]) + writer.writerow(["150", "conservationarea"]) + writer.writerow(["350", "other"]) + + external_file = tmp_path / "ranges.csv" + with open(external_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + + conn = duckdb.connect() + passed, message, details = check_fields_are_within_range( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity", + rules={"lookup_rules": {"prefix": "conservationarea"}}, + ) + + assert passed is True + assert details["invalid_rows"] == [] + + +def test_check_field_is_within_ranges_lookup_rules_operator_in(tmp_path): + file_path = tmp_path / "lookup.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "organisation"]) + writer.writerow(["150", "org-a"]) + writer.writerow(["350", "org-b"]) + writer.writerow(["350", "org-c"]) + + external_file = tmp_path / "ranges.csv" + with 
open(external_file, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["entity-minimum", "entity-maximum"])
+        writer.writerow(["100", "200"])
+
+    conn = duckdb.connect()
+    passed, message, details = check_fields_are_within_range(
+        conn,
+        file_path=file_path,
+        external_file=external_file,
+        min_field="entity-minimum",
+        max_field="entity-maximum",
+        field="entity",
+        rules={
+            "lookup_rules": {"organisation": {"op": "in", "value": ["org-a", "org-b"]}}
+        },
+    )
+
+    assert passed is False
+    assert len(details["invalid_rows"]) == 1
+    assert details["invalid_rows"][0]["line_number"] == 3
+    assert details["invalid_rows"][0]["value"] == 350
+
+
+def test_check_field_is_within_ranges_comma_separated_fields(tmp_path):
+    file_path = tmp_path / "lookup.csv"
+    with open(file_path, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["entity", "end-entity"])
+        writer.writerow(["150", "160"])
+        writer.writerow(["150", "350"])
+        writer.writerow(["350", "150"])
+
+    external_file = tmp_path / "ranges.csv"
+    with open(external_file, "w", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["entity-minimum", "entity-maximum"])
+        writer.writerow(["100", "200"])
+
+    conn = duckdb.connect()
+    passed, message, details = check_fields_are_within_range(
+        conn,
+        file_path=file_path,
+        external_file=external_file,
+        min_field="entity-minimum",
+        max_field="entity-maximum",
+        field="entity, end-entity",
+    )
+
+    assert passed is False
+    assert len(details["invalid_rows"]) == 2
+    assert details["invalid_rows"][0]["line_number"] == 4
+    assert details["invalid_rows"][0]["field"] == "entity"
+    assert details["invalid_rows"][0]["value"] == 350
+    assert details["invalid_rows"][1]["line_number"] == 3
+    assert details["invalid_rows"][1]["field"] == "end-entity"
+    assert details["invalid_rows"][1]["value"] == 350
+
+
+def test_check_field_is_within_ranges_for_only_status_301(tmp_path):
+    file_path = tmp_path / "lookup.csv"
+    with open(file_path, "w",
newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity", "status", "old-entity"]) + writer.writerow(["150", "301", "140"]) + writer.writerow(["250", "301", "150"]) + writer.writerow(["350", "410", "340"]) + + external_file = tmp_path / "ranges.csv" + with open(external_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["entity-minimum", "entity-maximum"]) + writer.writerow(["100", "200"]) + + conn = duckdb.connect() + passed, message, details = check_fields_are_within_range( + conn, + file_path=file_path, + external_file=external_file, + min_field="entity-minimum", + max_field="entity-maximum", + field="entity,old-entity", + rules={"lookup_rules": {"status": {"op": "=", "value": "301"}}}, + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["line_number"] == 3 + assert details["invalid_rows"][0]["field"] == "entity" + assert details["invalid_rows"][0]["value"] == 250 + + +def test_expect_column_to_be_integer(tmp_path): + file_path = tmp_path / "expect_integer.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["count"]) + writer.writerow(["10"]) + writer.writerow(["abc"]) + + conn = duckdb.connect() + passed, message, details = expect_column_to_be_integer( + conn, file_path=file_path, field="count" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "integer" + assert "invalid 'integer'" in message + + +def test_expect_column_to_be_multipolygon(tmp_path): + file_path = tmp_path / "expect_multipolygon.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["boundary"]) + writer.writerow(["POLYGON((0 0, 10 0, 10 10, 0 10, 0 0))"]) + writer.writerow(["POINT(0 0)"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_multipolygon( + conn, file_path=file_path, field="boundary" + ) + + assert passed is False 
+ assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "multipolygon" + + +def test_expect_column_to_be_decimal(tmp_path): + file_path = tmp_path / "expect_decimal.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["price"]) + writer.writerow(["1.2"]) + writer.writerow(["10"]) + writer.writerow(["0.01"]) + writer.writerow(["bad"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_decimal( + conn, file_path=file_path, field="price" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "decimal" + + +def test_expect_column_to_be_flag(tmp_path): + file_path = tmp_path / "expect_flag.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["active"]) + writer.writerow(["true"]) + writer.writerow(["false"]) + writer.writerow(["yes"]) + writer.writerow(["no"]) + writer.writerow(["maybe"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_flag( + conn, file_path=file_path, field="active" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "flag" + + +def test_expect_column_to_be_latitude(tmp_path): + file_path = tmp_path / "expect_latitude.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["lat"]) + writer.writerow(["0"]) + writer.writerow(["51.5"]) + writer.writerow(["-90"]) + writer.writerow(["90"]) + writer.writerow(["91"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_latitude( + conn, file_path=file_path, field="lat" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "latitude" + + +def test_expect_column_to_be_longitude(tmp_path): + file_path = tmp_path / "expect_longitude.csv" + with open(file_path, "w", newline="") as f: + 
writer = csv.writer(f) + writer.writerow(["lon"]) + writer.writerow(["0"]) + writer.writerow(["-0.1"]) + writer.writerow(["-180"]) + writer.writerow(["180"]) + writer.writerow(["181"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_longitude( + conn, file_path=file_path, field="lon" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "longitude" + + +def test_expect_column_to_be_hash(tmp_path): + file_path = tmp_path / "expect_hash.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["hash"]) + writer.writerow(["abcdef"]) + writer.writerow(["sha:5d41402abc4b2a76b9719d911017c592"]) + writer.writerow(["xyz:notahex"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_hash( + conn, file_path=file_path, field="hash" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "hash" + + +def test_expect_column_to_be_curie(tmp_path): + file_path = tmp_path / "expect_curie.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["id"]) + writer.writerow(["prefix:value"]) + writer.writerow(["org:entity123"]) + writer.writerow(["no_colon"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_curie( + conn, file_path=file_path, field="id" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "curie" + + +def test_expect_column_to_be_curie_list(tmp_path): + file_path = tmp_path / "expect_curie_list.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["ids"]) + writer.writerow(["prefix:a;org:b"]) + writer.writerow(["schema:name"]) + writer.writerow(["not-valid"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_curie_list( + conn, file_path=file_path, field="ids" + ) + 
+ assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "curie-list" + + +def test_expect_column_to_be_json(tmp_path): + file_path = tmp_path / "expect_json.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["payload"]) + writer.writerow(['{"a":1}']) + writer.writerow(["[1,2,3]"]) + writer.writerow(["not json"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_json( + conn, file_path=file_path, field="payload" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "json" + + +def test_expect_column_to_be_url(tmp_path): + file_path = tmp_path / "expect_url.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["url"]) + writer.writerow(["https://example.com"]) + writer.writerow(["http://test.org"]) + writer.writerow(["example.com"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_url(conn, file_path=file_path, field="url") + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "url" + + +def test_expect_column_to_be_date(tmp_path): + file_path = tmp_path / "expect_date.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["d"]) + writer.writerow(["2024-01-01"]) + writer.writerow(["2023-12-31"]) + writer.writerow(["not-a-date"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_date(conn, file_path=file_path, field="d") + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "date" + + +def test_expect_column_to_be_datetime(tmp_path): + file_path = tmp_path / "expect_datetime.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["dt"]) + writer.writerow(["2024-01-01T10:00:00"]) + 
writer.writerow(["2023-12-31T23:59:59Z"]) + writer.writerow(["not-a-datetime"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_datetime( + conn, file_path=file_path, field="dt" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "datetime" + + +def test_expect_column_to_be_pattern(tmp_path): + file_path = tmp_path / "expect_pattern.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["regex"]) + writer.writerow(["^[A-Z]+$"]) + writer.writerow(["(foo|bar)"]) + writer.writerow(["["]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_pattern( + conn, file_path=file_path, field="regex" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "pattern" + + +def test_expect_column_to_be_point(tmp_path): + file_path = tmp_path / "expect_point.csv" + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["geom"]) + writer.writerow(["POINT(0 0)"]) + writer.writerow(["POINT(1 2)"]) + writer.writerow(["POINT(0)"]) + + conn = duckdb.connect() + passed, _, details = expect_column_to_be_point( + conn, file_path=file_path, field="geom" + ) + + assert passed is False + assert len(details["invalid_rows"]) == 1 + assert details["invalid_rows"][0]["datatype"] == "point"