From d86874ed3305e21804d7443f670b3b53b6524bae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20=C5=9Een?= Date: Fri, 3 Apr 2026 00:17:06 +0300 Subject: [PATCH 1/4] [docs] add sql example to timestamp/datetime docs for time zone (#21082) ## Which issue does this PR close? - Closes #19104. ## Rationale for this change #19104 explains but some datetime functions respect `execution.time_zone` config but there are no SQL examples for them. This PR adds it and also missing sql example for `date_part`, `extract` and `date_trunc` ## What changes are included in this PR? sql example changes and generated doc ## Are these changes tested? ## Are there any user-facing changes? --- .../functions/src/datetime/current_date.rs | 19 +++- .../functions/src/datetime/current_time.rs | 19 +++- .../functions/src/datetime/date_part.rs | 16 +++- .../functions/src/datetime/date_trunc.rs | 16 +++- datafusion/functions/src/datetime/now.rs | 19 +++- .../source/user-guide/sql/scalar_functions.md | 94 +++++++++++++++++++ 6 files changed, 178 insertions(+), 5 deletions(-) diff --git a/datafusion/functions/src/datetime/current_date.rs b/datafusion/functions/src/datetime/current_date.rs index da9f6946d2023..d07a3b1caf13b 100644 --- a/datafusion/functions/src/datetime/current_date.rs +++ b/datafusion/functions/src/datetime/current_date.rs @@ -37,7 +37,24 @@ The `current_date()` return value is determined at query time and will return th "#, syntax_example = r#"current_date() (optional) SET datafusion.execution.time_zone = '+00:00'; - SELECT current_date();"# + SELECT current_date();"#, + sql_example = r#"```sql +> SELECT current_date(); ++----------------+ +| current_date() | ++----------------+ +| 2024-12-23 | ++----------------+ + +-- The current date is based on the session time zone (UTC by default) +> SET datafusion.execution.time_zone = 'Asia/Tokyo'; +> SELECT current_date(); ++----------------+ +| current_date() | ++----------------+ +| 2024-12-24 | ++----------------+ +```"# )] 
#[derive(Debug, PartialEq, Eq, Hash)] pub struct CurrentDateFunc { diff --git a/datafusion/functions/src/datetime/current_time.rs b/datafusion/functions/src/datetime/current_time.rs index b42f3ada3f037..92f4ae5e66f02 100644 --- a/datafusion/functions/src/datetime/current_time.rs +++ b/datafusion/functions/src/datetime/current_time.rs @@ -40,7 +40,24 @@ The session time zone can be set using the statement 'SET datafusion.execution.t "#, syntax_example = r#"current_time() (optional) SET datafusion.execution.time_zone = '+00:00'; - SELECT current_time();"# + SELECT current_time();"#, + sql_example = r#"```sql +> SELECT current_time(); ++--------------------+ +| current_time() | ++--------------------+ +| 06:30:00.123456789 | ++--------------------+ + +-- The current time is based on the session time zone (UTC by default) +> SET datafusion.execution.time_zone = 'Asia/Tokyo'; +> SELECT current_time(); ++--------------------+ +| current_time() | ++--------------------+ +| 15:30:00.123456789 | ++--------------------+ +```"# )] #[derive(Debug, PartialEq, Eq, Hash)] pub struct CurrentTimeFunc { diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index 24d6ff9cfb379..10275f5aaed2e 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -86,7 +86,21 @@ use datafusion_macros::user_doc; argument( name = "expression", description = "Time expression to operate on. Can be a constant, column, or function." 
- ) + ), + sql_example = r#"```sql +> SELECT date_part('year', '2024-05-01T00:00:00'); ++-----------------------------------------------------+ +| date_part(Utf8("year"),Utf8("2024-05-01T00:00:00")) | ++-----------------------------------------------------+ +| 2024 | ++-----------------------------------------------------+ +> SELECT extract(day FROM timestamp '2024-05-01T00:00:00'); ++----------------------------------------------------+ +| date_part(Utf8("DAY"),Utf8("2024-05-01T00:00:00")) | ++----------------------------------------------------+ +| 1 | ++----------------------------------------------------+ +```"# )] #[derive(Debug, PartialEq, Eq, Hash)] pub struct DatePartFunc { diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index c952f1f39282b..a97fd138ecb12 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -166,7 +166,21 @@ impl DateTruncGranularity { argument( name = "expression", description = "Timestamp or time expression to operate on. Can be a constant, column, or function." 
-    )
+    ),
+    sql_example = r#"```sql
+> SELECT date_trunc('month', '2024-05-15T10:30:00');
++-------------------------------------------------------+
+| date_trunc(Utf8("month"),Utf8("2024-05-15T10:30:00")) |
++-------------------------------------------------------+
+| 2024-05-01T00:00:00                                   |
++-------------------------------------------------------+
+> SELECT date_trunc('hour', '2024-05-15T10:30:00');
++------------------------------------------------------+
+| date_trunc(Utf8("hour"),Utf8("2024-05-15T10:30:00")) |
++------------------------------------------------------+
+| 2024-05-15T10:00:00                                  |
++------------------------------------------------------+
+```"#
 )]
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct DateTruncFunc {
diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs
index 2e3cd453d29ba..82bb1251b2045 100644
--- a/datafusion/functions/src/datetime/now.rs
+++ b/datafusion/functions/src/datetime/now.rs
@@ -36,7 +36,24 @@ Returns the current timestamp in the system configured timezone (None by default
 
 The `now()` return value is determined at query time and will return the same timestamp, no matter when in the query plan the function executes.
"#, - syntax_example = "now()" + syntax_example = "now()", + sql_example = r#"```sql +> SELECT now(); ++----------------------------------+ +| now() | ++----------------------------------+ +| 2024-12-23T06:30:00.123456789 | ++----------------------------------+ + +-- The timezone of the returned timestamp depends on the session time zone +> SET datafusion.execution.time_zone = 'America/New_York'; +> SELECT now(); ++--------------------------------------+ +| now() | ++--------------------------------------+ +| 2024-12-23T01:30:00.123456789-05:00 | ++--------------------------------------+ +```"# )] #[derive(Debug, PartialEq, Eq, Hash)] pub struct NowFunc { diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 022b0f9daec86..c303b43fc8844 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2415,6 +2415,26 @@ current_date() SELECT current_date(); ``` +#### Example + +```sql +> SELECT current_date(); ++----------------+ +| current_date() | ++----------------+ +| 2024-12-23 | ++----------------+ + +-- The current date is based on the session time zone (UTC by default) +> SET datafusion.execution.time_zone = 'Asia/Tokyo'; +> SELECT current_date(); ++----------------+ +| current_date() | ++----------------+ +| 2024-12-24 | ++----------------+ +``` + #### Aliases - today @@ -2433,6 +2453,26 @@ current_time() SELECT current_time(); ``` +#### Example + +```sql +> SELECT current_time(); ++--------------------+ +| current_time() | ++--------------------+ +| 06:30:00.123456789 | ++--------------------+ + +-- The current time is based on the session time zone (UTC by default) +> SET datafusion.execution.time_zone = 'Asia/Tokyo'; +> SELECT current_time(); ++--------------------+ +| current_time() | ++--------------------+ +| 15:30:00.123456789 | ++--------------------+ +``` + ### `current_timestamp` _Alias of [now](#now)._ @@ -2537,6 +2577,23 @@ 
date_part(part, expression) - **expression**: Time expression to operate on. Can be a constant, column, or function. +#### Example + +```sql +> SELECT date_part('year', '2024-05-01T00:00:00'); ++-----------------------------------------------------+ +| date_part(Utf8("year"),Utf8("2024-05-01T00:00:00")) | ++-----------------------------------------------------+ +| 2024 | ++-----------------------------------------------------+ +> SELECT extract(day FROM timestamp '2024-05-01T00:00:00'); ++----------------------------------------------------+ +| date_part(Utf8("DAY"),Utf8("2024-05-01T00:00:00")) | ++----------------------------------------------------+ +| 1 | ++----------------------------------------------------+ +``` + #### Alternative Syntax ```sql @@ -2582,6 +2639,23 @@ date_trunc(precision, expression) - **expression**: Timestamp or time expression to operate on. Can be a constant, column, or function. +#### Example + +```sql +> SELECT date_trunc('month', '2024-05-15T10:30:00'); ++-----------------------------------------------+ +| date_trunc(Utf8("month"),Utf8("2024-05-15T10:30:00")) | ++-----------------------------------------------+ +| 2024-05-01T00:00:00 | ++-----------------------------------------------+ +> SELECT date_trunc('hour', '2024-05-15T10:30:00'); ++----------------------------------------------+ +| date_trunc(Utf8("hour"),Utf8("2024-05-15T10:30:00")) | ++----------------------------------------------+ +| 2024-05-15T10:00:00 | ++----------------------------------------------+ +``` + #### Aliases - datetrunc @@ -2694,6 +2768,26 @@ The `now()` return value is determined at query time and will return the same ti now() ``` +#### Example + +```sql +> SELECT now(); ++----------------------------------+ +| now() | ++----------------------------------+ +| 2024-12-23T06:30:00.123456789 | ++----------------------------------+ + +-- The timezone of the returned timestamp depends on the session time zone +> SET datafusion.execution.time_zone = 
'America/New_York'; +> SELECT now(); ++--------------------------------------+ +| now() | ++--------------------------------------+ +| 2024-12-23T01:30:00.123456789-05:00 | ++--------------------------------------+ +``` + #### Aliases - current_timestamp From a76751187d075f7ab63d41d3045a259cfb900715 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 2 Apr 2026 17:21:13 -0400 Subject: [PATCH 2/4] feat: Complete basic `LATERAL JOIN` functionality (#21202) ## Which issue does this PR close? - Closes #10048. ## Rationale for this change Lateral joins are a commonly used SQL feature that allows the right-side join relation to access columns from the left-side of the join. Like correlated subqueries, two popular evaluation strategies are nested loops (re-evaluate the right-side of the join for each row of the left join input) and decorrelation (rewrite the right join input to remove the correlation, converting the lateral join into a standard join with the correlation predicates as join conditions). Decorrelation is typically much faster because the right side is evaluated once rather than re-executed for every row of the left input. Previously, DataFusion had some support for evaluating lateral joins via decorrelation, but it was not functional. This PR fixes and extends the existing code to make basic lateral joins functional, although several notable TODOs remain. This PR also adds a suite of SLT tests for lateral joins (derived from the DuckDB and Postgres tests), covering both implemented and to-be-implemented behavior. Remaining extensions: * LATERAL subqueries with HAVING clauses (#21198) * LEFT JOIN LATERAL (#21199) * LATERAL subqueries with outer relation references outside the WHERE clause (#21201) ## What changes are included in this PR? 
* Match query structure properly (unwrap `SubqueryAlias`) so that lateral subqueries are recognized properly, even if they have aliases * Handle nested LATERAL clauses; each LATERAL can only reference sibling outer relations * Properly handle "the count bug", following similar logic to what we do for this case with correlated subqueries * Remove a `todo!` panic in the physical planner if a `Subquery` node is seen; these just represent a subquery structure we aren't able to decorrelate yet * Properly raise an error and bail out for LATERAL subqueries with HAVING clauses * Add SLT test suite for lateral joins (~33 queries), based in part on DuckDB and Postgres test suites * Update expected EXPLAIN output in various places * Add docs for `LATERAL JOIN` ## Are these changes tested? Yes; new tests added. I ran the test suite against DuckDB and confirmed that we get the same results under DuckDB, except for cases where DuckDB supports a class of lateral joins we don't support yet. ## Are there any user-facing changes? Yes; lateral joins now work for a wide swath of useful scenarios. 
--- datafusion/core/src/physical_planner.rs | 9 +- datafusion/optimizer/src/decorrelate.rs | 6 + .../optimizer/src/decorrelate_lateral_join.rs | 243 ++++++-- datafusion/sqllogictest/test_files/joins.slt | 83 +-- .../sqllogictest/test_files/lateral_join.slt | 576 ++++++++++++++++++ docs/source/user-guide/sql/select.md | 90 ++- docs/source/user-guide/sql/subqueries.md | 2 + 7 files changed, 915 insertions(+), 94 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/lateral_join.slt diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e25969903521c..683a16af6cb4e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1198,7 +1198,14 @@ impl DefaultPhysicalPlanner { let new_sort = SortExec::new(ordering, physical_input).with_fetch(*fetch); Arc::new(new_sort) } - LogicalPlan::Subquery(_) => todo!(), + // The optimizer's decorrelation passes remove Subquery nodes + // for supported patterns. This error is hit for correlated + // patterns that the optimizer cannot (yet) decorrelate. + LogicalPlan::Subquery(_) => { + return not_impl_err!( + "Physical plan does not support undecorrelated Subquery" + ); + } LogicalPlan::SubqueryAlias(_) => children.one()?, LogicalPlan::Limit(limit) => { let input = children.one()?; diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 08839b49ef4b0..2a71205c64c8b 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -137,6 +137,12 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { fn f_down(&mut self, plan: LogicalPlan) -> Result> { match plan { LogicalPlan::Filter(_) => Ok(Transformed::no(plan)), + // Subquery nodes are scope boundaries for correlation. A nested + // Subquery's outer references belong to a different decorrelation + // level and must not be pulled up into the current scope. 
+ LogicalPlan::Subquery(_) => { + Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump)) + } LogicalPlan::Union(_) | LogicalPlan::Sort(_) | LogicalPlan::Extension(_) => { let plan_hold_outer = !plan.all_out_ref_exprs().is_empty(); if plan_hold_outer { diff --git a/datafusion/optimizer/src/decorrelate_lateral_join.rs b/datafusion/optimizer/src/decorrelate_lateral_join.rs index a8c751ff46288..b9c9fabb8efb2 100644 --- a/datafusion/optimizer/src/decorrelate_lateral_join.rs +++ b/datafusion/optimizer/src/decorrelate_lateral_join.rs @@ -17,20 +17,21 @@ //! [`DecorrelateLateralJoin`] decorrelates logical plans produced by lateral joins. -use std::collections::BTreeSet; +use std::sync::Arc; -use crate::decorrelate::PullUpCorrelatedExpr; +use crate::decorrelate::{PullUpCorrelatedExpr, UN_MATCHED_ROW_INDICATOR}; use crate::optimizer::ApplyOrder; +use crate::utils::evaluates_to_null; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_expr::{Join, lit}; +use datafusion_expr::{Expr, Join, expr}; -use datafusion_common::Result; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_expr::logical_plan::JoinType; +use datafusion_common::{Column, DFSchema, Result, TableReference}; +use datafusion_expr::logical_plan::{JoinType, Subquery}; use datafusion_expr::utils::conjunction; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SubqueryAlias}; /// Optimizer rule for rewriting lateral joins to joins #[derive(Default, Debug)] @@ -70,74 +71,206 @@ impl OptimizerRule for DecorrelateLateralJoin { } } -// Build the decorrelated join based on the original lateral join query. For now, we only support cross/inner -// lateral joins. +// Build the decorrelated join based on the original lateral join query. For +// now, we only support cross/inner lateral joins. 
fn rewrite_internal(join: Join) -> Result> { + // TODO: Support outer joins + // if join.join_type != JoinType::Inner { return Ok(Transformed::no(LogicalPlan::Join(join))); } - match join.right.apply_with_subqueries(|p| { - // TODO: support outer joins - if p.contains_outer_reference() { - Ok(TreeNodeRecursion::Stop) - } else { - Ok(TreeNodeRecursion::Continue) - } - })? { - TreeNodeRecursion::Stop => {} - TreeNodeRecursion::Continue => { - // The left side contains outer references, we need to decorrelate it. - return Ok(Transformed::new( - LogicalPlan::Join(join), - false, - TreeNodeRecursion::Jump, - )); - } - TreeNodeRecursion::Jump => { - unreachable!("") - } - } - - let LogicalPlan::Subquery(subquery) = join.right.as_ref() else { + // The right side is wrapped in a Subquery node when it contains outer + // references. Quickly skip joins that don't have this structure. + let Some((subquery, alias)) = extract_lateral_subquery(join.right.as_ref()) else { return Ok(Transformed::no(LogicalPlan::Join(join))); }; - if join.join_type != JoinType::Inner { + // If the subquery has no outer references, there is nothing to decorrelate. + // A LATERAL with no outer references is just a cross join. + let has_outer_refs = matches!( + subquery.subquery.apply_with_subqueries(|p| { + if p.contains_outer_reference() { + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + })?, + TreeNodeRecursion::Stop + ); + if !has_outer_refs { return Ok(Transformed::no(LogicalPlan::Join(join))); } + let subquery_plan = subquery.subquery.as_ref(); + let original_join_filter = join.filter.clone(); + + // Walk the subquery plan bottom-up, extracting correlated filter + // predicates into join conditions and converting scalar aggregates + // into group-by aggregates keyed on the correlation columns. 
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); let rewritten_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(Transformed::no(LogicalPlan::Join(join))); } - let mut all_correlated_cols = BTreeSet::new(); - pull_up - .correlated_subquery_cols_map - .values() - .for_each(|cols| all_correlated_cols.extend(cols.clone())); - let join_filter_opt = conjunction(pull_up.join_filters); - let join_filter = match join_filter_opt { - Some(join_filter) => join_filter, - None => lit(true), + // TODO: support HAVING in lateral subqueries. + // + if pull_up.pull_up_having_expr.is_some() { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } + + // We apply the correlation predicates (extracted from the subquery's WHERE) + // as the ON clause of the rewritten join. The original ON clause is applied + // as a post-join predicate. Semantically, this is important when the join + // is rewritten as a left join; we only want outer join semantics for the + // correlation predicates (which is required for "count bug" handling), not + // the original join predicates. + let correlation_filter = conjunction(pull_up.join_filters); + + // Look up each aggregate's default value on empty input (e.g., COUNT → 0, + // SUM → NULL). This must happen before wrapping in SubqueryAlias, because + // the map is keyed by LogicalPlan and wrapping changes the plan. + let collected_count_expr_map = pull_up + .collected_count_expr_map + .get(&rewritten_subquery) + .cloned(); + + // Re-wrap in SubqueryAlias if the original had one, preserving the alias name. + // The SubqueryAlias re-qualifies all columns with the alias, so we must also + // rewrite column references in both the correlation and ON-clause filters. 
+ let (right_plan, correlation_filter, original_join_filter) = + if let Some(ref alias) = alias { + let inner_schema = Arc::clone(rewritten_subquery.schema()); + let right = LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( + Arc::new(rewritten_subquery), + alias.clone(), + )?); + let corr = correlation_filter + .map(|f| requalify_filter(f, &inner_schema, alias)) + .transpose()?; + let on = original_join_filter + .map(|f| requalify_filter(f, &inner_schema, alias)) + .transpose()?; + (right, corr, on) + } else { + (rewritten_subquery, correlation_filter, original_join_filter) + }; + + // Use a left join when a scalar aggregation was pulled up (preserves + // outer rows with no matches), otherwise keep inner join. + // SELECT * FROM t0, LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0); → left join + // SELECT * FROM t0, LATERAL (SELECT * FROM t1 WHERE t0.v0 = t1.v0); → inner join + let join_type = if pull_up.pulled_up_scalar_agg { + JoinType::Left + } else { + JoinType::Inner }; - // -- inner join but the right side always has one row, we need to rewrite it to a left join - // SELECT * FROM t0, LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0); - // -- inner join but the right side number of rows is related to the filter (join) condition, so keep inner join. - // SELECT * FROM t0, LATERAL (SELECT * FROM t1 WHERE t0.v0 = t1.v0); + let left_field_count = join.left.schema().fields().len(); let new_plan = LogicalPlanBuilder::from(join.left) - .join_on( - rewritten_subquery, - if pull_up.pulled_up_scalar_agg { - JoinType::Left - } else { - JoinType::Inner - }, - Some(join_filter), - )? + .join_on(right_plan, join_type, correlation_filter)? .build()?; - // TODO: handle count(*) bug + + // Handle the count bug: after a left join, unmatched outer rows get NULLs + // for all right-side columns. But COUNT(*) over an empty group should + // return 0, not NULL. 
Add a projection that wraps affected expressions: + // CASE WHEN __always_true IS NULL THEN ELSE END + let new_plan = if let Some(expr_map) = collected_count_expr_map { + let join_schema = new_plan.schema(); + let alias_qualifier = alias.as_ref(); + let mut proj_exprs: Vec = vec![]; + + for (i, (qualifier, field)) in join_schema.iter().enumerate() { + let col = Expr::Column(Column::new(qualifier.cloned(), field.name())); + + // Only compensate right-side (subquery) fields. Left-side fields + // may share a name with an aggregate alias but must not be wrapped. + let name = field.name(); + if i >= left_field_count + && let Some(default_value) = expr_map.get(name.as_str()) + && !evaluates_to_null(default_value.clone(), default_value.column_refs())? + { + // Column whose aggregate doesn't naturally return NULL + // on empty input (e.g., COUNT returns 0). Wrap it. + let indicator_col = + Column::new(alias_qualifier.cloned(), UN_MATCHED_ROW_INDICATOR); + let case_expr = Expr::Case(expr::Case { + expr: None, + when_then_expr: vec![( + Box::new(Expr::IsNull(Box::new(Expr::Column(indicator_col)))), + Box::new(default_value.clone()), + )], + else_expr: Some(Box::new(col)), + }); + proj_exprs.push(Expr::Alias(expr::Alias { + expr: Box::new(case_expr), + relation: qualifier.cloned(), + name: name.to_string(), + metadata: None, + })); + continue; + } + proj_exprs.push(col); + } + + LogicalPlanBuilder::from(new_plan) + .project(proj_exprs)? + .build()? + } else { + new_plan + }; + + // Apply the original ON clause as a post-join filter. + let new_plan = if let Some(on_filter) = original_join_filter { + LogicalPlanBuilder::from(new_plan) + .filter(on_filter)? + .build()? + } else { + new_plan + }; + Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump)) } + +/// Extract the Subquery and optional alias from a lateral join's right side. 
+fn extract_lateral_subquery( + plan: &LogicalPlan, +) -> Option<(Subquery, Option)> { + match plan { + LogicalPlan::Subquery(sq) => Some((sq.clone(), None)), + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + if let LogicalPlan::Subquery(sq) = input.as_ref() { + Some((sq.clone(), Some(alias.clone()))) + } else { + None + } + } + _ => None, + } +} + +/// Rewrite column references in a join filter expression so that columns +/// belonging to the inner (right) side use the SubqueryAlias qualifier. +/// +/// The `PullUpCorrelatedExpr` pass extracts join filters with the inner +/// columns qualified by their original table names (e.g., `t2.t1_id`). +/// When the inner plan is wrapped in a `SubqueryAlias("sub")`, those +/// columns are re-qualified as `sub.t1_id`. This function applies the +/// same requalification to the filter so it matches the aliased schema. +fn requalify_filter( + filter: Expr, + inner_schema: &DFSchema, + alias: &TableReference, +) -> Result { + filter + .transform(|expr| { + if let Expr::Column(col) = &expr + && inner_schema.has_column(col) + { + let new_col = Column::new(Some(alias.clone()), col.name.clone()); + return Ok(Transformed::yes(Expr::Column(new_col))); + } + Ok(Transformed::no(expr)) + }) + .data() +} diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index fd9a26721ac64..0c14958f1a87c 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4020,13 +4020,12 @@ logical_plan 02)--SubqueryAlias: t1 03)----TableScan: join_t1 projection=[t1_id, t1_name] 04)--SubqueryAlias: series -05)----Subquery: -06)------Projection: UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int))) AS i -07)--------Subquery: -08)----------Projection: __unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int))) -09)------------Unnest: 
lists[__unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[] -10)--------------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS __unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int))) -11)----------------EmptyRelation: rows=1 +05)----Projection: UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int))) AS i +06)------Subquery: +07)--------Projection: __unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS UNNEST(generate_series(Int64(1),outer_ref(t1.t1_int))) +08)----------Unnest: lists[__unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[] +09)------------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS __unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int))) +10)--------------EmptyRelation: rows=1 physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "t1_int", data_type: UInt32, nullable: true }, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" }) @@ -4045,13 +4044,12 @@ logical_plan 02)--SubqueryAlias: t2 03)----TableScan: join_t1 projection=[t1_id, t1_name] 04)--SubqueryAlias: series -05)----Subquery: -06)------Projection: UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int))) AS i -07)--------Subquery: -08)----------Projection: __unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int))) -09)------------Unnest: lists[__unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[] -10)--------------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS __unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int))) -11)----------------EmptyRelation: rows=1 +05)----Projection: UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int))) AS i +06)------Subquery: 
+07)--------Projection: __unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS UNNEST(generate_series(Int64(1),outer_ref(t2.t1_int))) +08)----------Unnest: lists[__unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[] +09)------------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS __unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int))) +10)--------------EmptyRelation: rows=1 physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "t1_int", data_type: UInt32, nullable: true }, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" }) @@ -4635,31 +4633,40 @@ query TT explain SELECT j1_string, j2_string FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2; ---- logical_plan -01)Cross Join: -02)--TableScan: j1 projection=[j1_string] -03)--SubqueryAlias: j2 -04)----Projection: j2.j2_string -05)------Subquery: -06)--------Filter: outer_ref(j1.j1_id) < j2.j2_id -07)----------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +01)Projection: j1.j1_string, j2.j2_string +02)--Inner Join: Filter: j1.j1_id < j2.j2_id +03)----TableScan: j1 projection=[j1_string, j1_id] +04)----SubqueryAlias: j2 +05)------TableScan: j2 projection=[j2_string, j2_id] +physical_plan +01)NestedLoopJoinExec: join_type=Inner, filter=j1_id@0 < j2_id@1, projection=[j1_string@0, j2_string@2] +02)--DataSourceExec: partitions=1, partition_sizes=[0] +03)--DataSourceExec: partitions=1, partition_sizes=[0] query TT explain SELECT * FROM j1 JOIN (j2 JOIN j3 ON(j2_id = j3_id - 2)) ON(j1_id = j2_id), LATERAL (SELECT * FROM j3 WHERE j3_string = j2_string) as j4 ---- logical_plan 
-01)Cross Join: +01)Inner Join: j2.j2_string = j4.j3_string 02)--Inner Join: CAST(j2.j2_id AS Int64) = CAST(j3.j3_id AS Int64) - Int64(2) 03)----Inner Join: j1.j1_id = j2.j2_id 04)------TableScan: j1 projection=[j1_string, j1_id] 05)------TableScan: j2 projection=[j2_string, j2_id] 06)----TableScan: j3 projection=[j3_string, j3_id] 07)--SubqueryAlias: j4 -08)----Subquery: -09)------Filter: j3.j3_string = outer_ref(j2.j2_string) -10)--------TableScan: j3 projection=[j3_string, j3_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j2_string", data_type: Utf8View, nullable: true }, Column { relation: Some(Bare { table: "j2" }), name: "j2_string" }) +08)----TableScan: j3 projection=[j3_string, j3_id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(j2_string@2, j3_string@0)] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(j2.j2_id AS Int64)@4, j3.j3_id - Int64(2)@2)], projection=[j1_string@0, j1_id@1, j2_string@2, j2_id@3, j3_string@5, j3_id@6] +03)----ProjectionExec: expr=[j1_string@0 as j1_string, j1_id@1 as j1_id, j2_string@2 as j2_string, j2_id@3 as j2_id, CAST(j2_id@3 AS Int64) as CAST(j2.j2_id AS Int64)] +04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(j1_id@1, j2_id@1)] +05)--------DataSourceExec: partitions=1, partition_sizes=[0] +06)--------DataSourceExec: partitions=1, partition_sizes=[0] +07)----ProjectionExec: expr=[j3_string@0 as j3_string, j3_id@1 as j3_id, CAST(j3_id@1 AS Int64) - 2 as j3.j3_id - Int64(2)] +08)------DataSourceExec: partitions=1, partition_sizes=[0] +09)--DataSourceExec: partitions=1, partition_sizes=[0] +# Nested LATERAL: each level only references siblings, no skip-level correlation. 
query TT explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id = j2_id) as j2) as j2; ---- @@ -4667,14 +4674,16 @@ logical_plan 01)Cross Join: 02)--TableScan: j1 projection=[j1_string, j1_id] 03)--SubqueryAlias: j2 -04)----Subquery: -05)------Cross Join: -06)--------TableScan: j1 projection=[j1_string, j1_id] -07)--------SubqueryAlias: j2 -08)----------Subquery: -09)------------Filter: outer_ref(j1.j1_id) = j2.j2_id -10)--------------TableScan: j2 projection=[j2_string, j2_id] -physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "j1_id", data_type: Int32, nullable: true }, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" }) +04)----Inner Join: j1.j1_id = j2.j2_id +05)------TableScan: j1 projection=[j1_string, j1_id] +06)------SubqueryAlias: j2 +07)--------TableScan: j2 projection=[j2_string, j2_id] +physical_plan +01)CrossJoinExec +02)--DataSourceExec: partitions=1, partition_sizes=[0] +03)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(j1_id@1, j2_id@1)] +04)----DataSourceExec: partitions=1, partition_sizes=[0] +05)----DataSourceExec: partitions=1, partition_sizes=[0] query TT explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true); @@ -5425,7 +5434,7 @@ statement count 0 DROP TABLE issue_20437_large; # Test count(*) with right semi/anti joins returns correct row counts -# issue: https://github.com/apache/datafusion/issues/20669 +# issue: https://github.com/apache/datafusion/issues/20669 statement ok CREATE TABLE t1 (k INT, v INT); @@ -5465,7 +5474,7 @@ FROM t; statement ok reset datafusion.execution.batch_size; -# The SLT runner sets `target_partitions` to 4 instead of using the default, so +# The SLT runner sets `target_partitions` to 4 instead of using the default, so # reset it explicitly. 
statement ok set datafusion.execution.target_partitions = 4; diff --git a/datafusion/sqllogictest/test_files/lateral_join.slt b/datafusion/sqllogictest/test_files/lateral_join.slt new file mode 100644 index 0000000000000..ddc2fe586029a --- /dev/null +++ b/datafusion/sqllogictest/test_files/lateral_join.slt @@ -0,0 +1,576 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# LATERAL join tests for DataFusion. +# +# Many tests adapted from DuckDB (MIT license) test/sql/subquery/lateral/ +# and PostgreSQL (BSD license) regression tests. 
+ +########################################################### +# Setup +########################################################### + +statement ok +CREATE TABLE integers(i INTEGER) AS VALUES (1), (2), (3), (NULL); + +statement ok +CREATE TABLE students(id INTEGER, name VARCHAR) AS VALUES + (1, 'Mark'), + (2, 'Dirk'), + (3, 'Dana'); + +statement ok +CREATE TABLE exams(sid INTEGER, course VARCHAR, grade INTEGER) AS VALUES + (1, 'Database Systems', 10), + (1, 'Graphics', 9), + (2, 'Database Systems', 7), + (2, 'Graphics', 7); + +statement ok +CREATE TABLE t1(id INTEGER, val TEXT) AS VALUES (1, 'a'), (2, 'b'), (3, 'c'); + +statement ok +CREATE TABLE t2(id INTEGER, t1_id INTEGER, data TEXT) AS VALUES + (10, 1, 'x'), + (20, 1, 'y'), + (30, 2, 'z'); + +statement ok +CREATE TABLE t3(id INTEGER, t2_id INTEGER) AS VALUES (100, 10), (200, 20), (300, 30); + +########################################################### +# Section 1: Basic LATERAL with correlated WHERE +# +# The current decorrelation path handles outer references +# that appear in WHERE clauses. These tests exercise the +# three syntax variants: comma+LATERAL, JOIN LATERAL ON, +# and CROSS JOIN LATERAL. +########################################################### + +# Comma + LATERAL keyword +query ITT +SELECT t1.id, t1.val, sub.data +FROM t1, LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id, sub.data; +---- +1 a x +1 a y +2 b z + +# JOIN LATERAL ... 
ON true +query ITT +SELECT t1.id, t1.val, sub.data +FROM t1 JOIN LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) AS sub ON true +ORDER BY t1.id, sub.data; +---- +1 a x +1 a y +2 b z + +# CROSS JOIN LATERAL +query ITT +SELECT t1.id, t1.val, sub.data +FROM t1 CROSS JOIN LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id, sub.data; +---- +1 a x +1 a y +2 b z + +# Correlation with inequality predicate +query II +SELECT i1.i, sub.j +FROM integers i1, LATERAL (SELECT i2.i AS j FROM integers i2 WHERE i2.i > i1.i) sub +ORDER BY i1.i, sub.j; +---- +1 2 +1 3 +2 3 + +# Unmatched outer rows are excluded (inner join semantics) +# t1.id=3 has no matching t2 rows, so it does not appear +query IT +SELECT t1.id, sub.data +FROM t1, LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 x +1 y +2 z + +# Lateral with no outer reference (degenerates to cross join) +query II +SELECT * FROM (SELECT 42) t(a), LATERAL (SELECT 100) t2(b); +---- +42 100 + +# WHERE-only chained lateral: filter references multiple prior tables +query III +SELECT * FROM + (SELECT 42) t1(i), + (SELECT 22) t2(j), + LATERAL (SELECT 1 AS l WHERE i + j = 64) t3; +---- +42 22 1 + +# WHERE filter that eliminates all rows +query III +SELECT * FROM + (SELECT 64) t1(i), + (SELECT 22) t2(j), + LATERAL (SELECT 1 AS l WHERE i + j = 64) t3; +---- + +# Multiple correlation predicates +query ITI +SELECT t1.id, t1.val, sub.t2_id +FROM t1, LATERAL ( + SELECT t2.id AS t2_id FROM t2 WHERE t2.t1_id = t1.id AND t2.id > t1.id * 10 +) sub +ORDER BY t1.id, sub.t2_id; +---- +1 a 20 +2 b 30 + +# JOIN LATERAL with non-trivial ON clause (no aggregate) +query ITT +SELECT t1.id, t1.val, sub.data +FROM t1 JOIN LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) sub ON sub.data = 'x' +ORDER BY t1.id; +---- +1 a x + +# Empty outer table produces no rows +query II +SELECT t1.id, sub.cnt +FROM (SELECT 1 AS id WHERE false) t1, + LATERAL (SELECT count(*) AS cnt FROM t2 WHERE 
t2.t1_id = t1.id) sub; +---- + +########################################################### +# Section 2: LATERAL with aggregation and the COUNT bug +########################################################### + +# Students and total grades (SUM, with JOIN LATERAL) +query TI rowsort +SELECT name, total +FROM students +JOIN LATERAL (SELECT SUM(grade) AS total FROM exams WHERE exams.sid = students.id) grades ON true; +---- +Dana NULL +Dirk 14 +Mark 19 + +# Same with comma + LATERAL +query TI rowsort +SELECT name, total +FROM students, LATERAL (SELECT SUM(grade) AS total FROM exams WHERE exams.sid = students.id) grades; +---- +Dana NULL +Dirk 14 +Mark 19 + +# COUNT(*) must return 0 (not NULL) for unmatched rows +query ITI +SELECT t1.id, t1.val, sub.cnt +FROM t1, LATERAL (SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 a 2 +2 b 1 +3 c 0 + +# SUM returns NULL for empty groups +query ITI +SELECT t1.id, t1.val, sub.total +FROM t1, LATERAL (SELECT sum(t2.id) AS total FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 a 30 +2 b 30 +3 c NULL + +# AVG returns NULL for empty groups +query ITR +SELECT t1.id, t1.val, sub.avg_id +FROM t1, LATERAL (SELECT avg(t2.id) AS avg_id FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 a 15 +2 b 30 +3 c NULL + +# MIN returns NULL for empty groups +query ITI +SELECT t1.id, t1.val, sub.min_id +FROM t1, LATERAL (SELECT min(t2.id) AS min_id FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 a 10 +2 b 30 +3 c NULL + +# MAX returns NULL for empty groups +query ITI +SELECT t1.id, t1.val, sub.max_id +FROM t1, LATERAL (SELECT max(t2.id) AS max_id FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +1 a 20 +2 b 30 +3 c NULL + +# COUNT bug compensation with type-changing expressions: the default +# value must have the correct type even after CAST or arithmetic. 
+query ITT +SELECT t1.id, t1.val, sub.cnt +FROM t1, LATERAL ( + SELECT CAST(count(*) AS TEXT) AS cnt FROM t2 WHERE t2.t1_id = t1.id +) AS sub +ORDER BY t1.id; +---- +1 a 2 +2 b 1 +3 c 0 + +query ITR +SELECT t1.id, t1.val, sub.result +FROM t1, LATERAL ( + SELECT count(*) + 0.5 AS result FROM t2 WHERE t2.t1_id = t1.id +) AS sub +ORDER BY t1.id; +---- +1 a 2.5 +2 b 1.5 +3 c 0.5 + +# Multiple aggregates: COUNT should be 0, SUM should be NULL +query ITII +SELECT t1.id, t1.val, sub.cnt, sub.total +FROM t1, LATERAL ( + SELECT count(*) AS cnt, sum(t2.id) AS total + FROM t2 WHERE t2.t1_id = t1.id +) AS sub +ORDER BY t1.id; +---- +1 a 2 30 +2 b 1 30 +3 c 0 NULL + +# Name collision: left side has 'id', right side aliases count(*) AS 'id'. +# Count bug compensation must only target right-side fields. +query II +SELECT t1.id, sub.id AS cnt +FROM t1, LATERAL (SELECT count(*) AS id FROM t2 WHERE t2.t1_id = t1.id) sub +ORDER BY t1.id; +---- +1 2 +2 1 +3 0 + +# Filter on count: t1.id=3 has cnt=0, satisfies cnt < 5 +query ITI +SELECT t1.id, t1.val, sub.cnt +FROM t1, LATERAL (SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id) AS sub +WHERE sub.cnt < 5 +ORDER BY t1.id; +---- +1 a 2 +2 b 1 +3 c 0 + +# JOIN LATERAL with non-trivial ON clause and scalar aggregate: the ON +# condition must be applied after the count bug projection, not as part +# of the left join condition (which would incorrectly preserve non-matching +# outer rows with count-bug defaults). 
+query ITI +SELECT t1.id, t1.val, sub.cnt +FROM t1 JOIN LATERAL ( + SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id +) sub ON sub.cnt > 1 +ORDER BY t1.id; +---- +1 a 2 + +# Validate: equivalent scalar subquery produces same result +query ITI +SELECT t1.id, t1.val, + (SELECT count(*) FROM t2 WHERE t2.t1_id = t1.id) AS cnt +FROM t1 +ORDER BY t1.id; +---- +1 a 2 +2 b 1 +3 c 0 + +# Validate: equivalent scalar subquery for SUM +query ITI +SELECT t1.id, t1.val, + (SELECT sum(t2.id) FROM t2 WHERE t2.t1_id = t1.id) AS total +FROM t1 +ORDER BY t1.id; +---- +1 a 30 +2 b 30 +3 c NULL + +# JOIN LATERAL ON + outer WHERE: both filters applied correctly +query ITI +SELECT t1.id, t1.val, sub.cnt +FROM t1 JOIN LATERAL ( + SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id +) sub ON sub.cnt > 0 +WHERE sub.cnt < 3 +ORDER BY t1.id; +---- +1 a 2 +2 b 1 + +# NULL in correlation column: NULL = NULL is unknown, so no match; +# count bug gives 0 for the unmatched group +query II +SELECT i1.i, sub.cnt +FROM integers i1, LATERAL ( + SELECT count(*) AS cnt FROM integers i2 WHERE i2.i = i1.i +) sub +ORDER BY i1.i; +---- +1 1 +2 1 +3 1 +NULL 0 + +# Lateral subquery with GROUP BY (not a scalar aggregate, so inner join) +query ITTI +SELECT t1.id, t1.val, sub.data, sub.cnt +FROM t1, LATERAL ( + SELECT t2.data, count(*) AS cnt FROM t2 + WHERE t2.t1_id = t1.id GROUP BY t2.data +) sub +ORDER BY t1.id, sub.data; +---- +1 a x 1 +1 a y 1 +2 b z 1 + +########################################################### +# Section 3: Nested LATERAL joins +########################################################### + +# Both levels correlate via WHERE with their sibling table +query III +SELECT t1.id AS t1_id, sub.t2_id, sub.t3_id +FROM t1, LATERAL ( + SELECT t2_sub.id AS t2_id, t3_sub.id AS t3_id + FROM t2 AS t2_sub, LATERAL ( + SELECT t3.id FROM t3 WHERE t3.t2_id = t2_sub.id + ) AS t3_sub + WHERE t2_sub.t1_id = t1.id +) AS sub +ORDER BY t1_id, t2_id, t3_id; +---- +1 10 100 +1 20 200 +2 30 300 + +# Nested 
lateral with aggregation at inner level only +query III +SELECT t1.id, sub.t2_id, sub.t3_cnt +FROM t1, LATERAL ( + SELECT t2_sub.id AS t2_id, t3_agg.t3_cnt + FROM t2 AS t2_sub, LATERAL ( + SELECT count(*) AS t3_cnt FROM t3 WHERE t3.t2_id = t2_sub.id + ) AS t3_agg + WHERE t2_sub.t1_id = t1.id +) AS sub +ORDER BY t1.id, sub.t2_id; +---- +1 10 1 +1 20 1 +2 30 1 + +########################################################### +# Section 4: Semantically invalid queries (permanent errors) +########################################################### + +# FULL/RIGHT JOIN LATERAL are invalid per the SQL standard: the right +# side cannot both reference and be independent of the left side. +statement error +SELECT * FROM integers FULL JOIN LATERAL (SELECT integers.i AS b) t ON (true); + +statement error +SELECT * FROM integers RIGHT JOIN LATERAL (SELECT integers.i AS b) t ON (true); + +# Aggregating directly over outer columns in LATERAL is invalid — the +# aggregate would need all rows of the outer table, but LATERAL evaluates +# per-row. Note: using an outer column *inside* an aggregate over an inner +# table (e.g., SUM(inner.x + outer.y)) is valid SQL — see Section 5. +statement error +SELECT * FROM integers, LATERAL (SELECT SUM(i)) t(s); + +statement error +SELECT * FROM integers, LATERAL (SELECT SUM(i) AS s) t; + +########################################################### +# Section 5: Currently unsupported patterns +# +# These are valid SQL that DataFusion cannot decorrelate yet. +# The primary limitation is that outer references must appear +# in WHERE clauses. Outer references in SELECT expressions, +# inside aggregate arguments, or in LEFT JOIN LATERAL are not +# yet supported. 
+########################################################### + +# --- Outer reference in SELECT expression (not WHERE) --- + +# Simplest case: SELECT references outer table column +statement error OuterReferenceColumn +SELECT * FROM (SELECT 42) t(a) CROSS JOIN LATERAL (SELECT t.a + 1) t2(b); + +# Outer ref in SELECT with multiple rows +statement error OuterReferenceColumn +SELECT * FROM (VALUES (42), (84)) t(a), LATERAL (SELECT t.a + 1) t2(b); + +# Outer ref inside aggregate expression argument +statement error OuterReferenceColumn +SELECT i1.i, sub.s +FROM integers i1, + LATERAL (SELECT SUM(i2.i + i1.i) AS s FROM integers i2) sub +ORDER BY i1.i; + +# Chained lateral with outer ref in SELECT +statement error OuterReferenceColumn +SELECT i1.i, sub1.j, sub2.k +FROM integers i1, + LATERAL (SELECT i1.i * 2 AS j) sub1, + LATERAL (SELECT sub1.j + 1 AS k) sub2 +ORDER BY i1.i; + +# --- LEFT JOIN LATERAL --- + +# LEFT JOIN preserving unmatched outer rows with filter +statement error OuterReferenceColumn +SELECT i1.i, sub.b +FROM integers i1 +LEFT JOIN LATERAL (SELECT i1.i AS b WHERE i1.i IN (1, 3)) sub ON (i1.i = sub.b) +ORDER BY i1.i; + +# LEFT JOIN where all rows match +statement error OuterReferenceColumn +SELECT i1.i, sub.b +FROM integers i1 +LEFT JOIN LATERAL (SELECT i1.i AS b) sub ON (i1.i = sub.b) +ORDER BY i1.i; + +# LEFT JOIN where no rows match (all NULLs on right) +statement error OuterReferenceColumn +SELECT i1.i, sub.b +FROM integers i1 +LEFT JOIN LATERAL (SELECT i1.i + 1 AS b) sub ON (i1.i = sub.b) +ORDER BY i1.i; + +# --- HAVING in lateral subquery --- +# https://github.com/apache/datafusion/issues/21198 + +# HAVING count(*) < 2 evaluates to true on an empty group (0 < 2), so +# PullUpCorrelatedExpr pulls it out. The lateral join code does not yet +# re-apply the pulled-up HAVING filter as a post-join predicate. +# Expected result once supported: only rows for t1.id=2 (cnt=1) and t1.id=3 (cnt=0). 
+statement error OuterReferenceColumn +SELECT t1.id, t1.val, sub.cnt +FROM t1, LATERAL ( + SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id HAVING count(*) < 2 +) AS sub +ORDER BY t1.id; + +# --- Implicit LATERAL (comma without LATERAL keyword) --- +# DuckDB auto-detects lateral correlation; DataFusion requires +# the explicit LATERAL keyword. + +statement error +SELECT name, total FROM students, + (SELECT SUM(grade) AS total FROM exams WHERE exams.sid = students.id) grades; + +########################################################### +# Section 6: EXPLAIN plan verification +########################################################### + +# Verify the COUNT bug fix: Left Join with CASE WHEN compensation +query TT +EXPLAIN SELECT t1.id, sub.cnt +FROM t1, LATERAL (SELECT count(*) AS cnt FROM t2 WHERE t2.t1_id = t1.id) AS sub +ORDER BY t1.id; +---- +logical_plan +01)Sort: t1.id ASC NULLS LAST +02)--Projection: t1.id, CASE WHEN sub.__always_true IS NULL THEN Int64(0) ELSE sub.cnt END AS cnt +03)----Left Join: t1.id = sub.t1_id +04)------TableScan: t1 projection=[id] +05)------SubqueryAlias: sub +06)--------Projection: count(Int64(1)) AS cnt, t2.t1_id, Boolean(true) AS __always_true +07)----------Aggregate: groupBy=[[t2.t1_id]], aggr=[[count(Int64(1))]] +08)------------TableScan: t2 projection=[t1_id] +physical_plan +01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] +02)--SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----ProjectionExec: expr=[id@0 as id, CASE WHEN __always_true@2 IS NULL THEN 0 ELSE cnt@1 END as cnt] +04)------HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, t1_id@1)], projection=[id@0, cnt@1, __always_true@3] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] +06)--------ProjectionExec: expr=[count(Int64(1))@1 as cnt, t1_id@0 as t1_id, true as __always_true] +07)----------AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id], aggr=[count(Int64(1))] +08)------------RepartitionExec: 
partitioning=Hash([t1_id@0], 4), input_partitions=1 +09)--------------AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id], aggr=[count(Int64(1))] +10)----------------DataSourceExec: partitions=1, partition_sizes=[1] + +# Verify non-aggregate lateral decorrelates to inner join +query TT +EXPLAIN SELECT t1.id, sub.data +FROM t1, LATERAL (SELECT t2.data FROM t2 WHERE t2.t1_id = t1.id) AS sub; +---- +logical_plan +01)Projection: t1.id, sub.data +02)--Inner Join: t1.id = sub.t1_id +03)----TableScan: t1 projection=[id] +04)----SubqueryAlias: sub +05)------Projection: t2.data, t2.t1_id +06)--------TableScan: t2 projection=[t1_id, data] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, t1_id@1)], projection=[id@0, data@1] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########################################################### +# Cleanup +########################################################### + +statement ok +DROP TABLE integers; + +statement ok +DROP TABLE students; + +statement ok +DROP TABLE exams; + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t3; diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md index baacf432f5fde..7ab6ed1aea05d 100644 --- a/docs/source/user-guide/sql/select.md +++ b/docs/source/user-guide/sql/select.md @@ -86,7 +86,7 @@ SELECT a FROM table WHERE a > 10 ## JOIN clause -DataFusion supports `INNER JOIN`, `LEFT OUTER JOIN`, `RIGHT OUTER JOIN`, `FULL OUTER JOIN`, `NATURAL JOIN`, `CROSS JOIN`, `LEFT SEMI JOIN`, `RIGHT SEMI JOIN`, `LEFT ANTI JOIN`, and `RIGHT ANTI JOIN`. +DataFusion supports `INNER JOIN`, `LEFT OUTER JOIN`, `RIGHT OUTER JOIN`, `FULL OUTER JOIN`, `NATURAL JOIN`, `CROSS JOIN`, `LEFT SEMI JOIN`, `RIGHT SEMI JOIN`, `LEFT ANTI JOIN`, `RIGHT ANTI JOIN`, and `LATERAL JOIN`. 
The following examples are based on this table: @@ -238,6 +238,94 @@ SELECT * FROM x RIGHT ANTI JOIN x y ON x.column_1 = y.column_1; +----------+----------+ ``` +### LATERAL JOIN + +A `LATERAL JOIN` allows the right-hand side of a join to reference columns from +the left-hand side. Conceptually, the subquery on the right is evaluated once +for each row of the left-hand table, which makes it possible to "parameterize" a +subquery with values from preceding tables. + +The `LATERAL` keyword is required; DataFusion does not implicitly detect +correlation in `FROM` clause subqueries. + +The following examples use these tables: + +```sql +CREATE TABLE departments(id INT, name TEXT) AS VALUES (1, 'HR'), (2, 'Eng'), (3, 'Sales'); +CREATE TABLE employees(id INT, dept_id INT, name TEXT) AS VALUES + (10, 1, 'Alice'), (20, 1, 'Bob'), (30, 2, 'Carol'); +``` + +#### Comma syntax + +The most concise form places `LATERAL` after a comma in the `FROM` clause. +Rows from the left table that have no matching rows in the subquery are excluded +(inner join semantics). + +```sql +SELECT d.name AS dept, e.name AS emp +FROM departments d, LATERAL ( + SELECT employees.name FROM employees WHERE employees.dept_id = d.id +) AS e +ORDER BY dept, emp; ++------+-------+ +| dept | emp | ++------+-------+ +| Eng | Carol | +| HR | Alice | +| HR | Bob | ++------+-------+ +``` + +#### CROSS JOIN LATERAL + +Equivalent to the comma syntax above. + +```sql +SELECT d.name AS dept, e.name AS emp +FROM departments d +CROSS JOIN LATERAL ( + SELECT employees.name FROM employees WHERE employees.dept_id = d.id +) AS e +ORDER BY dept, emp; ++------+-------+ +| dept | emp | ++------+-------+ +| Eng | Carol | +| HR | Alice | +| HR | Bob | ++------+-------+ +``` + +#### JOIN LATERAL ... ON + +`JOIN LATERAL` with an `ON` clause applies the `ON` condition as an additional +filter after the lateral subquery is evaluated. 
+ +```sql +SELECT d.name AS dept, sub.emp, sub.cnt +FROM departments d +JOIN LATERAL ( + SELECT count(*) AS cnt, min(employees.name) AS emp + FROM employees WHERE employees.dept_id = d.id +) AS sub ON sub.cnt > 0 +ORDER BY dept; ++------+-------+-----+ +| dept | emp | cnt | ++------+-------+-----+ +| Eng | Carol | 1 | +| HR | Alice | 2 | ++------+-------+-----+ +``` + +#### Limitations + +The following patterns are not yet supported: + +- `LEFT JOIN LATERAL` (lateral join with outer join semantics). +- Outer references in the `SELECT` list of the lateral subquery (e.g., `LATERAL (SELECT outer.col + 1)`). +- `HAVING` in lateral subqueries. + ## GROUP BY clause Example: diff --git a/docs/source/user-guide/sql/subqueries.md b/docs/source/user-guide/sql/subqueries.md index 7533bd49d553d..cba15f1bfd5c0 100644 --- a/docs/source/user-guide/sql/subqueries.md +++ b/docs/source/user-guide/sql/subqueries.md @@ -162,6 +162,8 @@ operated on by the outer query. SELECT expression1[, expression2, ..., expressionN] FROM () ``` +To reference columns from other tables in the same `FROM` clause, use [`LATERAL JOIN`](select.md#lateral-join). + ### Example The following query returns the average of maximum values per room. From e5358b7e535f199adb29c684fb4d8bef86d0975e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:23:48 -0700 Subject: [PATCH 3/4] chore(deps): bump object_store from 0.13.1 to 0.13.2 (#21275) Bumps [object_store](https://github.com/apache/arrow-rs-object-store) from 0.13.1 to 0.13.2.
Changelog

