9 changes: 8 additions & 1 deletion tools/agentic_import/README.md
@@ -5,7 +5,7 @@ This guide describes the complete process for importing CSV data into Data Commo
## Table of Contents

- [Agentic Import Tool for Data Commons](#agentic-import-tool-for-data-commons)
- [Table of Contents](#table-of-contents)
- [SDMX Quick Links](#sdmx-quick-links)
- [Prerequisites](#prerequisites)
- [Required Tools](#required-tools)
- [Setup](#setup)
@@ -26,6 +26,12 @@ This guide describes the complete process for importing CSV data into Data Commo
- [Gemini CLI Debugging](#gemini-cli-debugging)
- [Log Structure](#log-structure)

## SDMX Quick Links

- [SDMX import pipeline (end-to-end)](sdmx_import_pipeline.md)
- [SDMX Downloads (section)](#sdmx-downloads)
- [SDMX CLI documentation](../sdmx_import/README.md)

## Prerequisites

Before starting the import process, ensure you have the following installed and configured:
@@ -112,6 +118,7 @@ working_directory/
#### SDMX Downloads

Refer to the [SDMX CLI documentation](../sdmx_import/README.md) for details on downloading SDMX data and metadata files.
See the [SDMX import pipeline](sdmx_import_pipeline.md) for the end-to-end SDMX flow.

Extract a simplified, token-efficient JSON metadata copy from `metadata.xml`, retaining the original XML for later PV map generation.

16 changes: 12 additions & 4 deletions tools/agentic_import/sdmx_import_pipeline.md
@@ -6,6 +6,7 @@ The SDMX Agentic Import Pipeline is a Python-based system designed to automate t

The pipeline orchestrates several tools to handle the end-to-end import process:
1. **Download**: Retrieves data and metadata from SDMX endpoints.
2. **Metadata Extraction**: Converts SDMX metadata into JSON for downstream steps.
3. **Sample**: Creates a manageable sample of the data for analysis.
4. **Schema Mapping**: Generates Property-Value (PV) mappings using LLM-based tools.
5. **Full Data Processing**: Converts the full dataset into Data Commons MCF and CSV formats.
@@ -51,6 +52,9 @@ python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
- `--dataset_prefix`: (Optional) Prefix for generated artifacts. Useful for disambiguating multiple datasets in the same working directory. If not provided, it is derived from the dataflow ID.
- `--sample.rows`: Number of rows for the sample dataset (default: 1000).
- `--force`: Force re-execution of all steps, ignoring saved state.
- `--run_only`: Execute only a single pipeline step by name (see the example after this list).
- `--run_from`: Execute pipeline steps starting at the named step (inclusive).
- `--run_until`: Execute pipeline steps through the named step (inclusive).
- `--skip_confirmation`: Skip interactive confirmation prompts during schema mapping.
- `--verbose`: Enable verbose logging.
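
For instance, re-running only the schema-mapping step against an existing working directory might look like the sketch below. The `--working_dir` and `--dataset_prefix` values are placeholders, any remaining flags for the dataset are assumed to match the original invocation, and the step name follows the examples given under State Management.

```bash
# Sketch: run only the schema-mapping step in an existing working directory.
# Placeholder values; other dataset flags are assumed to match the first run.
# Whether a previously completed step re-runs without --force depends on the
# saved pipeline state (see State Management).
python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
  --working_dir=/path/to/working_dir \
  --dataset_prefix=my_dataset \
  --run_only=create-schema-mapping
```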

@@ -60,10 +64,11 @@ The pipeline consists of the following steps, executed in order:

1. **DownloadDataStep**: Downloads SDMX data to `<dataset_prefix>_data.csv`.
2. **DownloadMetadataStep**: Downloads SDMX metadata to `<dataset_prefix>_metadata.xml`.
3. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
4. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
5. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
6. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.
3. **ExtractMetadataStep**: Extracts SDMX metadata to `<dataset_prefix>_metadata.json`.
4. **CreateSampleStep**: Creates `<dataset_prefix>_sample.csv` from the downloaded data.
5. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`.
6. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`.
7. **CreateDcConfigStep**: Generates `output/<dataset_prefix>_config.json` for custom DC imports.

## Directory Structure

@@ -73,6 +78,7 @@ The pipeline organizes outputs within the specified working directory:
working_dir/
├── <dataset_prefix>_data.csv # Raw downloaded data
├── <dataset_prefix>_metadata.xml # Raw downloaded metadata
├── <dataset_prefix>_metadata.json # Extracted metadata for downstream steps
├── <dataset_prefix>_sample.csv # Sampled data
├── .datacommons/
│ └── <dataset_prefix>.state.json # Pipeline state for resuming runs
@@ -92,6 +98,8 @@ The pipeline automatically saves its state to a `<dataset_prefix>.state.json` fi
- **Resuming**: If a run is interrupted, running the same command again will resume from the last successful step.
- **Skipping**: Steps that have already completed successfully will be skipped unless `--force` is used.
- **Input Hashing**: The pipeline tracks input configuration. If critical configuration changes, it may trigger re-execution of steps.
- **Run Only**: Use `--run_only=<step_name>` to execute just one step (for example, `download-metadata` or `create-schema-mapping`).
- **Run Range**: Use `--run_from=<step_name>` and/or `--run_until=<step_name>` to limit execution to a contiguous range of steps (inclusive). The range respects incremental state by default; use `--force` to rerun all steps in the range.
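
For example, to re-run the contiguous range from metadata download through schema mapping while ignoring their saved state, an invocation might look like this sketch (placeholder paths; other dataset flags are assumed to match the original run):

```bash
# Sketch: rerun the steps from download-metadata through create-schema-mapping
# (inclusive), forcing them to execute even if their saved state says they
# already completed. Placeholder values; remaining dataset flags should match
# the original run.
python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \
  --working_dir=/path/to/working_dir \
  --run_from=download-metadata \
  --run_until=create-schema-mapping \
  --force
```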

## Troubleshooting

6 changes: 6 additions & 0 deletions tools/agentic_import/sdmx_import_pipeline.py
@@ -86,6 +86,10 @@ def _define_flags() -> None:

flags.DEFINE_string("run_only", None,
"Execute only a specific pipeline step by name.")
flags.DEFINE_string("run_from", None,
"Execute pipeline steps starting at the named step.")
flags.DEFINE_string("run_until", None,
"Execute pipeline steps through the named step.")

flags.DEFINE_boolean("force", False, "Force all steps to run.")

@@ -313,6 +317,8 @@ def prepare_config() -> PipelineConfig:
dataset_prefix=FLAGS.dataset_prefix,
working_dir=FLAGS.working_dir,
run_only=FLAGS.run_only,
run_from=FLAGS.run_from,
run_until=FLAGS.run_until,
force=FLAGS.force,
verbose=FLAGS.verbose,
skip_confirmation=FLAGS.skip_confirmation,