From ae8cc92bee08a402b06bcee67b4e30ffcefccccf Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 16:39:06 +0300 Subject: [PATCH 01/17] Added reading from offset to parquet file reader --- acronis-db-bench/dataset-source/parquet.go | 33 +++++++++++++++++++++- acronis-db-bench/engine/helpers.go | 2 +- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go index dc9c037e..7119ad2d 100644 --- a/acronis-db-bench/dataset-source/parquet.go +++ b/acronis-db-bench/dataset-source/parquet.go @@ -21,9 +21,11 @@ type ParquetFileDataSource struct { currentRecord arrow.Record currentOffset int + globalOffset int64 + rowsToSkip int64 } -func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) { +func NewParquetFileDataSource(filePath string, offset int64) (*ParquetFileDataSource, error) { var rdr, err = file.OpenParquetFile(filePath, true) if err != nil { return nil, fmt.Errorf("error opening parquet file: %v", err) @@ -62,6 +64,7 @@ func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) { columns: columnNames, fileReader: rdr, recordReader: recordReader, + rowsToSkip: offset, }, nil } @@ -70,6 +73,27 @@ func (ds *ParquetFileDataSource) GetColumnNames() []string { } func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { + for ds.globalOffset < ds.rowsToSkip { + if ds.currentRecord == nil { + if !ds.recordReader.Next() { + return nil, nil + } + ds.currentRecord = ds.recordReader.Record() + } + + remainingInRecord := ds.currentRecord.NumRows() - int64(ds.currentOffset) + skipCount := min(ds.rowsToSkip-ds.globalOffset, remainingInRecord) + + ds.currentOffset += int(skipCount) + ds.globalOffset += skipCount + + if int64(ds.currentOffset) >= ds.currentRecord.NumRows() { + ds.currentRecord.Release() + ds.currentRecord = nil + ds.currentOffset = 0 + } + } + if ds.currentRecord == nil { if ds.recordReader.Next() { ds.currentRecord = ds.recordReader.Record() @@ -118,3 +142,10 @@ func (ds *ParquetFileDataSource) Close() { ds.recordReader.Release() ds.fileReader.Close() } + +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index 5f87d602..5e19fe43 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -171,7 +171,7 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath bench.Randomizer = benchmark.NewRandomizer(bench.CommonOpts.RandSeed, bench.CommonOpts.Workers) } - var source, err = dataset.NewParquetFileDataSource(filePath) + var source, err = dataset.NewParquetFileDataSource(filePath, 0) if err != nil { return err } From 37c17dfa941f6091df0dcdd242a934ddab421dfc Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 16:58:48 +0300 Subject: [PATCH 02/17] Added end-to-end tests for parquet file reader --- .../dataset-source/numbers.parquet | Bin 0 -> 2106 bytes .../dataset-source/parquet_test.go | 117 ++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 acronis-db-bench/dataset-source/numbers.parquet create mode 100644 acronis-db-bench/dataset-source/parquet_test.go diff --git a/acronis-db-bench/dataset-source/numbers.parquet b/acronis-db-bench/dataset-source/numbers.parquet new file mode 100644 index 0000000000000000000000000000000000000000..573ee6583f579479510ab136ee43250a1611a8f5 GIT binary patch literal 2106 zcmb8xO>Em_7zc2t>(Uh3Z46S9EsSDV*+$ysqv_f*#vHdv>^13{t@-fSI)62BYCBot zd~GV)VTT=%*nz{4kl2C47$A1w09+VCh$1A!k=;0OK?sQ}H~z1kmUfH-u70)sKCd4? z?}w|NWwK#!zxTnA_e=MJcf;kS_EXdycfX4obWweS1O3#lVGq?0yTJ_u;DJGS0-l5+ z7=}Ht7xuwZupgd=XTS?1@GSVi4+r2N1mHP11kb}LyZ|)32rt257=xGL2polD@CpQB z946p6gdhwNh{7bqUo!6ic}y#U%AfKsoHkspfrHntUp{@rx98>!>Fe_Jp<6eV_=kmw+i$IY^IPZC zom=I(WAA3}zEk@W<7!e!1TnLVj<*KNzEY?uJ}e4bj_&hQocn0kdgHYP~^?Y`FXZe_)IXpks`3 z@AGfPynP6c+}k^HZ*c5a_n7DZ49_3LW~Rv7D9lYCb@Hg&KZdcQAd+#zHReHD${+N( zYoe)$4c}+GuF;yQsGSwLsy7X@LC=m}xJIv`4OetsG)pQS9HmXsP|;vEbscvNORQVC z4cXV#8ZN^j=SH<})-NLBWL&Y+tG&cK^Q>yBuB_~Aa|&Crx;64bx@l^1RdH%)8ny0L zpej{V*&?+WswFC-C8GN^S~byKGfU*z<@1U0X?lHIF*~cEd&@zR>?mb>;E{SX)0&#J zXJkh;Tg}^wA(Q32yRM2>wQkp^Yf@dTcS(7*u2`>E+ay^Fn?kFaq%{(nzz@BS*50eP z2PYyEQF@*74Kdm5Vt&@yUcLwJvXoyYeVR(X(^y~`O^#;Evblsdrro=5$t*X;&CR#d zES@nlA)&yKcxN>iYH%!Hks_V2l#i{FcL;^DE_cyCVWFOX=NAlJmy+34$w*kjIrK}$ z!cwwL^0bS1zF5etDk;5f$5=ir89MT3H|q^cg?OwG&Bo*;shiQH2!BP$XF|?+!n%@7 zAV;=L#)@(4l96c%Nz4nWiRE*-`J%PR@-vUrewFmkM;qE!T`Z3|32Q=uzv|=`YC^uV zR)}nl3GX&@R?*0HG2Z{Gzb2VWkqm3cNxrt4n+d6fxNffn)=QZyKDKt`Zb+%HLe`)d z<#mjyC2JT%3Nof&jHZwITGHK6Gy23Qm>|~oW?X~)6 zj!E4Sy(N*HEZ1GAaLsg=pCQ|cVTus%^-bg2-ipg4f-9U$WIw0Ke#EilY(hKA+C9nc z-`v0UrHtQXk2&*6o}aV#vCKGKNhf?BI4Iv}O{`yOs@Aw%tBu= tc.limit { + break + } + count++ + } + + assert.Equal(t, tc.expected, count) + }) + } +} From f1a94dfc9183cc7d7b855e8b14f9690dad3f4c75 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 21:09:36 +0300 Subject: [PATCH 03/17] Added ability to stop generating values by plugins to benchmark faker --- acronis-db-bench/engine/helpers.go | 4 +-- .../tenants-cache/tenant_generator.go | 2 +- benchmark/faker.go | 26 +++++++++++++------ 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index 5e19fe43..a74bd8c7 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -219,7 +219,7 @@ func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmar } if row == nil { - return false, nil + return true, nil } p.currentValues = make(map[string]interface{}, len(row)) @@ -227,7 +227,7 @@ func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmar p.currentValues[p.columns[i]] = value } - return true, p.currentValues[columnType] + return false, p.currentValues[columnType] } func (p *DataSetSourcePlugin) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) { diff --git a/acronis-db-bench/tenants-cache/tenant_generator.go b/acronis-db-bench/tenants-cache/tenant_generator.go index 56ac5b2e..5eac8a4d 100644 --- a/acronis-db-bench/tenants-cache/tenant_generator.go +++ b/acronis-db-bench/tenants-cache/tenant_generator.go @@ -1003,7 +1003,7 @@ func (tc *TenantsCache) GenCommonFakeValue(columnType string, rz *benchmark.Rand tc.benchmark.Exit(err.Error()) } - return true, tenantUUID + return false, tenantUUID } func (tc *TenantsCache) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) { diff --git a/benchmark/faker.go b/benchmark/faker.go index afc91f31..15ad1310 100644 --- a/benchmark/faker.go +++ b/benchmark/faker.go @@ -339,11 +339,16 @@ func (rz *Randomizer) GenFakeData(colConfs *[]DBFakeColumnConf, WithAutoInc bool var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if exists, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); exists { - if preGenerated == nil { - preGenerated = make(map[string]interface{}) + if shouldStop, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); !shouldStop { + if value != nil { + if preGenerated == nil { + preGenerated = make(map[string]interface{}) + } + preGenerated[c.ColumnType] = value } - preGenerated[c.ColumnType] = value + } else { + // End of generation + return nil, nil, nil } } } @@ -372,11 +377,16 @@ func (rz *Randomizer) GenFakeDataAsMap(colConfs *[]DBFakeColumnConf, WithAutoInc var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if exists, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); exists { - if preGenerated == nil { - preGenerated = make(map[string]interface{}) + if shouldStop, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); !shouldStop { + if value != nil { + if preGenerated == nil { + preGenerated = make(map[string]interface{}) + } + preGenerated[c.ColumnType] = value } - preGenerated[c.ColumnType] = value + } else { + // End of generation + return nil, nil } } } From a784be2dd1c23ca82b656ede709f74f75765e08f Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 21:46:34 +0300 Subject: [PATCH 04/17] Moved system worker init logic from WorkerInitFunc to Init --- acronis-db-bench/engine/helpers.go | 4 +- acronis-db-bench/engine/main.go | 48 +----- acronis-db-bench/engine/workers.go | 156 ++++++++++++------ .../basic-scenarios/medium_test.go | 24 +-- 4 files changed, 111 insertions(+), 121 deletions(-) diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index a74bd8c7..a477306c 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -166,12 +166,12 @@ func FormatSQL(sqlTemlate string, dialectName db.DialectName) string { } // NewParquetFileDataSourceForRandomizer creates a new parquet DataSetSource instance and register as plugin for Randomizer -func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string) error { +func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string, offset int64) error { if bench.Randomizer == nil { bench.Randomizer = benchmark.NewRandomizer(bench.CommonOpts.RandSeed, bench.CommonOpts.Workers) } - var source, err = dataset.NewParquetFileDataSource(filePath, 0) + var source, err = dataset.NewParquetFileDataSource(filePath, offset) if err != nil { return err } diff --git a/acronis-db-bench/engine/main.go b/acronis-db-bench/engine/main.go index b91e3fea..e84aa5f4 100644 --- a/acronis-db-bench/engine/main.go +++ b/acronis-db-bench/engine/main.go @@ -229,6 +229,10 @@ func Main() { // Has to be returned back to connection pool because it is not used anywhere else c.Release() + if b.Logger.GetLevel() > logger.LevelInfo && !testOpts.BenchOpts.Info { + b.Log(logger.LevelTrace, 0, getDBInfo(b, content)) + } + if testOpts.BenchOpts.Info || b.Logger.GetLevel() > logger.LevelInfo { if testOpts.BenchOpts.Info { fmt.Printf(getDBInfo(b, content)) //nolint:staticcheck @@ -252,50 +256,6 @@ func Main() { fmt.Printf("to collect the profiler log run: go tool pprof 'http://localhost:%d/debug/pprof/profile?seconds=10'\n", testOpts.BenchOpts.ProfilerPort) } - b.Init = func() { - b.Vault.(*DBTestData).TenantsCache = tenants.NewTenantsCache(b) - - b.Vault.(*DBTestData).TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet) - b.Vault.(*DBTestData).TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet) - - var tenantCacheDBOpts = b.TestOpts.(*TestOpts).DBOpts - if b.TestOpts.(*TestOpts).BenchOpts.TenantConnString != "" { - tenantCacheDBOpts.ConnString = b.TestOpts.(*TestOpts).BenchOpts.TenantConnString - } - - var tenantCacheDatabase *DBConnector - if tenantCacheDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1); err != nil { - b.Exit("db: cannot create tenants cache connection: %v", err) - return - } - - if err = b.Vault.(*DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil { - b.Exit("db: cannot initialize tenants cache: %v", err) - } - - if b.Logger.GetLevel() > logger.LevelInfo && !testOpts.BenchOpts.Info { - b.Log(logger.LevelTrace, 0, getDBInfo(b, content)) - } - - tenantCacheDatabase.Release() - - if b.TestOpts.(*TestOpts).BenchOpts.Events { - var workingConn *DBConnector - if workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, -1, true, b.Logger, 1); err != nil { - return - } - - b.Vault.(*DBTestData).EventBus = events.NewEventBus(workingConn.Database, b.Logger) - b.Vault.(*DBTestData).EventBus.CreateTables() - } - - if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" { - if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource); err != nil { - b.Exit("failed to create parquet data source: %v", err) - } - } - } - b.Finish = func() { b.Logger.Debug("finishing") if b.TestOpts.(*TestOpts).BenchOpts.Events { diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index 810a3d92..c8a98f30 100644 --- a/acronis-db-bench/engine/workers.go +++ b/acronis-db-bench/engine/workers.go @@ -14,6 +14,7 @@ import ( "github.com/acronis/perfkit/db" "github.com/acronis/perfkit/logger" + events "github.com/acronis/perfkit/acronis-db-bench/event-bus" tenants "github.com/acronis/perfkit/acronis-db-bench/tenants-cache" ) @@ -21,83 +22,128 @@ import ( * Worker initialization */ -func initWorker(worker *benchmark.BenchmarkWorker, testDesc *TestDesc, rowsRequired uint64) { - b := worker.Benchmark - workerID := worker.WorkerID +func initGeneric(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) { + b.Vault.(*DBTestData).TenantsCache = tenants.NewTenantsCache(b) - if worker.Data == nil { - var workerData DBWorkerData - var err error + b.Vault.(*DBTestData).TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet) + b.Vault.(*DBTestData).TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet) - if workerData.workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, workerID, false, worker.Logger, 1); err != nil { + var tenantCacheDBOpts = b.TestOpts.(*TestOpts).DBOpts + if b.TestOpts.(*TestOpts).BenchOpts.TenantConnString != "" { + tenantCacheDBOpts.ConnString = b.TestOpts.(*TestOpts).BenchOpts.TenantConnString + } + + var tenantCacheDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1) + if err != nil { + b.Exit("db: cannot create tenants cache connection: %v", err) + return + } + + if err = b.Vault.(*DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil { + b.Exit("db: cannot initialize tenants cache: %v", err) + } + + tenantCacheDatabase.Release() + + if b.TestOpts.(*TestOpts).BenchOpts.Events { + var workingConn *DBConnector + if workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, -1, true, b.Logger, 1); err != nil { return } - worker.Data = &workerData + b.Vault.(*DBTestData).EventBus = events.NewEventBus(workingConn.Database, b.Logger) + b.Vault.(*DBTestData).EventBus.CreateTables() } - if workerID == 0 { - conn := worker.Data.(*DBWorkerData).workingConn - testData := b.Vault.(*DBTestData) - testData.TestDesc = testDesc + tableName := testDesc.Table.TableName + if tableName == "" { + testDesc.Table.RowsCount = 0 + return + } - // Initialize TenantsCache if it's nil - if testData.TenantsCache == nil { - testData.TenantsCache = tenants.NewTenantsCache(b) - } - worker.Randomizer.RegisterPlugin("tenant", testData.TenantsCache) + var ddlConnDatabase *DBConnector + if ddlConnDatabase, err = NewDBConnector(&tenantCacheDBOpts, -1, true, b.Logger, 1); err != nil { + b.Exit("db: cannot create connection for DDL: %v", err) + return + } - tableName := testDesc.Table.TableName + conn := ddlConnDatabase + testData := b.Vault.(*DBTestData) + testData.TestDesc = testDesc - t := testRegistry.GetTableByName(tableName) + t := testRegistry.GetTableByName(tableName) - if tableName == "" { - testDesc.Table.RowsCount = 0 - } else { - b.Logger.Debug("initializing table '%s'", tableName) - if testDesc.IsReadonly { - t.Create(conn, b) - b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName) - if exists, err := conn.Database.TableExists(tableName); err != nil { - b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err)) - } else if !exists { - b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName) - } - } else { - b.Logger.Debug("creating table '%s'", tableName) - t.Create(conn, b) - } + b.Logger.Debug("initializing table '%s'", tableName) + if testDesc.IsReadonly { + t.Create(conn, b) + b.Logger.Debug("readonly test, skipping table '%s' initialization", tableName) + if exists, err := conn.Database.TableExists(tableName); err != nil { + b.Exit(fmt.Sprintf("db: cannot check if table '%s' exists: %v", tableName, err)) + } else if !exists { + b.Exit("The '%s' table doesn't exist, please create tables using -I option, or use individual insert test using the -t `insert-***`", tableName) + } + } else { + b.Logger.Debug("creating table '%s'", tableName) + t.Create(conn, b) + } - var session = conn.Database.Session(conn.Database.Context(context.Background(), false)) - var rowNum int64 - if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil { - b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err)) - } else { - for rows.Next() { - if scanErr := rows.Scan(&rowNum); scanErr != nil { - b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr)) - } - } - rows.Close() + var session = conn.Database.Session(conn.Database.Context(context.Background(), false)) + var rowNum int64 + if rows, err := session.Select(tableName, &db.SelectCtrl{Fields: []string{"COUNT(0)"}}); err != nil { + b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, err)) + } else { + for rows.Next() { + if scanErr := rows.Scan(&rowNum); scanErr != nil { + b.Exit(fmt.Sprintf("db: cannot get rows count in table '%s': %v", tableName, scanErr)) } + } + rows.Close() + } - testDesc.Table.RowsCount = uint64(rowNum) - b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount) + testDesc.Table.RowsCount = uint64(rowNum) + b.Logger.Debug("table '%s' has %d rows", tableName, testDesc.Table.RowsCount) - if rowsRequired > 0 { - if testDesc.Table.RowsCount < rowsRequired { - b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test", - testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired)) - } - } + if rowsRequired > 0 { + if testDesc.Table.RowsCount < rowsRequired { + b.Exit(fmt.Sprintf("table '%s' has %d rows, but this test requires at least %d rows, please insert it first and then re-run the test", + testDesc.Table.TableName, testDesc.Table.RowsCount, rowsRequired)) + } + } + + ddlConnDatabase.Release() + + if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" { + if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, rowNum); err != nil { + b.Exit("failed to create parquet data source: %v", err) } } +} + +func initWorker(worker *benchmark.BenchmarkWorker) { + b := worker.Benchmark + workerID := worker.WorkerID + + if worker.Data == nil { + var workerData DBWorkerData + var err error + + if workerData.workingConn, err = NewDBConnector(&b.TestOpts.(*TestOpts).DBOpts, workerID, false, worker.Logger, 1); err != nil { + return + } + + worker.Data = &workerData + } + worker.Logger.Trace("worker is initialized") } func initCommon(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64) { + b.Init = func() { + initGeneric(b, testDesc, rowsRequired) + } + b.WorkerInitFunc = func(worker *benchmark.BenchmarkWorker) { - initWorker(worker, testDesc, rowsRequired) + initWorker(worker) } b.Metric = func() (metric string) { diff --git a/acronis-db-bench/test-groups/basic-scenarios/medium_test.go b/acronis-db-bench/test-groups/basic-scenarios/medium_test.go index 7ef197fa..40b1a19c 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/medium_test.go +++ b/acronis-db-bench/test-groups/basic-scenarios/medium_test.go @@ -2,7 +2,6 @@ package basic_scenarios import ( "github.com/acronis/perfkit/acronis-db-bench/engine" - tenants "github.com/acronis/perfkit/acronis-db-bench/tenants-cache" "github.com/acronis/perfkit/benchmark" ) @@ -54,7 +53,8 @@ func (suite *TestingSuite) TestMediumTableTests() { b.AddOpts = func() benchmark.TestOpts { var testOpts = engine.TestOpts{ - DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, + DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, + BenchOpts: engine.BenchOpts{TenantsWorkingSet: 50}, } b.Cli.AddFlagGroup("Database options", "", &testOpts.DBOpts) @@ -62,7 +62,7 @@ func (suite *TestingSuite) TestMediumTableTests() { b.Cli.AddFlagGroup("Testcase specific options", "", &testOpts.TestcaseOpts) b.Cli.AddFlagGroup("CTI-pattern simulation test options", "", &testOpts.CTIOpts) - return &testOpts + return mediumTestOpts } b.InitOpts() @@ -78,23 +78,7 @@ func (suite *TestingSuite) TestMediumTableTests() { d.Scores[s] = []benchmark.Score{} } - b.Init = func() { - b.Vault.(*engine.DBTestData).TenantsCache = tenants.NewTenantsCache(b) - - var tenantCacheDatabase, err = engine.NewDBConnector(&b.TestOpts.(*engine.TestOpts).DBOpts, -1, true, b.Logger, 1) - if err != nil { - b.Exit("db: cannot create tenants cache connection: %v", err) - return - } - - if err = b.Vault.(*engine.DBTestData).TenantsCache.Init(tenantCacheDatabase.Database); err != nil { - b.Exit("db: cannot initialize tenants cache: %v", err) - } - } - - mediumTableTestSuite.Execute(b, &engine.TestOpts{ - DBOpts: engine.DatabaseOpts{ConnString: suite.ConnString}, - }, 1) + mediumTableTestSuite.Execute(b, nil, 1) // Clear tables engine.CleanupTables(b) From cf4a8762eec003199977c7eab90d76418817814e Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 22:09:04 +0300 Subject: [PATCH 05/17] Returned support of PostgreSQL to vector search tests --- acronis-db-bench/test-groups/vector-search/tests.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acronis-db-bench/test-groups/vector-search/tests.go b/acronis-db-bench/test-groups/vector-search/tests.go index f4b6a696..0ceefdb5 100644 --- a/acronis-db-bench/test-groups/vector-search/tests.go +++ b/acronis-db-bench/test-groups/vector-search/tests.go @@ -57,7 +57,7 @@ var TestInsertVector768MultiValue = engine.TestDesc{ Description: "insert a 768-dim vectors with ids into the 'vector' table by batches", Category: engine.TestInsert, IsReadonly: false, - Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Table: TestTableVector768, LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { engine.TestGeneric(b, testDesc, engine.InsertMultiValueDataWorker, 0) From c03b7f02444a0e2b9a2aa1af1731597b35d55783 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 22:15:43 +0300 Subject: [PATCH 06/17] Fixed handling of empty generated values in generic insert worker --- acronis-db-bench/engine/workers.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index c8a98f30..abb1f835 100644 --- a/acronis-db-bench/engine/workers.go +++ b/acronis-db-bench/engine/workers.go @@ -559,12 +559,21 @@ func InsertMultiValueDataWorker(b *benchmark.Benchmark, c *DBConnector, testDesc if err != nil { b.Exit(err) } + + if genColumns == nil { + break + } + values = append(values, vals) if i == 0 { columns = genColumns } } + if len(values) == 0 { + return + } + var session = c.Database.Session(c.Database.Context(context.Background(), false)) if txErr := session.Transact(func(tx db.DatabaseAccessor) error { return tx.BulkInsert(testDesc.Table.TableName, values, columns) From 02ac237e6f775722ec6adfc95b377b814573a835 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Fri, 16 May 2025 23:00:36 +0300 Subject: [PATCH 07/17] Fixed configuring parquet reader offset for readonly tests --- acronis-db-bench/engine/workers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index abb1f835..8802abe2 100644 --- a/acronis-db-bench/engine/workers.go +++ b/acronis-db-bench/engine/workers.go @@ -113,7 +113,12 @@ func initGeneric(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64 ddlConnDatabase.Release() if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" { - if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, rowNum); err != nil { + var offset int64 + if !testDesc.IsReadonly { + offset = rowNum + } + + if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, offset); err != nil { b.Exit("failed to create parquet data source: %v", err) } } From 6619f5d8f6a6b8bd57bb3aa5aba5c009d3895bd9 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sat, 17 May 2025 12:40:34 +0300 Subject: [PATCH 08/17] Implemented ability to read parquet file in a circular way --- acronis-db-bench/dataset-source/parquet.go | 59 ++++++++++++++- .../dataset-source/parquet_test.go | 74 ++++++++++++++++++- acronis-db-bench/engine/helpers.go | 2 +- 3 files changed, 130 insertions(+), 5 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go index 7119ad2d..b87a66c2 100644 --- a/acronis-db-bench/dataset-source/parquet.go +++ b/acronis-db-bench/dataset-source/parquet.go @@ -23,9 +23,10 @@ type ParquetFileDataSource struct { currentOffset int globalOffset int64 rowsToSkip int64 + circular bool } -func NewParquetFileDataSource(filePath string, offset int64) (*ParquetFileDataSource, error) { +func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*ParquetFileDataSource, error) { var rdr, err = file.OpenParquetFile(filePath, true) if err != nil { return nil, fmt.Errorf("error opening parquet file: %v", err) @@ -65,6 +66,7 @@ func NewParquetFileDataSource(filePath string, offset int64) (*ParquetFileDataSo fileReader: rdr, recordReader: recordReader, rowsToSkip: offset, + circular: circular, }, nil } @@ -76,7 +78,17 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { for ds.globalOffset < ds.rowsToSkip { if ds.currentRecord == nil { if !ds.recordReader.Next() { - return nil, nil + if ds.circular { + ds.resetReader() + if !ds.recordReader.Next() { + return nil, fmt.Errorf("failed to read after reset") + } + ds.globalOffset = 0 + ds.rowsToSkip = 0 + continue + } else { + return nil, nil + } } ds.currentRecord = ds.recordReader.Record() } @@ -97,6 +109,14 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { if ds.currentRecord == nil { if ds.recordReader.Next() { ds.currentRecord = ds.recordReader.Record() + } else if ds.circular { + ds.resetReader() + if !ds.recordReader.Next() { + return nil, fmt.Errorf("failed to read after reset") + } + ds.currentRecord = ds.recordReader.Record() + ds.globalOffset = 0 + ds.rowsToSkip = 0 } else { return nil, nil } @@ -149,3 +169,38 @@ func min(a, b int64) int64 { } return b } + +func (ds *ParquetFileDataSource) resetReader() { + if ds.currentRecord != nil { + ds.currentRecord.Release() + ds.currentRecord = nil + } + ds.recordReader.Release() + + mem := memory.NewGoAllocator() + reader, err := pqarrow.NewFileReader(ds.fileReader, pqarrow.ArrowReadProperties{ + BatchSize: defaultBatchSize, + }, mem) + if err != nil { + panic(fmt.Sprintf("error creating Arrow file reader: %v", err)) + } + + var columns []int + for i := 0; i < ds.fileReader.MetaData().Schema.NumColumns(); i++ { + columns = append(columns, i) + } + + var rgrs []int + for r := 0; r < ds.fileReader.NumRowGroups(); r++ { + rgrs = append(rgrs, r) + } + + recordReader, err := reader.GetRecordReader(context.Background(), columns, rgrs) + if err != nil { + panic(fmt.Sprintf("error creating record reader: %v", err)) + } + + ds.recordReader = recordReader + ds.currentOffset = 0 + ds.globalOffset = 0 +} diff --git a/acronis-db-bench/dataset-source/parquet_test.go b/acronis-db-bench/dataset-source/parquet_test.go index 176c49fc..4c07d55e 100644 --- a/acronis-db-bench/dataset-source/parquet_test.go +++ b/acronis-db-bench/dataset-source/parquet_test.go @@ -39,7 +39,7 @@ func TestReadParquetWithOffset(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Create a new parquet reader - reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset) + reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false) require.NoError(t, err) defer reader.Close() @@ -92,7 +92,7 @@ func TestReadParquetWithOffsetAndLimit(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset) + reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false) require.NoError(t, err) defer reader.Close() @@ -115,3 +115,73 @@ func TestReadParquetWithOffsetAndLimit(t *testing.T) { }) } } + +func TestReadParquetCircular(t *testing.T) { + testCases := []struct { + name string + offset int64 + expectedRounds int // Number of complete file reads to perform + expectedTotal int64 // Total number of records to read + }{ + { + name: "Read file twice", + offset: 0, + expectedRounds: 2, + expectedTotal: 201, // 100 records + 101 records (including the first record of the second round) + }, + { + name: "Read file twice from middle", + offset: 50, + expectedRounds: 2, + expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, true) + require.NoError(t, err) + defer reader.Close() + + var count int64 + var rounds int + var firstRow []interface{} + var isFirstRow = true + + for { + row, err := reader.GetNextRow() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if row == nil { + t.Fatalf("Unexpected nil row") + } + + // Store the first row we read + if isFirstRow { + firstRow = row + isFirstRow = false + } + + // Check if we've completed a full round + if count > 0 && count%100 == 0 && !isFirstRow { + rounds++ + // Verify we're back at the beginning by comparing with first row + if rounds > 1 { + assert.Equal(t, firstRow, row, "Row should match after completing a round") + } + } + + count++ + + // Stop after expected number of rounds + if rounds >= tc.expectedRounds { + break + } + } + + assert.Equal(t, tc.expectedTotal, count, "Total number of records read") + assert.Equal(t, tc.expectedRounds, rounds, "Number of complete rounds") + }) + } +} diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index a477306c..df9ff6db 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -171,7 +171,7 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath bench.Randomizer = benchmark.NewRandomizer(bench.CommonOpts.RandSeed, bench.CommonOpts.Workers) } - var source, err = dataset.NewParquetFileDataSource(filePath, offset) + var source, err = dataset.NewParquetFileDataSource(filePath, offset, false) if err != nil { return err } From b85508226a2a2391c5c1e073f88e681c9df9bb04 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sat, 17 May 2025 12:56:02 +0300 Subject: [PATCH 09/17] Added feature of reading parquet datasources in curcular way for readonly tests --- acronis-db-bench/engine/helpers.go | 4 ++-- acronis-db-bench/engine/workers.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index df9ff6db..9d664d24 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -166,12 +166,12 @@ func FormatSQL(sqlTemlate string, dialectName db.DialectName) string { } // NewParquetFileDataSourceForRandomizer creates a new parquet DataSetSource instance and register as plugin for Randomizer -func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string, offset int64) error { +func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath string, offset int64, circular bool) error { if bench.Randomizer == nil { bench.Randomizer = benchmark.NewRandomizer(bench.CommonOpts.RandSeed, bench.CommonOpts.Workers) } - var source, err = dataset.NewParquetFileDataSource(filePath, offset, false) + var source, err = dataset.NewParquetFileDataSource(filePath, offset, circular) if err != nil { return err } diff --git a/acronis-db-bench/engine/workers.go b/acronis-db-bench/engine/workers.go index 8802abe2..98dff7de 100644 --- a/acronis-db-bench/engine/workers.go +++ b/acronis-db-bench/engine/workers.go @@ -114,11 +114,16 @@ func initGeneric(b *benchmark.Benchmark, testDesc *TestDesc, rowsRequired uint64 if b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource != "" { var offset int64 - if !testDesc.IsReadonly { + var circular bool + if testDesc.IsReadonly { + offset = 0 + circular = true + } else { offset = rowNum + circular = false } - if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, offset); err != nil { + if err = NewParquetFileDataSourceForRandomizer(b, b.TestOpts.(*TestOpts).BenchOpts.ParquetDataSource, offset, circular); err != nil { b.Exit("failed to create parquet data source: %v", err) } } From 51b0872c6aefb2cc2e63f2f64ff50100ae168ea5 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 18:53:28 +0300 Subject: [PATCH 10/17] [Parquet Reader] Moved skip until offset logic to dedicated function --- acronis-db-bench/dataset-source/parquet.go | 38 ++++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go index b87a66c2..baeda2df 100644 --- a/acronis-db-bench/dataset-source/parquet.go +++ b/acronis-db-bench/dataset-source/parquet.go @@ -61,33 +61,42 @@ func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*Pa return nil, fmt.Errorf("error creating record reader: %v", err) } - return &ParquetFileDataSource{ + var source = &ParquetFileDataSource{ columns: columnNames, fileReader: rdr, recordReader: recordReader, rowsToSkip: offset, circular: circular, - }, nil + } + + if skipErr := source.skipUntilOffset(); skipErr != nil { + return nil, skipErr + } + + return source, nil } -func (ds *ParquetFileDataSource) GetColumnNames() []string { - return ds.columns +func min(a, b int64) int64 { + if a < b { + return a + } + return b } -func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { +func (ds *ParquetFileDataSource) skipUntilOffset() error { for ds.globalOffset < ds.rowsToSkip { if ds.currentRecord == nil { if !ds.recordReader.Next() { if ds.circular { ds.resetReader() if !ds.recordReader.Next() { - return nil, fmt.Errorf("failed to read after reset") + return fmt.Errorf("failed to read after reset") } ds.globalOffset = 0 ds.rowsToSkip = 0 continue } else { - return nil, nil + return nil } } ds.currentRecord = ds.recordReader.Record() @@ -106,6 +115,14 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { } } + return nil +} + +func (ds *ParquetFileDataSource) GetColumnNames() []string { + return ds.columns +} + +func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { if ds.currentRecord == nil { if ds.recordReader.Next() { ds.currentRecord = ds.recordReader.Record() @@ -163,13 +180,6 @@ func (ds *ParquetFileDataSource) Close() { ds.fileReader.Close() } -func min(a, b int64) int64 { - if a < b { - return a - } - return b -} - func (ds *ParquetFileDataSource) resetReader() { if ds.currentRecord != nil { ds.currentRecord.Release() From e85170938c2d0c51931824aa556a78ccb67ec969 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 19:08:18 +0300 Subject: [PATCH 11/17] [Parquet Reader] Simplified internal logic by removing rowsToSkip structure field --- acronis-db-bench/dataset-source/parquet.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go index baeda2df..208ab467 100644 --- a/acronis-db-bench/dataset-source/parquet.go +++ b/acronis-db-bench/dataset-source/parquet.go @@ -22,7 +22,6 @@ type ParquetFileDataSource struct { currentRecord arrow.Record currentOffset int globalOffset int64 - rowsToSkip int64 circular bool } @@ -65,11 +64,10 @@ func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*Pa columns: columnNames, fileReader: rdr, recordReader: recordReader, - rowsToSkip: offset, circular: circular, } - if skipErr := source.skipUntilOffset(); skipErr != nil { + if skipErr := source.skipUntilOffset(offset); skipErr != nil { return nil, skipErr } @@ -83,8 +81,8 @@ func min(a, b int64) int64 { return b } -func (ds *ParquetFileDataSource) skipUntilOffset() error { - for ds.globalOffset < ds.rowsToSkip { +func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error { + for ds.globalOffset < rowsToSkip { if ds.currentRecord == nil { if !ds.recordReader.Next() { if ds.circular { @@ -93,7 +91,7 @@ func (ds *ParquetFileDataSource) skipUntilOffset() error { return fmt.Errorf("failed to read after reset") } ds.globalOffset = 0 - ds.rowsToSkip = 0 + rowsToSkip = 0 continue } else { return nil @@ -103,7 +101,7 @@ func (ds *ParquetFileDataSource) skipUntilOffset() error { } remainingInRecord := ds.currentRecord.NumRows() - int64(ds.currentOffset) - skipCount := min(ds.rowsToSkip-ds.globalOffset, remainingInRecord) + skipCount := min(rowsToSkip-ds.globalOffset, remainingInRecord) ds.currentOffset += int(skipCount) ds.globalOffset += skipCount @@ -133,7 +131,6 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { } ds.currentRecord = ds.recordReader.Record() ds.globalOffset = 0 - ds.rowsToSkip = 0 } else { return nil, nil } From 8fd4bdbc338f22078f02f36e4f9432a09ad0ddfd Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 19:27:47 +0300 Subject: [PATCH 12/17] [Parquet Reader] Fixed bug with circular reading with offset --- acronis-db-bench/dataset-source/parquet.go | 34 ++++++++++------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet.go b/acronis-db-bench/dataset-source/parquet.go index 208ab467..d5fd81df 100644 --- a/acronis-db-bench/dataset-source/parquet.go +++ b/acronis-db-bench/dataset-source/parquet.go @@ -20,8 +20,7 @@ type ParquetFileDataSource struct { recordReader pqarrow.RecordReader currentRecord arrow.Record - currentOffset int - globalOffset int64 + currentOffset int64 circular bool } @@ -82,7 +81,9 @@ func min(a, b int64) int64 { } func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error { - for ds.globalOffset < rowsToSkip { + var skipCount int64 = 0 + + for ds.currentOffset < rowsToSkip { if ds.currentRecord == nil { if !ds.recordReader.Next() { if ds.circular { @@ -90,9 +91,7 @@ func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error { if !ds.recordReader.Next() { return fmt.Errorf("failed to read after reset") } - ds.globalOffset = 0 - rowsToSkip = 0 - continue + rowsToSkip -= skipCount } else { return nil } @@ -100,13 +99,12 @@ func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error { ds.currentRecord = ds.recordReader.Record() } - remainingInRecord := ds.currentRecord.NumRows() - int64(ds.currentOffset) - skipCount := min(rowsToSkip-ds.globalOffset, remainingInRecord) + remainingInRecord := ds.currentRecord.NumRows() - ds.currentOffset + skipCount = min(rowsToSkip-ds.currentOffset, remainingInRecord) - ds.currentOffset += int(skipCount) - ds.globalOffset += skipCount + ds.currentOffset += skipCount - if int64(ds.currentOffset) >= ds.currentRecord.NumRows() { + if ds.currentOffset >= ds.currentRecord.NumRows() { ds.currentRecord.Release() ds.currentRecord = nil ds.currentOffset = 0 @@ -130,7 +128,6 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { return nil, fmt.Errorf("failed to read after reset") } ds.currentRecord = ds.recordReader.Record() - ds.globalOffset = 0 } else { return nil, nil } @@ -142,15 +139,15 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { var colData interface{} switch specificArray := col.(type) { case *array.Int64: - colData = specificArray.Value(ds.currentOffset) + colData = specificArray.Value(int(ds.currentOffset)) case *array.Float64: - colData = specificArray.Value(ds.currentOffset) + colData = specificArray.Value(int(ds.currentOffset)) case *array.String: - colData = specificArray.Value(ds.currentOffset) + colData = specificArray.Value(int(ds.currentOffset)) case *array.Binary: - colData = specificArray.Value(ds.currentOffset) + colData = specificArray.Value(int(ds.currentOffset)) case *array.List: - var beg, end = specificArray.ValueOffsets(ds.currentOffset) + var beg, end = specificArray.ValueOffsets(int(ds.currentOffset)) var values = array.NewSlice(specificArray.ListValues(), beg, end) switch specificNestedArray := values.(type) { case *array.Float32: @@ -163,7 +160,7 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) { } ds.currentOffset++ - if int64(ds.currentOffset) >= ds.currentRecord.NumRows() { + if ds.currentOffset >= ds.currentRecord.NumRows() { ds.currentRecord.Release() ds.currentRecord = nil ds.currentOffset = 0 @@ -209,5 +206,4 @@ func (ds *ParquetFileDataSource) resetReader() { ds.recordReader = recordReader ds.currentOffset = 0 - ds.globalOffset = 0 } From 60acf1f74b46d5221b36eb25527bff2a7c821123 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 19:28:21 +0300 Subject: [PATCH 13/17] [Parquet Reader] Added test case for circular reading with offset --- acronis-db-bench/dataset-source/parquet_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/acronis-db-bench/dataset-source/parquet_test.go b/acronis-db-bench/dataset-source/parquet_test.go index 4c07d55e..544f2bd1 100644 --- a/acronis-db-bench/dataset-source/parquet_test.go +++ b/acronis-db-bench/dataset-source/parquet_test.go @@ -135,6 +135,12 @@ func TestReadParquetCircular(t *testing.T) { expectedRounds: 2, expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) }, + { + name: "Read file twice from middle, skipping file once", + offset: 150, + expectedRounds: 2, + expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) + }, } for _, tc := range testCases { From d85903b3bd7900fa36066ab5ac45e53ecc8e29cd Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 19:41:18 +0300 Subject: [PATCH 14/17] [Parquet Reader] Added test cases --- .../dataset-source/parquet_test.go | 72 ++++++++++++++----- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/acronis-db-bench/dataset-source/parquet_test.go b/acronis-db-bench/dataset-source/parquet_test.go index 544f2bd1..7719e905 100644 --- a/acronis-db-bench/dataset-source/parquet_test.go +++ b/acronis-db-bench/dataset-source/parquet_test.go @@ -65,28 +65,32 @@ func TestReadParquetWithOffset(t *testing.T) { func TestReadParquetWithOffsetAndLimit(t *testing.T) { // Test cases combining offset and limit testCases := []struct { - name string - offset int64 - limit int64 - expected int64 + name string + offset int64 + limit int64 + expectedFirst int64 + expected int64 }{ { - name: "Read first 10 records", - offset: 0, - limit: 10, - expected: 10, + name: "Read first 10 records", + offset: 0, + limit: 10, + expectedFirst: 1, + expected: 10, }, { - name: "Read 20 records from middle", - offset: 40, - limit: 20, - expected: 20, + name: "Read 20 records from middle", + offset: 40, + limit: 20, + expectedFirst: 41, + expected: 20, }, { - name: "Read beyond file size", - offset: 90, - limit: 20, - expected: 10, // Only 10 records left from offset 90 + name: "Read beyond file size", + offset: 90, + limit: 20, + expectedFirst: 91, + expected: 10, // Only 10 records left from offset 90 }, } @@ -108,6 +112,18 @@ func TestReadParquetWithOffsetAndLimit(t *testing.T) { if count >= tc.limit { break } + + assert.Equal(t, 1, len(row)) + + if castedRow, casted := row[0].(int64); casted { + if count == 0 { + assert.Equal(t, tc.expectedFirst, castedRow) + } + } else { + t.Error("wrong data type") + return + } + count++ } @@ -120,25 +136,36 @@ func TestReadParquetCircular(t *testing.T) { testCases := []struct { name string offset int64 - expectedRounds int // Number of complete file reads to perform + expectedRounds int // Number of complete file reads to perform + expectedFirst int64 expectedTotal int64 // Total number of records to read }{ { name: "Read file twice", offset: 0, expectedRounds: 2, + expectedFirst: 1, expectedTotal: 201, // 100 records + 101 records (including the first record of the second round) }, { name: "Read file twice from middle", offset: 50, expectedRounds: 2, + expectedFirst: 51, expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) }, { name: "Read file twice from middle, skipping file once", offset: 150, expectedRounds: 2, + expectedFirst: 51, + expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) + }, + { + name: "Read file twice from middle, skipping file twice", + offset: 250, + expectedRounds: 2, + expectedFirst: 51, expectedTotal: 201, // 50 records + 100 records + 51 records (including the first record of the second round) }, } @@ -178,6 +205,17 @@ func TestReadParquetCircular(t *testing.T) { } } + assert.Equal(t, 1, len(row)) + + if castedRow, casted := row[0].(int64); casted { + if count == 0 { + assert.Equal(t, tc.expectedFirst, castedRow) + } + } else { + t.Error("wrong data type") + return + } + count++ // Stop after expected number of rounds From dc39c5ff7c194823f3d29651162999235e7b38bb Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 20:11:42 +0300 Subject: [PATCH 15/17] [Vector Search] Minor fixes --- acronis-db-bench/test-groups/vector-search/tests.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/acronis-db-bench/test-groups/vector-search/tests.go b/acronis-db-bench/test-groups/vector-search/tests.go index 0ceefdb5..7fc70845 100644 --- a/acronis-db-bench/test-groups/vector-search/tests.go +++ b/acronis-db-bench/test-groups/vector-search/tests.go @@ -25,7 +25,7 @@ func init() { // TestTableVector768 is table to store 768-dimensions vector objects var TestTableVector768 = engine.TestTable{ TableName: "acronis_db_bench_vector_768", - Databases: []db.DialectName{db.ELASTICSEARCH, db.OPENSEARCH}, + Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Columns: [][]interface{}{ {"id", "dataset.id"}, {"embedding", "dataset.emb.list.item"}, @@ -70,7 +70,7 @@ var TestSelectVector768NearestL2 = engine.TestDesc{ Metric: "rows/sec", Description: "selects k nearest vectors by L2 norm from the 'vector' table to the given 768-dim vector", Category: engine.TestSelect, - IsReadonly: false, + IsReadonly: true, Databases: []db.DialectName{db.POSTGRES, db.ELASTICSEARCH, db.OPENSEARCH}, Table: TestTableVector768, LauncherFunc: func(b *benchmark.Benchmark, testDesc *engine.TestDesc) { From c6a73a9b2924fd8e6d65c6f53bc4b300844a881e Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 20:24:01 +0300 Subject: [PATCH 16/17] [Benchmark] Changed RandomizerPlugin interface by requiring return of all common generated values by GenCommonFakeValues --- acronis-db-bench/engine/helpers.go | 13 ++++++------- .../tenants-cache/tenant_generator.go | 7 +++++-- benchmark/faker.go | 18 +++++++++++------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index 9d664d24..b1c7168b 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -199,11 +199,10 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath type DataSetSourcePlugin struct { source dataset.DataSetSource - columns []string - currentValues map[string]interface{} + columns []string } -func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, interface{}) { +func (p *DataSetSourcePlugin) GenCommonFakeValues(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, map[string]interface{}) { if len(p.columns) == 0 { return false, nil } @@ -222,12 +221,12 @@ func (p *DataSetSourcePlugin) GenCommonFakeValue(columnType string, rz *benchmar return true, nil } - p.currentValues = make(map[string]interface{}, len(row)) + var currentValues = make(map[string]interface{}, len(row)) for i, value := range row { - p.currentValues[p.columns[i]] = value + currentValues[p.columns[i]] = value } - return false, p.currentValues[columnType] + return false, currentValues } func (p *DataSetSourcePlugin) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) { @@ -235,7 +234,7 @@ func (p *DataSetSourcePlugin) GenFakeValue(columnType string, rz *benchmark.Rand return false, nil } - var value, ok = p.currentValues[columnType] + var value, ok = preGenerated[columnType] if !ok { return false, nil } diff --git a/acronis-db-bench/tenants-cache/tenant_generator.go b/acronis-db-bench/tenants-cache/tenant_generator.go index 5eac8a4d..845d680b 100644 --- a/acronis-db-bench/tenants-cache/tenant_generator.go +++ b/acronis-db-bench/tenants-cache/tenant_generator.go @@ -985,7 +985,7 @@ func (tc *TenantsCache) GetRandomCTIUUID(rz *benchmark.Randomizer, testCardinali return tc.ctiUuids[rz.IntnExp(cardinality)], nil } -func (tc *TenantsCache) GenCommonFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, interface{}) { +func (tc *TenantsCache) GenCommonFakeValues(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, map[string]interface{}) { if columnType != "tenant_uuid" && columnType != "customer_uuid" && columnType != "partner_uuid" { return false, nil } @@ -1003,7 +1003,10 @@ func (tc *TenantsCache) GenCommonFakeValue(columnType string, rz *benchmark.Rand tc.benchmark.Exit(err.Error()) } - return false, tenantUUID + var currentValues = make(map[string]interface{}, 1) + currentValues[columnType] = tenantUUID + + return false, currentValues } func (tc *TenantsCache) GenFakeValue(columnType string, rz *benchmark.Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) { diff --git a/benchmark/faker.go b/benchmark/faker.go index 15ad1310..39f921a1 100644 --- a/benchmark/faker.go +++ b/benchmark/faker.go @@ -198,7 +198,7 @@ func NewRandomizer(seed int64, workerID int) *Randomizer { // RandomizerPlugin is a type RandomizerPlugin interface { - GenCommonFakeValue(columnType string, rz *Randomizer, cardinality int) (bool, interface{}) + GenCommonFakeValues(columnType string, rz *Randomizer, cardinality int) (bool, map[string]interface{}) GenFakeValue(columnType string, rz *Randomizer, cardinality int, preGenerated map[string]interface{}) (bool, interface{}) } @@ -339,12 +339,14 @@ func (rz *Randomizer) GenFakeData(colConfs *[]DBFakeColumnConf, WithAutoInc bool var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if shouldStop, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); !shouldStop { - if value != nil { + if shouldStop, commonValues := plugin.GenCommonFakeValues(c.ColumnType, rz, c.Cardinality); !shouldStop { + if commonValues != nil { if preGenerated == nil { preGenerated = make(map[string]interface{}) } - preGenerated[c.ColumnType] = value + for colType, val := range commonValues { + preGenerated[colType] = val + } } } else { // End of generation @@ -377,12 +379,14 @@ func (rz *Randomizer) GenFakeDataAsMap(colConfs *[]DBFakeColumnConf, WithAutoInc var preGenerated map[string]interface{} for _, plugin := range rz.plugins { for _, c := range *colConfs { - if shouldStop, value := plugin.GenCommonFakeValue(c.ColumnType, rz, c.Cardinality); !shouldStop { - if value != nil { + if shouldStop, commonValues := plugin.GenCommonFakeValues(c.ColumnType, rz, c.Cardinality); !shouldStop { + if commonValues != nil { if preGenerated == nil { preGenerated = make(map[string]interface{}) } - preGenerated[c.ColumnType] = value + for colType, val := range commonValues { + preGenerated[colType] = val + } } } else { // End of generation From 5d8babdc03a5a4062981c0c5415744ff8c8b9c7c Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Sun, 18 May 2025 20:27:55 +0300 Subject: [PATCH 17/17] [DB Bench] Made DataSetSourcePlugin concurrent-friendly --- acronis-db-bench/engine/helpers.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/acronis-db-bench/engine/helpers.go b/acronis-db-bench/engine/helpers.go index b1c7168b..7f0a2195 100644 --- a/acronis-db-bench/engine/helpers.go +++ b/acronis-db-bench/engine/helpers.go @@ -6,6 +6,7 @@ import ( "regexp" "sort" "strings" + "sync" "github.com/acronis/perfkit/benchmark" "github.com/acronis/perfkit/db" @@ -186,6 +187,7 @@ func NewParquetFileDataSourceForRandomizer(bench *benchmark.Benchmark, filePath var dataSourcePlugin = &DataSetSourcePlugin{ source: source, columns: registeredColumns, + mx: sync.Mutex{}, } bench.Randomizer.RegisterPlugin("dataset", dataSourcePlugin) @@ -200,6 +202,8 @@ type DataSetSourcePlugin struct { source dataset.DataSetSource columns []string + + mx sync.Mutex } func (p *DataSetSourcePlugin) GenCommonFakeValues(columnType string, rz *benchmark.Randomizer, cardinality int) (bool, map[string]interface{}) { @@ -212,6 +216,9 @@ func (p *DataSetSourcePlugin) GenCommonFakeValues(columnType string, rz *benchma return false, nil } + p.mx.Lock() + defer p.mx.Unlock() + var row, err = p.source.GetNextRow() if err != nil { return false, nil