diff --git a/nemo_retriever/README.md b/nemo_retriever/README.md index 71f58efd0..5233aead6 100644 --- a/nemo_retriever/README.md +++ b/nemo_retriever/README.md @@ -156,6 +156,11 @@ retriever local stage6 run --input-dir - `recall_adapter: page_plus_one` (convert zero-indexed `page` CSVs to `pdf_page`) - `recall_adapter: financebench_json` (convert FinanceBench JSON to `query,expected_pdf`) - `recall_match_mode: pdf_page|pdf_only` controls recall matching mode. +- Harness presets support `extends:` inheritance so common tuning can live in one base preset. +- Worker-count tuning fields can be set to `auto` in a preset to defer those counts to the batch + runtime heuristics. +- The built-in `auto_workers` preset keeps the conservative `single_gpu` batch sizes while letting + worker/task counts scale from the detected Ray GPU inventory. - Dataset presets configured under `/datasets/nv-ingest/...` will fall back to `/raid/$USER/...` when the dataset is not present in `/datasets`. - Relative `query_csv` entries in harness YAML resolve from the config file directory first, then fall back to the repo root. - The default `financebench` dataset preset now points at `data/financebench_train.json` and enables recall out of the box. @@ -259,6 +264,9 @@ Preset dataset name ```bash # Dataset preset from test_configs.yaml (recall-required example) retriever harness run --dataset jp20 --preset single_gpu + +# Keep the same single-GPU-style batch sizes but let worker counts auto-scale +retriever harness run --dataset jp20 --preset auto_workers ``` or @@ -346,6 +354,25 @@ These fields use best-effort discovery and fall back to `null` or `"unknown"` ra Sweep/nightly sessions additionally write: +6. Presets and heuristic sizing + +Use fixed presets such as `single_gpu` and `dgx_8gpu` when you want repeatable, pinned worker counts. +Use `auto_workers` when you want the harness to preserve the same high-level tuning shape but let the +batch runtime discover worker/task counts from the currently available GPUs. + +Today, `auto` is intentionally limited to worker/task count fields in +`nemo_retriever/harness/test_configs.yaml`, such as: + +- `pdf_extract_workers` +- `page_elements_workers` +- `ocr_workers` +- `embed_workers` + +When one of those fields is set to `auto`, the harness serializes the batch CLI sentinel value and +`nemo_retriever.examples.batch_pipeline` falls back to its existing `resolve_requested_plan()` +heuristics at runtime. Batch sizes, CPU-per-task values, and GPU shares remain explicit so the +preset behavior stays easy to reason about for new users. + The `runtime_metrics/` directory contains: When Slack posting is enabled, the nightly summary is built from `session_summary.json` plus each diff --git a/nemo_retriever/harness/HANDOFF.md b/nemo_retriever/harness/HANDOFF.md index 281234e12..364caba25 100644 --- a/nemo_retriever/harness/HANDOFF.md +++ b/nemo_retriever/harness/HANDOFF.md @@ -35,9 +35,11 @@ It captures what exists now, what was intentionally chosen, and what to iterate - Active default dataset: `jp20` (recall-required workflow). - `bo20` remains ingestion-only (`query_csv: null`, `recall_required: false`). -- Two main presets are available: +- Presets now support `extends:` inheritance and selective `auto` tuning values. +- Commonly used presets are: - `single_gpu` - `dgx_8gpu` + - `auto_workers` (keeps conservative batch sizes, defers worker counts to runtime heuristics) - Adapter-capable datasets: - `earnings` uses `recall_adapter: page_plus_one` (`page` -> `pdf_page` conversion). - `bo10k` wiring is included (adapter + mode), with recall disabled by default until query path is set. @@ -147,6 +149,9 @@ Notes: - Relative `query_csv` paths are stable across cwd/worktree runs. - Dataset presets can resolve from `/raid/$USER/...` when `/datasets/nv-ingest/...` is unavailable. - `financebench` now defaults to `data/financebench_train.json` with recall enabled. +- Preset usability improvement: + - Harness presets can inherit from a shared base preset. + - Worker-count fields can use `auto` so batch mode sizes them from Ray GPU inventory. - Session UX improvements: - Runs, sweeps, and nightly sessions accept repeatable `--tag` values persisted into artifacts. - `retriever harness summary` prints a compact table from `session_summary.json`. @@ -225,7 +230,6 @@ shim for that newer upstream behavior. ### P1 -- Add preset inheritance or scaling helper to reduce duplicated numeric tuning in YAML. - Add an artifact retention helper (manual command) to prune old sessions by age/size. - Consider a single `recall_profile` abstraction (mapping to adapter + match mode) to reduce config coupling and avoid invalid combinations. diff --git a/nemo_retriever/harness/test_configs.yaml b/nemo_retriever/harness/test_configs.yaml index de871ebda..4e21a9ef2 100644 --- a/nemo_retriever/harness/test_configs.yaml +++ b/nemo_retriever/harness/test_configs.yaml @@ -16,16 +16,11 @@ active: write_detection_file: false presets: - single_gpu: - pdf_extract_workers: 8 + fixed_common: pdf_extract_num_cpus: 2.0 pdf_extract_batch_size: 4 pdf_split_batch_size: 1 - page_elements_batch_size: 4 - page_elements_workers: 3 - ocr_workers: 3 ocr_batch_size: 16 - embed_workers: 3 embed_batch_size: 256 page_elements_cpus_per_actor: 1.0 ocr_cpus_per_actor: 1.0 @@ -34,23 +29,30 @@ presets: gpu_ocr: 0.1 gpu_embed: 0.25 + single_gpu: + extends: fixed_common + pdf_extract_workers: 8 + page_elements_batch_size: 4 + page_elements_workers: 3 + ocr_workers: 3 + embed_workers: 3 + dgx_8gpu: + extends: single_gpu pdf_extract_workers: 64 - pdf_extract_num_cpus: 2.0 - pdf_extract_batch_size: 4 - pdf_split_batch_size: 1 page_elements_batch_size: 32 page_elements_workers: 24 ocr_workers: 24 - ocr_batch_size: 16 embed_workers: 24 - embed_batch_size: 256 - page_elements_cpus_per_actor: 1.0 - ocr_cpus_per_actor: 1.0 - embed_cpus_per_actor: 1.0 - gpu_page_elements: 0.1 - gpu_ocr: 0.1 - gpu_embed: 0.25 + + # Keep the same conservative single-GPU batch sizes, but let batch mode + # discover actor/task counts from the current Ray GPU inventory. + auto_workers: + extends: single_gpu + pdf_extract_workers: auto + page_elements_workers: auto + ocr_workers: auto + embed_workers: auto datasets: bo20: diff --git a/nemo_retriever/src/nemo_retriever/harness/config.py b/nemo_retriever/src/nemo_retriever/harness/config.py index 4daef726b..c76acec49 100644 --- a/nemo_retriever/src/nemo_retriever/harness/config.py +++ b/nemo_retriever/src/nemo_retriever/harness/config.py @@ -16,6 +16,12 @@ DEFAULT_TEST_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "test_configs.yaml" DEFAULT_NIGHTLY_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "nightly_config.yaml" VALID_RECALL_ADAPTERS = {"none", "page_plus_one", "financebench_json"} +AUTO_WORKER_FIELDS = { + "pdf_extract_workers", + "page_elements_workers", + "ocr_workers", + "embed_workers", +} DEFAULT_NIGHTLY_SLACK_METRIC_KEYS = [ "pages", "ingest_secs", @@ -62,15 +68,15 @@ class HarnessConfig: embed_model_name: str = "nvidia/llama-nemotron-embed-1b-v2" write_detection_file: bool = False - pdf_extract_workers: int = 8 + pdf_extract_workers: int | None = 8 pdf_extract_num_cpus: float = 2.0 pdf_extract_batch_size: int = 4 pdf_split_batch_size: int = 1 page_elements_batch_size: int = 4 - page_elements_workers: int = 3 - ocr_workers: int = 3 + page_elements_workers: int | None = 3 + ocr_workers: int | None = 3 ocr_batch_size: int = 16 - embed_workers: int = 3 + embed_workers: int | None = 3 embed_batch_size: int = 256 page_elements_cpus_per_actor: float = 1.0 ocr_cpus_per_actor: float = 1.0 @@ -103,10 +109,16 @@ def validate(self) -> list[str]: for name in TUNING_FIELDS: val = getattr(self, name) + if val is None and name in AUTO_WORKER_FIELDS: + continue if name.startswith("gpu_") and float(val) < 0.0: errors.append(f"{name} must be >= 0.0") elif name.endswith("_workers") and int(val) < 1: errors.append(f"{name} must be >= 1") + elif name.endswith("_batch_size") and int(val) < 1: + errors.append(f"{name} must be >= 1") + elif name.endswith("_num_cpus") and float(val) <= 0.0: + errors.append(f"{name} must be > 0.0") return errors @@ -121,6 +133,12 @@ def _parse_number(value: str) -> int | float: return int(value) +def _parse_auto_worker_value(value: str) -> int | None: + if str(value).strip().lower() == "auto": + return None + return int(value) + + def _resolve_config_path(config_file: str | None, default_path: Path) -> Path: if config_file is None: return default_path @@ -141,6 +159,39 @@ def _read_yaml_mapping(path: Path) -> dict[str, Any]: return data +def _resolve_preset_values( + presets: dict[str, Any], preset_name: str, *, resolution_stack: tuple[str, ...] = () +) -> dict[str, Any]: + preset_key = str(preset_name) + if preset_key not in presets: + return {} + + raw_preset = presets[preset_key] + if not isinstance(raw_preset, dict): + raise ValueError(f"Preset '{preset_key}' must be a mapping") + + if preset_key in resolution_stack: + cycle = " -> ".join((*resolution_stack, preset_key)) + raise ValueError(f"Preset inheritance cycle detected: {cycle}") + + preset_values = dict(raw_preset) + parent_name = preset_values.pop("extends", None) + if parent_name is None: + return preset_values + if not isinstance(parent_name, str) or not parent_name.strip(): + raise ValueError(f"Preset '{preset_key}' has invalid extends value: {parent_name!r}") + if parent_name not in presets: + raise ValueError(f"Preset '{preset_key}' extends unknown preset '{parent_name}'") + + resolved_parent = _resolve_preset_values( + presets, + parent_name, + resolution_stack=(*resolution_stack, preset_key), + ) + resolved_parent.update(preset_values) + return resolved_parent + + def _resolve_path_like(value: str | None, base_path: Path = REPO_ROOT) -> str | None: if value is None: return None @@ -210,7 +261,8 @@ def _apply_env_overrides(config_dict: dict[str, Any]) -> None: } for key in TUNING_FIELDS: - env_map[f"HARNESS_{key.upper()}"] = (key, _parse_number) + parser = _parse_auto_worker_value if key in AUTO_WORKER_FIELDS else _parse_number + env_map[f"HARNESS_{key.upper()}"] = (key, parser) for env_key, (cfg_key, parser) in env_map.items(): raw = os.getenv(env_key) @@ -241,6 +293,27 @@ def _parse_cli_overrides(overrides: list[str] | None) -> dict[str, Any]: return parsed +def _normalize_tuning_values(config_dict: dict[str, Any]) -> None: + for name in AUTO_WORKER_FIELDS: + if name not in config_dict: + continue + + value = config_dict[name] + if value is None: + config_dict[name] = None + continue + + if isinstance(value, str): + raw = value.strip() + if raw.lower() == "auto" or raw == "": + config_dict[name] = None + continue + value = int(raw) + + normalized = int(value) + config_dict[name] = None if normalized == 0 else normalized + + def load_harness_config( *, config_file: str | None = None, @@ -292,13 +365,14 @@ def load_harness_config( dataset_label = Path(str(dataset_ref)).name merged["dataset_dir"] = str(dataset_ref) - preset_values = dict(presets.get(str(preset_ref), {})) + preset_values = _resolve_preset_values(presets, str(preset_ref)) merged.update(preset_values) merged.update({k: v for k, v in sweep_data.items() if k not in {"dataset", "preset"}}) merged.update(cli_override_map) if cli_recall_required is not None: merged["recall_required"] = cli_recall_required _apply_env_overrides(merged) + _normalize_tuning_values(merged) dataset_dir = merged.get("dataset_dir") if dataset_dir is None: diff --git a/nemo_retriever/src/nemo_retriever/harness/run.py b/nemo_retriever/src/nemo_retriever/harness/run.py index f0d10509c..28001790a 100644 --- a/nemo_retriever/src/nemo_retriever/harness/run.py +++ b/nemo_retriever/src/nemo_retriever/harness/run.py @@ -188,6 +188,12 @@ def _resolve_lancedb_uri(cfg: HarnessConfig, artifact_dir: Path) -> str: return str(p) +def _command_worker_count_value(value: int | None) -> str: + if value is not None: + return str(value) + return "0" + + def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple[list[str], Path, Path, Path]: runtime_dir = artifact_dir / "runtime_metrics" runtime_dir.mkdir(parents=True, exist_ok=True) @@ -215,7 +221,7 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple cfg.recall_match_mode, "--no-recall-details", "--pdf-extract-tasks", - str(cfg.pdf_extract_workers), + _command_worker_count_value(cfg.pdf_extract_workers), "--pdf-extract-cpus-per-task", str(cfg.pdf_extract_num_cpus), "--pdf-extract-batch-size", @@ -225,13 +231,13 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple "--page-elements-batch-size", str(cfg.page_elements_batch_size), "--page-elements-actors", - str(cfg.page_elements_workers), + _command_worker_count_value(cfg.page_elements_workers), "--ocr-actors", - str(cfg.ocr_workers), + _command_worker_count_value(cfg.ocr_workers), "--ocr-batch-size", str(cfg.ocr_batch_size), "--embed-actors", - str(cfg.embed_workers), + _command_worker_count_value(cfg.embed_workers), "--embed-batch-size", str(cfg.embed_batch_size), "--page-elements-cpus-per-actor", @@ -554,9 +560,10 @@ def sweep_command( typer.echo("Sweep dry run:") for idx, run in enumerate(runs): tag_text = f" tags={normalized_tags}" if normalized_tags else "" + effective_preset = preset if preset is not None else run.get("preset") plan_line = ( f" {idx + 1:03d}: name={run.get('name')} " - f"dataset={run.get('dataset')} preset={run.get('preset')}{tag_text}" + f"dataset={run.get('dataset')} preset={effective_preset}{tag_text}" ) typer.echo(plan_line) raise typer.Exit(code=0) diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py index 84c13fe5f..a2585695a 100644 --- a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py +++ b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py @@ -83,6 +83,79 @@ def _runtime_env_vars() -> dict[str, str]: return {key: value for key, value in env_vars.items() if isinstance(value, str)} +def _normalize_requested_plan_int(value: Any) -> int | None: + if value is None: + return None + try: + normalized = int(value) + except (TypeError, ValueError): + return None + return normalized if normalized > 0 else None + + +def _normalize_requested_plan_float(value: Any) -> float | None: + if value is None: + return None + try: + normalized = float(value) + except (TypeError, ValueError): + return None + return normalized if normalized > 0.0 else None + + +def _batch_tuning_to_requested_plan_overrides(batch_tuning: dict[str, Any]) -> dict[str, int | float | None]: + overrides: dict[str, int | float | None] = {} + + if "pdf_extract_workers" in batch_tuning: + overrides["override_pdf_extract_tasks"] = _normalize_requested_plan_int(batch_tuning.get("pdf_extract_workers")) + if "pdf_extract_num_cpus" in batch_tuning: + overrides["override_pdf_extract_cpus_per_task"] = _normalize_requested_plan_float( + batch_tuning.get("pdf_extract_num_cpus") + ) + if "pdf_extract_batch_size" in batch_tuning: + overrides["override_pdf_extract_batch_size"] = _normalize_requested_plan_int( + batch_tuning.get("pdf_extract_batch_size") + ) + if "page_elements_batch_size" in batch_tuning: + overrides["override_page_elements_batch_size"] = _normalize_requested_plan_int( + batch_tuning.get("page_elements_batch_size") + ) + if "embed_batch_size" in batch_tuning: + overrides["override_embed_batch_size"] = _normalize_requested_plan_int(batch_tuning.get("embed_batch_size")) + + ocr_batch_size = batch_tuning.get("ocr_batch_size", batch_tuning.get("detect_batch_size")) + if "ocr_batch_size" in batch_tuning or "detect_batch_size" in batch_tuning: + overrides["override_ocr_batch_size"] = _normalize_requested_plan_int(ocr_batch_size) + + actor_count_fields = { + "page_elements_workers": "page_elements", + "ocr_workers": "ocr", + "detect_workers": "ocr", + "embed_workers": "embed", + } + for field_name, stage_name in actor_count_fields.items(): + if field_name not in batch_tuning: + continue + actor_count = _normalize_requested_plan_int(batch_tuning.get(field_name)) + overrides[f"override_{stage_name}_initial_actors"] = actor_count + overrides[f"override_{stage_name}_min_actors"] = actor_count + overrides[f"override_{stage_name}_max_actors"] = actor_count + + gpu_fields = { + "gpu_page_elements": "page_elements", + "gpu_ocr": "ocr", + "gpu_embed": "embed", + } + for field_name, stage_name in gpu_fields.items(): + if field_name not in batch_tuning: + continue + overrides[f"override_{stage_name}_gpus_per_actor"] = _normalize_requested_plan_float( + batch_tuning.get(field_name) + ) + + return overrides + + class _LanceDBWriteActor: """Ray Data actor that streams batches into LanceDB as they arrive. @@ -241,6 +314,7 @@ def __init__( # 2. Resolve requested plan for the Ray DAG that will be built self._requested_plan = resolve_requested_plan(cluster_resources=self._cluster_resources) logger.info(self._requested_plan) + self._requested_plan_overrides: dict[str, int | float | None] = {} # Builder-style task configuration recorded for later execution. # Keep backwards-compatibility with code that inspects `Ingestor._documents` @@ -254,6 +328,18 @@ def __init__( self._extract_html_kwargs: Dict[str, Any] = {} # noqa: F821 self._use_nemotron_parse_only: bool = False + def _refresh_requested_plan(self, batch_tuning: dict[str, Any]) -> None: + requested_plan_overrides = _batch_tuning_to_requested_plan_overrides(batch_tuning) + if not requested_plan_overrides: + return + + self._requested_plan_overrides.update(requested_plan_overrides) + self._requested_plan = resolve_requested_plan( + cluster_resources=self._cluster_resources, + **self._requested_plan_overrides, + ) + logger.info(self._requested_plan) + def files(self, documents: Union[str, List[str]]) -> "BatchIngestor": """ Add local files for batch processing. @@ -327,6 +413,7 @@ def extract(self, params: ExtractParams | None = None, **kwargs: Any) -> "BatchI **resolved.remote_retry.model_dump(mode="python", exclude_none=True), **resolved.batch_tuning.model_dump(mode="python", exclude_none=True), } + self._refresh_requested_plan(kwargs) # -- Pop resource-tuning kwargs before forwarding to actors -- def _endpoint_count(raw: Any) -> int: @@ -807,6 +894,7 @@ def embed( if any((resolved.embedding_endpoint, resolved.embed_invoke_url)) and not resolved.api_key: resolved = resolved.model_copy(update={"api_key": resolve_remote_api_key()}) kwargs = build_embed_kwargs(resolved, include_batch_tuning=True) + self._refresh_requested_plan(kwargs) # Remaining kwargs are forwarded to the actor constructor. embed_modality = resolved.embed_modality diff --git a/nemo_retriever/tests/test_batch_ingestor.py b/nemo_retriever/tests/test_batch_ingestor.py index 21b9a7dde..bd4f88884 100644 --- a/nemo_retriever/tests/test_batch_ingestor.py +++ b/nemo_retriever/tests/test_batch_ingestor.py @@ -4,7 +4,7 @@ pytest.importorskip("ray") -from nemo_retriever.ingest_modes.batch import BatchIngestor +from nemo_retriever.ingest_modes.batch import BatchIngestor, _batch_tuning_to_requested_plan_overrides class _DummyClusterResources: @@ -56,3 +56,43 @@ def test_batch_ingestor_filters_none_runtime_env_vars(monkeypatch) -> None: } assert dummy_ctx.enable_rich_progress_bars is True assert dummy_ctx.use_ray_tqdm is False + + +def test_batch_tuning_to_requested_plan_overrides_maps_fixed_and_auto_values() -> None: + overrides = _batch_tuning_to_requested_plan_overrides( + { + "pdf_extract_workers": 12, + "pdf_extract_num_cpus": 2.5, + "pdf_extract_batch_size": 8, + "page_elements_batch_size": 16, + "page_elements_workers": 6, + "detect_workers": 0, + "detect_batch_size": 0, + "embed_workers": 4, + "embed_batch_size": 128, + "gpu_page_elements": 0.2, + "gpu_ocr": 0.0, + "gpu_embed": 0.5, + } + ) + + assert overrides == { + "override_pdf_extract_tasks": 12, + "override_pdf_extract_cpus_per_task": 2.5, + "override_pdf_extract_batch_size": 8, + "override_page_elements_batch_size": 16, + "override_page_elements_initial_actors": 6, + "override_page_elements_min_actors": 6, + "override_page_elements_max_actors": 6, + "override_ocr_initial_actors": None, + "override_ocr_min_actors": None, + "override_ocr_max_actors": None, + "override_ocr_batch_size": None, + "override_embed_initial_actors": 4, + "override_embed_min_actors": 4, + "override_embed_max_actors": 4, + "override_embed_batch_size": 128, + "override_page_elements_gpus_per_actor": 0.2, + "override_ocr_gpus_per_actor": None, + "override_embed_gpus_per_actor": 0.5, + } diff --git a/nemo_retriever/tests/test_harness_config.py b/nemo_retriever/tests/test_harness_config.py index d315cbaf2..7a73fd9ca 100644 --- a/nemo_retriever/tests/test_harness_config.py +++ b/nemo_retriever/tests/test_harness_config.py @@ -65,6 +65,98 @@ def test_load_harness_config_precedence(tmp_path: Path, monkeypatch: pytest.Monk assert cfg.recall_required is True +def test_load_harness_config_supports_preset_inheritance(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + " dataset: tiny", + " preset: derived", + "presets:", + " base:", + " pdf_extract_workers: 4", + " embed_workers: 2", + " gpu_embed: 0.3", + " derived:", + " extends: base", + " embed_workers: 6", + "datasets:", + " tiny:", + f" path: {dataset_dir}", + f" query_csv: {query_csv}", + " recall_required: true", + ] + ), + encoding="utf-8", + ) + + cfg = load_harness_config(config_file=str(cfg_path)) + assert cfg.pdf_extract_workers == 4 + assert cfg.embed_workers == 6 + assert cfg.gpu_embed == 0.3 + + +def test_load_harness_config_normalizes_auto_tuning_values(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + " dataset: tiny", + " preset: auto_workers", + "presets:", + " auto_workers:", + " pdf_extract_workers: auto", + " page_elements_workers: 0", + " ocr_workers: auto", + " embed_workers: auto", + "datasets:", + " tiny:", + f" path: {dataset_dir}", + f" query_csv: {query_csv}", + " recall_required: true", + ] + ), + encoding="utf-8", + ) + + cfg = load_harness_config(config_file=str(cfg_path)) + assert cfg.pdf_extract_workers is None + assert cfg.page_elements_workers is None + assert cfg.ocr_workers is None + assert cfg.embed_workers is None + + +def test_load_harness_config_supports_auto_env_and_cli_overrides( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + cfg_path = tmp_path / "test_configs.yaml" + _write_harness_config(cfg_path, dataset_dir, query_csv) + + monkeypatch.setenv("HARNESS_EMBED_WORKERS", "auto") + + cfg = load_harness_config( + config_file=str(cfg_path), + cli_overrides=["page_elements_workers=auto"], + ) + + assert cfg.page_elements_workers is None + assert cfg.embed_workers is None + + def test_load_harness_config_fails_when_recall_required_without_query(tmp_path: Path) -> None: dataset_dir = tmp_path / "dataset" dataset_dir.mkdir() diff --git a/nemo_retriever/tests/test_harness_run.py b/nemo_retriever/tests/test_harness_run.py index f94ee0f6d..5bef507fd 100644 --- a/nemo_retriever/tests/test_harness_run.py +++ b/nemo_retriever/tests/test_harness_run.py @@ -76,6 +76,41 @@ def test_build_command_uses_hidden_detection_file_by_default(tmp_path: Path) -> assert effective_query_csv == query_csv +def test_build_command_uses_zero_sentinels_for_auto_tuning_fields(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("q,s,p\nx,y,1\n", encoding="utf-8") + + cfg = HarnessConfig( + dataset_dir=str(dataset_dir), + dataset_label="jp20", + preset="auto_workers", + query_csv=str(query_csv), + pdf_extract_workers=None, + page_elements_workers=None, + ocr_workers=None, + embed_workers=None, + ) + cmd, _runtime_dir, _detection_file, _effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") + + def _value_for(flag: str) -> str: + return cmd[cmd.index(flag) + 1] + + assert _value_for("--pdf-extract-tasks") == "0" + assert _value_for("--pdf-extract-cpus-per-task") == "2.0" + assert _value_for("--pdf-extract-batch-size") == "4" + assert _value_for("--page-elements-batch-size") == "4" + assert _value_for("--page-elements-actors") == "0" + assert _value_for("--ocr-actors") == "0" + assert _value_for("--ocr-batch-size") == "16" + assert _value_for("--embed-actors") == "0" + assert _value_for("--embed-batch-size") == "256" + assert _value_for("--page-elements-gpus-per-actor") == "0.1" + assert _value_for("--ocr-gpus-per-actor") == "0.1" + assert _value_for("--embed-gpus-per-actor") == "0.25" + + def test_build_command_uses_top_level_detection_file_when_enabled(tmp_path: Path) -> None: dataset_dir = tmp_path / "dataset" dataset_dir.mkdir() @@ -525,3 +560,19 @@ def _fake_run_entry(**kwargs) -> dict: assert result.exit_code == 0 assert captured["tags"] == ["nightly", "candidate"] + + +def test_cli_sweep_dry_run_uses_preset_override(monkeypatch) -> None: + monkeypatch.setattr( + harness_run, + "load_runs_config", + lambda _path: [{"name": "jp20_single", "dataset": "jp20", "preset": "single_gpu"}], + ) + + result = RUNNER.invoke( + harness_app, + ["sweep", "--runs-config", "nightly.yaml", "--preset", "auto_workers", "--dry-run"], + ) + + assert result.exit_code == 0 + assert "preset=auto_workers" in result.stdout