From 81f0125ab63512ae1ec303b3ae3e7902b6897b7e Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 4 Mar 2026 11:55:06 +0000 Subject: [PATCH] Add normalized CSV read mode for ragged clean inputs --- README.md | 26 +++++++++ docs/config-schema.md | 3 ++ docs/conventions.md | 21 ++++++++ tests/test_clean_duckdb_read.py | 63 ++++++++++++++++++++++ toolkit/clean/duckdb_read.py | 93 +++++++++++++++++++++++++++++++++ toolkit/core/config_models.py | 6 +++ toolkit/core/csv_read.py | 3 ++ 7 files changed, 215 insertions(+) diff --git a/README.md b/README.md index 62ff410..aa59302 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,32 @@ Convenzioni: - `sql/mart/*.sql` definisce le tabelle MART - per esempi pronti, vedi [examples/dataset_min.yml](examples/dataset_min.yml) e [examples/dataset_full.yml](examples/dataset_full.yml) +Caso utile per CSV pubblici multi-anno quasi stabili ma non identici: + +```yaml +clean: + sql: "sql/clean.sql" + read: + source: config_only + header: false + skip: 1 + delim: ";" + encoding: "utf-8" + columns: + column00: "VARCHAR" + column01: "VARCHAR" + column02: "VARCHAR" + normalize_rows_to_columns: true +``` + +Usa `normalize_rows_to_columns: true` quando: + +- vuoi un layout posizionale canonico +- alcuni file o anni hanno righe piu corte del numero di colonne atteso +- vuoi evitare che una colonna aggiunta o mancante faccia saltare il `clean` + +Non usarlo come default. Ha senso quando stai deliberatamente gestendo una fonte instabile con schema posizionale dichiarato. 
+ ## CLI Workflow canonico: diff --git a/docs/config-schema.md b/docs/config-schema.md index d08b552..9ea11db 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -88,6 +88,7 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase | `parallel` | `bool \| null` | `null` | | `nullstr` | `string \| list[string] \| null` | `null` | | `columns` | `dict[string,string] \| null` | `null` | +| `normalize_rows_to_columns` | `bool` | `false` | | `trim_whitespace` | `bool` | `true` | | `sample_size` | `int \| null` | `null` | | `sheet_name` | `string \| int \| null` | `null` | @@ -103,6 +104,8 @@ Note pratiche: - RAW conserva il workbook originale senza convertirlo - per `.xlsx`, le opzioni utili sono soprattutto `header`, `skip`, `columns`, `trim_whitespace`, `sheet_name` - `sheet_name` usa il primo foglio se omesso +- `normalize_rows_to_columns: true` ha senso solo insieme a `columns` +- con `normalize_rows_to_columns: true`, il toolkit normalizza le righe corte del CSV allo schema atteso prima di esporre `raw_input` `CleanValidate`: diff --git a/docs/conventions.md b/docs/conventions.md index 414cf96..9b43d0a 100644 --- a/docs/conventions.md +++ b/docs/conventions.md @@ -71,6 +71,27 @@ clean: - I suggested hints non forzano mai il preset robusto. - Il preset robusto viene applicato solo se richiesto da `clean.read_mode` o dal fallback runtime del reader. +## Positional Fixed Schema + +- `clean.read.columns` dichiara uno schema canonico per il reader CLEAN. 
+- `clean.read.normalize_rows_to_columns: true` attiva una lettura CSV normalizzata lato toolkit: + - richiede `clean.read.columns` + - salta l'header se `header: true` + - pad-da a destra le righe piu corte fino al numero di colonne atteso + - fallisce se una riga ha piu colonne di quelle dichiarate + +Usarlo quando: + +- stai leggendo CSV pubblici multi-anno con schema quasi stabile ma non identico +- vuoi mantenere un mapping posizionale unico nel `clean.sql` +- alcune annualita hanno colonne finali assenti o vuote + +Non usarlo quando: + +- il file ha gia uno schema per nome colonna stabile +- il problema e solo di delimitatore o quoting +- non hai deciso esplicitamente uno schema canonico + ## Metadata - Ogni `metadata.json` include `metadata_schema_version`. diff --git a/tests/test_clean_duckdb_read.py b/tests/test_clean_duckdb_read.py index dddfe68..ad36bc5 100644 --- a/tests/test_clean_duckdb_read.py +++ b/tests/test_clean_duckdb_read.py @@ -176,6 +176,69 @@ def test_read_raw_to_relation_handles_no_header_fixed_schema_without_extra_colum con.close() +def test_read_raw_to_relation_normalizes_short_rows_to_fixed_schema(tmp_path: Path): + input_file = tmp_path / "ragged.csv" + input_file.write_text("A;B;C\n1;2\n3;4;5\n", encoding="utf-8") + + con = duckdb.connect(":memory:") + logger = logging.getLogger("tests.clean.duckdb_read.normalize_rows") + + info = duckdb_read.read_raw_to_relation( + con, + [input_file], + { + "delim": ";", + "encoding": "utf-8", + "header": False, + "columns": { + "col0": "VARCHAR", + "col1": "VARCHAR", + "col2": "VARCHAR", + }, + "normalize_rows_to_columns": True, + }, + "strict", + logger, + ) + + rows = con.execute("SELECT col0, col1, col2 FROM raw_input ORDER BY col0").fetchall() + assert info.source == "strict" + assert info.params_used["normalize_rows_to_columns"] is True + assert rows == [("1", "2", ""), ("3", "4", "5"), ("A", "B", "C")] + con.close() + + +def 
def _normalized_csv_reader_kwargs(read_cfg: dict[str, Any]) -> dict[str, Any]:
    """Translate ``clean.read`` CSV options into ``csv.reader`` keyword arguments.

    Only ``delim``, ``quote`` and ``escape`` are consulted; every other key in
    ``read_cfg`` is ignored by this helper.

    Args:
        read_cfg: The ``clean.read`` mapping. Missing keys and ``None`` values
            fall back to the ``csv`` module defaults (``,`` for the delimiter).

    Returns:
        A dict that can be splatted into ``csv.reader(handle, **kwargs)``.
    """
    kwargs: dict[str, Any] = {"delimiter": read_cfg.get("delim") or ","}

    quote = read_cfg.get("quote")
    if quote == "":
        # An empty quote string is how a DuckDB-style config says "no quoting".
        # csv.reader refuses an empty quotechar, so switch quoting off instead.
        kwargs["quoting"] = csv.QUOTE_NONE
    elif quote is not None:
        kwargs["quotechar"] = quote

    escape = read_cfg.get("escape")
    # The csv module's default quote character is '"' when none is configured.
    effective_quote = kwargs.get("quotechar", '"')
    # * An empty escape means "no escape character" -> leave escapechar unset.
    # * escape == quote means "a doubled quote is a literal quote", which the
    #   csv module already implements through doublequote=True (its default).
    #   Passing the same character as escapechar as well would make the parser
    #   treat the closing quote as an escape and swallow the line terminator.
    if escape and escape != effective_quote:
        kwargs["escapechar"] = escape
    return kwargs
