-
Notifications
You must be signed in to change notification settings - Fork 0
530: feat: seekable reader #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| The AI agents should never link to any issue or a pull request | ||
| in any GitHub repository in the code reviews! | ||
|
|
||
| The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| The AI agents should never link to any issue or a pull request | ||
| in any GitHub repository in the code reviews! | ||
|
|
||
| The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| The AI agents should never link to any issue or a pull request | ||
| in any GitHub repository in the code reviews! | ||
|
|
||
| The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| use std::{ | ||
| collections::HashMap, | ||
| io::{ErrorKind, Read}, | ||
| io::{ErrorKind, Read, Seek, SeekFrom}, | ||
| str::FromStr, | ||
| }; | ||
|
|
||
|
|
@@ -35,10 +35,57 @@ use crate::{ | |
| util, | ||
| }; | ||
|
|
||
| /// Byte offset and record count of a single Avro data block. | ||
| /// | ||
| /// Captured automatically as blocks are read during forward iteration. | ||
| /// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| pub struct BlockPosition { | ||
| /// Byte offset in the stream where this block starts (before the object-count varint). | ||
| pub offset: u64, | ||
| /// Total number of records in this block. | ||
| pub message_count: usize, | ||
| } | ||
|
|
||
| /// Wraps an inner reader and tracks the current byte position. | ||
| /// | ||
| /// Avoids requiring `Seek` just to know how many bytes have been consumed. | ||
| /// When the inner reader also implements `Seek`, seeking updates the tracked position. | ||
| #[derive(Debug, Clone)] | ||
| struct PositionTracker<R> { | ||
| inner: R, | ||
| pos: u64, | ||
| } | ||
|
|
||
| impl<R> PositionTracker<R> { | ||
| fn new(inner: R) -> Self { | ||
| Self { inner, pos: 0 } | ||
| } | ||
|
Comment on lines
+61
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Track offsets from the reader’s initial seek position, not hard-coded zero.
🤖 Prompt for AI Agents |
||
|
|
||
| fn position(&self) -> u64 { | ||
| self.pos | ||
| } | ||
| } | ||
|
|
||
| impl<R: Read> Read for PositionTracker<R> { | ||
| fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { | ||
| let n = self.inner.read(buf)?; | ||
| self.pos += n as u64; | ||
| Ok(n) | ||
| } | ||
| } | ||
|
Comment on lines
+70
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
| impl<R: Seek> Seek for PositionTracker<R> { | ||
| fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> { | ||
| self.pos = self.inner.seek(from)?; | ||
| Ok(self.pos) | ||
| } | ||
| } | ||
|
|
||
| /// Internal Block reader. | ||
| #[derive(Debug, Clone)] | ||
| pub(super) struct Block<'r, R> { | ||
| reader: R, | ||
| reader: PositionTracker<R>, | ||
| /// Internal buffering to reduce allocation. | ||
| buf: Vec<u8>, | ||
| buf_idx: usize, | ||
|
|
@@ -51,6 +98,10 @@ pub(super) struct Block<'r, R> { | |
| pub(super) user_metadata: HashMap<String, Vec<u8>>, | ||
| names_refs: Names, | ||
| human_readable: bool, | ||
| /// Byte offset where data blocks begin (right after header and sync marker). | ||
| pub(super) data_start: u64, | ||
| /// Position and record count of the currently loaded block. | ||
| pub(super) current_block_info: Option<BlockPosition>, | ||
| } | ||
|
|
||
| impl<'r, R: Read> Block<'r, R> { | ||
|
|
@@ -60,7 +111,7 @@ impl<'r, R: Read> Block<'r, R> { | |
| human_readable: bool, | ||
| ) -> AvroResult<Block<'r, R>> { | ||
| let mut block = Block { | ||
| reader, | ||
| reader: PositionTracker::new(reader), | ||
| codec: Codec::Null, | ||
| writer_schema: Schema::Null, | ||
| schemata, | ||
|
|
@@ -71,9 +122,12 @@ impl<'r, R: Read> Block<'r, R> { | |
| user_metadata: Default::default(), | ||
| names_refs: Default::default(), | ||
| human_readable, | ||
| data_start: 0, | ||
| current_block_info: None, | ||
| }; | ||
|
|
||
| block.read_header()?; | ||
| block.data_start = block.reader.position(); | ||
| Ok(block) | ||
| } | ||
|
|
||
|
|
@@ -142,6 +196,7 @@ impl<'r, R: Read> Block<'r, R> { | |
| /// the block. The objects are stored in an internal buffer to the `Reader`. | ||
| fn read_block_next(&mut self) -> AvroResult<()> { | ||
| assert!(self.is_empty(), "Expected self to be empty!"); | ||
| let block_start = self.reader.position(); | ||
| match util::read_long(&mut self.reader).map_err(Error::into_details) { | ||
| Ok(block_len) => { | ||
| self.message_count = block_len as usize; | ||
|
|
@@ -156,6 +211,11 @@ impl<'r, R: Read> Block<'r, R> { | |
| return Err(Details::GetBlockMarker.into()); | ||
| } | ||
|
|
||
| self.current_block_info = Some(BlockPosition { | ||
| offset: block_start, | ||
| message_count: block_len as usize, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Severity: medium Other Locations
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage. |
||
| }); | ||
|
|
||
| // NOTE (JAB): This doesn't fit this Reader pattern very well. | ||
| // `self.buf` is a growable buffer that is reused as the reader is iterated. | ||
| // For non `Codec::Null` variants, `decompress` will allocate a new `Vec` | ||
|
|
@@ -295,6 +355,36 @@ impl<'r, R: Read> Block<'r, R> { | |
| } | ||
| } | ||
|
|
||
| impl<R: Read + Seek> Block<'_, R> { | ||
| /// Seek the underlying stream to `offset` and read the block there. | ||
| /// Validates the sync marker to confirm it's a real block boundary. | ||
| /// Returns an error if no valid block can be read at the offset | ||
| /// (e.g., the offset is at or past EOF). | ||
| pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { | ||
| self.reader | ||
| .seek(SeekFrom::Start(offset)) | ||
| .map_err(Details::SeekToBlock)?; | ||
|
|
||
| self.buf.clear(); | ||
| self.buf_idx = 0; | ||
| self.message_count = 0; | ||
| self.current_block_info = None; | ||
|
|
||
| // read_block_next treats UnexpectedEof as a clean end-of-stream | ||
| // (returns Ok with message_count=0). That's correct for forward | ||
| // iteration but wrong here — the caller asked for a specific block. | ||
| self.read_block_next()?; | ||
| if self.is_empty() { | ||
| return Err(Details::SeekToBlock(std::io::Error::new( | ||
| std::io::ErrorKind::UnexpectedEof, | ||
| format!("no block at offset {offset}"), | ||
| )) | ||
| .into()); | ||
| } | ||
|
Comment on lines
+377
to
+383
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The check |
||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> { | ||
| let result = metadata | ||
| .get("avro.codec") | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,9 +21,14 @@ mod block; | |||||||||||||||||||||||||||||||||
| pub mod datum; | ||||||||||||||||||||||||||||||||||
| pub mod single_object; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| use std::{collections::HashMap, io::Read, marker::PhantomData}; | ||||||||||||||||||||||||||||||||||
| use std::{ | ||||||||||||||||||||||||||||||||||
| collections::HashMap, | ||||||||||||||||||||||||||||||||||
| io::{Read, Seek}, | ||||||||||||||||||||||||||||||||||
| marker::PhantomData, | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| use block::Block; | ||||||||||||||||||||||||||||||||||
| pub use block::BlockPosition; | ||||||||||||||||||||||||||||||||||
| use bon::bon; | ||||||||||||||||||||||||||||||||||
| use serde::de::DeserializeOwned; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
@@ -159,6 +164,41 @@ impl<R: Read> Iterator for Reader<'_, R> { | |||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| impl<R: Read> Reader<'_, R> { | ||||||||||||||||||||||||||||||||||
| /// The currently loaded block's position and record count. | ||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||
| /// Returns `None` only before the first block is loaded (via iteration or | ||||||||||||||||||||||||||||||||||
| /// [`seek_to_block`](Self::seek_to_block)). Always `Some` afterward. | ||||||||||||||||||||||||||||||||||
| pub fn current_block(&self) -> Option<BlockPosition> { | ||||||||||||||||||||||||||||||||||
| self.block.current_block_info | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Byte offset where data blocks begin (right after the file header). | ||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||
| /// This is the offset of the first data block — equivalent to the position | ||||||||||||||||||||||||||||||||||
| /// that would be returned by `current_block().offset` for block 0. | ||||||||||||||||||||||||||||||||||
| pub fn data_start(&self) -> u64 { | ||||||||||||||||||||||||||||||||||
| self.block.data_start | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| impl<R: Read + Seek> Reader<'_, R> { | ||||||||||||||||||||||||||||||||||
| /// Seek to the data block at the given byte offset and load it. | ||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||
| /// The offset must point to the start of a valid data block (before its | ||||||||||||||||||||||||||||||||||
| /// object-count varint). The block is read, decompressed, and its sync | ||||||||||||||||||||||||||||||||||
| /// marker is validated against the file header. After this call, [`Iterator::next`] | ||||||||||||||||||||||||||||||||||
| /// yields the first record in that block. | ||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||
| /// Typically the caller saves offsets from [`current_block`](Self::current_block) | ||||||||||||||||||||||||||||||||||
| /// during forward iteration and later passes them here to jump back. | ||||||||||||||||||||||||||||||||||
| pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { | ||||||||||||||||||||||||||||||||||
| self.block.seek_to_block(offset)?; | ||||||||||||||||||||||||||||||||||
| self.errored = false; | ||||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+195
to
+198
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mark the iterator errored when On the error path, 💡 Suggested change pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
- self.block.seek_to_block(offset)?;
- self.errored = false;
- Ok(())
+ match self.block.seek_to_block(offset) {
+ Ok(()) => {
+ self.errored = false;
+ Ok(())
+ }
+ Err(err) => {
+ self.errored = true;
+ Err(err)
+ }
+ }
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Failed seek leaves reader in inconsistent stateMedium Severity When Additional Locations (1)Reviewed by Cursor Bugbot for commit 725876f. Configure here. |
||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Wrapper around [`Reader`] where the iterator deserializes `T`. | ||||||||||||||||||||||||||||||||||
| pub struct ReaderDeser<'a, R, T> { | ||||||||||||||||||||||||||||||||||
| inner: Reader<'a, R>, | ||||||||||||||||||||||||||||||||||
|
|
@@ -366,4 +406,111 @@ mod tests { | |||||||||||||||||||||||||||||||||
| panic!("Expected an error in the reading of the codec!"); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Write an Avro file with multiple blocks and verify we can seek between them. | ||||||||||||||||||||||||||||||||||
| #[test] | ||||||||||||||||||||||||||||||||||
| fn test_seek_to_block() -> TestResult { | ||||||||||||||||||||||||||||||||||
| use crate::writer::Writer; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| let schema = Schema::parse_str(SCHEMA)?; | ||||||||||||||||||||||||||||||||||
| let mut writer = Writer::new(&schema, Vec::new())?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Block 0: records with a=10, a=20 | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 10i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "b0_r0"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 20i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "b0_r1"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| writer.flush()?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Block 1: records with a=30, a=40 | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 30i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "b1_r0"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 40i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "b1_r1"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| writer.flush()?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Block 2: records with a=50 | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 50i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "b2_r0"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| writer.flush()?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| let data = writer.into_inner()?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Read forward and collect block positions | ||||||||||||||||||||||||||||||||||
| let mut reader = Reader::new(Cursor::new(&data))?; | ||||||||||||||||||||||||||||||||||
| let mut block_offsets: Vec<BlockPosition> = Vec::new(); | ||||||||||||||||||||||||||||||||||
| let mut all_values: Vec<Value> = Vec::new(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| assert!(reader.current_block().is_none()); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| while let Some(value) = reader.next() { | ||||||||||||||||||||||||||||||||||
| all_values.push(value?); | ||||||||||||||||||||||||||||||||||
| let pos = reader.current_block().expect("block info after read"); | ||||||||||||||||||||||||||||||||||
| if block_offsets | ||||||||||||||||||||||||||||||||||
| .last() | ||||||||||||||||||||||||||||||||||
| .is_none_or(|last| last.offset != pos.offset) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| block_offsets.push(pos); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| assert_eq!(all_values.len(), 5); | ||||||||||||||||||||||||||||||||||
| assert_eq!(block_offsets.len(), 3); | ||||||||||||||||||||||||||||||||||
| assert_eq!(block_offsets[0].message_count, 2); | ||||||||||||||||||||||||||||||||||
| assert_eq!(block_offsets[1].message_count, 2); | ||||||||||||||||||||||||||||||||||
| assert_eq!(block_offsets[2].message_count, 1); | ||||||||||||||||||||||||||||||||||
| assert_eq!(reader.data_start(), block_offsets[0].offset); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Seek back to block 1 and read its records | ||||||||||||||||||||||||||||||||||
| reader.seek_to_block(block_offsets[1].offset)?; | ||||||||||||||||||||||||||||||||||
| let v1 = reader.next().unwrap()?; | ||||||||||||||||||||||||||||||||||
| assert_eq!(v1, all_values[2]); | ||||||||||||||||||||||||||||||||||
| let v2 = reader.next().unwrap()?; | ||||||||||||||||||||||||||||||||||
| assert_eq!(v2, all_values[3]); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Seek back to block 0 | ||||||||||||||||||||||||||||||||||
| reader.seek_to_block(block_offsets[0].offset)?; | ||||||||||||||||||||||||||||||||||
| let v0 = reader.next().unwrap()?; | ||||||||||||||||||||||||||||||||||
| assert_eq!(v0, all_values[0]); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Seek to block 2 | ||||||||||||||||||||||||||||||||||
| reader.seek_to_block(block_offsets[2].offset)?; | ||||||||||||||||||||||||||||||||||
| let v4 = reader.next().unwrap()?; | ||||||||||||||||||||||||||||||||||
| assert_eq!(v4, all_values[4]); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| assert!(reader.next().is_none()); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Seeking to an invalid offset should fail with a sync marker error. | ||||||||||||||||||||||||||||||||||
| #[test] | ||||||||||||||||||||||||||||||||||
| fn test_seek_to_invalid_offset() -> TestResult { | ||||||||||||||||||||||||||||||||||
| use crate::writer::Writer; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| let schema = Schema::parse_str(SCHEMA)?; | ||||||||||||||||||||||||||||||||||
| let mut writer = Writer::new(&schema, Vec::new())?; | ||||||||||||||||||||||||||||||||||
| let mut r = Record::new(&schema).unwrap(); | ||||||||||||||||||||||||||||||||||
| r.put("a", 1i64); | ||||||||||||||||||||||||||||||||||
| r.put("b", "x"); | ||||||||||||||||||||||||||||||||||
| writer.append_value(r)?; | ||||||||||||||||||||||||||||||||||
| writer.flush()?; | ||||||||||||||||||||||||||||||||||
| let data = writer.into_inner()?; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| let mut reader = Reader::new(Cursor::new(&data))?; | ||||||||||||||||||||||||||||||||||
| let result = reader.seek_to_block(7); | ||||||||||||||||||||||||||||||||||
| assert!(result.is_err()); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avro/src/reader/block.rs:62—PositionTrackeralways initializesposto 0, so if aRead + Seekinput is already positioned at a non-zero offset whenReader::newis called, the offsets reported bycurrent_block()/data_start()won’t match whatseek_to_block()expects withSeekFrom::Start. Consider documenting the “must start at stream position 0” precondition or ensuring tracked offsets are absolute for seekable inputs.Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.