Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
38b028c
Initial plan
Copilot Jan 3, 2026
dee2d28
Create NOAA HRRR analysis dataset files
Copilot Jan 3, 2026
bc67eb2
Add templates directory for NOAA HRRR analysis
Copilot Jan 3, 2026
dc83007
Use common analysis validators instead of custom ones
Copilot Jan 3, 2026
ea78900
Delete .gitkeep
aldenks Jan 3, 2026
1fbe4e3
Address code review feedback: simplify code and improve metadata
Copilot Jan 3, 2026
4ad5cf8
Generate zarr template metadata for NOAA HRRR analysis dataset
Copilot Jan 3, 2026
556db75
Address code review feedback: add TODOs, suspend crons, fix metadata
Copilot Jan 3, 2026
efc0cd2
Add faulthandler to conftest and specific precipitation value checks
Copilot Jan 3, 2026
789e610
Set chunk and shard sizes. Start dataset 12 hours after exact minimum…
aldenks Jan 6, 2026
a6ce683
add new cronjobs to submit job action
aldenks Jan 6, 2026
4393480
unrelated fixups
aldenks Jan 6, 2026
0839d2c
Set operational resources and update schedule
aldenks Jan 6, 2026
60576c6
Fix test failures: Update tests to use new start time 2018-07-14T00:00
Copilot Jan 6, 2026
5b761b9
test: add verbose pytest output for debugging worker crashes
aldenks Jan 6, 2026
6ec3f91
test: update dynamical dataset test cases and assertions
aldenks Jan 6, 2026
adef605
I'll help you generate a concise commit message for these changes. Ba…
aldenks Jan 6, 2026
38733b8
test: remove unnecessary print statement in dynamical dataset test
aldenks Jan 6, 2026
5e3e814
fix: update test assertions for precipitation values with small toler…
aldenks Jan 6, 2026
e873a85
test: add comprehensive test coverage for region_job.py
aldenks Jan 6, 2026
711e848
style: format code with linter and improve readability
aldenks Jan 6, 2026
7d47663
refactor: move inline imports to top of file and remove redundant imp…
aldenks Jan 6, 2026
eb9ec1d
fix: update test to use "no-rounding" for keep_mantissa_bits
aldenks Jan 6, 2026
c3aed56
fix: add units attribute to test data array for deaccumulation test
aldenks Jan 6, 2026
b17f8aa
test: update test to check deaccumulation rates per second
aldenks Jan 6, 2026
1f74588
test: update deaccumulation test with realistic precipitation data
aldenks Jan 6, 2026
48b8786
style: format code and remove unnecessary whitespace
aldenks Jan 6, 2026
251d8f7
refactor: update region_job tests to mock transformation functions
aldenks Jan 6, 2026
acba4ac
undo pytest addops change
aldenks Jan 6, 2026
37c1c72
handle time/lead_time deaccumulation dimension + set validation schedule
aldenks Jan 6, 2026
2ad5dba
polish hrrr analysis region job and dynamical dataset tests
aldenks Jan 6, 2026
5d7e1de
self review
aldenks Jan 6, 2026
e5b774f
Try 30GB local disk
aldenks Jan 7, 2026
83f371e
minor dataset attribute proofs
aldenks Jan 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/manual-create-job-from-cronjob.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ name: 'Manual: Create Job from CronJob'
- noaa-gefs-forecast-35-day-validate
- noaa-gfs-forecast-update
- noaa-gfs-forecast-validate
- noaa-hrrr-analysis-update
- noaa-hrrr-analysis-validate
- noaa-hrrr-forecast-48-hour-update
- noaa-hrrr-forecast-48-hour-validate
- noaa-ndvi-cdr-analysis-update
Expand Down
11 changes: 10 additions & 1 deletion src/reformatters/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import faulthandler
import multiprocessing
import os
from collections.abc import Sequence
Expand All @@ -8,7 +9,6 @@
with contextlib.suppress(RuntimeError): # skip if already set
multiprocessing.set_start_method("spawn", force=True)


import sentry_sdk
import typer
from sentry_sdk.integrations.typer import TyperIntegration
Expand All @@ -33,10 +33,15 @@
GefsForecast35DayDataset,
)
from reformatters.noaa.gfs.forecast import NoaaGfsForecastDataset
from reformatters.noaa.hrrr.analysis.dynamical_dataset import (
NoaaHrrrAnalysisDataset,
)
from reformatters.noaa.hrrr.forecast_48_hour.dynamical_dataset import (
NoaaHrrrForecast48HourDataset,
)

