Skip to content

Commit 1b2f6ee

Browse files
perf: correctly try execute parent in the iterative child execute loop (#7386)
We run iterative execution for executing arrays (decompressing). This PR add a execute_parent call when executing a child in a iterative fashion --------- Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com> Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk> Co-authored-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 67c1655 commit 1b2f6ee

File tree

2 files changed

+66
-30
lines changed

2 files changed

+66
-30
lines changed

encodings/runend/src/compute/filter.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ fn filter_run_end_primitive<R: NativePType + AddAssign + From<bool> + AsPrimitiv
115115
#[cfg(test)]
116116
mod tests {
117117
use vortex_array::IntoArray;
118+
use vortex_array::LEGACY_SESSION;
119+
use vortex_array::VortexSessionExecute;
118120
use vortex_array::arrays::PrimitiveArray;
119121
use vortex_array::assert_arrays_eq;
120122
use vortex_error::VortexResult;
@@ -142,4 +144,43 @@ mod tests {
142144
);
143145
Ok(())
144146
}
147+
148+
/// Regression: Filter(Slice(RunEnd)) must preserve RunEnd after execution.
149+
/// Previously Filter.execute() forced its child to canonical, decoding
150+
/// Slice(RunEnd) → Primitive and destroying run structure. The fix lets
151+
/// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire.
152+
#[test]
153+
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
154+
// 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes
155+
// the run-preserving path (true_count >= 25).
156+
let values: Vec<i32> = [10, 20, 30, 40]
157+
.iter()
158+
.flat_map(|&v| std::iter::repeat_n(v, 32))
159+
.collect();
160+
let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array())?;
161+
162+
// Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs.
163+
let sliced = arr.into_array().slice(16..128)?;
164+
165+
// Keep every other row = 112/2 = 56 rows.
166+
let mask = Mask::from_iter((0..sliced.len()).map(|i| i % 2 == 0));
167+
let filtered = sliced.filter(mask)?;
168+
169+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
170+
let executed = filtered.execute_until::<RunEnd>(&mut ctx)?;
171+
assert_eq!(
172+
executed.encoding_id().as_ref(),
173+
"vortex.runend",
174+
"Filter(Slice(RunEnd)) should preserve RunEnd encoding"
175+
);
176+
177+
let expected: Vec<i32> = std::iter::repeat_n(10, 8)
178+
.chain(std::iter::repeat_n(20, 16))
179+
.chain(std::iter::repeat_n(30, 16))
180+
.chain(std::iter::repeat_n(40, 16))
181+
.collect();
182+
assert_arrays_eq!(executed, PrimitiveArray::from_iter(expected));
183+
184+
Ok(())
185+
}
145186
}

vortex-array/src/executor.rs

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,18 @@ impl ArrayRef {
8686
/// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
8787
/// stack.
8888
///
89-
/// The scheduler repeatedly:
90-
/// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
91-
/// 2. Runs `execute_parent` on each child for child-driven optimizations.
92-
/// 3. Calls `execute` which returns an [`ExecutionStep`].
89+
/// Each iteration proceeds through three steps in order:
90+
///
91+
/// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is
92+
/// canonical, splice it back into the stacked parent (if any) and continue, or return.
93+
/// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current`
94+
/// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd).
95+
/// If there is a stacked parent frame, the rewritten child is spliced back into it so
96+
/// that optimize and further `execute_parent` can fire on the reconstructed parent
97+
/// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)`
98+
/// whose `FilterExecuteAdaptor` fires on the next iteration).
99+
/// 3. **`execute`** — call the encoding's own execute step, which either returns `Done` or
100+
/// `ExecuteSlot(i)` to push a child onto the stack for focused execution.
93101
///
94102
/// Note: the returned array may not match `M`. If execution converges to a canonical form
95103
/// that does not match `M`, the canonical array is returned since no further execution
@@ -103,51 +111,41 @@ impl ArrayRef {
103111
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
104112

105113
for _ in 0..max_iterations() {
106-
// Check for termination: use the stack frame's done predicate, or the root matcher.
114+
// Step 1: done / canonical — splice back into stacked parent or return.
107115
let is_done = stack
108116
.last()
109117
.map_or(M::matches as DonePredicate, |frame| frame.2);
110-
if is_done(&current) {
118+
if is_done(&current) || AnyCanonical::matches(&current) {
111119
match stack.pop() {
112120
None => {
113121
ctx.log(format_args!("-> {}", current));
114122
return Ok(current);
115123
}
116124
Some((parent, slot_idx, _)) => {
117-
current = parent.with_slot(slot_idx, current)?;
118-
current = current.optimize()?;
125+
current = parent.with_slot(slot_idx, current)?.optimize()?;
119126
continue;
120127
}
121128
}
122129
}
123130

124-
// If we've reached canonical form, we can't execute any further regardless
125-
// of whether the matcher matched.
126-
if AnyCanonical::matches(&current) {
127-
match stack.pop() {
128-
None => {
129-
ctx.log(format_args!("-> canonical (unmatched) {}", current));
130-
return Ok(current);
131-
}
132-
Some((parent, slot_idx, _)) => {
133-
current = parent.with_slot(slot_idx, current)?;
134-
current = current.optimize()?;
135-
continue;
136-
}
137-
}
138-
}
139-
140-
// Try execute_parent (child-driven optimized execution)
131+
// Step 2: execute_parent on children (current is the parent).
132+
// If there is a stacked parent frame, splice the rewritten child back into it
133+
// so that optimize and execute_parent can fire naturally on the reconstructed parent
134+
// (e.g. Slice(RunEnd) -RunEndSliceKernel-> RunEnd, spliced back into Filter gives
135+
// Filter(RunEnd), whose FilterExecuteAdaptor fires on the next iteration).
141136
if let Some(rewritten) = try_execute_parent(&current, ctx)? {
142137
ctx.log(format_args!(
143138
"execute_parent rewrote {} -> {}",
144139
current, rewritten
145140
));
146141
current = rewritten.optimize()?;
142+
if let Some((parent, slot_idx, _)) = stack.pop() {
143+
current = parent.with_slot(slot_idx, current)?.optimize()?;
144+
}
147145
continue;
148146
}
149147

150-
// Execute the array itself.
148+
// Step 4: execute the encoding's own step.
151149
let result = execute_step(current, ctx)?;
152150
let (array, step) = result.into_parts();
153151
match step {
@@ -177,9 +175,6 @@ impl ArrayRef {
177175
}
178176

179177
/// Execution context for batch CPU compute.
180-
///
181-
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
182-
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
183178
#[derive(Debug, Clone)]
184179
pub struct ExecutionCtx {
185180
id: usize,
@@ -193,8 +188,8 @@ impl ExecutionCtx {
193188
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
194189
let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
195190
Self {
196-
id,
197191
session,
192+
id,
198193
ops: Vec::new(),
199194
}
200195
}

0 commit comments

Comments
 (0)