Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ datafusion-physical-expr = { version = "51.0.0" }
datafusion-physical-plan = { version = "51.0.0" }
dirs = "6.0.0"
env_logger = "0.11"
log = "^0.4"
fastrand = "2.0"
futures = "0.3"
pin-project-lite = "0.2"
Expand Down
97 changes: 97 additions & 0 deletions rust/sedona-common/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,94 @@ config_namespace! {
/// Include tie-breakers in KNN join results when there are tied distances
pub knn_include_tie_breakers: bool, default = false

/// Maximum number of sample bounding boxes collected from the index side for partitioning the
/// data when running out-of-core spatial join
pub max_index_side_bbox_samples: usize, default = 10000

/// Minimum number of sample bounding boxes collected from the index side for partitioning the
/// data when running out-of-core spatial join
pub min_index_side_bbox_samples: usize, default = 1000

/// Target sampling rate for sampling bounding boxes from the index side for partitioning the
/// data when running out-of-core spatial join
pub target_index_side_bbox_sampling_rate: f64, default = 0.01

/// The in memory size threshold of batches written to spill files. If the spilled batch is
/// too large, it will be broken into several smaller parts before written to spill files.
/// This is for avoiding overshooting the memory limit when reading spilled batches from
/// spill files. Specify 0 for unlimited size.
pub spilled_batch_in_memory_size_threshold: usize, default = 0

/// The minimum number of geometry pairs per chunk required to enable parallel
/// refinement during the spatial join operation. When the refinement phase has
/// fewer geometry pairs than this threshold, it will run sequentially instead
/// of spawning parallel tasks. Higher values reduce parallelization overhead
/// for small datasets, while lower values enable more fine-grained parallelism.
pub parallel_refinement_chunk_size: usize, default = 8192

/// Options for debugging or testing spatial join
pub debug : SpatialJoinDebugOptions, default = SpatialJoinDebugOptions::default()
}
}

config_namespace! {
/// Configurations for debugging or testing spatial join
pub struct SpatialJoinDebugOptions {
/// Number of spatial partitions to use for spatial join
pub num_spatial_partitions: NumSpatialPartitionsConfig, default = NumSpatialPartitionsConfig::Auto

/// The amount of memory for intermittent usage such as spatially repartitioning the data
pub memory_for_intermittent_usage: Option<usize>, default = None

/// Force spilling while collecting the build side or not
pub force_spill: bool, default = false

/// Seed for random processes in the spatial join for testing purpose
pub random_seed: Option<u64>, default = None
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum NumSpatialPartitionsConfig {
/// Automatically determine the number of spatial partitions
Auto,

/// Use a fixed number of spatial partitions
Fixed(usize),
}

impl ConfigField for NumSpatialPartitionsConfig {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
let value = match self {
NumSpatialPartitionsConfig::Auto => "auto".into(),
NumSpatialPartitionsConfig::Fixed(n) => format!("{n}"),
};
v.some(key, value, description);
}

fn set(&mut self, _key: &str, value: &str) -> Result<()> {
let value = value.to_lowercase();
let config = match value.as_str() {
"auto" => NumSpatialPartitionsConfig::Auto,
_ => match value.parse::<usize>() {
Ok(n) => {
if n > 0 {
NumSpatialPartitionsConfig::Fixed(n)
} else {
return Err(datafusion_common::DataFusionError::Configuration(
"num_spatial_partitions must be greater than 0".to_string(),
));
}
}
Err(_) => {
return Err(datafusion_common::DataFusionError::Configuration(format!(
"Unknown num_spatial_partitions config: {value}. Expected formats: auto, <number>"
)));
}
},
};
*self = config;
Ok(())
}
}

Expand Down Expand Up @@ -421,4 +503,19 @@ mod tests {
assert!(index_type.set("", "invalid").is_err());
assert!(index_type.set("", "").is_err());
}

#[test]
fn test_num_spatial_partitions_config_parsing() {
let mut config = NumSpatialPartitionsConfig::Auto;

assert!(config.set("", "auto").is_ok());
assert_eq!(config, NumSpatialPartitionsConfig::Auto);

assert!(config.set("", "10").is_ok());
assert_eq!(config, NumSpatialPartitionsConfig::Fixed(10));

assert!(config.set("", "0").is_err());
assert!(config.set("", "invalid").is_err());
assert!(config.set("", "fixed[10]").is_err());
}
}
15 changes: 15 additions & 0 deletions rust/sedona-geometry/src/bounding_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ impl BoundingBox {
}
}

/// Create an empty BoundingBox
pub fn empty() -> Self {
Self {
x: WraparoundInterval::empty(),
y: Interval::empty(),
z: None,
m: None,
}
}

