From a35a7fd32c394633dab06edba65398074d6b4a23 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Wed, 11 Feb 2026 22:11:59 +0200 Subject: [PATCH] optimizations: push project into join --- .../src/eliminate_unused_fields.rs | 4 +- miso-optimizations/src/lib.rs | 3 + .../src/push_filter_into_join.rs | 29 +-- .../src/push_project_into_join.rs | 243 ++++++++++++++++++ miso-optimizations/src/test_utils.rs | 22 ++ 5 files changed, 273 insertions(+), 28 deletions(-) create mode 100644 miso-optimizations/src/push_project_into_join.rs diff --git a/miso-optimizations/src/eliminate_unused_fields.rs b/miso-optimizations/src/eliminate_unused_fields.rs index d447900..1eb9a86 100644 --- a/miso-optimizations/src/eliminate_unused_fields.rs +++ b/miso-optimizations/src/eliminate_unused_fields.rs @@ -55,7 +55,7 @@ fn transform_steps(steps: &[WorkflowStep]) -> Option> { None } -fn create_identity_project(fields: BTreeSet) -> WorkflowStep { +pub(crate) fn create_identity_project(fields: BTreeSet) -> WorkflowStep { WorkflowStep::Project( fields .into_iter() @@ -70,7 +70,7 @@ fn create_identity_project(fields: BTreeSet) -> WorkflowStep { ) } -fn compute_required_before_step( +pub(crate) fn compute_required_before_step( step: &WorkflowStep, mut after: BTreeSet, ) -> BTreeSet { diff --git a/miso-optimizations/src/lib.rs b/miso-optimizations/src/lib.rs index 15e3bfd..795d44a 100644 --- a/miso-optimizations/src/lib.rs +++ b/miso-optimizations/src/lib.rs @@ -22,6 +22,7 @@ use push_into_scan::PushIntoScan; use push_join_into_scan::PushJoinIntoScan; use push_limit_into_limit::PushLimitIntoLimit; use push_limit_into_topn::PushLimitIntoTopN; +use push_project_into_join::PushProjectIntoJoin; use push_steps_into_union::PushStepsIntoUnion; use push_union_into_scan::PushUnionIntoScan; use remove_no_op_filter::RemoveNoOpFilter; @@ -55,6 +56,7 @@ mod push_into_scan; mod push_join_into_scan; mod push_limit_into_limit; mod push_limit_into_topn; +mod push_project_into_join; mod push_steps_into_union; mod push_union_into_scan; mod remove_no_op_filter; @@ -181,6 +183,7 @@ impl Optimizer { vec![ opt!(ReorderFilterBeforeSort), opt!(PushFilterIntoJoin), + opt!(PushProjectIntoJoin), opt!(MergeConsecutiveExtends), opt!(FoldRenameIntoProject), opt!(MergeConsecutiveProjects), diff --git a/miso-optimizations/src/push_filter_into_join.rs b/miso-optimizations/src/push_filter_into_join.rs index 70fb22a..340d6cd 100644 --- a/miso-optimizations/src/push_filter_into_join.rs +++ b/miso-optimizations/src/push_filter_into_join.rs @@ -118,7 +118,7 @@ fn flatten_and_conditions(expr: Expr) -> Vec { } } -fn right_workflow_fields(steps: &[WorkflowStep]) -> Option> { +pub(crate) fn right_workflow_fields(steps: &[WorkflowStep]) -> Option> { match steps.last()? { WorkflowStep::Project(fields) => Some(fields.iter().map(|pf| pf.to.clone()).collect()), WorkflowStep::Summarize(summarize) => { @@ -143,33 +143,10 @@ fn classify_condition(condition: &Expr, right_fields: &HashSet) -> Condit #[cfg(test)] mod tests { - use miso_workflow::Workflow; - use miso_workflow_types::join::{Join, JoinType}; + use miso_workflow_types::join::JoinType; use super::*; - use crate::test_utils::{and, eq, field, field_expr, gt, lit, noop_project, summarize_by}; - - fn join( - type_: JoinType, - left_key: &str, - right_key: &str, - right_steps: Vec, - ) -> WorkflowStep { - WorkflowStep::Join( - Join { - on: (field(left_key), field(right_key)), - type_, - partitions: 1, - }, - Workflow::new(right_steps), - ) - } - - fn right_project(fields: &[&str]) -> Vec { - vec![WorkflowStep::Project( - fields.iter().map(|f| noop_project(f)).collect(), - )] - } + use crate::test_utils::{and, eq, field_expr, gt, join, lit, right_project, summarize_by}; fn apply(steps: &[WorkflowStep]) -> OptimizationResult { let opt = PushFilterIntoJoin; diff --git a/miso-optimizations/src/push_project_into_join.rs b/miso-optimizations/src/push_project_into_join.rs new file mode 100644 index 0000000..a7c4632 --- /dev/null +++ b/miso-optimizations/src/push_project_into_join.rs @@ -0,0 +1,243 @@ +//! Narrows both sides of a join to only the fields that later steps need. +//! +//! A join merges two streams, but often the steps after it only use a few +//! fields. Without pruning, both sides carry unused fields through the join. +//! +//! We walk backwards from the last step to find which fields are needed, then +//! split them into left vs right based on the right workflow's output fields. +//! The left side has unknown schema, so we always inject a project before the +//! join. The right side has known schema (project or summarize), so we only +//! add a project when it actually drops fields. +//! +//! Example: +//! join (scan | project rf, rid, extra) on id == rid | project lf, rf +//! becomes: +//! project id, lf | join (scan | project rf, rid, extra | project rf, rid) on id == rid | project lf, rf +//! +//! When the only step after the join is count, both sides shrink to just the key: +//! join (scan | project rf, rid) on id == rid | count +//! becomes: +//! project id | join (scan | project rf, rid | project rid) on id == rid | count + +use std::collections::BTreeSet; + +use miso_workflow::WorkflowStep; +use miso_workflow_types::field::Field; + +use crate::eliminate_unused_fields::{compute_required_before_step, create_identity_project}; +use crate::pattern; +use crate::push_filter_into_join::right_workflow_fields; + +use super::{Group, Optimization, OptimizationResult, Pattern}; + +pub struct PushProjectIntoJoin; + +impl Optimization for PushProjectIntoJoin { + fn pattern(&self) -> Pattern { + pattern!(Join ([^Join Union Tee Write]*?) [Project Summarize MuxSummarize Count MuxCount]) + } + + fn apply(&self, steps: &[WorkflowStep], groups: &[Group]) -> OptimizationResult { + let WorkflowStep::Join(join, right_workflow) = &steps[0] else { + return OptimizationResult::Unchanged; + }; + + let Some(mut right_fields) = right_workflow_fields(&right_workflow.steps) else { + return OptimizationResult::Unchanged; + }; + + if join.on.0 == join.on.1 { + right_fields.remove(&join.on.1); + } + + let mut required: BTreeSet = BTreeSet::new(); + for step in steps[1..].iter().rev() { + required = compute_required_before_step(step, required); + } + required.insert(join.on.0.clone()); + required.insert(join.on.1.clone()); + + let left_required: BTreeSet = required + .iter() + .filter(|f| !right_fields.contains(*f)) + .cloned() + .collect(); + let right_required: BTreeSet = required + .iter() + .filter(|f| right_fields.contains(*f)) + .cloned() + .collect(); + + let prune_right = right_required.len() < right_fields.len(); + + if left_required.is_empty() && !prune_right { + return OptimizationResult::Unchanged; + } + + let mut result = Vec::new(); + + if !left_required.is_empty() { + result.push(create_identity_project(left_required)); + } + + let mut right_wf = right_workflow.clone(); + if prune_right { + right_wf.steps.push(create_identity_project(right_required)); + } + result.push(WorkflowStep::Join(join.clone(), right_wf)); + + let group = groups[0]; + result.extend(steps[group.0..group.1].iter().cloned()); + result.push(steps.last().unwrap().clone()); + + OptimizationResult::Changed(result) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + field, field_expr, gt, join, lit, project, right_project, summarize_by, + }; + use miso_workflow::WorkflowStep; + use miso_workflow_types::join::JoinType; + + fn apply(steps: &[WorkflowStep]) -> OptimizationResult { + let opt = PushProjectIntoJoin; + let pattern = opt.pattern(); + let mut groups = Vec::new(); + let kinds: Vec<_> = steps.iter().map(|s| s.kind()).collect(); + let Some((start, end)) = Pattern::search_first_with_groups(&pattern, &kinds, &mut groups) + else { + return OptimizationResult::Unchanged; + }; + opt.apply(&steps[start..end], &groups) + } + + #[test] + fn prunes_both_sides() { + let steps = vec![ + join( + JoinType::Inner, + "id", + "rid", + right_project(&["rf", "rid", "extra"]), + ), + project(&["lf", "rf"]), + ]; + + let OptimizationResult::Changed(new_steps) = apply(&steps) else { + panic!("expected Changed"); + }; + + assert_eq!(new_steps.len(), 3); + assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 2)); + let WorkflowStep::Join(_, rw) = &new_steps[1] else { + panic!("expected Join"); + }; + assert_eq!(rw.steps.len(), 2); + assert!(matches!(&new_steps[2], WorkflowStep::Project(fields) if fields.len() == 2)); + } + + #[test] + fn count_prunes_to_join_keys_only() { + let steps = vec![ + join(JoinType::Inner, "id", "rid", right_project(&["rf", "rid"])), + WorkflowStep::Count, + ]; + + let OptimizationResult::Changed(new_steps) = apply(&steps) else { + panic!("expected Changed"); + }; + + assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 1)); + let WorkflowStep::Join(_, rw) = &new_steps[1] else { + panic!("expected Join"); + }; + assert!( + matches!(rw.steps.last().unwrap(), WorkflowStep::Project(fields) if fields.len() == 1) + ); + } + + #[test] + fn preserves_intermediate_steps() { + let steps = vec![ + join( + JoinType::Inner, + "id", + "rid", + right_project(&["rf", "rid", "extra"]), + ), + WorkflowStep::Filter(gt(field_expr("lf"), lit(5))), + project(&["lf", "rf"]), + ]; + + let OptimizationResult::Changed(new_steps) = apply(&steps) else { + panic!("expected Changed"); + }; + + assert_eq!(new_steps.len(), 4); + assert!(matches!(&new_steps[0], WorkflowStep::Project(_))); + assert!(matches!(&new_steps[2], WorkflowStep::Filter(_))); + assert!(matches!(&new_steps[3], WorkflowStep::Project(_))); + } + + #[test] + fn right_fields_unknown_unchanged() { + let steps = vec![ + join( + JoinType::Inner, + "id", + "rid", + vec![WorkflowStep::Filter(gt(field_expr("x"), lit(1)))], + ), + project(&["lf", "rf"]), + ]; + + assert_eq!(apply(&steps), OptimizationResult::Unchanged); + } + + #[test] + fn overlapping_join_keys() { + let steps = vec![ + join(JoinType::Inner, "id", "id", right_project(&["rf", "id"])), + project(&["rf"]), + ]; + + let OptimizationResult::Changed(new_steps) = apply(&steps) else { + panic!("expected Changed"); + }; + + assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 1)); + let WorkflowStep::Project(left_proj) = &new_steps[0] else { + panic!("expected Project"); + }; + assert_eq!(left_proj[0].to, field("id")); + } + + #[test] + fn summarize_right_workflow() { + let steps = vec![ + join( + JoinType::Inner, + "id", + "rid", + vec![summarize_by(&["rf", "rid", "extra"])], + ), + project(&["lf", "rf"]), + ]; + + let OptimizationResult::Changed(new_steps) = apply(&steps) else { + panic!("expected Changed"); + }; + + let WorkflowStep::Join(_, rw) = &new_steps[1] else { + panic!("expected Join"); + }; + assert_eq!(rw.steps.len(), 2); + assert!( + matches!(rw.steps.last().unwrap(), WorkflowStep::Project(fields) if fields.len() == 2) + ); + } +} diff --git a/miso-optimizations/src/test_utils.rs b/miso-optimizations/src/test_utils.rs index 90d6204..48894fb 100644 --- a/miso-optimizations/src/test_utils.rs +++ b/miso-optimizations/src/test_utils.rs @@ -6,6 +6,7 @@ use miso_workflow_types::{ expr::Expr, field::Field, field_unwrap, + join::JoinType, project::ProjectField, sort::{NullsOrder, Sort, SortOrder}, summarize::{Aggregation, ByField, Summarize}, @@ -137,3 +138,24 @@ pub fn summarize_by(fields: &[&str]) -> S { .collect(), }) } + +pub fn join(type_: JoinType, left_key: &str, right_key: &str, right_steps: Vec) -> S { + use miso_workflow::Workflow; + use miso_workflow_types::join::Join; + S::Join( + Join { + on: (field(left_key), field(right_key)), + type_, + partitions: 1, + }, + Workflow::new(right_steps), + ) +} + +pub fn right_project(fields: &[&str]) -> Vec { + vec![S::Project(fields.iter().map(|f| noop_project(f)).collect())] +} + +pub fn project(fields: &[&str]) -> S { + S::Project(fields.iter().map(|f| noop_project(f)).collect()) +}