
Commit 05cb453

global refactoring to make it dagster-only and introduce dynamic partitions
1 parent 97333d4 commit 05cb453

45 files changed: 5745 additions & 5846 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -35,3 +35,4 @@ uv.lock
 .vscode/
 .idea/
 .cursor/
+.dagster_home
```

AGENTS.md

Lines changed: 8 additions & 9 deletions
```diff
@@ -6,7 +6,7 @@ This repository is dedicated to the preparation of genomic annotation data (Ense
 
 - `src/prepare_annotations/`: Core logic and CLI.
 - `cli.py`: Main Typer CLI entrypoint.
-- `pipelines.py`: Main Prefect flow and pipeline definitions.
+- `pipelines/`: Primary Dagster-based pipelines.
 - `vcf_downloader.py`: VCF download utilities.
 - `genome_downloader.py`: Ensembl genome download utilities.
 - `huggingface_uploader.py`: Upload utilities for HuggingFace Hub.
@@ -21,8 +21,7 @@ This repository is dedicated to the preparation of genomic annotation data (Ense
 - `superhuman.py`: Elite performance genetics conversion.
 - `vo2max.py`: VO2max conversion.
 - `common.py`: Shared conversion utilities.
-- `vortex/`: Vortex data conversion utilities.
-- `pipelines_dagster/`: Dagster-based pipeline alternative.
+- `pipelines/`: Primary Dagster-based pipelines.
 - `io.py`: VCF/Parquet I/O utilities.
 - `runtime.py`: Execution environment and profiling.
 - `models.py`: Pydantic models for results.
@@ -36,19 +35,19 @@ This repository is dedicated to the preparation of genomic annotation data (Ense
 - **Type hints**: Mandatory for all Python code.
 - **Pathlib**: Always use for all file paths.
 - **Polars**: Prefer over Pandas for performance.
-- **Prefect**: Used for workflow orchestration and parallel execution.
+- **Dagster**: Primary tool for workflow orchestration and parallel execution.
 - **Eliot**: Used for structured logging and action tracking.
 - **Typer**: Mandatory for CLI tools.
 - **Pydantic 2**: Mandatory for data classes.
+- **Avoid `__all__`**: avoid `__init__.py` with `__all__`, as it obscures where things are located.
 
 ## Commands
 
-### Main Genomic Data Pipelines
+### Primary Dagster Pipelines (Recommended)
 
-- `uv run prepare-annotations ensembl`: Download and prepare Ensembl variations.
-- `uv run prepare-annotations clinvar`: Download and prepare ClinVar data.
-- `uv run prepare-annotations dbsnp`: Download and prepare dbSNP data.
-- `uv run prepare-annotations gnomad`: Download and prepare gnomAD data.
+- `uv run dagster-ensembl`: Run the full Ensembl pipeline (download, convert, upload).
+- `uv run dagster-ensembl ui`: Launch Dagster UI for Ensembl pipelines.
+- `uv run dagster-ui`: General Dagster development server entrypoint.
 
 ### OakVar Module Management
 
```
README.md

Lines changed: 20 additions & 22 deletions
````diff
@@ -4,15 +4,14 @@ A dedicated toolkit for downloading, processing, and preparing genomic annotatio
 
 ## Features
 
-- **Prefect-based Pipelines**: robust workflows for data preparation.
+- **Dagster-based Pipelines (Primary)**: Software-Defined Assets (SDA) with full lineage tracking, parallel execution, and automated Hugging Face uploads.
 - **Support for multiple sources**:
   - **Ensembl**: Human genetic variations.
   - **ClinVar**: Clinical variant data.
   - **dbSNP**: Single Nucleotide Polymorphism database.
   - **gnomAD**: Genome Aggregation Database.
 - **OakVar Module Management**: Download and convert data from [dna-seq](https://github.com/orgs/dna-seq/repositories) OakVar modules.
-- **VCF to Parquet**: Efficient conversion of large VCF files to columnar format.
-- **Variant Splitting**: Splitting variants by type (SNV, Indel, etc.) for optimized annotation.
+- **VCF to Parquet**: Efficient conversion of large VCF files to columnar format using `polars-bio`.
 - **Hugging Face Hub Integration**: Direct upload of processed datasets with automatic dataset card generation.
 
 ## Installation
@@ -27,33 +26,32 @@ uv sync
 
 ## Usage
 
-### Main Genomic Data Pipeline
+### 🔷 Dagster Pipelines
 
-The `prepare-annotations` command handles large-scale genomic data downloads and processing.
+The primary way to run pipelines is using Dagster. This provides parallel execution, resumable downloads, and integrated Hugging Face uploads.
 
-```bash
-# Show version
-uv run prepare-annotations version
-
-# Download and process Ensembl variations
-uv run prepare-annotations ensembl --split --upload
+#### Ensembl Pipeline
 
-# Download and process ClinVar data
-uv run prepare-annotations clinvar --split --upload
+```bash
+# Run the full pipeline (download → convert → upload)
+uv run dagster-ensembl
 
-# Download and process dbSNP data
-uv run prepare-annotations dbsnp --build GRCh38 --split
+# Start the Dagster UI for monitoring and interactive execution
+uv run dagster-ensembl ui
 
-# Download and process gnomAD data
-uv run prepare-annotations gnomad --version v4 --split
+# Run for a specific species
+uv run dagster-ensembl run --species mus_musculus
 ```
 
-#### Main Pipeline Options
+#### Other Dagster Commands
 
-- `--dest-dir`: Destination directory for downloads.
-- `--split`: Split downloaded files by variant type.
-- `--upload`: Upload results to Hugging Face Hub.
-- `--repo-id`: Custom Hugging Face repository ID.
+```bash
+# List all available assets
+uv run dagster-ui assets
+
+# Materialize specific assets
+uv run dagster-ui materialize ensembl_vcf_urls
+```
 
 ### OakVar Module Management
 
````

dagster.yaml

Lines changed: 27 additions & 0 deletions
```diff
@@ -0,0 +1,27 @@
+# Dagster instance configuration for prepare-annotations
+#
+# Concurrency Control:
+# Uses tag-based concurrency limits via run_coordinator to prevent
+# too many memory-intensive operations running in parallel.
+#
+# See: https://docs.dagster.io/deployment/dagster-instance
+
+# Run coordinator with tag-based concurrency limits
+run_coordinator:
+  module: dagster.core.run_coordinator
+  class: QueuedRunCoordinator
+  config:
+    tag_concurrency_limits:
+      # Limit concurrent VCF downloads (I/O bound)
+      - key: "dagster/concurrency_key"
+        value: "ensembl_vcf_download"
+        limit: 4
+      # Limit concurrent parquet conversions (CPU/memory intensive)
+      - key: "dagster/concurrency_key"
+        value: "ensembl_parquet_conversion"
+        limit: 2
+
+# Unified storage configuration (SQLite for local development)
+storage:
+  sqlite:
+    base_dir: .dagster_home
```
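The `tag_concurrency_limits` above are enforced queue-side: a queued run may start only while fewer than `limit` runs carrying the same concurrency tag are in flight. A minimal stdlib sketch of that admission rule (an illustration of the semantics only, not Dagster's implementation; names are made up):

```python
from collections import Counter


def startable(queued, running, limits):
    """Return the queued run ids that may start now, given per-key limits.

    queued:  list of (run_id, concurrency_key) pairs in FIFO order
    running: list of concurrency_key values for in-flight runs
    limits:  dict mapping concurrency_key -> max concurrent runs
    Keys without a configured limit are unconstrained.
    """
    in_flight = Counter(running)
    started = []
    for run_id, key in queued:
        limit = limits.get(key)
        if limit is None or in_flight[key] < limit:
            started.append(run_id)
            in_flight[key] += 1  # count it against the limit immediately
    return started
```

Under the configuration above, a run tagged `ensembl_parquet_conversion` would therefore wait whenever two such runs are already executing, regardless of how many download runs are active.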

docs/DAGSTER_ENSEMBL_PIPELINE.md

Lines changed: 55 additions & 10 deletions
```diff
@@ -3,16 +3,27 @@
 This repo includes a **Dagster** implementation of the Ensembl preparation pipeline as a parallel alternative to the Prefect flows.
 
 The Dagster implementation lives under:
-- `src/prepare_annotations/pipelines_dagster/`
+- `src/prepare_annotations/pipelines/`
 
 It is intentionally **file/directory based**: each asset materializes a concrete on-disk artifact (a JSON manifest, a directory of VCFs, a directory of Parquet files, etc.). This makes lineage inspectable and keeps memory usage predictable.
 
 ---
 
+### Core principles
+
+- **Lineage-first assets**: each asset returns a concrete on-disk artifact (Path) to avoid passing large in-memory objects.
+- **Dynamic partitioning**: per-file assets are partitioned by filename for fine-grained lineage and UI progress.
+- **Memory safety**: prefer streaming (`LazyFrame.sink_parquet` with `engine="streaming"` by default) and avoid eager materialization during conversion.
+- **Scale-aware joins**: for joins that Polars would materialize in memory, prefer DuckDB or staged filtering.
+- **Resource visibility**: download/convert steps log duration and peak memory where available.
+- **Idempotent outputs**: assets skip work when target files are present and up-to-date.
+
+---
+
 ### Key Features
 
 - **Parallel downloads**: Configurable concurrent downloads (`max_concurrent_downloads`, default: 4)
-- **Retry policies**: Exponential backoff retry policy on download failures (max 3 retries)
+- **Retry policies**: Dagster retry policy (max 3 attempts) plus downloader retries (default: 10)
 - **Checksum verification**: BSD sum checksum validation using CHECKSUMS file from Ensembl FTP
 - **Resumable downloads**: fsspec filecache-based resumption for interrupted transfers
 - Uploads directly from the non-split Parquet directory (no legacy TSA splitting in Dagster)
```
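The checksum verification mentioned above relies on Ensembl's `CHECKSUMS` files, which use the historic BSD `sum` algorithm: a 16-bit checksum that rotates right one bit per byte, plus a size in 1 KiB blocks. A self-contained sketch of the math (assuming the usual `<sum> <blocks> <filename>` line format; the pipeline's actual helper may differ):

```python
def bsd_sum(data: bytes) -> tuple[int, int]:
    """Return (checksum, block_count) as the BSD `sum` command reports them.

    The checksum rotates right by one bit, adds the next byte, and wraps
    modulo 2**16; the block count is the size in 1024-byte blocks, rounded up.
    """
    checksum = 0
    for byte in data:
        checksum = (checksum >> 1) + ((checksum & 1) << 15)  # rotate right
        checksum = (checksum + byte) & 0xFFFF                # add byte, wrap
    blocks = (len(data) + 1023) // 1024
    return checksum, blocks


# A downloaded file passes verification when its computed pair matches
# the pair parsed from the CHECKSUMS line for that filename.
checksum, blocks = bsd_sum(b"a")
```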
````diff
@@ -36,9 +47,11 @@ The default pipeline prepares Ensembl VCFs into Parquet format:
 ```mermaid
 flowchart TD
 A[ensembl_ftp_source<br/>(external)] --> B[ensembl_vcf_urls<br/>vcf_urls.json]
-B --> C[ensembl_vcf_files<br/>vcf/ directory<br/>(parallel downloads)]
-C --> D[ensembl_parquet_files<br/>species dir (*.parquet)]
-D --> F[ensembl_hf_upload<br/>(optional)]
+B --> C1[ensembl_vcf_file<br/>per-file download<br/>(partitioned)]
+C1 --> C2[ensembl_vcf_files<br/>vcf/ directory<br/>(batch downloads)]
+C2 --> D1[ensembl_parquet_file<br/>per-file conversion<br/>(partitioned)]
+D1 --> D2[ensembl_parquet_files<br/>species dir (*.parquet)]
+D2 --> F[ensembl_hf_upload<br/>(optional)]
 ```
 
 #### ASCII diagram (fallback)
@@ -50,10 +63,16 @@ ensembl_ftp_source (external)
 ensembl_vcf_urls (vcf_urls.json)
 |
 v
+ensembl_vcf_file (per-file download, partitioned)
+|
+v
 ensembl_vcf_files (vcf/ directory, parallel downloads with retries)
 |
 v
-ensembl_parquet_files (species directory with *.parquet)
+ensembl_parquet_file (per-file conversion, partitioned)
+|
+v
+ensembl_parquet_files (species directory with *.parquet)
 |
 v
 ensembl_hf_upload (optional)
````
```diff
@@ -63,7 +82,7 @@ ensembl_hf_upload (optional)
 
 ### On-disk layout (default)
 
-Paths are resolved via `src/prepare_annotations/pipelines_dagster/resources.py`.
+Paths are resolved via `src/prepare_annotations/pipelines/resources.py`.
 
 By default the pipeline writes to your user cache (same convention as other Just DNA tooling):
 - Base cache dir: `~/.cache/just-dna-pipelines/` (or `JUST_DNA_PIPELINES_CACHE_DIR`)
```
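Because every asset materializes a concrete path under this layout, the "skip work when target files are present and up-to-date" rule reduces to a freshness check on the on-disk artifact. A stdlib sketch of such a check (illustrative only; the pipeline's actual skip logic may also compare sizes or checksums):

```python
import os
import tempfile
from pathlib import Path


def is_up_to_date(source: Path, target: Path) -> bool:
    """True when `target` exists and is at least as new as `source`."""
    return target.exists() and target.stat().st_mtime >= source.stat().st_mtime


# Tiny demo with throwaway files (names are illustrative):
base = Path(tempfile.mkdtemp())
vcf = base / "homo_sapiens-chr1.vcf.gz"
parquet = base / "homo_sapiens-chr1.parquet"
vcf.write_text("fake vcf")

fresh_before = is_up_to_date(vcf, parquet)  # False: no parquet yet
parquet.write_text("fake parquet")
os.utime(parquet, times=None)               # touch: parquet is now >= vcf
fresh_after = is_up_to_date(vcf, parquet)   # True: conversion can be skipped
```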
```diff
@@ -82,7 +101,9 @@ For Ensembl:
 - **Retry policy** with exponential backoff (30s initial delay, up to 3 retries) at the Dagster asset level.
 - **Resumable downloads** via fsspec filecache (interrupted downloads resume from where they left off).
 - **Checksum verification** using BSD sum (`CHECKSUMS` file from Ensembl FTP); corrupted files are automatically re-downloaded.
-- **VCF → Parquet** uses `polars-bio` scanning and `LazyFrame.sink_parquet(...)` to stream to disk.
+- **VCF → Parquet** uses `polars-bio` scanning and `LazyFrame.sink_parquet(..., engine="streaming")` to stream to disk by default.
+- **Resource logging**: conversion and download steps record duration/peak memory when available.
+- **Join strategy**: when Polars would materialize full datasets on joins, prefer DuckDB or pre-filtered joins to limit memory pressure.
 - Dagster assets return **Paths** (manifest files / directories), not large Python lists, to avoid passing large in-memory objects between steps.
 
 ---
```
````diff
@@ -124,6 +145,14 @@ List available jobs:
 uv run dagster-ensembl jobs
 ```
 
+Run LongevityMap conversion (Dagster module assets):
+
+```bash
+uv run dagster-ensembl longevitymap
+uv run dagster-ensembl longevitymap --full
+uv run dagster-ensembl longevitymap --upload
+```
+
 #### Run via Dagster UI
 
 Start the web interface for interactive execution:
````
```diff
@@ -138,7 +167,7 @@ Then materialize assets / jobs from the UI.
 
 ### Jobs provided
 
-Jobs are defined in `src/prepare_annotations/pipelines_dagster/definitions.py`:
+Jobs are defined in `src/prepare_annotations/pipelines/definitions.py`:
 
 | Job | Description |
 |-----|-------------|
```
```diff
@@ -147,6 +176,9 @@ Jobs are defined in `src/prepare_annotations/pipelines_dagster/definitions.py`:
 | `download` | Download VCF files only (parallel with retries) |
 | `convert` | Convert VCF to Parquet (assumes VCFs downloaded) |
 | `upload` | Upload to HuggingFace Hub (assumes parquet exists) |
+| `longevitymap` | Convert LongevityMap to unified schema (with Ensembl genotype resolution) |
+| `longevitymap_full` | Convert LongevityMap and join with full Ensembl data |
+| `longevitymap_upload` | Convert LongevityMap and upload to `just-dna-seq/annotators` |
 
 ---
 
```
```diff
@@ -156,18 +188,31 @@ Key configuration parameters (set via Dagster config):
 
 **EnsemblDownloadConfig:**
 - `species`: Species name (default: `homo_sapiens`)
+- `base_url`: Ensembl FTP base URL (default: `https://ftp.ensembl.org/pub/current_variation/vcf/`)
+- `pattern`: Regex to filter remote files (default: species-aware pattern)
+- `cache_dir`: Override cache directory (default: `~/.cache/just-dna-pipelines/ensembl/{species}`)
 - `max_concurrent_downloads`: Maximum parallel downloads (default: `4`)
 - `verify_checksums`: Whether to verify checksums (default: `True`)
+- `force_download`: Re-download even if files already exist (default: `False`)
+- `http_max_pool`: HTTP pool size for downloader (default: `20`)
 - `retries`: Number of retry attempts per file (default: `10`)
 - `connect_timeout`: Connection timeout in seconds (default: `10.0`)
 - `sock_read_timeout`: Socket read timeout in seconds (default: `120.0`)
 
+**ParquetConversionConfig:**
+- `max_concurrent_conversions`: Maximum parallel conversions. If unset, uses `PREPARE_ANNOTATIONS_PARQUET_WORKERS` env var; defaults to `2`.
+- `threads`: Thread count per conversion (auto-detected if not set).
+- `force_convert`: Re-convert even when parquet is up-to-date.
+
+**Environment overrides:**
+- `PREPARE_ANNOTATIONS_PARQUET_WORKERS`: Max concurrent parquet conversions (used when config not set).
+
 ### HuggingFace upload lineage
 
 The upload asset (`ensembl_hf_upload`) depends on the parquet directory output (`ensembl_parquet_files`). In the Dagster UI, this makes it straightforward to answer:
 - "Which local dataset was uploaded?"
 - "When did we last upload, and what was uploaded vs skipped?"
 
 Uploads are executed using the existing uploader implementation:
-- `prepare_annotations.preparation.huggingface_uploader.upload_parquet_to_hf`
+- `prepare_annotations.huggingface_uploader.upload_parquet_to_hf`
 
```

notebooks/inspect_modules.ipynb

Lines changed: 2 additions & 2 deletions
```diff
@@ -97,7 +97,7 @@
 ],
 "source": [
 "from os import listdir\n",
-"from prepare_annotations.paths import (\n",
+"from prepare_annotations.resources import (\n",
 "    get_cache_dir,\n",
 "    get_ensembl_cache,\n",
 "    get_ensembl_variations_cache,\n",
@@ -628,7 +628,7 @@
 }
 ],
 "source": [
-"from prepare_annotations.paths import (\n",
+"from prepare_annotations.resources import (\n",
 "    find_ensembl_genome_fasta,\n",
 "    list_ensembl_genome_fastas,\n",
 ")\n",
```
