From dae2fa683bd345e91e16c2a4039c8878b04762c2 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Wed, 14 May 2025 13:22:06 +0300 Subject: [PATCH 1/6] Fixed event-bus producer lib emulation --- acronis-db-bench/event-bus/events.go | 104 ++++++++++++++---- .../test-groups/basic-scenarios/common.go | 9 +- .../large-objects-operations/tests.go | 10 +- db/helpers.go | 9 +- 4 files changed, 108 insertions(+), 24 deletions(-) diff --git a/acronis-db-bench/event-bus/events.go b/acronis-db-bench/event-bus/events.go index 127edc2e..08dbf457 100644 --- a/acronis-db-bench/event-bus/events.go +++ b/acronis-db-bench/event-bus/events.go @@ -2,6 +2,7 @@ package event_bus import ( "context" + "database/sql" "fmt" "strconv" "strings" @@ -67,7 +68,7 @@ func NewEventBus(conn db.Database, logger logger.Logger) *EventBus { return &EventBus{ workerConn: conn, workerStarted: false, - stopCh: make(chan bool), + stopCh: make(chan bool, 1), batchSize: 500, sleepMsec: 10, logger: logger, @@ -85,18 +86,19 @@ func (e *EventBus) MainLoop() { defer e.wg.Done() for { + select { + case <-e.stopCh: + e.logger.Info("stopping main worker loop") + return + default: + } + if empty, err := e.QueueIsEmpty(); err != nil { e.logger.Error("cannot check if queue is empty: %v", err) - return } else if empty { - select { - case <-e.stopCh: - e.logger.Info("stopping main worker loop") - - return - default: - } + time.Sleep(time.Duration(e.sleepMsec) * time.Millisecond) + continue } time.Sleep(time.Duration(e.sleepMsec) * time.Millisecond) @@ -411,6 +413,24 @@ func (e *EventBus) DoAlign() (bool, error) { return err } + case db.MYSQL: + if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = ? FOR UPDATE;", 1).Scan(&seq64); err != nil { + return err + } + + if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = ? - 1 WHERE int_id = ?;", seq64+int64(len(ids)), 1); err != nil { + return err + } + + case db.SQLITE: + if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = $1;", 1).Scan(&seq64); err != nil { + return err + } + + if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = $1 - 1 WHERE int_id = $2;", seq64+int64(len(ids)), 1); err != nil { + return err + } + default: if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = $1 FOR UPDATE;", 1).Scan(&seq64); err != nil { return err @@ -430,7 +450,14 @@ func (e *EventBus) DoAlign() (bool, error) { values := make([]interface{}, fields*len(data)) for n := range data { - placeholders[n] = fmt.Sprintf("(%s)", db.GenDBParameterPlaceholders(n*fields, fields)) + var rawPlaceholders string + if c.DialectName() == db.MYSQL { + rawPlaceholders = db.GenDBParameterPlaceholders(n*fields, fields, false) + } else { + rawPlaceholders = db.GenDBParameterPlaceholders(n*fields, fields, true) + } + + placeholders[n] = fmt.Sprintf("(%s)", rawPlaceholders) values[n*fields+0] = seq64 + int64(n) // intId: global sequence values[n*fields+1] = data[n].TopicInternalID // topic_id values[n*fields+2] = data[n].EventTypeInternalID // event type_id @@ -450,9 +477,13 @@ func (e *EventBus) DoAlign() (bool, error) { for n := range data { if c.DialectName() == db.MSSQL { - placeholders[n] = fmt.Sprintf("(%s, GETDATE())", db.GenDBParameterPlaceholders(n*fields, fields)) + placeholders[n] = fmt.Sprintf("(%s, GETDATE())", db.GenDBParameterPlaceholders(n*fields, fields, true)) + } else if c.DialectName() == db.POSTGRES { + placeholders[n] = fmt.Sprintf("(%s, NOW())", db.GenDBParameterPlaceholders(n*fields, fields, true)) + } else if c.DialectName() == db.SQLITE { + placeholders[n] = fmt.Sprintf("(%s, datetime('now'))", db.GenDBParameterPlaceholders(n*fields, fields, true)) } else { - placeholders[n] = fmt.Sprintf("(%s, NOW())", db.GenDBParameterPlaceholders(n*fields, fields)) + placeholders[n] = fmt.Sprintf("(%s, NOW())", db.GenDBParameterPlaceholders(n*fields, fields, false)) } values[n*fields+0] = seq64 + int64(n) // int_id: global sequence values[n*fields+1] = data[n].TopicInternalID // topic_id @@ -484,15 +515,24 @@ func (e *EventBus) DoMaxSeqShifter() (bool, error) { switch c.DialectName() { case db.MSSQL: err = tx.QueryRow("SELECT TOP(1) seq FROM acronis_db_bench_eventbus_stream WHERE topic_id = $1 AND seq IS NOT NULL ORDER BY seq DESC;", t).Scan(&seq64) + case db.MYSQL: + err = tx.QueryRow("SELECT seq FROM acronis_db_bench_eventbus_stream WHERE topic_id = ? AND seq IS NOT NULL ORDER BY seq DESC LIMIT 1;", t).Scan(&seq64) default: err = tx.QueryRow("SELECT seq FROM acronis_db_bench_eventbus_stream WHERE topic_id = $1 AND seq IS NOT NULL ORDER BY seq DESC LIMIT 1;", t).Scan(&seq64) } - if err != nil { + if err != nil && err.Error() != sql.ErrNoRows.Error() { return err } - if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_topics SET max_seq = $1, acked_cursor = $2 WHERE internal_id = $3 AND max_seq < $4", seq64, seq64, t, seq64); err != nil { + switch c.DialectName() { + case db.MYSQL: + _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_topics SET max_seq = ?, acked_cursor = ? WHERE internal_id = ? AND max_seq < ?;", seq64, seq64, t, seq64) + default: + _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_topics SET max_seq = $1, acked_cursor = $2 WHERE internal_id = $3 AND max_seq < $4;", seq64, seq64, t, seq64) + } + + if err != nil { return err } @@ -512,8 +552,15 @@ func (e *EventBus) DoFetch() (bool, error) { for t := 1; t < MaxTopics+1; t++ { var cur64 int64 - if err := sess.QueryRow("SELECT sent_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = $1", t).Scan(&cur64); err != nil { - return false, err + switch c.DialectName() { + case db.MYSQL: + if err := sess.QueryRow("SELECT sent_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = ?;", t).Scan(&cur64); err != nil { + return false, err + } + default: + if err := sess.QueryRow("SELECT sent_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = $1;", t).Scan(&cur64); err != nil { + return false, err + } } var rows, err = sess.Query(fmt.Sprintf(` @@ -545,7 +592,13 @@ func (e *EventBus) DoFetch() (bool, error) { } } - _, err = sess.Exec("UPDATE acronis_db_bench_eventbus_topics SET sent_cursor = $1 WHERE internal_id = $2", sentCursor, t) + switch c.DialectName() { + case db.MYSQL: + _, err = sess.Exec("UPDATE acronis_db_bench_eventbus_topics SET sent_cursor = ? WHERE internal_id = ?;", sentCursor, t) + default: + _, err = sess.Exec("UPDATE acronis_db_bench_eventbus_topics SET sent_cursor = $1 WHERE internal_id = $2;", sentCursor, t) + } + return false, err } @@ -575,8 +628,15 @@ func (e *EventBus) DoArchive() (bool, error) { for t := 1; t < MaxTopics+1; t++ { if txErr := sess.Transact(func(tx db.DatabaseAccessor) error { var cur64 int64 - if err := tx.QueryRow("SELECT acked_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = $1", t).Scan(&cur64); err != nil { - return err + switch c.DialectName() { + case db.MYSQL: + if err := tx.QueryRow("SELECT acked_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = ?;", t).Scan(&cur64); err != nil { + return err + } + default: + if err := tx.QueryRow("SELECT acked_cursor FROM acronis_db_bench_eventbus_topics WHERE internal_id = $1;", t).Scan(&cur64); err != nil { + return err + } } var err error @@ -585,6 +645,10 @@ func (e *EventBus) DoArchive() (bool, error) { _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_archive (int_id, topic_id, seq, seq_time) SELECT TOP %d int_id, topic_id, seq, seq_time "+ "FROM acronis_db_bench_eventbus_stream WHERE topic_id = %d AND seq IS NOT NULL AND seq <= %d ORDER BY seq ;", e.batchSize, t, cur64)) + case db.MYSQL: + _, err = tx.Exec("INSERT INTO acronis_db_bench_eventbus_archive (int_id, topic_id, seq, seq_time) SELECT int_id, topic_id, seq, seq_time "+ + "FROM acronis_db_bench_eventbus_stream WHERE topic_id = ? AND seq IS NOT NULL AND seq <= ? ORDER BY seq LIMIT ?;", + t, cur64, e.batchSize) default: _, err = tx.Exec("INSERT INTO acronis_db_bench_eventbus_archive (int_id, topic_id, seq, seq_time) SELECT int_id, topic_id, seq, seq_time "+ "FROM acronis_db_bench_eventbus_stream WHERE topic_id = $1 AND seq IS NOT NULL AND seq <= $2 ORDER BY seq LIMIT $3;", @@ -593,7 +657,7 @@ func (e *EventBus) DoArchive() (bool, error) { switch c.DialectName() { case db.MYSQL: - _, err = tx.Exec("DELETE FROM acronis_db_bench_eventbus_stream WHERE topic_id = $1 AND seq <= $2 ORDER BY seq ASC LIMIT $3;", t, cur64, e.batchSize) + _, err = tx.Exec("DELETE FROM acronis_db_bench_eventbus_stream WHERE topic_id = ? AND seq <= ? ORDER BY seq ASC LIMIT ?;", t, cur64, e.batchSize) case db.MSSQL: _, err = tx.Exec(fmt.Sprintf(`DELETE x FROM acronis_db_bench_eventbus_stream x diff --git a/acronis-db-bench/test-groups/basic-scenarios/common.go b/acronis-db-bench/test-groups/basic-scenarios/common.go index 4fd48cdc..a564edd0 100644 --- a/acronis-db-bench/test-groups/basic-scenarios/common.go +++ b/acronis-db-bench/test-groups/basic-scenarios/common.go @@ -29,7 +29,14 @@ func insertByPreparedDataWorker(b *benchmark.Benchmark, c *engine.DBConnector, t b.Exit(err) } - parametersPlaceholder := db.GenDBParameterPlaceholders(0, len(*colConfs)) + var numericPlaceholders bool + if c.Database.DialectName() == db.POSTGRES { + numericPlaceholders = true + } else { + numericPlaceholders = false + } + + parametersPlaceholder := db.GenDBParameterPlaceholders(0, len(*colConfs), numericPlaceholders) sql := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", testDesc.Table.TableName, strings.Join(columns, ","), parametersPlaceholder) sql = engine.FormatSQL(sql, c.Database.DialectName()) diff --git a/acronis-db-bench/test-groups/large-objects-operations/tests.go b/acronis-db-bench/test-groups/large-objects-operations/tests.go index b2bb56cc..be521663 100644 --- a/acronis-db-bench/test-groups/large-objects-operations/tests.go +++ b/acronis-db-bench/test-groups/large-objects-operations/tests.go @@ -46,7 +46,15 @@ var TestTableLargeObj = engine.TestTable{ // createLargeObjectWorker inserts a row with large random object into the 'largeobject' table func createLargeObjectWorker(b *benchmark.Benchmark, c *engine.DBConnector, testDesc *engine.TestDesc, batch int) (loops int) { colConfs := testDesc.Table.GetColumnsForInsert(db.WithAutoInc(c.Database.DialectName())) - parametersPlaceholder := db.GenDBParameterPlaceholders(0, len(*colConfs)) + + var numericPlaceholders bool + if c.Database.DialectName() == db.POSTGRES { + numericPlaceholders = true + } else { + numericPlaceholders = false + } + + parametersPlaceholder := db.GenDBParameterPlaceholders(0, len(*colConfs), numericPlaceholders) var session = c.Database.Session(c.Database.Context(context.Background(), false)) diff --git a/db/helpers.go b/db/helpers.go index 546932e2..926a5d45 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -352,11 +352,16 @@ func SortFields(fields map[string][]string) []Cond { } // GenDBParameterPlaceholders generates placeholders for given start and count -func GenDBParameterPlaceholders(start int, count int) string { +func GenDBParameterPlaceholders(start int, count int, numeric bool) string { var ret = make([]string, count) end := start + count for i := start; i < end; i++ { - ret[i-start] = fmt.Sprintf("$%d", i+1) + if numeric { + ret[i-start] = fmt.Sprintf("$%d", i+1) + } else { + ret[i-start] = fmt.Sprintf("?") + } + } return strings.Join(ret, ",") From ee8413e24a76f0666a9c43273268c2cc3d7cbbba Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Wed, 14 May 2025 16:29:15 +0300 Subject: [PATCH 2/6] Fixed event-bus producer emmulator for SQL Server --- acronis-db-bench/event-bus/events.go | 78 ++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/acronis-db-bench/event-bus/events.go b/acronis-db-bench/event-bus/events.go index 08dbf457..9940a89c 100644 --- a/acronis-db-bench/event-bus/events.go +++ b/acronis-db-bench/event-bus/events.go @@ -333,8 +333,17 @@ func (e *EventBus) DoAlign() (bool, error) { * step #1 - get fresh events */ + var topOrEmpty, limitOrEmpty string + if c.DialectName() == db.MSSQL { + topOrEmpty = fmt.Sprintf("TOP %d", e.batchSize) + limitOrEmpty = "" + } else { + topOrEmpty = "" + limitOrEmpty = fmt.Sprintf("LIMIT %d", e.batchSize) + } + var rows, err = tx.Query(fmt.Sprintf(` - SELECT internal_id, + SELECT %s internal_id, topic_internal_id, event_type_internal_id, event_id, @@ -350,7 +359,9 @@ func (e *EventBus) DoAlign() (bool, error) { created_at FROM acronis_db_bench_eventbus_events ORDER BY internal_id - LIMIT %d;`, e.batchSize)) + %s;`, + topOrEmpty, + limitOrEmpty)) if err != nil { return err } @@ -405,11 +416,11 @@ func (e *EventBus) DoAlign() (bool, error) { switch c.DialectName() { case db.MSSQL: - if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WITH (UPDLOCK) WHERE int_id = $1;", 1).Scan(&seq64); err != nil { + if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WITH (UPDLOCK) WHERE int_id = @p1;", 1).Scan(&seq64); err != nil { return err } - if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = $1 - 1 WHERE int_id = $2;", seq64+int64(len(ids)), 1); err != nil { + if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = @p1 - 1 WHERE int_id = @p2;", seq64+int64(len(ids)), 1); err != nil { return err } @@ -451,46 +462,56 @@ func (e *EventBus) DoAlign() (bool, error) { for n := range data { var rawPlaceholders string - if c.DialectName() == db.MYSQL { - rawPlaceholders = db.GenDBParameterPlaceholders(n*fields, fields, false) - } else { - rawPlaceholders = db.GenDBParameterPlaceholders(n*fields, fields, true) + start := n * fields + switch c.DialectName() { + case db.MSSQL: + rawPlaceholders = fmt.Sprintf("(@p%d,@p%d,@p%d,@p%d)", start+1, start+2, start+3, start+4) + case db.MYSQL: + rawPlaceholders = fmt.Sprintf("(?,?,?,?)") + default: + rawPlaceholders = fmt.Sprintf("($%d,$%d,$%d,$%d)", start+1, start+2, start+3, start+4) } - placeholders[n] = fmt.Sprintf("(%s)", rawPlaceholders) + placeholders[n] = rawPlaceholders values[n*fields+0] = seq64 + int64(n) // intId: global sequence values[n*fields+1] = data[n].TopicInternalID // topic_id values[n*fields+2] = data[n].EventTypeInternalID // event type_id values[n*fields+3] = data[n].Data // event data } - if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_data (int_id, topic_id, type_id, data) VALUES%s;", strings.Join(placeholders, ",")), values...); err != nil { + if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_data (int_id, topic_id, type_id, data) VALUES %s;", strings.Join(placeholders, ",")), values...); err != nil { return err } /* - * step #4 - create meta data in `acronis_db_bench_eventbus_data` + * step #5 - create meta data in `acronis_db_bench_stream` */ fields = 3 values = make([]interface{}, fields*len(data)) for n := range data { - if c.DialectName() == db.MSSQL { - placeholders[n] = fmt.Sprintf("(%s, GETDATE())", db.GenDBParameterPlaceholders(n*fields, fields, true)) - } else if c.DialectName() == db.POSTGRES { - placeholders[n] = fmt.Sprintf("(%s, NOW())", db.GenDBParameterPlaceholders(n*fields, fields, true)) - } else if c.DialectName() == db.SQLITE { - placeholders[n] = fmt.Sprintf("(%s, datetime('now'))", db.GenDBParameterPlaceholders(n*fields, fields, true)) - } else { - placeholders[n] = fmt.Sprintf("(%s, NOW())", db.GenDBParameterPlaceholders(n*fields, fields, false)) + var rawPlaceholders string + start := n * fields + + switch c.DialectName() { + case db.MSSQL: + rawPlaceholders = fmt.Sprintf("(@p%d,@p%d,@p%d, GETDATE())", start+1, start+2, start+3) + case db.POSTGRES: + rawPlaceholders = fmt.Sprintf("($%d,$%d,$%d, NOW())", start+1, start+2, start+3) + case db.SQLITE: + rawPlaceholders = fmt.Sprintf("($%d,$%d,$%d, datetime('now'))", start+1, start+2, start+3) + case db.MYSQL: + rawPlaceholders = fmt.Sprintf("(?,?,?,NOW())") } + + placeholders[n] = rawPlaceholders values[n*fields+0] = seq64 + int64(n) // int_id: global sequence values[n*fields+1] = data[n].TopicInternalID // topic_id values[n*fields+2] = seq64 + int64(n) // seq: per-topic sequence, currently equals to int_id } - if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_stream (int_id, topic_id, seq, seq_time) VALUES%s;", strings.Join(placeholders, ",")), values...); err != nil { + if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_stream (int_id, topic_id, seq, seq_time) VALUES %s;", strings.Join(placeholders, ",")), values...); err != nil { return err } @@ -563,16 +584,27 @@ func (e *EventBus) DoFetch() (bool, error) { } } + var topOrEmpty, limitOrEmpty string + if c.DialectName() == db.MSSQL { + topOrEmpty = fmt.Sprintf("TOP %d", e.batchSize) + limitOrEmpty = "" + } else { + topOrEmpty = "" + limitOrEmpty = fmt.Sprintf("LIMIT %d", e.batchSize) + } + var rows, err = sess.Query(fmt.Sprintf(` - SELECT s.int_id, s.topic_id, d.type_id, s.seq, s.seq_time, d.data + SELECT %s s.int_id, s.topic_id, d.type_id, s.seq, s.seq_time, d.data FROM acronis_db_bench_eventbus_stream s INNER JOIN acronis_db_bench_eventbus_data d ON s.int_id = d.int_id WHERE s.topic_id = %d AND s.seq IS NOT NULL AND s.seq > %d ORDER BY s.seq - LIMIT %d;`, - t, cur64, e.batchSize)) + %s`, + topOrEmpty, + t, cur64, + limitOrEmpty)) if err != nil { return false, err } From e2e2b281c821ec0ecefa319af4c1ca6c12c55bb5 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Wed, 14 May 2025 18:17:48 +0300 Subject: [PATCH 3/6] Improved logging of event-bus lib emulator --- acronis-db-bench/event-bus/events.go | 55 +++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/acronis-db-bench/event-bus/events.go b/acronis-db-bench/event-bus/events.go index 9940a89c..bc245171 100644 --- a/acronis-db-bench/event-bus/events.go +++ b/acronis-db-bench/event-bus/events.go @@ -292,15 +292,35 @@ type stepType func() //nolint:unused func (e *EventBus) Work() { e.workerIteration++ e.logger.Trace(fmt.Sprintf("worker iteration #%d start", e.workerIteration)) - if e.Step("phase #1 (aligner)", e.DoAlign) { // perf model: per event - e.Step("phase #2 (max seq shifter)", e.DoMaxSeqShifter) // perf model: per batch - e.Step("phase #3 (fetcher)", e.DoFetch) // perf model: per event, but in a batch - // e.Step("phase # (window shift)", e.DoWindowShift) // perf model: per larger batch, depends on ingest & delivery response - // e.Step("phase # (consolidation)", e.DoConsolidate) // rarely used - // e.Step("phase # (fetch consolidated)", e.DoFetchConsolidated) // rarely used - e.Step("phase #4 (archive)", e.DoArchive) // perf model: per event, but in a batch - // e.Step("phase #5 (delete)", e.DoDelete) // perf model: per event, but in a larger batch + + // Phase 1: Align events + if aligned := e.Step("phase #1 (aligner)", e.DoAlign); aligned { + e.logger.Debug("Events aligned, proceeding with processing") + + // Phase 2: Update max sequence + if shifted := e.Step("phase #2 (max seq shifter)", e.DoMaxSeqShifter); shifted { + e.logger.Debug("Max sequence shifted") + } else { + e.logger.Debug("Max sequence shift failed or no events to process") + } + + // Phase 3: Fetch and process events + if fetched := e.Step("phase #3 (fetcher)", e.DoFetch); fetched { + e.logger.Debug("Events fetched and processed") + } else { + e.logger.Debug("No events fetched or processing failed") + } + + // Phase 4: Archive processed events + if archived := e.Step("phase #4 (archive)", e.DoArchive); archived { + e.logger.Debug("Events archived") + } else { + e.logger.Debug("Archiving failed or no events to archive") + } + } else { + e.logger.Debug("No events to align or alignment failed") } + e.logger.Trace(fmt.Sprintf("worker iteration #%d end", e.workerIteration)) } @@ -328,6 +348,7 @@ func (e *EventBus) DoAlign() (bool, error) { var session = c.Session(c.Context(context.Background(), false)) if txErr := session.Transact(func(tx db.DatabaseAccessor) error { + e.logger.Debug("Starting DoAlign transaction") /* * step #1 - get fresh events @@ -363,6 +384,7 @@ func (e *EventBus) DoAlign() (bool, error) { topOrEmpty, limitOrEmpty)) if err != nil { + e.logger.Error("Failed to query events: %v", err) return err } @@ -383,7 +405,8 @@ func (e *EventBus) DoAlign() (bool, error) { &unused, // &ed.DataBase64 &unused) // &ed.CreatedAt if err != nil { - return err + e.logger.Error("Failed to scan event: %v", err) + return fmt.Errorf("scan failed: %v", err) } ids = append(ids, strconv.FormatInt(ed.InternalID, 10)) @@ -397,7 +420,7 @@ func (e *EventBus) DoAlign() (bool, error) { return nil } - e.logger.Trace(fmt.Sprintf("%d new events found", len(ids))) + e.logger.Debug("Found %d new events", len(ids)) newEventsFound = true /* @@ -405,6 +428,7 @@ func (e *EventBus) DoAlign() (bool, error) { */ if _, err = tx.Exec(fmt.Sprintf("DELETE FROM acronis_db_bench_eventbus_events WHERE internal_id IN (%s);", strings.Join(ids, ","))); err != nil { + e.logger.Error("Failed to delete events: %v", err) return err } @@ -417,37 +441,45 @@ func (e *EventBus) DoAlign() (bool, error) { switch c.DialectName() { case db.MSSQL: if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WITH (UPDLOCK) WHERE int_id = @p1;", 1).Scan(&seq64); err != nil { + e.logger.Error("Failed to get sequence (MSSQL): %v", err) return err } if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = @p1 - 1 WHERE int_id = @p2;", seq64+int64(len(ids)), 1); err != nil { + e.logger.Error("Failed to update sequence (MSSQL): %v", err) return err } case db.MYSQL: if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = ? FOR UPDATE;", 1).Scan(&seq64); err != nil { + e.logger.Error("Failed to get sequence (MySQL): %v", err) return err } if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = ? - 1 WHERE int_id = ?;", seq64+int64(len(ids)), 1); err != nil { + e.logger.Error("Failed to update sequence (MySQL): %v", err) return err } case db.SQLITE: if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = $1;", 1).Scan(&seq64); err != nil { + e.logger.Error("Failed to get sequence (SQLite): %v", err) return err } if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = $1 - 1 WHERE int_id = $2;", seq64+int64(len(ids)), 1); err != nil { + e.logger.Error("Failed to update sequence (SQLite): %v", err) return err } default: if err = tx.QueryRow("SELECT sequence + 1 FROM acronis_db_bench_eventbus_sequences WHERE int_id = $1 FOR UPDATE;", 1).Scan(&seq64); err != nil { + e.logger.Error("Failed to get sequence (default): %v", err) return err } if _, err = tx.Exec("UPDATE acronis_db_bench_eventbus_sequences SET sequence = $1 - 1 WHERE int_id = $2;", seq64+int64(len(ids)), 1); err != nil { + e.logger.Error("Failed to update sequence (default): %v", err) return err } } @@ -480,6 +512,7 @@ func (e *EventBus) DoAlign() (bool, error) { } if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_data (int_id, topic_id, type_id, data) VALUES %s;", strings.Join(placeholders, ",")), values...); err != nil { + e.logger.Error("Failed to insert into eventbus_data: %v", err) return err } @@ -512,9 +545,11 @@ func (e *EventBus) DoAlign() (bool, error) { } if _, err = tx.Exec(fmt.Sprintf("INSERT INTO acronis_db_bench_eventbus_stream (int_id, topic_id, seq, seq_time) VALUES %s;", strings.Join(placeholders, ",")), values...); err != nil { + e.logger.Error("Failed to insert into eventbus_stream: %v", err) return err } + e.logger.Debug("Successfully completed DoAlign transaction") return nil }); txErr != nil { return false, fmt.Errorf("align: %v", txErr) From 97166fbe5702292c5288a508331fdbb4c4b9e8a7 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Wed, 14 May 2025 18:21:32 +0300 Subject: [PATCH 4/6] Fixed dead-lock in event-bus lib emulator --- acronis-db-bench/event-bus/events.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/acronis-db-bench/event-bus/events.go b/acronis-db-bench/event-bus/events.go index bc245171..67703896 100644 --- a/acronis-db-bench/event-bus/events.go +++ b/acronis-db-bench/event-bus/events.go @@ -91,18 +91,17 @@ func (e *EventBus) MainLoop() { e.logger.Info("stopping main worker loop") return default: - } + if empty, err := e.QueueIsEmpty(); err != nil { + e.logger.Error("cannot check if queue is empty: %v", err) + return + } else if empty { + time.Sleep(time.Duration(e.sleepMsec) * time.Millisecond) + continue + } - if empty, err := e.QueueIsEmpty(); err != nil { - e.logger.Error("cannot check if queue is empty: %v", err) - return - } else if empty { time.Sleep(time.Duration(e.sleepMsec) * time.Millisecond) - continue + e.Work() } - - time.Sleep(time.Duration(e.sleepMsec) * time.Millisecond) - e.Work() } } @@ -148,6 +147,7 @@ func (e *EventBus) Stop() { return } e.stopCh <- true + e.workerStarted = false e.wg.Wait() e.logger.Debug("worker stop") } From 11db73e3ba5d6dca7cf7f408cf11f4ed3b921512 Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Wed, 14 May 2025 18:35:36 +0300 Subject: [PATCH 5/6] Added tests for event-bus lib emulator for all dialects except sqlite --- acronis-db-bench/event-bus/events_test.go | 248 ++++++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 acronis-db-bench/event-bus/events_test.go diff --git a/acronis-db-bench/event-bus/events_test.go b/acronis-db-bench/event-bus/events_test.go new file mode 100644 index 00000000..e52f1afe --- /dev/null +++ b/acronis-db-bench/event-bus/events_test.go @@ -0,0 +1,248 @@ +package event_bus + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/acronis/perfkit/benchmark" + "github.com/acronis/perfkit/db" + "github.com/acronis/perfkit/logger" + + // List of required database drivers + _ "github.com/acronis/perfkit/db/sql" // sql drivers +) + +const ( + mariaDBConnString = "mysql://user:password@tcp(localhost:3306)/perfkit_db_ci" // example value of a secret + postgresqlConnString = "postgresql://root:password@localhost:5432/perfkit_pg_vector_db_ci?sslmode=disable" // example value of a secret + sqlServerConnString = "sqlserver://perfkit_db_runner:MyP%40ssw0rd123@localhost:1433?database=perfkit_db_ci" // example value of a secret +) + +func TestEventBusSuiteMariaDB(t *testing.T) { + suite.Run(t, &EventBusTestSuite{ConnString: mariaDBConnString}) +} + +func TestEventBusSuitePostgreSQL(t *testing.T) { + suite.Run(t, &EventBusTestSuite{ConnString: postgresqlConnString}) +} + +func TestEventBusSuiteSQLServer(t *testing.T) { + suite.Run(t, &EventBusTestSuite{ConnString: sqlServerConnString}) +} + +type mockLogger struct { + logger.Logger +} + +func (m *mockLogger) Debug(format string, args ...interface{}) {} +func (m *mockLogger) Info(format string, args ...interface{}) {} +func (m *mockLogger) Warn(format string, args ...interface{}) {} +func (m *mockLogger) Error(format string, args ...interface{}) {} +func (m *mockLogger) Trace(format string, args ...interface{}) {} +func (m *mockLogger) Log(level logger.LogLevel, message string, args ...interface{}) {} +func (m *mockLogger) GetLevel() logger.LogLevel { return logger.LevelDebug } +func (m *mockLogger) SetLevel(level logger.LogLevel) {} +func (m *mockLogger) GetLastMessage() *logger.LogMessage { return nil } +func (m *mockLogger) Clone() logger.Logger { return m } + +type EventBusTestSuite struct { + suite.Suite + ConnString string + eventBus *EventBus + conn db.Database + mockLog *mockLogger +} + +func (suite *EventBusTestSuite) SetupSuite() { + var err error + suite.conn, err = db.Open(db.Config{ + ConnString: suite.ConnString, + }) + require.NoError(suite.T(), err) + suite.mockLog = &mockLogger{} +} + +func (suite *EventBusTestSuite) TearDownSuite() { + if suite.conn != nil { + suite.conn.Close() + } +} + +func (suite *EventBusTestSuite) SetupTest() { + suite.eventBus = NewEventBus(suite.conn, suite.mockLog) +} + +func (suite *EventBusTestSuite) TearDownTest() { + if suite.eventBus != nil && suite.eventBus.workerStarted { + suite.eventBus.Stop() + // Give some time for the worker to stop + time.Sleep(100 * time.Millisecond) + } +} + +func (suite *EventBusTestSuite) TestNewEventBus() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + defer func() { + _ = suite.eventBus.DropTables() + }() + + assert.NotNil(suite.T(), suite.eventBus) + assert.Equal(suite.T(), suite.conn, suite.eventBus.workerConn) + assert.Equal(suite.T(), false, suite.eventBus.workerStarted) + assert.Equal(suite.T(), 500, suite.eventBus.batchSize) + assert.Equal(suite.T(), 10, suite.eventBus.sleepMsec) +} + +func (suite *EventBusTestSuite) TestCreateTables() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + defer func() { + _ = suite.eventBus.DropTables() + }() + + // Verify tables were created + tables := []string{ + "acronis_db_bench_eventbus_events", + "acronis_db_bench_eventbus_topics", + "acronis_db_bench_eventbus_event_types", + "acronis_db_bench_eventbus_sequences", + } + + for _, table := range tables { + exists, err := suite.conn.TableExists(table) + require.NoError(suite.T(), err) + assert.True(suite.T(), exists, "Table %s should exist", table) + } + + // Verify initial data + var count int + ctx := suite.conn.Context(context.Background(), false) + err := suite.conn.Session(ctx).QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_topics").Scan(&count) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), MaxTopics, count, "Should have created MaxTopics entries") +} + +func (suite *EventBusTestSuite) TestDropTables() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + require.NoError(suite.T(), suite.eventBus.DropTables()) + + // Verify tables were dropped + tables := []string{ + "acronis_db_bench_eventbus_events", + "acronis_db_bench_eventbus_topics", + "acronis_db_bench_eventbus_event_types", + "acronis_db_bench_eventbus_sequences", + } + + for _, table := range tables { + exists, err := suite.conn.TableExists(table) + require.NoError(suite.T(), err) + assert.False(suite.T(), exists, "Table %s should not exist", table) + } +} + +func (suite *EventBusTestSuite) TestInsertEvent() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + defer func() { + _ = suite.eventBus.DropTables() + }() + + rz := benchmark.NewRandomizer(time.Now().UnixNano(), 0) + subjectUUID := "test-subject-uuid" + + ctx := suite.conn.Context(context.Background(), false) + session := suite.conn.Session(ctx) + err := suite.eventBus.InsertEvent(rz, session, subjectUUID) + require.NoError(suite.T(), err) + + // Verify event was inserted + var count int + err = session.QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_events").Scan(&count) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, count, "Should have inserted one event") +} + +func (suite *EventBusTestSuite) TestStartStop() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + + // Verify tables exist before proceeding + exists, err := suite.conn.TableExists("acronis_db_bench_eventbus_events") + require.NoError(suite.T(), err) + require.True(suite.T(), exists, "Events table should exist before starting test") + + // Test Start + err = suite.eventBus.Start() + require.NoError(suite.T(), err) + assert.True(suite.T(), suite.eventBus.workerStarted) + + // Give some time for the worker to start + time.Sleep(100 * time.Millisecond) + + // Insert an event to ensure worker is processing + rz := benchmark.NewRandomizer(time.Now().UnixNano(), 0) + ctx := suite.conn.Context(context.Background(), false) + session := suite.conn.Session(ctx) + err = suite.eventBus.InsertEvent(rz, session, "test-subject") + require.NoError(suite.T(), err) + + // Wait for event to be processed + time.Sleep(200 * time.Millisecond) + + // Stop the worker before checking the event count + suite.eventBus.Stop() + time.Sleep(100 * time.Millisecond) + + // Verify event was processed + var count int + err = session.QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_events").Scan(&count) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, count, "Event should have been processed") + + // Verify worker is stopped + assert.False(suite.T(), suite.eventBus.workerStarted, "Worker should be marked as stopped") + + // Insert another event to verify worker is stopped + err = suite.eventBus.InsertEvent(rz, session, "test-subject-2") + require.NoError(suite.T(), err) + + // Verify event remains in events table since worker is stopped + err = session.QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_events").Scan(&count) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, count, "Event should remain in events table when worker is stopped") + + // Test double stop - should not panic or block + suite.eventBus.Stop() + + // Clean up tables at the end + defer func() { + _ = suite.eventBus.DropTables() + }() +} + +func (suite *EventBusTestSuite) TestQueueIsEmpty() { + require.NoError(suite.T(), suite.eventBus.CreateTables()) + defer func() { + _ = suite.eventBus.DropTables() + }() + + // Initially queue should be empty + empty, err := suite.eventBus.QueueIsEmpty() + require.NoError(suite.T(), err) + assert.True(suite.T(), empty) + + // Insert an event + rz := benchmark.NewRandomizer(time.Now().UnixNano(), 0) + ctx := suite.conn.Context(context.Background(), false) + session := suite.conn.Session(ctx) + err = suite.eventBus.InsertEvent(rz, session, "test-subject") + require.NoError(suite.T(), err) + + // Queue should not be empty + empty, err = suite.eventBus.QueueIsEmpty() + require.NoError(suite.T(), err) + assert.False(suite.T(), empty) +} From 908ea070ba8cda55461aefd6add2c8fbe11a67cc Mon Sep 17 00:00:00 2001 From: Ilya Kompaniets Date: Thu, 15 May 2025 13:56:23 +0300 Subject: [PATCH 6/6] Fixed race condition in event-bus producer emulator tests --- acronis-db-bench/event-bus/events_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/acronis-db-bench/event-bus/events_test.go b/acronis-db-bench/event-bus/events_test.go index e52f1afe..b44b25cc 100644 --- a/acronis-db-bench/event-bus/events_test.go +++ b/acronis-db-bench/event-bus/events_test.go @@ -168,6 +168,9 @@ func (suite *EventBusTestSuite) TestInsertEvent() { func (suite *EventBusTestSuite) TestStartStop() { require.NoError(suite.T(), suite.eventBus.CreateTables()) + defer func() { + _ = suite.eventBus.DropTables() + }() // Verify tables exist before proceeding exists, err := suite.conn.TableExists("acronis_db_bench_eventbus_events") @@ -198,7 +201,10 @@ func (suite *EventBusTestSuite) TestStartStop() { // Verify event was processed var count int - err = session.QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_events").Scan(&count) + // Re-establish session context for this query to ensure freshness + freshCtx := suite.conn.Context(context.Background(), false) + freshSession := suite.conn.Session(freshCtx) + err = freshSession.QueryRow("SELECT COUNT(*) FROM acronis_db_bench_eventbus_events").Scan(&count) require.NoError(suite.T(), err) assert.Equal(suite.T(), 0, count, "Event should have been processed") @@ -216,11 +222,6 @@ func (suite *EventBusTestSuite) TestStartStop() { // Test double stop - should not panic or block suite.eventBus.Stop() - - // Clean up tables at the end - defer func() { - _ = suite.eventBus.DropTables() - }() } func (suite *EventBusTestSuite) TestQueueIsEmpty() {