Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ logging:
output: "stdout" # "stdout", "stderr", or file path

scheduler:
enabled: true
worker_pool_size: 10

plugin_dirs:
Expand Down
30 changes: 23 additions & 7 deletions core/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func (s *Server) handleListPlugins(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleGetPlugin(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
name := vars["name"]
version := r.URL.Query().Get("version") // Get version from query param

plugin, err := s.coreServices.Plugin.GetPlugin(name)
plugin, err := s.coreServices.Plugin.GetPlugin(name, version)
if err != nil {
s.writeErrorResponse(w, http.StatusNotFound, "plugin not found", err)
return
Expand Down Expand Up @@ -205,10 +206,12 @@ func (s *Server) handleReloadPlugins(w http.ResponseWriter, r *http.Request) {
// Create test run endpoint
func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) {
var req struct {
PluginName string `json:"plugin_name"`
Name string `json:"name"`
Description string `json:"description"`
Config map[string]interface{} `json:"config"`
PluginName string `json:"plugin_name"`
PluginVersion string `json:"plugin_version"` // Optional
Name string `json:"name"`
Description string `json:"description"`
Config map[string]interface{} `json:"config"`
Rebuild bool `json:"rebuild"`
}

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
Expand All @@ -222,9 +225,22 @@ func (s *Server) handleCreateTestRun(w http.ResponseWriter, r *http.Request) {
return
}

// Schedule test
// Get the specific plugin version if provided, otherwise get the latest
plugin, err := s.coreServices.Plugin.GetPlugin(req.PluginName, req.PluginVersion)
if err != nil {
s.writeErrorResponse(w, http.StatusBadRequest, "plugin not found", err)
return
}

// Add rebuild flag to config
if req.Config == nil {
req.Config = make(map[string]interface{})
}
req.Config["rebuild"] = req.Rebuild

// Schedule test with the specific plugin instance
ctx := r.Context()
runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, req.PluginName, req.Config)
runID, err := s.coreServices.Scheduler.ScheduleTest(ctx, plugin, req.Config)
if err != nil {
s.writeErrorResponse(w, http.StatusInternalServerError, "failed to schedule test", err)
return
Expand Down
16 changes: 0 additions & 16 deletions core/go.mod

This file was deleted.

82 changes: 73 additions & 9 deletions core/plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"plugin"
"strconv"
"strings"
"sync"

Expand Down Expand Up @@ -94,12 +95,28 @@ func (m *Manager) LoadPlugin(path string) (core.Plugin, error) {

// Register in storage
if m.storage != nil {
_, err := m.storage.RegisterPlugin(context.Background(), metadata)
pluginID, err := m.storage.RegisterPlugin(context.Background(), metadata)
if err != nil {
m.logger.Warn("failed to register plugin in storage",
core.Field{Key: "plugin", Value: metadata.Name},
core.Field{Key: "error", Value: err.Error()},
)
} else {
// After registering the plugin, also register its declared test types
for _, testTypeCode := range metadata.TestTypes {
// A more robust implementation might fetch name/description from a central registry
// For now, we use the code as the name and provide a default description.
_, err := m.storage.RegisterTestType(context.Background(), testTypeCode, testTypeCode, "Auto-registered test type for plugin "+metadata.Name)
if err != nil {
m.logger.Warn("failed to auto-register test type for plugin",
core.Field{Key: "plugin", Value: metadata.Name},
core.Field{Key: "test_type", Value: testTypeCode},
core.Field{Key: "error", Value: err.Error()},
)
}
}
// IMPORTANT: Update the in-memory metadata with the ID from the database
metadata.ID = pluginID
}
}

Expand Down Expand Up @@ -204,24 +221,45 @@ func (m *Manager) UnloadPlugins() error {
return nil
}

// GetPlugin retrieves a loaded plugin by name (latest version)
func (m *Manager) GetPlugin(name string) (core.Plugin, error) {
// GetPlugin retrieves a loaded plugin by name and optional version.
// If version is empty, it returns the latest available version.
func (m *Manager) GetPlugin(name string, version string) (core.Plugin, error) {
m.mu.RLock()
defer m.mu.RUnlock()

// Look for exact match first
if plugin, exists := m.plugins[name]; exists {
return plugin, nil
// If version is specified, look for an exact match.
if version != "" {
pluginKey := fmt.Sprintf("%s@%s", name, version)
if plugin, exists := m.plugins[pluginKey]; exists {
return plugin, nil
}
return nil, fmt.Errorf("plugin %s with version %s not found", name, version)
}

// Look for name@version pattern
// If version is not specified, find the latest version.
var latestPlugin core.Plugin
var latestVersion string

for key, plugin := range m.plugins {
if strings.HasPrefix(key, name+"@") {
return plugin, nil
parts := strings.Split(key, "@")
if len(parts) != 2 {
continue // Should not happen with our key format
}
currentVersion := parts[1]

if latestPlugin == nil || isNewerVersion(currentVersion, latestVersion) {
latestVersion = currentVersion
latestPlugin = plugin
}
}
}

return nil, fmt.Errorf("plugin %s not found", name)
if latestPlugin == nil {
return nil, fmt.Errorf("plugin %s not found", name)
}

return latestPlugin, nil
}

// GetLoadedPlugins returns all loaded plugins
Expand Down Expand Up @@ -416,6 +454,32 @@ func (m *Manager) calculateSHA256(filePath string) (string, error) {
return fmt.Sprintf("%x", hash.Sum(nil)), nil
}

// isNewerVersion compares two semantic version strings.
// It returns true if v1 is newer than v2.
// This is a simplified implementation for formats like X.Y.Z.
func isNewerVersion(v1, v2 string) bool {
parts1 := strings.Split(v1, ".")
parts2 := strings.Split(v2, ".")

for i := 0; i < len(parts1) && i < len(parts2); i++ {
// For simplicity, we ignore non-numeric parts for now.
// A proper implementation would use a semantic versioning library.
n1, _ := strconv.Atoi(parts1[i])
n2, _ := strconv.Atoi(parts2[i])

if n1 > n2 {
return true
}
if n1 < n2 {
return false
}
}

// If all parts are equal, the one with more parts is newer
// e.g., 1.0.1 is newer than 1.0
return len(parts1) > len(parts2)
}

// isValidVersion performs basic semantic version validation
func isValidVersion(version string) bool {
// Basic check for X.Y.Z format
Expand Down
122 changes: 88 additions & 34 deletions core/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type scheduledTask struct {

// TestExecutionTask implements the Task interface for test execution
type TestExecutionTask struct {
id string
pluginName string
config map[string]interface{}
storage core.StorageManager
plugin core.PluginManager
logger core.Logger
id string
plugin core.Plugin
config map[string]interface{}
storage core.StorageManager
logger core.Logger
runID int64
}

// NewManager creates a new scheduler manager
Expand Down Expand Up @@ -243,23 +243,43 @@ func (m *Manager) CancelTask(taskID string) error {
}

// ScheduleTest schedules a test execution
func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config map[string]interface{}) (int64, error) {
// Get plugin
plugin, err := m.pluginManager.GetPlugin(pluginName)
func (m *Manager) ScheduleTest(ctx context.Context, plugin core.Plugin, config map[string]interface{}) (int64, error) {
pluginMeta := plugin.Metadata()

// Ensure plugin has an ID from the database.
if pluginMeta.ID == 0 {
// If the ID is missing, it means the plugin wasn't registered correctly.
// Attempt to look it up from storage.
p, err := m.storage.GetPlugin(ctx, pluginMeta.Name, pluginMeta.Version)
if err != nil {
return 0, fmt.Errorf("plugin '%s v%s' is not registered in the database and could not be found: %w", pluginMeta.Name, pluginMeta.Version, err)
}
pluginMeta.ID = p.ID
}

// Get the primary test type for the plugin
if len(pluginMeta.TestTypes) == 0 {
return 0, fmt.Errorf("plugin %s has no declared test types", pluginMeta.Name)
}
primaryTestTypeCode := pluginMeta.TestTypes[0]

// Look up the test type ID from storage
testType, err := m.storage.GetTestType(ctx, primaryTestTypeCode)
if err != nil {
return 0, fmt.Errorf("plugin %s not found: %w", pluginName, err)
return 0, fmt.Errorf("failed to get test type '%s': %w. It should have been auto-registered when the plugin was loaded", primaryTestTypeCode, err)
}

// Create test run record
testRun := &core.TestRun{
Name: fmt.Sprintf("Test run %s", time.Now().Format("2006-01-02 15:04:05")),
Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginName),
Name: fmt.Sprintf("Test run for %s v%s", pluginMeta.Name, pluginMeta.Version),
Description: fmt.Sprintf("Scheduled test execution for plugin %s", pluginMeta.Name),
Status: core.StatusPending,
Config: config,
PluginVer: plugin.Metadata().Version,
Host: "localhost", // Default, should come from config
Port: 5432, // Default, should come from config
DBName: "test", // Default, should come from config
PluginID: pluginMeta.ID, // Use the now-guaranteed-to-be-correct ID
TestTypeID: testType.ID, // Use the looked-up ID
Host: "localhost", // Default, should come from config
Port: 5432, // Default, should come from config
DBName: "test", // Default, should come from config
}

runID, err := m.storage.CreateTestRun(ctx, testRun)
Expand All @@ -269,22 +289,31 @@ func (m *Manager) ScheduleTest(ctx context.Context, pluginName string, config ma

// Create execution task
task := &TestExecutionTask{
id: fmt.Sprintf("test-%d", runID),
pluginName: pluginName,
config: config,
storage: m.storage,
plugin: m.pluginManager,
logger: m.logger,
id: fmt.Sprintf("test-%d", runID),
plugin: plugin,
config: config,
storage: m.storage,
logger: m.logger,
runID: runID,
}

// Submit for execution
if err := m.SubmitTask(task); err != nil {
// Rollback test run creation
// It's important to handle this to avoid orphaned test_run records
if rollbackErr := m.storage.UpdateTestRunStatus(ctx, runID, core.StatusFailed); rollbackErr != nil {
m.logger.Error("failed to rollback test run status",
core.Field{Key: "test_run_id", Value: runID},
core.Field{Key: "error", Value: rollbackErr.Error()},
)
}
return 0, fmt.Errorf("failed to submit test task: %w", err)
}

m.logger.Info("test scheduled",
core.Field{Key: "test_run_id", Value: runID},
core.Field{Key: "plugin", Value: pluginName},
core.Field{Key: "plugin", Value: plugin.Metadata().Name},
core.Field{Key: "version", Value: plugin.Metadata().Version},
)

return runID, nil
Expand Down Expand Up @@ -452,23 +481,48 @@ func (t *TestExecutionTask) Type() string {
func (t *TestExecutionTask) Execute(ctx context.Context) error {
t.logger.Info("executing test",
core.Field{Key: "task_id", Value: t.id},
core.Field{Key: "plugin", Value: t.pluginName},
core.Field{Key: "plugin", Value: t.plugin.Metadata().Name},
core.Field{Key: "run_id", Value: t.runID},
)

// Get plugin
plugin, err := t.plugin.GetPlugin(t.pluginName)
if err != nil {
return fmt.Errorf("failed to get plugin: %w", err)
// Update status to running
if err := t.storage.UpdateTestRunStatus(ctx, t.runID, core.StatusRunning); err != nil {
t.logger.Error("failed to update test run status to running",
core.Field{Key: "run_id", Value: t.runID},
core.Field{Key: "error", Value: err.Error()},
)
return fmt.Errorf("failed to update test run status: %w", err)
}

// Execute plugin
if err := plugin.Execute(ctx, t.config); err != nil {
return fmt.Errorf("plugin execution failed: %w", err)
err := t.plugin.Execute(ctx, t.config)

// Update status based on execution result
var finalStatus core.ServiceStatus
if err != nil {
finalStatus = core.StatusFailed
t.logger.Error("plugin execution failed",
core.Field{Key: "task_id", Value: t.id},
core.Field{Key: "error", Value: err.Error()},
)
} else {
finalStatus = core.StatusSucceeded
t.logger.Info("test execution completed successfully",
core.Field{Key: "task_id", Value: t.id},
)
}

t.logger.Info("test execution completed",
core.Field{Key: "task_id", Value: t.id},
)
if statusErr := t.storage.UpdateTestRunStatus(ctx, t.runID, finalStatus); statusErr != nil {
t.logger.Error("failed to update final test run status",
core.Field{Key: "run_id", Value: t.runID},
core.Field{Key: "error", Value: statusErr.Error()},
)
// Return original error if it exists, otherwise return status update error
if err != nil {
return fmt.Errorf("plugin execution failed (%v) and status update failed (%v)", err, statusErr)
}
return fmt.Errorf("failed to update test run status: %w", statusErr)
}

return nil
return err
}
Loading
Loading