diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index e295b2a..cb409db 100644
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -23,21 +23,23 @@ jobs:
         with:
           access_token: ${{ github.token }}
         if: ${{github.ref != 'refs/head/main'}}
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       - name: Get pip cache dir
        id: pip-cache
        run: |
          python -m pip install --upgrade pip wheel
-          echo "::set-output name=dir::$(pip cache dir)"
+          echo "dir=$(pip cache dir)" >> "$GITHUB_OUTPUT"
      - name: pip cache
-        uses: actions/cache@v2
+        uses: actions/cache@v4
        with:
          path: ${{ steps.pip-cache.outputs.dir }}
          key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py') }}
+        restore-keys: |
+          ${{ runner.os }}-pip-
      - name: Install Xarray-Beam
        run: |
          pip install -e .[tests]
diff --git a/docs/read-write.ipynb b/docs/read-write.ipynb
index 509250b..05df4e0 100644
--- a/docs/read-write.ipynb
+++ b/docs/read-write.ipynb
@@ -1,498 +1,531 @@
 {
   "cells": [
-  {
-   "cell_type": "markdown",
-   "id": "c54a0838",
-   "metadata": {},
-   "source": [
-    "# Reading and writing data"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "ed512674",
-   "metadata": {},
-   "source": [
-    "## Read datasets into chunks"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "30edb09d",
-   "metadata": {},
-   "source": [
-    "There are two main options for loading an `xarray.Dataset` into Xarray-Beam. You can either [create the dataset](data-model.ipynb) from scratch or use the {py:class}`~xarray_beam.DatasetToChunks` transform starting at the root of a Beam pipeline:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "id": "427d32c2",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import apache_beam as beam\n",
-    "import numpy as np\n",
-    "import pandas as pd\n",
-    "import xarray_beam as xbeam\n",
-    "import xarray"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "id": "6fe9fabe",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "ds = xarray.tutorial.load_dataset('air_temperature')"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 3,
-   "id": "28eb3b6e",
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "application/javascript": [
+    {
+      "cell_type": "markdown",
+      "id": "c54a0838",
+      "metadata": {
+        "id": "b4sQqohzBS45"
+      },
+      "source": [
+        "# Reading and writing data"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "ed512674",
+      "metadata": {
+        "id": "DFgXiyk0BS45"
+      },
+      "source": [
+        "## Read datasets into chunks"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "30edb09d",
+      "metadata": {
+        "id": "9u-o9LjvBS45"
+      },
+      "source": [
+        "There are two main options for loading an `xarray.Dataset` into Xarray-Beam. 
You can either [create the dataset](data-model.ipynb) from scratch or use the {py:class}`~xarray_beam.DatasetToChunks` transform starting at the root of a Beam pipeline:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "427d32c2", + "metadata": { + "id": "9sNXzAtkBS45" + }, + "outputs": [], + "source": [ + "import apache_beam as beam\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xarray_beam as xbeam\n", + "import xarray" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "6fe9fabe", + "metadata": { + "id": "5dvq3RHbBS45" + }, + "outputs": [], + "source": [ + "ds = xarray.tutorial.load_dataset('air_temperature')" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "28eb3b6e", + "metadata": { + "id": "ZEhCjJXDBS45" + }, + "outputs": [ + { + "data": { + "application/javascript": [ + "\n", + " if (typeof window.interactive_beam_jquery == 'undefined') {\n", + " var jqueryScript = document.createElement('script');\n", + " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", + " jqueryScript.type = 'text/javascript';\n", + " jqueryScript.onload = function() {\n", + " var datatableScript = document.createElement('script');\n", + " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", + " datatableScript.type = 'text/javascript';\n", + " datatableScript.onload = function() {\n", + " window.interactive_beam_jquery = jQuery.noConflict(true);\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }\n", + " document.head.appendChild(datatableScript);\n", + " };\n", + " document.head.appendChild(jqueryScript);\n", + " } else {\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n" + ] + } + ], + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(lambda k, v: print(k, type(v)))" + ] + }, + { + "cell_type": "markdown", + "id": "d7c825b8", + "metadata": { + "id": "k-PFVdDzBS45" + }, + "source": [ + "Importantly, xarray datasets fed into `DatasetToChunks` **can be lazy**, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into `DatasetToChunks`, each individual chunk will be indexed and evaluated separately on Beam workers.\n", + "\n", + "This pattern allows for leveraging Xarray's builtin dataset loaders (e.g., `open_dataset()` and `open_zarr()`) for feeding arbitrarily large datasets into Xarray-Beam." 
+ ] + }, + { + "cell_type": "markdown", + "id": "70f09baa", + "metadata": { + "id": "Pl6UGVhyBS45" + }, + "source": [ + "## Reading data from Zarr" + ] + }, + { + "cell_type": "markdown", + "id": "f7229f50", + "metadata": { + "id": "3FvrNiCjBS45" + }, + "source": [ + "[Zarr](https://zarr.readthedocs.io/) is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.\n", + "\n", + "The easiest way to get good performance from Zarr into Xarray-Beam is to use {py:func}`xarray_beam.open_zarr`. This function returns a pair of values:\n", + "\n", + "1. A lazily indexed `xarray.Dataset` corresponding to the Zarr store, but not using Dask. This is exactly what you would get from `xarray.open_zarr` with `chunks=None`.\n", + "2. A dictionary mapping from dimension names to integer chunk sizes. Obtaining this information without using Dask to chunk the array requires looking at Xarray's `encoding` dictionaries or directly inspecting the Zarr store." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "10c17dc3", + "metadata": { + "id": "pfQRSu_iBS45" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u003cxarray.Dataset\u003e\n", + "Dimensions: (time: 2920, lat: 25, lon: 53)\n", + "Coordinates:\n", + " * lat (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0\n", + " * lon (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0\n", + " * time (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00\n", + "Data variables:\n", + " air (time, lat, lon) float32 ...\n", + "Attributes:\n", + " Conventions: COARDS\n", + " description: Data is from NMC initialized reanalysis\\n(4x/day). These a...\n", + " platform: Model\n", + " references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...\n", + " title: 4x daily NMC reanalysis (1948)\n", + "{'time': 1000, 'lat': 25, 'lon': 53}\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/shoyer/miniconda3/envs/xarray-beam/lib/python3.9/site-packages/xarray/core/dataset.py:2060: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs\n", + " return to_zarr( # type: ignore\n" + ] + } + ], + "source": [ + "# write data into the distributed Zarr format\n", + "ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n", "\n", - " if (typeof window.interactive_beam_jquery == 'undefined') {\n", - " var jqueryScript = document.createElement('script');\n", - " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", - " jqueryScript.type = 'text/javascript';\n", - " jqueryScript.onload = function() {\n", - " var datatableScript = document.createElement('script');\n", - " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", - " datatableScript.type = 'text/javascript';\n", - " datatableScript.onload = function() {\n", - " window.interactive_beam_jquery = jQuery.noConflict(true);\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }\n", - " document.head.appendChild(datatableScript);\n", - " };\n", - " document.head.appendChild(jqueryScript);\n", - " } else {\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }" - ] + "# read it using xarray-beam's utilities\n", + "ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')\n", + "print(ds_on_disk)\n", + 
"print(chunks)" + ] + }, + { + "cell_type": "markdown", + "id": "83a1833a", + "metadata": { + "id": "_vDX8aenBS45" }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \n" + "source": [ + "Conveniently, this is exactly the information you need for feeding into {py:class}`~xarray_beam.DatasetToChunks` to write an Xarray-Beam pipeline:" ] - } - ], - "source": [ - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(lambda k, v: print(k, type(v)))" - ] - }, - { - "cell_type": "markdown", - "id": "d7c825b8", - "metadata": {}, - "source": [ - "Importantly, xarray datasets fed into `DatasetToChunks` **can be lazy**, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into `DatasetToChunks`, each individual chunk will be indexed and evaluated separately on Beam workers.\n", - "\n", - "This pattern allows for leveraging Xarray's builtin dataset loaders (e.g., `open_dataset()` and `open_zarr()`) for feeding arbitrarily large datasets into Xarray-Beam." - ] - }, - { - "cell_type": "markdown", - "id": "70f09baa", - "metadata": {}, - "source": [ - "## Reading data from Zarr" - ] - }, - { - "cell_type": "markdown", - "id": "f7229f50", - "metadata": {}, - "source": [ - "[Zarr](https://zarr.readthedocs.io/) is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.\n", - "\n", - "The easiest way to get good performance from Zarr into Xarray-Beam is to use {py:func}`xarray_beam.open_zarr`. This function returns a pair of values:\n", - "\n", - "1. A lazily indexed `xarray.Dataset` corresponding to the Zarr store, but not using Dask. This is exactly what you would get from `xarray.open_zarr` with `chunks=None`.\n", - "2. A dictionary mapping from dimension names to integer chunk sizes. Obtaining this information without using Dask to chunk the array requires looking at Xarray's `encoding` dictionaries or directly inspecting the Zarr store." - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "10c17dc3", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Dimensions: (time: 2920, lat: 25, lon: 53)\n", - "Coordinates:\n", - " * lat (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0\n", - " * lon (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0\n", - " * time (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00\n", - "Data variables:\n", - " air (time, lat, lon) float32 ...\n", - "Attributes:\n", - " Conventions: COARDS\n", - " description: Data is from NMC initialized reanalysis\\n(4x/day). 
These a...\n", - " platform: Model\n", - " references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...\n", - " title: 4x daily NMC reanalysis (1948)\n", - "{'time': 1000, 'lat': 25, 'lon': 53}\n" + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "7b76ba13", + "metadata": { + "id": "U6imtj5CBS45" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n" + ] + } + ], + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(ds_on_disk, chunks) | beam.MapTuple(lambda k, v: print(k, type(v)))" ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/shoyer/miniconda3/envs/xarray-beam/lib/python3.9/site-packages/xarray/core/dataset.py:2060: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs\n", - " return to_zarr( # type: ignore\n" + }, + { + "cell_type": "markdown", + "id": "169f838c", + "metadata": { + "id": "y7H5uTsxBS45" + }, + "source": [ + "## Writing data to Zarr" ] - } - ], - "source": [ - "# write data into the distributed Zarr format\n", - "ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n", - "\n", - "# read it using xarray-beam's utilities\n", - "ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')\n", - "print(ds_on_disk)\n", - "print(chunks)" - ] - }, - { - "cell_type": "markdown", - "id": "83a1833a", - "metadata": {}, - "source": [ - "Conveniently, this is exactly the information you need for feeding into {py:class}`~xarray_beam.DatasetToChunks` to write an Xarray-Beam pipeline:" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "7b76ba13", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \n" + }, + { + "cell_type": "markdown", + "id": "5bcf8ad1", + "metadata": { + "id": "re3c6-NTBS45" + }, + "source": [ + "{py:class}`~xarray_beam.ChunksToZarr` is Xarray-Beam's API for saving chunks into a Zarr store." ] - } - ], - "source": [ - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(ds_on_disk, chunks) | beam.MapTuple(lambda k, v: print(k, type(v)))" - ] - }, - { - "cell_type": "markdown", - "id": "169f838c", - "metadata": {}, - "source": [ - "## Writing data to Zarr" - ] - }, - { - "cell_type": "markdown", - "id": "5bcf8ad1", - "metadata": {}, - "source": [ - "{py:class}`~xarray_beam.ChunksToZarr` is Xarray-Beam's API for saving chunks into a Zarr store. 
\n", - "\n", - "For small datasets where you aren't concerned about extra overhead for writing data, you can get started just using it directly:" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "23b74c9f", - "metadata": {}, - "outputs": [], - "source": [ - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(ds_on_disk, chunks) | xbeam.ChunksToZarr('example-data-v2.zarr')" - ] - }, - { - "cell_type": "markdown", - "id": "fd18c764", - "metadata": {}, - "source": [ - "The resulting Zarr dataset will have array chunks matching the in-memory chunks from your Beam PCollection. If you want different chunks on disk, consider inserting [transforms to rechunk](rechunking.ipynb) before exporting to Zarr.\n", - "\n", - "For larger datasets, read on -- you'll want to use a template." - ] - }, - { - "cell_type": "markdown", - "id": "012d88ee", - "metadata": {}, - "source": [ - "### Creating templates\n", - "\n", - "By default, {py:class}`ChunksToZarr` needs to evaluate and combine the entire distributed dataset in order to determine overall Zarr metadata (e.g., array names, shapes, dtypes and attributes). This is fine for relatively small datasets, but can entail significant additional communication and storage costs for large datasets.\n", - "\n", - "The optional `template` argument allows for prespecifying structure of the full on disk dataset in the form of another lazy `xarray.Dataset`. Lazy templates specify the structure of the array data that will be written by the PTransform. Array values that may be written as part of the Beam pipeline are indicated by using lazily computed Dask arrays to store the data.\n", - "\n", - "The easiest way to make a template is with {py:func}`xarray_beam.make_template` helper, which transforms a dataset into another dataset where every value is lazy:" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "bbf65917", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Dimensions: (time: 2920, lat: 25, lon: 53)\n", - "Coordinates:\n", - " * lat (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0\n", - " * lon (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0\n", - " * time (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00\n", - "Data variables:\n", - " air (time, lat, lon) float32 dask.array\n", - "Attributes:\n", - " Conventions: COARDS\n", - " description: Data is from NMC initialized reanalysis\\n(4x/day). These a...\n", - " platform: Model\n", - " references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...\n", - " title: 4x daily NMC reanalysis (1948)\n" + }, + { + "cell_type": "markdown", + "id": "012d88ee", + "metadata": { + "id": "qFsuz2R1BS45" + }, + "source": [ + "### Creating templates\n", + "\n", + "The `template` argument to {py:class}`ChunksToZarr` prespecifies the structure of the full on disk dataset in the form of an lazy `xarray.Dataset`, so it can be filled in one chunk at a time in parallel using Beam. Lazy templates specify the structure of the array data that will be written by the PTransform. 
Array values that may be written as part of the Beam pipeline are indicated by using lazily computed Dask arrays to store the data.\n", + "\n", + "The easiest way to make a template is with {py:func}`xarray_beam.make_template` helper, which transforms a dataset into another dataset where every value is lazy:" ] - } - ], - "source": [ - "ds = xarray.open_zarr('example-data.zarr', chunks=None)\n", - "template = xbeam.make_template(ds)\n", - "print(template)" - ] - }, - { - "cell_type": "markdown", - "id": "0b21fded", - "metadata": {}, - "source": [ - "```{tip}\n", - "Under the covers, {py:func}`~xarray_beam.make_template` has a very simple implementation, equivalent to `xarray.zeros_like(ds.chunk(-1))`.\n", - "```\n", - "\n", - "\"Template\" datasets are not only useful for expressing the desired structures of Zarr stores, but also because every builtin Xarray operation is entirely lazy on Datasets consistenting of Dask arrays. This makes it relatively straightforward to build up a new Dataset with the required variables and dimension, e.g.," - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "18eb8a29", - "metadata": {}, - "outputs": [], - "source": [ - "# remove the \"time\" dimension, and insert a new \"sample\" dimension\n", - "new_template = template.isel(time=0, drop=True).expand_dims(sample=np.arange(10))" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "eecfc041", - "metadata": {}, - "outputs": [], - "source": [ - "# setup a template for spatially regridding along latitude and longitude\n", - "new_longitudes = np.linspace(0, 100, num=8)\n", - "new_latitudes = np.linspace(30, 80, num=7)\n", - "new_template = template.head(lat=7, lon=8).assign_coords(lat=new_latitudes, lon=new_longitudes)" - ] - }, - { - "cell_type": "markdown", - "id": "e3079b5d", - "metadata": {}, - "source": [ - "### End to end examples\n", - "\n", - "If you supply a `template`, you should also supply the `zarr_chunks` argument in order to ensure that the data ends up appropriately chunked in the Zarr store. A complete example of reading and writing data from a Zarr store typically looks something like:" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "b6bd8cb7", - "metadata": {}, - "outputs": [], - "source": [ - "ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')\n", - "\n", - "template = xbeam.make_template(ds_on_disk)\n", - "\n", - "with beam.Pipeline() as p:\n", - " (\n", - " p\n", - " | xbeam.DatasetToChunks(ds_on_disk, chunks)\n", - " # insert additional transforms here\n", - " | xbeam.ChunksToZarr('example-data-v3.zarr', template, chunks)\n", - " )" - ] - }, - { - "cell_type": "markdown", - "id": "d06f806b", - "metadata": {}, - "source": [ - "If you don't have an existing Dataset to start with, a common pattern is to reuse the same function you'll use to load data for each chunk, e.g.," - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "5e161959", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '' in '[11]: ConsolidateChunks/GroupByTempKeys'. \n", - "WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '' in '[11]: ConsolidateChunks/GroupByTempKeys'. 
\n" + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "bbf65917", + "metadata": { + "id": "M2TMQm9hBS45" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u003cxarray.Dataset\u003e\n", + "Dimensions: (time: 2920, lat: 25, lon: 53)\n", + "Coordinates:\n", + " * lat (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0\n", + " * lon (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0\n", + " * time (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00\n", + "Data variables:\n", + " air (time, lat, lon) float32 dask.array\u003cchunksize=(2920, 25, 53), meta=np.ndarray\u003e\n", + "Attributes:\n", + " Conventions: COARDS\n", + " description: Data is from NMC initialized reanalysis\\n(4x/day). These a...\n", + " platform: Model\n", + " references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...\n", + " title: 4x daily NMC reanalysis (1948)\n" + ] + } + ], + "source": [ + "ds = xarray.open_zarr('example-data.zarr', chunks=None)\n", + "template = xbeam.make_template(ds)\n", + "print(template)" ] - } - ], - "source": [ - "all_days = pd.date_range('2013-01-01', '2014-01-01', freq='1D')\n", - "\n", - "def load_one_example(time: pd.Timestamp) -> tuple[xbeam.Key, xarray.Dataset]:\n", - " key = xbeam.Key({'time': (time - all_days[0]).days})\n", - " dataset = ds.sel(time=[time]) # replace with your code to create one example\n", - " return key, dataset\n", - "\n", - "_, example = load_one_example(all_days[0])\n", - "template = xbeam.make_template(example).squeeze('time', drop=True).expand_dims(time=all_days)\n", - "zarr_chunks = {'time': 100} # desired chunking along \"time\", e.g., for more efficient storage in Zarr\n", - "\n", - "with beam.Pipeline() as p:\n", - " (\n", - " p\n", - " | beam.Create(all_days)\n", - " | beam.Map(load_one_example)\n", - " | xbeam.ConsolidateChunks(zarr_chunks)\n", - " | xbeam.ChunksToZarr('example-data-v4.zarr', template, zarr_chunks)\n", - " )" - ] - }, - { - "cell_type": "markdown", - "id": "5d5c2e0f", - "metadata": {}, - "source": [ - "For more examples of how to manipulate templates and read/write data with Zarr, see the end-to-end [ERA5 climatology](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py) and [ERA5 rechunk](https://github.com/google/xarray-beam/blob/main/examples/era5_rechunk.py) examples." - ] - }, - { - "cell_type": "markdown", - "id": "9613b48c", - "metadata": {}, - "source": [ - "## Tips for custom data loaders" - ] - }, - { - "cell_type": "markdown", - "id": "d2500f6e", - "metadata": {}, - "source": [ - "If you use Xarray's file opening utilities instead of {py:class}`xarray_beam.open_zarr`, you need to take some care to get good performance when processing very large numbers of chunks (hundreds of thousands).\n", - "\n", - "The main tip is to set `chunks=None` when opening datasets and then _explicitly_ provide chunks in `DatasetToChunks` -- exactly the pattern facilitated by `xarray_beam.open_zarr`.\n", - "\n", - "`chunks=None` tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. This is advantageous because datasets using Xarray's lazy indexing are serialized much more compactly (via [pickle](https://docs.python.org/3/library/pickle.html)) when passed into Beam transforms." - ] - }, - { - "cell_type": "markdown", - "id": "7d3ec100", - "metadata": {}, - "source": [ - "Alternatively, you can pass in lazy datasets [using dask](http://xarray.pydata.org/en/stable/user-guide/dask.html). 
In this case, you don't need to explicitly supply `chunks` to `DatasetToChunks`:" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "d3f4f0a0", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \n", - "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \n" + }, + { + "cell_type": "markdown", + "id": "0b21fded", + "metadata": { + "id": "25vzYlbWBS45" + }, + "source": [ + "```{tip}\n", + "Under the covers, {py:func}`~xarray_beam.make_template` has a very simple implementation, equivalent to `xarray.zeros_like(ds.chunk(-1))`.\n", + "```\n", + "\n", + "\"Template\" datasets are not only useful for expressing the desired structures of Zarr stores, but also because every builtin Xarray operation is entirely lazy on Datasets consistenting of Dask arrays. This makes it relatively straightforward to build up a new Dataset with the required variables and dimension, e.g.," ] - } - ], - "source": [ - "on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n", - "\n", - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(lambda k, v: print(k, type(v)))" - ] - }, - { - "cell_type": "markdown", - "id": "1a9a5810", - "metadata": {}, - "source": [ - "Dask's lazy evaluation system is much more general than Xarray's lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk using Dask can be a very convenient way to setup computation for Xarray-Beam.\n", - "\n", - "Unfortunately, it doesn't scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (multiple TB) datasets with millions of chunks. There are [plans to eventually fix this in Dask](https://github.com/dask/distributed/issues/5581), but in the meantime, prefer the pattern of using Dask arrays with single chunks (e.g., as created by `make_template`), with separate explicit specification of array chunks." - ] - } + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "18eb8a29", + "metadata": { + "id": "q2ns7Ep3BS45" + }, + "outputs": [], + "source": [ + "# remove the \"time\" dimension, and insert a new \"sample\" dimension\n", + "new_template = template.isel(time=0, drop=True).expand_dims(sample=np.arange(10))" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "eecfc041", + "metadata": { + "id": "izu99U1XBS45" + }, + "outputs": [], + "source": [ + "# setup a template for spatially regridding along latitude and longitude\n", + "new_longitudes = np.linspace(0, 100, num=8)\n", + "new_latitudes = np.linspace(30, 80, num=7)\n", + "new_template = template.head(lat=7, lon=8).assign_coords(lat=new_latitudes, lon=new_longitudes)" + ] + }, + { + "cell_type": "markdown", + "id": "e3079b5d", + "metadata": { + "id": "UJNKRbvUBS45" + }, + "source": [ + "### End to end examples\n", + "\n", + "It is also a good idea to supply the `zarr_chunks` argument in order to ensure that the data ends up appropriately chunked in the Zarr store. 
A complete example of reading and writing data from a Zarr store typically looks something like:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "b6bd8cb7", + "metadata": { + "id": "i3Sd5CnKBS45" + }, + "outputs": [], + "source": [ + "ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')\n", + "\n", + "template = xbeam.make_template(ds_on_disk)\n", + "\n", + "with beam.Pipeline() as p:\n", + " (\n", + " p\n", + " | xbeam.DatasetToChunks(ds_on_disk, chunks)\n", + " # insert additional transforms here\n", + " | xbeam.ChunksToZarr('example-data-v3.zarr', template, chunks)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "d06f806b", + "metadata": { + "id": "Uu-S6fehBS45" + }, + "source": [ + "If you don't have an existing Dataset to start with, a common pattern is to reuse the same function you'll use to load data for each chunk, e.g.," + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "5e161959", + "metadata": { + "id": "WQkUVWfwBS45" + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '\u003cclass 'xarray_beam._src.core.Key'\u003e' in '[11]: ConsolidateChunks/GroupByTempKeys'. \n", + "WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '\u003cclass 'xarray_beam._src.core.Key'\u003e' in '[11]: ConsolidateChunks/GroupByTempKeys'. \n" + ] + } + ], + "source": [ + "all_days = pd.date_range('2013-01-01', '2014-01-01', freq='1D')\n", + "\n", + "def load_one_example(time: pd.Timestamp) -\u003e tuple[xbeam.Key, xarray.Dataset]:\n", + " key = xbeam.Key({'time': (time - all_days[0]).days})\n", + " dataset = ds.sel(time=[time]) # replace with your code to create one example\n", + " return key, dataset\n", + "\n", + "_, example = load_one_example(all_days[0])\n", + "template = xbeam.make_template(example).squeeze('time', drop=True).expand_dims(time=all_days)\n", + "zarr_chunks = {'time': 100} # desired chunking along \"time\", e.g., for more efficient storage in Zarr\n", + "\n", + "with beam.Pipeline() as p:\n", + " (\n", + " p\n", + " | beam.Create(all_days)\n", + " | beam.Map(load_one_example)\n", + " | xbeam.ConsolidateChunks(zarr_chunks)\n", + " | xbeam.ChunksToZarr('example-data-v4.zarr', template, zarr_chunks)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "5d5c2e0f", + "metadata": { + "id": "upG6zLQ5BS45" + }, + "source": [ + "For more examples of how to manipulate templates and read/write data with Zarr, see the end-to-end [ERA5 climatology](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py) and [ERA5 rechunk](https://github.com/google/xarray-beam/blob/main/examples/era5_rechunk.py) examples." 
+ ] + }, + { + "cell_type": "markdown", + "id": "9613b48c", + "metadata": { + "id": "CQoRNyAiBS45" + }, + "source": [ + "## Tips for custom data loaders" + ] + }, + { + "cell_type": "markdown", + "id": "d2500f6e", + "metadata": { + "id": "CVthTvlIBS45" + }, + "source": [ + "If you use Xarray's file opening utilities instead of {py:class}`xarray_beam.open_zarr`, you need to take some care to get good performance when processing very large numbers of chunks (hundreds of thousands).\n", + "\n", + "The main tip is to set `chunks=None` when opening datasets and then _explicitly_ provide chunks in `DatasetToChunks` -- exactly the pattern facilitated by `xarray_beam.open_zarr`.\n", + "\n", + "`chunks=None` tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. This is advantageous because datasets using Xarray's lazy indexing are serialized much more compactly (via [pickle](https://docs.python.org/3/library/pickle.html)) when passed into Beam transforms." + ] + }, + { + "cell_type": "markdown", + "id": "7d3ec100", + "metadata": { + "id": "f6SEr2VhBS45" + }, + "source": [ + "Alternatively, you can pass in lazy datasets [using dask](http://xarray.pydata.org/en/stable/user-guide/dask.html). In this case, you don't need to explicitly supply `chunks` to `DatasetToChunks`:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "d3f4f0a0", + "metadata": { + "id": "4MZtgkz0BS45" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) \u003cclass 'xarray.core.dataset.Dataset'\u003e\n" + ] + } + ], + "source": [ + "on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n", + "\n", + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(lambda k, v: print(k, type(v)))" + ] + }, + { + "cell_type": "markdown", + "id": "1a9a5810", + "metadata": { + "id": "30mhyfQrBS45" + }, + "source": [ + "Dask's lazy evaluation system is much more general than Xarray's lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk using Dask can be a very convenient way to setup computation for Xarray-Beam.\n", + "\n", + "Unfortunately, it doesn't scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (multiple TB) datasets with millions of chunks. There are [plans to eventually fix this in Dask](https://github.com/dask/distributed/issues/5581), but in the meantime, prefer the pattern of using Dask arrays with single chunks (e.g., as created by `make_template`), with separate explicit specification of array chunks." 
+ ] + } ], "metadata": { - "celltoolbar": "Tags", - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 + "celltoolbar": "Tags", + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.13" - } + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } }, "nbformat": 4, "nbformat_minor": 5 - } \ No newline at end of file +} diff --git a/setup.py b/setup.py index 823e6f3..b7fd929 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ setuptools.setup( name='xarray-beam', - version='0.6.4', # keep in sync with __init__.py + version='0.7.0', # keep in sync with __init__.py license='Apache 2.0', author='Google LLC', author_email='noreply@google.com', diff --git a/xarray_beam/__init__.py b/xarray_beam/__init__.py index b2452b9..5cbb3fe 100644 --- a/xarray_beam/__init__.py +++ b/xarray_beam/__init__.py @@ -51,4 +51,4 @@ DatasetToZarr, ) -__version__ = '0.6.4' # keep in sync with setup.py +__version__ = '0.7.0' # keep in sync with setup.py diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index 09b9018..67cc0ac 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -28,6 +28,7 @@ Tuple, Union, ) +import warnings import apache_beam as beam import dask @@ -393,18 +394,20 @@ def __init__( matches the structure of the virtual combined dataset corresponding to the chunks fed into this PTransform. One or more variables are expected to be "chunked" with Dask, and will only have their metadata written to - Zarr without array values. Three types of inputs are supported: + Zarr without array values. Two types of inputs are supported: 1. If `template` is an xarray.Dataset, the Zarr store is setup eagerly. 2. If `template` is a beam.pvalue.AsSingleton object representing the result of a prior step in a Beam pipeline, the Zarr store is setup as part of the pipeline. - 3. Finally, if `template` is None, then the structure of the desired - Zarr store is discovered automatically by inspecting the inputs into - ChunksToZarr. This is an easy option, but can be quite expensive/slow - for large datasets -- Beam runners will typically handle this by - dumping a temporary copy of the complete dataset to disk. For best - performance, supply the template explicitly (1 or 2). + + A `template` of `None` is also supported only for backwards + compatibility, in which case Xarray-Beam will attempt to discover the + structure of the desired Zarr store automatically by inspecting the + inputs into. THIS OPTION IS NOT RECOMMENDED. Due to a race condition + (https://github.com/google/xarray-beam/issues/85), it can result in + writing corrupted data Zarr stores, particularly when they contain many + variables. It can also be quite slow for large datasets. zarr_chunks: chunking scheme to use for Zarr. If set, overrides the chunking scheme on already chunked arrays in template. num_threads: the number of Dataset chunks to write in parallel per worker. 
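A minimal sketch of the two `template` forms that remain supported per the docstring above. It assumes the `example-data.zarr` store and the `xbeam.open_zarr`/`xbeam.make_template` helpers shown elsewhere in this change; the output paths and the step label are illustrative, not part of this PR.

import apache_beam as beam
import xarray_beam as xbeam

ds, chunks = xbeam.open_zarr('example-data.zarr')  # lazy dataset + chunk sizes

# (1) Eager template: an xarray.Dataset chunked with Dask, so the Zarr store
# is set up before the pipeline runs.
with beam.Pipeline() as p:
    (
        p
        | xbeam.DatasetToChunks(ds, chunks)
        | xbeam.ChunksToZarr(
            'output-eager.zarr', template=xbeam.make_template(ds), zarr_chunks=chunks
        )
    )

# (2) Deferred template: a beam.pvalue.AsSingleton side input produced by a
# prior pipeline step, so the store is set up as part of the pipeline.
with beam.Pipeline() as p:
    template = (
        p
        | 'MakeTemplate' >> beam.Create([ds])
        | beam.Map(xbeam.make_template)
    )
    (
        p
        | xbeam.DatasetToChunks(ds, chunks)
        | xbeam.ChunksToZarr(
            'output-deferred.zarr',
            template=beam.pvalue.AsSingleton(template),
            zarr_chunks=chunks,
        )
    )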
@@ -431,6 +434,14 @@ def __init__( elif template is None: if not needs_setup: raise ValueError('setup required if template is not supplied') + warnings.warn( + 'No template provided in xarray_beam.ChunksToZarr. This will ' + 'sometimes succeed, but can also result in writing silently ' + 'incomplete data due to a race condition! This option will be ' + 'removed in the future', + FutureWarning, + stacklevel=2, + ) # Setup happens later, in expand(). else: raise TypeError( diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 0936bed..b455ccd 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -114,7 +114,8 @@ def test_chunks_to_zarr(self): ] with self.subTest('no template'): temp_dir = self.create_tempdir().full_path - inputs | xbeam.ChunksToZarr(temp_dir) + with self.assertWarnsRegex(FutureWarning, 'No template provided'): + inputs | xbeam.ChunksToZarr(temp_dir, template=None) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) with self.subTest('with template'): @@ -138,7 +139,10 @@ def test_chunks_to_zarr(self): with self.subTest('with zarr_chunks and no template'): temp_dir = self.create_tempdir().full_path zarr_chunks = {'x': 3} - inputs | xbeam.ChunksToZarr(temp_dir, zarr_chunks=zarr_chunks) + with self.assertWarnsRegex(FutureWarning, 'No template provided'): + inputs | xbeam.ChunksToZarr( + temp_dir, template=None, zarr_chunks=zarr_chunks + ) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) self.assertEqual(result.chunks, {'x': (3, 3)}) @@ -183,7 +187,8 @@ def test_multiple_vars_chunks_to_zarr(self): ] with self.subTest('no template'): temp_dir = self.create_tempdir().full_path - inputs | xbeam.ChunksToZarr(temp_dir) + with self.assertWarnsRegex(FutureWarning, 'No template provided'): + inputs | xbeam.ChunksToZarr(temp_dir, template=None) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) with self.subTest('with template'): @@ -210,19 +215,21 @@ def test_2d_chunks_to_zarr(self, coords): with self.subTest('partial key'): inputs = [(xbeam.Key({'x': 0}), dataset)] temp_dir = self.create_tempdir().full_path - inputs | xbeam.ChunksToZarr(temp_dir) + inputs | xbeam.ChunksToZarr(temp_dir, template=dataset.chunk()) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) with self.subTest('split along partial key'): inputs = [(xbeam.Key({'x': 0}), dataset)] temp_dir = self.create_tempdir().full_path - inputs | xbeam.SplitChunks({'x': 1}) | xbeam.ChunksToZarr(temp_dir) + inputs | xbeam.SplitChunks({'x': 1}) | xbeam.ChunksToZarr( + temp_dir, template=dataset.chunk({'x': 1}) + ) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) with self.subTest('full key'): inputs = [(xbeam.Key({'x': 0, 'y': 0}), dataset)] temp_dir = self.create_tempdir().full_path - inputs | xbeam.ChunksToZarr(temp_dir) + inputs | xbeam.ChunksToZarr(temp_dir, template=dataset.chunk()) result = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(dataset, result) @@ -254,7 +261,7 @@ def test_validate_zarr_chunk_accepts_partial_key(self): coords={'x': np.arange(3), 'y': np.arange(2)}, ) # Should not raise an exception: - xbeam._src.zarr.validate_zarr_chunk( + xbeam.validate_zarr_chunk( key=xbeam.Key({'x': 0}), chunk=dataset, template=dataset.chunk(), @@ -262,8 +269,9 @@ def 
test_validate_zarr_chunk_accepts_partial_key(self): ) def test_to_zarr_wrong_multiple_error(self): + ds = xarray.Dataset({'foo': ('x', np.arange(6))}) inputs = [ - (xbeam.Key({'x': 3}), xarray.Dataset({'foo': ('x', np.arange(3, 6))})), + (xbeam.Key({'x': 3}), ds.tail(3)), ] temp_dir = self.create_tempdir().full_path with self.assertRaisesRegex( @@ -273,7 +281,9 @@ def test_to_zarr_wrong_multiple_error(self): "chunks {'x': 4}" ), ): - inputs | xbeam.ChunksToZarr(temp_dir, zarr_chunks={'x': 4}) + inputs | xbeam.ChunksToZarr( + temp_dir, template=ds.chunk(4), zarr_chunks={'x': 4} + ) def test_to_zarr_needs_consolidation_error(self): ds = xarray.Dataset({'foo': ('x', np.arange(6))}) @@ -285,7 +295,9 @@ def test_to_zarr_needs_consolidation_error(self): with self.assertRaisesRegex( ValueError, 'chunk is smaller than zarr chunks' ): - inputs | xbeam.ChunksToZarr(temp_dir, zarr_chunks={'x': 6}) + inputs | xbeam.ChunksToZarr( + temp_dir, template=ds.chunk(), zarr_chunks={'x': 6} + ) with self.assertRaisesRegex( ValueError, 'chunk is smaller than zarr chunks' ): @@ -351,7 +363,9 @@ def test_chunks_to_zarr_docs_demo(self): ( test_util.EagerPipeline() | xbeam.DatasetToChunks(ds, chunks) - | xbeam.ChunksToZarr(temp_dir) + | xbeam.ChunksToZarr( + temp_dir, template=xbeam.make_template(ds), zarr_chunks=chunks + ) ) result = xarray.open_zarr(temp_dir) xarray.testing.assert_identical(result, ds)
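For pipelines affected by the new FutureWarning, a migration sketch; the store path, dataset, and chunk sizes are illustrative, and the pattern mirrors the updated `test_chunks_to_zarr_docs_demo` above.

import apache_beam as beam
import numpy as np
import xarray
import xarray_beam as xbeam

ds = xarray.Dataset({'foo': ('x', np.arange(6))})
chunks = {'x': 3}

with beam.Pipeline() as p:
    (
        p
        | xbeam.DatasetToChunks(ds, chunks)
        # Before: xbeam.ChunksToZarr('output.zarr') -- now emits a FutureWarning.
        | xbeam.ChunksToZarr(
            'output.zarr', template=xbeam.make_template(ds), zarr_chunks=chunks
        )
    )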