Merged
Changes from all commits
Commits
17 commits
ae8cc92
Added reading from offset to parquet file reader
iluhinsky May 16, 2025
37c17df
Added end-to-end tests for parquet file reader
iluhinsky May 16, 2025
f1a94df
Added ability to stop generating values by plugins to benchmark faker
iluhinsky May 16, 2025
a784be2
Moved system worker init logic from WorkerInitFunc to Init
iluhinsky May 16, 2025
cf4a876
Returned support of PostgreSQL to vector search tests
iluhinsky May 16, 2025
c03b7f0
Fixed handling of empty generated values in generic insert worker
iluhinsky May 16, 2025
02ac237
Fixed configuring parquet reader offset for readonly tests
iluhinsky May 16, 2025
6619f5d
Implemented ability to read parquet file in a circular way
iluhinsky May 17, 2025
b855082
Added feature of reading parquet datasources in circular way for read…
iluhinsky May 17, 2025
51b0872
[Parquet Reader] Moved skip until offset logic to dedicated function
iluhinsky May 18, 2025
e851709
[Parquet Reader] Simplified internal logic by removing rowsToSkip str…
iluhinsky May 18, 2025
8fd4bdb
[Parquet Reader] Fixed bug with circular reading with offset
iluhinsky May 18, 2025
60acf1f
[Parquet Reader] Added test case for circular reading with offset
iluhinsky May 18, 2025
d85903b
[Parquet Reader] Added test cases
iluhinsky May 18, 2025
dc39c5f
[Vector Search] Minor fixes
iluhinsky May 18, 2025
c6a73a9
[Benchmark] Changed RandomizerPlugin interface by requiring return of…
iluhinsky May 18, 2025
5d8babd
[DB Bench] Made DataSetSourcePlugin concurrent-friendly
iluhinsky May 18, 2025
Binary file added acronis-db-bench/dataset-source/numbers.parquet
Binary file not shown.
109 changes: 99 additions & 10 deletions acronis-db-bench/dataset-source/parquet.go
@@ -20,10 +20,11 @@ type ParquetFileDataSource struct {
 	recordReader pqarrow.RecordReader
 
 	currentRecord arrow.Record
-	currentOffset int
+	currentOffset int64
+	circular      bool
 }
 
-func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
+func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*ParquetFileDataSource, error) {
 	var rdr, err = file.OpenParquetFile(filePath, true)
 	if err != nil {
 		return nil, fmt.Errorf("error opening parquet file: %v", err)
@@ -58,11 +59,59 @@ func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
 		return nil, fmt.Errorf("error creating record reader: %v", err)
 	}
 
-	return &ParquetFileDataSource{
+	var source = &ParquetFileDataSource{
 		columns:      columnNames,
 		fileReader:   rdr,
 		recordReader: recordReader,
-	}, nil
+		circular:     circular,
+	}
+
+	if skipErr := source.skipUntilOffset(offset); skipErr != nil {
+		return nil, skipErr
+	}
+
+	return source, nil
 }
+
+func min(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// skipUntilOffset advances the reader by rowsToSkip rows before the first
+// GetNextRow call. In circular mode the skip wraps past the end of the file;
+// otherwise skipping beyond the last row leaves the reader exhausted.
+func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error {
+	for rowsToSkip > 0 {
+		if ds.currentRecord == nil {
+			if !ds.recordReader.Next() {
+				if !ds.circular {
+					return nil
+				}
+				ds.resetReader()
+				if !ds.recordReader.Next() {
+					return fmt.Errorf("failed to read after reset")
+				}
+			}
+			ds.currentRecord = ds.recordReader.Record()
+		}
+
+		// Skip as much of the outstanding offset as the current record allows,
+		// then decrement the remainder so that skips spanning several records
+		// (or circular wrap-arounds) stay consistent.
+		remainingInRecord := ds.currentRecord.NumRows() - ds.currentOffset
+		skipCount := min(rowsToSkip, remainingInRecord)
+		ds.currentOffset += skipCount
+		rowsToSkip -= skipCount
+
+		if ds.currentOffset >= ds.currentRecord.NumRows() {
+			ds.currentRecord.Release()
+			ds.currentRecord = nil
+			ds.currentOffset = 0
+		}
+	}
+
+	return nil
+}
 
 func (ds *ParquetFileDataSource) GetColumnNames() []string {
@@ -73,6 +122,12 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 	if ds.currentRecord == nil {
 		if ds.recordReader.Next() {
 			ds.currentRecord = ds.recordReader.Record()
+		} else if ds.circular {
+			// End of file in circular mode: rewind and keep reading.
+			ds.resetReader()
+			if !ds.recordReader.Next() {
+				return nil, fmt.Errorf("failed to read after reset")
+			}
+			ds.currentRecord = ds.recordReader.Record()
 		} else {
 			return nil, nil
 		}
@@ -84,15 +139,15 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 		var colData interface{}
 		switch specificArray := col.(type) {
 		case *array.Int64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Float64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.String:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Binary:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.List:
-			var beg, end = specificArray.ValueOffsets(ds.currentOffset)
+			var beg, end = specificArray.ValueOffsets(int(ds.currentOffset))
 			var values = array.NewSlice(specificArray.ListValues(), beg, end)
 			switch specificNestedArray := values.(type) {
 			case *array.Float32:
@@ -105,7 +160,7 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 	}
 	ds.currentOffset++
 
-	if int64(ds.currentOffset) >= ds.currentRecord.NumRows() {
+	if ds.currentOffset >= ds.currentRecord.NumRows() {
 		ds.currentRecord.Release()
 		ds.currentRecord = nil
 		ds.currentOffset = 0
@@ -118,3 +173,37 @@ func (ds *ParquetFileDataSource) Close() {
 	ds.recordReader.Release()
 	ds.fileReader.Close()
 }
+
+// resetReader rewinds the data source by recreating the Arrow record reader
+// over all columns and row groups, so circular reads restart at row zero.
+func (ds *ParquetFileDataSource) resetReader() {
+	if ds.currentRecord != nil {
+		ds.currentRecord.Release()
+		ds.currentRecord = nil
+	}
+	ds.recordReader.Release()
+
+	mem := memory.NewGoAllocator()
+	reader, err := pqarrow.NewFileReader(ds.fileReader, pqarrow.ArrowReadProperties{
+		BatchSize: defaultBatchSize,
+	}, mem)
+	if err != nil {
+		panic(fmt.Sprintf("error creating Arrow file reader: %v", err))
+	}
+
+	var columns []int
+	for i := 0; i < ds.fileReader.MetaData().Schema.NumColumns(); i++ {
+		columns = append(columns, i)
+	}
+
+	var rgrs []int
+	for r := 0; r < ds.fileReader.NumRowGroups(); r++ {
+		rgrs = append(rgrs, r)
+	}
+
+	recordReader, err := reader.GetRecordReader(context.Background(), columns, rgrs)
+	if err != nil {
+		panic(fmt.Sprintf("error creating record reader: %v", err))
+	}
+
+	ds.recordReader = recordReader
+	ds.currentOffset = 0
+}
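
For context, a minimal usage sketch of the new constructor signature (not part of the diff): the import path, the 200-row loop, and the log/fmt plumbing are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"
	"log"

	datasource "github.com/acronis/perfkit/acronis-db-bench/dataset-source" // assumed import path
)

func main() {
	// Start reading at row offset 50 with circular mode enabled, so the
	// reader wraps around to the first row once the file is exhausted.
	src, err := datasource.NewParquetFileDataSource("numbers.parquet", 50, true)
	if err != nil {
		log.Fatalf("open parquet data source: %v", err)
	}
	defer src.Close()

	// Deliberately read more rows than the file holds: with the 100-row
	// fixture this yields values 51..100, then 1..100, then 1..50.
	for i := 0; i < 200; i++ {
		row, err := src.GetNextRow()
		if err != nil {
			log.Fatalf("read row: %v", err)
		}
		fmt.Println(row) // row is never nil in circular mode
	}
}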
231 changes: 231 additions & 0 deletions acronis-db-bench/dataset-source/parquet_test.go
@@ -0,0 +1,231 @@
package dataset_source

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestReadParquetWithOffset(t *testing.T) {
	// Test cases with different offset values
	testCases := []struct {
		name     string
		offset   int64
		expected int64 // Expected number of records to read
	}{
		{
			name:     "Read from beginning",
			offset:   0,
			expected: 100, // File has 100 records
		},
		{
			name:     "Read from middle",
			offset:   50,
			expected: 50,
		},
		{
			name:     "Read from end",
			offset:   99,
			expected: 1,
		},
		{
			name:     "Read beyond file size",
			offset:   200,
			expected: 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Create a new parquet reader
			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
			require.NoError(t, err)
			defer reader.Close()

			// Read all records
			var count int64
			for {
				row, err := reader.GetNextRow()
				if err != nil {
					break
				}
				if row == nil {
					break
				}
				count++
			}

			// Verify the number of records read
			assert.Equal(t, tc.expected, count)
		})
	}
}

func TestReadParquetWithOffsetAndLimit(t *testing.T) {
	// Test cases combining offset and limit
	testCases := []struct {
		name          string
		offset        int64
		limit         int64
		expectedFirst int64
		expected      int64
	}{
		{
			name:          "Read first 10 records",
			offset:        0,
			limit:         10,
			expectedFirst: 1,
			expected:      10,
		},
		{
			name:          "Read 20 records from middle",
			offset:        40,
			limit:         20,
			expectedFirst: 41,
			expected:      20,
		},
		{
			name:          "Read beyond file size",
			offset:        90,
			limit:         20,
			expectedFirst: 91,
			expected:      10, // Only 10 records left from offset 90
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
			require.NoError(t, err)
			defer reader.Close()

			var count int64
			for {
				row, err := reader.GetNextRow()
				if err != nil {
					break
				}
				if row == nil {
					break
				}
				if count >= tc.limit {
					break
				}

				assert.Equal(t, 1, len(row))

				if castedRow, casted := row[0].(int64); casted {
					if count == 0 {
						assert.Equal(t, tc.expectedFirst, castedRow)
					}
				} else {
					t.Error("wrong data type")
					return
				}

				count++
			}

			assert.Equal(t, tc.expected, count)
		})
	}
}

func TestReadParquetCircular(t *testing.T) {
	testCases := []struct {
		name           string
		offset         int64
		expectedRounds int // Number of complete file reads to perform
		expectedFirst  int64
		expectedTotal  int64 // Total number of records to read
	}{
		{
			name:           "Read file twice",
			offset:         0,
			expectedRounds: 2,
			expectedFirst:  1,
			expectedTotal:  201, // 100 records + 101 records (including the first record of the second round)
		},
		{
			name:           "Read file twice from middle",
			offset:         50,
			expectedRounds: 2,
			expectedFirst:  51,
			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
		},
		{
			name:           "Read file twice from middle, skipping file once",
			offset:         150,
			expectedRounds: 2,
			expectedFirst:  51,
			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
		},
		{
			name:           "Read file twice from middle, skipping file twice",
			offset:         250,
			expectedRounds: 2,
			expectedFirst:  51,
			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, true)
			require.NoError(t, err)
			defer reader.Close()

			var count int64
			var rounds int
			var firstRow []interface{}
			var isFirstRow = true

			for {
				row, err := reader.GetNextRow()
				if err != nil {
					t.Fatalf("Unexpected error: %v", err)
				}
				if row == nil {
					t.Fatalf("Unexpected nil row")
				}

				// Store the first row we read
				if isFirstRow {
					firstRow = row
					isFirstRow = false
				}

				// Check if we've completed a full round
				if count > 0 && count%100 == 0 && !isFirstRow {
					rounds++
					// Verify we're back at the beginning by comparing with first row
					if rounds > 1 {
						assert.Equal(t, firstRow, row, "Row should match after completing a round")
					}
				}

				assert.Equal(t, 1, len(row))

				if castedRow, casted := row[0].(int64); casted {
					if count == 0 {
						assert.Equal(t, tc.expectedFirst, castedRow)
					}
				} else {
					t.Error("wrong data type")
					return
				}

				count++

				// Stop after expected number of rounds
				if rounds >= tc.expectedRounds {
					break
				}
			}

			assert.Equal(t, tc.expectedTotal, count, "Total number of records read")
			assert.Equal(t, tc.expectedRounds, rounds, "Number of complete rounds")
		})
	}
}
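
The tests above assume numbers.parquet is a single-column Int64 file holding the sequential values 1 through 100. A minimal sketch of how such a fixture could be regenerated with the Arrow Go library follows; the column name "number" and the v15 module version are assumptions, since the repository may pin a different Arrow release.

package main

import (
	"log"
	"os"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/memory"
	"github.com/apache/arrow/go/v15/parquet"
	"github.com/apache/arrow/go/v15/parquet/pqarrow"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "number", Type: arrow.PrimitiveTypes.Int64}}, nil)

	// Build one Int64 column with the values 1..100.
	bld := array.NewInt64Builder(mem)
	defer bld.Release()
	for i := int64(1); i <= 100; i++ {
		bld.Append(i)
	}
	col := bld.NewArray()
	defer col.Release()

	rec := array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
	defer rec.Release()
	tbl := array.NewTableFromRecords(schema, []arrow.Record{rec})
	defer tbl.Release()

	f, err := os.Create("numbers.parquet")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Write the whole table as a single record batch.
	if err := pqarrow.WriteTable(tbl, f, tbl.NumRows(), parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
		log.Fatal(err)
	}
}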