faulthandler.enable()


class NoaaHrrrIcechunkAwsOpenDataDatasetStorageConfig(StorageConfig):
"""Configuration for the storage of a AWS Open Data dataset."""
Expand Down Expand Up @@ -98,6 +103,10 @@ class UpstreamGriddedZarrsDatasetStorageConfig(StorageConfig):
primary_storage_config=SourceCoopZarrDatasetStorageConfig(),
replica_storage_configs=[NoaaHrrrIcechunkAwsOpenDataDatasetStorageConfig()],
),
NoaaHrrrAnalysisDataset(
primary_storage_config=SourceCoopZarrDatasetStorageConfig(),
replica_storage_configs=[NoaaHrrrIcechunkAwsOpenDataDatasetStorageConfig()],
),
# ECMWF
EcmwfIfsEnsForecast15Day025DegreeDataset(
primary_storage_config=SourceCoopZarrDatasetStorageConfig(),
Expand Down
Empty file.
67 changes: 67 additions & 0 deletions src/reformatters/noaa/hrrr/analysis/dynamical_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from collections.abc import Iterable, Sequence
from datetime import timedelta

from reformatters.common import validation
from reformatters.common.dynamical_dataset import DynamicalDataset
from reformatters.common.kubernetes import (
CronJob,
ReformatCronJob,
ValidationCronJob,
)
from reformatters.noaa.hrrr.hrrr_config_models import NoaaHrrrDataVar
from reformatters.noaa.hrrr.region_job import NoaaHrrrSourceFileCoord

from .region_job import NoaaHrrrAnalysisRegionJob
from .template_config import NoaaHrrrAnalysisTemplateConfig


class NoaaHrrrAnalysisDataset(
    DynamicalDataset[NoaaHrrrDataVar, NoaaHrrrSourceFileCoord]
):
    """NOAA HRRR analysis dataset: operational update/validation jobs and validators."""

    template_config: NoaaHrrrAnalysisTemplateConfig = NoaaHrrrAnalysisTemplateConfig()
    region_job_class: type[NoaaHrrrAnalysisRegionJob] = NoaaHrrrAnalysisRegionJob

    def operational_kubernetes_resources(self, image_tag: str) -> Iterable[CronJob]:
        """Return the operational update and validation CronJobs.

        Both jobs are created with ``suspend=True`` and must be resumed to run.
        """
        update_job = ReformatCronJob(
            name=f"{self.dataset_id}-update",
            # Hour-0 data is typically available ~54 minutes after init time on
            # most issuances, so run at minute 57 of every third hour. This
            # could be increased to hourly later.
            schedule="57 */3 * * *",
            pod_active_deadline=timedelta(minutes=20),
            image=image_tag,
            dataset_id=self.dataset_id,
            cpu="7",  # lets 8 shards be compressed in parallel
            memory="30G",
            shared_memory="16.5G",
            ephemeral_storage="30G",
            secret_names=self.store_factory.k8s_secret_names(),
            suspend=True,
        )

        validation_job = ValidationCronJob(
            name=f"{self.dataset_id}-validate",
            # Run 23 minutes after each update: the update fires at minute 57
            # of hours 0, 3, 6, ..., so validation fires at minute 20 of hours
            # 1, 4, 7, ... — hence "20 1-23/3 * * *".
            schedule="20 1-23/3 * * *",
            pod_active_deadline=timedelta(minutes=10),
            image=image_tag,
            dataset_id=self.dataset_id,
            cpu="0.7",
            memory="3.5G",
            secret_names=self.store_factory.k8s_secret_names(),
            suspend=True,
        )

        return [update_job, validation_job]

    def validators(self) -> Sequence[validation.DataValidator]:
        """Shared analysis validators: data is current and recent steps aren't NaN."""
        return (
            validation.check_analysis_current_data,
            validation.check_analysis_recent_nans,
        )
57 changes: 57 additions & 0 deletions src/reformatters/noaa/hrrr/analysis/region_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from collections.abc import Mapping, Sequence

import pandas as pd
import xarray as xr

from reformatters.common.iterating import item
from reformatters.common.logging import get_logger
from reformatters.common.region_job import (
CoordinateValueOrRange,
)
from reformatters.common.types import (
Dim,
)
from reformatters.noaa.hrrr.hrrr_config_models import (
NoaaHrrrDataVar,
)
from reformatters.noaa.hrrr.region_job import NoaaHrrrRegionJob, NoaaHrrrSourceFileCoord
from reformatters.noaa.noaa_utils import has_hour_0_values

log = get_logger(__name__)


class NoaaHrrrAnalysisSourceFileCoord(NoaaHrrrSourceFileCoord):
    """Source file coordinate that writes into the analysis dataset's time dimension."""

    def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
        """Locate this file's output at its valid time (init_time + lead_time)."""
        valid_time = self.init_time + self.lead_time
        return {"time": valid_time}


class NoaaHrrrAnalysisRegionJob(NoaaHrrrRegionJob):
    """Region job for HRRR analysis data processing."""

    def generate_source_file_coords(
        self,
        processing_region_ds: xr.Dataset,
        data_var_group: Sequence[NoaaHrrrDataVar],
    ) -> Sequence[NoaaHrrrAnalysisSourceFileCoord]:
        """Return one source file coordinate per time step in the region.

        Each coordinate's init_time + lead_time equals a valid time of the
        processing region.
        """
        valid_times = pd.to_datetime(processing_region_ds["time"].values)

        # `item` extracts the single set element, so all variables in a group
        # are expected to agree on hour-0 availability and file type.
        uses_hour_0 = item({has_hour_0_values(var) for var in data_var_group})
        file_type = item({var.internal_attrs.hrrr_file_type for var in data_var_group})

        # Variables lacking hour-0 values are read from the previous init at a
        # 1-hour lead; either way init_time = valid_time - lead_time.
        lead_time = pd.Timedelta("0h") if uses_hour_0 else pd.Timedelta("1h")

        coords = []
        for valid_time in valid_times:
            coords.append(
                NoaaHrrrAnalysisSourceFileCoord(
                    init_time=valid_time - lead_time,
                    lead_time=lead_time,
                    domain="conus",
                    file_type=file_type,
                    data_vars=data_var_group,
                )
            )
        return coords
129 changes: 129 additions & 0 deletions src/reformatters/noaa/hrrr/analysis/template_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from collections.abc import Sequence
from typing import Any

import numpy as np
import pandas as pd
import xarray as xr
from pydantic import computed_field

from reformatters.common.config_models import (
Coordinate,
CoordinateAttrs,
DatasetAttributes,
Encoding,
StatisticsApproximate,
)
from reformatters.common.template_config import SPATIAL_REF_COORDS
from reformatters.common.time_utils import whole_hours
from reformatters.common.types import AppendDim, Dim, Timedelta, Timestamp
from reformatters.common.zarr import (
BLOSC_4BYTE_ZSTD_LEVEL3_SHUFFLE,
BLOSC_8BYTE_ZSTD_LEVEL3_SHUFFLE,
)
from reformatters.noaa.hrrr.hrrr_config_models import (
NoaaHrrrDataVar,
)
from reformatters.noaa.hrrr.template_config import NoaaHrrrCommonTemplateConfig


class NoaaHrrrAnalysisTemplateConfig(NoaaHrrrCommonTemplateConfig):
    """Zarr template configuration for the NOAA HRRR analysis dataset."""

    dims: tuple[Dim, ...] = ("time", "y", "x")
    append_dim: AppendDim = "time"
    append_dim_start: Timestamp = pd.Timestamp("2018-07-14T00:00")  # start of HRRR v3
    append_dim_frequency: Timedelta = pd.Timedelta("1h")

    @computed_field  # type: ignore[prop-decorator]
    @property
    def dataset_attributes(self) -> DatasetAttributes:
        """Top-level dataset metadata attributes."""
        frequency_hours = whole_hours(self.append_dim_frequency)
        return DatasetAttributes(
            dataset_id="noaa-hrrr-analysis",
            dataset_version="0.1.0",
            name="NOAA HRRR analysis",
            description="Analysis data from the High-Resolution Rapid Refresh (HRRR) model operated by NOAA NWS NCEP.",
            attribution="NOAA NWS NCEP HRRR data processed by dynamical.org from NOAA Open Data Dissemination archives.",
            spatial_domain="Continental United States",
            spatial_resolution="3 km",
            time_domain=f"{self.append_dim_start} UTC to Present",
            time_resolution=f"{frequency_hours} hour",
        )

    def dimension_coordinates(self) -> dict[str, Any]:
        """Return coordinate values for each dimension of the initial template."""
        y_values, x_values = self._y_x_coordinates()
        # The initial template covers a single append-dim step.
        initial_end = self.append_dim_start + self.append_dim_frequency
        return {
            "time": self.append_dim_coordinates(initial_end),
            "y": y_values,
            "x": x_values,
        }

    def derive_coordinates(
        self, ds: xr.Dataset
    ) -> dict[str, xr.DataArray | tuple[tuple[str, ...], np.ndarray[Any, Any]]]:
        """Return non-dimension coordinates derived from the projection grid."""
        lats, lons = self._latitude_longitude_coordinates(
            ds["x"].values, ds["y"].values
        )
        return {
            "latitude": (("y", "x"), lats),
            "longitude": (("y", "x"), lons),
            "spatial_ref": SPATIAL_REF_COORDS,
        }

    @computed_field  # type: ignore[prop-decorator]
    @property
    def coords(self) -> Sequence[Coordinate]:
        """The common HRRR coordinates plus this dataset's time coordinate."""
        time_coordinate = Coordinate(
            name="time",
            encoding=Encoding(
                dtype="int64",
                fill_value=0,
                compressors=[BLOSC_8BYTE_ZSTD_LEVEL3_SHUFFLE],
                calendar="proleptic_gregorian",
                units="seconds since 1970-01-01 00:00:00",
                chunks=self.append_dim_coordinate_chunk_size(),
                shards=None,
            ),
            attrs=CoordinateAttrs(
                units="seconds since 1970-01-01 00:00:00",
                statistics_approximate=StatisticsApproximate(
                    min=self.append_dim_start.isoformat(), max="Present"
                ),
            ),
        )
        return [*super().coords, time_coordinate]

    @computed_field  # type: ignore[prop-decorator]
    @property
    def data_vars(self) -> Sequence[NoaaHrrrDataVar]:
        """Data variables, each with the shared float32 chunk/shard encoding."""
        # ~18MB uncompressed, ~3.5MB compressed per chunk.
        # 45 pixels * 3 km = 135 km square chunks spatially.
        chunk_sizes: dict[Dim, int] = {
            "time": 90 * 24,  # 90 days of hourly data
            "x": 45,  # 40 chunks over 1799 pixels
            "y": 45,  # 24 chunks over 1059 pixels
        }

        # ~2GB uncompressed, ~400MB compressed per shard.
        # Shards are 1350 km (x) by 1620 km (y) spatially.
        shard_sizes: dict[Dim, int] = {
            "time": chunk_sizes["time"],
            "x": chunk_sizes["x"] * 10,  # 4 shards over 1799 pixels
            "y": chunk_sizes["y"] * 12,  # 2 shards over 1059 pixels
        }

        default_encoding = Encoding(
            dtype="float32",
            fill_value=np.nan,
            chunks=tuple(chunk_sizes[dim] for dim in self.dims),
            shards=tuple(shard_sizes[dim] for dim in self.dims),
            compressors=[BLOSC_4BYTE_ZSTD_LEVEL3_SHUFFLE],
        )

        return self.get_data_vars(default_encoding)
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"shape": [
1,
1059,
1799
],
"data_type": "float32",
"chunk_grid": {
"name": "regular",
"configuration": {
"chunk_shape": [
2160,
540,
450
]
}
},
"chunk_key_encoding": {
"name": "default",
"configuration": {
"separator": "/"
}
},
"fill_value": "NaN",
"codecs": [
{
"name": "sharding_indexed",
"configuration": {
"chunk_shape": [
2160,
45,
45
],
"codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
},
{
"name": "blosc",
"configuration": {
"typesize": 4,
"cname": "zstd",
"clevel": 3,
"shuffle": "shuffle",
"blocksize": 0
}
}
],
"index_codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
},
{
"name": "crc32c"
}
],
"index_location": "end"
}
}
],
"attributes": {
"long_name": "Categorical freezing rain",
"short_name": "cfrzr",
"units": "0=no; 1=yes",
"step_type": "instant",
"coordinates": "latitude longitude spatial_ref",
"_FillValue": "AAAAAAAA+H8="
},
"dimension_names": [
"time",
"y",
"x"
],
"zarr_format": 3,
"node_type": "array",
"storage_transformers": []
}
Loading