diff --git a/Cargo.lock b/Cargo.lock
index 4537b8c1d..6ad1b6615 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4893,6 +4893,42 @@ dependencies = [
  "regex",
 ]
 
+[[package]]
+name = "sedona-datasource"
+version = "0.2.0"
+dependencies = [
+ "arrow-array",
+ "arrow-schema",
+ "async-trait",
+ "bytes",
+ "chrono",
+ "datafusion",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "float_next_after",
+ "futures",
+ "geo-traits",
+ "object_store",
+ "parquet",
+ "rstest",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-functions",
+ "sedona-geometry",
+ "sedona-schema",
+ "sedona-testing",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "tempfile",
+ "tokio",
+ "url",
+]
+
 [[package]]
 name = "sedona-expr"
 version = "0.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index 84b19f58d..f8a7a8e1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,12 +21,14 @@ members = [
     "c/sedona-proj",
     "c/sedona-s2geography",
     "c/sedona-tg",
+    "python/sedonadb",
     "r/sedonadb/src/rust",
-    "rust/sedona-geo-traits-ext",
-    "rust/sedona-geo-generic-alg",
     "rust/sedona-adbc",
+    "rust/sedona-datasource",
     "rust/sedona-expr",
     "rust/sedona-functions",
+    "rust/sedona-geo-generic-alg",
+    "rust/sedona-geo-traits-ext",
     "rust/sedona-geo",
     "rust/sedona-geometry",
     "rust/sedona-geoparquet",
@@ -34,7 +36,6 @@ members = [
     "rust/sedona-spatial-join",
     "rust/sedona-testing",
     "rust/sedona",
-    "python/sedonadb",
     "sedona-cli",
 ]
 resolver = "2"
diff --git a/rust/sedona-datasource/Cargo.toml b/rust/sedona-datasource/Cargo.toml
new file mode 100644
index 000000000..ffea1d818
--- /dev/null
+++ b/rust/sedona-datasource/Cargo.toml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-datasource"
+version.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[lints.clippy]
+result_large_err = "allow"
+
+[features]
+default = []
+
+[dev-dependencies]
+sedona-testing = { path = "../sedona-testing" }
+url = { workspace = true }
+rstest = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[dependencies]
+async-trait = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-array = { workspace = true }
+bytes = { workspace = true }
+chrono = { workspace = true }
+datafusion = { workspace = true, features = ["parquet"] }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+float_next_after = { workspace = true }
+geo-traits = { workspace = true }
+futures = { workspace = true }
+object_store = { workspace = true }
+parquet = { workspace = true }
+sedona-common = { path = "../sedona-common" }
+sedona-expr = { path = "../sedona-expr" }
+sedona-functions = { path = "../sedona-functions" }
+sedona-geometry = { path = "../sedona-geometry" }
+sedona-schema = { path = "../sedona-schema" }
+serde = { workspace = true }
+serde_json = { workspace = true }
+serde_with = { workspace = true }
diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs
new file mode 100644
index 000000000..20d8f5145
--- /dev/null
+++ b/rust/sedona-datasource/src/format.rs
@@ -0,0 +1,717 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
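+
+//! Adapters that expose an [ExternalFormatSpec][crate::spec::ExternalFormatSpec]
+//! to DataFusion's `FileFormat` machinery.
+//!
+//! A minimal registration sketch (hedged: `MyFormatSpec` is a hypothetical
+//! implementation of `ExternalFormatSpec`, not something provided by this
+//! crate):
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use datafusion::execution::SessionStateBuilder;
+//! use datafusion::prelude::SessionContext;
+//! use sedona_datasource::format::ExternalFormatFactory;
+//!
+//! // Wrapping the spec in a factory and registering it with the session
+//! // state allows SQL queries to read files with the spec's extension
+//! let factory = ExternalFormatFactory::new(Arc::new(MyFormatSpec::default()));
+//! let mut state = SessionStateBuilder::new().build();
+//! state.register_file_format(Arc::new(factory), true).unwrap();
+//! let ctx = SessionContext::new_with_state(state).enable_url_table();
+//! ```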
+
+use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    config::ConfigOptions,
+    datasource::{
+        file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory},
+        listing::PartitionedFile,
+        physical_plan::{
+            FileGroupPartitioner, FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+            FileSinkConfig, FileSource,
+        },
+    },
+};
+use datafusion_catalog::{memory::DataSourceExec, Session};
+use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics};
+use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::{
+    filter_pushdown::{FilterPushdownPropagation, PushedDown},
+    metrics::ExecutionPlanMetricsSet,
+    ExecutionPlan,
+};
+use futures::{StreamExt, TryStreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+use sedona_common::sedona_internal_err;
+
+use crate::spec::{ExternalFormatSpec, Object, OpenReaderArgs, SupportsRepartition};
+
+/// Create a [FileFormatFactory] from an [ExternalFormatSpec]
+///
+/// The FileFormatFactory is the object that may be registered with a
+/// SessionStateBuilder to allow SQL queries to access this format.
+#[derive(Debug)]
+pub struct ExternalFormatFactory {
+    spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFormatFactory {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self { spec }
+    }
+}
+
+impl FileFormatFactory for ExternalFormatFactory {
+    fn create(
+        &self,
+        _state: &dyn Session,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        Ok(Arc::new(ExternalFileFormat {
+            spec: self.spec.with_options(format_options)?,
+        }))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(ExternalFileFormat {
+            spec: self.spec.clone(),
+        })
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+}
+
+impl GetExt for ExternalFormatFactory {
+    fn get_ext(&self) -> String {
+        self.spec.extension().to_string()
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct ExternalFileFormat {
+    spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFileFormat {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self { spec }
+    }
+}
+
+#[async_trait]
+impl FileFormat for ExternalFileFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_ext(&self) -> String {
+        self.spec.extension().to_string()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        _file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        not_impl_err!("extension with compression type")
+    }
+
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
+    async fn infer_schema(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas: Vec<_> = futures::stream::iter(objects)
+            .map(|object| async move {
+                let schema = self
+                    .spec
+                    .infer_schema(&Object {
+                        store: Some(store.clone()),
+                        url: None,
+                        meta: Some(object.clone()),
+                        range: None,
+                    })
+                    .await?;
+                Ok::<_, DataFusionError>((object.location.clone(), schema))
+            })
+            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
+            .buffered(state.config_options().execution.meta_fetch_concurrency)
+            .try_collect()
+            .await?;
+
+        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
+
+        let schemas = schemas
+            .into_iter()
+            .map(|(_, schema)| schema)
+            .collect::<Vec<_>>();
+
+        let schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        _state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<Statistics> {
+        self.spec
+            .infer_stats(
+                &Object {
+                    store: Some(store.clone()),
+                    url: None,
+                    meta: Some(object.clone()),
+                    range: None,
+                },
+                &table_schema,
+            )
+            .await
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &dyn Session,
+        config: FileScanConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(DataSourceExec::from_data_source(config))
+    }
+
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &dyn Session,
+        _conf: FileSinkConfig,
+        _order_requirements: Option<LexRequirement>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        not_impl_err!("writing not yet supported for ExternalFileFormat")
+    }
+
+    fn file_source(&self) -> Arc<dyn FileSource> {
+        Arc::new(ExternalFileSource::new(self.spec.clone()))
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileSource {
+    spec: Arc<dyn ExternalFormatSpec>,
+    batch_size: Option<usize>,
+    file_schema: Option<SchemaRef>,
+    file_projection: Option<Vec<usize>>,
+    filters: Vec<Arc<dyn PhysicalExpr>>,
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
+}
+
+impl ExternalFileSource {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self {
+            spec,
+            batch_size: None,
+            file_schema: None,
+            file_projection: None,
+            filters: Vec::new(),
+            metrics: ExecutionPlanMetricsSet::default(),
+            projected_statistics: None,
+        }
+    }
+}
+
+impl FileSource for ExternalFileSource {
+    fn create_file_opener(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        let args = OpenReaderArgs {
+            src: Object {
+                store: Some(store.clone()),
+                url: Some(base_config.object_store_url.clone()),
+                meta: None,
+                range: None,
+            },
+            batch_size: self.batch_size,
+            file_schema: self.file_schema.clone(),
+            file_projection: self.file_projection.clone(),
+            filters: self.filters.clone(),
+        };
+
+        Arc::new(ExternalFileOpener {
+            spec: self.spec.clone(),
+            args,
+        })
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        // Record any new filters
+        let num_filters = filters.len();
+        let mut new_filters = self.filters.clone();
+        new_filters.extend(filters);
+        let source = Self {
+            filters: new_filters,
+            ..self.clone()
+        };
+
+        // ...but don't indicate that we handled them so that the filters are
+        // applied by the other node.
+        Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
+            PushedDown::No;
+            num_filters
+        ])
+        .with_updated_node(Arc::new(source)))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            batch_size: Some(batch_size),
+            ..self.clone()
+        })
+    }
+
+    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            file_schema: Some(schema),
+            ..self.clone()
+        })
+    }
+
+    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            file_projection: config.file_column_projection_indices(),
+            ..self.clone()
+        })
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            projected_statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
+    }
+
+    fn file_type(&self) -> &str {
+        self.spec.extension()
+    }
+
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+        config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>> {
+        match self.spec.supports_repartition() {
+            SupportsRepartition::None => Ok(None),
+            SupportsRepartition::ByRange => {
+                // Default implementation
+                if config.file_compression_type.is_compressed() || config.new_lines_in_values {
+                    return Ok(None);
+                }
+
+                let repartitioned_file_groups_option = FileGroupPartitioner::new()
+                    .with_target_partitions(target_partitions)
+                    .with_repartition_file_min_size(repartition_file_min_size)
+                    .with_preserve_order_within_groups(output_ordering.is_some())
+                    .repartition_file_groups(&config.file_groups);
+
+                if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+                    let mut source = config.clone();
+                    source.file_groups = repartitioned_file_groups;
+                    return Ok(Some(source));
+                }
+                Ok(None)
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileOpener {
+    spec: Arc<dyn ExternalFormatSpec>,
+    args: OpenReaderArgs,
+}
+
+impl FileOpener for ExternalFileOpener {
+    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+        if file_meta.range.is_some() {
+            return sedona_internal_err!(
+                "Expected ExternalFileOpener to open a single partition per file"
+            );
+        }
+
+        let mut self_clone = self.clone();
+        Ok(Box::pin(async move {
+            self_clone.args.src.meta.replace(file_meta.object_meta);
+            self_clone.args.src.range = file_meta.range;
+            let reader = self_clone.spec.open_reader(&self_clone.args).await?;
+            let stream =
+                futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into)));
+            Ok(stream.boxed())
+        }))
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use arrow_array::{
+        Int32Array, Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
+    };
+    use arrow_schema::{DataType, Field};
+    use datafusion::{
+        assert_batches_eq,
+        datasource::listing::ListingTableUrl,
+        execution::SessionStateBuilder,
+        prelude::{col, lit, SessionContext},
+    };
+    use datafusion_common::plan_err;
+    use std::{
+        io::{Read, Write},
+        path::PathBuf,
+    };
+    use tempfile::TempDir;
+    use url::Url;
+
+    use crate::provider::external_listing_table;
+
+    use super::*;
+
+    fn create_echo_spec_ctx() -> SessionContext {
+        let spec = Arc::new(EchoSpec::default());
+        let factory = ExternalFormatFactory::new(spec.clone());
+
+        // Register the format
+        let mut state = SessionStateBuilder::new().build();
+        state.register_file_format(Arc::new(factory), true).unwrap();
+        SessionContext::new_with_state(state).enable_url_table()
+    }
+
+    fn create_echo_spec_temp_dir() -> (TempDir, Vec<PathBuf>) {
+        // Create a temporary directory with a few files with the declared extension
+        let temp_dir = TempDir::new().unwrap();
+        let temp_path = temp_dir.path();
+        let file0 = temp_path.join("item0.echospec");
+        std::fs::File::create(&file0)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        let file1 = temp_path.join("item1.echospec");
+        std::fs::File::create(&file1)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        (temp_dir, vec![file0, file1])
+    }
+
+    fn check_object_is_readable_file(location: &Object) {
+        let url = Url::parse(&location.to_url_string().unwrap()).expect("valid uri");
+        assert_eq!(url.scheme(), "file");
+        let path = url.to_file_path().expect("can extract file path");
+
+        let mut content = String::new();
+        std::fs::File::open(path)
+            .expect("url can't be opened")
+            .read_to_string(&mut content)
+            .expect("failed to read");
+        if content.is_empty() {
+            panic!("empty file at url {url}");
+        }
+    }
+
+    #[derive(Debug, Default, Clone)]
+    struct EchoSpec {
+        option_value: Option<String>,
+    }
+
+    #[async_trait]
+    impl ExternalFormatSpec for EchoSpec {
+        fn extension(&self) -> &str {
+            "echospec"
+        }
+
+        fn with_options(
+            &self,
+            options: &HashMap<String, String>,
+        ) -> Result<Arc<dyn ExternalFormatSpec>> {
+            let mut self_clone = self.clone();
+            for (k, v) in options {
+                if k == "option_value" {
+                    self_clone.option_value = Some(v.to_string());
+                } else {
+                    return plan_err!("Unsupported option for EchoSpec: '{k}'");
+                }
+            }
+
+            Ok(Arc::new(self_clone))
+        }
+
+        async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+            check_object_is_readable_file(location);
+            Ok(Schema::new(vec![
+                Field::new("src", DataType::Utf8, true),
+                Field::new("batch_size", DataType::Int64, true),
+                Field::new("filter_count", DataType::Int32, true),
+                Field::new("option_value", DataType::Utf8, true),
+            ]))
+        }
+
+        async fn infer_stats(
+            &self,
+            location: &Object,
+            table_schema: &Schema,
+        ) -> Result<Statistics> {
+            check_object_is_readable_file(location);
+            Ok(Statistics::new_unknown(table_schema))
+        }
+
+        async fn open_reader(
+            &self,
+            args: &OpenReaderArgs,
+        ) -> Result<Box<dyn RecordBatchReader + Send>> {
+            check_object_is_readable_file(&args.src);
+
+            let src: StringArray = [args.src.clone()]
+                .iter()
+                .map(|item| Some(item.to_url_string().unwrap()))
+                .collect();
+            let batch_size: Int64Array = [args.batch_size]
+                .iter()
+                .map(|item| item.map(|i| i as i64))
+                .collect();
+            let filter_count: Int32Array = [args.filters.len() as i32].into_iter().collect();
+            let option_value: StringArray = [self.option_value.clone()].iter().collect();
+
+            let schema = Arc::new(self.infer_schema(&args.src).await?);
+            let mut batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(src),
+                    Arc::new(batch_size),
+                    Arc::new(filter_count),
+                    Arc::new(option_value),
+                ],
+            )?;
+
+            if let Some(projection) = &args.file_projection {
+                batch = batch.project(projection)?;
+            }
+
+            Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+        }
+    }
+
+    #[tokio::test]
+    async fn spec_format() {
+        let ctx = create_echo_spec_ctx();
+        let (temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using just the filename and ensure we get a result
+        let batches_item0 = ctx
+            .table(files[0].to_string_lossy().to_string())
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(batches_item0.len(), 1);
+        assert_eq!(batches_item0[0].num_rows(), 1);
+
+        // With a glob we should get all the files
+        let batches = ctx
+            .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        // We should get one value per partition
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn spec_format_project_filter() {
+        let ctx = create_echo_spec_ctx();
+        let (temp_dir, _files) = create_echo_spec_temp_dir();
+
+        // Ensure that a projection and a filter are passed through to the opener
+        let batches = ctx
+            .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+            .await
+            .unwrap()
+            .filter(col("src").like(lit("%item0%")))
+            .unwrap()
+            .select(vec![col("batch_size"), col("filter_count")])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_batches_eq!(
+            [
+                "+------------+--------------+",
+                "| batch_size | filter_count |",
+                "+------------+--------------+",
+                "| 8192       | 1            |",
+                "+------------+--------------+",
+            ],
+            &batches
+        );
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table() {
+        let spec = Arc::new(EchoSpec::default());
+        let ctx = SessionContext::new();
+        let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using a listing table and ensure we get a result
+        let provider = external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap();
+
+        let batches = ctx
+            .read_table(Arc::new(provider))
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // We should get one value per partition
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table_options() {
+        let spec = Arc::new(EchoSpec::default())
+            .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using a listing table and ensure we get a result with the option passed
+        let provider = external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap();
+
+        let batches = ctx
+            .read_table(Arc::new(provider))
+            .unwrap()
+            .select(vec![col("batch_size"), col("option_value")])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        assert_batches_eq!(
+            [
+                "+------------+--------------+",
+                "| batch_size | option_value |",
+                "+------------+--------------+",
+                "| 8192       | foofy        |",
+                "| 8192       | foofy        |",
+                "+------------+--------------+",
+            ],
+            &batches
+        );
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table_errors() {
+        let spec = Arc::new(EchoSpec::default())
+            .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let (temp_dir, mut files) = create_echo_spec_temp_dir();
+
+        // Listing table with no files should error
+        let err = external_listing_table(spec.clone(), &ctx, vec![], true)
+            .await
+            .unwrap_err();
+        assert_eq!(err.message(), "No table paths were provided");
+
+        // Create a file with a different extension
+        let file2 = temp_dir.path().join("item2.echospecNOT");
+        std::fs::File::create(&file2)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        files.push(file2);
+
+        // With check_extension as true we should get an error
+        let err = external_listing_table(
+            spec.clone(),
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap_err();
+
+        assert!(err
+            .message()
+            .ends_with("does not match the expected extension 'echospec'"));
+
+        // ...but we should be able to turn off the error
+        external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            false,
+        )
+        .await
+        .unwrap();
+    }
+}
diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs
new file mode 100644
index 000000000..4bdc59695
--- /dev/null
+++ b/rust/sedona-datasource/src/lib.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod format;
+pub mod provider;
+pub mod spec;
diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs
new file mode 100644
index 000000000..6b06a70d1
--- /dev/null
+++ b/rust/sedona-datasource/src/provider.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+    config::TableOptions,
+    datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
+    execution::{options::ReadOptions, SessionState},
+    prelude::{SessionConfig, SessionContext},
+};
+use datafusion_common::{exec_err, Result};
+
+use crate::{format::ExternalFileFormat, spec::ExternalFormatSpec};
+
+/// Create a [ListingTable] from an [ExternalFormatSpec] and one or more URLs
+///
+/// This can be used to resolve a format specification into a TableProvider that
+/// may be registered with a [SessionContext].
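+///
+/// # Example
+///
+/// A minimal sketch (hedged: `MyFormatSpec` is a hypothetical
+/// [ExternalFormatSpec] implementation and the path is illustrative):
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use datafusion::datasource::listing::ListingTableUrl;
+/// use datafusion::prelude::SessionContext;
+/// use sedona_datasource::provider::external_listing_table;
+///
+/// let ctx = SessionContext::new();
+/// // Resolve the spec plus concrete paths into a TableProvider
+/// let provider = external_listing_table(
+///     Arc::new(MyFormatSpec::default()),
+///     &ctx,
+///     vec![ListingTableUrl::parse("/tmp/example.myext").unwrap()],
+///     true, // error if a path does not end with the spec's extension
+/// )
+/// .await
+/// .unwrap();
+/// let df = ctx.read_table(Arc::new(provider)).unwrap();
+/// ```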
+pub async fn external_listing_table(
+    spec: Arc<dyn ExternalFormatSpec>,
+    context: &SessionContext,
+    table_paths: Vec<ListingTableUrl>,
+    check_extension: bool,
+) -> Result<ListingTable> {
+    let session_config = context.copied_config();
+    let options = RecordBatchReaderTableOptions {
+        spec,
+        check_extension,
+    };
+    let listing_options =
+        options.to_listing_options(&session_config, context.copied_table_options());
+
+    let option_extension = listing_options.file_extension.clone();
+
+    if table_paths.is_empty() {
+        return exec_err!("No table paths were provided");
+    }
+
+    // Check that the file extension matches the expected extension if one is provided
+    if !option_extension.is_empty() && options.check_extension {
+        for path in &table_paths {
+            let file_path = path.as_str();
+            if !file_path.ends_with(option_extension.as_str()) && !path.is_collection() {
+                return exec_err!(
+                    "File path '{file_path}' does not match the expected extension '{option_extension}'"
+                );
+            }
+        }
+    }
+
+    let resolved_schema = options
+        .get_resolved_schema(&session_config, context.state(), table_paths[0].clone())
+        .await?;
+    let config = ListingTableConfig::new_with_multi_paths(table_paths)
+        .with_listing_options(listing_options)
+        .with_schema(resolved_schema);
+
+    ListingTable::try_new(config)
+}
+
+#[derive(Debug, Clone)]
+struct RecordBatchReaderTableOptions {
+    spec: Arc<dyn ExternalFormatSpec>,
+    check_extension: bool,
+}
+
+#[async_trait]
+impl ReadOptions<'_> for RecordBatchReaderTableOptions {
+    fn to_listing_options(
+        &self,
+        config: &SessionConfig,
+        table_options: TableOptions,
+    ) -> ListingOptions {
+        let format = if let Some(modified) = self.spec.with_table_options(&table_options) {
+            ExternalFileFormat::new(modified)
+        } else {
+            ExternalFileFormat::new(self.spec.clone())
+        };
+
+        ListingOptions::new(Arc::new(format))
+            .with_file_extension(self.spec.extension())
+            .with_session_config_options(config)
+    }
+
+    async fn get_resolved_schema(
+        &self,
+        config: &SessionConfig,
+        state: SessionState,
+        table_path: ListingTableUrl,
+    ) -> Result<SchemaRef> {
+        self.to_listing_options(config, state.default_table_options())
+            .infer_schema(&state, &table_path)
+            .await
+    }
+}
diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs
new file mode 100644
index 000000000..1a1bb2b75
--- /dev/null
+++ b/rust/sedona-datasource/src/spec.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
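+
+//! The [ExternalFormatSpec] trait and its supporting types.
+//!
+//! A minimal implementation sketch (hedged: `SingleColumnSpec`, its schema,
+//! and its single hard-coded batch are illustrative only, not part of this
+//! crate):
+//!
+//! ```ignore
+//! use std::{collections::HashMap, sync::Arc};
+//!
+//! use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use async_trait::async_trait;
+//! use datafusion_common::{plan_err, Result};
+//! use sedona_datasource::spec::{ExternalFormatSpec, Object, OpenReaderArgs};
+//!
+//! #[derive(Debug, Clone)]
+//! struct SingleColumnSpec;
+//!
+//! #[async_trait]
+//! impl ExternalFormatSpec for SingleColumnSpec {
+//!     fn extension(&self) -> &str {
+//!         "single"
+//!     }
+//!
+//!     async fn infer_schema(&self, _location: &Object) -> Result<Schema> {
+//!         Ok(Schema::new(vec![Field::new("value", DataType::Int64, true)]))
+//!     }
+//!
+//!     async fn open_reader(
+//!         &self,
+//!         args: &OpenReaderArgs,
+//!     ) -> Result<Box<dyn RecordBatchReader + Send>> {
+//!         let schema = Arc::new(self.infer_schema(&args.src).await?);
+//!         let mut batch = RecordBatch::try_new(
+//!             schema,
+//!             vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
+//!         )?;
+//!
+//!         // Implementations must apply the requested projection themselves
+//!         if let Some(projection) = &args.file_projection {
+//!             batch = batch.project(projection)?;
+//!         }
+//!
+//!         let schema = batch.schema();
+//!         Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+//!     }
+//!
+//!     fn with_options(
+//!         &self,
+//!         options: &HashMap<String, String>,
+//!     ) -> Result<Arc<dyn ExternalFormatSpec>> {
+//!         // Error on options we don't understand rather than ignoring them
+//!         if let Some(key) = options.keys().next() {
+//!             return plan_err!("Unsupported option for SingleColumnSpec: '{key}'");
+//!         }
+//!         Ok(Arc::new(self.clone()))
+//!     }
+//! }
+//! ```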
+
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_array::RecordBatchReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::{config::TableOptions, datasource::listing::FileRange};
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{ObjectMeta, ObjectStore};
+
+/// Simple file format specification
+///
+/// In DataFusion, the various parts of a file format implementation are split
+/// among the FileFormatFactory, the FileFormat, the FileSource, the FileOpener,
+/// and a few other traits. This trait is designed to provide a few important
+/// features of a natively implemented FileFormat while consolidating the
+/// components of the implementation in the same place. It is intended to
+/// provide a less verbose way to implement readers for a wide variety of
+/// spatial formats.
+#[async_trait]
+pub trait ExternalFormatSpec: Debug + Send + Sync {
+    /// Infer a schema for a given file
+    ///
+    /// Given a single file, infer the schema that [ExternalFormatSpec::open_reader]
+    /// would produce in the absence of any other guidance.
+    async fn infer_schema(&self, location: &Object) -> Result<Schema>;
+
+    /// Open a [RecordBatchReader] for a given file
+    ///
+    /// The implementation must handle the `file_projection`; however, it need
+    /// not handle the `filters` (but may use them for pruning).
+    async fn open_reader(&self, args: &OpenReaderArgs)
+        -> Result<Box<dyn RecordBatchReader + Send>>;
+
+    /// A file extension or `""` if this concept does not apply
+    fn extension(&self) -> &str {
+        ""
+    }
+
+    /// Compute a clone of self but with the key/value options specified
+    ///
+    /// Implementations should error for invalid key/value input that does
+    /// not apply to this reader.
+    fn with_options(
+        &self,
+        options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn ExternalFormatSpec>>;
+
+    /// Fill in default options from [TableOptions]
+    ///
+    /// The TableOptions are a DataFusion concept that provides a means by which
+    /// options can be set for various table formats. If the defaults for a built-in
+    /// table format are reasonable to fill in or if Extensions have been set,
+    /// these can be accessed and used to fill in default options. Note that any
+    /// options set with [ExternalFormatSpec::with_options] should take precedence.
+    fn with_table_options(
+        &self,
+        _table_options: &TableOptions,
+    ) -> Option<Arc<dyn ExternalFormatSpec>> {
+        None
+    }
+
+    /// Allow repartitioning
+    ///
+    /// This allows an implementation to opt in to DataFusion's built-in file-size
+    /// based partitioner, which works well for partitioning files where a simple
+    /// file plus byte range is sufficient. The default opts out of this feature
+    /// (i.e., every file is passed exactly once to [ExternalFormatSpec::open_reader]
+    /// without a `range`).
+    fn supports_repartition(&self) -> SupportsRepartition {
+        SupportsRepartition::None
+    }
+
+    /// Infer [Statistics] for a given file
+    async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(table_schema))
+    }
+}
+
+/// Enumeration of repartitioning support
+#[derive(Debug, Clone, Copy)]
+pub enum SupportsRepartition {
+    /// This implementation does not support repartitioning beyond the file level
+    None,
+    /// This implementation supports partitioning by arbitrary byte ranges within a file
+    ByRange,
+}
+
+/// Arguments to [ExternalFormatSpec::open_reader]
+#[derive(Debug, Clone)]
+pub struct OpenReaderArgs {
+    /// The input file, or partial file if [SupportsRepartition::ByRange] is used
+    pub src: Object,
+
+    /// The requested batch size
+    ///
+    /// DataFusion will usually fill this in with a default of 8192 or a
+    /// user-specified value from the session configuration.
+    pub batch_size: Option<usize>,
+
+    /// The requested file schema, if specified
+    ///
+    /// DataFusion will usually fill this in with the schema inferred by
+    /// [ExternalFormatSpec::infer_schema].
+    pub file_schema: Option<SchemaRef>,
+
+    /// The requested field indices
+    ///
+    /// Implementations must handle this (e.g., using `RecordBatch::project`
+    /// or by implementing partial reads).
+    pub file_projection: Option<Vec<usize>>,
+
+    /// Filter expressions
+    ///
+    /// Expressions that may be used for pruning. Implementations need not
+    /// apply these filters.
+    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+/// The information required to specify a file or partial file
+///
+/// Depending on exactly where in DataFusion we are calling in from, we might
+/// have various pieces of information about the file. In general, implementations
+/// should use [ObjectStore] and [ObjectMeta] to access the file so that DataFusion's
+/// registered IO is used for these protocols. When implementing a filename-based
+/// reader (e.g., one that uses some external API to read files), use
+/// [Object::to_url_string].
+#[derive(Debug, Clone)]
+pub struct Object {
+    /// The object store reference
+    pub store: Option<Arc<dyn ObjectStore>>,
+
+    /// A URL that may be used to retrieve an [ObjectStore] from a registry
+    ///
+    /// These URLs are typically populated only with the scheme.
+    pub url: Option<ObjectStoreUrl>,
+
+    /// An individual object in an ObjectStore
+    pub meta: Option<ObjectMeta>,
+
+    /// If this represents a partial file, the byte range within the file
+    ///
+    /// This is only set if a repartitioning mode other than
+    /// [SupportsRepartition::None] is used.
+    pub range: Option<FileRange>,
+}
+
+impl Object {
+    /// Convert this object to a URL string, if possible
+    ///
+    /// Returns `None` if there is not sufficient information in the Object to
+    /// compute this.
+    pub fn to_url_string(&self) -> Option<String> {
+        match (&self.url, &self.meta) {
+            (None, Some(meta)) => {
+                // There's no great way to map an object_store to a URL prefix if we're
+                // not provided the `url`; however, this is what we have access to in
+                // the Schema and Statistics resolution phases of the FileFormat.
+                // This is a heuristic that should work for https and a local filesystem,
+                // which is what we might be able to expect a non-DataFusion system like
+                // GDAL to be able to translate.
+                let object_store_debug = format!("{:?}", self.store).to_lowercase();
+                if object_store_debug.contains("http") {
+                    Some(format!("https://{}", meta.location))
+                } else if object_store_debug.contains("local") {
+                    Some(format!("file:///{}", meta.location))
+                } else {
+                    None
+                }
+            }
+            (Some(url), None) => Some(url.to_string()),
+            (Some(url), Some(meta)) => Some(format!("{url}/{}", meta.location)),
+            (None, None) => None,
+        }
+    }
+}