diff --git a/encoder.go b/encoder.go
index faa285e..49e3d7b 100644
--- a/encoder.go
+++ b/encoder.go
@@ -31,6 +31,11 @@ func (e *Encoder) Encode(v any) error {
 	return e.w.Error
 }
 
+// Reset resets the encoder to write to a new io.Writer.
+func (e *Encoder) Reset(w io.Writer) {
+	e.w.Reset(w)
+}
+
 // Marshal returns the Avro encoding of v.
 func Marshal(schema Schema, v any) ([]byte, error) {
 	return DefaultConfig.Marshal(schema, v)
diff --git a/ocf/ocf.go b/ocf/ocf.go
index 09d7a93..bc6d62a 100644
--- a/ocf/ocf.go
+++ b/ocf/ocf.go
@@ -333,6 +333,9 @@ type Encoder struct {
 	blockLength int
 	count       int
 	blockSize   int
+
+	// header is kept from construction so Reset can rewrite it with a new sync marker.
+	header Header
 }
 
 // NewEncoder returns a new encoder that writes to w using schema s.
@@ -386,6 +389,11 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
 				codec:       h.Codec,
 				blockLength: cfg.BlockLength,
 				blockSize:   cfg.BlockSize,
+				header: Header{
+					Magic: magicBytes,
+					Meta:  h.Meta,
+					Sync:  h.Sync,
+				},
 			}
 			return e, nil
 		}
@@ -427,6 +435,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
 		codec:       codec,
 		blockLength: cfg.BlockLength,
 		blockSize:   cfg.BlockSize,
+		header:      header,
 	}
 	return e, nil
 }
@@ -515,6 +524,41 @@ func (e *Encoder) Close() error {
 	return err
 }
 
