Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/test_clean_input_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _fake_run_sql(input_files, sql_query, output_path, read_cfg=None, **_kwargs)
seen["output_path"] = output_path
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(b"PAR1")
return ("strict", {"delim": ";", "decimal": ",", "encoding": "utf-8"})
return ("strict", {"delim": ";", "decimal": ",", "encoding": "utf-8"}, 42)

monkeypatch.setattr("toolkit.clean.run._run_sql", _fake_run_sql)
run_clean("demo", 2024, str(tmp_path), clean_cfg, logger or _NoopLogger())
Expand Down
63 changes: 63 additions & 0 deletions tests/test_run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
from pathlib import Path

import time

from toolkit.core.run_context import RunContext, get_run_dir, read_run_record


Expand Down Expand Up @@ -80,6 +82,67 @@ def test_read_run_record_marks_absolute_paths_outside_root_as_non_portable(tmp_p
assert record["_portability"]["warnings"] == ["/outside/root/file.csv"]


def test_layer_metrics_default_to_null(tmp_path: Path) -> None:
    """A freshly created run context persists null metrics for every layer."""
    context = RunContext("ds", 2030, root=str(tmp_path))
    persisted = _read_context(context.path)
    for layer_name in ("raw", "clean", "mart"):
        layer_metrics = persisted["layers"][layer_name]["metrics"]
        for metric_key in ("output_rows", "output_bytes", "tables_count"):
            assert layer_metrics[metric_key] is None


def test_set_layer_metrics_persists(tmp_path: Path) -> None:
    """Metrics written via set_layer_metrics survive a reload from disk."""
    context = RunContext("ds", 2030, root=str(tmp_path))
    context.set_layer_metrics("clean", output_rows=1000, output_bytes=204800)
    clean_metrics = _read_context(context.path)["layers"]["clean"]["metrics"]
    assert clean_metrics["output_rows"] == 1000
    assert clean_metrics["output_bytes"] == 204800
    assert clean_metrics["tables_count"] is None


def test_set_layer_metrics_mart_with_tables_count(tmp_path: Path) -> None:
    """The mart layer accepts and persists the optional tables_count metric."""
    context = RunContext("ds", 2030, root=str(tmp_path))
    context.set_layer_metrics("mart", output_rows=5000, output_bytes=409600, tables_count=3)
    mart_metrics = _read_context(context.path)["layers"]["mart"]["metrics"]
    assert mart_metrics["output_rows"] == 5000
    assert mart_metrics["output_bytes"] == 409600
    assert mart_metrics["tables_count"] == 3


def test_duration_seconds_computed_after_complete(tmp_path: Path) -> None:
    """Completing a layer and the run yields positive, persisted durations.

    The short sleep guarantees a measurable gap between the start and finish
    timestamps, so the assertions can demand a strictly positive duration.
    (The previous ``>= 0`` check was satisfied even by a zero duration,
    which made the sleep pointless.)
    """
    ctx = RunContext("ds", 2030, root=str(tmp_path))
    ctx.start_layer("raw")
    time.sleep(0.05)
    ctx.complete_layer("raw")
    ctx.complete_run()
    stored = _read_context(ctx.path)
    raw_duration = stored["layers"]["raw"]["duration_seconds"]
    assert raw_duration is not None
    assert raw_duration > 0  # the sleep guarantees non-zero elapsed time
    assert stored["duration_seconds"] is not None
    assert stored["duration_seconds"] > 0


def test_duration_seconds_null_while_running(tmp_path: Path) -> None:
    """No duration is reported while the run and its layers are in progress."""
    context = RunContext("ds", 2030, root=str(tmp_path))
    snapshot = _read_context(context.path)
    assert snapshot["duration_seconds"] is None
    assert snapshot["layers"]["raw"]["duration_seconds"] is None


def test_metrics_survive_json_round_trip(tmp_path: Path) -> None:
    """Metrics set on the context are readable back through read_run_record."""
    context = RunContext("ds", 2030, root=str(tmp_path))
    context.set_layer_metrics("raw", output_bytes=8192)
    context.set_layer_metrics("clean", output_rows=500, output_bytes=16384)
    context.set_layer_metrics("mart", output_rows=200, output_bytes=4096, tables_count=2)
    record = read_run_record(get_run_dir(tmp_path, "ds", 2030), context.run_id)
    layers = record["layers"]
    assert layers["raw"]["metrics"]["output_bytes"] == 8192
    assert layers["clean"]["metrics"]["output_rows"] == 500
    assert layers["mart"]["metrics"]["tables_count"] == 2


def test_read_run_record_does_not_treat_error_message_as_path(tmp_path: Path) -> None:
run_dir = get_run_dir(tmp_path, "demo_ds", 2022)
run_dir.mkdir(parents=True, exist_ok=True)
Expand Down
9 changes: 6 additions & 3 deletions toolkit/clean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _run_sql(
read_cfg: dict[str, Any] | None = None,
read_mode: str = "fallback",
logger=None,
) -> tuple[str, dict[str, Any]]:
) -> tuple[str, dict[str, Any], int]:
con = duckdb.connect(":memory:")
try:
read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger)
Expand All @@ -197,7 +197,8 @@ def _run_sql(
con.execute(
f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);"
)
return read_info.source, read_info.params_used
row_count: int = con.execute("SELECT count(*) FROM clean_out").fetchone()[0]
return read_info.source, read_info.params_used, row_count
finally:
con.close()

