Merged
26 changes: 26 additions & 0 deletions README.md
@@ -177,6 +177,32 @@ Conventions:
- `sql/mart/*.sql` defines the MART tables
- for ready-made examples, see [examples/dataset_min.yml](examples/dataset_min.yml) and [examples/dataset_full.yml](examples/dataset_full.yml)

A useful case for public multi-year CSVs that are nearly stable but not identical:

```yaml
clean:
sql: "sql/clean.sql"
read:
source: config_only
header: false
skip: 1
delim: ";"
encoding: "utf-8"
columns:
column00: "VARCHAR"
column01: "VARCHAR"
column02: "VARCHAR"
normalize_rows_to_columns: true
```

Use `normalize_rows_to_columns: true` when:

- you want a canonical positional layout
- some files or years have rows shorter than the expected number of columns
- you want to prevent an added or missing column from breaking the `clean` step

Do not use it as a default. It makes sense when you are deliberately handling an unstable source with a declared positional schema.
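The padding behavior can be sketched standalone with the stdlib `csv` module (a hypothetical reproduction of the documented semantics, not the toolkit's actual code):

```python
import csv
import io

# Two "years" of the same public dataset: the older rows lack the trailing column.
data_2021 = "1;2\n"
data_2022 = "3;4;5\n"

def read_positional(text: str, width: int) -> list[list[str]]:
    """Right-pad every row with "" so each file yields the same fixed-width layout."""
    return [row + [""] * (width - len(row))
            for row in csv.reader(io.StringIO(text), delimiter=";")]

print(read_positional(data_2021, 3) + read_positional(data_2022, 3))
# → [['1', '2', ''], ['3', '4', '5']]
```

Both files now map onto the same three positional columns, so a single `clean.sql` works across years.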

## CLI

Workflow canonico:
3 changes: 3 additions & 0 deletions docs/config-schema.md
@@ -88,6 +88,7 @@ Relative paths are always resolved against the directory that contains `datase
| `parallel` | `bool \| null` | `null` |
| `nullstr` | `string \| list[string] \| null` | `null` |
| `columns` | `dict[string,string] \| null` | `null` |
| `normalize_rows_to_columns` | `bool` | `false` |
| `trim_whitespace` | `bool` | `true` |
| `sample_size` | `int \| null` | `null` |
| `sheet_name` | `string \| int \| null` | `null` |
@@ -103,6 +104,8 @@ Practical notes:
- RAW keeps the original workbook without converting it
- for `.xlsx`, the most useful options are mainly `header`, `skip`, `columns`, `trim_whitespace`, `sheet_name`
- `sheet_name` defaults to the first sheet when omitted
- `normalize_rows_to_columns: true` only makes sense together with `columns`
- with `normalize_rows_to_columns: true`, the toolkit normalizes short CSV rows to the expected schema before exposing `raw_input`
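A minimal sketch of a CSV read config combining the keys listed above (the key names come from this schema; the values, and the `sql/clean.sql` path, are illustrative only):

```yaml
clean:
  sql: "sql/clean.sql"
  read:
    header: true
    skip: 0
    delim: ";"
    trim_whitespace: true
    columns:
      column00: "VARCHAR"
      column01: "VARCHAR"
    normalize_rows_to_columns: true
```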

`CleanValidate`:

21 changes: 21 additions & 0 deletions docs/conventions.md
@@ -71,6 +71,27 @@ clean:
- Suggested hints never force the robust preset.
- The robust preset is applied only when requested via `clean.read_mode` or by the reader's runtime fallback.

## Positional Fixed Schema

- `clean.read.columns` declares a canonical schema for the CLEAN reader.
- `clean.read.normalize_rows_to_columns: true` enables a normalized CSV read on the toolkit side:
  - requires `clean.read.columns`
  - skips the header row when `header: true`
  - right-pads rows shorter than the expected number of columns
  - fails if a row has more columns than declared
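The behavior described above can be sketched standalone (a hypothetical reproduction of the documented rules, not the toolkit's code):

```python
import csv
import io

def normalize_csv(text: str, expected: int, header: bool = True,
                  skip: int = 0, delim: str = ";") -> list[list[str]]:
    """Drop `skip` rows plus the header row when header=True, then
    right-pad short rows and reject rows wider than the declared schema."""
    reader = csv.reader(io.StringIO(text), delimiter=delim)
    for _ in range(skip + (1 if header else 0)):
        next(reader, None)  # tolerate files shorter than the skip count
    rows = []
    for row in reader:
        if len(row) > expected:
            raise ValueError(f"row wider than declared schema: {len(row)} > {expected}")
        rows.append(row + [""] * (expected - len(row)))
    return rows

print(normalize_csv("h0;h1;h2\n1;2\n", expected=3))  # → [['1', '2', '']]
```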

Use it when:

- you are reading public multi-year CSVs with a nearly stable but not identical schema
- you want to keep a single positional mapping in `clean.sql`
- some years have trailing columns that are absent or empty

Do not use it when:

- the file already has a stable, name-based column schema
- the problem is only about delimiters or quoting
- you have not explicitly decided on a canonical schema

## Metadata

- Ogni `metadata.json` include `metadata_schema_version`.
63 changes: 63 additions & 0 deletions tests/test_clean_duckdb_read.py
@@ -176,6 +176,69 @@ def test_read_raw_to_relation_handles_no_header_fixed_schema_without_extra_colum
con.close()


def test_read_raw_to_relation_normalizes_short_rows_to_fixed_schema(tmp_path: Path):
input_file = tmp_path / "ragged.csv"
input_file.write_text("A;B;C\n1;2\n3;4;5\n", encoding="utf-8")

con = duckdb.connect(":memory:")
logger = logging.getLogger("tests.clean.duckdb_read.normalize_rows")

info = duckdb_read.read_raw_to_relation(
con,
[input_file],
{
"delim": ";",
"encoding": "utf-8",
"header": False,
"columns": {
"col0": "VARCHAR",
"col1": "VARCHAR",
"col2": "VARCHAR",
},
"normalize_rows_to_columns": True,
},
"strict",
logger,
)

rows = con.execute("SELECT col0, col1, col2 FROM raw_input ORDER BY col0").fetchall()
assert info.source == "strict"
assert info.params_used["normalize_rows_to_columns"] is True
assert rows == [("1", "2", ""), ("3", "4", "5"), ("A", "B", "C")]
con.close()


def test_read_raw_to_relation_normalize_rows_skips_header_when_configured(tmp_path: Path):
input_file = tmp_path / "ragged_header.csv"
input_file.write_text("h0;h1;h2\n1;2\n", encoding="utf-8")

con = duckdb.connect(":memory:")
logger = logging.getLogger("tests.clean.duckdb_read.normalize_rows_header")

info = duckdb_read.read_raw_to_relation(
con,
[input_file],
{
"delim": ";",
"encoding": "utf-8",
"header": True,
"columns": {
"col0": "VARCHAR",
"col1": "VARCHAR",
"col2": "VARCHAR",
},
"normalize_rows_to_columns": True,
},
"strict",
logger,
)

rows = con.execute("SELECT col0, col1, col2 FROM raw_input").fetchall()
assert info.params_used["header"] is True
assert rows == [("1", "2", "")]
con.close()


def test_read_raw_to_relation_reads_xlsx_first_sheet(tmp_path: Path):
input_file = tmp_path / "ok.xlsx"
pd.DataFrame(
93 changes: 93 additions & 0 deletions toolkit/clean/duckdb_read.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import csv
import json
from dataclasses import dataclass
from pathlib import Path
@@ -237,6 +238,9 @@ def _execute_csv_read(
input_files: list[Path],
read_cfg: dict[str, Any],
) -> dict[str, Any]:
if read_cfg.get("normalize_rows_to_columns"):
return _execute_normalized_csv_read(con, input_files, read_cfg)

paths = quote_list(input_files)
trim_whitespace = read_cfg.get("trim_whitespace", True)
sample_size = read_cfg.get("sample_size")
@@ -269,6 +273,95 @@ def _execute_csv_read(
return params_used


def _normalized_csv_reader_kwargs(read_cfg: dict[str, Any]) -> dict[str, Any]:
kwargs: dict[str, Any] = {
"delimiter": read_cfg.get("delim") or ",",
}
quote = read_cfg.get("quote")
if quote is not None:
kwargs["quotechar"] = quote
escape = read_cfg.get("escape")
if escape is not None:
kwargs["escapechar"] = escape
return kwargs


def _load_normalized_csv_frame(
input_file: Path,
read_cfg: dict[str, Any],
columns: dict[str, str],
) -> pd.DataFrame:
encoding = normalize_encoding(read_cfg.get("encoding")) or "utf-8"
trim_whitespace = bool(read_cfg.get("trim_whitespace", True))
header = bool(read_cfg.get("header", True))
skip = int(read_cfg.get("skip") or 0)
expected_names = list(columns.keys())
expected_len = len(expected_names)
skip_rows = skip + (1 if header else 0)

rows: list[list[Any]] = []
with input_file.open("r", encoding=encoding, newline="") as handle:
reader = csv.reader(handle, **_normalized_csv_reader_kwargs(read_cfg))
for _ in range(skip_rows):
try:
next(reader)
except StopIteration:
break
for row_number, row in enumerate(reader, start=skip_rows + 1):
if len(row) > expected_len:
raise ValueError(
"CSV row wider than configured columns while normalize_rows_to_columns=true. "
f"file={input_file} row={row_number} configured={expected_len} actual={len(row)}"
)
if len(row) < expected_len:
row = list(row) + [""] * (expected_len - len(row))
else:
row = list(row)
if trim_whitespace:
row = [value.strip() if isinstance(value, str) else value for value in row]
rows.append(row)

return pd.DataFrame(rows, columns=expected_names)


def _execute_normalized_csv_read(
con: duckdb.DuckDBPyConnection,
input_files: list[Path],
read_cfg: dict[str, Any],
) -> dict[str, Any]:
columns = read_cfg.get("columns")
if not columns:
raise ValueError(
"clean.read.normalize_rows_to_columns=true requires clean.read.columns"
)

frames = [
_load_normalized_csv_frame(input_file, read_cfg, columns)
for input_file in input_files
]
combined = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0]
con.register("raw_input_df", combined)
con.execute("CREATE OR REPLACE VIEW raw_input AS SELECT * FROM raw_input_df;")

params_used: dict[str, Any] = {
"columns": dict(columns),
"normalize_rows_to_columns": True,
"trim_whitespace": bool(read_cfg.get("trim_whitespace", True)),
"header": bool(read_cfg.get("header", True)),
}
if read_cfg.get("delim") is not None:
params_used["delim"] = read_cfg.get("delim")
if read_cfg.get("encoding") is not None:
params_used["encoding"] = normalize_encoding(read_cfg.get("encoding"))
if read_cfg.get("skip") is not None:
params_used["skip"] = int(read_cfg.get("skip"))
if read_cfg.get("quote") is not None:
params_used["quote"] = read_cfg.get("quote")
if read_cfg.get("escape") is not None:
params_used["escape"] = read_cfg.get("escape")
return params_used


def _execute_parquet_read(
con: duckdb.DuckDBPyConnection,
input_files: list[Path],
6 changes: 6 additions & 0 deletions toolkit/core/config_models.py
@@ -270,6 +270,7 @@ class CleanReadConfig(BaseModel):
parallel: bool | None = None
nullstr: str | list[str] | None = None
columns: dict[str, str] | None = None
normalize_rows_to_columns: bool = False
trim_whitespace: bool = True
sample_size: int | None = None
sheet_name: str | int | None = None
@@ -284,6 +285,11 @@ class CleanReadConfig(BaseModel):
def _normalize_columns(cls, value: Any) -> dict[str, str] | None:
return normalize_columns_spec(value)

@field_validator("normalize_rows_to_columns", mode="before")
@classmethod
def _normalize_rows_to_columns(cls, value: Any) -> bool:
return parse_bool(value, "clean.read.normalize_rows_to_columns")

@field_validator("include", mode="before")
@classmethod
def _normalize_include(cls, value: Any) -> list[str] | None:
3 changes: 3 additions & 0 deletions toolkit/core/csv_read.py
@@ -25,6 +25,7 @@
"include",
"csv",
"columns",
"normalize_rows_to_columns",
"trim_whitespace",
"sample_size",
"sheet_name",
@@ -45,6 +46,7 @@
"parallel",
"nullstr",
"columns",
"normalize_rows_to_columns",
"trim_whitespace",
"sheet_name",
}
@@ -60,6 +62,7 @@
"nullstr",
"parallel",
"columns",
"normalize_rows_to_columns",
"trim_whitespace",
}
READ_SELECTION_KEYS = {"mode", "glob", "prefer_from_raw_run", "allow_ambiguous", "include"}