+// Reset flushes any pending data to the current writer, points the encoder at w,
+// and writes a fresh header carrying a newly generated sync marker. The stored
+// header metadata, the codec and the other encoder settings are left as they are,
+// so a single encoder and its buffers can be reused across multiple files.
+//
+// If generating the sync marker fails, the error is returned before the encoder
+// is redirected, so it stays attached to its previous writer.
+func (e *Encoder) Reset(w io.Writer) error {
+	if err := e.Flush(); err != nil {
+		return err
+	}
+
+	// Generate new sync marker for the new file. A failed read could leave a
+	// stale or partially filled marker, so surface the error rather than
+	// writing a header with it.
+	if _, err := rand.Read(e.header.Sync[:]); err != nil {
+		return err
+	}
+	e.sync = e.header.Sync
+
+	// Reset writer to new output and write header.
+	e.writer.Reset(w)
+	e.writer.WriteVal(HeaderSchema, e.header)
+	if err := e.writer.Flush(); err != nil {
+		return err
+	}
+
+	// Reset buffer and encoder.
+	e.buf.Reset()
+	e.encoder.Reset(e.buf)
+	e.count = 0
+
+	return nil
+}
+
 func (e *Encoder) writerBlock() error {
 	e.writer.WriteLong(int64(e.count))
 
diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go
index b22e6dc..25566f9 100644
--- a/ocf/ocf_test.go
+++ b/ocf/ocf_test.go
@@ -1312,3 +1312,266 @@ type errorHeaderWriter struct{}
 func (*errorHeaderWriter) Write(p []byte) (int, error) {
 	return 0, errors.New("test")
 }
+
+// TestEncoder_Reset tests that Reset allows reusing encoder for multiple files.
+func TestEncoder_Reset(t *testing.T) {
+	record1 := FullRecord{
+		Strings: []string{"first", "record"},
+		Longs:   []int64{},
+		Enum:    "A",
+		Map:     map[string]int{},
+		Record:  &TestRecord{Long: 1},
+	}
+	record2 := FullRecord{
+		Strings: []string{"second", "record"},
+		Longs:   []int64{},
+		Enum:    "B",
+		Map:     map[string]int{},
+		Record:  &TestRecord{Long: 2},
+	}
+
+	// Create first file
+	buf1 := &bytes.Buffer{}
+	enc, err := ocf.NewEncoder(schema, buf1)
+	require.NoError(t, err)
+
+	err = enc.Encode(record1)
+	require.NoError(t, err)
+
+	err = enc.Close()
+	require.NoError(t, err)
+
+	// Reset to write to second file
+	buf2 := &bytes.Buffer{}
+	err = enc.Reset(buf2)
+	require.NoError(t, err)
+
+	err = enc.Encode(record2)
+	require.NoError(t, err)
+
+	err = enc.Close()
+	require.NoError(t, err)
+
+	// Verify first file
+	dec1, err := ocf.NewDecoder(buf1)
+	require.NoError(t, err)
+
+	require.True(t, dec1.HasNext())
+	var got1 FullRecord
+	err = dec1.Decode(&got1)
+	require.NoError(t, err)
+	assert.Equal(t, record1, got1)
+	require.False(t, dec1.HasNext())
+
+	// Verify second file
+	dec2, err := ocf.NewDecoder(buf2)
+	require.NoError(t, err)
+
+	require.True(t, dec2.HasNext())
+	var got2 FullRecord
+	err = dec2.Decode(&got2)
+	require.NoError(t, err)
+	assert.Equal(t, record2, got2)
+	require.False(t, dec2.HasNext())
+}
+
+// TestEncoder_ResetWithPendingData tests Reset flushes pending data.
+func TestEncoder_ResetWithPendingData(t *testing.T) {
+	buf1 := &bytes.Buffer{}
+	enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithBlockLength(10))
+	require.NoError(t, err)
+
+	// Write data but don't close (pending data)
+	err = enc.Encode(int64(42))
+	require.NoError(t, err)
+
+	// Reset should flush the pending data
+	buf2 := &bytes.Buffer{}
+	err = enc.Reset(buf2)
+	require.NoError(t, err)
+
+	// Verify first file has the data
+	dec1, err := ocf.NewDecoder(buf1)
+	require.NoError(t, err)
+
+	require.True(t, dec1.HasNext())
+	var got int64
+	err = dec1.Decode(&got)
+	require.NoError(t, err)
+	assert.Equal(t, int64(42), got)
+}
+
+// TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new sync marker.
+func TestEncoder_ResetGeneratesNewSyncMarker(t *testing.T) {
+	buf1 := &bytes.Buffer{}
+	enc, err := ocf.NewEncoder(`"long"`, buf1)
+	require.NoError(t, err)
+
+	err = enc.Encode(int64(1))
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
+
+	// Open a decoder on a copy of the first file; it is checked for readability at the end
+	dec1, err := ocf.NewDecoder(bytes.NewReader(buf1.Bytes()))
+	require.NoError(t, err)
+
+	reader1 := avro.NewReader(bytes.NewReader(buf1.Bytes()), 1024)
+	var h1 ocf.Header
+	reader1.ReadVal(ocf.HeaderSchema, &h1)
+	require.NoError(t, reader1.Error)
+	sync1 := h1.Sync
+
+	// Reset to second buffer
+	buf2 := &bytes.Buffer{}
+	err = enc.Reset(buf2)
+	require.NoError(t, err)
+
+	err = enc.Encode(int64(2))
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
+
+	// Get sync marker from second file
+	reader2 := avro.NewReader(bytes.NewReader(buf2.Bytes()), 1024)
+	var h2 ocf.Header
+	reader2.ReadVal(ocf.HeaderSchema, &h2)
+	require.NoError(t, reader2.Error)
+	sync2 := h2.Sync
+
+	// Sync markers should be different
+	assert.NotEqual(t, sync1, sync2, "each file should have a unique sync marker")
+
+	// But both files should be readable
+	require.True(t, dec1.HasNext())
+	dec2, err := ocf.NewDecoder(buf2)
+	require.NoError(t, err)
+	require.True(t, dec2.HasNext())
+}
+
+// TestEncoder_ResetMultipleTimes tests multiple sequential resets.
+func TestEncoder_ResetMultipleTimes(t *testing.T) {
+	buffers := make([]*bytes.Buffer, 5)
+	for i := range buffers {
+		buffers[i] = &bytes.Buffer{}
+	}
+
+	enc, err := ocf.NewEncoder(`"long"`, buffers[0])
+	require.NoError(t, err)
+
+	for i := range buffers {
+		if i > 0 {
+			err = enc.Reset(buffers[i])
+			require.NoError(t, err)
+		}
+
+		err = enc.Encode(int64(i * 10))
+		require.NoError(t, err)
+
+		err = enc.Close()
+		require.NoError(t, err)
+	}
+
+	// Verify all files
+	for i := range buffers {
+		dec, err := ocf.NewDecoder(buffers[i])
+		require.NoError(t, err, "file %d", i)
+
+		require.True(t, dec.HasNext(), "file %d", i)
+		var got int64
+		err = dec.Decode(&got)
+		require.NoError(t, err, "file %d", i)
+		assert.Equal(t, int64(i*10), got, "file %d", i)
+	}
+}
+
+// TestEncoder_AppendToExistingFile tests appending records to an existing OCF file.
+func TestEncoder_AppendToExistingFile(t *testing.T) {
+	type SimpleRecord struct {
+		Name string `avro:"name"`
+		ID   int64  `avro:"id"`
+	}
+	simpleSchema := `{"type":"record","name":"SimpleRecord","fields":[{"name":"name","type":"string"},{"name":"id","type":"long"}]}`
+
+	record1 := SimpleRecord{Name: "first", ID: 1}
+	record2 := SimpleRecord{Name: "second", ID: 2}
+
+	tmpFile, err := os.CreateTemp("", "append-test-*.avro")
+	require.NoError(t, err)
+	tmpName := tmpFile.Name()
+	t.Cleanup(func() { _ = os.Remove(tmpName) })
+
+	// Write first record
+	enc, err := ocf.NewEncoder(simpleSchema, tmpFile)
+	require.NoError(t, err)
+	err = enc.Encode(record1)
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
+	err = tmpFile.Close()
+	require.NoError(t, err)
+
+	// Reopen file and append second record
+	file, err := os.OpenFile(tmpName, os.O_RDWR, 0o644)
+	require.NoError(t, err)
+
+	enc2, err := ocf.NewEncoder(simpleSchema, file)
+	require.NoError(t, err)
+	err = enc2.Encode(record2)
+	require.NoError(t, err)
+	err = enc2.Close()
+	require.NoError(t, err)
+	err = file.Close()
+	require.NoError(t, err)
+
+	// Read back and verify both records
+	file, err = os.Open(tmpName)
+	require.NoError(t, err)
+	defer file.Close()
+
+	dec, err := ocf.NewDecoder(file)
+	require.NoError(t, err)
+
+	var records []SimpleRecord
+	for dec.HasNext() {
+		var r SimpleRecord
+		err = dec.Decode(&r)
+		require.NoError(t, err)
+		records = append(records, r)
+	}
+	require.NoError(t, dec.Error())
+
+	require.Len(t, records, 2)
+	assert.Equal(t, record1, records[0])
+	assert.Equal(t, record2, records[1])
+}
+
+// TestEncoder_ResetPreservesCodec tests that codec is preserved across reset.
+func TestEncoder_ResetPreservesCodec(t *testing.T) {
+	buf1 := &bytes.Buffer{}
+	enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithCodec(ocf.Deflate))
+	require.NoError(t, err)
+
+	err = enc.Encode(int64(1))
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
+
+	buf2 := &bytes.Buffer{}
+	err = enc.Reset(buf2)
+	require.NoError(t, err)
+
+	err = enc.Encode(int64(2))
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
+
+	// Both files should use deflate codec
+	dec1, err := ocf.NewDecoder(buf1)
+	require.NoError(t, err)
+	assert.Equal(t, []byte("deflate"), dec1.Metadata()["avro.codec"])
+
+	dec2, err := ocf.NewDecoder(buf2)
+	require.NoError(t, err)
+	assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"])
+}