Expand Down Expand Up @@ -253,14 +254,15 @@ def run_clean(
)

output_path = out_dir / f"{dataset}_{year}_clean.parquet"
read_source_used, read_params_used = _run_sql(
read_source_used, read_params_used, output_rows = _run_sql(
input_files,
sql,
output_path,
read_cfg=relation_read_cfg,
read_mode=read_mode,
logger=logger,
)
output_bytes: int | None = output_path.stat().st_size if output_path.exists() else None

outputs = [file_record(output_path)]
metadata_payload = _clean_metadata_payload(
Expand Down Expand Up @@ -294,3 +296,4 @@ def run_clean(
warnings_count=None,
)
logger.info(f"CLEAN -> {output_path}")
return {"output_rows": output_rows, "output_bytes": output_bytes}
4 changes: 3 additions & 1 deletion toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,10 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None:
layer_logger = bind_logger(base_logger, layer=layer_name)
context.start_layer(layer_name)
try:
target(*args, logger=layer_logger, **kwargs)
metrics = target(*args, logger=layer_logger, **kwargs)
context.complete_layer(layer_name)
if isinstance(metrics, dict):
context.set_layer_metrics(layer_name, **metrics)

summary = _validation_runner(layer_name)(cfg, year, layer_logger)
context.set_validation(layer_name, summary)
Expand Down
44 changes: 41 additions & 3 deletions toolkit/core/run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
from datetime import datetime, timezone
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath
from typing import Any, Dict
from typing import Any, Dict, Optional

from toolkit.core.paths import to_root_relative

Expand All @@ -22,6 +22,21 @@ def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()


def _duration_seconds(started: Optional[str], finished: Optional[str]) -> Optional[float]:
if started is None or finished is None:
return None
try:
s = datetime.fromisoformat(started)
f = datetime.fromisoformat(finished)
return round((f - s).total_seconds(), 3)
except Exception:
return None


def _empty_layer_metrics() -> Dict[str, Any]:
return {"output_rows": None, "output_bytes": None, "tables_count": None}


def get_run_dir(root: Path, dataset: str, year: int) -> Path:
    """Return the directory that holds run records for *dataset* in *year*."""
    return root.joinpath("data", "_runs", dataset, str(year))

Expand Down Expand Up @@ -155,7 +170,7 @@ def __init__(
self.finished_at: str | None = None
self.status = "RUNNING"
self.layers = {
layer: {"status": "PENDING", "started_at": None, "finished_at": None}
layer: {"status": "PENDING", "started_at": None, "finished_at": None, "metrics": _empty_layer_metrics()}
for layer in _LAYER_NAMES
}
self.validations = {layer: {} for layer in _LAYER_NAMES}
Expand All @@ -169,14 +184,21 @@ def path(self) -> Path:
return self._path

def to_dict(self) -> Dict[str, Any]:
layers_out = {}
for layer, info in self.layers.items():
layers_out[layer] = {
**info,
"duration_seconds": _duration_seconds(info.get("started_at"), info.get("finished_at")),
}
return {
"dataset": self.dataset,
"year": self.year,
"run_id": self.run_id,
"started_at": self.started_at,
"finished_at": self.finished_at,
"duration_seconds": _duration_seconds(self.started_at, self.finished_at),
"status": self.status,
"layers": self.layers,
"layers": layers_out,
"validations": self.validations,
"resumed_from": self.resumed_from,
"error": self.error,
Expand Down Expand Up @@ -220,6 +242,22 @@ def set_validation(self, layer: str, summary: Dict[str, Any]) -> None:
self.validations[layer] = summary
self.save()

def set_layer_metrics(
self,
layer: str,
*,
output_rows: Optional[int] = None,
output_bytes: Optional[int] = None,
tables_count: Optional[int] = None,
) -> None:
info = self._layer(layer)
info["metrics"] = {
"output_rows": output_rows,
"output_bytes": output_bytes,
"tables_count": tables_count,
}
self.save()

def complete_run(self, *, success_with_warnings: bool = False) -> None:
self.finished_at = _now_iso()
if self.status != "FAILED":
Expand Down
4 changes: 4 additions & 0 deletions toolkit/mart/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def run_mart(
written: list[Path] = []
executed: list[dict[str, Any]] = []
debug_tables: list[dict[str, Any]] = []
total_rows = 0

for i, table in enumerate(tables, start=1):
if not isinstance(table, dict):
Expand All @@ -102,6 +103,7 @@ def run_mart(

# Create table and export
con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}")
total_rows += con.execute(f"SELECT count(*) FROM {name}").fetchone()[0]

out = mart_dir / f"{name}.parquet"
con.execute(f"COPY {name} TO '{out}' (FORMAT PARQUET);")
Expand Down Expand Up @@ -158,4 +160,6 @@ def run_mart(
errors_count=None,
warnings_count=None,
)
total_bytes = sum(p.stat().st_size for p in written if p.exists())
logger.info(f"MART -> {mart_dir}")
return {"output_rows": total_rows, "output_bytes": total_bytes, "tables_count": len(written)}
3 changes: 3 additions & 0 deletions toolkit/raw/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,6 @@ def run_raw(
raise RuntimeError(f"RAW validation failed for {dataset} {year}. See {vpath}")
else:
logger.info(f"RAW QA OK ({dataset} {year}) -> {vpath.name}")

output_bytes = sum(f.get("bytes", 0) for f in files_written) if files_written else None
return {"output_bytes": output_bytes}
Loading