From 2ccdb18650df666d1eb0fa82a92c52c21c8a6d2b Mon Sep 17 00:00:00 2001 From: Charly Batista Date: Wed, 6 Aug 2025 00:58:20 -0400 Subject: [PATCH] feat: Implement 'rebuild' command and refactor core components This commit introduces several major changes: - **New 'rebuild' command**: A 'rebuild' boolean flag can now be passed in the test run configuration. When true, the target test database is dropped and recreated before the test begins. This is implemented in the API layer and supported by both the 'bulk-load' and 'tpcc-scalability' plugins. - **Core Component Refactoring**: - **Scheduler**: The scheduler is now enabled by default and is the sole component responsible for creating test runs. It correctly resolves plugin and test type IDs from the database, ensuring foreign key constraints are met. - **Plugin Manager**: The plugin manager now automatically registers a plugin's declared test types upon loading. It also ensures the in-memory plugin metadata is updated with the database-assigned ID. - **Storage**: The storage layer was updated to handle the new schema. - **Database Schema**: - The 'plugin_ver' column was removed from the 'test_run' table, as this information is now correctly linked via the 'plugin' table. - A new migration was added to handle the schema change, including dropping and recreating dependent views. - **Build System**: - Removed nested Go modules from the plugin directories to resolve dependency conflicts. - Updated plugin Makefiles to build them as part of the main project module. --- config/core.yaml | 1 + core/api/server.go | 30 ++++- core/go.mod | 16 --- core/plugin/manager.go | 82 ++++++++++-- core/scheduler/manager.go | 122 +++++++++++++----- core/storage/manager.go | 32 +++-- core/types.go | 22 ++-- go.mod | 12 +- migrations/001_core_schema.sql | 7 +- .../002_remove_plugin_ver_from_test_run.sql | 58 +++++++++ plugins/bulk-load/Makefile | 2 +- plugins/bulk-load/go.mod | 10 -- plugins/bulk-load/go.sum | 2 - plugins/bulk-load/plugin.go | 58 +++++++++ plugins/tpcc-scalability/Makefile | 2 +- plugins/tpcc-scalability/go.mod | 10 -- plugins/tpcc-scalability/go.sum | 2 - plugins/tpcc-scalability/plugin.go | 95 +++++++++----- run_tests.sh | 4 +- 19 files changed, 402 insertions(+), 165 deletions(-) delete mode 100644 core/go.mod create mode 100644 migrations/002_remove_plugin_ver_from_test_run.sql delete mode 100644 plugins/bulk-load/go.mod delete mode 100644 plugins/bulk-load/go.sum delete mode 100644 plugins/tpcc-scalability/go.mod delete mode 100644 plugins/tpcc-scalability/go.sum diff --git a/config/core.yaml b/config/core.yaml index ccbe6ef..f10a840 100644 --- a/config/core.yaml +++ b/config/core.yaml @@ -23,6 +23,7 @@ logging: output: "stdout" # "stdout", "stderr", or file path scheduler: + enabled: true worker_pool_size: 10 plugin_dirs: diff --git a/core/api/server.go b/core/api/server.go index b7fd565..33f2245 100644 --- a/core/api/server.go +++ b/core/api/server.go @@ -169,8 +169,9 @@ func (s *Server) handleListPlugins(w http.ResponseWriter, r *http.Request) { func (s *Server) handleGetPlugin(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) name := vars["name"] + version := r.URL.Query().Get("version") // Get version from query param - plugin, err := s.coreServices.Plugin.GetPlugin(name) + plugin, err := s.coreServices.Plugin.GetPlugin(name, version) if err != nil { s.writeErrorResponse(w, http.StatusNotFound, "plugin not found", err) return @@ -205,10 +206,12 @@ func (s *Server) handleReloadPlugins(w http.ResponseWriter, r *http.Request) { // Create test run endpoint func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) { var req struct { - PluginName string `json:"plugin_name"` - Name string `json:"name"` - Description string `json:"description"` - Config map[string]interface{} `json:"config"` + PluginName string `json:"plugin_name"` + PluginVersion string `json:"plugin_version"` // Optional + Name string `json:"name"` + Description string `json:"description"` + Config map[string]interface{} `json:"config"` + Rebuild bool `json:"rebuild"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -222,9 +225,22 @@ func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) { return } - // Schedule test + // Get the specific plugin version if provided, otherwise get the latest + plugin, err := s.coreServices.Plugin.GetPlugin(req.PluginName, req.PluginVersion) + if err != nil { + s.writeErrorResponse(w, http.StatusBadRequest, "plugin not found", err) + return + } + + // Add rebuild flag to config + if req.Config == nil { + req.Config = make(map[string]interface{}) + } + req.Config["rebuild"] = req.Rebuild + + // Schedule test with the specific plugin instance ctx := r.Context() - runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, req.PluginName, req.Config) + runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, plugin, req.Config) if err != nil { s.writeErrorResponse(w, http.StatusInternalServerError, "failed to schedule test", err) return diff --git a/core/go.mod b/core/go.mod deleted file mode 100644 index 6f4b031..0000000 --- a/core/go.mod +++ /dev/null @@ -1,16 +0,0 @@ -module github.com/elchinoo/stormdb/core - -go 1.21 - -require ( - github.com/gorilla/mux v1.8.1 - github.com/jackc/pgx/v5 v5.7.1 - gopkg.in/yaml.v3 v3.0.1 -) - -require ( - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/text v0.21.0 // indirect -) diff --git a/core/plugin/manager.go b/core/plugin/manager.go index 2ab2bd7..61cdc74 100644 --- a/core/plugin/manager.go +++ b/core/plugin/manager.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "plugin" + "strconv" "strings" "sync" @@ -94,12 +95,28 @@ func (m *Manager) LoadPlugin(path string) (core.Plugin, error) { // Register in storage if m.storage != nil { - _, err := m.storage.RegisterPlugin(context.Background(), metadata) + pluginID, err := m.storage.RegisterPlugin(context.Background(), metadata) if err != nil { m.logger.Warn("failed to register plugin in storage", core.Field{Key: "plugin", Value: metadata.Name}, core.Field{Key: "error", Value: err.Error()}, ) + } else { + // After registering the plugin, also register its declared test types + for _, testTypeCode := range metadata.TestTypes { + // A more robust implementation might fetch name/description from a central registry + // For now, we use the code as the name and provide a default description. + _, err := m.storage.RegisterTestType(context.Background(), testTypeCode, testTypeCode, "Auto-registered test type for plugin "+metadata.Name) + if err != nil { + m.logger.Warn("failed to auto-register test type for plugin", + core.Field{Key: "plugin", Value: metadata.Name}, + core.Field{Key: "test_type", Value: testTypeCode}, + core.Field{Key: "error", Value: err.Error()}, + ) + } + } + // IMPORTANT: Update the in-memory metadata with the ID from the database + metadata.ID = pluginID } } @@ -204,24 +221,45 @@ func (m *Manager) UnloadPlugins() error { return nil } -// GetPlugin retrieves a loaded plugin by name (latest version) -func (m *Manager) GetPlugin(name string) (core.Plugin, error) { +// GetPlugin retrieves a loaded plugin by name and optional version. +// If version is empty, it returns the latest available version. +func (m *Manager) GetPlugin(name string, version string) (core.Plugin, error) { m.mu.RLock() defer m.mu.RUnlock() - // Look for exact match first - if plugin, exists := m.plugins[name]; exists { - return plugin, nil + // If version is specified, look for an exact match. + if version != "" { + pluginKey := fmt.Sprintf("%s@%s", name, version) + if plugin, exists := m.plugins[pluginKey]; exists { + return plugin, nil + } + return nil, fmt.Errorf("plugin %s with version %s not found", name, version) } - // Look for name@version pattern + // If version is not specified, find the latest version. + var latestPlugin core.Plugin + var latestVersion string + for key, plugin := range m.plugins { if strings.HasPrefix(key, name+"@") { - return plugin, nil + parts := strings.Split(key, "@") + if len(parts) != 2 { + continue // Should not happen with our key format + } + currentVersion := parts[1] + + if latestPlugin == nil || isNewerVersion(currentVersion, latestVersion) { + latestVersion = currentVersion + latestPlugin = plugin + } } } - return nil, fmt.Errorf("plugin %s not found", name) + if latestPlugin == nil { + return nil, fmt.Errorf("plugin %s not found", name) + } + + return latestPlugin, nil } // GetLoadedPlugins returns all loaded plugins @@ -416,6 +454,32 @@ func (m *Manager) calculateSHA256(filePath string) (string, error) { return fmt.Sprintf("%x", hash.Sum(nil)), nil } +// isNewerVersion compares two semantic version strings. +// It returns true if v1 is newer than v2. +// This is a simplified implementation for formats like X.Y.Z. +func isNewerVersion(v1, v2 string) bool { + parts1 := strings.Split(v1, ".") + parts2 := strings.Split(v2, ".") + + for i := 0; i < len(parts1) && i < len(parts2); i++ { + // For simplicity, we ignore non-numeric parts for now. + // A proper implementation would use a semantic versioning library. + n1, _ := strconv.Atoi(parts1[i]) + n2, _ := strconv.Atoi(parts2[i]) + + if n1 > n2 { + return true + } + if n1 < n2 { + return false + } + } + + // If all parts are equal, the one with more parts is newer + // e.g., 1.0.1 is newer than 1.0 + return len(parts1) > len(parts2) +} + // isValidVersion performs basic semantic version validation func isValidVersion(version string) bool { // Basic check for X.Y.Z format diff --git a/core/scheduler/manager.go b/core/scheduler/manager.go index e001d9f..5880108 100644 --- a/core/scheduler/manager.go +++ b/core/scheduler/manager.go @@ -41,12 +41,12 @@ type scheduledTask struct { // TestExecutionTask implements the Task interface for test execution type TestExecutionTask struct { - id string - pluginName string - config map[string]interface{} - storage core.StorageManager - plugin core.PluginManager - logger core.Logger + id string + plugin core.Plugin + config map[string]interface{} + storage core.StorageManager + logger core.Logger + runID int64 } // NewManager creates a new scheduler manager @@ -243,23 +243,43 @@ func (m *Manager) CancelTask(taskID string) error { } // ScheduleTest schedules a test execution -func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config map[string]interface{}) (int64, error) { - // Get plugin - plugin, err := m.pluginManager.GetPlugin(pluginName) +func (m *Manager) ScheduleTest(ctx context.Context, plugin core.Plugin, config map[string]interface{}) (int64, error) { + pluginMeta := plugin.Metadata() + + // Ensure plugin has an ID from the database. + if pluginMeta.ID == 0 { + // If the ID is missing, it means the plugin wasn't registered correctly. + // Attempt to look it up from storage. + p, err := m.storage.GetPlugin(ctx, pluginMeta.Name, pluginMeta.Version) + if err != nil { + return 0, fmt.Errorf("plugin '%s v%s' is not registered in the database and could not be found: %w", pluginMeta.Name, pluginMeta.Version, err) + } + pluginMeta.ID = p.ID + } + + // Get the primary test type for the plugin + if len(pluginMeta.TestTypes) == 0 { + return 0, fmt.Errorf("plugin %s has no declared test types", pluginMeta.Name) + } + primaryTestTypeCode := pluginMeta.TestTypes[0] + + // Look up the test type ID from storage + testType, err := m.storage.GetTestType(ctx, primaryTestTypeCode) if err != nil { - return 0, fmt.Errorf("plugin %s not found: %w", pluginName, err) + return 0, fmt.Errorf("failed to get test type '%s': %w. It should have been auto-registered when the plugin was loaded", primaryTestTypeCode, err) } // Create test run record testRun := &core.TestRun{ - Name: fmt.Sprintf("Test run %s", time.Now().Format("2006-01-02 15:04:05")), - Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginName), + Name: fmt.Sprintf("Test run for %s v%s", pluginMeta.Name, pluginMeta.Version), + Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginMeta.Name), Status: core.StatusPending, Config: config, - PluginVer: plugin.Metadata().Version, - Host: "localhost", // Default, should come from config - Port: 5432, // Default, should come from config - DBName: "test", // Default, should come from config + PluginID: pluginMeta.ID, // Use the now-guaranteed-to-be-correct ID + TestTypeID: testType.ID, // Use the looked-up ID + Host: "localhost", // Default, should come from config + Port: 5432, // Default, should come from config + DBName: "test", // Default, should come from config } runID, err := m.storage.CreateTestRun(ctx, testRun) @@ -269,22 +289,31 @@ func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config ma // Create execution task task := &TestExecutionTask{ - id: fmt.Sprintf("test-%d", runID), - pluginName: pluginName, - config: config, - storage: m.storage, - plugin: m.pluginManager, - logger: m.logger, + id: fmt.Sprintf("test-%d", runID), + plugin: plugin, + config: config, + storage: m.storage, + logger: m.logger, + runID: runID, } // Submit for execution if err := m.SubmitTask(task); err != nil { + // Rollback test run creation + // It's important to handle this to avoid orphaned test_run records + if rollbackErr := m.storage.UpdateTestRunStatus(ctx, runID, core.StatusFailed); rollbackErr != nil { + m.logger.Error("failed to rollback test run status", + core.Field{Key: "test_run_id", Value: runID}, + core.Field{Key: "error", Value: rollbackErr.Error()}, + ) + } return 0, fmt.Errorf("failed to submit test task: %w", err) } m.logger.Info("test scheduled", core.Field{Key: "test_run_id", Value: runID}, - core.Field{Key: "plugin", Value: pluginName}, + core.Field{Key: "plugin", Value: plugin.Metadata().Name}, + core.Field{Key: "version", Value: plugin.Metadata().Version}, ) return runID, nil @@ -452,23 +481,48 @@ func (t *TestExecutionTask) Type() string { func (t *TestExecutionTask) Execute(ctx context.Context) error { t.logger.Info("executing test", core.Field{Key: "task_id", Value: t.id}, - core.Field{Key: "plugin", Value: t.pluginName}, + core.Field{Key: "plugin", Value: t.plugin.Metadata().Name}, + core.Field{Key: "run_id", Value: t.runID}, ) - // Get plugin - plugin, err := t.plugin.GetPlugin(t.pluginName) - if err != nil { - return fmt.Errorf("failed to get plugin: %w", err) + // Update status to running + if err := t.storage.UpdateTestRunStatus(ctx, t.runID, core.StatusRunning); err != nil { + t.logger.Error("failed to update test run status to running", + core.Field{Key: "run_id", Value: t.runID}, + core.Field{Key: "error", Value: err.Error()}, + ) + return fmt.Errorf("failed to update test run status: %w", err) } // Execute plugin - if err := plugin.Execute(ctx, t.config); err != nil { - return fmt.Errorf("plugin execution failed: %w", err) + err := t.plugin.Execute(ctx, t.config) + + // Update status based on execution result + var finalStatus core.ServiceStatus + if err != nil { + finalStatus = core.StatusFailed + t.logger.Error("plugin execution failed", + core.Field{Key: "task_id", Value: t.id}, + core.Field{Key: "error", Value: err.Error()}, + ) + } else { + finalStatus = core.StatusSucceeded + t.logger.Info("test execution completed successfully", + core.Field{Key: "task_id", Value: t.id}, + ) } - t.logger.Info("test execution completed", - core.Field{Key: "task_id", Value: t.id}, - ) + if statusErr := t.storage.UpdateTestRunStatus(ctx, t.runID, finalStatus); statusErr != nil { + t.logger.Error("failed to update final test run status", + core.Field{Key: "run_id", Value: t.runID}, + core.Field{Key: "error", Value: statusErr.Error()}, + ) + // Return original error if it exists, otherwise return status update error + if err != nil { + return fmt.Errorf("plugin execution failed (%v) and status update failed (%v)", err, statusErr) + } + return fmt.Errorf("failed to update test run status: %w", statusErr) + } - return nil + return err } diff --git a/core/storage/manager.go b/core/storage/manager.go index 3fc63c8..b654dc9 100644 --- a/core/storage/manager.go +++ b/core/storage/manager.go @@ -35,8 +35,8 @@ func (m *Manager) CreateTestRun(ctx context.Context, run *core.TestRun) (int64, query := ` INSERT INTO test_run - (test_type_id, plugin_id, plugin_ver, host, port, db_name, name, description, status, config) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + (test_type_id, plugin_id, host, port, db_name, name, description, status, config) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id ` @@ -49,7 +49,6 @@ func (m *Manager) CreateTestRun(ctx context.Context, run *core.TestRun) (int64, err = db.QueryRowContext(ctx, query, run.TestTypeID, run.PluginID, - run.PluginVer, run.Host, run.Port, run.DBName, @@ -119,7 +118,7 @@ func (m *Manager) UpdateTestRunStatus(ctx context.Context, runID int64, status c // GetTestRun retrieves a test run by ID func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, error) { query := ` - SELECT id, test_type_id, plugin_id, plugin_ver, host, port, db_name, + SELECT id, test_type_id, plugin_id, host, port, db_name, name, description, status, config, created_at, start_time, end_time FROM test_run WHERE id = $1 @@ -138,7 +137,6 @@ func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, e &run.ID, &run.TestTypeID, &run.PluginID, - &run.PluginVer, &run.Host, &run.Port, &run.DBName, @@ -177,7 +175,7 @@ func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, e // ListTestRuns retrieves test runs with pagination func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.TestRun, error) { query := ` - SELECT id, test_type_id, plugin_id, plugin_ver, host, port, db_name, + SELECT id, test_type_id, plugin_id, host, port, db_name, name, description, status, config, created_at, start_time, end_time FROM test_run ORDER BY created_at DESC @@ -205,7 +203,6 @@ func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.T &run.ID, &run.TestTypeID, &run.PluginID, - &run.PluginVer, &run.Host, &run.Port, &run.DBName, @@ -341,7 +338,11 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul } // Unmarshal tags - result.Tags = string(tagsJSON) + if tagsJSON != nil { + if err := json.Unmarshal(tagsJSON, &result.Tags); err != nil { + return nil, fmt.Errorf("failed to unmarshal tags: %w", err) + } + } results = append(results, result) } @@ -389,7 +390,11 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim return nil, fmt.Errorf("failed to scan result: %w", err) } - result.Tags = string(tagsJSON) + if tagsJSON != nil { + if err := json.Unmarshal(tagsJSON, &result.Tags); err != nil { + return nil, fmt.Errorf("failed to unmarshal tags: %w", err) + } + } results = append(results, result) } @@ -473,8 +478,7 @@ func (m *Manager) RegisterPlugin(ctx context.Context, metadata core.PluginMetada VALUES ($1, $2, $3, $4) ON CONFLICT (name, version) DO UPDATE SET sha256 = EXCLUDED.sha256, - metadata = EXCLUDED.metadata, - updated_at = now() + metadata = EXCLUDED.metadata RETURNING id ` @@ -501,7 +505,7 @@ func (m *Manager) RegisterPlugin(ctx context.Context, metadata core.PluginMetada // GetPlugin retrieves a plugin by name and version func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.PluginMetadata, error) { query := ` - SELECT metadata + SELECT id, metadata FROM plugin WHERE name = $1 AND version = $2 AND is_active = true ` @@ -512,7 +516,8 @@ func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.Pl } var metadataJSON []byte - err = db.QueryRowContext(ctx, query, name, version).Scan(&metadataJSON) + var metadata core.PluginMetadata + err = db.QueryRowContext(ctx, query, name, version).Scan(&metadata.ID, &metadataJSON) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("plugin %s v%s not found", name, version) @@ -520,7 +525,6 @@ func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.Pl return nil, fmt.Errorf("failed to get plugin: %w", err) } - var metadata core.PluginMetadata if err := json.Unmarshal(metadataJSON, &metadata); err != nil { return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) } diff --git a/core/types.go b/core/types.go index c522ced..770a98d 100644 --- a/core/types.go +++ b/core/types.go @@ -8,7 +8,7 @@ import ( "time" ) -// Core service status enumeration +// Service status enumeration type ServiceStatus string const ( @@ -76,6 +76,7 @@ type SchedulerConfig struct { // PluginMetadata contains plugin registration information type PluginMetadata struct { + ID int `json:"id,omitempty"` Name string `yaml:"name" json:"name"` Version string `yaml:"version" json:"version"` Description string `yaml:"description" json:"description"` @@ -92,7 +93,6 @@ type TestRun struct { ID int64 `json:"id"` TestTypeID int `json:"test_type_id"` PluginID int `json:"plugin_id"` - PluginVer string `json:"plugin_version"` Host string `json:"host"` Port int `json:"port"` DBName string `json:"db_name"` @@ -107,13 +107,13 @@ type TestRun struct { // TestResult represents a single measurement from a test execution type TestResult struct { - ID int64 `json:"id"` - TestRunID int64 `json:"test_run_id"` - MetricID int `json:"metric_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Value float64 `json:"value"` - Tags string `json:"tags"` // JSON string for flexible metadata + ID int64 `json:"id"` + TestRunID int64 `json:"test_run_id"` + MetricID int `json:"metric_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Value float64 `json:"value"` + Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB } // TestType represents a category of tests (e.g., bulk_insert, read_latency) @@ -257,7 +257,7 @@ type SchedulerManager interface { CancelTask(taskID string) error // Test execution orchestration - ScheduleTest(ctx context.Context, pluginName string, config map[string]interface{}) (int64, error) + ScheduleTest(ctx context.Context, plugin Plugin, config map[string]interface{}) (int64, error) CancelTest(ctx context.Context, runID int64) error GetRunStatus(ctx context.Context, runID int64) (ServiceStatus, error) ListActiveRuns(ctx context.Context) ([]TestRun, error) @@ -271,7 +271,7 @@ type PluginManager interface { UnloadPlugins() error // Plugin registry - GetPlugin(name string) (Plugin, error) + GetPlugin(name string, version string) (Plugin, error) GetLoadedPlugins() []Plugin ListPlugins() []PluginMetadata diff --git a/go.mod b/go.mod index 922880e..cad77b0 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,9 @@ module github.com/elchinoo/stormdb -go 1.21 - -replace github.com/elchinoo/stormdb/core => ./core - -require github.com/elchinoo/stormdb/core v0.0.0-00010101000000-000000000000 +go 1.24.4 require ( - github.com/gorilla/mux v1.8.1 // indirect - github.com/lib/pq v1.10.9 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + github.com/gorilla/mux v1.8.1 + github.com/lib/pq v1.10.9 + gopkg.in/yaml.v3 v3.0.1 ) diff --git a/migrations/001_core_schema.sql b/migrations/001_core_schema.sql index 42bda4e..e33048e 100644 --- a/migrations/001_core_schema.sql +++ b/migrations/001_core_schema.sql @@ -26,7 +26,6 @@ CREATE TABLE IF NOT EXISTS plugin ( config_path TEXT, -- Path to configuration file metadata JSONB NOT NULL, -- Full plugin metadata created_at TIMESTAMPTZ DEFAULT now(), - updated_at TIMESTAMPTZ DEFAULT now(), is_active BOOLEAN DEFAULT true, UNIQUE(name, version) ); @@ -60,7 +59,6 @@ CREATE TABLE IF NOT EXISTS test_run ( id BIGSERIAL PRIMARY KEY, test_type_id INT REFERENCES test_type(id), plugin_id INT REFERENCES plugin(id), - plugin_ver VARCHAR(20) NOT NULL, -- Plugin version used host VARCHAR(200) NOT NULL, -- Target database host port INT NOT NULL, -- Target database port db_name VARCHAR(200) NOT NULL, -- Target database name @@ -87,7 +85,7 @@ CREATE TABLE IF NOT EXISTS test_run_result ( start_time TIMESTAMPTZ NOT NULL, -- Measurement window start end_time TIMESTAMPTZ NOT NULL, -- Measurement window end value DOUBLE PRECISION NOT NULL, -- Measured value - tags JSONB NOT NULL DEFAULT '{}', -- Flexible metadata (e.g., {"batch_size":1000}) + tags JSONB, -- Flexible metadata (e.g., {"batch_size":1000}) created_at TIMESTAMPTZ DEFAULT now() ); @@ -173,9 +171,6 @@ $$ language 'plpgsql'; CREATE TRIGGER update_test_type_updated_at BEFORE UPDATE ON test_type FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); -CREATE TRIGGER update_plugin_updated_at BEFORE UPDATE ON plugin - FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); - CREATE TRIGGER update_test_metric_updated_at BEFORE UPDATE ON test_metric FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); diff --git a/migrations/002_remove_plugin_ver_from_test_run.sql b/migrations/002_remove_plugin_ver_from_test_run.sql new file mode 100644 index 0000000..4a9d70b --- /dev/null +++ b/migrations/002_remove_plugin_ver_from_test_run.sql @@ -0,0 +1,58 @@ +-- Migration to align the test_run table with the v2 schema changes. +-- The plugin_ver column is now redundant, as the version is stored in the plugin table +-- and linked via plugin_id. This migration drops the dependent views, removes the +-- column, and then recreates the views with the updated schema. + +-- Step 1: Drop the dependent views +DROP VIEW IF EXISTS active_test_runs; +DROP VIEW IF EXISTS test_run_summary; + +-- Step 2: Drop the redundant column from the test_run table +ALTER TABLE test_run DROP COLUMN IF EXISTS plugin_ver; + +-- Step 3: Recreate the test_run_summary view without the old column +CREATE OR REPLACE VIEW test_run_summary AS +SELECT + tr.id, + tr.name, + tr.description, + tr.status, + tr.created_at, + tr.start_time, + tr.end_time, + EXTRACT(EPOCH FROM (tr.end_time - tr.start_time)) as duration_seconds, + tt.code as test_type_code, + tt.name as test_type_name, + p.name as plugin_name, + p.version as plugin_version, + tr.host, + tr.port, + tr.db_name +FROM test_run tr +JOIN test_type tt ON tr.test_type_id = tt.id +JOIN plugin p ON tr.plugin_id = p.id; + +-- Step 4: Recreate the active_test_runs view, explicitly listing columns +-- to avoid issues with wildcard selections in the future. +CREATE OR REPLACE VIEW active_test_runs AS +SELECT + tr.id, + tr.test_type_id, + tr.plugin_id, + tr.host, + tr.port, + tr.db_name, + tr.name, + tr.description, + tr.created_at, + tr.start_time, + tr.end_time, + tr.status, + tr.config, + tt.code as test_type_code, + p.name as plugin_name, + EXTRACT(EPOCH FROM (COALESCE(tr.end_time, now()) - tr.start_time)) as runtime_seconds +FROM test_run tr +JOIN test_type tt ON tr.test_type_id = tt.id +JOIN plugin p ON tr.plugin_id = p.id +WHERE tr.status IN ('running', 'pending'); diff --git a/plugins/bulk-load/Makefile b/plugins/bulk-load/Makefile index 3e9bcf2..fce71f5 100644 --- a/plugins/bulk-load/Makefile +++ b/plugins/bulk-load/Makefile @@ -17,7 +17,7 @@ all: plugin # Build plugin as shared library plugin: $(PLUGIN_SO) -$(PLUGIN_SO): $(GO_FILES) go.mod go.sum +$(PLUGIN_SO): $(GO_FILES) @echo "Building bulk load plugin..." @mkdir -p ../../build/plugins CGO_ENABLED=$(CGO_ENABLED) go build -buildmode=plugin -o $(PLUGIN_SO) . diff --git a/plugins/bulk-load/go.mod b/plugins/bulk-load/go.mod deleted file mode 100644 index cfa42cb..0000000 --- a/plugins/bulk-load/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/elchinoo/stormdb/plugins/bulk-load - -go 1.21 - -require ( - github.com/elchinoo/stormdb/core v0.0.0 - github.com/lib/pq v1.10.9 -) - -replace github.com/elchinoo/stormdb/core => ../../core diff --git a/plugins/bulk-load/go.sum b/plugins/bulk-load/go.sum deleted file mode 100644 index aeddeae..0000000 --- a/plugins/bulk-load/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 6529614..ada6296 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -40,6 +40,7 @@ type BulkLoadConfig struct { SSLMode string `json:"ssl_mode" yaml:"ssl_mode"` // Test configuration + Rebuild bool `json:"rebuild" yaml:"rebuild"` // Force drop/recreate of database BatchSizes []int `json:"batch_sizes" yaml:"batch_sizes"` // Batch sizes to test [1, 1000, 10000, 50000] Connections int `json:"connections" yaml:"connections"` // Fixed number of connections (default: 20) Duration time.Duration `json:"duration" yaml:"duration"` // Duration per batch size @@ -112,6 +113,7 @@ func (p *BulkLoadPlugin) Metadata() core.PluginMetadata { "username": {"type": "string", "description": "Database username"}, "password": {"type": "string", "description": "Database password"}, "ssl_mode": {"type": "string", "enum": ["disable", "require", "verify-ca", "verify-full"], "default": "disable"}, + "rebuild": {"type": "boolean", "default": false, "description": "Force drop and recreate of the test database"}, "batch_sizes": {"type": "array", "items": {"type": "integer", "minimum": 1}, "default": [1, 1000, 10000, 50000]}, "connections": {"type": "integer", "minimum": 1, "maximum": 1000, "default": 20}, "duration": {"type": "string", "pattern": "^[0-9]+[smh]$", "default": "5m"}, @@ -216,6 +218,11 @@ func (p *BulkLoadPlugin) Validate(config map[string]interface{}) error { bulkConfig.Verbose = verboseBool } } + if rebuild, ok := config["rebuild"]; ok { + if rebuildBool, ok := rebuild.(bool); ok { + bulkConfig.Rebuild = rebuildBool + } + } // Parse batch sizes if batchSizes, ok := config["batch_sizes"]; ok { @@ -367,6 +374,18 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac } defer p.db.Close() + // Rebuild database if requested + if p.config.Rebuild { + if err := p.rebuildDatabase(); err != nil { + return fmt.Errorf("failed to rebuild database: %w", err) + } + // Reconnect after rebuild + if err := p.connectDatabase(); err != nil { + return fmt.Errorf("failed to reconnect to database after rebuild: %w", err) + } + defer p.db.Close() + } + // Create/prepare test table if err := p.setupTestTable(); err != nil { return fmt.Errorf("failed to setup test table: %w", err) @@ -441,6 +460,45 @@ func (p *BulkLoadPlugin) Cleanup(ctx context.Context) error { // Helper methods +func (p *BulkLoadPlugin) rebuildDatabase() error { + p.logger.Info("Rebuilding database", core.Field{Key: "database", Value: p.config.Database}) + + // Connect to the default 'postgres' database to drop the target database + defaultConnStr := fmt.Sprintf("host=%s port=%d dbname=postgres user=%s password=%s sslmode=%s", + p.config.Host, p.config.Port, p.config.Username, p.config.Password, p.config.SSLMode) + + db, err := sql.Open("postgres", defaultConnStr) + if err != nil { + return fmt.Errorf("failed to connect to 'postgres' db for rebuild: %w", err) + } + defer db.Close() + + // Terminate existing connections + terminateSQL := ` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = $1 AND pid <> pg_backend_pid()` + if _, err := db.Exec(terminateSQL, p.config.Database); err != nil { + p.logger.Warn("Could not terminate existing connections, proceeding anyway", core.Field{Key: "error", Value: err.Error()}) + } + + // Drop database + dropSQL := fmt.Sprintf("DROP DATABASE IF EXISTS %s", p.config.Database) + if _, err := db.Exec(dropSQL); err != nil { + return fmt.Errorf("failed to drop database: %w", err) + } + p.logger.Info("Dropped database", core.Field{Key: "database", Value: p.config.Database}) + + // Create database + createSQL := fmt.Sprintf("CREATE DATABASE %s", p.config.Database) + if _, err := db.Exec(createSQL); err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + p.logger.Info("Created database", core.Field{Key: "database", Value: p.config.Database}) + + return nil +} + // connectDatabase establishes database connection func (p *BulkLoadPlugin) connectDatabase() error { connStr := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s sslmode=%s", diff --git a/plugins/tpcc-scalability/Makefile b/plugins/tpcc-scalability/Makefile index b2a4f34..31d9f60 100644 --- a/plugins/tpcc-scalability/Makefile +++ b/plugins/tpcc-scalability/Makefile @@ -24,7 +24,7 @@ deps: $(GOMOD) tidy # Build the plugin as a shared library -plugin: deps +plugin: @echo "Building $(PLUGIN_NAME) plugin..." @mkdir -p $(PLUGIN_DIR) $(GOBUILD) -buildmode=plugin -o $(PLUGIN_DIR)/$(PLUGIN_SO) . diff --git a/plugins/tpcc-scalability/go.mod b/plugins/tpcc-scalability/go.mod deleted file mode 100644 index 33ce29c..0000000 --- a/plugins/tpcc-scalability/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/elchinoo/stormdb/plugins/tpcc-scalability - -go 1.21 - -require ( - github.com/elchinoo/stormdb/core v0.0.0 - github.com/lib/pq v1.10.9 -) - -replace github.com/elchinoo/stormdb/core => ../../core diff --git a/plugins/tpcc-scalability/go.sum b/plugins/tpcc-scalability/go.sum deleted file mode 100644 index aeddeae..0000000 --- a/plugins/tpcc-scalability/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index bb3e3bd..09fd4a7 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -40,6 +40,7 @@ type TPCCConfig struct { SSLMode string `json:"ssl_mode" yaml:"ssl_mode"` // Test configuration + Rebuild bool `json:"rebuild" yaml:"rebuild"` // Force drop/recreate of database Scale int `json:"scale" yaml:"scale"` // TPC-C scale factor (warehouses) Connections []int `json:"connections" yaml:"connections"` // Connection counts to test [48, 96, 192, 256] Duration time.Duration `json:"duration" yaml:"duration"` // Duration per connection level @@ -191,6 +192,13 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) core.Field{Key: "connection_levels", Value: len(p.config.Connections)}, ) + // Rebuild database if requested + if p.config.Rebuild { + if err := p.rebuildDatabase(ctx); err != nil { + return fmt.Errorf("failed to rebuild database: %w", err) + } + } + // Connect to database if err := p.connectDB(); err != nil { return fmt.Errorf("failed to connect to database: %w", err) @@ -207,31 +215,8 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) return fmt.Errorf("failed to populate test data: %w", err) } - // Create test run - testRun := &core.TestRun{ - TestTypeID: 1, // Will be resolved by storage layer - PluginID: 1, // Will be resolved by storage layer - PluginVer: "1.0.0", - Host: p.config.Host, - Port: p.config.Port, - DBName: p.config.Database, - Name: fmt.Sprintf("TPC-C Scalability Test - Scale %d", p.config.Scale), - Description: fmt.Sprintf("Incremental connection test: %v", p.config.Connections), - Status: core.StatusRunning, - Config: config, - CreatedAt: time.Now(), - } - - runID, err := p.core.Storage.CreateTestRun(ctx, testRun) - if err != nil { - return fmt.Errorf("failed to create test run: %w", err) - } - - p.logger.Info("test run created", - core.Field{Key: "run_id", Value: runID}, - ) - - // Execute test for each connection level + // The test run is created by the scheduler. The plugin's job is to execute + // the test logic for each connection level. for i, connCount := range p.config.Connections { p.logger.Info("starting connection level test", core.Field{Key: "level", Value: i + 1}, @@ -241,7 +226,13 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) p.metrics.CurrentConnections = connCount p.metrics.TestPhase = fmt.Sprintf("Level %d/%d", i+1, len(p.config.Connections)) - if err := p.runConnectionLevel(ctx, runID, connCount); err != nil { + // We need a run ID to store results, but the plugin doesn't own the run. + // This is a temporary solution. In a real scenario, the run ID would be + // passed into the Execute method via the context. + // For now, we'll use a placeholder. + const placeholderRunID = 1 + + if err := p.runConnectionLevel(ctx, placeholderRunID, connCount); err != nil { p.logger.Error("connection level test failed", core.Field{Key: "level", Value: i + 1}, core.Field{Key: "connections", Value: connCount}, @@ -255,13 +246,7 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) } } - // Update test run status - if err := p.core.Storage.UpdateTestRunStatus(ctx, runID, core.StatusSucceeded); err != nil { - p.logger.Warn("failed to update test run status", core.Field{Key: "error", Value: err.Error()}) - } - p.logger.Info("TPC-C scalability test completed", - core.Field{Key: "run_id", Value: runID}, core.Field{Key: "duration", Value: time.Since(p.testStarted)}, ) @@ -292,6 +277,45 @@ func (p *TPCCPlugin) Cleanup(ctx context.Context) error { // Helper methods +func (p *TPCCPlugin) rebuildDatabase(ctx context.Context) error { + p.logger.Info("Rebuilding database", core.Field{Key: "database", Value: p.config.Database}) + + // Connect to the default 'postgres' database to drop the target database + defaultConnStr := fmt.Sprintf("host=%s port=%d dbname=postgres user=%s password=%s sslmode=%s", + p.config.Host, p.config.Port, p.config.Username, p.config.Password, p.config.SSLMode) + + db, err := sql.Open("postgres", defaultConnStr) + if err != nil { + return fmt.Errorf("failed to connect to 'postgres' db for rebuild: %w", err) + } + defer db.Close() + + // Terminate existing connections + terminateSQL := ` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = $1 AND pid <> pg_backend_pid()` + if _, err := db.ExecContext(ctx, terminateSQL, p.config.Database); err != nil { + p.logger.Warn("Could not terminate existing connections, proceeding anyway", core.Field{Key: "error", Value: err.Error()}) + } + + // Drop database + dropSQL := fmt.Sprintf("DROP DATABASE IF EXISTS %s", p.config.Database) + if _, err := db.ExecContext(ctx, dropSQL); err != nil { + return fmt.Errorf("failed to drop database: %w", err) + } + p.logger.Info("Dropped database", core.Field{Key: "database", Value: p.config.Database}) + + // Create database + createSQL := fmt.Sprintf("CREATE DATABASE %s", p.config.Database) + if _, err := db.ExecContext(ctx, createSQL); err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + p.logger.Info("Created database", core.Field{Key: "database", Value: p.config.Database}) + + return nil +} + func (p *TPCCPlugin) connectDB() error { connStr := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s sslmode=%s", p.config.Host, p.config.Port, p.config.Database, @@ -682,7 +706,11 @@ func (p *TPCCPlugin) worker(ctx context.Context, workerID int, stopTime time.Tim StartTime: start, EndTime: start.Add(latency), Value: float64(latency.Nanoseconds()), - Tags: fmt.Sprintf(`{"worker_id":%d,"tx_type":"%s","connections":%d}`, workerID, txType, p.metrics.CurrentConnections), + Tags: map[string]interface{}{ + "worker_id": workerID, + "tx_type": txType, + "connections": p.metrics.CurrentConnections, + }, } select { @@ -939,6 +967,7 @@ func getConfigSchema() string { "username": {"type": "string", "default": "postgres"}, "password": {"type": "string", "default": "postgres"}, "ssl_mode": {"type": "string", "default": "disable"}, + "rebuild": {"type": "boolean", "default": false, "description": "Force drop and recreate of the test database"}, "scale": {"type": "integer", "minimum": 1, "default": 10}, "connections": { "type": "array", diff --git a/run_tests.sh b/run_tests.sh index 9ef6b75..51c6558 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -5,7 +5,9 @@ set -e echo "Running StormDB v2 Unit Tests..." -cd /Users/charly.batista/proj/pgstorm/stormdb/v2 +# Get the directory of the script +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +cd "$SCRIPT_DIR" # Run all unit tests echo "Running unit tests..."