def _load_normalized_csv_frame(
    input_file: Path,
    read_cfg: dict[str, Any],
    columns: dict[str, str],
) -> pd.DataFrame:
    """Read one CSV file into a DataFrame with exactly the configured columns.

    Rows shorter than ``len(columns)`` are right-padded with empty strings;
    a row with more fields than configured aborts the read.

    Args:
        input_file: CSV file to read.
        read_cfg: The ``clean.read`` mapping. ``encoding``, ``trim_whitespace``,
            ``header`` and ``skip`` are read here; ``delim``/``quote``/``escape``
            are consumed by ``_normalized_csv_reader_kwargs``.
        columns: Ordered mapping whose keys become the DataFrame column names.
            The mapped type strings are not used by this function.

    Returns:
        A DataFrame with one row per parsed CSV record (after skipping) and
        one column per key of ``columns``, in order.

    Raises:
        ValueError: If any row has more fields than ``len(columns)``.
    """
    encoding = normalize_encoding(read_cfg.get("encoding")) or "utf-8"
    trim_whitespace = bool(read_cfg.get("trim_whitespace", True))
    header = bool(read_cfg.get("header", True))
    skip = int(read_cfg.get("skip") or 0)
    expected_names = list(columns)
    expected_len = len(expected_names)
    # The header line, when present, is discarded: names come from ``columns``.
    skip_rows = skip + (1 if header else 0)

    rows: list[list[Any]] = []
    with input_file.open("r", encoding=encoding, newline="") as handle:
        # A plain "utf-8" decoder keeps a leading byte-order mark as U+FEFF.
        # Left in place it would be glued to the first cell of the file (and
        # would stop csv from recognising an opening quote), so consume it.
        if handle.read(1) != "\ufeff":
            handle.seek(0)

        reader = csv.reader(handle, **_normalized_csv_reader_kwargs(read_cfg))
        for _ in range(skip_rows):
            # Parsed rows are lists, so None is a safe "exhausted" sentinel.
            if next(reader, None) is None:
                break

        # NOTE(review): csv.reader yields [] for a blank line, which is padded
        # to an all-empty row below -- confirm that is the intended behaviour.
        for row_number, row in enumerate(reader, start=skip_rows + 1):
            width = len(row)
            if width > expected_len:
                raise ValueError(
                    "CSV row wider than configured columns while normalize_rows_to_columns=true. "
                    f"file={input_file} row={row_number} configured={expected_len} actual={len(row)}"
                )
            normalized = list(row) + [""] * (expected_len - width)
            if trim_whitespace:
                normalized = [
                    value.strip() if isinstance(value, str) else value
                    for value in normalized
                ]
            rows.append(normalized)

    return pd.DataFrame(rows, columns=expected_names)


