diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 9f96f67..81f55c6 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -29,6 +29,14 @@ type BulkLoadPlugin struct { testStarted time.Time currentWorkers []*WorkerStats // Live worker stats for real-time metrics workersMu sync.RWMutex // Protect access to currentWorkers + currentBatch int // Current batch size being tested + currentBatchMu sync.RWMutex // Protect access to currentBatch + + // Previous metrics for delta calculation + prevTransactions int64 // Previous total transactions + prevRows int64 // Previous total rows + prevSaveTime time.Time // Previous save time for rate calculation + prevMetricsMu sync.RWMutex // Protect access to previous metrics } // BulkLoadConfig defines the configuration for bulk load tests @@ -475,6 +483,18 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac p.logger.Info("Test results stored successfully") } + // Reset current batch tracking + p.currentBatchMu.Lock() + p.currentBatch = 0 + p.currentBatchMu.Unlock() + + // Reset previous metrics tracking + p.prevMetricsMu.Lock() + p.prevTransactions = 0 + p.prevRows = 0 + p.prevSaveTime = time.Time{} + p.prevMetricsMu.Unlock() + return nil } @@ -644,6 +664,18 @@ func (p *BulkLoadPlugin) runBatchTest(ctx context.Context, batchSize int) (*Batc MinLatencyMs: float64(time.Hour.Milliseconds()), // Initialize with high value } + // Set current batch size for background metrics + p.currentBatchMu.Lock() + p.currentBatch = batchSize + p.currentBatchMu.Unlock() + + // Initialize previous metrics for delta calculation + p.prevMetricsMu.Lock() + p.prevTransactions = 0 + p.prevRows = 0 + p.prevSaveTime = time.Time{} // Reset to zero value + p.prevMetricsMu.Unlock() + // Warmup phase p.logger.Info("Starting warmup phase", core.Field{Key: "duration", Value: p.config.WarmupTime}) warmupCtx, warmupCancel := context.WithTimeout(ctx, p.config.WarmupTime) @@ -957,136 +989,149 @@ func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int) now := time.Now() var results []core.TestResult - p.metrics.mu.RLock() - accumulatedTransactions := p.metrics.TotalTransactions - accumulatedRows := p.metrics.TotalRowsInserted - p.metrics.mu.RUnlock() + // Get current batch size and worker stats + p.currentBatchMu.RLock() + currentBatchSize := p.currentBatch + p.currentBatchMu.RUnlock() - // Get current live worker stats p.workersMu.RLock() currentWorkers := p.currentWorkers activeWorkers := len(currentWorkers) - activeConnections := p.config.Connections // Current configured connections + activeConnections := p.config.Connections p.workersMu.RUnlock() + // If no current batch is running, don't save metrics + if currentBatchSize == 0 || currentWorkers == nil { + return nil + } + + // Calculate current live totals from active workers var liveTransactions, liveRows int64 + var totalLatency time.Duration + var operationCount int64 + for _, worker := range currentWorkers { - liveTransactions += atomic.LoadInt64(&worker.Transactions) - liveRows += atomic.LoadInt64(&worker.RowsInserted) + if worker != nil { + transactions := atomic.LoadInt64(&worker.Transactions) + rows := atomic.LoadInt64(&worker.RowsInserted) + + liveTransactions += transactions + liveRows += rows + totalLatency += worker.TotalLatency + operationCount += transactions + } } - // Debug logging - p.logger.Debug("Background metrics check", - core.Field{Key: "save_iteration", Value: iteration}, - core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions}, - core.Field{Key: "live_transactions", Value: liveTransactions}, - core.Field{Key: "worker_count", Value: len(currentWorkers)}, - ) + // Get previous values for delta calculation + p.prevMetricsMu.Lock() + prevTransactions := p.prevTransactions + prevRows := p.prevRows + prevTime := p.prevSaveTime - // Total transactions = accumulated from completed batches + live from current batch - totalTransactions := accumulatedTransactions + liveTransactions - totalRows := accumulatedRows + liveRows + // Calculate deltas (incremental values since last save) + deltaTransactions := liveTransactions - prevTransactions + deltaRows := liveRows - prevRows + timeDelta := now.Sub(prevTime).Seconds() - // Save cumulative metrics as snapshot - if totalTransactions > 0 { - // Calculate elapsed time for rates - elapsed := now.Sub(p.metrics.StartTime).Seconds() - if elapsed > 0 { - transactionRate := float64(totalTransactions) / elapsed - rowRate := float64(totalRows) / elapsed + // Update previous values for next iteration + p.prevTransactions = liveTransactions + p.prevRows = liveRows + p.prevSaveTime = now + p.prevMetricsMu.Unlock() - // Store transaction rate - results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: transactionRate, - ActiveConnections: &activeConnections, - ActiveWorkers: &activeWorkers, - Tags: map[string]interface{}{ - "metric_type": "cumulative_transaction_rate", - "iteration": iteration, - "connections": p.config.Connections, - "batch_size": nil, // Not applicable for cumulative metrics - "total_transactions": totalTransactions, - "total_rows": totalRows, - "batch_count": len(p.metrics.BatchResults), - "test_phase": "measurement", - }, - }) + // Skip first iteration since we don't have previous values yet + if prevTime.IsZero() || timeDelta <= 0 { + return nil + } + + // Only save if we have incremental data + if deltaTransactions > 0 && timeDelta > 0 { + // Calculate rates per second for this interval + transactionRate := float64(deltaTransactions) / timeDelta + rowRate := float64(deltaRows) / timeDelta + + // Store incremental transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: prevTime, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "interval_transaction_rate", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": currentBatchSize, + "interval_transactions": deltaTransactions, + "interval_rows": deltaRows, + "interval_seconds": timeDelta, + "test_phase": "measurement", + }, + }) + + // Store incremental row insertion rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: rowInsertMetric.ID, + StartTime: prevTime, + EndTime: now, + Value: rowRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "interval_row_rate", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": currentBatchSize, + "interval_transactions": deltaTransactions, + "interval_rows": deltaRows, + "interval_seconds": timeDelta, + "test_phase": "measurement", + }, + }) + + // Calculate average latency for this interval (using current running average) + if operationCount > 0 { + avgLatencyMs := float64(totalLatency.Nanoseconds()) / float64(operationCount) / 1000000.0 - // Store row insertion rate results = append(results, core.TestResult{ TestRunID: testRunID, - MetricID: rowInsertMetric.ID, - StartTime: p.metrics.StartTime, + MetricID: latencyAvgMetric.ID, + StartTime: prevTime, EndTime: now, - Value: rowRate, + Value: avgLatencyMs, ActiveConnections: &activeConnections, ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "metric_type": "cumulative_row_rate", - "iteration": iteration, - "connections": p.config.Connections, - "batch_size": nil, // Not applicable for cumulative metrics - "total_transactions": totalTransactions, - "total_rows": totalRows, - "batch_count": len(p.metrics.BatchResults), - "test_phase": "measurement", + "metric_type": "interval_avg_latency", + "iteration": iteration, + "connections": p.config.Connections, + "batch_size": currentBatchSize, + "interval_transactions": deltaTransactions, + "interval_rows": deltaRows, + "interval_seconds": timeDelta, + "test_phase": "measurement", }, }) } - - // Calculate average latency from batch results - if len(p.metrics.BatchResults) > 0 { - var totalLatency float64 - var totalOperations int64 - - for _, batch := range p.metrics.BatchResults { - totalLatency += batch.AvgLatencyMs * float64(batch.TotalTransactions) - totalOperations += batch.TotalTransactions - } - - if totalOperations > 0 { - avgLatency := totalLatency / float64(totalOperations) - - results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: latencyAvgMetric.ID, - StartTime: p.metrics.StartTime, - EndTime: now, - Value: avgLatency, - ActiveConnections: &activeConnections, - ActiveWorkers: &activeWorkers, - Tags: map[string]interface{}{ - "metric_type": "cumulative_avg_latency", - "iteration": iteration, - "connections": p.config.Connections, - "batch_size": nil, // Not applicable for cumulative metrics - "total_transactions": totalTransactions, - "total_rows": totalRows, - "batch_count": len(p.metrics.BatchResults), - "test_phase": "measurement", - }, - }) - } - } } if len(results) > 0 { - p.logger.Debug("Saving background metrics", + p.logger.Debug("Saving interval metrics", core.Field{Key: "test_run_id", Value: testRunID}, core.Field{Key: "result_count", Value: len(results)}, - core.Field{Key: "iteration", Value: iteration}) + core.Field{Key: "iteration", Value: iteration}, + core.Field{Key: "batch_size", Value: currentBatchSize}, + core.Field{Key: "delta_transactions", Value: deltaTransactions}, + core.Field{Key: "interval_seconds", Value: timeDelta}) return p.core.Storage.StoreResults(ctx, results) } return nil -} - -// storeResults converts plugin metrics to core.TestResult and stores them in the database +} // storeResults converts plugin metrics to core.TestResult and stores them in the database func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { if p.core == nil || p.core.Storage == nil { return fmt.Errorf("core services not available") @@ -1146,7 +1191,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { "batch_size": batch.BatchSize, "total_transactions": batch.TotalTransactions, "total_rows": batch.TotalRowsInserted, - "batch_count": len(p.metrics.BatchResults), "test_phase": "final_results", }, }) @@ -1167,7 +1211,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { "batch_size": batch.BatchSize, "total_transactions": batch.TotalTransactions, "total_rows": batch.TotalRowsInserted, - "batch_count": len(p.metrics.BatchResults), "test_phase": "final_results", }, }) @@ -1188,7 +1231,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error { "batch_size": batch.BatchSize, "total_transactions": batch.TotalTransactions, "total_rows": batch.TotalRowsInserted, - "batch_count": len(p.metrics.BatchResults), "test_phase": "final_results", }, }) diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index 3b71455..ca133f4 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -26,6 +26,11 @@ type TPCCPlugin struct { wg sync.WaitGroup metrics *TPCCMetrics testStarted time.Time + + // Previous metrics for delta calculation + prevTotalTxns int64 // Previous total transactions + prevSaveTime time.Time // Previous save time for rate calculation + prevMetricsMu sync.RWMutex // Protect access to previous metrics } // TPCCConfig defines the configuration for TPC-C scalability tests @@ -1040,6 +1045,12 @@ func (p *TPCCPlugin) resetMetrics() { p.metrics.TotalLatency = 0 p.metrics.MinLatency = time.Hour p.metrics.MaxLatency = 0 + + // Reset previous metrics for delta calculation + p.prevMetricsMu.Lock() + p.prevTotalTxns = 0 + p.prevSaveTime = time.Time{} // Reset to zero value + p.prevMetricsMu.Unlock() } func (p *TPCCPlugin) getMetricID(txType string) int { @@ -1163,7 +1174,7 @@ func (p *TPCCPlugin) backgroundMetricsSaver(ctx context.Context, done chan<- str } } -// saveCurrentTPCCMetrics saves current accumulated metrics to database +// saveCurrentTPCCMetrics saves incremental/delta metrics to database func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) error { if p.core == nil || p.core.Storage == nil { return fmt.Errorf("core services not available") @@ -1190,112 +1201,93 @@ func (p *TPCCPlugin) saveCurrentTPCCMetrics(ctx context.Context, iteration int) var results []core.TestResult p.metrics.mu.RLock() - defer p.metrics.mu.RUnlock() - - // Calculate total transactions - totalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + + // Calculate current total transactions + currentTotalTxns := p.metrics.NewOrderCount + p.metrics.PaymentCount + p.metrics.OrderStatusCount + p.metrics.DeliveryCount + p.metrics.StockLevelCount // Get connection and worker info activeConnections := p.metrics.CurrentConnections activeWorkers := activeConnections // For TPCC, number of workers typically equals connections + testPhase := p.metrics.TestPhase + totalLatency := p.metrics.TotalLatency + p.metrics.mu.RUnlock() + + // Get previous values for delta calculation + p.prevMetricsMu.Lock() + prevTotalTxns := p.prevTotalTxns + prevTime := p.prevSaveTime + + // Calculate deltas (incremental values since last save) + deltaTxns := currentTotalTxns - prevTotalTxns + timeDelta := now.Sub(prevTime).Seconds() + + // Update previous values for next iteration + p.prevTotalTxns = currentTotalTxns + p.prevSaveTime = now + p.prevMetricsMu.Unlock() + + // Skip first iteration since we don't have previous values yet + if prevTime.IsZero() || timeDelta <= 0 { + return nil + } - if totalTxns > 0 { - // Calculate elapsed time for rates - elapsed := now.Sub(p.testStarted).Seconds() - if elapsed > 0 { - transactionRate := float64(totalTxns) / elapsed + // Only save if we have incremental data + if deltaTxns > 0 && timeDelta > 0 { + // Calculate rate per second for this interval + transactionRate := float64(deltaTxns) / timeDelta - // Store transaction rate - results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: transactionRate, - ActiveConnections: &activeConnections, - ActiveWorkers: &activeWorkers, - Tags: map[string]interface{}{ - "metric_type": "cumulative_tps", - "iteration": iteration, - "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC - "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC - "test_phase": p.metrics.TestPhase, - "scale_factor": p.config.Scale, - }, - }) - } + // Store incremental transaction rate + results = append(results, core.TestResult{ + TestRunID: testRunID, + MetricID: throughputMetric.ID, + StartTime: prevTime, + EndTime: now, + Value: transactionRate, + ActiveConnections: &activeConnections, + ActiveWorkers: &activeWorkers, + Tags: map[string]interface{}{ + "metric_type": "interval_tps", + "iteration": iteration, + "connections": activeConnections, + "interval_transactions": deltaTxns, + "interval_seconds": timeDelta, + "test_phase": testPhase, + "scale_factor": p.config.Scale, + }, + }) - // Calculate and store average latency - if totalTxns > 0 { - avgLatencyMs := float64(p.metrics.TotalLatency.Nanoseconds()) / float64(totalTxns) / 1000000.0 + // Calculate and store current average latency (using running average) + if currentTotalTxns > 0 { + avgLatencyMs := float64(totalLatency.Nanoseconds()) / float64(currentTotalTxns) / 1000000.0 results = append(results, core.TestResult{ TestRunID: testRunID, MetricID: latencyAvgMetric.ID, - StartTime: p.testStarted, + StartTime: prevTime, EndTime: now, Value: avgLatencyMs, ActiveConnections: &activeConnections, ActiveWorkers: &activeWorkers, Tags: map[string]interface{}{ - "metric_type": "cumulative_avg_latency", - "iteration": iteration, - "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC - "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC - "test_phase": p.metrics.TestPhase, - "scale_factor": p.config.Scale, + "metric_type": "interval_avg_latency", + "iteration": iteration, + "connections": activeConnections, + "interval_transactions": deltaTxns, + "interval_seconds": timeDelta, + "test_phase": testPhase, + "scale_factor": p.config.Scale, }, }) } - - // Store individual transaction type counts - txTypes := map[string]int64{ - "new_order": p.metrics.NewOrderCount, - "payment": p.metrics.PaymentCount, - "order_status": p.metrics.OrderStatusCount, - "delivery": p.metrics.DeliveryCount, - "stock_level": p.metrics.StockLevelCount, - } - - for txType, count := range txTypes { - if count > 0 { - results = append(results, core.TestResult{ - TestRunID: testRunID, - MetricID: throughputMetric.ID, - StartTime: p.testStarted, - EndTime: now, - Value: float64(count), - ActiveConnections: &activeConnections, - ActiveWorkers: &activeWorkers, - Tags: map[string]interface{}{ - "metric_type": "transaction_count", - "iteration": iteration, - "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC - "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC - "test_phase": p.metrics.TestPhase, - "scale_factor": p.config.Scale, - "transaction_type": txType, - }, - }) - } - } } if len(results) > 0 { - p.logger.Debug("Saving background TPCC metrics", + p.logger.Debug("Saving interval TPCC metrics", core.Field{Key: "test_run_id", Value: testRunID}, core.Field{Key: "result_count", Value: len(results)}, - core.Field{Key: "iteration", Value: iteration}) + core.Field{Key: "iteration", Value: iteration}, + core.Field{Key: "delta_transactions", Value: deltaTxns}, + core.Field{Key: "interval_seconds", Value: timeDelta}) return p.core.Storage.StoreResults(ctx, results) } @@ -1355,10 +1347,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { "metric_type": "total_transactions", "iteration": nil, // Not applicable for final results "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC "test_phase": "final_results", "scale_factor": p.config.Scale, }, @@ -1378,10 +1367,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { "metric_type": "avg_latency_ms", "iteration": nil, // Not applicable for final results "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC "test_phase": "final_results", "scale_factor": p.config.Scale, }, @@ -1410,10 +1396,7 @@ func (p *TPCCPlugin) storeResults(ctx context.Context) error { "metric_type": "transaction_count", "iteration": nil, // Not applicable for final results "connections": p.metrics.CurrentConnections, - "batch_size": nil, // Not applicable for TPCC "total_transactions": totalTxns, - "total_rows": nil, // Not applicable for TPCC - "batch_count": nil, // Not applicable for TPCC "test_phase": "final_results", "scale_factor": p.config.Scale, "transaction_type": txType,