21566: Parquet: Add scan-level cache for adapted pruning setup (projection/predicate) to reuse CPU-only work across same-schema files#310
Conversation
Add a scan-local ParquetPruningSetupCache in opener.rs to reuse adapted projection, adapted predicate, and row-group pruning predicate setup for same-schema files. Implement PhysicalExprAdapterFactory::supports_reusable_rewrites() in schema_rewriter.rs to allow default adapters while defaulting custom ones to opt-out. Connect the cache to ParquetMorselizer in source.rs and introduce a regression test to ensure the setup is reused correctly across same-schema files while keeping it conservative to avoid per-file literal replacements.
Simplify the pruning setup cache in opener.rs by removing the wrapper entry struct, deriving PartialEq/Eq, and returning cloned setup values instead of an extra Arc. Extract the cache-or-build branch into build_or_get_pruning_setup and avoid repeated literal_columns.is_empty() checks. Move returned pruning setup fields directly into prepared and simplify the test-only counting adapter and cache regression test loop. Tighten the supports_reusable_rewrites doc comment in schema_rewriter.rs without changing the public interface.
Refactor the cache in opener.rs to use a HashMap, computing cold misses outside the mutex (with a re-check on insert) for better performance. Add cache-boundary tests for non-reusable adapters and differing physical schemas. Clarify the documentation of supports_reusable_rewrites() in schema_rewriter.rs to specify that rewrite inputs must share the same logical/physical schemas.
Enhance opener.rs with cache-key rationale comments for clarity. Update ParquetPruningSetupCache to handle concurrent cold misses by implementing a per-entry pending/ready state. Clarify the public contract of supports_reusable_rewrites() in schema_rewriter.rs.
Walkthrough

This pull request introduces a pruning setup cache mechanism to the Parquet datasource, alongside configuration guidelines for AI agents. The cache reuses CPU-only pruning setup across multiple files sharing the same logical/physical schemas and scan-level projection/predicate identities.
Code Review

This PR introduces a scan-level cache (`ParquetPruningSetupCache`) for the adapted pruning setup.

Correctness Issues

1. Blocking a tokio worker thread via `Condvar::wait`.

2. Cache key uses pointer identity (`physical_expr_ptr`):

```rust
predicate_ptr: predicate.map(physical_expr_ptr),
projection_expr_ptrs: projection.iter().map(|expr| physical_expr_ptr(&expr.expr)).collect(),
```

The per-scan scope (a fresh cache per scan) mitigates this.

3. Race window in error path: entry removed after notification:

```rust
entry.ready.notify_all();
drop(state); // mutex released here
self.entries()?.remove(key); // new callers can find the Failed entry before this line runs
```

After `notify_all()` the mutex is released, so new callers can find the `Failed` entry before it is removed.

Design Observations

4. Unbounded cache growth.

5. Missing test for literal-column bypass: the code correctly takes the non-cached path when per-file literal columns are replaced, but no test exercises that bypass.

Minor Notes

Summary

The core approach is sound. The two items most worth addressing before merge are: (1) the blocking `Condvar::wait` and (2) …
Code Review
This pull request introduces a scan-local cache, ParquetPruningSetupCache, to reuse CPU-only pruning setups across Parquet files with identical physical schemas and expression inputs. It also adds rules for AI agents regarding PR linking and config file reviews. Several critical issues were identified in the implementation, including compilation errors due to SchemaRef not implementing Hash and DataFusionError not being clonable for the cache's error state. Additionally, the cache initialization in ParquetSource needs to be adjusted to ensure it is shared across partitions rather than being partition-local.
```rust
Mutex<HashMap<ParquetPruningSetupCacheKey, Arc<ParquetPruningSetupCacheEntry>>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
```
The ParquetPruningSetupCacheKey struct derives Hash, but it contains logical_file_schema and physical_file_schema of type SchemaRef (which is Arc<Schema>). In the arrow-rs crate, Schema does not implement the Hash trait. Consequently, Arc<Schema> also does not implement Hash, and this derive will cause a compilation error. You should implement Hash manually for ParquetPruningSetupCacheKey, perhaps by using pointer equality for the schemas if they are expected to be stable, or by hashing a unique representation of the schema.
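A minimal self-contained sketch of that suggestion — pointer identity for both equality and hashing, which keeps the `Hash`/`Eq` contract consistent. The unit `Schema` stand-in is an assumption; the field names come from the snippet above:

```rust
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// Stand-in for arrow's Schema so the sketch compiles on its own.
#[derive(Debug)]
struct Schema;
type SchemaRef = Arc<Schema>;

#[derive(Debug, Clone)]
struct ParquetPruningSetupCacheKey {
    logical_file_schema: SchemaRef,
    physical_file_schema: SchemaRef,
}

// Pointer identity for both Eq and Hash keeps the two consistent:
// keys compare equal iff they reference the very same Arc'd schemas.
impl PartialEq for ParquetPruningSetupCacheKey {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.logical_file_schema, &other.logical_file_schema)
            && Arc::ptr_eq(&self.physical_file_schema, &other.physical_file_schema)
    }
}

impl Eq for ParquetPruningSetupCacheKey {}

impl Hash for ParquetPruningSetupCacheKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        (Arc::as_ptr(&self.logical_file_schema) as usize).hash(state);
        (Arc::as_ptr(&self.physical_file_schema) as usize).hash(state);
    }
}

fn main() {
    let schema = Arc::new(Schema);
    let k1 = ParquetPruningSetupCacheKey {
        logical_file_schema: Arc::clone(&schema),
        physical_file_schema: Arc::clone(&schema),
    };
    let k2 = k1.clone();
    assert_eq!(k1, k2); // clones share the same Arcs, so they match
    let k3 = ParquetPruningSetupCacheKey {
        logical_file_schema: Arc::new(Schema),
        physical_file_schema: Arc::new(Schema),
    };
    assert!(k1 != k3); // identical contents, different allocations
}
```

This only works if the schema `Arc`s are stable for the cache's lifetime, as the comment above notes; otherwise hashing a canonical representation of the schema is the safer route.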
```rust
entry.ready.notify_all();
drop(state);
self.entries()?.remove(key);
Err(DataFusionError::from(&shared_error))
```
The code attempts to return an error using DataFusionError::from(&shared_error), where shared_error is an Arc<DataFusionError>. However, DataFusionError does not implement Clone (as it contains non-clonable Box<dyn Error> variants) and there is no From<&DataFusionError> implementation. This will result in a compilation error. Consider storing a clonable error representation (like a string) in the cache or using a custom error type that can be cloned.
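A sketch of one such option: keep the first error behind an `Arc` and rebuild a fresh error from its message for each waiter. `SetupError` and `FailedEntry` are hypothetical stand-ins — like `DataFusionError`, `SetupError` holds a boxed error and cannot derive `Clone`:

```rust
use std::fmt;
use std::sync::Arc;

// Hypothetical stand-in for DataFusionError: holds a boxed error,
// so it cannot derive Clone.
#[derive(Debug)]
enum SetupError {
    External(Box<dyn std::error::Error + Send + Sync>),
}

impl fmt::Display for SetupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SetupError::External(e) => write!(f, "external error: {e}"),
        }
    }
}

// The cache's Failed state shares the original error; cloning the
// entry just clones the Arc, never the error itself.
#[derive(Debug, Clone)]
struct FailedEntry(Arc<SetupError>);

impl FailedEntry {
    // Each waiter gets a fresh error carrying the original's message.
    fn to_error(&self) -> SetupError {
        SetupError::External(self.0.to_string().into())
    }
}

fn main() {
    let failed = FailedEntry(Arc::new(SetupError::External("io failure".into())));
    let e1 = failed.to_error();
    let e2 = failed.clone().to_error();
    assert_eq!(e1.to_string(), e2.to_string());
}
```

The trade-off is that waiters see a stringified error rather than the original variant; if callers need to match on the error kind, a dedicated clonable error enum is the better choice.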
```rust
return Ok(setup.clone());
}
ParquetPruningSetupCacheEntryState::Failed(e) => {
return Err(DataFusionError::from(e));
```
```rust
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
pruning_setup_cache: Arc::default(),
```
The pruning_setup_cache is initialized here with Arc::default() inside create_file_opener. Since create_file_opener is called once per partition, this cache is effectively partition-local rather than scan-level. To achieve true scan-level caching as intended by the PR title, the cache should be owned by ParquetSource (as an Arc) so that it is shared across all partitions of the same DataSourceExec instance.
```diff
- pruning_setup_cache: Arc::default(),
+ pruning_setup_cache: Arc::clone(&self.pruning_setup_cache),
```
🤖 Augment PR Summary

Summary: This PR adds a scan-local cache for Parquet pruning setup (schema adaptation + pruning predicate construction) so CPU-only work can be reused across multiple files that share the same physical schema.

Changes:
Technical Notes: Reuse is intentionally disabled when file-specific literal column replacement occurs, and when adapter factories don't opt in to reusable rewrites.
```rust
};

if should_compute {
    let setup_result = make_setup();
```
If make_setup() panics after inserting the Pending entry, the state never transitions and subsequent callers can block forever in the Condvar wait loop. Consider ensuring the cache entry is moved to a terminal state (or removed) even when the setup computation unwinds.
Severity: medium
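One way to sketch the suggested fix is a drop guard that runs on unwind. The names (`CacheEntry`, `EntryState`, `compute`) are assumptions, and the real entry would hold the pruning setup rather than a `String`:

```rust
use std::sync::{Condvar, Mutex};

#[derive(Debug, Clone)]
enum EntryState {
    Pending,
    Ready(String),
    Failed(String),
}

struct CacheEntry {
    state: Mutex<EntryState>,
    ready: Condvar,
}

// Drop guard: if the setup computation unwinds before we disarm, move
// the entry to a terminal Failed state and wake every Condvar waiter.
struct PanicGuard<'a> {
    entry: &'a CacheEntry,
    armed: bool,
}

impl PanicGuard<'_> {
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for PanicGuard<'_> {
    fn drop(&mut self) {
        if self.armed {
            *self.entry.state.lock().unwrap() =
                EntryState::Failed("pruning setup panicked".into());
            self.entry.ready.notify_all();
        }
    }
}

fn compute(entry: &CacheEntry, make_setup: impl FnOnce() -> String) -> String {
    let guard = PanicGuard { entry, armed: true };
    let setup = make_setup(); // may panic; the guard covers the unwind
    *entry.state.lock().unwrap() = EntryState::Ready(setup.clone());
    entry.ready.notify_all();
    guard.disarm();
    setup
}

fn main() {
    // Happy path: the guard is disarmed and the entry ends up Ready.
    let entry = CacheEntry {
        state: Mutex::new(EntryState::Pending),
        ready: Condvar::new(),
    };
    assert_eq!(compute(&entry, || "setup".to_string()), "setup");
    assert!(matches!(&*entry.state.lock().unwrap(), EntryState::Ready(_)));

    // Panic path: the guard transitions the entry to Failed on unwind,
    // so Condvar waiters wake instead of hanging on Pending.
    let entry2 = CacheEntry {
        state: Mutex::new(EntryState::Pending),
        ready: Condvar::new(),
    };
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        compute(&entry2, || panic!("boom"))
    }));
    assert!(matches!(&*entry2.state.lock().unwrap(), EntryState::Failed(_)));
}
```

The guard locks the entry's mutex only inside `Drop`, so this stays safe as long as `make_setup()` never holds that lock itself.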
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit a57b22e.
```rust
drop(state);
self.entries()?.remove(key);
Err(DataFusionError::from(&shared_error))
}
```
Missing panic guard leaves Condvar waiters stuck forever
Medium Severity
In get_or_insert_with, make_setup() is called on line 284 before the entry's inner state lock is acquired. If make_setup() panics, notify_all() is never called, the entry state remains Pending, and the inner Mutex is not poisoned. Any threads blocked on entry.ready.wait(state) will wait indefinitely. A drop guard (or equivalent) that transitions the entry to Failed and calls notify_all() on unwind is needed to prevent this hang.
Additional Locations (1)
```rust
            }
        }
    }
}
```
Blocking Condvar wait inside Future::poll risks thread starvation
Low Severity
Condvar::wait is a blocking call that can be reached from ParquetOpenFuture::poll → plan() → transition() → prepare_filters() → build_or_get_pruning_setup() → get_or_insert_with(). Using a blocking synchronization primitive inside Future::poll blocks the async executor's worker thread, which is a well-known anti-pattern in async Rust and can cause thread starvation under concurrent partition processing.
Additional Locations (1)
/coderabbit review

coderabbit review

@CodeRabbit review

✅ Actions performed: Review triggered.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion/datasource-parquet/src/source.rs (1)
545-575: ⚠️ Potential issue | 🟠 Major — Make the cache scan-shared, not opener-local.

Line 573 allocates a fresh `ParquetPruningSetupCache` inside `create_file_opener`, so reuse is limited to files handled by that one opener/execution partition. Same-schema files that land in different file groups will miss the cache entirely, which makes this optimization partition-local rather than scan-local. Hoist the `Arc<ParquetPruningSetupCache>` onto a scan-scoped object and clone it into each opener instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/datasource-parquet/src/source.rs` around lines 545 - 575, The ParquetPruningSetupCache is being created inside create_file_opener (as pruning_setup_cache: Arc::default() in the ParquetMorselizer) which makes the cache opener-local; move the Arc<ParquetPruningSetupCache> allocation to a scan-scoped owner (e.g., the struct that represents the whole parquet scan or the builder that constructs all openers), store it there, and clone that Arc into each ParquetOpener/ParquetMorselizer when constructing them (keep the field name pruning_setup_cache and populate it by cloning the scan-scoped Arc instead of Arc::default() inside create_file_opener).
🧹 Nitpick comments (1)
datafusion/datasource-parquet/src/opener.rs (1)
2327-2460: Add a truly concurrent cache-reuse test.

These tests open `file1` and `file2` serially, so they only validate reuse after the first setup has already completed. They never hit the new `Pending`/`Condvar` path in the cache entry state machine, which is the riskiest part of this change. Please add a case that starts two same-schema opens concurrently and asserts a single adapter creation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/datasource-parquet/src/opener.rs` around lines 2327 - 2460, The tests currently open file1 and file2 serially so they never exercise the cache's Pending/Condvar concurrency path; add a new async test (e.g., test_pruning_setup_cache_reuses_adapter_concurrent) that uses the same setup as test_pruning_setup_cache_reuses_adapter but starts two opener.open(...) futures concurrently (spawn or join them) for the two PartitionedFile instances so they race to create the pruning setup and then await both; use the same CountingPhysicalExprAdapterFactory and assert create_count.load(Ordering::SeqCst) == 1 to ensure only one adapter is created, referencing ParquetOpenerBuilder, CountingPhysicalExprAdapterFactory, and opener.open to locate the code to change.
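The shape of such a test can be sketched with a trimmed stand-in cache; the real test would go through `ParquetOpenerBuilder` and a counting `PhysicalExprAdapterFactory`, so everything below is illustrative:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

#[derive(Clone)]
enum State {
    Pending,
    Ready(Arc<String>),
}

#[derive(Default)]
struct Cache {
    entries: Mutex<HashMap<String, Arc<(Mutex<State>, Condvar)>>>,
}

impl Cache {
    fn get_or_insert_with(&self, key: &str, make: impl FnOnce() -> String) -> Arc<String> {
        let (entry, compute) = {
            let mut map = self.entries.lock().unwrap();
            if let Some(e) = map.get(key) {
                (Arc::clone(e), false)
            } else {
                let e = Arc::new((Mutex::new(State::Pending), Condvar::new()));
                map.insert(key.to_string(), Arc::clone(&e));
                (e, true)
            }
        };
        if compute {
            let setup = Arc::new(make());
            *entry.0.lock().unwrap() = State::Ready(Arc::clone(&setup));
            entry.1.notify_all();
            return setup;
        }
        // Loser of the race: wait on the Condvar until the entry is Ready.
        let mut state = entry.0.lock().unwrap();
        loop {
            if let State::Ready(setup) = &*state {
                return Arc::clone(setup);
            }
            state = entry.1.wait(state).unwrap();
        }
    }
}

fn main() {
    let cache = Arc::new(Cache::default());
    let builds = Arc::new(AtomicUsize::new(0));
    // Two threads race the same key; when the race is truly concurrent,
    // one of them exercises the Pending/Condvar wait path.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let builds = Arc::clone(&builds);
            thread::spawn(move || {
                cache.get_or_insert_with("schema-A", || {
                    builds.fetch_add(1, Ordering::SeqCst);
                    "adapted setup".to_string()
                })
            })
        })
        .collect();
    let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    assert_eq!(builds.load(Ordering::SeqCst), 1); // setup built exactly once
    assert!(Arc::ptr_eq(&results[0], &results[1])); // both callers share the result
}
```

The assertion mirrors the suggested `create_count.load(Ordering::SeqCst) == 1` check: however the two opens interleave, the counting factory must fire exactly once.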
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7125384f-371f-4ad2-8733-728e4024b176
📒 Files selected for processing (5)
- .cursor/rules.md
- AGENTS.md
- datafusion/datasource-parquet/src/opener.rs
- datafusion/datasource-parquet/src/source.rs
- datafusion/physical-expr-adapter/src/schema_rewriter.rs


21566: To review by AI