Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 142 additions & 100 deletions plugins/bulk-load/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type BulkLoadPlugin struct {
testStarted time.Time
currentWorkers []*WorkerStats // Live worker stats for real-time metrics
workersMu sync.RWMutex // Protect access to currentWorkers
currentBatch int // Current batch size being tested
currentBatchMu sync.RWMutex // Protect access to currentBatch

// Previous metrics for delta calculation
prevTransactions int64 // Previous total transactions
prevRows int64 // Previous total rows
prevSaveTime time.Time // Previous save time for rate calculation
prevMetricsMu sync.RWMutex // Protect access to previous metrics
}

// BulkLoadConfig defines the configuration for bulk load tests
Expand Down Expand Up @@ -475,6 +483,18 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac
p.logger.Info("Test results stored successfully")
}

// Reset current batch tracking
p.currentBatchMu.Lock()
p.currentBatch = 0
p.currentBatchMu.Unlock()

// Reset previous metrics tracking
p.prevMetricsMu.Lock()
p.prevTransactions = 0
p.prevRows = 0
p.prevSaveTime = time.Time{}
p.prevMetricsMu.Unlock()

return nil
}

Expand Down Expand Up @@ -644,6 +664,18 @@ func (p *BulkLoadPlugin) runBatchTest(ctx context.Context, batchSize int) (*Batc
MinLatencyMs: float64(time.Hour.Milliseconds()), // Initialize with high value
}

// Set current batch size for background metrics
p.currentBatchMu.Lock()
p.currentBatch = batchSize
p.currentBatchMu.Unlock()

// Initialize previous metrics for delta calculation
p.prevMetricsMu.Lock()
p.prevTransactions = 0
p.prevRows = 0
p.prevSaveTime = time.Time{} // Reset to zero value
p.prevMetricsMu.Unlock()

// Warmup phase
p.logger.Info("Starting warmup phase", core.Field{Key: "duration", Value: p.config.WarmupTime})
warmupCtx, warmupCancel := context.WithTimeout(ctx, p.config.WarmupTime)
Expand Down Expand Up @@ -957,136 +989,149 @@ func (p *BulkLoadPlugin) saveCurrentMetrics(ctx context.Context, iteration int)
now := time.Now()
var results []core.TestResult

p.metrics.mu.RLock()
accumulatedTransactions := p.metrics.TotalTransactions
accumulatedRows := p.metrics.TotalRowsInserted
p.metrics.mu.RUnlock()
// Get current batch size and worker stats
p.currentBatchMu.RLock()
currentBatchSize := p.currentBatch
p.currentBatchMu.RUnlock()

// Get current live worker stats
p.workersMu.RLock()
currentWorkers := p.currentWorkers
activeWorkers := len(currentWorkers)
activeConnections := p.config.Connections // Current configured connections
activeConnections := p.config.Connections
p.workersMu.RUnlock()

// If no current batch is running, don't save metrics
if currentBatchSize == 0 || currentWorkers == nil {
return nil
}

// Calculate current live totals from active workers
var liveTransactions, liveRows int64
var totalLatency time.Duration
var operationCount int64

for _, worker := range currentWorkers {
liveTransactions += atomic.LoadInt64(&worker.Transactions)
liveRows += atomic.LoadInt64(&worker.RowsInserted)
if worker != nil {
transactions := atomic.LoadInt64(&worker.Transactions)
rows := atomic.LoadInt64(&worker.RowsInserted)

liveTransactions += transactions
liveRows += rows
totalLatency += worker.TotalLatency
operationCount += transactions
}
}

// Debug logging
p.logger.Debug("Background metrics check",
core.Field{Key: "save_iteration", Value: iteration},
core.Field{Key: "accumulated_transactions", Value: accumulatedTransactions},
core.Field{Key: "live_transactions", Value: liveTransactions},
core.Field{Key: "worker_count", Value: len(currentWorkers)},
)
// Get previous values for delta calculation
p.prevMetricsMu.Lock()
prevTransactions := p.prevTransactions
prevRows := p.prevRows
prevTime := p.prevSaveTime

// Total transactions = accumulated from completed batches + live from current batch
totalTransactions := accumulatedTransactions + liveTransactions
totalRows := accumulatedRows + liveRows
// Calculate deltas (incremental values since last save)
deltaTransactions := liveTransactions - prevTransactions
deltaRows := liveRows - prevRows
timeDelta := now.Sub(prevTime).Seconds()

