diff --git a/docker/.gitignore b/docker/.gitignore new file mode 100644 index 0000000..d89eb27 --- /dev/null +++ b/docker/.gitignore @@ -0,0 +1 @@ +polytope.env diff --git a/docker/compose.yaml b/docker/compose.yaml new file mode 100644 index 0000000..c49c6c5 --- /dev/null +++ b/docker/compose.yaml @@ -0,0 +1,12 @@ +services: + backend: + build: + context: ../ + dockerfile: docker/yampit/Dockerfile + env_file: "./polytope.env" + cache: + build: varnish + environment: + VARNISH_SIZE: 5G + ports: + - "2080:80" diff --git a/docker/polytope.env.dummy b/docker/polytope.env.dummy new file mode 100644 index 0000000..f22e513 --- /dev/null +++ b/docker/polytope.env.dummy @@ -0,0 +1 @@ +POLYTOPE_USER_KEY=... diff --git a/docker/varnish/Dockerfile b/docker/varnish/Dockerfile new file mode 100644 index 0000000..86a098a --- /dev/null +++ b/docker/varnish/Dockerfile @@ -0,0 +1,2 @@ +FROM varnish:7 +COPY default.vcl /etc/varnish/ diff --git a/docker/varnish/default.vcl b/docker/varnish/default.vcl new file mode 100644 index 0000000..6fce83f --- /dev/null +++ b/docker/varnish/default.vcl @@ -0,0 +1,6 @@ +vcl 4.1; + +backend default { + .host = "backend"; + .port = "8000"; +} diff --git a/docker/yampit/Dockerfile b/docker/yampit/Dockerfile new file mode 100644 index 0000000..c6320d1 --- /dev/null +++ b/docker/yampit/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.12-slim-bookworm +RUN apt update && apt install -y git cmake gcc g++ gfortran + +ARG eccodes_version=2.38.3 +RUN mkdir -p /src +WORKDIR /src +#RUN git clone https://github.com/ecmwf/ecbuild --depth 1 -b $ecbuild_version +#ENV PATH="${PATH}:/src/ecbuild/bin" +RUN git clone https://github.com/ecmwf/eccodes --depth 1 -b $eccodes_version +RUN apt install -y libaec-dev +RUN mkdir /src/eccodes/build && cd /src/eccodes/build && cmake .. -DENABLE_AEC=ON && make && make install + +COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ + +RUN mkdir /app +ADD README.md src pyproject.toml uv.lock /app + +# Sync the project into a new environment, using the frozen lockfile +WORKDIR /app +#RUN uv build +RUN uv sync --frozen + +# use POLYTOPE_USER_KEY environment variable to authenticate against polytope server +CMD ["uv", "run", "sanic", "yampit.server", "-H0.0.0.0"] diff --git a/pyproject.toml b/pyproject.toml index 3b52656..647827f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,8 +9,10 @@ authors = [ ] requires-python = ">=3.11" dependencies = [ + "aiohttp>=3.11.1", "eccodes>=2.38.3", "polytope-client>=0.7.4", + "sanic>=24.6.0", ] [build-system] diff --git a/src/yampit/async_polytope_request_handler.py b/src/yampit/async_polytope_request_handler.py new file mode 100644 index 0000000..d612b7d --- /dev/null +++ b/src/yampit/async_polytope_request_handler.py @@ -0,0 +1,75 @@ +import json +from urllib.parse import urljoin +import aiohttp +import asyncio + +async def get_client(**kwargs): + return aiohttp.ClientSession(**kwargs) + + +class AsyncPolytopeRequestHandler: + def __init__(self, server, collection): + from polytope.api import Client + self.client = Client(address=server) + + self.server = server + self.collection = collection + self.get_client = get_client + self.max_poll_retries = 100 + self._session = None + + self.auth_headers = {"Authorization": ", ".join(self.client.auth.get_auth_headers())} + + + async def set_session(self): + if self._session is None: + self._session = await self.get_client() + return self._session + + def __del__(self): + if self._session: + self._session.close() + + async def _poll_get(self, url): + session = await self.set_session() + + for i in range(self.max_poll_retries): + async with session.get(url, headers=self.auth_headers) as r: + r.raise_for_status() + match r.status: + case 200: # OK, direct download + return await r.read() + case 202: # Accepted, scheduled. Needs polling + url = urljoin(url, r.headers.get("Location")) + wait = int(r.headers.get("Retry-After", 0)) + await asyncio.sleep(wait) + continue + case 303: # redirect to direct download + raise NotImplementedError("direct download") + else: + raise RuntimeError("max poll retries exceeded") + + + async def get(self, request): + request_object = {"verb": "retrieve", "request": json.dumps(request)} + url = self.client.config.get_url("requests", collection_id=self.collection) + poll_url = None + + session = await self.set_session() + async with session.post(url, headers=self.auth_headers, json=request_object) as r: + r.raise_for_status() + match r.status: + case 200: # OK, direct download + res = await r.read() + case 202: # Accepted, scheduled. Needs polling + poll_url = urljoin(url, r.headers.get("Location")) + wait = int(r.headers.get("Retry-After", 0)) + await asyncio.sleep(wait) + res = await self._poll_get(poll_url) + case 303: # redirect to direct download + raise NotImplementedError("direct download") + + if poll_url: + async with session.delete(poll_url, headers=self.auth_headers) as r: + r.raise_for_status() + return res diff --git a/src/yampit/catalog.py b/src/yampit/catalog.py new file mode 100644 index 0000000..a4f5e2f --- /dev/null +++ b/src/yampit/catalog.py @@ -0,0 +1,30 @@ +import xarray as xr + +demo = dict( + base_request={ + 'activity': 'CMIP6', + 'class': 'd1', + 'dataset': 'climate-dt', + 'experiment': 'hist', + 'generation': '1', + 'levtype': 'sfc', + 'realization': '1', + 'resolution': 'high', + 'stream': 'clte', + 'type': 'fc', + }, + coords={ + "time": xr.date_range("1991-03-01", "2012-12-31", freq="h"), + "cell": range(12 * 4**10), + "model": ["IFS-NEMO", "ICON"], + }, + variables={ + "2t": {"dims": ("model", "time", "cell")}, + "tcwv": {"dims": ("model", "time", "cell")}, + "tclw": {"dims": ("model", "time", "cell")}, + "tclw": {"dims": ("model", "time", "cell")}, + "10u": {"dims": ("model", "time", "cell")}, + "10v": {"dims": ("model", "time", "cell")}, + }, + internal_dims=["cell"], +) diff --git a/src/yampit/mapper.py b/src/yampit/mapper.py index 432981e..348751b 100644 --- a/src/yampit/mapper.py +++ b/src/yampit/mapper.py @@ -4,16 +4,16 @@ import eccodes -class Mapper: - def __init__(self, request_handler, base_request, coords, variables, internal_dims): - self.request_handler = request_handler +class MarsDataset: + def __init__(self, base_request, coords, variables, internal_dims): self.base_request = base_request self.coords = {k: np.asarray(v) for k, v in coords.items()} self.variables = variables self.internal_dims = internal_dims + @lru_cache def zmetadata(self): - return json.dumps({ + return { "zarr_consolidated_format": 1, "metadata": { ".zattrs": {}, @@ -57,7 +57,7 @@ def zmetadata(self): for name, info in self.variables.items() }, } - }).encode("utf-8") + } def coord(self, name): if name == "time": @@ -65,18 +65,19 @@ def coord(self, name): else: return bytes(self.coords[name]) - @lru_cache - def __getitem__(self, key): + def key2request(self, key): if key == ".zmetadata": - return self.zmetadata() - try: - var, chunk = key.split("/") - chunk = list(map(int, chunk.split("."))) - except KeyError: - raise KeyError() + return 'inline', json.dumps(self.zmetadata()).encode("utf-8") + + key_parts = key.split("/") + if key_parts[-1].startswith(".z"): + return 'inline', json.dumps(self.zmetadata()["metadata"][key]).encode("utf-8") + + var, chunk = key_parts + chunk = list(map(int, chunk.split("."))) if var in self.coords: - return self.coord(var) + return 'inline', self.coord(var) def _encode_mars_request(dim, c): if dim == "time": @@ -87,7 +88,7 @@ def _encode_mars_request(dim, c): else: return {dim: str(c)} - request = { + return 'request', { **self.base_request, "param": var, **{ @@ -98,11 +99,28 @@ def _encode_mars_request(dim, c): } } - data = self.request_handler.get(request) - mid = eccodes.codes_new_from_message(data) - data = eccodes.codes_get_array(mid, "values") - eccodes.codes_release(mid) - return bytes(data.astype("") +async def get_chunk(request, key): + kind, request = app.ctx.dataset.key2request(key) + + if is_meta(key): + content_type="application/json" + else: + content_type = "application/octet-stream" + + headers = { + "Access-Control-Allow-Origin": "*", + "Cache-Control": "public, max-age=604800", + } + + if kind == 'inline': + return raw(request, content_type=content_type, headers=headers) + elif kind == 'request': + data = await app.ctx.request_handler.get(request) + + mid = eccodes.codes_new_from_message(data) + data = eccodes.codes_get_array(mid, "values") + eccodes.codes_release(mid) + return raw(bytes(data.astype("