Skip to content

Commit d3ac7a3

Browse files
authored
Wrap immutable plan parts into Arc (make creating ExecutionPlans less costly) (#19893)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Closes #19852

## Rationale for this change

Improve the performance of query planning and plan state reset by making node clones cheap.

## What changes are included in this PR?

- Store projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`.
- Store exprs as `Arc<[ProjectionExpr]>` instead of `Vec<ProjectionExpr>` in `ProjectionExprs`.
- Store `Arc`-wrapped aggregation, filter, and group-by expressions within `AggregateExec`.
1 parent 828e1c1 commit d3ac7a3

File tree

22 files changed

+509
-290
lines changed

22 files changed

+509
-290
lines changed

datafusion/common/src/stats.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,13 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
395-
let Some(projection) = projection else {
394+
pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self {
395+
let projection = projection.map(AsRef::as_ref);
396+
self.project_impl(projection)
397+
}
398+
399+
fn project_impl(mut self, projection: Option<&[usize]>) -> Self {
400+
let Some(projection) = projection.map(AsRef::as_ref) else {
396401
return self;
397402
};
398403

@@ -410,7 +415,7 @@ impl Statistics {
410415
.map(Slot::Present)
411416
.collect();
412417

413-
for idx in projection {
418+
for idx in projection.iter() {
414419
let next_idx = self.column_statistics.len();
415420
let slot = std::mem::replace(
416421
columns.get_mut(*idx).expect("projection out of bounds"),
@@ -1066,7 +1071,7 @@ mod tests {
10661071

10671072
#[test]
10681073
fn test_project_none() {
1069-
let projection = None;
1074+
let projection: Option<Vec<usize>> = None;
10701075
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
10711076
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721077
}

datafusion/common/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&impl AsRef<[usize]>>,
7474
) -> Result<SchemaRef> {
7575
let schema = match projection {
76-
Some(columns) => Arc::new(schema.project(columns)?),
76+
Some(columns) => Arc::new(schema.project(columns.as_ref())?),
7777
None => Arc::clone(schema),
7878
};
7979
Ok(schema)

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ impl DefaultPhysicalPlanner {
10071007
// project the output columns excluding the async functions
10081008
// The async functions are always appended to the end of the schema.
10091009
.apply_projection(Some(
1010-
(0..input.schema().fields().len()).collect(),
1010+
(0..input.schema().fields().len()).collect::<Vec<_>>(),
10111011
))?
10121012
.with_batch_size(session_state.config().batch_size())
10131013
.build()?

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
762762
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
763763
);
764764

765-
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
765+
assert_eq!(swapped_join.projection.as_deref().unwrap(), &[0_usize]);
766766
assert_eq!(swapped.schema().fields.len(), 1);
767767
assert_eq!(swapped.schema().fields[0].name(), "small_col");
768768
Ok(())

datafusion/physical-expr/src/projection.rs

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
2929
use datafusion_common::stats::{ColumnStatistics, Precision};
3030
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3131
use datafusion_common::{
32-
Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err,
32+
Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err,
33+
plan_err,
3334
};
3435

3536
use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
@@ -125,7 +126,8 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
125126
/// indices.
126127
#[derive(Debug, Clone, PartialEq, Eq)]
127128
pub struct ProjectionExprs {
128-
exprs: Vec<ProjectionExpr>,
129+
/// [`Arc`] used for a cheap clone, which improves physical plan optimization performance.
130+
exprs: Arc<[ProjectionExpr]>,
129131
}
130132

131133
impl std::fmt::Display for ProjectionExprs {
@@ -137,22 +139,24 @@ impl std::fmt::Display for ProjectionExprs {
137139

138140
impl From<Vec<ProjectionExpr>> for ProjectionExprs {
139141
fn from(value: Vec<ProjectionExpr>) -> Self {
140-
Self { exprs: value }
142+
Self {
143+
exprs: value.into(),
144+
}
141145
}
142146
}
143147

144148
impl From<&[ProjectionExpr]> for ProjectionExprs {
145149
fn from(value: &[ProjectionExpr]) -> Self {
146150
Self {
147-
exprs: value.to_vec(),
151+
exprs: value.iter().cloned().collect(),
148152
}
149153
}
150154
}
151155

152156
impl FromIterator<ProjectionExpr> for ProjectionExprs {
153157
fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
154158
Self {
155-
exprs: exprs.into_iter().collect::<Vec<_>>(),
159+
exprs: exprs.into_iter().collect(),
156160
}
157161
}
158162
}
@@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs {
164168
}
165169

166170
impl ProjectionExprs {
167-
pub fn new<I>(exprs: I) -> Self
168-
where
169-
I: IntoIterator<Item = ProjectionExpr>,
170-
{
171+
/// Make a new [`ProjectionExprs`] from an iterator of expressions.
172+
pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
173+
Self {
174+
exprs: exprs.into_iter().collect(),
175+
}
176+
}
177+
178+
/// Make a new [`ProjectionExprs`] from expressions.
179+
pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
171180
Self {
172-
exprs: exprs.into_iter().collect::<Vec<_>>(),
181+
exprs: exprs.into(),
173182
}
174183
}
175184

@@ -285,13 +294,14 @@ impl ProjectionExprs {
285294
{
286295
let exprs = self
287296
.exprs
288-
.into_iter()
297+
.iter()
298+
.cloned()
289299
.map(|mut proj| {
290300
proj.expr = f(proj.expr)?;
291301
Ok(proj)
292302
})
293-
.collect::<Result<Vec<_>>>()?;
294-
Ok(Self::new(exprs))
303+
.collect::<Result<Arc<_>>>()?;
304+
Ok(Self::from_expressions(exprs))
295305
}
296306

297307
/// Apply another projection on top of this projection, returning the combined projection.
@@ -361,7 +371,7 @@ impl ProjectionExprs {
361371
/// applied on top of this projection.
362372
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
363373
let mut new_exprs = Vec::with_capacity(other.exprs.len());
364-
for proj_expr in &other.exprs {
374+
for proj_expr in other.exprs.iter() {
365375
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
366376
.ok_or_else(|| {
367377
internal_datafusion_err!(
@@ -602,12 +612,12 @@ impl ProjectionExprs {
602612
/// ```
603613
pub fn project_statistics(
604614
&self,
605-
mut stats: datafusion_common::Statistics,
615+
mut stats: Statistics,
606616
output_schema: &Schema,
607-
) -> Result<datafusion_common::Statistics> {
617+
) -> Result<Statistics> {
608618
let mut column_statistics = vec![];
609619

610-
for proj_expr in &self.exprs {
620+
for proj_expr in self.exprs.iter() {
611621
let expr = &proj_expr.expr;
612622
let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
613623
std::mem::take(&mut stats.column_statistics[col.index()])
@@ -754,13 +764,52 @@ impl Projector {
754764
}
755765
}
756766

757-
impl IntoIterator for ProjectionExprs {
758-
type Item = ProjectionExpr;
759-
type IntoIter = std::vec::IntoIter<ProjectionExpr>;
767+
/// Describes an immutable, reference-counted projection.
768+
///
769+
/// This structure represents projecting a set of columns by index.
770+
/// [`Arc`] is used to make it cheap to clone.
771+
pub type ProjectionRef = Arc<[usize]>;
760772

761-
fn into_iter(self) -> Self::IntoIter {
762-
self.exprs.into_iter()
763-
}
773+
/// Combine two projections.
774+
///
775+
/// If `p1` is [`None`] then the result is [`None`].
776+
/// Otherwise, if `p2` is not [`None`], each index in `p1` is used to look up
777+
/// the corresponding entry of `p2`. If `p2` is [`None`], `p1` is returned as is.
778+
///
779+
/// # Example
780+
///
781+
/// If `p1` is [0, 2] and `p2` is [0, 2, 3],
782+
/// then the resulting projection will be [0, 3].
783+
///
784+
/// # Error
785+
///
786+
/// Returns an internal error if `p1` contains an index that is greater than or equal to the length of `p2`.
787+
///
788+
pub fn combine_projections(
789+
p1: Option<&ProjectionRef>,
790+
p2: Option<&ProjectionRef>,
791+
) -> Result<Option<ProjectionRef>> {
792+
let Some(p1) = p1 else {
793+
return Ok(None);
794+
};
795+
let Some(p2) = p2 else {
796+
return Ok(Some(Arc::clone(p1)));
797+
};
798+
799+
Ok(Some(
800+
p1.iter()
801+
.map(|i| {
802+
let idx = *i;
803+
assert_or_internal_err!(
804+
idx < p2.len(),
805+
"unable to apply projection: index {} is greater than new projection len {}",
806+
idx,
807+
p2.len(),
808+
);
809+
Ok(p2[*i])
810+
})
811+
.collect::<Result<Arc<[usize]>>>()?,
812+
))
764813
}
765814

766815
/// The function operates in two modes:

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{
4949
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5050
use datafusion_physical_plan::execution_plan::EmissionType;
5151
use datafusion_physical_plan::joins::{
52-
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
52+
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec,
5353
};
5454
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
5555
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -305,18 +305,19 @@ pub fn adjust_input_keys_ordering(
305305
Vec<(PhysicalExprRef, PhysicalExprRef)>,
306306
Vec<SortOptions>,
307307
)| {
308-
HashJoinExec::try_new(
308+
HashJoinExecBuilder::new(
309309
Arc::clone(left),
310310
Arc::clone(right),
311311
new_conditions.0,
312-
filter.clone(),
313-
join_type,
314-
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
315-
projection.clone(),
316-
PartitionMode::Partitioned,
317-
*null_equality,
318-
*null_aware,
312+
*join_type,
319313
)
314+
.with_filter(filter.clone())
315+
// TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
316+
.with_projection_ref(projection.clone())
317+
.with_partition_mode(PartitionMode::Partitioned)
318+
.with_null_equality(*null_equality)
319+
.with_null_aware(*null_aware)
320+
.build()
320321
.map(|e| Arc::new(e) as _)
321322
};
322323
return reorder_partitioned_join_keys(
@@ -638,17 +639,20 @@ pub fn reorder_join_keys_to_inputs(
638639
right_keys,
639640
} = join_keys;
640641
let new_join_on = new_join_conditions(&left_keys, &right_keys);
641-
return Ok(Arc::new(HashJoinExec::try_new(
642-
Arc::clone(left),
643-
Arc::clone(right),
644-
new_join_on,
645-
filter.clone(),
646-
join_type,
647-
projection.clone(),
648-
PartitionMode::Partitioned,
649-
*null_equality,
650-
*null_aware,
651-
)?));
642+
return Ok(Arc::new(
643+
HashJoinExecBuilder::new(
644+
Arc::clone(left),
645+
Arc::clone(right),
646+
new_join_on,
647+
*join_type,
648+
)
649+
.with_filter(filter.clone())
650+
.with_projection_ref(projection.clone())
651+
.with_partition_mode(PartitionMode::Partitioned)
652+
.with_null_equality(*null_equality)
653+
.with_null_aware(*null_aware)
654+
.build()?,
655+
));
652656
}
653657
}
654658
} else if let Some(SortMergeJoinExec {

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ fn handle_hash_join(
723723
.collect();
724724

725725
let column_indices = build_join_column_index(plan);
726-
let projected_indices: Vec<_> = if let Some(projection) = &plan.projection {
726+
let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() {
727727
projection.iter().map(|&i| &column_indices[i]).collect()
728728
} else {
729729
column_indices.iter().collect()

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column;
3434
use datafusion_physical_plan::execution_plan::EmissionType;
3535
use datafusion_physical_plan::joins::utils::ColumnIndex;
3636
use datafusion_physical_plan::joins::{
37-
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
37+
CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode,
3838
StreamJoinPartitionMode, SymmetricHashJoinExec,
3939
};
4040
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -191,30 +191,18 @@ pub(crate) fn try_collect_left(
191191
{
192192
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
193193
} else {
194-
Ok(Some(Arc::new(HashJoinExec::try_new(
195-
Arc::clone(left),
196-
Arc::clone(right),
197-
hash_join.on().to_vec(),
198-
hash_join.filter().cloned(),
199-
hash_join.join_type(),
200-
hash_join.projection.clone(),
201-
PartitionMode::CollectLeft,
202-
hash_join.null_equality(),
203-
hash_join.null_aware,
204-
)?)))
194+
Ok(Some(Arc::new(
195+
HashJoinExecBuilder::from(hash_join)
196+
.with_partition_mode(PartitionMode::CollectLeft)
197+
.build()?,
198+
)))
205199
}
206200
}
207-
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
208-
Arc::clone(left),
209-
Arc::clone(right),
210-
hash_join.on().to_vec(),
211-
hash_join.filter().cloned(),
212-
hash_join.join_type(),
213-
hash_join.projection.clone(),
214-
PartitionMode::CollectLeft,
215-
hash_join.null_equality(),
216-
hash_join.null_aware,
217-
)?))),
201+
(true, false) => Ok(Some(Arc::new(
202+
HashJoinExecBuilder::from(hash_join)
203+
.with_partition_mode(PartitionMode::CollectLeft)
204+
.build()?,
205+
))),
218206
(false, true) => {
219207
// Don't swap null-aware anti joins as they have specific side requirements
220208
if hash_join.join_type().supports_swap() && !hash_join.null_aware {
@@ -254,17 +242,11 @@ pub(crate) fn partitioned_hash_join(
254242
PartitionMode::Partitioned
255243
};
256244

257-
Ok(Arc::new(HashJoinExec::try_new(
258-
Arc::clone(left),
259-
Arc::clone(right),
260-
hash_join.on().to_vec(),
261-
hash_join.filter().cloned(),
262-
hash_join.join_type(),
263-
hash_join.projection.clone(),
264-
partition_mode,
265-
hash_join.null_equality(),
266-
hash_join.null_aware,
267-
)?))
245+
Ok(Arc::new(
246+
HashJoinExecBuilder::from(hash_join)
247+
.with_partition_mode(partition_mode)
248+
.build()?,
249+
))
268250
}
269251
}
270252

datafusion/physical-optimizer/src/projection_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ fn try_push_down_join_filter(
135135
);
136136

137137
let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138-
let projections = match projections {
138+
let projections = match projections.as_ref() {
139139
None => match join.join_type() {
140140
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141141
// Build projections that ignore the newly projected columns.

0 commit comments

Comments
 (0)