Skip to content

Commit 4347a27

Browse files
committed
Improve hash join exec builder
Prior the patch HashJoinExecBuilder constructed from an existing node reseted some fields of the node, e.g. dynamic filters, metrics. It significantly reduces usage scope of the builder. This patch improves the implementation. Now builder created from the existing node preserves all fields in case they have not been explicitly updated. Also builder now tracks flag if it must recompute plan properties. Closes apache#20270
1 parent 3f38609 commit 4347a27

File tree

4 files changed

+246
-237
lines changed

4 files changed

+246
-237
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{
4949
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5050
use datafusion_physical_plan::execution_plan::EmissionType;
5151
use datafusion_physical_plan::joins::{
52-
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec,
52+
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
5353
};
5454
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
5555
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -286,39 +286,26 @@ pub fn adjust_input_keys_ordering(
286286
) -> Result<Transformed<PlanWithKeyRequirements>> {
287287
let plan = Arc::clone(&requirements.plan);
288288

289-
if let Some(HashJoinExec {
290-
left,
291-
right,
292-
on,
293-
filter,
294-
join_type,
295-
projection,
296-
mode,
297-
null_equality,
298-
null_aware,
299-
..
300-
}) = plan.as_any().downcast_ref::<HashJoinExec>()
289+
if let Some(
290+
exec @ HashJoinExec {
291+
left,
292+
on,
293+
join_type,
294+
mode,
295+
..
296+
},
297+
) = plan.as_any().downcast_ref::<HashJoinExec>()
301298
{
302299
match mode {
303300
PartitionMode::Partitioned => {
304301
let join_constructor = |new_conditions: (
305302
Vec<(PhysicalExprRef, PhysicalExprRef)>,
306303
Vec<SortOptions>,
307304
)| {
308-
HashJoinExecBuilder::new(
309-
Arc::clone(left),
310-
Arc::clone(right),
311-
new_conditions.0,
312-
*join_type,
313-
)
314-
.with_filter(filter.clone())
315-
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
316-
.with_projection_ref(projection.clone())
317-
.with_partition_mode(PartitionMode::Partitioned)
318-
.with_null_equality(*null_equality)
319-
.with_null_aware(*null_aware)
320-
.build()
321-
.map(|e| Arc::new(e) as _)
305+
exec.builder()
306+
.with_partition_mode(PartitionMode::Partitioned)
307+
.with_on(new_conditions.0)
308+
.build_exec()
322309
};
323310
return reorder_partitioned_join_keys(
324311
requirements,
@@ -612,18 +599,15 @@ pub fn reorder_join_keys_to_inputs(
612599
plan: Arc<dyn ExecutionPlan>,
613600
) -> Result<Arc<dyn ExecutionPlan>> {
614601
let plan_any = plan.as_any();
615-
if let Some(HashJoinExec {
616-
left,
617-
right,
618-
on,
619-
filter,
620-
join_type,
621-
projection,
622-
mode,
623-
null_equality,
624-
null_aware,
625-
..
626-
}) = plan_any.downcast_ref::<HashJoinExec>()
602+
if let Some(
603+
exec @ HashJoinExec {
604+
left,
605+
right,
606+
on,
607+
mode,
608+
..
609+
},
610+
) = plan_any.downcast_ref::<HashJoinExec>()
627611
{
628612
if matches!(mode, PartitionMode::Partitioned) {
629613
let (join_keys, positions) = reorder_current_join_keys(
@@ -639,20 +623,11 @@ pub fn reorder_join_keys_to_inputs(
639623
right_keys,
640624
} = join_keys;
641625
let new_join_on = new_join_conditions(&left_keys, &right_keys);
642-
return Ok(Arc::new(
643-
HashJoinExecBuilder::new(
644-
Arc::clone(left),
645-
Arc::clone(right),
646-
new_join_on,
647-
*join_type,
648-
)
649-
.with_filter(filter.clone())
650-
.with_projection_ref(projection.clone())
626+
return exec
627+
.builder()
651628
.with_partition_mode(PartitionMode::Partitioned)
652-
.with_null_equality(*null_equality)
653-
.with_null_aware(*null_aware)
654-
.build()?,
655-
));
629+
.with_on(new_join_on)
630+
.build_exec();
656631
}
657632
}
658633
} else if let Some(SortMergeJoinExec {

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column;
3434
use datafusion_physical_plan::execution_plan::EmissionType;
3535
use datafusion_physical_plan::joins::utils::ColumnIndex;
3636
use datafusion_physical_plan::joins::{
37-
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode,
37+
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3838
StreamJoinPartitionMode, SymmetricHashJoinExec,
3939
};
4040
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -192,14 +192,16 @@ pub(crate) fn try_collect_left(
192192
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
193193
} else {
194194
Ok(Some(Arc::new(
195-
HashJoinExecBuilder::from(hash_join)
195+
hash_join
196+
.builder()
196197
.with_partition_mode(PartitionMode::CollectLeft)
197198
.build()?,
198199
)))
199200
}
200201
}
201202
(true, false) => Ok(Some(Arc::new(
202-
HashJoinExecBuilder::from(hash_join)
203+
hash_join
204+
.builder()
203205
.with_partition_mode(PartitionMode::CollectLeft)
204206
.build()?,
205207
))),
@@ -243,7 +245,8 @@ pub(crate) fn partitioned_hash_join(
243245
};
244246

245247
Ok(Arc::new(
246-
HashJoinExecBuilder::from(hash_join)
248+
hash_join
249+
.builder()
247250
.with_partition_mode(partition_mode)
248251
.build()?,
249252
))

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use crate::ordering::InputOrderMode;
2525
use crate::sort_pushdown::SortOrderPushdownResult;
2626
pub use crate::stream::EmptyRecordBatchStream;
2727

28+
use arrow_schema::Schema;
2829
pub use datafusion_common::hash_utils;
2930
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3031
pub use datafusion_common::utils::project_schema;
@@ -38,7 +39,7 @@ pub use datafusion_physical_expr::{
3839

3940
use std::any::Any;
4041
use std::fmt::Debug;
41-
use std::sync::Arc;
42+
use std::sync::{Arc, LazyLock};
4243

4344
use crate::coalesce_partitions::CoalescePartitionsExec;
4445
use crate::display::DisplayableExecutionPlan;
@@ -1433,6 +1434,16 @@ pub enum CardinalityEffect {
14331434
GreaterEqual,
14341435
}
14351436

1437+
/// Can be used in contexts where properties have not yet been initialized properly.
1438+
pub(crate) static STUB_PROPERTIES: LazyLock<PlanProperties> = LazyLock::new(|| {
1439+
PlanProperties::new(
1440+
EquivalenceProperties::new(Arc::new(Schema::empty())),
1441+
Partitioning::UnknownPartitioning(1),
1442+
EmissionType::Final,
1443+
Boundedness::Bounded,
1444+
)
1445+
});
1446+
14361447
#[cfg(test)]
14371448
mod tests {
14381449
use std::any::Any;

0 commit comments

Comments
 (0)