Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/sedonadb/python/sedonadb/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def connect(**kwargs: Mapping[str, Any]) -> "Connection":

>>> con = sedona.dbapi.connect()
>>> with con.cursor() as cur:
... cur.execute("SELECT 1 as one")
... _ = cur.execute("SELECT 1 as one")
... cur.fetchall()
[(1,)]
"""
Expand Down
89 changes: 89 additions & 0 deletions rust/sedona-spatial-join/src/build_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::{memory_pool::MemoryConsumer, SendableRecordBatchStream, TaskContext};
use datafusion_expr::JoinType;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use sedona_common::SedonaOptions;

use crate::{
index::{
BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialIndex, SpatialIndexBuilder,
SpatialJoinBuildMetrics,
},
operand_evaluator::create_operand_evaluator,
spatial_predicate::SpatialPredicate,
};

pub(crate) async fn build_index(
context: Arc<TaskContext>,
build_schema: SchemaRef,
build_streams: Vec<SendableRecordBatchStream>,
spatial_predicate: SpatialPredicate,
join_type: JoinType,
probe_threads_count: usize,
metrics: ExecutionPlanMetricsSet,
) -> Result<SpatialIndex> {
let session_config = context.session_config();
let sedona_options = session_config
.options()
.extensions
.get::<SedonaOptions>()
.cloned()
.unwrap_or_default();
let memory_pool = context.memory_pool();
let evaluator =
create_operand_evaluator(&spatial_predicate, sedona_options.spatial_join.clone());
let collector = BuildSideBatchesCollector::new(evaluator);
let num_partitions = build_streams.len();
let mut collect_metrics_vec = Vec::with_capacity(num_partitions);
let mut reservations = Vec::with_capacity(num_partitions);
for k in 0..num_partitions {
let consumer =
MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{}]", k)).with_can_spill(true);
let reservation = consumer.register(memory_pool);
reservations.push(reservation);
collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics));
}

let build_partitions = collector
.collect_all(build_streams, reservations, collect_metrics_vec)
.await?;

let contains_external_stream = build_partitions
.iter()
.any(|partition| partition.build_side_batch_stream.is_external());
if !contains_external_stream {
let mut index_builder = SpatialIndexBuilder::new(
build_schema,
spatial_predicate,
sedona_options.spatial_join,
join_type,
probe_threads_count,
Arc::clone(memory_pool),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the else branch below, since external spilling isn't implemented, consider enhancing the error with a TODO/issue reference to aid operators and future implementation tracking (e.g., include an issue ID in the message).

🤖 Was this useful? React with 👍 or 👎

SpatialJoinBuildMetrics::new(0, &metrics),
)?;
index_builder.add_partitions(build_partitions).await?;
index_builder.finish()
} else {
Err(DataFusionError::ResourcesExhausted("Memory limit exceeded while collecting indexed data. External spatial index builder is not yet implemented.".to_string()))
}
}
63 changes: 63 additions & 0 deletions rust/sedona-spatial-join/src/evaluated_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use datafusion_expr::ColumnarValue;
use geo::Rect;
use wkb::reader::Wkb;

use crate::operand_evaluator::EvaluatedGeometryArray;

/// EvaluatedBatch contains the original record batch from the input stream and the evaluated
/// geometry array.
pub(crate) struct EvaluatedBatch {
/// Original record batch polled from the stream
pub batch: RecordBatch,
/// Evaluated geometry array, containing the geometry array containing geometries to be joined,
/// rects of joined geometries, evaluated distance columnar values if we are running a distance
/// join, etc.
pub geom_array: EvaluatedGeometryArray,
}

impl EvaluatedBatch {
pub fn in_mem_size(&self) -> usize {
// NOTE: sometimes `geom_array` will reuse the memory of `batch`, especially when
// the expression for evaluating the geometry is a simple column reference. In this case,
// the in_mem_size will be overestimated. It is a conservative estimation so there's no risk
// of running out of memory because of underestimation.
self.batch.get_array_memory_size() + self.geom_array.in_mem_size()
}

pub fn num_rows(&self) -> usize {
self.batch.num_rows()
}

pub fn wkb(&self, idx: usize) -> Option<&Wkb<'_>> {
let wkbs = self.geom_array.wkbs();
wkbs[idx].as_ref()
}

pub fn rects(&self) -> &Vec<Option<Rect<f32>>> {
&self.geom_array.rects
}

pub fn distance(&self) -> &Option<ColumnarValue> {
&self.geom_array.distance
}
}

pub(crate) mod evaluated_batch_stream;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;

use futures::Stream;

use crate::evaluated_batch::EvaluatedBatch;
use datafusion_common::Result;

/// A stream that produces [`EvaluatedBatch`] items. This stream may have purely in-memory or
/// out-of-core implementations. The type of the stream could be queried calling `is_external()`.
pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
/// Returns true if this stream is an external stream, where batch data were spilled to disk.
fn is_external(&self) -> bool;
}

pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;

pub(crate) mod in_mem;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
pin::Pin,
task::{Context, Poll},
vec::IntoIter,
};

use datafusion_common::Result;

use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream, EvaluatedBatch};

pub(crate) struct InMemoryEvaluatedBatchStream {
iter: IntoIter<EvaluatedBatch>,
}

impl InMemoryEvaluatedBatchStream {
pub fn new(batches: Vec<EvaluatedBatch>) -> Self {
InMemoryEvaluatedBatchStream {
iter: batches.into_iter(),
}
}
}

impl EvaluatedBatchStream for InMemoryEvaluatedBatchStream {
fn is_external(&self) -> bool {
false
}
}

impl futures::Stream for InMemoryEvaluatedBatchStream {
type Item = Result<EvaluatedBatch>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut()
.iter
.next()
.map(|batch| Poll::Ready(Some(Ok(batch))))
.unwrap_or(Poll::Ready(None))
}
}
24 changes: 9 additions & 15 deletions rust/sedona-spatial-join/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use datafusion_physical_plan::{
use parking_lot::Mutex;

use crate::{
index::{build_index, SpatialIndex, SpatialJoinBuildMetrics},
once_fut::OnceAsync,
build_index::build_index,
index::SpatialIndex,
spatial_predicate::{KNNPredicate, SpatialPredicate},
stream::{SpatialJoinProbeMetrics, SpatialJoinStream},
utils::{asymmetric_join_output_partitioning, boundedness_from_children},
// Re-export from sedona-common
utils::join_utils::{asymmetric_join_output_partitioning, boundedness_from_children},
utils::once_fut::OnceAsync,
SedonaOptions,
};

Expand Down Expand Up @@ -141,7 +141,7 @@ pub struct SpatialJoinExec {
}

impl SpatialJoinExec {
// Try to create a new [`SpatialJoinExec`]
/// Try to create a new [`SpatialJoinExec`]
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -449,25 +449,22 @@ impl ExecutionPlan for SpatialJoinExec {

let num_partitions = build_side.output_partitioning().partition_count();
let mut build_streams = Vec::with_capacity(num_partitions);
let mut build_metrics = Vec::with_capacity(num_partitions);
for k in 0..num_partitions {
let stream = build_side.execute(k, Arc::clone(&context))?;
build_streams.push(stream);
build_metrics.push(SpatialJoinBuildMetrics::new(k, &self.metrics));
}

let probe_thread_count =
self.right.output_partitioning().partition_count();

Ok(build_index(
Arc::clone(&context),
build_side.schema(),
build_streams,
self.on.clone(),
sedona_options.spatial_join.clone(),
build_metrics,
Arc::clone(context.memory_pool()),
self.join_type,
probe_thread_count,
self.metrics.clone(),
))
})?
};
Expand Down Expand Up @@ -546,24 +543,21 @@ impl SpatialJoinExec {

let num_partitions = build_side.output_partitioning().partition_count();
let mut build_streams = Vec::with_capacity(num_partitions);
let mut build_metrics = Vec::with_capacity(num_partitions);
for k in 0..num_partitions {
let stream = build_side.execute(k, Arc::clone(&context))?;
build_streams.push(stream);
build_metrics.push(SpatialJoinBuildMetrics::new(k, &self.metrics));
}

let probe_thread_count = self.right.output_partitioning().partition_count();

Ok(build_index(
Arc::clone(&context),
build_side.schema(),
build_streams,
self.on.clone(),
sedona_options.spatial_join.clone(),
build_metrics,
Arc::clone(context.memory_pool()),
self.join_type,
probe_thread_count,
self.metrics.clone(),
Comment on lines 551 to +560
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix probe partition count after KNN swap

Line 552 computes probe_thread_count from self.right, but in KNN mode probe_plan can be the left side (when probe_side == JoinSide::Left). In that case the left plan may expose more partitions than the right plan, and downstream components index into probe-partition metrics using the actual probe partition ID. Passing the smaller right-side partition count therefore risks out-of-bounds panics or silently under-provisioned probe metrics when the optimizer decides to probe from the left. Please derive the count from probe_plan instead.

Apply this diff:

-                    let probe_thread_count = self.right.output_partitioning().partition_count();
+                    let probe_thread_count = probe_plan.output_partitioning().partition_count();
🤖 Prompt for AI Agents
In rust/sedona-spatial-join/src/exec.rs around lines 551 to 560,
probe_thread_count is computed from self.right which is incorrect in KNN mode
when probe_plan may be the left side; change the code to derive the partition
count from probe_plan (the actual probe side) instead of self.right so
probe_thread_count = probe_plan.output_partitioning().partition_count(), and
pass that value into build_index to avoid out-of-bounds or under-provisioned
probe metrics.

))
})?
};
Expand Down
Loading