diff --git a/avro/src/error.rs b/avro/src/error.rs
index 425bf4fb..3e9e82e0 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -534,6 +534,9 @@ pub enum Details {
     #[error("Failed to read block marker bytes: {0}")]
     ReadBlockMarker(#[source] std::io::Error),
 
+    #[error("Failed to seek to block: {0}")]
+    SeekToBlock(#[source] std::io::Error),
+
     #[error("Read into buffer failed: {0}")]
     ReadIntoBuf(#[source] std::io::Error),
 
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 3f509897..586c6687 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -99,7 +99,7 @@ pub use error::Error;
     reason = "Still need to export it until we remove it completely"
 )]
 pub use reader::{
-    Reader,
+    BlockPosition, Reader,
     datum::{from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata},
     read_marker,
     single_object::{GenericSingleObjectReader, SpecificSingleObjectReader},
diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs
index 8899c3a4..deba52f0 100644
--- a/avro/src/reader/block.rs
+++ b/avro/src/reader/block.rs
@@ -17,7 +17,7 @@
 use std::{
     collections::HashMap,
-    io::{ErrorKind, Read},
+    io::{ErrorKind, Read, Seek, SeekFrom},
     str::FromStr,
 };
 
@@ -35,10 +35,57 @@ use crate::{
     util,
 };
 
+/// Byte offset and record count of a single Avro data block.
+///
+/// Captured automatically as blocks are read during forward iteration.
+/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BlockPosition {
+    /// Byte offset in the stream where this block starts (before the object-count varint).
+    pub offset: u64,
+    /// Total number of records in this block.
+    pub message_count: usize,
+}
+
+/// Wraps an inner reader and tracks the current byte position.
+///
+/// Avoids requiring `Seek` just to know how many bytes have been consumed.
+/// When the inner reader also implements `Seek`, seeking updates the tracked position.
+#[derive(Debug, Clone)]
+struct PositionTracker<R> {
+    inner: R,
+    pos: u64,
+}
+
+impl<R> PositionTracker<R> {
+    fn new(inner: R) -> Self {
+        Self { inner, pos: 0 }
+    }
+
+    fn position(&self) -> u64 {
+        self.pos
+    }
+}
+
+impl<R: Read> Read for PositionTracker<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let n = self.inner.read(buf)?;
+        self.pos += n as u64;
+        Ok(n)
+    }
+}
+
+impl<R: Seek> Seek for PositionTracker<R> {
+    fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
+        self.pos = self.inner.seek(from)?;
+        Ok(self.pos)
+    }
+}
+
 /// Internal Block reader.
 #[derive(Debug, Clone)]
 pub(super) struct Block<'r, R> {
-    reader: R,
+    reader: PositionTracker<R>,
     /// Internal buffering to reduce allocation.
     buf: Vec<u8>,
     buf_idx: usize,
@@ -51,6 +98,10 @@ pub(super) struct Block<'r, R> {
     pub(super) user_metadata: HashMap<String, Vec<u8>>,
     names_refs: Names,
     human_readable: bool,
+    /// Byte offset where data blocks begin (right after header and sync marker).
+    pub(super) data_start: u64,
+    /// Position and record count of the currently loaded block.
+    pub(super) current_block_info: Option<BlockPosition>,
 }
 
 impl<'r, R: Read> Block<'r, R> {
@@ -60,7 +111,7 @@ impl<'r, R: Read> Block<'r, R> {
         human_readable: bool,
     ) -> AvroResult<Block<'r, R>> {
         let mut block = Block {
-            reader,
+            reader: PositionTracker::new(reader),
             codec: Codec::Null,
             writer_schema: Schema::Null,
             schemata,
@@ -71,9 +122,12 @@ impl<'r, R: Read> Block<'r, R> {
             user_metadata: Default::default(),
             names_refs: Default::default(),
             human_readable,
+            data_start: 0,
+            current_block_info: None,
         };
 
         block.read_header()?;
+        block.data_start = block.reader.position();
         Ok(block)
     }
 
@@ -142,6 +196,7 @@ impl<'r, R: Read> Block<'r, R> {
     /// the block. The objects are stored in an internal buffer to the `Reader`.
     fn read_block_next(&mut self) -> AvroResult<()> {
         assert!(self.is_empty(), "Expected self to be empty!");
+        let block_start = self.reader.position();
         match util::read_long(&mut self.reader).map_err(Error::into_details) {
             Ok(block_len) => {
                 self.message_count = block_len as usize;
@@ -156,6 +211,11 @@
                     return Err(Details::GetBlockMarker.into());
                 }
 
+                self.current_block_info = Some(BlockPosition {
+                    offset: block_start,
+                    message_count: block_len as usize,
+                });
+
                 // NOTE (JAB): This doesn't fit this Reader pattern very well.
                 // `self.buf` is a growable buffer that is reused as the reader is iterated.
                 // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
@@ -295,6 +355,26 @@
     }
 }
 
+impl<R: Read + Seek> Block<'_, R> {
+    /// Seek the underlying stream to `offset` and read the block there.
+    /// Validates the sync marker to confirm it's a real block boundary.
+    /// Returns an error if no valid block can be read at the offset
+    /// (e.g., the offset is at or past EOF).
+    pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+        self.reader
+            .seek(SeekFrom::Start(offset))
+            .map_err(Details::SeekToBlock)?;
+
+        self.buf.clear();
+        self.buf_idx = 0;
+        self.message_count = 0;
+        self.current_block_info = None;
+
+        self.read_block_next()?;
+        Ok(())
+    }
+}
+
 fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
     let result = metadata
         .get("avro.codec")
diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs
index 7002fb86..583aac05 100644
--- a/avro/src/reader/mod.rs
+++ b/avro/src/reader/mod.rs
@@ -21,9 +21,14 @@ mod block;
 pub mod datum;
 pub mod single_object;
 
-use std::{collections::HashMap, io::Read, marker::PhantomData};
+use std::{
+    collections::HashMap,
+    io::{Read, Seek},
+    marker::PhantomData,
+};
 
 use block::Block;
+pub use block::BlockPosition;
 use bon::bon;
 use serde::de::DeserializeOwned;
 
@@ -159,6 +164,41 @@ impl<R: Read> Iterator for Reader<'_, R> {
     }
 }
 
+impl<R: Read> Reader<'_, R> {
+    /// The currently loaded block's position and record count.
+    ///
+    /// Returns `None` only before the first block is loaded (via iteration or
+    /// [`seek_to_block`](Self::seek_to_block)). Always `Some` afterward.
+    pub fn current_block(&self) -> Option<BlockPosition> {
+        self.block.current_block_info
+    }
+
+    /// Byte offset where data blocks begin (right after the file header).
+    ///
+    /// This is the offset of the first data block — equivalent to the position
+    /// that would be returned by `current_block().offset` for block 0.
+    pub fn data_start(&self) -> u64 {
+        self.block.data_start
+    }
+}
+
+impl<R: Read + Seek> Reader<'_, R> {
+    /// Seek to the data block at the given byte offset and load it.
+    ///
+    /// The offset must point to the start of a valid data block (before its
+    /// object-count varint). The block is read, decompressed, and its sync
+    /// marker is validated against the file header. After this call, [`Iterator::next`]
+    /// yields the first record in that block.
+    ///
+    /// Typically the caller saves offsets from [`current_block`](Self::current_block)
+    /// during forward iteration and later passes them here to jump back.
+    pub fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
+        let seek_status = self.block.seek_to_block(offset);
+        self.errored = seek_status.is_err();
+        seek_status
+    }
+}
+
 /// Wrapper around [`Reader`] where the iterator deserializes `T`.
 pub struct ReaderDeser<'a, R, T> {
     inner: Reader<'a, R>,
@@ -366,4 +406,111 @@ mod tests {
             panic!("Expected an error in the reading of the codec!");
         }
     }
+
+    /// Write an Avro file with multiple blocks and verify we can seek between them.
+    #[test]
+    fn test_seek_to_block() -> TestResult {
+        use crate::writer::Writer;
+
+        let schema = Schema::parse_str(SCHEMA)?;
+        let mut writer = Writer::new(&schema, Vec::new())?;
+
+        // Block 0: records with a=10, a=20
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 10i64);
+        r.put("b", "b0_r0");
+        writer.append_value(r)?;
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 20i64);
+        r.put("b", "b0_r1");
+        writer.append_value(r)?;
+        writer.flush()?;
+
+        // Block 1: records with a=30, a=40
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 30i64);
+        r.put("b", "b1_r0");
+        writer.append_value(r)?;
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 40i64);
+        r.put("b", "b1_r1");
+        writer.append_value(r)?;
+        writer.flush()?;
+
+        // Block 2: records with a=50
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 50i64);
+        r.put("b", "b2_r0");
+        writer.append_value(r)?;
+        writer.flush()?;
+
+        let data = writer.into_inner()?;
+
+        // Read forward and collect block positions
+        let mut reader = Reader::new(Cursor::new(&data))?;
+        let mut block_offsets: Vec<BlockPosition> = Vec::new();
+        let mut all_values: Vec<Value> = Vec::new();
+
+        assert!(reader.current_block().is_none());
+
+        while let Some(value) = reader.next() {
+            all_values.push(value?);
+            let pos = reader.current_block().expect("block info after read");
+            if
+            block_offsets
+                .last()
+                .is_none_or(|last| last.offset != pos.offset)
+            {
+                block_offsets.push(pos);
+            }
+        }
+
+        assert_eq!(all_values.len(), 5);
+        assert_eq!(block_offsets.len(), 3);
+        assert_eq!(block_offsets[0].message_count, 2);
+        assert_eq!(block_offsets[1].message_count, 2);
+        assert_eq!(block_offsets[2].message_count, 1);
+        assert_eq!(reader.data_start(), block_offsets[0].offset);
+
+        // Seek back to block 1 and read its records
+        reader.seek_to_block(block_offsets[1].offset)?;
+        let v1 = reader.next().unwrap()?;
+        assert_eq!(v1, all_values[2]);
+        let v2 = reader.next().unwrap()?;
+        assert_eq!(v2, all_values[3]);
+
+        // Seek back to block 0
+        reader.seek_to_block(block_offsets[0].offset)?;
+        let v0 = reader.next().unwrap()?;
+        assert_eq!(v0, all_values[0]);
+
+        // Seek to block 2
+        reader.seek_to_block(block_offsets[2].offset)?;
+        let v4 = reader.next().unwrap()?;
+        assert_eq!(v4, all_values[4]);
+
+        assert!(reader.next().is_none());
+
+        Ok(())
+    }
+
+    /// Seeking to an invalid offset should fail with a sync marker error.
+    #[test]
+    fn test_seek_to_invalid_offset() -> TestResult {
+        use crate::writer::Writer;
+
+        let schema = Schema::parse_str(SCHEMA)?;
+        let mut writer = Writer::new(&schema, Vec::new())?;
+        let mut r = Record::new(&schema).unwrap();
+        r.put("a", 1i64);
+        r.put("b", "x");
+        writer.append_value(r)?;
+        writer.flush()?;
+        let data = writer.into_inner()?;
+
+        let mut reader = Reader::new(Cursor::new(&data))?;
+        let result = reader.seek_to_block(7);
+        assert!(result.is_err());
+
+        Ok(())
+    }
 }