Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
polytope.env
12 changes: 12 additions & 0 deletions docker/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
services:
  # Application server, built from the repository root with docker/yampit/Dockerfile.
  backend:
    build:
      context: ../
      dockerfile: docker/yampit/Dockerfile
    # Supplies POLYTOPE_USER_KEY (see polytope.env.dummy for the expected layout).
    env_file: "./polytope.env"

  # Varnish HTTP cache in front of the backend, published on host port 2080.
  cache:
    build: varnish
    # The VCL refers to the backend by its service name; start the backend
    # first so that name is resolvable when Varnish loads its configuration.
    depends_on:
      - backend
    environment:
      VARNISH_SIZE: 5G
    ports:
      - "2080:80"
1 change: 1 addition & 0 deletions docker/polytope.env.dummy
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
POLYTOPE_USER_KEY=...
2 changes: 2 additions & 0 deletions docker/varnish/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Varnish image with this project's VCL configuration baked in.
FROM varnish:7
# Ship the project's default.vcl in /etc/varnish/
# (presumably the location the base image loads its VCL from -- confirm).
COPY default.vcl /etc/varnish/
6 changes: 6 additions & 0 deletions docker/varnish/default.vcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
vcl 4.1;

# Single origin server. "backend" is the service name used in docker/compose.yaml.
# NOTE(review): port 8000 is presumably the port the sanic server listens on
# (no port is passed on its command line) -- confirm.
backend default {
    .host = "backend";
    .port = "8000";
}
24 changes: 24 additions & 0 deletions docker/yampit/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
FROM python:3.12-slim-bookworm

