Skip to content

18688: feat: allow custom caching via logical node#26

Open
martin-augment wants to merge 4 commits intomainfrom
pr-18688-2025-11-14-07-18-52
Open

18688: feat: allow custom caching via logical node#26
martin-augment wants to merge 4 commits intomainfrom
pr-18688-2025-11-14-07-18-52

Conversation

@martin-augment
Copy link
Copy Markdown
Owner

18688: To review by AI

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Nov 14, 2025

Walkthrough

The changes introduce an extensible caching mechanism to DataFusion. A new CacheProducer trait is defined, allowing custom cache implementations through a pluggable interface. The SessionContext and SessionState are extended to store and manage a CacheProducer. The DataFrame::cache method is modified to use the registered cache producer when available, falling back to the original MemTable materialization behavior. Test utilities and a corresponding test are added to validate this caching capability with custom logical nodes.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pr-18688-2025-11-14-07-18-52

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link
Copy Markdown

claude bot commented Nov 14, 2025

Code Review: Custom Caching via Logical Node

Thank you for this contribution! This PR adds a flexible extension point for custom caching strategies. Here's my detailed review:

✅ Strengths

  1. Well-designed extension point: The CacheProducer trait provides a clean interface for custom caching implementations
  2. Backward compatibility: Falls back to the existing MemTable-based implementation when no custom producer is registered
  3. Consistent API pattern: Follows DataFusion's existing patterns for extensibility (similar to FunctionFactory, TableProviderFactory)
  4. Good test coverage: Includes a test demonstrating the custom caching functionality

🔍 Potential Issues

1. Missing Physical Plan Implementation (Critical)

The test only validates the logical plan structure but doesn't verify that the custom cache node can be executed. Important concern: Users implementing CacheProducer will need to also register an ExtensionPlanner to convert their custom logical node to a physical plan, but this requirement is not documented.

Recommendation: Add documentation explaining that users must:

  • Implement CacheProducer to create the logical node
  • Implement ExtensionPlanner to convert it to a physical plan
  • Register the ExtensionPlanner via SessionState

Example needed in docs:

// Users need both:
ctx.with_cache_producer(Arc::new(MyCacheProducer {}))
   .with_physical_planner(/* custom planner with ExtensionPlanner */)

2. Test Coverage Gaps

  • No execution test: The test uses into_optimized_plan() but doesn't execute the query via .collect().await?. This means the physical planning path is untested
  • No error handling test: What happens if cache_producer.create() returns an error?
  • No schema validation: Should verify the cached node preserves the original schema

Recommendation: Add an integration test that:

let result = cached_df.collect().await?;
// Verify results match original

3. Documentation Gaps

The CacheProducer trait has minimal documentation:

  • Missing usage examples
  • No explanation of when/how the custom node will be planned
  • No guidance on what the custom logical node should do

Recommendation: Expand the doc comment:

/// Interface for applying a custom caching strategy.
/// 
/// # Overview
/// Implement this trait to provide custom caching behavior for DataFrames.
/// When registered via `SessionContext::with_cache_producer`, the `DataFrame::cache()`
/// method will create your custom logical node instead of materializing to a MemTable.
///
/// # Requirements
/// Your implementation must also provide an `ExtensionPlanner` that converts
/// your custom logical node to a physical plan. Register it via:
/// `SessionStateBuilder::with_extension_planner()`
///
/// # Example
/// See `datafusion/core/tests/user_defined/user_defined_plan.rs` for a complete
/// example of implementing custom logical nodes with physical planning.

4. Minor Code Quality Issues

In test_util/mod.rs:

  • Line 322: The assertion message could be more descriptive
  • Line 335: Missing documentation on the test helper function
  • The TestCacheProducer and CacheNode are not feature-gated, but they're test utilities

Recommendation:

