diff --git a/acronis-db-bench/dataset-source/numbers.parquet b/acronis-db-bench/dataset-source/numbers.parquet
new file mode 100644
index 00000000..573ee658
Binary files /dev/null and b/acronis-db-bench/dataset-source/numbers.parquet differ
diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go
index dc9c037e..d5fd81df 100644
--- a/acronis-db-bench/dataset-source/parquet.go
+++ b/acronis-db-bench/dataset-source/parquet.go
@@ -20,10 +20,11 @@ type ParquetFileDataSource struct {
 	recordReader pqarrow.RecordReader
 	currentRecord arrow.Record
-	currentOffset int
+	currentOffset int64
+	circular      bool
 }
 
-func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
+func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*ParquetFileDataSource, error) {
 	var rdr, err = file.OpenParquetFile(filePath, true)
 	if err != nil {
 		return nil, fmt.Errorf("error opening parquet file: %v", err)
 	}
@@ -58,11 +59,58 @@ func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
 		return nil, fmt.Errorf("error creating record reader: %v", err)
 	}
 
-	return &ParquetFileDataSource{
+	var source = &ParquetFileDataSource{
 		columns:      columnNames,
 		fileReader:   rdr,
 		recordReader: recordReader,
-	}, nil
+		circular:     circular,
+	}
+
+	if skipErr := source.skipUntilOffset(offset); skipErr != nil {
+		return nil, skipErr
+	}
+
+	return source, nil
+}
+
+func min(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// skipUntilOffset advances the reader by rowsToSkip rows. The remaining skip
+// count is decremented as each record is consumed, so the offset is applied
+// correctly even when it spans several records or, in circular mode, wraps
+// around the end of the file.
+func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error {
+	for rowsToSkip > 0 {
+		if ds.currentRecord == nil {
+			if !ds.recordReader.Next() {
+				if !ds.circular {
+					return nil
+				}
+				ds.resetReader()
+				if !ds.recordReader.Next() {
+					return fmt.Errorf("failed to read after reset")
+				}
+			}
+			ds.currentRecord = ds.recordReader.Record()
+		}
+
+		skipCount := min(rowsToSkip, ds.currentRecord.NumRows()-ds.currentOffset)
+		ds.currentOffset += skipCount
+		rowsToSkip -= skipCount
+
+		if ds.currentOffset >= ds.currentRecord.NumRows() {
+			ds.currentRecord.Release()
+			ds.currentRecord = nil
+			ds.currentOffset = 0
+		}
+	}
+
+	return nil
 }
 
 func (ds *ParquetFileDataSource) GetColumnNames() []string {
@@ -73,6 +121,12 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 	if ds.currentRecord == nil {
 		if ds.recordReader.Next() {
 			ds.currentRecord = ds.recordReader.Record()
+		} else if ds.circular {
+			ds.resetReader()
+			if !ds.recordReader.Next() {
+				return nil, fmt.Errorf("failed to read after reset")
+			}
+			ds.currentRecord = ds.recordReader.Record()
 		} else {
 			return nil, nil
 		}
@@ -84,15 +138,15 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 		var colData interface{}
 		switch specificArray := col.(type) {
 		case *array.Int64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Float64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.String:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Binary:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.List:
-			var beg, end = specificArray.ValueOffsets(ds.currentOffset)
+			var beg, end = specificArray.ValueOffsets(int(ds.currentOffset))
 			var values = array.NewSlice(specificArray.ListValues(), beg, end)
 			switch specificNestedArray := values.(type) {
 			case *array.Float32:
@@ -105,7 +159,7 @@
 	}
 
 	ds.currentOffset++
-	if int64(ds.currentOffset) >= ds.currentRecord.NumRows() {
+	if ds.currentOffset >= ds.currentRecord.NumRows() {
 		ds.currentRecord.Release()
 		ds.currentRecord = nil
 		ds.currentOffset = 0
@@ -118,3 +172,37 @@ func (ds *ParquetFileDataSource) Close() {
 	ds.recordReader.Release()
 	ds.fileReader.Close()
 }
+
+func (ds *ParquetFileDataSource) resetReader() {
+	if ds.currentRecord != nil {
+		ds.currentRecord.Release()
+		ds.currentRecord = nil
+	}
+	ds.recordReader.Release()
+
+	mem := memory.NewGoAllocator()
+	reader, err := pqarrow.NewFileReader(ds.fileReader, pqarrow.ArrowReadProperties{
+		BatchSize: defaultBatchSize,
+	}, mem)
+	if err != nil {
+		panic(fmt.Sprintf("error creating Arrow file reader: %v", err))
+	}
+
+	var columns []int
+	for i := 0; i < ds.fileReader.MetaData().Schema.NumColumns(); i++ {
+		columns = append(columns, i)
+	}
+
+	var rgrs []int
+	for r := 0; r < ds.fileReader.NumRowGroups(); r++ {
+		rgrs = append(rgrs, r)
+	}
+
+	recordReader, err := reader.GetRecordReader(context.Background(), columns, rgrs)
+	if err != nil {
+		panic(fmt.Sprintf("error creating record reader: %v", err))
+	}
+
+	ds.recordReader = recordReader
+	ds.currentOffset = 0
+}
diff --git a/acronis-db-bench/dataset-source/parquet_test.go b/acronis-db-bench/dataset-source/parquet_test.go
new file mode 100644
index 00000000..7719e905
--- /dev/null
+++ b/acronis-db-bench/dataset-source/parquet_test.go
@@ -0,0 +1,231 @@
+package dataset_source
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReadParquetWithOffset(t *testing.T) {
+	// Test cases with different offset values
+	testCases := []struct {
+		name     string
+		offset   int64
+		expected int64 // Expected number of records to read
+	}{
+		{
+			name:     "Read from beginning",
+			offset:   0,
+			expected: 100, // File has 100 records
+		},
+		{
+			name:     "Read from middle",
+			offset:   50,
+			expected: 50,
+		},
+		{
+			name:     "Read from end",
+			offset:   99,
+			expected: 1,
+		},
+		{
+			name:     "Read beyond file size",
+			offset:   200,
+			expected: 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Create a new parquet reader
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			// Read all records
+			var count int64
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					break
+				}
+				if row == nil {
+					break
+				}
+				count++
+			}
+
+			// Verify the number of records read
+			assert.Equal(t, tc.expected, count)
+		})
+	}
+}
+
+func TestReadParquetWithOffsetAndLimit(t *testing.T) {
+	// Test cases combining offset and limit
+	testCases := []struct {
+		name          string
+		offset        int64
+		limit         int64
+		expectedFirst int64
+		expected      int64
+	}{
+		{
+			name:          "Read first 10 records",
+			offset:        0,
+			limit:         10,
+			expectedFirst: 1,
+			expected:      10,
+		},
+		{
+			name:          "Read 20 records from middle",
+			offset:        40,
+			limit:         20,
+			expectedFirst: 41,
+			expected:      20,
+		},
+		{
+			name:          "Read beyond file size",
+			offset:        90,
+			limit:         20,
+			expectedFirst: 91,
+			expected:      10, // Only 10 records left from offset 90
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			var count int64
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					break
+				}
+				if row == nil {
+					break
+				}
+				if count >= tc.limit {
+					break
+				}
+
+				assert.Equal(t, 1, len(row))
+
+				if castedRow, casted := row[0].(int64); casted {
+					if count == 0 {
+						assert.Equal(t, tc.expectedFirst, castedRow)
+					}
+				} else {
+					t.Error("wrong data type")
+					return
+				}
+
+				count++
+			}
+
+			assert.Equal(t, tc.expected, count)
+		})
+	}
+}
+
+func TestReadParquetCircular(t *testing.T) {
+	testCases := []struct {
+		name           string
+		offset         int64
+		expectedRounds int // Number of complete file reads to perform
+		expectedFirst  int64
+		expectedTotal  int64 // Total number of records to read
+	}{
+		{
+			name:           "Read file twice",
+			offset:         0,
+			expectedRounds: 2,
+			expectedFirst:  1,
+			expectedTotal:  201, // two full passes (200 rows) plus the extra row read when the second wrap is detected
+		},
+		{
+			name:           "Read file twice from middle",
+			offset:         50,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // 50 rows (rows 51..100) + 100 rows + 51 rows; the last row detects the second wrap
+		},
+		{
+			name:           "Read file twice from middle, skipping file once",
+			offset:         150,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // the offset wraps modulo the 100-row file, so this behaves like offset 50
+		},
+		{
+			name:           "Read file twice from middle, skipping file twice",
+			offset:         250,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // the offset wraps modulo the 100-row file, so this behaves like offset 50
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, true)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			var count int64
+			var rounds int
+			var firstRow []interface{}
+			var isFirstRow = true
+
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					t.Fatalf("Unexpected error: %v", err)
+				}
+				if row == nil {
+					t.Fatalf("Unexpected nil row")
+				}
+
+				// Store the first row we read
+				if isFirstRow {
+					firstRow = row
+					isFirstRow = false
+				}
+
+				// Check if we've completed a full round
+				if count > 0 && count%100 == 0 && !isFirstRow {
+					rounds++
+					// Verify we're back at the same position by comparing with the first row
+					if rounds > 1 {
+						assert.Equal(t, firstRow, row, "Row should match after completing a round")
+					}
+				}
+
+				assert.Equal(t, 1, len(row))
+
+				if castedRow, casted := row[0].(int64); casted {
+					if count == 0 {
+						assert.Equal(t, tc.expectedFirst, castedRow)
+					}
+				} else {
+					t.Error("wrong data type")
+					return
+				}
+
+				count++
+
+				// Stop after expected number of rounds
+				if rounds >= tc.expectedRounds {
+					break
+				}
+			}
+
+			assert.Equal(t, tc.expectedTotal, count, "Total number of records read")
+			assert.Equal(t, tc.expectedRounds, rounds, "Number of complete rounds")
+		})
+	}
+}
diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go
index 5f87d602..7f0a2195 100644
--- a/acronis-db-bench/engine/helpers.go
+++ b/acronis-db-bench/engine/helpers.go
@@ -6,6 +6,7 @@ import (
 	"regexp"
 	"sort"
 	"strings"
+	"sync"
 
 	"github.com/acronis/perfkit/benchmark"
 	"github.com/acronis/perfkit/db"
@@ -166,12 +167,12 @@ func FormatSQL(sqlTemlate string, dialectName db.DialectName) string {
 }
 
 // NewParquetFileDataSourceForRandomizer creates a new parquet DataSetSource instance and register as plugin for Randomizer
-func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string) error {
+func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string, offset int64, circular bool) error {
 	if bench.Randomizer == nil {
 		bench.Randomizer = benchmark.NewRandomizer(bench.CommonOpts.RandSeed, bench.CommonOpts.Workers)
 	}
 
-	var source, err = dataset.NewParquetFileDataSource(filePath)
+	var source, err = dataset.NewParquetFileDataSource(filePath, offset, circular)
 	if err != nil {
 		return err
 	}
@@ -186,6 +187,7 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath
 	var dataSourcePlugin = &DataSetSourcePlugin{
 		source:  source,
 		columns: registeredColumns,
+		mx:      sync.Mutex{},
 	}
 
 	bench.Randomizer.RegisterPlugin("dataset", dataSourcePlugin)
@@ -199,11 +201,12 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath
 
 type DataSetSourcePlugin struct {
 	source dataset.DataSetSource
-	columns       []string
-	currentValues map[string]interface{}
+	columns []string
+
+	mx sync.Mutex
 }
 
-func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, interface{}) {
+func (p *DataSetSourcePlugin) GenCommonFakeValues(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, map[string]interface{}) {
 	if len(p.columns) == 0 {
 		return false, nil
 	}
@@ -213,21 +216,24 @@ func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmar
 		return false, nil
 	}
 
+	p.mx.Lock()
+	defer p.mx.Unlock()
+
 	var row, err = p.source.GetNextRow()
 	if err != nil {
 		return false, nil
 	}
 
 	if row == nil {
-		return false, nil
+		return true, nil
 	}
 
-	p.currentValues = make(map[string]interface{}, len(row))
+	var currentValues = make(map[string]interface{}, len(row))
 	for i, value := range row {
-		p.currentValues[p.columns[i]] = value
+		currentValues[p.columns[i]] = value
 	}
 
-	return true, p.currentValues[columnType]
+	return false, currentValues
 }
 
 func (p *DataSetSourcePlugin) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) {
@@ -235,7 +241,7 @@ func (p *DataSetSourcePlugin) GenFakeValue(columnType string, rz *benchmark.Rand
 		return false, nil
 	}
 
-	var value, ok = p.currentValues[columnType]
+	var value, ok = preGenerated[columnType]
 	if !ok {
 		return false, nil
 	}
diff --git a/acronis-db-bench/engine/main.go b/acronis-db-bench/engine/main.go
index b91e3fea..e84aa5f4 100644
--- a/acronis-db-bench/engine/main.go
+++ b/acronis-db-bench/engine/main.go
@@ -229,6 +229,10 @@ func Main() {
 	// Has to be returned back to connection pool because it is not used anywhere else
 	c.Release()
 
+	if b.Logger.GetLevel() > logger.LevelInfo && !testOpts.BenchOpts.Info {
+		b.Log(logger.LevelTrace, 0, getDBInfo(b, content))
+	}
+
 	if testOpts.BenchOpts.Info || b.Logger.GetLevel() > logger.LevelInfo {
 		if testOpts.BenchOpts.Info {
 			fmt.Printf(getDBInfo(b, content)) //nolint:staticcheck
@@ -252,50 +256,6 @@ func Main() {
 		fmt.Printf("to collect the profiler log run: go tool pprof 'http://localhost:%d/debug/pprof/profile?seconds=10'\n", testOpts.BenchOpts.ProfilerPort)
 	}
 
-	b.Init = func() {
-		b.Vault.(*DBTestData).TenantsCache = tenants.NewTenantsCache(b)
-
-		b.Vault.(*DBTestData).TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet)
-		b.Vault.(*DBTestData).TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet)
-
-		var tenantCacheDBOpts = b.TestOpts.(*TestOpts).DBOpts
-		if b.TestOpts.(*TestOpts).BenchOpts.TenantConnString != "" {
-			tenantCacheDBOpts.ConnString = b.TestOpts.(*TestOpts).BenchOpts.TenantConnString
-		}
-
-		var tenantCacheDatabase *DBConnector
-		if tenantCacheDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1); err != nil {
-			b.Exit("db: cannot create tenants cache connection: %v", err)
-			return
-		}
-
-		if err = b.Vault.(*DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil {
-			b.Exit("db: cannot initialize tenants cache: %v", err)
-		}
-
-		if b.Logger.GetLevel() > logger.LevelInfo && !testOpts.BenchOpts.Info {
-			b.Log(logger.LevelTrace, 0, getDBInfo(b, content))
-		}
-
-		tenantCacheDatabase.Release()
-
-		if b.TestOpts.(*TestOpts).BenchOpts.Events {
-			var workingConn *DBConnector
-			if workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, -1, true, b.Logger, 1); err != nil {
-				return
-			}
-
-			b.Vault.(*DBTestData).EventBus = events.NewEventBus(workingConn.Database, b.Logger)
-			b.Vault.(*DBTestData).EventBus.CreateTables()
-		}
-
-		if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" {
-			if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource); err != nil {
-				b.Exit("failed to create parquet data source: %v", err)
-			}
-		}
-	}
-
 	b.Finish = func() {
 		b.Logger.Debug("finishing")
 		if b.TestOpts.(*TestOpts).BenchOpts.Events {
diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go
index 810a3d92..98dff7de 100644
--- a/acronis-db-bench/engine/workers.go
+++ b/acronis-db-bench/engine/workers.go
@@ -14,6 +14,7 @@ import (
 	"github.com/acronis/perfkit/db"
 	"github.com/acronis/perfkit/logger"
 
+	events "github.com/acronis/perfkit/acronis-db-bench/event-bus"
 	tenants "github.com/acronis/perfkit/acronis-db-bench/tenants-cache"
 )
@@ -21,83 +22,138 @@ import (
  * Worker initialization
  */
 
-func initWorker(worker *benchmark.BenchmarkWorker, testDesc *TestDesc, rowsRequired uint64) {
-	b := worker.Benchmark
-	workerID := worker.WorkerID
+func initGeneric(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) {
+	b.Vault.(*DBTestData).TenantsCache = tenants.NewTenantsCache(b)
 
-	if worker.Data == nil {
-		var workerData DBWorkerData
-		var err error
+	b.Vault.(*DBTestData).TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet)
+	b.Vault.(*DBTestData).TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet)
 
-		if workerData.workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, workerID, false, worker.Logger, 1); err != nil {
+	var tenantCacheDBOpts = b.TestOpts.(*TestOpts).DBOpts
+	if b.TestOpts.(*TestOpts).BenchOpts.TenantConnString != "" {
+		tenantCacheDBOpts.ConnString = b.TestOpts.(*TestOpts).BenchOpts.TenantConnString
+	}
+
+	var tenantCacheDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1)
+	if err != nil {
+		b.Exit("db: cannot create tenants cache connection: %v", err)
+		return
+	}
+
+	if err = b.Vault.(*DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil {
+		b.Exit("db: cannot initialize tenants cache: %v", err)
+	}
+
+	tenantCacheDatabase.Release()
+
+	if b.TestOpts.(*TestOpts).BenchOpts.Events {
+		var workingConn *DBConnector
+		if workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, -1, true, b.Logger, 1); err != nil {
 			return
 		}
 
-		worker.Data = &workerData
+		b.Vault.(*DBTestData).EventBus = events.NewEventBus(workingConn.Database, b.Logger)
+		b.Vault.(*DBTestData).EventBus.CreateTables()
 	}
 
-	if workerID == 0 {
-		conn := worker.Data.(*DBWorkerData).workingConn
-		testData := b.Vault.(*DBTestData)
-		testData.TestDesc = testDesc
+	tableName := testDesc.Table.TableName
+	if tableName == "" {
+		testDesc.Table.RowsCount = 0
+		return
+	}
 
-		// Initialize TenantsCache if it's nil
-		if testData.TenantsCache == nil {
-			testData.TenantsCache = tenants.NewTenantsCache(b)
+	var ddlConnDatabase *DBConnector
+	if ddlConnDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1); err != nil {
+		b.Exit("db: cannot create connection for DDL: %v", err)
+		return
+	}
+
+	conn := ddlConnDatabase
+	testData := b.Vault.(*DBTestData)
+	testData.TestDesc = testDesc
+
+	t := testRegistry.GetTableByName(tableName)
+
+	b.Logger.Debug("initializing table '%s'", tableName)
+	if testDesc.IsReadonly {
+		t.Create(conn, b)
+		b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName)
+		if exists, err := conn.Database.TableExists(tableName); err != nil {
+			b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err))
+		} else if !exists {
+			b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName)
 		}
-		worker.Randomizer.RegisterPlugin("tenant", testData.TenantsCache)
+	} else {
+		b.Logger.Debug("creating table '%s'", tableName)
+		t.Create(conn, b)
+	}
 
-		tableName := testDesc.Table.TableName
+	var session = conn.Database.Session(conn.Database.Context(context.Background(), false))
+	var rowNum int64
+	if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil {
+		b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err))
+	} else {
+		for rows.Next() {
+			if scanErr := rows.Scan(&rowNum); scanErr != nil {
+				b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr))
+			}
+		}
+		rows.Close()
+	}
 
-		t := testRegistry.GetTableByName(tableName)
+	testDesc.Table.RowsCount = uint64(rowNum)
+	b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount)
+
+	if rowsRequired > 0 {
+		if testDesc.Table.RowsCount < rowsRequired {
+			b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test",
+				testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired))
+		}
+	}
 
-		if tableName == "" {
-			testDesc.Table.RowsCount = 0
+	ddlConnDatabase.Release()
+
+	if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" {
+		var offset int64
+		var circular bool
+		if testDesc.IsReadonly {
+			offset = 0
+			circular = true
 		} else {
-			b.Logger.Debug("initializing table '%s'", tableName)
-			if testDesc.IsReadonly {
-				t.Create(conn, b)
-				b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName)
-				if exists, err := conn.Database.TableExists(tableName); err != nil {
-					b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err))
-				} else if !exists {
-					b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName)
-				}
-			} else {
-				b.Logger.Debug("creating table '%s'", tableName)
-				t.Create(conn, b)
-			}
+			offset = rowNum
+			circular = false
+		}
 
-			var session = conn.Database.Session(conn.Database.Context(context.Background(), false))
-			var rowNum int64
-			if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil {
-				b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err))
-			} else {
-				for rows.Next() {
-					if scanErr := rows.Scan(&rowNum); scanErr != nil {
-						b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr))
-					}
-				}
-				rows.Close()
-			}
+		if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, offset, circular); err != nil {
+			b.Exit("failed to create parquet data source: %v", err)
+		}
+	}
+}
 
-			testDesc.Table.RowsCount = uint64(rowNum)
-			b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount)
+func initWorker(worker *benchmark.BenchmarkWorker) {
+	b := worker.Benchmark
+	workerID := worker.WorkerID
 
-			if rowsRequired > 0 {
-				if testDesc.Table.RowsCount < rowsRequired {
-					b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test",
-						testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired))
-				}
-			}
+	if worker.Data == nil {
+		var workerData DBWorkerData
+		var err error
+
+		if workerData.workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, workerID, false, worker.Logger, 1); err != nil {
+			return
 		}
+
+		worker.Data = &workerData
 	}
+	worker.Logger.Trace("worker is initialized")
 }
 
 func initCommon(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) {
+	b.Init = func() {
+		initGeneric(b, testDesc, rowsRequired)
+	}
+
 	b.WorkerInitFunc = func(worker *benchmark.BenchmarkWorker) {
-		initWorker(worker, testDesc, rowsRequired)
+		initWorker(worker)
 	}
 
 	b.Metric = func() (metric string) {
@@ -513,12 +569,21 @@ func InsertMultiValueDataWorker(b *benchmark.Benchmark, c *DBConnector, testDesc
 		if err != nil {
 			b.Exit(err)
 		}
+
+		if genColumns == nil {
+			break
+		}
+
 		values = append(values, vals)
 		if i == 0 {
 			columns = genColumns
 		}
 	}
 
+	if len(values) == 0 {
+		return
+	}
+
 	var session = c.Database.Session(c.Database.Context(context.Background(), false))
 	if txErr := session.Transact(func(tx db.DatabaseAccessor) error {
 		return tx.BulkInsert(testDesc.Table.TableName, values, columns)
diff --git a/acronis-db-bench/tenants-cache/tenant_generator.go b/acronis-db-bench/tenants-cache/tenant_generator.go
index 56ac5b2e..845d680b 100644
--- a/acronis-db-bench/tenants-cache/tenant_generator.go
+++ b/acronis-db-bench/tenants-cache/tenant_generator.go
@@ -985,7 +985,7 @@ func (tc *TenantsCache) GetRandomCTIUUID(rz *benchmark.Randomizer, testCardinali
 	return tc.ctiUuids[rz.IntnExp(cardinality)], nil
 }
 
-func (tc *TenantsCache) GenCommonFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, interface{}) {
+func (tc *TenantsCache) GenCommonFakeValues(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, map[string]interface{}) {
 	if columnType != "tenant_uuid" && columnType != "customer_uuid" && columnType != "partner_uuid" {
 		return false, nil
 	}
@@ -1003,7 +1003,10 @@ func (tc *TenantsCache) GenCommonFakeValue(columnType string, rz *benchmark.Rand
 		tc.benchmark.Exit(err.Error())
 	}
 
-	return true, tenantUUID
+	var currentValues = make(map[string]interface{}, 1)
+	currentValues[columnType] = tenantUUID
+
+	return false, currentValues
 }
 
 func (tc *TenantsCache) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) {
diff --git a/acronis-db-bench/test-groups/basic-scenarios/medium_test.go b/acronis-db-bench/test-groups/basic-scenarios/medium_test.go
index 7ef197fa..40b1a19c 100644
--- a/acronis-db-bench/test-groups/basic-scenarios/medium_test.go
+++ b/acronis-db-bench/test-groups/basic-scenarios/medium_test.go
@@ -2,7 +2,6 @@ package basic_scenarios
 
 import (
"github.com/acronis/perfkit/acronis-db-bench/engine" - tenants "github.com/acronis/perfkit/acronis-db-bench/tenants-cache" "github.com/acronis/perfkit/benchmark" ) @@ -54,7 +53,8 @@ func (suite *TestingSuite) TestMediumTableTests() { b.AddOpts = func() benchmark.TestOpts { var testOpts = engine.TestOpts{ - DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, + DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, + BenchOpts: engine.BenchOpts{TenantsWorkingSet: 50}, } b.Cli.AddFlagGroup("Database options", "", &testOpts.DBOpts) @@ -62,7 +62,7 @@ func (suite *TestingSuite) TestMediumTableTests() { b.Cli.AddFlagGroup("Testcase specific options", "", &testOpts.TestcaseOpts) b.Cli.AddFlagGroup("CTI-pattern simulation test options", "", &testOpts.CTIOpts) - return &testOpts + return mediumTestOpts } b.InitOpts() @@ -78,23 +78,7 @@ func (suite *TestingSuite) TestMediumTableTests() { d.Scores[s] = []benchmark.Score{} } - b.Init = func() { - b.Vault.(*engine.DBTestData).TenantsCache = tenants.NewTenantsCache(b) - - var tenantCacheDatabase, err = engine.NewDBConnector(&b.TestOpts.(*engine.TestOpts).DBOpts, -1, true, b.Logger, 1) - if err != nil { - b.Exit("db: cannot create tenants cache connection: %v", err) - return - } - - if err = b.Vault.(*engine.DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil { - b.Exit("db: cannot initialize tenants cache: %v", err) - } - } - - mediumTableTestSuite.Execute(b, &engine.TestOpts{ - DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, - }, 1) + mediumTableTestSuite.Execute(b, nil, 1) // Clear tables engine.CleanupTables(b) diff --git a/acronis-db-bench/test-groups/vector-search/tests.go b/acronis-db-bench/test-groups/vector-search/tests.go index f4b6a696..7fc70845 100644 --- a/acronis-db-bench/test-groups/vector-search/tests.go +++ b/acronis-db-bench/test-groups/vector-search/tests.go @@ -25,7 +25,7 @@ func init() { // TestTableVector768 is table to store 768-dimensions vector objects var TestTableVector768 = engine.TestTable{ TableName: "acronis_db_bench_vector_768", - Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Columns: [][]interface{}{ {"id", "dataset.id"}, {"embedding", "dataset.emb.list.item"}, @@ -57,7 +57,7 @@ var TestInsertVector768MultiValue = engine.TestDesc{ Description: "insert a 768-dim vectors with ids into the 'vector' table by batches", Category: engine.TestInsert, IsReadonly: false, - Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Table: TestTableVector768, LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { engine.TestGeneric(b, testDesc, engine.InsertMultiValueDataWorker, 0) @@ -70,7 +70,7 @@ var TestSelectVector768NearestL2 = engine.TestDesc{ Metric: "rows/sec", Description: "selects k nearest vectors by L2 norm from the 'vector' table to the given 768-dim vector", Category: engine.TestSelect, - IsReadonly: false, + IsReadonly: true, Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Table: TestTableVector768, LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { diff --git a/benchmark/faker.go b/benchmark/faker.go index afc91f31..39f921a1 100644 --- a/benchmark/faker.go +++ b/benchmark/faker.go @@ -198,7 +198,7 @@ func NewRandomizer(seed int64, workerID int) *Randomizer { // RandomizerPlugin is a type RandomizerPlugin interface { - 
GenCommonFakeValue(columnType string, rz *Randomizer, cardinality int) (bool, interface{}) + GenCommonFakeValues(columnType string, rz *Randomizer, cardinality int) (bool, map[string]interface{}) GenFakeValue(columnType string, rz *Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) } @@ -339,11 +339,18 @@ func (rz *Randomizer) GenFakeData(colConfs *[]DBFakeColumnConf, WithAutoInc bool var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if exists, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); exists { - if preGenerated == nil { - preGenerated = make(map[string]interface{}) + if shouldStop, commonValues := plugin.GenCommonFakeValues(c.ColumnType, rz, c.Cardinality); !shouldStop { + if commonValues != nil { + if preGenerated == nil { + preGenerated = make(map[string]interface{}) + } + for colType, val := range commonValues { + preGenerated[colType] = val + } } - preGenerated[c.ColumnType] = value + } else { + // End of generation + return nil, nil, nil } } } @@ -372,11 +379,18 @@ func (rz *Randomizer) GenFakeDataAsMap(colConfs *[]DBFakeColumnConf, WithAutoInc var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if exists, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); exists { - if preGenerated == nil { - preGenerated = make(map[string]interface{}) + if shouldStop, commonValues := plugin.GenCommonFakeValues(c.ColumnType, rz, c.Cardinality); !shouldStop { + if commonValues != nil { + if preGenerated == nil { + preGenerated = make(map[string]interface{}) + } + for colType, val := range commonValues { + preGenerated[colType] = val + } } - preGenerated[c.ColumnType] = value + } else { + // End of generation + return nil, nil } } }
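
Usage sketch (reviewer note, not part of the patch): this is how the new offset/circular parameters of NewParquetFileDataSource are intended to be driven, mirroring initGeneric in workers.go — readonly tests wrap circularly from offset 0, insert tests resume after the existing rows. The import path is inferred from the repository layout, and the file name is the test fixture added by this change; both are assumptions for illustration only.

	package main

	import (
		"fmt"
		"log"

		// assumed import path, derived from the acronis-db-bench/dataset-source directory
		dataset "github.com/acronis/perfkit/acronis-db-bench/dataset-source"
	)

	func main() {
		// In circular mode an offset of 150 on the 100-row fixture wraps to an
		// effective offset of 50, as exercised by TestReadParquetCircular.
		src, err := dataset.NewParquetFileDataSource("numbers.parquet", 150, true)
		if err != nil {
			log.Fatalf("cannot open parquet data source: %v", err)
		}
		defer src.Close()

		for i := 0; i < 3; i++ {
			row, rowErr := src.GetNextRow() // row values follow GetColumnNames() order
			if rowErr != nil || row == nil {
				break
			}
			fmt.Println(row...)
		}
	}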