Skip to content

Commit 319fd13

Browse files
LLDay authored and askalt committed
feat: expression rewriting in execution plans
Introduces `physical_expressions` and `with_physical_expressions` methods to the `ExecutionPlan` trait. These methods provide an interface to inspect and replace physical expressions within execution plan nodes. The commit also implements these methods for various ExecutionPlans and introduces a new physical expression invariant check to ensure consistency between expression retrieval and replacement.
1 parent 86cb815 commit 319fd13

File tree

31 files changed

+1333
-48
lines changed

31 files changed

+1333
-48
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2780,6 +2780,15 @@ impl DefaultPhysicalPlanner {
27802780
// to verify that the plan is executable.
27812781
InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;
27822782

2783+
// Debug-build-only sanity pass: visit every node of `new_plan` and run
// `check_physical_expressions` on it. Any error returned by the check is
// propagated with `?`; otherwise traversal continues to the next node.
#[cfg(debug_assertions)]
2784+
{
2785+
use datafusion_physical_plan::execution_plan::check_physical_expressions;
2786+
new_plan.apply(|p| {
2787+
check_physical_expressions(Arc::clone(p))?;
2788+
Ok(TreeNodeRecursion::Continue)
2789+
})?;
2790+
}
2791+
27832792
debug!(
27842793
"Optimized physical plan:\n{}\n",
27852794
displayable(new_plan.as_ref()).indent(false)

datafusion/core/tests/physical_optimizer/pushdown_utils.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,16 @@ impl FileSource for TestSource {
156156
})
157157
}
158158

159+
/// Returns a clone of this test source whose `predicate` is replaced by
/// `filter`. The `_projection` argument is intentionally ignored here.
fn with_filter_and_projection(
160+
&self,
161+
filter: Option<Arc<dyn PhysicalExpr>>,
162+
_projection: ProjectionExprs,
163+
) -> Result<Option<Arc<dyn FileSource>>> {
164+
let mut conf = self.clone();
165+
conf.predicate = filter;
166+
Ok(Some(Arc::new(conf)))
167+
}
168+
159169
fn metrics(&self) -> &ExecutionPlanMetricsSet {
160170
&self.metrics
161171
}

datafusion/datasource-arrow/src/source.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use datafusion_datasource::{TableSchema, as_file_source};
3838

3939
use arrow::buffer::Buffer;
4040
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
41+
use datafusion_common::assert_or_internal_err;
4142
use datafusion_common::error::Result;
4243
use datafusion_common::exec_datafusion_err;
4344
use datafusion_common::tree_node::TreeNodeRecursion;
@@ -51,6 +52,7 @@ use datafusion_physical_plan::projection::ProjectionExprs;
5152

5253
use datafusion_datasource::file_stream::FileOpenFuture;
5354
use datafusion_datasource::file_stream::FileOpener;
55+
use datafusion_physical_plan::PhysicalExpr;
5456
use futures::StreamExt;
5557
use itertools::Itertools;
5658
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore, ObjectStoreExt};
@@ -400,9 +402,7 @@ impl FileSource for ArrowSource {
400402

401403
fn apply_expressions(
402404
&self,
403-
f: &mut dyn FnMut(
404-
&dyn datafusion_physical_plan::PhysicalExpr,
405-
) -> Result<TreeNodeRecursion>,
405+
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
406406
) -> Result<TreeNodeRecursion> {
407407
// Visit projection expressions
408408
let mut tnr = TreeNodeRecursion::Continue;
@@ -411,6 +411,20 @@ impl FileSource for ArrowSource {
411411
}
412412
Ok(tnr)
413413
}
414+
415+
/// Returns a clone of this Arrow source with its projection rebuilt from
/// `projection` against the table's file schema.
///
/// `filter` is asserted to be `None` via `assert_or_internal_err!`; no
/// predicate is stored on the clone by this method.
fn with_filter_and_projection(
416+
&self,
417+
filter: Option<Arc<dyn PhysicalExpr>>,
418+
projection: ProjectionExprs,
419+
) -> Result<Option<Arc<dyn FileSource>>> {
420+
assert_or_internal_err!(filter.is_none(), "filter should not be defined");
421+
422+
let mut conf = self.clone();
423+
conf.projection =
424+
SplitProjection::new(self.table_schema.file_schema(), &projection);
425+
426+
Ok(Some(Arc::new(conf)))
427+
}
414428
}
415429

416430
/// `FileOpener` wrapper for both Arrow IPC file and stream formats

datafusion/datasource-avro/src/source.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Arc;
2222

2323
use crate::avro_to_arrow::Reader as AvroReader;
2424

25+
use datafusion_common::assert_or_internal_err;
2526
use datafusion_common::error::Result;
2627
use datafusion_common::tree_node::TreeNodeRecursion;
2728
use datafusion_datasource::TableSchema;
@@ -33,6 +34,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
3334
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3435
use datafusion_physical_plan::projection::ProjectionExprs;
3536

37+
use datafusion_physical_plan::PhysicalExpr;
3638
use object_store::ObjectStore;
3739

3840
/// AvroSource holds the extra configuration that is necessary for opening avro files
@@ -123,6 +125,20 @@ impl FileSource for AvroSource {
123125
Some(&self.projection.source)
124126
}
125127

128+
/// Returns a clone of this Avro source with its projection rebuilt from
/// `projection` against the table's file schema.
///
/// `filter` is asserted to be `None` via `assert_or_internal_err!`; no
/// predicate is stored on the clone by this method.
fn with_filter_and_projection(
129+
&self,
130+
filter: Option<Arc<dyn PhysicalExpr>>,
131+
projection: ProjectionExprs,
132+
) -> Result<Option<Arc<dyn FileSource>>> {
133+
assert_or_internal_err!(filter.is_none(), "filter should not be defined");
134+
135+
let mut conf = self.clone();
136+
conf.projection =
137+
SplitProjection::new(self.table_schema.file_schema(), &projection);
138+
139+
Ok(Some(Arc::new(conf)))
140+
}
141+
126142
fn metrics(&self) -> &ExecutionPlanMetricsSet {
127143
&self.metrics
128144
}
@@ -143,9 +159,7 @@ impl FileSource for AvroSource {
143159

144160
fn apply_expressions(
145161
&self,
146-
f: &mut dyn FnMut(
147-
&dyn datafusion_physical_plan::PhysicalExpr,
148-
) -> Result<TreeNodeRecursion>,
162+
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
149163
) -> Result<TreeNodeRecursion> {
150164
// Visit projection expressions
151165
let mut tnr = TreeNodeRecursion::Continue;

datafusion/datasource-csv/src/source.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ use datafusion_datasource::{
3636
use arrow::csv;
3737
use datafusion_common::config::CsvOptions;
3838
use datafusion_common::tree_node::TreeNodeRecursion;
39-
use datafusion_common::{DataFusionError, Result};
39+
use datafusion_common::{DataFusionError, Result, assert_or_internal_err};
4040
use datafusion_common_runtime::JoinSet;
4141
use datafusion_datasource::file::FileSource;
4242
use datafusion_datasource::file_scan_config::FileScanConfig;
4343
use datafusion_execution::TaskContext;
4444
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
4545
use datafusion_physical_plan::{
46-
DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
46+
DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
4747
};
4848

4949
use crate::file_format::CsvDecoder;
@@ -293,6 +293,20 @@ impl FileSource for CsvSource {
293293
Some(&self.projection.source)
294294
}
295295

296+
/// Returns a clone of this CSV source with its projection rebuilt from
/// `projection` against the table's file schema.
///
/// `filter` is asserted to be `None` via `assert_or_internal_err!`; no
/// predicate is stored on the clone by this method.
fn with_filter_and_projection(
297+
&self,
298+
filter: Option<Arc<dyn PhysicalExpr>>,
299+
projection: ProjectionExprs,
300+
) -> Result<Option<Arc<dyn FileSource>>> {
301+
assert_or_internal_err!(filter.is_none(), "filter should not be defined");
302+
303+
let mut conf = self.clone();
304+
conf.projection =
305+
SplitProjection::new(self.table_schema.file_schema(), &projection);
306+
307+
Ok(Some(Arc::new(conf)))
308+
}
309+
296310
fn metrics(&self) -> &ExecutionPlanMetricsSet {
297311
&self.metrics
298312
}
@@ -318,9 +332,7 @@ impl FileSource for CsvSource {
318332

319333
fn apply_expressions(
320334
&self,
321-
f: &mut dyn FnMut(
322-
&dyn datafusion_physical_plan::PhysicalExpr,
323-
) -> Result<TreeNodeRecursion>,
335+
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
324336
) -> Result<TreeNodeRecursion> {
325337
// Visit projection expressions
326338
let mut tnr = TreeNodeRecursion::Continue;

datafusion/datasource-json/src/source.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::task::{Context, Poll};
2626
use crate::file_format::JsonDecoder;
2727
use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
2828

29+
use datafusion_common::assert_or_internal_err;
2930
use datafusion_common::error::{DataFusionError, Result};
3031
use datafusion_common::tree_node::TreeNodeRecursion;
3132
use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -37,7 +38,7 @@ use datafusion_datasource::{
3738
ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
3839
};
3940
use datafusion_physical_plan::projection::ProjectionExprs;
40-
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
41+
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, PhysicalExpr};
4142

4243
use arrow::array::RecordBatch;
4344
use arrow::json::ReaderBuilder;
@@ -230,6 +231,20 @@ impl FileSource for JsonSource {
230231
Some(&self.projection.source)
231232
}
232233

234+
/// Returns a clone of this JSON source with its projection rebuilt from
/// `projection` against the table's file schema.
///
/// `filter` is asserted to be `None` via `assert_or_internal_err!`; no
/// predicate is stored on the clone by this method.
fn with_filter_and_projection(
235+
&self,
236+
filter: Option<Arc<dyn PhysicalExpr>>,
237+
projection: ProjectionExprs,
238+
) -> Result<Option<Arc<dyn FileSource>>> {
239+
assert_or_internal_err!(filter.is_none(), "filter should not be defined");
240+
241+
let mut conf = self.clone();
242+
conf.projection =
243+
SplitProjection::new(self.table_schema.file_schema(), &projection);
244+
245+
Ok(Some(Arc::new(conf)))
246+
}
247+
233248
fn metrics(&self) -> &ExecutionPlanMetricsSet {
234249
&self.metrics
235250
}
@@ -240,9 +255,7 @@ impl FileSource for JsonSource {
240255

241256
fn apply_expressions(
242257
&self,
243-
f: &mut dyn FnMut(
244-
&dyn datafusion_physical_plan::PhysicalExpr,
245-
) -> Result<TreeNodeRecursion>,
258+
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
246259
) -> Result<TreeNodeRecursion> {
247260
// Visit projection expressions
248261
let mut tnr = TreeNodeRecursion::Continue;

datafusion/datasource-parquet/src/source.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,17 @@ impl FileSource for ParquetSource {
604604
Some(&self.projection)
605605
}
606606

607+
/// Returns a clone of this Parquet source with `predicate` set to `filter`
/// and `projection` replaced by the given `projection`. Both values are
/// stored as-is; passing `None` for `filter` clears the predicate on the clone.
fn with_filter_and_projection(
608+
&self,
609+
filter: Option<Arc<dyn PhysicalExpr>>,
610+
projection: ProjectionExprs,
611+
) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
612+
let mut conf = self.clone();
613+
conf.predicate = filter;
614+
conf.projection = projection;
615+
Ok(Some(Arc::new(conf)))
616+
}
617+
607618
fn metrics(&self) -> &ExecutionPlanMetricsSet {
608619
&self.metrics
609620
}

datafusion/datasource/src/file.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ pub trait FileSource: Send + Sync {
104104
None
105105
}
106106

107+
/// Returns a new file source with the given filter and projection.
108+
///
109+
/// This method is primarily used during physical plan rewriting (in
110+
/// `ExecutionPlan::with_physical_expressions`) to update the expressions within a file source.
111+
///
112+
/// It should NOT be used to pass in projections or filters. That pushdown is handled by
113+
/// optimizer rules.
///
/// The default implementation ignores both arguments and returns `Ok(None)`,
/// meaning the source was not rebuilt.
114+
fn with_filter_and_projection(
115+
&self,
116+
_filter: Option<Arc<dyn PhysicalExpr>>,
117+
_projection: ProjectionExprs,
118+
) -> Result<Option<Arc<dyn FileSource>>> {
119+
Ok(None)
120+
}
121+
107122
/// Return execution plan metrics
108123
fn metrics(&self) -> &ExecutionPlanMetricsSet;
109124

datafusion/datasource/src/file_scan_config.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
2929
use datafusion_common::config::ConfigOptions;
3030
use datafusion_common::tree_node::TreeNodeRecursion;
3131
use datafusion_common::{
32-
Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32+
Constraints, Result, ScalarValue, Statistics, assert_eq_or_internal_err,
33+
internal_datafusion_err, internal_err,
3334
};
3435
use datafusion_execution::{
3536
SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
@@ -38,15 +39,15 @@ use datafusion_expr::Operator;
3839

3940
use datafusion_physical_expr::equivalence::project_orderings;
4041
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
41-
use datafusion_physical_expr::projection::ProjectionExprs;
42+
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
4243
use datafusion_physical_expr::utils::reassign_expr_columns;
4344
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
4445
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
4546
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4647
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4748
use datafusion_physical_plan::SortOrderPushdownResult;
4849
use datafusion_physical_plan::coop::cooperative;
49-
use datafusion_physical_plan::execution_plan::SchedulingType;
50+
use datafusion_physical_plan::execution_plan::{ReplacePhysicalExpr, SchedulingType};
5051
use datafusion_physical_plan::{
5152
DisplayAs, DisplayFormatType,
5253
display::{ProjectSchemaDisplay, display_orderings},
@@ -941,6 +942,75 @@ impl DataSource for FileScanConfig {
941942
// Delegate to the file source
942943
self.file_source.apply_expressions(f)
943944
}
945+
946+
/// Returns an iterator over the physical expressions held by the file source:
/// the filter first (if the source reports one), followed by every projection
/// expression (if the source reports a projection).
///
/// Always returns `Some`; the iterator is empty when the source has neither.
fn physical_expressions<'a>(
947+
&'a self,
948+
) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
949+
// `Option::into_iter` yields zero or one item, so an absent filter simply
// contributes nothing to the chain.
let filter = self.file_source.filter().into_iter();
950+
let projection = self
951+
.file_source
952+
.projection()
953+
.into_iter()
954+
.flat_map(|p| p.expr_iter());
955+
956+
Some(Box::new(filter.chain(projection)))
957+
}
958+
959+
/// Rebuilds this `FileScanConfig` with the file source's filter and projection
/// expressions replaced by `params.exprs`.
///
/// `params.exprs` is consumed positionally: the first expression becomes the
/// filter when the current source has one, and the remaining expressions are
/// paired with the current projection entries, keeping each entry's alias.
/// The number of supplied expressions is checked against the current
/// filter + projection count with `assert_eq_or_internal_err!`.
///
/// Produces an internal error if the file source's
/// `with_filter_and_projection` returns `None`.
fn with_physical_expressions(
960+
&self,
961+
params: ReplacePhysicalExpr,
962+
) -> Result<Option<Arc<dyn DataSource>>> {
963+
// 0 or 1, depending on whether the source currently reports a filter.
let filter_count = self.file_source.filter().iter().len();
964+
let projection_count = self
965+
.file_source
966+
.projection()
967+
.map(|p| p.expr_iter().count())
968+
.unwrap_or(0);
969+
970+
let expected_count = filter_count + projection_count;
971+
let exprs_count = params.exprs.len();
972+
973+
assert_eq_or_internal_err!(
974+
expected_count,
975+
exprs_count,
976+
"Inconsistent number of physical expressions for FileScanConfig",
977+
);
978+
979+
let mut filter = None;
980+
let mut projection = vec![];
981+
let mut expr_iter = params.exprs.into_iter();
982+
983+
// The filter, when present, is the first expression in the list.
if filter_count > 0 {
984+
filter = expr_iter.next();
985+
}
986+
987+
if projection_count > 0 {
988+
// `projection_count > 0` was derived from `projection()` returning `Some`
// above, so the `expect` relies on that call giving the same answer again.
projection = self
989+
.file_source
990+
.projection()
991+
.expect("should have expressions")
992+
.iter()
993+
.zip(expr_iter)
994+
.map(|(p, expr)| ProjectionExpr::new(expr, p.alias.clone()))
995+
.collect();
996+
}
997+
998+
let file_source = self
999+
.file_source
1000+
.with_filter_and_projection(filter, projection.into())?;
1001+
1002+
match file_source {
1003+
Some(file_source) => {
1004+
// Copy this config into a builder and swap in the rebuilt source.
let conf_builder: FileScanConfigBuilder = self.clone().into();
1005+
Ok(Some(Arc::new(
1006+
conf_builder.with_source(file_source).build(),
1007+
)))
1008+
}
1009+
None => {
1010+
internal_err!("file source is not rebuilt")
1011+
}
1012+
}
1013+
}
9441014
}
9451015

9461016
impl FileScanConfig {

0 commit comments

Comments
 (0)