diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f1baeec..e712837 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -12,7 +12,7 @@ on: jobs: build: runs-on: ubuntu-latest - container: ghcr.io/osgeo/gdal:ubuntu-small-3.11.4 + container: ghcr.io/osgeo/gdal:ubuntu-small-3.12.1 strategy: fail-fast: false matrix: @@ -22,7 +22,7 @@ jobs: - name: Install system run: | apt-get update -qqy - apt-get install -y git python3-pip libpq5 libpq-dev r-base libtirpc-dev shellcheck + apt-get install -y git python3-pip libpq5 libpq-dev r-base libtirpc-dev - uses: actions/checkout@v4 with: submodules: 'true' @@ -35,8 +35,9 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install gdal[numpy]==3.11.4 + python -m pip install gdal[numpy]==3.12.1 python -m pip install -r requirements.txt + python -m pip install snakefmt - name: Lint with pylint run: python3 -m pylint utils prepare_layers prepare_species threats @@ -47,6 +48,5 @@ jobs: - name: Tests run: python3 -m pytest ./tests - - name: Script checks - run: | - shellcheck ./scripts/run.sh + - name: Snakemake format check + run: snakefmt --check workflow/ diff --git a/.gitignore b/.gitignore index 68bc17f..526d6b5 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/ *.py[cod] *$py.class +.snakemake/ + # C extensions *.so diff --git a/Dockerfile b/Dockerfile index 913f289..b15704b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,23 +1,17 @@ +# Build stage for reclaimer (used to download from Zenodo) FROM golang:latest AS reclaimerbuild RUN git clone https://github.com/quantifyearth/reclaimer.git WORKDIR /go/reclaimer RUN go mod tidy RUN go build -FROM golang:latest AS littlejohnbuild -RUN git clone https://github.com/quantifyearth/littlejohn.git -WORKDIR /go/littlejohn -RUN go mod tidy -RUN go build - -FROM ghcr.io/osgeo/gdal:ubuntu-small-3.11.4 +FROM ghcr.io/osgeo/gdal:ubuntu-small-3.12.1 RUN apt-get update -qqy && \ apt-get install -qy \ git \ cmake \ python3-pip \ - shellcheck \ r-base \ libpq-dev \ libtirpc-dev \ @@ -25,21 +19,22 @@ RUN apt-get update -qqy && \ && rm -rf /var/cache/apt/* COPY --from=reclaimerbuild /go/reclaimer/reclaimer /bin/reclaimer -COPY --from=littlejohnbuild /go/littlejohn/littlejohn /bin/littlejohn RUN rm /usr/lib/python3.*/EXTERNALLY-MANAGED -RUN pip install gdal[numpy]==3.11.4 +RUN pip install gdal[numpy]==3.12.1 COPY requirements.txt /tmp/ RUN pip install -r /tmp/requirements.txt +# Snakemake linting/formatting tools +RUN pip install snakefmt + RUN mkdir /root/R ENV R_LIBS_USER=/root/R RUN Rscript -e 'install.packages(c("lme4","lmerTest","emmeans"), repos="https://cloud.r-project.org")' COPY ./ /root/star WORKDIR /root/star -RUN chmod 755 ./scripts/run.sh # We create a DATADIR - this should be mapped at container creation # time to a volume somewhere else @@ -53,6 +48,19 @@ ENV VIRTUAL_ENV=/usr ENV PYTHONPATH=/root/star RUN python3 -m pytest ./tests -RUN python3 -m pylint prepare_layers prepare_species utils tests -RUN python3 -m mypy prepare_layers prepare_species utils tests -RUN shellcheck ./scripts/run.sh +RUN python3 -m pylint prepare_layers prepare_species threats utils tests +RUN python3 -m mypy prepare_layers prepare_species threats utils tests + +# Snakemake validation +RUN snakefmt --check workflow/ +# RUN snakemake --snakefile workflow/Snakefile --lint + +# Copy and set up entrypoint script +COPY docker-entrypoint.sh /usr/local/bin/ +RUN chmod +x 
/usr/local/bin/docker-entrypoint.sh + +# Default command runs the full Snakemake pipeline +# Use --cores to specify parallelism, e.g.: docker run ... --cores 8 +# Logs are written to $DATADIR/logs/ and .snakemake/ metadata is stored in $DATADIR/ +ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] +CMD ["--cores", "4", "all"] \ No newline at end of file diff --git a/README.md b/README.md index 2974a93..8da8ee8 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,6 @@ An implementation of the threat based [STAR biodiversity metric by Muir et al](https://www.nature.com/articles/s41559-021-01432-0) (also known as STAR(t)). -See [method.md](method.md) for a description of the methodology, or `scripts/run.sh` for how to execute the pipeline. - ## Checking out the code The code is available on github, and can be checked out from there: @@ -23,7 +21,6 @@ There are some additional inputs required to run the pipeline, which should be p The script also assumes you have a Postgres database with the IUCN Redlist database in it. - ## Species data acquisition There are two scripts for getting the species data from the Redlist. For those in the IUCN with access to the database version of the redlist, use `extract_species_data_psql.py`. @@ -34,6 +31,20 @@ For those outside the IUCN, there is a script called `extract_species_data_redli There are two ways to run the pipeline. The easiest way is to use Docker if you have it available to you, as it will manage all the dependencies for you. But you can check out and run it locally if you want to also, but it requires a little more effort. +Either way, the pipeline itself is run using [Snakemake](https://snakemake.readthedocs.io/en/stable/), a tool designed to run data-science pipelines made up from many different scripts and sources of information. Snakemake tracks dependencies, making it easier to re-run the pipeline: only the parts that depend on what has changed will be rerun. However, in STAR the initial data processing of raster layers is very slow, so we've configured Snakemake to never re-generate those unless the generated rasters have been deleted manually. + +Because sometimes you do not need to run the whole pipeline for a specific job, the Snakemake workflow has multiple targets you can invoke: + +* prepare: Generate the necessary input rasters for the STAR pipeline. +* species_data: Extract species data into GeoJSON files from the Redlist database. +* aohs: Just generate the species AOHs and summary CSV. +* validation: Run model validation. +* occurrence_validation: Run occurrence validation - this can be VERY SLOW as it fetches occurrence data from GBIF. +* threats: Generate the STAR(t) raster layers. +* all: Do everything except occurrence validation. +
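+For example, assuming you have the dependencies installed locally, `DATADIR` exported, and the repository root as your working directory, you can ask Snakemake for an individual target directly (this is a sketch; the Docker image described below wraps the same invocation):
+
+```shell
+$ snakemake --cores 4 species_data
+$ snakemake --cores 8 aohs
+```
+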
+There is a configuration file in `config/config.yaml` that is used to set experimental parameters, such as which taxa to run the pipeline for. + ### Running with Docker There is included a docker file, which is based on the GDAL container image, which is set up to install everything ready to use. You can build that using: ```shell $ docker buildx build -t star . ``` +Note that depending on how many CPU cores you provide, you will probably need to give Docker more memory than the out-of-the-box setting (which is a few GB). We recommend giving it as much memory as you can spare. + You can then invoke the run script using this. You should map an external folder into the container as a place to store the intermediary data and final results, and you should provide details about the Postgres instance with the IUCN redlist: ```shell $ docker run --rm -v /some/local/dir:/data \ + -p 5432:5432 \ -e DB_HOST=localhost \ -e DB_NAME=iucnredlist \ -e DB_PASSWORD=supersecretpassword \ -e DB_USER=postgres \ - star ./scripts/run.sh + -e GBIF_USERNAME=myusername \ + -e GBIF_PASSWORD=mypassword \ + -e GBIF_EMAIL=myemail \ + star --cores 8 all ``` ### Running without Docker @@ -61,7 +78,6 @@ If you prefer not to use Docker, you will need: * GDAL * R (required for validation) * [Reclaimer](https://github.com/quantifyearth/reclaimer/) - a Go tool for fetching data from Zenodo -* [Littlejohn](https://github.com/quantifyearth/littlejohn/) - a Go tool for running scripts in parallel If you are using macOS please note that the default Python install that Apple ships is now several years out of date (Python 3.9, released Oct 2020) and you'll need to install a more recent version (for example, using [homebrew](https://brew.sh)). @@ -91,6 +107,20 @@ export DB_PASSWORD=supersecretpassword export DB_USER=postgres ``` +If you are on macOS, you can set the following extra flag to use GPU acceleration: + +```shell +export YIRGACHEFFE_BACKEND=MLX +``` + +For occurrence validation you will need a GBIF account, and you must set its details as follows: + +```shell +export GBIF_USERNAME=myusername +export GBIF_PASSWORD=mypassword +export GBIF_EMAIL=myemail +``` + Once you have all that you can then run the pipeline: ```shell diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 0000000..ddec7e0 --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,37 @@ +# STAR Pipeline Configuration +# =========================== + +# Taxonomic classes to process +taxa: + - AMPHIBIA + - AVES + - MAMMALIA + - REPTILIA + +# Scenario for habitat layers (for future expansion) +scenario: current + +# Projection used throughout the pipeline +projection: "ESRI:54009" + +# Scale for habitat processing (meters) +habitat_scale: 992.292720200000133 + +# Input data files (relative to datadir) +# These are expected to be pre-downloaded in the Zenodo subfolder +inputs: + zenodo_mask: "Zenodo/CGLS100Inland_withGADMIslands.tif" + zenodo_elevation_max: "Zenodo/FABDEM_1km_max_patched.tif" + zenodo_elevation_min: "Zenodo/FABDEM_1km_min_patched.tif" + zenodo_islands: "Zenodo/MissingLandcover_1km_cover.tif" + crosswalk_source: "data/crosswalk_bin_T.csv" + +# Optional input files (pipeline will check if these exist) +optional_inputs: + birdlife_elevations: "BL_Species_Elevations_2023.csv" + species_excludes: "SpeciesList_generalisedRangePolygons.csv" + +# Zenodo configuration for downloading raw habitat +zenodo: + habitat_id: 3939050 + habitat_filename: "PROBAV_LC100_global_v3.0.1_2019-nrt_Discrete-Classification-map_EPSG-4326.tif" diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100755 index 0000000..571ebe9 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,20 @@ +#!/bin/bash +set -eo pipefail + +# Ensure logs directory exists +mkdir -p "${DATADIR}/logs" + +# Change to DATADIR so .snakemake/ metadata is stored there +cd "${DATADIR}" + +# Generate timestamped log filename +LOG_FILE="${DATADIR}/logs/snakemake_$(date +%Y%m%d_%H%M%S).log" + +echo "Snakemake logs will be written to: ${LOG_FILE}" + +# Run snakemake with all passed arguments, capturing output to log file +exec snakemake \ + --snakefile /root/star/workflow/Snakefile \ + --scheduler greedy \ + "$@"
\ + 2>&1 | tee "${LOG_FILE}" diff --git a/prepare_layers/convert_crosswalk.py b/prepare_layers/convert_crosswalk.py index 83aa8d7..cbd36db 100644 --- a/prepare_layers/convert_crosswalk.py +++ b/prepare_layers/convert_crosswalk.py @@ -2,6 +2,7 @@ from pathlib import Path import pandas as pd +from snakemake_argparse_bridge import snakemake_compatible # Take from https://www.iucnredlist.org/resources/habitat-classification-scheme IUCN_HABITAT_CODES = { @@ -57,6 +58,10 @@ def convert_crosswalk( df = pd.DataFrame(res, columns=["code", "value"]) df.to_csv(output_path, index=False) +@snakemake_compatible(mapping={ + "original_path": "input.original", + "output_path": "output.crosswalk", +}) def main() -> None: parser = argparse.ArgumentParser(description="Convert IUCN crosswalk to minimal common format.") parser.add_argument( diff --git a/prepare_layers/remove_nans_from_mask.py b/prepare_layers/remove_nans_from_mask.py index 180cd9f..0300312 100644 --- a/prepare_layers/remove_nans_from_mask.py +++ b/prepare_layers/remove_nans_from_mask.py @@ -3,6 +3,7 @@ from pathlib import Path import yirgacheffe as yg +from snakemake_argparse_bridge import snakemake_compatible def remove_nans_from_mask( input_path: Path, @@ -13,6 +14,10 @@ def remove_nans_from_mask( converted = layer.nan_to_num() converted.to_geotiff(output_path) +@snakemake_compatible(mapping={ + "original_path": "input.original", + "output_path": "output.mask", +}) def main() -> None: parser = argparse.ArgumentParser(description="Convert NaNs to zeros in mask layers") parser.add_argument( diff --git a/prepare_species/apply_birdlife_data.py b/prepare_species/apply_birdlife_data.py index da65746..f2a86d7 100644 --- a/prepare_species/apply_birdlife_data.py +++ b/prepare_species/apply_birdlife_data.py @@ -1,10 +1,12 @@ import argparse import math +import os from pathlib import Path import aoh import geopandas as gpd import pandas as pd +from snakemake_argparse_bridge import snakemake_compatible # Columns from current BirdLife data overrides: # SIS ID @@ -24,6 +26,7 @@ def apply_birdlife_data( geojson_directory_path: Path, overrides_path: Path, + sentinel_path: Path | None, ) -> None: overrides = pd.read_csv(overrides_path, encoding="latin1") @@ -51,6 +54,18 @@ def apply_birdlife_data( res = gpd.GeoDataFrame(data.to_frame().transpose(), crs=species_info.crs, geometry="geometry") res.to_file(path, driver="GeoJSON") + # This script modifies the GeoJSON files, but snakemake needs one + # output to say when this is done, so if we're in snakemake mode we touch a sentinel file to + # let it know we've done. One day this should be another decorator. 
+ if sentinel_path is not None: + os.makedirs(sentinel_path.parent, exist_ok=True) + sentinel_path.touch() + +@snakemake_compatible(mapping={ + "geojson_directory_path": "params.geojson_dir", + "overrides": "input.overrides", + "sentinel_path": "output.sentinel", +}) def main() -> None: parser = argparse.ArgumentParser(description="Process agregate species data to per-species-file.") parser.add_argument( @@ -67,11 +82,20 @@ def main() -> None: required=True, dest="overrides", ) + parser.add_argument( + '--sentinel', + type=Path, + help='Generate a sentinel file on completion for snakemake to track', + required=False, + default=None, + dest='sentinel_path', + ) args = parser.parse_args() apply_birdlife_data( args.geojson_directory_path, - args.overrides + args.overrides, + args.sentinel_path, ) if __name__ == "__main__": diff --git a/prepare_species/common.py b/prepare_species/common.py index 2fcaf63..718c27e 100644 --- a/prepare_species/common.py +++ b/prepare_species/common.py @@ -161,9 +161,9 @@ def process_systems( return systems def process_threats( - threat_data: list[tuple[int, str, str]], + threat_data: list[tuple[str, str, str]], report: SpeciesReport, -) -> list[tuple[int, int]]: +) -> list[tuple[str, int]]: cleaned_threats = [] for code, scope, severity in threat_data: if scope is None or scope.lower() == "unknown": diff --git a/prepare_species/extract_species_data_psql.py b/prepare_species/extract_species_data_psql.py index d13b7cf..3350664 100644 --- a/prepare_species/extract_species_data_psql.py +++ b/prepare_species/extract_species_data_psql.py @@ -1,9 +1,9 @@ import argparse -import json import logging import math import os +import sys from functools import partial from multiprocessing import Pool from pathlib import Path @@ -14,6 +14,7 @@ import psycopg2 import shapely from postgis.psycopg import register +from snakemake_argparse_bridge import snakemake_compatible from common import ( CATEGORY_WEIGHTS, @@ -193,7 +194,11 @@ def process_row( category_weight = 0 # This is a fix as per the method to include the missing islands layer: - habitats_list = list(habitats) + ["islands"] + habitats_list = sorted(list(habitats)) + ["islands"] + + # GeoPandas will do the right thing JSON wise if we convert the + # threats from a list of tuples to a list of lists: + json_ready_threats = [[code, score] for (code, score) in threats] gdf = gpd.GeoDataFrame( [[ @@ -208,7 +213,7 @@ def process_row( scientific_name, family_name, class_name, - json.dumps(threats), + json_ready_threats, category, category_weight, geometry, @@ -289,11 +294,14 @@ def extract_data_per_species( results = apply_overrides(overrides_path, results) # The limiting amount here is how many concurrent connections the database can take - with Pool(processes=20) as pool: - reports = pool.map( - partial(process_row, class_name, era_output_directory_path, target_projection, presence), - results - ) + try: + with Pool(processes=20) as pool: + reports = pool.map( + partial(process_row, class_name, era_output_directory_path, target_projection, presence), + results + ) + except psycopg2.OperationalError: + sys.exit("Database connection failed for some rows, aborting") reports_df = pd.DataFrame( [x.as_row() for x in reports], @@ -302,6 +310,13 @@ def extract_data_per_species( os.makedirs(era_output_directory_path, exist_ok=True) reports_df.to_csv(era_output_directory_path / "report.csv", index=False) +@snakemake_compatible(mapping={ + "classname": "params.classname", + "overrides": "params.overrides", + "excludes": "input.excludes", 
+ "output_directory_path": "params.output_dir", + "target_projection": "params.projection", +}) def main() -> None: parser = argparse.ArgumentParser(description="Process agregate species data to per-species-file.") parser.add_argument( @@ -316,6 +331,7 @@ def main() -> None: type=Path, help="CSV of overrides", required=False, + default=None, dest="overrides", ) parser.add_argument( @@ -323,6 +339,7 @@ def main() -> None: type=Path, help="CSV of taxon IDs to not include", required=False, + default=None, dest="excludes" ) parser.add_argument( diff --git a/prepare_species/extract_species_data_redlist.py b/prepare_species/extract_species_data_redlist.py index 52343aa..8ebdcf0 100644 --- a/prepare_species/extract_species_data_redlist.py +++ b/prepare_species/extract_species_data_redlist.py @@ -51,7 +51,6 @@ """ import argparse -import json import logging import os import sys @@ -245,6 +244,10 @@ def process_species( habitats_list = list(habitats) + ["islands"] + # GeoPandas will do the right thing JSON wise if we convert the + # threats from a list of tuples to a list of lists: + json_ready_threats = [[code, score] for (code, score) in threats] + # Create GeoDataFrame with all data gdf = gpd.GeoDataFrame( [[ @@ -259,7 +262,7 @@ def process_species( scientific_name, family_name, class_name, - json.dumps(threats), + json_ready_threats, category, CATEGORY_WEIGHTS[category], geometry diff --git a/requirements.txt b/requirements.txt index c760a2c..b96fe18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,13 @@ scikit-image redlistapi boto3 yirgacheffe>=1.12,<2.0 -aoh[validation]>=2.0.2,<3.0 +aoh[validation]>=2.0.3,<3.0 + +# Snakemake workflow management +snakemake>=8.0 +# We require a bug fix on this package that isn't yet upstream +snakemake-argparse-bridge @ git+https://github.com/mdales/snakemake-argparse-bridge@77ac272d0bc22c370aeb9d1670882fd9dac9b098 + # GDAL should be installed manually to match the version of the library installed on your machine gdal[numpy] diff --git a/scripts/run.sh b/scripts/run.sh deleted file mode 100755 index 71a060a..0000000 --- a/scripts/run.sh +++ /dev/null @@ -1,153 +0,0 @@ -#!/bin/bash -# -# Assumes you've set up a python virtual environement in the current directory. -# -# In addition to the Python environemnt, you will need the following extra command line tools: -# -# https://github.com/quantifyearth/reclaimer - used to download inputs from Zenodo directly -# https://github.com/quantifyearth/littlejohn - used to run batch jobs in parallel - -# Set shell script to exit on first error (-e) and to output commands being run to make -# reviewing logs easier (-x) -set -e -set -x - -# We know we use two Go tools, so add go/bin to our path as in slurm world they're likely -# to be installed locally -export PATH="${PATH}":"${HOME}"/go/bin -if ! hash littlejohn 2>/dev/null; then - echo "Please ensure littlejohn is available" - exit 1 -fi -if ! hash reclaimer 2>/dev/null; then - echo "Please ensure reclaimer is available" - exit 1 -fi - -# Detect if we're running under SLURM -if [[ -n "${SLURM_JOB_ID}" ]]; then - # Slurm users will probably need to customise this - # shellcheck disable=SC1091 - source "${HOME}"/venvs/star/bin/activate - cd "${HOME}"/dev/star - PROCESS_COUNT="${SLURM_JOB_CPUS_PER_NODE}" -else - PROCESS_COUNT=$(getconf _NPROCESSORS_ONLN) -fi -echo "Using ${PROCESS_COUNT} threads." 
- -if [ -z "${DATADIR}" ]; then - echo "Please specify $DATADIR" - exit 1 -fi - -if [ -z "${VIRTUAL_ENV}" ]; then - echo "Please specify run in a virtualenv" - exit 1 -fi - -PYTHON_BIN=$(command -v python3) -echo "Located Python: ${PYTHON_BIN}" -AOH_CALC_BIN=$(command -v aoh-calc) -echo "Located aoh-calc: ${AOH_CALC_BIN}" - -declare -a TAXALIST=("AMPHIBIA" "AVES" "MAMMALIA" "REPTILIA") - -if [ ! -d "${DATADIR}" ]; then - mkdir "${DATADIR}" -fi - -# Get habitat layer and prepare for use -if [ ! -d "${DATADIR}"/habitat_layers ]; then - if [ ! -f "${DATADIR}"/habitat/raw.tif ]; then - echo "Fetching habitat map..." - reclaimer zenodo --zenodo_id 3939050 \ - --filename PROBAV_LC100_global_v3.0.1_2019-nrt_Discrete-Classification-map_EPSG-4326.tif \ - --output "${DATADIR}"/habitat/raw.tif - fi - - echo "Processing habitat map..." - aoh-habitat-process --habitat "${DATADIR}"/habitat/raw.tif \ - --scale 992.292720200000133 \ - --projection "ESRI:54009" \ - --output "${DATADIR}"/tmp_habitat_layers/current - mv "${DATADIR}"/tmp_habitat_layers "${DATADIR}"/habitat_layers -fi - -if [ ! -f "${DATADIR}"/habitat_layers/current/lcc_0.tif ]; then - cp "${DATADIR}/Zenodo/MissingLandcover_1km_cover.tif" "${DATADIR}"/habitat_layers/current/lcc_0.tif -fi - -if [ ! -f "${DATADIR}"/masks/CGLS100Inland_withGADMIslands.tif ]; then - echo "Processing masks..." - python3 ./prepare_layers/remove_nans_from_mask.py --original "${DATADIR}"/Zenodo/CGLS100Inland_withGADMIslands.tif \ - --output "${DATADIR}"/masks/CGLS100Inland_withGADMIslands.tif -fi - -# Generate the crosswalk table -if [ ! -f "${DATADIR}"/crosswalk.csv ]; then - echo "Generating crosswalk table..." - python3 ./prepare_layers/convert_crosswalk.py --original ./data/crosswalk_bin_T.csv --output "${DATADIR}"/crosswalk.csv -fi - -# Get species data per taxa from IUCN data -for TAXA in "${TAXALIST[@]}" -do - if [ ! -d "${DATADIR}"/species-info/"${TAXA}"/ ]; then - echo "Extracting species data for ${TAXA}..." - python3 ./prepare_species/extract_species_data_psql.py --class "${TAXA}" \ - --output "${DATADIR}"/species-info/"${TAXA}"/ \ - --projection "ESRI:54009" \ - --excludes "${DATADIR}"/SpeciesList_generalisedRangePolygons.csv - fi -done - -if [ -f "${DATADIR}"/BL_Species_Elevations_2023.csv ]; then - echo "Applying birdlife data..." - python3 ./prepare_species/apply_birdlife_data.py --geojsons "${DATADIR}"/species-info/AVES --overrides "${DATADIR}"/BL_Species_Elevations_2023.csv -fi - -echo "Generating AoH task list..." -python3 ./utils/aoh_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/aohbatch.csv - -echo "Generating AoHs..." -littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/aohbatch.log -c "${DATADIR}"/aohbatch.csv "${AOH_CALC_BIN}" - -# Calculate predictors from AoHs -echo "Generating species richness..." -aoh-species-richness --aohs_folder "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/summaries/species_richness.tif -echo "Generating endemism..." -aoh-endemism --aohs_folder "${DATADIR}"/aohs/current/ \ - --species_richness "${DATADIR}"/summaries/species_richness.tif \ - --output "${DATADIR}"/summaries/endemism.tif - -# Aoh Validation -echo "Collating validation data..." -aoh-collate-data --aoh_results "${DATADIR}"/aohs/current/ \ - --output "${DATADIR}"/validation/aohs.csv -echo "Calculating model validation..." 
-aoh-validate-prevalence --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --output "${DATADIR}"/validation/model_validation.csv -for TAXA in "${TAXALIST[@]}" -do - echo "Fetching GBIF data for ${TAXA}..." - aoh-fetch-gbif-data --collated_aoh_data "${DATADIR}"/validation/aohs.csv \ - --taxa "${TAXA}" \ - --output_dir "${DATADIR}"/validation/occurrences/ - echo "Validating occurrences for ${TAXA}..." - aoh-validate-occurrences --gbif_data_path "${DATADIR}"/validation/occurrences/"${TAXA}" \ - --species_data "${DATADIR}"/species-info/"${TAXA}"/current/ \ - --aoh_results "${DATADIR}"/aohs/current/"${TAXA}"/ \ - --output "${DATADIR}"/validation/occurrences/"${TAXA}".csv -done - -# Threats -echo "Generating threat task list..." -python3 ./utils/threats_generator.py --input "${DATADIR}"/species-info --datadir "${DATADIR}" --output "${DATADIR}"/threatbatch.csv - -echo "Generating threat rasters..." -littlejohn -j "${PROCESS_COUNT}" -o "${DATADIR}"/threatbatch.log -c "${DATADIR}"/threatbatch.csv "${PYTHON_BIN}" -- ./threats/threat_processing.py - -echo "Summarising threats..." -python3 ./threats/threat_summation.py --threat_rasters "${DATADIR}"/threat_rasters --output "${DATADIR}"/threat_results diff --git a/threats/threat_processing.py b/threats/threat_processing.py index 0e04c73..f2347c7 100644 --- a/threats/threat_processing.py +++ b/threats/threat_processing.py @@ -7,11 +7,13 @@ import geopandas as gpd import yirgacheffe as yg from pyogrio.errors import DataSourceError +from snakemake_argparse_bridge import snakemake_compatible def threat_processing_per_species( species_data_path: Path, aoh_path: Path, output_directory_path: Path, + sentinel_path: Path | None, ) -> None: try: data = gpd.read_file(species_data_path) @@ -23,8 +25,16 @@ def threat_processing_per_species( os.makedirs(output_directory_path, exist_ok=True) taxon_id = data.id_no[0] + + # Due to validation we generate AOHs for many more species than + # is needed for STAR, but we need to ensure those don't slip into + # the pipeline + if bool(data.in_star[0]) is not True: + sys.exit(f"Species {taxon_id} should not be in star!") + category_weight = int(data.category_weight[0]) - threat_data = json.loads(data.threats[0]) + raw_threats = data.threats[0] + threat_data = json.loads(raw_threats) if isinstance(raw_threats, str) else raw_threats try: aoh_data_path = aoh_path.with_suffix(".json") @@ -38,22 +48,29 @@ def threat_processing_per_species( weighted_species = proportional_aoh_per_pixel * category_weight total_threat_weight = sum(x[1] for x in threat_data) - print(threat_data) - print(total_threat_weight) for threat_id, weight in threat_data: - print(threat_id, weight) proportional_threat_weight = weight / total_threat_weight per_threat_per_species_score = weighted_species * proportional_threat_weight - print(per_threat_per_species_score.sum()) threat_dir_path = output_directory_path / str(threat_id) os.makedirs(threat_dir_path, exist_ok=True) output_path = threat_dir_path / f"{taxon_id}.tif" per_threat_per_species_score.to_geotiff(output_path) -def main() -> None: - os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" + # This script generates a bunch of rasters, but snakemake needs one + # output to say when this is done, so if we're in snakemake mode we touch a sentinel file to + # let it know we've done. One day this should be another decorator. 
+ if sentinel_path is not None: + os.makedirs(sentinel_path.parent, exist_ok=True) + sentinel_path.touch() +@snakemake_compatible(mapping={ + "species_data_path": "input.species_data", + "aoh_path": "input.aoh", + "output_directory_path": "params.output_dir", + "sentinel_path": "output.sentinel", +}) +def main() -> None: parser = argparse.ArgumentParser(description="Calculate per species threat layers") parser.add_argument( '--speciesdata', @@ -76,12 +93,21 @@ def main() -> None: required=True, dest='output_directory_path', ) + parser.add_argument( + '--sentinel', + type=Path, + help='Generate a sentinel file on completion for snakemake to track', + required=False, + default=None, + dest='sentinel_path', + ) args = parser.parse_args() threat_processing_per_species( args.species_data_path, args.aoh_path, args.output_directory_path, + args.sentinel_path, ) if __name__ == "__main__": diff --git a/threats/threat_summation.py b/threats/threat_summation.py index 5b93ede..b05d38c 100644 --- a/threats/threat_summation.py +++ b/threats/threat_summation.py @@ -9,6 +9,7 @@ import yirgacheffe as yg from yirgacheffe.layers import RasterLayer from osgeo import gdal +from snakemake_argparse_bridge import snakemake_compatible gdal.SetCacheMax(1024 * 1024 * 32) @@ -180,6 +181,11 @@ def threat_summation( reduce_to_next_level(level1_target, final_target, processes_count) +@snakemake_compatible(mapping={ + "rasters_directory": "params.threat_rasters_dir", + "output_directory": "params.output_dir", + "processes_count": "threads", +}) def main() -> None: parser = argparse.ArgumentParser(description="Generates the combined, and level 1 and level 2 threat rasters.") parser.add_argument( diff --git a/utils/aoh_generator.py b/utils/aoh_generator.py deleted file mode 100644 index 082d409..0000000 --- a/utils/aoh_generator.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import os -from pathlib import Path - -import pandas as pd - -def aoh_generator( - input_dir: Path, - data_dir: Path, - output_csv_path: Path, -): - taxa_dirs = input_dir.glob("[!.]*") - - res = [] - for taxa_dir_path in taxa_dirs: - for scenario in ['current',]: - source = 'historic' if scenario == 'pnv' else 'current' - taxa_path = taxa_dir_path / source - species_paths = taxa_path.glob("*.geojson") - for species_path in species_paths: - res.append([ - data_dir / "habitat_layers" / scenario, - data_dir / "Zenodo" / "FABDEM_1km_max_patched.tif", - data_dir / "Zenodo" / "FABDEM_1km_min_patched.tif", - data_dir / "crosswalk.csv", - species_path, - data_dir / "masks" / "CGLS100Inland_withGADMIslands.tif", - data_dir / "aohs" / scenario / taxa_dir_path.name, - ]) - - df = pd.DataFrame(res, columns=[ - '--fractional_habitats', - '--elevation-max', - '--elevation-min', - '--crosswalk', - '--speciesdata', - '--weights', - '--output' - ]) - output_dir, _ = os.path.split(output_csv_path) - os.makedirs(output_dir, exist_ok=True) - df.to_csv(output_csv_path, index=False) - - -def main() -> None: - parser = argparse.ArgumentParser(description="Species and seasonality generator.") - parser.add_argument( - '--input', - type=Path, - help="directory with taxa folders of species info", - required=True, - dest="input_dir" - ) - parser.add_argument( - '--datadir', - type=Path, - help="directory for results", - required=True, - dest="data_dir", - ) - parser.add_argument( - '--output', - type=Path, - help="name of output file for csv", - required=True, - dest="output" - ) - args = parser.parse_args() - - aoh_generator(args.input_dir, 
args.data_dir, args.output) - -if __name__ == "__main__": - main() diff --git a/utils/threats_generator.py b/utils/threats_generator.py deleted file mode 100644 index be66920..0000000 --- a/utils/threats_generator.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import json -import os -from pathlib import Path - -import pandas as pd - -def threats_generator( - input_dir: Path, - data_dir: Path, - output_csv_path: Path, -): - taxa_dirs = input_dir.glob("[!.]*") - - res = [] - for taxa_dir_path in taxa_dirs: - for scenario in ['current',]: - source = 'historic' if scenario == 'pnv' else 'current' - taxa_path = taxa_dir_path / source - species_paths = taxa_path.glob("*.geojson") - for species_path in species_paths: - taxon_id = species_path.stem - aoh_path = data_dir / "aohs" / source / taxa_dir_path.name / f"{taxon_id}_all.tif" - if aoh_path.exists(): - # Check if species should be included in STAR. We don't use geopandas here - # because it will parse the geometry, which can be quite slow, and we just - # want a single bool field - with open(species_path, 'r', encoding="UTF-8") as f: - data = json.load(f) - if data['features'][0]['properties']['in_star']: - res.append([ - species_path, - aoh_path, - data_dir / "threat_rasters" / taxa_dir_path.name, - ]) - - df = pd.DataFrame(res, columns=[ - '--speciesdata', - '--aoh', - '--output' - ]) - output_dir, _ = os.path.split(output_csv_path) - os.makedirs(output_dir, exist_ok=True) - df.to_csv(output_csv_path, index=False) - -def main() -> None: - parser = argparse.ArgumentParser(description="threat tasks generator.") - parser.add_argument( - '--input', - type=Path, - help="directory with taxa folders of species info", - required=True, - dest="input_dir" - ) - parser.add_argument( - '--datadir', - type=Path, - help="directory for results", - required=True, - dest="data_dir", - ) - parser.add_argument( - '--output', - type=Path, - help="name of output file for csv", - required=True, - dest="output" - ) - args = parser.parse_args() - - threats_generator(args.input_dir, args.data_dir, args.output) - -if __name__ == "__main__": - main() diff --git a/workflow/Snakefile b/workflow/Snakefile new file mode 100644 index 0000000..d531c04 --- /dev/null +++ b/workflow/Snakefile @@ -0,0 +1,144 @@ +# STAR Pipeline - Snakemake Workflow +# =================================== +# +# This workflow implements the STAR (Species Threat Abatement and Recovery) +# methodology from the IUCN. It processes species data, generates Area of +# Habitat (AoH) rasters, and produces threat-weighted biodiversity maps. 
+# +# Usage: +# snakemake --cores N all +# snakemake --cores N validation +# snakemake --cores N threats +# +# Environment variables required: +# DATADIR - Directory for input/output data +# DB_HOST, DB_NAME, DB_USER, DB_PASSWORD - PostgreSQL credentials +# +# For GBIF validation (optional): +# GBIF_USERNAME, GBIF_EMAIL, GBIF_PASSWORD + +import os +from pathlib import Path + +# Allow GDAL/OGR to handle large GeoJSON objects +os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" + +# Source directory (where this repo lives) +SRCDIR = Path(workflow.basedir).parent + + +# Load configuration +configfile: SRCDIR / "config/config.yaml" + + +# Data directory from environment variable +DATADIR = Path(os.environ.get("DATADIR", "/data")) + +# Taxa list from config +TAXA = config["taxa"] +SCENARIO = config["scenario"] + + +# Include rule modules +include: "rules/prepare.smk" +include: "rules/species.smk" +include: "rules/aoh.smk" +include: "rules/summaries.smk" +include: "rules/threats.smk" +include: "rules/validation.smk" + + +# ============================================================================= +# Target Rules +# ============================================================================= + + +rule all: + """ + Default target: run the complete STAR pipeline including validation. + """ + input: + # Final STAR output + DATADIR / "threat_results" / "level0" / "top.tif", + # Summaries + DATADIR / "summaries" / "species_richness.tif", + DATADIR / "summaries" / "endemism.tif", + # Model validation + DATADIR / "validation" / "model_validation.csv", + + +rule threats: + """ + Target: generate threat results without validation. + """ + input: + DATADIR / "threat_results" / "level0" / "top.tif", + + +rule summaries: + """ + Target: generate species richness and endemism maps. + """ + input: + DATADIR / "summaries" / "species_richness.tif", + DATADIR / "summaries" / "endemism.tif", + + +rule aohs: + """ + Target: generate all AoH rasters. + """ + input: + DATADIR / "validation" / "aohs.csv", + + +rule species_data: + """ + Target: extract species data from PostgreSQL. + """ + input: + DATADIR / "species-info" / "AVES" / ".birdlife_applied", + expand( + str(DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv"), + taxa=TAXA, + ), + + +rule prepare: + """ + Target: prepare all base layers (habitat, masks, crosswalk). + + TODO: The habitat layers list should come from parsing the crosswalk. + """ + input: + DATADIR / "crosswalk.csv", + DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", + expand( + str(DATADIR / "habitat_layers" / SCENARIO / "lcc_{n}.tif"), + n=range(0, 127), # Habitat layer codes + ), + + +rule validation: + """ + Target: run model validation (excludes occurrence validation by default which is expensive). + """ + input: + DATADIR / "validation" / "model_validation.csv", + + +# ============================================================================= +# Utility Functions +# ============================================================================= + + +def get_all_species_geojsons(wildcards): + """ + Return all species GeoJSON files for a given taxa. + Used for checkpoint-based dependency resolution. 
+ """ + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] + geojson_dir = DATADIR / "species-info" / wildcards.taxa / SCENARIO + return list(geojson_dir.glob("*.geojson")) diff --git a/workflow/profiles/default/config.yaml b/workflow/profiles/default/config.yaml new file mode 100644 index 0000000..238b1e0 --- /dev/null +++ b/workflow/profiles/default/config.yaml @@ -0,0 +1,2 @@ +resources: + db_connections: 1 diff --git a/workflow/rules/aoh.smk b/workflow/rules/aoh.smk new file mode 100644 index 0000000..e29294a --- /dev/null +++ b/workflow/rules/aoh.smk @@ -0,0 +1,198 @@ +# STAR Pipeline - Area of Habitat (AoH) Generation Rules +# ======================================================= +# +# These rules generate AoH rasters for each species. This is the most +# parallelizable part of the pipeline - each species can be processed +# independently with `snakemake --cores N`. +# +# Code-sensitive: These rules rebuild if the aoh package version changes. + +import os +from pathlib import Path + + +# ============================================================================= +# Version Sentinel for AOH Code +# ============================================================================= + + +rule aoh_version_sentinel: + """ + Create a version sentinel that tracks the aoh package version. + AOH rules depend on this to trigger rebuilds when the package updates. + """ + output: + sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + run: + import subprocess + + os.makedirs(os.path.dirname(output.sentinel), exist_ok=True) + + # Get aoh package version + try: + result = subprocess.run( + ["aoh-calc", "--version"], capture_output=True, text=True, check=True + ) + aoh_version = result.stdout.strip() + except Exception: + aoh_version = "unknown" + + with open(output.sentinel, "w") as f: + f.write(f"aoh: {aoh_version}\n") + + +# ============================================================================= +# Per-Species AOH Generation +# ============================================================================= + + +def aoh_species_inputs(wildcards): + """Return inputs for generate_aoh, including birdlife sentinel for AVES.""" + inputs = { + "species_data": DATADIR + / "species-info" + / wildcards.taxa + / SCENARIO + / f"{wildcards.species_id}.geojson", + # Base layers (precious - won't trigger rebuilds) + "habitat_sentinel": ancient( + DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete" + ), + "crosswalk": DATADIR / "crosswalk.csv", + "mask": ancient(DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif"), + "elevation_max": ancient(DATADIR / config["inputs"]["zenodo_elevation_max"]), + "elevation_min": ancient(DATADIR / config["inputs"]["zenodo_elevation_min"]), + # Version sentinel for code-sensitive rebuilds + "version_sentinel": DATADIR / ".sentinels" / "aoh_version.txt", + } + if wildcards.taxa == "AVES": + inputs["birdlife_applied"] = ( + DATADIR / "species-info" / "AVES" / ".birdlife_applied" + ) + return inputs + + +rule generate_aoh: + """ + Generate Area of Habitat raster for a single species. + + This rule is parallelizable - run with `snakemake --cores N` to process + multiple species concurrently. Each species takes its GeoJSON data and + produces a raster (.tif) and metadata (.json) file. + + The rule depends on the aoh version sentinel to rebuild when the + aoh package is updated. 
+ """ + input: + unpack(aoh_species_inputs), + output: + # Only declare JSON as output - TIF is optional (not created for empty AOHs) + metadata=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.json", + params: + habitat_dir=DATADIR / "habitat_layers" / SCENARIO, + log: + DATADIR / "logs" / "aoh" / "{taxa}" / "{species_id}_all.log", + resources: + # Limit concurrent AOH jobs if needed (e.g., for memory) + aoh_slots=1, + shell: + """ + mkdir -p $(dirname {log}) + aoh-calc \ + --fractional_habitats {params.habitat_dir} \ + --elevation-max {input.elevation_max} \ + --elevation-min {input.elevation_min} \ + --crosswalk {input.crosswalk} \ + --speciesdata {input.species_data} \ + --weights {input.mask} \ + --output $(dirname {output.metadata}) \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Per-Taxa AOH Aggregation +# ============================================================================= + + +def get_species_ids_for_taxa(wildcards): + """ + Get all species IDs for a taxa by reading the checkpoint output. + Returns list of species IDs (GeoJSON file stems). + """ + # Wait for the checkpoint to complete + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] + geojson_dir = Path(checkpoint_output).parent + return [p.stem for p in geojson_dir.glob("*.geojson")] + + +def get_all_aoh_metadata_for_taxa(wildcards): + """ + Get paths to all AOH metadata files for a taxa. + """ + species_ids = get_species_ids_for_taxa(wildcards) + return [ + DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{sid}_all.json" + for sid in species_ids + ] + + +checkpoint aggregate_aohs_per_taxa: + """ + Checkpoint that ensures all AOHs for a taxa are generated. + Creates a sentinel file when complete. + + This is a checkpoint (not a rule) so that downstream rules like + threat processing can re-evaluate the DAG after AOHs are created, + allowing them to see which AOH files actually exist. + """ + input: + # Only depend on JSON metadata (always created), not TIFs (optional) + metadata=get_all_aoh_metadata_for_taxa, + output: + sentinel=DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete", + shell: + """ + echo "Generated $(echo {input.metadata} | wc -w) AOHs for {wildcards.taxa}" + touch {output.sentinel} + """ + + +# ============================================================================= +# Collate AOH Data +# ============================================================================= + + +rule collate_aoh_data: + """ + Collate metadata from all AOH JSON files into a single CSV. + + This reads the .json metadata files that are generated alongside each + AOH raster. The CSV is used by downstream validation and analysis. + + Note: This depends on AOH JSON files, not raster files, because some + species may have empty AOHs (no raster) but still have metadata. 
+ """ + input: + # All AOHs must be complete + sentinels=expand( + str(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), taxa=TAXA + ), + # Version tracking + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + collated=DATADIR / "validation" / "aohs.csv", + params: + aoh_results_dir=DATADIR / "aohs" / SCENARIO, + log: + DATADIR / "logs" / "collate_aoh_data.log", + shell: + """ + mkdir -p $(dirname {output.collated}) + aoh-collate-data \ + --aoh_results {params.aoh_results_dir} \ + --output {output.collated} \ + 2>&1 | tee {log} + """ diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk new file mode 100644 index 0000000..00bafe9 --- /dev/null +++ b/workflow/rules/prepare.smk @@ -0,0 +1,167 @@ +# STAR Pipeline - Prepare Layers Rules +# ===================================== +# +# These rules handle the "precious" base layers that are slow to build +# and should only be regenerated if explicitly deleted (not if code changes). +# +# Precious layers: +# - habitat_layers/current/lcc_*.tif +# - masks/CGLS100Inland_withGADMIslands.tif +# - elevation maps (if generated locally) +# +# Normal layers (regenerate if inputs change): +# - crosswalk.csv + +import os +from pathlib import Path + + +# ============================================================================= +# Crosswalk Table (normal dependency tracking) +# ============================================================================= + + +rule convert_crosswalk: + """ + Convert IUCN crosswalk to minimal common format. + This is fast to regenerate so uses normal dependency tracking. + """ + input: + original=SRCDIR / config["inputs"]["crosswalk_source"], + output: + crosswalk=DATADIR / "crosswalk.csv", + script: + str(SRCDIR / "prepare_layers" / "convert_crosswalk.py") + + +# ============================================================================= +# Mask Processing (precious - only if missing) +# ============================================================================= + + +rule remove_nans_from_mask: + """ + Convert NaNs to zeros in the mask layer. + + This is a precious layer - it will only be built if the output doesn't exist. + The rule won't trigger rebuilds due to code changes. + """ + input: + original=ancient(DATADIR / config["inputs"]["zenodo_mask"]), + output: + mask=DATADIR / "masks" / "CGLS100Inland_withGADMIslands.tif", + script: + str(SRCDIR / "prepare_layers" / "remove_nans_from_mask.py") + + +# ============================================================================= +# Habitat Layer Processing (precious - only if missing) +# ============================================================================= + + +rule download_habitat: + """ + Download raw habitat map from Zenodo. + + This is a precious layer - will only download if output doesn't exist. + """ + output: + habitat=DATADIR / "habitat" / "raw.tif", + params: + zenodo_id=config["zenodo"]["habitat_id"], + filename=config["zenodo"]["habitat_filename"], + log: + DATADIR / "logs" / "download_habitat.log", + shell: + """ + reclaimer zenodo --zenodo_id {params.zenodo_id} \ + --filename "{params.filename}" \ + --output {output.habitat} \ + 2>&1 | tee {log} + """ + + +rule process_habitat: + """ + Process raw habitat map into per-class layers. + + This is a precious layer - the entire habitat_layers directory should + only be built if it doesn't exist. We use a sentinel file to track this. 
+ """ + input: + habitat=ancient(DATADIR / "habitat" / "raw.tif"), + output: + # Sentinel file to indicate habitat processing is complete + sentinel=DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete", + params: + scale=config["habitat_scale"], + projection=config["projection"], + output_dir=DATADIR / "habitat_layers" / SCENARIO, + tmp_dir=DATADIR / "tmp_habitat_layers" / SCENARIO, + log: + DATADIR / "logs" / "process_habitat.log", + shell: + """ + set -e + aoh-habitat-process --habitat {input.habitat} \ + --scale {params.scale} \ + --projection "{params.projection}" \ + --output {params.tmp_dir} \ + 2>&1 | tee {log} + + # Atomic move of completed directory + rm -rf {params.output_dir} + mv {params.tmp_dir} {params.output_dir} + + # Create sentinel file + touch {output.sentinel} + """ + + +rule copy_islands_layer: + """ + Copy the missing landcover (islands) layer to habitat layers. + This is lcc_0.tif which represents island areas. + """ + input: + islands=ancient(DATADIR / config["inputs"]["zenodo_islands"]), + # Ensure habitat processing is done first + habitat_sentinel=DATADIR / "habitat_layers" / SCENARIO / ".habitat_complete", + output: + lcc_0=DATADIR / "habitat_layers" / SCENARIO / "lcc_0.tif", + log: + DATADIR / "logs" / "copy_islands_layer.log", + shell: + """ + cp {input.islands} {output.lcc_0} 2>&1 | tee {log} + """ + + +# ============================================================================= +# Helper rule to get specific habitat layer +# ============================================================================= + + +def get_habitat_layer(wildcards): + """ + Returns the path to a specific habitat layer. + lcc_0 is special (islands layer), others come from habitat processing. + """ + n = int(wildcards.n) + if n == 0: + return DATADIR / "habitat_layers" / SCENARIO / "lcc_0.tif" + else: + return DATADIR / "habitat_layers" / SCENARIO / f"lcc_{n}.tif" + + +rule habitat_layer: + """ + Pseudo-rule to request a specific habitat layer. + This triggers either the habitat processing or islands copy as needed. + """ + input: + layer=get_habitat_layer, + output: + # This is a bit of a trick - we declare the output but let the + # upstream rules actually create it + touch(DATADIR / "habitat_layers" / SCENARIO / ".lcc_{n}_exists"), diff --git a/workflow/rules/species.smk b/workflow/rules/species.smk new file mode 100644 index 0000000..720d353 --- /dev/null +++ b/workflow/rules/species.smk @@ -0,0 +1,63 @@ +# STAR Pipeline - Species Data Extraction Rules +# ============================================== +# +# These rules handle extracting species data from the IUCN PostgreSQL database. +# Species extraction is a checkpoint because the number of output files +# (one GeoJSON per species) is only known after extraction completes. + + +# ============================================================================= +# Species Data Extraction (Checkpoint) +# ============================================================================= + + +checkpoint extract_species_data: + """ + Extract species data from PostgreSQL database. + + This is a checkpoint because the number of output GeoJSON files is only + known after extraction. Each taxa produces N species files where N is + determined by the database query. 
+ + Environment variables required: + DB_HOST, DB_NAME, DB_USER, DB_PASSWORD + """ + input: + excludes=DATADIR / config["optional_inputs"]["species_excludes"], + output: + # The report.csv is the known output; GeoJSON files are dynamic + report=DATADIR / "species-info" / "{taxa}" / SCENARIO / "report.csv", + params: + classname="{taxa}", + output_dir=lambda wildcards: DATADIR / "species-info" / wildcards.taxa, + projection=config["projection"], + resources: + # Serialise DB access scripts - only one extraction script at a time + # as it will make many concurrent connections internally + db_connections=1, + script: + str(SRCDIR / "prepare_species" / "extract_species_data_psql.py") + + +# ============================================================================= +# BirdLife Elevation Overrides (Optional) +# ============================================================================= + + +rule apply_birdlife_overrides: + """ + Apply BirdLife elevation data overrides to AVES species. + """ + input: + # The report isn't read, but acts as a sentinel that the AVES + # species extraction has completed. + report=DATADIR / "species-info" / "AVES" / SCENARIO / "report.csv", + overrides=DATADIR / config["optional_inputs"]["birdlife_elevations"], + output: + # Unlike other taxa, the AOH stage needs to wait for the data to be + # applied, so we use this sentinel to indicate that. + sentinel=DATADIR / "species-info" / "AVES" / ".birdlife_applied", + params: + geojson_dir=DATADIR / "species-info", + script: + str(SRCDIR / "prepare_species" / "apply_birdlife_data.py") diff --git a/workflow/rules/summaries.smk b/workflow/rules/summaries.smk new file mode 100644 index 0000000..b0b0a85 --- /dev/null +++ b/workflow/rules/summaries.smk @@ -0,0 +1,72 @@ +# STAR Pipeline - Summary Statistics Rules +# ========================================= +# +# These rules generate summary statistics from AOH rasters: +# - Species richness: count of species per pixel +# - Endemism: weighted measure of endemic species per pixel + + +# ============================================================================= +# Species Richness +# ============================================================================= + + +rule species_richness: + """ + Calculate species richness from all AOH rasters. + + Species richness is the sum of all species AOHs - giving the number + of species present at each pixel. + """ + input: + # Species richness doesn't use the aohs.csv file, but it's a + # good indicator that AOH generation has completed + aoh_sentinel=DATADIR / "validation" / "aohs.csv", + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + richness=DATADIR / "summaries" / "species_richness.tif", + params: + aohs_folder=DATADIR / "aohs" / SCENARIO, + threads: workflow.cores + log: + DATADIR / "logs" / "species_richness.log", + shell: + """ + aoh-species-richness \ + --aohs_folder {params.aohs_folder} \ + --output {output.richness} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Endemism +# ============================================================================= + + +rule endemism: + """ + Calculate endemism from AOH rasters and species richness. + + Endemism weights each species by the inverse of its range size, + giving higher values to pixels with range-restricted species.
+ """ + input: + aoh_sentinel=DATADIR / "validation" / "aohs.csv", + species_richness=DATADIR / "summaries" / "species_richness.tif", + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + endemism=DATADIR / "summaries" / "endemism.tif", + params: + aohs_folder=DATADIR / "aohs" / SCENARIO, + threads: workflow.cores + log: + DATADIR / "logs" / "endemism.log", + shell: + """ + aoh-endemism \ + --aohs_folder {params.aohs_folder} \ + --species_richness {input.species_richness} \ + --output {output.endemism} \ + 2>&1 | tee {log} + """ diff --git a/workflow/rules/threats.smk b/workflow/rules/threats.smk new file mode 100644 index 0000000..608934f --- /dev/null +++ b/workflow/rules/threats.smk @@ -0,0 +1,182 @@ +# STAR Pipeline - Threat Processing Rules +# ======================================== +# +# These rules generate per-species threat rasters and aggregate them +# into the final STAR threat maps at different hierarchy levels. +# +# The pipeline: +# 1. For each species marked in_star=true with an AOH: +# - Generate threat rasters (one per threat the species faces) +# 2. Aggregate threat rasters: +# - Level 2: Sum by threat code prefix (e.g., 1.1, 1.2) +# - Level 1: Sum by major threat (e.g., 1, 2) +# - Level 0: Sum all threats (final STAR map) + +import json +from pathlib import Path + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_star_species_for_taxa(wildcards): + """ + Get species IDs that should be included in STAR for a taxa. + A species is included if: + - It has in_star=true in its GeoJSON + - Its AOH raster exists (some species have no AOH due to no habitat overlap) + + Note: This function assumes AOHs are already generated. The caller + (aggregate_threat_rasters_inputs) ensures this via checkpoint dependency. + """ + # Get species data directory from checkpoint + checkpoint_output = checkpoints.extract_species_data.get( + taxa=wildcards.taxa + ).output[0] + geojson_dir = Path(checkpoint_output).parent + + star_species = [] + for geojson_path in geojson_dir.glob("*.geojson"): + species_id = geojson_path.stem + aoh_path = ( + DATADIR / "aohs" / SCENARIO / wildcards.taxa / f"{species_id}_all.tif" + ) + + # Check if species should be in STAR and has an AOH + try: + with open(geojson_path, "r") as f: + data = json.load(f) + if data["features"][0]["properties"].get("in_star", False): + # Only include if AOH TIF actually exists + if aoh_path.exists(): + star_species.append(species_id) + except (json.JSONDecodeError, KeyError, IndexError): + continue + + return star_species + + +def get_threat_raster_sentinels_for_taxa(wildcards): + """ + Get paths to threat raster sentinel files for all STAR species in a taxa. + """ + species_ids = get_star_species_for_taxa(wildcards) + return [ + DATADIR / "threat_rasters" / wildcards.taxa / f".{sid}_complete" + for sid in species_ids + ] + + +# ============================================================================= +# Per-Species Threat Raster Generation +# ============================================================================= + + +rule generate_threat_rasters: + """ + Generate threat rasters for a single species. + + Each species may face multiple threats, so this produces multiple + output files (one per threat). We use a sentinel file to track + completion since the exact outputs are data-dependent. 
+ """ + input: + # Species data and AOH + species_data=DATADIR + / "species-info" + / "{taxa}" + / SCENARIO + / "{species_id}.geojson", + aoh=DATADIR / "aohs" / SCENARIO / "{taxa}" / "{species_id}_all.tif", + output: + # Sentinel file since actual outputs depend on species' threats + sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".{species_id}_complete", + params: + output_dir=lambda wildcards: DATADIR / "threat_rasters" / wildcards.taxa, + log: + DATADIR / "logs" / "threats" / "{taxa}" / "{species_id}.log", + resources: + threat_slots=1, + script: + str(SRCDIR / "threats" / "threat_processing.py") + + +# ============================================================================= +# Per-Taxa Threat Raster Aggregation +# ============================================================================= + + +def aggregate_threat_rasters_inputs(wildcards): + """ + Return inputs for aggregate_threat_rasters_per_taxa. + Explicitly includes AOH checkpoint to ensure AOHs are generated first. + """ + # Get the AOH checkpoint output - this forces Snakemake to wait for AOHs + aoh_checkpoint = checkpoints.aggregate_aohs_per_taxa.get( + taxa=wildcards.taxa + ).output[0] + + return { + "aoh_complete": aoh_checkpoint, + "sentinels": get_threat_raster_sentinels_for_taxa(wildcards), + } + + +rule aggregate_threat_rasters_per_taxa: + """ + Aggregate rule that ensures all threat rasters for a taxa are generated. + Only processes species with in_star=true. + """ + input: + unpack(aggregate_threat_rasters_inputs), + output: + sentinel=DATADIR / "threat_rasters" / "{taxa}" / ".complete", + shell: + """ + echo "Generated threat rasters for {wildcards.taxa}" + touch {output.sentinel} + """ + + +rule all_threat_rasters: + """ + Aggregate rule that ensures all threat rasters for all taxa are generated. + """ + input: + sentinels=expand( + str(DATADIR / "threat_rasters" / "{taxa}" / ".complete"), taxa=TAXA + ), + output: + sentinel=DATADIR / "threat_rasters" / ".all_complete", + shell: + "touch {output.sentinel}" + + +# ============================================================================= +# Threat Summation +# ============================================================================= + + +rule threat_summation: + """ + Aggregate all per-species threat rasters into hierarchical threat maps. + + This produces: + - level2/: Aggregated by threat code (e.g., 1.1.tif, 1.2.tif) + - level1/: Aggregated by major threat (e.g., 1.tif, 2.tif) + - level0/top.tif: Final STAR map (all threats summed) + """ + input: + sentinel=DATADIR / "threat_rasters" / ".all_complete", + output: + star_map=DATADIR / "threat_results" / "level0" / "top.tif", + params: + threat_rasters_dir=DATADIR / "threat_rasters", + output_dir=DATADIR / "threat_results", + threads: workflow.cores + log: + DATADIR / "logs" / "threat_summation.log", + script: + str(SRCDIR / "threats" / "threat_summation.py") diff --git a/workflow/rules/validation.smk b/workflow/rules/validation.smk new file mode 100644 index 0000000..b45eed1 --- /dev/null +++ b/workflow/rules/validation.smk @@ -0,0 +1,131 @@ +# STAR Pipeline - Validation Rules +# ================================= +# +# These rules handle validation of the AOH models: +# +# 1. Collate AOH data: Gather metadata from all AOH JSON files +# 2. Model validation: Statistical analysis of AOH models (requires R) +# 3. GBIF validation: Validate against GBIF occurrence data (PRECIOUS - expensive) +# +# The GBIF validation is treated as "precious" - it will only run if the +# output doesn't exist. 
This is because GBIF downloads can take hours. + + +# ============================================================================= +# Model Validation +# ============================================================================= + + +rule model_validation: + """ + Perform statistical validation of AOH models based on Dahal et al. + + This runs a statistical analysis of the AOH outputs to assess + model quality. Requires R with lme4 and lmerTest packages. + + Rebuilds if: + - Collated AOH data changes + - AOH package version changes + """ + input: + collated=DATADIR / "validation" / "aohs.csv", + version_sentinel=DATADIR / ".sentinels" / "aoh_version.txt", + output: + validation=DATADIR / "validation" / "model_validation.csv", + log: + DATADIR / "logs" / "model_validation.log", + shell: + """ + aoh-validate-prevalence \ + --collated_aoh_data {input.collated} \ + --output {output.validation} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Occurrence Validation (EXPENSIVE!) +# ============================================================================= +# +# GBIF based occurrence validation is expensive (hours of download time) and +# should only be regenerated if the output is explicitly deleted. + + +rule fetch_gbif_data: + """ + Fetch GBIF occurrence data for a taxa. + + This rule is expensive (hours of downloads) and will only run if the output + doesn't exist. It won't rebuild due to code changes. + + Environment variables required: + GBIF_USERNAME, GBIF_EMAIL, GBIF_PASSWORD + """ + input: + collated=ancient(DATADIR / "validation" / "aohs.csv"), + output: + sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", + params: + output_dir=DATADIR / "validation" / "occurrences", + log: + DATADIR / "logs" / "fetch_gbif_{taxa}.log", + shell: + """ + aoh-fetch-gbif-data \ + --collated_aoh_data {input.collated} \ + --taxa {wildcards.taxa} \ + --output_dir {params.output_dir} \ + 2>&1 | tee {log} + touch {output.sentinel} + """ + + +rule validate_gbif_occurrences: + """ + Validate AOH models against GBIF occurrence data. + + Depends on GBIF data which is expensive to fetch. + """ + input: + gbif_sentinel=DATADIR / "validation" / "occurrences" / ".{taxa}_fetched", + # Use ancient() to prevent rebuilds due to upstream changes + aoh_sentinel=ancient(DATADIR / "aohs" / SCENARIO / "{taxa}" / ".complete"), + output: + validation=DATADIR / "validation" / "occurrences" / "{taxa}.csv", + params: + gbif_data=lambda wildcards: DATADIR + / "validation" + / "occurrences" + / wildcards.taxa, + species_data=lambda wildcards: DATADIR + / "species-info" + / wildcards.taxa + / SCENARIO, + aoh_results=lambda wildcards: DATADIR / "aohs" / SCENARIO / wildcards.taxa, + log: + DATADIR / "logs" / "validate_gbif_{taxa}.log", + shell: + """ + aoh-validate-occurrences \ + --gbif_data_path {params.gbif_data} \ + --species_data {params.species_data} \ + --aoh_results {params.aoh_results} \ + --output {output.validation} \ + 2>&1 | tee {log} + """ + + +# ============================================================================= +# Occurrence Validation Target +# ============================================================================= + + +rule occurrence_validation: + """ + Target rule for running GBIF validation for all taxa. + + WARNING: This is expensive and will download gigabytes of data from GBIF. 
+ Only run explicitly with: snakemake occurrence_validation + """ + input: + expand(str(DATADIR / "validation" / "occurrences" / "{taxa}.csv"), taxa=TAXA),
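+
+# A usage sketch rather than part of the workflow: because these outputs are ordinary
+# Snakemake targets, you can request a single taxon's occurrence validation CSV directly
+# instead of the whole occurrence_validation target. Assuming DATADIR=/data, that would be:
+#
+#     snakemake --cores 4 /data/validation/occurrences/AVES.csv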