Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (e *Encoder) Encode(v any) error {
return e.w.Error
}

// Reset resets the encoder to write to a new io.Writer.
func (e *Encoder) Reset(w io.Writer) {
e.w.Reset(w)
}

// Marshal returns the Avro encoding of v.
func Marshal(schema Schema, v any) ([]byte, error) {
return DefaultConfig.Marshal(schema, v)
Expand Down
37 changes: 37 additions & 0 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ type Encoder struct {
blockLength int
count int
blockSize int

// Stored for Reset.
header Header
}

// NewEncoder returns a new encoder that writes to w using schema s.
Expand Down Expand Up @@ -386,6 +389,11 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
codec: h.Codec,
blockLength: cfg.BlockLength,
blockSize: cfg.BlockSize,
header: Header{
Magic: magicBytes,
Meta: h.Meta,
Sync: h.Sync,
},
}
return e, nil
}
Expand Down Expand Up @@ -427,6 +435,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
codec: codec,
blockLength: cfg.BlockLength,
blockSize: cfg.BlockSize,
header: header,
}
return e, nil
}
Expand Down Expand Up @@ -515,6 +524,34 @@ func (e *Encoder) Close() error {
return err
}

// Reset flushes any pending data, resets the encoder to write to a new io.Writer,
// and writes a fresh header with a new sync marker. The schema, codec, and other
// settings are preserved from the original encoder.
// This allows reusing the encoder for multiple files without reallocating buffers.
func (e *Encoder) Reset(w io.Writer) error {
if err := e.Flush(); err != nil {
return err
}

// Generate new sync marker for the new file.
_, _ = rand.Read(e.header.Sync[:])
e.sync = e.header.Sync

// Reset writer to new output and write header.
e.writer.Reset(w)
e.writer.WriteVal(HeaderSchema, e.header)
if err := e.writer.Flush(); err != nil {
return err
}

// Reset buffer and encoder.
e.buf.Reset()
e.encoder.Reset(e.buf)
e.count = 0

return nil
}

func (e *Encoder) writerBlock() error {
e.writer.WriteLong(int64(e.count))

Expand Down
263 changes: 263 additions & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,3 +1312,266 @@ type errorHeaderWriter struct{}
func (*errorHeaderWriter) Write(p []byte) (int, error) {
return 0, errors.New("test")
}

// TestEncoder_Reset tests that Reset allows reusing encoder for multiple files.
func TestEncoder_Reset(t *testing.T) {
record1 := FullRecord{
Strings: []string{"first", "record"},
Longs: []int64{},
Enum: "A",
Map: map[string]int{},
Record: &TestRecord{Long: 1},
}
record2 := FullRecord{
Strings: []string{"second", "record"},
Longs: []int64{},
Enum: "B",
Map: map[string]int{},
Record: &TestRecord{Long: 2},
}

// Create first file
buf1 := &bytes.Buffer{}
enc, err := ocf.NewEncoder(schema, buf1)
require.NoError(t, err)

err = enc.Encode(record1)
require.NoError(t, err)

err = enc.Close()
require.NoError(t, err)

// Reset to write to second file
buf2 := &bytes.Buffer{}
err = enc.Reset(buf2)
require.NoError(t, err)

err = enc.Encode(record2)
require.NoError(t, err)

err = enc.Close()
require.NoError(t, err)

// Verify first file
dec1, err := ocf.NewDecoder(buf1)
require.NoError(t, err)

require.True(t, dec1.HasNext())
var got1 FullRecord
err = dec1.Decode(&got1)
require.NoError(t, err)
assert.Equal(t, record1, got1)
require.False(t, dec1.HasNext())

// Verify second file
dec2, err := ocf.NewDecoder(buf2)
require.NoError(t, err)

require.True(t, dec2.HasNext())
var got2 FullRecord
err = dec2.Decode(&got2)
require.NoError(t, err)
assert.Equal(t, record2, got2)
require.False(t, dec2.HasNext())
}

// TestEncoder_ResetWithPendingData tests Reset flushes pending data.
func TestEncoder_ResetWithPendingData(t *testing.T) {
buf1 := &bytes.Buffer{}
enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithBlockLength(10))
require.NoError(t, err)

// Write data but don't close (pending data)
err = enc.Encode(int64(42))
require.NoError(t, err)

// Reset should flush the pending data
buf2 := &bytes.Buffer{}
err = enc.Reset(buf2)
require.NoError(t, err)

// Verify first file has the data
dec1, err := ocf.NewDecoder(buf1)
require.NoError(t, err)

require.True(t, dec1.HasNext())
var got int64
err = dec1.Decode(&got)
require.NoError(t, err)
assert.Equal(t, int64(42), got)
}

// TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new sync marker.
func TestEncoder_ResetGeneratesNewSyncMarker(t *testing.T) {
buf1 := &bytes.Buffer{}
enc, err := ocf.NewEncoder(`"long"`, buf1)
require.NoError(t, err)

err = enc.Encode(int64(1))
require.NoError(t, err)
err = enc.Close()
require.NoError(t, err)

// Get sync marker from first file
dec1, err := ocf.NewDecoder(bytes.NewReader(buf1.Bytes()))
require.NoError(t, err)

reader1 := avro.NewReader(bytes.NewReader(buf1.Bytes()), 1024)
var h1 ocf.Header
reader1.ReadVal(ocf.HeaderSchema, &h1)
require.NoError(t, reader1.Error)
sync1 := h1.Sync

// Reset to second buffer
buf2 := &bytes.Buffer{}
err = enc.Reset(buf2)
require.NoError(t, err)

err = enc.Encode(int64(2))
require.NoError(t, err)
err = enc.Close()
require.NoError(t, err)

// Get sync marker from second file
reader2 := avro.NewReader(bytes.NewReader(buf2.Bytes()), 1024)
var h2 ocf.Header
reader2.ReadVal(ocf.HeaderSchema, &h2)
require.NoError(t, reader2.Error)
sync2 := h2.Sync

// Sync markers should be different
assert.NotEqual(t, sync1, sync2, "each file should have a unique sync marker")

// But both files should be readable
_ = dec1
dec2, err := ocf.NewDecoder(buf2)
require.NoError(t, err)
require.True(t, dec2.HasNext())
}

// TestEncoder_ResetMultipleTimes tests multiple sequential resets.
func TestEncoder_ResetMultipleTimes(t *testing.T) {
buffers := make([]*bytes.Buffer, 5)
for i := range buffers {
buffers[i] = &bytes.Buffer{}
}

enc, err := ocf.NewEncoder(`"long"`, buffers[0])
require.NoError(t, err)

for i := 0; i < 5; i++ {
if i > 0 {
err = enc.Reset(buffers[i])
require.NoError(t, err)
}

err = enc.Encode(int64(i * 10))
require.NoError(t, err)

err = enc.Close()
require.NoError(t, err)
}

// Verify all files
for i := 0; i < 5; i++ {
dec, err := ocf.NewDecoder(buffers[i])
require.NoError(t, err, "file %d", i)

require.True(t, dec.HasNext(), "file %d", i)
var got int64
err = dec.Decode(&got)
require.NoError(t, err, "file %d", i)
assert.Equal(t, int64(i*10), got, "file %d", i)
}
}

// TestEncoder_AppendToExistingFile tests appending records to an existing OCF file.
func TestEncoder_AppendToExistingFile(t *testing.T) {
type SimpleRecord struct {
Name string `avro:"name"`
ID int64 `avro:"id"`
}
simpleSchema := `{"type":"record","name":"SimpleRecord","fields":[{"name":"name","type":"string"},{"name":"id","type":"long"}]}`

record1 := SimpleRecord{Name: "first", ID: 1}
record2 := SimpleRecord{Name: "second", ID: 2}

tmpFile, err := os.CreateTemp("", "append-test-*.avro")
require.NoError(t, err)
tmpName := tmpFile.Name()
t.Cleanup(func() { _ = os.Remove(tmpName) })

// Write first record
enc, err := ocf.NewEncoder(simpleSchema, tmpFile)
require.NoError(t, err)
err = enc.Encode(record1)
require.NoError(t, err)
err = enc.Close()
require.NoError(t, err)
err = tmpFile.Close()
require.NoError(t, err)

// Reopen file and append second record
file, err := os.OpenFile(tmpName, os.O_RDWR, 0o644)
require.NoError(t, err)

enc2, err := ocf.NewEncoder(simpleSchema, file)
require.NoError(t, err)
err = enc2.Encode(record2)
require.NoError(t, err)
err = enc2.Close()
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)

// Read back and verify both records
file, err = os.Open(tmpName)
require.NoError(t, err)
defer file.Close()

dec, err := ocf.NewDecoder(file)
require.NoError(t, err)

var records []SimpleRecord
for dec.HasNext() {
var r SimpleRecord
err = dec.Decode(&r)
require.NoError(t, err)
records = append(records, r)
}
require.NoError(t, dec.Error())

require.Len(t, records, 2)
assert.Equal(t, record1, records[0])
assert.Equal(t, record2, records[1])
}

// TestEncoder_ResetPreservesCodec tests that codec is preserved across reset.
func TestEncoder_ResetPreservesCodec(t *testing.T) {
buf1 := &bytes.Buffer{}
enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithCodec(ocf.Deflate))
require.NoError(t, err)

err = enc.Encode(int64(1))
require.NoError(t, err)
err = enc.Close()
require.NoError(t, err)

buf2 := &bytes.Buffer{}
err = enc.Reset(buf2)
require.NoError(t, err)

err = enc.Encode(int64(2))
require.NoError(t, err)
err = enc.Close()
require.NoError(t, err)

// Both files should use deflate codec
dec1, err := ocf.NewDecoder(buf1)
require.NoError(t, err)
assert.Equal(t, []byte("deflate"), dec1.Metadata()["avro.codec"])

dec2, err := ocf.NewDecoder(buf2)
require.NoError(t, err)
assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"])
}