diff --git a/config/core.yaml b/config/core.yaml index f10a840..407abfb 100644 --- a/config/core.yaml +++ b/config/core.yaml @@ -18,7 +18,7 @@ api: port: 9090 logging: - level: "info" + level: "debug" format: "text" # "text" or "json"aa output: "stdout" # "stdout", "stderr", or file path diff --git a/core/storage/manager.go b/core/storage/manager.go index 3bf433d..6ae8063 100644 --- a/core/storage/manager.go +++ b/core/storage/manager.go @@ -314,9 +314,16 @@ func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.T // StoreResults stores test results in batch func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) error { if len(results) == 0 { + m.logger.Warn("StoreResults called with empty results array") return nil } + m.logger.Info("StoreResults called", + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "first_test_run_id", Value: results[0].TestRunID}, + core.Field{Key: "first_metric_id", Value: results[0].MetricID}, + ) + db, err := m.db.GetConnection(ctx) if err != nil { return fmt.Errorf("failed to get database connection: %w", err) @@ -329,11 +336,11 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e } defer tx.Rollback() - // Prepare insert statement + // Prepare insert statement with new connection and worker fields query := ` INSERT INTO test_run_result - (test_run_id, test_metric_id, start_time, end_time, value, tags) - VALUES ($1, $2, $3, $4, $5, $6) + (test_run_id, test_metric_id, start_time, end_time, value, tags, active_connections, active_workers) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` stmt, err := tx.PrepareContext(ctx, query) if err != nil { @@ -342,12 +349,21 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e defer stmt.Close() // Insert each result - for _, result := range results { + for i, result := range results { tagsJSON, err := json.Marshal(result.Tags) if err != nil { - return fmt.Errorf("failed to marshal tags: %w", err) + return fmt.Errorf("failed to marshal tags for result %d: %w", i, err) } + m.logger.Debug("Inserting result", + core.Field{Key: "index", Value: i}, + core.Field{Key: "test_run_id", Value: result.TestRunID}, + core.Field{Key: "metric_id", Value: result.MetricID}, + core.Field{Key: "value", Value: result.Value}, + core.Field{Key: "active_connections", Value: result.ActiveConnections}, + core.Field{Key: "active_workers", Value: result.ActiveWorkers}, + ) + _, err = stmt.ExecContext(ctx, result.TestRunID, result.MetricID, @@ -355,9 +371,11 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e result.EndTime, result.Value, tagsJSON, + result.ActiveConnections, + result.ActiveWorkers, ) if err != nil { - return fmt.Errorf("failed to insert result: %w", err) + return fmt.Errorf("failed to insert result %d: %w", i, err) } } @@ -365,9 +383,8 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e return fmt.Errorf("failed to commit transaction: %w", err) } - m.logger.Info("test results stored", - core.Field{Key: "result_count", Value: len(results)}, - core.Field{Key: "test_run_id", Value: results[0].TestRunID}, + m.logger.Info("Successfully stored results to database", + core.Field{Key: "stored_count", Value: len(results)}, ) return nil @@ -376,7 +393,7 @@ func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) e // GetResults retrieves results for a test run func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResult, error) { query := ` - SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags + SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags, active_connections, active_workers FROM test_run_result WHERE test_run_id = $1 ORDER BY start_time @@ -397,6 +414,7 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul for rows.Next() { var result core.TestResult var tagsJSON []byte + var activeConnections, activeWorkers sql.NullInt32 err := rows.Scan( &result.ID, @@ -406,6 +424,8 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul &result.EndTime, &result.Value, &tagsJSON, + &activeConnections, + &activeWorkers, ) if err != nil { return nil, fmt.Errorf("failed to scan result: %w", err) @@ -418,6 +438,16 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul } } + // Handle nullable connection and worker counts + if activeConnections.Valid { + connections := int(activeConnections.Int32) + result.ActiveConnections = &connections + } + if activeWorkers.Valid { + workers := int(activeWorkers.Int32) + result.ActiveWorkers = &workers + } + results = append(results, result) } @@ -427,7 +457,7 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul // GetResultsByMetric retrieves recent results for a specific metric func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, limit int) ([]core.TestResult, error) { query := ` - SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags + SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags, trr.active_connections, trr.active_workers FROM test_run_result trr JOIN test_metric tm ON trr.test_metric_id = tm.id WHERE tm.code = $1 @@ -450,6 +480,7 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim for rows.Next() { var result core.TestResult var tagsJSON []byte + var activeConnections, activeWorkers sql.NullInt32 err := rows.Scan( &result.ID, @@ -459,6 +490,8 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim &result.EndTime, &result.Value, &tagsJSON, + &activeConnections, + &activeWorkers, ) if err != nil { return nil, fmt.Errorf("failed to scan result: %w", err) @@ -469,6 +502,17 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim return nil, fmt.Errorf("failed to unmarshal tags: %w", err) } } + + // Handle nullable connection and worker counts + if activeConnections.Valid { + connections := int(activeConnections.Int32) + result.ActiveConnections = &connections + } + if activeWorkers.Valid { + workers := int(activeWorkers.Int32) + result.ActiveWorkers = &workers + } + results = append(results, result) } diff --git a/core/types.go b/core/types.go index 3b90f3e..491f9a2 100644 --- a/core/types.go +++ b/core/types.go @@ -107,13 +107,15 @@ type TestRun struct { // TestResult represents a single measurement from a test execution type TestResult struct { - ID int64 `json:"id"` - TestRunID int64 `json:"test_run_id"` - MetricID int `json:"metric_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Value float64 `json:"value"` - Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB + ID int64 `json:"id"` + TestRunID int64 `json:"test_run_id"` + MetricID int `json:"metric_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Value float64 `json:"value"` + Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB + ActiveConnections *int `json:"active_connections,omitempty"` // Number of active database connections + ActiveWorkers *int `json:"active_workers,omitempty"` // Number of active worker threads/processes } // TestType represents a category of tests (e.g., bulk_insert, read_latency) diff --git a/migrations/004_add_connection_worker_tracking.sql b/migrations/004_add_connection_worker_tracking.sql new file mode 100644 index 0000000..d0662b1 --- /dev/null +++ b/migrations/004_add_connection_worker_tracking.sql @@ -0,0 +1,17 @@ +-- Migration 004: Add active connections and workers tracking to test_run_result + +-- Add columns to track the number of active connections and workers at the time each metric was saved +ALTER TABLE test_run_result +ADD COLUMN IF NOT EXISTS active_connections INTEGER, +ADD COLUMN IF NOT EXISTS active_workers INTEGER; + +-- Create indexes for querying by connection and worker counts +CREATE INDEX IF NOT EXISTS idx_test_run_result_active_connections ON test_run_result(active_connections); +CREATE INDEX IF NOT EXISTS idx_test_run_result_active_workers ON test_run_result(active_workers); + +-- Create a compound index for metrics with connection/worker analysis +CREATE INDEX IF NOT EXISTS idx_test_run_result_conn_workers ON test_run_result(test_run_id, active_connections, active_workers); + +-- Add comments to clarify the purpose of these fields +COMMENT ON COLUMN test_run_result.active_connections IS 'Number of database connections active when this metric was recorded'; +COMMENT ON COLUMN test_run_result.active_workers IS 'Number of worker threads/processes active when this metric was recorded'; diff --git a/plugins/bulk-load/API_EXAMPLES.md b/plugins/bulk-load/API_EXAMPLES.md index dc512ea..3394b49 100644 --- a/plugins/bulk-load/API_EXAMPLES.md +++ b/plugins/bulk-load/API_EXAMPLES.md @@ -19,11 +19,11 @@ curl -X POST "http://localhost:8080/test-runs" \ "config": { "host": "localhost", "port": 5432, - "database": "stormdb_test", - "username": "stormdb", - "password": "stormdb_password", + "database": "stormdb", + "username": "postgres", + "password": "postgres", "ssl_mode": "disable", - "batch_sizes": [1, 100, 1000], + "batch_sizes": [1, 100], "connections": 10, "duration": "2m", "warmup_time": "10s", diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 6650706..9f96f67 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -18,15 +18,17 @@ import ( // BulkLoadPlugin implements the bulk load performance test type BulkLoadPlugin struct { - core *core.CoreServices - logger core.Logger - db *sql.DB - config *BulkLoadConfig - isRunning int64 - stopChan chan struct{} - wg sync.WaitGroup - metrics *BulkLoadMetrics - testStarted time.Time + core *core.CoreServices + logger core.Logger + db *sql.DB + config *BulkLoadConfig + isRunning int64 + stopChan chan struct{} + wg sync.WaitGroup + metrics *BulkLoadMetrics + testStarted time.Time + currentWorkers []*WorkerStats // Live worker stats for real-time metrics + workersMu sync.RWMutex // Protect access to currentWorkers } // BulkLoadConfig defines the configuration for bulk load tests @@ -356,6 +358,9 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac // Extract test run ID from context and add to logger if testRunID, ok := ctx.Value("test_run_id").(int64); ok { p.logger = p.logger.WithFields(core.Field{Key: "test_run_id", Value: testRunID}) + p.logger.Info("Plugin executing with test run ID", core.Field{Key: "test_run_id", Value: testRunID}) + } else { + p.logger.Warn("No test run ID found in context - this may cause issues with result storage") } // Set running state @@ -367,6 +372,15 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac p.testStarted = time.Now() p.metrics.StartTime = p.testStarted + // Start background metrics saver goroutine + metricsCtx, cancelMetrics := context.WithCancel(ctx) + metricsDone := make(chan struct{}) + go p.backgroundMetricsSaver(metricsCtx, metricsDone) + defer func() { + cancelMetrics() + <-metricsDone // Wait for goroutine to finish + }() + p.logger.Info("Starting bulk load performance test", core.Field{Key: "batch_sizes", Value: p.config.BatchSizes}, core.Field{Key: "connections", Value: p.config.Connections}, @@ -419,6 +433,14 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac p.metrics.TotalErrors += result.TotalErrors p.metrics.mu.Unlock() + p.logger.Info("Metrics collection update", + core.Field{Key: "batch_size", Value: batchSize}, + core.Field{Key: "total_transactions", Value: result.TotalTransactions}, + core.Field{Key: "total_rows", Value: result.TotalRowsInserted}, + core.Field{Key: "errors", Value: result.TotalErrors}, + core.Field{Key: "accumulated_total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "accumulated_total_rows", Value: p.metrics.TotalRowsInserted}) + p.logger.Info("Batch test completed", core.Field{Key: "batch_size", Value: batchSize}, core.Field{Key: "transactions", Value: result.TotalTransactions}, @@ -449,6 +471,8 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac if err := p.storeResults(ctx); err != nil { p.logger.Error("Failed to store test results", core.Field{Key: "error", Value: err.Error()}) // Don't fail the entire test, just log the error + } else { + p.logger.Info("Test results stored successfully") } return nil @@ -637,12 +661,22 @@ func (p *BulkLoadPlugin) runBatchTest(ctx context.Context, batchSize int) (*Batc } } + // Store worker stats in plugin for background metrics access + p.workersMu.Lock() + p.currentWorkers = workerStats + p.workersMu.Unlock() + measureCtx, measureCancel := context.WithTimeout(ctx, p.config.Duration) testStart := time.Now() p.runWorkers(measureCtx, batchSize, workerStats) measureCancel() + // Clear worker stats after measurement + p.workersMu.Lock() + p.currentWorkers = nil + p.workersMu.Unlock() + testDuration := time.Since(testStart) result.DurationSeconds = testDuration.Seconds() @@ -807,6 +841,251 @@ func (p *BulkLoadPlugin) executeBulkInsert(batchSize int) error { return err } +// backgroundMetricsSaver continuously saves metrics every second while test is running +// Enhanced version with real-time worker statistics and comprehensive logging +func (p *BulkLoadPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { + defer close(done) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + iteration := 0 + + for { + select { + case <-ctx.Done(): + p.logger.Info("Background metrics saver stopping") + return + case <-ticker.C: + iteration++ + + now := time.Now() + + // Get current accumulated metrics + p.metrics.mu.RLock() + accumulatedTransactions := p.metrics.TotalTransactions + accumulatedRows := p.metrics.TotalRowsInserted + p.metrics.mu.RUnlock() + + // Get current live worker statistics + p.workersMu.RLock() + currentWorkers := make([]*WorkerStats, len(p.currentWorkers)) + copy(currentWorkers, p.currentWorkers) + p.workersMu.RUnlock() + + // Calculate live transactions/rows from current workers + var liveTransactions, liveRows int64 + for _, worker := range currentWorkers { + if worker != nil { + liveTransactions += atomic.LoadInt64(&worker.Transactions) + liveRows += atomic.LoadInt64(&worker.RowsInserted) + } + } + + // Debug logging + p.logger.Debug("Background metrics check", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions}, + core.Field{Key: "live_transactions", Value: liveTransactions}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}, + ) + + // Total transactions = accumulated from completed batches + live from current batch + totalTransactions := accumulatedTransactions + liveTransactions + totalRows := accumulatedRows + liveRows + + // Save cumulative metrics as snapshot + if totalTransactions > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.metrics.StartTime).Seconds() + if elapsed > 0 { + transactionRate := float64(totalTransactions) / elapsed + rowRate := float64(totalRows) / elapsed + + err := p.saveCurrentMetrics(ctx, iteration) + if err != nil { + p.logger.Error("Failed to save background metrics", + core.Field{Key: "error", Value: err.Error()}, + core.Field{Key: "save_iteration", Value: iteration}) + } else { + p.logger.Debug("Background metrics saved", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "total_transactions", Value: totalTransactions}, + core.Field{Key: "total_rows", Value: totalRows}, + core.Field{Key: "transaction_rate", Value: transactionRate}, + core.Field{Key: "row_rate", Value: rowRate}, + core.Field{Key: "active_workers", Value: len(currentWorkers)}) + } + } + } else { + p.logger.Debug("No metrics to save - no transactions yet", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}) + } + } + } +} + +// saveCurrentMetrics saves current accumulated metrics to database +func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int) error { + if p.core == nil || p.core.Storage == nil { + return fmt.Errorf("core services not available") + } + + // Extract test run ID from context + testRunID, ok := ctx.Value("test_run_id").(int64) + if !ok { + return fmt.Errorf("test_run_id not found in context") + } + + // Get metric IDs + rowInsertMetric, err := p.core.Storage.GetMetric(ctx, "ROW_INSERT") + if err != nil { + return fmt.Errorf("failed to get ROW_INSERT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + now := time.Now() + var results []core.TestResult + + p.metrics.mu.RLock() + accumulatedTransactions := p.metrics.TotalTransactions + accumulatedRows := p.metrics.TotalRowsInserted + p.metrics.mu.RUnlock() + + // Get current live worker stats + p.workersMu.RLock() + currentWorkers := p.currentWorkers + activeWorkers := len(currentWorkers) + activeConnections := p.config.Connections // Current configured connections + p.workersMu.RUnlock() + + var liveTransactions, liveRows int64 + for _, worker := range currentWorkers { + liveTransactions += atomic.LoadInt64(&worker.Transactions) + liveRows += atomic.LoadInt64(&worker.RowsInserted) + } + + // Debug logging + p.logger.Debug("Background metrics check", + core.Field{Key: "save_iteration", Value: iteration}, + core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions}, + core.Field{Key: "live_transactions", Value: liveTransactions}, + core.Field{Key: "worker_count", Value: len(currentWorkers)}, + ) + + // Total transactions = accumulated from completed batches + live from current batch + totalTransactions := accumulatedTransactions + liveTransactions + totalRows := accumulatedRows + liveRows + + // Save cumulative metrics as snapshot + if totalTransactions > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.metrics.StartTime).Seconds() + if elapsed > 0 { + transactionRate := float64(totalTransactions) / elapsed + rowRate := float64(totalRows) / elapsed + + // Store transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "cumulative_transaction_rate", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "measurement", + }, + }) + + // Store row insertion rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: rowRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "cumulative_row_rate", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "measurement", + }, + }) + } + + // Calculate average latency from batch results + if len(p.metrics.BatchResults) > 0 { + var totalLatency float64 + var totalOperations int64 + + for _, batch := range p.metrics.BatchResults { + totalLatency += batch.AvgLatencyMs * float64(batch.TotalTransactions) + totalOperations += batch.TotalTransactions + } + + if totalOperations > 0 { + avgLatency := totalLatency / float64(totalOperations) + + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: avgLatency, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": nil, // Not applicable for cumulative metrics + "total_transactions": totalTransactions, + "total_rows": totalRows, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "measurement", + }, + }) + } + } + } + + if len(results) > 0 { + p.logger.Debug("Saving background metrics", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "iteration", Value: iteration}) + + return p.core.Storage.StoreResults(ctx, results) + } + + return nil +} + // storeResults converts plugin metrics to core.TestResult and stores them in the database func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { if p.core == nil || p.core.Storage == nil { @@ -820,60 +1099,122 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { testRunID = 0 } + // Look up metric IDs from database instead of using hardcoded values + rowInsertMetric, err := p.core.Storage.GetMetric(ctx, "ROW_INSERT") + if err != nil { + return fmt.Errorf("failed to get ROW_INSERT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + var results []core.TestResult now := time.Now() - // Get metric IDs (we should really look these up, but for now use hardcoded IDs) - rowInsertMetricID := 1 // ROW_INSERT - latencyMetricID := 7 // LATENCY_AVG + if len(p.metrics.BatchResults) == 0 { + p.logger.Warn("No batch results to store - test may not have run properly") + return nil + } + + p.logger.Info("Processing batch results for storage", + core.Field{Key: "batch_count", Value: len(p.metrics.BatchResults)}, + core.Field{Key: "total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "total_rows", Value: p.metrics.TotalRowsInserted}, + ) + + // Active connections and workers for final results + activeConnections := p.config.Connections + activeWorkers := 0 // No workers active at completion // Convert each batch result to database format for _, batch := range p.metrics.BatchResults { // Store transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: rowInsertMetricID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: batch.TransactionsPerSec, + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.TransactionsPerSec, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "transactions_per_sec", + "metric_type": "transactions_per_sec", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) // Store rows per second results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: rowInsertMetricID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: batch.RowsPerSec, + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.RowsPerSec, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "rows_per_sec", + "metric_type": "rows_per_sec", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) // Store average latency results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyMetricID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: batch.AvgLatencyMs, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.metrics.StartTime, + EndTime: now, + Value: batch.AvgLatencyMs, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "batch_size": batch.BatchSize, - "connections": batch.Connections, - "metric_type": "avg_latency_ms", + "metric_type": "avg_latency_ms", + "iteration": nil, // Not applicable for final batch results + "connections": batch.Connections, + "batch_size": batch.BatchSize, + "total_transactions": batch.TotalTransactions, + "total_rows": batch.TotalRowsInserted, + "batch_count": len(p.metrics.BatchResults), + "test_phase": "final_results", }, }) } + p.logger.Info("Storing test results", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "batch_count", Value: len(p.metrics.BatchResults)}, + core.Field{Key: "metrics_start_time", Value: p.metrics.StartTime}, + core.Field{Key: "metrics_end_time", Value: p.metrics.EndTime}, + core.Field{Key: "total_transactions", Value: p.metrics.TotalTransactions}, + core.Field{Key: "total_rows", Value: p.metrics.TotalRowsInserted}, + ) + // Store all results - return p.core.Storage.StoreResults(ctx, results) + storeErr := p.core.Storage.StoreResults(ctx, results) + if storeErr != nil { + p.logger.Error("Failed to store results to database", core.Field{Key: "error", Value: storeErr.Error()}) + return storeErr + } + + p.logger.Info("Successfully stored test results to database", + core.Field{Key: "stored_results", Value: len(results)}, + ) + return nil } // NewPlugin returns the plugin instance (required for plugin loading) diff --git a/plugins/bulk-load/plugin_test.go b/plugins/bulk-load/plugin_test.go index b4dfb53..9ece89b 100644 --- a/plugins/bulk-load/plugin_test.go +++ b/plugins/bulk-load/plugin_test.go @@ -12,13 +12,13 @@ import ( // MockLogger implements the core.Logger interface for testing type MockLogger struct{} -func (m *MockLogger) Debug(msg string, fields ...core.Field) {} -func (m *MockLogger) Info(msg string, fields ...core.Field) {} -func (m *MockLogger) Warn(msg string, fields ...core.Field) {} -func (m *MockLogger) Error(msg string, fields ...core.Field) {} -func (m *MockLogger) WithFields(fields ...core.Field) core.Logger { return m } -func (m *MockLogger) WithPlugin(pluginName string) core.Logger { return m } -func (m *MockLogger) WithStorage(storage core.StorageManager) core.Logger { return m } +func (m *MockLogger) Debug(msg string, fields ...core.Field) {} +func (m *MockLogger) Info(msg string, fields ...core.Field) {} +func (m *MockLogger) Warn(msg string, fields ...core.Field) {} +func (m *MockLogger) Error(msg string, fields ...core.Field) {} +func (m *MockLogger) WithFields(fields ...core.Field) core.Logger { return m } +func (m *MockLogger) WithPlugin(pluginName string) core.Logger { return m } +func (m *MockLogger) WithStorage(storage core.StorageManager) core.Logger { return m } // MockCoreServices provides mock implementations for testing type MockCoreServices struct { diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index 0ac08ac..3b71455 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -196,6 +196,16 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) defer atomic.StoreInt64(&p.isRunning, 0) p.testStarted = time.Now() + + // Start background metrics saver goroutine + metricsCtx, cancelMetrics := context.WithCancel(ctx) + metricsDone := make(chan struct{}) + go p.backgroundMetricsSaver(metricsCtx, metricsDone) + defer func() { + cancelMetrics() + <-metricsDone // Wait for goroutine to finish + }() + p.logger.Info("starting TPC-C scalability test", core.Field{Key: "scale", Value: p.config.Scale}, core.Field{Key: "connection_levels", Value: len(p.config.Connections)}, @@ -1103,6 +1113,196 @@ func (p *TPCCPlugin) logConnectionLevelSummary(connCount int, duration time.Dura ) } +// backgroundMetricsSaver continuously saves metrics every second while test is running +func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- struct{}) { + defer close(done) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + lastSavedTransactions := int64(0) + saveCounter := 0 + + for { + select { + case <-ctx.Done(): + p.logger.Info("Background metrics saver stopping") + return + case <-ticker.C: + saveCounter++ + + // Get current metrics snapshot + p.metrics.mu.RLock() + totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + currentConnections := p.metrics.CurrentConnections + testPhase := p.metrics.TestPhase + p.metrics.mu.RUnlock() + + // Only save if we have new data + if totalTxns > lastSavedTransactions { + err := p.saveCurrentTPCCMetrics(ctx, saveCounter) + if err != nil { + p.logger.Error("Failed to save TPCC metrics in background", + core.Field{Key: "error", Value: err.Error()}, + core.Field{Key: "save_iteration", Value: saveCounter}) + } else { + p.logger.Info("Background TPCC metrics saved", + core.Field{Key: "save_iteration", Value: saveCounter}, + core.Field{Key: "total_transactions", Value: totalTxns}, + core.Field{Key: "connections", Value: currentConnections}, + core.Field{Key: "test_phase", Value: testPhase}) + + lastSavedTransactions = totalTxns + } + } else { + p.logger.Debug("No new TPCC metrics to save", + core.Field{Key: "save_iteration", Value: saveCounter}) + } + } + } +} + +// saveCurrentTPCCMetrics saves current accumulated metrics to database +func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) error { + if p.core == nil || p.core.Storage == nil { + return fmt.Errorf("core services not available") + } + + // Extract test run ID from context + testRunID, ok := ctx.Value("test_run_id").(int64) + if !ok { + return fmt.Errorf("test_run_id not found in context") + } + + // Get metric IDs + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + + now := time.Now() + var results []core.TestResult + + p.metrics.mu.RLock() + defer p.metrics.mu.RUnlock() + + // Calculate total transactions + totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + + // Get connection and worker info + activeConnections := p.metrics.CurrentConnections + activeWorkers := activeConnections // For TPCC, number of workers typically equals connections + + if totalTxns > 0 { + // Calculate elapsed time for rates + elapsed := now.Sub(p.testStarted).Seconds() + if elapsed > 0 { + transactionRate := float64(totalTxns) / elapsed + + // Store transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "cumulative_tps", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": p.metrics.TestPhase, + "scale_factor": p.config.Scale, + }, + }) + } + + // Calculate and store average latency + if totalTxns > 0 { + avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 + + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: avgLatencyMs, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "cumulative_avg_latency", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": p.metrics.TestPhase, + "scale_factor": p.config.Scale, + }, + }) + } + + // Store individual transaction type counts + txTypes := map[string]int64{ + "new_order": p.metrics.NewOrderCount, + "payment": p.metrics.PaymentCount, + "order_status": p.metrics.OrderStatusCount, + "delivery": p.metrics.DeliveryCount, + "stock_level": p.metrics.StockLevelCount, + } + + for txType, count := range txTypes { + if count > 0 { + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(count), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "transaction_count", + "iteration": iteration, + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": p.metrics.TestPhase, + "scale_factor": p.config.Scale, + "transaction_type": txType, + }, + }) + } + } + } + + if len(results) > 0 { + p.logger.Debug("Saving background TPCC metrics", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "iteration", Value: iteration}) + + return p.core.Storage.StoreResults(ctx, results) + } + + return nil +} + // storeResults converts plugin metrics to core.TestResult and stores them in the database func (p *TPCCPlugin) storeResults(ctx context.Context) error { if p.core == nil || p.core.Storage == nil { @@ -1116,13 +1316,20 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { testRunID = 0 } + // Look up metric IDs from database instead of using hardcoded values + throughputMetric, err := p.core.Storage.GetMetric(ctx, "THROUGHPUT") + if err != nil { + return fmt.Errorf("failed to get THROUGHPUT metric: %w", err) + } + + latencyAvgMetric, err := p.core.Storage.GetMetric(ctx, "LATENCY_AVG") + if err != nil { + return fmt.Errorf("failed to get LATENCY_AVG metric: %w", err) + } + var results []core.TestResult now := time.Now() - // Get metric IDs (we should really look these up, but for now use hardcoded IDs) - transactionMetricID := 2 // TRANSACTION_RATE - latencyMetricID := 7 // LATENCY_AVG - p.metrics.mu.RLock() defer p.metrics.mu.RUnlock() @@ -1130,33 +1337,53 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount + // Connection and worker info for final results + activeConnections := p.metrics.CurrentConnections + activeWorkers := 0 // No workers active at completion + if totalTxns > 0 { // Store total transaction rate results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: transactionMetricID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(totalTxns), + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(totalTxns), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": p.metrics.CurrentConnections, - "metric_type": "total_transactions", - "test_phase": p.metrics.TestPhase, + "metric_type": "total_transactions", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, }, }) // Store average latency avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyMetricID, - StartTime: p.testStarted, - EndTime: now, - Value: avgLatencyMs, + TestRunID: testRunID, + MetricID: latencyAvgMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: avgLatencyMs, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": p.metrics.CurrentConnections, - "metric_type": "avg_latency_ms", - "test_phase": p.metrics.TestPhase, + "metric_type": "avg_latency_ms", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, }, }) @@ -1172,22 +1399,36 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { for txType, count := range txTypes { if count > 0 { results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: transactionMetricID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(count), + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: p.testStarted, + EndTime: now, + Value: float64(count), + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "connections": p.metrics.CurrentConnections, - "metric_type": "transaction_count", - "transaction_type": txType, - "test_phase": p.metrics.TestPhase, + "metric_type": "transaction_count", + "iteration": nil, // Not applicable for final results + "connections": p.metrics.CurrentConnections, + "batch_size": nil, // Not applicable for TPCC + "total_transactions": totalTxns, + "total_rows": nil, // Not applicable for TPCC + "batch_count": nil, // Not applicable for TPCC + "test_phase": "final_results", + "scale_factor": p.config.Scale, + "transaction_type": txType, }, }) } } } + p.logger.Info("Storing TPCC test results", + core.Field{Key: "test_run_id", Value: testRunID}, + core.Field{Key: "result_count", Value: len(results)}, + core.Field{Key: "total_transactions", Value: totalTxns}, + ) + // Store all results return p.core.Storage.StoreResults(ctx, results) }