Conversation
Walkthrough

The PR adds support for Arrow IPC stream format alongside existing file format support. It introduces runtime format detection, splits ArrowSource into specialized file and stream implementations, refactors schema inference to handle both formats, and adds comprehensive test coverage for both Arrow formats.
```rust
    Ok(true) => Arc::new(ArrowSource::new_file_source(table_schema)),
    Ok(false) => Arc::new(ArrowSource::new_stream_file_source(table_schema)),
    Err(e) => Err(e)?,
};
```
Bug: Arrow: Mixed Formats Cause Scan Failures
The create_physical_plan method only checks the first file to determine format (file vs stream), then applies that same source type to all files in the scan. When multiple Arrow files are queried and they have mixed formats (some IPC file format, some IPC stream format), the wrong source type gets used for files after the first one, causing read errors. Each file should be checked individually or all files should be required to use the same format.
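A per-file check could look like the following sketch. The helper names are hypothetical and the classification runs over in-memory headers for illustration; the real fix would live in `create_physical_plan` and read each object's leading bytes from the object store.

```rust
/// Magic bytes that open an Arrow IPC *file* format payload.
const ARROW_FILE_MAGIC: &[u8; 6] = b"ARROW1";

/// Classify one payload by its leading bytes:
/// true = IPC file format, false = assume IPC stream format.
fn is_ipc_file_format(header: &[u8]) -> bool {
    header.starts_with(ARROW_FILE_MAGIC)
}

/// Check every file instead of only the first one; reject mixed scans.
fn uniform_arrow_format(headers: &[&[u8]]) -> Result<bool, String> {
    let mut formats = headers.iter().map(|h| is_ipc_file_format(h));
    let Some(first) = formats.next() else {
        return Ok(true); // empty scan: nothing can conflict
    };
    if formats.all(|f| f == first) {
        Ok(first)
    } else {
        Err("mixed Arrow IPC file and stream formats in one scan".into())
    }
}
```

Rejecting mixed scans up front trades a small amount of planning-time I/O for a clear error instead of a confusing read failure partway through execution.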
value:useful; category:bug; feedback: The Bugbot AI reviewer is correct that there might be a mix of plain and stream files in the different file groups. The finding prevents wrong behavior when processing an object store location with mixed file types.
Code Review for PR #10: Support Arrow IPC Stream Files

Overview

This PR adds support for reading Arrow IPC stream format files alongside the existing file format support. The implementation includes auto-detection, appropriate execution paths, and comprehensive test coverage.

✅ Strengths
```rust
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
    if partitioned_file.range.is_some() {
        return Err(exec_datafusion_err!(
            "ArrowStreamOpener does not support range-based reading"
```
The error message references "ArrowStreamOpener" which doesn’t match the type name here and may confuse users; consider aligning the wording to describe the Arrow IPC stream format rather than the opener (applies to this message).
value:useful; category:documentation; feedback: The Augment AI reviewer is correct that the name of the opener is wrong in the error message. It should be ArrowStreamFileOpener instead. Prevents confusion when debugging issues caused by this error.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion/datasource-arrow/src/source.rs (1)
102-114: Propagate batch_size/projection for Arrow IPC file sources

with_batch_size and with_projection on ArrowFileSource / ArrowStreamFileSource currently ignore their parameters (datafusion/datasource-arrow/src/source.rs:102-114), and create_file_opener does not forward batch_size to the opener, unlike Parquet/CSV/JSON/Avro, which store and pass batch_size to their openers/readers.

- Fix: add a batch_size: Option<usize> field to ArrowFileSource and ArrowStreamFileSource, set it in with_batch_size(), forward it into ArrowFileOpener/ArrowStreamFileOpener, and use it to configure the arrow::ipc reader builders so the session/file-scan batch_size is honored.
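The store-and-forward pattern this comment asks for can be sketched in isolation. The type names and the 8192 default below are illustrative stand-ins, not DataFusion's actual API:

```rust
/// Illustrative stand-in for the file source: it stores the configured
/// batch size instead of ignoring it.
#[derive(Clone, Default)]
struct StreamFileSource {
    batch_size: Option<usize>,
}

/// Illustrative stand-in for the opener the source creates.
struct StreamFileOpener {
    batch_size: usize,
}

impl StreamFileSource {
    /// Keep the value instead of dropping it.
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Forward the stored value into the opener, falling back to a
    /// default when the session never configured one.
    fn create_file_opener(&self) -> StreamFileOpener {
        StreamFileOpener {
            batch_size: self.batch_size.unwrap_or(8192),
        }
    }
}
```

This mirrors how the other format sources in the comment keep configuration on the source and thread it through to the reader they construct.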
🧹 Nitpick comments (3)
datafusion/datasource-arrow/src/file_format.rs (1)
185-219: Add documentation clarifying format homogeneity assumption.

Format detection reads only the first file's magic bytes (lines 191-199) and applies that source type to all files in the group. While DataFusion file groups typically contain homogeneous files, this assumption should be documented in code. Consider adding a comment explaining that all files within a file group are expected to have the same Arrow format (IPC file or streaming), or add validation to detect and reject mixed formats at plan creation time.
datafusion/datasource-arrow/src/source.rs (2)
439-444: Consider making the inner field private.

The public inner field at line 442 exposes implementation details and allows bypassing the wrapper. Unless direct access is required for a specific use case, consider making it private to maintain better encapsulation.

Apply this diff if direct access is not required:

```diff
 pub struct ArrowSource {
-    pub inner: Arc<dyn FileSource>,
+    pub(crate) inner: Arc<dyn FileSource>,
 }
```

If external access is needed, consider providing a getter method instead:

```rust
impl ArrowSource {
    pub fn inner(&self) -> &Arc<dyn FileSource> {
        &self.inner
    }
}
```
546-549: Consider making the inner field private.

Similar to ArrowSource, the public inner field at line 548 exposes implementation details. Consider making it private or pub(crate) to maintain better encapsulation.

Apply this diff if direct access is not required:

```diff
 pub struct ArrowOpener {
-    pub inner: Arc<dyn FileOpener>,
+    pub(crate) inner: Arc<dyn FileOpener>,
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- datafusion/core/tests/execution/mod.rs (1 hunks)
- datafusion/core/tests/execution/register_arrow.rs (1 hunks)
- datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs (1 hunks)
- datafusion/datasource-arrow/src/file_format.rs (9 hunks)
- datafusion/datasource-arrow/src/mod.rs (1 hunks)
- datafusion/datasource-arrow/src/source.rs (7 hunks)
- datafusion/sqllogictest/test_files/arrow_files.slt (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs (1)
datafusion/datasource-arrow/src/source.rs (8)
new (65-72), new (157-164), new (447-449), new (559-561), table_schema (98-100), table_schema (252-254), table_schema (541-543), new_file_source (452-456)
datafusion/core/tests/execution/mod.rs (1)
datafusion/core/src/execution/context/mod.rs (1)
register_arrow (1545-1563)
datafusion/datasource-arrow/src/file_format.rs (1)
datafusion/datasource-arrow/src/source.rs (9)
table_schema (98-100), table_schema (252-254), table_schema (541-543), new (65-72), new (157-164), new (447-449), new (559-561), new_file_source (452-456), new_stream_file_source (459-463)
datafusion/datasource-arrow/src/source.rs (3)
datafusion/datasource/src/file.rs (13): as_file_source (39-41), table_schema (66-66), create_file_opener (55-60), as_any (62-62), with_batch_size (68-68), with_statistics (72-72), statistics (80-80), with_projection (70-70), repartitioned (94-117), metrics (78-78), file_type (82-82), with_schema_adapter_factory (142-150), schema_adapter_factory (155-157)
datafusion/datasource/src/file_scan_config.rs (10): file (2121-2137), table_schema (309-311), as_any (516-518), with_batch_size (399-402), with_statistics (339-342), with_projection (319-321), repartitioned (556-570), metrics (646-648), open (495-514), projection (1806-1810)
datafusion/datasource/src/table_schema.rs (1): table_schema (169-171)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: codex
- GitHub Check: Analyze (rust)
🔇 Additional comments (17)
datafusion/core/tests/execution/mod.rs (1)
21-21: LGTM! The module declaration properly exposes the new register_arrow integration tests.
datafusion/sqllogictest/test_files/arrow_files.slt (1)
132-355: Excellent test coverage for Arrow IPC stream format! The tests comprehensively validate parity between file and stream formats, including:
- Schema inference and physical plans
- Projection pushdown and filtering
- Aggregations, joins, unions, and subqueries
- Partitioned tables with partition filters
This ensures both formats behave identically across all major query patterns.
datafusion/core/tests/execution/register_arrow.rs (2)
23-55: LGTM! The test properly validates that register_arrow auto-detects both file and stream formats, and that both produce equivalent schemas and row counts.
57-90: LGTM! The test validates that joins between file-backed and stream-backed tables work correctly, confirming cross-format interoperability.
datafusion/datasource-arrow/src/mod.rs (1)
18-19: LGTM! The documentation comment clarifies the module's purpose.
datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs (1)
287-292: LGTM! The test has been correctly updated to use the new new_file_source constructor, and the comment has been updated to reflect that this tests ArrowFileSource specifically.

datafusion/datasource-arrow/src/file_format.rs (7)
25-25: LGTM! The new imports support file rewinding, stream format reading, and partial object reads for format detection.
Also applies to: 31-31, 65-67
144-173: LGTM with graceful fallback logic! The schema inference correctly handles both file and stream formats:

- For file payloads, it tries FileReader first, then falls back to StreamReader with proper file rewinding
- For stream payloads, it uses the new infer_stream_schema helper

The fallback ensures compatibility with both Arrow IPC formats.
237-239: LGTM! The file_source method correctly defaults to the file format source using new_file_source.
385-448: Robust schema inference with version compatibility! The infer_stream_schema function properly handles multiple Arrow IPC versions:

- File format with ARROW1 magic (pre and post v0.15.0 with continuation marker)
- Stream format with and without continuation marker
- Validates metadata length is non-negative (lines 425-430)

The implementation correctly parses the preamble, extracts metadata length, and constructs the schema.
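Under the assumptions stated above (an optional 0xFFFFFFFF continuation marker followed by a little-endian i32 metadata length), the preamble handling can be sketched as follows. This is an illustrative reconstruction, not the PR's actual code:

```rust
/// Sketch of stream-preamble parsing: skip an optional continuation
/// marker (introduced in Arrow 0.15.0), then read the i32 little-endian
/// metadata length, rejecting negative values.
fn metadata_len(preamble: &[u8]) -> Result<usize, String> {
    // Post-0.15.0 streams lead with a 0xFFFFFFFF continuation marker.
    let start = if preamble.starts_with(&[0xff, 0xff, 0xff, 0xff]) {
        4
    } else {
        0
    };
    let bytes: [u8; 4] = preamble
        .get(start..start + 4)
        .ok_or_else(|| "truncated preamble".to_string())?
        .try_into()
        .expect("slice of length 4");
    // A negative length means a corrupt or misdetected payload.
    usize::try_from(i32::from_le_bytes(bytes))
        .map_err(|_| "negative metadata length".to_string())
}
```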
450-477: LGTM! The helper function correctly extends a byte vector from a stream:

- Returns early if sufficient bytes are already available
- Accumulates bytes until the target length is reached
- Properly handles truncated streams with a clear error message
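The behavior those bullets describe can be shown in a standalone sketch over any std::io::Read source (the name and chunk size are illustrative):

```rust
use std::io::{self, Read};

/// Grow `buf` from `reader` until it holds at least `target` bytes.
/// Returns early when enough bytes are already buffered and reports a
/// clear error when the stream ends too soon.
fn extend_to(buf: &mut Vec<u8>, reader: &mut impl Read, target: usize) -> io::Result<()> {
    if buf.len() >= target {
        return Ok(()); // sufficient bytes already available
    }
    let mut chunk = [0u8; 4096];
    while buf.len() < target {
        let n = reader.read(&mut chunk)?;
        if n == 0 {
            // Truncated stream: surface a descriptive error.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "stream ended before the expected length was reached",
            ));
        }
        buf.extend_from_slice(&chunk[..n]);
    }
    Ok(())
}
```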
479-493: LGTM! The format detection efficiently reads only the first 6 bytes to check for the Arrow IPC file format magic number (ARROW1). The length check on line 492 properly guards against short reads.
586-725: Excellent test coverage! The new tests comprehensively validate:
- Schema inference for both file and stream formats
- Different chunk sizes to verify streaming logic
- Truncated stream error handling
- Format detection for file, stream, corrupted, and empty files
The comment on line 722 noting that empty files "should error" is accurate since reading 6 bytes from an empty object would naturally fail.
datafusion/datasource-arrow/src/source.rs (4)
18-30: LGTM! Clear documentation on naming conventions. The documentation effectively clarifies the potentially confusing naming where "Stream" refers to the Arrow IPC stream format, not streaming I/O.
204-221: LGTM! Well-documented rationale for disabling repartitioning. The detailed explanation clearly justifies why stream format cannot support range-based parallel reading, avoiding duplicate I/O or the overhead of recreating footer information.
334-433: LGTM! Complex but well-structured range-based reading. The implementation properly handles footer parsing, dictionary loading, and range-based record batch filtering for parallel reading of Arrow IPC file format.
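The range-based batch filtering mentioned here can be illustrated with a small sketch. The offsets stand in for the record-batch byte offsets recorded in the file format's footer; this is not the PR's actual code:

```rust
/// Keep exactly the record batches whose starting byte offset falls
/// inside the scan range [start, end), so concurrent partitions each
/// read a disjoint set of batches and no batch is emitted twice.
fn batches_in_range(batch_offsets: &[u64], start: u64, end: u64) -> Vec<usize> {
    batch_offsets
        .iter()
        .enumerate()
        .filter(|&(_, &offset)| start <= offset && offset < end)
        .map(|(index, _)| index)
        .collect()
}
```

Assigning a batch to the partition whose range contains its starting offset is what makes this scheme safe for parallel readers, and it is only possible because the file format's footer records those offsets up front.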
607-649: The test data files exist and the relative paths are correct. The review comment's concern is unfounded. Both example.arrow and example_stream.arrow exist at datafusion/datasource-arrow/tests/data/, and the relative paths used in the test ("tests/data/(unknown)") are correct for Rust crate test execution, where cargo test runs with the crate root as the working directory.

Likely an incorrect or invalid review comment.
```rust
if partitioned_file.range.is_some() {
    return Err(exec_datafusion_err!(
        "ArrowStreamOpener does not support range-based reading"
    ));
}
```
Fix error message to match struct name.
The error message refers to "ArrowStreamOpener" but the struct is named "ArrowStreamFileOpener".
Apply this diff:

```diff
-        "ArrowStreamOpener does not support range-based reading"
+        "ArrowStreamFileOpener does not support range-based reading"
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 if partitioned_file.range.is_some() {
     return Err(exec_datafusion_err!(
-        "ArrowStreamOpener does not support range-based reading"
+        "ArrowStreamFileOpener does not support range-based reading"
     ));
 }
```
🤖 Prompt for AI Agents
In datafusion/datasource-arrow/src/source.rs around lines 265 to 269, the error
string incorrectly references "ArrowStreamOpener" instead of the struct name
"ArrowStreamFileOpener"; update the returned error message to exactly
"ArrowStreamFileOpener does not support range-based reading" so it matches the
struct name.
value:useful; category:documentation; feedback: The CodeRabbit AI reviewer is correct that the name of the opener is wrong in the error message. It should be ArrowStreamFileOpener instead. Prevents confusion when debugging issues caused by this error.
@martin-augment Thank you for confirming! You're absolutely right—having the correct struct name in the error message is important for debugging. When developers encounter this error, seeing "ArrowStreamFileOpener" will help them quickly identify the exact component that's raising the issue, making troubleshooting much more straightforward.
The fix ensures the error message accurately reflects the code structure.
value:annoying; category:bug; feedback: The Claude AI reviewer is wrong! There is no such code in file_format.rs! There is similar code in datasource-arrow/src/source.rs but it is not modified in this Pull Request.
value:useful; category:bug; feedback: The Claude AI reviewer is correct that the handling of empty/smaller-than-6-bytes files is not good. Currently reading such files would fail with Error::NotModified (https://docs.rs/object_store/latest/object_store/struct.GetOptions.html#structfield.range), which is really confusing.
value:good-but-wont-fix; category:bug; feedback: The Claude AI reviewer is partially correct. The new logic reads the first 6 bytes only for the first file in the first file group to detect the type of all files in all groups. The latency is minimal but needed.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct that there is no need for the spread operator here since all fields from the clone are used. There is no bug or regression, just more complicated code than needed.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct that the error from the FileReader attempt is discarded, so if the file is not in stream format the wrong error is shown to the user. The finding prevents misleading errors.
value:annoying; category:bug; feedback: The Codex AI reviewer is wrong! This file is not modified in this Pull Request.
value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct that the inner field should not be exposed in the public API. This prevents exposing internal implementation details that would make the code hard to improve later, because changes could break users of the API.
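A guard like the following sketch would turn the confusing short-read failure described in the smaller-than-6-bytes feedback above into an explicit error. The function name and message are illustrative, not the PR's code:

```rust
/// Magic bytes that open an Arrow IPC *file* format payload.
const ARROW_FILE_MAGIC: &[u8; 6] = b"ARROW1";

/// Classify a payload only after confirming the object is long enough to
/// hold the six magic bytes, so tiny or empty objects fail with a clear
/// message instead of an opaque range-request error.
fn detect_ipc_file_format(object_len: usize, header: &[u8]) -> Result<bool, String> {
    if object_len < ARROW_FILE_MAGIC.len() {
        return Err(format!(
            "object is {object_len} bytes, too small to be an Arrow IPC payload"
        ));
    }
    Ok(header.starts_with(ARROW_FILE_MAGIC))
}
```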
18457: To review by AI
Note
Add support for reading Arrow IPC stream files, auto-detecting file vs stream, and update execution sources, planning, and tests.
- ArrowStreamFileSource/ArrowStreamFileOpener for Arrow IPC stream format (sequential-only), and keep ArrowFileSource/ArrowFileOpener for file format (range-parallel).
- ArrowSource/ArrowOpener wrappers to unify both formats.
- is_object_in_arrow_ipc_file_format and source selection in ArrowFormat::create_physical_plan.
- Schema inference refactor (infer_stream_schema) and mixed inputs; detect and fall back from file to stream.
- file_type reported as arrow or arrow_stream, with projection pushdown support.
- Tests in arrow_files.slt.
- register_arrow updated to auto-detect and join file/stream.
- Tests in datasource-arrow and core.
- ArrowSource::new_file_source naming.

Written by Cursor Bugbot for commit 2ddd690. This will update automatically on new commits. Configure here.