From 39af6dbd9f7d23d0f1c5b664090604b8ba3710e9 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Wed, 21 Jan 2026 21:35:39 +0800 Subject: [PATCH 1/3] Initial integration without spiller --- rust/sedona-common/src/option.rs | 12 ++ rust/sedona-geometry/src/bounding_box.rs | 15 +++ rust/sedona-spatial-join/src/build_index.rs | 9 +- rust/sedona-spatial-join/src/exec.rs | 7 ++ .../src/index/build_side_collector.rs | 112 +++++++++++++++--- 5 files changed, 137 insertions(+), 18 deletions(-) diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs index bc74acf74..9fcd2de79 100644 --- a/rust/sedona-common/src/option.rs +++ b/rust/sedona-common/src/option.rs @@ -71,6 +71,18 @@ config_namespace! { /// Include tie-breakers in KNN join results when there are tied distances pub knn_include_tie_breakers: bool, default = false + /// Maximum number of sample bounding boxes collected from the index side for partitioning the + /// data when running out-of-core spatial join + pub max_index_side_bbox_samples: usize, default = 10000 + + /// Minimum number of sample bounding boxes collected from the index side for partitioning the + /// data when running out-of-core spatial join + pub min_index_side_bbox_samples: usize, default = 1000 + + /// Target sampling rate for sampling bounding boxes from the index side for partitioning the + /// data when running out-of-core spatial join + pub target_index_side_bbox_sampling_rate: f64, default = 0.01 + /// The minimum number of geometry pairs per chunk required to enable parallel /// refinement during the spatial join operation. When the refinement phase has /// fewer geometry pairs than this threshold, it will run sequentially instead diff --git a/rust/sedona-geometry/src/bounding_box.rs b/rust/sedona-geometry/src/bounding_box.rs index 7a018fb52..2da191fd6 100644 --- a/rust/sedona-geometry/src/bounding_box.rs +++ b/rust/sedona-geometry/src/bounding_box.rs @@ -69,6 +69,16 @@ impl BoundingBox { } } + /// Create an empty BoundingBox + pub fn empty() -> Self { + Self { + x: WraparoundInterval::empty(), + y: Interval::empty(), + z: None, + m: None, + } + } + /// The x interval pub fn x(&self) -> &WraparoundInterval { &self.x @@ -91,6 +101,11 @@ impl BoundingBox { &self.m } + /// Check whether this BoundingBox is empty + pub fn is_empty(&self) -> bool { + self.x.is_empty() || self.y.is_empty() + } + /// Calculate intersection with another BoundingBox /// /// Returns true if this bounding box may intersect other or false otherwise. This diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs index 5a171007b..db82fa5f7 100644 --- a/rust/sedona-spatial-join/src/build_index.rs +++ b/rust/sedona-spatial-join/src/build_index.rs @@ -46,6 +46,7 @@ pub async fn build_index( join_type: JoinType, probe_threads_count: usize, metrics: ExecutionPlanMetricsSet, + seed: u64, ) -> Result { let session_config = context.session_config(); let sedona_options = session_config @@ -72,7 +73,13 @@ pub async fn build_index( } let build_partitions = collector - .collect_all(build_streams, reservations, collect_metrics_vec, concurrent) + .collect_all( + build_streams, + reservations, + collect_metrics_vec, + concurrent, + seed, + ) .await?; let contains_external_stream = build_partitions diff --git a/rust/sedona-spatial-join/src/exec.rs b/rust/sedona-spatial-join/src/exec.rs index 43b73290c..b2b280697 100644 --- a/rust/sedona-spatial-join/src/exec.rs +++ b/rust/sedona-spatial-join/src/exec.rs @@ -137,6 +137,8 @@ pub struct SpatialJoinExec { /// Indicates if this SpatialJoin was converted from a HashJoin /// When true, we preserve HashJoin's equivalence properties and partitioning converted_from_hash_join: bool, + /// A random seed for making random procedures in spatial join deterministic + seed: u64, } impl SpatialJoinExec { @@ -178,6 +180,7 @@ impl SpatialJoinExec { filter.as_ref(), converted_from_hash_join, )?; + let seed = fastrand::u64(0..0xFFFF); Ok(SpatialJoinExec { left, @@ -192,6 +195,7 @@ impl SpatialJoinExec { cache, once_async_spatial_index: Arc::new(Mutex::new(None)), converted_from_hash_join, + seed, }) } @@ -419,6 +423,7 @@ impl ExecutionPlan for SpatialJoinExec { cache: self.cache.clone(), once_async_spatial_index: Arc::new(Mutex::new(None)), converted_from_hash_join: self.converted_from_hash_join, + seed: self.seed, })) } @@ -472,6 +477,7 @@ impl ExecutionPlan for SpatialJoinExec { self.join_type, probe_thread_count, self.metrics.clone(), + self.seed, )) })? }; @@ -563,6 +569,7 @@ impl SpatialJoinExec { self.join_type, probe_thread_count, self.metrics.clone(), + self.seed, )) })? }; diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs index 895375925..0d4b81799 100644 --- a/rust/sedona-spatial-join/src/index/build_side_collector.rs +++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_execution::{memory_pool::MemoryReservation, SendableRecordBatchStream}; use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; @@ -37,7 +37,8 @@ use crate::{ }, index::SpatialIndexBuilder, operand_evaluator::{create_operand_evaluator, OperandEvaluator}, - SpatialPredicate, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples}, }; /// Safety buffer applied when pre-growing build-side reservations to leave headroom for @@ -46,9 +47,19 @@ use crate::{ const BUILD_SIDE_RESERVATION_BUFFER_RATIO: f64 = 0.20; pub(crate) struct BuildPartition { + pub num_rows: usize, pub build_side_batch_stream: SendableEvaluatedBatchStream, pub geo_statistics: GeoStatistics, + /// Subset of build-side bounding boxes kept for building partitioners (e.g. KDB partitioner) + /// when the indexed data cannot be fully loaded into memory. + pub bbox_samples: BoundingBoxSamples, + + /// The estimated memory usage of building spatial index from all the data + /// collected in this partition. The estimated memory used by the global + /// spatial index will be the sum of these per-partition estimation. + pub estimated_spatial_index_memory_usage: usize, + /// Memory reservation for tracking the memory usage of the build partition /// Cleared on `BuildPartition` drop pub reservation: MemoryReservation, @@ -105,32 +116,60 @@ impl BuildSideBatchesCollector { } } + /// Collect build-side batches from the stream into a `BuildPartition`. + /// + /// This method grows the given memory reservation as if an in-memory spatial + /// index will be built for all collected batches. If the reservation cannot + /// be grown, batches are spilled to disk and the reservation is left at its + /// peak value. + /// + /// The reservation represents memory available for loading the spatial index. + /// Across all partitions, the sum of their reservations forms a soft memory + /// cap for subsequent spatial join operations. Reservations grown here are + /// not released until the spatial join operator completes. pub async fn collect( &self, mut stream: SendableEvaluatedBatchStream, mut reservation: MemoryReservation, + mut bbox_sampler: BoundingBoxSampler, metrics: &CollectBuildSideMetrics, ) -> Result { let mut in_mem_batches: Vec = Vec::new(); + let mut total_num_rows = 0; + let mut total_size_bytes = 0; let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY); + // Reserve memory for holding bbox samples. This should be a small reservation. + // We simply return error if the reservation cannot be fulfilled, since there's + // too little memory for the collector and proceeding will risk overshooting the + // memory limit. + reservation.try_grow(bbox_sampler.estimate_maximum_memory_usage())?; + while let Some(evaluated_batch) = stream.next().await { let build_side_batch = evaluated_batch?; let _timer = metrics.time_taken.timer(); - // Process the record batch and create a BuildSideBatch let geom_array = &build_side_batch.geom_array; for wkb in geom_array.wkbs().iter().flatten() { - analyzer.update_statistics(wkb)?; + let summary = sedona_geometry::analyze::analyze_geometry(wkb) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + if !summary.bbox.is_empty() { + bbox_sampler.add_bbox(&summary.bbox); + } + analyzer.ingest_geometry_summary(&summary); } + let num_rows = build_side_batch.num_rows(); let in_mem_size = build_side_batch.in_mem_size()?; + total_num_rows += num_rows; + total_size_bytes += in_mem_size; + metrics.num_batches.add(1); - metrics.num_rows.add(build_side_batch.num_rows()); + metrics.num_rows.add(num_rows); metrics.total_size_bytes.add(in_mem_size); - reservation.try_grow(in_mem_size)?; in_mem_batches.push(build_side_batch); + reservation.try_grow(in_mem_size)?; } let geo_statistics = analyzer.finish(); @@ -147,12 +186,19 @@ impl BuildSideBatchesCollector { let additional_reservation = extra_mem + buffer_bytes; reservation.try_grow(additional_reservation)?; + let build_side_batch_stream: SendableEvaluatedBatchStream = { + let schema = stream.schema(); + Box::pin(InMemoryEvaluatedBatchStream::new(schema, in_mem_batches)) + }; + + let estimated_spatial_index_memory_usage = total_size_bytes + extra_mem; + Ok(BuildPartition { - build_side_batch_stream: Box::pin(InMemoryEvaluatedBatchStream::new( - stream.schema(), - in_mem_batches, - )), + num_rows: total_num_rows, + build_side_batch_stream, geo_statistics, + bbox_samples: bbox_sampler.into_samples(), + estimated_spatial_index_memory_usage, reservation, }) } @@ -163,16 +209,28 @@ impl BuildSideBatchesCollector { reservations: Vec, metrics_vec: Vec, concurrent: bool, + seed: u64, ) -> Result> { if streams.is_empty() { return Ok(vec![]); } + assert_eq!( + streams.len(), + reservations.len(), + "each build stream must have a reservation" + ); + assert_eq!( + streams.len(), + metrics_vec.len(), + "each build stream must have a metrics collector" + ); + if concurrent { - self.collect_all_concurrently(streams, reservations, metrics_vec) + self.collect_all_concurrently(streams, reservations, metrics_vec, seed) .await } else { - self.collect_all_sequentially(streams, reservations, metrics_vec) + self.collect_all_sequentially(streams, reservations, metrics_vec, seed) .await } } @@ -182,8 +240,9 @@ impl BuildSideBatchesCollector { streams: Vec, reservations: Vec, metrics_vec: Vec, + seed: u64, ) -> Result> { - // Spawn a task for each stream to scan all streams concurrently + // Spawn task for each stream to scan all streams concurrently let mut join_set = JoinSet::new(); for (partition_id, ((stream, metrics), reservation)) in streams .into_iter() @@ -193,11 +252,18 @@ impl BuildSideBatchesCollector { { let collector = self.clone(); let evaluator = Arc::clone(&self.evaluator); + let bbox_sampler = BoundingBoxSampler::try_new( + self.spatial_join_options.min_index_side_bbox_samples, + self.spatial_join_options.max_index_side_bbox_samples, + self.spatial_join_options + .target_index_side_bbox_sampling_rate, + seed.wrapping_add(partition_id as u64), + )?; join_set.spawn(async move { let evaluated_stream = create_evaluated_build_stream(stream, evaluator, metrics.time_taken.clone()); let result = collector - .collect(evaluated_stream, reservation, &metrics) + .collect(evaluated_stream, reservation, bbox_sampler, &metrics) .await; (partition_id, result) }); @@ -224,17 +290,29 @@ impl BuildSideBatchesCollector { streams: Vec, reservations: Vec, metrics_vec: Vec, + seed: u64, ) -> Result> { // Collect partitions sequentially (for JNI/embedded contexts) let mut results = Vec::with_capacity(streams.len()); - for ((stream, metrics), reservation) in - streams.into_iter().zip(metrics_vec).zip(reservations) + for (partition_id, ((stream, metrics), reservation)) in streams + .into_iter() + .zip(metrics_vec) + .zip(reservations) + .enumerate() { let evaluator = Arc::clone(&self.evaluator); + let bbox_sampler = BoundingBoxSampler::try_new( + self.spatial_join_options.min_index_side_bbox_samples, + self.spatial_join_options.max_index_side_bbox_samples, + self.spatial_join_options + .target_index_side_bbox_sampling_rate, + seed.wrapping_add(partition_id as u64), + )?; + let evaluated_stream = create_evaluated_build_stream(stream, evaluator, metrics.time_taken.clone()); let result = self - .collect(evaluated_stream, reservation, &metrics) + .collect(evaluated_stream, reservation, bbox_sampler, &metrics) .await?; results.push(result); } From 17a9261eb735a7ee57493d7481438821b5c1423e Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 22 Jan 2026 21:08:54 +0800 Subject: [PATCH 2/3] Integrate with both bbox sampler and spiller --- Cargo.lock | 1 + Cargo.toml | 1 + rust/sedona-common/src/option.rs | 82 +++++ rust/sedona-spatial-join/Cargo.toml | 1 + rust/sedona-spatial-join/src/build_index.rs | 15 +- .../src/index/build_side_collector.rs | 327 ++++++++++++++++-- 6 files changed, 401 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a8e0b9d7..5ea269758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5404,6 +5404,7 @@ dependencies = [ "geo-traits", "geo-types", "geos", + "log", "once_cell", "parking_lot", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 6c20d23df..569b7fe81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ datafusion-physical-expr = { version = "51.0.0" } datafusion-physical-plan = { version = "51.0.0" } dirs = "6.0.0" env_logger = "0.11" +log = "^0.4" fastrand = "2.0" futures = "0.3" pin-project-lite = "0.2" diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs index 9fcd2de79..e68612d45 100644 --- a/rust/sedona-common/src/option.rs +++ b/rust/sedona-common/src/option.rs @@ -83,12 +83,79 @@ config_namespace! { /// data when running out-of-core spatial join pub target_index_side_bbox_sampling_rate: f64, default = 0.01 + /// The in memory size threshold of batches written to spill files. If the spilled batch is + /// too large, it will be broken into several smaller parts before written to spill files. + /// This is for avoiding overshooting the memory limit when reading spilled batches from + /// spill files. Specify 0 for unlimited size. + pub spilled_batch_in_memory_size_threshold: usize, default = 0 + /// The minimum number of geometry pairs per chunk required to enable parallel /// refinement during the spatial join operation. When the refinement phase has /// fewer geometry pairs than this threshold, it will run sequentially instead /// of spawning parallel tasks. Higher values reduce parallelization overhead /// for small datasets, while lower values enable more fine-grained parallelism. pub parallel_refinement_chunk_size: usize, default = 8192 + + /// Options for debugging or testing spatial join + pub debug : SpatialJoinDebugOptions, default = SpatialJoinDebugOptions::default() + } +} + +config_namespace! { + /// Configurations for debugging or testing spatial join + pub struct SpatialJoinDebugOptions { + /// Number of spatial partitions to use for spatial join + pub num_spatial_partitions: NumSpatialPartitionsConfig, default = NumSpatialPartitionsConfig::Auto + + /// The amount of memory for intermittent usage such as spatially repartitioning the data + pub memory_for_intermittent_usage: Option, default = None + + /// Force spilling while collecting the build side or not + pub force_spill: bool, default = false + } +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum NumSpatialPartitionsConfig { + /// Automatically determine the number of spatial partitions + Auto, + + /// Use a fixed number of spatial partitions + Fixed(usize), +} + +impl ConfigField for NumSpatialPartitionsConfig { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + let value = match self { + NumSpatialPartitionsConfig::Auto => "auto".into(), + NumSpatialPartitionsConfig::Fixed(n) => format!("{n}"), + }; + v.some(key, value, description); + } + + fn set(&mut self, _key: &str, value: &str) -> Result<()> { + let value = value.to_lowercase(); + let config = match value.as_str() { + "auto" => NumSpatialPartitionsConfig::Auto, + _ => match value.parse::() { + Ok(n) => { + if n > 0 { + NumSpatialPartitionsConfig::Fixed(n) + } else { + return Err(datafusion_common::DataFusionError::Configuration( + "num_spatial_partitions must be greater than 0".to_string(), + )); + } + } + Err(_) => { + return Err(datafusion_common::DataFusionError::Configuration(format!( + "Unknown num_spatial_partitions config: {value}. Expected formats: auto, " + ))); + } + }, + }; + *self = config; + Ok(()) } } @@ -433,4 +500,19 @@ mod tests { assert!(index_type.set("", "invalid").is_err()); assert!(index_type.set("", "").is_err()); } + + #[test] + fn test_num_spatial_partitions_config_parsing() { + let mut config = NumSpatialPartitionsConfig::Auto; + + assert!(config.set("", "auto").is_ok()); + assert_eq!(config, NumSpatialPartitionsConfig::Auto); + + assert!(config.set("", "10").is_ok()); + assert_eq!(config, NumSpatialPartitionsConfig::Fixed(10)); + + assert!(config.set("", "0").is_err()); + assert!(config.set("", "invalid").is_err()); + assert!(config.set("", "fixed[10]").is_err()); + } } diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml index 010c2eadb..322ec5721 100644 --- a/rust/sedona-spatial-join/Cargo.toml +++ b/rust/sedona-spatial-join/Cargo.toml @@ -66,6 +66,7 @@ geo-index = { workspace = true } geos = { workspace = true } float_next_after = { workspace = true } fastrand = { workspace = true } +log = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs index db82fa5f7..11c4c26f6 100644 --- a/rust/sedona-spatial-join/src/build_index.rs +++ b/rust/sedona-spatial-join/src/build_index.rs @@ -56,10 +56,14 @@ pub async fn build_index( .cloned() .unwrap_or_default(); let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let runtime_env = context.runtime_env(); + let spill_compression = session_config.spill_compression(); let memory_pool = context.memory_pool(); let collector = BuildSideBatchesCollector::new( spatial_predicate.clone(), sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, ); let num_partitions = build_streams.len(); let mut collect_metrics_vec = Vec::with_capacity(num_partitions); @@ -82,9 +86,14 @@ pub async fn build_index( ) .await?; - let contains_external_stream = build_partitions - .iter() - .any(|partition| partition.build_side_batch_stream.is_external()); + let contains_external_stream = build_partitions.iter().any(|partition| { + // Access fields to avoid unused variable warnings. Will be removed when out-of-core + // spatial join (https://github.com/apache/sedona-db/issues/436) is fully implemented. + let _ = partition.num_rows; + let _ = partition.bbox_samples; + let _ = partition.estimated_spatial_index_memory_usage; + partition.build_side_batch_stream.is_external() + }); if !contains_external_stream { let mut index_builder = SpatialIndexBuilder::new( build_schema, diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs index 0d4b81799..646c6be21 100644 --- a/rust/sedona-spatial-join/src/index/build_side_collector.rs +++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs @@ -17,12 +17,17 @@ use std::sync::Arc; +use datafusion::config::SpillCompression; use datafusion_common::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; -use datafusion_execution::{memory_pool::MemoryReservation, SendableRecordBatchStream}; -use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; +use datafusion_execution::{ + memory_pool::MemoryReservation, runtime_env::RuntimeEnv, SendableRecordBatchStream, +}; +use datafusion_physical_plan::metrics::{ + self, ExecutionPlanMetricsSet, MetricBuilder, SpillMetrics, +}; use futures::StreamExt; -use sedona_common::SpatialJoinOptions; +use sedona_common::{sedona_internal_err, SpatialJoinOptions}; use sedona_expr::statistics::GeoStatistics; use sedona_functions::st_analyze_agg::AnalyzeAccumulator; use sedona_schema::datatypes::WKB_GEOMETRY; @@ -30,9 +35,10 @@ use sedona_schema::datatypes::WKB_GEOMETRY; use crate::{ evaluated_batch::{ evaluated_batch_stream::{ - evaluate::create_evaluated_build_stream, in_mem::InMemoryEvaluatedBatchStream, - SendableEvaluatedBatchStream, + evaluate::create_evaluated_build_stream, external::ExternalEvaluatedBatchStream, + in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream, }, + spill::EvaluatedBatchSpillWriter, EvaluatedBatch, }, index::SpatialIndexBuilder, @@ -41,11 +47,6 @@ use crate::{ utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples}, }; -/// Safety buffer applied when pre-growing build-side reservations to leave headroom for -/// auxiliary structures beyond the build batches themselves. -/// 20% was chosen as a conservative margin. -const BUILD_SIDE_RESERVATION_BUFFER_RATIO: f64 = 0.20; - pub(crate) struct BuildPartition { pub num_rows: usize, pub build_side_batch_stream: SendableEvaluatedBatchStream, @@ -60,8 +61,12 @@ pub(crate) struct BuildPartition { /// spatial index will be the sum of these per-partition estimation. pub estimated_spatial_index_memory_usage: usize, - /// Memory reservation for tracking the memory usage of the build partition - /// Cleared on `BuildPartition` drop + /// Memory reservation for tracking the maximum memory usage when collecting + /// the build side. This reservation won't be freed even when spilling is + /// triggered. We deliberately only grow the memory reservation to probe + /// the amount of memory available for loading spatial index into memory. + /// The size of this reservation will be used to determine the maximum size of + /// each spatial partition, as well as how many spatial partitions to create. pub reservation: MemoryReservation, } @@ -74,8 +79,11 @@ pub(crate) struct BuildSideBatchesCollector { spatial_predicate: SpatialPredicate, spatial_join_options: SpatialJoinOptions, evaluator: Arc, + runtime_env: Arc, + spill_compression: SpillCompression, } +#[derive(Clone)] pub(crate) struct CollectBuildSideMetrics { /// Number of batches collected num_batches: metrics::Count, @@ -88,6 +96,8 @@ pub(crate) struct CollectBuildSideMetrics { /// Total time taken to collect and process the build side batches. This does not include the time awaiting /// for batches from the input stream. time_taken: metrics::Time, + /// Spill metrics of build partitions collecting phase + spill_metrics: SpillMetrics, } impl CollectBuildSideMetrics { @@ -99,6 +109,7 @@ impl CollectBuildSideMetrics { .gauge("build_input_total_size_bytes", partition), time_taken: MetricBuilder::new(metrics) .subset_time("build_input_collection_time", partition), + spill_metrics: SpillMetrics::new(metrics, partition), } } } @@ -107,12 +118,16 @@ impl BuildSideBatchesCollector { pub fn new( spatial_predicate: SpatialPredicate, spatial_join_options: SpatialJoinOptions, + runtime_env: Arc, + spill_compression: SpillCompression, ) -> Self { let evaluator = create_operand_evaluator(&spatial_predicate, spatial_join_options.clone()); BuildSideBatchesCollector { spatial_predicate, spatial_join_options, evaluator, + runtime_env, + spill_compression, } } @@ -134,6 +149,7 @@ impl BuildSideBatchesCollector { mut bbox_sampler: BoundingBoxSampler, metrics: &CollectBuildSideMetrics, ) -> Result { + let mut spill_writer_opt = None; let mut in_mem_batches: Vec = Vec::new(); let mut total_num_rows = 0; let mut total_size_bytes = 0; @@ -168,8 +184,29 @@ impl BuildSideBatchesCollector { metrics.num_rows.add(num_rows); metrics.total_size_bytes.add(in_mem_size); - in_mem_batches.push(build_side_batch); - reservation.try_grow(in_mem_size)?; + match &mut spill_writer_opt { + None => { + // Collected batches are in memory, no spilling happened for this partition before. We'll try + // storing this batch in memory first, and switch to writing everything to disk if we fail + // to grow the reservation. + in_mem_batches.push(build_side_batch); + if let Err(e) = reservation.try_grow(in_mem_size) { + log::debug!( + "Failed to grow reservation by {} bytes. Current reservation: {} bytes. \ + num rows: {}, reason: {:?}, Spilling...", + in_mem_size, + reservation.size(), + num_rows, + e, + ); + spill_writer_opt = + self.spill_in_mem_batches(&mut in_mem_batches, metrics)?; + } + } + Some(spill_writer) => { + spill_writer.append(&build_side_batch)?; + } + } } let geo_statistics = analyzer.finish(); @@ -179,16 +216,45 @@ impl BuildSideBatchesCollector { &self.spatial_join_options, ); - // Try to grow the reservation with a safety buffer to leave room for additional data structures - let buffer_bytes = ((extra_mem + reservation.size()) as f64 - * BUILD_SIDE_RESERVATION_BUFFER_RATIO) - .ceil() as usize; - let additional_reservation = extra_mem + buffer_bytes; - reservation.try_grow(additional_reservation)?; + // Try to grow the reservation a bit more to account for any underestimation of + // memory usage. We proceed even when the growth fails. + let additional_reservation = extra_mem + (extra_mem + reservation.size()) / 5; + if let Err(e) = reservation.try_grow(additional_reservation) { + log::debug!( + "Failed to grow reservation by {} bytes to account for spatial index building memory usage. \ + Current reservation: {} bytes. reason: {:?}", + additional_reservation, + reservation.size(), + e, + ); + } + + // If force spill is enabled, flush everything to disk regardless of whether the memory + // is enough or not. + if self.spatial_join_options.debug.force_spill && spill_writer_opt.is_none() { + log::debug!( + "Force spilling enabled. Spilling {} in-memory batches to disk.", + in_mem_batches.len() + ); + spill_writer_opt = self.spill_in_mem_batches(&mut in_mem_batches, metrics)?; + } - let build_side_batch_stream: SendableEvaluatedBatchStream = { - let schema = stream.schema(); - Box::pin(InMemoryEvaluatedBatchStream::new(schema, in_mem_batches)) + let build_side_batch_stream: SendableEvaluatedBatchStream = match spill_writer_opt { + Some(spill_writer) => { + let spill_file = spill_writer.finish()?; + if !in_mem_batches.is_empty() { + return sedona_internal_err!( + "In-memory batches should have been spilled when spill file exists" + ); + } + Box::pin(ExternalEvaluatedBatchStream::try_from_spill_file( + Arc::new(spill_file), + )?) + } + None => { + let schema = stream.schema(); + Box::pin(InMemoryEvaluatedBatchStream::new(schema, in_mem_batches)) + } }; let estimated_spatial_index_memory_usage = total_size_bytes + extra_mem; @@ -318,4 +384,219 @@ impl BuildSideBatchesCollector { } Ok(results) } + + fn spill_in_mem_batches( + &self, + in_mem_batches: &mut Vec, + metrics: &CollectBuildSideMetrics, + ) -> Result> { + if in_mem_batches.is_empty() { + return Ok(None); + } + + let build_side_batch = &in_mem_batches[0]; + + let schema = build_side_batch.schema(); + let sedona_type = &build_side_batch.geom_array.sedona_type; + let mut spill_writer = EvaluatedBatchSpillWriter::try_new( + Arc::clone(&self.runtime_env), + schema, + sedona_type, + "spilling build side batches", + self.spill_compression, + metrics.spill_metrics.clone(), + if self + .spatial_join_options + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + self.spatial_join_options + .spilled_batch_in_memory_size_threshold, + ) + }, + )?; + + for in_mem_batch in in_mem_batches.iter() { + spill_writer.append(in_mem_batch)?; + } + + in_mem_batches.clear(); + Ok(Some(spill_writer)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + operand_evaluator::EvaluatedGeometryArray, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + }; + use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_common::ScalarValue; + use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool}; + use datafusion_physical_expr::expressions::Literal; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use futures::TryStreamExt; + use sedona_common::SpatialJoinOptions; + use sedona_schema::datatypes::WKB_GEOMETRY; + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])) + } + + fn sample_batch(ids: &[i32], wkbs: Vec>>) -> Result { + assert_eq!(ids.len(), wkbs.len()); + let id_array = Arc::new(Int32Array::from(ids.to_vec())) as ArrayRef; + let batch = RecordBatch::try_new(test_schema(), vec![id_array])?; + let geom_values: Vec> = wkbs + .iter() + .map(|wkb_opt| wkb_opt.as_ref().map(|wkb| wkb.as_slice())) + .collect(); + let geom_array: ArrayRef = Arc::new(BinaryArray::from(geom_values)); + let geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?; + Ok(EvaluatedBatch { + batch, + geom_array: geom, + }) + } + + fn point_wkb(x: f64, y: f64) -> Vec { + let mut buf = vec![1u8, 1, 0, 0, 0]; + buf.extend_from_slice(&x.to_le_bytes()); + buf.extend_from_slice(&y.to_le_bytes()); + buf + } + + fn build_collector() -> BuildSideBatchesCollector { + let expr: Arc = + Arc::new(Literal::new(ScalarValue::Null)); + let predicate = SpatialPredicate::Relation(RelationPredicate::new( + Arc::clone(&expr), + expr, + SpatialRelationType::Intersects, + )); + BuildSideBatchesCollector::new( + predicate, + SpatialJoinOptions::default(), + Arc::new(RuntimeEnv::default()), + SpillCompression::Uncompressed, + ) + } + + fn memory_reservation(limit: usize) -> (MemoryReservation, Arc) { + let pool: Arc = Arc::new(GreedyMemoryPool::new(limit)); + let consumer = MemoryConsumer::new("build-side-test").with_can_spill(true); + let reservation = consumer.register(&pool); + (reservation, pool) + } + + fn build_stream(batches: Vec) -> SendableEvaluatedBatchStream { + let schema = batches + .first() + .map(|batch| batch.schema()) + .unwrap_or_else(test_schema); + Box::pin(InMemoryEvaluatedBatchStream::new(schema, batches)) + } + + fn collect_ids(batches: &[EvaluatedBatch]) -> Vec { + let mut ids = Vec::new(); + for batch in batches { + let array = batch + .batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..array.len() { + ids.push(array.value(i)); + } + } + ids + } + + #[tokio::test] + async fn collect_keeps_batches_in_memory_when_capacity_suffices() -> Result<()> { + let collector = build_collector(); + let (reservation, _pool) = memory_reservation(10 * 1024 * 1024); + let sampler = BoundingBoxSampler::try_new(1, 4, 1.0, 7)?; + let batch_a = sample_batch( + &[0, 1], + vec![Some(point_wkb(0.0, 0.0)), Some(point_wkb(1.0, 1.0))], + )?; + let batch_b = sample_batch(&[2], vec![Some(point_wkb(2.0, 2.0))])?; + let stream = build_stream(vec![batch_a, batch_b]); + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = CollectBuildSideMetrics::new(0, &metrics_set); + + let partition = collector + .collect(stream, reservation, sampler, &metrics) + .await?; + let stream = partition.build_side_batch_stream; + let is_external = stream.is_external(); + let batches: Vec = stream.try_collect().await?; + assert!(!is_external, "Expected in-memory batches"); + assert_eq!(collect_ids(&batches), vec![0, 1, 2]); + assert_eq!(partition.num_rows, 3); + assert_eq!(metrics.num_batches.value(), 2); + assert_eq!(metrics.num_rows.value(), 3); + assert!(metrics.total_size_bytes.value() > 0); + Ok(()) + } + + #[tokio::test] + async fn collect_spills_when_reservation_cannot_grow() -> Result<()> { + let collector = build_collector(); + let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 13)?; + let bbox_mem = sampler.estimate_maximum_memory_usage(); + let (reservation, _pool) = memory_reservation(bbox_mem + 1); + let batch_a = sample_batch( + &[10, 11], + vec![Some(point_wkb(5.0, 5.0)), Some(point_wkb(6.0, 6.0))], + )?; + let batch_b = sample_batch(&[12], vec![Some(point_wkb(7.0, 7.0))])?; + let stream = build_stream(vec![batch_a, batch_b]); + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = CollectBuildSideMetrics::new(0, &metrics_set); + + let partition = collector + .collect(stream, reservation, sampler, &metrics) + .await?; + let stream = partition.build_side_batch_stream; + let is_external = stream.is_external(); + let batches: Vec = stream.try_collect().await?; + assert!(is_external, "Expected batches to spill to disk"); + assert_eq!(collect_ids(&batches), vec![10, 11, 12]); + let spill_metrics = metrics.spill_metrics; + assert!(spill_metrics.spill_file_count.value() >= 1); + assert!(spill_metrics.spilled_rows.value() >= 1); + Ok(()) + } + + #[tokio::test] + async fn collect_handles_empty_stream() -> Result<()> { + let collector = build_collector(); + let (reservation, _pool) = memory_reservation(1024); + let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 19)?; + let stream = build_stream(Vec::new()); + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = CollectBuildSideMetrics::new(0, &metrics_set); + + let partition = collector + .collect(stream, reservation, sampler, &metrics) + .await?; + assert_eq!(partition.num_rows, 0); + let stream = partition.build_side_batch_stream; + let is_external = stream.is_external(); + let batches: Vec = stream.try_collect().await?; + assert!(!is_external); + assert!(batches.is_empty()); + assert_eq!(metrics.num_batches.value(), 0); + assert_eq!(metrics.num_rows.value(), 0); + Ok(()) + } } From 52ec01e30cac3225a747f09bccc1315bf97cb110 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 26 Jan 2026 16:11:28 +0800 Subject: [PATCH 3/3] Add a sedona.spatial_join.debug.random_seed option for setting random seed --- rust/sedona-common/src/option.rs | 3 +++ rust/sedona-spatial-join/src/build_index.rs | 4 ++++ rust/sedona-spatial-join/src/exec.rs | 17 ++++++++++++++--- rust/sedona-spatial-join/src/optimizer.rs | 20 +++++++++++++++++--- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs index e68612d45..e5290688f 100644 --- a/rust/sedona-common/src/option.rs +++ b/rust/sedona-common/src/option.rs @@ -112,6 +112,9 @@ config_namespace! { /// Force spilling while collecting the build side or not pub force_spill: bool, default = false + + /// Seed for random processes in the spatial join for testing purpose + pub random_seed: Option, default = None } } diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs index 11c4c26f6..f3cbb34b1 100644 --- a/rust/sedona-spatial-join/src/build_index.rs +++ b/rust/sedona-spatial-join/src/build_index.rs @@ -48,6 +48,10 @@ pub async fn build_index( metrics: ExecutionPlanMetricsSet, seed: u64, ) -> Result { + log::debug!( + "Building spatial index for running spatial join, seed = {}", + seed + ); let session_config = context.session_config(); let sedona_options = session_config .options() diff --git a/rust/sedona-spatial-join/src/exec.rs b/rust/sedona-spatial-join/src/exec.rs index b2b280697..50cbd171d 100644 --- a/rust/sedona-spatial-join/src/exec.rs +++ b/rust/sedona-spatial-join/src/exec.rs @@ -33,6 +33,7 @@ use datafusion_physical_plan::{ PlanProperties, }; use parking_lot::Mutex; +use sedona_common::SpatialJoinOptions; use crate::{ build_index::build_index, @@ -150,11 +151,15 @@ impl SpatialJoinExec { filter: Option, join_type: &JoinType, projection: Option>, + options: &SpatialJoinOptions, ) -> Result { - Self::try_new_with_options(left, right, on, filter, join_type, projection, false) + Self::try_new_with_options( + left, right, on, filter, join_type, projection, options, false, + ) } /// Create a new SpatialJoinExec with additional options + #[allow(clippy::too_many_arguments)] pub fn try_new_with_options( left: Arc, right: Arc, @@ -162,6 +167,7 @@ impl SpatialJoinExec { filter: Option, join_type: &JoinType, projection: Option>, + options: &SpatialJoinOptions, converted_from_hash_join: bool, ) -> Result { let left_schema = left.schema(); @@ -180,7 +186,11 @@ impl SpatialJoinExec { filter.as_ref(), converted_from_hash_join, )?; - let seed = fastrand::u64(0..0xFFFF); + + let seed = options + .debug + .random_seed + .unwrap_or(fastrand::u64(0..0xFFFF)); Ok(SpatialJoinExec { left, @@ -1337,7 +1347,7 @@ mod tests { let sql = "SELECT * FROM L LEFT JOIN R ON ST_Intersects(L.geometry, R.geometry)"; // Create SpatialJoinExec plan - let ctx = setup_context(Some(options), batch_size)?; + let ctx = setup_context(Some(options.clone()), batch_size)?; ctx.register_table("L", mem_table_left.clone())?; ctx.register_table("R", mem_table_right.clone())?; let df = ctx.sql(sql).await?; @@ -1352,6 +1362,7 @@ mod tests { original_exec.filter.clone(), &join_type, None, + &options, )?; // Create NestedLoopJoinExec plan for comparison diff --git a/rust/sedona-spatial-join/src/optimizer.rs b/rust/sedona-spatial-join/src/optimizer.rs index bd01821b1..a8c281673 100644 --- a/rust/sedona-spatial-join/src/optimizer.rs +++ b/rust/sedona-spatial-join/src/optimizer.rs @@ -235,18 +235,20 @@ impl SpatialJoinOptimizer { fn try_optimize_join( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result>> { // Check if this is a NestedLoopJoinExec that we can convert to spatial join if let Some(nested_loop_join) = plan.as_any().downcast_ref::() { - if let Some(spatial_join) = self.try_convert_to_spatial_join(nested_loop_join)? { + if let Some(spatial_join) = + self.try_convert_to_spatial_join(nested_loop_join, config)? + { return Ok(Transformed::yes(spatial_join)); } } // Check if this is a HashJoinExec with spatial filter that we can convert to spatial join if let Some(hash_join) = plan.as_any().downcast_ref::() { - if let Some(spatial_join) = self.try_convert_hash_join_to_spatial(hash_join)? { + if let Some(spatial_join) = self.try_convert_hash_join_to_spatial(hash_join, config)? { return Ok(Transformed::yes(spatial_join)); } } @@ -261,7 +263,12 @@ impl SpatialJoinOptimizer { fn try_convert_to_spatial_join( &self, nested_loop_join: &NestedLoopJoinExec, + config: &ConfigOptions, ) -> Result>> { + let Some(options) = config.extensions.get::() else { + return Ok(None); + }; + if let Some(join_filter) = nested_loop_join.filter() { if let Some((spatial_predicate, remainder)) = transform_join_filter(join_filter) { // The left side of the nested loop join is required to have only one partition, while SpatialJoinExec @@ -300,6 +307,7 @@ impl SpatialJoinOptimizer { remainder, join_type, nested_loop_join.projection().cloned(), + &options.spatial_join, )?; return Ok(Some(Arc::new(spatial_join))); @@ -316,7 +324,12 @@ impl SpatialJoinOptimizer { fn try_convert_hash_join_to_spatial( &self, hash_join: &HashJoinExec, + config: &ConfigOptions, ) -> Result>> { + let Some(options) = config.extensions.get::() else { + return Ok(None); + }; + // Check if the filter contains spatial predicates if let Some(join_filter) = hash_join.filter() { if let Some((spatial_predicate, mut remainder)) = transform_join_filter(join_filter) { @@ -354,6 +367,7 @@ impl SpatialJoinOptimizer { remainder, hash_join.join_type(), None, // No projection in SpatialJoinExec + &options.spatial_join, true, // converted_from_hash_join = true )?);