277: chore: Refactor spatial join code for easier implementing new features #7
martin-augment wants to merge 5 commits into main from
Walkthrough
This pull request refactors Sedona's spatial join implementation with new modular infrastructure for stream processing, index building, and query execution. Changes introduce evaluated batch streaming, build-side collection, spatial indexing with KNN support, and reorganize utilities into specialized modules.
Code Review: Spatial Join Refactoring (PR #7)
This is a substantial refactoring that improves code organization and maintainability. Overall, the changes are well-structured and follow good software engineering practices. Here's my detailed analysis:
✅ Strengths
1. Excellent Modularization
The refactoring successfully breaks down the monolithic
This significantly improves code navigability and makes future modifications easier.
2. Improved Abstractions
3. Better Separation of Concerns
4. Type Safety Improvements
The change from
@@ -524,9 +518,10 @@ impl SpatialJoinBatchIterator {
        // Process probe rows incrementally until we have enough results or finish
        let initial_size = self.build_batch_positions.len();
Consider asserting the invariant that rects.len() == wkbs.len() before the loop (e.g., a debug_assert_eq!) to catch any mismatch early and prevent potential out-of-bounds access when indexing rects[self.current_probe_idx].
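A minimal sketch of that check, using simplified stand-in types (`Rect`, `ProbeRows`, and `count_candidates` are hypothetical, not the crate's API). The assertion runs once before the loop and is compiled out when debug assertions are disabled, so it adds no cost to release builds.

```rust
/// Hypothetical stand-in for `Rect<f32>`; the crate uses the `geo` type.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Rect {
    min: (f32, f32),
    max: (f32, f32),
}

/// Hypothetical probe-side view: one WKB slot and one bounding-box slot per row.
struct ProbeRows {
    wkbs: Vec<Option<Vec<u8>>>,
    rects: Vec<Option<Rect>>,
}

/// Counts rows from `start` onward that have both a geometry and a bounding box.
fn count_candidates(rows: &ProbeRows, start: usize) -> usize {
    // Fail fast in debug builds if the per-row vectors drift apart,
    // instead of panicking later on `rows.rects[idx]`.
    debug_assert_eq!(
        rows.rects.len(),
        rows.wkbs.len(),
        "rects and wkbs must have one entry per probe row"
    );
    let mut count = 0;
    for idx in start..rows.wkbs.len() {
        if rows.wkbs[idx].is_some() && rows.rects[idx].is_some() {
            count += 1;
        }
    }
    count
}
```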
    sedona_options.spatial_join,
    join_type,
    probe_threads_count,
    Arc::clone(memory_pool),
In the else branch below, since external spilling isn't implemented, consider enhancing the error with a TODO/issue reference to aid operators and future implementation tracking (e.g., include an issue ID in the message).
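A minimal sketch of what such an error could look like, assuming a simple fixed memory budget for the build side. `BuildError`, `MemoryBudget`, `collect_build_side`, and the `SPILL_TRACKING_ISSUE` placeholder are all hypothetical; no real issue number is implied, and the actual code returns a DataFusion error.

```rust
use std::fmt;

/// Hypothetical error type; the real code returns a DataFusion error.
#[derive(Debug)]
enum BuildError {
    NotImplemented(String),
}

impl fmt::Display for BuildError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BuildError::NotImplemented(msg) => write!(f, "not implemented: {msg}"),
        }
    }
}

/// Placeholder only: substitute the real tracking issue once one exists.
const SPILL_TRACKING_ISSUE: &str = "<spill-tracking-issue>";

/// Hypothetical fixed memory budget for build-side batches.
struct MemoryBudget {
    limit: usize,
    used: usize,
}

impl MemoryBudget {
    /// Reserves `bytes` if they fit within the limit; returns false otherwise.
    fn try_grow(&mut self, bytes: usize) -> bool {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                true
            }
            _ => false,
        }
    }
}

/// Reserves memory for each build batch and errors instead of spilling,
/// because no external (spilling) index builder exists yet.
fn collect_build_side(batch_sizes: &[usize], budget: &mut MemoryBudget) -> Result<usize, BuildError> {
    for (i, &size) in batch_sizes.iter().enumerate() {
        if !budget.try_grow(size) {
            return Err(BuildError::NotImplemented(format!(
                "build side exceeded the memory budget at batch {i}; spilling the spatial \
                 index build side is not supported yet (TODO: {issue})",
                issue = SPILL_TRACKING_ISSUE
            )));
        }
    }
    Ok(batch_sizes.len())
}
```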
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/sedona-spatial-join/src/operand_evaluator.rs (1)
243-253: Don't clear `rects`; it breaks downstream indexing.
When the scalar distance evaluates to NULL, the new code empties `result.rects`. Later consumers (e.g. `SpatialJoinBatchIterator::next_batch` in `stream.rs`) index `rects` by probe row and now hit an out-of-bounds panic whenever `distance` is NULL, even though geometries still exist. Keep the vector length aligned with the geometry count: set each entry to `None` instead of clearing, or leave it unchanged. For example:
- ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
-     // Distance expression evaluates to NULL, the resulting distance should be NULL as well.
-     result.rects.clear();
- }
+ ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
+     // Distance expression evaluates to NULL; distances are unknown but geometry count stays the same.
+     result.rects.iter_mut().for_each(|rect_opt| *rect_opt = None);
+ }
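The difference can be sketched with simplified types (`Rect` and `apply_scalar_distance` are hypothetical, and whether the real evaluator expands boxes exactly this way is an assumption): a NULL distance blanks each slot rather than removing slots, so per-row indexing stays valid.

```rust
/// Hypothetical stand-in for `Rect<f32>`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Rect {
    min_x: f32,
    min_y: f32,
    max_x: f32,
    max_y: f32,
}

/// Applies a scalar join distance to per-row bounding boxes in place.
/// `None` models a NULL distance.
fn apply_scalar_distance(rects: &mut [Option<Rect>], distance: Option<f64>) {
    match distance {
        // NULL distance: no row can match, but keep exactly one slot per
        // geometry so callers can still index `rects[row]`.
        None => rects.iter_mut().for_each(|rect| *rect = None),
        Some(d) => {
            let d = d as f32;
            // Expand every non-empty box by `d` on all sides.
            for rect in rects.iter_mut().flatten() {
                rect.min_x -= d;
                rect.min_y -= d;
                rect.max_x += d;
                rect.max_y += d;
            }
        }
    }
}
```

Taking `&mut [Option<Rect>]` instead of `&mut Vec<_>` is one way to make the length invariant structural: a slice cannot be cleared or truncated.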
🧹 Nitpick comments (1)
python/sedonadb/python/sedonadb/dbapi.py (1)
41-41: Minor documentation change with unclear relevance to PR objectives.
The addition of `_ =` to explicitly ignore the return value of `cur.execute()` is valid Python convention but not strictly necessary here. The PR title mentions "Refactor spatial join code for easier implementing new features," yet this file only contains a trivial documentation update to a DBAPI example.
Consider whether this change is intended for this PR, or if it was included accidentally.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
- python/sedonadb/python/sedonadb/dbapi.py (1 hunks)
- rust/sedona-spatial-join/src/build_index.rs (1 hunks)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1 hunks)
- rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs (1 hunks)
- rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs (1 hunks)
- rust/sedona-spatial-join/src/exec.rs (4 hunks)
- rust/sedona-spatial-join/src/index.rs (1 hunks)
- rust/sedona-spatial-join/src/index/build_side_collector.rs (1 hunks)
- rust/sedona-spatial-join/src/index/knn_adapter.rs (1 hunks)
- rust/sedona-spatial-join/src/index/spatial_index.rs (1 hunks)
- rust/sedona-spatial-join/src/index/spatial_index_builder.rs (1 hunks)
- rust/sedona-spatial-join/src/lib.rs (1 hunks)
- rust/sedona-spatial-join/src/operand_evaluator.rs (4 hunks)
- rust/sedona-spatial-join/src/refine/geos.rs (3 hunks)
- rust/sedona-spatial-join/src/refine/tg.rs (1 hunks)
- rust/sedona-spatial-join/src/stream.rs (10 hunks)
- rust/sedona-spatial-join/src/utils.rs (1 hunks)
- rust/sedona-spatial-join/src/utils/join_utils.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-28T20:11:56.888Z
Learnt from: martin-augment
Repo: martin-augment/sedona-db PR: 5
File: rust/sedona-functions/src/st_start_point.rs:115-123
Timestamp: 2025-10-28T20:11:56.888Z
Learning: In Rust Arrow projects, `BinaryBuilder` (which is a type alias for `GenericByteBuilder<GenericBinaryType<i32>>`) implements the `std::io::Write` trait. The correct pattern for building WKB binary data is: (1) write bytes to the builder using the Write trait methods, (2) call `builder.append_value([])` to finalize the current value. The empty slice signals completion of the buffered value, not replacement. This pattern is used throughout sedona-functions.
Applied to files:
rust/sedona-spatial-join/src/operand_evaluator.rs
📚 Learning: 2025-10-27T06:50:54.305Z
Learnt from: martin-augment
Repo: martin-augment/sedona-db PR: 2
File: c/sedona-geos/src/st_buffer.rs:243-252
Timestamp: 2025-10-27T06:50:54.305Z
Learning: In the ST_Buffer implementation in c/sedona-geos/src/st_buffer.rs, the `parse_side()` function should return an enum (e.g., BufferSide with variants Both, Left, Right) instead of a boolean, to properly distinguish between left and right side buffer operations. The distance sign should be negated for right-side buffers since GEOS uses positive distance for left and negative for right.
Applied to files:
rust/sedona-spatial-join/src/refine/geos.rs
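A sketch of the suggested shape, assuming a string side parameter (the accepted spellings and the names `parse_side` and `geos_distance` are hypothetical; the actual ST_Buffer parameter syntax is not shown here). Only `Right` flips the sign, matching the GEOS convention described in the learning.

```rust
/// Side selector suggested by the learning; variant names follow its example.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BufferSide {
    Both,
    Left,
    Right,
}

/// Hypothetical parser for a side parameter such as "left" or "right".
fn parse_side(value: &str) -> Option<BufferSide> {
    match value.trim().to_ascii_lowercase().as_str() {
        "both" => Some(BufferSide::Both),
        "left" => Some(BufferSide::Left),
        "right" => Some(BufferSide::Right),
        _ => None,
    }
}

/// GEOS treats a positive distance as the left side and a negative one as
/// the right side, so only `Right` negates the distance.
fn geos_distance(distance: f64, side: BufferSide) -> f64 {
    match side {
        BufferSide::Right => -distance,
        BufferSide::Both | BufferSide::Left => distance,
    }
}
```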
🧬 Code graph analysis (15)
rust/sedona-spatial-join/src/build_index.rs (4)
- rust/sedona-spatial-join/src/exec.rs (5): metrics (417-419), join_type (199-201), session_config (432-435), session_config (516-519), session_config (708-711)
- rust/sedona-spatial-join/src/operand_evaluator.rs (4): create_operand_evaluator (74-89), new (182-187), new (198-203), new (380-382)
- rust/sedona-spatial-join/src/index/build_side_collector.rs (2): new (72-81), new (85-87)
- rust/sedona-spatial-join/src/index/spatial_index_builder.rs (2): new (92-97), new (102-126)

rust/sedona-spatial-join/src/index/build_side_collector.rs (2)
- rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs (1): new (33-37)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1): in_mem_size (37-43)

rust/sedona-spatial-join/src/operand_evaluator.rs (1)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1): wkb (49-52)

rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs (1)
- rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs (1): is_external (41-43)

rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs (1)
- rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs (1): is_external (29-29)

rust/sedona-spatial-join/src/index/spatial_index_builder.rs (7)
- rust/sedona-spatial-join/src/operand_evaluator.rs (8): array (255-255), array (290-290), create_operand_evaluator (74-89), new (182-187), new (198-203), new (380-382), in_mem_size (157-171), try_new (108-146)
- rust/sedona-spatial-join/src/index/spatial_index.rs (5): RTreeBuilder (119-119), refine (393-443), schema (136-138), empty (101-134), visited_left_side (463-465)
- rust/sedona-spatial-join/src/refine.rs (2): refine (56-60), create_refiner (112-142)
- rust/sedona-spatial-join/src/utils/join_utils.rs (1): need_produce_result_in_final (44-53)
- rust/sedona-spatial-join/src/index/knn_adapter.rs (2): new (40-62), new (89-99)
- rust/sedona-spatial-join/src/evaluated_batch.rs (2): in_mem_size (37-43), rects (54-56)
- rust/sedona-spatial-join/src/utils/concurrent_reservation.rs (1): try_new (36-44)

rust/sedona-spatial-join/src/lib.rs (6)
- rust/sedona-spatial-join/src/build_index.rs (1): build_index (36-89)
- rust/sedona-spatial-join/src/index/spatial_index.rs (1): refine (393-443)
- rust/sedona-spatial-join/src/refine/geos.rs (1): refine (330-346)
- rust/sedona-spatial-join/src/refine/tg.rs (1): refine (248-268)
- rust/sedona-spatial-join/src/refine.rs (1): refine (56-60)
- rust/sedona-spatial-join/src/refine/geo.rs (1): refine (154-175)

rust/sedona-spatial-join/src/index/knn_adapter.rs (3)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1): wkb (49-52)
- rust/sedona-geo/src/to_geo.rs (1): item_to_geometry (58-69)
- rust/sedona-spatial-join/src/index/spatial_index_builder.rs (4): new (92-97), new (102-126), indexed_batches (148-152), indexed_batches (243-247)

rust/sedona-spatial-join/src/index/spatial_index.rs (7)
- rust/sedona-spatial-join/src/evaluated_batch.rs (2): distance (58-60), wkb (49-52)
- rust/sedona-geo/src/to_geo.rs (1): item_to_geometry (58-69)
- rust/sedona-spatial-join/src/refine.rs (3): refine (56-60), need_more_probe_stats (81-81), merge_probe_stats (87-87)
- rust/sedona-spatial-join/src/refine/geo.rs (4): refine (154-175), new (78-109), need_more_probe_stats (189-191), merge_probe_stats (193-197)
- rust/sedona-spatial-join/src/utils/concurrent_reservation.rs (1): try_new (36-44)
- rust/sedona-spatial-join/src/index/knn_adapter.rs (2): new (40-62), new (89-99)
- rust/sedona-spatial-join/src/index/spatial_index_builder.rs (4): new (92-97), new (102-126), indexed_batches (148-152), indexed_batches (243-247)

rust/sedona-spatial-join/src/evaluated_batch.rs (1)
- rust/sedona-spatial-join/src/operand_evaluator.rs (1): wkbs (149-155)

rust/sedona-spatial-join/src/exec.rs (2)
- rust/sedona-spatial-join/src/build_index.rs (1): build_index (36-89)
- rust/sedona-spatial-join/src/utils/join_utils.rs (2): asymmetric_join_output_partitioning (431-452), boundedness_from_children (457-487)

rust/sedona-spatial-join/src/stream.rs (3)
- rust/sedona-spatial-join/src/operand_evaluator.rs (6): create_operand_evaluator (74-89), distance_value_at (283-306), new (182-187), new (198-203), new (380-382), wkbs (149-155)
- rust/sedona-spatial-join/src/utils/join_utils.rs (2): apply_join_filter_to_indices (95-128), build_batch_from_indices (132-185)
- rust/sedona-spatial-join/src/evaluated_batch.rs (2): rects (54-56), distance (58-60)

rust/sedona-spatial-join/src/utils/join_utils.rs (2)
- rust/sedona-spatial-join/src/exec.rs (5): join_type (199-201), filter (57-57), try_new_with_options (157-196), try_new (145-154), children (393-395)
- rust/sedona-spatial-join/src/stream.rs (5): schema (442-444), new (87-119), new (146-160), new (494-508), new (849-870)

rust/sedona-spatial-join/src/index.rs (1)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1): wkb (49-52)

rust/sedona-spatial-join/src/refine/geos.rs (1)
- rust/sedona-spatial-join/src/evaluated_batch.rs (1): wkb (49-52)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: codex
- GitHub Check: claude-review
🔇 Additional comments (1)
rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs (1)
33-55: Nice lightweight stream adapter.
Compact synchronous iterator bridge keeps the evaluated batch stream path simple and avoids unnecessary wakeups.
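To illustrate why such an adapter avoids wakeups, here is a simplified sketch. The `BatchStream` trait, `InMemoryBatchStream`, and `drain_ready` are hypothetical and only loosely mirror the `is_external` method listed in the code graph; the crate's real trait builds on `futures::Stream`. Because every poll returns `Ready`, the waker is never stored or woken.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified batch-stream trait.
trait BatchStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    /// True when batches are read back from spilled (external) storage.
    fn is_external(&self) -> bool;
}

/// Serves batches that are already held in memory.
struct InMemoryBatchStream<T> {
    iter: std::vec::IntoIter<T>,
}

impl<T> InMemoryBatchStream<T> {
    fn new(batches: Vec<T>) -> Self {
        Self { iter: batches.into_iter() }
    }
}

impl<T: Unpin> BatchStream for InMemoryBatchStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Data is already in memory, so every poll is immediately Ready.
        Poll::Ready(self.get_mut().iter.next())
    }

    fn is_external(&self) -> bool {
        false
    }
}

/// Waker that does nothing; sufficient for a stream that never returns Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains a stream that is expected to be always ready.
fn drain_ready<S: BatchStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => break,
            Poll::Pending => panic!("expected an always-ready stream"),
        }
    }
    out
}
```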
        let probe_thread_count = self.right.output_partitioning().partition_count();

        Ok(build_index(
            Arc::clone(&context),
            build_side.schema(),
            build_streams,
            self.on.clone(),
            sedona_options.spatial_join.clone(),
            build_metrics,
            Arc::clone(context.memory_pool()),
            self.join_type,
            probe_thread_count,
            self.metrics.clone(),
Fix probe partition count after KNN swap
Line 552 computes probe_thread_count from self.right, but in KNN mode probe_plan can be the left side (when probe_side == JoinSide::Left). In that case the left plan may expose more partitions than the right plan, and downstream components index into probe-partition metrics using the actual probe partition ID. Passing the smaller right-side partition count therefore risks out-of-bounds panics or silently under-provisioned probe metrics when the optimizer decides to probe from the left. Please derive the count from probe_plan instead.
Apply this diff:
- let probe_thread_count = self.right.output_partitioning().partition_count();
+ let probe_thread_count = probe_plan.output_partitioning().partition_count();
🤖 Prompt for AI Agents
In rust/sedona-spatial-join/src/exec.rs around lines 551 to 560,
probe_thread_count is computed from self.right which is incorrect in KNN mode
when probe_plan may be the left side; change the code to derive the partition
count from probe_plan (the actual probe side) instead of self.right so
probe_thread_count = probe_plan.output_partitioning().partition_count(), and
pass that value into build_index to avoid out-of-bounds or under-provisioned
probe metrics.
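A minimal sketch of the fix, assuming simplified stand-ins (`JoinSide`, `PlanStub`, `build_and_probe`, and `probe_metrics_slots` are hypothetical; DataFusion's own `JoinSide` may have additional variants). The metrics vector is sized from whichever plan is actually probed, so partition IDs from a wider left side stay in bounds.

```rust
/// Hypothetical stand-in for DataFusion's `JoinSide`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JoinSide {
    Left,
    Right,
}

/// Hypothetical plan stub: only the output partition count matters here.
struct PlanStub {
    partitions: usize,
}

/// Returns (build, probe) plans for the chosen probe side; KNN mode may swap them.
fn build_and_probe<'a>(
    left: &'a PlanStub,
    right: &'a PlanStub,
    probe_side: JoinSide,
) -> (&'a PlanStub, &'a PlanStub) {
    match probe_side {
        JoinSide::Left => (right, left),
        JoinSide::Right => (left, right),
    }
}

/// Sizes per-partition probe metrics from the plan that is actually probed,
/// not unconditionally from the right side.
fn probe_metrics_slots(left: &PlanStub, right: &PlanStub, probe_side: JoinSide) -> Vec<u64> {
    let (_build, probe) = build_and_probe(left, right, probe_side);
    vec![0; probe.partitions]
}
```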
        .expression()
        .evaluate(&intermediate_batch)?
        .into_array(intermediate_batch.num_rows())?;
    let mask = as_boolean_array(&filter_result)?;

    let left_filtered = compute::filter(&build_indices, mask)?;
    let right_filtered = compute::filter(&probe_indices, mask)?;
    Ok((
        downcast_array(left_filtered.as_ref()),
        downcast_array(right_filtered.as_ref()),
    ))
Return owned filtered arrays
compute::filter gives you an ArrayRef, and downcast_array only hands back a borrowed view. Returning those borrows from the function doesn’t compile (and would dangle if it did). Clone the downcast arrays before returning so you hand back owned PrimitiveArrays.
let left_filtered = compute::filter(&build_indices, mask)?;
let right_filtered = compute::filter(&probe_indices, mask)?;
Ok((
- downcast_array(left_filtered.as_ref()),
- downcast_array(right_filtered.as_ref()),
- ))
+ downcast_array::<UInt64Array>(left_filtered.as_ref()).clone(),
+ downcast_array::<UInt32Array>(right_filtered.as_ref()).clone(),
+        ))
🤖 Prompt for AI Agents
In rust/sedona-spatial-join/src/utils/join_utils.rs around lines 117 to 127, the
code uses compute::filter to get ArrayRef then calls downcast_array which
returns a borrowed view; you must return owned PrimitiveArray instances instead
of borrows. Fix by cloning or converting the downcasted arrays to owned arrays
before returning (e.g., call .clone() or .to_owned() on the downcast result) so
the function returns owned PrimitiveArray values rather than borrowed
references.
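For reference, arrow-rs defines `arrow_array::cast::downcast_array` as returning an owned `T` (constructed from the array's `ArrayData`), whereas `as_any().downcast_ref` returns a borrow; if that holds for the arrow version in use, the original code already returns owned arrays and the extra `.clone()` calls would be redundant. The filtering step itself can be sketched without arrow, assuming plain vectors stand in for `UInt64Array` (build) and `UInt32Array` (probe); `filter_index_pairs` is hypothetical.

```rust
/// Hypothetical std-only analogue of filtering paired join indices with a
/// boolean mask; the real code uses arrow's `compute::filter`.
fn filter_index_pairs(build: &[u64], probe: &[u32], mask: &[Option<bool>]) -> (Vec<u64>, Vec<u32>) {
    assert_eq!(build.len(), probe.len(), "index arrays must be paired");
    assert_eq!(build.len(), mask.len(), "mask must cover every pair");
    let mut out_build = Vec::new();
    let mut out_probe = Vec::new();
    for ((&b, &p), &keep) in build.iter().zip(probe).zip(mask) {
        // Like arrow's filter kernel, a NULL mask entry is treated as false.
        if keep == Some(true) {
            out_build.push(b);
            out_probe.push(p);
        }
    }
    // Both vectors are owned, so nothing borrows from a temporary.
    (out_build, out_probe)
}
```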
Findings
The changes in this PR are too complex for me and I cannot say whether the suggestions are good or not.
277: To review by AI
Note
Major spatial join refactor: split into cohesive modules, add centralized index build pipeline, rework streaming and utils, and tweak APIs/metrics; minor Python doc example update.
- index.rs into index/spatial_index.rs, index/spatial_index_builder.rs, index/build_side_collector.rs, and index/knn_adapter.rs.
- evaluated_batch (with in-memory streams) and move join helpers to utils/join_utils.rs; add utils/{once_fut,init_once_array,concurrent_reservation}.rs.
- build_index.rs to orchestrate build-side collection (with metrics/memory reservations) and index construction; returns error when spilling (external builder not implemented).
- EvaluatedGeometryArray.rects now Vec<Option<Rect<f32>>>; propagate through collectors, builder, index, and stream.
- output_rows), and tracked refiner mem/exec mode.
- SpatialJoinExec updated to use new build_index and utils::join_utils; cleans up once-future handling.
- SpatialJoinStream/iterators operate on EvaluatedBatch; outer-join unmatched production preserved.
- cur.execute(...) to a throwaway variable to avoid unused result.

Written by Cursor Bugbot for commit 8c3535f.