From 03bce2b10d26e1cbf87eb0f17bfa433f832661fe Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Tue, 3 Mar 2026 00:24:52 +0000 Subject: [PATCH] Add clean.read.parallel support for dirty CSV ingestion --- docs/config-schema.md | 1 + tests/test_clean_duckdb_read.py | 20 ++++++++++++++++++++ toolkit/clean/duckdb_read.py | 4 ++++ toolkit/core/config_models.py | 1 + toolkit/core/csv_read.py | 3 +++ 5 files changed, 29 insertions(+) diff --git a/docs/config-schema.md b/docs/config-schema.md index 121e3b5..9a43f8c 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -85,6 +85,7 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase | `ignore_errors` | `bool \| null` | `null` | | `strict_mode` | `bool \| null` | `null` | | `null_padding` | `bool \| null` | `null` | +| `parallel` | `bool \| null` | `null` | | `nullstr` | `string \| list[string] \| null` | `null` | | `columns` | `dict[string,string] \| null` | `null` | | `trim_whitespace` | `bool` | `true` | diff --git a/tests/test_clean_duckdb_read.py b/tests/test_clean_duckdb_read.py index 5b91b81..3ba1e34 100644 --- a/tests/test_clean_duckdb_read.py +++ b/tests/test_clean_duckdb_read.py @@ -69,6 +69,26 @@ def test_read_raw_to_relation_strict_returns_strict_info(tmp_path: Path): con.close() +def test_read_raw_to_relation_passes_parallel_flag(tmp_path: Path): + input_file = tmp_path / "ok.csv" + input_file.write_text("a;b\n1;2\n", encoding="utf-8") + + con = duckdb.connect(":memory:") + logger = logging.getLogger("tests.clean.duckdb_read.parallel") + + info = duckdb_read.read_raw_to_relation( + con, + [input_file], + {"delim": ";", "encoding": "utf-8", "header": True, "parallel": False}, + "strict", + logger, + ) + + assert info.source == "strict" + assert info.params_used["parallel"] is False + con.close() + + def test_read_raw_to_relation_strict_error_message_uses_current_config_keys(tmp_path: Path): input_file = tmp_path / "bad.csv" input_file.write_text("a;b\n1;2;3\n", encoding="utf-8") diff --git a/toolkit/clean/duckdb_read.py b/toolkit/clean/duckdb_read.py index 12fbd6a..40061f3 100644 --- a/toolkit/clean/duckdb_read.py +++ b/toolkit/clean/duckdb_read.py @@ -147,6 +147,7 @@ def _csv_read_options(read_cfg: dict[str, Any]) -> tuple[list[str], dict[str, An strict_mode = read_cfg.get("strict_mode") ignore_errors = read_cfg.get("ignore_errors") null_padding = read_cfg.get("null_padding") + parallel = read_cfg.get("parallel") quote = read_cfg.get("quote") escape = read_cfg.get("escape") comment = read_cfg.get("comment") @@ -183,6 +184,9 @@ def _csv_read_options(read_cfg: dict[str, Any]) -> tuple[list[str], dict[str, An if null_padding is not None: opts.append(f"null_padding={'true' if bool(null_padding) else 'false'}") params_used["null_padding"] = bool(null_padding) + if parallel is not None: + opts.append(f"parallel={'true' if bool(parallel) else 'false'}") + params_used["parallel"] = bool(parallel) if quote is not None: opts.append(f"quote='{sql_str(quote)}'") params_used["quote"] = quote diff --git a/toolkit/core/config_models.py b/toolkit/core/config_models.py index a5eb64f..a4aece4 100644 --- a/toolkit/core/config_models.py +++ b/toolkit/core/config_models.py @@ -267,6 +267,7 @@ class CleanReadConfig(BaseModel): ignore_errors: bool | None = None strict_mode: bool | None = None null_padding: bool | None = None + parallel: bool | None = None nullstr: str | list[str] | None = None columns: dict[str, str] | None = None trim_whitespace: bool = True diff --git a/toolkit/core/csv_read.py b/toolkit/core/csv_read.py index 6b78932..d3e1f83 100644 --- a/toolkit/core/csv_read.py +++ b/toolkit/core/csv_read.py @@ -16,6 +16,7 @@ "ignore_errors", "strict_mode", "null_padding", + "parallel", "nullstr", "mode", "glob", @@ -40,6 +41,7 @@ "ignore_errors", "strict_mode", "null_padding", + "parallel", "nullstr", "columns", "trim_whitespace", @@ -54,6 +56,7 @@ "escape", "comment", "nullstr", + "parallel", "columns", "trim_whitespace", }