Skip to content

Commit 682da84

Browse files
feat: Push limit into hash join (#20228)
## Which issue does this PR close? Part of #18295. ## What changes are included in this PR? Push the limit down into the hash join using the limit-pushdown optimizer: the optimizer passes the limit value to `HashJoinExec` via `with_fetch`, and the `fetch` value is forwarded to the `LimitedBatchCoalescer` so that the batch is emitted once the limit is hit. ## Are these changes tested? SLT tests + unit tests --------- Co-authored-by: Yongting You <2010youy01@gmail.com>
1 parent 9c6a35f commit 682da84

File tree

6 files changed

+563
-66
lines changed

6 files changed

+563
-66
lines changed

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 165 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use crate::physical_optimizer::test_utils::{
21-
coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec,
22-
sort_preserving_merge_exec, stream_exec,
21+
coalesce_partitions_exec, global_limit_exec, hash_join_exec, local_limit_exec,
22+
sort_exec, sort_preserving_merge_exec, stream_exec,
2323
};
2424

2525
use arrow::compute::SortOptions;
@@ -29,6 +29,7 @@ use datafusion_common::error::Result;
2929
use datafusion_expr::{JoinType, Operator};
3030
use datafusion_physical_expr::Partitioning;
3131
use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
32+
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
3233
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
3334
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3435
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
@@ -161,6 +162,168 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
161162
Ok(())
162163
}
163164

165+
fn join_on_columns(
166+
left_col: &str,
167+
right_col: &str,
168+
) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
169+
vec![(
170+
Arc::new(datafusion_physical_expr::expressions::Column::new(
171+
left_col, 0,
172+
)) as _,
173+
Arc::new(datafusion_physical_expr::expressions::Column::new(
174+
right_col, 0,
175+
)) as _,
176+
)]
177+
}
178+
179+
#[test]
fn absorbs_limit_into_hash_join_inner() -> Result<()> {
    // A GlobalLimitExec (skip=0) above an Inner HashJoinExec should be
    // absorbed by the join via `with_fetch`, removing the limit node entirely.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Inner,
    )?;
    let plan = global_limit_exec(join, 0, Some(5));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
    GlobalLimitExec: skip=0, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;
    // The limit should be absorbed by the hash join (not pushed to children).
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}
215+
216+
#[test]
fn absorbs_limit_into_hash_join_right() -> Result<()> {
    // A GlobalLimitExec (skip=0) above a Right HashJoinExec should likewise be
    // absorbed via `with_fetch`, removing the limit node.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Right,
    )?;
    let plan = global_limit_exec(join, 0, Some(10));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
    GlobalLimitExec: skip=0, fetch=10
      HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;
    // The limit should be absorbed by the hash join.
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
    HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}
252+
253+
#[test]
fn absorbs_limit_into_hash_join_left() -> Result<()> {
    // Left joins can absorb the limit too: matched rows are emitted during
    // probing, then unmatched rows at the end, stopping when limit is reached.
    let schema = create_schema();
    let left = empty_exec(Arc::clone(&schema));
    let right = empty_exec(Arc::clone(&schema));
    let on = join_on_columns("c1", "c1");
    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Left)?;
    let global_limit = global_limit_exec(hash_join, 0, Some(5));

    let initial = format_plan(&global_limit);
    insta::assert_snapshot!(
        initial,
        @r"
    GlobalLimitExec: skip=0, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let after_optimize =
        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
    let optimized = format_plan(&after_optimize);
    // Left join now absorbs the limit
    insta::assert_snapshot!(
        optimized,
        @r"
    HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
      EmptyExec
      EmptyExec
    "
    );

    Ok(())
}
289+
290+
#[test]
fn absorbs_limit_with_skip_into_hash_join() -> Result<()> {
    // When the limit has a non-zero skip, the GlobalLimitExec must remain in
    // the plan, but skip + fetch (3 + 5 = 8) is still pushed into the join.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Inner,
    )?;
    let plan = global_limit_exec(join, 3, Some(5));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
    GlobalLimitExec: skip=3, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
        EmptyExec
        EmptyExec
    "
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;
    // With skip, GlobalLimit is kept but fetch (skip + limit = 8) is absorbed
    // by the join.
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
    GlobalLimitExec: skip=3, fetch=5
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8
        EmptyExec
        EmptyExec
    "
    );

    Ok(())
}
326+
164327
#[test]
165328
fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
166329
let schema = create_schema();

0 commit comments

Comments
 (0)