Skip to content

Commit 95b675e

Browse files
author
Bert Vermeiren
committed
fix: filter with limit support raises internal error
1 parent 30a693c commit 95b675e

File tree

2 files changed

+33
-44
lines changed

2 files changed

+33
-44
lines changed

datafusion/physical-plan/src/coalesce/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ impl LimitedBatchCoalescer {
134134
Ok(())
135135
}
136136

137+
/// Returns the current value of this coalescer's `finished` flag.
///
/// NOTE(review): presumably the flag is set by `finish()`; the setter is
/// not visible in this hunk — confirm.
pub fn is_finished(&self) -> bool {
138+
self.finished
139+
}
140+
137141
/// Return the next completed batch, if any
138142
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
139143
self.inner.next_completed_batch()

datafusion/physical-plan/src/filter.rs

Lines changed: 29 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use super::{
2626
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
2727
RecordBatchStream, SendableRecordBatchStream, Statistics,
2828
};
29-
use crate::coalesce::LimitedBatchCoalescer;
3029
use crate::coalesce::PushBatchStatus::LimitReached;
30+
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
3131
use crate::common::can_project;
3232
use crate::execution_plan::CardinalityEffect;
3333
use crate::filter_pushdown::{
@@ -711,23 +711,6 @@ impl FilterExecMetrics {
711711
}
712712
}
713713

714-
impl FilterExecStream {
715-
fn flush_remaining_batches(
716-
&mut self,
717-
) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
718-
// Flush any remaining buffered batch
719-
match self.batch_coalescer.finish() {
720-
Ok(()) => {
721-
Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
722-
self.metrics.selectivity.add_part(batch.num_rows());
723-
Ok(batch)
724-
}))
725-
}
726-
Err(e) => Poll::Ready(Some(Err(e))),
727-
}
728-
}
729-
}
730-
731714
pub fn batch_filter(
732715
batch: &RecordBatch,
733716
predicate: &Arc<dyn PhysicalExpr>,
@@ -767,10 +750,27 @@ impl Stream for FilterExecStream {
767750
mut self: Pin<&mut Self>,
768751
cx: &mut Context<'_>,
769752
) -> Poll<Option<Self::Item>> {
770-
let poll;
771753
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
772754
loop {
755+
// If there is any completed batch ready, return it
756+
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
757+
self.metrics.selectivity.add_part(batch.num_rows());
758+
let poll = Poll::Ready(Some(Ok(batch)));
759+
return self.metrics.baseline_metrics.record_poll(poll);
760+
}
761+
762+
if self.batch_coalescer.is_finished() {
763+
// The coalescer is finished (input exhausted or limit reached) and no completed batches remain, so return None to signal end of stream.
764+
let poll = Poll::Ready(None);
765+
return self.metrics.baseline_metrics.record_poll(poll);
766+
}
767+
768+
// Attempt to pull the next batch from the input stream.
773769
match ready!(self.input.poll_next_unpin(cx)) {
770+
None => {
771+
self.batch_coalescer.finish()?;
772+
// continue draining the coalescer
773+
}
774774
Some(Ok(batch)) => {
775775
let timer = elapsed_compute.timer();
776776
let status = self.predicate.as_ref()
@@ -802,37 +802,22 @@ impl Stream for FilterExecStream {
802802
})?;
803803
timer.done();
804804

805-
if let LimitReached = status {
806-
poll = self.flush_remaining_batches();
807-
break;
808-
}
809-
810-
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
811-
self.metrics.selectivity.add_part(batch.num_rows());
812-
poll = Poll::Ready(Some(Ok(batch)));
813-
break;
814-
}
815-
continue;
816-
}
817-
None => {
818-
// Flush any remaining buffered batch
819-
match self.batch_coalescer.finish() {
820-
Ok(()) => {
821-
poll = self.flush_remaining_batches();
805+
match status {
806+
PushBatchStatus::Continue => {
807+
// Keep pushing more batches
822808
}
823-
Err(e) => {
824-
poll = Poll::Ready(Some(Err(e)));
809+
LimitReached => {
810+
// limit was reached, so stop early
811+
self.batch_coalescer.finish()?
812+
// continue draining the coalescer
825813
}
826814
}
827-
break;
828-
}
829-
value => {
830-
poll = Poll::Ready(value);
831-
break;
832815
}
816+
817+
// Error case
818+
other => return Poll::Ready(other),
833819
}
834820
}
835-
self.metrics.baseline_metrics.record_poll(poll)
836821
}
837822

838823
fn size_hint(&self) -> (usize, Option<usize>) {

0 commit comments

Comments
 (0)