diff --git a/config/core.yaml b/config/core.yaml index ccbe6ef..f10a840 100644 --- a/config/core.yaml +++ b/config/core.yaml @@ -23,6 +23,7 @@ logging: output: "stdout" # "stdout", "stderr", or file path scheduler: + enabled: true worker_pool_size: 10 plugin_dirs: diff --git a/core/api/server.go b/core/api/server.go index b7fd565..33f2245 100644 --- a/core/api/server.go +++ b/core/api/server.go @@ -169,8 +169,9 @@ func (s *Server) handleListPlugins(w http.ResponseWriter, r *http.Request) { func (s *Server) handleGetPlugin(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) name := vars["name"] + version := r.URL.Query().Get("version") // Get version from query param - plugin, err := s.coreServices.Plugin.GetPlugin(name) + plugin, err := s.coreServices.Plugin.GetPlugin(name, version) if err != nil { s.writeErrorResponse(w, http.StatusNotFound, "plugin not found", err) return @@ -205,10 +206,12 @@ func (s *Server) handleReloadPlugins(w http.ResponseWriter, r *http.Request) { // Create test run endpoint func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) { var req struct { - PluginName string `json:"plugin_name"` - Name string `json:"name"` - Description string `json:"description"` - Config map[string]interface{} `json:"config"` + PluginName string `json:"plugin_name"` + PluginVersion string `json:"plugin_version"` // Optional + Name string `json:"name"` + Description string `json:"description"` + Config map[string]interface{} `json:"config"` + Rebuild bool `json:"rebuild"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -222,9 +225,22 @@ func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) { return } - // Schedule test + // Get the specific plugin version if provided, otherwise get the latest + plugin, err := s.coreServices.Plugin.GetPlugin(req.PluginName, req.PluginVersion) + if err != nil { + s.writeErrorResponse(w, http.StatusBadRequest, "plugin not found", err) + return + } + + // Add rebuild flag to config + if req.Config == nil { + req.Config = make(map[string]interface{}) + } + req.Config["rebuild"] = req.Rebuild + + // Schedule test with the specific plugin instance ctx := r.Context() - runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, req.PluginName, req.Config) + runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, plugin, req.Config) if err != nil { s.writeErrorResponse(w, http.StatusInternalServerError, "failed to schedule test", err) return diff --git a/core/go.mod b/core/go.mod deleted file mode 100644 index 6f4b031..0000000 --- a/core/go.mod +++ /dev/null @@ -1,16 +0,0 @@ -module github.com/elchinoo/stormdb/core - -go 1.21 - -require ( - github.com/gorilla/mux v1.8.1 - github.com/jackc/pgx/v5 v5.7.1 - gopkg.in/yaml.v3 v3.0.1 -) - -require ( - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/text v0.21.0 // indirect -) diff --git a/core/plugin/manager.go b/core/plugin/manager.go index 2ab2bd7..61cdc74 100644 --- a/core/plugin/manager.go +++ b/core/plugin/manager.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "plugin" + "strconv" "strings" "sync" @@ -94,12 +95,28 @@ func (m *Manager) LoadPlugin(path string) (core.Plugin, error) { // Register in storage if m.storage != nil { - _, err := m.storage.RegisterPlugin(context.Background(), metadata) + pluginID, err := m.storage.RegisterPlugin(context.Background(), metadata) if err != nil { m.logger.Warn("failed to register plugin in storage", core.Field{Key: "plugin", Value: metadata.Name}, core.Field{Key: "error", Value: err.Error()}, ) + } else { + // After registering the plugin, also register its declared test types + for _, testTypeCode := range metadata.TestTypes { + // A more robust implementation might fetch name/description from a central registry + // For now, we use the code as the name and provide a default description. + _, err := m.storage.RegisterTestType(context.Background(), testTypeCode, testTypeCode, "Auto-registered test type for plugin "+metadata.Name) + if err != nil { + m.logger.Warn("failed to auto-register test type for plugin", + core.Field{Key: "plugin", Value: metadata.Name}, + core.Field{Key: "test_type", Value: testTypeCode}, + core.Field{Key: "error", Value: err.Error()}, + ) + } + } + // IMPORTANT: Update the in-memory metadata with the ID from the database + metadata.ID = pluginID } } @@ -204,24 +221,45 @@ func (m *Manager) UnloadPlugins() error { return nil } -// GetPlugin retrieves a loaded plugin by name (latest version) -func (m *Manager) GetPlugin(name string) (core.Plugin, error) { +// GetPlugin retrieves a loaded plugin by name and optional version. +// If version is empty, it returns the latest available version. +func (m *Manager) GetPlugin(name string, version string) (core.Plugin, error) { m.mu.RLock() defer m.mu.RUnlock() - // Look for exact match first - if plugin, exists := m.plugins[name]; exists { - return plugin, nil + // If version is specified, look for an exact match. + if version != "" { + pluginKey := fmt.Sprintf("%s@%s", name, version) + if plugin, exists := m.plugins[pluginKey]; exists { + return plugin, nil + } + return nil, fmt.Errorf("plugin %s with version %s not found", name, version) } - // Look for name@version pattern + // If version is not specified, find the latest version. + var latestPlugin core.Plugin + var latestVersion string + for key, plugin := range m.plugins { if strings.HasPrefix(key, name+"@") { - return plugin, nil + parts := strings.Split(key, "@") + if len(parts) != 2 { + continue // Should not happen with our key format + } + currentVersion := parts[1] + + if latestPlugin == nil || isNewerVersion(currentVersion, latestVersion) { + latestVersion = currentVersion + latestPlugin = plugin + } } } - return nil, fmt.Errorf("plugin %s not found", name) + if latestPlugin == nil { + return nil, fmt.Errorf("plugin %s not found", name) + } + + return latestPlugin, nil } // GetLoadedPlugins returns all loaded plugins @@ -416,6 +454,32 @@ func (m *Manager) calculateSHA256(filePath string) (string, error) { return fmt.Sprintf("%x", hash.Sum(nil)), nil } +// isNewerVersion compares two semantic version strings. +// It returns true if v1 is newer than v2. +// This is a simplified implementation for formats like X.Y.Z. +func isNewerVersion(v1, v2 string) bool { + parts1 := strings.Split(v1, ".") + parts2 := strings.Split(v2, ".") + + for i := 0; i < len(parts1) && i < len(parts2); i++ { + // For simplicity, we ignore non-numeric parts for now. + // A proper implementation would use a semantic versioning library. + n1, _ := strconv.Atoi(parts1[i]) + n2, _ := strconv.Atoi(parts2[i]) + + if n1 > n2 { + return true + } + if n1 < n2 { + return false + } + } + + // If all parts are equal, the one with more parts is newer + // e.g., 1.0.1 is newer than 1.0 + return len(parts1) > len(parts2) +} + // isValidVersion performs basic semantic version validation func isValidVersion(version string) bool { // Basic check for X.Y.Z format diff --git a/core/scheduler/manager.go b/core/scheduler/manager.go index e001d9f..5880108 100644 --- a/core/scheduler/manager.go +++ b/core/scheduler/manager.go @@ -41,12 +41,12 @@ type scheduledTask struct { // TestExecutionTask implements the Task interface for test execution type TestExecutionTask struct { - id string - pluginName string - config map[string]interface{} - storage core.StorageManager - plugin core.PluginManager - logger core.Logger + id string + plugin core.Plugin + config map[string]interface{} + storage core.StorageManager + logger core.Logger + runID int64 } // NewManager creates a new scheduler manager @@ -243,23 +243,43 @@ func (m *Manager) CancelTask(taskID string) error { } // ScheduleTest schedules a test execution -func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config map[string]interface{}) (int64, error) { - // Get plugin - plugin, err := m.pluginManager.GetPlugin(pluginName) +func (m *Manager) ScheduleTest(ctx context.Context, plugin core.Plugin, config map[string]interface{}) (int64, error) { + pluginMeta := plugin.Metadata() + + // Ensure plugin has an ID from the database. + if pluginMeta.ID == 0 { + // If the ID is missing, it means the plugin wasn't registered correctly. + // Attempt to look it up from storage. + p, err := m.storage.GetPlugin(ctx, pluginMeta.Name, pluginMeta.Version) + if err != nil { + return 0, fmt.Errorf("plugin '%s v%s' is not registered in the database and could not be found: %w", pluginMeta.Name, pluginMeta.Version, err) + } + pluginMeta.ID = p.ID + } + + // Get the primary test type for the plugin + if len(pluginMeta.TestTypes) == 0 { + return 0, fmt.Errorf("plugin %s has no declared test types", pluginMeta.Name) + } + primaryTestTypeCode := pluginMeta.TestTypes[0] + + // Look up the test type ID from storage + testType, err := m.storage.GetTestType(ctx, primaryTestTypeCode) if err != nil { - return 0, fmt.Errorf("plugin %s not found: %w", pluginName, err) + return 0, fmt.Errorf("failed to get test type '%s': %w. It should have been auto-registered when the plugin was loaded", primaryTestTypeCode, err) } // Create test run record testRun := &core.TestRun{ - Name: fmt.Sprintf("Test run %s", time.Now().Format("2006-01-02 15:04:05")), - Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginName), + Name: fmt.Sprintf("Test run for %s v%s", pluginMeta.Name, pluginMeta.Version), + Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginMeta.Name), Status: core.StatusPending, Config: config, - PluginVer: plugin.Metadata().Version, - Host: "localhost", // Default, should come from config - Port: 5432, // Default, should come from config - DBName: "test", // Default, should come from config + PluginID: pluginMeta.ID, // Use the now-guaranteed-to-be-correct ID + TestTypeID: testType.ID, // Use the looked-up ID + Host: "localhost", // Default, should come from config + Port: 5432, // Default, should come from config + DBName: "test", // Default, should come from config } runID, err := m.storage.CreateTestRun(ctx, testRun) @@ -269,22 +289,31 @@ func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config ma // Create execution task task := &TestExecutionTask{ - id: fmt.Sprintf("test-%d", runID), - pluginName: pluginName, - config: config, - storage: m.storage, - plugin: m.pluginManager, - logger: m.logger, + id: fmt.Sprintf("test-%d", runID), + plugin: plugin, + config: config, + storage: m.storage, + logger: m.logger, + runID: runID, } // Submit for execution if err := m.SubmitTask(task); err != nil { + // Rollback test run creation + // It's important to handle this to avoid orphaned test_run records + if rollbackErr := m.storage.UpdateTestRunStatus(ctx, runID, core.StatusFailed); rollbackErr != nil { + m.logger.Error("failed to rollback test run status", + core.Field{Key: "test_run_id", Value: runID}, + core.Field{Key: "error", Value: rollbackErr.Error()}, + ) + } return 0, fmt.Errorf("failed to submit test task: %w", err) } m.logger.Info("test scheduled", core.Field{Key: "test_run_id", Value: runID}, - core.Field{Key: "plugin", Value: pluginName}, + core.Field{Key: "plugin", Value: plugin.Metadata().Name}, + core.Field{Key: "version", Value: plugin.Metadata().Version}, ) return runID, nil @@ -452,23 +481,48 @@ func (t *TestExecutionTask) Type() string { func (t *TestExecutionTask) Execute(ctx context.Context) error { t.logger.Info("executing test", core.Field{Key: "task_id", Value: t.id}, - core.Field{Key: "plugin", Value: t.pluginName}, + core.Field{Key: "plugin", Value: t.plugin.Metadata().Name}, + core.Field{Key: "run_id", Value: t.runID}, ) - // Get plugin - plugin, err := t.plugin.GetPlugin(t.pluginName) - if err != nil { - return fmt.Errorf("failed to get plugin: %w", err) + // Update status to running + if err := t.storage.UpdateTestRunStatus(ctx, t.runID, core.StatusRunning); err != nil { + t.logger.Error("failed to update test run status to running", + core.Field{Key: "run_id", Value: t.runID}, + core.Field{Key: "error", Value: err.Error()}, + ) + return fmt.Errorf("failed to update test run status: %w", err) } // Execute plugin - if err := plugin.Execute(ctx, t.config); err != nil { - return fmt.Errorf("plugin execution failed: %w", err) + err := t.plugin.Execute(ctx, t.config) + + // Update status based on execution result + var finalStatus core.ServiceStatus + if err != nil { + finalStatus = core.StatusFailed + t.logger.Error("plugin execution failed", + core.Field{Key: "task_id", Value: t.id}, + core.Field{Key: "error", Value: err.Error()}, + ) + } else { + finalStatus = core.StatusSucceeded + t.logger.Info("test execution completed successfully", + core.Field{Key: "task_id", Value: t.id}, + ) } - t.logger.Info("test execution completed", - core.Field{Key: "task_id", Value: t.id}, - ) + if statusErr := t.storage.UpdateTestRunStatus(ctx, t.runID, finalStatus); statusErr != nil { + t.logger.Error("failed to update final test run status", + core.Field{Key: "run_id", Value: t.runID}, + core.Field{Key: "error", Value: statusErr.Error()}, + ) + // Return original error if it exists, otherwise return status update error + if err != nil { + return fmt.Errorf("plugin execution failed (%v) and status update failed (%v)", err, statusErr) + } + return fmt.Errorf("failed to update test run status: %w", statusErr) + } - return nil + return err } diff --git a/core/storage/manager.go b/core/storage/manager.go index 3fc63c8..b654dc9 100644 --- a/core/storage/manager.go +++ b/core/storage/manager.go @@ -35,8 +35,8 @@ func (m *Manager) CreateTestRun(ctx context.Context, run *core.TestRun) (int64, query := ` INSERT INTO test_run - (test_type_id, plugin_id, plugin_ver, host, port, db_name, name, description, status, config) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + (test_type_id, plugin_id, host, port, db_name, name, description, status, config) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id ` @@ -49,7 +49,6 @@ func (m *Manager) CreateTestRun(ctx context.Context, run *core.TestRun) (int64, err = db.QueryRowContext(ctx, query, run.TestTypeID, run.PluginID, - run.PluginVer, run.Host, run.Port, run.DBName, @@ -119,7 +118,7 @@ func (m *Manager) UpdateTestRunStatus(ctx context.Context, runID int64, status c // GetTestRun retrieves a test run by ID func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, error) { query := ` - SELECT id, test_type_id, plugin_id, plugin_ver, host, port, db_name, + SELECT id, test_type_id, plugin_id, host, port, db_name, name, description, status, config, created_at, start_time, end_time FROM test_run WHERE id = $1 @@ -138,7 +137,6 @@ func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, e &run.ID, &run.TestTypeID, &run.PluginID, - &run.PluginVer, &run.Host, &run.Port, &run.DBName, @@ -177,7 +175,7 @@ func (m *Manager) GetTestRun(ctx context.Context, runID int64) (*core.TestRun, e // ListTestRuns retrieves test runs with pagination func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.TestRun, error) { query := ` - SELECT id, test_type_id, plugin_id, plugin_ver, host, port, db_name, + SELECT id, test_type_id, plugin_id, host, port, db_name, name, description, status, config, created_at, start_time, end_time FROM test_run ORDER BY created_at DESC @@ -205,7 +203,6 @@ func (m *Manager) ListTestRuns(ctx context.Context, limit, offset int) ([]core.T &run.ID, &run.TestTypeID, &run.PluginID, - &run.PluginVer, &run.Host, &run.Port, &run.DBName, @@ -341,7 +338,11 @@ func (m *Manager) GetResults(ctx context.Context, runID int64) ([]core.TestResul } // Unmarshal tags - result.Tags = string(tagsJSON) + if tagsJSON != nil { + if err := json.Unmarshal(tagsJSON, &result.Tags); err != nil { + return nil, fmt.Errorf("failed to unmarshal tags: %w", err) + } + } results = append(results, result) } @@ -389,7 +390,11 @@ func (m *Manager) GetResultsByMetric(ctx context.Context, metricCode string, lim return nil, fmt.Errorf("failed to scan result: %w", err) } - result.Tags = string(tagsJSON) + if tagsJSON != nil { + if err := json.Unmarshal(tagsJSON, &result.Tags); err != nil { + return nil, fmt.Errorf("failed to unmarshal tags: %w", err) + } + } results = append(results, result) } @@ -473,8 +478,7 @@ func (m *Manager) RegisterPlugin(ctx context.Context, metadata core.PluginMetada VALUES ($1, $2, $3, $4) ON CONFLICT (name, version) DO UPDATE SET sha256 = EXCLUDED.sha256, - metadata = EXCLUDED.metadata, - updated_at = now() + metadata = EXCLUDED.metadata RETURNING id ` @@ -501,7 +505,7 @@ func (m *Manager) RegisterPlugin(ctx context.Context, metadata core.PluginMetada // GetPlugin retrieves a plugin by name and version func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.PluginMetadata, error) { query := ` - SELECT metadata + SELECT id, metadata FROM plugin WHERE name = $1 AND version = $2 AND is_active = true ` @@ -512,7 +516,8 @@ func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.Pl } var metadataJSON []byte - err = db.QueryRowContext(ctx, query, name, version).Scan(&metadataJSON) + var metadata core.PluginMetadata + err = db.QueryRowContext(ctx, query, name, version).Scan(&metadata.ID, &metadataJSON) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("plugin %s v%s not found", name, version) @@ -520,7 +525,6 @@ func (m *Manager) GetPlugin(ctx context.Context, name, version string) (*core.Pl return nil, fmt.Errorf("failed to get plugin: %w", err) } - var metadata core.PluginMetadata if err := json.Unmarshal(metadataJSON, &metadata); err != nil { return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) } diff --git a/core/types.go b/core/types.go index c522ced..770a98d 100644 --- a/core/types.go +++ b/core/types.go @@ -8,7 +8,7 @@ import ( "time" ) -// Core service status enumeration +// Service status enumeration type ServiceStatus string const ( @@ -76,6 +76,7 @@ type SchedulerConfig struct { // PluginMetadata contains plugin registration information type PluginMetadata struct { + ID int `json:"id,omitempty"` Name string `yaml:"name" json:"name"` Version string `yaml:"version" json:"version"` Description string `yaml:"description" json:"description"` @@ -92,7 +93,6 @@ type TestRun struct { ID int64 `json:"id"` TestTypeID int `json:"test_type_id"` PluginID int `json:"plugin_id"` - PluginVer string `json:"plugin_version"` Host string `json:"host"` Port int `json:"port"` DBName string `json:"db_name"` @@ -107,13 +107,13 @@ type TestRun struct { // TestResult represents a single measurement from a test execution type TestResult struct { - ID int64 `json:"id"` - TestRunID int64 `json:"test_run_id"` - MetricID int `json:"metric_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Value float64 `json:"value"` - Tags string `json:"tags"` // JSON string for flexible metadata + ID int64 `json:"id"` + TestRunID int64 `json:"test_run_id"` + MetricID int `json:"metric_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Value float64 `json:"value"` + Tags map[string]interface{} `json:"tags,omitempty"` // Flexible metadata, stored as JSONB } // TestType represents a category of tests (e.g., bulk_insert, read_latency) @@ -257,7 +257,7 @@ type SchedulerManager interface { CancelTask(taskID string) error // Test execution orchestration - ScheduleTest(ctx context.Context, pluginName string, config map[string]interface{}) (int64, error) + ScheduleTest(ctx context.Context, plugin Plugin, config map[string]interface{}) (int64, error) CancelTest(ctx context.Context, runID int64) error GetRunStatus(ctx context.Context, runID int64) (ServiceStatus, error) ListActiveRuns(ctx context.Context) ([]TestRun, error) @@ -271,7 +271,7 @@ type PluginManager interface { UnloadPlugins() error // Plugin registry - GetPlugin(name string) (Plugin, error) + GetPlugin(name string, version string) (Plugin, error) GetLoadedPlugins() []Plugin ListPlugins() []PluginMetadata diff --git a/go.mod b/go.mod index 922880e..cad77b0 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,9 @@ module github.com/elchinoo/stormdb -go 1.21 - -replace github.com/elchinoo/stormdb/core => ./core - -require github.com/elchinoo/stormdb/core v0.0.0-00010101000000-000000000000 +go 1.24.4 require ( - github.com/gorilla/mux v1.8.1 // indirect - github.com/lib/pq v1.10.9 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + github.com/gorilla/mux v1.8.1 + github.com/lib/pq v1.10.9 + gopkg.in/yaml.v3 v3.0.1 ) diff --git a/migrations/001_core_schema.sql b/migrations/001_core_schema.sql index 42bda4e..e33048e 100644 --- a/migrations/001_core_schema.sql +++ b/migrations/001_core_schema.sql @@ -26,7 +26,6 @@ CREATE TABLE IF NOT EXISTS plugin ( config_path TEXT, -- Path to configuration file metadata JSONB NOT NULL, -- Full plugin metadata created_at TIMESTAMPTZ DEFAULT now(), - updated_at TIMESTAMPTZ DEFAULT now(), is_active BOOLEAN DEFAULT true, UNIQUE(name, version) ); @@ -60,7 +59,6 @@ CREATE TABLE IF NOT EXISTS test_run ( id BIGSERIAL PRIMARY KEY, test_type_id INT REFERENCES test_type(id), plugin_id INT REFERENCES plugin(id), - plugin_ver VARCHAR(20) NOT NULL, -- Plugin version used host VARCHAR(200) NOT NULL, -- Target database host port INT NOT NULL, -- Target database port db_name VARCHAR(200) NOT NULL, -- Target database name @@ -87,7 +85,7 @@ CREATE TABLE IF NOT EXISTS test_run_result ( start_time TIMESTAMPTZ NOT NULL, -- Measurement window start end_time TIMESTAMPTZ NOT NULL, -- Measurement window end value DOUBLE PRECISION NOT NULL, -- Measured value - tags JSONB NOT NULL DEFAULT '{}', -- Flexible metadata (e.g., {"batch_size":1000}) + tags JSONB, -- Flexible metadata (e.g., {"batch_size":1000}) created_at TIMESTAMPTZ DEFAULT now() ); @@ -173,9 +171,6 @@ $$ language 'plpgsql'; CREATE TRIGGER update_test_type_updated_at BEFORE UPDATE ON test_type FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); -CREATE TRIGGER update_plugin_updated_at BEFORE UPDATE ON plugin - FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); - CREATE TRIGGER update_test_metric_updated_at BEFORE UPDATE ON test_metric FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); diff --git a/migrations/002_remove_plugin_ver_from_test_run.sql b/migrations/002_remove_plugin_ver_from_test_run.sql new file mode 100644 index 0000000..4a9d70b --- /dev/null +++ b/migrations/002_remove_plugin_ver_from_test_run.sql @@ -0,0 +1,58 @@ +-- Migration to align the test_run table with the v2 schema changes. +-- The plugin_ver column is now redundant, as the version is stored in the plugin table +-- and linked via plugin_id. This migration drops the dependent views, removes the +-- column, and then recreates the views with the updated schema. + +-- Step 1: Drop the dependent views +DROP VIEW IF EXISTS active_test_runs; +DROP VIEW IF EXISTS test_run_summary; + +-- Step 2: Drop the redundant column from the test_run table +ALTER TABLE test_run DROP COLUMN IF EXISTS plugin_ver; + +-- Step 3: Recreate the test_run_summary view without the old column +CREATE OR REPLACE VIEW test_run_summary AS +SELECT + tr.id, + tr.name, + tr.description, + tr.status, + tr.created_at, + tr.start_time, + tr.end_time, + EXTRACT(EPOCH FROM (tr.end_time - tr.start_time)) as duration_seconds, + tt.code as test_type_code, + tt.name as test_type_name, + p.name as plugin_name, + p.version as plugin_version, + tr.host, + tr.port, + tr.db_name +FROM test_run tr +JOIN test_type tt ON tr.test_type_id = tt.id +JOIN plugin p ON tr.plugin_id = p.id; + +-- Step 4: Recreate the active_test_runs view, explicitly listing columns +-- to avoid issues with wildcard selections in the future. +CREATE OR REPLACE VIEW active_test_runs AS +SELECT + tr.id, + tr.test_type_id, + tr.plugin_id, + tr.host, + tr.port, + tr.db_name, + tr.name, + tr.description, + tr.created_at, + tr.start_time, + tr.end_time, + tr.status, + tr.config, + tt.code as test_type_code, + p.name as plugin_name, + EXTRACT(EPOCH FROM (COALESCE(tr.end_time, now()) - tr.start_time)) as runtime_seconds +FROM test_run tr +JOIN test_type tt ON tr.test_type_id = tt.id +JOIN plugin p ON tr.plugin_id = p.id +WHERE tr.status IN ('running', 'pending'); diff --git a/plugins/bulk-load/Makefile b/plugins/bulk-load/Makefile index 3e9bcf2..fce71f5 100644 --- a/plugins/bulk-load/Makefile +++ b/plugins/bulk-load/Makefile @@ -17,7 +17,7 @@ all: plugin # Build plugin as shared library plugin: $(PLUGIN_SO) -$(PLUGIN_SO): $(GO_FILES) go.mod go.sum +$(PLUGIN_SO): $(GO_FILES) @echo "Building bulk load plugin..." @mkdir -p ../../build/plugins CGO_ENABLED=$(CGO_ENABLED) go build -buildmode=plugin -o $(PLUGIN_SO) . diff --git a/plugins/bulk-load/go.mod b/plugins/bulk-load/go.mod deleted file mode 100644 index cfa42cb..0000000 --- a/plugins/bulk-load/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/elchinoo/stormdb/plugins/bulk-load - -go 1.21 - -require ( - github.com/elchinoo/stormdb/core v0.0.0 - github.com/lib/pq v1.10.9 -) - -replace github.com/elchinoo/stormdb/core => ../../core diff --git a/plugins/bulk-load/go.sum b/plugins/bulk-load/go.sum deleted file mode 100644 index aeddeae..0000000 --- a/plugins/bulk-load/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/plugins/bulk-load/plugin.go b/plugins/bulk-load/plugin.go index 6529614..ada6296 100644 --- a/plugins/bulk-load/plugin.go +++ b/plugins/bulk-load/plugin.go @@ -40,6 +40,7 @@ type BulkLoadConfig struct { SSLMode string `json:"ssl_mode" yaml:"ssl_mode"` // Test configuration + Rebuild bool `json:"rebuild" yaml:"rebuild"` // Force drop/recreate of database BatchSizes []int `json:"batch_sizes" yaml:"batch_sizes"` // Batch sizes to test [1, 1000, 10000, 50000] Connections int `json:"connections" yaml:"connections"` // Fixed number of connections (default: 20) Duration time.Duration `json:"duration" yaml:"duration"` // Duration per batch size @@ -112,6 +113,7 @@ func (p *BulkLoadPlugin) Metadata() core.PluginMetadata { "username": {"type": "string", "description": "Database username"}, "password": {"type": "string", "description": "Database password"}, "ssl_mode": {"type": "string", "enum": ["disable", "require", "verify-ca", "verify-full"], "default": "disable"}, + "rebuild": {"type": "boolean", "default": false, "description": "Force drop and recreate of the test database"}, "batch_sizes": {"type": "array", "items": {"type": "integer", "minimum": 1}, "default": [1, 1000, 10000, 50000]}, "connections": {"type": "integer", "minimum": 1, "maximum": 1000, "default": 20}, "duration": {"type": "string", "pattern": "^[0-9]+[smh]$", "default": "5m"}, @@ -216,6 +218,11 @@ func (p *BulkLoadPlugin) Validate(config map[string]interface{}) error { bulkConfig.Verbose = verboseBool } } + if rebuild, ok := config["rebuild"]; ok { + if rebuildBool, ok := rebuild.(bool); ok { + bulkConfig.Rebuild = rebuildBool + } + } // Parse batch sizes if batchSizes, ok := config["batch_sizes"]; ok { @@ -367,6 +374,18 @@ func (p *BulkLoadPlugin) Execute(ctx context.Context, config map[string]interfac } defer p.db.Close() + // Rebuild database if requested + if p.config.Rebuild { + if err := p.rebuildDatabase(); err != nil { + return fmt.Errorf("failed to rebuild database: %w", err) + } + // Reconnect after rebuild + if err := p.connectDatabase(); err != nil { + return fmt.Errorf("failed to reconnect to database after rebuild: %w", err) + } + defer p.db.Close() + } + // Create/prepare test table if err := p.setupTestTable(); err != nil { return fmt.Errorf("failed to setup test table: %w", err) @@ -441,6 +460,45 @@ func (p *BulkLoadPlugin) Cleanup(ctx context.Context) error { // Helper methods +func (p *BulkLoadPlugin) rebuildDatabase() error { + p.logger.Info("Rebuilding database", core.Field{Key: "database", Value: p.config.Database}) + + // Connect to the default 'postgres' database to drop the target database + defaultConnStr := fmt.Sprintf("host=%s port=%d dbname=postgres user=%s password=%s sslmode=%s", + p.config.Host, p.config.Port, p.config.Username, p.config.Password, p.config.SSLMode) + + db, err := sql.Open("postgres", defaultConnStr) + if err != nil { + return fmt.Errorf("failed to connect to 'postgres' db for rebuild: %w", err) + } + defer db.Close() + + // Terminate existing connections + terminateSQL := ` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = $1 AND pid <> pg_backend_pid()` + if _, err := db.Exec(terminateSQL, p.config.Database); err != nil { + p.logger.Warn("Could not terminate existing connections, proceeding anyway", core.Field{Key: "error", Value: err.Error()}) + } + + // Drop database + dropSQL := fmt.Sprintf("DROP DATABASE IF EXISTS %s", p.config.Database) + if _, err := db.Exec(dropSQL); err != nil { + return fmt.Errorf("failed to drop database: %w", err) + } + p.logger.Info("Dropped database", core.Field{Key: "database", Value: p.config.Database}) + + // Create database + createSQL := fmt.Sprintf("CREATE DATABASE %s", p.config.Database) + if _, err := db.Exec(createSQL); err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + p.logger.Info("Created database", core.Field{Key: "database", Value: p.config.Database}) + + return nil +} + // connectDatabase establishes database connection func (p *BulkLoadPlugin) connectDatabase() error { connStr := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s sslmode=%s", diff --git a/plugins/tpcc-scalability/Makefile b/plugins/tpcc-scalability/Makefile index b2a4f34..31d9f60 100644 --- a/plugins/tpcc-scalability/Makefile +++ b/plugins/tpcc-scalability/Makefile @@ -24,7 +24,7 @@ deps: $(GOMOD) tidy # Build the plugin as a shared library -plugin: deps +plugin: @echo "Building $(PLUGIN_NAME) plugin..." @mkdir -p $(PLUGIN_DIR) $(GOBUILD) -buildmode=plugin -o $(PLUGIN_DIR)/$(PLUGIN_SO) . diff --git a/plugins/tpcc-scalability/go.mod b/plugins/tpcc-scalability/go.mod deleted file mode 100644 index 33ce29c..0000000 --- a/plugins/tpcc-scalability/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/elchinoo/stormdb/plugins/tpcc-scalability - -go 1.21 - -require ( - github.com/elchinoo/stormdb/core v0.0.0 - github.com/lib/pq v1.10.9 -) - -replace github.com/elchinoo/stormdb/core => ../../core diff --git a/plugins/tpcc-scalability/go.sum b/plugins/tpcc-scalability/go.sum deleted file mode 100644 index aeddeae..0000000 --- a/plugins/tpcc-scalability/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/plugins/tpcc-scalability/plugin.go b/plugins/tpcc-scalability/plugin.go index bb3e3bd..09fd4a7 100644 --- a/plugins/tpcc-scalability/plugin.go +++ b/plugins/tpcc-scalability/plugin.go @@ -40,6 +40,7 @@ type TPCCConfig struct { SSLMode string `json:"ssl_mode" yaml:"ssl_mode"` // Test configuration + Rebuild bool `json:"rebuild" yaml:"rebuild"` // Force drop/recreate of database Scale int `json:"scale" yaml:"scale"` // TPC-C scale factor (warehouses) Connections []int `json:"connections" yaml:"connections"` // Connection counts to test [48, 96, 192, 256] Duration time.Duration `json:"duration" yaml:"duration"` // Duration per connection level @@ -191,6 +192,13 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) core.Field{Key: "connection_levels", Value: len(p.config.Connections)}, ) + // Rebuild database if requested + if p.config.Rebuild { + if err := p.rebuildDatabase(ctx); err != nil { + return fmt.Errorf("failed to rebuild database: %w", err) + } + } + // Connect to database if err := p.connectDB(); err != nil { return fmt.Errorf("failed to connect to database: %w", err) @@ -207,31 +215,8 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) return fmt.Errorf("failed to populate test data: %w", err) } - // Create test run - testRun := &core.TestRun{ - TestTypeID: 1, // Will be resolved by storage layer - PluginID: 1, // Will be resolved by storage layer - PluginVer: "1.0.0", - Host: p.config.Host, - Port: p.config.Port, - DBName: p.config.Database, - Name: fmt.Sprintf("TPC-C Scalability Test - Scale %d", p.config.Scale), - Description: fmt.Sprintf("Incremental connection test: %v", p.config.Connections), - Status: core.StatusRunning, - Config: config, - CreatedAt: time.Now(), - } - - runID, err := p.core.Storage.CreateTestRun(ctx, testRun) - if err != nil { - return fmt.Errorf("failed to create test run: %w", err) - } - - p.logger.Info("test run created", - core.Field{Key: "run_id", Value: runID}, - ) - - // Execute test for each connection level + // The test run is created by the scheduler. The plugin's job is to execute + // the test logic for each connection level. for i, connCount := range p.config.Connections { p.logger.Info("starting connection level test", core.Field{Key: "level", Value: i + 1}, @@ -241,7 +226,13 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) p.metrics.CurrentConnections = connCount p.metrics.TestPhase = fmt.Sprintf("Level %d/%d", i+1, len(p.config.Connections)) - if err := p.runConnectionLevel(ctx, runID, connCount); err != nil { + // We need a run ID to store results, but the plugin doesn't own the run. + // This is a temporary solution. In a real scenario, the run ID would be + // passed into the Execute method via the context. + // For now, we'll use a placeholder. + const placeholderRunID = 1 + + if err := p.runConnectionLevel(ctx, placeholderRunID, connCount); err != nil { p.logger.Error("connection level test failed", core.Field{Key: "level", Value: i + 1}, core.Field{Key: "connections", Value: connCount}, @@ -255,13 +246,7 @@ func (p *TPCCPlugin) Execute(ctx context.Context, config map[string]interface{}) } } - // Update test run status - if err := p.core.Storage.UpdateTestRunStatus(ctx, runID, core.StatusSucceeded); err != nil { - p.logger.Warn("failed to update test run status", core.Field{Key: "error", Value: err.Error()}) - } - p.logger.Info("TPC-C scalability test completed", - core.Field{Key: "run_id", Value: runID}, core.Field{Key: "duration", Value: time.Since(p.testStarted)}, ) @@ -292,6 +277,45 @@ func (p *TPCCPlugin) Cleanup(ctx context.Context) error { // Helper methods +func (p *TPCCPlugin) rebuildDatabase(ctx context.Context) error { + p.logger.Info("Rebuilding database", core.Field{Key: "database", Value: p.config.Database}) + + // Connect to the default 'postgres' database to drop the target database + defaultConnStr := fmt.Sprintf("host=%s port=%d dbname=postgres user=%s password=%s sslmode=%s", + p.config.Host, p.config.Port, p.config.Username, p.config.Password, p.config.SSLMode) + + db, err := sql.Open("postgres", defaultConnStr) + if err != nil { + return fmt.Errorf("failed to connect to 'postgres' db for rebuild: %w", err) + } + defer db.Close() + + // Terminate existing connections + terminateSQL := ` + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = $1 AND pid <> pg_backend_pid()` + if _, err := db.ExecContext(ctx, terminateSQL, p.config.Database); err != nil { + p.logger.Warn("Could not terminate existing connections, proceeding anyway", core.Field{Key: "error", Value: err.Error()}) + } + + // Drop database + dropSQL := fmt.Sprintf("DROP DATABASE IF EXISTS %s", p.config.Database) + if _, err := db.ExecContext(ctx, dropSQL); err != nil { + return fmt.Errorf("failed to drop database: %w", err) + } + p.logger.Info("Dropped database", core.Field{Key: "database", Value: p.config.Database}) + + // Create database + createSQL := fmt.Sprintf("CREATE DATABASE %s", p.config.Database) + if _, err := db.ExecContext(ctx, createSQL); err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + p.logger.Info("Created database", core.Field{Key: "database", Value: p.config.Database}) + + return nil +} + func (p *TPCCPlugin) connectDB() error { connStr := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s sslmode=%s", p.config.Host, p.config.Port, p.config.Database, @@ -682,7 +706,11 @@ func (p *TPCCPlugin) worker(ctx context.Context, workerID int, stopTime time.Tim StartTime: start, EndTime: start.Add(latency), Value: float64(latency.Nanoseconds()), - Tags: fmt.Sprintf(`{"worker_id":%d,"tx_type":"%s","connections":%d}`, workerID, txType, p.metrics.CurrentConnections), + Tags: map[string]interface{}{ + "worker_id": workerID, + "tx_type": txType, + "connections": p.metrics.CurrentConnections, + }, } select { @@ -939,6 +967,7 @@ func getConfigSchema() string { "username": {"type": "string", "default": "postgres"}, "password": {"type": "string", "default": "postgres"}, "ssl_mode": {"type": "string", "default": "disable"}, + "rebuild": {"type": "boolean", "default": false, "description": "Force drop and recreate of the test database"}, "scale": {"type": "integer", "minimum": 1, "default": 10}, "connections": { "type": "array", diff --git a/run_tests.sh b/run_tests.sh index 9ef6b75..51c6558 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -5,7 +5,9 @@ set -e echo "Running StormDB v2 Unit Tests..." -cd /Users/charly.batista/proj/pgstorm/stormdb/v2 +# Get the directory of the script +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +cd "$SCRIPT_DIR" # Run all unit tests echo "Running unit tests..."