Conversation

@TuSKan (Contributor) commented Dec 16, 2025

Goal of this PR

Expose Reader Offset and OCF Block Status for Concurrent Decoder

Description

This PR enhances the Reader and the OCF Decoder by exposing internal state information that is critical for advanced processing scenarios such as progress tracking, concurrent splitting, and debugging.

Key Changes

  • Reader.InputOffset(): Adds a method to the Reader to retrieve the current input offset. This allows consumers to know exactly where in the underlying stream the reader is currently positioned.
  • OCF Decoder.BlockStatus(): Introduces a BlockStatus() method (and corresponding struct, sketched just after this list) to the OCF Decoder. This provides a snapshot of the current block being processed, including:
    • Current: The index of the current record within the block.
    • Count: The total number of records in the current block.
    • Size: The size (in bytes) of the current block.
    • Offset: The input offset provided by the underlying reader.
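
A rough sketch of the shape BlockStatus might take, based on the fields listed above (the concrete integer types are assumptions, not the PR's exact declaration):

// BlockStatus is a snapshot of the OCF block currently being decoded.
type BlockStatus struct {
    Current int   // index of the current record within the block
    Count   int   // total number of records in the block
    Size    int64 // size of the current block in bytes
    Offset  int64 // input offset reported by the underlying Reader
}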

Motivation

Currently, the avro package abstracts away the underlying stream position and block details. While this is fine for simple reading, it limits users who need to:

  1. Track Progress: Accurately report percentage completion when processing large OCF files.
  2. Implement Splitting: Effectively split file processing based on block boundaries and offsets.
  3. Debug: Gain better visibility into how the decoder is traversing the file structure.

Use Case Example

A data processing pipeline can now use BlockStatus() to log precise progress or checkpoint processing at specific block offsets, improving reliability and observability.

// Example usage
decoder, _ := ocf.NewDecoder(r)
for decoder.HasNext() {
    var record MyRecord
    if err := decoder.Decode(&record); err != nil {
        break
    }

    // Track progress
    status := decoder.BlockStatus()
    fmt.Printf("Processing record %d/%d of block at offset %d\n", status.Current, status.Count, status.Offset)
}
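
Building on the same fields, here is a hedged sketch of block-boundary checkpointing (saveCheckpoint is a hypothetical persistence hook, and this assumes Offset reflects the reader position once the current block has been read):

// Checkpoint once per block: after the last record of a block has been
// decoded, persist the reader offset so processing can be resumed or audited.
for decoder.HasNext() {
    var record MyRecord
    if err := decoder.Decode(&record); err != nil {
        break
    }

    status := decoder.BlockStatus()
    if status.Current == status.Count {
        saveCheckpoint(status.Offset) // hypothetical persistence hook
    }
}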

How did I test it?

I added a test, TestConcurrentDecode, that exercises concurrent decoding.

Please let me know whether this PR makes sense to you.

@TuSKan changed the title from Header to Concurrently Decoder on Dec 16, 2025
@TuSKan changed the title from Concurrently Decoder to Concurrent Decoder on Dec 16, 2025
Comment on lines -382 to +432
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig))
Member

Why?

Contributor Author

This fixes bug #590.

Comment on lines -423 to +473
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig))
Member

Why?

Contributor Author

Same as above.

Comment on lines +62 to +75
func newDecoderConfig(opts ...DecoderFunc) *decoderConfig {
    cfg := decoderConfig{
        DecoderConfig: avro.DefaultConfig,
        SchemaCache:   avro.DefaultSchemaCache,
        CodecOptions: codecOptions{
            DeflateCompressionLevel: flate.DefaultCompression,
        },
    }
    for _, opt := range opts {
        opt(&cfg)
    }
    return &cfg
}

Member

Not sure that this change helped anything.

Contributor Author

Avoiding duplicate code.

need := min(r.tail-r.head, tokenLen-1)

// Construct boundary window: stash + beginning of new buffer
boundary := make([]byte, len(stash)+need)
Member

This is a known size, allocate once and reuse instead of constantly re-allocating.
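
A hedged sketch of one way to read this suggestion (r.boundary is a hypothetical scratch field on the reader, not code from this PR):

// Reuse a single scratch buffer, growing it only when it is too small,
// instead of allocating a new slice on every boundary check.
if cap(r.boundary) < len(stash)+need {
    r.boundary = make([]byte, len(stash)+need)
}
boundary := r.boundary[:len(stash)+need]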

copy(boundary, stash)
copy(boundary[len(stash):], r.buf[r.head:r.head+need])

if idx := bytes.Index(boundary, token); idx >= 0 {
Member

In this case, surely the reader has advanced too far, as the start of the token is no longer in the buffer.

Contributor Author

This is the case where the token extends past the buffer, so a bigger buffer is needed.

data: []byte{0x38, 0x36},
},
{

Member

Why?

Contributor Author

Auto format

data: []byte{0x38, 0x36},
},
{

Member

Why?

Contributor Author

Auto format
