Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions nemo_retriever/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,34 @@ If you want a separate detection file for ad hoc inspection, set `write_detectio
`nemo_retriever/harness/test_configs.yaml`.
When tags are supplied with `--tag`, they are persisted in `results.json` and in session rollups for sweep/nightly runs.

### Observability artifacts (extracts/chunks)

The harness can optionally persist run-wide JSONL shards for extracted page content and pre-embed
chunk rows. These artifacts are intended for semantic comparison and debugging; the shard format
is run-wide JSONL, not a legacy per-document dump.

Enable in `nemo_retriever/harness/test_configs.yaml`:

- `write_extract_artifacts: true`
- `write_chunk_manifest: true`
- `observability_archive_dir: null` (or set a durable mirror root)

When enabled, `results.json -> artifacts` includes:

- `extract_artifacts_dir`
- `chunk_manifest_dir`
- `ingest_errors_file`
- optional `durable_extract_artifacts_dir` and `durable_chunk_manifest_dir`

CLI overrides for one-off runs:

```bash
retriever harness run --dataset jp20 --preset single_gpu \
--override write_extract_artifacts=true \
--override write_chunk_manifest=true \
--override observability_archive_dir=/path/to/archive
```

`results.json` also includes a nested `run_metadata` block for lightweight environment context:

- `host`
Expand Down
3 changes: 3 additions & 0 deletions nemo_retriever/harness/test_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ active:
hybrid: false
embed_model_name: nvidia/llama-nemotron-embed-1b-v2
write_detection_file: false
write_extract_artifacts: true
write_chunk_manifest: true
observability_archive_dir: null

presets:
single_gpu:
Expand Down
76 changes: 74 additions & 2 deletions nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,45 @@ def main(
dir_okay=False,
help="Optional JSON file path to write end-of-run detection counts summary.",
),
extract_output_dir: Optional[Path] = typer.Option(
None,
"--extract-output-dir",
path_type=Path,
file_okay=False,
dir_okay=True,
help="Optional directory where extract snapshots are written as JSONL shards.",
),
durable_extract_output_dir: Optional[Path] = typer.Option(
None,
"--durable-extract-output-dir",
path_type=Path,
file_okay=False,
dir_okay=True,
help="Optional durable mirror directory for extract snapshots.",
),
chunk_manifest_dir: Optional[Path] = typer.Option(
None,
"--chunk-manifest-dir",
path_type=Path,
file_okay=False,
dir_okay=True,
help="Optional directory where pre-embed chunk manifests are written as JSONL shards.",
),
durable_chunk_manifest_dir: Optional[Path] = typer.Option(
None,
"--durable-chunk-manifest-dir",
path_type=Path,
file_okay=False,
dir_okay=True,
help="Optional durable mirror directory for chunk manifests.",
),
ingest_errors_file: Optional[Path] = typer.Option(
None,
"--ingest-errors-file",
path_type=Path,
dir_okay=False,
help="Optional JSON file path where ingest error rows are written on failure.",
),
recall_match_mode: str = typer.Option(
"pdf_page",
"--recall-match-mode",
Expand Down Expand Up @@ -486,6 +525,21 @@ def main(
if recall_match_mode not in {"pdf_page", "pdf_only"}:
raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode}")

if durable_extract_output_dir is not None and extract_output_dir is None:
raise typer.BadParameter("--durable-extract-output-dir requires --extract-output-dir")
if durable_chunk_manifest_dir is not None and chunk_manifest_dir is None:
raise typer.BadParameter("--durable-chunk-manifest-dir requires --chunk-manifest-dir")

extract_output_dir = extract_output_dir.expanduser().resolve() if extract_output_dir is not None else None
durable_extract_output_dir = (
durable_extract_output_dir.expanduser().resolve() if durable_extract_output_dir is not None else None
)
chunk_manifest_dir = chunk_manifest_dir.expanduser().resolve() if chunk_manifest_dir is not None else None
durable_chunk_manifest_dir = (
durable_chunk_manifest_dir.expanduser().resolve() if durable_chunk_manifest_dir is not None else None
)
ingest_errors_file = ingest_errors_file.expanduser().resolve() if ingest_errors_file is not None else None

os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0"
# Use an absolute path so driver and Ray actors resolve the same LanceDB URI.
lancedb_uri = str(Path(lancedb_uri).expanduser().resolve())
Expand Down Expand Up @@ -665,6 +719,8 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams:
max_tokens=text_chunk_max_tokens or 1024,
overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150,
)
extract_snapshot_drop_columns = ["bytes", "page_image"] if input_type in {"pdf", "doc", "image"} else []
chunk_manifest_drop_columns = ["_image_b64", "page_image"]

if input_type == "txt":
ingestor = ingestor.files(file_patterns).extract_txt(_text_chunk_params)
Expand All @@ -679,11 +735,26 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams:
_extract_params(_pdf_batch_tuning, inference_batch_size=page_elements_batch_size)
)

if extract_output_dir is not None:
ingestor = ingestor.write_observability_snapshot(
str(extract_output_dir),
stage_name="extract",
durable_output_dir=str(durable_extract_output_dir) if durable_extract_output_dir is not None else None,
drop_columns=extract_snapshot_drop_columns,
)

enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None
if enable_text_chunk:
ingestor = ingestor.split(_text_chunk_params)

ingestor = ingestor.embed(embed_params)
ingestor = ingestor.embed(
embed_params,
chunk_manifest_output_dir=str(chunk_manifest_dir) if chunk_manifest_dir is not None else None,
durable_chunk_manifest_dir=(
str(durable_chunk_manifest_dir) if durable_chunk_manifest_dir is not None else None
),
chunk_manifest_drop_columns=chunk_manifest_drop_columns,
)

logger.info("Running extraction...")
ingest_start = time.perf_counter()
Expand Down Expand Up @@ -717,7 +788,8 @@ def _extract_params(batch_tuning: dict, **overrides: Any) -> ExtractParams:

# Error out, stop processing, and write top 5 errors rows to a local file for analysis.
if error_count > 0:
error_file = Path("ingest_errors.json").resolve()
error_file = ingest_errors_file or Path("ingest_errors.json").resolve()
error_file.parent.mkdir(parents=True, exist_ok=True)
max_error_rows_to_write = 5
error_rows_to_write = error_rows.take(min(max_error_rows_to_write, error_count))
with error_file.open("w", encoding="utf-8") as fh:
Expand Down
20 changes: 20 additions & 0 deletions nemo_retriever/src/nemo_retriever/harness/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class HarnessConfig:
hybrid: bool = False
embed_model_name: str = "nvidia/llama-nemotron-embed-1b-v2"
write_detection_file: bool = False
write_extract_artifacts: bool = False
write_chunk_manifest: bool = False
observability_archive_dir: str | None = None

pdf_extract_workers: int = 8
pdf_extract_num_cpus: float = 2.0
Expand Down Expand Up @@ -101,6 +104,9 @@ def validate(self) -> list[str]:
if self.recall_adapter not in VALID_RECALL_ADAPTERS:
errors.append(f"recall_adapter must be one of {sorted(VALID_RECALL_ADAPTERS)}")

if self.observability_archive_dir is not None and not str(self.observability_archive_dir).strip():
errors.append("observability_archive_dir must be a non-empty string when set")

for name in TUNING_FIELDS:
val = getattr(self, name)
if name.startswith("gpu_") and float(val) < 0.0:
Expand Down Expand Up @@ -207,6 +213,9 @@ def _apply_env_overrides(config_dict: dict[str, Any]) -> None:
"HARNESS_HYBRID": ("hybrid", _parse_bool),
"HARNESS_EMBED_MODEL_NAME": ("embed_model_name", str),
"HARNESS_WRITE_DETECTION_FILE": ("write_detection_file", _parse_bool),
"HARNESS_WRITE_EXTRACT_ARTIFACTS": ("write_extract_artifacts", _parse_bool),
"HARNESS_WRITE_CHUNK_MANIFEST": ("write_chunk_manifest", _parse_bool),
"HARNESS_OBSERVABILITY_ARCHIVE_DIR": ("observability_archive_dir", str),
}

for key in TUNING_FIELDS:
Expand Down Expand Up @@ -309,6 +318,17 @@ def load_harness_config(
if merged.get("artifacts_dir") is not None:
merged["artifacts_dir"] = _resolve_path_like(str(merged["artifacts_dir"]), REPO_ROOT)

if merged.get("observability_archive_dir") is not None:
archive_dir_raw = str(merged["observability_archive_dir"])
if archive_dir_raw.strip():
merged["observability_archive_dir"] = _resolve_path_like(
archive_dir_raw,
REPO_ROOT,
)
else:
# Preserve blank values so dataclass validation raises a clear error.
merged["observability_archive_dir"] = archive_dir_raw

if merged.get("lancedb_uri") is None:
merged["lancedb_uri"] = "lancedb"

Expand Down
64 changes: 61 additions & 3 deletions nemo_retriever/src/nemo_retriever/harness/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,48 @@ def _resolve_lancedb_uri(cfg: HarnessConfig, artifact_dir: Path) -> str:
return str(p)


def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple[list[str], Path, Path, Path]:
def _resolve_observability_archive_run_dir(cfg: HarnessConfig, artifact_dir: Path) -> Path | None:
    """Map *artifact_dir* to its per-run directory under the durable archive root.

    Returns ``None`` when ``cfg.observability_archive_dir`` is unset. A relative
    archive root is anchored at the current working directory. The run directory
    mirrors ``<parent>/<run>`` from *artifact_dir* when the parent name is
    present and distinct from the run name, otherwise just ``<run>`` (falling
    back to ``"run"`` when the artifact dir name is blank).
    """
    archive_root = cfg.observability_archive_dir
    if archive_root is None:
        return None

    base = Path(str(archive_root)).expanduser()
    if not base.is_absolute():
        base = (Path.cwd() / base).resolve()

    group = artifact_dir.parent.name.strip()
    leaf = artifact_dir.name.strip() or "run"
    return base / group / leaf if group and group != leaf else base / leaf


def _resolve_observability_artifacts(cfg: HarnessConfig, artifact_dir: Path) -> dict[str, Path]:
    """Compute the observability artifact paths for a single run.

    ``ingest_errors_file`` is always present under *artifact_dir*. Extract and
    chunk-manifest directories are added only when the corresponding config
    flags are enabled; durable mirror directories are added additionally when
    an observability archive root resolves to a run directory.
    """
    paths: dict[str, Path] = {"ingest_errors_file": artifact_dir / "ingest_errors.json"}

    local_root = artifact_dir / "observability"
    specs = (
        (cfg.write_extract_artifacts, "extract_artifacts_dir", "extracts"),
        (cfg.write_chunk_manifest, "chunk_manifest_dir", "chunks"),
    )

    for enabled, key, subdir in specs:
        if enabled:
            paths[key] = local_root / subdir

    durable_root = _resolve_observability_archive_run_dir(cfg, artifact_dir)
    if durable_root is not None:
        for enabled, key, subdir in specs:
            if enabled:
                paths[f"durable_{key}"] = durable_root / subdir

    return paths


def _build_command(
cfg: HarnessConfig,
artifact_dir: Path,
run_id: str,
) -> tuple[list[str], Path, Path, Path, dict[str, Path]]:
runtime_dir = artifact_dir / "runtime_metrics"
runtime_dir.mkdir(parents=True, exist_ok=True)
if cfg.write_detection_file:
Expand All @@ -201,6 +242,7 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple
recall_adapter=cfg.recall_adapter,
output_dir=runtime_dir,
)
observability_artifacts = _resolve_observability_artifacts(cfg, artifact_dir)

cmd = [
sys.executable,
Expand Down Expand Up @@ -254,6 +296,8 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple
run_id,
"--detection-summary-file",
str(detection_summary_file),
"--ingest-errors-file",
str(observability_artifacts["ingest_errors_file"]),
"--lancedb-uri",
_resolve_lancedb_uri(cfg, artifact_dir),
]
Expand All @@ -262,8 +306,16 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple
cmd += ["--ray-address", cfg.ray_address]
if cfg.hybrid:
cmd += ["--hybrid"]
if cfg.write_extract_artifacts and "extract_artifacts_dir" in observability_artifacts:
cmd += ["--extract-output-dir", str(observability_artifacts["extract_artifacts_dir"])]
if "durable_extract_artifacts_dir" in observability_artifacts:
cmd += ["--durable-extract-output-dir", str(observability_artifacts["durable_extract_artifacts_dir"])]
if cfg.write_chunk_manifest and "chunk_manifest_dir" in observability_artifacts:
cmd += ["--chunk-manifest-dir", str(observability_artifacts["chunk_manifest_dir"])]
if "durable_chunk_manifest_dir" in observability_artifacts:
cmd += ["--durable-chunk-manifest-dir", str(observability_artifacts["durable_chunk_manifest_dir"])]

return cmd, runtime_dir, detection_summary_file, query_csv
return cmd, runtime_dir, detection_summary_file, query_csv, observability_artifacts


def _evaluate_run_outcome(
Expand Down Expand Up @@ -352,7 +404,9 @@ def _run_subprocess_with_tty(cmd: list[str], metrics: StreamMetrics) -> int:


def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[str] | None = None) -> dict[str, Any]:
cmd, runtime_dir, detection_summary_file, effective_query_csv = _build_command(cfg, artifact_dir, run_id)
cmd, runtime_dir, detection_summary_file, effective_query_csv, observability_artifacts = _build_command(
cfg, artifact_dir, run_id
)
command_text = " ".join(shlex.quote(token) for token in cmd)
(artifact_dir / "command.txt").write_text(command_text + "\n", encoding="utf-8")

Expand Down Expand Up @@ -407,6 +461,9 @@ def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[
"hybrid": cfg.hybrid,
"embed_model_name": cfg.embed_model_name,
"write_detection_file": cfg.write_detection_file,
"write_extract_artifacts": cfg.write_extract_artifacts,
"write_chunk_manifest": cfg.write_chunk_manifest,
"observability_archive_dir": cfg.observability_archive_dir,
"lancedb_uri": _resolve_lancedb_uri(cfg, artifact_dir),
"tuning": {field: getattr(cfg, field) for field in sorted(TUNING_FIELDS)},
},
Expand All @@ -426,6 +483,7 @@ def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str, tags: list[
"artifacts": {
"command_file": str((artifact_dir / "command.txt").resolve()),
"runtime_metrics_dir": str(runtime_dir.resolve()),
**{key: str(path.resolve()) for key, path in observability_artifacts.items()},
},
}
if cfg.write_detection_file:
Expand Down
48 changes: 48 additions & 0 deletions nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,10 @@ def embed(
"No Ray Dataset to embed. Provide input_dataset or run .files(...) / .extract(...) first."
)

chunk_manifest_output_dir = kwargs.pop("chunk_manifest_output_dir", None)
durable_chunk_manifest_dir = kwargs.pop("durable_chunk_manifest_dir", None)
chunk_manifest_drop_columns = kwargs.pop("chunk_manifest_drop_columns", None)

from nemo_retriever.params.utils import build_embed_kwargs

resolved = _coerce_params(params, EmbedParams, kwargs)
Expand Down Expand Up @@ -839,6 +843,21 @@ def embed(
num_cpus=1,
)

if chunk_manifest_output_dir:
from nemo_retriever.ingest_modes.observability import write_jsonl_snapshot_batch

self._rd_dataset = self._rd_dataset.map_batches(
partial(
write_jsonl_snapshot_batch,
output_dir=str(chunk_manifest_output_dir),
stage_name="chunk-manifest",
durable_output_dir=str(durable_chunk_manifest_dir) if durable_chunk_manifest_dir else None,
drop_columns=list(chunk_manifest_drop_columns or []),
),
batch_format="pandas",
num_cpus=1,
)

# When using a remote NIM endpoint, no GPU is needed for embedding.
endpoint = (kwargs.get("embedding_endpoint") or kwargs.get("embed_invoke_url") or "").strip()
if endpoint:
Expand All @@ -861,6 +880,35 @@ def embed(

return self

def write_observability_snapshot(
    self,
    output_dir: str,
    *,
    stage_name: str,
    durable_output_dir: str | None = None,
    drop_columns: list[str] | None = None,
) -> "BatchIngestor":
    """Persist the current dataset to JSONL shards and continue the pipeline.

    Appends a pass-through ``map_batches`` stage to the Ray Dataset: each
    pandas batch is written as JSONL under *output_dir* (optionally mirrored
    to *durable_output_dir*, with *drop_columns* removed first) and then
    flows unchanged to the next pipeline stage.

    Raises:
        ValueError: if *output_dir* is not a non-empty string.
        RuntimeError: if no Ray Dataset has been built yet.
    """
    valid_dir = isinstance(output_dir, str) and bool(output_dir.strip())
    if not valid_dir:
        raise ValueError(f"output_dir must be a non-empty string, got {output_dir!r}")
    if self._rd_dataset is None:
        raise RuntimeError("No Ray Dataset to snapshot. Call .files(...) and a pipeline stage first.")

    # Deferred import: the observability helpers are only needed when this stage is used.
    from nemo_retriever.ingest_modes.observability import write_jsonl_snapshot_batch

    snapshot_fn = partial(
        write_jsonl_snapshot_batch,
        output_dir=output_dir,
        stage_name=stage_name,
        durable_output_dir=durable_output_dir,
        drop_columns=[] if drop_columns is None else list(drop_columns),
    )
    self._rd_dataset = self._rd_dataset.map_batches(
        snapshot_fn,
        batch_format="pandas",
        num_cpus=1,
    )
    return self

def vdb_upload(self, params: VdbUploadParams | None = None, **kwargs: Any) -> "BatchIngestor":
"""
Add a streaming LanceDB upload stage to the batch pipeline.
Expand Down
Loading
Loading