Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5234439
Add SDMX enrichment item selector
rohitkumarbhagat Jan 19, 2026
328e130
Add SDMX enrichment data fetcher
rohitkumarbhagat Jan 19, 2026
b4089df
Add SDMX enrichment merge tool
rohitkumarbhagat Jan 19, 2026
108fef6
lint changes
rohitkumarbhagat Jan 19, 2026
b1850c3
Document SDMX enrichment tools
rohitkumarbhagat Jan 19, 2026
dfc3b86
Require dataset prefix for Gemini runs
rohitkumarbhagat Jan 19, 2026
5d17565
Merge branch 'master' of github.com:datacommonsorg/data into enrich_s…
rohitkumarbhagat Jan 20, 2026
1a7b29d
test: add sdmx enrichment fixtures
rohitkumarbhagat Jan 20, 2026
45bf22f
test: assert SDMX prompt params
rohitkumarbhagat Jan 20, 2026
bda8cbd
lint fix
rohitkumarbhagat Jan 20, 2026
f8692f5
Rename SDMX metadata enricher tools
rohitkumarbhagat Jan 20, 2026
f7c530b
Merge branch 'master' of github.com:datacommonsorg/data into enrich_s…
rohitkumarbhagat Jan 20, 2026
7c85eff
Extract gemini prompt runner
rohitkumarbhagat Jan 20, 2026
2ad41f9
Merge branch 'master' of github.com:datacommonsorg/data into enrich_s…
rohitkumarbhagat Jan 20, 2026
e50a67a
Add json merge helper and tests
rohitkumarbhagat Jan 21, 2026
9ffdfa6
Add field-whitelist JSON merge helper
rohitkumarbhagat Jan 21, 2026
565ae08
refactor: move collection merge into sdmx tool
rohitkumarbhagat Jan 21, 2026
d9149da
docs: Refactor the SDMX enrichment README to detail the metadata enri…
rohitkumarbhagat Jan 21, 2026
1e12a1b
Merge branch 'master' into enrich_sedmx_metadata_v2
rohitkumarbhagat Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions tools/agentic_import/common/gemini_prompt_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Render prompts and run the Gemini CLI with tracked run outputs."""

import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable, Mapping, Optional

from absl import logging
from jinja2 import Environment, FileSystemLoader


@dataclass
class GeminiRunResult:
    """Summary of a single Gemini prompt run.

    Instances are built by ``GeminiPromptRunner.run`` and returned whether the
    CLI was actually executed, skipped because of dry-run mode, or cancelled
    by the confirmation callback.
    """
    # Identifier of the run (dataset prefix, "_gemini_", and a timestamp).
    run_id: str
    # Directory created for this run; rendered prompts are written here.
    run_dir: Path
    # Prompt file that was (or would have been) piped into the Gemini CLI.
    prompt_path: Path
    # File that receives the Gemini CLI output.
    gemini_log_path: Path
    # Shell command line built for the run (not necessarily executed).
    gemini_command: str
    # Whether the command was built with the "--sandbox" flag.
    sandbox_enabled: bool


class GeminiPromptRunner:
    """Render a prompt from a Jinja2 template and pipe it to the Gemini CLI.

    Each instance represents one run. Construction derives a run id of the
    form ``<dataset_prefix>_gemini_<YYYYmmdd_HHMMSS>`` and creates the matching
    run directory under ``run_root``. Rendered prompts and, by default, the
    Gemini log are written into that directory.
    """

    def __init__(self,
                 dataset_prefix: str,
                 working_dir: Optional[str] = None,
                 run_root: str = '.datacommons/runs',
                 dry_run: bool = False,
                 skip_confirmation: bool = False,
                 enable_sandboxing: bool = False,
                 gemini_cli: Optional[str] = None):
        """Validate the configuration and create the run directory.

        Args:
            dataset_prefix: Non-empty prefix used to build the run id.
            working_dir: Directory the Gemini CLI is launched from; defaults to
                the current working directory.
            run_root: Parent directory for run directories. A relative path is
                interpreted relative to the working directory.
            dry_run: When true, ``run`` returns without launching the CLI.
            skip_confirmation: When true, ``run`` never calls the confirmation
                callback.
            enable_sandboxing: When true, ``--sandbox`` is passed to the CLI.
            gemini_cli: Command used instead of ``gemini``. It is inserted into
                the shell command as-is.

        Raises:
            ValueError: If ``dataset_prefix`` is empty or only whitespace.
        """
        self._working_dir = Path(
            working_dir).resolve() if working_dir else Path.cwd()
        self._dataset_prefix = (dataset_prefix or '').strip()
        if not self._dataset_prefix:
            raise ValueError("dataset_prefix must be a non-empty string.")

        self._run_root = run_root
        self._dry_run = dry_run
        self._skip_confirmation = skip_confirmation
        self._enable_sandboxing = enable_sandboxing
        self._gemini_cli = gemini_cli

        self._run_id = self._build_run_id()
        self._run_dir = self._create_run_dir()

    @property
    def run_id(self) -> str:
        """Identifier of this run."""
        return self._run_id

    @property
    def run_dir(self) -> Path:
        """Directory holding this run's prompt and log files."""
        return self._run_dir

    @property
    def working_dir(self) -> Path:
        """Directory the Gemini CLI is launched from."""
        return self._working_dir

    def render_prompt(self, template_dir: Path, template_name: str,
                      context: Mapping[str, str], prompt_filename: str) -> Path:
        """Render a Jinja2 template into the run directory.

        Args:
            template_dir: Directory searched for templates.
            template_name: Name of the template inside ``template_dir``.
            context: Variables passed to the template.
            prompt_filename: File name of the rendered prompt inside the run
                directory.

        Returns:
            Path of the written prompt file.
        """
        # If other LLM runners are added later, extract rendering into a separate utility.
        env = Environment(loader=FileSystemLoader(str(template_dir)))
        template = env.get_template(template_name)

        rendered_prompt = template.render(**context)
        output_file = self._run_dir / prompt_filename
        # Write UTF-8 explicitly so the result does not depend on the platform
        # locale; the subprocess output is decoded as UTF-8 as well.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(rendered_prompt)

        logging.info("Generated prompt written to: %s", output_file)
        return output_file

    def run(self,
            prompt_file: Path,
            log_filename: str = 'gemini_cli.log',
            log_path_override: Optional[Path] = None,
            confirm_fn: Optional[Callable[[Path], bool]] = None,
            cancel_log_message: Optional[str] = None) -> 'GeminiRunResult':
        """Pipe ``prompt_file`` into the Gemini CLI and stream its output.

        Args:
            prompt_file: Prompt to send to the CLI on standard input.
            log_filename: Log file name inside the run directory; ignored when
                ``log_path_override`` is given.
            log_path_override: Explicit log file location.
            confirm_fn: Optional callback receiving the prompt path. A falsy
                return value cancels the run. Not called in dry-run mode or
                when confirmation is skipped.
            cancel_log_message: Message logged when ``confirm_fn`` cancels.

        Returns:
            A ``GeminiRunResult`` describing the run. It is also returned,
            without executing anything, for dry runs and cancelled runs.

        Raises:
            RuntimeError: If the launched command reports a non-zero exit code.
        """
        gemini_log_path = (log_path_override.resolve() if log_path_override else
                           (self._run_dir / log_filename))
        gemini_command = self._build_gemini_command(prompt_file,
                                                    gemini_log_path)

        result = GeminiRunResult(run_id=self._run_id,
                                 run_dir=self._run_dir,
                                 prompt_path=prompt_file,
                                 gemini_log_path=gemini_log_path,
                                 gemini_command=gemini_command,
                                 sandbox_enabled=self._enable_sandboxing)

        if self._dry_run:
            logging.info(
                "Dry run mode: Prompt file generated at %s. "
                "Skipping Gemini CLI execution.", prompt_file)
            return result

        if not self._skip_confirmation and confirm_fn is not None:
            if not confirm_fn(prompt_file):
                if cancel_log_message:
                    logging.info(cancel_log_message)
                return result

        # A missing binary is only a warning: the launch is still attempted.
        if not self._check_gemini_cli_available():
            logging.warning(
                "Gemini CLI not found in PATH. Will attempt to run anyway (may work if aliased)."
            )

        logging.info("Launching gemini (cwd: %s): %s", self._working_dir,
                     gemini_command)
        logging.info("Gemini output will be saved to: %s", gemini_log_path)

        # NOTE(review): the command is a shell pipeline ending in `tee`, so the
        # status seen here is whatever the shell reports for the pipeline
        # (normally the last command). Confirm that Gemini failures actually
        # surface as a non-zero exit code.
        exit_code = self._run_subprocess(gemini_command)
        if exit_code == 0:
            logging.info("Gemini CLI completed successfully")
            return result

        raise RuntimeError(
            f"Gemini CLI execution failed with exit code {exit_code}")

    def _build_run_id(self) -> str:
        """Return ``<dataset_prefix>_gemini_<timestamp>`` using local time."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{self._dataset_prefix}_gemini_{timestamp}"

    def _create_run_dir(self) -> Path:
        """Create the run directory (and its parents) and return its path."""
        run_root = Path(self._run_root).expanduser()
        if not run_root.is_absolute():
            run_root = self._working_dir / run_root
        run_root.mkdir(parents=True, exist_ok=True)

        run_dir = run_root / self._run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        return run_dir

    def _check_gemini_cli_available(self) -> bool:
        """Return whether a Gemini CLI command appears to be available.

        A custom ``gemini_cli`` is trusted without being looked up; otherwise
        ``gemini`` must be found on PATH.
        """
        if self._gemini_cli:
            return True
        return shutil.which('gemini') is not None

    @staticmethod
    def _shell_single_quote(value: str) -> str:
        """Wrap ``value`` in single quotes, escaping embedded single quotes."""
        # Inside single quotes the shell treats everything literally, so an
        # embedded quote is written as: close quote, "'" in double quotes,
        # reopen quote.
        return "'" + value.replace("'", "'\"'\"'") + "'"

    def _build_gemini_command(self, prompt_file: Path, log_file: Path) -> str:
        """Build the shell pipeline that feeds the prompt to the Gemini CLI.

        The prompt is sent on standard input, the CLI's stdout and stderr are
        merged, and the result is copied to ``log_file`` through ``tee``. Both
        paths are resolved and shell-quoted.
        """
        prompt_path = self._shell_single_quote(str(prompt_file.resolve()))
        log_path = self._shell_single_quote(str(log_file.resolve()))
        gemini_cmd = self._gemini_cli or 'gemini'
        sandbox_flag = "--sandbox" if self._enable_sandboxing else ""
        return (
            f"cat {prompt_path} | {gemini_cmd} {sandbox_flag} -y 2>&1 | tee {log_path}"
        )

    def _run_subprocess(self, command: str) -> int:
        """Run ``command`` through the shell, echoing its output line by line.

        Returns:
            The exit code reported for the command, or 1 if launching or
            reading from the process raised an exception.
        """
        try:
            # The context manager closes the stdout pipe and reaps the child
            # even when reading fails part-way through.
            with subprocess.Popen(command,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  shell=True,
                                  cwd=self._working_dir,
                                  encoding='utf-8',
                                  errors='replace',
                                  bufsize=1,
                                  universal_newlines=True) as process:
                for output in process.stdout:
                    print(output.rstrip())
                return process.wait()
        except Exception as e:
            logging.error("Error running subprocess: %s", str(e))
            return 1
94 changes: 94 additions & 0 deletions tools/agentic_import/sdmx/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# SDMX Metadata Enrichment Pipeline

The enrichment process is organized into three distinct steps: Discovery, Fetching, and Integration.

---

## Step 1: Discovery (`metadata_enricher_find.py`)

**Role**: Analyzes the base SDMX metadata using the Gemini CLI to identify codes and concepts that require enrichment. It generates context-aware search queries (`enrichment_query`) for these items while preserving the original dataset structure.

**Command**:
```bash
python tools/agentic_import/sdmx/metadata_enricher_find.py \
--input_metadata_json="/path/to/metadata.json" \
--dataset_prefix="oecd_prices" \
--output_path="/path/to/items_to_enrich.json" \
--gemini_cli="gemini" \
--enable_sandboxing
```

**Input**:
- Base SDMX `metadata.json` file.

**Output**:
- `items_to_enrich.json`: A pruned JSON structure containing only selected items with their generated `enrichment_query`.

---

## Step 2: Fetching (`metadata_enricher_fetch.py`)

**Role**: Orchestrates external web searches (via the Gemini CLI) to populate detailed descriptions (`enriched_description`) for the items identified in the previous step.

**Command**:
```bash
python tools/agentic_import/sdmx/metadata_enricher_fetch.py \
--input_items_json="/path/to/items_to_enrich.json" \
--dataset_prefix="oecd_prices" \
--output_path="/path/to/enriched_items.json" \
--gemini_cli="gemini" \
--enable_sandboxing
```

**Input**:
- `items_to_enrich.json` (from Step 1).

**Output**:
- `enriched_items.json`: A pruned JSON structure with `enriched_description` added for each item.

---

## Step 3: Integration (`metadata_enricher_merge.py`)

**Role**: Merges the fetched descriptions back into the original SDMX metadata JSON, resulting in a complete, enriched metadata file.

**Command**:
```bash
python tools/agentic_import/sdmx/metadata_enricher_merge.py \
--input_metadata_json="/path/to/metadata.json" \
--input_enriched_items_json="/path/to/enriched_items.json" \
--output_path="/path/to/metadata_enriched.json"
```

**Input**:
- Base SDMX `metadata.json`.
- `enriched_items.json` (from Step 2).

**Output**:
- `metadata_enriched.json`: The final, full metadata JSON with `enriched_description` fields merged into the matching codes and concepts.

---

## Full Pipeline Example

To run the entire enrichment pipeline for a dataset:

```bash
# 1. Discover items to enrich
python tools/agentic_import/sdmx/metadata_enricher_find.py \
--input_metadata_json="metadata.json" \
--dataset_prefix="my_dataset" \
--output_path="items_to_enrich.json"

# 2. Fetch enriched descriptions
python tools/agentic_import/sdmx/metadata_enricher_fetch.py \
--input_items_json="items_to_enrich.json" \
--dataset_prefix="my_dataset" \
--output_path="enriched_items.json"

# 3. Merge results into the original metadata
python tools/agentic_import/sdmx/metadata_enricher_merge.py \
--input_metadata_json="metadata.json" \
--input_enriched_items_json="enriched_items.json" \
--output_path="metadata_enriched.json"
```
Loading
Loading