Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions miso-optimizations/src/eliminate_unused_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn transform_steps(steps: &[WorkflowStep]) -> Option<Vec<WorkflowStep>> {
None
}

fn create_identity_project(fields: BTreeSet<Field>) -> WorkflowStep {
pub(crate) fn create_identity_project(fields: BTreeSet<Field>) -> WorkflowStep {
WorkflowStep::Project(
fields
.into_iter()
Expand All @@ -70,7 +70,7 @@ fn create_identity_project(fields: BTreeSet<Field>) -> WorkflowStep {
)
}

fn compute_required_before_step(
pub(crate) fn compute_required_before_step(
step: &WorkflowStep,
mut after: BTreeSet<Field>,
) -> BTreeSet<Field> {
Expand Down
3 changes: 3 additions & 0 deletions miso-optimizations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use push_into_scan::PushIntoScan;
use push_join_into_scan::PushJoinIntoScan;
use push_limit_into_limit::PushLimitIntoLimit;
use push_limit_into_topn::PushLimitIntoTopN;
use push_project_into_join::PushProjectIntoJoin;
use push_steps_into_union::PushStepsIntoUnion;
use push_union_into_scan::PushUnionIntoScan;
use remove_no_op_filter::RemoveNoOpFilter;
Expand Down Expand Up @@ -55,6 +56,7 @@ mod push_into_scan;
mod push_join_into_scan;
mod push_limit_into_limit;
mod push_limit_into_topn;
mod push_project_into_join;
mod push_steps_into_union;
mod push_union_into_scan;
mod remove_no_op_filter;
Expand Down Expand Up @@ -181,6 +183,7 @@ impl Optimizer {
vec![
opt!(ReorderFilterBeforeSort),
opt!(PushFilterIntoJoin),
opt!(PushProjectIntoJoin),
opt!(MergeConsecutiveExtends),
opt!(FoldRenameIntoProject),
opt!(MergeConsecutiveProjects),
Expand Down
29 changes: 3 additions & 26 deletions miso-optimizations/src/push_filter_into_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn flatten_and_conditions(expr: Expr) -> Vec<Expr> {
}
}

fn right_workflow_fields(steps: &[WorkflowStep]) -> Option<HashSet<Field>> {
pub(crate) fn right_workflow_fields(steps: &[WorkflowStep]) -> Option<HashSet<Field>> {
match steps.last()? {
WorkflowStep::Project(fields) => Some(fields.iter().map(|pf| pf.to.clone()).collect()),
WorkflowStep::Summarize(summarize) => {
Expand All @@ -143,33 +143,10 @@ fn classify_condition(condition: &Expr, right_fields: &HashSet<Field>) -> Condit

#[cfg(test)]
mod tests {
use miso_workflow::Workflow;
use miso_workflow_types::join::{Join, JoinType};
use miso_workflow_types::join::JoinType;

use super::*;
use crate::test_utils::{and, eq, field, field_expr, gt, lit, noop_project, summarize_by};

fn join(
type_: JoinType,
left_key: &str,
right_key: &str,
right_steps: Vec<WorkflowStep>,
) -> WorkflowStep {
WorkflowStep::Join(
Join {
on: (field(left_key), field(right_key)),
type_,
partitions: 1,
},
Workflow::new(right_steps),
)
}

fn right_project(fields: &[&str]) -> Vec<WorkflowStep> {
vec![WorkflowStep::Project(
fields.iter().map(|f| noop_project(f)).collect(),
)]
}
use crate::test_utils::{and, eq, field_expr, gt, join, lit, right_project, summarize_by};

fn apply(steps: &[WorkflowStep]) -> OptimizationResult {
let opt = PushFilterIntoJoin;
Expand Down
243 changes: 243 additions & 0 deletions miso-optimizations/src/push_project_into_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
//! Narrows both sides of a join to only the fields that later steps need.
//!
//! A join merges two streams, but often the steps after it only use a few
//! fields. Without pruning, both sides carry unused fields through the join.
//!
//! We walk backwards from the last step to find which fields are needed, then
//! split them into left vs right based on the right workflow's output fields.
//! The left side has unknown schema, so we always inject a project before the
//! join. The right side has known schema (project or summarize), so we only
//! add a project when it actually drops fields.
//!
//! Example:
//! join (scan | project rf, rid, extra) on id == rid | project lf, rf
//! becomes:
//! project id, lf | join (scan | project rf, rid, extra | project rf, rid) on id == rid | project lf, rf
//!
//! When the only step after the join is count, both sides shrink to just the key:
//! join (scan | project rf, rid) on id == rid | count
//! becomes:
//! project id | join (scan | project rf, rid | project rid) on id == rid | count

use std::collections::BTreeSet;

use miso_workflow::WorkflowStep;
use miso_workflow_types::field::Field;

use crate::eliminate_unused_fields::{compute_required_before_step, create_identity_project};
use crate::pattern;
use crate::push_filter_into_join::right_workflow_fields;

use super::{Group, Optimization, OptimizationResult, Pattern};

pub struct PushProjectIntoJoin;

impl Optimization for PushProjectIntoJoin {
fn pattern(&self) -> Pattern {
pattern!(Join ([^Join Union Tee Write]*?) [Project Summarize MuxSummarize Count MuxCount])
}

fn apply(&self, steps: &[WorkflowStep], groups: &[Group]) -> OptimizationResult {
let WorkflowStep::Join(join, right_workflow) = &steps[0] else {
return OptimizationResult::Unchanged;
};

let Some(mut right_fields) = right_workflow_fields(&right_workflow.steps) else {
return OptimizationResult::Unchanged;
};

if join.on.0 == join.on.1 {
right_fields.remove(&join.on.1);
}

let mut required: BTreeSet<Field> = BTreeSet::new();
for step in steps[1..].iter().rev() {
required = compute_required_before_step(step, required);
}
required.insert(join.on.0.clone());
required.insert(join.on.1.clone());

let left_required: BTreeSet<Field> = required
.iter()
.filter(|f| !right_fields.contains(*f))
.cloned()
.collect();
let right_required: BTreeSet<Field> = required
.iter()
.filter(|f| right_fields.contains(*f))
.cloned()
.collect();

let prune_right = right_required.len() < right_fields.len();

if left_required.is_empty() && !prune_right {
return OptimizationResult::Unchanged;
}

let mut result = Vec::new();

if !left_required.is_empty() {
result.push(create_identity_project(left_required));
}

let mut right_wf = right_workflow.clone();
if prune_right {
right_wf.steps.push(create_identity_project(right_required));
}
result.push(WorkflowStep::Join(join.clone(), right_wf));

let group = groups[0];
result.extend(steps[group.0..group.1].iter().cloned());
result.push(steps.last().unwrap().clone());

OptimizationResult::Changed(result)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
field, field_expr, gt, join, lit, project, right_project, summarize_by,
};
use miso_workflow::WorkflowStep;
use miso_workflow_types::join::JoinType;

fn apply(steps: &[WorkflowStep]) -> OptimizationResult {
let opt = PushProjectIntoJoin;
let pattern = opt.pattern();
let mut groups = Vec::new();
let kinds: Vec<_> = steps.iter().map(|s| s.kind()).collect();
let Some((start, end)) = Pattern::search_first_with_groups(&pattern, &kinds, &mut groups)
else {
return OptimizationResult::Unchanged;
};
opt.apply(&steps[start..end], &groups)
}

#[test]
fn prunes_both_sides() {
let steps = vec![
join(
JoinType::Inner,
"id",
"rid",
right_project(&["rf", "rid", "extra"]),
),
project(&["lf", "rf"]),
];

let OptimizationResult::Changed(new_steps) = apply(&steps) else {
panic!("expected Changed");
};

assert_eq!(new_steps.len(), 3);
assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 2));
let WorkflowStep::Join(_, rw) = &new_steps[1] else {
panic!("expected Join");
};
assert_eq!(rw.steps.len(), 2);
assert!(matches!(&new_steps[2], WorkflowStep::Project(fields) if fields.len() == 2));
}

#[test]
fn count_prunes_to_join_keys_only() {
let steps = vec![
join(JoinType::Inner, "id", "rid", right_project(&["rf", "rid"])),
WorkflowStep::Count,
];

let OptimizationResult::Changed(new_steps) = apply(&steps) else {
panic!("expected Changed");
};

assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 1));
let WorkflowStep::Join(_, rw) = &new_steps[1] else {
panic!("expected Join");
};
assert!(
matches!(rw.steps.last().unwrap(), WorkflowStep::Project(fields) if fields.len() == 1)
);
}

#[test]
fn preserves_intermediate_steps() {
let steps = vec![
join(
JoinType::Inner,
"id",
"rid",
right_project(&["rf", "rid", "extra"]),
),
WorkflowStep::Filter(gt(field_expr("lf"), lit(5))),
project(&["lf", "rf"]),
];

let OptimizationResult::Changed(new_steps) = apply(&steps) else {
panic!("expected Changed");
};

assert_eq!(new_steps.len(), 4);
assert!(matches!(&new_steps[0], WorkflowStep::Project(_)));
assert!(matches!(&new_steps[2], WorkflowStep::Filter(_)));
assert!(matches!(&new_steps[3], WorkflowStep::Project(_)));
}

#[test]
fn right_fields_unknown_unchanged() {
let steps = vec![
join(
JoinType::Inner,
"id",
"rid",
vec![WorkflowStep::Filter(gt(field_expr("x"), lit(1)))],
),
project(&["lf", "rf"]),
];

assert_eq!(apply(&steps), OptimizationResult::Unchanged);
}

#[test]
fn overlapping_join_keys() {
let steps = vec![
join(JoinType::Inner, "id", "id", right_project(&["rf", "id"])),
project(&["rf"]),
];

let OptimizationResult::Changed(new_steps) = apply(&steps) else {
panic!("expected Changed");
};

assert!(matches!(&new_steps[0], WorkflowStep::Project(fields) if fields.len() == 1));
let WorkflowStep::Project(left_proj) = &new_steps[0] else {
panic!("expected Project");
};
assert_eq!(left_proj[0].to, field("id"));
}

#[test]
fn summarize_right_workflow() {
let steps = vec![
join(
JoinType::Inner,
"id",
"rid",
vec![summarize_by(&["rf", "rid", "extra"])],
),
project(&["lf", "rf"]),
];

let OptimizationResult::Changed(new_steps) = apply(&steps) else {
panic!("expected Changed");
};

let WorkflowStep::Join(_, rw) = &new_steps[1] else {
panic!("expected Join");
};
assert_eq!(rw.steps.len(), 2);
assert!(
matches!(rw.steps.last().unwrap(), WorkflowStep::Project(fields) if fields.len() == 2)
);
}
}
22 changes: 22 additions & 0 deletions miso-optimizations/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use miso_workflow_types::{
expr::Expr,
field::Field,
field_unwrap,
join::JoinType,
project::ProjectField,
sort::{NullsOrder, Sort, SortOrder},
summarize::{Aggregation, ByField, Summarize},
Expand Down Expand Up @@ -137,3 +138,24 @@ pub fn summarize_by(fields: &[&str]) -> S {
.collect(),
})
}

pub fn join(type_: JoinType, left_key: &str, right_key: &str, right_steps: Vec<S>) -> S {
use miso_workflow::Workflow;
use miso_workflow_types::join::Join;
S::Join(
Join {
on: (field(left_key), field(right_key)),
type_,
partitions: 1,
},
Workflow::new(right_steps),
)
}

pub fn right_project(fields: &[&str]) -> Vec<S> {
vec![S::Project(fields.iter().map(|f| noop_project(f)).collect())]
}

pub fn project(fields: &[&str]) -> S {
S::Project(fields.iter().map(|f| noop_project(f)).collect())
}
Loading