From ddcb4390671607727b950d8aff5bd7b0e3152084 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Wed, 28 Jan 2026 12:51:17 +0000 Subject: [PATCH 01/15] Add more guards to run.sh --- scripts/run.sh | 83 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/scripts/run.sh b/scripts/run.sh index 71a060a..57f9f6b 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -107,47 +107,64 @@ if [ -f "${DATADIR}"/BL_Species_Elevations_2023.csv ]; then python3 ./prepare_species/apply_birdlife_data.py --geojsons "${DATADIR}"/species-info/AVES --overrides "${DATADIR}"/BL_Species_Elevations_2023.csv fi -echo "Generating AoH task list..." -python3 ./utils/aoh_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/aohbatch.csv +if [ ! -d "${DATADIR}"/aohs ]; then + echo "Generating AoH task list..." + python3 ./utils/aoh_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/aohbatch.csv -echo "Generating AoHs..." -littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/aohbatch.log -c "${DATADIR}"/aohbatch.csv "${AOH_CALC_BIN}" + echo "Generating AoHs..." + littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/aohbatch.log -c "${DATADIR}"/aohbatch.csv "${AOH_CALC_BIN}" +fi # Calculate predictors from AoHs -echo "Generating species richness..." -aoh-species-richness --aohs_folder "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/summaries/species_richness.tif -echo "Generating endemism..." -aoh-endemism --aohs_folder "${DATADIR}"/aohs/current/ \ - --species_richness "${DATADIR}"/summaries/species_richness.tif \ - --output "${DATADIR}"/summaries/endemism.tif +if [ ! -f "${DATADIR}"/summaries/species_richness.tif ]; then + echo "Generating species richness..." + aoh-species-richness --aohs_folder "${DATADIR}"/aohs/current/ \ + --output "${DATADIR}"/summaries/species_richness.tif +fi +if [ ! -f "${DATADIR}"/summaries/endemism.tif ]; then + echo "Generating endemism..." + aoh-endemism --aohs_folder "${DATADIR}"/aohs/current/ \ + --species_richness "${DATADIR}"/summaries/species_richness.tif \ + --output "${DATADIR}"/summaries/endemism.tif +fi # Aoh Validation -echo "Collating validation data..." -aoh-collate-data --aoh_results "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/validation/aohs.csv -echo "Calculating model validation..." -aoh-validate-prevalence --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --output "${DATADIR}"/validation/model_validation.csv +if [ ! -f "${DATADIR}"/validation/aohs.csv ]; then + echo "Collating validation data..." + aoh-collate-data --aoh_results "${DATADIR}"/aohs/current/ \ + --output "${DATADIR}"/validation/aohs.csv +fi +if [ ! -f "${DATADIR}"/validation/model_validation.csv ]; then + echo "Calculating model validation..." + aoh-validate-prevalence --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ + --output "${DATADIR}"/validation/model_validation.csv +fi + for TAXA in "${TAXALIST[@]}" do - echo "Fetching GBIF data for ${TAXA}..." - aoh-fetch-gbif-data --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --taxa "${TAXA}" \ - --output_dir "${DATADIR}"/validation/occurrences/ - echo "Validating occurrences for ${TAXA}..." - aoh-validate-occurrences --gbif_data_path "${DATADIR}"/validation/occurrences/"${TAXA}" \ - --species_data "${DATADIR}"/species-info/"${TAXA}"/current/ \ - --aoh_results "${DATADIR}"/aohs/current/"${TAXA}"/ \ - --output "${DATADIR}"/validation/occurrences/"${TAXA}".csv + if [ ! 
-f "${DATADIR}"/validation/occurrences/"${TAXA}".csv ]; then + echo "Fetching GBIF data for ${TAXA}..." + aoh-fetch-gbif-data --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ + --taxa "${TAXA}" \ + --output_dir "${DATADIR}"/validation/occurrences/ + echo "Validating occurrences for ${TAXA}..." + aoh-validate-occurrences --gbif_data_path "${DATADIR}"/validation/occurrences/"${TAXA}" \ + --species_data "${DATADIR}"/species-info/"${TAXA}"/current/ \ + --aoh_results "${DATADIR}"/aohs/current/"${TAXA}"/ \ + --output "${DATADIR}"/validation/occurrences/"${TAXA}".csv + fi done # Threats -echo "Generating threat task list..." -python3 ./utils/threats_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/threatbatch.csv - -echo "Generating threat rasters..." -littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/threatbatch.log -c "${DATADIR}"/threatbatch.csv "${PYTHON_BIN}" -- ./threats/threat_processing.py +if [ ! -d "${DATADIR}"/threat_rasters ]; then + echo "Generating threat task list..." + python3 ./utils/threats_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/threatbatch.csv -echo "Summarising threats..." -python3 ./threats/threat_summation.py --threat_rasters "${DATADIR}"/threat_rasters --output "${DATADIR}"/threat_results + echo "Generating threat rasters..." + littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/threatbatch.log -c "${DATADIR}"/threatbatch.csv "${PYTHON_BIN}" -- ./threats/threat_processing.py +fi +if [ ! -d "${DATADIR}"/threat_results ]; then + echo "Summarising threats..." + python3 ./threats/threat_summation.py --threat_rasters "${DATADIR}"/threat_rasters --output "${DATADIR}"/threat_results_tmp + mv "${DATADIR}"/threat_results_tmp "${DATADIR}"/threat_results +fi From 1071d3f307d9efb78174f536e2bade89c25a86b0 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Thu, 29 Jan 2026 13:36:37 +0000 Subject: [PATCH 02/15] Initial cut of using snakemake --- .github/workflows/python-package.yml | 14 +- .gitignore | 2 + Dockerfile | 28 ++- config/config.yaml | 38 ++++ prepare_layers/convert_crosswalk.py | 5 + prepare_layers/remove_nans_from_mask.py | 5 + prepare_species/apply_birdlife_data.py | 5 + prepare_species/extract_species_data_psql.py | 23 +- requirements.txt | 4 + threats/threat_processing.py | 6 + threats/threat_summation.py | 6 + workflow/Snakefile | 134 +++++++++++ workflow/rules/aoh.smk | 155 +++++++++++++ workflow/rules/prepare.smk | 167 ++++++++++++++ workflow/rules/species.smk | 155 +++++++++++++ workflow/rules/summaries.smk | 86 +++++++ workflow/rules/threats.smk | 228 +++++++++++++++++++ workflow/rules/validation.smk | 169 ++++++++++++++ 18 files changed, 1209 insertions(+), 21 deletions(-) create mode 100644 config/config.yaml create mode 100644 workflow/Snakefile create mode 100644 workflow/rules/aoh.smk create mode 100644 workflow/rules/prepare.smk create mode 100644 workflow/rules/species.smk create mode 100644 workflow/rules/summaries.smk create mode 100644 workflow/rules/threats.smk create mode 100644 workflow/rules/validation.smk diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f1baeec..8360353 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -22,7 +22,7 @@ jobs: - name: Install system run: | apt-get update -qqy - apt-get install -y git python3-pip libpq5 libpq-dev r-base libtirpc-dev shellcheck + apt-get install -y git python3-pip libpq5 libpq-dev r-base libtirpc-dev - uses: 
actions/checkout@v4 with: submodules: 'true' @@ -37,6 +37,7 @@ jobs: python -m pip install --upgrade pip python -m pip install gdal[numpy]==3.11.4 python -m pip install -r requirements.txt + python -m pip install snakefmt - name: Lint with pylint run: python3 -m pylint utils prepare_layers prepare_species threats @@ -47,6 +48,11 @@ jobs: - name: Tests run: python3 -m pytest ./tests - - name: Script checks - run: | - shellcheck ./scripts/run.sh + - name: Snakemake format check + run: snakefmt --check workflow/ + + - name: Snakemake lint + run: snakemake --snakefile workflow/Snakefile --lint + + - name: Snakemake dry run + run: snakemake --snakefile workflow/Snakefile --dry-run all diff --git a/.gitignore b/.gitignore index 68bc17f..526d6b5 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/ *.py[cod] *$py.class +.snakemake/ + # C extensions *.so diff --git a/Dockerfile b/Dockerfile index 913f289..65448f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,10 @@ +# Build stage for reclaimer (used to download from Zenodo) FROM golang:latest AS reclaimerbuild RUN git clone https://github.com/quantifyearth/reclaimer.git WORKDIR /go/reclaimer RUN go mod tidy RUN go build -FROM golang:latest AS littlejohnbuild -RUN git clone https://github.com/quantifyearth/littlejohn.git -WORKDIR /go/littlejohn -RUN go mod tidy -RUN go build - FROM ghcr.io/osgeo/gdal:ubuntu-small-3.11.4 RUN apt-get update -qqy && \ @@ -17,7 +12,6 @@ RUN apt-get update -qqy && \ git \ cmake \ python3-pip \ - shellcheck \ r-base \ libpq-dev \ libtirpc-dev \ @@ -25,7 +19,6 @@ RUN apt-get update -qqy && \ && rm -rf /var/cache/apt/* COPY --from=reclaimerbuild /go/reclaimer/reclaimer /bin/reclaimer -COPY --from=littlejohnbuild /go/littlejohn/littlejohn /bin/littlejohn RUN rm /usr/lib/python3.*/EXTERNALLY-MANAGED RUN pip install gdal[numpy]==3.11.4 @@ -33,13 +26,15 @@ RUN pip install gdal[numpy]==3.11.4 COPY requirements.txt /tmp/ RUN pip install -r /tmp/requirements.txt +# Snakemake linting/formatting tools +RUN pip install snakefmt + RUN mkdir /root/R ENV R_LIBS_USER=/root/R RUN Rscript -e 'install.packages(c("lme4","lmerTest","emmeans"), repos="https://cloud.r-project.org")' COPY ./ /root/star WORKDIR /root/star -RUN chmod 755 ./scripts/run.sh # We create a DATADIR - this should be mapped at container creation # time to a volume somewhere else @@ -53,6 +48,15 @@ ENV VIRTUAL_ENV=/usr ENV PYTHONPATH=/root/star RUN python3 -m pytest ./tests -RUN python3 -m pylint prepare_layers prepare_species utils tests -RUN python3 -m mypy prepare_layers prepare_species utils tests -RUN shellcheck ./scripts/run.sh +RUN python3 -m pylint prepare_layers prepare_species threats utils tests +RUN python3 -m mypy prepare_layers prepare_species threats utils tests + +# Snakemake validation +RUN snakefmt --check workflow/ +RUN snakemake --snakefile workflow/Snakefile --lint +RUN snakemake --snakefile workflow/Snakefile --dry-run all + +# Default command runs the full Snakemake pipeline +# Use --cores to specify parallelism, e.g.: docker run ... 
--cores 8 +# Note: --scheduler greedy avoids ILP solver issues on some platforms +CMD ["snakemake", "--snakefile", "workflow/Snakefile", "--scheduler", "greedy", "--cores", "1", "all"] diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 0000000..f5613f9 --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,38 @@ +# STAR Pipeline Configuration +# =========================== + +# Taxonomic classes to process +taxa: + - AMPHIBIA + - AVES + - MAMMALIA + - REPTILIA + +# Scenario for habitat layers (for future expansion) +scenario: current + +# Projection used throughout the pipeline +projection: "ESRI:54009" + +# Scale for habitat processing (meters) +habitat_scale: 992.292720200000133 + +# Input data files (relative to datadir) +# These are expected to be pre-downloaded in the Zenodo subfolder +inputs: + raw_habitat: "habitat/raw.tif" + zenodo_mask: "Zenodo/CGLS100Inland_withGADMIslands.tif" + zenodo_elevation_max: "Zenodo/FABDEM_1km_max_patched.tif" + zenodo_elevation_min: "Zenodo/FABDEM_1km_min_patched.tif" + zenodo_islands: "Zenodo/MissingLandcover_1km_cover.tif" + crosswalk_source: "data/crosswalk_bin_T.csv" + +# Optional input files (pipeline will check if these exist) +optional_inputs: + birdlife_elevations: "BL_Species_Elevations_2023.csv" + species_excludes: "SpeciesList_generalisedRangePolygons.csv" + +# Zenodo configuration for downloading raw habitat +zenodo: + habitat_id: 3939050 + habitat_filename: "PROBAV_LC100_global_v3.0.1_2019-nrt_Discrete-Classification-map_EPSG-4326.tif" diff --git a/prepare_layers/convert_crosswalk.py b/prepare_layers/convert_crosswalk.py index 83aa8d7..cbd36db 100644 --- a/prepare_layers/convert_crosswalk.py +++ b/prepare_layers/convert_crosswalk.py @@ -2,6 +2,7 @@ from pathlib import Path import pandas as pd +from snakemake_argparse_bridge import snakemake_compatible # Take from https://www.iucnredlist.org/resources/habitat-classification-scheme IUCN_HABITAT_CODES = { @@ -57,6 +58,10 @@ def convert_crosswalk( df = pd.DataFrame(res, columns=["code", "value"]) df.to_csv(output_path, index=False) +@snakemake_compatible(mapping={ + "original_path": "input.original", + "output_path": "output.crosswalk", +}) def main() -> None: parser = argparse.ArgumentParser(description="Convert IUCN crosswalk to minimal common format.") parser.add_argument( diff --git a/prepare_layers/remove_nans_from_mask.py b/prepare_layers/remove_nans_from_mask.py index 180cd9f..0300312 100644 --- a/prepare_layers/remove_nans_from_mask.py +++ b/prepare_layers/remove_nans_from_mask.py @@ -3,6 +3,7 @@ from pathlib import Path import yirgacheffe as yg +from snakemake_argparse_bridge import snakemake_compatible def remove_nans_from_mask( input_path: Path, @@ -13,6 +14,10 @@ def remove_nans_from_mask( converted = layer.nan_to_num() converted.to_geotiff(output_path) +@snakemake_compatible(mapping={ + "original_path": "input.original", + "output_path": "output.mask", +}) def main() -> None: parser = argparse.ArgumentParser(description="Convert NaNs to zeros in mask layers") parser.add_argument( diff --git a/prepare_species/apply_birdlife_data.py b/prepare_species/apply_birdlife_data.py index da65746..bcc7199 100644 --- a/prepare_species/apply_birdlife_data.py +++ b/prepare_species/apply_birdlife_data.py @@ -5,6 +5,7 @@ import aoh import geopandas as gpd import pandas as pd +from snakemake_argparse_bridge import snakemake_compatible # Columns from current BirdLife data overrides: # SIS ID @@ -51,6 +52,10 @@ def apply_birdlife_data( res = 
gpd.GeoDataFrame(data.to_frame().transpose(), crs=species_info.crs, geometry="geometry") res.to_file(path, driver="GeoJSON") +@snakemake_compatible(mapping={ + "geojson_directory_path": "params.geojson_dir", + "overrides": "input.overrides", +}) def main() -> None: parser = argparse.ArgumentParser(description="Process agregate species data to per-species-file.") parser.add_argument( diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index d13b7cf..bfeda99 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -14,6 +14,7 @@ import psycopg2 import shapely from postgis.psycopg import register +from snakemake_argparse_bridge import snakemake_compatible from common import ( CATEGORY_WEIGHTS, @@ -289,11 +290,14 @@ def extract_data_per_species( results = apply_overrides(overrides_path, results) # The limiting amount here is how many concurrent connections the database can take - with Pool(processes=20) as pool: - reports = pool.map( - partial(process_row, class_name, era_output_directory_path, target_projection, presence), - results - ) + try: + with Pool(processes=20) as pool: + reports = pool.map( + partial(process_row, class_name, era_output_directory_path, target_projection, presence), + results + ) + except psycopg2.OperationalError: + sys.exit("Database connection failed for some rows, aborting") reports_df = pd.DataFrame( [x.as_row() for x in reports], @@ -302,6 +306,13 @@ def extract_data_per_species( os.makedirs(era_output_directory_path, exist_ok=True) reports_df.to_csv(era_output_directory_path / "report.csv", index=False) +@snakemake_compatible(mapping={ + "classname": "params.classname", + "overrides": "params.overrides", + "excludes": "params.excludes", + "output_directory_path": "params.output_dir", + "target_projection": "params.projection", +}) def main() -> None: parser = argparse.ArgumentParser(description="Process agregate species data to per-species-file.") parser.add_argument( @@ -316,6 +327,7 @@ def main() -> None: type=Path, help="CSV of overrides", required=False, + default=None, dest="overrides", ) parser.add_argument( @@ -323,6 +335,7 @@ def main() -> None: type=Path, help="CSV of taxon IDs to not include", required=False, + default=None, dest="excludes" ) parser.add_argument( diff --git a/requirements.txt b/requirements.txt index c760a2c..555868a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,10 @@ boto3 yirgacheffe>=1.12,<2.0 aoh[validation]>=2.0.2,<3.0 +# Snakemake workflow management +snakemake>=8.0 +snakemake-argparse-bridge + # GDAL should be installed manually to match the version of the library installed on your machine gdal[numpy] diff --git a/threats/threat_processing.py b/threats/threat_processing.py index 0e04c73..3bd7b27 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -7,6 +7,7 @@ import geopandas as gpd import yirgacheffe as yg from pyogrio.errors import DataSourceError +from snakemake_argparse_bridge import snakemake_compatible def threat_processing_per_species( species_data_path: Path, @@ -51,6 +52,11 @@ def threat_processing_per_species( output_path = threat_dir_path / f"{taxon_id}.tif" per_threat_per_species_score.to_geotiff(output_path) +@snakemake_compatible(mapping={ + "species_data_path": "input.species_data", + "aoh_path": "input.aoh", + "output_directory_path": "params.output_dir", +}) def main() -> None: os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" diff --git 
a/threats/threat_summation.py b/threats/threat_summation.py index 5b93ede..b05d38c 100644 --- a/threats/threat_summation.py +++ b/threats/threat_summation.py @@ -9,6 +9,7 @@ import yirgacheffe as yg from yirgacheffe.layers import RasterLayer from osgeo import gdal +from snakemake_argparse_bridge import snakemake_compatible gdal.SetCacheMax(1024 * 1024 * 32) @@ -180,6 +181,11 @@ def threat_summation( reduce_to_next_level(level1_target, final_target, processes_count) +@snakemake_compatible(mapping={ + "rasters_directory": "params.threat_rasters_dir", + "output_directory": "params.output_dir", + "processes_count": "threads", +}) def main() -> None: parser = argparse.ArgumentParser(description="Generates the combined, and level 1 and level 2 threat rasters.") parser.add_argument( diff --git a/workflow/Snakefile b/workflow/Snakefile new file mode 100644 index 0000000..436b63a --- /dev/null +++ b/workflow/Snakefile @@ -0,0 +1,134 @@ +# STAR Pipeline - Snakemake Workflow +# =================================== +# +# This workflow implements the STAR (Species Threat Abatement and Recovery) +# methodology from the IUCN. It processes species data, generates Area of +# Habitat (AoH) rasters, and produces threat-weighted biodiversity maps. +# +# Usage: +# snakemake --cores N all +# snakemake --cores N validation +# snakemake --cores N threats +# +# Environment variables required: +# DATADIR - Directory for input/output data +# DB_HOST, DB_NAME, DB_USER, DB_PASSWORD - PostgreSQL credentials +# +# For GBIF validation (optional): +# GBIF_USERNAME, GBIF_EMAIL, GBIF_PASSWORD + +import os +from pathlib import Path + +# Load configuration +configfile: "config/config.yaml" + +# Data directory from environment variable +DATADIR = Path(os.environ.get("DATADIR", "/data")) + +# Taxa list from config +TAXA = config["taxa"] +SCENARIO = config["scenario"] + +# Source directory (where this repo lives) +SRCDIR = Path(workflow.basedir).parent + +# Include rule modules +include: "rules/prepare.smk" +include: "rules/species.smk" +include: "rules/aoh.smk" +include: "rules/summaries.smk" +include: "rules/threats.smk" +include: "rules/validation.smk" + + +# ============================================================================= +# Target Rules +# ============================================================================= + +rule all: + """ + Default target: run the complete STAR pipeline including validation. + """ + input: + # Final STAR output + DATADIR / "threat_results" / "level0" / "top.tif", + # Summaries + DATADIR / "summaries" / "species_richness.tif", + DATADIR / "summaries" / "endemism.tif", + # Model validation + DATADIR / "validation" / "model_validation.csv", + + +rule threats: + """ + Target: generate threat results without validation. + """ + input: + DATADIR / "threat_results" / "level0" / "top.tif", + + +rule summaries: + """ + Target: generate species richness and endemism maps. + """ + input: + DATADIR / "summaries" / "species_richness.tif", + DATADIR / "summaries" / "endemism.tif", + + +rule aohs: + """ + Target: generate all AoH rasters. + """ + input: + expand( + str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), + taxa=TAXA + ), + + +rule species_data: + """ + Target: extract species data from PostgreSQL. + """ + input: + expand( + str(DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv"), + taxa=TAXA + ), + + +rule prepare: + """ + Target: prepare all base layers (habitat, masks, crosswalk). 
+ """ + input: + DATADIR / "crosswalk.csv", + DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", + expand( + str(DATADIR / "habitat_layers" / SCENARIO / "lcc_{n}.tif"), + n=range(0, 127) # Habitat layer codes + ), + + +rule validation: + """ + Target: run model validation (excludes GBIF which is expensive). + """ + input: + DATADIR / "validation" / "model_validation.csv", + + +# ============================================================================= +# Utility Functions +# ============================================================================= + +def get_all_species_geojsons(wildcards): + """ + Return all species GeoJSON files for a given taxa. + Used for checkpoint-based dependency resolution. + """ + checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + geojson_dir = DATADIR / "species-info" / wildcards.taxa / SCENARIO + return list(geojson_dir.glob("*.geojson")) diff --git a/workflow/rules/aoh.smk b/workflow/rules/aoh.smk new file mode 100644 index 0000000..29d8050 --- /dev/null +++ b/workflow/rules/aoh.smk @@ -0,0 +1,155 @@ +# STAR Pipeline - Area of Habitat (AoH) Generation Rules +# ======================================================= +# +# These rules generate AoH rasters for each species. This is the most +# parallelizable part of the pipeline - each species can be processed +# independently with `snakemake --cores N`. +# +# Code-sensitive: These rules rebuild if the aoh package version changes. + +import os +from pathlib import Path + + +# ============================================================================= +# Version Sentinel for AOH Code +# ============================================================================= + +rule aoh_version_sentinel: + """ + Create a version sentinel that tracks the aoh package version. + AOH rules depend on this to trigger rebuilds when the package updates. + """ + output: + sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + run: + import subprocess + + os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) + + # Get aoh package version + try: + result = subprocess.run( + ["aoh-calc", "--version"], + capture_output=True, text=True, check=True + ) + aoh_version = result.stdout.strip() + except Exception: + aoh_version = "unknown" + + with open(output.sentinel, 'w') as f: + f.write(f"aoh: {aoh_version}\n") + + +# ============================================================================= +# Per-Species AOH Generation +# ============================================================================= + +rule generate_aoh: + """ + Generate Area of Habitat raster for a single species. + + This rule is parallelizable - run with `snakemake --cores N` to process + multiple species concurrently. Each species takes its GeoJSON data and + produces a raster (.tif) and metadata (.json) file. + + The rule depends on the aoh version sentinel to rebuild when the + aoh package is updated. 
+ """ + input: + # Species data + species_data=DATADIR / "species-info" / "{taxa}" / SCENARIO / "{species_id}.geojson", + # Base layers (precious - won't trigger rebuilds) + habitat_sentinel=ancient(DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete"), + crosswalk=DATADIR / "crosswalk.csv", + mask=ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), + elevation_max=ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), + elevation_min=ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), + # Version sentinel for code-sensitive rebuilds + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + # Only declare JSON as output - TIF is optional (not created for empty AOHs) + metadata=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.json", + params: + habitat_dir=DATADIR / "habitat_layers" / SCENARIO, + log: + DATADIR / "logs" / "aoh" / "{taxa}" / "{species_id}.log", + resources: + # Limit concurrent AOH jobs if needed (e.g., for memory) + aoh_slots=1, + shell: + """ + mkdir -p $(dirname {log}) + aoh-calc \ + --fractional_habitats {params.habitat_dir} \ + --elevation-max {input.elevation_max} \ + --elevation-min {input.elevation_min} \ + --crosswalk {input.crosswalk} \ + --speciesdata {input.species_data} \ + --weights {input.mask} \ + --output $(dirname {output.metadata}) \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Per-Taxa AOH Aggregation +# ============================================================================= + +def get_species_ids_for_taxa(wildcards): + """ + Get all species IDs for a taxa by reading the checkpoint output. + Returns list of species IDs (GeoJSON file stems). + """ + # Wait for the checkpoint to complete + checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + geojson_dir = Path(checkpoint_output).parent + return [p.stem for p in geojson_dir.glob("*.geojson")] + + +def get_all_aoh_metadata_for_taxa(wildcards): + """ + Get paths to all AOH metadata files for a taxa. + """ + species_ids = get_species_ids_for_taxa(wildcards) + return [ + DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{sid}_all.json" + for sid in species_ids + ] + + +rule aggregate_aohs_per_taxa: + """ + Aggregate rule that ensures all AOHs for a taxa are generated. + Creates a sentinel file when complete. + """ + input: + # Only depend on JSON metadata (always created), not TIFs (optional) + metadata=get_all_aoh_metadata_for_taxa, + output: + sentinel=DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete", + shell: + """ + echo "Generated $(echo {input.metadata} | wc -w) AOHs for {wildcards.taxa}" + touch {output.sentinel} + """ + + +# ============================================================================= +# All AOHs Aggregation +# ============================================================================= + +rule all_aohs: + """ + Aggregate rule that ensures all AOHs for all taxa are generated. 
+ """ + input: + sentinels=expand( + str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), + taxa=TAXA + ), + output: + sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + shell: + "touch {output.sentinel}" + diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk new file mode 100644 index 0000000..f0554cb --- /dev/null +++ b/workflow/rules/prepare.smk @@ -0,0 +1,167 @@ +# STAR Pipeline - Prepare Layers Rules +# ===================================== +# +# These rules handle the "precious" base layers that are slow to build +# and should only be regenerated if explicitly deleted (not if code changes). +# +# Precious layers: +# - habitat_layers/current/lcc_*.tif +# - masks/CGLS100Inland_withGADMIslands.tif +# - elevation maps (if generated locally) +# +# Normal layers (regenerate if inputs change): +# - crosswalk.csv + +import os +from pathlib import Path + + +# ============================================================================= +# Crosswalk Table (normal dependency tracking) +# ============================================================================= + +rule convert_crosswalk: + """ + Convert IUCN crosswalk to minimal common format. + This is fast to regenerate so uses normal dependency tracking. + """ + input: + original=SRCDIR / "data" / "crosswalk_bin_T.csv", + output: + crosswalk=DATADIR / "crosswalk.csv", + log: + DATADIR / "logs" / "convert_crosswalk.log", + script: + str(SRCDIR / "prepare_layers" / "convert_crosswalk.py") + + +# ============================================================================= +# Mask Processing (precious - only if missing) +# ============================================================================= + +rule remove_nans_from_mask: + """ + Convert NaNs to zeros in the mask layer. + + This is a precious layer - it will only be built if the output doesn't exist. + The rule won't trigger rebuilds due to code changes. + """ + input: + original=ancient(DATADIR / "Zenodo" / "CGLS100Inland_withGADMIslands.tif"), + output: + mask=DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", + log: + DATADIR / "logs" / "remove_nans_from_mask.log", + script: + str(SRCDIR / "prepare_layers" / "remove_nans_from_mask.py") + + +# ============================================================================= +# Habitat Layer Processing (precious - only if missing) +# ============================================================================= + +rule download_habitat: + """ + Download raw habitat map from Zenodo. + + This is a precious layer - will only download if output doesn't exist. + """ + output: + habitat=DATADIR / "habitat" / "raw.tif", + params: + zenodo_id=config["zenodo"]["habitat_id"], + filename=config["zenodo"]["habitat_filename"], + log: + DATADIR / "logs" / "download_habitat.log", + shell: + """ + reclaimer zenodo --zenodo_id {params.zenodo_id} \ + --filename "{params.filename}" \ + --output {output.habitat} \ + 2>&1 | tee {log} + """ + + +rule process_habitat: + """ + Process raw habitat map into per-class layers. + + This is a precious layer - the entire habitat_layers directory should + only be built if it doesn't exist. We use a sentinel file to track this. 
+ """ + input: + habitat=ancient(DATADIR / "habitat" / "raw.tif"), + output: + # Sentinel file to indicate habitat processing is complete + sentinel=DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete", + params: + scale=config["habitat_scale"], + projection=config["projection"], + output_dir=DATADIR / "habitat_layers" / SCENARIO, + tmp_dir=DATADIR / "tmp_habitat_layers" / SCENARIO, + log: + DATADIR / "logs" / "process_habitat.log", + shell: + """ + set -e + aoh-habitat-process --habitat {input.habitat} \ + --scale {params.scale} \ + --projection "{params.projection}" \ + --output {params.tmp_dir} \ + 2>&1 | tee {log} + + # Atomic move of completed directory + rm -rf {params.output_dir} + mv {params.tmp_dir} {params.output_dir} + + # Create sentinel file + touch {output.sentinel} + """ + + +rule copy_islands_layer: + """ + Copy the missing landcover (islands) layer to habitat layers. + This is lcc_0.tif which represents island areas. + """ + input: + islands=ancient(DATADIR / "Zenodo" / "MissingLandcover_1km_cover.tif"), + # Ensure habitat processing is done first + habitat_sentinel=DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete", + output: + lcc_0=DATADIR / "habitat_layers" / SCENARIO / "lcc_0.tif", + log: + DATADIR / "logs" / "copy_islands_layer.log", + shell: + """ + cp {input.islands} {output.lcc_0} 2>&1 | tee {log} + """ + + +# ============================================================================= +# Helper rule to get specific habitat layer +# ============================================================================= + +def get_habitat_layer(wildcards): + """ + Returns the path to a specific habitat layer. + lcc_0 is special (islands layer), others come from habitat processing. + """ + n = int(wildcards.n) + if n == 0: + return DATADIR / "habitat_layers" / SCENARIO / "lcc_0.tif" + else: + return DATADIR / "habitat_layers" / SCENARIO / f"lcc_{n}.tif" + + +rule habitat_layer: + """ + Pseudo-rule to request a specific habitat layer. + This triggers either the habitat processing or islands copy as needed. + """ + input: + layer=get_habitat_layer, + output: + # This is a bit of a trick - we declare the output but let the + # upstream rules actually create it + touch(DATADIR / "habitat_layers" / SCENARIO / ".lcc_{n}_exists"), diff --git a/workflow/rules/species.smk b/workflow/rules/species.smk new file mode 100644 index 0000000..e1cdf00 --- /dev/null +++ b/workflow/rules/species.smk @@ -0,0 +1,155 @@ +# STAR Pipeline - Species Data Extraction Rules +# ============================================== +# +# These rules handle extracting species data from the IUCN PostgreSQL database. +# Species extraction is a checkpoint because the number of output files +# (one GeoJSON per species) is only known after extraction completes. +# +# Code-sensitive: These rules should rebuild if the extraction scripts change. + +import os +from pathlib import Path + + +# ============================================================================= +# Version Sentinel for Code-Sensitive Dependencies +# ============================================================================= + +rule species_version_sentinel: + """ + Create a version sentinel that tracks changes to species extraction code. + Downstream rules depend on this to trigger rebuilds when code changes. 
+ """ + input: + # Track the extraction scripts + script1=SRCDIR / "prepare_species" / "extract_species_data_psql.py", + script2=SRCDIR / "prepare_species" / "common.py", + output: + sentinel=DATADIR / ".sentinels" / "species_code_version.txt", + run: + import hashlib + import subprocess + + os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) + + # Hash the tracked scripts + hashes = [] + for f in input: + with open(f, 'rb') as fh: + hashes.append(hashlib.sha256(fh.read()).hexdigest()[:12]) + + # Get aoh package version + try: + result = subprocess.run( + ["aoh-calc", "--version"], + capture_output=True, text=True, check=True + ) + aoh_version = result.stdout.strip() + except Exception: + aoh_version = "unknown" + + with open(output.sentinel, 'w') as f: + f.write(f"scripts: {','.join(hashes)}\n") + f.write(f"aoh: {aoh_version}\n") + + +# ============================================================================= +# Species Data Extraction (Checkpoint) +# ============================================================================= + +checkpoint extract_species_data: + """ + Extract species data from PostgreSQL database. + + This is a checkpoint because the number of output GeoJSON files is only + known after extraction. Each taxa produces N species files where N is + determined by the database query. + + Environment variables required: + DB_HOST, DB_NAME, DB_USER, DB_PASSWORD + """ + input: + # Code version sentinel for rebuild tracking + version_sentinel=DATADIR / ".sentinels" / "species_code_version.txt", + output: + # The report.csv is the known output; GeoJSON files are dynamic + report=DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv", + params: + classname="{taxa}", + output_dir=lambda wildcards: DATADIR / "species-info" / wildcards.taxa, + projection=config["projection"], + excludes=lambda wildcards: ( + f"--excludes {DATADIR / config['optional_inputs']['species_excludes']}" + if (DATADIR / config["optional_inputs"]["species_excludes"]).exists() + else "" + ), + log: + DATADIR / "logs" / "extract_species_{taxa}.log", + resources: + # Serialize DB access - only one extraction at a time + db_connections=1, + shell: + """ + python3 {SRCDIR}/prepare_species/extract_species_data_psql.py \ + --class {params.classname} \ + --output {params.output_dir} \ + --projection "{params.projection}" \ + {params.excludes} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# BirdLife Elevation Overrides (Optional) +# ============================================================================= + +rule apply_birdlife_overrides: + """ + Apply BirdLife elevation data overrides to AVES species. + + This rule only runs if the BirdLife elevations file exists. + It modifies GeoJSON files in-place. 
+ """ + input: + report=DATADIR / "species-info" / "AVES" / SCENARIO / "report.csv", + overrides=DATADIR / config["optional_inputs"]["birdlife_elevations"], + output: + sentinel=DATADIR / "species-info" / "AVES" / ".birdlife_applied", + params: + geojson_dir=DATADIR / "species-info", + log: + DATADIR / "logs" / "apply_birdlife_overrides.log", + shell: + """ + python3 {SRCDIR}/prepare_species/apply_birdlife_data.py \ + --geojsons {params.geojson_dir} \ + --overrides {input.overrides} \ + 2>&1 | tee {log} + touch {output.sentinel} + """ + + +# ============================================================================= +# Aggregation Rule for All Species Data +# ============================================================================= + +def all_species_reports(wildcards): + """ + Return paths to all species reports for all taxa. + """ + return [ + DATADIR / "species-info" / taxa / SCENARIO / "report.csv" + for taxa in TAXA + ] + + +rule all_species_data: + """ + Aggregate rule that ensures all species data is extracted. + """ + input: + reports=all_species_reports, + output: + sentinel=DATADIR / "species-info" / ".all_extracted", + shell: + "touch {output.sentinel}" diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk new file mode 100644 index 0000000..a8ea476 --- /dev/null +++ b/workflow/rules/summaries.smk @@ -0,0 +1,86 @@ +# STAR Pipeline - Summary Statistics Rules +# ========================================= +# +# These rules generate summary statistics from AOH rasters: +# - Species richness: count of species per pixel +# - Endemism: weighted measure of endemic species per pixel +# +# Code-sensitive: These rules rebuild if any AOH changes or if the +# aoh package version changes. + +import os +from pathlib import Path + + +# ============================================================================= +# Species Richness +# ============================================================================= + +rule species_richness: + """ + Calculate species richness from all AOH rasters. + + Species richness is the sum of all species AOHs - giving the number + of species present at each pixel. + + This rebuilds if: + - Any AOH file changes + - The aoh package version changes + """ + input: + # All AOHs must be complete + aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + richness=DATADIR / "summaries" / "species_richness.tif", + params: + aohs_folder=DATADIR / "aohs" / SCENARIO, + log: + DATADIR / "logs" / "species_richness.log", + shell: + """ + mkdir -p $(dirname {output.richness}) + aoh-species-richness \ + --aohs_folder {params.aohs_folder} \ + --output {output.richness} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Endemism +# ============================================================================= + +rule endemism: + """ + Calculate endemism from AOH rasters and species richness. + + Endemism weights each species by the inverse of its range size, + giving higher values to pixels with range-restricted species. 
+ + This rebuilds if: + - Species richness changes + - Any AOH file changes + - The aoh package version changes + """ + input: + # Dependencies + aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + species_richness=DATADIR / "summaries" / "species_richness.tif", + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + endemism=DATADIR / "summaries" / "endemism.tif", + params: + aohs_folder=DATADIR / "aohs" / SCENARIO, + log: + DATADIR / "logs" / "endemism.log", + shell: + """ + aoh-endemism \ + --aohs_folder {params.aohs_folder} \ + --species_richness {input.species_richness} \ + --output {output.endemism} \ + 2>&1 | tee {log} + """ diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk new file mode 100644 index 0000000..e724ada --- /dev/null +++ b/workflow/rules/threats.smk @@ -0,0 +1,228 @@ +# STAR Pipeline - Threat Processing Rules +# ======================================== +# +# These rules generate per-species threat rasters and aggregate them +# into the final STAR threat maps at different hierarchy levels. +# +# The pipeline: +# 1. For each species marked in_star=true with an AOH: +# - Generate threat rasters (one per threat the species faces) +# 2. Aggregate threat rasters: +# - Level 2: Sum by threat code prefix (e.g., 1.1, 1.2) +# - Level 1: Sum by major threat (e.g., 1, 2) +# - Level 0: Sum all threats (final STAR map) +# +# Code-sensitive: These rules rebuild if threat scripts change. + +import os +import json +from pathlib import Path + + +# ============================================================================= +# Version Sentinel for Threat Code +# ============================================================================= + +rule threat_version_sentinel: + """ + Create a version sentinel that tracks changes to threat processing code. + """ + input: + script1=SRCDIR / "threats" / "threat_processing.py", + script2=SRCDIR / "threats" / "threat_summation.py", + output: + sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", + run: + import hashlib + import subprocess + + os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) + + # Hash the tracked scripts + hashes = [] + for f in input: + with open(f, 'rb') as fh: + hashes.append(hashlib.sha256(fh.read()).hexdigest()[:12]) + + # Get aoh package version (threat processing depends on it) + try: + result = subprocess.run( + ["aoh-calc", "--version"], + capture_output=True, text=True, check=True + ) + aoh_version = result.stdout.strip() + except Exception: + aoh_version = "unknown" + + with open(output.sentinel, 'w') as f: + f.write(f"scripts: {','.join(hashes)}\n") + f.write(f"aoh: {aoh_version}\n") + + +# ============================================================================= +# Helper Functions +# ============================================================================= + +def get_star_species_for_taxa(wildcards): + """ + Get species IDs that should be included in STAR for a taxa. 
+ A species is included if: + - It has in_star=true in its GeoJSON + - Its AOH raster exists (some species have no AOH due to no habitat overlap) + """ + # Wait for the checkpoint + checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + geojson_dir = Path(checkpoint_output).parent + + star_species = [] + for geojson_path in geojson_dir.glob("*.geojson"): + species_id = geojson_path.stem + aoh_path = DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{species_id}_all.tif" + + # Check if species should be in STAR and has an AOH + try: + with open(geojson_path, 'r') as f: + data = json.load(f) + if data['features'][0]['properties'].get('in_star', False): + # Only include if AOH TIF actually exists + if aoh_path.exists(): + star_species.append(species_id) + except (json.JSONDecodeError, KeyError, IndexError): + continue + + return star_species + + +def get_threat_raster_sentinels_for_taxa(wildcards): + """ + Get paths to threat raster sentinel files for all STAR species in a taxa. + """ + species_ids = get_star_species_for_taxa(wildcards) + return [ + DATADIR / "threat_rasters" / wildcards.taxa / f".{sid}_complete" + for sid in species_ids + ] + + +# ============================================================================= +# Per-Species Threat Raster Generation +# ============================================================================= + +rule generate_threat_rasters: + """ + Generate threat rasters for a single species. + + Each species may face multiple threats, so this produces multiple + output files (one per threat). We use a sentinel file to track + completion since the exact outputs are data-dependent. + """ + input: + # Species data and AOH + species_data=DATADIR / "species-info" / "{taxa}" / SCENARIO / "{species_id}.geojson", + aoh=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.tif", + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", + output: + # Sentinel file since actual outputs depend on species' threats + sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".{species_id}_complete", + params: + output_dir=lambda wildcards: DATADIR / "threat_rasters" / wildcards.taxa, + log: + DATADIR / "logs" / "threats" / "{taxa}" / "{species_id}.log", + resources: + threat_slots=1, + shell: + """ + mkdir -p $(dirname {log}) + python3 {SRCDIR}/threats/threat_processing.py \ + --speciesdata {input.species_data} \ + --aoh {input.aoh} \ + --output {params.output_dir} \ + 2>&1 | tee {log} + touch {output.sentinel} + """ + + +# ============================================================================= +# Per-Taxa Threat Raster Aggregation +# ============================================================================= + +rule aggregate_threat_rasters_per_taxa: + """ + Aggregate rule that ensures all threat rasters for a taxa are generated. + Only processes species with in_star=true. + """ + input: + # AOHs must be complete first + aoh_sentinel=DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete", + # All threat rasters for STAR species + sentinels=get_threat_raster_sentinels_for_taxa, + output: + sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".complete", + shell: + """ + echo "Generated threat rasters for {wildcards.taxa}" + touch {output.sentinel} + """ + + +rule all_threat_rasters: + """ + Aggregate rule that ensures all threat rasters for all taxa are generated. 
+ """ + input: + sentinels=expand( + str(DATADIR / "threat_rasters" / "{taxa}" / ".complete"), + taxa=TAXA + ), + output: + sentinel=DATADIR / "threat_rasters" / ".all_complete", + shell: + "touch {output.sentinel}" + + +# ============================================================================= +# Threat Summation +# ============================================================================= + +rule threat_summation: + """ + Aggregate all per-species threat rasters into hierarchical threat maps. + + This produces: + - level2/: Aggregated by threat code (e.g., 1.1.tif, 1.2.tif) + - level1/: Aggregated by major threat (e.g., 1.tif, 2.tif) + - level0/top.tif: Final STAR map (all threats summed) + + Uses internal parallelism via the -j flag. + """ + input: + # All threat rasters must be complete + sentinel=DATADIR / "threat_rasters" / ".all_complete", + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", + output: + # Final STAR output + star_map=DATADIR / "threat_results" / "level0" / "top.tif", + # Sentinel for completion + sentinel=DATADIR / "threat_results" / ".complete", + params: + threat_rasters_dir=DATADIR / "threat_rasters", + output_dir=DATADIR / "threat_results_tmp", + final_dir=DATADIR / "threat_results", + threads: 4 + log: + DATADIR / "logs" / "threat_summation.log", + shell: + """ + python3 {SRCDIR}/threats/threat_summation.py \ + --threat_rasters {params.threat_rasters_dir} \ + --output {params.output_dir} \ + -j {threads} \ + 2>&1 | tee {log} + + # Atomic move of completed directory + rm -rf {params.final_dir} + mv {params.output_dir} {params.final_dir} + touch {output.sentinel} + """ diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk new file mode 100644 index 0000000..7a79e20 --- /dev/null +++ b/workflow/rules/validation.smk @@ -0,0 +1,169 @@ +# STAR Pipeline - Validation Rules +# ================================= +# +# These rules handle validation of the AOH models: +# +# 1. Collate AOH data: Gather metadata from all AOH JSON files +# 2. Model validation: Statistical analysis of AOH models (requires R) +# 3. GBIF validation: Validate against GBIF occurrence data (PRECIOUS - expensive) +# +# The GBIF validation is treated as "precious" - it will only run if the +# output doesn't exist. This is because GBIF downloads can take hours. + +import os +from pathlib import Path + + +# ============================================================================= +# Collate AOH Data +# ============================================================================= + +rule collate_aoh_data: + """ + Collate metadata from all AOH JSON files into a single CSV. + + This reads the .json metadata files that are generated alongside each + AOH raster. The CSV is used by downstream validation and analysis. + + Note: This depends on AOH JSON files, not raster files, because some + species may have empty AOHs (no raster) but still have metadata. 
+ """ + input: + # All AOHs must be complete + aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + collated=DATADIR / "validation" / "aohs.csv", + params: + aoh_results_dir=DATADIR / "aohs" / SCENARIO, + log: + DATADIR / "logs" / "collate_aoh_data.log", + shell: + """ + mkdir -p $(dirname {output.collated}) + aoh-collate-data \ + --aoh_results {params.aoh_results_dir} \ + --output {output.collated} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Model Validation +# ============================================================================= + +rule model_validation: + """ + Perform statistical validation of AOH models. + + This runs a statistical analysis of the AOH outputs to assess + model quality. Requires R with lme4 and lmerTest packages. + + Rebuilds if: + - Collated AOH data changes + - AOH package version changes + """ + input: + collated=DATADIR / "validation" / "aohs.csv", + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + validation=DATADIR / "validation" / "model_validation.csv", + log: + DATADIR / "logs" / "model_validation.log", + shell: + """ + aoh-validate-prevalence \ + --collated_aoh_data {input.collated} \ + --output {output.validation} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# GBIF Validation (PRECIOUS) +# ============================================================================= +# +# GBIF validation is expensive (hours of download time) and should only be +# regenerated if the output is explicitly deleted. We use 'ancient()' to +# prevent rebuilds due to timestamp changes. +# +# For future: Could add logic to detect new species and only fetch those. + +rule fetch_gbif_data: + """ + Fetch GBIF occurrence data for a taxa. + + PRECIOUS: This rule is expensive (hours of downloads) and will only + run if the output doesn't exist. It won't rebuild due to code changes. + + Environment variables required: + GBIF_USERNAME, GBIF_EMAIL, GBIF_PASSWORD + """ + input: + # Use ancient() to prevent rebuilds + collated=ancient(DATADIR / "validation" / "aohs.csv"), + output: + # The output is a directory, we use a sentinel + sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", + params: + output_dir=DATADIR / "validation" / "occurrences", + log: + DATADIR / "logs" / "fetch_gbif_{taxa}.log", + shell: + """ + mkdir -p {params.output_dir} + aoh-fetch-gbif-data \ + --collated_aoh_data {input.collated} \ + --taxa {wildcards.taxa} \ + --output_dir {params.output_dir} \ + 2>&1 | tee {log} + touch {output.sentinel} + """ + + +rule validate_gbif_occurrences: + """ + Validate AOH models against GBIF occurrence data. + + PRECIOUS: Depends on GBIF data which is expensive to fetch. 
+ """ + input: + gbif_sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", + # Use ancient() to prevent rebuilds due to upstream changes + aoh_sentinel=ancient(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), + output: + validation=DATADIR / "validation" / "occurrences" / "{taxa}.csv", + params: + gbif_data=lambda wildcards: DATADIR / "validation" / "occurrences" / wildcards.taxa, + species_data=lambda wildcards: DATADIR / "species-info" / wildcards.taxa / SCENARIO, + aoh_results=lambda wildcards: DATADIR / "aohs" / SCENARIO / wildcards.taxa, + log: + DATADIR / "logs" / "validate_gbif_{taxa}.log", + shell: + """ + aoh-validate-occurrences \ + --gbif_data_path {params.gbif_data} \ + --species_data {params.species_data} \ + --aoh_results {params.aoh_results} \ + --output {output.validation} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# GBIF Validation Target +# ============================================================================= + +rule gbif_validation: + """ + Target rule for running GBIF validation for all taxa. + + WARNING: This is expensive and will download gigabytes of data from GBIF. + Only run explicitly with: snakemake gbif_validation + """ + input: + expand( + str(DATADIR / "validation" / "occurrences" / "{taxa}.csv"), + taxa=TAXA + ), From 4c1ee94df0e494e3b20e85a6d984eaaeaa283f58 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Thu, 29 Jan 2026 18:46:42 +0000 Subject: [PATCH 03/15] More snakemake refactoring --- Dockerfile | 4 +- config/config.yaml | 1 - prepare_species/extract_species_data_psql.py | 5 +- threats/threat_processing.py | 24 ++++-- workflow/Snakefile | 11 +-- workflow/profiles/default/config.yaml | 2 + workflow/rules/aoh.smk | 63 ++++++++++----- workflow/rules/species.smk | 37 +-------- workflow/rules/summaries.smk | 4 +- workflow/rules/threats.smk | 82 ++------------------ workflow/rules/validation.smk | 36 --------- 11 files changed, 88 insertions(+), 181 deletions(-) create mode 100644 workflow/profiles/default/config.yaml diff --git a/Dockerfile b/Dockerfile index 65448f4..113a938 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ WORKDIR /go/reclaimer RUN go mod tidy RUN go build -FROM ghcr.io/osgeo/gdal:ubuntu-small-3.11.4 +FROM ghcr.io/osgeo/gdal:ubuntu-small-3.12.1 RUN apt-get update -qqy && \ apt-get install -qy \ @@ -21,7 +21,7 @@ RUN apt-get update -qqy && \ COPY --from=reclaimerbuild /go/reclaimer/reclaimer /bin/reclaimer RUN rm /usr/lib/python3.*/EXTERNALLY-MANAGED -RUN pip install gdal[numpy]==3.11.4 +RUN pip install gdal[numpy]==3.12.1 COPY requirements.txt /tmp/ RUN pip install -r /tmp/requirements.txt diff --git a/config/config.yaml b/config/config.yaml index f5613f9..ddec7e0 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -20,7 +20,6 @@ habitat_scale: 992.292720200000133 # Input data files (relative to datadir) # These are expected to be pre-downloaded in the Zenodo subfolder inputs: - raw_habitat: "habitat/raw.tif" zenodo_mask: "Zenodo/CGLS100Inland_withGADMIslands.tif" zenodo_elevation_max: "Zenodo/FABDEM_1km_max_patched.tif" zenodo_elevation_min: "Zenodo/FABDEM_1km_min_patched.tif" diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index bfeda99..feccc48 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -4,6 +4,7 @@ import logging import math import os +import sys from functools import partial from multiprocessing 
import Pool from pathlib import Path @@ -194,7 +195,7 @@ def process_row( category_weight = 0 # This is a fix as per the method to include the missing islands layer: - habitats_list = list(habitats) + ["islands"] + habitats_list = sorted(list(habitats)) + ["islands"] gdf = gpd.GeoDataFrame( [[ @@ -309,7 +310,7 @@ def extract_data_per_species( @snakemake_compatible(mapping={ "classname": "params.classname", "overrides": "params.overrides", - "excludes": "params.excludes", + "excludes": "input.excludes", "output_directory_path": "params.output_dir", "target_projection": "params.projection", }) diff --git a/threats/threat_processing.py b/threats/threat_processing.py index 3bd7b27..67db2e1 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -13,6 +13,7 @@ def threat_processing_per_species( species_data_path: Path, aoh_path: Path, output_directory_path: Path, + sentinel_path: Path | None, ) -> None: try: data = gpd.read_file(species_data_path) @@ -39,27 +40,29 @@ def threat_processing_per_species( weighted_species = proportional_aoh_per_pixel * category_weight total_threat_weight = sum(x[1] for x in threat_data) - print(threat_data) - print(total_threat_weight) for threat_id, weight in threat_data: - print(threat_id, weight) proportional_threat_weight = weight / total_threat_weight per_threat_per_species_score = weighted_species * proportional_threat_weight - print(per_threat_per_species_score.sum()) threat_dir_path = output_directory_path / str(threat_id) os.makedirs(threat_dir_path, exist_ok=True) output_path = threat_dir_path / f"{taxon_id}.tif" per_threat_per_species_score.to_geotiff(output_path) + # This script generates a bunch of rasters, but snakemake needs one + # output to say when done, so if we're in snakemake mode we touch a sentinel file to + # let it know we've done. One day this should be another decorator. + if sentinel_path is not None: + os.makedirs(sentinel_path.parent, exist_ok=True) + sentinel_path.touch() + @snakemake_compatible(mapping={ "species_data_path": "input.species_data", "aoh_path": "input.aoh", "output_directory_path": "params.output_dir", + "sentinel_path": "output.sentinel", }) def main() -> None: - os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" - parser = argparse.ArgumentParser(description="Calculate per species threat layers") parser.add_argument( '--speciesdata', @@ -82,12 +85,21 @@ def main() -> None: required=True, dest='output_directory_path', ) + parser.add_argument( + '--sentinel', + type=Path, + help='Generate a sentinel file on completion for snakemake to track', + required=False, + default=None, + dest='sentinel_path', + ) args = parser.parse_args() threat_processing_per_species( args.species_data_path, args.aoh_path, args.output_directory_path, + args.sentinel_path, ) if __name__ == "__main__": diff --git a/workflow/Snakefile b/workflow/Snakefile index 436b63a..f736bf0 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -20,6 +20,9 @@ import os from pathlib import Path +# Allow GDAL/OGR to handle large GeoJSON objects +os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" + # Load configuration configfile: "config/config.yaml" @@ -82,10 +85,7 @@ rule aohs: Target: generate all AoH rasters. """ input: - expand( - str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), - taxa=TAXA - ), + DATADIR / "validation" / "aohs.csv", rule species_data: @@ -93,6 +93,7 @@ rule species_data: Target: extract species data from PostgreSQL. 
""" input: + DATADIR / "species-info" / "AVES" / ".birdlife_applied", expand( str(DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv"), taxa=TAXA @@ -114,7 +115,7 @@ rule prepare: rule validation: """ - Target: run model validation (excludes GBIF which is expensive). + Target: run model validation (excludes occurrence validation by default which is expensive). """ input: DATADIR / "validation" / "model_validation.csv", diff --git a/workflow/profiles/default/config.yaml b/workflow/profiles/default/config.yaml new file mode 100644 index 0000000..238b1e0 --- /dev/null +++ b/workflow/profiles/default/config.yaml @@ -0,0 +1,2 @@ +resources: + db_connections: 1 diff --git a/workflow/rules/aoh.smk b/workflow/rules/aoh.smk index 29d8050..2f4314c 100644 --- a/workflow/rules/aoh.smk +++ b/workflow/rules/aoh.smk @@ -45,6 +45,25 @@ rule aoh_version_sentinel: # Per-Species AOH Generation # ============================================================================= +def aoh_species_inputs(wildcards): + """Return inputs for generate_aoh, including birdlife sentinel for AVES.""" + inputs = { + "species_data": DATADIR / "species-info" / wildcards.taxa / SCENARIO / f"{wildcards.species_id}.geojson", + + # Base layers (precious - won't trigger rebuilds) + "habitat_sentinel": ancient(DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete"), + "crosswalk": DATADIR / "crosswalk.csv", + "mask": ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), + "elevation_max": ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), + "elevation_min": ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), + + # Version sentinel for code-sensitive rebuilds + "version_sentinel": DATADIR / ".sentinels" / "aoh_version.txt", + } + if wildcards.taxa == "AVES": + inputs["birdlife_applied"] = DATADIR / "species-info" / "AVES" / ".birdlife_applied" + return inputs + rule generate_aoh: """ Generate Area of Habitat raster for a single species. @@ -57,23 +76,14 @@ rule generate_aoh: aoh package is updated. """ input: - # Species data - species_data=DATADIR / "species-info" / "{taxa}" / SCENARIO / "{species_id}.geojson", - # Base layers (precious - won't trigger rebuilds) - habitat_sentinel=ancient(DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete"), - crosswalk=DATADIR / "crosswalk.csv", - mask=ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), - elevation_max=ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), - elevation_min=ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), - # Version sentinel for code-sensitive rebuilds - version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + unpack(aoh_species_inputs), output: # Only declare JSON as output - TIF is optional (not created for empty AOHs) metadata=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.json", params: habitat_dir=DATADIR / "habitat_layers" / SCENARIO, log: - DATADIR / "logs" / "aoh" / "{taxa}" / "{species_id}.log", + DATADIR / "logs" / "aoh" / "{taxa}" / "{species_id}_all.log", resources: # Limit concurrent AOH jobs if needed (e.g., for memory) aoh_slots=1, @@ -134,22 +144,39 @@ rule aggregate_aohs_per_taxa: touch {output.sentinel} """ - # ============================================================================= -# All AOHs Aggregation +# Collate AOH Data # ============================================================================= -rule all_aohs: +rule collate_aoh_data: """ - Aggregate rule that ensures all AOHs for all taxa are generated. 
+ Collate metadata from all AOH JSON files into a single CSV. + + This reads the .json metadata files that are generated alongside each + AOH raster. The CSV is used by downstream validation and analysis. + + Note: This depends on AOH JSON files, not raster files, because some + species may have empty AOHs (no raster) but still have metadata. """ input: + # All AOHs must be complete sentinels=expand( str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), taxa=TAXA ), + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", output: - sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + collated=DATADIR / "validation" / "aohs.csv", + params: + aoh_results_dir=DATADIR / "aohs" / SCENARIO, + log: + DATADIR / "logs" / "collate_aoh_data.log", shell: - "touch {output.sentinel}" - + """ + mkdir -p $(dirname {output.collated}) + aoh-collate-data \ + --aoh_results {params.aoh_results_dir} \ + --output {output.collated} \ + 2>&1 | tee {log} + """ diff --git a/workflow/rules/species.smk b/workflow/rules/species.smk index e1cdf00..bfd2b73 100644 --- a/workflow/rules/species.smk +++ b/workflow/rules/species.smk @@ -71,6 +71,7 @@ checkpoint extract_species_data: input: # Code version sentinel for rebuild tracking version_sentinel=DATADIR / ".sentinels" / "species_code_version.txt", + excludes=DATADIR / config['optional_inputs']['species_excludes'], output: # The report.csv is the known output; GeoJSON files are dynamic report=DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv", @@ -78,15 +79,11 @@ checkpoint extract_species_data: classname="{taxa}", output_dir=lambda wildcards: DATADIR / "species-info" / wildcards.taxa, projection=config["projection"], - excludes=lambda wildcards: ( - f"--excludes {DATADIR / config['optional_inputs']['species_excludes']}" - if (DATADIR / config["optional_inputs"]["species_excludes"]).exists() - else "" - ), log: DATADIR / "logs" / "extract_species_{taxa}.log", resources: - # Serialize DB access - only one extraction at a time + # Serialize DB access - only one extraction script at a time + # as it will make many concurrent connections internally db_connections=1, shell: """ @@ -94,7 +91,7 @@ checkpoint extract_species_data: --class {params.classname} \ --output {params.output_dir} \ --projection "{params.projection}" \ - {params.excludes} \ + --excludes {input.excludes} \ 2>&1 | tee {log} """ @@ -127,29 +124,3 @@ rule apply_birdlife_overrides: 2>&1 | tee {log} touch {output.sentinel} """ - - -# ============================================================================= -# Aggregation Rule for All Species Data -# ============================================================================= - -def all_species_reports(wildcards): - """ - Return paths to all species reports for all taxa. - """ - return [ - DATADIR / "species-info" / taxa / SCENARIO / "report.csv" - for taxa in TAXA - ] - - -rule all_species_data: - """ - Aggregate rule that ensures all species data is extracted. 
- """ - input: - reports=all_species_reports, - output: - sentinel=DATADIR / "species-info" / ".all_extracted", - shell: - "touch {output.sentinel}" diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk index a8ea476..951c36d 100644 --- a/workflow/rules/summaries.smk +++ b/workflow/rules/summaries.smk @@ -29,7 +29,7 @@ rule species_richness: """ input: # All AOHs must be complete - aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + aoh_sentinel=DATADIR / "validation" / "aohs.csv", # Version tracking version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", output: @@ -66,7 +66,7 @@ rule endemism: """ input: # Dependencies - aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", + aoh_sentinel=DATADIR / "validation" / "aohs.csv", species_richness=DATADIR / "summaries" / "species_richness.tif", # Version tracking version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk index e724ada..16ab2a9 100644 --- a/workflow/rules/threats.smk +++ b/workflow/rules/threats.smk @@ -19,46 +19,6 @@ import json from pathlib import Path -# ============================================================================= -# Version Sentinel for Threat Code -# ============================================================================= - -rule threat_version_sentinel: - """ - Create a version sentinel that tracks changes to threat processing code. - """ - input: - script1=SRCDIR / "threats" / "threat_processing.py", - script2=SRCDIR / "threats" / "threat_summation.py", - output: - sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", - run: - import hashlib - import subprocess - - os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) - - # Hash the tracked scripts - hashes = [] - for f in input: - with open(f, 'rb') as fh: - hashes.append(hashlib.sha256(fh.read()).hexdigest()[:12]) - - # Get aoh package version (threat processing depends on it) - try: - result = subprocess.run( - ["aoh-calc", "--version"], - capture_output=True, text=True, check=True - ) - aoh_version = result.stdout.strip() - except Exception: - aoh_version = "unknown" - - with open(output.sentinel, 'w') as f: - f.write(f"scripts: {','.join(hashes)}\n") - f.write(f"aoh: {aoh_version}\n") - - # ============================================================================= # Helper Functions # ============================================================================= @@ -120,8 +80,6 @@ rule generate_threat_rasters: # Species data and AOH species_data=DATADIR / "species-info" / "{taxa}" / SCENARIO / "{species_id}.geojson", aoh=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.tif", - # Version tracking - version_sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", output: # Sentinel file since actual outputs depend on species' threats sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".{species_id}_complete", @@ -131,16 +89,8 @@ rule generate_threat_rasters: DATADIR / "logs" / "threats" / "{taxa}" / "{species_id}.log", resources: threat_slots=1, - shell: - """ - mkdir -p $(dirname {log}) - python3 {SRCDIR}/threats/threat_processing.py \ - --speciesdata {input.species_data} \ - --aoh {input.aoh} \ - --output {params.output_dir} \ - 2>&1 | tee {log} - touch {output.sentinel} - """ + script: + str(SRCDIR / "threats" / "threat_processing.py") # ============================================================================= @@ -193,36 +143,16 @@ rule threat_summation: - level2/: Aggregated by threat code (e.g., 
1.1.tif, 1.2.tif) - level1/: Aggregated by major threat (e.g., 1.tif, 2.tif) - level0/top.tif: Final STAR map (all threats summed) - - Uses internal parallelism via the -j flag. """ input: - # All threat rasters must be complete sentinel=DATADIR / "threat_rasters" / ".all_complete", - # Version tracking - version_sentinel=DATADIR / ".sentinels" / "threat_code_version.txt", output: - # Final STAR output star_map=DATADIR / "threat_results" / "level0" / "top.tif", - # Sentinel for completion - sentinel=DATADIR / "threat_results" / ".complete", params: threat_rasters_dir=DATADIR / "threat_rasters", - output_dir=DATADIR / "threat_results_tmp", - final_dir=DATADIR / "threat_results", - threads: 4 + output_dir=DATADIR / "threat_results", + threads: workflow.cores log: DATADIR / "logs" / "threat_summation.log", - shell: - """ - python3 {SRCDIR}/threats/threat_summation.py \ - --threat_rasters {params.threat_rasters_dir} \ - --output {params.output_dir} \ - -j {threads} \ - 2>&1 | tee {log} - - # Atomic move of completed directory - rm -rf {params.final_dir} - mv {params.output_dir} {params.final_dir} - touch {output.sentinel} - """ + script: + str(SRCDIR / "threats" / "threat_summation.py") diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk index 7a79e20..2d302b3 100644 --- a/workflow/rules/validation.smk +++ b/workflow/rules/validation.smk @@ -13,42 +13,6 @@ import os from pathlib import Path - -# ============================================================================= -# Collate AOH Data -# ============================================================================= - -rule collate_aoh_data: - """ - Collate metadata from all AOH JSON files into a single CSV. - - This reads the .json metadata files that are generated alongside each - AOH raster. The CSV is used by downstream validation and analysis. - - Note: This depends on AOH JSON files, not raster files, because some - species may have empty AOHs (no raster) but still have metadata. 
- """ - input: - # All AOHs must be complete - aoh_sentinel=DATADIR / "aohs" / SCENARIO / ".all_complete", - # Version tracking - version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", - output: - collated=DATADIR / "validation" / "aohs.csv", - params: - aoh_results_dir=DATADIR / "aohs" / SCENARIO, - log: - DATADIR / "logs" / "collate_aoh_data.log", - shell: - """ - mkdir -p $(dirname {output.collated}) - aoh-collate-data \ - --aoh_results {params.aoh_results_dir} \ - --output {output.collated} \ - 2>&1 | tee {log} - """ - - # ============================================================================= # Model Validation # ============================================================================= From 585d6754a802c7e57de8c89beebe3fb1ee4e2e22 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:19:36 +0000 Subject: [PATCH 04/15] More snakemake updates, tested in docker --- Dockerfile | 6 +- prepare_species/common.py | 4 +- prepare_species/extract_species_data_psql.py | 11 ++-- .../extract_species_data_redlist.py | 7 +- threats/threat_processing.py | 3 +- workflow/Snakefile | 13 +++- workflow/rules/aoh.smk | 66 ++++++++++++------- workflow/rules/prepare.smk | 4 ++ workflow/rules/species.smk | 14 ++-- workflow/rules/summaries.smk | 2 + workflow/rules/threats.smk | 36 ++++++---- workflow/rules/validation.smk | 18 +++-- 12 files changed, 121 insertions(+), 63 deletions(-) diff --git a/Dockerfile b/Dockerfile index 113a938..bb86d42 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,10 +53,10 @@ RUN python3 -m mypy prepare_layers prepare_species threats utils tests # Snakemake validation RUN snakefmt --check workflow/ -RUN snakemake --snakefile workflow/Snakefile --lint -RUN snakemake --snakefile workflow/Snakefile --dry-run all +# RUN snakemake --snakefile workflow/Snakefile --lint # Default command runs the full Snakemake pipeline # Use --cores to specify parallelism, e.g.: docker run ... 
--cores 8 # Note: --scheduler greedy avoids ILP solver issues on some platforms -CMD ["snakemake", "--snakefile", "workflow/Snakefile", "--scheduler", "greedy", "--cores", "1", "all"] +ENTRYPOINT ["snakemake", "--snakefile", "workflow/Snakefile", "--scheduler", "greedy"] +CMD ["--cores", "4", "all"] \ No newline at end of file diff --git a/prepare_species/common.py b/prepare_species/common.py index 2fcaf63..718c27e 100644 --- a/prepare_species/common.py +++ b/prepare_species/common.py @@ -161,9 +161,9 @@ def process_systems( return systems def process_threats( - threat_data: list[tuple[int, str, str]], + threat_data: list[tuple[str, str, str]], report: SpeciesReport, -) -> list[tuple[int, int]]: +) -> list[tuple[str, int]]: cleaned_threats = [] for code, scope, severity in threat_data: if scope is None or scope.lower() == "unknown": diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index feccc48..c1a6147 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -1,6 +1,5 @@ import argparse -import json import logging import math import os @@ -197,6 +196,10 @@ def process_row( # This is a fix as per the method to include the missing islands layer: habitats_list = sorted(list(habitats)) + ["islands"] + # GeoPandas will do the right thing JSON wise if we convert the + # threats from a list of tuples to a list of lists: + json_ready_threats = [[code, score] for (code, score) in threats] + gdf = gpd.GeoDataFrame( [[ id_no, @@ -210,7 +213,7 @@ def process_row( scientific_name, family_name, class_name, - json.dumps(threats), + json_ready_threats, category, category_weight, geometry, @@ -276,7 +279,7 @@ def extract_data_per_species( # You can't do NOT IN on an empty list in SQL if excludes: exclude_statement = "AND assessments.sis_taxon_id NOT IN %s" - statement = MAIN_STATEMENT + exclude_statement + statement = MAIN_STATEMENT + exclude_statement + " LIMIT 100" cursor.execute(statement, (class_name, excludes)) else: cursor.execute(MAIN_STATEMENT, (class_name,)) @@ -330,7 +333,7 @@ def main() -> None: required=False, default=None, dest="overrides", - ) + ) parser.add_argument( '--excludes', type=Path, diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index 52343aa..8ebdcf0 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -51,7 +51,6 @@ """ import argparse -import json import logging import os import sys @@ -245,6 +244,10 @@ def process_species( habitats_list = list(habitats) + ["islands"] + # GeoPandas will do the right thing JSON wise if we convert the + # threats from a list of tuples to a list of lists: + json_ready_threats = [[code, score] for (code, score) in threats] + # Create GeoDataFrame with all data gdf = gpd.GeoDataFrame( [[ @@ -259,7 +262,7 @@ def process_species( scientific_name, family_name, class_name, - json.dumps(threats), + json_ready_threats, category, CATEGORY_WEIGHTS[category], geometry diff --git a/threats/threat_processing.py b/threats/threat_processing.py index 67db2e1..f21d84b 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -26,7 +26,8 @@ def threat_processing_per_species( taxon_id = data.id_no[0] category_weight = int(data.category_weight[0]) - threat_data = json.loads(data.threats[0]) + raw_threats = data.threats[0] + threat_data = json.loads(raw_threats) if isinstance(raw_threats, str) else raw_threats 
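The `isinstance` guard above exists because species GeoJSONs written before this change carry `threats` as a JSON-encoded string (the old `json.dumps` path), while files written afterwards carry a native list of `[code, weight]` pairs. A minimal sketch of the equivalent normalisation, shown here only to make the two encodings explicit:

```python
import json


def normalise_threats(raw) -> list[tuple[str, float]]:
    """Accept either the old JSON-encoded string or the new native list of
    [code, weight] pairs, returning a uniform list of (code, weight) tuples."""
    parsed = json.loads(raw) if isinstance(raw, str) else raw
    return [(str(code), weight) for code, weight in parsed]


# Both encodings of the same threats yield the same result:
assert normalise_threats('[["1.1", 2], ["2.3.1", 1]]') == \
    normalise_threats([["1.1", 2], ["2.3.1", 1]])
```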
try: aoh_data_path = aoh_path.with_suffix(".json") diff --git a/workflow/Snakefile b/workflow/Snakefile index f736bf0..f37d10b 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -23,9 +23,11 @@ from pathlib import Path # Allow GDAL/OGR to handle large GeoJSON objects os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" + # Load configuration configfile: "config/config.yaml" + # Data directory from environment variable DATADIR = Path(os.environ.get("DATADIR", "/data")) @@ -36,6 +38,7 @@ SCENARIO = config["scenario"] # Source directory (where this repo lives) SRCDIR = Path(workflow.basedir).parent + # Include rule modules include: "rules/prepare.smk" include: "rules/species.smk" @@ -49,6 +52,7 @@ include: "rules/validation.smk" # Target Rules # ============================================================================= + rule all: """ Default target: run the complete STAR pipeline including validation. @@ -96,7 +100,7 @@ rule species_data: DATADIR / "species-info" / "AVES" / ".birdlife_applied", expand( str(DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv"), - taxa=TAXA + taxa=TAXA, ), @@ -109,7 +113,7 @@ rule prepare: DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", expand( str(DATADIR / "habitat_layers" / SCENARIO / "lcc_{n}.tif"), - n=range(0, 127) # Habitat layer codes + n=range(0, 127), # Habitat layer codes ), @@ -125,11 +129,14 @@ rule validation: # Utility Functions # ============================================================================= + def get_all_species_geojsons(wildcards): """ Return all species GeoJSON files for a given taxa. Used for checkpoint-based dependency resolution. """ - checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] geojson_dir = DATADIR / "species-info" / wildcards.taxa / SCENARIO return list(geojson_dir.glob("*.geojson")) diff --git a/workflow/rules/aoh.smk b/workflow/rules/aoh.smk index 2f4314c..4ebca25 100644 --- a/workflow/rules/aoh.smk +++ b/workflow/rules/aoh.smk @@ -15,6 +15,7 @@ from pathlib import Path # Version Sentinel for AOH Code # ============================================================================= + rule aoh_version_sentinel: """ Create a version sentinel that tracks the aoh package version. 
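The version sentinel above captures the installed `aoh` tooling version by shelling out to `aoh-calc --version`. If the sentinel only needs the package version, the same value can be read without a subprocess via `importlib.metadata`; a sketch, assuming the distribution is published under the name `aoh` (as pinned in requirements.txt):

```python
from importlib.metadata import PackageNotFoundError, version


def aoh_package_version() -> str:
    """Best-effort lookup of the installed aoh package's version string."""
    try:
        return version("aoh")  # assumes the distribution name is "aoh"
    except PackageNotFoundError:
        return "unknown"
```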
@@ -30,14 +31,13 @@ rule aoh_version_sentinel: # Get aoh package version try: result = subprocess.run( - ["aoh-calc", "--version"], - capture_output=True, text=True, check=True + ["aoh-calc", "--version"], capture_output=True, text=True, check=True ) aoh_version = result.stdout.strip() except Exception: aoh_version = "unknown" - with open(output.sentinel, 'w') as f: + with open(output.sentinel, "w") as f: f.write(f"aoh: {aoh_version}\n") @@ -45,24 +45,32 @@ rule aoh_version_sentinel: # Per-Species AOH Generation # ============================================================================= + def aoh_species_inputs(wildcards): - """Return inputs for generate_aoh, including birdlife sentinel for AVES.""" - inputs = { - "species_data": DATADIR / "species-info" / wildcards.taxa / SCENARIO / f"{wildcards.species_id}.geojson", - - # Base layers (precious - won't trigger rebuilds) - "habitat_sentinel": ancient(DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete"), - "crosswalk": DATADIR / "crosswalk.csv", - "mask": ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), - "elevation_max": ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), - "elevation_min": ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), - - # Version sentinel for code-sensitive rebuilds - "version_sentinel": DATADIR / ".sentinels" / "aoh_version.txt", - } - if wildcards.taxa == "AVES": - inputs["birdlife_applied"] = DATADIR / "species-info" / "AVES" / ".birdlife_applied" - return inputs + """Return inputs for generate_aoh, including birdlife sentinel for AVES.""" + inputs = { + "species_data": DATADIR + / "species-info" + / wildcards.taxa + / SCENARIO + / f"{wildcards.species_id}.geojson", + # Base layers (precious - won't trigger rebuilds) + "habitat_sentinel": ancient( + DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete" + ), + "crosswalk": DATADIR / "crosswalk.csv", + "mask": ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), + "elevation_max": ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), + "elevation_min": ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), + # Version sentinel for code-sensitive rebuilds + "version_sentinel": DATADIR / ".sentinels" / "aoh_version.txt", + } + if wildcards.taxa == "AVES": + inputs["birdlife_applied"] = ( + DATADIR / "species-info" / "AVES" / ".birdlife_applied" + ) + return inputs + rule generate_aoh: """ @@ -106,13 +114,16 @@ rule generate_aoh: # Per-Taxa AOH Aggregation # ============================================================================= + def get_species_ids_for_taxa(wildcards): """ Get all species IDs for a taxa by reading the checkpoint output. Returns list of species IDs (GeoJSON file stems). """ # Wait for the checkpoint to complete - checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] geojson_dir = Path(checkpoint_output).parent return [p.stem for p in geojson_dir.glob("*.geojson")] @@ -128,10 +139,14 @@ def get_all_aoh_metadata_for_taxa(wildcards): ] -rule aggregate_aohs_per_taxa: +checkpoint aggregate_aohs_per_taxa: """ - Aggregate rule that ensures all AOHs for a taxa are generated. + Checkpoint that ensures all AOHs for a taxa are generated. Creates a sentinel file when complete. 
+ + This is a checkpoint (not a rule) so that downstream rules like + threat processing can re-evaluate the DAG after AOHs are created, + allowing them to see which AOH files actually exist. """ input: # Only depend on JSON metadata (always created), not TIFs (optional) @@ -144,10 +159,12 @@ rule aggregate_aohs_per_taxa: touch {output.sentinel} """ + # ============================================================================= # Collate AOH Data # ============================================================================= + rule collate_aoh_data: """ Collate metadata from all AOH JSON files into a single CSV. @@ -161,8 +178,7 @@ rule collate_aoh_data: input: # All AOHs must be complete sentinels=expand( - str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), - taxa=TAXA + str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), taxa=TAXA ), # Version tracking version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index f0554cb..35b568f 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -20,6 +20,7 @@ from pathlib import Path # Crosswalk Table (normal dependency tracking) # ============================================================================= + rule convert_crosswalk: """ Convert IUCN crosswalk to minimal common format. @@ -39,6 +40,7 @@ rule convert_crosswalk: # Mask Processing (precious - only if missing) # ============================================================================= + rule remove_nans_from_mask: """ Convert NaNs to zeros in the mask layer. @@ -60,6 +62,7 @@ rule remove_nans_from_mask: # Habitat Layer Processing (precious - only if missing) # ============================================================================= + rule download_habitat: """ Download raw habitat map from Zenodo. @@ -142,6 +145,7 @@ rule copy_islands_layer: # Helper rule to get specific habitat layer # ============================================================================= + def get_habitat_layer(wildcards): """ Returns the path to a specific habitat layer. diff --git a/workflow/rules/species.smk b/workflow/rules/species.smk index bfd2b73..0ac6438 100644 --- a/workflow/rules/species.smk +++ b/workflow/rules/species.smk @@ -15,6 +15,7 @@ from pathlib import Path # Version Sentinel for Code-Sensitive Dependencies # ============================================================================= + rule species_version_sentinel: """ Create a version sentinel that tracks changes to species extraction code. @@ -35,20 +36,19 @@ rule species_version_sentinel: # Hash the tracked scripts hashes = [] for f in input: - with open(f, 'rb') as fh: + with open(f, "rb") as fh: hashes.append(hashlib.sha256(fh.read()).hexdigest()[:12]) - # Get aoh package version + # Get aoh package version try: result = subprocess.run( - ["aoh-calc", "--version"], - capture_output=True, text=True, check=True + ["aoh-calc", "--version"], capture_output=True, text=True, check=True ) aoh_version = result.stdout.strip() except Exception: aoh_version = "unknown" - with open(output.sentinel, 'w') as f: + with open(output.sentinel, "w") as f: f.write(f"scripts: {','.join(hashes)}\n") f.write(f"aoh: {aoh_version}\n") @@ -57,6 +57,7 @@ rule species_version_sentinel: # Species Data Extraction (Checkpoint) # ============================================================================= + checkpoint extract_species_data: """ Extract species data from PostgreSQL database. 
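For readers less familiar with Snakemake checkpoints: a downstream rule cannot statically list files that only exist once an upstream job has run, so the consumer supplies an input *function* that calls `checkpoints.<name>.get(...)`; that call defers DAG evaluation until the checkpoint has finished, after which the function can glob whatever was actually produced. A minimal generic sketch of the pattern (placeholder rule and path names, not the pipeline's real ones):

```python
from pathlib import Path


checkpoint make_parts:
    output:
        directory("parts/{group}")
    shell:
        "mkdir -p {output} && touch {output}/a.txt {output}/b.txt"


def collect_parts(wildcards):
    # Blocks until the checkpoint for this group has run, then re-evaluates
    # the DAG so we can glob the files that now actually exist.
    outdir = checkpoints.make_parts.get(group=wildcards.group).output[0]
    return sorted(str(p) for p in Path(outdir).glob("*.txt"))


rule merge_parts:
    input:
        collect_parts,
    output:
        "merged/{group}.txt",
    shell:
        "cat {input} > {output}"
```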
@@ -71,7 +72,7 @@ checkpoint extract_species_data: input: # Code version sentinel for rebuild tracking version_sentinel=DATADIR / ".sentinels" / "species_code_version.txt", - excludes=DATADIR / config['optional_inputs']['species_excludes'], + excludes=DATADIR / config["optional_inputs"]["species_excludes"], output: # The report.csv is the known output; GeoJSON files are dynamic report=DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv", @@ -100,6 +101,7 @@ checkpoint extract_species_data: # BirdLife Elevation Overrides (Optional) # ============================================================================= + rule apply_birdlife_overrides: """ Apply BirdLife elevation data overrides to AVES species. diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk index 951c36d..906b658 100644 --- a/workflow/rules/summaries.smk +++ b/workflow/rules/summaries.smk @@ -16,6 +16,7 @@ from pathlib import Path # Species Richness # ============================================================================= + rule species_richness: """ Calculate species richness from all AOH rasters. @@ -52,6 +53,7 @@ rule species_richness: # Endemism # ============================================================================= + rule endemism: """ Calculate endemism from AOH rasters and species richness. diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk index 16ab2a9..8cd73db 100644 --- a/workflow/rules/threats.smk +++ b/workflow/rules/threats.smk @@ -23,6 +23,7 @@ from pathlib import Path # Helper Functions # ============================================================================= + def get_star_species_for_taxa(wildcards): """ Get species IDs that should be included in STAR for a taxa. @@ -30,20 +31,27 @@ def get_star_species_for_taxa(wildcards): - It has in_star=true in its GeoJSON - Its AOH raster exists (some species have no AOH due to no habitat overlap) """ - # Wait for the checkpoint - checkpoint_output = checkpoints.extract_species_data.get(taxa=wildcards.taxa).output[0] + # Wait for species data checkpoint + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] geojson_dir = Path(checkpoint_output).parent + # Wait for AOH checkpoint - this triggers DAG re-evaluation after AOHs complete + checkpoints.aggregate_aohs_per_taxa.get(taxa=wildcards.taxa) + star_species = [] for geojson_path in geojson_dir.glob("*.geojson"): species_id = geojson_path.stem - aoh_path = DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{species_id}_all.tif" + aoh_path = ( + DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{species_id}_all.tif" + ) # Check if species should be in STAR and has an AOH try: - with open(geojson_path, 'r') as f: + with open(geojson_path, "r") as f: data = json.load(f) - if data['features'][0]['properties'].get('in_star', False): + if data["features"][0]["properties"].get("in_star", False): # Only include if AOH TIF actually exists if aoh_path.exists(): star_species.append(species_id) @@ -68,6 +76,7 @@ def get_threat_raster_sentinels_for_taxa(wildcards): # Per-Species Threat Raster Generation # ============================================================================= + rule generate_threat_rasters: """ Generate threat rasters for a single species. 
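A note on the `db_connections` resource used by `extract_species_data`: declaring a resource on a rule only throttles anything once Snakemake knows how much of that resource exists globally, which is what the new `workflow/profiles/default/config.yaml` (`resources: db_connections: 1`) provides. With the cap at 1, any two jobs that each claim a unit can never run concurrently. A compressed, illustrative sketch of the two halves together (the rule name and command are placeholders):

```python
# workflow/profiles/default/config.yaml supplies the global cap:
#   resources:
#     db_connections: 1

rule query_database:                      # placeholder rule, not from the patch
    output:
        "extracts/{taxa}.csv"
    resources:
        db_connections=1                  # each job claims the single available unit
    shell:
        "extract-tool --taxa {wildcards.taxa} --output {output}"  # placeholder command
```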
@@ -78,7 +87,11 @@ rule generate_threat_rasters: """ input: # Species data and AOH - species_data=DATADIR / "species-info" / "{taxa}" / SCENARIO / "{species_id}.geojson", + species_data=DATADIR + / "species-info" + / "{taxa}" + / SCENARIO + / "{species_id}.geojson", aoh=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.tif", output: # Sentinel file since actual outputs depend on species' threats @@ -97,15 +110,16 @@ rule generate_threat_rasters: # Per-Taxa Threat Raster Aggregation # ============================================================================= + rule aggregate_threat_rasters_per_taxa: """ Aggregate rule that ensures all threat rasters for a taxa are generated. Only processes species with in_star=true. + + Note: AOH completion is enforced via checkpoint in get_threat_raster_sentinels_for_taxa. """ input: - # AOHs must be complete first - aoh_sentinel=DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete", - # All threat rasters for STAR species + # All threat rasters for STAR species (implicitly waits for AOH checkpoint) sentinels=get_threat_raster_sentinels_for_taxa, output: sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".complete", @@ -122,8 +136,7 @@ rule all_threat_rasters: """ input: sentinels=expand( - str(DATADIR / "threat_rasters" / "{taxa}" / ".complete"), - taxa=TAXA + str(DATADIR / "threat_rasters" / "{taxa}" / ".complete"), taxa=TAXA ), output: sentinel=DATADIR / "threat_rasters" / ".all_complete", @@ -135,6 +148,7 @@ rule all_threat_rasters: # Threat Summation # ============================================================================= + rule threat_summation: """ Aggregate all per-species threat rasters into hierarchical threat maps. diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk index 2d302b3..12e7f4f 100644 --- a/workflow/rules/validation.smk +++ b/workflow/rules/validation.smk @@ -17,6 +17,7 @@ from pathlib import Path # Model Validation # ============================================================================= + rule model_validation: """ Perform statistical validation of AOH models. @@ -54,6 +55,7 @@ rule model_validation: # # For future: Could add logic to detect new species and only fetch those. + rule fetch_gbif_data: """ Fetch GBIF occurrence data for a taxa. @@ -99,8 +101,14 @@ rule validate_gbif_occurrences: output: validation=DATADIR / "validation" / "occurrences" / "{taxa}.csv", params: - gbif_data=lambda wildcards: DATADIR / "validation" / "occurrences" / wildcards.taxa, - species_data=lambda wildcards: DATADIR / "species-info" / wildcards.taxa / SCENARIO, + gbif_data=lambda wildcards: DATADIR + / "validation" + / "occurrences" + / wildcards.taxa, + species_data=lambda wildcards: DATADIR + / "species-info" + / wildcards.taxa + / SCENARIO, aoh_results=lambda wildcards: DATADIR / "aohs" / SCENARIO / wildcards.taxa, log: DATADIR / "logs" / "validate_gbif_{taxa}.log", @@ -119,6 +127,7 @@ rule validate_gbif_occurrences: # GBIF Validation Target # ============================================================================= + rule gbif_validation: """ Target rule for running GBIF validation for all taxa. 
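Several inputs in these rules are wrapped in `ancient()` (the mask, elevation and habitat sentinels, and the AoH sentinel earlier in this file). `ancient()` is stock Snakemake: the file is still required, but its modification time is ignored, so regenerating one of the slow, "precious" base rasters does not cascade into a full rebuild. A small illustration, reusing file names from this pipeline but with a placeholder rule and command:

```python
rule example_per_species_output:          # illustrative only
    input:
        # timestamp ignored: a refreshed base raster will not force a re-run
        elevation_max=ancient("Zenodo/FABDEM_1km_max_patched.tif"),
        # normal dependency: an updated species file does trigger a re-run
        species="species-info/AVES/current/{species_id}.geojson",
    output:
        "example/{species_id}.tif",
    shell:
        "example-tool {input.elevation_max} {input.species} {output}"  # placeholder
```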
@@ -127,7 +136,4 @@ rule gbif_validation:
     Only run explicitly with: snakemake gbif_validation
     """
     input:
-        expand(
-            str(DATADIR / "validation" / "occurrences" / "{taxa}.csv"),
-            taxa=TAXA
-        ),
+        expand(str(DATADIR / "validation" / "occurrences" / "{taxa}.csv"), taxa=TAXA),

From 5eff0c83973931d052f6b1c32d60691c22a49351 Mon Sep 17 00:00:00 2001
From: Michael Dales
Date: Fri, 30 Jan 2026 11:33:50 +0000
Subject: [PATCH 05/15] Update README for snakemake

---
 README.md                     | 35 +++++++++++++++++++++++++++++++++--
 workflow/rules/validation.smk |  4 ++--
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 2974a93..19e4bea 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,18 @@ For those outside the IUCN, there is a script called `extract_species_data_redli
 There are two ways to run the pipeline. The easiest way is to use Docker if you have it available to you, as it will manage all the dependencies for you. But you can check out and run it locally if you want to also, but it requires a little more effort.
 
+Either way, the pipeline itself is run using [Snakemake](https://snakemake.readthedocs.io/en/stable/), a tool designed to run data-science pipelines made up from many different scripts and sources of information. Snakemake tracks dependencies, making it easier to re-run the pipeline: only the parts that depend on what changed will be rerun. However, in STAR the initial data processing of raster layers is very slow, so we've configured Snakemake to never re-generate those unless the generated rasters have been deleted manually.
+
+Because you do not always need to run the whole pipeline for a specific job, the Snakemake workflow has multiple targets you can invoke:
+
+* prepare: Generate the necessary input rasters for the STAR pipeline.
+* species_data: Extract species data into GeoJSON files from the Redlist database.
+* aohs: Just generate the species AOHs and summary CSV.
+* validation: Run model validation.
+* occurrence_validation: Run occurrence validation - this can be VERY SLOW as it fetches occurrence data from GBIF.
+* threats: Generate the STAR(t) raster layers.
+* all: Do everything except occurrence validation.
+
 ### Running with Docker
 
 There is included a docker file, which is based on the GDAL container image, which is set up to install everything ready to use. You can build that using:
@@ -42,15 +54,21 @@ There is included a docker file, which is based on the GDAL container image, whi
 $ docker buildx build -t star .
 ```
 
+Note that depending on how many CPU cores you provide, you will probably need to give Docker more memory than the out-of-the-box setting (which is a few GB). We recommend giving it as much as you can spare.
+
 You can then invoke the run script using this.
You should map an external folder into the container as a place to store the intermediary data and final results, and you should provide details about the Postgres instance with the IUCN redlist:
 
 ```shell
 $ docker run --rm -v /some/local/dir:/data \
+    -p 5432:5432 \
     -e DB_HOST=localhost \
     -e DB_NAME=iucnredlist \
     -e DB_PASSWORD=supersecretpassword \
     -e DB_USER=postgres \
-    star ./scripts/run.sh
+    -e GBIF_USERNAME=myusername \
+    -e GBIF_PASSWORD=mypassword \
+    -e GBIF_EMAIL=myemail \
+    star --cores 8 all
 ```
 
 ### Running without Docker
@@ -61,7 +79,6 @@ If you prefer not to use Docker, you will need:
 * GDAL
 * R (required for validation)
 * [Reclaimer](https://github.com/quantifyearth/reclaimer/) - a Go tool for fetching data from Zenodo
-* [Littlejohn](https://github.com/quantifyearth/littlejohn/) - a Go tool for running scripts in parallel
 
 If you are using macOS please note that the default Python install that Apple ships is now several years out of date (Python 3.9, released Oct 2020) and you'll need to install a more recent version (for example, using [homebrew](https://brew.sh)).
@@ -91,6 +108,20 @@ export DB_PASSWORD=supersecretpassword
 export DB_USER=postgres
 ```
 
+If you are on macOS, you can set the following extra flag to use GPU acceleration:
+
+```shell
+export YIRGACHEFFE_BACKEND=MLX
+```
+
+For occurrence validation you will need a GBIF account and to set its details as follows:
+
+```shell
+export GBIF_USERNAME=myusername
+export GBIF_PASSWORD=mypassword
+export GBIF_EMAIL=myemail
+```
+
 Once you have all that you can then run the pipeline:
 
 ```shell
diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk
index 12e7f4f..11c9530 100644
--- a/workflow/rules/validation.smk
+++ b/workflow/rules/validation.smk
@@ -128,12 +128,12 @@ rule validate_gbif_occurrences:
 # =============================================================================
 
 
-rule gbif_validation:
+rule occurrence_validation:
     """
     Target rule for running GBIF validation for all taxa.
 
     WARNING: This is expensive and will download gigabytes of data from GBIF.
- Only run explicitly with: snakemake gbif_validation + Only run explicitly with: snakemake occurrence_validation """ input: expand(str(DATADIR / "validation" / "occurrences" / "{taxa}.csv"), taxa=TAXA), From 63568d5df898dd5b843eab4058222c874e1080fe Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:34:16 +0000 Subject: [PATCH 06/15] Remove littlejohn generator scripts --- utils/aoh_generator.py | 75 ------------------------------------- utils/threats_generator.py | 76 -------------------------------------- 2 files changed, 151 deletions(-) delete mode 100644 utils/aoh_generator.py delete mode 100644 utils/threats_generator.py diff --git a/utils/aoh_generator.py b/utils/aoh_generator.py deleted file mode 100644 index 082d409..0000000 --- a/utils/aoh_generator.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import os -from pathlib import Path - -import pandas as pd - -def aoh_generator( - input_dir: Path, - data_dir: Path, - output_csv_path: Path, -): - taxa_dirs = input_dir.glob("[!.]*") - - res = [] - for taxa_dir_path in taxa_dirs: - for scenario in ['current',]: - source = 'historic' if scenario == 'pnv' else 'current' - taxa_path = taxa_dir_path / source - species_paths = taxa_path.glob("*.geojson") - for species_path in species_paths: - res.append([ - data_dir / "habitat_layers" / scenario, - data_dir / "Zenodo" / "FABDEM_1km_max_patched.tif", - data_dir / "Zenodo" / "FABDEM_1km_min_patched.tif", - data_dir / "crosswalk.csv", - species_path, - data_dir / "masks" / "CGLS100Inland_withGADMIslands.tif", - data_dir / "aohs" / scenario / taxa_dir_path.name, - ]) - - df = pd.DataFrame(res, columns=[ - '--fractional_habitats', - '--elevation-max', - '--elevation-min', - '--crosswalk', - '--speciesdata', - '--weights', - '--output' - ]) - output_dir, _ = os.path.split(output_csv_path) - os.makedirs(output_dir, exist_ok=True) - df.to_csv(output_csv_path, index=False) - - -def main() -> None: - parser = argparse.ArgumentParser(description="Species and seasonality generator.") - parser.add_argument( - '--input', - type=Path, - help="directory with taxa folders of species info", - required=True, - dest="input_dir" - ) - parser.add_argument( - '--datadir', - type=Path, - help="directory for results", - required=True, - dest="data_dir", - ) - parser.add_argument( - '--output', - type=Path, - help="name of output file for csv", - required=True, - dest="output" - ) - args = parser.parse_args() - - aoh_generator(args.input_dir, args.data_dir, args.output) - -if __name__ == "__main__": - main() diff --git a/utils/threats_generator.py b/utils/threats_generator.py deleted file mode 100644 index be66920..0000000 --- a/utils/threats_generator.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import json -import os -from pathlib import Path - -import pandas as pd - -def threats_generator( - input_dir: Path, - data_dir: Path, - output_csv_path: Path, -): - taxa_dirs = input_dir.glob("[!.]*") - - res = [] - for taxa_dir_path in taxa_dirs: - for scenario in ['current',]: - source = 'historic' if scenario == 'pnv' else 'current' - taxa_path = taxa_dir_path / source - species_paths = taxa_path.glob("*.geojson") - for species_path in species_paths: - taxon_id = species_path.stem - aoh_path = data_dir / "aohs" / source / taxa_dir_path.name / f"{taxon_id}_all.tif" - if aoh_path.exists(): - # Check if species should be included in STAR. 
We don't use geopandas here - # because it will parse the geometry, which can be quite slow, and we just - # want a single bool field - with open(species_path, 'r', encoding="UTF-8") as f: - data = json.load(f) - if data['features'][0]['properties']['in_star']: - res.append([ - species_path, - aoh_path, - data_dir / "threat_rasters" / taxa_dir_path.name, - ]) - - df = pd.DataFrame(res, columns=[ - '--speciesdata', - '--aoh', - '--output' - ]) - output_dir, _ = os.path.split(output_csv_path) - os.makedirs(output_dir, exist_ok=True) - df.to_csv(output_csv_path, index=False) - -def main() -> None: - parser = argparse.ArgumentParser(description="threat tasks generator.") - parser.add_argument( - '--input', - type=Path, - help="directory with taxa folders of species info", - required=True, - dest="input_dir" - ) - parser.add_argument( - '--datadir', - type=Path, - help="directory for results", - required=True, - dest="data_dir", - ) - parser.add_argument( - '--output', - type=Path, - help="name of output file for csv", - required=True, - dest="output" - ) - args = parser.parse_args() - - threats_generator(args.input_dir, args.data_dir, args.output) - -if __name__ == "__main__": - main() From fd7cbaa1276d6d0fb9cb710fd9aa4446807f7873 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:39:29 +0000 Subject: [PATCH 07/15] More README tweaks --- README.md | 5 ++--- workflow/Snakefile | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 19e4bea..8da8ee8 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,6 @@ An implementation of the threat based [STAR biodiversity metric by Muir et al](https://www.nature.com/articles/s41559-021-01432-0) (also known as STAR(t)). -See [method.md](method.md) for a description of the methodology, or `scripts/run.sh` for how to execute the pipeline. - ## Checking out the code The code is available on github, and can be checked out from there: @@ -23,7 +21,6 @@ There are some additional inputs required to run the pipeline, which should be p The script also assumes you have a Postgres database with the IUCN Redlist database in it. - ## Species data acquisition There are two scripts for getting the species data from the Redlist. For those in the IUCN with access to the database version of the redlist, use `extract_species_data_psql.py`. @@ -46,6 +43,8 @@ Because sometimes you do not need to run all the pipeline for a specific job, th * threats: Generate the STAR(t) raster layers. * all: Do everything except occurrence validation. +There is a configuration file in `config/config.yaml` that is used to set experimental parameters such as which taxa to run the pipeline for. + ### Running with Docker There is included a docker file, which is based on the GDAL container image, which is set up to install everything ready to use. You can build that using: diff --git a/workflow/Snakefile b/workflow/Snakefile index f37d10b..8e47daa 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -107,6 +107,8 @@ rule species_data: rule prepare: """ Target: prepare all base layers (habitat, masks, crosswalk). + + TODO: The habitat layers list should come from parsing the crosswalk. """ input: DATADIR / "crosswalk.csv", From 6dbae396b181d355d0c7d47127ef8c7a3b606c73 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:39:47 +0000 Subject: [PATCH 08/15] Remove old top level script. 
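On the TODO added to the `prepare` target above (deriving the habitat layer list from the crosswalk rather than hard-coding `range(0, 127)`): a sketch of one way to do it, assuming the converted crosswalk CSV exposes the landcover class codes in a `code` column (the real column name depends on what `prepare_layers/convert_crosswalk.py` actually emits):

```python
import csv
from pathlib import Path


def habitat_codes_from_crosswalk(path: Path) -> list[int]:
    """Distinct landcover codes referenced by the converted crosswalk CSV."""
    with open(path, newline="", encoding="utf-8") as fh:
        return sorted({int(row["code"]) for row in csv.DictReader(fh)})


# The Snakefile's prepare target could then be built as:
#   expand(str(DATADIR / "habitat_layers" / SCENARIO / "lcc_{n}.tif"),
#          n=habitat_codes_from_crosswalk(DATADIR / "crosswalk.csv"))
```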
--- scripts/run.sh | 170 ------------------------------------------------- 1 file changed, 170 deletions(-) delete mode 100755 scripts/run.sh diff --git a/scripts/run.sh b/scripts/run.sh deleted file mode 100755 index 57f9f6b..0000000 --- a/scripts/run.sh +++ /dev/null @@ -1,170 +0,0 @@ -#!/bin/bash -# -# Assumes you've set up a python virtual environement in the current directory. -# -# In addition to the Python environemnt, you will need the following extra command line tools: -# -# https://github.com/quantifyearth/reclaimer - used to download inputs from Zenodo directly -# https://github.com/quantifyearth/littlejohn - used to run batch jobs in parallel - -# Set shell script to exit on first error (-e) and to output commands being run to make -# reviewing logs easier (-x) -set -e -set -x - -# We know we use two Go tools, so add go/bin to our path as in slurm world they're likely -# to be installed locally -export PATH="${PATH}":"${HOME}"/go/bin -if ! hash littlejohn 2>/dev/null; then - echo "Please ensure littlejohn is available" - exit 1 -fi -if ! hash reclaimer 2>/dev/null; then - echo "Please ensure reclaimer is available" - exit 1 -fi - -# Detect if we're running under SLURM -if [[ -n "${SLURM_JOB_ID}" ]]; then - # Slurm users will probably need to customise this - # shellcheck disable=SC1091 - source "${HOME}"/venvs/star/bin/activate - cd "${HOME}"/dev/star - PROCESS_COUNT="${SLURM_JOB_CPUS_PER_NODE}" -else - PROCESS_COUNT=$(getconf _NPROCESSORS_ONLN) -fi -echo "Using ${PROCESS_COUNT} threads." - -if [ -z "${DATADIR}" ]; then - echo "Please specify $DATADIR" - exit 1 -fi - -if [ -z "${VIRTUAL_ENV}" ]; then - echo "Please specify run in a virtualenv" - exit 1 -fi - -PYTHON_BIN=$(command -v python3) -echo "Located Python: ${PYTHON_BIN}" -AOH_CALC_BIN=$(command -v aoh-calc) -echo "Located aoh-calc: ${AOH_CALC_BIN}" - -declare -a TAXALIST=("AMPHIBIA" "AVES" "MAMMALIA" "REPTILIA") - -if [ ! -d "${DATADIR}" ]; then - mkdir "${DATADIR}" -fi - -# Get habitat layer and prepare for use -if [ ! -d "${DATADIR}"/habitat_layers ]; then - if [ ! -f "${DATADIR}"/habitat/raw.tif ]; then - echo "Fetching habitat map..." - reclaimer zenodo --zenodo_id 3939050 \ - --filename PROBAV_LC100_global_v3.0.1_2019-nrt_Discrete-Classification-map_EPSG-4326.tif \ - --output "${DATADIR}"/habitat/raw.tif - fi - - echo "Processing habitat map..." - aoh-habitat-process --habitat "${DATADIR}"/habitat/raw.tif \ - --scale 992.292720200000133 \ - --projection "ESRI:54009" \ - --output "${DATADIR}"/tmp_habitat_layers/current - mv "${DATADIR}"/tmp_habitat_layers "${DATADIR}"/habitat_layers -fi - -if [ ! -f "${DATADIR}"/habitat_layers/current/lcc_0.tif ]; then - cp "${DATADIR}/Zenodo/MissingLandcover_1km_cover.tif" "${DATADIR}"/habitat_layers/current/lcc_0.tif -fi - -if [ ! -f "${DATADIR}"/masks/CGLS100Inland_withGADMIslands.tif ]; then - echo "Processing masks..." - python3 ./prepare_layers/remove_nans_from_mask.py --original "${DATADIR}"/Zenodo/CGLS100Inland_withGADMIslands.tif \ - --output "${DATADIR}"/masks/CGLS100Inland_withGADMIslands.tif -fi - -# Generate the crosswalk table -if [ ! -f "${DATADIR}"/crosswalk.csv ]; then - echo "Generating crosswalk table..." - python3 ./prepare_layers/convert_crosswalk.py --original ./data/crosswalk_bin_T.csv --output "${DATADIR}"/crosswalk.csv -fi - -# Get species data per taxa from IUCN data -for TAXA in "${TAXALIST[@]}" -do - if [ ! -d "${DATADIR}"/species-info/"${TAXA}"/ ]; then - echo "Extracting species data for ${TAXA}..." 
- python3 ./prepare_species/extract_species_data_psql.py --class "${TAXA}" \ - --output "${DATADIR}"/species-info/"${TAXA}"/ \ - --projection "ESRI:54009" \ - --excludes "${DATADIR}"/SpeciesList_generalisedRangePolygons.csv - fi -done - -if [ -f "${DATADIR}"/BL_Species_Elevations_2023.csv ]; then - echo "Applying birdlife data..." - python3 ./prepare_species/apply_birdlife_data.py --geojsons "${DATADIR}"/species-info/AVES --overrides "${DATADIR}"/BL_Species_Elevations_2023.csv -fi - -if [ ! -d "${DATADIR}"/aohs ]; then - echo "Generating AoH task list..." - python3 ./utils/aoh_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/aohbatch.csv - - echo "Generating AoHs..." - littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/aohbatch.log -c "${DATADIR}"/aohbatch.csv "${AOH_CALC_BIN}" -fi - -# Calculate predictors from AoHs -if [ ! -f "${DATADIR}"/summaries/species_richness.tif ]; then - echo "Generating species richness..." - aoh-species-richness --aohs_folder "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/summaries/species_richness.tif -fi -if [ ! -f "${DATADIR}"/summaries/endemism.tif ]; then - echo "Generating endemism..." - aoh-endemism --aohs_folder "${DATADIR}"/aohs/current/ \ - --species_richness "${DATADIR}"/summaries/species_richness.tif \ - --output "${DATADIR}"/summaries/endemism.tif -fi - -# Aoh Validation -if [ ! -f "${DATADIR}"/validation/aohs.csv ]; then - echo "Collating validation data..." - aoh-collate-data --aoh_results "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/validation/aohs.csv -fi -if [ ! -f "${DATADIR}"/validation/model_validation.csv ]; then - echo "Calculating model validation..." - aoh-validate-prevalence --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --output "${DATADIR}"/validation/model_validation.csv -fi - -for TAXA in "${TAXALIST[@]}" -do - if [ ! -f "${DATADIR}"/validation/occurrences/"${TAXA}".csv ]; then - echo "Fetching GBIF data for ${TAXA}..." - aoh-fetch-gbif-data --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --taxa "${TAXA}" \ - --output_dir "${DATADIR}"/validation/occurrences/ - echo "Validating occurrences for ${TAXA}..." - aoh-validate-occurrences --gbif_data_path "${DATADIR}"/validation/occurrences/"${TAXA}" \ - --species_data "${DATADIR}"/species-info/"${TAXA}"/current/ \ - --aoh_results "${DATADIR}"/aohs/current/"${TAXA}"/ \ - --output "${DATADIR}"/validation/occurrences/"${TAXA}".csv - fi -done - -# Threats -if [ ! -d "${DATADIR}"/threat_rasters ]; then - echo "Generating threat task list..." - python3 ./utils/threats_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/threatbatch.csv - - echo "Generating threat rasters..." - littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/threatbatch.log -c "${DATADIR}"/threatbatch.csv "${PYTHON_BIN}" -- ./threats/threat_processing.py -fi -if [ ! -d "${DATADIR}"/threat_results ]; then - echo "Summarising threats..." 
- python3 ./threats/threat_summation.py --threat_rasters "${DATADIR}"/threat_rasters --output "${DATADIR}"/threat_results_tmp - mv "${DATADIR}"/threat_results_tmp "${DATADIR}"/threat_results -fi From 47ec3e390fd9a2627b0291b5159999c5a2424a06 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:40:39 +0000 Subject: [PATCH 09/15] Relax CI checks on snakemake for now --- .github/workflows/python-package.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 8360353..2a4a2c2 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -50,9 +50,3 @@ jobs: - name: Snakemake format check run: snakefmt --check workflow/ - - - name: Snakemake lint - run: snakemake --snakefile workflow/Snakefile --lint - - - name: Snakemake dry run - run: snakemake --snakefile workflow/Snakefile --dry-run all From 2c329e71153a4067d51c3fdeb7a010dbc23de53c Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 11:41:10 +0000 Subject: [PATCH 10/15] Update GDAL version used in CI --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2a4a2c2..e712837 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -12,7 +12,7 @@ on: jobs: build: runs-on: ubuntu-latest - container: ghcr.io/osgeo/gdal:ubuntu-small-3.11.4 + container: ghcr.io/osgeo/gdal:ubuntu-small-3.12.1 strategy: fail-fast: false matrix: @@ -35,7 +35,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install gdal[numpy]==3.11.4 + python -m pip install gdal[numpy]==3.12.1 python -m pip install -r requirements.txt python -m pip install snakefmt From 457b9db7d04a960b09d79d90a8e00f11ce295e53 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 12:45:01 +0000 Subject: [PATCH 11/15] Add check at threat generation we're not including non-star species --- threats/threat_processing.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/threats/threat_processing.py b/threats/threat_processing.py index f21d84b..daa721c 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -25,6 +25,13 @@ def threat_processing_per_species( os.makedirs(output_directory_path, exist_ok=True) taxon_id = data.id_no[0] + + # Due to validation we generate AOHs for many more species than + # is needed for STAR, but we need to ensure those don't slip into + # the pipeline + if bool(data.in_star[0]) is not True: + sys.exit(f"Species {taxon_id} should not be in star!") + category_weight = int(data.category_weight[0]) raw_threats = data.threats[0] threat_data = json.loads(raw_threats) if isinstance(raw_threats, str) else raw_threats From 3d44b31bb3bb44ba2566fff98d437065b229219f Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 13:35:41 +0000 Subject: [PATCH 12/15] Simplify snakemake more --- prepare_species/apply_birdlife_data.py | 21 ++++++- requirements.txt | 6 +- threats/threat_processing.py | 2 +- workflow/rules/prepare.smk | 2 - workflow/rules/species.smk | 83 +++----------------------- workflow/rules/summaries.smk | 22 +------ workflow/rules/threats.smk | 3 - workflow/rules/validation.smk | 24 +++----- 8 files changed, 44 insertions(+), 119 deletions(-) diff --git a/prepare_species/apply_birdlife_data.py b/prepare_species/apply_birdlife_data.py index 
bcc7199..f2a86d7 100644 --- a/prepare_species/apply_birdlife_data.py +++ b/prepare_species/apply_birdlife_data.py @@ -1,5 +1,6 @@ import argparse import math +import os from pathlib import Path import aoh @@ -25,6 +26,7 @@ def apply_birdlife_data( geojson_directory_path: Path, overrides_path: Path, + sentinel_path: Path | None, ) -> None: overrides = pd.read_csv(overrides_path, encoding="latin1") @@ -52,9 +54,17 @@ def apply_birdlife_data( res = gpd.GeoDataFrame(data.to_frame().transpose(), crs=species_info.crs, geometry="geometry") res.to_file(path, driver="GeoJSON") + # This script modifies the GeoJSON files, but snakemake needs one + # output to say when this is done, so if we're in snakemake mode we touch a sentinel file to + # let it know we've done. One day this should be another decorator. + if sentinel_path is not None: + os.makedirs(sentinel_path.parent, exist_ok=True) + sentinel_path.touch() + @snakemake_compatible(mapping={ "geojson_directory_path": "params.geojson_dir", "overrides": "input.overrides", + "sentinel_path": "output.sentinel", }) def main() -> None: parser = argparse.ArgumentParser(description="Process agregate species data to per-species-file.") @@ -72,11 +82,20 @@ def main() -> None: required=True, dest="overrides", ) + parser.add_argument( + '--sentinel', + type=Path, + help='Generate a sentinel file on completion for snakemake to track', + required=False, + default=None, + dest='sentinel_path', + ) args = parser.parse_args() apply_birdlife_data( args.geojson_directory_path, - args.overrides + args.overrides, + args.sentinel_path, ) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 555868a..b96fe18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,11 +9,13 @@ scikit-image redlistapi boto3 yirgacheffe>=1.12,<2.0 -aoh[validation]>=2.0.2,<3.0 +aoh[validation]>=2.0.3,<3.0 # Snakemake workflow management snakemake>=8.0 -snakemake-argparse-bridge +# We require a bug fix on this package that isn't yet upstream +snakemake-argparse-bridge @ git+https://github.com/mdales/snakemake-argparse-bridge@77ac272d0bc22c370aeb9d1670882fd9dac9b098 + # GDAL should be installed manually to match the version of the library installed on your machine gdal[numpy] diff --git a/threats/threat_processing.py b/threats/threat_processing.py index daa721c..f2347c7 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -58,7 +58,7 @@ def threat_processing_per_species( per_threat_per_species_score.to_geotiff(output_path) # This script generates a bunch of rasters, but snakemake needs one - # output to say when done, so if we're in snakemake mode we touch a sentinel file to + # output to say when this is done, so if we're in snakemake mode we touch a sentinel file to # let it know we've done. One day this should be another decorator. 
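The comment above notes that the touch-a-sentinel-on-success boilerplate (now duplicated in apply_birdlife_data.py and threat_processing.py) should eventually become another decorator. A hypothetical sketch of what that could look like, not part of this patch series:

```python
import functools
import os
from pathlib import Path


def touches_sentinel(func):
    """Touch a sentinel file once the wrapped function returns without raising.

    Callers outside Snakemake simply pass sentinel_path=None (or omit it).
    """
    @functools.wraps(func)
    def wrapper(*args, sentinel_path: Path | None = None, **kwargs):
        result = func(*args, **kwargs)
        if sentinel_path is not None:
            os.makedirs(sentinel_path.parent, exist_ok=True)
            sentinel_path.touch()
        return result
    return wrapper
```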
if sentinel_path is not None: os.makedirs(sentinel_path.parent, exist_ok=True) diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index 35b568f..44988ca 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -30,8 +30,6 @@ rule convert_crosswalk: original=SRCDIR / "data" / "crosswalk_bin_T.csv", output: crosswalk=DATADIR / "crosswalk.csv", - log: - DATADIR / "logs" / "convert_crosswalk.log", script: str(SRCDIR / "prepare_layers" / "convert_crosswalk.py") diff --git a/workflow/rules/species.smk b/workflow/rules/species.smk index 0ac6438..720d353 100644 --- a/workflow/rules/species.smk +++ b/workflow/rules/species.smk @@ -4,53 +4,6 @@ # These rules handle extracting species data from the IUCN PostgreSQL database. # Species extraction is a checkpoint because the number of output files # (one GeoJSON per species) is only known after extraction completes. -# -# Code-sensitive: These rules should rebuild if the extraction scripts change. - -import os -from pathlib import Path - - -# ============================================================================= -# Version Sentinel for Code-Sensitive Dependencies -# ============================================================================= - - -rule species_version_sentinel: - """ - Create a version sentinel that tracks changes to species extraction code. - Downstream rules depend on this to trigger rebuilds when code changes. - """ - input: - # Track the extraction scripts - script1=SRCDIR / "prepare_species" / "extract_species_data_psql.py", - script2=SRCDIR / "prepare_species" / "common.py", - output: - sentinel=DATADIR / ".sentinels" / "species_code_version.txt", - run: - import hashlib - import subprocess - - os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) - - # Hash the tracked scripts - hashes = [] - for f in input: - with open(f, "rb") as fh: - hashes.append(hashlib.sha256(fh.read()).hexdigest()[:12]) - - # Get aoh package version - try: - result = subprocess.run( - ["aoh-calc", "--version"], capture_output=True, text=True, check=True - ) - aoh_version = result.stdout.strip() - except Exception: - aoh_version = "unknown" - - with open(output.sentinel, "w") as f: - f.write(f"scripts: {','.join(hashes)}\n") - f.write(f"aoh: {aoh_version}\n") # ============================================================================= @@ -70,8 +23,6 @@ checkpoint extract_species_data: DB_HOST, DB_NAME, DB_USER, DB_PASSWORD """ input: - # Code version sentinel for rebuild tracking - version_sentinel=DATADIR / ".sentinels" / "species_code_version.txt", excludes=DATADIR / config["optional_inputs"]["species_excludes"], output: # The report.csv is the known output; GeoJSON files are dynamic @@ -80,21 +31,12 @@ checkpoint extract_species_data: classname="{taxa}", output_dir=lambda wildcards: DATADIR / "species-info" / wildcards.taxa, projection=config["projection"], - log: - DATADIR / "logs" / "extract_species_{taxa}.log", resources: - # Serialize DB access - only one extraction script at a time + # Serialise DB access scripts - only one extraction script at a time # as it will make many concurrent connections internally db_connections=1, - shell: - """ - python3 {SRCDIR}/prepare_species/extract_species_data_psql.py \ - --class {params.classname} \ - --output {params.output_dir} \ - --projection "{params.projection}" \ - --excludes {input.excludes} \ - 2>&1 | tee {log} - """ + script: + str(SRCDIR / "prepare_species" / "extract_species_data_psql.py") # 
=============================================================================
@@ -105,24 +47,17 @@ checkpoint extract_species_data:
 rule apply_birdlife_overrides:
     """
     Apply BirdLife elevation data overrides to AVES species.
-
-    This rule only runs if the BirdLife elevations file exists.
-    It modifies GeoJSON files in-place.
     """
     input:
+        # The report isn't read, but acts as a sentinel that the birds
+        # species extraction has completed.
         report=DATADIR / "species-info" / "AVES" / SCENARIO / "report.csv",
         overrides=DATADIR / config["optional_inputs"]["birdlife_elevations"],
     output:
+        # Unlike other taxa, the AOH stage needs to wait for the data to be
+        # applied, so we use this sentinel to indicate that.
         sentinel=DATADIR / "species-info" / "AVES" / ".birdlife_applied",
     params:
        geojson_dir=DATADIR / "species-info",
-    log:
-        DATADIR / "logs" / "apply_birdlife_overrides.log",
-    shell:
-        """
-        python3 {SRCDIR}/prepare_species/apply_birdlife_data.py \
-            --geojsons {params.geojson_dir} \
-            --overrides {input.overrides} \
-            2>&1 | tee {log}
-        touch {output.sentinel}
-        """
+    script:
+        str(SRCDIR / "prepare_species" / "apply_birdlife_data.py")

diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk
index 906b658..9e6886e 100644
--- a/workflow/rules/summaries.smk
+++ b/workflow/rules/summaries.smk
@@ -4,12 +4,6 @@
 # These rules generate summary statistics from AOH rasters:
 # - Species richness: count of species per pixel
 # - Endemism: weighted measure of endemic species per pixel
-#
-# Code-sensitive: These rules rebuild if any AOH changes or if the
-# aoh package version changes.
-
-import os
-from pathlib import Path
 
 
 # =============================================================================
@@ -23,15 +17,11 @@ rule species_richness:
 
     Species richness is the sum of all species AOHs - giving the
     number of species present at each pixel.
-
-    This rebuilds if:
-    - Any AOH file changes
-    - The aoh package version changes
     """
     input:
-        # All AOHs must be complete
+        # Species richness doesn't use the aohs.csv file, but it's a
+        # good indicator that AOH generation has completed
         aoh_sentinel=DATADIR / "validation" / "aohs.csv",
-        # Version tracking
         version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt",
     output:
         richness=DATADIR / "summaries" / "species_richness.tif",
@@ -41,7 +31,6 @@ rule species_richness:
         DATADIR / "logs" / "species_richness.log",
     shell:
         """
-        mkdir -p $(dirname {output.richness})
         aoh-species-richness \
             --aohs_folder {params.aohs_folder} \
             --output {output.richness} \
@@ -60,17 +49,10 @@ rule endemism:
 
     Endemism weights each species by the inverse of its range size,
     giving higher values to pixels with range-restricted species.
-
-    This rebuilds if:
-    - Species richness changes
-    - Any AOH file changes
-    - The aoh package version changes
     """
     input:
-        # Dependencies
         aoh_sentinel=DATADIR / "validation" / "aohs.csv",
         species_richness=DATADIR / "summaries" / "species_richness.tif",
-        # Version tracking
         version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt",
     output:
         endemism=DATADIR / "summaries" / "endemism.tif",

diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk
index 8cd73db..d1de109 100644
--- a/workflow/rules/threats.smk
+++ b/workflow/rules/threats.smk
@@ -11,10 +11,7 @@
 # - Level 2: Sum by threat code prefix (e.g., 1.1, 1.2)
 # - Level 1: Sum by major threat (e.g., 1, 2)
 # - Level 0: Sum all threats (final STAR map)
-#
-# Code-sensitive: These rules rebuild if threat scripts change.
-import os import json from pathlib import Path diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk index 11c9530..b45eed1 100644 --- a/workflow/rules/validation.smk +++ b/workflow/rules/validation.smk @@ -10,8 +10,6 @@ # The GBIF validation is treated as "precious" - it will only run if the # output doesn't exist. This is because GBIF downloads can take hours. -import os -from pathlib import Path # ============================================================================= # Model Validation @@ -20,7 +18,7 @@ from pathlib import Path rule model_validation: """ - Perform statistical validation of AOH models. + Perform statistical validation of AOH models based on Dahal et al. This runs a statistical analysis of the AOH outputs to assess model quality. Requires R with lme4 and lmerTest packages. @@ -46,31 +44,26 @@ rule model_validation: # ============================================================================= -# GBIF Validation (PRECIOUS) +# Occurrence Validation (EXPENSIVE!) # ============================================================================= # -# GBIF validation is expensive (hours of download time) and should only be -# regenerated if the output is explicitly deleted. We use 'ancient()' to -# prevent rebuilds due to timestamp changes. -# -# For future: Could add logic to detect new species and only fetch those. +# GBIF based occurrence validation is expensive (hours of download time) and +# should only be regenerated if the output is explicitly deleted. rule fetch_gbif_data: """ Fetch GBIF occurrence data for a taxa. - PRECIOUS: This rule is expensive (hours of downloads) and will only - run if the output doesn't exist. It won't rebuild due to code changes. + This rule is expensive (hours of downloads) and will only run if the output + doesn't exist. It won't rebuild due to code changes. Environment variables required: GBIF_USERNAME, GBIF_EMAIL, GBIF_PASSWORD """ input: - # Use ancient() to prevent rebuilds collated=ancient(DATADIR / "validation" / "aohs.csv"), output: - # The output is a directory, we use a sentinel sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", params: output_dir=DATADIR / "validation" / "occurrences", @@ -78,7 +71,6 @@ rule fetch_gbif_data: DATADIR / "logs" / "fetch_gbif_{taxa}.log", shell: """ - mkdir -p {params.output_dir} aoh-fetch-gbif-data \ --collated_aoh_data {input.collated} \ --taxa {wildcards.taxa} \ @@ -92,7 +84,7 @@ rule validate_gbif_occurrences: """ Validate AOH models against GBIF occurrence data. - PRECIOUS: Depends on GBIF data which is expensive to fetch. + Depends on GBIF data which is expensive to fetch. 
""" input: gbif_sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", @@ -124,7 +116,7 @@ rule validate_gbif_occurrences: # ============================================================================= -# GBIF Validation Target +# Occurrence Validation Target # ============================================================================= From e2516954c477db195be4347f82620434749bafad Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 14:04:01 +0000 Subject: [PATCH 13/15] Undo test changes to script --- prepare_species/extract_species_data_psql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index c1a6147..3350664 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -279,7 +279,7 @@ def extract_data_per_species( # You can't do NOT IN on an empty list in SQL if excludes: exclude_statement = "AND assessments.sis_taxon_id NOT IN %s" - statement = MAIN_STATEMENT + exclude_statement + " LIMIT 100" + statement = MAIN_STATEMENT + exclude_statement cursor.execute(statement, (class_name, excludes)) else: cursor.execute(MAIN_STATEMENT, (class_name,)) @@ -333,7 +333,7 @@ def main() -> None: required=False, default=None, dest="overrides", - ) + ) parser.add_argument( '--excludes', type=Path, From 24e048b43c7031a421518a576b6bbcc64994857b Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Fri, 30 Jan 2026 15:12:14 +0000 Subject: [PATCH 14/15] More snakemake refactoring --- Dockerfile | 8 ++++++-- docker-entrypoint.sh | 20 ++++++++++++++++++++ workflow/Snakefile | 8 ++++---- workflow/rules/aoh.smk | 4 ++-- workflow/rules/prepare.smk | 8 +++----- workflow/rules/threats.smk | 29 +++++++++++++++++++++-------- 6 files changed, 56 insertions(+), 21 deletions(-) create mode 100755 docker-entrypoint.sh diff --git a/Dockerfile b/Dockerfile index bb86d42..b15704b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -55,8 +55,12 @@ RUN python3 -m mypy prepare_layers prepare_species threats utils tests RUN snakefmt --check workflow/ # RUN snakemake --snakefile workflow/Snakefile --lint +# Copy and set up entrypoint script +COPY docker-entrypoint.sh /usr/local/bin/ +RUN chmod +x /usr/local/bin/docker-entrypoint.sh + # Default command runs the full Snakemake pipeline # Use --cores to specify parallelism, e.g.: docker run ... 
--cores 8 -# Note: --scheduler greedy avoids ILP solver issues on some platforms -ENTRYPOINT ["snakemake", "--snakefile", "workflow/Snakefile", "--scheduler", "greedy"] +# Logs are written to $DATADIR/logs/ and .snakemake/ metadata is stored in $DATADIR/ +ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] CMD ["--cores", "4", "all"] \ No newline at end of file diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100755 index 0000000..571ebe9 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,20 @@ +#!/bin/bash +set -e + +# Ensure logs directory exists +mkdir -p "${DATADIR}/logs" + +# Change to DATADIR so .snakemake/ metadata is stored there +cd "${DATADIR}" + +# Generate timestamped log filename +LOG_FILE="${DATADIR}/logs/snakemake_$(date +%Y%m%d_%H%M%S).log" + +echo "Snakemake logs will be written to: ${LOG_FILE}" + +# Run snakemake with all passed arguments, capturing output to log file +exec snakemake \ + --snakefile /root/star/workflow/Snakefile \ + --scheduler greedy \ + "$@" \ + 2>&1 | tee "${LOG_FILE}" diff --git a/workflow/Snakefile b/workflow/Snakefile index 8e47daa..d531c04 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -23,9 +23,12 @@ from pathlib import Path # Allow GDAL/OGR to handle large GeoJSON objects os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" +# Source directory (where this repo lives) +SRCDIR = Path(workflow.basedir).parent + # Load configuration -configfile: "config/config.yaml" +configfile: SRCDIR / "config/config.yaml" # Data directory from environment variable @@ -35,9 +38,6 @@ DATADIR = Path(os.environ.get("DATADIR", "/data")) TAXA = config["taxa"] SCENARIO = config["scenario"] -# Source directory (where this repo lives) -SRCDIR = Path(workflow.basedir).parent - # Include rule modules include: "rules/prepare.smk" diff --git a/workflow/rules/aoh.smk b/workflow/rules/aoh.smk index 4ebca25..e29294a 100644 --- a/workflow/rules/aoh.smk +++ b/workflow/rules/aoh.smk @@ -60,8 +60,8 @@ def aoh_species_inputs(wildcards): ), "crosswalk": DATADIR / "crosswalk.csv", "mask": ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), - "elevation_max": ancient(DATADIR / "Zenodo" / "FABDEM_1km_max_patched.tif"), - "elevation_min": ancient(DATADIR / "Zenodo" / "FABDEM_1km_min_patched.tif"), + "elevation_max": ancient(DATADIR / config["inputs"]["zenodo_elevation_max"]), + "elevation_min": ancient(DATADIR / config["inputs"]["zenodo_elevation_min"]), # Version sentinel for code-sensitive rebuilds "version_sentinel": DATADIR / ".sentinels" / "aoh_version.txt", } diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index 44988ca..00bafe9 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -27,7 +27,7 @@ rule convert_crosswalk: This is fast to regenerate so uses normal dependency tracking. """ input: - original=SRCDIR / "data" / "crosswalk_bin_T.csv", + original=SRCDIR / config["inputs"]["crosswalk_source"], output: crosswalk=DATADIR / "crosswalk.csv", script: @@ -47,11 +47,9 @@ rule remove_nans_from_mask: The rule won't trigger rebuilds due to code changes. """ input: - original=ancient(DATADIR / "Zenodo" / "CGLS100Inland_withGADMIslands.tif"), + original=ancient(DATADIR / config["inputs"]["zenodo_mask"]), output: mask=DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", - log: - DATADIR / "logs" / "remove_nans_from_mask.log", script: str(SRCDIR / "prepare_layers" / "remove_nans_from_mask.py") @@ -126,7 +124,7 @@ rule copy_islands_layer: This is lcc_0.tif which represents island areas. 
""" input: - islands=ancient(DATADIR / "Zenodo" / "MissingLandcover_1km_cover.tif"), + islands=ancient(DATADIR / config["inputs"]["zenodo_islands"]), # Ensure habitat processing is done first habitat_sentinel=DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete", output: diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk index d1de109..608934f 100644 --- a/workflow/rules/threats.smk +++ b/workflow/rules/threats.smk @@ -27,16 +27,16 @@ def get_star_species_for_taxa(wildcards): A species is included if: - It has in_star=true in its GeoJSON - Its AOH raster exists (some species have no AOH due to no habitat overlap) + + Note: This function assumes AOHs are already generated. The caller + (aggregate_threat_rasters_inputs) ensures this via checkpoint dependency. """ - # Wait for species data checkpoint + # Get species data directory from checkpoint checkpoint_output = checkpoints.extract_species_data.get( taxa=wildcards.taxa ).output[0] geojson_dir = Path(checkpoint_output).parent - # Wait for AOH checkpoint - this triggers DAG re-evaluation after AOHs complete - checkpoints.aggregate_aohs_per_taxa.get(taxa=wildcards.taxa) - star_species = [] for geojson_path in geojson_dir.glob("*.geojson"): species_id = geojson_path.stem @@ -108,16 +108,29 @@ rule generate_threat_rasters: # ============================================================================= +def aggregate_threat_rasters_inputs(wildcards): + """ + Return inputs for aggregate_threat_rasters_per_taxa. + Explicitly includes AOH checkpoint to ensure AOHs are generated first. + """ + # Get the AOH checkpoint output - this forces Snakemake to wait for AOHs + aoh_checkpoint = checkpoints.aggregate_aohs_per_taxa.get( + taxa=wildcards.taxa + ).output[0] + + return { + "aoh_complete": aoh_checkpoint, + "sentinels": get_threat_raster_sentinels_for_taxa(wildcards), + } + + rule aggregate_threat_rasters_per_taxa: """ Aggregate rule that ensures all threat rasters for a taxa are generated. Only processes species with in_star=true. - - Note: AOH completion is enforced via checkpoint in get_threat_raster_sentinels_for_taxa. """ input: - # All threat rasters for STAR species (implicitly waits for AOH checkpoint) - sentinels=get_threat_raster_sentinels_for_taxa, + unpack(aggregate_threat_rasters_inputs), output: sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".complete", shell: From c61bc31a08e993c0570e1e0290729ee3588e4472 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Sat, 31 Jan 2026 13:35:40 +0000 Subject: [PATCH 15/15] Make summaries stages as needing all cores --- workflow/rules/summaries.smk | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk index 9e6886e..b0b0a85 100644 --- a/workflow/rules/summaries.smk +++ b/workflow/rules/summaries.smk @@ -27,6 +27,7 @@ rule species_richness: richness=DATADIR / "summaries" / "species_richness.tif", params: aohs_folder=DATADIR / "aohs" / SCENARIO, + threads: workflow.cores log: DATADIR / "logs" / "species_richness.log", shell: @@ -58,6 +59,7 @@ rule endemism: endemism=DATADIR / "summaries" / "endemism.tif", params: aohs_folder=DATADIR / "aohs" / SCENARIO, + threads: workflow.cores log: DATADIR / "logs" / "endemism.log", shell: