Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 54 additions & 59 deletions datafusion/core/tests/dataframe/mod.rs

Large diffs are not rendered by default.

25 changes: 11 additions & 14 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,19 +766,18 @@ async fn test_physical_plan_display_indent() {

assert_snapshot!(
actual,
@r###"
@r"
SortPreservingMergeExec: [the_min@2 DESC], fetch=10
SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]
ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]
AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000
AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: c12@1 < 10
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
"###
FilterExec: c12@1 < 10
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
"
);
}

Expand Down Expand Up @@ -1013,16 +1012,14 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
RecursiveQueryExec: name=number_series, is_distinct=false
CoalescePartitionsExec
ProjectionExec: expr=[id@0 as id, 1 as level]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: id@0 = 1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
FilterExec: id@0 = 1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
CoalescePartitionsExec
ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: id@0 < 10
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
WorkTableExec: name=number_series
FilterExec: id@0 < 10
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
WorkTableExec: name=number_series
"
);

Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_common::error::Result;
use datafusion_common::{config::ConfigOptions, internal_err};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
joins::HashJoinExec, repartition::RepartitionExec, ExecutionPlan,
};

Expand Down Expand Up @@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comment above still mentions handling “small batches that are produced by highly selective filters,” but the implementation no longer wraps FilterExec. Consider updating that comment to reflect the current behavior (HashJoin/Repartition and AsyncFuncExec) to keep docs accurate.

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-but-wont-fix; category:documentation; feedback:The Augment AI reviewer is correct that the comment mentions "filter", but results can be filtered not only by FilterExec. HashJoinExec also filters the results by returning only those rows matching the join condition.

// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
Expand Down
20 changes: 9 additions & 11 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6022,10 +6022,9 @@ physical_plan
07)------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
10)------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true

query I
SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c3 order by c3 limit 4;
Expand Down Expand Up @@ -7165,13 +7164,12 @@ logical_plan
03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]]
04)------TableScan: having_test projection=[v1, v2]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: max(having_test.v1)@2 = 3, projection=[v1@0, v2@1]
03)----AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=1
06)----------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: max(having_test.v1)@2 = 3, projection=[v1@0, v2@1]
02)--AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]


query error
Expand Down
42 changes: 18 additions & 24 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6436,10 +6436,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

query I
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
Expand All @@ -6465,10 +6464,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

query I
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
Expand All @@ -6494,10 +6492,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

query I
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
Expand All @@ -6523,10 +6520,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

query I
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
Expand All @@ -6552,10 +6548,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

query I
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
Expand Down Expand Up @@ -6583,10 +6578,9 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]

# any operator
query ?
Expand Down
11 changes: 5 additions & 6 deletions datafusion/sqllogictest/test_files/async_udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ logical_plan
01)Filter: async_abs(data.x) < Int32(5)
02)--TableScan: data projection=[x]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: __async_fn_0@1 < 5, projection=[x@0]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: __async_fn_0@1 < 5, projection=[x@0]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------DataSourceExec: partitions=1, partition_sizes=[1]

# Async udf can be used in projection
query I rowsort
Expand Down
13 changes: 6 additions & 7 deletions datafusion/sqllogictest/test_files/count_star_rule.slt
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ logical_plan
04)------TableScan: t1 projection=[a]
physical_plan
01)ProjectionExec: expr=[a@0 as a, count(Int64(1))@1 as cnt]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: count(Int64(1))@1 > 0
04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
07)------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]
08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
02)--FilterExec: count(Int64(1))@1 > 0
03)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]

query II
SELECT t1.a, COUNT() AS cnt FROM t1 GROUP BY t1.a HAVING COUNT() > 1;
Expand Down
39 changes: 17 additions & 22 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,9 @@ physical_plan
03)----PlaceholderRowExec
04)--CoalescePartitionsExec
05)----ProjectionExec: expr=[id@0 + 1 as id]
06)------CoalesceBatchesExec: target_batch_size=8192
07)--------FilterExec: id@0 < 10
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=nodes
06)------FilterExec: id@0 < 10
07)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)----------WorkTableExec: name=nodes

Comment on lines +120 to 123
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Refresh batch-size commentary for recursive CTE plans.

These explain outputs used to show CoalesceBatchesExec: target_batch_size=2, which justified the nearby note “use explain to ensure batch size is set to 2.” After removing the Filter coalescing, that evidence disappeared, so the comment is now misleading. Please adjust the wording (or add another observable hook) so the test description reflects what the plan actually shows.

You could, for example, rewrite it to something like:

-# use explain to ensure that batch size is set to 2. This should produce multiple batches per iteration since the input
-# table 'balances' has 4 rows
+# With batch size set to 2 we still produce multiple batches per iteration; the plan no longer prints a Coalesce node, so we
+# rely on the configuration rather than the explain output to confirm the setting.

Also applies to: 164-167

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct that the comments in the snapshot tests need to be updated to reflect that FilterExec is no longer wrapped inside CoalesceBatchesExec

# setup
statement ok
Expand Down Expand Up @@ -162,10 +161,9 @@ physical_plan
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/balance.csv]]}, projection=[time, name, account_balance], file_type=csv, has_header=true
04)----CoalescePartitionsExec
05)------ProjectionExec: expr=[time@0 + 1 as time, name@1 as name, account_balance@2 + 10 as account_balance]
06)--------CoalesceBatchesExec: target_batch_size=2
07)----------FilterExec: time@0 < 10
08)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)--------------WorkTableExec: name=balances
06)--------FilterExec: time@0 < 10
07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)------------WorkTableExec: name=balances

# recursive CTE with static term derived from table works
# note that this is run with batch size set to 2. This should produce multiple batches per iteration since the input
Expand Down Expand Up @@ -734,12 +732,11 @@ physical_plan
04)--ProjectionExec: expr=[2 as val]
05)----CrossJoinExec
06)------CoalescePartitionsExec
07)--------CoalesceBatchesExec: target_batch_size=8182
08)----------FilterExec: val@0 < 2
09)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)--------------WorkTableExec: name=recursive_cte
11)------ProjectionExec: expr=[2 as val]
12)--------PlaceholderRowExec
07)--------FilterExec: val@0 < 2
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=recursive_cte
10)------ProjectionExec: expr=[2 as val]
11)--------PlaceholderRowExec

# Test issue: https://github.com/apache/datafusion/issues/9794
# Non-recursive term and recursive term have different types
Expand Down Expand Up @@ -964,10 +961,9 @@ physical_plan
03)----PlaceholderRowExec
04)--CoalescePartitionsExec
05)----ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]
06)------CoalesceBatchesExec: target_batch_size=8182
07)--------FilterExec: n@0 < 10
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=numbers
06)------FilterExec: n@0 < 10
07)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)----------WorkTableExec: name=numbers

query TT
explain WITH RECURSIVE numbers AS (
Expand All @@ -990,10 +986,9 @@ physical_plan
03)----PlaceholderRowExec
04)--CoalescePartitionsExec
05)----ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]
06)------CoalesceBatchesExec: target_batch_size=8182
07)--------FilterExec: n@0 < 10
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=numbers
06)------FilterExec: n@0 < 10
07)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)----------WorkTableExec: name=numbers

# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
query II
Expand Down
15 changes: 6 additions & 9 deletions datafusion/sqllogictest/test_files/dictionary.slt
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,8 @@ logical_plan
01)Filter: test.column2 = Dictionary(Int32, Utf8("1"))
02)--TableScan: test projection=[column1, column2]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column2@1 = 1
03)----DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: column2@1 = 1
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# try literal = col to verify order doesn't matter
# filter should not cast column2
Expand All @@ -423,9 +422,8 @@ logical_plan
01)Filter: test.column2 = Dictionary(Int32, Utf8("1"))
02)--TableScan: test projection=[column1, column2]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column2@1 = 1
03)----DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: column2@1 = 1
02)--DataSourceExec: partitions=1, partition_sizes=[1]


# Now query using an integer which must be coerced into a dictionary string
Expand All @@ -441,9 +439,8 @@ logical_plan
01)Filter: test.column2 = Dictionary(Int32, Utf8("1"))
02)--TableScan: test projection=[column1, column2]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column2@1 = 1
03)----DataSourceExec: partitions=1, partition_sizes=[1]
01)FilterExec: column2@1 = 1
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# Window Functions
query I
Expand Down
7 changes: 3 additions & 4 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ logical_plan
02)--Filter: aggregate_test_100.c2 > Int8(10)
03)----TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: c2@1 > 10, projection=[c1@0]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], file_type=csv, has_header=true
01)FilterExec: c2@1 > 10, projection=[c1@0]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], file_type=csv, has_header=true

# explain_csv_exec_scan_config

Expand Down
Loading
Loading