// Save cumulative metrics as snapshot
if totalTransactions > 0 {
// Calculate elapsed time for rates
elapsed := now.Sub(p.metrics.StartTime).Seconds()
if elapsed > 0 {
transactionRate := float64(totalTransactions) / elapsed
rowRate := float64(totalRows) / elapsed
// Update previous values for next iteration
p.prevTransactions = liveTransactions
p.prevRows = liveRows
p.prevSaveTime = now
p.prevMetricsMu.Unlock()

// Store transaction rate
results = append(results, core.TestResult{
TestRunID: testRunID,
MetricID: throughputMetric.ID,
StartTime: p.metrics.StartTime,
EndTime: now,
Value: transactionRate,
ActiveConnections: &activeConnections,
ActiveWorkers: &activeWorkers,
Tags: map[string]interface{}{
"metric_type": "cumulative_transaction_rate",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": nil, // Not applicable for cumulative metrics
"total_transactions": totalTransactions,
"total_rows": totalRows,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "measurement",
},
})
// Skip first iteration since we don't have previous values yet
if prevTime.IsZero() || timeDelta <= 0 {
return nil
}

// Only save if we have incremental data
if deltaTransactions > 0 && timeDelta > 0 {
// Calculate rates per second for this interval
transactionRate := float64(deltaTransactions) / timeDelta
rowRate := float64(deltaRows) / timeDelta

// Store incremental transaction rate
results = append(results, core.TestResult{
TestRunID: testRunID,
MetricID: throughputMetric.ID,
StartTime: prevTime,
EndTime: now,
Value: transactionRate,
ActiveConnections: &activeConnections,
ActiveWorkers: &activeWorkers,
Tags: map[string]interface{}{
"metric_type": "interval_transaction_rate",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": currentBatchSize,
"interval_transactions": deltaTransactions,
"interval_rows": deltaRows,
"interval_seconds": timeDelta,
"test_phase": "measurement",
},
})

// Store incremental row insertion rate
results = append(results, core.TestResult{
TestRunID: testRunID,
MetricID: rowInsertMetric.ID,
StartTime: prevTime,
EndTime: now,
Value: rowRate,
ActiveConnections: &activeConnections,
ActiveWorkers: &activeWorkers,
Tags: map[string]interface{}{
"metric_type": "interval_row_rate",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": currentBatchSize,
"interval_transactions": deltaTransactions,
"interval_rows": deltaRows,
"interval_seconds": timeDelta,
"test_phase": "measurement",
},
})

// Calculate average latency for this interval (using current running average)
if operationCount > 0 {
avgLatencyMs := float64(totalLatency.Nanoseconds()) / float64(operationCount) / 1000000.0

// Store row insertion rate
results = append(results, core.TestResult{
TestRunID: testRunID,
MetricID: rowInsertMetric.ID,
StartTime: p.metrics.StartTime,
MetricID: latencyAvgMetric.ID,
StartTime: prevTime,
EndTime: now,
Value: rowRate,
Value: avgLatencyMs,
ActiveConnections: &activeConnections,
ActiveWorkers: &activeWorkers,
Tags: map[string]interface{}{
"metric_type": "cumulative_row_rate",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": nil, // Not applicable for cumulative metrics
"total_transactions": totalTransactions,
"total_rows": totalRows,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "measurement",
"metric_type": "interval_avg_latency",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": currentBatchSize,
"interval_transactions": deltaTransactions,
"interval_rows": deltaRows,
"interval_seconds": timeDelta,
"test_phase": "measurement",
},
})
}

// Calculate average latency from batch results
if len(p.metrics.BatchResults) > 0 {
var totalLatency float64
var totalOperations int64

for _, batch := range p.metrics.BatchResults {
totalLatency += batch.AvgLatencyMs * float64(batch.TotalTransactions)
totalOperations += batch.TotalTransactions
}

if totalOperations > 0 {
avgLatency := totalLatency / float64(totalOperations)

results = append(results, core.TestResult{
TestRunID: testRunID,
MetricID: latencyAvgMetric.ID,
StartTime: p.metrics.StartTime,
EndTime: now,
Value: avgLatency,
ActiveConnections: &activeConnections,
ActiveWorkers: &activeWorkers,
Tags: map[string]interface{}{
"metric_type": "cumulative_avg_latency",
"iteration": iteration,
"connections": p.config.Connections,
"batch_size": nil, // Not applicable for cumulative metrics
"total_transactions": totalTransactions,
"total_rows": totalRows,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "measurement",
},
})
}
}
}

if len(results) > 0 {
p.logger.Debug("Saving background metrics",
p.logger.Debug("Saving interval metrics",
core.Field{Key: "test_run_id", Value: testRunID},
core.Field{Key: "result_count", Value: len(results)},
core.Field{Key: "iteration", Value: iteration})
core.Field{Key: "iteration", Value: iteration},
core.Field{Key: "batch_size", Value: currentBatchSize},
core.Field{Key: "delta_transactions", Value: deltaTransactions},
core.Field{Key: "interval_seconds", Value: timeDelta})

return p.core.Storage.StoreResults(ctx, results)
}

return nil
}

// storeResults converts plugin metrics to core.TestResult and stores them in the database
} // storeResults converts plugin metrics to core.TestResult and stores them in the database
func (p *BulkLoadPlugin) storeResults(ctx context.Context) error {
if p.core == nil || p.core.Storage == nil {
return fmt.Errorf("core services not available")
Expand Down Expand Up @@ -1146,7 +1191,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error {
"batch_size": batch.BatchSize,
"total_transactions": batch.TotalTransactions,
"total_rows": batch.TotalRowsInserted,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "final_results",
},
})
Expand All @@ -1167,7 +1211,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error {
"batch_size": batch.BatchSize,
"total_transactions": batch.TotalTransactions,
"total_rows": batch.TotalRowsInserted,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "final_results",
},
})
Expand All @@ -1188,7 +1231,6 @@ func (p *BulkLoadPlugin) storeResults(ctx context.Context) error {
"batch_size": batch.BatchSize,
"total_transactions": batch.TotalTransactions,
"total_rows": batch.TotalRowsInserted,
"batch_count": len(p.metrics.BatchResults),
"test_phase": "final_results",
},
})
Expand Down
Loading
Loading