diff --git a/nemo_retriever/README.md b/nemo_retriever/README.md index c3292ba69..cc0bde0f2 100644 --- a/nemo_retriever/README.md +++ b/nemo_retriever/README.md @@ -332,6 +332,34 @@ If you want a separate detection file for ad hoc inspection, set `write_detectio `nemo_retriever/harness/test_configs.yaml`. When tags are supplied with `--tag`, they are persisted in `results.json` and in session rollups for sweep/nightly runs. +### Observability artifacts (extracts/chunks) + +The harness can optionally persist run-wide JSONL shards for extracted page content and pre-embed +chunk rows. This is intended for semantic comparison/debugging, and the format is not a +legacy per-document dump. + +Enable in `nemo_retriever/harness/test_configs.yaml`: + +- `write_extract_artifacts: true` +- `write_chunk_manifest: true` +- `observability_archive_dir: null` (or set a durable mirror root) + +When enabled, `results.json -> artifacts` includes: + +- `extract_artifacts_dir` +- `chunk_manifest_dir` +- `ingest_errors_file` +- optional `durable_extract_artifacts_dir` and `durable_chunk_manifest_dir` + +CLI overrides for one-off runs: + +```bash +retriever harness run --dataset jp20 --preset single_gpu \ + --override write_extract_artifacts=true \ + --override write_chunk_manifest=true \ + --override observability_archive_dir=/path/to/archive +``` + `results.json` also includes a nested `run_metadata` block for lightweight environment context: - `host` diff --git a/nemo_retriever/harness/test_configs.yaml b/nemo_retriever/harness/test_configs.yaml index de871ebda..4cdb66a13 100644 --- a/nemo_retriever/harness/test_configs.yaml +++ b/nemo_retriever/harness/test_configs.yaml @@ -14,6 +14,9 @@ active: hybrid: false embed_model_name: nvidia/llama-nemotron-embed-1b-v2 write_detection_file: false + write_extract_artifacts: true + write_chunk_manifest: true + observability_archive_dir: null presets: single_gpu: diff --git a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py 
b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py index b7e96ac93..1c08e0f96 100644 --- a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py +++ b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py @@ -201,6 +201,45 @@ def main( dir_okay=False, help="Optional JSON file path to write end-of-run detection counts summary.", ), + extract_output_dir: Optional[Path] = typer.Option( + None, + "--extract-output-dir", + path_type=Path, + file_okay=False, + dir_okay=True, + help="Optional directory where extract snapshots are written as JSONL shards.", + ), + durable_extract_output_dir: Optional[Path] = typer.Option( + None, + "--durable-extract-output-dir", + path_type=Path, + file_okay=False, + dir_okay=True, + help="Optional durable mirror directory for extract snapshots.", + ), + chunk_manifest_dir: Optional[Path] = typer.Option( + None, + "--chunk-manifest-dir", + path_type=Path, + file_okay=False, + dir_okay=True, + help="Optional directory where pre-embed chunk manifests are written as JSONL shards.", + ), + durable_chunk_manifest_dir: Optional[Path] = typer.Option( + None, + "--durable-chunk-manifest-dir", + path_type=Path, + file_okay=False, + dir_okay=True, + help="Optional durable mirror directory for chunk manifests.", + ), + ingest_errors_file: Optional[Path] = typer.Option( + None, + "--ingest-errors-file", + path_type=Path, + dir_okay=False, + help="Optional JSON file path where ingest error rows are written on failure.", + ), recall_match_mode: str = typer.Option( "pdf_page", "--recall-match-mode", @@ -486,6 +525,21 @@ def main( if recall_match_mode not in {"pdf_page", "pdf_only"}: raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode}") + if durable_extract_output_dir is not None and extract_output_dir is None: + raise typer.BadParameter("--durable-extract-output-dir requires --extract-output-dir") + if durable_chunk_manifest_dir is not None and chunk_manifest_dir is None: + raise 
typer.BadParameter("--durable-chunk-manifest-dir requires --chunk-manifest-dir") + + extract_output_dir = extract_output_dir.expanduser().resolve() if extract_output_dir is not None else None + durable_extract_output_dir = ( + durable_extract_output_dir.expanduser().resolve() if durable_extract_output_dir is not None else None + ) + chunk_manifest_dir = chunk_manifest_dir.expanduser().resolve() if chunk_manifest_dir is not None else None + durable_chunk_manifest_dir = ( + durable_chunk_manifest_dir.expanduser().resolve() if durable_chunk_manifest_dir is not None else None + ) + ingest_errors_file = ingest_errors_file.expanduser().resolve() if ingest_errors_file is not None else None + os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0" # Use an absolute path so driver and Ray actors resolve the same LanceDB URI. lancedb_uri = str(Path(lancedb_uri).expanduser().resolve()) @@ -665,6 +719,8 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams: max_tokens=text_chunk_max_tokens or 1024, overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150, ) + extract_snapshot_drop_columns = ["bytes", "page_image"] if input_type in {"pdf", "doc", "image"} else [] + chunk_manifest_drop_columns = ["_image_b64", "page_image"] if input_type == "txt": ingestor = ingestor.files(file_patterns).extract_txt(_text_chunk_params) @@ -679,11 +735,26 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams: _extract_params(_pdf_batch_tuning, inference_batch_size=page_elements_batch_size) ) + if extract_output_dir is not None: + ingestor = ingestor.write_observability_snapshot( + str(extract_output_dir), + stage_name="extract", + durable_output_dir=str(durable_extract_output_dir) if durable_extract_output_dir is not None else None, + drop_columns=extract_snapshot_drop_columns, + ) + enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None if 
enable_text_chunk: ingestor = ingestor.split(_text_chunk_params) - ingestor = ingestor.embed(embed_params) + ingestor = ingestor.embed( + embed_params, + chunk_manifest_output_dir=str(chunk_manifest_dir) if chunk_manifest_dir is not None else None, + durable_chunk_manifest_dir=( + str(durable_chunk_manifest_dir) if durable_chunk_manifest_dir is not None else None + ), + chunk_manifest_drop_columns=chunk_manifest_drop_columns, + ) logger.info("Running extraction...") ingest_start = time.perf_counter() @@ -717,7 +788,8 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams: # Error out, stop processing, and write top 5 errors rows to a local file for analysis. if error_count > 0: - error_file = Path("ingest_errors.json").resolve() + error_file = ingest_errors_file or Path("ingest_errors.json").resolve() + error_file.parent.mkdir(parents=True, exist_ok=True) max_error_rows_to_write = 5 error_rows_to_write = error_rows.take(min(max_error_rows_to_write, error_count)) with error_file.open("w", encoding="utf-8") as fh: diff --git a/nemo_retriever/src/nemo_retriever/harness/config.py b/nemo_retriever/src/nemo_retriever/harness/config.py index 4daef726b..22e58e7fc 100644 --- a/nemo_retriever/src/nemo_retriever/harness/config.py +++ b/nemo_retriever/src/nemo_retriever/harness/config.py @@ -61,6 +61,9 @@ class HarnessConfig: hybrid: bool = False embed_model_name: str = "nvidia/llama-nemotron-embed-1b-v2" write_detection_file: bool = False + write_extract_artifacts: bool = False + write_chunk_manifest: bool = False + observability_archive_dir: str | None = None pdf_extract_workers: int = 8 pdf_extract_num_cpus: float = 2.0 @@ -101,6 +104,9 @@ def validate(self) -> list[str]: if self.recall_adapter not in VALID_RECALL_ADAPTERS: errors.append(f"recall_adapter must be one of {sorted(VALID_RECALL_ADAPTERS)}") + if self.observability_archive_dir is not None and not str(self.observability_archive_dir).strip(): + errors.append("observability_archive_dir must be 
a non-empty string when set") + for name in TUNING_FIELDS: val = getattr(self, name) if name.startswith("gpu_") and float(val) < 0.0: @@ -207,6 +213,9 @@ def _apply_env_overrides(config_dict: dict[str, Any]) -> None: "HARNESS_HYBRID": ("hybrid", _parse_bool), "HARNESS_EMBED_MODEL_NAME": ("embed_model_name", str), "HARNESS_WRITE_DETECTION_FILE": ("write_detection_file", _parse_bool), + "HARNESS_WRITE_EXTRACT_ARTIFACTS": ("write_extract_artifacts", _parse_bool), + "HARNESS_WRITE_CHUNK_MANIFEST": ("write_chunk_manifest", _parse_bool), + "HARNESS_OBSERVABILITY_ARCHIVE_DIR": ("observability_archive_dir", str), } for key in TUNING_FIELDS: @@ -309,6 +318,17 @@ def load_harness_config( if merged.get("artifacts_dir") is not None: merged["artifacts_dir"] = _resolve_path_like(str(merged["artifacts_dir"]), REPO_ROOT) + if merged.get("observability_archive_dir") is not None: + archive_dir_raw = str(merged["observability_archive_dir"]) + if archive_dir_raw.strip(): + merged["observability_archive_dir"] = _resolve_path_like( + archive_dir_raw, + REPO_ROOT, + ) + else: + # Preserve blank values so dataclass validation raises a clear error. 
+ merged["observability_archive_dir"] = archive_dir_raw + if merged.get("lancedb_uri") is None: merged["lancedb_uri"] = "lancedb" diff --git a/nemo_retriever/src/nemo_retriever/harness/run.py b/nemo_retriever/src/nemo_retriever/harness/run.py index f0d10509c..e68c35559 100644 --- a/nemo_retriever/src/nemo_retriever/harness/run.py +++ b/nemo_retriever/src/nemo_retriever/harness/run.py @@ -188,7 +188,48 @@ def _resolve_lancedb_uri(cfg: HarnessConfig, artifact_dir: Path) -> str: return str(p) -def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple[list[str], Path, Path, Path]: +def _resolve_observability_archive_run_dir(cfg: HarnessConfig, artifact_dir: Path) -> Path | None: + raw = cfg.observability_archive_dir + if raw is None: + return None + + root = Path(str(raw)).expanduser() + if not root.is_absolute(): + root = (Path.cwd() / root).resolve() + + parent_name = artifact_dir.parent.name.strip() + run_name = artifact_dir.name.strip() or "run" + if parent_name and parent_name != run_name: + return root / parent_name / run_name + return root / run_name + + +def _resolve_observability_artifacts(cfg: HarnessConfig, artifact_dir: Path) -> dict[str, Path]: + artifacts: dict[str, Path] = { + "ingest_errors_file": artifact_dir / "ingest_errors.json", + } + + observability_dir = artifact_dir / "observability" + if cfg.write_extract_artifacts: + artifacts["extract_artifacts_dir"] = observability_dir / "extracts" + if cfg.write_chunk_manifest: + artifacts["chunk_manifest_dir"] = observability_dir / "chunks" + + durable_run_dir = _resolve_observability_archive_run_dir(cfg, artifact_dir) + if durable_run_dir is not None: + if cfg.write_extract_artifacts: + artifacts["durable_extract_artifacts_dir"] = durable_run_dir / "extracts" + if cfg.write_chunk_manifest: + artifacts["durable_chunk_manifest_dir"] = durable_run_dir / "chunks" + + return artifacts + + +def _build_command( + cfg: HarnessConfig, + artifact_dir: Path, + run_id: str, +) -> 
tuple[list[str], Path, Path, Path, dict[str, Path]]: runtime_dir = artifact_dir / "runtime_metrics" runtime_dir.mkdir(parents=True, exist_ok=True) if cfg.write_detection_file: @@ -201,6 +242,7 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple recall_adapter=cfg.recall_adapter, output_dir=runtime_dir, ) + observability_artifacts = _resolve_observability_artifacts(cfg, artifact_dir) cmd = [ sys.executable, @@ -254,6 +296,8 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple run_id, "--detection-summary-file", str(detection_summary_file), + "--ingest-errors-file", + str(observability_artifacts["ingest_errors_file"]), "--lancedb-uri", _resolve_lancedb_uri(cfg, artifact_dir), ] @@ -262,8 +306,16 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple cmd += ["--ray-address", cfg.ray_address] if cfg.hybrid: cmd += ["--hybrid"] + if cfg.write_extract_artifacts and "extract_artifacts_dir" in observability_artifacts: + cmd += ["--extract-output-dir", str(observability_artifacts["extract_artifacts_dir"])] + if "durable_extract_artifacts_dir" in observability_artifacts: + cmd += ["--durable-extract-output-dir", str(observability_artifacts["durable_extract_artifacts_dir"])] + if cfg.write_chunk_manifest and "chunk_manifest_dir" in observability_artifacts: + cmd += ["--chunk-manifest-dir", str(observability_artifacts["chunk_manifest_dir"])] + if "durable_chunk_manifest_dir" in observability_artifacts: + cmd += ["--durable-chunk-manifest-dir", str(observability_artifacts["durable_chunk_manifest_dir"])] - return cmd, runtime_dir, detection_summary_file, query_csv + return cmd, runtime_dir, detection_summary_file, query_csv, observability_artifacts def _evaluate_run_outcome( @@ -352,7 +404,9 @@ def _run_subprocess_with_tty(cmd: list[str], metrics: StreamMetrics) -> int: def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[str] | None = None) -> dict[str, Any]: - 
cmd, runtime_dir, detection_summary_file, effective_query_csv = _build_command(cfg, artifact_dir, run_id) + cmd, runtime_dir, detection_summary_file, effective_query_csv, observability_artifacts = _build_command( + cfg, artifact_dir, run_id + ) command_text = " ".join(shlex.quote(token) for token in cmd) (artifact_dir / "command.txt").write_text(command_text + "\n", encoding="utf-8") @@ -407,6 +461,9 @@ def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[ "hybrid": cfg.hybrid, "embed_model_name": cfg.embed_model_name, "write_detection_file": cfg.write_detection_file, + "write_extract_artifacts": cfg.write_extract_artifacts, + "write_chunk_manifest": cfg.write_chunk_manifest, + "observability_archive_dir": cfg.observability_archive_dir, "lancedb_uri": _resolve_lancedb_uri(cfg, artifact_dir), "tuning": {field: getattr(cfg, field) for field in sorted(TUNING_FIELDS)}, }, @@ -426,6 +483,7 @@ def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[ "artifacts": { "command_file": str((artifact_dir / "command.txt").resolve()), "runtime_metrics_dir": str(runtime_dir.resolve()), + **{key: str(path.resolve()) for key, path in observability_artifacts.items()}, }, } if cfg.write_detection_file: diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py index 84c13fe5f..66504e8d9 100644 --- a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py @@ -801,6 +801,10 @@ def embed( "No Ray Dataset to embed. Provide input_dataset or run .files(...) / .extract(...) first." 
) + chunk_manifest_output_dir = kwargs.pop("chunk_manifest_output_dir", None) + durable_chunk_manifest_dir = kwargs.pop("durable_chunk_manifest_dir", None) + chunk_manifest_drop_columns = kwargs.pop("chunk_manifest_drop_columns", None) + from nemo_retriever.params.utils import build_embed_kwargs resolved = _coerce_params(params, EmbedParams, kwargs) @@ -839,6 +843,21 @@ def embed( num_cpus=1, ) + if chunk_manifest_output_dir: + from nemo_retriever.ingest_modes.observability import write_jsonl_snapshot_batch + + self._rd_dataset = self._rd_dataset.map_batches( + partial( + write_jsonl_snapshot_batch, + output_dir=str(chunk_manifest_output_dir), + stage_name="chunk-manifest", + durable_output_dir=str(durable_chunk_manifest_dir) if durable_chunk_manifest_dir else None, + drop_columns=list(chunk_manifest_drop_columns or []), + ), + batch_format="pandas", + num_cpus=1, + ) + # When using a remote NIM endpoint, no GPU is needed for embedding. endpoint = (kwargs.get("embedding_endpoint") or kwargs.get("embed_invoke_url") or "").strip() if endpoint: @@ -861,6 +880,35 @@ def embed( return self + def write_observability_snapshot( + self, + output_dir: str, + *, + stage_name: str, + durable_output_dir: str | None = None, + drop_columns: list[str] | None = None, + ) -> "BatchIngestor": + """Persist the current dataset to JSONL shards and continue the pipeline.""" + if not isinstance(output_dir, str) or not output_dir.strip(): + raise ValueError(f"output_dir must be a non-empty string, got {output_dir!r}") + if self._rd_dataset is None: + raise RuntimeError("No Ray Dataset to snapshot. Call .files(...) 
and a pipeline stage first.") + + from nemo_retriever.ingest_modes.observability import write_jsonl_snapshot_batch + + self._rd_dataset = self._rd_dataset.map_batches( + partial( + write_jsonl_snapshot_batch, + output_dir=output_dir, + stage_name=stage_name, + durable_output_dir=durable_output_dir, + drop_columns=list(drop_columns or []), + ), + batch_format="pandas", + num_cpus=1, + ) + return self + def vdb_upload(self, params: VdbUploadParams | None = None, **kwargs: Any) -> "BatchIngestor": """ Add a streaming LanceDB upload stage to the batch pipeline. diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/observability.py b/nemo_retriever/src/nemo_retriever/ingest_modes/observability.py new file mode 100644 index 000000000..7c9733a9b --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/observability.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from datetime import datetime, timezone +import json +import logging +import os +from pathlib import Path +from typing import Any +from uuid import uuid4 + +logger = logging.getLogger(__name__) + + +def _to_jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + + if isinstance(value, (bytes, bytearray)): + return {"omitted_bytes": int(len(value))} + + if isinstance(value, dict): + out: dict[str, Any] = {} + for raw_key, raw_value in value.items(): + key = str(raw_key) + if key == "image_b64" and isinstance(raw_value, str): + out["image_b64_chars"] = int(len(raw_value)) + out["image_b64_omitted"] = True + continue + if key == "embedding": + try: + dim = len(raw_value) # type: ignore[arg-type] + except Exception: + dim = None + out["embedding_dimensions"] = int(dim) if dim is not None else None + out["embedding_omitted"] = True + continue + out[key] = _to_jsonable(raw_value) + return out 
+ + if isinstance(value, (list, tuple)): + return [_to_jsonable(item) for item in value] + + item = getattr(value, "item", None) + if callable(item): + try: + return _to_jsonable(item()) + except Exception: + pass + + tolist = getattr(value, "tolist", None) + if callable(tolist): + try: + return _to_jsonable(tolist()) + except Exception: + pass + + return str(value) + + +def _normalize_record(record: dict[str, Any], *, drop_columns: set[str]) -> dict[str, Any]: + return {str(key): _to_jsonable(value) for key, value in record.items() if str(key) not in drop_columns} + + +def _atomic_write_text(path: Path, payload: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_name(path.name + ".tmp") + with tmp_path.open("w", encoding="utf-8") as fh: + fh.write(payload) + fh.flush() + try: + os.fsync(fh.fileno()) + except Exception: + pass + tmp_path.replace(path) + + +def write_jsonl_snapshot_batch( + batch_df: Any, + *, + output_dir: str, + stage_name: str, + durable_output_dir: str | None = None, + drop_columns: list[str] | None = None, +) -> Any: + raw_records = batch_df.to_dict(orient="records") + if not raw_records: + return batch_df + + records = [ + _normalize_record(record, drop_columns={str(col) for col in (drop_columns or [])}) for record in raw_records + ] + if not records: + return batch_df + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") + file_name = f"{stage_name}-{timestamp}-{uuid4().hex[:8]}.jsonl" + payload = "".join(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n" for record in records) + + primary_path = Path(output_dir).expanduser().resolve() / file_name + _atomic_write_text(primary_path, payload) + + if durable_output_dir: + durable_path = Path(durable_output_dir).expanduser().resolve() / file_name + if durable_path != primary_path: + try: + _atomic_write_text(durable_path, payload) + except Exception: + logger.warning("Failed writing durable observability snapshot to %s", 
durable_path, exc_info=True) + + return batch_df diff --git a/nemo_retriever/tests/test_batch_ingestor.py b/nemo_retriever/tests/test_batch_ingestor.py index 21b9a7dde..c0b60a316 100644 --- a/nemo_retriever/tests/test_batch_ingestor.py +++ b/nemo_retriever/tests/test_batch_ingestor.py @@ -5,6 +5,7 @@ pytest.importorskip("ray") from nemo_retriever.ingest_modes.batch import BatchIngestor +from nemo_retriever.params import EmbedParams class _DummyClusterResources: @@ -56,3 +57,71 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None: } assert dummy_ctx.enable_rich_progress_bars is True assert dummy_ctx.use_ray_tqdm is False + + +class _DummyDataset: + def __init__(self) -> None: + self.calls: list[tuple[str, object, dict[str, object]]] = [] + + def repartition(self, **kwargs): + self.calls.append(("repartition", None, kwargs)) + return self + + def map_batches(self, fn, **kwargs): + self.calls.append(("map_batches", fn, kwargs)) + return self + + +class _DummyRequestedPlan: + def get_embed_batch_size(self) -> int: + return 4 + + def get_embed_gpus_per_actor(self) -> float: + return 0.0 + + def get_embed_initial_actors(self) -> int: + return 1 + + def get_embed_min_actors(self) -> int: + return 1 + + def get_embed_max_actors(self) -> int: + return 1 + + +def test_write_observability_snapshot_requires_output_dir() -> None: + ingestor = object.__new__(BatchIngestor) + ingestor._rd_dataset = _DummyDataset() + with pytest.raises(ValueError, match="output_dir must be a non-empty string"): + ingestor.write_observability_snapshot("", stage_name="extract") + + +def test_embed_inserts_chunk_manifest_snapshot_stage(monkeypatch) -> None: + ingestor = object.__new__(BatchIngestor) + ingestor._rd_dataset = _DummyDataset() + ingestor._requested_plan = _DummyRequestedPlan() + ingestor._tasks = [] + + params = EmbedParams(model_name="nvidia/llama-nemotron-embed-1b-v2") + + ingestor.embed( + params, + chunk_manifest_output_dir="/tmp/chunks", + 
durable_chunk_manifest_dir="/tmp/chunks-archive", + chunk_manifest_drop_columns=["_image_b64"], + ) + + map_batch_calls = [entry for entry in ingestor._rd_dataset.calls if entry[0] == "map_batches"] + assert len(map_batch_calls) >= 3 + + snapshot_call = map_batch_calls[1] + fn = snapshot_call[1] + kwargs = snapshot_call[2] + + assert getattr(fn, "func").__name__ == "write_jsonl_snapshot_batch" + assert fn.keywords["output_dir"] == "/tmp/chunks" + assert fn.keywords["stage_name"] == "chunk-manifest" + assert fn.keywords["durable_output_dir"] == "/tmp/chunks-archive" + assert fn.keywords["drop_columns"] == ["_image_b64"] + assert kwargs["batch_format"] == "pandas" + assert kwargs["num_cpus"] == 1 diff --git a/nemo_retriever/tests/test_batch_pipeline.py b/nemo_retriever/tests/test_batch_pipeline.py index 2d18d92bb..466d74ed2 100644 --- a/nemo_retriever/tests/test_batch_pipeline.py +++ b/nemo_retriever/tests/test_batch_pipeline.py @@ -1,21 +1,67 @@ +import sys +from types import SimpleNamespace + import pytest pytest.importorskip("ray") -from nemo_retriever.examples.batch_pipeline import _count_materialized_rows +from nemo_retriever.examples.batch_pipeline import _ensure_lancedb_table from nemo_retriever.utils.input_files import resolve_input_patterns -class _DatasetWithoutLen: - def count(self) -> int: - return 42 +def test_ensure_lancedb_table_creates_table_when_missing(monkeypatch, tmp_path) -> None: + created: dict[str, object] = {} + + class _FakeDb: + def open_table(self, _name: str) -> None: + raise RuntimeError("missing") + + def create_table(self, table_name: str, data, schema, mode: str) -> None: + created["table_name"] = table_name + created["schema"] = schema + created["mode"] = mode + created["rows"] = data.num_rows + + class _FakeLanceDb: + def connect(self, _uri: str) -> _FakeDb: + return _FakeDb() + + monkeypatch.setattr("nemo_retriever.examples.batch_pipeline._lancedb", lambda: _FakeLanceDb()) + 
monkeypatch.setattr("nemo_retriever.examples.batch_pipeline.lancedb_schema", lambda: []) + + class _FakeTable: + def __init__(self, values: dict[str, list[object]], schema: list[object]) -> None: + self.num_rows = len(next(iter(values.values()), [])) + self.schema = schema + + fake_pyarrow = SimpleNamespace(table=lambda values, schema: _FakeTable(values, schema)) + monkeypatch.setitem(sys.modules, "pyarrow", fake_pyarrow) + + _ensure_lancedb_table(str(tmp_path / "lancedb"), "nv-ingest") + assert created == {"table_name": "nv-ingest", "schema": [], "mode": "create", "rows": 0} + + +def test_ensure_lancedb_table_noops_when_table_exists(monkeypatch, tmp_path) -> None: + class _FakeDb: + def __init__(self) -> None: + self.create_called = False + + def open_table(self, _name: str) -> None: + return None + + def create_table(self, *args, **kwargs) -> None: # pragma: no cover - should not run + self.create_called = True + + fake_db = _FakeDb() - def __len__(self) -> int: - raise AssertionError("__len__ should not be used") + class _FakeLanceDb: + def connect(self, _uri: str) -> _FakeDb: + return fake_db + monkeypatch.setattr("nemo_retriever.examples.batch_pipeline._lancedb", lambda: _FakeLanceDb()) -def test_count_materialized_rows_prefers_dataset_count() -> None: - assert _count_materialized_rows(_DatasetWithoutLen()) == 42 + _ensure_lancedb_table(str(tmp_path / "lancedb"), "nv-ingest") + assert fake_db.create_called is False def test_resolve_input_file_patterns_recurses_for_directory_inputs(tmp_path) -> None: diff --git a/nemo_retriever/tests/test_harness_config.py b/nemo_retriever/tests/test_harness_config.py index d315cbaf2..4d687801b 100644 --- a/nemo_retriever/tests/test_harness_config.py +++ b/nemo_retriever/tests/test_harness_config.py @@ -305,3 +305,76 @@ def _fake_exists(path_self: Path) -> bool: assert cfg.dataset_dir == str(Path("/raid/tester/financebench").resolve()) assert cfg.query_csv == str(expected_query_csv) assert cfg.recall_required is True + + +def 
test_load_harness_config_defaults_observability_flags_to_false(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + f" dataset_dir: {dataset_dir}", + " preset: base", + " recall_required: false", + "presets:", + " base: {}", + "datasets: {}", + ] + ), + encoding="utf-8", + ) + + cfg = load_harness_config(config_file=str(cfg_path)) + assert cfg.write_extract_artifacts is False + assert cfg.write_chunk_manifest is False + assert cfg.observability_archive_dir is None + + +def test_load_harness_config_resolves_relative_observability_archive_dir(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + f" dataset_dir: {dataset_dir}", + " preset: base", + " recall_required: false", + " observability_archive_dir: observability_archive", + "presets:", + " base: {}", + "datasets: {}", + ] + ), + encoding="utf-8", + ) + + cfg = load_harness_config(config_file=str(cfg_path)) + assert cfg.observability_archive_dir == str((harness_config.REPO_ROOT / "observability_archive").resolve()) + + +def test_load_harness_config_rejects_blank_observability_archive_dir(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + f" dataset_dir: {dataset_dir}", + " preset: base", + " recall_required: false", + " observability_archive_dir: ' '", + "presets:", + " base: {}", + "datasets: {}", + ] + ), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="observability_archive_dir must be a non-empty string when set"): + load_harness_config(config_file=str(cfg_path)) diff --git a/nemo_retriever/tests/test_harness_run.py b/nemo_retriever/tests/test_harness_run.py index f94ee0f6d..c3934bf0c 100644 --- 
a/nemo_retriever/tests/test_harness_run.py +++ b/nemo_retriever/tests/test_harness_run.py @@ -51,7 +51,9 @@ def test_build_command_uses_hidden_detection_file_by_default(tmp_path: Path) -> query_csv=str(query_csv), write_detection_file=False, ) - cmd, runtime_dir, detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") + cmd, runtime_dir, detection_file, effective_query_csv, observability_artifacts = _build_command( + cfg, tmp_path, run_id="r1" + ) assert "--detection-summary-file" in cmd assert "--recall-match-mode" in cmd assert "pdf_page" in cmd @@ -74,6 +76,8 @@ def test_build_command_uses_hidden_detection_file_by_default(tmp_path: Path) -> assert detection_file.parent == runtime_dir assert detection_file.name == ".detection_summary.json" assert effective_query_csv == query_csv + assert "--ingest-errors-file" in cmd + assert observability_artifacts == {"ingest_errors_file": tmp_path / "ingest_errors.json"} def test_build_command_uses_top_level_detection_file_when_enabled(tmp_path: Path) -> None: @@ -89,7 +93,9 @@ def test_build_command_uses_top_level_detection_file_when_enabled(tmp_path: Path query_csv=str(query_csv), write_detection_file=True, ) - cmd, runtime_dir, detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") + cmd, runtime_dir, detection_file, effective_query_csv, _observability_artifacts = _build_command( + cfg, tmp_path, run_id="r1" + ) assert "--detection-summary-file" in cmd assert detection_file.parent == tmp_path assert detection_file.name == "detection_summary.json" @@ -109,7 +115,9 @@ def test_build_command_applies_page_plus_one_adapter(tmp_path: Path) -> None: query_csv=str(query_csv), recall_adapter="page_plus_one", ) - cmd, runtime_dir, _detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") + cmd, runtime_dir, _detection_file, effective_query_csv, _observability_artifacts = _build_command( + cfg, tmp_path, run_id="r1" + ) assert effective_query_csv.parent == 
def test_build_command_adds_observability_dirs_when_enabled(tmp_path: Path) -> None:
    """Enabled observability flags add local and durable dirs to the command.

    Expected durable paths are derived from ``archive_root`` rather than read
    back from ``_build_command``'s own return value, so a regression in the
    archive layout is actually detected. Each CLI flag is also checked against
    the value that immediately follows it.
    """
    dataset_dir = tmp_path / "dataset"
    dataset_dir.mkdir()
    query_csv = tmp_path / "query.csv"
    query_csv.write_text("q,s,p\nx,y,1\n", encoding="utf-8")
    archive_root = tmp_path / "archive"

    cfg = HarnessConfig(
        dataset_dir=str(dataset_dir),
        dataset_label="jp20",
        preset="single_gpu",
        query_csv=str(query_csv),
        write_extract_artifacts=True,
        write_chunk_manifest=True,
        observability_archive_dir=str(archive_root),
    )

    cmd, _runtime_dir, _detection_file, _effective_query_csv, observability_artifacts = _build_command(
        cfg, tmp_path, run_id="r1"
    )

    def _flag_value(flag: str) -> str:
        # The value of a CLI option is the token right after the flag.
        return cmd[cmd.index(flag) + 1]

    expected_local_extract = tmp_path / "observability" / "extracts"
    expected_local_chunks = tmp_path / "observability" / "chunks"
    # The durable run dir mirrors "<parent>/<run>" of the artifact dir under the
    # archive root; pytest's tmp_path always has a distinct, non-empty parent name.
    expected_durable_run_dir = archive_root / tmp_path.parent.name / tmp_path.name
    expected_durable_extract = expected_durable_run_dir / "extracts"
    expected_durable_chunks = expected_durable_run_dir / "chunks"

    assert _flag_value("--extract-output-dir") == str(expected_local_extract)
    assert _flag_value("--chunk-manifest-dir") == str(expected_local_chunks)
    assert _flag_value("--durable-extract-output-dir") == str(expected_durable_extract)
    assert _flag_value("--durable-chunk-manifest-dir") == str(expected_durable_chunks)
    assert _flag_value("--ingest-errors-file") == str(tmp_path / "ingest_errors.json")
    assert observability_artifacts == {
        "ingest_errors_file": tmp_path / "ingest_errors.json",
        "extract_artifacts_dir": expected_local_extract,
        "chunk_manifest_dir": expected_local_chunks,
        "durable_extract_artifacts_dir": expected_durable_extract,
        "durable_chunk_manifest_dir": expected_durable_chunks,
    }
_normalize_recall_metric_key("recall@10") == "recall_10" @@ -143,7 +194,13 @@ def test_run_single_writes_tags_to_results_json(monkeypatch, tmp_path: Path) -> monkeypatch.setattr( harness_run, "_build_command", - lambda *_args, **_kwargs: (["python", "-V"], runtime_dir, runtime_dir / ".detection_summary.json", query_csv), + lambda *_args, **_kwargs: ( + ["python", "-V"], + runtime_dir, + runtime_dir / ".detection_summary.json", + query_csv, + {"ingest_errors_file": tmp_path / "ingest_errors.json"}, + ), ) def _fake_run_subprocess(_cmd: list[str], metrics) -> int: @@ -318,6 +375,7 @@ def test_run_single_writes_results_with_run_metadata(monkeypatch, tmp_path: Path runtime_dir, detection_file, query_csv, + {"ingest_errors_file": artifact_dir / "ingest_errors.json"}, ), ) @@ -369,6 +427,9 @@ def _fake_run_subprocess(_cmd: list[str], metrics) -> int: "hybrid": cfg.hybrid, "embed_model_name": cfg.embed_model_name, "write_detection_file": True, + "write_extract_artifacts": False, + "write_chunk_manifest": False, + "observability_archive_dir": None, "lancedb_uri": str((artifact_dir / "lancedb").resolve()), "tuning": {field: getattr(cfg, field) for field in sorted(harness_run.TUNING_FIELDS)}, }, @@ -399,6 +460,7 @@ def _fake_run_subprocess(_cmd: list[str], metrics) -> int: "artifacts": { "command_file": str((artifact_dir / "command.txt").resolve()), "runtime_metrics_dir": str(runtime_dir.resolve()), + "ingest_errors_file": str((artifact_dir / "ingest_errors.json").resolve()), "detection_summary_file": str(detection_file.resolve()), }, } @@ -436,6 +498,7 @@ def test_run_single_allows_missing_optional_summary_files(monkeypatch, tmp_path: runtime_dir, detection_file, query_csv, + {"ingest_errors_file": artifact_dir / "ingest_errors.json"}, ), )