From 3a660a880204bd2f1b0ad079ef90a2b557384b26 Mon Sep 17 00:00:00 2001 From: rohit kumar Date: Wed, 14 Jan 2026 04:04:36 +0000 Subject: [PATCH 1/6] docs: add SDMX quick links Highlight key SDMX docs for faster navigation --- tools/agentic_import/README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tools/agentic_import/README.md b/tools/agentic_import/README.md index d70dafa8fb..4eb5ca6356 100644 --- a/tools/agentic_import/README.md +++ b/tools/agentic_import/README.md @@ -5,7 +5,7 @@ This guide describes the complete process for importing CSV data into Data Commo ## Table of Contents - [Agentic Import Tool for Data Commons](#agentic-import-tool-for-data-commons) - - [Table of Contents](#table-of-contents) + - [SDMX Quick Links](#sdmx-quick-links) - [Prerequisites](#prerequisites) - [Required Tools](#required-tools) - [Setup](#setup) @@ -26,6 +26,12 @@ This guide describes the complete process for importing CSV data into Data Commo - [Gemini CLI Debugging](#gemini-cli-debugging) - [Log Structure](#log-structure) +## SDMX Quick Links + +- [SDMX import pipeline (end-to-end)](sdmx_import_pipeline.md) +- [SDMX Downloads (section)](#sdmx-downloads) +- [SDMX CLI documentation](../sdmx_import/README.md) + ## Prerequisites Before starting the import process, ensure you have the following installed and configured: @@ -112,6 +118,7 @@ working_directory/ #### SDMX Downloads Refer to the [SDMX CLI documentation](../sdmx_import/README.md) for details on downloading SDMX data and metadata files. +See the [SDMX import pipeline](sdmx_import_pipeline.md) for the end-to-end SDMX flow. Extract a simplified, token-efficient JSON metadata copy from `metadata.xml`, retaining the original XML for later PV map generation. From 61500d3e92967e0704cbb24a2ed0629001cdedcc Mon Sep 17 00:00:00 2001 From: rohit kumar Date: Wed, 14 Jan 2026 05:58:01 +0000 Subject: [PATCH 2/6] feat: add SDMX metadata extraction step Use extracted JSON for schema mapping and update tests --- .../sdmx_import_pipeline_test.py | 74 ++++++++++++++++++- tools/agentic_import/sdmx_pipeline_builder.py | 3 +- tools/agentic_import/sdmx_pipeline_steps.py | 73 +++++++++++++++++- 3 files changed, 144 insertions(+), 6 deletions(-) diff --git a/tools/agentic_import/sdmx_import_pipeline_test.py b/tools/agentic_import/sdmx_import_pipeline_test.py index 367ee0364f..b67eb71924 100644 --- a/tools/agentic_import/sdmx_import_pipeline_test.py +++ b/tools/agentic_import/sdmx_import_pipeline_test.py @@ -52,7 +52,8 @@ PipelineConfig, RunConfig, SampleConfig, SdmxConfig, SdmxDataflowConfig) from tools.agentic_import.sdmx_pipeline_steps import ( # pylint: disable=import-error CreateDcConfigStep, CreateSampleStep, CreateSchemaMapStep, DownloadDataStep, - DownloadMetadataStep, ProcessFullDataStep, SdmxStep, _run_command) + DownloadMetadataStep, ExtractMetadataStep, ProcessFullDataStep, SdmxStep, + _run_command) from tools.agentic_import.state_handler import ( # pylint: disable=import-error PipelineState, StateHandler, StepState) @@ -369,6 +370,7 @@ def test_force_semantics(self) -> None: self.assertEqual(names_all, [ "download-data", "download-metadata", + "extract-metadata", "create-sample", "create-schema-mapping", "process-full-data", @@ -381,6 +383,7 @@ def test_timestamp_chaining_triggers_next_step(self) -> None: state = self._state_with({ "download-data": ("1", "succeeded", newer), "download-metadata": ("1", "succeeded", older), + "extract-metadata": ("1", "succeeded", older), "create-sample": ("1", "succeeded", older), 
"create-schema-mapping": ("1", "succeeded", older), "process-full-data": ("1", "succeeded", older), @@ -390,6 +393,7 @@ def test_timestamp_chaining_triggers_next_step(self) -> None: names = self._names_from_builder(cfg, state=state) self.assertEqual(names, [ "download-metadata", + "extract-metadata", "create-sample", "create-schema-mapping", "process-full-data", @@ -443,6 +447,7 @@ def test_incremental_records_skip_reasons(self) -> None: state = self._state_with({ "download-data": ("1", "succeeded", 1_000), "download-metadata": ("1", "succeeded", 1_000), + "extract-metadata": ("1", "succeeded", 1_000), "create-sample": ("1", "succeeded", 1_000), "create-schema-mapping": ("1", "succeeded", 1_000), "process-full-data": ("1", "succeeded", 1_000), @@ -548,11 +553,12 @@ def test_run_pipeline_updates_state_and_hash(self) -> None: self.assertEqual(state["dataset_prefix"], "demo") self.assertEqual(state["command"], command) self.assertEqual(state["critical_input_hash"], expected_hash) - self.assertEqual(len(state["steps"]), 6) + self.assertEqual(len(state["steps"]), 7) for step_name in [ - "download-data", "download-metadata", "create-sample", - "create-schema-mapping", "process-full-data", "create-dc-config" + "download-data", "download-metadata", "extract-metadata", + "create-sample", "create-schema-mapping", "process-full-data", + "create-dc-config" ]: self.assertIn(step_name, state["steps"]) self.assertEqual(state["steps"][step_name]["status"], "succeeded") @@ -767,6 +773,37 @@ def test_download_metadata_step_run_and_dry_run_use_same_plan(self) -> None: cmd_contains="download-metadata", ) + def test_extract_metadata_step_caches_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ExtractMetadataStep(name="test-step", config=config) + # Cache the resolved command and paths for the extractor step. + self._assert_step_caches_plan( + step, + command_contains=["sdmx_metadata_extractor.py"], + path_attrs=["input_path", "output_path"], + ) + + def test_extract_metadata_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ExtractMetadataStep(name="test-step", config=config) + (Path(self._tmpdir) / "demo_metadata.xml").write_text("") + # Dry run and run should share the same extractor command plan. + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="sdmx_metadata_extractor.py", + cmd_contains="sdmx_metadata_extractor.py", + ) + def test_download_data_step_caches_plan(self) -> None: config = PipelineConfig( sdmx=SdmxConfig( @@ -905,6 +942,35 @@ def test_create_schema_map_step_caches_plan(self) -> None: path_attrs=["sample_path", "metadata_path", "output_prefix"], ) + def test_create_schema_map_step_prefers_extracted_metadata(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + (Path(self._tmpdir) / "demo_sample.csv").write_text("header\nrow1") + (Path(self._tmpdir) / "demo_metadata.xml").write_text("") + (Path(self._tmpdir) / "demo_metadata.json").write_text("{}") + context = step._prepare_command() + # Prefer the extracted JSON when both formats are available. 
+ self.assertEqual(context.metadata_path.name, "demo_metadata.json") + + def test_create_schema_map_step_uses_xml_when_json_missing(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + (Path(self._tmpdir) / "demo_sample.csv").write_text("header\nrow1") + (Path(self._tmpdir) / "demo_metadata.xml").write_text("") + context = step._prepare_command() + # If JSON metadata is missing, fall back to the XML file. + self.assertEqual(context.metadata_path.name, "demo_metadata.xml") + def test_create_schema_map_step_run_and_dry_run_use_same_plan(self) -> None: config = PipelineConfig(run=RunConfig( command="test", diff --git a/tools/agentic_import/sdmx_pipeline_builder.py b/tools/agentic_import/sdmx_pipeline_builder.py index e7588577e5..5a2ef4a2f4 100644 --- a/tools/agentic_import/sdmx_pipeline_builder.py +++ b/tools/agentic_import/sdmx_pipeline_builder.py @@ -24,7 +24,7 @@ from tools.agentic_import.sdmx_pipeline_config import PipelineConfig from tools.agentic_import.sdmx_pipeline_steps import ( CreateDcConfigStep, CreateSampleStep, CreateSchemaMapStep, DownloadDataStep, - DownloadMetadataStep, ProcessFullDataStep) + DownloadMetadataStep, ExtractMetadataStep, ProcessFullDataStep) from tools.agentic_import.state_handler import PipelineState @@ -202,6 +202,7 @@ def build_steps(config: PipelineConfig) -> list[Step]: return [ DownloadDataStep(name="download-data", config=config), DownloadMetadataStep(name="download-metadata", config=config), + ExtractMetadataStep(name="extract-metadata", config=config), CreateSampleStep(name="create-sample", config=config), CreateSchemaMapStep(name="create-schema-mapping", config=config), ProcessFullDataStep(name="process-full-data", config=config), diff --git a/tools/agentic_import/sdmx_pipeline_steps.py b/tools/agentic_import/sdmx_pipeline_steps.py index 9455a811c3..787c46a93f 100644 --- a/tools/agentic_import/sdmx_pipeline_steps.py +++ b/tools/agentic_import/sdmx_pipeline_steps.py @@ -39,6 +39,8 @@ STAT_VAR_PROCESSOR_PATH = (REPO_ROOT / "tools" / "statvar_importer" / "stat_var_processor.py") PVMAP_GENERATOR_PATH = REPO_ROOT / "tools" / "agentic_import" / "pvmap_generator.py" +SDMX_METADATA_EXTRACTOR_PATH = (REPO_ROOT / "tools" / "agentic_import" / + "sdmx_metadata_extractor.py") DC_CONFIG_GENERATOR_PATH = (REPO_ROOT / "tools" / "agentic_import" / "generate_custom_dc_config.py") @@ -201,6 +203,62 @@ def dry_run(self) -> None: ) +class ExtractMetadataStep(SdmxStep): + """Extracts SDMX metadata to JSON for downstream steps.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + input_path: Path + output_path: Path + full_command: list[str] + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: ExtractMetadataStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + input_path = working_dir / f"{dataset_prefix}_metadata.xml" + output_path = working_dir / f"{dataset_prefix}_metadata.json" + args = [ + f"--input_metadata={input_path}", + f"--output_path={output_path}", + ] + full_command = [sys.executable, + str(SDMX_METADATA_EXTRACTOR_PATH)] + args + self._context = ExtractMetadataStep._StepContext( + input_path=input_path, + 
output_path=output_path, + full_command=full_command, + ) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if not context.input_path.is_file(): + raise RuntimeError( + f"Metadata XML file missing: {context.input_path}") + if self._config.run.verbose: + logging.info( + f"Starting SDMX metadata extraction: {' '.join(context.full_command)} -> {context.output_path}" + ) + else: + logging.info( + f"Extracting SDMX metadata to {context.output_path}") + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + class CreateSampleStep(SdmxStep): """Creates a sample dataset from downloaded data.""" @@ -277,7 +335,8 @@ def _prepare_command(self) -> _StepContext: dataset_prefix = self._config.run.dataset_prefix working_dir = Path(self._config.run.working_dir).resolve() sample_path = working_dir / f"{dataset_prefix}_sample.csv" - metadata_path = working_dir / f"{dataset_prefix}_metadata.xml" + metadata_path = self._resolve_metadata_path( + working_dir=working_dir, dataset_prefix=dataset_prefix) output_prefix = working_dir / SAMPLE_OUTPUT_DIR / dataset_prefix args = [ @@ -300,6 +359,18 @@ def _prepare_command(self) -> _StepContext: full_command=full_command) return self._context + def _resolve_metadata_path(self, *, working_dir: Path, + dataset_prefix: str) -> Path: + extracted_path = working_dir / f"{dataset_prefix}_metadata.json" + xml_path = working_dir / f"{dataset_prefix}_metadata.xml" + if extracted_path.is_file(): + logging.info(f"Using extracted SDMX metadata: {extracted_path}") + return extracted_path + logging.info( + f"Extracted SDMX metadata not found; falling back to XML: {xml_path}" + ) + return xml_path + def run(self) -> None: context = self._prepare_command() if not context.sample_path.is_file(): From f6fd408e733321b4f27bf9de82d9c317f14b5751 Mon Sep 17 00:00:00 2001 From: rohit kumar Date: Wed, 14 Jan 2026 06:41:34 +0000 Subject: [PATCH 3/6] test: tighten sdmx step command assertions --- .../sdmx_import_pipeline_test.py | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/tools/agentic_import/sdmx_import_pipeline_test.py b/tools/agentic_import/sdmx_import_pipeline_test.py index b67eb71924..4f741b5c9e 100644 --- a/tools/agentic_import/sdmx_import_pipeline_test.py +++ b/tools/agentic_import/sdmx_import_pipeline_test.py @@ -667,26 +667,22 @@ def _assert_run_and_dry_run_use_same_plan( self, step, *, - log_contains: str, - cmd_contains: str, + cmd_contains: str | None = None, + expected_command: list[str] | None = None, extra_cmd_checks=None, expect_verbose: bool = True) -> None: extra_cmd_checks = extra_cmd_checks or [] with mock.patch("tools.agentic_import.sdmx_pipeline_steps._run_command" ) as mock_run_cmd: - with self.assertLogs(logging.get_absl_logger(), - level="INFO") as logs: - step.dry_run() - step.run() - - self.assertTrue( - any("test-step (dry run): would run" in entry - for entry in logs.output)) - self.assertTrue(any(log_contains in entry for entry in logs.output)) + step.dry_run() + step.run() mock_run_cmd.assert_called_once() args, kwargs = mock_run_cmd.call_args command = args[0] - self.assertTrue(any(cmd_contains in arg for arg in command)) + if cmd_contains: + self.assertTrue(any(cmd_contains in arg for arg in command)) + if expected_command: + self.assertEqual(command, expected_command) self.assertEqual(kwargs["verbose"], 
expect_verbose) for check in extra_cmd_checks: check(command) @@ -769,7 +765,6 @@ def test_download_metadata_step_run_and_dry_run_use_same_plan(self) -> None: step = DownloadMetadataStep(name="test-step", config=config) self._assert_run_and_dry_run_use_same_plan( step, - log_contains="download-metadata", cmd_contains="download-metadata", ) @@ -797,11 +792,19 @@ def test_extract_metadata_step_run_and_dry_run_use_same_plan(self) -> None: ),) step = ExtractMetadataStep(name="test-step", config=config) (Path(self._tmpdir) / "demo_metadata.xml").write_text("") - # Dry run and run should share the same extractor command plan. + # Verify the extractor command matches the expected full command. + working_dir = Path(self._tmpdir).resolve() + expected_command = [ + sys.executable, + str( + Path(_PROJECT_ROOT) / "tools" / "agentic_import" / + "sdmx_metadata_extractor.py"), + f"--input_metadata={working_dir / 'demo_metadata.xml'}", + f"--output_path={working_dir / 'demo_metadata.json'}", + ] self._assert_run_and_dry_run_use_same_plan( step, - log_contains="sdmx_metadata_extractor.py", - cmd_contains="sdmx_metadata_extractor.py", + expected_command=expected_command, ) def test_download_data_step_caches_plan(self) -> None: @@ -851,7 +854,6 @@ def test_download_data_step_run_and_dry_run_use_same_plan(self) -> None: step = DownloadDataStep(name="test-step", config=config) self._assert_run_and_dry_run_use_same_plan( step, - log_contains="download-data", cmd_contains="download-data", ) @@ -889,7 +891,6 @@ def test_create_sample_step_run_and_dry_run_use_same_plan(self) -> None: input_path.write_text("header\nrow1") self._assert_run_and_dry_run_use_same_plan( step, - log_contains="data_sampler.py", cmd_contains="data_sampler.py", ) @@ -985,7 +986,6 @@ def test_create_schema_map_step_run_and_dry_run_use_same_plan(self) -> None: (Path(self._tmpdir) / "demo_metadata.xml").write_text("") self._assert_run_and_dry_run_use_same_plan( step, - log_contains="pvmap_generator.py", cmd_contains="pvmap_generator.py", ) @@ -1043,7 +1043,6 @@ def test_process_full_data_step_run_and_dry_run_use_same_plan(self) -> None: self._create_test_input_files("demo") self._assert_run_and_dry_run_use_same_plan( step, - log_contains="stat_var_processor.py", cmd_contains="stat_var_processor.py", extra_cmd_checks=[ lambda command: self.assertTrue( @@ -1100,7 +1099,6 @@ def test_create_dc_config_step_run_and_dry_run_use_same_plan(self) -> None: (final_output_dir / "demo.csv").write_text("data") self._assert_run_and_dry_run_use_same_plan( step, - log_contains="generate_custom_dc_config.py", cmd_contains="generate_custom_dc_config.py", extra_cmd_checks=[ lambda command: self.assertIn( From 774398c30ecf2953e7c5857d01d21e13b6de11e9 Mon Sep 17 00:00:00 2001 From: rohit kumar Date: Wed, 14 Jan 2026 06:57:20 +0000 Subject: [PATCH 4/6] test: assert full sdmx step commands --- .../sdmx_import_pipeline_test.py | 52 +++++++++++++++++-- tools/agentic_import/sdmx_pipeline_steps.py | 3 +- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/tools/agentic_import/sdmx_import_pipeline_test.py b/tools/agentic_import/sdmx_import_pipeline_test.py index 4f741b5c9e..08209d04af 100644 --- a/tools/agentic_import/sdmx_import_pipeline_test.py +++ b/tools/agentic_import/sdmx_import_pipeline_test.py @@ -763,9 +763,20 @@ def test_download_metadata_step_run_and_dry_run_use_same_plan(self) -> None: ), ) step = DownloadMetadataStep(name="test-step", config=config) + working_dir = Path(self._tmpdir).resolve() + expected_command = [ + sys.executable, + 
str(Path(_PROJECT_ROOT) / "tools" / "sdmx_import" / "sdmx_cli.py"), + "download-metadata", + "--endpoint=https://example.com", + "--agency=AGENCY", + "--dataflow=FLOW", + f"--output_path={working_dir / 'demo_metadata.xml'}", + "--verbose", + ] self._assert_run_and_dry_run_use_same_plan( step, - cmd_contains="download-metadata", + expected_command=expected_command, ) def test_extract_metadata_step_caches_plan(self) -> None: @@ -852,9 +863,20 @@ def test_download_data_step_run_and_dry_run_use_same_plan(self) -> None: ), ) step = DownloadDataStep(name="test-step", config=config) + working_dir = Path(self._tmpdir).resolve() + expected_command = [ + sys.executable, + str(Path(_PROJECT_ROOT) / "tools" / "sdmx_import" / "sdmx_cli.py"), + "download-data", + "--endpoint=https://example.com", + "--agency=AGENCY", + "--dataflow=FLOW", + f"--output_path={working_dir / 'demo_data.csv'}", + "--verbose", + ] self._assert_run_and_dry_run_use_same_plan( step, - cmd_contains="download-data", + expected_command=expected_command, ) def test_create_sample_step_caches_plan(self) -> None: @@ -889,9 +911,19 @@ def test_create_sample_step_run_and_dry_run_use_same_plan(self) -> None: # Create test input file for run() input_path = Path(self._tmpdir) / "demo_data.csv" input_path.write_text("header\nrow1") + working_dir = Path(self._tmpdir).resolve() + expected_command = [ + sys.executable, + str( + Path(_PROJECT_ROOT) / "tools" / "statvar_importer" / + "data_sampler.py"), + f"--sampler_input={working_dir / 'demo_data.csv'}", + f"--sampler_output={working_dir / 'demo_sample.csv'}", + "--sampler_output_rows=500", + ] self._assert_run_and_dry_run_use_same_plan( step, - cmd_contains="data_sampler.py", + expected_command=expected_command, ) def test_create_sample_step_dry_run_succeeds_if_input_missing(self) -> None: @@ -984,9 +1016,21 @@ def test_create_schema_map_step_run_and_dry_run_use_same_plan(self) -> None: # Create test input files for run() (Path(self._tmpdir) / "demo_sample.csv").write_text("header\nrow1") (Path(self._tmpdir) / "demo_metadata.xml").write_text("") + working_dir = Path(self._tmpdir).resolve() + expected_command = [ + sys.executable, + str( + Path(_PROJECT_ROOT) / "tools" / "agentic_import" / + "pvmap_generator.py"), + f"--input_data={working_dir / 'demo_sample.csv'}", + f"--input_metadata={working_dir / 'demo_metadata.xml'}", + "--sdmx_dataset", + f"--output_path={working_dir / 'sample_output' / 'demo'}", + f"--working_dir={working_dir}", + ] self._assert_run_and_dry_run_use_same_plan( step, - cmd_contains="pvmap_generator.py", + expected_command=expected_command, ) def test_create_schema_map_step_dry_run_succeeds_if_input_missing( diff --git a/tools/agentic_import/sdmx_pipeline_steps.py b/tools/agentic_import/sdmx_pipeline_steps.py index 787c46a93f..498553572a 100644 --- a/tools/agentic_import/sdmx_pipeline_steps.py +++ b/tools/agentic_import/sdmx_pipeline_steps.py @@ -248,8 +248,7 @@ def run(self) -> None: f"Starting SDMX metadata extraction: {' '.join(context.full_command)} -> {context.output_path}" ) else: - logging.info( - f"Extracting SDMX metadata to {context.output_path}") + logging.info(f"Extracting SDMX metadata to {context.output_path}") _run_command(context.full_command, verbose=self._config.run.verbose) def dry_run(self) -> None: From 04da431ce48caf79e15a3645fad7e9213517e5fa Mon Sep 17 00:00:00 2001 From: rohit kumar Date: Wed, 14 Jan 2026 07:10:03 +0000 Subject: [PATCH 5/6] docs: add metadata extraction step Document new pipeline step and outputs --- 
 tools/agentic_import/sdmx_import_pipeline.md | 27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/tools/agentic_import/sdmx_import_pipeline.md b/tools/agentic_import/sdmx_import_pipeline.md
index 32f7eae252..1fca41c15d 100644
--- a/tools/agentic_import/sdmx_import_pipeline.md
+++ b/tools/agentic_import/sdmx_import_pipeline.md
@@ -6,6 +6,7 @@ The SDMX Agentic Import Pipeline is a Python-based system designed to automate t
 The pipeline orchestrates several tools to handle the end-to-end import process:
 
 1. **Download**: Retrieves data and metadata from SDMX endpoints.
-2. **Sample**: Creates a manageable sample of the data for analysis.
-3. **Schema Mapping**: Generates Property-Value (PV) mappings using LLM-based tools.
-4. **Full Data Processing**: Converts the full dataset into Data Commons MCF and CSV formats.
+2. **Metadata Extraction**: Converts SDMX metadata into JSON for downstream steps.
+3. **Sample**: Creates a manageable sample of the data for analysis.
+4. **Schema Mapping**: Generates Property-Value (PV) mappings using LLM-based tools.
+5. **Full Data Processing**: Converts the full dataset into Data Commons MCF and CSV formats.
@@ -51,6 +52,7 @@ python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
 - `--dataset_prefix`: (Optional) Prefix for generated artifacts. Useful for disambiguating multiple datasets in the same working directory. If not provided, it is derived from the dataflow ID.
 - `--sample.rows`: Number of rows for the sample dataset (default: 1000).
 - `--force`: Force re-execution of all steps, ignoring saved state.
+- `--run_only`: Execute only a single pipeline step by name.
 - `--skip_confirmation`: Skip interactive confirmation prompts during schema mapping.
 - `--verbose`: Enable verbose logging.
 
@@ -60,10 +62,11 @@ The pipeline consists of the following steps, executed in order:
 
 1. **DownloadDataStep**: Downloads SDMX data to `<dataset_prefix>_data.csv`.
 2. **DownloadMetadataStep**: Downloads SDMX metadata to `<dataset_prefix>_metadata.xml`.
-3. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
-4. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
-5. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
-6. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.
+3. **ExtractMetadataStep**: Extracts SDMX metadata to `<dataset_prefix>_metadata.json`.
+4. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
+5. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
+6. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
+7. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.
 
 ## Directory Structure
 
@@ -73,6 +76,7 @@ The pipeline organizes outputs within the specified working directory:
 working_dir/
 ├── <dataset_prefix>_data.csv       # Raw downloaded data
 ├── <dataset_prefix>_metadata.xml   # Raw downloaded metadata
+├── <dataset_prefix>_metadata.json  # Extracted metadata for downstream steps
 ├── <dataset_prefix>_sample.csv     # Sampled data
 ├── .datacommons/
 │   └── <dataset_prefix>.state.json # Pipeline state for resuming runs
@@ -92,6 +96,15 @@ The pipeline automatically saves its state to a `<dataset_prefix>.state.json` fi
 - **Resuming**: If a run is interrupted, running the same command again will resume from the last successful step.
 - **Skipping**: Steps that have already completed successfully will be skipped unless `--force` is used.
 - **Input Hashing**: The pipeline tracks input configuration. If critical configuration changes, it may trigger re-execution of steps.
+- **Run Only**: Use `--run_only=<step-name>` to execute just one step (for example, `download-metadata` or `create-schema-mapping`).
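+
+For example, to re-execute only the schema-mapping step (substitute the dataset flags used in the initial run):
+
+```bash
+python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
+  <flags from the initial run> \
+  --run_only=create-schema-mapping
+```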
 
 ## Troubleshooting
 

From ac53b46e7928acca7605934cb0f68c498e428cb7 Mon Sep 17 00:00:00 2001
From: rohit kumar
Date: Thu, 15 Jan 2026 06:54:43 +0000
Subject: [PATCH 6/6] feat: add SDMX run range flags

Wire run_from/run_until into config and planning
Document and test range filtering
---
 tools/agentic_import/sdmx_import_pipeline.md  | 13 ++
 tools/agentic_import/sdmx_import_pipeline.py  |  6 ++
 .../sdmx_import_pipeline_test.py              | 69 ++++++
 tools/agentic_import/sdmx_pipeline_builder.py | 80 ++++++++++++++++++++-
 tools/agentic_import/sdmx_pipeline_config.py  |  2 +
 5 files changed, 168 insertions(+), 2 deletions(-)

diff --git a/tools/agentic_import/sdmx_import_pipeline.md b/tools/agentic_import/sdmx_import_pipeline.md
index 1fca41c15d..f7e0282b72 100644
--- a/tools/agentic_import/sdmx_import_pipeline.md
+++ b/tools/agentic_import/sdmx_import_pipeline.md
@@ -53,6 +53,8 @@ python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
 - `--sample.rows`: Number of rows for the sample dataset (default: 1000).
 - `--force`: Force re-execution of all steps, ignoring saved state.
 - `--run_only`: Execute only a single pipeline step by name.
+- `--run_from`: Execute pipeline steps starting at the named step (inclusive).
+- `--run_until`: Execute pipeline steps through the named step (inclusive).
 - `--skip_confirmation`: Skip interactive confirmation prompts during schema mapping.
 - `--verbose`: Enable verbose logging.
 
@@ -105,5 +107,16 @@ The pipeline automatically saves its state to a `<dataset_prefix>.state.json` fi
   <flags from the initial run> \
   --run_only=create-schema-mapping
 ```
 
+- **Run Range**: Use `--run_from=<step-name>` and/or `--run_until=<step-name>` to limit execution to a contiguous range of steps (inclusive). The range respects incremental state by default; use `--force` to rerun all steps in the range.
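+
+For example, to force a rerun of the steps from sampling through schema mapping (substitute the dataset flags used in the initial run):
+
+```bash
+python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
+  <flags from the initial run> \
+  --run_from=create-sample \
+  --run_until=create-schema-mapping \
+  --force
+```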
## Troubleshooting diff --git a/tools/agentic_import/sdmx_import_pipeline.py b/tools/agentic_import/sdmx_import_pipeline.py index 32957e6c6d..77bdee4838 100644 --- a/tools/agentic_import/sdmx_import_pipeline.py +++ b/tools/agentic_import/sdmx_import_pipeline.py @@ -86,6 +86,10 @@ def _define_flags() -> None: flags.DEFINE_string("run_only", None, "Execute only a specific pipeline step by name.") + flags.DEFINE_string("run_from", None, + "Execute pipeline steps starting at the named step.") + flags.DEFINE_string("run_until", None, + "Execute pipeline steps through the named step.") flags.DEFINE_boolean("force", False, "Force all steps to run.") @@ -313,6 +317,8 @@ def prepare_config() -> PipelineConfig: dataset_prefix=FLAGS.dataset_prefix, working_dir=FLAGS.working_dir, run_only=FLAGS.run_only, + run_from=FLAGS.run_from, + run_until=FLAGS.run_until, force=FLAGS.force, verbose=FLAGS.verbose, skip_confirmation=FLAGS.skip_confirmation, diff --git a/tools/agentic_import/sdmx_import_pipeline_test.py b/tools/agentic_import/sdmx_import_pipeline_test.py index 08209d04af..a4f3e06d29 100644 --- a/tools/agentic_import/sdmx_import_pipeline_test.py +++ b/tools/agentic_import/sdmx_import_pipeline_test.py @@ -424,6 +424,75 @@ def test_run_only_ignores_timestamp_chaining(self) -> None: names = self._names_from_builder(cfg, state=state) self.assertEqual(names, ["download-data"]) + def test_run_range_selects_steps(self) -> None: + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_from="create-sample", + run_until="process-full-data")) + names = self._names_from_builder(cfg) + self.assertEqual(names, [ + "create-sample", + "create-schema-mapping", + "process-full-data", + ]) + + def test_run_range_respects_outside_predecessor(self) -> None: + newer = 2_000 + older = 1_000 + state = self._state_with({ + "download-data": ("1", "succeeded", older), + "download-metadata": ("1", "succeeded", newer), + "extract-metadata": ("1", "succeeded", older), + "create-sample": ("1", "succeeded", older), + "create-schema-mapping": ("1", "succeeded", older), + "process-full-data": ("1", "succeeded", older), + "create-dc-config": ("1", "succeeded", older), + }) + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_from="extract-metadata", + run_until="process-full-data")) + names = self._names_from_builder(cfg, state=state) + self.assertEqual(names, [ + "extract-metadata", + "create-sample", + "create-schema-mapping", + "process-full-data", + ]) + + def test_run_range_requires_valid_step(self) -> None: + cfg = PipelineConfig( + run=RunConfig(command=_TEST_COMMAND, run_from="nope")) + with self.assertRaisesRegex(ValueError, + "run_from step not found: nope"): + self._names_from_builder(cfg) + + def test_run_range_rejects_inverted_bounds(self) -> None: + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_from="process-full-data", + run_until="create-sample")) + with self.assertRaisesRegex(ValueError, "must not come after"): + self._names_from_builder(cfg) + + def test_force_run_range_overrides_state(self) -> None: + state = self._state_with({ + "download-data": ("1", "succeeded", 1_000), + "download-metadata": ("1", "succeeded", 1_000), + "extract-metadata": ("1", "succeeded", 1_000), + }) + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_from="download-data", + run_until="extract-metadata")) + names = self._names_from_builder(cfg, state=state) + self.assertEqual(names, []) + + cfg_force = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_from="download-data", + 
run_until="extract-metadata", + force=True)) + names_force = self._names_from_builder(cfg_force, state=state) + self.assertEqual( + names_force, + ["download-data", "download-metadata", "extract-metadata"]) + def test_version_bump_schedules_downstream(self) -> None: steps = [ _VersionedStep("download-data", "1"), diff --git a/tools/agentic_import/sdmx_pipeline_builder.py b/tools/agentic_import/sdmx_pipeline_builder.py index 5a2ef4a2f4..6590af64e5 100644 --- a/tools/agentic_import/sdmx_pipeline_builder.py +++ b/tools/agentic_import/sdmx_pipeline_builder.py @@ -63,8 +63,14 @@ def __init__(self, def build(self) -> BuildResult: if self._config.run.run_only: + if self._config.run.run_from or self._config.run.run_until: + raise ValueError( + "run_only cannot be combined with run_from/run_until") planned, decisions = self._plan_run_only(self._config.run.run_only) - elif self._config.run.force: + logging.info("Built SDMX pipeline with %d steps", len(planned)) + return BuildResult(pipeline=Pipeline(steps=planned), + decisions=decisions) + if self._config.run.force: logging.info("Force flag set; scheduling all SDMX steps") planned, decisions = self._plan_all_steps( "Force flag set; scheduling this step") @@ -74,6 +80,7 @@ def build(self) -> BuildResult: "Critical inputs changed; scheduling this step") else: planned, decisions = self._plan_incremental() + planned, decisions = self._apply_range_filter(planned, decisions) logging.info("Built SDMX pipeline with %d steps", len(planned)) return BuildResult(pipeline=Pipeline(steps=planned), decisions=decisions) @@ -117,11 +124,16 @@ def _plan_all_steps(self, return planned, decisions def _plan_incremental(self) -> tuple[list[Step], list[StepDecision]]: + return self._plan_incremental_steps(self._steps) + + def _plan_incremental_steps( + self, + steps: Sequence[Step]) -> tuple[list[Step], list[StepDecision]]: planned: list[Step] = [] decisions: list[StepDecision] = [] schedule_all_remaining = False previous: Step | None = None - for step in self._steps: + for step in steps: if schedule_all_remaining: planned.append(step) decisions.append( @@ -175,6 +187,70 @@ def _plan_incremental(self) -> tuple[list[Step], list[StepDecision]]: logging.info("No steps scheduled.") return planned, decisions + def _resolve_step_index(self, name: str, flag_name: str) -> int: + for index, step in enumerate(self._steps): + if step.name == name: + return index + raise ValueError(f"{flag_name} step not found: {name}") + + def _range_skip_reason(self) -> str: + parts = [] + if self._config.run.run_from: + parts.append(f"run_from={self._config.run.run_from}") + if self._config.run.run_until: + parts.append(f"run_until={self._config.run.run_until}") + suffix = " ".join(parts) if parts else "range" + return f"Outside requested range ({suffix})" + + def _select_range(self) -> tuple[int, int, bool]: + run_from = self._config.run.run_from + run_until = self._config.run.run_until + if not run_from and not run_until: + return 0, len(self._steps) - 1, False + start = 0 + end = len(self._steps) - 1 + if run_from: + start = self._resolve_step_index(run_from, "run_from") + if run_until: + end = self._resolve_step_index(run_until, "run_until") + if start > end: + raise ValueError( + f"run_from={run_from} must not come after run_until={run_until}" + ) + return start, end, True + + def _apply_range_filter( + self, + planned: list[Step], + decisions: list[StepDecision], + ) -> tuple[list[Step], list[StepDecision]]: + start, end, has_range = self._select_range() + if not has_range: + return planned, 
decisions + decisions_by_name = { + decision.step_name: decision for decision in decisions + } + in_range = {step.name for step in self._steps[start:end + 1]} + filtered_planned = [step for step in planned if step.name in in_range] + filtered_decisions: list[StepDecision] = [] + outside_reason = self._range_skip_reason() + for index, step in enumerate(self._steps): + if start <= index <= end: + filtered_decisions.append(decisions_by_name[step.name]) + else: + decision = decisions_by_name[step.name] + filtered_decisions.append( + StepDecision(step_name=step.name, + decision=StepDecision.SKIP, + reason=self._format_outside_range_reason( + outside_reason, decision))) + return filtered_planned, filtered_decisions + + def _format_outside_range_reason(self, outside_reason: str, + decision: StepDecision) -> str: + return (f"{outside_reason}; original decision={decision.decision}; " + f"original reason={decision.reason}") + def _hash_changed(self) -> bool: if not self._critical_input_hash: return False diff --git a/tools/agentic_import/sdmx_pipeline_config.py b/tools/agentic_import/sdmx_pipeline_config.py index d6260eabd7..592ee508cb 100644 --- a/tools/agentic_import/sdmx_pipeline_config.py +++ b/tools/agentic_import/sdmx_pipeline_config.py @@ -54,6 +54,8 @@ class RunConfig: dataset_prefix: str | None = None working_dir: str | None = None run_only: str | None = None + run_from: str | None = None + run_until: str | None = None force: bool = False verbose: bool = False skip_confirmation: bool = False