From 731cf9b9dad1852be774ec19f7063cec6fec5861 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Mon, 6 Apr 2026 00:51:17 +0100 Subject: [PATCH 1/3] feat: seekable reader --- avro/src/error.rs | 3 + avro/src/lib.rs | 2 +- avro/src/reader/block.rs | 83 +++++++++++++++++++++- avro/src/reader/mod.rs | 147 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 230 insertions(+), 5 deletions(-) diff --git a/avro/src/error.rs b/avro/src/error.rs index 425bf4fb..3e9e82e0 100644 --- a/avro/src/error.rs +++ b/avro/src/error.rs @@ -534,6 +534,9 @@ pub enum Details { #[error("Failed to read block marker bytes: {0}")] ReadBlockMarker(#[source] std::io::Error), + #[error("Failed to seek to block: {0}")] + SeekToBlock(#[source] std::io::Error), + #[error("Read into buffer failed: {0}")] ReadIntoBuf(#[source] std::io::Error), diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 3f509897..586c6687 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -99,7 +99,7 @@ pub use error::Error; reason = "Still need to export it until we remove it completely" )] pub use reader::{ - Reader, + BlockPosition, Reader, datum::{from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata}, read_marker, single_object::{GenericSingleObjectReader, SpecificSingleObjectReader}, diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs index 8899c3a4..2784c69d 100644 --- a/avro/src/reader/block.rs +++ b/avro/src/reader/block.rs @@ -17,7 +17,7 @@ use std::{ collections::HashMap, - io::{ErrorKind, Read}, + io::{ErrorKind, Read, Seek, SeekFrom}, str::FromStr, }; @@ -35,10 +35,57 @@ use crate::{ util, }; +/// Position and size of a single Avro data block within the file stream. +/// +/// Captured automatically as blocks are read during forward iteration. +/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BlockPosition { + /// Byte offset in the stream where this block starts (before the object-count varint). + pub offset: u64, + /// Total number of records in this block. + pub message_count: usize, +} + +/// Wraps an inner reader and tracks the current byte position. +/// +/// Avoids requiring `Seek` just to know how many bytes have been consumed. +/// When the inner reader also implements `Seek`, seeking updates the tracked position. +#[derive(Debug, Clone)] +struct PositionTracker { + inner: R, + pos: u64, +} + +impl PositionTracker { + fn new(inner: R) -> Self { + Self { inner, pos: 0 } + } + + fn position(&self) -> u64 { + self.pos + } +} + +impl Read for PositionTracker { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let n = self.inner.read(buf)?; + self.pos += n as u64; + Ok(n) + } +} + +impl Seek for PositionTracker { + fn seek(&mut self, from: SeekFrom) -> std::io::Result { + self.pos = self.inner.seek(from)?; + Ok(self.pos) + } +} + /// Internal Block reader. #[derive(Debug, Clone)] pub(super) struct Block<'r, R> { - reader: R, + reader: PositionTracker, /// Internal buffering to reduce allocation. buf: Vec, buf_idx: usize, @@ -51,6 +98,10 @@ pub(super) struct Block<'r, R> { pub(super) user_metadata: HashMap>, names_refs: Names, human_readable: bool, + /// Byte offset where data blocks begin (right after header and sync marker). + pub(super) data_start: u64, + /// Position and record count of the currently loaded block. + pub(super) current_block_info: Option, } impl<'r, R: Read> Block<'r, R> { @@ -60,7 +111,7 @@ impl<'r, R: Read> Block<'r, R> { human_readable: bool, ) -> AvroResult> { let mut block = Block { - reader, + reader: PositionTracker::new(reader), codec: Codec::Null, writer_schema: Schema::Null, schemata, @@ -71,9 +122,12 @@ impl<'r, R: Read> Block<'r, R> { user_metadata: Default::default(), names_refs: Default::default(), human_readable, + data_start: 0, + current_block_info: None, }; block.read_header()?; + block.data_start = block.reader.position(); Ok(block) } @@ -142,6 +196,7 @@ impl<'r, R: Read> Block<'r, R> { /// the block. The objects are stored in an internal buffer to the `Reader`. fn read_block_next(&mut self) -> AvroResult<()> { assert!(self.is_empty(), "Expected self to be empty!"); + let block_start = self.reader.position(); match util::read_long(&mut self.reader).map_err(Error::into_details) { Ok(block_len) => { self.message_count = block_len as usize; @@ -156,6 +211,11 @@ impl<'r, R: Read> Block<'r, R> { return Err(Details::GetBlockMarker.into()); } + self.current_block_info = Some(BlockPosition { + offset: block_start, + message_count: block_len as usize, + }); + // NOTE (JAB): This doesn't fit this Reader pattern very well. // `self.buf` is a growable buffer that is reused as the reader is iterated. // For non `Codec::Null` variants, `decompress` will allocate a new `Vec` @@ -295,6 +355,23 @@ impl<'r, R: Read> Block<'r, R> { } } +impl Block<'_, R> { + /// Seek the underlying stream to `offset` and read the block there. + /// Validates the sync marker to confirm it's a real block boundary. + pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { + self.reader + .seek(SeekFrom::Start(offset)) + .map_err(Details::SeekToBlock)?; + + self.buf.clear(); + self.buf_idx = 0; + self.message_count = 0; + self.current_block_info = None; + + self.read_block_next() + } +} + fn read_codec(metadata: &HashMap) -> AvroResult { let result = metadata .get("avro.codec") diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs index 7002fb86..802f34c6 100644 --- a/avro/src/reader/mod.rs +++ b/avro/src/reader/mod.rs @@ -21,9 +21,14 @@ mod block; pub mod datum; pub mod single_object; -use std::{collections::HashMap, io::Read, marker::PhantomData}; +use std::{ + collections::HashMap, + io::{Read, Seek}, + marker::PhantomData, +}; use block::Block; +pub use block::BlockPosition; use bon::bon; use serde::de::DeserializeOwned; @@ -159,6 +164,39 @@ impl Iterator for Reader<'_, R> { } } +impl Reader<'_, R> { + /// The currently loaded block's position and record count. + /// + /// Returns `None` before the first record has been read. + /// Updated automatically each time a new block is loaded during iteration. + pub fn current_block(&self) -> Option { + self.block.current_block_info + } + + /// Byte offset where data blocks begin (right after the file header). + /// + /// This is the offset of the first data block — equivalent to the position + /// that would be returned by `current_block().offset` for block 0. + pub fn data_start(&self) -> u64 { + self.block.data_start + } + + /// Seek to the data block at the given byte offset and load it. + /// + /// The offset must point to the start of a valid data block (before its + /// object-count varint). The block is read, decompressed, and its sync + /// marker is validated against the file header. After this call, [`Iterator::next`] + /// yields the first record in that block. + /// + /// Typically the caller saves offsets from [`current_block`](Self::current_block) + /// during forward iteration and later passes them here to jump back. + pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { + self.block.seek_to_block(offset)?; + self.errored = false; + Ok(()) + } +} + /// Wrapper around [`Reader`] where the iterator deserializes `T`. pub struct ReaderDeser<'a, R, T> { inner: Reader<'a, R>, @@ -366,4 +404,111 @@ mod tests { panic!("Expected an error in the reading of the codec!"); } } + + /// Write an Avro file with multiple blocks and verify we can seek between them. + #[test] + fn test_seek_to_block() -> TestResult { + use crate::writer::Writer; + + let schema = Schema::parse_str(SCHEMA)?; + let mut writer = Writer::new(&schema, Vec::new())?; + + // Block 0: records with a=10, a=20 + let mut r = Record::new(&schema).unwrap(); + r.put("a", 10i64); + r.put("b", "b0_r0"); + writer.append_value(r)?; + let mut r = Record::new(&schema).unwrap(); + r.put("a", 20i64); + r.put("b", "b0_r1"); + writer.append_value(r)?; + writer.flush()?; + + // Block 1: records with a=30, a=40 + let mut r = Record::new(&schema).unwrap(); + r.put("a", 30i64); + r.put("b", "b1_r0"); + writer.append_value(r)?; + let mut r = Record::new(&schema).unwrap(); + r.put("a", 40i64); + r.put("b", "b1_r1"); + writer.append_value(r)?; + writer.flush()?; + + // Block 2: records with a=50 + let mut r = Record::new(&schema).unwrap(); + r.put("a", 50i64); + r.put("b", "b2_r0"); + writer.append_value(r)?; + writer.flush()?; + + let data = writer.into_inner()?; + + // Read forward and collect block positions + let mut reader = Reader::new(Cursor::new(&data))?; + let mut block_offsets: Vec = Vec::new(); + let mut all_values: Vec = Vec::new(); + + assert!(reader.current_block().is_none()); + + while let Some(value) = reader.next() { + all_values.push(value?); + let pos = reader.current_block().expect("block info after read"); + if block_offsets + .last() + .is_none_or(|last| last.offset != pos.offset) + { + block_offsets.push(pos); + } + } + + assert_eq!(all_values.len(), 5); + assert_eq!(block_offsets.len(), 3); + assert_eq!(block_offsets[0].message_count, 2); + assert_eq!(block_offsets[1].message_count, 2); + assert_eq!(block_offsets[2].message_count, 1); + assert_eq!(reader.data_start(), block_offsets[0].offset); + + // Seek back to block 1 and read its records + reader.seek_to_block(block_offsets[1].offset)?; + let v1 = reader.next().unwrap()?; + assert_eq!(v1, all_values[2]); + let v2 = reader.next().unwrap()?; + assert_eq!(v2, all_values[3]); + + // Seek back to block 0 + reader.seek_to_block(block_offsets[0].offset)?; + let v0 = reader.next().unwrap()?; + assert_eq!(v0, all_values[0]); + + // Seek to block 2 + reader.seek_to_block(block_offsets[2].offset)?; + let v4 = reader.next().unwrap()?; + assert_eq!(v4, all_values[4]); + + assert!(reader.next().is_none()); + + Ok(()) + } + + /// Seeking to an invalid offset should fail with a sync marker error. + #[test] + fn test_seek_to_invalid_offset() -> TestResult { + use crate::writer::Writer; + + let schema = Schema::parse_str(SCHEMA)?; + let mut writer = Writer::new(&schema, Vec::new())?; + let mut r = Record::new(&schema).unwrap(); + r.put("a", 1i64); + r.put("b", "x"); + writer.append_value(r)?; + writer.flush()?; + let data = writer.into_inner()?; + + let mut reader = Reader::new(Cursor::new(&data))?; + let result = reader.seek_to_block(7); + assert!(result.is_err()); + + Ok(()) + } } From d8e73faafd25836b610d962e2a5f70814e6a43a9 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Mon, 6 Apr 2026 01:14:01 +0100 Subject: [PATCH 2/3] after code review --- avro/src/reader/block.rs | 17 +++++++++++++++-- avro/src/reader/mod.rs | 8 +++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs index 2784c69d..71d9e01b 100644 --- a/avro/src/reader/block.rs +++ b/avro/src/reader/block.rs @@ -35,7 +35,7 @@ use crate::{ util, }; -/// Position and size of a single Avro data block within the file stream. +/// Byte offset and record count of a single Avro data block. /// /// Captured automatically as blocks are read during forward iteration. /// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block. @@ -358,6 +358,8 @@ impl<'r, R: Read> Block<'r, R> { impl Block<'_, R> { /// Seek the underlying stream to `offset` and read the block there. /// Validates the sync marker to confirm it's a real block boundary. + /// Returns an error if no valid block can be read at the offset + /// (e.g., the offset is at or past EOF). pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { self.reader .seek(SeekFrom::Start(offset)) @@ -368,7 +370,18 @@ impl Block<'_, R> { self.message_count = 0; self.current_block_info = None; - self.read_block_next() + // read_block_next treats UnexpectedEof as a clean end-of-stream + // (returns Ok with message_count=0). That's correct for forward + // iteration but wrong here — the caller asked for a specific block. + self.read_block_next()?; + if self.is_empty() { + return Err(Details::SeekToBlock(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!("no block at offset {offset}"), + )) + .into()); + } + Ok(()) } } diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs index 802f34c6..5e5946b0 100644 --- a/avro/src/reader/mod.rs +++ b/avro/src/reader/mod.rs @@ -164,11 +164,11 @@ impl Iterator for Reader<'_, R> { } } -impl Reader<'_, R> { +impl Reader<'_, R> { /// The currently loaded block's position and record count. /// - /// Returns `None` before the first record has been read. - /// Updated automatically each time a new block is loaded during iteration. + /// Returns `None` only before the first block is loaded (via iteration or + /// [`seek_to_block`](Self::seek_to_block)). Always `Some` afterward. pub fn current_block(&self) -> Option { self.block.current_block_info } @@ -180,7 +180,9 @@ impl Reader<'_, R> { pub fn data_start(&self) -> u64 { self.block.data_start } +} +impl Reader<'_, R> { /// Seek to the data block at the given byte offset and load it. /// /// The offset must point to the start of a valid data block (before its From a6c7563272484dcd71fc1584b7b574e6d401ccea Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Mon, 6 Apr 2026 14:36:41 +0100 Subject: [PATCH 3/3] after code review --- avro/src/reader/block.rs | 10 ---------- avro/src/reader/mod.rs | 6 +++--- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs index 71d9e01b..deba52f0 100644 --- a/avro/src/reader/block.rs +++ b/avro/src/reader/block.rs @@ -370,17 +370,7 @@ impl Block<'_, R> { self.message_count = 0; self.current_block_info = None; - // read_block_next treats UnexpectedEof as a clean end-of-stream - // (returns Ok with message_count=0). That's correct for forward - // iteration but wrong here — the caller asked for a specific block. self.read_block_next()?; - if self.is_empty() { - return Err(Details::SeekToBlock(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!("no block at offset {offset}"), - )) - .into()); - } Ok(()) } } diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs index 5e5946b0..583aac05 100644 --- a/avro/src/reader/mod.rs +++ b/avro/src/reader/mod.rs @@ -193,9 +193,9 @@ impl Reader<'_, R> { /// Typically the caller saves offsets from [`current_block`](Self::current_block) /// during forward iteration and later passes them here to jump back. pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { - self.block.seek_to_block(offset)?; - self.errored = false; - Ok(()) + let seek_status = self.block.seek_to_block(offset); + self.errored = seek_status.is_err(); + seek_status } }