From 1083382d302f907193393e83d3f36e4b19f639e3 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 27 Oct 2025 17:21:29 -0500 Subject: [PATCH 01/19] start drafting format --- Cargo.lock | 36 +++++ Cargo.toml | 7 +- rust/sedona-datasource/Cargo.toml | 66 +++++++++ rust/sedona-datasource/src/format.rs | 206 +++++++++++++++++++++++++++ rust/sedona-datasource/src/lib.rs | 18 +++ 5 files changed, 330 insertions(+), 3 deletions(-) create mode 100644 rust/sedona-datasource/Cargo.toml create mode 100644 rust/sedona-datasource/src/format.rs create mode 100644 rust/sedona-datasource/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4537b8c1d..6ad1b6615 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4893,6 +4893,42 @@ dependencies = [ "regex", ] +[[package]] +name = "sedona-datasource" +version = "0.2.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "async-trait", + "bytes", + "chrono", + "datafusion", + "datafusion-catalog", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-physical-plan", + "float_next_after", + "futures", + "geo-traits", + "object_store", + "parquet", + "rstest", + "sedona-common", + "sedona-expr", + "sedona-functions", + "sedona-geometry", + "sedona-schema", + "sedona-testing", + "serde", + "serde_json", + "serde_with", + "tempfile", + "tokio", + "url", +] + [[package]] name = "sedona-expr" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 84b19f58d..f8a7a8e1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,14 @@ members = [ "c/sedona-proj", "c/sedona-s2geography", "c/sedona-tg", + "python/sedonadb", "r/sedonadb/src/rust", - "rust/sedona-geo-traits-ext", - "rust/sedona-geo-generic-alg", "rust/sedona-adbc", + "rust/sedona-datasource", "rust/sedona-expr", "rust/sedona-functions", + "rust/sedona-geo-generic-alg", + "rust/sedona-geo-traits-ext", "rust/sedona-geo", "rust/sedona-geometry", "rust/sedona-geoparquet", @@ -34,7 +36,6 @@ members = [ 
"rust/sedona-spatial-join", "rust/sedona-testing", "rust/sedona", - "python/sedonadb", "sedona-cli", ] resolver = "2" diff --git a/rust/sedona-datasource/Cargo.toml b/rust/sedona-datasource/Cargo.toml new file mode 100644 index 000000000..ffea1d818 --- /dev/null +++ b/rust/sedona-datasource/Cargo.toml @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "sedona-datasource" +version.workspace = true +homepage.workspace = true +repository.workspace = true +description.workspace = true +readme.workspace = true +edition.workspace = true +rust-version.workspace = true + +[lints.clippy] +result_large_err = "allow" + +[features] +default = [] + +[dev-dependencies] +sedona-testing = { path = "../sedona-testing" } +url = { workspace = true } +rstest = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +arrow-schema = { workspace = true } +arrow-array = { workspace = true } +bytes = { workspace = true } +chrono = { workspace = true } +datafusion = { workspace = true, features = ["parquet"] } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-plan = { workspace = true } +float_next_after = { workspace = true } +geo-traits = { workspace = true } +futures = { workspace = true } +object_store = { workspace = true } +parquet = { workspace = true } +sedona-common = { path = "../sedona-common" } +sedona-expr = { path = "../sedona-expr" } +sedona-functions = { path = "../sedona-functions" } +sedona-geometry = { path = "../sedona-geometry" } +sedona-schema = { path = "../sedona-schema" } +serde = { workspace = true } +serde_json = { workspace = true } +serde_with = { workspace = true } diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs new file mode 100644 index 000000000..d1f109a44 --- /dev/null +++ b/rust/sedona-datasource/src/format.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, sync::Arc}; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::{ + config::ConfigOptions, + datasource::{ + file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory}, + physical_plan::{FileOpener, FileScanConfig, FileSinkConfig, FileSource}, + }, +}; +use datafusion_catalog::Session; +use datafusion_common::{GetExt, Result, Statistics}; +use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; +use datafusion_physical_plan::{ + filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, ExecutionPlan, +}; +use object_store::{ObjectMeta, ObjectStore}; + +#[derive(Debug)] +struct SedonaFileFormatFactory {} + +#[derive(Debug)] +struct SedonaFileFormat {} + +/// GeoParquet FormatFactory +/// +/// A DataFusion FormatFactory provides a means to allow creating a table +/// or referencing one from a SQL context like COPY TO. 
+#[derive(Debug)] +pub struct SedonaFormatFactory { + inner: SedonaFileFormatFactory, +} + +impl SedonaFormatFactory { + /// Creates an instance of [GeoParquetFormatFactory] + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + inner: SedonaFileFormatFactory {}, + } + } +} + +impl FileFormatFactory for SedonaFormatFactory { + fn create( + &self, + state: &dyn Session, + format_options: &HashMap, + ) -> Result> { + todo!() + } + + fn default(&self) -> std::sync::Arc { + todo!() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +impl GetExt for SedonaFormatFactory { + fn get_ext(&self) -> String { + todo!() + } +} + +#[derive(Debug)] +pub struct SedonaFormat { + factory: SedonaFileFormatFactory, +} + +#[async_trait] +impl FileFormat for SedonaFormat { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_ext(&self) -> String { + todo!() + } + + fn get_ext_with_compression( + &self, + file_compression_type: &FileCompressionType, + ) -> Result { + todo!() + } + + fn compression_type(&self) -> Option { + todo!() + } + + async fn infer_schema( + &self, + state: &dyn Session, + store: &Arc, + objects: &[ObjectMeta], + ) -> Result { + todo!() + } + + async fn infer_stats( + &self, + state: &dyn Session, + store: &Arc, + table_schema: SchemaRef, + object: &ObjectMeta, + ) -> Result { + todo!() + } + + async fn create_physical_plan( + &self, + _state: &dyn Session, + config: FileScanConfig, + ) -> Result> { + todo!() + } + + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &dyn Session, + conf: FileSinkConfig, + order_requirements: Option, + ) -> Result> { + todo!() + } + + fn file_source(&self) -> Arc { + todo!() + } +} + +#[derive(Debug, Clone)] +pub struct SedonaFileSource {} + +impl FileSource for SedonaFileSource { + fn create_file_opener( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> Arc { + todo!() + } + + fn try_pushdown_filters( + &self, + filters: Vec>, + config: 
&ConfigOptions, + ) -> Result>> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn with_batch_size(&self, batch_size: usize) -> Arc { + todo!() + } + + fn with_schema(&self, schema: SchemaRef) -> Arc { + todo!() + } + + fn with_projection(&self, config: &FileScanConfig) -> Arc { + todo!() + } + + fn with_statistics(&self, statistics: Statistics) -> Arc { + todo!() + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + todo!() + } + + fn statistics(&self) -> Result { + todo!() + } + + fn file_type(&self) -> &str { + todo!() + } +} diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs new file mode 100644 index 000000000..af4a4d839 --- /dev/null +++ b/rust/sedona-datasource/src/lib.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub mod format; From 707673b296876bcd8a1c2b2e0e3add0fedb74aa1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 28 Oct 2025 17:16:08 -0500 Subject: [PATCH 02/19] oof --- rust/sedona-datasource/src/format.rs | 280 ++++++++++++++++++++++----- 1 file changed, 227 insertions(+), 53 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index d1f109a44..0c22e1219 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -15,61 +15,116 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, collections::HashMap, sync::Arc}; +use std::{ + any::Any, + collections::HashMap, + fmt::{Debug, Display}, + sync::Arc, +}; -use arrow_schema::SchemaRef; +use arrow_array::RecordBatchReader; +use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; use datafusion::{ config::ConfigOptions, datasource::{ file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory}, - physical_plan::{FileOpener, FileScanConfig, FileSinkConfig, FileSource}, + listing::PartitionedFile, + physical_plan::{ + FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileSinkConfig, FileSource, + }, }, }; -use datafusion_catalog::Session; -use datafusion_common::{GetExt, Result, Statistics}; -use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; +use datafusion_catalog::{memory::DataSourceExec, Session}; +use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr}; use datafusion_physical_plan::{ - filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, ExecutionPlan, + filter_pushdown::{FilterPushdownPropagation, PushedDown}, + metrics::ExecutionPlanMetricsSet, + ExecutionPlan, }; +use futures::{StreamExt, TryStreamExt}; use 
object_store::{ObjectMeta, ObjectStore}; +use sedona_common::sedona_internal_err; -#[derive(Debug)] -struct SedonaFileFormatFactory {} +#[async_trait] +pub trait SimpleFileFormat: Debug + Send + Sync { + fn extension(&self) -> &str; + fn with_options(&self, options: &HashMap) -> Result>; + async fn infer_schema(&self, location: &Object) -> Result; + async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { + Ok(Statistics::new_unknown(table_schema)) + } + async fn open_reader(&self, args: &OpenReaderArgs) + -> Result>; +} -#[derive(Debug)] -struct SedonaFileFormat {} +#[derive(Debug, Clone)] +pub struct OpenReaderArgs { + pub src: Object, + pub batch_size: Option, + pub file_schema: Option, + pub file_projection: Option>, + pub filters: Option>>, +} + +#[derive(Debug, Clone)] +pub enum Object { + ObjectStoreUrl(Arc, ObjectStoreUrl), + ObjetctStoreMeta(Arc, ObjectMeta), + String(String), +} + +impl Display for Object { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Object::ObjectStoreUrl(_, object_store_url) => write!(f, "{object_store_url}"), + Object::ObjetctStoreMeta(object_store, object_meta) => { + // There's no great way to map an object_store to a url prefix. + // This is a heuristic that should work for https and a local filesystem, + // which is what we might be able to expect a non-DataFusion system like + // GDAL to be able to translate. 
+ let object_store_debug = format!("{object_store:?}").to_lowercase(); + if object_store_debug.contains("http") { + write!(f, "https://{}", object_meta.location) + } else if object_store_debug.contains("local") { + write!(f, "file://{}", object_meta.location) + } else { + write!(f, "{object_store_debug}: {}", object_meta.location) + } + } + Object::String(item) => write!(f, "{item}"), + } + } +} -/// GeoParquet FormatFactory -/// -/// A DataFusion FormatFactory provides a means to allow creating a table -/// or referencing one from a SQL context like COPY TO. #[derive(Debug)] pub struct SedonaFormatFactory { - inner: SedonaFileFormatFactory, + spec: Arc, } impl SedonaFormatFactory { - /// Creates an instance of [GeoParquetFormatFactory] - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - Self { - inner: SedonaFileFormatFactory {}, - } + pub fn new(spec: Arc) -> Self { + Self { spec } } } impl FileFormatFactory for SedonaFormatFactory { fn create( &self, - state: &dyn Session, + _state: &dyn Session, format_options: &HashMap, ) -> Result> { - todo!() + Ok(Arc::new(SedonaFormat { + spec: self.spec.with_options(format_options)?, + })) } - fn default(&self) -> std::sync::Arc { - todo!() + fn default(&self) -> Arc { + Arc::new(SedonaFormat { + spec: self.spec.clone(), + }) } fn as_any(&self) -> &dyn std::any::Any { @@ -79,13 +134,13 @@ impl FileFormatFactory for SedonaFormatFactory { impl GetExt for SedonaFormatFactory { fn get_ext(&self) -> String { - todo!() + self.spec.extension().to_string() } } #[derive(Debug)] pub struct SedonaFormat { - factory: SedonaFileFormatFactory, + spec: Arc, } #[async_trait] @@ -95,18 +150,18 @@ impl FileFormat for SedonaFormat { } fn get_ext(&self) -> String { - todo!() + self.spec.extension().to_string() } fn get_ext_with_compression( &self, - file_compression_type: &FileCompressionType, + _file_compression_type: &FileCompressionType, ) -> Result { - todo!() + not_impl_err!("extension with compression type") } fn 
compression_type(&self) -> Option { - todo!() + None } async fn infer_schema( @@ -115,17 +170,43 @@ impl FileFormat for SedonaFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { - todo!() + let mut schemas: Vec<_> = futures::stream::iter(objects) + .map(|object| async move { + let schema = self + .spec + .infer_schema(&Object::ObjetctStoreMeta(store.clone(), object.clone())) + .await?; + Ok::<_, DataFusionError>((object.location.clone(), schema)) + }) + .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + .buffered(state.config_options().execution.meta_fetch_concurrency) + .try_collect() + .await?; + + schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); + + let schemas = schemas + .into_iter() + .map(|(_, schema)| schema) + .collect::>(); + + let schema = Schema::try_merge(schemas)?; + Ok(Arc::new(schema)) } async fn infer_stats( &self, - state: &dyn Session, + _state: &dyn Session, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - todo!() + self.spec + .infer_stats( + &Object::ObjetctStoreMeta(store.clone(), object.clone()), + &table_schema, + ) + .await } async fn create_physical_plan( @@ -133,43 +214,86 @@ impl FileFormat for SedonaFormat { _state: &dyn Session, config: FileScanConfig, ) -> Result> { - todo!() + Ok(DataSourceExec::from_data_source(config)) } async fn create_writer_physical_plan( &self, - input: Arc, + _input: Arc, _state: &dyn Session, - conf: FileSinkConfig, - order_requirements: Option, + _conf: FileSinkConfig, + _order_requirements: Option, ) -> Result> { - todo!() + not_impl_err!("writing not yet supported for SimpleSedonaFormat") } fn file_source(&self) -> Arc { - todo!() + Arc::new(SedonaFileSource::new(self.spec.clone())) } } #[derive(Debug, Clone)] -pub struct SedonaFileSource {} +struct SedonaFileSource { + spec: Arc, + batch_size: Option, + file_schema: Option, + file_projection: Option>, + filters: Option>>, + metrics: ExecutionPlanMetricsSet, + 
projected_statistics: Option, +} + +impl SedonaFileSource { + pub fn new(spec: Arc) -> Self { + Self { + spec, + batch_size: None, + file_schema: None, + file_projection: None, + filters: None, + metrics: ExecutionPlanMetricsSet::default(), + projected_statistics: None, + } + } +} impl FileSource for SedonaFileSource { fn create_file_opener( &self, - object_store: Arc, + store: Arc, base_config: &FileScanConfig, partition: usize, ) -> Arc { - todo!() + let args = OpenReaderArgs { + src: Object::ObjectStoreUrl(store.clone(), base_config.object_store_url.clone()), + batch_size: self.batch_size, + file_schema: self.file_schema.clone(), + file_projection: self.file_projection.clone(), + filters: self.filters.clone(), + }; + + Arc::new(SimpleOpener { + spec: self.spec.clone(), + args, + partition, + }) } fn try_pushdown_filters( &self, filters: Vec>, - config: &ConfigOptions, + _config: &ConfigOptions, ) -> Result>> { - todo!() + let source = Self { + filters: Some(filters.clone()), + ..self.clone() + }; + + Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ + PushedDown::No; + filters.len() + ]) + .with_updated_node(Arc::new(source))) } fn as_any(&self) -> &dyn Any { @@ -177,30 +301,80 @@ impl FileSource for SedonaFileSource { } fn with_batch_size(&self, batch_size: usize) -> Arc { - todo!() + Arc::new(Self { + batch_size: Some(batch_size), + ..self.clone() + }) } fn with_schema(&self, schema: SchemaRef) -> Arc { - todo!() + Arc::new(Self { + file_schema: Some(schema), + ..self.clone() + }) } fn with_projection(&self, config: &FileScanConfig) -> Arc { - todo!() + Arc::new(Self { + file_projection: config.file_column_projection_indices(), + ..self.clone() + }) } fn with_statistics(&self, statistics: Statistics) -> Arc { - todo!() + Arc::new(Self { + projected_statistics: Some(statistics), + ..self.clone() + }) } fn metrics(&self) -> &ExecutionPlanMetricsSet { - todo!() + &self.metrics } fn statistics(&self) -> Result { - todo!() + let statistics = 
&self.projected_statistics; + Ok(statistics + .clone() + .expect("projected_statistics must be set")) } fn file_type(&self) -> &str { - todo!() + self.spec.extension() + } + + // File formats implemented in this way can't be repartitioned. File formats that + // benefit from this need their own FileFormat implementation. + fn repartitioned( + &self, + _target_partitions: usize, + _repartition_file_min_size: usize, + _output_ordering: Option, + _config: &FileScanConfig, + ) -> Result> { + Ok(None) + } +} + +#[derive(Debug, Clone)] +struct SimpleOpener { + spec: Arc, + args: OpenReaderArgs, + partition: usize, +} + +impl FileOpener for SimpleOpener { + fn open(&self, _file_meta: FileMeta, _file: PartitionedFile) -> Result { + if self.partition != 0 { + return sedona_internal_err!("Expected SimpleOpener to open a single partition"); + } + + let self_clone = self.clone(); + Ok(Box::pin(async move { + let reader = self_clone.spec.open_reader(&self_clone.args).await?; + let stream = + futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into))); + Ok(stream.boxed()) + })) } } From 758f1fbc6add4270693e4b9999a7c46336274927 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 28 Oct 2025 17:18:54 -0500 Subject: [PATCH 03/19] rename stuff --- rust/sedona-datasource/src/format.rs | 35 +++++++++++++++------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 0c22e1219..3baf8b8e0 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -49,9 +49,12 @@ use object_store::{ObjectMeta, ObjectStore}; use sedona_common::sedona_internal_err; #[async_trait] -pub trait SimpleFileFormat: Debug + Send + Sync { +pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { fn extension(&self) -> &str; - fn with_options(&self, options: &HashMap) -> Result>; + fn with_options( + &self, + options: &HashMap, + ) -> Result>; async fn 
infer_schema(&self, location: &Object) -> Result; async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { Ok(Statistics::new_unknown(table_schema)) @@ -100,29 +103,29 @@ impl Display for Object { } #[derive(Debug)] -pub struct SedonaFormatFactory { - spec: Arc, +pub struct RecordBatchReaderFormatFactory { + spec: Arc, } -impl SedonaFormatFactory { - pub fn new(spec: Arc) -> Self { +impl RecordBatchReaderFormatFactory { + pub fn new(spec: Arc) -> Self { Self { spec } } } -impl FileFormatFactory for SedonaFormatFactory { +impl FileFormatFactory for RecordBatchReaderFormatFactory { fn create( &self, _state: &dyn Session, format_options: &HashMap, ) -> Result> { - Ok(Arc::new(SedonaFormat { + Ok(Arc::new(RecordBatchReaderFormat { spec: self.spec.with_options(format_options)?, })) } fn default(&self) -> Arc { - Arc::new(SedonaFormat { + Arc::new(RecordBatchReaderFormat { spec: self.spec.clone(), }) } @@ -132,19 +135,19 @@ impl FileFormatFactory for SedonaFormatFactory { } } -impl GetExt for SedonaFormatFactory { +impl GetExt for RecordBatchReaderFormatFactory { fn get_ext(&self) -> String { self.spec.extension().to_string() } } #[derive(Debug)] -pub struct SedonaFormat { - spec: Arc, +struct RecordBatchReaderFormat { + spec: Arc, } #[async_trait] -impl FileFormat for SedonaFormat { +impl FileFormat for RecordBatchReaderFormat { fn as_any(&self) -> &dyn Any { self } @@ -234,7 +237,7 @@ impl FileFormat for SedonaFormat { #[derive(Debug, Clone)] struct SedonaFileSource { - spec: Arc, + spec: Arc, batch_size: Option, file_schema: Option, file_projection: Option>, @@ -244,7 +247,7 @@ struct SedonaFileSource { } impl SedonaFileSource { - pub fn new(spec: Arc) -> Self { + pub fn new(spec: Arc) -> Self { Self { spec, batch_size: None, @@ -358,7 +361,7 @@ impl FileSource for SedonaFileSource { #[derive(Debug, Clone)] struct SimpleOpener { - spec: Arc, + spec: Arc, args: OpenReaderArgs, partition: usize, } From 93639c02165ee7ea4efdc4ad6c83eb31fe95ae4c 
Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 29 Oct 2025 21:25:06 -0500 Subject: [PATCH 04/19] write a test --- rust/sedona-datasource/src/format.rs | 119 +++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 3baf8b8e0..12c855299 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -381,3 +381,122 @@ impl FileOpener for SimpleOpener { })) } } + +#[cfg(test)] +mod test { + + use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; + use arrow_schema::{DataType, Field}; + use datafusion::{assert_batches_eq, execution::SessionStateBuilder, prelude::SessionContext}; + use datafusion_common::plan_err; + use tempfile::TempDir; + + use super::*; + + #[derive(Debug, Default, Clone)] + struct EchoSpec { + option_value: Option, + } + + #[async_trait] + impl RecordBatchReaderFormatSpec for EchoSpec { + fn extension(&self) -> &str { + "echospec" + } + + fn with_options( + &self, + options: &HashMap, + ) -> Result> { + let mut self_clone = self.clone(); + for (k, v) in options { + if k == "option_value" { + self_clone.option_value = Some(v.to_string()); + } else { + return plan_err!("Unsupported option for EchoSpec: '{k}'"); + } + } + + Ok(Arc::new(self_clone)) + } + + async fn infer_schema(&self, _location: &Object) -> Result { + Ok(Schema::new(vec![ + Field::new("src", DataType::Utf8, true), + Field::new("batch_size", DataType::Int64, true), + Field::new("filter_count", DataType::Int32, true), + Field::new("option_value", DataType::Utf8, true), + ])) + } + + async fn infer_stats( + &self, + _location: &Object, + table_schema: &Schema, + ) -> Result { + Ok(Statistics::new_unknown(table_schema)) + } + + async fn open_reader( + &self, + args: &OpenReaderArgs, + ) -> Result> { + let src: StringArray = [args.src.clone()] + .iter() + .map(|item| Some(item.to_string())) + .collect(); + let 
batch_size: Int64Array = [args.batch_size] + .iter() + .map(|item| item.map(|i| i as i64)) + .collect(); + let filter_count: Int32Array = [args.filters.clone().map(|f| f.len() as i32)] + .iter() + .collect(); + let option_value: StringArray = [self.option_value.clone()].iter().collect(); + + let schema = Arc::new(self.infer_schema(&args.src).await?); + let mut batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(src), + Arc::new(batch_size), + Arc::new(filter_count), + Arc::new(option_value), + ], + )?; + + if let Some(projection) = &args.file_projection { + batch = batch.project(projection)?; + } + + Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema))) + } + } + + #[tokio::test] + async fn spec_format() { + let spec = Arc::new(EchoSpec::default()); + let factory = RecordBatchReaderFormatFactory::new(spec); + + let temp_dir = TempDir::new().unwrap(); + let temp_path = temp_dir.path(); + let file0 = temp_path.join("item0.echospec"); + std::fs::File::create(&file0).unwrap(); + let file1 = temp_path.join("item1.echospec"); + std::fs::File::create(&file1).unwrap(); + + let mut state = SessionStateBuilder::new().build(); + state.register_file_format(Arc::new(factory), true).unwrap(); + let ctx = SessionContext::new_with_state(state).enable_url_table(); + + let batches_item0 = ctx + .table(file0.to_string_lossy().to_string()) + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!([""], &batches_item0); + } +} From cfee6cec51afa25af758ef03e81c5faa67cba72d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 29 Oct 2025 22:27:54 -0500 Subject: [PATCH 05/19] passing test --- rust/sedona-datasource/src/format.rs | 80 +++++++++++++++++----------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 12c855299..95f6dd968 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -15,12 +15,7 @@ // 
specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - collections::HashMap, - fmt::{Debug, Display}, - sync::Arc, -}; +use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc}; use arrow_array::RecordBatchReader; use arrow_schema::{Schema, SchemaRef}; @@ -73,31 +68,33 @@ pub struct OpenReaderArgs { } #[derive(Debug, Clone)] -pub enum Object { - ObjectStoreUrl(Arc, ObjectStoreUrl), - ObjetctStoreMeta(Arc, ObjectMeta), - String(String), +pub struct Object { + store: Arc, + url: Option, + meta: Option, } -impl Display for Object { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Object::ObjectStoreUrl(_, object_store_url) => write!(f, "{object_store_url}"), - Object::ObjetctStoreMeta(object_store, object_meta) => { - // There's no great way to map an object_store to a url prefix. +impl Object { + pub fn to_url_string(&self) -> String { + match (&self.url, &self.meta) { + (None, None) => format!("{:?}", self.store), + (None, Some(meta)) => { + // There's no great way to map an object_store to a url prefix if we're not + // provided the `url`; however, this is what we have access to on occasion. // This is a heuristic that should work for https and a local filesystem, // which is what we might be able to expect a non-DataFusion system like // GDAL to be able to translate. 
- let object_store_debug = format!("{object_store:?}").to_lowercase(); + let object_store_debug = format!("{:?}", self.store).to_lowercase(); if object_store_debug.contains("http") { - write!(f, "https://{}", object_meta.location) + format!("https://{}", meta.location) } else if object_store_debug.contains("local") { - write!(f, "file://{}", object_meta.location) + format!("file://{}", meta.location) } else { - write!(f, "{object_store_debug}: {}", object_meta.location) + format!("{object_store_debug}: {}", meta.location) } } - Object::String(item) => write!(f, "{item}"), + (Some(url), None) => url.to_string(), + (Some(url), Some(meta)) => format!("{url}/{}", meta.location), } } } @@ -177,7 +174,11 @@ impl FileFormat for RecordBatchReaderFormat { .map(|object| async move { let schema = self .spec - .infer_schema(&Object::ObjetctStoreMeta(store.clone(), object.clone())) + .infer_schema(&Object { + store: store.clone(), + url: None, + meta: Some(object.clone()), + }) .await?; Ok::<_, DataFusionError>((object.location.clone(), schema)) }) @@ -206,7 +207,11 @@ impl FileFormat for RecordBatchReaderFormat { ) -> Result { self.spec .infer_stats( - &Object::ObjetctStoreMeta(store.clone(), object.clone()), + &Object { + store: store.clone(), + url: None, + meta: Some(object.clone()), + }, &table_schema, ) .await @@ -268,7 +273,11 @@ impl FileSource for SedonaFileSource { partition: usize, ) -> Arc { let args = OpenReaderArgs { - src: Object::ObjectStoreUrl(store.clone(), base_config.object_store_url.clone()), + src: Object { + store: store.clone(), + url: Some(base_config.object_store_url.clone()), + meta: None, + }, batch_size: self.batch_size, file_schema: self.file_schema.clone(), file_projection: self.file_projection.clone(), @@ -367,13 +376,14 @@ struct SimpleOpener { } impl FileOpener for SimpleOpener { - fn open(&self, _file_meta: FileMeta, _file: PartitionedFile) -> Result { + fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result { if self.partition 
!= 0 { return sedona_internal_err!("Expected SimpleOpener to open a single partition"); } - let self_clone = self.clone(); + let mut self_clone = self.clone(); Ok(Box::pin(async move { + self_clone.args.src.meta.replace(file_meta.object_meta); let reader = self_clone.spec.open_reader(&self_clone.args).await?; let stream = futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into))); @@ -387,8 +397,9 @@ mod test { use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; use arrow_schema::{DataType, Field}; - use datafusion::{assert_batches_eq, execution::SessionStateBuilder, prelude::SessionContext}; + use datafusion::{execution::SessionStateBuilder, prelude::SessionContext}; use datafusion_common::plan_err; + use std::io::Write; use tempfile::TempDir; use super::*; @@ -443,7 +454,7 @@ mod test { ) -> Result> { let src: StringArray = [args.src.clone()] .iter() - .map(|item| Some(item.to_string())) + .map(|item| Some(item.to_url_string())) .collect(); let batch_size: Int64Array = [args.batch_size] .iter() @@ -481,9 +492,15 @@ mod test { let temp_dir = TempDir::new().unwrap(); let temp_path = temp_dir.path(); let file0 = temp_path.join("item0.echospec"); - std::fs::File::create(&file0).unwrap(); + std::fs::File::create(&file0) + .unwrap() + .write_all(b"not empty") + .unwrap(); let file1 = temp_path.join("item1.echospec"); - std::fs::File::create(&file1).unwrap(); + std::fs::File::create(&file1) + .unwrap() + .write_all(b"not empty") + .unwrap(); let mut state = SessionStateBuilder::new().build(); state.register_file_format(Arc::new(factory), true).unwrap(); @@ -497,6 +514,7 @@ mod test { .await .unwrap(); - assert_batches_eq!([""], &batches_item0); + assert_eq!(batches_item0.len(), 1); + assert_eq!(batches_item0[0].num_rows(), 1); } } From c3b0e71d053eeda7d3e00212de13d8e539a4c809 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 30 Oct 2025 16:18:06 -0500 Subject: [PATCH 06/19] split out the spec --- 
rust/sedona-datasource/src/format.rs | 62 ++------------------- rust/sedona-datasource/src/lib.rs | 1 + rust/sedona-datasource/src/spec.rs | 83 ++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 58 deletions(-) create mode 100644 rust/sedona-datasource/src/spec.rs diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 95f6dd968..f32adfe12 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -17,7 +17,6 @@ use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc}; -use arrow_array::RecordBatchReader; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; use datafusion::{ @@ -32,7 +31,6 @@ use datafusion::{ }; use datafusion_catalog::{memory::DataSourceExec, Session}; use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics}; -use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr}; use datafusion_physical_plan::{ filter_pushdown::{FilterPushdownPropagation, PushedDown}, @@ -43,61 +41,7 @@ use futures::{StreamExt, TryStreamExt}; use object_store::{ObjectMeta, ObjectStore}; use sedona_common::sedona_internal_err; -#[async_trait] -pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { - fn extension(&self) -> &str; - fn with_options( - &self, - options: &HashMap, - ) -> Result>; - async fn infer_schema(&self, location: &Object) -> Result; - async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { - Ok(Statistics::new_unknown(table_schema)) - } - async fn open_reader(&self, args: &OpenReaderArgs) - -> Result>; -} - -#[derive(Debug, Clone)] -pub struct OpenReaderArgs { - pub src: Object, - pub batch_size: Option, - pub file_schema: Option, - pub file_projection: Option>, - pub filters: Option>>, -} - -#[derive(Debug, Clone)] -pub struct Object { - store: Arc, - url: Option, - meta: Option, -} - -impl Object { - pub fn 
to_url_string(&self) -> String { - match (&self.url, &self.meta) { - (None, None) => format!("{:?}", self.store), - (None, Some(meta)) => { - // There's no great way to map an object_store to a url prefix if we're not - // provided the `url`; however, this is what we have access to on occasion. - // This is a heuristic that should work for https and a local filesystem, - // which is what we might be able to expect a non-DataFusion system like - // GDAL to be able to translate. - let object_store_debug = format!("{:?}", self.store).to_lowercase(); - if object_store_debug.contains("http") { - format!("https://{}", meta.location) - } else if object_store_debug.contains("local") { - format!("file://{}", meta.location) - } else { - format!("{object_store_debug}: {}", meta.location) - } - } - (Some(url), None) => url.to_string(), - (Some(url), Some(meta)) => format!("{url}/{}", meta.location), - } - } -} +use crate::spec::{Object, OpenReaderArgs, RecordBatchReaderFormatSpec}; #[derive(Debug)] pub struct RecordBatchReaderFormatFactory { @@ -395,7 +339,9 @@ impl FileOpener for SimpleOpener { #[cfg(test)] mod test { - use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; + use arrow_array::{ + Int32Array, Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray, + }; use arrow_schema::{DataType, Field}; use datafusion::{execution::SessionStateBuilder, prelude::SessionContext}; use datafusion_common::plan_err; diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs index af4a4d839..8834b8fc2 100644 --- a/rust/sedona-datasource/src/lib.rs +++ b/rust/sedona-datasource/src/lib.rs @@ -16,3 +16,4 @@ // under the License. 
pub mod format; +pub mod spec; diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs new file mode 100644 index 000000000..7b690b42a --- /dev/null +++ b/rust/sedona-datasource/src/spec.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{collections::HashMap, fmt::Debug, sync::Arc}; + +use arrow_array::RecordBatchReader; +use arrow_schema::{Schema, SchemaRef}; +use async_trait::async_trait; + +use datafusion_common::{Result, Statistics}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_expr::PhysicalExpr; +use object_store::{ObjectMeta, ObjectStore}; + +#[async_trait] +pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { + fn extension(&self) -> &str; + fn with_options( + &self, + options: &HashMap, + ) -> Result>; + async fn infer_schema(&self, location: &Object) -> Result; + async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { + Ok(Statistics::new_unknown(table_schema)) + } + async fn open_reader(&self, args: &OpenReaderArgs) + -> Result>; +} + +#[derive(Debug, Clone)] +pub struct OpenReaderArgs { + pub src: Object, + pub batch_size: Option, + pub file_schema: Option, + pub file_projection: Option>, + pub filters: Option>>, +} + +#[derive(Debug, Clone)] +pub struct Object { + pub store: Arc, + pub url: Option, + pub meta: Option, +} + +impl Object { + pub fn to_url_string(&self) -> String { + match (&self.url, &self.meta) { + (None, None) => format!("{:?}", self.store), + (None, Some(meta)) => { + // There's no great way to map an object_store to a url prefix if we're not + // provided the `url`; however, this is what we have access to on occasion. + // This is a heuristic that should work for https and a local filesystem, + // which is what we might be able to expect a non-DataFusion system like + // GDAL to be able to translate. 
+ let object_store_debug = format!("{:?}", self.store).to_lowercase(); + if object_store_debug.contains("http") { + format!("https://{}", meta.location) + } else if object_store_debug.contains("local") { + format!("file://{}", meta.location) + } else { + format!("{object_store_debug}: {}", meta.location) + } + } + (Some(url), None) => url.to_string(), + (Some(url), Some(meta)) => format!("{url}/{}", meta.location), + } + } +} From d1e4c65395e1b838749a3a684203211a5f655d04 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 30 Oct 2025 17:26:28 -0500 Subject: [PATCH 07/19] provider sketch --- rust/sedona-datasource/src/format.rs | 19 ++++- rust/sedona-datasource/src/lib.rs | 1 + rust/sedona-datasource/src/provider.rs | 113 +++++++++++++++++++++++++ rust/sedona-datasource/src/spec.rs | 5 ++ 4 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 rust/sedona-datasource/src/provider.rs diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index f32adfe12..b27442ce9 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -83,10 +83,16 @@ impl GetExt for RecordBatchReaderFormatFactory { } #[derive(Debug)] -struct RecordBatchReaderFormat { +pub struct RecordBatchReaderFormat { spec: Arc, } +impl RecordBatchReaderFormat { + pub fn new(spec: Arc) -> Self { + Self { spec } + } +} + #[async_trait] impl FileFormat for RecordBatchReaderFormat { fn as_any(&self) -> &dyn Any { @@ -343,7 +349,9 @@ mod test { Int32Array, Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray, }; use arrow_schema::{DataType, Field}; - use datafusion::{execution::SessionStateBuilder, prelude::SessionContext}; + use datafusion::{ + config::TableOptions, execution::SessionStateBuilder, prelude::SessionContext, + }; use datafusion_common::plan_err; use std::io::Write; use tempfile::TempDir; @@ -377,6 +385,13 @@ mod test { Ok(Arc::new(self_clone)) } + fn with_table_options( + &self, + 
_table_options: &TableOptions, + ) -> Arc { + Arc::new(self.clone()) + } + async fn infer_schema(&self, _location: &Object) -> Result { Ok(Schema::new(vec![ Field::new("src", DataType::Utf8, true), diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs index 8834b8fc2..4bdc59695 100644 --- a/rust/sedona-datasource/src/lib.rs +++ b/rust/sedona-datasource/src/lib.rs @@ -16,4 +16,5 @@ // under the License. pub mod format; +pub mod provider; pub mod spec; diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs new file mode 100644 index 000000000..a555a8351 --- /dev/null +++ b/rust/sedona-datasource/src/provider.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{collections::HashMap, sync::Arc}; + +use arrow_schema::{DataType, SchemaRef}; +use async_trait::async_trait; +use datafusion::{ + config::TableOptions, + datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + execution::{options::ReadOptions, SessionState}, + prelude::{SessionConfig, SessionContext}, +}; +use datafusion_common::{exec_err, Result}; + +use crate::{format::RecordBatchReaderFormat, spec::RecordBatchReaderFormatSpec}; + +#[derive(Debug, Clone)] +pub struct RecordBatchReaderTableOptions { + spec: Arc, + table_partition_cols: Vec<(String, DataType)>, + options: HashMap, +} + +impl RecordBatchReaderTableOptions { + pub fn new(spec: Arc) -> Self { + Self { + spec, + table_partition_cols: Vec::new(), + options: HashMap::new(), + } + } +} + +#[async_trait] +impl ReadOptions<'_> for RecordBatchReaderTableOptions { + fn to_listing_options( + &self, + config: &SessionConfig, + table_options: TableOptions, + ) -> ListingOptions { + let format = RecordBatchReaderFormat::new(self.spec.with_table_options(&table_options)); + ListingOptions::new(Arc::new(format)) + .with_file_extension(self.spec.extension()) + .with_table_partition_cols(self.table_partition_cols.clone()) + .with_session_config_options(config) + } + + async fn get_resolved_schema( + &self, + config: &SessionConfig, + state: SessionState, + table_path: ListingTableUrl, + ) -> Result { + self.to_listing_options(config, state.default_table_options()) + .infer_schema(&state, &table_path) + .await + } +} + +/// Create a [ListingTable] of GeoParquet (or normal Parquet) files +pub async fn generic_listing_table( + context: &SessionContext, + table_paths: Vec, + mut options: RecordBatchReaderTableOptions, +) -> Result { + let session_config = context.copied_config(); + + options.spec = options.spec.with_options(&options.options)?; + let listing_options = + options.to_listing_options(&session_config, context.copied_table_options()); + + let option_extension = 
listing_options.file_extension.clone(); + + if table_paths.is_empty() { + return exec_err!("No table paths were provided"); + } + + // check if the file extension matches the expected extension if one is provided + if !option_extension.is_empty() { + for path in &table_paths { + let file_path = path.as_str(); + if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { + return exec_err!( + "File path '{file_path}' does not match the expected extension '{option_extension}'" + ); + } + } + } + + let resolved_schema = options + .get_resolved_schema(&session_config, context.state(), table_paths[0].clone()) + .await?; + let config = ListingTableConfig::new_with_multi_paths(table_paths) + .with_listing_options(listing_options) + .with_schema(resolved_schema); + + ListingTable::try_new(config) +} diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 7b690b42a..4bc0ea7d6 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -21,6 +21,7 @@ use arrow_array::RecordBatchReader; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; +use datafusion::config::TableOptions; use datafusion_common::{Result, Statistics}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::PhysicalExpr; @@ -33,6 +34,10 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { &self, options: &HashMap, ) -> Result>; + fn with_table_options( + &self, + table_options: &TableOptions, + ) -> Arc; async fn infer_schema(&self, location: &Object) -> Result; async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { Ok(Statistics::new_unknown(table_schema)) From 9ccadfe210fc9effcb07a26f3f426595b3e2a8c6 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Sun, 2 Nov 2025 20:21:20 -0600 Subject: [PATCH 08/19] comments --- rust/sedona-datasource/src/format.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index b27442ce9..e3fc93515 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -450,6 +450,7 @@ mod test { let spec = Arc::new(EchoSpec::default()); let factory = RecordBatchReaderFormatFactory::new(spec); + // Create a temporary directory with a few files with the declared extension let temp_dir = TempDir::new().unwrap(); let temp_path = temp_dir.path(); let file0 = temp_path.join("item0.echospec"); @@ -463,10 +464,12 @@ mod test { .write_all(b"not empty") .unwrap(); + // Register the format let mut state = SessionStateBuilder::new().build(); state.register_file_format(Arc::new(factory), true).unwrap(); let ctx = SessionContext::new_with_state(state).enable_url_table(); + // Select using just the filename and ensure we get a result let batches_item0 = ctx .table(file0.to_string_lossy().to_string()) .await From 5b876293f447dcd071d1f6a44449c72535cccc81 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 15:45:12 -0600 Subject: [PATCH 09/19] more tests --- rust/sedona-datasource/src/format.rs | 141 +++++++++++++++++++------ rust/sedona-datasource/src/provider.rs | 6 +- rust/sedona-datasource/src/spec.rs | 2 +- 3 files changed, 115 insertions(+), 34 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index e3fc93515..bd9019301 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -220,7 +220,7 @@ impl FileSource for SedonaFileSource { &self, store: Arc, base_config: &FileScanConfig, - partition: usize, + _partition: usize, ) -> Arc { let args = OpenReaderArgs { src: Object { @@ -237,7 +237,6 @@ impl FileSource for SedonaFileSource { Arc::new(SimpleOpener { spec: self.spec.clone(), args, - partition, }) } @@ -322,13 +321,14 @@ impl FileSource for SedonaFileSource { struct SimpleOpener { spec: Arc, args: OpenReaderArgs, - partition: usize, } 
impl FileOpener for SimpleOpener { fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result { - if self.partition != 0 { - return sedona_internal_err!("Expected SimpleOpener to open a single partition"); + if file_meta.range.is_some() { + return sedona_internal_err!( + "Expected SimpleOpener to open a single partition per file" + ); } let mut self_clone = self.clone(); @@ -350,14 +350,63 @@ mod test { }; use arrow_schema::{DataType, Field}; use datafusion::{ - config::TableOptions, execution::SessionStateBuilder, prelude::SessionContext, + config::TableOptions, datasource::listing::ListingTableUrl, execution::SessionStateBuilder, + prelude::SessionContext, }; use datafusion_common::plan_err; - use std::io::Write; + use std::{ + io::{Read, Write}, + path::PathBuf, + }; use tempfile::TempDir; + use url::Url; + + use crate::provider::{record_batch_reader_listing_table, RecordBatchReaderTableOptions}; use super::*; + fn create_echo_spec_ctx() -> SessionContext { + let spec = Arc::new(EchoSpec::default()); + let factory = RecordBatchReaderFormatFactory::new(spec.clone()); + + // Register the format + let mut state = SessionStateBuilder::new().build(); + state.register_file_format(Arc::new(factory), true).unwrap(); + SessionContext::new_with_state(state).enable_url_table() + } + + fn create_echo_spec_temp_dir() -> (TempDir, Vec) { + // Create a temporary directory with a few files with the declared extension + let temp_dir = TempDir::new().unwrap(); + let temp_path = temp_dir.path(); + let file0 = temp_path.join("item0.echospec"); + std::fs::File::create(&file0) + .unwrap() + .write_all(b"not empty") + .unwrap(); + let file1 = temp_path.join("item1.echospec"); + std::fs::File::create(&file1) + .unwrap() + .write_all(b"not empty") + .unwrap(); + (temp_dir, vec![file0, file1]) + } + + fn check_object_is_readable_file(location: &Object) { + let url = Url::parse(&location.to_url_string()).expect("valid uri"); + assert_eq!(url.scheme(), "file"); + let path = 
url.to_file_path().expect("can extract file path"); + + let mut content = String::new(); + std::fs::File::open(path) + .expect("url can't be opened") + .read_to_string(&mut content) + .expect("failed to read"); + if content.is_empty() { + panic!("empty file at url {url}"); + } + } + #[derive(Debug, Default, Clone)] struct EchoSpec { option_value: Option, @@ -392,7 +441,8 @@ mod test { Arc::new(self.clone()) } - async fn infer_schema(&self, _location: &Object) -> Result { + async fn infer_schema(&self, location: &Object) -> Result { + check_object_is_readable_file(location); Ok(Schema::new(vec![ Field::new("src", DataType::Utf8, true), Field::new("batch_size", DataType::Int64, true), @@ -403,9 +453,10 @@ mod test { async fn infer_stats( &self, - _location: &Object, + location: &Object, table_schema: &Schema, ) -> Result { + check_object_is_readable_file(location); Ok(Statistics::new_unknown(table_schema)) } @@ -413,6 +464,8 @@ mod test { &self, args: &OpenReaderArgs, ) -> Result> { + check_object_is_readable_file(&args.src); + let src: StringArray = [args.src.clone()] .iter() .map(|item| Some(item.to_url_string())) @@ -447,31 +500,12 @@ mod test { #[tokio::test] async fn spec_format() { - let spec = Arc::new(EchoSpec::default()); - let factory = RecordBatchReaderFormatFactory::new(spec); - - // Create a temporary directory with a few files with the declared extension - let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path(); - let file0 = temp_path.join("item0.echospec"); - std::fs::File::create(&file0) - .unwrap() - .write_all(b"not empty") - .unwrap(); - let file1 = temp_path.join("item1.echospec"); - std::fs::File::create(&file1) - .unwrap() - .write_all(b"not empty") - .unwrap(); - - // Register the format - let mut state = SessionStateBuilder::new().build(); - state.register_file_format(Arc::new(factory), true).unwrap(); - let ctx = SessionContext::new_with_state(state).enable_url_table(); + let ctx = create_echo_spec_ctx(); + let (temp_dir, 
files) = create_echo_spec_temp_dir(); // Select using just the filename and ensure we get a result let batches_item0 = ctx - .table(file0.to_string_lossy().to_string()) + .table(files[0].to_string_lossy().to_string()) .await .unwrap() .collect() @@ -480,5 +514,50 @@ mod test { assert_eq!(batches_item0.len(), 1); assert_eq!(batches_item0[0].num_rows(), 1); + + // With a glob we should get all the files + let batches = ctx + .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy())) + .await + .unwrap() + .collect() + .await + .unwrap(); + // We should get one value per partition + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 1); + assert_eq!(batches[1].num_rows(), 1); + } + + #[tokio::test] + async fn spec_listing_table() { + let spec = Arc::new(EchoSpec::default()); + let ctx = SessionContext::new(); + let (_temp_dir, files) = create_echo_spec_temp_dir(); + + // Select using a listing table and ensure we get a result + let options = RecordBatchReaderTableOptions::new(spec); + let provider = record_batch_reader_listing_table( + &ctx, + files + .iter() + .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) + .collect(), + options, + ) + .await + .unwrap(); + + let batches = ctx + .read_table(Arc::new(provider)) + .unwrap() + .collect() + .await + .unwrap(); + + // We should get one value per partition + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 1); + assert_eq!(batches[1].num_rows(), 1); } } diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index a555a8351..735c8bb05 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -33,6 +33,7 @@ use crate::{format::RecordBatchReaderFormat, spec::RecordBatchReaderFormatSpec}; pub struct RecordBatchReaderTableOptions { spec: Arc, table_partition_cols: Vec<(String, DataType)>, + check_extension: bool, options: HashMap, } @@ -41,6 +42,7 @@ impl RecordBatchReaderTableOptions { Self 
{ spec, table_partition_cols: Vec::new(), + check_extension: true, options: HashMap::new(), } } @@ -73,7 +75,7 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { } /// Create a [ListingTable] of GeoParquet (or normal Parquet) files -pub async fn generic_listing_table( +pub async fn record_batch_reader_listing_table( context: &SessionContext, table_paths: Vec, mut options: RecordBatchReaderTableOptions, @@ -91,7 +93,7 @@ pub async fn generic_listing_table( } // check if the file extension matches the expected extension if one is provided - if !option_extension.is_empty() { + if !option_extension.is_empty() && options.check_extension { for path in &table_paths { let file_path = path.as_str(); if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 4bc0ea7d6..357c7158a 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -76,7 +76,7 @@ impl Object { if object_store_debug.contains("http") { format!("https://{}", meta.location) } else if object_store_debug.contains("local") { - format!("file://{}", meta.location) + format!("file:///{}", meta.location) } else { format!("{object_store_debug}: {}", meta.location) } From 2f13728783c89d5b2ca69a2d2a70afa4d5161b50 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 16:35:55 -0600 Subject: [PATCH 10/19] experimental repartitioning --- rust/sedona-datasource/src/format.rs | 48 +++++++++++++++++++++------- rust/sedona-datasource/src/spec.rs | 14 ++++++-- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index bd9019301..696c4e58c 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -25,7 +25,8 @@ use datafusion::{ file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory}, 
listing::PartitionedFile, physical_plan::{ - FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileSinkConfig, FileSource, + FileGroupPartitioner, FileMeta, FileOpenFuture, FileOpener, FileScanConfig, + FileSinkConfig, FileSource, }, }, }; @@ -41,7 +42,7 @@ use futures::{StreamExt, TryStreamExt}; use object_store::{ObjectMeta, ObjectStore}; use sedona_common::sedona_internal_err; -use crate::spec::{Object, OpenReaderArgs, RecordBatchReaderFormatSpec}; +use crate::spec::{Object, OpenReaderArgs, RecordBatchReaderFormatSpec, SupportsRepartition}; #[derive(Debug)] pub struct RecordBatchReaderFormatFactory { @@ -125,9 +126,10 @@ impl FileFormat for RecordBatchReaderFormat { let schema = self .spec .infer_schema(&Object { - store: store.clone(), + store: Some(store.clone()), url: None, meta: Some(object.clone()), + range: None, }) .await?; Ok::<_, DataFusionError>((object.location.clone(), schema)) @@ -158,9 +160,10 @@ impl FileFormat for RecordBatchReaderFormat { self.spec .infer_stats( &Object { - store: store.clone(), + store: Some(store.clone()), url: None, meta: Some(object.clone()), + range: None, }, &table_schema, ) @@ -224,9 +227,10 @@ impl FileSource for SedonaFileSource { ) -> Arc { let args = OpenReaderArgs { src: Object { - store: store.clone(), + store: Some(store.clone()), url: Some(base_config.object_store_url.clone()), meta: None, + range: None, }, batch_size: self.batch_size, file_schema: self.file_schema.clone(), @@ -304,16 +308,35 @@ impl FileSource for SedonaFileSource { self.spec.extension() } - // File formats implemented in this way can't be repartitioned. File formats that - // benefit from this need their own FileFormat implementation. 
fn repartitioned( &self, - _target_partitions: usize, - _repartition_file_min_size: usize, - _output_ordering: Option, - _config: &FileScanConfig, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option, + config: &FileScanConfig, ) -> Result> { - Ok(None) + match self.spec.supports_repartition() { + SupportsRepartition::None => Ok(None), + SupportsRepartition::ByRange => { + // Default implementation + if config.file_compression_type.is_compressed() || config.new_lines_in_values { + return Ok(None); + } + + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .with_preserve_order_within_groups(output_ordering.is_some()) + .repartition_file_groups(&config.file_groups); + + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + let mut source = config.clone(); + source.file_groups = repartitioned_file_groups; + return Ok(Some(source)); + } + Ok(None) + } + } } } @@ -334,6 +357,7 @@ impl FileOpener for SimpleOpener { let mut self_clone = self.clone(); Ok(Box::pin(async move { self_clone.args.src.meta.replace(file_meta.object_meta); + self_clone.args.src.range = file_meta.range; let reader = self_clone.spec.open_reader(&self_clone.args).await?; let stream = futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into))); diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 357c7158a..41a460114 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -21,7 +21,7 @@ use arrow_array::RecordBatchReader; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; -use datafusion::config::TableOptions; +use datafusion::{config::TableOptions, datasource::listing::FileRange}; use datafusion_common::{Result, Statistics}; use datafusion_execution::object_store::ObjectStoreUrl; use 
datafusion_physical_expr::PhysicalExpr; @@ -38,6 +38,9 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { &self, table_options: &TableOptions, ) -> Arc; + fn supports_repartition(&self) -> SupportsRepartition { + SupportsRepartition::None + } async fn infer_schema(&self, location: &Object) -> Result; async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { Ok(Statistics::new_unknown(table_schema)) @@ -46,6 +49,12 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { -> Result>; } +#[derive(Debug, Clone, Copy)] +pub enum SupportsRepartition { + None, + ByRange, +} + #[derive(Debug, Clone)] pub struct OpenReaderArgs { pub src: Object, @@ -57,9 +66,10 @@ pub struct OpenReaderArgs { #[derive(Debug, Clone)] pub struct Object { - pub store: Arc, + pub store: Option>, pub url: Option, pub meta: Option, + pub range: Option, } impl Object { From b7c37e52abae0aace8d4df20c19c7f2b2fd9f357 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 17:05:53 -0600 Subject: [PATCH 11/19] try again --- rust/sedona-datasource/src/format.rs | 38 ++++++++++++++++++++++++-- rust/sedona-datasource/src/provider.rs | 1 - 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 696c4e58c..79e074c4c 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -374,8 +374,11 @@ mod test { }; use arrow_schema::{DataType, Field}; use datafusion::{ - config::TableOptions, datasource::listing::ListingTableUrl, execution::SessionStateBuilder, - prelude::SessionContext, + assert_batches_eq, + config::TableOptions, + datasource::listing::ListingTableUrl, + execution::SessionStateBuilder, + prelude::{col, lit, SessionContext}, }; use datafusion_common::plan_err; use std::{ @@ -553,6 +556,37 @@ mod test { assert_eq!(batches[1].num_rows(), 1); } + #[tokio::test] + async fn spec_format_project_filter() { + let ctx = 
create_echo_spec_ctx(); + let (_temp_dir, files) = create_echo_spec_temp_dir(); + + // Ensure that if we pass + let batches = ctx + .table(files[0].to_string_lossy().to_string()) + .await + .unwrap() + .filter(col("src").like(lit("%echospec"))) + .unwrap() + .select(vec![col("batch_size"), col("filter_count")]) + .unwrap() + .collect() + .await + .unwrap(); + + // We don't seem to be getting the filter here :( + assert_batches_eq!( + [ + "+------------+--------------+", + "| batch_size | filter_count |", + "+------------+--------------+", + "| 8192 | 1 |", + "+------------+--------------+", + ], + &batches + ); + } + #[tokio::test] async fn spec_listing_table() { let spec = Arc::new(EchoSpec::default()); diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index 735c8bb05..c80b8250b 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -74,7 +74,6 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { } } -/// Create a [ListingTable] of GeoParquet (or normal Parquet) files pub async fn record_batch_reader_listing_table( context: &SessionContext, table_paths: Vec, From 4824cac04ca9aa26e4ea13fd1b45788896f22f9c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 20:46:30 -0600 Subject: [PATCH 12/19] accumulate filters --- rust/sedona-datasource/src/format.rs | 22 +++++++++++----------- rust/sedona-datasource/src/spec.rs | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 79e074c4c..53043efc7 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -199,7 +199,7 @@ struct SedonaFileSource { batch_size: Option, file_schema: Option, file_projection: Option>, - filters: Option>>, + filters: Vec>, metrics: ExecutionPlanMetricsSet, projected_statistics: Option, } @@ -211,7 +211,7 @@ impl SedonaFileSource { batch_size: None, 
file_schema: None, file_projection: None, - filters: None, + filters: Vec::new(), metrics: ExecutionPlanMetricsSet::default(), projected_statistics: None, } @@ -249,14 +249,17 @@ impl FileSource for SedonaFileSource { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + let num_filters = filters.len(); + let mut new_filters = self.filters.clone(); + new_filters.extend(filters); let source = Self { - filters: Some(filters.clone()), + filters: new_filters, ..self.clone() }; Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ PushedDown::No; - filters.len() + num_filters ]) .with_updated_node(Arc::new(source))) } @@ -501,9 +504,7 @@ mod test { .iter() .map(|item| item.map(|i| i as i64)) .collect(); - let filter_count: Int32Array = [args.filters.clone().map(|f| f.len() as i32)] - .iter() - .collect(); + let filter_count: Int32Array = [args.filters.len() as i32].into_iter().collect(); let option_value: StringArray = [self.option_value.clone()].iter().collect(); let schema = Arc::new(self.infer_schema(&args.src).await?); @@ -559,14 +560,14 @@ mod test { #[tokio::test] async fn spec_format_project_filter() { let ctx = create_echo_spec_ctx(); - let (_temp_dir, files) = create_echo_spec_temp_dir(); + let (temp_dir, _files) = create_echo_spec_temp_dir(); // Ensure that if we pass let batches = ctx - .table(files[0].to_string_lossy().to_string()) + .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy())) .await .unwrap() - .filter(col("src").like(lit("%echospec"))) + .filter(col("src").like(lit("%item0%"))) .unwrap() .select(vec![col("batch_size"), col("filter_count")]) .unwrap() @@ -574,7 +575,6 @@ mod test { .await .unwrap(); - // We don't seem to be getting the filter here :( assert_batches_eq!( [ "+------------+--------------+", diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 41a460114..e7ef7d039 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -61,7 +61,7 @@ 
pub struct OpenReaderArgs { pub batch_size: Option, pub file_schema: Option, pub file_projection: Option>, - pub filters: Option>>, + pub filters: Vec>, } #[derive(Debug, Clone)] From 73c9537e353a92dfe7471bce5c8436953a751767 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 21:39:53 -0600 Subject: [PATCH 13/19] tests --- rust/sedona-datasource/src/format.rs | 97 ++++++++++++++++++++++++++ rust/sedona-datasource/src/provider.rs | 17 ++--- rust/sedona-datasource/src/spec.rs | 3 +- 3 files changed, 104 insertions(+), 13 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 53043efc7..91391f9e2 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -618,4 +618,101 @@ mod test { assert_eq!(batches[0].num_rows(), 1); assert_eq!(batches[1].num_rows(), 1); } + + #[tokio::test] + async fn spec_listing_table_options() { + let spec = Arc::new(EchoSpec::default()) + .with_options(&[("option_value".to_string(), "foofy".to_string())].into()) + .unwrap(); + + let ctx = SessionContext::new(); + let (_temp_dir, files) = create_echo_spec_temp_dir(); + + // Select using a listing table and ensure we get a result with the option passed + let options = RecordBatchReaderTableOptions::new(spec); + let provider = record_batch_reader_listing_table( + &ctx, + files + .iter() + .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) + .collect(), + options, + ) + .await + .unwrap(); + + let batches = ctx + .read_table(Arc::new(provider)) + .unwrap() + .select(vec![col("batch_size"), col("option_value")]) + .unwrap() + .collect() + .await + .unwrap(); + assert_batches_eq!( + [ + "+------------+--------------+", + "| batch_size | option_value |", + "+------------+--------------+", + "| 8192 | foofy |", + "| 8192 | foofy |", + "+------------+--------------+", + ], + &batches + ); + } + + #[tokio::test] + async fn spec_listing_table_errors() { + let spec = 
Arc::new(EchoSpec::default()) + .with_options(&[("option_value".to_string(), "foofy".to_string())].into()) + .unwrap(); + + let ctx = SessionContext::new(); + let mut options = RecordBatchReaderTableOptions::new(spec); + let (temp_dir, mut files) = create_echo_spec_temp_dir(); + + // Listing table with no files should error + let err = record_batch_reader_listing_table(&ctx, vec![], options.clone()) + .await + .unwrap_err(); + assert_eq!(err.message(), "No table paths were provided"); + + // Create a file with a different extension + let file2 = temp_dir.path().join("item2.echospecNOT"); + std::fs::File::create(&file2) + .unwrap() + .write_all(b"not empty") + .unwrap(); + files.push(file2); + + // With the default options we should get an error + let err = record_batch_reader_listing_table( + &ctx, + files + .iter() + .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) + .collect(), + options.clone(), + ) + .await + .unwrap_err(); + + assert!(err + .message() + .ends_with("does not match the expected extension 'echospec'")); + + // ...but we should be able to turn off the error + options.check_extension = false; + record_batch_reader_listing_table( + &ctx, + files + .iter() + .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) + .collect(), + options, + ) + .await + .unwrap(); + } } diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index c80b8250b..f7a53eee9 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; -use arrow_schema::{DataType, SchemaRef}; +use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion::{ config::TableOptions, @@ -31,19 +31,15 @@ use crate::{format::RecordBatchReaderFormat, spec::RecordBatchReaderFormatSpec}; #[derive(Debug, Clone)] pub struct RecordBatchReaderTableOptions { - spec: Arc, - table_partition_cols: Vec<(String, DataType)>, - check_extension: bool, - options: HashMap, + pub spec: Arc, + pub check_extension: bool, } impl RecordBatchReaderTableOptions { pub fn new(spec: Arc) -> Self { Self { spec, - table_partition_cols: Vec::new(), check_extension: true, - options: HashMap::new(), } } } @@ -58,7 +54,6 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { let format = RecordBatchReaderFormat::new(self.spec.with_table_options(&table_options)); ListingOptions::new(Arc::new(format)) .with_file_extension(self.spec.extension()) - .with_table_partition_cols(self.table_partition_cols.clone()) .with_session_config_options(config) } @@ -77,11 +72,9 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { pub async fn record_batch_reader_listing_table( context: &SessionContext, table_paths: Vec, - mut options: RecordBatchReaderTableOptions, + options: RecordBatchReaderTableOptions, ) -> Result { let session_config = context.copied_config(); - - options.spec = options.spec.with_options(&options.options)?; let listing_options = options.to_listing_options(&session_config, context.copied_table_options()); diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index e7ef7d039..371335e86 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -78,7 +78,8 @@ impl Object { (None, None) => format!("{:?}", self.store), (None, Some(meta)) => { // There's no great way to map an object_store to a url prefix if we're not - // provided the `url`; however, this is what we have access to on occasion. 
+ // provided the `url`; however, this is what we have access to in the + // Schema and Statistics resolution phases of the FileFormat. // This is a heuristic that should work for https and a local filesystem, // which is what we might be able to expect a non-DataFusion system like // GDAL to be able to translate. From 13d719b37f9fcbd268242e4eae089ca4b83c0ae2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 21:59:59 -0600 Subject: [PATCH 14/19] make some things internal --- rust/sedona-datasource/src/format.rs | 22 ++++---- rust/sedona-datasource/src/provider.rs | 78 ++++++++++++-------------- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 91391f9e2..9e8dfbb97 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -391,7 +391,7 @@ mod test { use tempfile::TempDir; use url::Url; - use crate::provider::{record_batch_reader_listing_table, RecordBatchReaderTableOptions}; + use crate::provider::record_batch_reader_listing_table; use super::*; @@ -594,14 +594,14 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we get a result - let options = RecordBatchReaderTableOptions::new(spec); let provider = record_batch_reader_listing_table( + spec, &ctx, files .iter() .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) .collect(), - options, + true, ) .await .unwrap(); @@ -629,14 +629,14 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we get a result with the option passed - let options = RecordBatchReaderTableOptions::new(spec); let provider = record_batch_reader_listing_table( + spec, &ctx, files .iter() .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) .collect(), - options, + true, ) .await .unwrap(); @@ -669,11 +669,10 @@ mod test { .unwrap(); let ctx = 
SessionContext::new(); - let mut options = RecordBatchReaderTableOptions::new(spec); let (temp_dir, mut files) = create_echo_spec_temp_dir(); // Listing table with no files should error - let err = record_batch_reader_listing_table(&ctx, vec![], options.clone()) + let err = record_batch_reader_listing_table(spec.clone(), &ctx, vec![], true) .await .unwrap_err(); assert_eq!(err.message(), "No table paths were provided"); @@ -686,14 +685,15 @@ mod test { .unwrap(); files.push(file2); - // With the default options we should get an error + // With check_extension as true we should get an error let err = record_batch_reader_listing_table( + spec.clone(), &ctx, files .iter() .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) .collect(), - options.clone(), + true, ) .await .unwrap_err(); @@ -703,14 +703,14 @@ mod test { .ends_with("does not match the expected extension 'echospec'")); // ...but we should be able to turn off the error - options.check_extension = false; record_batch_reader_listing_table( + spec, &ctx, files .iter() .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap()) .collect(), - options, + false, ) .await .unwrap(); diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index f7a53eee9..ad4cfb2b6 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -29,52 +29,17 @@ use datafusion_common::{exec_err, Result}; use crate::{format::RecordBatchReaderFormat, spec::RecordBatchReaderFormatSpec}; -#[derive(Debug, Clone)] -pub struct RecordBatchReaderTableOptions { - pub spec: Arc, - pub check_extension: bool, -} - -impl RecordBatchReaderTableOptions { - pub fn new(spec: Arc) -> Self { - Self { - spec, - check_extension: true, - } - } -} - -#[async_trait] -impl ReadOptions<'_> for RecordBatchReaderTableOptions { - fn to_listing_options( - &self, - config: &SessionConfig, - table_options: TableOptions, - ) -> ListingOptions { - let format = 
RecordBatchReaderFormat::new(self.spec.with_table_options(&table_options)); - ListingOptions::new(Arc::new(format)) - .with_file_extension(self.spec.extension()) - .with_session_config_options(config) - } - - async fn get_resolved_schema( - &self, - config: &SessionConfig, - state: SessionState, - table_path: ListingTableUrl, - ) -> Result { - self.to_listing_options(config, state.default_table_options()) - .infer_schema(&state, &table_path) - .await - } -} - pub async fn record_batch_reader_listing_table( + spec: Arc, context: &SessionContext, table_paths: Vec, - options: RecordBatchReaderTableOptions, + check_extension: bool, ) -> Result { let session_config = context.copied_config(); + let options = RecordBatchReaderTableOptions { + spec, + check_extension, + }; let listing_options = options.to_listing_options(&session_config, context.copied_table_options()); @@ -105,3 +70,34 @@ pub async fn record_batch_reader_listing_table( ListingTable::try_new(config) } + +#[derive(Debug, Clone)] +struct RecordBatchReaderTableOptions { + spec: Arc, + check_extension: bool, +} + +#[async_trait] +impl ReadOptions<'_> for RecordBatchReaderTableOptions { + fn to_listing_options( + &self, + config: &SessionConfig, + table_options: TableOptions, + ) -> ListingOptions { + let format = RecordBatchReaderFormat::new(self.spec.with_table_options(&table_options)); + ListingOptions::new(Arc::new(format)) + .with_file_extension(self.spec.extension()) + .with_session_config_options(config) + } + + async fn get_resolved_schema( + &self, + config: &SessionConfig, + state: SessionState, + table_path: ListingTableUrl, + ) -> Result { + self.to_listing_options(config, state.default_table_options()) + .infer_schema(&state, &table_path) + .await + } +} From 2a0f55cf82efb8389f310ddbeb1c875235b9c64e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 22:44:57 -0600 Subject: [PATCH 15/19] documennt --- rust/sedona-datasource/src/format.rs | 12 +-- 
rust/sedona-datasource/src/provider.rs | 7 +- rust/sedona-datasource/src/spec.rs | 120 ++++++++++++++++++++++--- 3 files changed, 115 insertions(+), 24 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 9e8dfbb97..e1067b5ff 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -378,7 +378,6 @@ mod test { use arrow_schema::{DataType, Field}; use datafusion::{ assert_batches_eq, - config::TableOptions, datasource::listing::ListingTableUrl, execution::SessionStateBuilder, prelude::{col, lit, SessionContext}, @@ -423,7 +422,7 @@ mod test { } fn check_object_is_readable_file(location: &Object) { - let url = Url::parse(&location.to_url_string()).expect("valid uri"); + let url = Url::parse(&location.to_url_string().unwrap()).expect("valid uri"); assert_eq!(url.scheme(), "file"); let path = url.to_file_path().expect("can extract file path"); @@ -464,13 +463,6 @@ mod test { Ok(Arc::new(self_clone)) } - fn with_table_options( - &self, - _table_options: &TableOptions, - ) -> Arc { - Arc::new(self.clone()) - } - async fn infer_schema(&self, location: &Object) -> Result { check_object_is_readable_file(location); Ok(Schema::new(vec![ @@ -498,7 +490,7 @@ mod test { let src: StringArray = [args.src.clone()] .iter() - .map(|item| Some(item.to_url_string())) + .map(|item| Some(item.to_url_string().unwrap())) .collect(); let batch_size: Int64Array = [args.batch_size] .iter() diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index ad4cfb2b6..df744e167 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -84,7 +84,12 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { config: &SessionConfig, table_options: TableOptions, ) -> ListingOptions { - let format = RecordBatchReaderFormat::new(self.spec.with_table_options(&table_options)); + let format = if let Some(modified) = 
self.spec.with_table_options(&table_options) { + RecordBatchReaderFormat::new(modified) + } else { + RecordBatchReaderFormat::new(self.spec.clone()) + }; + ListingOptions::new(Arc::new(format)) .with_file_extension(self.spec.extension()) .with_session_config_options(config) diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 371335e86..835c957b8 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -27,55 +27,148 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::PhysicalExpr; use object_store::{ObjectMeta, ObjectStore}; +/// Simple file format specification +/// +/// In DataFusion, various parts of the file format are split among the +/// FileFormatFactory, the FileFormat, the FileSource, the FileOpener, +/// and a few other traits. This trait is designed to provide a few +/// important features of a natively implemented FileFormat but consolidating +/// the components of implementing the format in the same place. This is +/// intended to provide a less verbose way to implement readers for a wide +/// variety of spatial formats. #[async_trait] pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { - fn extension(&self) -> &str; + /// Infer a schema for a given file + /// + /// Given a single file, infer what schema [RecordBatchReaderFormatSpec::open_reader] + /// would produce in the absence of any other guidance. + async fn infer_schema(&self, location: &Object) -> Result; + + /// Open a [RecordBatchReader] for a given file + /// + /// The implementation must handle the `file_projection`; however, + /// need not handle the `filters` (but may use them for pruning). 
+ async fn open_reader(&self, args: &OpenReaderArgs) + -> Result>; + + /// A file extension or `""` if this concept does not apply + fn extension(&self) -> &str { + "" + } + + /// Compute a clone of self but with the key/value options specified + /// + /// Implementations should error for invalid key/value input that does + /// not apply to this reader. fn with_options( &self, options: &HashMap, ) -> Result>; + + /// Fill in default options from [TableOptions] + /// + /// The TableOptions are a DataFusion concept that provide a means by which + /// options can be set for various table formats. If the defaults for a built-in + /// table format are reasonable to fill in or if Extensions have been set, + /// these can be accessed and used to fill default options. Note that any options + /// set with [RecordBatchReaderFormatSpec::with_options] should take precedent. fn with_table_options( &self, - table_options: &TableOptions, - ) -> Arc; + _table_options: &TableOptions, + ) -> Option> { + None + } + + /// Allow repartitioning + /// + /// This allows an implementation to opt in to DataFusion's built-in file size + /// based partitioner, which works well for partitioning files where a simple + /// file plus byte range is sufficient. The default opts out of this feature + /// (i.e., every file is passed exactly one to [RecordBatchReaderFormatSpec::open_reader] + /// without a `range`). 
fn supports_repartition(&self) -> SupportsRepartition { SupportsRepartition::None } - async fn infer_schema(&self, location: &Object) -> Result; + + /// Infer [Statistics] for a given file async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result { Ok(Statistics::new_unknown(table_schema)) } - async fn open_reader(&self, args: &OpenReaderArgs) - -> Result>; } +/// Enumerator for repartitioning support #[derive(Debug, Clone, Copy)] pub enum SupportsRepartition { + /// This implementation does not support repartitioning beyond the file level None, + /// This implementation supports partitioning by arbitrary ranges within a file ByRange, } +/// Arguments to [RecordBatchReaderFormatSpec::open_reader] #[derive(Debug, Clone)] pub struct OpenReaderArgs { + /// The input file, or partial file if [SupportsRepartition::ByRange] is used pub src: Object, + + /// The requested batch size + /// + /// DataFusion will usually fill this in to a default of 8192 or a user-specified + /// default in the session configuration. pub batch_size: Option, + + /// The requested file schema, if specified + /// + /// DataFusion will usually fill this in to the schema inferred by + /// [RecordBatchReaderFormatSpec::infer_schema]. pub file_schema: Option, + + /// The requested field indices + /// + /// Implementations must handle this (e.g., using `RecordBatch::project` + /// or by implementing partial reads). pub file_projection: Option>, + + /// Filter expressions + /// + /// Expressions that may be used for pruning. Implementations need not + /// apply these filters. pub filters: Vec>, } +/// The information required to specify a file or partial file +/// +/// Depending exactly where in DataFusion we are calling in from, we might +/// have various information about the file. In general, implementations should +/// use [ObjectStore] and [ObjectMeta] to access the file to use DataFusion's +/// registered IO for these protocols. 
When implementing a filename-based reader +/// (e.g., that uses some external API to read files), use [Object::to_url_string]. #[derive(Debug, Clone)] pub struct Object { + /// The object store reference pub store: Option>, + + /// A URL that may be used to retrieve an [ObjectStore] from a registry + /// + /// These URLs typically are populated only with the scheme. pub url: Option, + + /// An individual object in an ObjectStore pub meta: Option, + + /// If this represents a partial file, the byte range within the file + /// + /// This is only set if partitioning other than `None` is provided pub range: Option, } impl Object { - pub fn to_url_string(&self) -> String { + /// Convert this object to a URL string, if possible + /// + /// Returns `None` if there is not suficient information in the Object to calculate + /// this. + pub fn to_url_string(&self) -> Option { match (&self.url, &self.meta) { - (None, None) => format!("{:?}", self.store), (None, Some(meta)) => { // There's no great way to map an object_store to a url prefix if we're not // provided the `url`; however, this is what we have access to in the @@ -85,15 +178,16 @@ impl Object { // GDAL to be able to translate. 
let object_store_debug = format!("{:?}", self.store).to_lowercase(); if object_store_debug.contains("http") { - format!("https://{}", meta.location) + Some(format!("https://{}", meta.location)) } else if object_store_debug.contains("local") { - format!("file:///{}", meta.location) + Some(format!("file:///{}", meta.location)) } else { - format!("{object_store_debug}: {}", meta.location) + None } } - (Some(url), None) => url.to_string(), - (Some(url), Some(meta)) => format!("{url}/{}", meta.location), + (Some(url), None) => Some(url.to_string()), + (Some(url), Some(meta)) => Some(format!("{url}/{}", meta.location)), + (None, None) => None, } } } From f55bf713dea9d100deeb2065438abe3ec8199098 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 22:48:13 -0600 Subject: [PATCH 16/19] document another pub class --- rust/sedona-datasource/src/format.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index e1067b5ff..170c96e7a 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -44,6 +44,10 @@ use sedona_common::sedona_internal_err; use crate::spec::{Object, OpenReaderArgs, RecordBatchReaderFormatSpec, SupportsRepartition}; +/// Create a [FileFormatFactory] from a [RecordBatchReaderFormatSpec] +/// +/// The FileFormatFactory is the object that may be reigstered with a +/// SessionStateBuilder to allow SQL queries to access this format. 
#[derive(Debug)] pub struct RecordBatchReaderFormatFactory { spec: Arc, @@ -84,7 +88,7 @@ impl GetExt for RecordBatchReaderFormatFactory { } #[derive(Debug)] -pub struct RecordBatchReaderFormat { +pub(crate) struct RecordBatchReaderFormat { spec: Arc, } From 17bc525fa080390ac7c17a604817b5eb6dcc62b9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 22:50:55 -0600 Subject: [PATCH 17/19] comments --- rust/sedona-datasource/src/format.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 170c96e7a..b45fa3d70 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -253,6 +253,7 @@ impl FileSource for SedonaFileSource { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + // Record any new filters let num_filters = filters.len(); let mut new_filters = self.filters.clone(); new_filters.extend(filters); @@ -261,6 +262,8 @@ impl FileSource for SedonaFileSource { ..self.clone() }; + // ...but don't indicate that we handled them so that the filters are + // applied by the other node. 
Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ PushedDown::No; num_filters From e2952ff219691ef1bc18b072700b205b4a14970e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 22:56:04 -0600 Subject: [PATCH 18/19] renames --- rust/sedona-datasource/src/format.rs | 68 +++++++++++++------------- rust/sedona-datasource/src/provider.rs | 16 +++--- rust/sedona-datasource/src/spec.rs | 16 +++--- 3 files changed, 52 insertions(+), 48 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index b45fa3d70..9ee29eead 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -42,36 +42,36 @@ use futures::{StreamExt, TryStreamExt}; use object_store::{ObjectMeta, ObjectStore}; use sedona_common::sedona_internal_err; -use crate::spec::{Object, OpenReaderArgs, RecordBatchReaderFormatSpec, SupportsRepartition}; +use crate::spec::{ExternalFormatSpec, Object, OpenReaderArgs, SupportsRepartition}; -/// Create a [FileFormatFactory] from a [RecordBatchReaderFormatSpec] +/// Create a [FileFormatFactory] from a [ExternalFormatSpec] /// /// The FileFormatFactory is the object that may be reigstered with a /// SessionStateBuilder to allow SQL queries to access this format. 
#[derive(Debug)] -pub struct RecordBatchReaderFormatFactory { - spec: Arc, +pub struct ExternalFormatFactory { + spec: Arc, } -impl RecordBatchReaderFormatFactory { - pub fn new(spec: Arc) -> Self { +impl ExternalFormatFactory { + pub fn new(spec: Arc) -> Self { Self { spec } } } -impl FileFormatFactory for RecordBatchReaderFormatFactory { +impl FileFormatFactory for ExternalFormatFactory { fn create( &self, _state: &dyn Session, format_options: &HashMap, ) -> Result> { - Ok(Arc::new(RecordBatchReaderFormat { + Ok(Arc::new(ExternalFileFormat { spec: self.spec.with_options(format_options)?, })) } fn default(&self) -> Arc { - Arc::new(RecordBatchReaderFormat { + Arc::new(ExternalFileFormat { spec: self.spec.clone(), }) } @@ -81,25 +81,25 @@ impl FileFormatFactory for RecordBatchReaderFormatFactory { } } -impl GetExt for RecordBatchReaderFormatFactory { +impl GetExt for ExternalFormatFactory { fn get_ext(&self) -> String { self.spec.extension().to_string() } } #[derive(Debug)] -pub(crate) struct RecordBatchReaderFormat { - spec: Arc, +pub(crate) struct ExternalFileFormat { + spec: Arc, } -impl RecordBatchReaderFormat { - pub fn new(spec: Arc) -> Self { +impl ExternalFileFormat { + pub fn new(spec: Arc) -> Self { Self { spec } } } #[async_trait] -impl FileFormat for RecordBatchReaderFormat { +impl FileFormat for ExternalFileFormat { fn as_any(&self) -> &dyn Any { self } @@ -193,13 +193,13 @@ impl FileFormat for RecordBatchReaderFormat { } fn file_source(&self) -> Arc { - Arc::new(SedonaFileSource::new(self.spec.clone())) + Arc::new(ExternalFileSource::new(self.spec.clone())) } } #[derive(Debug, Clone)] -struct SedonaFileSource { - spec: Arc, +struct ExternalFileSource { + spec: Arc, batch_size: Option, file_schema: Option, file_projection: Option>, @@ -208,8 +208,8 @@ struct SedonaFileSource { projected_statistics: Option, } -impl SedonaFileSource { - pub fn new(spec: Arc) -> Self { +impl ExternalFileSource { + pub fn new(spec: Arc) -> Self { Self { spec, batch_size: 
None, @@ -222,7 +222,7 @@ impl SedonaFileSource { } } -impl FileSource for SedonaFileSource { +impl FileSource for ExternalFileSource { fn create_file_opener( &self, store: Arc, @@ -242,7 +242,7 @@ impl FileSource for SedonaFileSource { filters: self.filters.clone(), }; - Arc::new(SimpleOpener { + Arc::new(ExternalFileOpener { spec: self.spec.clone(), args, }) @@ -351,12 +351,12 @@ impl FileSource for SedonaFileSource { } #[derive(Debug, Clone)] -struct SimpleOpener { - spec: Arc, +struct ExternalFileOpener { + spec: Arc, args: OpenReaderArgs, } -impl FileOpener for SimpleOpener { +impl FileOpener for ExternalFileOpener { fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result { if file_meta.range.is_some() { return sedona_internal_err!( @@ -397,13 +397,13 @@ mod test { use tempfile::TempDir; use url::Url; - use crate::provider::record_batch_reader_listing_table; + use crate::provider::external_listing_table; use super::*; fn create_echo_spec_ctx() -> SessionContext { let spec = Arc::new(EchoSpec::default()); - let factory = RecordBatchReaderFormatFactory::new(spec.clone()); + let factory = ExternalFormatFactory::new(spec.clone()); // Register the format let mut state = SessionStateBuilder::new().build(); @@ -449,7 +449,7 @@ mod test { } #[async_trait] - impl RecordBatchReaderFormatSpec for EchoSpec { + impl ExternalFormatSpec for EchoSpec { fn extension(&self) -> &str { "echospec" } @@ -457,7 +457,7 @@ mod test { fn with_options( &self, options: &HashMap, - ) -> Result> { + ) -> Result> { let mut self_clone = self.clone(); for (k, v) in options { if k == "option_value" { @@ -593,7 +593,7 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we get a result - let provider = record_batch_reader_listing_table( + let provider = external_listing_table( spec, &ctx, files @@ -628,7 +628,7 @@ mod test { let (_temp_dir, files) = create_echo_spec_temp_dir(); // Select using a listing table and ensure we 
get a result with the option passed - let provider = record_batch_reader_listing_table( + let provider = external_listing_table( spec, &ctx, files @@ -671,7 +671,7 @@ mod test { let (temp_dir, mut files) = create_echo_spec_temp_dir(); // Listing table with no files should error - let err = record_batch_reader_listing_table(spec.clone(), &ctx, vec![], true) + let err = external_listing_table(spec.clone(), &ctx, vec![], true) .await .unwrap_err(); assert_eq!(err.message(), "No table paths were provided"); @@ -685,7 +685,7 @@ mod test { files.push(file2); // With check_extension as true we should get an error - let err = record_batch_reader_listing_table( + let err = external_listing_table( spec.clone(), &ctx, files @@ -702,7 +702,7 @@ mod test { .ends_with("does not match the expected extension 'echospec'")); // ...but we should be able to turn off the error - record_batch_reader_listing_table( + external_listing_table( spec, &ctx, files diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs index df744e167..6b06a70d1 100644 --- a/rust/sedona-datasource/src/provider.rs +++ b/rust/sedona-datasource/src/provider.rs @@ -27,10 +27,14 @@ use datafusion::{ }; use datafusion_common::{exec_err, Result}; -use crate::{format::RecordBatchReaderFormat, spec::RecordBatchReaderFormatSpec}; +use crate::{format::ExternalFileFormat, spec::ExternalFormatSpec}; -pub async fn record_batch_reader_listing_table( - spec: Arc, +/// Create a [ListingTable] from an [ExternalFormatSpec] and one or more URLs +/// +/// This can be used to resolve a format specification into a TableProvider that +/// may be registered with a [SessionContext]. 
+pub async fn external_listing_table( + spec: Arc, context: &SessionContext, table_paths: Vec, check_extension: bool, @@ -73,7 +77,7 @@ pub async fn record_batch_reader_listing_table( #[derive(Debug, Clone)] struct RecordBatchReaderTableOptions { - spec: Arc, + spec: Arc, check_extension: bool, } @@ -85,9 +89,9 @@ impl ReadOptions<'_> for RecordBatchReaderTableOptions { table_options: TableOptions, ) -> ListingOptions { let format = if let Some(modified) = self.spec.with_table_options(&table_options) { - RecordBatchReaderFormat::new(modified) + ExternalFileFormat::new(modified) } else { - RecordBatchReaderFormat::new(self.spec.clone()) + ExternalFileFormat::new(self.spec.clone()) }; ListingOptions::new(Arc::new(format)) diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 835c957b8..324544ef7 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -37,10 +37,10 @@ use object_store::{ObjectMeta, ObjectStore}; /// intended to provide a less verbose way to implement readers for a wide /// variety of spatial formats. #[async_trait] -pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { +pub trait ExternalFormatSpec: Debug + Send + Sync { /// Infer a schema for a given file /// - /// Given a single file, infer what schema [RecordBatchReaderFormatSpec::open_reader] + /// Given a single file, infer what schema [ExternalFormatSpec::open_reader] /// would produce in the absence of any other guidance. async fn infer_schema(&self, location: &Object) -> Result; @@ -63,7 +63,7 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { fn with_options( &self, options: &HashMap, - ) -> Result>; + ) -> Result>; /// Fill in default options from [TableOptions] /// @@ -71,11 +71,11 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { /// options can be set for various table formats. 
If the defaults for a built-in /// table format are reasonable to fill in or if Extensions have been set, /// these can be accessed and used to fill default options. Note that any options - /// set with [RecordBatchReaderFormatSpec::with_options] should take precedent. + /// set with [ExternalFormatSpec::with_options] should take precedence. fn with_table_options( &self, _table_options: &TableOptions, - ) -> Option> { + ) -> Option> { None } @@ -84,7 +84,7 @@ pub trait RecordBatchReaderFormatSpec: Debug + Send + Sync { /// This allows an implementation to opt in to DataFusion's built-in file size /// based partitioner, which works well for partitioning files where a simple /// file plus byte range is sufficient. The default opts out of this feature - /// (i.e., every file is passed exactly one to [RecordBatchReaderFormatSpec::open_reader] + /// (i.e., every file is passed exactly once to [ExternalFormatSpec::open_reader] /// without a `range`). fn supports_repartition(&self) -> SupportsRepartition { SupportsRepartition::None @@ -105,7 +105,7 @@ pub enum SupportsRepartition { ByRange, } -/// Arguments to [RecordBatchReaderFormatSpec::open_reader] +/// Arguments to [ExternalFormatSpec::open_reader] #[derive(Debug, Clone)] pub struct OpenReaderArgs { /// The input file, or partial file if [SupportsRepartition::ByRange] is used @@ -120,7 +120,7 @@ pub struct OpenReaderArgs { /// The requested file schema, if specified /// /// DataFusion will usually fill this in to the schema inferred by - /// [RecordBatchReaderFormatSpec::infer_schema]. + /// [ExternalFormatSpec::infer_schema]. 
pub file_schema: Option, /// The requested field indices From e28d215d8d885693efa8e232ef95813197468326 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 3 Nov 2025 22:58:57 -0600 Subject: [PATCH 19/19] spelling --- rust/sedona-datasource/src/format.rs | 2 +- rust/sedona-datasource/src/spec.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 9ee29eead..20d8f5145 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -46,7 +46,7 @@ use crate::spec::{ExternalFormatSpec, Object, OpenReaderArgs, SupportsRepartitio /// Create a [FileFormatFactory] from a [ExternalFormatSpec] /// -/// The FileFormatFactory is the object that may be reigstered with a +/// The FileFormatFactory is the object that may be registered with a /// SessionStateBuilder to allow SQL queries to access this format. #[derive(Debug)] pub struct ExternalFormatFactory { diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index 324544ef7..1a1bb2b75 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -165,7 +165,7 @@ pub struct Object { impl Object { /// Convert this object to a URL string, if possible /// - /// Returns `None` if there is not suficient information in the Object to calculate + /// Returns `None` if there is not sufficient information in the Object to calculate /// this. pub fn to_url_string(&self) -> Option { match (&self.url, &self.meta) {