From 697eb52753b480ee0b26b94609696322ccf0927c Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sun, 2 Nov 2025 23:55:08 +0100 Subject: [PATCH 1/8] (feat): first pass remove dtype + fill val handling per chunk --- python/zarrs/pipeline.py | 5 +- python/zarrs/utils.py | 6 +- src/chunk_item.rs | 113 ++++++----------- src/concurrency.rs | 12 +- src/lib.rs | 263 ++++++++++++++++++++++----------------- src/utils.rs | 8 +- 6 files changed, 203 insertions(+), 204 deletions(-) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index a90d127..3ffc7df 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -29,6 +29,7 @@ CollapsedDimensionError, DiscontiguousArrayError, FillValueNoneError, + get_implicit_fill_value, make_chunk_info_for_rust_with_indices, ) @@ -62,8 +63,10 @@ def get_codec_pipeline_impl( ), num_threads=config.get("threading.max_workers", None), direct_io=config.get("codec_pipeline.direct_io", False), + fill_value=get_implicit_fill_value(metadata.dtype, metadata.fill_value), + dtype_str=str(metadata.dtype.to_native_dtype()), ) - except TypeError as e: + except (TypeError, ValueError) as e: warn( f"Array is unsupported by ZarrsCodecPipeline: {e}", category=UserWarning, diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 114d30c..8318cc3 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -10,7 +10,7 @@ from zarr.core.array_spec import ArraySpec from zarr.core.indexing import SelectorTuple, is_integer -from zarrs._internal import Basic, WithSubset +from zarrs._internal import WithSubset if TYPE_CHECKING: from collections.abc import Iterable @@ -178,7 +178,6 @@ def make_chunk_info_for_rust_with_indices( chunk_spec.config, chunk_spec.prototype, ) - chunk_info = Basic(byte_getter, chunk_spec) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( @@ -196,8 +195,9 @@ def make_chunk_info_for_rust_with_indices( ) chunk_info_with_indices.append( WithSubset( - chunk_info, + key=byte_getter.path, chunk_subset=chunk_selection_as_slices, + chunk_shape=chunk_spec.shape, subset=out_selection_as_slices, shape=shape, ) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 98e55ca..da2e2f4 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -2,7 +2,7 @@ use std::num::NonZeroU64; use pyo3::{ Bound, PyAny, PyErr, PyResult, - exceptions::{PyIndexError, PyRuntimeError, PyValueError}, + exceptions::{PyIndexError, PyValueError}, pyclass, pymethods, types::{PyAnyMethods, PyBytes, PyBytesMethods, PyInt, PySlice, PySliceMethods as _}, }; @@ -10,26 +10,12 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ array::{ChunkRepresentation, DataType, FillValue}, array_subset::ArraySubset, - metadata::v3::MetadataV3, storage::StoreKey, }; use crate::utils::PyErrExt; -pub(crate) trait ChunksItem { - fn key(&self) -> &StoreKey; - fn representation(&self) -> &ChunkRepresentation; -} - -#[derive(Clone)] -#[gen_stub_pyclass] -#[pyclass] -pub(crate) struct Basic { - key: StoreKey, - representation: ChunkRepresentation, -} - -fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult> { +pub fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult> { if dtype == "string" { // Match zarr-python 2.x.x string fill value behaviour with a 0 fill value // See https://github.com/zarr-developers/zarr-python/issues/2792#issuecomment-2644362122 @@ -55,40 +41,34 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult, chunk_spec: &Bound<'_, PyAny>) -> PyResult { - let path: String = byte_interface.getattr("path")?.extract()?; - - let chunk_shape = chunk_spec.getattr("shape")?.extract()?; - let mut dtype: String = chunk_spec - .getattr("dtype")? - .call_method0("to_native_dtype")? - .call_method0("__str__")? - .extract()?; - if dtype == "object" { - // zarrs doesn't understand `object` which is the output of `np.dtype("|O").__str__()` - // but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288 - dtype = String::from("string"); - } - let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?; - let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?; - Ok(Self { - key: StoreKey::new(path).map_py_err::()?, - representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?, - }) - } -} - #[derive(Clone)] #[gen_stub_pyclass] #[pyclass] pub(crate) struct WithSubset { - pub item: Basic, + pub key: StoreKey, pub chunk_subset: ArraySubset, pub subset: ArraySubset, + chunk_shape: Vec, +} + +pub trait WithRepresentations { + fn with_representations( + self, + data_type: DataType, + fill_value: FillValue, + ) -> PyResult>; +} + +impl WithRepresentations for Vec { + fn with_representations( + self, + data_type: DataType, + fill_value: FillValue, + ) -> PyResult> { + self.into_iter() + .map(|f| Ok((f.representation(data_type.clone(), fill_value.clone())?, f))) + .collect::>>() + } } #[gen_stub_pymethods] @@ -97,13 +77,13 @@ impl WithSubset { #[new] #[allow(clippy::needless_pass_by_value)] fn new( - item: Basic, + key: String, chunk_subset: Vec>, + chunk_shape: Vec, subset: Vec>, shape: Vec, ) -> PyResult { - let chunk_subset = - selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?; + let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape)?; let subset = selection_to_array_subset(&subset, &shape)?; // Check that subset and chunk_subset have the same number of elements. // This permits broadcasting of a constant input. @@ -112,50 +92,37 @@ impl WithSubset { "the size of the chunk subset {chunk_subset} and input/output subset {subset} are incompatible", ))); } + Ok(Self { - item, + key: StoreKey::new(key).map_py_err::()?, chunk_subset, subset, + chunk_shape, }) } } -impl ChunksItem for Basic { - fn key(&self) -> &StoreKey { - &self.key - } - fn representation(&self) -> &ChunkRepresentation { - &self.representation - } -} - -impl ChunksItem for WithSubset { - fn key(&self) -> &StoreKey { - &self.item.key - } - fn representation(&self) -> &ChunkRepresentation { - &self.item.representation +impl WithSubset { + fn representation( + &self, + dtype: DataType, + fill_value: FillValue, + ) -> PyResult { + get_chunk_representation(self.chunk_shape.clone(), dtype, fill_value) } } fn get_chunk_representation( chunk_shape: Vec, - dtype: &str, - fill_value: Vec, + dtype: DataType, + fill_value: FillValue, ) -> PyResult { - // Get the chunk representation - let data_type = DataType::from_metadata( - &MetadataV3::new(dtype), - zarrs::config::global_config().data_type_aliases_v3(), - ) - .map_py_err::()?; let chunk_shape = chunk_shape .into_iter() .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) .collect(); let chunk_representation = - ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value)) - .map_py_err::()?; + ChunkRepresentation::new(chunk_shape, dtype, fill_value).map_py_err::()?; Ok(chunk_representation) } diff --git a/src/concurrency.rs b/src/concurrency.rs index 364b33b..fb8adba 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -1,10 +1,10 @@ use pyo3::PyResult; use zarrs::array::{ - ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions, + ArrayCodecTraits, ChunkRepresentation, RecommendedConcurrency, codec::CodecOptions, concurrency::calc_concurrency_outer_inner, }; -use crate::{CodecPipelineImpl, chunk_item::ChunksItem, utils::PyCodecErrExt as _}; +use crate::{CodecPipelineImpl, chunk_item::WithSubset, utils::PyCodecErrExt as _}; pub trait ChunkConcurrentLimitAndCodecOptions { fn get_chunk_concurrent_limit_and_codec_options( @@ -13,19 +13,15 @@ pub trait ChunkConcurrentLimitAndCodecOptions { ) -> PyResult>; } -impl ChunkConcurrentLimitAndCodecOptions for Vec -where - T: ChunksItem, -{ +impl ChunkConcurrentLimitAndCodecOptions for Vec<(ChunkRepresentation, WithSubset)> { fn get_chunk_concurrent_limit_and_codec_options( &self, codec_pipeline_impl: &CodecPipelineImpl, ) -> PyResult> { let num_chunks = self.len(); - let Some(chunk_descriptions0) = self.first() else { + let Some((chunk_representation, _)) = self.first() else { return Ok(None); }; - let chunk_representation = chunk_descriptions0.representation(); let codec_concurrency = codec_pipeline_impl .codec_chain diff --git a/src/lib.rs b/src/lib.rs index b01ee1c..65354d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,8 +23,8 @@ use zarrs::array::codec::{ StoragePartialDecoder, }; use zarrs::array::{ - ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, ArraySize, CodecChain, FillValue, - copy_fill_value_into, update_array_bytes, + ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, ArraySize, ChunkRepresentation, + CodecChain, DataType, FillValue, copy_fill_value_into, update_array_bytes, }; use zarrs::array_subset::ArraySubset; use zarrs::config::global_config; @@ -43,7 +43,7 @@ mod store; mod tests; mod utils; -use crate::chunk_item::ChunksItem; +use crate::chunk_item::{WithRepresentations, fill_value_to_bytes}; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; @@ -58,79 +58,84 @@ pub struct CodecPipelineImpl { pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, pub(crate) num_threads: usize, + pub(crate) fill_value: FillValue, + pub(crate) data_type: DataType, } impl CodecPipelineImpl { - fn retrieve_chunk_bytes<'a, I: ChunksItem>( + fn retrieve_chunk_bytes<'a>( &self, - item: &I, + item: &WithSubset, + representation: &ChunkRepresentation, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = self.store.get(item.key()).map_py_err::()?; + let value_encoded = self.store.get(&item.key).map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain - .decode(value_encoded.into(), item.representation(), codec_options) + .decode(value_encoded.into(), representation, codec_options) .map_codec_err()? } else { - let array_size = ArraySize::new( - item.representation().data_type().size(), - item.representation().num_elements(), - ); - ArrayBytes::new_fill_value(array_size, item.representation().fill_value()) + let array_size = ArraySize::new(self.data_type.size(), representation.num_elements()); + ArrayBytes::new_fill_value(array_size, &self.fill_value) }; Ok(value_decoded) } - fn store_chunk_bytes( + fn store_chunk_bytes( &self, - item: &I, + item: &WithSubset, + representation: &ChunkRepresentation, codec_chain: &CodecChain, value_decoded: ArrayBytes, codec_options: &CodecOptions, ) -> PyResult<()> { value_decoded - .validate( - item.representation().num_elements(), - item.representation().data_type().size(), - ) + .validate(representation.num_elements(), self.data_type.size()) .map_codec_err()?; - if value_decoded.is_fill_value(item.representation().fill_value()) { - self.store.erase(item.key()).map_py_err::() + if value_decoded.is_fill_value(&self.fill_value) { + self.store.erase(&item.key).map_py_err::() } else { let value_encoded = codec_chain - .encode(value_decoded, item.representation(), codec_options) + .encode(value_decoded, representation, codec_options) .map(Cow::into_owned) .map_codec_err()?; // Store the encoded chunk self.store - .set(item.key(), value_encoded.into()) + .set(&item.key, value_encoded.into()) .map_py_err::() } } - fn store_chunk_subset_bytes( + fn store_chunk_subset_bytes( &self, - item: &I, + item: &WithSubset, + representation: &ChunkRepresentation, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - let array_shape = item.representation().shape_u64(); + let array_shape = representation.shape_u64(); if !chunk_subset.inbounds_shape(&array_shape) { return Err(PyErr::new::(format!( "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" ))); } - let data_type_size = item.representation().data_type().size(); + let data_type_size = self.data_type.size(); if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { // Fast path if the chunk subset spans the entire chunk, no read required - self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) + self.store_chunk_bytes( + item, + representation, + codec_chain, + chunk_subset_bytes, + codec_options, + ) } else { // Validate the chunk subset bytes chunk_subset_bytes @@ -138,7 +143,8 @@ impl CodecPipelineImpl { .map_codec_err()?; // Retrieve the chunk - let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; + let chunk_bytes_old = + self.retrieve_chunk_bytes(item, representation, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = update_array_bytes( @@ -151,7 +157,13 @@ impl CodecPipelineImpl { .map_codec_err()?; // Store the updated chunk - self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) + self.store_chunk_bytes( + item, + representation, + codec_chain, + chunk_bytes_new, + codec_options, + ) } } @@ -246,7 +258,9 @@ impl CodecPipelineImpl { chunk_concurrent_minimum=None, chunk_concurrent_maximum=None, num_threads=None, - direct_io=false + direct_io=false, + fill_value, + dtype_str, ))] #[new] fn new( @@ -257,6 +271,8 @@ impl CodecPipelineImpl { chunk_concurrent_maximum: Option, num_threads: Option, direct_io: bool, + fill_value: Bound<'_, PyAny>, + dtype_str: String, ) -> PyResult { store_config.direct_io(direct_io); let metadata: ArrayMetadata = @@ -281,6 +297,21 @@ impl CodecPipelineImpl { let store: ReadableWritableListableStorage = (&store_config).try_into().map_py_err::()?; + let fill_value = FillValue::new(fill_value_to_bytes(&dtype_str, &fill_value)?); + + let dtype = if dtype_str == "object" { + // zarrs doesn't understand `object` which is the output of `np.dtype("|O").__str__()` + // but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288 + String::from("string") + } else { + dtype_str + }; + let data_type = DataType::from_metadata( + &MetadataV3::new(dtype), + zarrs::config::global_config().data_type_aliases_v3(), + ) + .map_py_err::()?; + Ok(Self { store, codec_chain, @@ -288,6 +319,8 @@ impl CodecPipelineImpl { chunk_concurrent_minimum, chunk_concurrent_maximum, num_threads, + fill_value, + data_type, }) } @@ -298,43 +331,40 @@ impl CodecPipelineImpl { value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array + let representations_with_chunk_subsets = chunk_descriptions + .with_representations(self.data_type.clone(), self.fill_value.clone())?; let output = Self::nparray_to_unsafe_cell_slice(value)?; let output_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, codec_options)) = - chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + let Some((chunk_concurrent_limit, codec_options)) = representations_with_chunk_subsets + .get_chunk_concurrent_limit_and_codec_options(self)? else { return Ok(()); }; // Assemble partial decoders ahead of time and in parallel - let partial_chunk_descriptions = chunk_descriptions + let partial_chunk_descriptions_with_representations = representations_with_chunk_subsets .iter() - .filter(|item| !(is_whole_chunk(item))) - .unique_by(|item| item.key()) + .filter(|(representation, item)| !(is_whole_chunk(item, representation))) + .unique_by(|(_, item)| item.key.clone()) .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new(); - if !partial_chunk_descriptions.is_empty() { + if !partial_chunk_descriptions_with_representations.is_empty() { let key_decoder_pairs = iter_concurrent_limit!( chunk_concurrent_limit, - partial_chunk_descriptions, + partial_chunk_descriptions_with_representations, map, - |item| { + |(representation, item)| { let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); - let input_handle = - StoragePartialDecoder::new(storage_handle, item.key().clone()); + let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone()); let partial_decoder = self .codec_chain .clone() - .partial_decoder( - Arc::new(input_handle), - item.representation(), - &codec_options, - ) + .partial_decoder(Arc::new(input_handle), &representation, &codec_options) .map_codec_err()?; - Ok((item.key().clone(), partial_decoder)) + Ok((item.key.clone(), partial_decoder)) } ) .collect::>>()?; @@ -345,71 +375,75 @@ impl CodecPipelineImpl { // FIXME: the `decode_into` methods only support fixed length data types. // For variable length data types, need a codepath with non `_into` methods. // Collect all the subsets and copy into value on the Python side? - let update_chunk_subset = |item: chunk_item::WithSubset| { - let chunk_item::WithSubset { - item, - subset, - chunk_subset, - } = item; - let mut output_view = unsafe { - // TODO: Is the following correct? - // can we guarantee that when this function is called from Python with arbitrary arguments? - // SAFETY: chunks represent disjoint array subsets - ArrayBytesFixedDisjointView::new( - output, - // TODO: why is data_type in `item`, it should be derived from `output`, no? - item.representation() - .data_type() - .fixed_size() - .ok_or("variable length data type not supported") - .map_py_err::()?, - &output_shape, + let update_chunk_subset = + |(representation, item): (ChunkRepresentation, WithSubset)| { + let chunk_item::WithSubset { + key, subset, - ) - .map_py_err::()? - }; + chunk_subset, + .. + } = item; + let mut output_view = unsafe { + // TODO: Is the following correct? + // can we guarantee that when this function is called from Python with arbitrary arguments? + // SAFETY: chunks represent disjoint array subsets + ArrayBytesFixedDisjointView::new( + output, + // TODO: why is data_type in `item`, it should be derived from `output`, no? + representation + .data_type() + .fixed_size() + .ok_or("variable length data type not supported") + .map_py_err::()?, + &output_shape, + subset, + ) + .map_py_err::()? + }; - // See zarrs::array::Array::retrieve_chunk_subset_into - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == item.representation().shape_u64() - { - // See zarrs::array::Array::retrieve_chunk_into - if let Some(chunk_encoded) = - self.store.get(item.key()).map_py_err::()? + // See zarrs::array::Array::retrieve_chunk_subset_into + if chunk_subset.start().iter().all(|&o| o == 0) + && chunk_subset.shape() == representation.shape_u64() { - // Decode the encoded data into the output buffer - let chunk_encoded: Vec = chunk_encoded.into(); - self.codec_chain.decode_into( - Cow::Owned(chunk_encoded), - item.representation(), - &mut output_view, - &codec_options, - ) + // See zarrs::array::Array::retrieve_chunk_into + if let Some(chunk_encoded) = + self.store.get(&key).map_py_err::()? + { + // Decode the encoded data into the output buffer + let chunk_encoded: Vec = chunk_encoded.into(); + self.codec_chain.decode_into( + Cow::Owned(chunk_encoded), + &representation, + &mut output_view, + &codec_options, + ) + } else { + // The chunk is missing, write the fill value + copy_fill_value_into( + &self.data_type, + &self.fill_value, + &mut output_view, + ) + } } else { - // The chunk is missing, write the fill value - copy_fill_value_into( - item.representation().data_type(), - item.representation().fill_value(), + let key = &key; + let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { + PyRuntimeError::new_err(format!( + "Partial decoder not found for key: {key}" + )) + })?; + partial_decoder.partial_decode_into( + &chunk_subset, &mut output_view, + &codec_options, ) } - } else { - let key = item.key(); - let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { - PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) - })?; - partial_decoder.partial_decode_into( - &chunk_subset, - &mut output_view, - &codec_options, - ) - } - .map_codec_err() - }; + .map_codec_err() + }; iter_concurrent_limit!( chunk_concurrent_limit, - chunk_descriptions, + representations_with_chunk_subsets, try_for_each, update_chunk_subset )?; @@ -429,6 +463,8 @@ impl CodecPipelineImpl { Array(ArrayBytes<'a>), Constant(FillValue), } + let representations_with_chunk_subsets = chunk_descriptions + .with_representations(self.data_type.clone(), self.fill_value.clone())?; // Get input array let input_slice = Self::nparray_to_slice(value)?; @@ -441,25 +477,25 @@ impl CodecPipelineImpl { let input_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, mut codec_options)) = - chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + let Some((chunk_concurrent_limit, mut codec_options)) = representations_with_chunk_subsets + .get_chunk_concurrent_limit_and_codec_options(self)? else { return Ok(()); }; codec_options.set_store_empty_chunks(write_empty_chunks); py.allow_threads(move || { - let store_chunk = |item: chunk_item::WithSubset| match &input { + let store_chunk = |(representation, item): ( + ChunkRepresentation, + chunk_item::WithSubset, + )| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input - .extract_array_subset( - &item.subset, - &input_shape, - item.item.representation().data_type(), - ) + .extract_array_subset(&item.subset, &input_shape, &self.data_type) .map_codec_err()?; self.store_chunk_subset_bytes( &item, + &representation, &self.codec_chain, chunk_subset_bytes, &item.chunk_subset, @@ -468,15 +504,13 @@ impl CodecPipelineImpl { } InputValue::Constant(constant_value) => { let chunk_subset_bytes = ArrayBytes::new_fill_value( - ArraySize::new( - item.representation().data_type().size(), - item.chunk_subset.num_elements(), - ), + ArraySize::new(self.data_type.size(), item.chunk_subset.num_elements()), constant_value, ); self.store_chunk_subset_bytes( &item, + &representation, &self.codec_chain, chunk_subset_bytes, &item.chunk_subset, @@ -487,7 +521,7 @@ impl CodecPipelineImpl { iter_concurrent_limit!( chunk_concurrent_limit, - chunk_descriptions, + representations_with_chunk_subsets, try_for_each, store_chunk )?; @@ -502,7 +536,6 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index eda2aa0..8c44c67 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,9 +2,9 @@ use std::fmt::Display; use numpy::{PyUntypedArray, PyUntypedArrayMethods}; use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; -use zarrs::array::codec::CodecError; +use zarrs::array::{ChunkRepresentation, codec::CodecError}; -use crate::{ChunksItem, WithSubset}; +use crate::WithSubset; pub(crate) trait PyErrExt { fn map_py_err(self) -> PyResult; @@ -55,7 +55,7 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { } } -pub fn is_whole_chunk(item: &WithSubset) -> bool { +pub fn is_whole_chunk(item: &WithSubset, representation: &ChunkRepresentation) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == item.representation().shape_u64() + && item.chunk_subset.shape() == representation.shape_u64() } From 9c43e815f89fc6412bf050f1b184d8b490361aa9 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 1 Jan 2026 22:30:50 +0100 Subject: [PATCH 2/8] feat: upgrade zarr v3 --- Cargo.toml | 2 +- src/chunk_item.rs | 61 ++--------- src/concurrency.rs | 15 ++- src/lib.rs | 264 ++++++++++++++++++++------------------------- src/utils.rs | 12 ++- 5 files changed, 145 insertions(+), 209 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3afd94..55b05ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.22.4", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.0-beta.3", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 7031fe7..e76172f 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -7,11 +7,7 @@ use pyo3::{ types::{PyAnyMethods, PyBytes, PyBytesMethods, PyInt, PySlice, PySliceMethods as _}, }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; -use zarrs::{ - array::{ChunkRepresentation, DataType, FillValue}, - array_subset::ArraySubset, - storage::StoreKey, -}; +use zarrs::{array_subset::ArraySubset, storage::StoreKey}; use crate::utils::PyErrExt; @@ -48,27 +44,9 @@ pub(crate) struct WithSubset { pub key: StoreKey, pub chunk_subset: ArraySubset, pub subset: ArraySubset, - chunk_shape: Vec, -} - -pub trait WithRepresentations { - fn with_representations( - self, - data_type: DataType, - fill_value: FillValue, - ) -> PyResult>; -} - -impl WithRepresentations for Vec { - fn with_representations( - self, - data_type: DataType, - fill_value: FillValue, - ) -> PyResult> { - self.into_iter() - .map(|f| Ok((f.representation(data_type.clone(), fill_value.clone())?, f))) - .collect::>>() - } + pub chunk_shape_u64: Vec, + pub chunk_shape: Vec, + pub num_elements: u64, } #[gen_stub_pymethods] @@ -97,35 +75,16 @@ impl WithSubset { key: StoreKey::new(key).map_py_err::()?, chunk_subset, subset, - chunk_shape, + chunk_shape: chunk_shape + .iter() + .map(|v| NonZeroU64::new(*v).unwrap()) + .collect(), // TODO: Unwrap + num_elements: chunk_shape.iter().product(), + chunk_shape_u64: chunk_shape, }) } } -impl WithSubset { - fn representation( - &self, - dtype: DataType, - fill_value: FillValue, - ) -> PyResult { - get_chunk_representation(self.chunk_shape.clone(), dtype, fill_value) - } -} - -fn get_chunk_representation( - chunk_shape: Vec, - dtype: DataType, - fill_value: FillValue, -) -> PyResult { - let chunk_shape = chunk_shape - .into_iter() - .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) - .collect(); - let chunk_representation = - ChunkRepresentation::new(chunk_shape, dtype, fill_value).map_py_err::()?; - Ok(chunk_representation) -} - fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult> { let indices = slice.indices(length)?; if indices.start < 0 { diff --git a/src/concurrency.rs b/src/concurrency.rs index fb8adba..40b0301 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -1,6 +1,6 @@ use pyo3::PyResult; use zarrs::array::{ - ArrayCodecTraits, ChunkRepresentation, RecommendedConcurrency, codec::CodecOptions, + ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions, concurrency::calc_concurrency_outer_inner, }; @@ -13,19 +13,19 @@ pub trait ChunkConcurrentLimitAndCodecOptions { ) -> PyResult>; } -impl ChunkConcurrentLimitAndCodecOptions for Vec<(ChunkRepresentation, WithSubset)> { +impl ChunkConcurrentLimitAndCodecOptions for Vec { fn get_chunk_concurrent_limit_and_codec_options( &self, codec_pipeline_impl: &CodecPipelineImpl, ) -> PyResult> { let num_chunks = self.len(); - let Some((chunk_representation, _)) = self.first() else { + let Some(item) = self.first() else { return Ok(None); }; let codec_concurrency = codec_pipeline_impl .codec_chain - .recommended_concurrency(chunk_representation) + .recommended_concurrency(&item.chunk_shape, &codec_pipeline_impl.data_type) .map_codec_err()?; let min_concurrent_chunks = @@ -37,11 +37,8 @@ impl ChunkConcurrentLimitAndCodecOptions for Vec<(ChunkRepresentation, WithSubse &RecommendedConcurrency::new(min_concurrent_chunks..max_concurrent_chunks), &codec_concurrency, ); - let codec_options = codec_pipeline_impl - .codec_options - .into_builder() - .concurrent_target(codec_concurrent_limit) - .build(); + let mut codec_options = codec_pipeline_impl.codec_options.clone(); + codec_options.set_concurrent_target(codec_concurrent_limit); Ok(Some((chunk_concurrent_limit, codec_options))) } } diff --git a/src/lib.rs b/src/lib.rs index db3f90f..f8cc246 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,20 +19,20 @@ use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; use zarrs::array::codec::{ - ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, + ArrayBytesDecodeIntoTarget, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, StoragePartialDecoder, }; use zarrs::array::{ - ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, ArraySize, ChunkRepresentation, - CodecChain, DataType, FillValue, copy_fill_value_into, update_array_bytes, + ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, CodecChain, DataType, DataTypeExt, + FillValue, copy_fill_value_into, update_array_bytes, }; use zarrs::array_subset::ArraySubset; use zarrs::config::global_config; -use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; -use zarrs::metadata::v3::MetadataV3; -use zarrs::metadata_ext::v2_to_v3::{ +use zarrs::convert::{ ArrayMetadataV2ToV3Error, codec_metadata_v2_to_v3, data_type_metadata_v2_to_v3, }; +use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; +use zarrs::metadata::v3::MetadataV3; use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; @@ -43,7 +43,7 @@ mod store; mod tests; mod utils; -use crate::chunk_item::{WithRepresentations, fill_value_to_bytes}; +use crate::chunk_item::fill_value_to_bytes; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; @@ -66,7 +66,6 @@ impl CodecPipelineImpl { fn retrieve_chunk_bytes<'a>( &self, item: &WithSubset, - representation: &ChunkRepresentation, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { @@ -74,11 +73,17 @@ impl CodecPipelineImpl { let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain - .decode(value_encoded.into(), representation, codec_options) + .decode( + value_encoded.into(), + &item.chunk_shape, + &self.data_type, + &self.fill_value, + codec_options, + ) .map_codec_err()? } else { - let array_size = ArraySize::new(self.data_type.size(), representation.num_elements()); - ArrayBytes::new_fill_value(array_size, &self.fill_value) + ArrayBytes::new_fill_value(&self.data_type, item.num_elements, &self.fill_value) + .map_py_err::()? }; Ok(value_decoded) } @@ -86,20 +91,25 @@ impl CodecPipelineImpl { fn store_chunk_bytes( &self, item: &WithSubset, - representation: &ChunkRepresentation, codec_chain: &CodecChain, value_decoded: ArrayBytes, codec_options: &CodecOptions, ) -> PyResult<()> { value_decoded - .validate(representation.num_elements(), self.data_type.size()) + .validate(item.num_elements, &self.data_type) .map_codec_err()?; if value_decoded.is_fill_value(&self.fill_value) { self.store.erase(&item.key).map_py_err::() } else { let value_encoded = codec_chain - .encode(value_decoded, representation, codec_options) + .encode( + value_decoded, + &item.chunk_shape, + &self.data_type, + &self.fill_value, + codec_options, + ) .map(Cow::into_owned) .map_codec_err()?; @@ -113,43 +123,40 @@ impl CodecPipelineImpl { fn store_chunk_subset_bytes( &self, item: &WithSubset, - representation: &ChunkRepresentation, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - let array_shape = representation.shape_u64(); - if !chunk_subset.inbounds_shape(&array_shape) { + let chunk_shape = item + .chunk_shape + .clone() + .into_iter() + .map(|v| u64::from(v)) + .collect::>(); + if !chunk_subset.inbounds_shape(&chunk_shape) { return Err(PyErr::new::(format!( - "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + "chunk subset ({chunk_subset}) is out of bounds for array shape ({chunk_shape:?})" ))); } let data_type_size = self.data_type.size(); - if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { + if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == chunk_shape { // Fast path if the chunk subset spans the entire chunk, no read required - self.store_chunk_bytes( - item, - representation, - codec_chain, - chunk_subset_bytes, - codec_options, - ) + self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate(chunk_subset.num_elements(), data_type_size) + .validate(chunk_subset.num_elements(), &self.data_type) .map_codec_err()?; // Retrieve the chunk - let chunk_bytes_old = - self.retrieve_chunk_bytes(item, representation, codec_chain, codec_options)?; + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = update_array_bytes( chunk_bytes_old, - &array_shape, + &chunk_shape, chunk_subset, &chunk_subset_bytes, data_type_size, @@ -157,13 +164,7 @@ impl CodecPipelineImpl { .map_codec_err()?; // Store the updated chunk - self.store_chunk_bytes( - item, - representation, - codec_chain, - chunk_bytes_new, - codec_options, - ) + self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) } } @@ -224,14 +225,9 @@ fn array_metadata_to_codec_metadata_v3( match metadata { ArrayMetadata::V3(metadata) => Ok(metadata.codecs), ArrayMetadata::V2(metadata) => { - let config = global_config(); let endianness = data_type_metadata_v2_to_endianness(&metadata.dtype) .map_err(ArrayMetadataV2ToV3Error::InvalidEndianness)?; - let data_type = data_type_metadata_v2_to_v3( - &metadata.dtype, - config.data_type_aliases_v2(), - config.data_type_aliases_v3(), - )?; + let data_type: MetadataV3 = data_type_metadata_v2_to_v3(&metadata.dtype)?; codec_metadata_v2_to_v3( metadata.order, @@ -240,8 +236,6 @@ fn array_metadata_to_codec_metadata_v3( endianness, &metadata.filters, &metadata.compressor, - config.codec_aliases_v2(), - config.codec_aliases_v3(), ) } } @@ -278,15 +272,12 @@ impl CodecPipelineImpl { let metadata: ArrayMetadata = serde_json::from_str(array_metadata).map_py_err::()?; let codec_metadata = - array_metadata_to_codec_metadata_v3(metadata).map_py_err::()?; + array_metadata_to_codec_metadata_v3(metadata.clone()).map_py_err::()?; let codec_chain = Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::()?); + let mut codec_options = CodecOptions::default(); - let mut codec_options = CodecOptionsBuilder::new(); - - codec_options = codec_options.validate_checksums(validate_checksums); - - let codec_options = codec_options.build(); + codec_options.set_validate_checksums(validate_checksums); let chunk_concurrent_minimum = chunk_concurrent_minimum .unwrap_or(zarrs::config::global_config().chunk_concurrent_minimum()); @@ -299,18 +290,11 @@ impl CodecPipelineImpl { let fill_value = FillValue::new(fill_value_to_bytes(&dtype_str, &fill_value)?); - let dtype = if dtype_str == "object" { - // zarrs doesn't understand `object` which is the output of `np.dtype("|O").__str__()` - // but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288 - String::from("string") - } else { - dtype_str + let metadata_dtype = match metadata { + ArrayMetadata::V2(_) => todo!("figure this one out"), + ArrayMetadata::V3(v3) => v3.data_type.clone(), }; - let data_type = DataType::from_metadata( - &MetadataV3::new(dtype), - zarrs::config::global_config().data_type_aliases_v3(), - ) - .map_py_err::()?; + let data_type = DataType::from_metadata(&metadata_dtype).map_py_err::()?; Ok(Self { store, @@ -331,23 +315,21 @@ impl CodecPipelineImpl { value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array - let representations_with_chunk_subsets = chunk_descriptions - .with_representations(self.data_type.clone(), self.fill_value.clone())?; let output = Self::nparray_to_unsafe_cell_slice(value)?; let output_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, codec_options)) = representations_with_chunk_subsets - .get_chunk_concurrent_limit_and_codec_options(self)? + let Some((chunk_concurrent_limit, codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? else { return Ok(()); }; // Assemble partial decoders ahead of time and in parallel - let partial_chunk_descriptions_with_representations = representations_with_chunk_subsets + let partial_chunk_descriptions_with_representations = chunk_descriptions .iter() - .filter(|(representation, item)| !(is_whole_chunk(item, representation))) - .unique_by(|(_, item)| item.key.clone()) + .filter(|item| !(is_whole_chunk(item))) + .unique_by(|item| item.key.clone()) .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new(); @@ -356,13 +338,19 @@ impl CodecPipelineImpl { chunk_concurrent_limit, partial_chunk_descriptions_with_representations, map, - |(representation, item)| { + |item| { let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone()); let partial_decoder = self .codec_chain .clone() - .partial_decoder(Arc::new(input_handle), &representation, &codec_options) + .partial_decoder( + Arc::new(input_handle), + &item.chunk_shape, + &self.data_type, + &self.fill_value, + &codec_options, + ) .map_codec_err()?; Ok((item.key.clone(), partial_decoder)) } @@ -375,75 +363,66 @@ impl CodecPipelineImpl { // FIXME: the `decode_into` methods only support fixed length data types. // For variable length data types, need a codepath with non `_into` methods. // Collect all the subsets and copy into value on the Python side? - let update_chunk_subset = - |(representation, item): (ChunkRepresentation, WithSubset)| { - let chunk_item::WithSubset { - key, + let update_chunk_subset = |item| { + let chunk_item::WithSubset { + key, + subset, + chunk_subset, + chunk_shape_u64, + .. + } = item; + let mut output_view = unsafe { + // TODO: Is the following correct? + // can we guarantee that when this function is called from Python with arbitrary arguments? + // SAFETY: chunks represent disjoint array subsets + ArrayBytesFixedDisjointView::new( + output, + // TODO: why is data_type in `item`, it should be derived from `output`, no? + self.data_type + .fixed_size() + .ok_or("variable length data type not supported") + .map_py_err::()?, + &output_shape, subset, - chunk_subset, - .. - } = item; - let mut output_view = unsafe { - // TODO: Is the following correct? - // can we guarantee that when this function is called from Python with arbitrary arguments? - // SAFETY: chunks represent disjoint array subsets - ArrayBytesFixedDisjointView::new( - output, - // TODO: why is data_type in `item`, it should be derived from `output`, no? - representation - .data_type() - .fixed_size() - .ok_or("variable length data type not supported") - .map_py_err::()?, - &output_shape, - subset, - ) - .map_py_err::()? - }; - - // See zarrs::array::Array::retrieve_chunk_subset_into - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == representation.shape_u64() + ) + .map_py_err::()? + }; + let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); + // See zarrs::array::Array::retrieve_chunk_subset_into + if chunk_subset.start().iter().all(|&o| o == 0) + && chunk_subset.shape() == chunk_shape_u64 + { + // See zarrs::array::Array::retrieve_chunk_into + if let Some(chunk_encoded) = + self.store.get(&key).map_py_err::()? { - // See zarrs::array::Array::retrieve_chunk_into - if let Some(chunk_encoded) = - self.store.get(&key).map_py_err::()? - { - // Decode the encoded data into the output buffer - let chunk_encoded: Vec = chunk_encoded.into(); - self.codec_chain.decode_into( - Cow::Owned(chunk_encoded), - &representation, - &mut output_view, - &codec_options, - ) - } else { - // The chunk is missing, write the fill value - copy_fill_value_into( - &self.data_type, - &self.fill_value, - &mut output_view, - ) - } - } else { - let key = &key; - let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { - PyRuntimeError::new_err(format!( - "Partial decoder not found for key: {key}" - )) - })?; - partial_decoder.partial_decode_into( - &chunk_subset, - &mut output_view, + // Decode the encoded data into the output buffer + let chunk_encoded: Vec = chunk_encoded.into(); + self.codec_chain.decode_into( + Cow::Owned(chunk_encoded), + &item.chunk_shape, + &self.data_type, + &self.fill_value, + target, &codec_options, ) + } else { + // The chunk is missing, write the fill value + copy_fill_value_into(&self.data_type, &self.fill_value, target) } - .map_codec_err() - }; + } else { + let key = &key; + let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { + PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) + })?; + partial_decoder.partial_decode_into(&chunk_subset, target, &codec_options) + } + .map_codec_err() + }; iter_concurrent_limit!( chunk_concurrent_limit, - representations_with_chunk_subsets, + chunk_descriptions, try_for_each, update_chunk_subset )?; @@ -463,8 +442,6 @@ impl CodecPipelineImpl { Array(ArrayBytes<'a>), Constant(FillValue), } - let representations_with_chunk_subsets = chunk_descriptions - .with_representations(self.data_type.clone(), self.fill_value.clone())?; // Get input array let input_slice = Self::nparray_to_slice(value)?; @@ -477,25 +454,21 @@ impl CodecPipelineImpl { let input_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, mut codec_options)) = representations_with_chunk_subsets - .get_chunk_concurrent_limit_and_codec_options(self)? + let Some((chunk_concurrent_limit, mut codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? else { return Ok(()); }; codec_options.set_store_empty_chunks(write_empty_chunks); py.detach(move || { - let store_chunk = |(representation, item): ( - ChunkRepresentation, - chunk_item::WithSubset, - )| match &input { + let store_chunk = |item: WithSubset| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input .extract_array_subset(&item.subset, &input_shape, &self.data_type) .map_codec_err()?; self.store_chunk_subset_bytes( &item, - &representation, &self.codec_chain, chunk_subset_bytes, &item.chunk_subset, @@ -504,13 +477,14 @@ impl CodecPipelineImpl { } InputValue::Constant(constant_value) => { let chunk_subset_bytes = ArrayBytes::new_fill_value( - ArraySize::new(self.data_type.size(), item.chunk_subset.num_elements()), + &self.data_type, + item.chunk_subset.num_elements(), constant_value, - ); + ) + .map_py_err::()?; self.store_chunk_subset_bytes( &item, - &representation, &self.codec_chain, chunk_subset_bytes, &item.chunk_subset, @@ -521,7 +495,7 @@ impl CodecPipelineImpl { iter_concurrent_limit!( chunk_concurrent_limit, - representations_with_chunk_subsets, + chunk_descriptions, try_for_each, store_chunk )?; diff --git a/src/utils.rs b/src/utils.rs index 8c44c67..c6a4e71 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use numpy::{PyUntypedArray, PyUntypedArrayMethods}; use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; -use zarrs::array::{ChunkRepresentation, codec::CodecError}; +use zarrs::array::codec::CodecError; use crate::WithSubset; @@ -55,7 +55,13 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { } } -pub fn is_whole_chunk(item: &WithSubset, representation: &ChunkRepresentation) -> bool { +pub fn is_whole_chunk(item: &WithSubset) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == representation.shape_u64() + && item.chunk_subset.shape() + == item + .chunk_shape + .clone() + .into_iter() + .map(|v| u64::from(v)) + .collect::>() // TODO: Remove copy } From 50c35608c9e1da4f0085c96403826bf010c12536 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sat, 3 Jan 2026 12:42:11 +0100 Subject: [PATCH 3/8] fix: give a real title to zarr store --- tests/test_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3dd34df..8fe0793 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -151,7 +151,7 @@ def maybe_convert( def gen_arr(fill_value, tmp_path, dimensionality, format) -> zarr.Array: return zarr.create( (axis_size_,) * dimensionality, - store=LocalStore(root=tmp_path / ".zarr"), + store=LocalStore(root=tmp_path / "store.zarr"), chunks=(chunk_size_,) * dimensionality, dtype=np.int16, fill_value=fill_value, From faf922b72af2b3e5c9fde8f41d668d30ff584ca7 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sat, 3 Jan 2026 14:37:56 +0100 Subject: [PATCH 4/8] fix: don't pass in any metadata --- python/zarrs/pipeline.py | 3 --- src/lib.rs | 49 +++++++++------------------------------- 2 files changed, 11 insertions(+), 41 deletions(-) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index f0f8bbf..ac87e94 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -29,7 +29,6 @@ CollapsedDimensionError, DiscontiguousArrayError, FillValueNoneError, - get_implicit_fill_value, make_chunk_info_for_rust_with_indices, ) @@ -63,8 +62,6 @@ def get_codec_pipeline_impl( ), num_threads=config.get("threading.max_workers", None), direct_io=config.get("codec_pipeline.direct_io", False), - fill_value=get_implicit_fill_value(metadata.dtype, metadata.fill_value), - dtype_str=str(metadata.dtype.to_native_dtype()), ) except (TypeError, ValueError) as e: if strict: diff --git a/src/lib.rs b/src/lib.rs index f8cc246..d09a741 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,8 @@ use zarrs::array::{ use zarrs::array_subset::ArraySubset; use zarrs::config::global_config; use zarrs::convert::{ - ArrayMetadataV2ToV3Error, codec_metadata_v2_to_v3, data_type_metadata_v2_to_v3, + ArrayMetadataV2ToV3Error, array_metadata_v2_to_v3, codec_metadata_v2_to_v3, + data_type_metadata_v2_to_v3, fill_value_metadata_v2_to_v3, }; use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; use zarrs::metadata::v3::MetadataV3; @@ -219,28 +220,6 @@ impl CodecPipelineImpl { } } -fn array_metadata_to_codec_metadata_v3( - metadata: ArrayMetadata, -) -> Result, ArrayMetadataV2ToV3Error> { - match metadata { - ArrayMetadata::V3(metadata) => Ok(metadata.codecs), - ArrayMetadata::V2(metadata) => { - let endianness = data_type_metadata_v2_to_endianness(&metadata.dtype) - .map_err(ArrayMetadataV2ToV3Error::InvalidEndianness)?; - let data_type: MetadataV3 = data_type_metadata_v2_to_v3(&metadata.dtype)?; - - codec_metadata_v2_to_v3( - metadata.order, - metadata.shape.len(), - &data_type, - endianness, - &metadata.filters, - &metadata.compressor, - ) - } - } -} - #[gen_stub_pymethods] #[pymethods] impl CodecPipelineImpl { @@ -253,8 +232,6 @@ impl CodecPipelineImpl { chunk_concurrent_maximum=None, num_threads=None, direct_io=false, - fill_value, - dtype_str, ))] #[new] fn new( @@ -265,14 +242,13 @@ impl CodecPipelineImpl { chunk_concurrent_maximum: Option, num_threads: Option, direct_io: bool, - fill_value: Bound<'_, PyAny>, - dtype_str: String, ) -> PyResult { store_config.direct_io(direct_io); - let metadata: ArrayMetadata = - serde_json::from_str(array_metadata).map_py_err::()?; - let codec_metadata = - array_metadata_to_codec_metadata_v3(metadata.clone()).map_py_err::()?; + let metadata = match serde_json::from_str(array_metadata).map_py_err::()? { + ArrayMetadata::V2(v2) => array_metadata_v2_to_v3(&v2).map_py_err::()?, + ArrayMetadata::V3(v3) => v3, + }; + let codec_metadata = metadata.codecs; let codec_chain = Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::()?); let mut codec_options = CodecOptions::default(); @@ -288,13 +264,10 @@ impl CodecPipelineImpl { let store: ReadableWritableListableStorage = (&store_config).try_into().map_py_err::()?; - let fill_value = FillValue::new(fill_value_to_bytes(&dtype_str, &fill_value)?); - - let metadata_dtype = match metadata { - ArrayMetadata::V2(_) => todo!("figure this one out"), - ArrayMetadata::V3(v3) => v3.data_type.clone(), - }; - let data_type = DataType::from_metadata(&metadata_dtype).map_py_err::()?; + let data_type = DataType::from_metadata(&metadata.data_type).map_py_err::()?; + let fill_value = data_type + .fill_value(&metadata.fill_value) + .map_py_err::()?; Ok(Self { store, From c922dd02f2d8ec318c05478d928b7bd44fd34c8a Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sat, 3 Jan 2026 19:18:35 +0100 Subject: [PATCH 5/8] fix: warning --- tests/test_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_v2.py b/tests/test_v2.py index c75877f..6e95a6e 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -301,7 +301,7 @@ def test_parse_structured_fill_value_valid( ) @pytest.mark.filterwarnings( # TODO: Fix handling of string fill values for Zarr v2 bytes data - "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value .eAAAAAAAAA==. for data type bytes:UserWarning" + "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value 0 for data type r56:UserWarning" ) @pytest.mark.parametrize("fill_value", [None, b"x"], ids=["no_fill", "fill"]) def test_other_dtype_roundtrip(fill_value, tmp_path) -> None: From 66096a55f7cdb62ea513dbd35e81a8627e2a7ca0 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sat, 3 Jan 2026 21:13:16 +0100 Subject: [PATCH 6/8] fix: cleanups --- python/zarrs/_internal.pyi | 7 ++----- src/chunk_item.rs | 26 -------------------------- src/lib.rs | 8 +------- 3 files changed, 3 insertions(+), 38 deletions(-) diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 0e4e76f..46b4dd4 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -7,10 +7,6 @@ import typing import numpy.typing import zarr.abc.store -@typing.final -class Basic: - def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any) -> Basic: ... - @typing.final class CodecPipelineImpl: def __new__( @@ -40,8 +36,9 @@ class CodecPipelineImpl: class WithSubset: def __new__( cls, - item: Basic, + key: builtins.str, chunk_subset: typing.Sequence[slice], + chunk_shape: typing.Sequence[builtins.int], subset: typing.Sequence[slice], shape: typing.Sequence[builtins.int], ) -> WithSubset: ... diff --git a/src/chunk_item.rs b/src/chunk_item.rs index d7adae2..1dd31aa 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -11,32 +11,6 @@ use zarrs::{array_subset::ArraySubset, storage::StoreKey}; use crate::utils::PyErrExt; -pub fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult> { - if dtype == "string" { - // Match zarr-python 2.x.x string fill value behaviour with a 0 fill value - // See https://github.com/zarr-developers/zarr-python/issues/2792#issuecomment-2644362122 - if let Ok(fill_value_downcast) = fill_value.cast::() { - let fill_value_usize: usize = fill_value_downcast.extract()?; - if fill_value_usize == 0 { - return Ok(vec![]); - } - Err(PyErr::new::(format!( - "Cannot understand non-zero integer {fill_value_usize} fill value for dtype {dtype}" - )))?; - } - } - - if let Ok(fill_value_downcast) = fill_value.cast::() { - Ok(fill_value_downcast.as_bytes().to_vec()) - } else if fill_value.hasattr("tobytes")? { - Ok(fill_value.call_method0("tobytes")?.extract()?) - } else { - Err(PyErr::new::(format!( - "Unsupported fill value {fill_value:?}" - ))) - } -} - #[derive(Clone)] #[gen_stub_pyclass] #[pyclass] diff --git a/src/lib.rs b/src/lib.rs index 9eda050..31ca6ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,12 +28,7 @@ use zarrs::array::{ }; use zarrs::array_subset::ArraySubset; use zarrs::config::global_config; -use zarrs::convert::{ - ArrayMetadataV2ToV3Error, array_metadata_v2_to_v3, codec_metadata_v2_to_v3, - data_type_metadata_v2_to_v3, fill_value_metadata_v2_to_v3, -}; -use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; -use zarrs::metadata::v3::MetadataV3; +use zarrs::convert::array_metadata_v2_to_v3; use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; @@ -44,7 +39,6 @@ mod store; mod tests; mod utils; -use crate::chunk_item::fill_value_to_bytes; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; From dc5e60d6ba2aca689f9a9a8f6bdcd376d6b1f4ad Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sun, 4 Jan 2026 14:59:26 +0100 Subject: [PATCH 7/8] chore: small cleanups --- src/chunk_item.rs | 38 ++++++++++++++++---------------------- src/lib.rs | 16 ++++++---------- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 1dd31aa..79fa66d 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -1,16 +1,28 @@ use std::num::NonZeroU64; use pyo3::{ - Bound, PyAny, PyErr, PyResult, + Bound, PyErr, PyResult, exceptions::{PyIndexError, PyValueError}, pyclass, pymethods, - types::{PyAnyMethods, PyBytes, PyBytesMethods, PyInt, PySlice, PySliceMethods as _}, + types::{PySlice, PySliceMethods as _}, }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{array_subset::ArraySubset, storage::StoreKey}; use crate::utils::PyErrExt; +fn to_nonzero_u64_vec(v: Vec) -> PyResult> { + v.into_iter() + .map(|dim| { + NonZeroU64::new(dim).ok_or_else(|| { + PyErr::new::( + "subset dimensions must be greater than zero".to_string(), + ) + }) + }) + .collect::>>() +} + #[derive(Clone)] #[gen_stub_pyclass] #[pyclass] @@ -35,26 +47,8 @@ impl WithSubset { shape: Vec, ) -> PyResult { let num_elements = chunk_shape.iter().product(); - let shape_nonzero_u64: Vec = shape - .into_iter() - .map(|dim| { - NonZeroU64::new(dim).ok_or_else(|| { - PyErr::new::( - "subset dimensions must be greater than zero".to_string(), - ) - }) - }) - .collect::>>()?; - let chunk_shape_nonzero_u64: Vec = chunk_shape - .into_iter() - .map(|dim| { - NonZeroU64::new(dim).ok_or_else(|| { - PyErr::new::( - "subset dimensions must be greater than zero".to_string(), - ) - }) - }) - .collect::>>()?; + let shape_nonzero_u64 = to_nonzero_u64_vec(shape)?; + let chunk_shape_nonzero_u64 = to_nonzero_u64_vec(chunk_shape)?; let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape_nonzero_u64)?; let subset = selection_to_array_subset(&subset, &shape_nonzero_u64)?; // Check that subset and chunk_subset have the same number of elements. diff --git a/src/lib.rs b/src/lib.rs index 31ca6ca..d07f13f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -288,19 +288,16 @@ impl CodecPipelineImpl { }; // Assemble partial decoders ahead of time and in parallel - let partial_chunk_descriptions_with_representations = chunk_descriptions + let partial_chunk_items = chunk_descriptions .iter() .filter(|item| !(is_whole_chunk(item))) .unique_by(|item| item.key().clone()) .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new(); - if !partial_chunk_descriptions_with_representations.is_empty() { - let key_decoder_pairs = iter_concurrent_limit!( - chunk_concurrent_limit, - partial_chunk_descriptions_with_representations, - map, - |item| { + if !partial_chunk_items.is_empty() { + let key_decoder_pairs = + iter_concurrent_limit!(chunk_concurrent_limit, partial_chunk_items, map, |item| { let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); let input_handle = StoragePartialDecoder::new(storage_handle, item.key().clone()); @@ -316,9 +313,8 @@ impl CodecPipelineImpl { ) .map_codec_err()?; Ok((item.key().clone(), partial_decoder)) - } - ) - .collect::>>()?; + }) + .collect::>>()?; partial_decoder_cache.extend(key_decoder_pairs); } From 613033f8f7bcd6ff705707bc76e6164815ee922d Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Mon, 5 Jan 2026 12:27:11 +0100 Subject: [PATCH 8/8] chore: use `is_whole_chunk` more --- src/lib.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d07f13f..9fb2c9d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,10 +120,10 @@ impl CodecPipelineImpl { item: &WithSubset, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, - chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { let array_shape = item.shape(); + let chunk_subset = &item.chunk_subset; if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { return Err(PyErr::new::(format!( "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" @@ -131,9 +131,7 @@ impl CodecPipelineImpl { } let data_type_size = self.data_type.size(); - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(array_shape) - { + if is_whole_chunk(item) { // Fast path if the chunk subset spans the entire chunk, no read required self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { @@ -342,9 +340,7 @@ impl CodecPipelineImpl { }; let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); // See zarrs::array::Array::retrieve_chunk_subset_into - if item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(shape) - { + if is_whole_chunk(&item) { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = self.store.get(item.key()).map_py_err::()? @@ -424,7 +420,6 @@ impl CodecPipelineImpl { &item, &self.codec_chain, chunk_subset_bytes, - &item.chunk_subset, &codec_options, ) } @@ -440,7 +435,6 @@ impl CodecPipelineImpl { &item, &self.codec_chain, chunk_subset_bytes, - &item.chunk_subset, &codec_options, ) }