Sourced from object_store's changelog.

Historical Changelog

Commits
  • 7a65b75 chore: prepare 0.13.2 release (#671)
  • d22f7f9 fix: missing 5xx error body when retry exhausted (#618)
  • ebaee78 Support --xa-s3 suffix for S3 Express One Zone bucket access points (#663)
  • 7ded938 Replace Path::child with Path::join (#666)
  • bdcac43 feat: Add support for AWS_ENDPOINT_URL_S3 environment variable (#590)
  • ca15c63 Implement Clone for local and memory stores (#653)
  • 41d3242 docs: clarify Clone behavior (#656)
  • 6259202 Unify from_env behaviours (#652)
  • 66e640d Switch TokenCache to RWLock (#648)
  • 907653e docs: add examples to the aws docs where appropriate (#651)
  • Additional commits viewable in compare view

--------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Oleks V Co-authored-by: Andrew Lamb --- Cargo.lock | 14 ++++++++------ Cargo.toml | 2 +- .../core/src/datasource/physical_plan/parquet.rs | 2 +- datafusion/core/tests/sql/path_partition.rs | 2 +- datafusion/datasource/src/url.rs | 2 +- datafusion/datasource/src/write/demux.rs | 7 ++++--- 6 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca723a061c1a6..4803d13bca9c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4281,16 +4281,18 @@ dependencies = [ [[package]] name = "object_store" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2858065e55c148d294a9f3aae3b0fa9458edadb41a108397094566f4e3c0dfb" +checksum = "622acbc9100d3c10e2ee15804b0caa40e55c933d5aa53814cd520805b7958a49" dependencies = [ "async-trait", "base64 0.22.1", "bytes", "chrono", "form_urlencoded", - "futures", + "futures-channel", + "futures-core", + "futures-util", "http 1.4.0", "http-body-util", "humantime", @@ -4300,7 +4302,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml", - "rand 0.9.2", + "rand 0.10.0", "reqwest", "ring", "rustls-pki-types", @@ -4846,9 +4848,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.38.4" +version = "0.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c" +checksum = "958f21e8e7ceb5a1aa7fa87fab28e7c75976e0bfe7e23ff069e0a260f894067d" dependencies = [ "memchr", "serde", diff --git a/Cargo.toml b/Cargo.toml index c6f38e16b94e5..64673c025d299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,7 +170,7 @@ liblzma = { version = "0.4.6", features = ["static"] } log = "^0.4" memchr = "2.8.0" num-traits = { version = "0.2" } -object_store = { version = "0.13.1", default-features = false } +object_store = { 
version = "0.13.2", default-features = false } parking_lot = "0.12" parquet = { version = "58.1.0", default-features = false, features = [ "arrow", diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 0d1b0be906a82..dd8c20628b43e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1704,7 +1704,7 @@ mod tests { let state = session_ctx.state(); let location = Path::from_filesystem_path(".") .unwrap() - .child("invalid.parquet"); + .join("invalid.parquet"); let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta { location, diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 1afab529f019c..2eff1c262f855 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -774,7 +774,7 @@ impl ObjectStore for MirroringObjectStore { }; if parts.next().is_some() { - common_prefixes.insert(prefix.child(common_prefix)); + common_prefixes.insert(prefix.clone().join(common_prefix)); } else { let object = ObjectMeta { location: k.clone(), diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index f155bacbd3e88..88f85eb456ebb 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -536,7 +536,7 @@ mod tests { let root = root.to_string_lossy(); let url = ListingTableUrl::parse(root).unwrap(); - let child = url.prefix.child("partition").child("file"); + let child = url.prefix.clone().join("partition").join("file"); let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); assert_eq!(prefix, vec!["partition", "file"]); diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 1648624747af2..acc6435acf371 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs 
@@ -260,7 +260,8 @@ fn generate_file_path( if !single_file_output { base_output_path .prefix() - .child(format!("{write_id}_{part_idx}.{file_extension}")) + .clone() + .join(format!("{write_id}_{part_idx}.{file_extension}")) } else { base_output_path.prefix().to_owned() } @@ -588,8 +589,8 @@ fn compute_hive_style_file_path( ) -> Path { let mut file_path = base_output_path.prefix().clone(); for j in 0..part_key.len() { - file_path = file_path.child(format!("{}={}", partition_by[j].0, part_key[j])); + file_path = file_path.join(format!("{}={}", partition_by[j].0, part_key[j])); } - file_path.child(format!("{write_id}.{file_extension}")) + file_path.join(format!("{write_id}.{file_extension}")) } From 18af5189e212e681db5510bc08b7d24e85a69dd2 Mon Sep 17 00:00:00 2001 From: Dmitrii Blaginin Date: Thu, 2 Apr 2026 22:43:03 +0100 Subject: [PATCH 4/4] Merge queue: make dev checks required + add .asf.yaml validation (#21239) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related to https://github.com/apache/datafusion/issues/6880 Follow up on https://github.com/apache/datafusion/pull/17538 Bringing back the merge queues 🤞🏻 Last time it was painful because: - there was no `.asf.yaml` validation - if something went wrong, we weren't able to force merge without the infra team This PR solves the first problem. As for the second one, I have the permissions to bypass, so I should be able to quickly revert if something ever goes bad. Check result: https://github.com/apache/datafusion/actions/runs/23715604583/job/69082077684?pr=21239 I also checked that CI will keep working by [merging](https://github.com/apache/datafusion-sandbox/pull/197) this into our sandbox and then opening and merging [a dummy pr](https://github.com/apache/datafusion-sandbox/pull/204) For now, bringing just the basic checks (in dev.yml).
Will do rust.yml separately if everything goes smoothly after this one is merged --- .asf.yaml | 6 + .github/workflows/dev.yml | 8 ++ ci/scripts/check_asf_yaml_status_checks.py | 145 +++++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 ci/scripts/check_asf_yaml_status_checks.py diff --git a/.asf.yaml b/.asf.yaml index b719a495bd735..73adb1c058b7b 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -51,6 +51,12 @@ github: main: required_pull_request_reviews: required_approving_review_count: 1 + required_status_checks: + contexts: + - "Check License Header" + - "Use prettier to check formatting of documents" + - "Validate required_status_checks in .asf.yaml" + - "Spell Check with Typos" # needs to be updated as part of the release process # .asf.yaml doesn't support wildcard branch protection rules, only exact branch names # https://github.com/apache/infrastructure-asfyaml?tab=readme-ov-file#branch-protection diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index c2a06ed392519..a247f07333ee5 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -51,6 +51,14 @@ jobs: # if you encounter error, see instructions inside the script run: ci/scripts/doc_prettier_check.sh + asf-yaml-check: + name: Validate required_status_checks in .asf.yaml + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - run: pip install pyyaml + - run: python3 ci/scripts/check_asf_yaml_status_checks.py + typos: name: Spell Check with Typos runs-on: ubuntu-latest diff --git a/ci/scripts/check_asf_yaml_status_checks.py b/ci/scripts/check_asf_yaml_status_checks.py new file mode 100644 index 0000000000000..135654159051c --- /dev/null +++ b/ci/scripts/check_asf_yaml_status_checks.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Validate that every entry in .asf.yaml required_status_checks +matches an actual GitHub Actions job name, and that the workflow +is not filtered by paths/paths-ignore (which would prevent the +check from running on some PRs, blocking merges). + +A typo or stale entry in required_status_checks will block all +merges for the project, so this check catches that early. +""" + +import glob +import os +import sys + +import yaml + + +def get_required_checks(asf_yaml_path): + """Extract all required_status_checks contexts from .asf.yaml.""" + with open(asf_yaml_path) as f: + config = yaml.safe_load(f) + + checks = {} # context -> list of branches requiring it + branches = config.get("github", {}).get("protected_branches", {}) + for branch, settings in branches.items(): + contexts = ( + settings.get("required_status_checks", {}).get("contexts", []) + ) + for ctx in contexts: + checks.setdefault(ctx, []).append(branch) + + return checks + + +def get_workflow_jobs(workflows_dir): + """Collect all jobs with their metadata from GitHub Actions workflow files. + + Returns a dict mapping job identifier (name or key) to a list of + (workflow_file, has_path_filters) tuples. 
+ """ + jobs = {} # identifier -> [(workflow_file, has_path_filters)] + for workflow_file in sorted(glob.glob(os.path.join(workflows_dir, "*.yml"))): + with open(workflow_file) as f: + workflow = yaml.safe_load(f) + + if not workflow or "jobs" not in workflow: + continue + + # Check if pull_request trigger has path filters + on = workflow.get(True, workflow.get("on", {})) # yaml parses `on:` as True + pr_trigger = on.get("pull_request", {}) if isinstance(on, dict) else {} + has_path_filters = bool( + isinstance(pr_trigger, dict) + and (pr_trigger.get("paths") or pr_trigger.get("paths-ignore")) + ) + + basename = os.path.basename(workflow_file) + for job_key, job_config in workflow.get("jobs", {}).items(): + if not isinstance(job_config, dict): + continue + job_name = job_config.get("name", job_key) + info = (basename, has_path_filters) + jobs.setdefault(job_name, []).append(info) + if job_key != job_name: + jobs.setdefault(job_key, []).append(info) + + return jobs + + +def main(): + repo_root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + asf_yaml = os.path.join(repo_root, ".asf.yaml") + workflows_dir = os.path.join(repo_root, ".github", "workflows") + + required_checks = get_required_checks(asf_yaml) + if not required_checks: + print("No required_status_checks found in .asf.yaml — nothing to validate.") + return + + jobs = get_workflow_jobs(workflows_dir) + errors = [] + + for ctx in sorted(required_checks): + branches = ", ".join(sorted(required_checks[ctx])) + if ctx not in jobs: + errors.append( + f' - "{ctx}" (branch: {branches}): ' + f"not found in any GitHub Actions workflow" + ) + continue + + # Check if ALL workflows providing this job have path filters + # (if at least one doesn't, the check will still run) + filtered_workflows = [ + wf for wf, has_filter in jobs[ctx] if has_filter + ] + unfiltered_workflows = [ + wf for wf, has_filter in jobs[ctx] if not has_filter + ] + if filtered_workflows and not 
unfiltered_workflows: + wf_list = ", ".join(filtered_workflows) + errors.append( + f' - "{ctx}" (branch: {branches}): ' + f"workflow {wf_list} uses paths/paths-ignore filters on " + f"pull_request, so this check won't run for some PRs " + f"and will block merging" + ) + + if errors: + print("ERROR: Problems found with required_status_checks in .asf.yaml:\n") + print("\n".join(errors)) + print() + print("Available job names across all workflows:") + for name in sorted(jobs): + print(f" - {name}") + sys.exit(1) + + print( + f"OK: All {len(required_checks)} required_status_checks " + "match existing GitHub Actions jobs." + ) + + +if __name__ == "__main__": + main()