# Toolchain for building ecCodes from source, plus libaec-dev for the
# -DENABLE_AEC=ON build below. Update and install happen in one layer so the
# package index can never be stale relative to the install; the index is
# removed afterwards because it is not needed in the image.
RUN apt-get update \
    && apt-get install -y git cmake gcc g++ gfortran libaec-dev \
    && rm -rf /var/lib/apt/lists/*

# ecCodes release (git tag) to build; override with --build-arg eccodes_version=...
ARG eccodes_version=2.38.3
# WORKDIR creates the directory if it does not exist.
WORKDIR /src
RUN git clone https://github.com/ecmwf/eccodes --depth 1 -b "$eccodes_version"
RUN mkdir /src/eccodes/build && cd /src/eccodes/build && cmake .. -DENABLE_AEC=ON && make && make install

# NOTE(review): the uv image tag is not pinned, so builds are not reproducible.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# With several sources the destination must be a directory ending in "/".
# NOTE(review): copying a directory copies its *contents*, so the files inside
# src/ end up directly under /app (not under /app/src) -- confirm this is the
# layout the build backend expects.
COPY README.md src pyproject.toml uv.lock /app/

# Sync the project into a new environment, using the frozen lockfile
WORKDIR /app
RUN uv sync --frozen

# use POLYTOPE_USER_KEY environment variable to authenticate against polytope server
CMD ["uv", "run", "sanic", "yampit.server", "-H0.0.0.0"]
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ authors = [
]
requires-python = ">=3.11"
dependencies = [
"aiohttp>=3.11.1",
"eccodes>=2.38.3",
"polytope-client>=0.7.4",
"sanic>=24.6.0",
]

[build-system]
Expand Down
75 changes: 75 additions & 0 deletions src/yampit/async_polytope_request_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import json
from urllib.parse import urljoin
import aiohttp
import asyncio

async def get_client(**kwargs):
    """Create and return a new ``aiohttp.ClientSession``.

    All keyword arguments are forwarded unchanged to the session constructor.
    The caller owns the returned session and is responsible for closing it.
    """
    # NOTE(review): presumably a coroutine so that the session is always
    # constructed while an event loop is running -- confirm.
    session = aiohttp.ClientSession(**kwargs)
    return session


class AsyncPolytopeRequestHandler:
def __init__(self, server, collection):
from polytope.api import Client
self.client = Client(address=server)

self.server = server
self.collection = collection
self.get_client = get_client
self.max_poll_retries = 100
self._session = None

self.auth_headers = {"Authorization": ", ".join(self.client.auth.get_auth_headers())}


async def set_session(self):
if self._session is None:
self._session = await self.get_client()
return self._session

def __del__(self):
if self._session:
self._session.close()

async def _poll_get(self, url):
session = await self.set_session()

for i in range(self.max_poll_retries):
async with session.get(url, headers=self.auth_headers) as r:
r.raise_for_status()
match r.status:
case 200: # OK, direct download
return await r.read()
case 202: # Accepted, scheduled. Needs polling
url = urljoin(url, r.headers.get("Location"))
wait = int(r.headers.get("Retry-After", 0))
await asyncio.sleep(wait)
continue
case 303: # redirect to direct download
raise NotImplementedError("direct download")
else:
raise RuntimeError("max poll retries exceeded")


async def get(self, request):
request_object = {"verb": "retrieve", "request": json.dumps(request)}
url = self.client.config.get_url("requests", collection_id=self.collection)
poll_url = None

session = await self.set_session()
async with session.post(url, headers=self.auth_headers, json=request_object) as r:
r.raise_for_status()
match r.status:
case 200: # OK, direct download
res = await r.read()
case 202: # Accepted, scheduled. Needs polling
poll_url = urljoin(url, r.headers.get("Location"))
wait = int(r.headers.get("Retry-After", 0))
await asyncio.sleep(wait)
res = await self._poll_get(poll_url)
case 303: # redirect to direct download
raise NotImplementedError("direct download")

if poll_url:
async with session.delete(poll_url, headers=self.auth_headers) as r:
r.raise_for_status()
return res
30 changes: 30 additions & 0 deletions src/yampit/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import xarray as xr

# Description of the demo dataset. The keys are passed as keyword arguments to
# ``MarsDataset`` (base_request, coords, variables, internal_dims).
demo = dict(
    # Request fields shared by every retrieval for this dataset.
    base_request={
        'activity': 'CMIP6',
        'class': 'd1',
        'dataset': 'climate-dt',
        'experiment': 'hist',
        'generation': '1',
        'levtype': 'sfc',
        'realization': '1',
        'resolution': 'high',
        'stream': 'clte',
        'type': 'fc',
    },
    # Coordinate values along each dimension.
    coords={
        "time": xr.date_range("1991-03-01", "2012-12-31", freq="h"),
        # NOTE(review): 12 * 4**10 is presumably the HEALPix cell count at
        # zoom level 10 -- confirm.
        "cell": range(12 * 4**10),
        "model": ["IFS-NEMO", "ICON"],
    },
    # All variables share the same dimensions.
    # NOTE(review): "tclw" used to be listed twice (a duplicate dict key, so
    # the second entry had no effect); possibly another variable was
    # intended -- confirm.
    variables={
        name: {"dims": ("model", "time", "cell")}
        for name in ("2t", "tcwv", "tclw", "10u", "10v")
    },
    internal_dims=["cell"],
)
58 changes: 38 additions & 20 deletions src/yampit/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

import eccodes

class Mapper:
def __init__(self, request_handler, base_request, coords, variables, internal_dims):
self.request_handler = request_handler
class MarsDataset:
def __init__(self, base_request, coords, variables, internal_dims):
self.base_request = base_request
self.coords = {k: np.asarray(v) for k, v in coords.items()}
self.variables = variables
self.internal_dims = internal_dims

@lru_cache
def zmetadata(self):
return json.dumps({
return {
"zarr_consolidated_format": 1,
"metadata": {
".zattrs": {},
Expand Down Expand Up @@ -57,26 +57,27 @@ def zmetadata(self):
for name, info in self.variables.items()
},
}
}).encode("utf-8")
}

def coord(self, name):
if name == "time":
return bytes((self.coords[name] - self.coords[name][0]).astype("timedelta64[s]").astype(np.int32))
else:
return bytes(self.coords[name])

@lru_cache
def __getitem__(self, key):
def key2request(self, key):
if key == ".zmetadata":
return self.zmetadata()
try:
var, chunk = key.split("/")
chunk = list(map(int, chunk.split(".")))
except KeyError:
raise KeyError()
return 'inline', json.dumps(self.zmetadata()).encode("utf-8")

key_parts = key.split("/")
if key_parts[-1].startswith(".z"):
return 'inline', json.dumps(self.zmetadata()["metadata"][key]).encode("utf-8")

var, chunk = key_parts
chunk = list(map(int, chunk.split(".")))

if var in self.coords:
return self.coord(var)
return 'inline', self.coord(var)

def _encode_mars_request(dim, c):
if dim == "time":
Expand All @@ -87,7 +88,7 @@ def _encode_mars_request(dim, c):
else:
return {dim: str(c)}

request = {
return 'request', {
**self.base_request,
"param": var,
**{
Expand All @@ -98,11 +99,28 @@ def _encode_mars_request(dim, c):
}
}

data = self.request_handler.get(request)
mid = eccodes.codes_new_from_message(data)
data = eccodes.codes_get_array(mid, "values")
eccodes.codes_release(mid)
return bytes(data.astype("<f4"))

class Mapper:
def __init__(self, request_handler, *args, **kwargs):
self.request_handler = request_handler
self._mds = MarsDataset(*args, **kwargs)
self.logger = logging.getLogger(__name__)

@lru_cache
def __getitem__(self, key):
kind, request = self._mds.key2request(key)

if kind == 'inline':
return request
elif kind == 'request':
self.logger.debug("requesting %s for %s", request, key)
data = self.request_handler.get(request)
mid = eccodes.codes_new_from_message(data)
data = eccodes.codes_get_array(mid, "values")
eccodes.codes_release(mid)
return bytes(data.astype("<f4"))
else:
raise NotImplementedError(f"kind {kind}")

def __contains__(self, key):
try:
Expand Down
42 changes: 42 additions & 0 deletions src/yampit/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from sanic import Sanic
from sanic.response import raw

import eccodes

from .catalog import demo
from .mapper import MarsDataset
from .async_polytope_request_handler import AsyncPolytopeRequestHandler

# Polytope endpoint and collection the demo dataset is retrieved from.
_POLYTOPE_SERVER = "polytope.lumi.apps.dte.destination-earth.eu"
_POLYTOPE_COLLECTION = "destination-earth"

app = Sanic("YAMPIT_Server")

# Application-wide state shared by all request handlers: the dataset
# description (maps keys to requests) and the client used to retrieve data.
app.ctx.dataset = MarsDataset(**demo)
app.ctx.request_handler = AsyncPolytopeRequestHandler(
    _POLYTOPE_SERVER,
    _POLYTOPE_COLLECTION,
)

def is_meta(key):
    """Return True if the last ``/``-separated component of *key* starts with ``.z``."""
    _, _, leaf = key.rpartition("/")
    return leaf.startswith(".z")

@app.get("/ds/<key:path>")
async def get_chunk(request, key):
    """Serve one key (metadata document or data chunk) of the dataset.

    Args:
        request: the incoming Sanic request (unused).
        key: store key taken from the URL path below ``/ds/``.

    Returns:
        A raw response: JSON for metadata keys (last path component starting
        with ``.z``), little-endian float32 bytes or coordinate bytes
        otherwise. Keys the dataset cannot resolve yield an empty 404.

    Raises:
        NotImplementedError: if the dataset returns an unknown request kind.
    """
    cors = {"Access-Control-Allow-Origin": "*"}

    try:
        # Do not rebind ``request``: that name is the Sanic request object.
        kind, payload = app.ctx.dataset.key2request(key)
    except (LookupError, ValueError):
        # Unknown or malformed key (wrong number of path components,
        # non-integer chunk index, unknown metadata entry): report it as
        # "not found" rather than as an internal server error.
        return raw(b"", status=404, headers=cors)

    content_type = "application/json" if is_meta(key) else "application/octet-stream"

    headers = {
        **cors,
        # 604800 s = one week
        "Cache-Control": "public, max-age=604800",
    }

    if kind == 'inline':
        return raw(payload, content_type=content_type, headers=headers)

    if kind == 'request':
        message = await app.ctx.request_handler.get(payload)

        mid = eccodes.codes_new_from_message(message)
        try:
            values = eccodes.codes_get_array(mid, "values")
        finally:
            # Release the message handle even when decoding fails.
            eccodes.codes_release(mid)
        return raw(bytes(values.astype("<f4")), content_type=content_type, headers=headers)

    raise NotImplementedError(f"kind {kind}")
Loading