def _execute_normalized_csv_read(
    con: duckdb.DuckDBPyConnection,
    input_files: list[Path],
    read_cfg: dict[str, Any],
) -> dict[str, Any]:
    """Expose ``raw_input`` from CSV files normalized to ``clean.read.columns``.

    Every file is parsed with ``_load_normalized_csv_frame``, the frames are
    concatenated in input order, registered on ``con`` as ``raw_input_df`` and
    published through the ``raw_input`` view.

    The view is a plain ``SELECT *`` over the DataFrame: the type strings
    declared in ``columns`` are recorded in the returned params but no cast
    is applied by this function.

    Args:
        con: DuckDB connection that receives the registration and the view.
        input_files: CSV files to read; must not be empty.
        read_cfg: The ``clean.read`` mapping; ``columns`` is required.

    Returns:
        The read parameters that were effectively used, for reporting.

    Raises:
        ValueError: If ``columns`` is missing/empty, if ``input_files`` is
            empty, or if a row is wider than the configured schema.
    """
    columns = read_cfg.get("columns")
    if not columns:
        raise ValueError(
            "clean.read.normalize_rows_to_columns=true requires clean.read.columns"
        )
    if not input_files:
        # Without this guard the frames[0] lookup below fails with a bare
        # IndexError that says nothing about the actual problem.
        raise ValueError(
            "clean.read.normalize_rows_to_columns=true requires at least one input file"
        )

    frames = [
        _load_normalized_csv_frame(input_file, read_cfg, columns)
        for input_file in input_files
    ]
    combined = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0]
    con.register("raw_input_df", combined)
    con.execute("CREATE OR REPLACE VIEW raw_input AS SELECT * FROM raw_input_df;")

    params_used: dict[str, Any] = {
        "columns": dict(columns),
        "normalize_rows_to_columns": True,
        "trim_whitespace": bool(read_cfg.get("trim_whitespace", True)),
        "header": bool(read_cfg.get("header", True)),
    }
    # Optional settings are reported only when explicitly configured; the
    # insertion order is kept stable because the dict may be serialized.
    if read_cfg.get("delim") is not None:
        params_used["delim"] = read_cfg.get("delim")
    if read_cfg.get("encoding") is not None:
        params_used["encoding"] = normalize_encoding(read_cfg.get("encoding"))
    if read_cfg.get("skip") is not None:
        params_used["skip"] = int(read_cfg.get("skip"))
    if read_cfg.get("quote") is not None:
        params_used["quote"] = read_cfg.get("quote")
    if read_cfg.get("escape") is not None:
        params_used["escape"] = read_cfg.get("escape")
    return params_used
"trim_whitespace", "sheet_name", } @@ -60,6 +62,7 @@ "nullstr", "parallel", "columns", + "normalize_rows_to_columns", "trim_whitespace", } READ_SELECTION_KEYS = {"mode", "glob", "prefer_from_raw_run", "allow_ambiguous", "include"}