Skip to content

Commit dcc9cfa

Browse files
committed
Improve hash join exec builder
Prior to this patch, a HashJoinExecBuilder constructed from an existing node reset some fields of the node, e.g. dynamic filters and metrics. This significantly reduced the usage scope of the builder. This patch improves the implementation: a builder created from an existing node now preserves all fields unless they have been explicitly updated. The builder also now tracks a flag indicating whether it must recompute plan properties. Closes apache#20270
1 parent 35caa19 commit dcc9cfa

File tree

3 files changed

+188
-183
lines changed

3 files changed

+188
-183
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -286,39 +286,26 @@ pub fn adjust_input_keys_ordering(
286286
) -> Result<Transformed<PlanWithKeyRequirements>> {
287287
let plan = Arc::clone(&requirements.plan);
288288

289-
if let Some(HashJoinExec {
290-
left,
291-
right,
292-
on,
293-
filter,
294-
join_type,
295-
projection,
296-
mode,
297-
null_equality,
298-
null_aware,
299-
..
300-
}) = plan.as_any().downcast_ref::<HashJoinExec>()
289+
if let Some(
290+
exec @ HashJoinExec {
291+
left,
292+
on,
293+
join_type,
294+
mode,
295+
..
296+
},
297+
) = plan.as_any().downcast_ref::<HashJoinExec>()
301298
{
302299
match mode {
303300
PartitionMode::Partitioned => {
304301
let join_constructor = |new_conditions: (
305302
Vec<(PhysicalExprRef, PhysicalExprRef)>,
306303
Vec<SortOptions>,
307304
)| {
308-
HashJoinExecBuilder::new(
309-
Arc::clone(left),
310-
Arc::clone(right),
311-
new_conditions.0,
312-
*join_type,
313-
)
314-
.with_filter(filter.clone())
315-
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
316-
.with_projection_ref(projection.clone())
317-
.with_partition_mode(PartitionMode::Partitioned)
318-
.with_null_equality(*null_equality)
319-
.with_null_aware(*null_aware)
320-
.build()
321-
.map(|e| Arc::new(e) as _)
305+
HashJoinExecBuilder::from(exec)
306+
.with_partition_mode(PartitionMode::Partitioned)
307+
.with_on(new_conditions.0)
308+
.build_exec()
322309
};
323310
return reorder_partitioned_join_keys(
324311
requirements,
@@ -612,18 +599,15 @@ pub fn reorder_join_keys_to_inputs(
612599
plan: Arc<dyn ExecutionPlan>,
613600
) -> Result<Arc<dyn ExecutionPlan>> {
614601
let plan_any = plan.as_any();
615-
if let Some(HashJoinExec {
616-
left,
617-
right,
618-
on,
619-
filter,
620-
join_type,
621-
projection,
622-
mode,
623-
null_equality,
624-
null_aware,
625-
..
626-
}) = plan_any.downcast_ref::<HashJoinExec>()
602+
if let Some(
603+
exec @ HashJoinExec {
604+
left,
605+
right,
606+
on,
607+
mode,
608+
..
609+
},
610+
) = plan_any.downcast_ref::<HashJoinExec>()
627611
{
628612
if matches!(mode, PartitionMode::Partitioned) {
629613
let (join_keys, positions) = reorder_current_join_keys(
@@ -639,20 +623,10 @@ pub fn reorder_join_keys_to_inputs(
639623
right_keys,
640624
} = join_keys;
641625
let new_join_on = new_join_conditions(&left_keys, &right_keys);
642-
return Ok(Arc::new(
643-
HashJoinExecBuilder::new(
644-
Arc::clone(left),
645-
Arc::clone(right),
646-
new_join_on,
647-
*join_type,
648-
)
649-
.with_filter(filter.clone())
650-
.with_projection_ref(projection.clone())
626+
return HashJoinExecBuilder::from(exec)
651627
.with_partition_mode(PartitionMode::Partitioned)
652-
.with_null_equality(*null_equality)
653-
.with_null_aware(*null_aware)
654-
.build()?,
655-
));
628+
.with_on(new_join_on)
629+
.build_exec();
656630
}
657631
}
658632
} else if let Some(SortMergeJoinExec {

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use crate::ordering::InputOrderMode;
2525
use crate::sort_pushdown::SortOrderPushdownResult;
2626
pub use crate::stream::EmptyRecordBatchStream;
2727

28+
use arrow_schema::Schema;
2829
pub use datafusion_common::hash_utils;
2930
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3031
pub use datafusion_common::utils::project_schema;
@@ -38,7 +39,7 @@ pub use datafusion_physical_expr::{
3839

3940
use std::any::Any;
4041
use std::fmt::Debug;
41-
use std::sync::Arc;
42+
use std::sync::{Arc, LazyLock};
4243

4344
use crate::coalesce_partitions::CoalescePartitionsExec;
4445
use crate::display::DisplayableExecutionPlan;
@@ -1444,6 +1445,16 @@ pub enum CardinalityEffect {
14441445
GreaterEqual,
14451446
}
14461447

1448+
/// Lazily initialized placeholder `PlanProperties` (empty schema, `Partitioning::UnknownPartitioning(1)`, `EmissionType::Final`, `Boundedness::Bounded`) that can be used in contexts where the real properties have not yet been initialized properly.
1449+
pub(crate) static STUB_PROPERTIES: LazyLock<PlanProperties> = LazyLock::new(|| {
1450+
PlanProperties::new(
1451+
EquivalenceProperties::new(Arc::new(Schema::empty())),
1452+
Partitioning::UnknownPartitioning(1),
1453+
EmissionType::Final,
1454+
Boundedness::Bounded,
1455+
)
1456+
});
1457+
14471458
#[cfg(test)]
14481459
mod tests {
14491460
use std::any::Any;

0 commit comments

Comments
 (0)