/// The x interval
pub fn x(&self) -> &WraparoundInterval {
&self.x
Expand All @@ -91,6 +101,11 @@ impl BoundingBox {
&self.m
}

/// Check whether this BoundingBox is empty
pub fn is_empty(&self) -> bool {
self.x.is_empty() || self.y.is_empty()
}

/// Calculate intersection with another BoundingBox
///
/// Returns true if this bounding box may intersect other or false otherwise. This
Expand Down
1 change: 1 addition & 0 deletions rust/sedona-spatial-join/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ geo-index = { workspace = true }
geos = { workspace = true }
float_next_after = { workspace = true }
fastrand = { workspace = true }
log = { workspace = true }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are log and env_logger doing the same thing or do we need both of them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need both. log is the logging facade, env_logger is the logging implementation we actually use.

Reference: https://docs.rs/log/latest/log/#available-logging-implementations


[dev-dependencies]
criterion = { workspace = true }
Expand Down
28 changes: 24 additions & 4 deletions rust/sedona-spatial-join/src/build_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ pub async fn build_index(
join_type: JoinType,
probe_threads_count: usize,
metrics: ExecutionPlanMetricsSet,
seed: u64,
) -> Result<SpatialIndex> {
log::debug!(
"Building spatial index for running spatial join, seed = {}",
seed
);
let session_config = context.session_config();
let sedona_options = session_config
.options()
Expand All @@ -55,10 +60,14 @@ pub async fn build_index(
.cloned()
.unwrap_or_default();
let concurrent = sedona_options.spatial_join.concurrent_build_side_collection;
let runtime_env = context.runtime_env();
let spill_compression = session_config.spill_compression();
let memory_pool = context.memory_pool();
let collector = BuildSideBatchesCollector::new(
spatial_predicate.clone(),
sedona_options.spatial_join.clone(),
Arc::clone(&runtime_env),
spill_compression,
);
let num_partitions = build_streams.len();
let mut collect_metrics_vec = Vec::with_capacity(num_partitions);
Expand All @@ -72,12 +81,23 @@ pub async fn build_index(
}

let build_partitions = collector
.collect_all(build_streams, reservations, collect_metrics_vec, concurrent)
.collect_all(
build_streams,
reservations,
collect_metrics_vec,
concurrent,
seed,
)
.await?;

let contains_external_stream = build_partitions
.iter()
.any(|partition| partition.build_side_batch_stream.is_external());
let contains_external_stream = build_partitions.iter().any(|partition| {
// Access fields to avoid unused variable warnings. Will be removed when out-of-core
// spatial join (https://github.com/apache/sedona-db/issues/436) is fully implemented.
let _ = partition.num_rows;
let _ = partition.bbox_samples;
let _ = partition.estimated_spatial_index_memory_usage;
partition.build_side_batch_stream.is_external()
});
if !contains_external_stream {
let mut index_builder = SpatialIndexBuilder::new(
build_schema,
Expand Down
22 changes: 20 additions & 2 deletions rust/sedona-spatial-join/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion_physical_plan::{
PlanProperties,
};
use parking_lot::Mutex;
use sedona_common::SpatialJoinOptions;

use crate::{
build_index::build_index,
Expand Down Expand Up @@ -137,6 +138,8 @@ pub struct SpatialJoinExec {
/// Indicates if this SpatialJoin was converted from a HashJoin
/// When true, we preserve HashJoin's equivalence properties and partitioning
converted_from_hash_join: bool,
/// A random seed for making random procedures in spatial join deterministic
seed: u64,
}

impl SpatialJoinExec {
Expand All @@ -148,18 +151,23 @@ impl SpatialJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
projection: Option<Vec<usize>>,
options: &SpatialJoinOptions,
) -> Result<Self> {
Self::try_new_with_options(left, right, on, filter, join_type, projection, false)
Self::try_new_with_options(
left, right, on, filter, join_type, projection, options, false,
)
}

/// Create a new SpatialJoinExec with additional options
#[allow(clippy::too_many_arguments)]
pub fn try_new_with_options(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: SpatialPredicate,
filter: Option<JoinFilter>,
join_type: &JoinType,
projection: Option<Vec<usize>>,
options: &SpatialJoinOptions,
converted_from_hash_join: bool,
) -> Result<Self> {
let left_schema = left.schema();
Expand All @@ -179,6 +187,11 @@ impl SpatialJoinExec {
converted_from_hash_join,
)?;

let seed = options
.debug
.random_seed
.unwrap_or(fastrand::u64(0..0xFFFF));

Ok(SpatialJoinExec {
left,
right,
Expand All @@ -192,6 +205,7 @@ impl SpatialJoinExec {
cache,
once_async_spatial_index: Arc::new(Mutex::new(None)),
converted_from_hash_join,
seed,
})
}

Expand Down Expand Up @@ -419,6 +433,7 @@ impl ExecutionPlan for SpatialJoinExec {
cache: self.cache.clone(),
once_async_spatial_index: Arc::new(Mutex::new(None)),
converted_from_hash_join: self.converted_from_hash_join,
seed: self.seed,
}))
}

Expand Down Expand Up @@ -472,6 +487,7 @@ impl ExecutionPlan for SpatialJoinExec {
self.join_type,
probe_thread_count,
self.metrics.clone(),
self.seed,
))
})?
};
Expand Down Expand Up @@ -563,6 +579,7 @@ impl SpatialJoinExec {
self.join_type,
probe_thread_count,
self.metrics.clone(),
self.seed,
))
})?
};
Expand Down Expand Up @@ -1330,7 +1347,7 @@ mod tests {
let sql = "SELECT * FROM L LEFT JOIN R ON ST_Intersects(L.geometry, R.geometry)";

// Create SpatialJoinExec plan
let ctx = setup_context(Some(options), batch_size)?;
let ctx = setup_context(Some(options.clone()), batch_size)?;
ctx.register_table("L", mem_table_left.clone())?;
ctx.register_table("R", mem_table_right.clone())?;
let df = ctx.sql(sql).await?;
Expand All @@ -1345,6 +1362,7 @@ mod tests {
original_exec.filter.clone(),
&join_type,
None,
&options,
)?;

// Create NestedLoopJoinExec plan for comparison
Expand Down
Loading