From 50ac8da4a08f059d335b4b564b307ca06ade82a8 Mon Sep 17 00:00:00 2001 From: Charly Batista Date: Wed, 6 Aug 2025 09:16:12 -0400 Subject: [PATCH 1/4] Fix critical database persistence issue - look up metric IDs instead of hardcoded values CRITICAL FIX: Plugins were using hardcoded metric IDs (1, 2, 7) which may not exist in the database, causing silent failures in result storage. Changes: - Both bulk-load and tpcc-scalability plugins now properly look up metric IDs from database using GetMetric() instead of hardcoded values - Added comprehensive debug logging to trace data flow - Enhanced error handling in StoreResults with detailed logging - Added validation that results are actually being stored This ensures that test metrics are properly persisted to the database instead of silently failing due to invalid metric references. --- core/storage/manager.go | 25 +++++++++++++---- plugins/bulk-load/plugin.go | 44 ++++++++++++++++++++++++------ plugins/bulk-load/plugin_test.go | 14 +++++----- plugins/tpcc-scalability/plugin.go | 27 +++++++++++++----- 4 files changed, 82 insertions(+), 28 deletions(-) diff --git a/core/storage/manager.go b/core/storage/manager.go index 3bf433d..1dc52e4 100644 --- a/core/storage/manager.go +++ b/core/storage/manager.go @@ -314,9 +314,16 @@ func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.T // StoreResults stores test results in batch func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) error { if len(results) == 0 { + m.logger.Warn("StoreResults called with empty results array") return nil } + m.logger.Info("StoreResults called", + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "first_test_run_id", Value: results[0].TestRunID}, + core.Field{Key: "first_metric_id", Value: results[0].MetricID}, + ) + db, err := m.db.GetConnection(ctx) if err != nil { return fmt.Errorf("failed to get database connection: %w", err) @@ -342,12 +349,19 @@ func (m *Manager)
StoreResults(ctx context.Context, results []core.TestResult) e defer stmt.Close() // Insert each result - for _, result := range results { + for i, result := range results { tagsJSON, err := json.Marshal(result.Tags) if err != nil { - return fmt.Errorf("failed to marshal tags: %w", err) + return fmt.Errorf("failed to marshal tags for result %d: %w", i, err) } + m.logger.Debug("Inserting result", + core.Field{Key: "index", Value: i}, + core.Field{Key: "test_run_id", Value: result.TestRunID}, + core.Field{Key: "metric_id", Value: result.MetricID}, + core.Field{Key: "value", Value: result.Value}, + ) + _, err = stmt.ExecContext(ctx, result.TestRunID, result.MetricID, @@ -357,7 +371,7 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e tagsJSON, ) if err != nil { - return fmt.Errorf("failed to insert result: %w", err) + return fmt.Errorf("failed to insert result %d: %w", i, err) } } @@ -365,9 +379,8 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e return fmt.Errorf("failed to commit transaction: %w", err) } - m.logger.Info("test results stored", - core.Field{Key: "result_count", Value: len(results)}, - core.Field{Key: "test_run_id", Value: results[0].TestRunID}, + m.logger.Info("Successfully stored results to database", + core.Field{Key: "stored_count", Value: len(results)}, ) return nil diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 6650706..f70a75e 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -449,6 +449,8 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac if err := p.storeResults(ctx); err != nil { p.logger.Error("Failed to store test results", core.Field{Key: "error", Value: err.Error()}) // Don't fail the entire test, just log the error + } else { + p.logger.Info("Test results stored successfully") } return nil @@ -820,19 +822,26 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { 
testRunID = 0 } + // Look up metric IDs from database instead of using hardcoded values + rowInsertMetric, err := p.core.Storage.GetMetric(ctx, "ROW_INSERT") + if err != nil { + return fmt.Errorf("failed to get ROW_INSERT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + var results []core.TestResult now := time.Now() - // Get metric IDs (we should really look these up, but for now use hardcoded IDs) - rowInsertMetricID := 1 // ROW_INSERT - latencyMetricID := 7 // LATENCY_AVG - // Convert each batch result to database format for _, batch := range p.metrics.BatchResults { // Store transaction rate results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: rowInsertMetricID, + MetricID: rowInsertMetric.ID, StartTime: p.metrics.StartTime, EndTime: now, Value: batch.TransactionsPerSec, @@ -846,7 +855,7 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { // Store rows per second results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: rowInsertMetricID, + MetricID: rowInsertMetric.ID, StartTime: p.metrics.StartTime, EndTime: now, Value: batch.RowsPerSec, @@ -860,7 +869,7 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { // Store average latency results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: latencyMetricID, + MetricID: latencyAvgMetric.ID, StartTime: p.metrics.StartTime, EndTime: now, Value: batch.AvgLatencyMs, @@ -872,8 +881,27 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { }) } + p.logger.Info("Storing test results", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "batch_count", Value: len(p.metrics.BatchResults)}, + core.Field{Key: "metrics_start_time", Value: p.metrics.StartTime}, + core.Field{Key: "metrics_end_time", Value: 
p.metrics.EndTime}, + core.Field{Key: "total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "total_rows", Value: p.metrics.TotalRowsInserted}, + ) + // Store all results - return p.core.Storage.StoreResults(ctx, results) + storeErr := p.core.Storage.StoreResults(ctx, results) + if storeErr != nil { + p.logger.Error("Failed to store results to database", core.Field{Key: "error", Value: storeErr.Error()}) + return storeErr + } + + p.logger.Info("Successfully stored test results to database", + core.Field{Key: "stored_results", Value: len(results)}, + ) + return nil } // NewPlugin returns the plugin instance (required for plugin loading) diff --git a/plugins/bulk-load/plugin_test.go b/plugins/bulk-load/plugin_test.go index b4dfb53..9ece89b 100644 --- a/plugins/bulk-load/plugin_test.go +++ b/plugins/bulk-load/plugin_test.go @@ -12,13 +12,13 @@ import ( // MockLogger implements the core.Logger interface for testing type MockLogger struct{} -func (m *MockLogger) Debug(msg string, fields ...core.Field) {} -func (m *MockLogger) Info(msg string, fields ...core.Field) {} -func (m *MockLogger) Warn(msg string, fields ...core.Field) {} -func (m *MockLogger) Error(msg string, fields ...core.Field) {} -func (m *MockLogger) WithFields(fields ...core.Field) core.Logger { return m } -func (m *MockLogger) WithPlugin(pluginName string) core.Logger { return m } -func (m *MockLogger) WithStorage(storage core.StorageManager) core.Logger { return m } +func (m *MockLogger) Debug(msg string, fields ...core.Field) {} +func (m *MockLogger) Info(msg string, fields ...core.Field) {} +func (m *MockLogger) Warn(msg string, fields ...core.Field) {} +func (m *MockLogger) Error(msg string, fields ...core.Field) {} +func (m *MockLogger) WithFields(fields ...core.Field) core.Logger { return m } +func (m *MockLogger) WithPlugin(pluginName string) core.Logger { return m } +func (m *MockLogger) WithStorage(storage core.StorageManager) core.Logger { return m } // MockCoreServices 
provides mock implementations for testing type MockCoreServices struct { diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index 0ac08ac..1d073ce 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -1116,13 +1116,20 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { testRunID = 0 } + // Look up metric IDs from database instead of using hardcoded values + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + var results []core.TestResult now := time.Now() - // Get metric IDs (we should really look these up, but for now use hardcoded IDs) - transactionMetricID := 2 // TRANSACTION_RATE - latencyMetricID := 7 // LATENCY_AVG - p.metrics.mu.RLock() defer p.metrics.mu.RUnlock() @@ -1134,7 +1141,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { // Store total transaction rate results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: transactionMetricID, + MetricID: throughputMetric.ID, StartTime: p.testStarted, EndTime: now, Value: float64(totalTxns), @@ -1149,7 +1156,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: latencyMetricID, + MetricID: latencyAvgMetric.ID, StartTime: p.testStarted, EndTime: now, Value: avgLatencyMs, @@ -1173,7 +1180,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { if count > 0 { results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: transactionMetricID, + MetricID: throughputMetric.ID, StartTime: p.testStarted, EndTime: now, Value: 
float64(count), @@ -1188,6 +1195,12 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { } } + p.logger.Info("Storing TPCC test results", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "total_transactions", Value: totalTxns}, + ) + // Store all results return p.core.Storage.StoreResults(ctx, results) } From f7d0b4b6fde01ebc14a4d01d58aba9ebaee3d6bc Mon Sep 17 00:00:00 2001 From: Charly Batista Date: Wed, 6 Aug 2025 09:18:03 -0400 Subject: [PATCH 2/4] Add comprehensive debug logging to track metric collection and storage --- plugins/bulk-load/plugin.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index f70a75e..4bd5cb5 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -356,6 +356,9 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac // Extract test run ID from context and add to logger if testRunID, ok := ctx.Value("test_run_id").(int64); ok { p.logger = p.logger.WithFields(core.Field{Key: "test_run_id", Value: testRunID}) + p.logger.Info("Plugin executing with test run ID", core.Field{Key: "test_run_id", Value: testRunID}) + } else { + p.logger.Warn("No test run ID found in context - this may cause issues with result storage") } // Set running state @@ -419,6 +422,14 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac p.metrics.TotalErrors += result.TotalErrors p.metrics.mu.Unlock() + p.logger.Info("Metrics collection update", + core.Field{Key: "batch_size", Value: batchSize}, + core.Field{Key: "total_transactions", Value: result.TotalTransactions}, + core.Field{Key: "total_rows", Value: result.TotalRowsInserted}, + core.Field{Key: "errors", Value: result.TotalErrors}, + core.Field{Key: "accumulated_total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "accumulated_total_rows", 
Value: p.metrics.TotalRowsInserted}) + p.logger.Info("Batch test completed", core.Field{Key: "batch_size", Value: batchSize}, core.Field{Key: "transactions", Value: result.TotalTransactions}, @@ -836,6 +847,17 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { var results []core.TestResult now := time.Now() + if len(p.metrics.BatchResults) == 0 { + p.logger.Warn("No batch results to store - test may not have run properly") + return nil + } + + p.logger.Info("Processing batch results for storage", + core.Field{Key: "batch_count", Value: len(p.metrics.BatchResults)}, + core.Field{Key: "total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "total_rows", Value: p.metrics.TotalRowsInserted}, + ) + // Convert each batch result to database format for _, batch := range p.metrics.BatchResults { // Store transaction rate From a43fef65f5c39444d0bad312cb57ea278f547ef2 Mon Sep 17 00:00:00 2001 From: Charly Batista Date: Wed, 6 Aug 2025 09:42:11 -0400 Subject: [PATCH 3/4] Implement background metrics saving every second for both plugins - Add backgroundMetricsSaver goroutine to both bulk-load and tpcc-scalability plugins - Metrics are now saved to database every second during test execution - Added comprehensive logging to track metrics collection and storage - Ensures real-time data persistence instead of only saving at test end - Thread-safe implementation with proper context cancellation --- plugins/bulk-load/plugin.go | 175 ++++++++++++++++++++++++++++ plugins/tpcc-scalability/plugin.go | 176 +++++++++++++++++++++++++++++ 2 files changed, 351 insertions(+) diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 4bd5cb5..84c2d52 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -370,6 +370,15 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac p.testStarted = time.Now() p.metrics.StartTime = p.testStarted + // Start background metrics saver goroutine + 
metricsCtx, cancelMetrics := context.WithCancel(ctx) + metricsDone := make(chan struct{}) + go p.backgroundMetricsSaver(metricsCtx, metricsDone) + defer func() { + cancelMetrics() + <-metricsDone // Wait for goroutine to finish + }() + p.logger.Info("Starting bulk load performance test", core.Field{Key: "batch_sizes", Value: p.config.BatchSizes}, core.Field{Key: "connections", Value: p.config.Connections}, @@ -820,6 +829,172 @@ func (p *BulkLoadPlugin) executeBulkInsert(batchSize int) error { return err } +// backgroundMetricsSaver continuously saves metrics every second while test is running +func (p *BulkLoadPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { + defer close(done) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + lastSavedTransactions := int64(0) + lastSavedRows := int64(0) + saveCounter := 0 + + for { + select { + case <-ctx.Done(): + p.logger.Info("Background metrics saver stopping") + return + case <-ticker.C: + saveCounter++ + + // Get current metrics snapshot + p.metrics.mu.RLock() + currentTransactions := p.metrics.TotalTransactions + currentRows := p.metrics.TotalRowsInserted + batchCount := len(p.metrics.BatchResults) + p.metrics.mu.RUnlock() + + // Only save if we have new data + if currentTransactions > lastSavedTransactions || currentRows > lastSavedRows { + err := p.saveCurrentMetrics(ctx, saveCounter) + if err != nil { + p.logger.Error("Failed to save metrics in background", + core.Field{Key: "error", Value: err.Error()}, + core.Field{Key: "save_iteration", Value: saveCounter}) + } else { + p.logger.Info("Background metrics saved", + core.Field{Key: "save_iteration", Value: saveCounter}, + core.Field{Key: "total_transactions", Value: currentTransactions}, + core.Field{Key: "total_rows", Value: currentRows}, + core.Field{Key: "batch_count", Value: batchCount}) + + lastSavedTransactions = currentTransactions + lastSavedRows = currentRows + } + } else { + p.logger.Debug("No new metrics to 
save", + core.Field{Key: "save_iteration", Value: saveCounter}) + } + } + } +} + +// saveCurrentMetrics saves current accumulated metrics to database +func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int) error { + if p.core == nil || p.core.Storage == nil { + return fmt.Errorf("core services not available") + } + + // Extract test run ID from context + testRunID, ok := ctx.Value("test_run_id").(int64) + if !ok { + return fmt.Errorf("test_run_id not found in context") + } + + // Get metric IDs + rowInsertMetric, err := p.core.Storage.GetMetric(ctx, "ROW_INSERT") + if err != nil { + return fmt.Errorf("failed to get ROW_INSERT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + now := time.Now() + var results []core.TestResult + + p.metrics.mu.RLock() + defer p.metrics.mu.RUnlock() + + // Save cumulative metrics as snapshot + if p.metrics.TotalTransactions > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.metrics.StartTime).Seconds() + if elapsed > 0 { + transactionRate := float64(p.metrics.TotalTransactions) / elapsed + rowRate := float64(p.metrics.TotalRowsInserted) / elapsed + + // Store transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: transactionRate, + Tags: map[string]interface{}{ + "metric_type": "cumulative_transaction_rate", + "iteration": iteration, + "connections": p.config.Connections, + "total_transactions": p.metrics.TotalTransactions, + }, + }) + + // Store row insertion rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: 
p.metrics.StartTime, + EndTime: now, + Value: rowRate, + Tags: map[string]interface{}{ + "metric_type": "cumulative_row_rate", + "iteration": iteration, + "connections": p.config.Connections, + "total_rows": p.metrics.TotalRowsInserted, + }, + }) + } + + // Calculate average latency from batch results + if len(p.metrics.BatchResults) > 0 { + var totalLatency float64 + var totalOperations int64 + + for _, batch := range p.metrics.BatchResults { + totalLatency += batch.AvgLatencyMs * float64(batch.TotalTransactions) + totalOperations += batch.TotalTransactions + } + + if totalOperations > 0 { + avgLatency := totalLatency / float64(totalOperations) + + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: avgLatency, + Tags: map[string]interface{}{ + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.config.Connections, + "batch_count": len(p.metrics.BatchResults), + }, + }) + } + } + } + + if len(results) > 0 { + p.logger.Debug("Saving background metrics", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "iteration", Value: iteration}) + + return p.core.Storage.StoreResults(ctx, results) + } + + return nil +} + // storeResults converts plugin metrics to core.TestResult and stores them in the database func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { if p.core == nil || p.core.Storage == nil { diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index 1d073ce..dcb8c8a 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -196,6 +196,16 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) defer atomic.StoreInt64(&p.isRunning, 0) p.testStarted = time.Now() + + // Start background metrics saver goroutine + metricsCtx, cancelMetrics := 
context.WithCancel(ctx) + metricsDone := make(chan struct{}) + go p.backgroundMetricsSaver(metricsCtx, metricsDone) + defer func() { + cancelMetrics() + <-metricsDone // Wait for goroutine to finish + }() + p.logger.Info("starting TPC-C scalability test", core.Field{Key: "scale", Value: p.config.Scale}, core.Field{Key: "connection_levels", Value: len(p.config.Connections)}, @@ -1103,6 +1113,172 @@ func (p *TPCCPlugin) logConnectionLevelSummary(connCount int, duration time.Dura ) } +// backgroundMetricsSaver continuously saves metrics every second while test is running +func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { + defer close(done) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + lastSavedTransactions := int64(0) + saveCounter := 0 + + for { + select { + case <-ctx.Done(): + p.logger.Info("Background metrics saver stopping") + return + case <-ticker.C: + saveCounter++ + + // Get current metrics snapshot + p.metrics.mu.RLock() + totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + currentConnections := p.metrics.CurrentConnections + testPhase := p.metrics.TestPhase + p.metrics.mu.RUnlock() + + // Only save if we have new data + if totalTxns > lastSavedTransactions { + err := p.saveCurrentTPCCMetrics(ctx, saveCounter) + if err != nil { + p.logger.Error("Failed to save TPCC metrics in background", + core.Field{Key: "error", Value: err.Error()}, + core.Field{Key: "save_iteration", Value: saveCounter}) + } else { + p.logger.Info("Background TPCC metrics saved", + core.Field{Key: "save_iteration", Value: saveCounter}, + core.Field{Key: "total_transactions", Value: totalTxns}, + core.Field{Key: "connections", Value: currentConnections}, + core.Field{Key: "test_phase", Value: testPhase}) + + lastSavedTransactions = totalTxns + } + } else { + p.logger.Debug("No new TPCC metrics to save", + core.Field{Key: 
"save_iteration", Value: saveCounter}) + } + } + } +} + +// saveCurrentTPCCMetrics saves current accumulated metrics to database +func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) error { + if p.core == nil || p.core.Storage == nil { + return fmt.Errorf("core services not available") + } + + // Extract test run ID from context + testRunID, ok := ctx.Value("test_run_id").(int64) + if !ok { + return fmt.Errorf("test_run_id not found in context") + } + + // Get metric IDs + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + + now := time.Now() + var results []core.TestResult + + p.metrics.mu.RLock() + defer p.metrics.mu.RUnlock() + + // Calculate total transactions + totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + + if totalTxns > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.testStarted).Seconds() + if elapsed > 0 { + transactionRate := float64(totalTxns) / elapsed + + // Store transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: transactionRate, + Tags: map[string]interface{}{ + "metric_type": "cumulative_tps", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "test_phase": p.metrics.TestPhase, + "total_transactions": totalTxns, + }, + }) + } + + // Calculate and store average latency + if totalTxns > 0 { + avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 + + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: 
p.testStarted, + EndTime: now, + Value: avgLatencyMs, + Tags: map[string]interface{}{ + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "test_phase": p.metrics.TestPhase, + }, + }) + } + + // Store individual transaction type counts + txTypes := map[string]int64{ + "new_order": p.metrics.NewOrderCount, + "payment": p.metrics.PaymentCount, + "order_status": p.metrics.OrderStatusCount, + "delivery": p.metrics.DeliveryCount, + "stock_level": p.metrics.StockLevelCount, + } + + for txType, count := range txTypes { + if count > 0 { + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(count), + Tags: map[string]interface{}{ + "metric_type": "transaction_count", + "transaction_type": txType, + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "test_phase": p.metrics.TestPhase, + }, + }) + } + } + } + + if len(results) > 0 { + p.logger.Debug("Saving background TPCC metrics", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "iteration", Value: iteration}) + + return p.core.Storage.StoreResults(ctx, results) + } + + return nil +} + // storeResults converts plugin metrics to core.TestResult and stores them in the database func (p *TPCCPlugin) storeResults(ctx context.Context) error { if p.core == nil || p.core.Storage == nil { From 5d4959e54e3f6ac195ddf03910bb18013951cd5c Mon Sep 17 00:00:00 2001 From: Charly Batista Date: Wed, 6 Aug 2025 10:45:26 -0400 Subject: [PATCH 4/4] feat: Add connection/worker tracking and standardize metric tags - Add migration 004 to include active_connections and active_workers columns in test_run_result table - Update TestResult struct to include optional ActiveConnections and ActiveWorkers fields - Enhance StorageManager to handle new connection/worker fields in StoreResults 
and GetResults - Standardize tag structure across both plugins for consistent analysis: * All metrics now include: metric_type, iteration, connections, batch_size, total_transactions, total_rows, batch_count, test_phase * Use null values for non-applicable fields to maintain consistency * Add scale_factor for TPCC metrics and transaction_type for transaction counts - Update bulk-load plugin to track active connections and workers in all metrics - Update tpcc-scalability plugin with consistent tag structure and connection/worker tracking - Improve background metrics collection with real-time worker and connection statistics This ensures all metrics have consistent structure for easier analysis and querying. --- config/core.yaml | 2 +- core/storage/manager.go | 41 ++- core/types.go | 16 +- .../004_add_connection_worker_tracking.sql | 17 + plugins/bulk-load/API_EXAMPLES.md | 8 +- plugins/bulk-load/plugin.go | 300 ++++++++++++------ plugins/tpcc-scalability/plugin.go | 158 +++++---- 7 files changed, 380 insertions(+), 162 deletions(-) create mode 100644 migrations/004_add_connection_worker_tracking.sql diff --git a/config/core.yaml b/config/core.yaml index f10a840..407abfb 100644 --- a/config/core.yaml +++ b/config/core.yaml @@ -18,7 +18,7 @@ api: port: 9090 logging: - level: "info" + level: "debug" format: "text" # "text" or "json"aa output: "stdout" # "stdout", "stderr", or file path diff --git a/core/storage/manager.go b/core/storage/manager.go index 1dc52e4..6ae8063 100644 --- a/core/storage/manager.go +++ b/core/storage/manager.go @@ -336,11 +336,11 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e } defer tx.Rollback() - // Prepare insert statement + // Prepare insert statement with new connection and worker fields query := ` INSERT INTO test_run_result - (test_run_id, test_metric_id, start_time, end_time, value, tags) - VALUES ($1, $2, $3, $4, $5, $6) + (test_run_id, test_metric_id, start_time, end_time, value, tags, 
active_connections, active_workers) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` stmt, err := tx.PrepareContext(ctx, query) if err != nil { @@ -360,6 +360,8 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e core.Field{Key: "test_run_id", Value: result.TestRunID}, core.Field{Key: "metric_id", Value: result.MetricID}, core.Field{Key: "value", Value: result.Value}, + core.Field{Key: "active_connections", Value: result.ActiveConnections}, + core.Field{Key: "active_workers", Value: result.ActiveWorkers}, ) _, err = stmt.ExecContext(ctx, @@ -369,6 +371,8 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e result.EndTime, result.Value, tagsJSON, + result.ActiveConnections, + result.ActiveWorkers, ) if err != nil { return fmt.Errorf("failed to insert result %d: %w", i, err) @@ -389,7 +393,7 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e // GetResults retrieves results for a test run func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResult, error) { query := ` - SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags + SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags, active_connections, active_workers FROM test_run_result WHERE test_run_id = $1 ORDER BY start_time @@ -410,6 +414,7 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul for rows.Next() { var result core.TestResult var tagsJSON []byte + var activeConnections, activeWorkers sql.NullInt32 err := rows.Scan( &result.ID, @@ -419,6 +424,8 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul &result.EndTime, &result.Value, &tagsJSON, + &activeConnections, + &activeWorkers, ) if err != nil { return nil, fmt.Errorf("failed to scan result: %w", err) @@ -431,6 +438,16 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul } } + // Handle nullable connection and 
worker counts + if activeConnections.Valid { + connections := int(activeConnections.Int32) + result.ActiveConnections = &connections + } + if activeWorkers.Valid { + workers := int(activeWorkers.Int32) + result.ActiveWorkers = &workers + } + results = append(results, result) } @@ -440,7 +457,7 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul // GetResultsByMetric retrieves recent results for a specific metric func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, limit int) ([]core.TestResult, error) { query := ` - SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags + SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags, trr.active_connections, trr.active_workers FROM test_run_result trr JOIN test_metric tm ON trr.test_metric_id = tm.id WHERE tm.code = $1 @@ -463,6 +480,7 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim for rows.Next() { var result core.TestResult var tagsJSON []byte + var activeConnections, activeWorkers sql.NullInt32 err := rows.Scan( &result.ID, @@ -472,6 +490,8 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim &result.EndTime, &result.Value, &tagsJSON, + &activeConnections, + &activeWorkers, ) if err != nil { return nil, fmt.Errorf("failed to scan result: %w", err) @@ -482,6 +502,17 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim return nil, fmt.Errorf("failed to unmarshal tags: %w", err) } } + + // Handle nullable connection and worker counts + if activeConnections.Valid { + connections := int(activeConnections.Int32) + result.ActiveConnections = &connections + } + if activeWorkers.Valid { + workers := int(activeWorkers.Int32) + result.ActiveWorkers = &workers + } + results = append(results, result) } diff --git a/core/types.go b/core/types.go index 3b90f3e..491f9a2 100644 --- 
a/core/types.go +++ b/core/types.go @@ -107,13 +107,15 @@ type TestRun struct { // TestResult represents a single measurement from a test execution type TestResult struct { - ID int64 `json:"id"` - TestRunID int64 `json:"test_run_id"` - MetricID int `json:"metric_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Value float64 `json:"value"` - Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB + ID int64 `json:"id"` + TestRunID int64 `json:"test_run_id"` + MetricID int `json:"metric_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Value float64 `json:"value"` + Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB + ActiveConnections *int `json:"active_connections,omitempty"` // Number of active database connections + ActiveWorkers *int `json:"active_workers,omitempty"` // Number of active worker threads/processes } // TestType represents a category of tests (e.g., bulk_insert, read_latency) diff --git a/migrations/004_add_connection_worker_tracking.sql b/migrations/004_add_connection_worker_tracking.sql new file mode 100644 index 0000000..d0662b1 --- /dev/null +++ b/migrations/004_add_connection_worker_tracking.sql @@ -0,0 +1,17 @@ +-- Migration 004: Add active connections and workers tracking to test_run_result + +-- Add columns to track the number of active connections and workers at the time each metric was saved +ALTER TABLE test_run_result +ADD COLUMN IF NOT EXISTS active_connections INTEGER, +ADD COLUMN IF NOT EXISTS active_workers INTEGER; + +-- Create indexes for querying by connection and worker counts +CREATE INDEX IF NOT EXISTS idx_test_run_result_active_connections ON test_run_result(active_connections); +CREATE INDEX IF NOT EXISTS idx_test_run_result_active_workers ON test_run_result(active_workers); + +-- Create a compound index for metrics with connection/worker analysis +CREATE INDEX IF NOT EXISTS 
idx_test_run_result_conn_workers ON test_run_result(test_run_id, active_connections, active_workers); + +-- Add comments to clarify the purpose of these fields +COMMENT ON COLUMN test_run_result.active_connections IS 'Number of database connections active when this metric was recorded'; +COMMENT ON COLUMN test_run_result.active_workers IS 'Number of worker threads/processes active when this metric was recorded'; diff --git a/plugins/bulk-load/API_EXAMPLES.md b/plugins/bulk-load/API_EXAMPLES.md index dc512ea..3394b49 100644 --- a/plugins/bulk-load/API_EXAMPLES.md +++ b/plugins/bulk-load/API_EXAMPLES.md @@ -19,11 +19,11 @@ curl -X POST "http://localhost:8080/test-runs" \ "config": { "host": "localhost", "port": 5432, - "database": "stormdb_test", - "username": "stormdb", - "password": "stormdb_password", + "database": "stormdb", + "username": "postgres", + "password": "postgres", "ssl_mode": "disable", - "batch_sizes": [1, 100, 1000], + "batch_sizes": [1, 100], "connections": 10, "duration": "2m", "warmup_time": "10s", diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 84c2d52..9f96f67 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -18,15 +18,17 @@ import ( // BulkLoadPlugin implements the bulk load performance test type BulkLoadPlugin struct { - core *core.CoreServices - logger core.Logger - db *sql.DB - config *BulkLoadConfig - isRunning int64 - stopChan chan struct{} - wg sync.WaitGroup - metrics *BulkLoadMetrics - testStarted time.Time + core *core.CoreServices + logger core.Logger + db *sql.DB + config *BulkLoadConfig + isRunning int64 + stopChan chan struct{} + wg sync.WaitGroup + metrics *BulkLoadMetrics + testStarted time.Time + currentWorkers []*WorkerStats // Live worker stats for real-time metrics + workersMu sync.RWMutex // Protect access to currentWorkers } // BulkLoadConfig defines the configuration for bulk load tests @@ -659,12 +661,22 @@ func (p *BulkLoadPlugin) runBatchTest(ctx 
context.Context, batchSize int) (*Batc } } + // Store worker stats in plugin for background metrics access + p.workersMu.Lock() + p.currentWorkers = workerStats + p.workersMu.Unlock() + measureCtx, measureCancel := context.WithTimeout(ctx, p.config.Duration) testStart := time.Now() p.runWorkers(measureCtx, batchSize, workerStats) measureCancel() + // Clear worker stats after measurement + p.workersMu.Lock() + p.currentWorkers = nil + p.workersMu.Unlock() + testDuration := time.Since(testStart) result.DurationSeconds = testDuration.Seconds() @@ -830,15 +842,14 @@ func (p *BulkLoadPlugin) executeBulkInsert(batchSize int) error { } // backgroundMetricsSaver continuously saves metrics every second while test is running +// Enhanced version with real-time worker statistics and comprehensive logging func (p *BulkLoadPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { defer close(done) - + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() - lastSavedTransactions := int64(0) - lastSavedRows := int64(0) - saveCounter := 0 + iteration := 0 for { select { @@ -846,35 +857,70 @@ func (p *BulkLoadPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- p.logger.Info("Background metrics saver stopping") return case <-ticker.C: - saveCounter++ - - // Get current metrics snapshot + iteration++ + + now := time.Now() + + // Get current accumulated metrics p.metrics.mu.RLock() - currentTransactions := p.metrics.TotalTransactions - currentRows := p.metrics.TotalRowsInserted - batchCount := len(p.metrics.BatchResults) + accumulatedTransactions := p.metrics.TotalTransactions + accumulatedRows := p.metrics.TotalRowsInserted p.metrics.mu.RUnlock() - // Only save if we have new data - if currentTransactions > lastSavedTransactions || currentRows > lastSavedRows { - err := p.saveCurrentMetrics(ctx, saveCounter) - if err != nil { - p.logger.Error("Failed to save metrics in background", - core.Field{Key: "error", Value: err.Error()}, - core.Field{Key: 
"save_iteration", Value: saveCounter}) - } else { - p.logger.Info("Background metrics saved", - core.Field{Key: "save_iteration", Value: saveCounter}, - core.Field{Key: "total_transactions", Value: currentTransactions}, - core.Field{Key: "total_rows", Value: currentRows}, - core.Field{Key: "batch_count", Value: batchCount}) - - lastSavedTransactions = currentTransactions - lastSavedRows = currentRows + // Get current live worker statistics + p.workersMu.RLock() + currentWorkers := make([]*WorkerStats, len(p.currentWorkers)) + copy(currentWorkers, p.currentWorkers) + p.workersMu.RUnlock() + + // Calculate live transactions/rows from current workers + var liveTransactions, liveRows int64 + for _, worker := range currentWorkers { + if worker != nil { + liveTransactions += atomic.LoadInt64(&worker.Transactions) + liveRows += atomic.LoadInt64(&worker.RowsInserted) + } + } + + // Debug logging + p.logger.Debug("Background metrics check", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions}, + core.Field{Key: "live_transactions", Value: liveTransactions}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}, + ) + + // Total transactions = accumulated from completed batches + live from current batch + totalTransactions := accumulatedTransactions + liveTransactions + totalRows := accumulatedRows + liveRows + + // Save cumulative metrics as snapshot + if totalTransactions > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.metrics.StartTime).Seconds() + if elapsed > 0 { + transactionRate := float64(totalTransactions) / elapsed + rowRate := float64(totalRows) / elapsed + + err := p.saveCurrentMetrics(ctx, iteration) + if err != nil { + p.logger.Error("Failed to save background metrics", + core.Field{Key: "error", Value: err.Error()}, + core.Field{Key: "save_iteration", Value: iteration}) + } else { + p.logger.Debug("Background metrics saved", + core.Field{Key: 
"save_iteration", Value: iteration}, + core.Field{Key: "total_transactions", Value: totalTransactions}, + core.Field{Key: "total_rows", Value: totalRows}, + core.Field{Key: "transaction_rate", Value: transactionRate}, + core.Field{Key: "row_rate", Value: rowRate}, + core.Field{Key: "active_workers", Value: len(currentWorkers)}) + } } } else { - p.logger.Debug("No new metrics to save", - core.Field{Key: "save_iteration", Value: saveCounter}) + p.logger.Debug("No metrics to save - no transactions yet", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}) } } } @@ -912,43 +958,82 @@ func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int) var results []core.TestResult p.metrics.mu.RLock() - defer p.metrics.mu.RUnlock() + accumulatedTransactions := p.metrics.TotalTransactions + accumulatedRows := p.metrics.TotalRowsInserted + p.metrics.mu.RUnlock() + + // Get current live worker stats + p.workersMu.RLock() + currentWorkers := p.currentWorkers + activeWorkers := len(currentWorkers) + activeConnections := p.config.Connections // Current configured connections + p.workersMu.RUnlock() + + var liveTransactions, liveRows int64 + for _, worker := range currentWorkers { + liveTransactions += atomic.LoadInt64(&worker.Transactions) + liveRows += atomic.LoadInt64(&worker.RowsInserted) + } + + // Debug logging + p.logger.Debug("Background metrics check", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions}, + core.Field{Key: "live_transactions", Value: liveTransactions}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}, + ) + + // Total transactions = accumulated from completed batches + live from current batch + totalTransactions := accumulatedTransactions + liveTransactions + totalRows := accumulatedRows + liveRows // Save cumulative metrics as snapshot - if p.metrics.TotalTransactions > 0 { + if 
totalTransactions > 0 { // Calculate elapsed time for rates elapsed := now.Sub(p.metrics.StartTime).Seconds() if elapsed > 0 { - transactionRate := float64(p.metrics.TotalTransactions) / elapsed - rowRate := float64(p.metrics.TotalRowsInserted) / elapsed + transactionRate := float64(totalTransactions) / elapsed + rowRate := float64(totalRows) / elapsed // Store transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: transactionRate, + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ "metric_type": "cumulative_transaction_rate", "iteration": iteration, "connections": p.config.Connections, - "total_transactions": p.metrics.TotalTransactions, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "measurement", }, }) // Store row insertion rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: rowInsertMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: rowRate, + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: rowRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "metric_type": "cumulative_row_rate", - "iteration": iteration, - "connections": p.config.Connections, - "total_rows": p.metrics.TotalRowsInserted, + "metric_type": "cumulative_row_rate", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": 
len(p.metrics.BatchResults), + "test_phase": "measurement", }, }) } @@ -957,26 +1042,32 @@ func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int) if len(p.metrics.BatchResults) > 0 { var totalLatency float64 var totalOperations int64 - + for _, batch := range p.metrics.BatchResults { totalLatency += batch.AvgLatencyMs * float64(batch.TotalTransactions) totalOperations += batch.TotalTransactions } - + if totalOperations > 0 { avgLatency := totalLatency / float64(totalOperations) - + results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyAvgMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: avgLatency, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: avgLatency, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "metric_type": "cumulative_avg_latency", - "iteration": iteration, - "connections": p.config.Connections, - "batch_count": len(p.metrics.BatchResults), + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "measurement", }, }) } @@ -1033,47 +1124,72 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { core.Field{Key: "total_rows", Value: p.metrics.TotalRowsInserted}, ) + // Active connections and workers for final results + activeConnections := p.config.Connections + activeWorkers := 0 // No workers active at completion + // Convert each batch result to database format for _, batch := range p.metrics.BatchResults { // Store transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: rowInsertMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: 
batch.TransactionsPerSec, + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.TransactionsPerSec, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "transactions_per_sec", + "metric_type": "transactions_per_sec", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) // Store rows per second results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: rowInsertMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: batch.RowsPerSec, + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.RowsPerSec, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "rows_per_sec", + "metric_type": "rows_per_sec", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) // Store average latency results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyAvgMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: batch.AvgLatencyMs, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.AvgLatencyMs, + ActiveConnections: &activeConnections, + 
ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "avg_latency_ms", + "metric_type": "avg_latency_ms", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) } diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index dcb8c8a..3b71455 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -196,7 +196,7 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) defer atomic.StoreInt64(&p.isRunning, 0) p.testStarted = time.Now() - + // Start background metrics saver goroutine metricsCtx, cancelMetrics := context.WithCancel(ctx) metricsDone := make(chan struct{}) @@ -205,7 +205,7 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) cancelMetrics() <-metricsDone // Wait for goroutine to finish }() - + p.logger.Info("starting TPC-C scalability test", core.Field{Key: "scale", Value: p.config.Scale}, core.Field{Key: "connection_levels", Value: len(p.config.Connections)}, @@ -1116,7 +1116,7 @@ func (p *TPCCPlugin) logConnectionLevelSummary(connCount int, duration time.Dura // backgroundMetricsSaver continuously saves metrics every second while test is running func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { defer close(done) - + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -1130,7 +1130,7 @@ func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- str return case <-ticker.C: saveCounter++ - + // Get current metrics snapshot p.metrics.mu.RLock() totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + @@ -1152,11 +1152,11 
@@ func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- str core.Field{Key: "total_transactions", Value: totalTxns}, core.Field{Key: "connections", Value: currentConnections}, core.Field{Key: "test_phase", Value: testPhase}) - + lastSavedTransactions = totalTxns } } else { - p.logger.Debug("No new TPCC metrics to save", + p.logger.Debug("No new TPCC metrics to save", core.Field{Key: "save_iteration", Value: saveCounter}) } } @@ -1196,6 +1196,10 @@ func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + // Get connection and worker info + activeConnections := p.metrics.CurrentConnections + activeWorkers := activeConnections // For TPCC, number of workers typically equals connections + if totalTxns > 0 { // Calculate elapsed time for rates elapsed := now.Sub(p.testStarted).Seconds() @@ -1204,17 +1208,23 @@ func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) // Store transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: transactionRate, + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ "metric_type": "cumulative_tps", "iteration": iteration, "connections": p.metrics.CurrentConnections, - "test_phase": p.metrics.TestPhase, + "batch_size": nil, // Not applicable for TPCC "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": p.metrics.TestPhase, + "scale_factor": p.config.Scale, }, }) } @@ -1222,18 +1232,25 @@ func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx 
context.Context, iteration int) // Calculate and store average latency if totalTxns > 0 { avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 - + results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyAvgMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: avgLatencyMs, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: avgLatencyMs, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "metric_type": "cumulative_avg_latency", - "iteration": iteration, - "connections": p.metrics.CurrentConnections, - "test_phase": p.metrics.TestPhase, + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": p.metrics.TestPhase, + "scale_factor": p.config.Scale, }, }) } @@ -1250,17 +1267,24 @@ func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) for txType, count := range txTypes { if count > 0 { results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(count), + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(count), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ "metric_type": "transaction_count", - "transaction_type": txType, "iteration": iteration, "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC "test_phase": 
p.metrics.TestPhase, + "scale_factor": p.config.Scale, + "transaction_type": txType, }, }) } @@ -1313,33 +1337,53 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + // Connection and worker info for final results + activeConnections := p.metrics.CurrentConnections + activeWorkers := 0 // No workers active at completion + if totalTxns > 0 { // Store total transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(totalTxns), + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(totalTxns), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": p.metrics.CurrentConnections, - "metric_type": "total_transactions", - "test_phase": p.metrics.TestPhase, + "metric_type": "total_transactions", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, }, }) // Store average latency avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyAvgMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: avgLatencyMs, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: avgLatencyMs, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": 
p.metrics.CurrentConnections, - "metric_type": "avg_latency_ms", - "test_phase": p.metrics.TestPhase, + "metric_type": "avg_latency_ms", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, }, }) @@ -1355,16 +1399,24 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { for txType, count := range txTypes { if count > 0 { results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(count), + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(count), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": p.metrics.CurrentConnections, - "metric_type": "transaction_count", - "transaction_type": txType, - "test_phase": p.metrics.TestPhase, + "metric_type": "transaction_count", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, + "transaction_type": txType, }, }) }