Skip to content

Commit 8b47f45

Browse files
authored
fix: Remove nested async block causing Stacked Borrows violation in PushDecoderStreamState (apache#21663)
## Which issue does this PR close? - Closes apache#21662. ## Rationale for this change Miri detects a Stacked Borrows violation in `PushDecoderStreamState::transition`. A nested `async` block captures `&mut self` as a single opaque mutable reference. At the `.await` on `get_byte_ranges`, the future yields, and the `Unique` tag on the borrow stack is invalidated by a `SharedReadOnly` retag. When the future resumes, `push_ranges` attempts a two-phase retag through the now-invalidated tag. This was found by [Apache DataFusion Comet](https://github.com/apache/datafusion-comet), which runs Miri in CI. ## What changes are included in this PR? Remove the nested `async` block in the `NeedsData` arm of `PushDecoderStreamState::transition` and inline the IO (`get_byte_ranges`) and CPU (`push_ranges`) operations as separate statements. Since `transition` is already an `async fn`, the `.await` works directly in the loop body. Without the nested block, the compiler can split the borrows of `self.reader` and `self.decoder` into disjoint field borrows, keeping the borrow stack valid across the yield point. Also removes the now-unused `parquet::errors::ParquetError` import. ## Are these changes tested? Covered by existing parquet reader tests. The original violation was caught by Miri, which DataFusion does not currently run in CI. ## Are there any user-facing changes? No.
1 parent 7bfa3fb commit 8b47f45

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ use parquet::arrow::parquet_column;
7575
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
7676
use parquet::basic::Type;
7777
use parquet::bloom_filter::Sbbf;
78-
use parquet::errors::ParquetError;
7978
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
8079

8180
/// Stateless Parquet morselizer implementation.
@@ -1253,13 +1252,23 @@ impl PushDecoderStreamState {
12531252
loop {
12541253
match self.decoder.try_decode() {
12551254
Ok(DecodeResult::NeedsData(ranges)) => {
1256-
let fetch = async {
1257-
let data = self.reader.get_byte_ranges(ranges.clone()).await?;
1258-
self.decoder.push_ranges(ranges, data)?;
1259-
Ok::<_, ParquetError>(())
1260-
};
1261-
if let Err(e) = fetch.await {
1262-
return Some(Err(DataFusionError::from(e)));
1255+
// IO (get_byte_ranges) and CPU (push_ranges) are still
1256+
// decoupled — they just can't live in a nested async block
1257+
// because that captures `&mut self` as one opaque borrow,
1258+
// which violates Stacked Borrows across the yield point.
1259+
// Inlining lets the compiler split the disjoint field borrows.
1260+
let data = self
1261+
.reader
1262+
.get_byte_ranges(ranges.clone())
1263+
.await
1264+
.map_err(DataFusionError::from);
1265+
match data {
1266+
Ok(data) => {
1267+
if let Err(e) = self.decoder.push_ranges(ranges, data) {
1268+
return Some(Err(DataFusionError::from(e)));
1269+
}
1270+
}
1271+
Err(e) => return Some(Err(e)),
12631272
}
12641273
}
12651274
Ok(DecodeResult::Data(batch)) => {

0 commit comments

Comments
 (0)