Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ api:
port: 9090

logging:
level: "info"
level: "debug"
format: "text" # "text" or "json"aa
output: "stdout" # "stdout", "stderr", or file path

Expand Down
66 changes: 55 additions & 11 deletions core/storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,16 @@
// StoreResults stores test results in batch
func (m *Manager) StoreResults(ctx context.Context, results []core.TestResult) error {
if len(results) == 0 {
m.logger.Warn("StoreResults called with empty results array")
return nil
}

m.logger.Info("StoreResults called",
core.Field{Key: "result_count", Value: len(results)},
core.Field{Key: "first_test_run_id", Value: results[0].TestRunID},
core.Field{Key: "first_metric_id", Value: results[0].MetricID},
)

db, err := m.db.GetConnection(ctx)
if err != nil {
return fmt.Errorf("failed to get database connection: %w", err)
Expand All @@ -327,13 +334,13 @@
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()

Check failure on line 337 in core/storage/manager.go

View workflow job for this annotation

GitHub Actions / Test

Error return value of `tx.Rollback` is not checked (errcheck)

// Prepare insert statement
// Prepare insert statement with new connection and worker fields
query := `
INSERT INTO test_run_result
(test_run_id, test_metric_id, start_time, end_time, value, tags)
VALUES ($1, $2, $3, $4, $5, $6)
(test_run_id, test_metric_id, start_time, end_time, value, tags, active_connections, active_workers)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
stmt, err := tx.PrepareContext(ctx, query)
if err != nil {
Expand All @@ -342,32 +349,42 @@
defer stmt.Close()

// Insert each result
for _, result := range results {
for i, result := range results {
tagsJSON, err := json.Marshal(result.Tags)
if err != nil {
return fmt.Errorf("failed to marshal tags: %w", err)
return fmt.Errorf("failed to marshal tags for result %d: %w", i, err)
}

m.logger.Debug("Inserting result",
core.Field{Key: "index", Value: i},
core.Field{Key: "test_run_id", Value: result.TestRunID},
core.Field{Key: "metric_id", Value: result.MetricID},
core.Field{Key: "value", Value: result.Value},
core.Field{Key: "active_connections", Value: result.ActiveConnections},
core.Field{Key: "active_workers", Value: result.ActiveWorkers},
)

_, err = stmt.ExecContext(ctx,
result.TestRunID,
result.MetricID,
result.StartTime,
result.EndTime,
result.Value,
tagsJSON,
result.ActiveConnections,
result.ActiveWorkers,
)
if err != nil {
return fmt.Errorf("failed to insert result: %w", err)
return fmt.Errorf("failed to insert result %d: %w", i, err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

m.logger.Info("test results stored",
core.Field{Key: "result_count", Value: len(results)},
core.Field{Key: "test_run_id", Value: results[0].TestRunID},
m.logger.Info("Successfully stored results to database",
core.Field{Key: "stored_count", Value: len(results)},
)

return nil
Expand All @@ -376,7 +393,7 @@
// GetResults retrieves results for a test run
func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResult, error) {
query := `
SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags
SELECT id, test_run_id, test_metric_id, start_time, end_time, value, tags, active_connections, active_workers
FROM test_run_result
WHERE test_run_id = $1
ORDER BY start_time
Expand All @@ -397,6 +414,7 @@
for rows.Next() {
var result core.TestResult
var tagsJSON []byte
var activeConnections, activeWorkers sql.NullInt32

err := rows.Scan(
&result.ID,
Expand All @@ -406,6 +424,8 @@
&result.EndTime,
&result.Value,
&tagsJSON,
&activeConnections,
&activeWorkers,
)
if err != nil {
return nil, fmt.Errorf("failed to scan result: %w", err)
Expand All @@ -418,6 +438,16 @@
}
}

// Handle nullable connection and worker counts
if activeConnections.Valid {
connections := int(activeConnections.Int32)
result.ActiveConnections = &connections
}
if activeWorkers.Valid {
workers := int(activeWorkers.Int32)
result.ActiveWorkers = &workers
}

results = append(results, result)
}

Expand All @@ -427,7 +457,7 @@
// GetResultsByMetric retrieves recent results for a specific metric
func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, limit int) ([]core.TestResult, error) {
query := `
SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags
SELECT trr.id, trr.test_run_id, trr.test_metric_id, trr.start_time, trr.end_time, trr.value, trr.tags, trr.active_connections, trr.active_workers
FROM test_run_result trr
JOIN test_metric tm ON trr.test_metric_id = tm.id
WHERE tm.code = $1
Expand All @@ -450,6 +480,7 @@
for rows.Next() {
var result core.TestResult
var tagsJSON []byte
var activeConnections, activeWorkers sql.NullInt32

err := rows.Scan(
&result.ID,
Expand All @@ -459,6 +490,8 @@
&result.EndTime,
&result.Value,
&tagsJSON,
&activeConnections,
&activeWorkers,
)
if err != nil {
return nil, fmt.Errorf("failed to scan result: %w", err)
Expand All @@ -469,6 +502,17 @@
return nil, fmt.Errorf("failed to unmarshal tags: %w", err)
}
}

// Handle nullable connection and worker counts
if activeConnections.Valid {
connections := int(activeConnections.Int32)
result.ActiveConnections = &connections
}
if activeWorkers.Valid {
workers := int(activeWorkers.Int32)
result.ActiveWorkers = &workers
}

results = append(results, result)
}

Expand Down
16 changes: 9 additions & 7 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ type TestRun struct {

// TestResult represents a single measurement from a test execution
type TestResult struct {
ID int64 `json:"id"`
TestRunID int64 `json:"test_run_id"`
MetricID int `json:"metric_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Value float64 `json:"value"`
Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB
ID int64 `json:"id"`
TestRunID int64 `json:"test_run_id"`
MetricID int `json:"metric_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Value float64 `json:"value"`
Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB
ActiveConnections *int `json:"active_connections,omitempty"` // Number of active database connections
ActiveWorkers *int `json:"active_workers,omitempty"` // Number of active worker threads/processes
}

// TestType represents a category of tests (e.g., bulk_insert, read_latency)
Expand Down
17 changes: 17 additions & 0 deletions migrations/004_add_connection_worker_tracking.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Migration 004: Add active connections and workers tracking to test_run_result

-- Add columns to track the number of active connections and workers at the time each metric was saved
ALTER TABLE test_run_result
ADD COLUMN IF NOT EXISTS active_connections INTEGER,
ADD COLUMN IF NOT EXISTS active_workers INTEGER;

-- Create indexes for querying by connection and worker counts
CREATE INDEX IF NOT EXISTS idx_test_run_result_active_connections ON test_run_result(active_connections);
CREATE INDEX IF NOT EXISTS idx_test_run_result_active_workers ON test_run_result(active_workers);

-- Create a compound index for metrics with connection/worker analysis
CREATE INDEX IF NOT EXISTS idx_test_run_result_conn_workers ON test_run_result(test_run_id, active_connections, active_workers);

-- Add comments to clarify the purpose of these fields
COMMENT ON COLUMN test_run_result.active_connections IS 'Number of database connections active when this metric was recorded';
COMMENT ON COLUMN test_run_result.active_workers IS 'Number of worker threads/processes active when this metric was recorded';
8 changes: 4 additions & 4 deletions plugins/bulk-load/API_EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ curl -X POST "http://localhost:8080/test-runs" \
"config": {
"host": "localhost",
"port": 5432,
"database": "stormdb_test",
"username": "stormdb",
"password": "stormdb_password",
"database": "stormdb",
"username": "postgres",
"password": "postgres",
"ssl_mode": "disable",
"batch_sizes": [1, 100, 1000],
"batch_sizes": [1, 100],
"connections": 10,
"duration": "2m",
"warmup_time": "10s",
Expand Down
Loading
Loading