/// Create a test table with a custom cache producer for testing cache behavior.
/// 
/// Returns a DataFrame from the aggregate_test_100 table with a TestCacheProducer
/// that creates CacheNode instances when `cache()` is called.
pub async fn test_table_with_cache_producer() -> Result<DataFrame> {

🎯 Performance Considerations

  1. No performance impact on default path: The Option check adds negligible overhead
  2. Custom implementations: Performance depends entirely on user implementation - this is acceptable for an extension point
  3. Memory: The default MemTable path is unchanged, so no memory impact

🔒 Security Concerns

No security issues identified. The trait is properly bounded with Send + Sync and follows Rust's safety guarantees.

📝 API Consistency

  1. Builder pattern: Correctly follows the builder pattern in SessionStateBuilder
  2. Naming: CacheProducer is consistent with other factory traits (FunctionFactory, TableProviderFactory)
  3. Trait bounds: Appropriate bounds (Debug + Sync + Send)

🧪 Test Improvements Needed

Add these test cases:

  1. End-to-end execution test with custom cache node
  2. Error handling when create() fails
  3. Verify behavior when cache is called multiple times
  4. Test with physical planner integration

📋 Summary

This is a solid foundation for custom caching! The main concerns are:

Must address:

  • Document the requirement for ExtensionPlanner
  • Add end-to-end execution test

Should address:

  • Expand trait documentation with examples
  • Add error handling tests
  • Document test utilities

Nice to have:

  • Example in datafusion-examples directory
  • Mention in user guide documentation

The API design is sound and follows DataFusion patterns well. With better documentation and test coverage, this will be a valuable addition to DataFusion's extensibility.


Review generated with Claude Code

Copy link
Copy Markdown

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. No suggestions at this time.

Comment augment review to trigger a new review at any time.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
datafusion/core/src/dataframe/mod.rs (1)

2347-2365: Cache producer path looks correct; consider clarifying cache semantics in docs

The new cache_producer branch cleanly builds an Extension plan node and preserves the existing MemTable materialization behavior in the else branch, so existing users without a cache producer should see identical behavior.

For sessions with a cache producer, cache() now returns a logical extension node without eagerly materializing into memory; the actual caching semantics are delegated to the custom node. This is reasonable, but the doc comment (“Cache DataFrame as a memory table.”) is now only accurate for the fallback path.

I suggest adjusting the doc comment to something like “Cache this DataFrame using the configured cache producer, falling back to an in-memory table when none is set” to avoid confusion for users wiring in custom caching.

datafusion/core/src/test_util/mod.rs (1)

28-28: Test cache node/producer wiring looks good; consider deriving Clone on CacheNode

The added imports, CacheNode implementation, TestCacheProducer, and test_table_with_cache_producer helper cleanly exercise the new cache producer pathway:

  • CacheNode correctly wraps a single LogicalPlan input, delegates schema() to the input, exposes no extra expressions, and formats succinctly for EXPLAIN.
  • with_exprs_and_inputs enforcing exactly one input and rebuilding from that input is appropriate for this simple wrapper node.
  • TestCacheProducer returns a CacheNode wrapped in Arc<dyn UserDefinedLogicalNode>, and test_table_with_cache_producer wires it into a SessionContext in a minimal, targeted way for tests.

One small improvement: many user-defined logical nodes in DataFusion also derive Clone so they satisfy any blanket trait implementations and can be easily duplicated in tests. Adding Clone to CacheNode’s derive list would align with that pattern and future‑proof the test node:

#[derive(Hash, Eq, PartialEq, PartialOrd, Debug, Clone)]
struct CacheNode {
    input: LogicalPlan,
}

Functionally everything else here looks solid for exercising the custom cache logical node path.

Also applies to: 40-41, 49-53, 291-336, 338-344

datafusion/core/tests/dataframe/mod.rs (1)

2341-2362: Suggest collecting and verifying results for more comprehensive testing.

The test correctly verifies that the optimized plan contains a CacheNode, but it doesn't verify the actual execution behavior. The existing cache_test (lines 2308-2339) both checks the plan structure AND collects/verifies the cached results. Consider adding result collection and verification to ensure the custom caching mechanism works correctly end-to-end:

let df_results = df.collect().await?;
let cached_df_results = cached_df.collect().await?;
assert_eq!(&df_results, &cached_df_results);

This would provide stronger validation that the cache producer integration functions correctly.

datafusion/core/src/execution/context/mod.rs (1)

1893-1900: Trait design is solid; consider enhancing documentation.

The CacheProducer trait follows a clean, focused design pattern. The single-method interface with appropriate trait bounds (Debug + Sync + Send) is well-suited for a pluggable caching strategy.

Consider expanding the documentation to include:

  • When and how the create method is invoked (e.g., "called by DataFrame::cache when a cache producer is registered")
  • Expected behavior of the returned UserDefinedLogicalNode
  • Example implementation reference (e.g., pointing to test utilities)

Example enhancement:

 /// Interface for applying a custom caching strategy.
-/// Implement this trait and register via [`SessionState`]
-/// to create a custom logical node for caching.
+/// 
+/// Implement this trait and register via [`SessionContext::with_cache_producer`]
+/// or [`SessionState::set_cache_producer`] to provide a custom caching implementation.
+/// 
+/// When [`DataFrame::cache`] is called, the registered producer will be invoked
+/// to create a custom logical node that wraps the DataFrame's plan.
+/// 
+/// # Example
+/// See `test_util::TestCacheProducer` for a reference implementation.
 pub trait CacheProducer: Debug + Sync + Send {
     /// Create a custom logical node for caching
     /// given a logical plan (of DF to cache).
     fn create(&self, plan: LogicalPlan) -> Result<Arc<dyn UserDefinedLogicalNode>>;
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 320cfb8 and b17e5bb.

📒 Files selected for processing (5)
  • datafusion/core/src/dataframe/mod.rs (2 hunks)
  • datafusion/core/src/execution/context/mod.rs (2 hunks)
  • datafusion/core/src/execution/session_state.rs (12 hunks)
  • datafusion/core/src/test_util/mod.rs (4 hunks)
  • datafusion/core/tests/dataframe/mod.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
datafusion/core/src/test_util/mod.rs (1)
datafusion/core/src/execution/context/mod.rs (2)
  • create (1873-1877)
  • create (1899-1899)
datafusion/core/src/execution/context/mod.rs (2)
datafusion/core/src/execution/session_state.rs (3)
  • with_cache_producer (1339-1345)
  • cache_producer (368-370)
  • cache_producer (1652-1654)
datafusion/core/src/test_util/mod.rs (2)
  • create (186-195)
  • create (333-335)
datafusion/core/src/execution/session_state.rs (1)
datafusion/core/src/execution/context/mod.rs (1)
  • with_cache_producer (472-475)
datafusion/core/src/dataframe/mod.rs (3)
datafusion/core/src/execution/session_state.rs (7)
  • cache_producer (368-370)
  • cache_producer (1652-1654)
  • task_ctx (288-290)
  • task_ctx (833-835)
  • new (973-1002)
  • new (2052-2054)
  • new (2353-2361)
datafusion/core/src/execution/context/mod.rs (6)
  • new_with_state (355-361)
  • schema (313-313)
  • task_ctx (1698-1700)
  • new (298-300)
  • new (1954-1956)
  • new (1988-1990)
datafusion/expr/src/logical_plan/plan.rs (4)
  • schema (323-359)
  • new (4502-4504)
  • new (5127-5131)
  • new (5193-5197)
datafusion/core/tests/dataframe/mod.rs (2)
datafusion/core/src/test_util/mod.rs (1)
  • test_table_with_cache_producer (339-344)
datafusion/core/src/dataframe/mod.rs (1)
  • into_optimized_plan (1655-1658)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (rust)
  • GitHub Check: claude-review
🔇 Additional comments (5)
datafusion/core/src/dataframe/mod.rs (1)

59-59: Import of Extension is appropriate and localized

The new use datafusion_expr::Extension; is needed for the LogicalPlan::Extension construction in cache and does not affect other code paths. Looks good.

datafusion/core/tests/dataframe/mod.rs (1)

64-64: LGTM: Import added correctly.

The import of test_table_with_cache_producer is properly placed and maintains the existing order in the test_util imports.

datafusion/core/src/execution/context/mod.rs (1)

471-475: LGTM! Clean builder pattern integration.

The implementation correctly follows DataFusion's builder pattern for registering pluggable components, consistent with with_function_factory and other similar methods.

datafusion/core/src/execution/session_state.rs (2)

190-190: LGTM! SessionState integration follows established patterns.

The cache_producer field and accessor methods are implemented consistently with the existing function_factory pattern in this file. The Option<Arc<dyn CacheProducer>> type correctly models an optional, shared component.

Also applies to: 362-370


958-958: LGTM! Complete and consistent SessionStateBuilder integration.

The builder integration comprehensively handles the cache_producer field across all necessary code paths:

  • Field declaration and initialization
  • Builder method with_cache_producer accepting Option for flexibility
  • Propagation in new_from_existing to preserve state when cloning
  • Correct threading through the build() method
  • Mutable accessor for advanced use cases
  • Debug formatting

All changes follow the established patterns in this file (comparable to function_factory integration).

Also applies to: 996-996, 1049-1049, 1338-1345, 1410-1410, 1447-1447, 1651-1654, 1694-1694

@martin-augment
Copy link
Copy Markdown
Owner Author

2. Test Coverage Gaps

  • No execution test: The test uses into_optimized_plan() but doesn't execute the query via .collect().await?. This means the physical planning path is untested
  • No error handling test: What happens if cache_producer.create() returns an error?
  • No schema validation: Should verify the cached node preserves the original schema

Recommendation: Add an integration test that:

let result = cached_df.collect().await?;
// Verify results match original

value:good-to-have; category:bug; feedback:The Claude AI reviewer is correct that the physical execution is not currently tested. Adding a test that the non-cached DataFrame returns the same results as the cached one will make the test complete.

@martin-augment
Copy link
Copy Markdown
Owner Author

Suggest collecting and verifying results for more comprehensive testing.

The test correctly verifies that the optimized plan contains a CacheNode, but it doesn't verify the actual execution behavior. The existing cache_test (lines 2308-2339) both checks the plan structure AND collects/verifies the cached results. Consider adding result collection and verification to ensure the custom caching mechanism works correctly end-to-end:

let df_results = df.collect().await?;
let cached_df_results = cached_df.collect().await?;
assert_eq!(&df_results, &cached_df_results);

This would provide stronger validation that the cache producer integration functions correctly.

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct that the physical execution is not currently tested. Adding a test that the non-cached DataFrame returns the same results as the cached one will make the test complete.

@martin-augment
Copy link
Copy Markdown
Owner Author

2347-2365: Cache producer path looks correct; consider clarifying cache semantics in docs

The new cache_producer branch cleanly builds an Extension plan node and preserves the existing MemTable materialization behavior in the else branch, so existing users without a cache producer should see identical behavior.

For sessions with a cache producer, cache() now returns a logical extension node without eagerly materializing into memory; the actual caching semantics are delegated to the custom node. This is reasonable, but the doc comment (“Cache DataFrame as a memory table.”) is now only accurate for the fallback path.

I suggest adjusting the doc comment to something like “Cache this DataFrame using the configured cache producer, falling back to an in-memory table when none is set” to avoid confusion for users wiring in custom caching.

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct that the docstring of the DataFrame::cache() method needs to be updated to explain that a custom CacheProducer could be used instead of the eagerly materializing MemTable implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants