diff --git a/tools/agentic_import/common/gemini_prompt_runner.py b/tools/agentic_import/common/gemini_prompt_runner.py new file mode 100644 index 0000000000..29b696730a --- /dev/null +++ b/tools/agentic_import/common/gemini_prompt_runner.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Render prompts and run the Gemini CLI with tracked run outputs.""" + +import shutil +import subprocess +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Callable, Mapping, Optional + +from absl import logging +from jinja2 import Environment, FileSystemLoader + + +@dataclass +class GeminiRunResult: + run_id: str + run_dir: Path + prompt_path: Path + gemini_log_path: Path + gemini_command: str + sandbox_enabled: bool + + +class GeminiPromptRunner: + + def __init__(self, + dataset_prefix: str, + working_dir: Optional[str] = None, + run_root: str = '.datacommons/runs', + dry_run: bool = False, + skip_confirmation: bool = False, + enable_sandboxing: bool = False, + gemini_cli: Optional[str] = None): + self._working_dir = Path( + working_dir).resolve() if working_dir else Path.cwd() + self._dataset_prefix = (dataset_prefix or '').strip() + if not self._dataset_prefix: + raise ValueError("dataset_prefix must be a non-empty string.") + + self._run_root = run_root + self._dry_run = dry_run + self._skip_confirmation = skip_confirmation + self._enable_sandboxing = enable_sandboxing + self._gemini_cli = gemini_cli + + self._run_id = self._build_run_id() + self._run_dir = self._create_run_dir() + + @property + def run_id(self) -> str: + return self._run_id + + @property + def run_dir(self) -> Path: + return self._run_dir + + @property + def working_dir(self) -> Path: + return self._working_dir + + def render_prompt(self, template_dir: Path, template_name: str, + context: Mapping[str, str], prompt_filename: str) -> Path: + # If other LLM runners are added later, extract rendering into a separate utility. 
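+        # Note: Jinja2 autoescaping is left at its default (off); prompts are
+        # plain text, not HTML, so escaping would corrupt the rendered output.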
+ env = Environment(loader=FileSystemLoader(str(template_dir))) + template = env.get_template(template_name) + + rendered_prompt = template.render(**context) + output_file = self._run_dir / prompt_filename + with open(output_file, 'w') as f: + f.write(rendered_prompt) + + logging.info("Generated prompt written to: %s", output_file) + return output_file + + def run(self, + prompt_file: Path, + log_filename: str = 'gemini_cli.log', + log_path_override: Optional[Path] = None, + confirm_fn: Optional[Callable[[Path], bool]] = None, + cancel_log_message: Optional[str] = None) -> GeminiRunResult: + gemini_log_path = (log_path_override.resolve() if log_path_override else + (self._run_dir / log_filename)) + gemini_command = self._build_gemini_command(prompt_file, + gemini_log_path) + + result = GeminiRunResult(run_id=self._run_id, + run_dir=self._run_dir, + prompt_path=prompt_file, + gemini_log_path=gemini_log_path, + gemini_command=gemini_command, + sandbox_enabled=self._enable_sandboxing) + + if self._dry_run: + logging.info( + "Dry run mode: Prompt file generated at %s. " + "Skipping Gemini CLI execution.", prompt_file) + return result + + if not self._skip_confirmation and confirm_fn is not None: + if not confirm_fn(prompt_file): + if cancel_log_message: + logging.info(cancel_log_message) + return result + + if not self._check_gemini_cli_available(): + logging.warning( + "Gemini CLI not found in PATH. Will attempt to run anyway (may work if aliased)." + ) + + logging.info("Launching gemini (cwd: %s): %s", self._working_dir, + gemini_command) + logging.info("Gemini output will be saved to: %s", gemini_log_path) + + exit_code = self._run_subprocess(gemini_command) + if exit_code == 0: + logging.info("Gemini CLI completed successfully") + return result + + raise RuntimeError( + f"Gemini CLI execution failed with exit code {exit_code}") + + def _build_run_id(self) -> str: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"{self._dataset_prefix}_gemini_{timestamp}" + + def _create_run_dir(self) -> Path: + run_root = Path(self._run_root).expanduser() + if not run_root.is_absolute(): + run_root = self._working_dir / run_root + run_root.mkdir(parents=True, exist_ok=True) + + run_dir = run_root / self._run_id + run_dir.mkdir(parents=True, exist_ok=True) + return run_dir + + def _check_gemini_cli_available(self) -> bool: + if self._gemini_cli: + return True + return shutil.which('gemini') is not None + + def _build_gemini_command(self, prompt_file: Path, log_file: Path) -> str: + prompt_path = prompt_file.resolve() + log_path = log_file.resolve() + gemini_cmd = self._gemini_cli or 'gemini' + sandbox_flag = "--sandbox" if self._enable_sandboxing else "" + return ( + f"cat '{prompt_path}' | {gemini_cmd} {sandbox_flag} -y 2>&1 | tee '{log_path}'" + ) + + def _run_subprocess(self, command: str) -> int: + try: + process = subprocess.Popen(command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + cwd=self._working_dir, + encoding='utf-8', + errors='replace', + bufsize=1, + universal_newlines=True) + + while True: + output = process.stdout.readline() + if output == '' and process.poll() is not None: + break + if output: + print(output.rstrip()) + + return process.wait() + except Exception as e: + logging.error("Error running subprocess: %s", str(e)) + return 1 diff --git a/tools/agentic_import/sdmx/README.md b/tools/agentic_import/sdmx/README.md new file mode 100644 index 0000000000..a66c828331 --- /dev/null +++ b/tools/agentic_import/sdmx/README.md @@ -0,0 +1,94 @@ +# SDMX 
Metadata Enrichment Pipeline + +The enrichment process is organized into three distinct steps: Discovery, Fetching, and Integration. + +--- + +## Step 1: Discovery (`metadata_enricher_find.py`) + +**Role**: Analyzes the base SDMX metadata using Gemini CLI to identify codes and concepts that require enrichment. It generates context-aware search queries (`enrichment_query`) for these items while preserving the original dataset structure. + +**Command**: +```bash +python tools/agentic_import/sdmx/metadata_enricher_find.py \ + --input_metadata_json="/path/to/metadata.json" \ + --dataset_prefix="oecd_prices" \ + --output_path="/path/to/items_to_enrich.json" \ + --gemini_cli="gemini" \ + --enable_sandboxing +``` + +**Input**: +- Base SDMX `metadata.json` file. + +**Output**: +- `items_to_enrich.json`: A pruned JSON structure containing only selected items with their generated `enrichment_query`. + +--- + +## Step 2: Fetching (`metadata_enricher_fetch.py`) + +**Role**: Orchestrates external web searches (via Gemini CLI) to populate detailed descriptions (`enriched_description`) for the items identified in the previous step. + +**Command**: +```bash +python tools/agentic_import/sdmx/metadata_enricher_fetch.py \ + --input_items_json="/path/to/items_to_enrich.json" \ + --dataset_prefix="oecd_prices" \ + --output_path="/path/to/enriched_items.json" \ + --gemini_cli="gemini" \ + --enable_sandboxing +``` + +**Input**: +- `items_to_enrich.json` (from Step 1). + +**Output**: +- `enriched_items.json`: A pruned JSON structure with `enriched_description` added for each item. + +--- + +## Step 3: Integration (`metadata_enricher_merge.py`) + +**Role**: Merges the fetched descriptions back into the original SDMX metadata JSON, resulting in a complete, enriched metadata file. + +**Command**: +```bash +python tools/agentic_import/sdmx/metadata_enricher_merge.py \ + --input_metadata_json="/path/to/metadata.json" \ + --input_enriched_items_json="/path/to/enriched_items.json" \ + --output_path="/path/to/metadata_enriched.json" +``` + +**Input**: +- Base SDMX `metadata.json`. +- `enriched_items.json` (from Step 2). + +**Output**: +- `metadata_enriched.json`: The final, full metadata JSON with `enriched_description` fields merged into the matching codes and concepts. + +--- + +## Full Pipeline Example + +To run the entire enrichment pipeline for a dataset: + +```bash +# 1. Discover items to enrich +python tools/agentic_import/sdmx/metadata_enricher_find.py \ + --input_metadata_json="metadata.json" \ + --dataset_prefix="my_dataset" \ + --output_path="items_to_enrich.json" + +# 2. Fetch enriched descriptions +python tools/agentic_import/sdmx/metadata_enricher_fetch.py \ + --input_items_json="items_to_enrich.json" \ + --dataset_prefix="my_dataset" \ + --output_path="enriched_items.json" + +# 3. Merge results into the original metadata +python tools/agentic_import/sdmx/metadata_enricher_merge.py \ + --input_metadata_json="metadata.json" \ + --input_enriched_items_json="enriched_items.json" \ + --output_path="metadata_enriched.json" +``` diff --git a/tools/agentic_import/sdmx/metadata_enricher_fetch.py b/tools/agentic_import/sdmx/metadata_enricher_fetch.py new file mode 100644 index 0000000000..eed3da1568 --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_fetch.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Fetch enriched descriptions for selected SDMX items with Gemini CLI.""" + +import os +import platform +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_REPO_ROOT = Path(_SCRIPT_DIR).resolve().parents[3] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from absl import app +from absl import flags +from absl import logging + +from tools.agentic_import.common.gemini_prompt_runner import ( + GeminiPromptRunner, GeminiRunResult) + +_FLAGS = flags.FLAGS + + +def _define_flags(): + try: + flags.DEFINE_string('input_items_json', None, + 'Path to input items JSON (required)') + flags.mark_flag_as_required('input_items_json') + + flags.DEFINE_string('dataset_prefix', None, + 'Dataset prefix for run id (required, non-empty)') + flags.mark_flag_as_required('dataset_prefix') + + flags.DEFINE_string('output_path', None, + 'Path to output items JSON (required)') + flags.mark_flag_as_required('output_path') + + flags.DEFINE_boolean('dry_run', False, + 'Generate prompt only without calling Gemini CLI') + + flags.DEFINE_boolean( + 'skip_confirmation', False, + 'Skip user confirmation before running Gemini CLI') + + flags.DEFINE_boolean( + 'enable_sandboxing', + platform.system() == 'Darwin', + 'Enable sandboxing for Gemini CLI (default: True on macOS, False elsewhere)' + ) + + flags.DEFINE_string( + 'gemini_cli', 'gemini', + 'Custom path or command to invoke Gemini CLI. ' + 'Example: "/usr/local/bin/gemini". ' + 'WARNING: This value is executed in a shell - use only with trusted input.' 
+ ) + + flags.DEFINE_string( + 'working_dir', None, + 'Working directory for the run (default: current directory)') + except flags.DuplicateFlagError: + pass + + +@dataclass +class Config: + input_items_json: str + dataset_prefix: str + output_path: str + dry_run: bool = False + skip_confirmation: bool = False + enable_sandboxing: bool = False + gemini_cli: Optional[str] = None + working_dir: Optional[str] = None + + +class EnrichmentDataFetcher: + + def __init__(self, config: Config): + self._config = config + self._working_dir = Path( + config.working_dir).resolve() if config.working_dir else Path.cwd() + self._input_path = self._resolve_path(config.input_items_json) + self._output_path = self._resolve_path(config.output_path) + self._dataset_prefix = (config.dataset_prefix or '').strip() + + if not self._dataset_prefix: + raise ValueError("dataset_prefix must be a non-empty string.") + + if not self._input_path.exists(): + raise FileNotFoundError( + f"input_items_json does not exist: {self._input_path}") + + self._output_path.parent.mkdir(parents=True, exist_ok=True) + + self._runner = GeminiPromptRunner( + dataset_prefix=self._dataset_prefix, + working_dir=str(self._working_dir), + dry_run=config.dry_run, + skip_confirmation=config.skip_confirmation, + enable_sandboxing=config.enable_sandboxing, + gemini_cli=config.gemini_cli, + ) + + def fetch_enrichment_data(self) -> GeminiRunResult: + prompt_file = self._generate_prompt() + return self._runner.run( + prompt_file, + log_filename='gemini_cli.log', + confirm_fn=self._get_user_confirmation, + cancel_log_message="Enrichment data fetch cancelled by user.", + ) + + def _resolve_path(self, path: str) -> Path: + resolved = Path(path).expanduser() + if not resolved.is_absolute(): + resolved = self._working_dir / resolved + return resolved.resolve() + + def _generate_prompt(self) -> Path: + template_dir = Path(_SCRIPT_DIR) / 'templates' + return self._runner.render_prompt( + template_dir=template_dir, + template_name='metadata_enricher_fetch_prompt.j2', + context={ + "input_items_abs": str(self._input_path), + "output_path_abs": str(self._output_path), + }, + prompt_filename='metadata_enricher_fetch_prompt.md', + ) + + def _get_user_confirmation(self, prompt_file: Path) -> bool: + print("\n" + "=" * 60) + print("SDMX ENRICHMENT DATA FETCH SUMMARY") + print("=" * 60) + print(f"Input items file: {self._input_path}") + print(f"Output items file: {self._output_path}") + print(f"Prompt file: {prompt_file}") + print(f"Working directory: {self._working_dir}") + print( + f"Sandboxing: {'Enabled' if self._config.enable_sandboxing else 'Disabled'}" + ) + if not self._config.enable_sandboxing: + print( + "WARNING: Sandboxing is disabled. Gemini will run without safety restrictions." + ) + print("=" * 60) + + while True: + try: + response = input( + "Ready to run Gemini for enrichment data fetch? 
(y/n): " + ).strip().lower() + if response in ['y', 'yes']: + return True + if response in ['n', 'no']: + print("Data fetch cancelled by user.") + return False + print("Please enter 'y' or 'n'.") + except KeyboardInterrupt: + print("\nData fetch cancelled by user.") + return False + + +def prepare_config() -> Config: + return Config(input_items_json=_FLAGS.input_items_json, + dataset_prefix=_FLAGS.dataset_prefix, + output_path=_FLAGS.output_path, + dry_run=_FLAGS.dry_run, + skip_confirmation=_FLAGS.skip_confirmation, + enable_sandboxing=_FLAGS.enable_sandboxing, + gemini_cli=_FLAGS.gemini_cli, + working_dir=_FLAGS.working_dir) + + +def main(_): + config = prepare_config() + logging.info("Loaded config for enrichment data fetch") + + fetcher = EnrichmentDataFetcher(config) + fetcher.fetch_enrichment_data() + + logging.info("Enrichment data fetch completed.") + return 0 + + +if __name__ == '__main__': + _define_flags() + app.run(main) diff --git a/tools/agentic_import/sdmx/metadata_enricher_fetch_test.py b/tools/agentic_import/sdmx/metadata_enricher_fetch_test.py new file mode 100644 index 0000000000..494381f72e --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_fetch_test.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +from jinja2 import Template + +from tools.agentic_import.sdmx.metadata_enricher_fetch import ( + Config, EnrichmentDataFetcher) + + +class EnrichmentDataFetcherTest(unittest.TestCase): + + def test_dry_run_creates_prompt_and_run_dir(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + input_path = Path(tmpdir) / 'items.json' + input_path.write_text(json.dumps({"items": []})) + output_path = Path(tmpdir) / 'out' / 'items_enriched.json' + + config = Config( + input_items_json=str(input_path), + dataset_prefix='demo', + output_path=str(output_path), + dry_run=True, + skip_confirmation=True, + enable_sandboxing=False, + working_dir=tmpdir, + ) + + fetcher = EnrichmentDataFetcher(config) + with mock.patch("jinja2.environment.Template.render", + autospec=True, + side_effect=Template.render) as render_mock: + result = fetcher.fetch_enrichment_data() + + self.assertTrue(result.run_id.startswith('demo_gemini_')) + self.assertTrue(result.run_dir.is_dir()) + self.assertTrue(result.prompt_path.is_file()) + self.assertTrue(result.gemini_log_path.is_absolute()) + self.assertEqual(result.prompt_path.parent, result.run_dir) + expected_command = ( + f"cat '{result.prompt_path.resolve()}' | " + f"{config.gemini_cli or 'gemini'} " + f"{'--sandbox' if config.enable_sandboxing else ''} " + f"-y 2>&1 | tee '{result.gemini_log_path.resolve()}'") + self.assertEqual(result.gemini_command, expected_command) + self.assertTrue(output_path.parent.is_dir()) + + self.assertEqual(render_mock.call_count, 1) + _, render_kwargs = render_mock.call_args + self.assertEqual( + render_kwargs, { + "input_items_abs": str(input_path.resolve()), + "output_path_abs": str(output_path.resolve()), + }) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/agentic_import/sdmx/metadata_enricher_find.py b/tools/agentic_import/sdmx/metadata_enricher_find.py new file mode 100644 index 0000000000..eabaff845b --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_find.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Select SDMX items to enrich and generate enrichment queries.""" + +import os +import platform +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_REPO_ROOT = Path(_SCRIPT_DIR).resolve().parents[3] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from absl import app +from absl import flags +from absl import logging + +from tools.agentic_import.common.gemini_prompt_runner import ( + GeminiPromptRunner, GeminiRunResult) + +_FLAGS = flags.FLAGS + + +def _define_flags(): + try: + flags.DEFINE_string('input_metadata_json', None, + 'Path to input SDMX metadata JSON (required)') + flags.mark_flag_as_required('input_metadata_json') + + flags.DEFINE_string('dataset_prefix', None, + 'Dataset prefix for run id (required, non-empty)') + flags.mark_flag_as_required('dataset_prefix') + + flags.DEFINE_string('output_path', None, + 'Path to output items JSON (required)') + flags.mark_flag_as_required('output_path') + + flags.DEFINE_boolean('dry_run', False, + 'Generate prompt only without calling Gemini CLI') + + flags.DEFINE_boolean( + 'skip_confirmation', False, + 'Skip user confirmation before running Gemini CLI') + + flags.DEFINE_boolean( + 'enable_sandboxing', + platform.system() == 'Darwin', + 'Enable sandboxing for Gemini CLI (default: True on macOS, False elsewhere)' + ) + + flags.DEFINE_string( + 'gemini_cli', 'gemini', + 'Custom path or command to invoke Gemini CLI. ' + 'Example: "/usr/local/bin/gemini". ' + 'WARNING: This value is executed in a shell - use only with trusted input.' + ) + + flags.DEFINE_string( + 'working_dir', None, + 'Working directory for the run (default: current directory)') + except flags.DuplicateFlagError: + pass + + +@dataclass +class Config: + input_metadata_json: str + dataset_prefix: str + output_path: str + dry_run: bool = False + skip_confirmation: bool = False + enable_sandboxing: bool = False + gemini_cli: Optional[str] = None + working_dir: Optional[str] = None + + +class EnrichmentItemsFinder: + + def __init__(self, config: Config): + self._config = config + self._working_dir = Path( + config.working_dir).resolve() if config.working_dir else Path.cwd() + self._input_path = self._resolve_path(config.input_metadata_json) + self._output_path = self._resolve_path(config.output_path) + self._dataset_prefix = (config.dataset_prefix or '').strip() + + if not self._dataset_prefix: + raise ValueError("dataset_prefix must be a non-empty string.") + + if not self._input_path.exists(): + raise FileNotFoundError( + f"input_metadata_json does not exist: {self._input_path}") + + self._output_path.parent.mkdir(parents=True, exist_ok=True) + + self._runner = GeminiPromptRunner( + dataset_prefix=self._dataset_prefix, + working_dir=str(self._working_dir), + dry_run=config.dry_run, + skip_confirmation=config.skip_confirmation, + enable_sandboxing=config.enable_sandboxing, + gemini_cli=config.gemini_cli, + ) + + def find_items_to_enrich(self) -> GeminiRunResult: + prompt_file = self._generate_prompt() + return self._runner.run( + prompt_file, + log_filename='gemini_cli.log', + confirm_fn=self._get_user_confirmation, + cancel_log_message="Enrichment item selection cancelled by user.", + ) + + def _resolve_path(self, path: str) -> Path: + resolved = Path(path).expanduser() + if not resolved.is_absolute(): + resolved = self._working_dir / resolved + return resolved.resolve() + + def _generate_prompt(self) -> Path: + template_dir = 
Path(_SCRIPT_DIR) / 'templates' + return self._runner.render_prompt( + template_dir=template_dir, + template_name='metadata_enricher_find_prompt.j2', + context={ + "input_metadata_abs": str(self._input_path), + "output_path_abs": str(self._output_path), + }, + prompt_filename='metadata_enricher_find_prompt.md', + ) + + def _get_user_confirmation(self, prompt_file: Path) -> bool: + print("\n" + "=" * 60) + print("SDMX ENRICHMENT ITEM SELECTION SUMMARY") + print("=" * 60) + print(f"Input metadata file: {self._input_path}") + print(f"Output items file: {self._output_path}") + print(f"Prompt file: {prompt_file}") + print(f"Working directory: {self._working_dir}") + print( + f"Sandboxing: {'Enabled' if self._config.enable_sandboxing else 'Disabled'}" + ) + if not self._config.enable_sandboxing: + print( + "WARNING: Sandboxing is disabled. Gemini will run without safety restrictions." + ) + print("=" * 60) + + while True: + try: + response = input( + "Ready to run Gemini for enrichment item selection? (y/n): " + ).strip().lower() + if response in ['y', 'yes']: + return True + if response in ['n', 'no']: + print("Selection cancelled by user.") + return False + print("Please enter 'y' or 'n'.") + except KeyboardInterrupt: + print("\nSelection cancelled by user.") + return False + + +def prepare_config() -> Config: + return Config(input_metadata_json=_FLAGS.input_metadata_json, + dataset_prefix=_FLAGS.dataset_prefix, + output_path=_FLAGS.output_path, + dry_run=_FLAGS.dry_run, + skip_confirmation=_FLAGS.skip_confirmation, + enable_sandboxing=_FLAGS.enable_sandboxing, + gemini_cli=_FLAGS.gemini_cli, + working_dir=_FLAGS.working_dir) + + +def main(_): + config = prepare_config() + logging.info("Loaded config for enrichment item selection") + + finder = EnrichmentItemsFinder(config) + finder.find_items_to_enrich() + + logging.info("Enrichment item selection completed.") + return 0 + + +if __name__ == '__main__': + _define_flags() + app.run(main) diff --git a/tools/agentic_import/sdmx/metadata_enricher_find_test.py b/tools/agentic_import/sdmx/metadata_enricher_find_test.py new file mode 100644 index 0000000000..4f45a227cf --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_find_test.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +from jinja2 import Template + +from tools.agentic_import.sdmx.metadata_enricher_find import ( + Config, EnrichmentItemsFinder) + + +class EnrichmentItemsFinderTest(unittest.TestCase): + + def test_dry_run_creates_prompt_and_run_dir(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + input_path = Path(tmpdir) / 'metadata.json' + input_path.write_text(json.dumps({"dataflows": []})) + output_path = Path(tmpdir) / 'out' / 'items.json' + + config = Config( + input_metadata_json=str(input_path), + dataset_prefix='demo', + output_path=str(output_path), + dry_run=True, + skip_confirmation=True, + enable_sandboxing=False, + working_dir=tmpdir, + ) + + finder = EnrichmentItemsFinder(config) + with mock.patch("jinja2.environment.Template.render", + autospec=True, + side_effect=Template.render) as render_mock: + result = finder.find_items_to_enrich() + + self.assertTrue(result.run_id.startswith('demo_gemini_')) + self.assertTrue(result.run_dir.is_dir()) + self.assertTrue(result.prompt_path.is_file()) + self.assertTrue(result.gemini_log_path.is_absolute()) + self.assertEqual(result.prompt_path.parent, result.run_dir) + expected_command = ( + f"cat '{result.prompt_path.resolve()}' | " + f"{config.gemini_cli or 'gemini'} " + f"{'--sandbox' if config.enable_sandboxing else ''} " + f"-y 2>&1 | tee '{result.gemini_log_path.resolve()}'") + self.assertEqual(result.gemini_command, expected_command) + self.assertTrue(output_path.parent.is_dir()) + + self.assertEqual(render_mock.call_count, 1) + _, render_kwargs = render_mock.call_args + self.assertEqual( + render_kwargs, { + "input_metadata_abs": str(input_path.resolve()), + "output_path_abs": str(output_path.resolve()), + }) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/agentic_import/sdmx/metadata_enricher_merge.py b/tools/agentic_import/sdmx/metadata_enricher_merge.py new file mode 100644 index 0000000000..4ae702ee16 --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_merge.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Merge enriched SDMX descriptions into base metadata files.""" + +import json +from pathlib import Path +from typing import Any, Dict, List, Set, Union + +from absl import app +from absl import flags +from absl import logging + +_FLAGS = flags.FLAGS + +DictOrList = Union[Dict[str, Any], List[Any]] + + +class CollectionMerger: + """Merges selected fields from an incoming nested collection into a base one.""" + + def merge(self, + base: DictOrList, + incoming: DictOrList, + fields_to_update: List[str], + key_field: str = 'id', + allow_overwrite: bool = False) -> DictOrList: + """Merges specific fields from `incoming` into `base` in-place. + + This performs a structure-aware merge: + 1. Only keys in `fields_to_update` are added or updated. + 2. Lists are merged by matching items on `key_field` (e.g., "id"). + 3. 
Structure in `base` is preserved; new unmatched containers are ignored.
+
+        Args:
+            base: The target dictionary or list to update.
+            incoming: The source dictionary or list with updates.
+            fields_to_update: Keys allowed to be changed or added.
+            key_field: Key used to match list items.
+            allow_overwrite: Whether to overwrite existing values.
+
+        Returns:
+            The updated `base` object.
+        """
+        return self._merge_value(base,
+                                 incoming,
+                                 fields_to_update=set(fields_to_update),
+                                 key_field=key_field,
+                                 allow_overwrite=allow_overwrite,
+                                 path='')
+
+    def _merge_value(self, base: DictOrList, incoming: DictOrList,
+                     fields_to_update: Set[str], key_field: str,
+                     allow_overwrite: bool, path: str) -> DictOrList:
+        # Only traverse matching container types; leave base untouched otherwise.
+        if isinstance(base, dict) and isinstance(incoming, dict):
+            return self._merge_dict(base,
+                                    incoming,
+                                    fields_to_update=fields_to_update,
+                                    key_field=key_field,
+                                    allow_overwrite=allow_overwrite,
+                                    path=path)
+        if isinstance(base, list) and isinstance(incoming, list):
+            return self._merge_list(base,
+                                    incoming,
+                                    fields_to_update=fields_to_update,
+                                    key_field=key_field,
+                                    allow_overwrite=allow_overwrite,
+                                    path=path)
+        if type(base) is not type(incoming):
+            location = path or 'root'
+            logging.warning(f"Type mismatch at {location}; skipping.")
+        # Non-container values (and mismatched types) always keep the base
+        # value; returning it explicitly avoids an implicit None assignment
+        # back in _merge_dict.
+        return base
+
+    def _merge_dict(self, base: Dict[str, Any], incoming: Dict[str, Any],
+                    fields_to_update: Set[str], key_field: str,
+                    allow_overwrite: bool, path: str) -> Dict[str, Any]:
+        for key, incoming_value in incoming.items():
+            next_path = self._join_path(path, key)
+            if key in fields_to_update:
+                self._merge_field(base,
+                                  key,
+                                  incoming_value,
+                                  allow_overwrite=allow_overwrite,
+                                  path=next_path)
+                continue
+            if key not in base:
+                continue
+            base[key] = self._merge_value(base[key],
+                                          incoming_value,
+                                          fields_to_update=fields_to_update,
+                                          key_field=key_field,
+                                          allow_overwrite=allow_overwrite,
+                                          path=next_path)
+        return base
+
+    def _merge_list(self, base: List[Any], incoming: List[Any],
+                    fields_to_update: Set[str], key_field: str,
+                    allow_overwrite: bool, path: str) -> List[Any]:
+        # Keep base ordering; only merge keyed items already present in base.
+        base_by_key: Dict[Any, Dict[str, Any]] = {}
+        for index, item in enumerate(base):
+            if not isinstance(item, dict):
+                logging.warning(
+                    f"Base list item at {path}[index={index}] is not a dict; skipping keyed merge."
+                )
+                continue
+            key_value = item.get(key_field)
+            if key_value is None:
+                logging.warning(
+                    f"Base list item at {path}[index={index}] missing key '{key_field}'; skipping keyed merge."
+                )
+                continue
+            if key_value in base_by_key:
+                logging.warning(
+                    f"Duplicate key '{key_value}' in base list at {path}; using first occurrence."
+                )
+                continue
+            base_by_key[key_value] = item
+
+        seen_incoming_keys = set()
+        for index, item in enumerate(incoming):
+            if not isinstance(item, dict):
+                logging.warning(
+                    f"Incoming list item at {path}[index={index}] is not a dict; ignoring."
+                )
+                continue
+            key_value = item.get(key_field)
+            if key_value is None:
+                logging.warning(
+                    f"Incoming list item at {path}[index={index}] missing key '{key_field}'; ignoring."
+                )
+                continue
+            if key_value in seen_incoming_keys:
+                logging.warning(
+                    f"Duplicate key '{key_value}' in incoming list at {path}; merging again."
+ ) + seen_incoming_keys.add(key_value) + + base_item = base_by_key.get(key_value) + if base_item is None: + item_path = self._list_item_path(path, key_field, key_value) + logging.warning( + f"No base match for {item_path}; ignoring incoming list item." + ) + continue + + item_path = self._list_item_path(path, key_field, key_value) + self._merge_dict(base_item, + item, + fields_to_update=fields_to_update, + key_field=key_field, + allow_overwrite=allow_overwrite, + path=item_path) + return base + + def _merge_field(self, base: Dict[str, Any], key: str, incoming_value: Any, + allow_overwrite: bool, path: str) -> Any: + if key not in base: + base[key] = incoming_value + return base + + base_value = base[key] + if allow_overwrite: + if base_value != incoming_value: + logging.info( + f"Overwriting value at {path} from {base_value!r} to {incoming_value!r}." + ) + base[key] = incoming_value + return base + + if base_value != incoming_value: + logging.info( + f"Preserving base value at {path}; incoming value ignored.") + return base + + def _join_path(self, path: str, key: str) -> str: + if not path: + return key + return f"{path}.{key}" + + def _list_item_path(self, path: str, key_field: str, key_value: Any) -> str: + return f"{path}[{key_field}={key_value}]" + + +def _define_flags(): + try: + flags.DEFINE_string('input_metadata_json', None, + 'Path to base SDMX metadata JSON (required)') + flags.mark_flag_as_required('input_metadata_json') + + flags.DEFINE_string('input_enriched_items_json', None, + 'Path to enriched items JSON (required)') + flags.mark_flag_as_required('input_enriched_items_json') + + flags.DEFINE_string('output_path', None, + 'Path to output enriched metadata JSON (required)') + flags.mark_flag_as_required('output_path') + except flags.DuplicateFlagError: + pass + + +def _load_json(path: Path) -> Dict[str, Any]: + with open(path, 'r') as f: + return json.load(f) + + +def _write_json(path: Path, data: Dict[str, Any]) -> None: + with open(path, 'w') as f: + json.dump(data, f, indent=2) + + +def merge_enrichment(input_metadata_json: str, input_enriched_items_json: str, + output_path: str) -> None: + base_data = _load_json(Path(input_metadata_json)) + enriched_data = _load_json(Path(input_enriched_items_json)) + merger = CollectionMerger() + merged = merger.merge(base_data, + enriched_data, + fields_to_update=['enriched_description'], + key_field='id', + allow_overwrite=False) + _write_json(Path(output_path), merged) + + +def main(_): + merge_enrichment(_FLAGS.input_metadata_json, + _FLAGS.input_enriched_items_json, _FLAGS.output_path) + logging.info("Merged enriched descriptions into base metadata JSON") + return 0 + + +if __name__ == '__main__': + _define_flags() + app.run(main) diff --git a/tools/agentic_import/sdmx/metadata_enricher_merge_test.py b/tools/agentic_import/sdmx/metadata_enricher_merge_test.py new file mode 100644 index 0000000000..1de45a7fff --- /dev/null +++ b/tools/agentic_import/sdmx/metadata_enricher_merge_test.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import tempfile +import unittest +from pathlib import Path + +from deepdiff.diff import DeepDiff + +from tools.agentic_import.sdmx.metadata_enricher_merge import CollectionMerger +from tools.agentic_import.sdmx.metadata_enricher_merge import merge_enrichment + +_TESTDATA_DIR = Path(os.path.dirname(__file__)) / 'testdata' +_BASE_JSON = _TESTDATA_DIR / 'sample_metadata.json' +_ENRICHED_JSON = _TESTDATA_DIR / 'sample_enriched_items.json' +_EXPECTED_JSON = _TESTDATA_DIR / 'sample_metadata_enriched_expected.json' + + +class CollectionMergerTest(unittest.TestCase): + + def setUp(self) -> None: + self._merger = CollectionMerger() + + def test_updates_only_listed_fields(self) -> None: + base = {"item": {"name": "Base"}} + incoming = {"item": {"name": "Incoming", "enriched_description": "New"}} + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=False) + + self.assertEqual(merged["item"]["name"], "Base") + self.assertEqual(merged["item"]["enriched_description"], "New") + + def test_overwrite_policy_for_listed_fields(self) -> None: + base_keep = {"item": {"enriched_description": "Old"}} + base_overwrite = {"item": {"enriched_description": "Old"}} + incoming = {"item": {"enriched_description": "New"}} + + merged_keep = self._merger.merge( + base_keep, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=False) + merged_overwrite = self._merger.merge( + base_overwrite, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=True) + + self.assertEqual(merged_keep["item"]["enriched_description"], "Old") + self.assertEqual(merged_overwrite["item"]["enriched_description"], + "New") + + def test_type_mismatch_on_listed_field_respects_overwrite(self) -> None: + base_keep = {"item": {"enriched_description": {"a": 1}}} + base_overwrite = {"item": {"enriched_description": {"a": 1}}} + incoming = {"item": {"enriched_description": "New"}} + + merged_keep = self._merger.merge( + base_keep, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=False) + merged_overwrite = self._merger.merge( + base_overwrite, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=True) + + self.assertEqual(merged_keep["item"]["enriched_description"], {"a": 1}) + self.assertEqual(merged_overwrite["item"]["enriched_description"], + "New") + + def test_traversal_type_mismatch_is_skipped(self) -> None: + base = {"item": {"details": {"a": 1}}} + incoming = {"item": {"details": ["x"]}} + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"]) + + self.assertEqual(merged["item"]["details"], {"a": 1}) + + def test_keyed_list_merge_respects_hierarchy(self) -> None: + base = { + "codelists": [ + { + "id": "CL1", + "codes": [{ + "id": "A" + },], + }, + { + "id": "CL2", + "codes": [{ + "id": "A" + },], + }, + ] + } + incoming = { + "codelists": [ + { + "id": + "CL1", + "codes": [{ + "id": "A", + "enriched_description": "Code A in CL1", + },], + }, + { + "id": + "CL2", + "codes": [{ + "id": "A", + "enriched_description": "Code A in CL2", + },], + }, + ] + } + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"], + allow_overwrite=False) + + cl1_code = merged["codelists"][0]["codes"][0] + cl2_code = merged["codelists"][1]["codes"][0] + self.assertEqual(cl1_code["enriched_description"], "Code A in 
CL1") + self.assertEqual(cl2_code["enriched_description"], "Code A in CL2") + + def test_keyed_list_merge_with_custom_key(self) -> None: + base = {"items": [{"code": "X", "value": 1}]} + incoming = { + "items": [ + { + "code": "X", + "enriched_description": "X desc" + }, + { + "code": "Y", + "enriched_description": "Y desc" + }, + ] + } + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"], + key_field="code") + + self.assertEqual(len(merged["items"]), 1) + self.assertEqual(merged["items"][0]["enriched_description"], "X desc") + + def test_list_items_without_key_are_ignored(self) -> None: + base = {"items": [{"id": "x"}]} + incoming = {"items": [{"name": "no_id"}]} + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"]) + + self.assertEqual(len(merged["items"]), 1) + self.assertEqual(merged["items"][0], {"id": "x"}) + + def test_base_items_without_key_are_ignored(self) -> None: + base = {"items": [{"name": "base-only"}]} + incoming = {"items": [{"id": "x", "enriched_description": "desc"}]} + + merged = self._merger.merge(base, + incoming, + fields_to_update=["enriched_description"]) + + self.assertEqual(len(merged["items"]), 1) + self.assertEqual(merged["items"][0]["name"], "base-only") + self.assertNotIn("enriched_description", merged["items"][0]) + + +class EnrichmentMergeTest(unittest.TestCase): + + def test_merge_enriched_description_across_lists(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / 'merged.json' + merge_enrichment(str(_BASE_JSON), str(_ENRICHED_JSON), + str(output_path)) + + merged = json.loads(output_path.read_text()) + + expected = json.loads(_EXPECTED_JSON.read_text()) + diff = DeepDiff(expected, merged, ignore_order=True) + self.assertFalse(diff, msg=str(diff)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/agentic_import/sdmx/templates/metadata_enricher_fetch_prompt.j2 b/tools/agentic_import/sdmx/templates/metadata_enricher_fetch_prompt.j2 new file mode 100644 index 0000000000..8f3ada99d8 --- /dev/null +++ b/tools/agentic_import/sdmx/templates/metadata_enricher_fetch_prompt.j2 @@ -0,0 +1,37 @@ +You are an expert SDMX metadata analyst. Your task is to enrich selected SDMX +codes and concepts using web search and provide concise descriptions. + +# INPUT +- Items-to-enrich JSON: {{ input_items_abs }} + +# OUTPUT +- Write JSON to: {{ output_path_abs }} +- Output MUST be valid JSON only. No extra text. + +# CRITICAL RULES +- Process the ENTIRE input file. Do not read only the first lines. +- Use web search for each item, batching multiple items per web call when possible. +- Do not add `enriched_name` anywhere. +- Do not include `name` or `description` fields in the output. +- Do not include `enrichment_query` in the output. +- Ground descriptions in search results and dataset context. +- Keep `enriched_description` concise (<= 240 chars). + +# TASK +1) Read the full JSON from the input path. +2) For each selected item, use its `enrichment_query` to search the web. +3) Produce an `enriched_description` for each item. +4) Output the SAME pruned JSON structure as input, but remove + `enrichment_query` and add `enriched_description`. 
+ +# FIELD MINIMUMS (do not add name/description) +- dataflow: `id` +- data_structure_definition: `id` +- component (dimension/attribute/measure): `id` +- concept: `id`, `concept_scheme_id`, `enriched_description` +- representation: `type` +- codelist: `id` +- code: `id`, `enriched_description` +- referenced_concept_schemes: `id` + +Write ONLY the JSON file to the output path. diff --git a/tools/agentic_import/sdmx/templates/metadata_enricher_find_prompt.j2 b/tools/agentic_import/sdmx/templates/metadata_enricher_find_prompt.j2 new file mode 100644 index 0000000000..11e965df39 --- /dev/null +++ b/tools/agentic_import/sdmx/templates/metadata_enricher_find_prompt.j2 @@ -0,0 +1,49 @@ +You are an expert SDMX metadata analyst. Your task is to select only the SDMX +codes and concepts that need enrichment and to craft precise web search queries +for them. + +# INPUT +- Full extractor JSON: {{ input_metadata_abs }} + +# OUTPUT +- Write JSON to: {{ output_path_abs }} +- Output MUST be valid JSON only. No extra text. + +# CRITICAL RULES +- Process the ENTIRE input file. Do not read only the first lines. +- Do not add `enriched_name` anywhere. +- Do not include `name` or `description` fields in the output. +- Skip place names (countries, regions, cities, etc.). +- Skip popular/self-explanatory terms when clear (e.g., GDP, Population). +- Use full context (dataflow name/description, codelist name, dimension name, + concept name, code name/description) to decide and to build queries. +- Example: Interpret HICP in the context of the dataset and codelist, not alone. + +# TASK +1) Read the full JSON from the input path. +2) Select only items that truly need enrichment. +3) For each selected item, add an `enrichment_query` string that reflects the + full context needed for web search. +4) Produce a PRUNED JSON that preserves the original structure but ONLY keeps + the selected items and their necessary parent structure. + +# OUTPUT SHAPE (pruned) +- Keep `dataflows` array. +- For each kept dataflow: include `id` and only the substructures that contain + selected items. +- For code items, keep them under their original `representation.codelist.codes`. +- For concept items, keep them under their original `concept` (components) and/or + `referenced_concept_schemes[*].concepts`. +- Remove all unselected items and any parent objects left empty. + +# FIELD MINIMUMS (do not add name/description) +- dataflow: `id` +- data_structure_definition: `id` +- component (dimension/attribute/measure): `id` +- concept: `id`, `concept_scheme_id`, `enrichment_query` +- representation: `type` +- codelist: `id` +- code: `id`, `enrichment_query` +- referenced_concept_schemes: `id` + +Write ONLY the JSON file to the output path. 
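
For reference, a pruned output from the find step might look like the following sketch (illustrative only: the ids, the `type` value, and the query text are hypothetical, and the authoritative shape is whatever the template rules above produce):

```python
# Hypothetical pruned output of metadata_enricher_find, shown as a Python literal.
items_to_enrich = {
    "dataflows": [{
        "id": "DF1",
        "data_structure_definition": {
            "dimensions": [{
                "id": "DIM1",
                "representation": {
                    "type": "codelist",  # hypothetical value; only the key is mandated
                    "codelist": {
                        "id": "CL1",
                        "codes": [{
                            "id": "HICP",
                            # The query carries dataset/codelist context, per the rules above.
                            "enrichment_query":
                                "HICP meaning in consumer price dataset codelist CL1",
                        }],
                    },
                },
            }],
        },
    }],
}
```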
diff --git a/tools/agentic_import/sdmx/testdata/sample_enriched_items.json b/tools/agentic_import/sdmx/testdata/sample_enriched_items.json new file mode 100644 index 0000000000..d566992a50 --- /dev/null +++ b/tools/agentic_import/sdmx/testdata/sample_enriched_items.json @@ -0,0 +1,109 @@ +{ + "dataflows": [ + { + "id": "DF1", + "enriched_description": "Flow One enriched", + "data_structure_definition": { + "dimensions": [ + { + "id": "DIM1", + "enriched_description": "Dimension enriched", + "concept": { + "id": "C1", + "enriched_description": "Concept C1 enriched" + }, + "representation": { + "codelist": { + "id": "CL1", + "name": "Enriched Codelist One", + "codes": [ + { + "id": "CODE1", + "name": "Enriched Code1 CL1", + "enriched_description": "Code 1 enriched" + }, + { + "id": "CODE2", + "name": "Enriched Code2 CL1", + "enriched_description": "Code 2 enriched" + } + ] + } + } + }, + { + "id": "DIM2", + "enriched_description": "Dimension two enriched", + "concept": { + "id": "C4", + "enriched_description": "Concept C4 enriched" + }, + "representation": { + "codelist": { + "id": "CL2", + "name": "Enriched Codelist Two", + "codes": [ + { + "id": "CODE1", + "name": "Enriched Code1 CL2", + "enriched_description": "Code 1 enriched CL2" + } + ] + } + } + } + ], + "attributes": [ + { + "id": "ATTR1", + "enriched_description": "Attribute enriched", + "concept": { + "id": "C2", + "enriched_description": "Concept C2 enriched" + }, + "representation": { + "codelist": { + "codes": [ + { + "id": "ACODE1", + "enriched_description": "Attr code enriched" + } + ] + } + } + } + ], + "measures": [ + { + "id": "MEAS1", + "enriched_description": "Measure enriched", + "concept": { + "id": "C3", + "enriched_description": "Concept C3 enriched" + } + } + ] + }, + "referenced_concept_schemes": [ + { + "id": "CS1", + "enriched_description": "Scheme enriched", + "concepts": [ + { + "id": "CON1", + "enriched_description": "Concept 1 enriched" + }, + { + "id": "CON2", + "enriched_description": "Concept 2 enriched" + } + ] + } + ] + }, + { + "id": "DF3", + "enriched_description": "No base match" + } + ] +} diff --git a/tools/agentic_import/sdmx/testdata/sample_metadata.json b/tools/agentic_import/sdmx/testdata/sample_metadata.json new file mode 100644 index 0000000000..5770feaf3a --- /dev/null +++ b/tools/agentic_import/sdmx/testdata/sample_metadata.json @@ -0,0 +1,71 @@ +{ + "dataflows": [ + { + "id": "DF1", + "name": "Flow One", + "data_structure_definition": { + "dimensions": [ + { + "id": "DIM1", + "concept": {"id": "C1"}, + "representation": { + "codelist": { + "id": "CL1", + "name": "Base Codelist One", + "codes": [ + {"id": "CODE1", "name": "Base Code1 CL1"}, + {"id": "CODE2", "name": "Base Code2 CL1"} + ] + } + } + }, + { + "id": "DIM2", + "concept": {"id": "C4"}, + "representation": { + "codelist": { + "id": "CL2", + "name": "Base Codelist Two", + "codes": [ + {"id": "CODE1", "name": "Base Code1 CL2"} + ] + } + } + } + ], + "attributes": [ + { + "id": "ATTR1", + "concept": {"id": "C2"}, + "representation": { + "codelist": { + "codes": [ + {"id": "ACODE1"} + ] + } + } + } + ], + "measures": [ + { + "id": "MEAS1", + "concept": {"id": "C3"} + } + ] + }, + "referenced_concept_schemes": [ + { + "id": "CS1", + "concepts": [ + {"id": "CON1"}, + {"id": "CON2"} + ] + } + ] + }, + { + "id": "DF2", + "name": "Flow Two" + } + ] +} diff --git a/tools/agentic_import/sdmx/testdata/sample_metadata_enriched_expected.json b/tools/agentic_import/sdmx/testdata/sample_metadata_enriched_expected.json new file mode 100644 index 
0000000000..ce6b3978db --- /dev/null +++ b/tools/agentic_import/sdmx/testdata/sample_metadata_enriched_expected.json @@ -0,0 +1,110 @@ +{ + "dataflows": [ + { + "id": "DF1", + "name": "Flow One", + "enriched_description": "Flow One enriched", + "data_structure_definition": { + "dimensions": [ + { + "id": "DIM1", + "concept": { + "id": "C1", + "enriched_description": "Concept C1 enriched" + }, + "representation": { + "codelist": { + "id": "CL1", + "name": "Base Codelist One", + "codes": [ + { + "id": "CODE1", + "name": "Base Code1 CL1", + "enriched_description": "Code 1 enriched" + }, + { + "id": "CODE2", + "name": "Base Code2 CL1", + "enriched_description": "Code 2 enriched" + } + ] + } + }, + "enriched_description": "Dimension enriched" + }, + { + "id": "DIM2", + "concept": { + "id": "C4", + "enriched_description": "Concept C4 enriched" + }, + "representation": { + "codelist": { + "id": "CL2", + "name": "Base Codelist Two", + "codes": [ + { + "id": "CODE1", + "name": "Base Code1 CL2", + "enriched_description": "Code 1 enriched CL2" + } + ] + } + }, + "enriched_description": "Dimension two enriched" + } + ], + "attributes": [ + { + "id": "ATTR1", + "concept": { + "id": "C2", + "enriched_description": "Concept C2 enriched" + }, + "representation": { + "codelist": { + "codes": [ + { + "id": "ACODE1", + "enriched_description": "Attr code enriched" + } + ] + } + }, + "enriched_description": "Attribute enriched" + } + ], + "measures": [ + { + "id": "MEAS1", + "concept": { + "id": "C3", + "enriched_description": "Concept C3 enriched" + }, + "enriched_description": "Measure enriched" + } + ] + }, + "referenced_concept_schemes": [ + { + "id": "CS1", + "concepts": [ + { + "id": "CON1", + "enriched_description": "Concept 1 enriched" + }, + { + "id": "CON2", + "enriched_description": "Concept 2 enriched" + } + ], + "enriched_description": "Scheme enriched" + } + ] + }, + { + "id": "DF2", + "name": "Flow Two" + } + ] +}
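
To see how the merge semantics play out on data shaped like the samples above, here is a minimal sketch (assuming the repo root is on `PYTHONPATH`, as the tests arrange):

```python
from tools.agentic_import.sdmx.metadata_enricher_merge import CollectionMerger

base = {"codelists": [{"id": "CL1", "codes": [{"id": "A", "name": "Base"}]}]}
incoming = {
    "codelists": [{
        "id": "CL1",
        "codes": [{
            "id": "A",
            "name": "Ignored",  # Not in fields_to_update, so it never overwrites.
            "enriched_description": "Code A enriched",
        }],
    }]
}

merged = CollectionMerger().merge(base,
                                  incoming,
                                  fields_to_update=["enriched_description"],
                                  allow_overwrite=False)

# Only the whitelisted field is added; existing values and list order are kept.
assert merged["codelists"][0]["codes"][0]["name"] == "Base"
assert merged["codelists"][0]["codes"][0]["enriched_description"] == "Code A enriched"
```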