diff --git a/tests/test_clean_input_selection.py b/tests/test_clean_input_selection.py index b527a3a..04319b5 100644 --- a/tests/test_clean_input_selection.py +++ b/tests/test_clean_input_selection.py @@ -47,7 +47,7 @@ def _fake_run_sql(input_files, sql_query, output_path, read_cfg=None, **_kwargs) seen["output_path"] = output_path output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(b"PAR1") - return ("strict", {"delim": ";", "decimal": ",", "encoding": "utf-8"}) + return ("strict", {"delim": ";", "decimal": ",", "encoding": "utf-8"}, 42) monkeypatch.setattr("toolkit.clean.run._run_sql", _fake_run_sql) run_clean("demo", 2024, str(tmp_path), clean_cfg, logger or _NoopLogger()) diff --git a/tests/test_run_context.py b/tests/test_run_context.py index 15dbc87..d753f97 100644 --- a/tests/test_run_context.py +++ b/tests/test_run_context.py @@ -3,6 +3,8 @@ import json from pathlib import Path +import time + from toolkit.core.run_context import RunContext, get_run_dir, read_run_record @@ -80,6 +82,67 @@ def test_read_run_record_marks_absolute_paths_outside_root_as_non_portable(tmp_p assert record["_portability"]["warnings"] == ["/outside/root/file.csv"] +def test_layer_metrics_default_to_null(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + stored = _read_context(ctx.path) + for layer in ("raw", "clean", "mart"): + metrics = stored["layers"][layer]["metrics"] + assert metrics["output_rows"] is None + assert metrics["output_bytes"] is None + assert metrics["tables_count"] is None + + +def test_set_layer_metrics_persists(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + ctx.set_layer_metrics("clean", output_rows=1000, output_bytes=204800) + stored = _read_context(ctx.path) + m = stored["layers"]["clean"]["metrics"] + assert m["output_rows"] == 1000 + assert m["output_bytes"] == 204800 + assert m["tables_count"] is None + + +def test_set_layer_metrics_mart_with_tables_count(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + ctx.set_layer_metrics("mart", output_rows=5000, output_bytes=409600, tables_count=3) + stored = _read_context(ctx.path) + m = stored["layers"]["mart"]["metrics"] + assert m["output_rows"] == 5000 + assert m["output_bytes"] == 409600 + assert m["tables_count"] == 3 + + +def test_duration_seconds_computed_after_complete(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + ctx.start_layer("raw") + time.sleep(0.05) + ctx.complete_layer("raw") + ctx.complete_run() + stored = _read_context(ctx.path) + assert stored["layers"]["raw"]["duration_seconds"] is not None + assert stored["layers"]["raw"]["duration_seconds"] >= 0 + assert stored["duration_seconds"] is not None + assert stored["duration_seconds"] >= 0 + + +def test_duration_seconds_null_while_running(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + stored = _read_context(ctx.path) + assert stored["duration_seconds"] is None + assert stored["layers"]["raw"]["duration_seconds"] is None + + +def test_metrics_survive_json_round_trip(tmp_path: Path) -> None: + ctx = RunContext("ds", 2030, root=str(tmp_path)) + ctx.set_layer_metrics("raw", output_bytes=8192) + ctx.set_layer_metrics("clean", output_rows=500, output_bytes=16384) + ctx.set_layer_metrics("mart", output_rows=200, output_bytes=4096, tables_count=2) + record = read_run_record(get_run_dir(tmp_path, "ds", 2030), ctx.run_id) + assert record["layers"]["raw"]["metrics"]["output_bytes"] == 8192 + assert record["layers"]["clean"]["metrics"]["output_rows"] == 500 + assert record["layers"]["mart"]["metrics"]["tables_count"] == 2 + + def test_read_run_record_does_not_treat_error_message_as_path(tmp_path: Path) -> None: run_dir = get_run_dir(tmp_path, "demo_ds", 2022) run_dir.mkdir(parents=True, exist_ok=True) diff --git a/toolkit/clean/run.py b/toolkit/clean/run.py index 5ebbaf4..72e5028 100644 --- a/toolkit/clean/run.py +++ b/toolkit/clean/run.py @@ -188,7 +188,7 @@ def _run_sql( read_cfg: dict[str, Any] | None = None, read_mode: str = "fallback", logger=None, -) -> tuple[str, dict[str, Any]]: +) -> tuple[str, dict[str, Any], int]: con = duckdb.connect(":memory:") try: read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger) @@ -197,7 +197,8 @@ def _run_sql( con.execute( f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);" ) - return read_info.source, read_info.params_used + row_count: int = con.execute("SELECT count(*) FROM clean_out").fetchone()[0] + return read_info.source, read_info.params_used, row_count finally: con.close() @@ -253,7 +254,7 @@ def run_clean( ) output_path = out_dir / f"{dataset}_{year}_clean.parquet" - read_source_used, read_params_used = _run_sql( + read_source_used, read_params_used, output_rows = _run_sql( input_files, sql, output_path, @@ -261,6 +262,7 @@ def run_clean( read_mode=read_mode, logger=logger, ) + output_bytes: int | None = output_path.stat().st_size if output_path.exists() else None outputs = [file_record(output_path)] metadata_payload = _clean_metadata_payload( @@ -294,3 +296,4 @@ def run_clean( warnings_count=None, ) logger.info(f"CLEAN -> {output_path}") + return {"output_rows": output_rows, "output_bytes": output_bytes} diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index 7f76bca..9255996 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -217,8 +217,10 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None: layer_logger = bind_logger(base_logger, layer=layer_name) context.start_layer(layer_name) try: - target(*args, logger=layer_logger, **kwargs) + metrics = target(*args, logger=layer_logger, **kwargs) context.complete_layer(layer_name) + if isinstance(metrics, dict): + context.set_layer_metrics(layer_name, **metrics) summary = _validation_runner(layer_name)(cfg, year, layer_logger) context.set_validation(layer_name, summary) diff --git a/toolkit/core/run_context.py b/toolkit/core/run_context.py index 88c2dbd..c12dc30 100644 --- a/toolkit/core/run_context.py +++ b/toolkit/core/run_context.py @@ -5,7 +5,7 @@ import uuid from datetime import datetime, timezone from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath -from typing import Any, Dict +from typing import Any, Dict, Optional from toolkit.core.paths import to_root_relative @@ -22,6 +22,21 @@ def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() +def _duration_seconds(started: Optional[str], finished: Optional[str]) -> Optional[float]: + if started is None or finished is None: + return None + try: + s = datetime.fromisoformat(started) + f = datetime.fromisoformat(finished) + return round((f - s).total_seconds(), 3) + except Exception: + return None + + +def _empty_layer_metrics() -> Dict[str, Any]: + return {"output_rows": None, "output_bytes": None, "tables_count": None} + + def get_run_dir(root: Path, dataset: str, year: int) -> Path: return root / "data" / "_runs" / dataset / str(year) @@ -155,7 +170,7 @@ def __init__( self.finished_at: str | None = None self.status = "RUNNING" self.layers = { - layer: {"status": "PENDING", "started_at": None, "finished_at": None} + layer: {"status": "PENDING", "started_at": None, "finished_at": None, "metrics": _empty_layer_metrics()} for layer in _LAYER_NAMES } self.validations = {layer: {} for layer in _LAYER_NAMES} @@ -169,14 +184,21 @@ def path(self) -> Path: return self._path def to_dict(self) -> Dict[str, Any]: + layers_out = {} + for layer, info in self.layers.items(): + layers_out[layer] = { + **info, + "duration_seconds": _duration_seconds(info.get("started_at"), info.get("finished_at")), + } return { "dataset": self.dataset, "year": self.year, "run_id": self.run_id, "started_at": self.started_at, "finished_at": self.finished_at, + "duration_seconds": _duration_seconds(self.started_at, self.finished_at), "status": self.status, - "layers": self.layers, + "layers": layers_out, "validations": self.validations, "resumed_from": self.resumed_from, "error": self.error, @@ -220,6 +242,22 @@ def set_validation(self, layer: str, summary: Dict[str, Any]) -> None: self.validations[layer] = summary self.save() + def set_layer_metrics( + self, + layer: str, + *, + output_rows: Optional[int] = None, + output_bytes: Optional[int] = None, + tables_count: Optional[int] = None, + ) -> None: + info = self._layer(layer) + info["metrics"] = { + "output_rows": output_rows, + "output_bytes": output_bytes, + "tables_count": tables_count, + } + self.save() + def complete_run(self, *, success_with_warnings: bool = False) -> None: self.finished_at = _now_iso() if self.status != "FAILED": diff --git a/toolkit/mart/run.py b/toolkit/mart/run.py index be17aa4..327fc63 100644 --- a/toolkit/mart/run.py +++ b/toolkit/mart/run.py @@ -77,6 +77,7 @@ def run_mart( written: list[Path] = [] executed: list[dict[str, Any]] = [] debug_tables: list[dict[str, Any]] = [] + total_rows = 0 for i, table in enumerate(tables, start=1): if not isinstance(table, dict): @@ -102,6 +103,7 @@ def run_mart( # Create table and export con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}") + total_rows += con.execute(f"SELECT count(*) FROM {name}").fetchone()[0] out = mart_dir / f"{name}.parquet" con.execute(f"COPY {name} TO '{out}' (FORMAT PARQUET);") @@ -158,4 +160,6 @@ def run_mart( errors_count=None, warnings_count=None, ) + total_bytes = sum(p.stat().st_size for p in written if p.exists()) logger.info(f"MART -> {mart_dir}") + return {"output_rows": total_rows, "output_bytes": total_bytes, "tables_count": len(written)} diff --git a/toolkit/raw/run.py b/toolkit/raw/run.py index dce6a9c..7ae8b2e 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -324,3 +324,6 @@ def run_raw( raise RuntimeError(f"RAW validation failed for {dataset} {year}. See {vpath}") else: logger.info(f"RAW QA OK ({dataset} {year}) -> {vpath.name}") + + output_bytes = sum(f.get("bytes", 0) for f in files_written) if files_written else None + return {"output_bytes": output_bytes}