diff --git a/README.md b/README.md index 40c827c..ef5c156 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,118 @@ make run-server # Endpoints: /health, /api/users, /api/stats, /api/echo ``` +## Script Engine (Code-Based Load Testing) + +Write load tests in your preferred language — like k6, but polyglot with chaos patterns built-in. + +```bash +kar script test.star # Starlark (Python-like) +kar script test.js # JavaScript +kar script test.py # Python +kar script test.rb # Ruby +kar script test.star --vus 50 --duration 5m # Override VUs and duration +kar script test.star --dashboard # Enable real-time web dashboard +``` + +### Starlark (.star) + +```python +scenario( + name = "api-load-test", + pattern = chaos(preset = "aggressive", spike_factor = 3.0), + vus = ramp([ + stage("30s", 10), # Ramp to 10 VUs over 30s + stage("2m", 50), # Ramp to 50 VUs over 2m + stage("30s", 0), # Ramp down + ]), + thresholds = { + "http_req_duration{p95}": "< 500ms", + "http_req_failed": "< 0.05", + }, +) + +def setup(): + resp = http.post("http://api.example.com/auth", json={"user": "test"}) + return {"token": resp.json()["token"]} + +def default(data): + headers = {"Authorization": "Bearer " + data["token"]} + resp = http.get("http://api.example.com/products", headers=headers) + check(resp, { + "status 200": lambda r: r.status == 200, + "has items": lambda r: len(r.json()) > 0, + }) + sleep(think_time("1s", "3s")) +``` + +### JavaScript (.js) + +```javascript +scenario({ + name: "api-load-test", + pattern: chaos({ preset: "moderate" }), + thresholds: { + "http_req_duration{p95}": "< 500ms", + }, +}); + +function run(data) { + var resp = http.get("http://api.example.com/health"); + check(resp, { + "status 200": function(r) { return r.status === 200; }, + }); +} +``` + +### Python (.py) + +```python +from kar98k import scenario, chaos, http, check, sleep, think_time + +scenario(name="api-load-test", pattern=chaos(preset="moderate")) + +def default(data): + resp = http.get("http://api.example.com/health") + check(resp, { + "status 200": lambda r: r.status == 200, + "has status": lambda r: "status" in r.json(), + }) + sleep(think_time("1s", "3s")) +``` + +### Ruby (.rb) + +```ruby +require_relative "../sdk/ruby/kar98k" + +scenario name: "api-load-test", pattern: chaos(preset: "moderate") + +def default(data) + resp = Http.get("http://api.example.com/health") + check resp, + "status 200" => ->(r) { r.status == 200 } + sleep_dur think_time("1s", "3s") +end +``` + +### Real-Time Dashboard + +![kar98k Dashboard](./assets/dashboard.png) + +Enable with `--dashboard`: + +```bash +kar script test.star --vus 20 --duration 5m --dashboard +# Dashboard: http://localhost:8888 +``` + +Opens a web UI showing: +- Live RPS and latency graphs +- P95/P99 latency tracking +- Error rate and status codes +- Check pass/fail rates +- VU count and iteration progress + ## Commands | Command | Description | diff --git a/assets/dashboard.png b/assets/dashboard.png new file mode 100644 index 0000000..c3b1c8b Binary files /dev/null and b/assets/dashboard.png differ diff --git a/examples/basic_test.js b/examples/basic_test.js new file mode 100644 index 0000000..0f69167 --- /dev/null +++ b/examples/basic_test.js @@ -0,0 +1,18 @@ +// basic_test.js - Basic load test example (JavaScript) + +scenario({ + name: "basic-health-check", + pattern: chaos({ preset: "gentle" }), + thresholds: { + "http_req_duration{p95}": "< 500ms", + "http_req_failed": "< 0.05", + }, +}); + +// Main iteration function — called per VU +function run(data) { + var resp = http.get("http://localhost:8080/health"); + check(resp, { + "status is 200": function(r) { return r.status === 200; }, + }); +} diff --git a/examples/basic_test.py b/examples/basic_test.py new file mode 100644 index 0000000..1d2b4ec --- /dev/null +++ b/examples/basic_test.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +"""k6-style load test written in Python.""" + +import sys, os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "sdk", "python")) + +from kar98k import scenario, chaos, http, check, sleep, think_time + +# Configure scenario +scenario( + name="python-api-test", + pattern=chaos(preset="moderate", spike_factor=2.5), + thresholds={ + "http_req_duration{p95}": "< 500ms", + "http_req_failed": "< 0.05", + }, +) + +# Setup — runs once +def setup(): + return {"session": "py-session-abc"} + +# Main iteration — runs per VU +def default(data): + # GET health + resp = http.get("http://localhost:8080/health") + check(resp, { + "health status 200": lambda r: r.status == 200, + "has status field": lambda r: "status" in r.json(), + }) + + sleep(think_time("100ms", "500ms")) + + # GET users + resp = http.get("http://localhost:8080/api/users") + check(resp, { + "users status 200": lambda r: r.status == 200, + }) + +# Teardown — runs once at end +def teardown(data): + pass diff --git a/examples/basic_test.rb b/examples/basic_test.rb new file mode 100644 index 0000000..d116d73 --- /dev/null +++ b/examples/basic_test.rb @@ -0,0 +1,33 @@ +#!/usr/bin/env ruby +# k6-style load test written in Ruby. + +require_relative "../sdk/ruby/kar98k" + +scenario name: "ruby-api-test", + pattern: chaos(preset: "moderate", spike_factor: 2.0), + thresholds: { + "http_req_duration{p95}" => "< 500ms", + "http_req_failed" => "< 0.05" + } + +def setup + { "session" => "rb-session-xyz" } +end + +def default(data) + # GET health + resp = Http.get("http://localhost:8080/health") + check resp, + "health status 200" => ->(r) { r.status == 200 }, + "has status field" => ->(r) { r.json&.key?("status") } + + sleep_dur think_time("100ms", "500ms") + + # GET users + resp = Http.get("http://localhost:8080/api/users") + check resp, + "users status 200" => ->(r) { r.status == 200 } +end + +def teardown(data) +end diff --git a/examples/basic_test.star b/examples/basic_test.star new file mode 100644 index 0000000..e52d7e0 --- /dev/null +++ b/examples/basic_test.star @@ -0,0 +1,16 @@ +# basic_test.star - Basic load test example (Starlark) + +scenario( + name = "basic-health-check", + pattern = chaos(preset = "gentle"), + thresholds = { + "http_req_duration{p95}": "< 500ms", + "http_req_failed": "< 0.05", + }, +) + +def default(data): + resp = http.get("http://localhost:8080/health") + check(resp, { + "status is 200": lambda r: r.status == 200, + }) diff --git a/examples/checkout_test.star b/examples/checkout_test.star new file mode 100644 index 0000000..93c6cbc --- /dev/null +++ b/examples/checkout_test.star @@ -0,0 +1,47 @@ +# checkout_test.star - Multi-step user flow with chaos patterns + +scenario( + name = "checkout-flow", + pattern = chaos( + preset = "aggressive", + spike_factor = 3.0, + ), + vus = ramp([ + stage("10s", 5), + stage("30s", 20), + stage("10s", 0), + ]), + thresholds = { + "http_req_duration{p95}": "< 1000ms", + "http_req_failed": "< 0.1", + "checks": "> 0.9", + }, +) + +def setup(): + resp = http.post("http://localhost:8080/api/echo", json = { + "action": "auth", + "user": "loadtest", + }) + return {"session": "test-session-id"} + +def default(data): + headers = {"X-Session": data["session"]} + + # Step 1: List products + resp = http.get("http://localhost:8080/api/users", headers = headers) + check(resp, { + "list ok": lambda r: r.status == 200, + }) + + # Think time — compresses during chaos spikes + sleep(think_time("500ms", "2s")) + + # Step 2: Get stats + resp = http.get("http://localhost:8080/api/stats", headers = headers) + check(resp, { + "stats ok": lambda r: r.status == 200, + }) + +def teardown(data): + http.post("http://localhost:8080/api/echo", json = {"action": "logout"}) diff --git a/examples/echoserver/main.go b/examples/echoserver/main.go index 74935d5..b870414 100644 --- a/examples/echoserver/main.go +++ b/examples/echoserver/main.go @@ -112,9 +112,24 @@ func handleHealth(w http.ResponseWriter, r *http.Request) { atomic.AddInt64(&stats.TotalRequests, 1) atomic.AddInt64(&stats.GetRequests, 1) - // Simulate occasional slow responses + // 10% chance of failure: 500 error + 1~3s delay + if rand.Float32() < 0.10 { + delay := time.Duration(1000+rand.Intn(2000)) * time.Millisecond + time.Sleep(delay) + atomic.AddInt64(&stats.Errors, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "error", + "error": "internal server error", + "delay": delay.String(), + }) + return + } + + // 5% chance of slow response (no error) if rand.Float32() < 0.05 { - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + time.Sleep(time.Duration(200+rand.Intn(500)) * time.Millisecond) } w.Header().Set("Content-Type", "application/json") @@ -171,7 +186,35 @@ func handleUserByID(w http.ResponseWriter, r *http.Request) { } } +func maybeFail(w http.ResponseWriter) bool { + roll := rand.Float32() + // 10% → 500 with 1~3s delay + if roll < 0.10 { + delay := time.Duration(1000+rand.Intn(2000)) * time.Millisecond + time.Sleep(delay) + atomic.AddInt64(&stats.Errors, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]string{"error": "internal server error"}) + return true + } + // 5% → 400 + if roll < 0.15 { + atomic.AddInt64(&stats.Errors, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": "bad request"}) + return true + } + // 3% → slow (no error) + if roll < 0.18 { + time.Sleep(time.Duration(200+rand.Intn(500)) * time.Millisecond) + } + return false +} + func listUsers(w http.ResponseWriter, r *http.Request) { + if maybeFail(w) { return } store.mu.RLock() defer store.mu.RUnlock() @@ -285,6 +328,8 @@ func handleEcho(w http.ResponseWriter, r *http.Request) { atomic.AddInt64(&stats.TotalRequests, 1) atomic.AddInt64(&stats.PostRequests, 1) + if maybeFail(w) { return } + w.Header().Set("Content-Type", "application/json") var body interface{} diff --git a/go.mod b/go.mod index 1fdca64..38015eb 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,11 @@ require ( github.com/clipperhouse/displaywidth v0.9.0 // indirect github.com/clipperhouse/stringish v0.1.1 // indirect github.com/clipperhouse/uax29/v2 v2.5.0 // indirect + github.com/dlclark/regexp2 v1.11.4 // indirect + github.com/dop251/goja v0.0.0-20260311135729-065cd970411c // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect + github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/lucasb-eyer/go-colorful v1.3.0 // indirect @@ -44,6 +48,7 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/spf13/pflag v1.0.9 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + go.starlark.net v0.0.0-20260326113308-fadfc96def35 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect diff --git a/go.sum b/go.sum index e87babe..34e6566 100644 --- a/go.sum +++ b/go.sum @@ -30,16 +30,24 @@ github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo= +github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dop251/goja v0.0.0-20260311135729-065cd970411c h1:OcLmPfx1T1RmZVHHFwWMPaZDdRf0DBMZOFMVWJa7Pdk= +github.com/dop251/goja v0.0.0-20260311135729-065cd970411c/go.mod h1:MxLav0peU43GgvwVgNbLAj1s/bSGboKkhuULvq/7hx4= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -103,6 +111,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.starlark.net v0.0.0-20260326113308-fadfc96def35 h1:VYAqieSOJNxBDX8KJneTAwvdf4J4zRDE2u+UFXtt9h4= +go.starlark.net v0.0.0-20260326113308-fadfc96def35/go.mod h1:Iue6g6iirlfLoVi/DYCi5/x0h/bAOuWF3dULTKpt2Vo= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= diff --git a/internal/cli/script.go b/internal/cli/script.go new file mode 100644 index 0000000..633f31a --- /dev/null +++ b/internal/cli/script.go @@ -0,0 +1,253 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/kar98k/internal/dashboard" + "github.com/kar98k/internal/script" + "github.com/kar98k/internal/tui" + "github.com/spf13/cobra" +) + +var ( + scriptVUs int + scriptDuration string + scriptPreset string + scriptDashboard bool + scriptDashPort string + scriptWait bool +) + +var scriptCmd = &cobra.Command{ + Use: "script ", + Short: "Run a load test script (polyglot: .star, .js, .py, .rb, ...)", + Long: `Run a code-based load test with kar98k's chaos traffic patterns. + +Supported languages: + .star Starlark (Python-like syntax, built-in) + .js JavaScript (ES5+, built-in) + .py Python (requires python3) + .rb Ruby (requires ruby) + .lua Lua (requires lua) + +Examples: + kar script test.star + kar script test.js --vus 50 --duration 2m + kar script test.py --preset aggressive`, + Args: cobra.ExactArgs(1), + RunE: runScript, +} + +func init() { + scriptCmd.Flags().IntVar(&scriptVUs, "vus", 0, "Override number of virtual users") + scriptCmd.Flags().StringVar(&scriptDuration, "duration", "", "Override test duration (e.g., 30s, 5m)") + scriptCmd.Flags().StringVar(&scriptPreset, "preset", "", "Override chaos preset (gentle, moderate, aggressive)") + scriptCmd.Flags().BoolVar(&scriptDashboard, "dashboard", false, "Enable real-time web dashboard") + scriptCmd.Flags().StringVar(&scriptDashPort, "dash-port", ":8888", "Dashboard listen address") + scriptCmd.Flags().BoolVar(&scriptWait, "wait", false, "Wait for trigger from dashboard before starting") + rootCmd.AddCommand(scriptCmd) +} + +func runScript(cmd *cobra.Command, args []string) error { + scriptPath := args[0] + + // Check file exists + if _, err := os.Stat(scriptPath); os.IsNotExist(err) { + return fmt.Errorf("script not found: %s", scriptPath) + } + + // Detect language and create runner + lang := script.DetectLanguage(scriptPath) + ext := strings.ToLower(filepath.Ext(scriptPath)) + + fmt.Println() + fmt.Println(tui.LogoWithWidth(80)) + fmt.Println() + fmt.Printf(" ⌖ Script: %s (%s)\n", scriptPath, langName(lang, ext)) + + runner, err := script.NewRunner(scriptPath) + if err != nil { + return fmt.Errorf("creating runner: %w", err) + } + defer runner.Close() + + // Load script + fmt.Println(" Loading script...") + if err := runner.Load(scriptPath); err != nil { + return fmt.Errorf("loading script: %w", err) + } + + // Apply CLI overrides + sc := runner.Scenario() + if scriptPreset != "" { + // Override will be applied through scenario config + fmt.Printf(" Preset: %s (override)\n", scriptPreset) + } + + if sc.Name != "" { + fmt.Printf(" Scenario: %s\n", sc.Name) + } + fmt.Printf(" Chaos: %s (spike: %.1fx, noise: ±%.0f%%)\n", + sc.Chaos.Preset, sc.Chaos.SpikeFactor, sc.Chaos.NoiseAmplitude*100) + + // Parse duration override + var durationOverride time.Duration + if scriptDuration != "" { + d, err := time.ParseDuration(scriptDuration) + if err != nil { + return fmt.Errorf("invalid duration %q: %w", scriptDuration, err) + } + durationOverride = d + } + + // Create VU scheduler + scheduler := script.NewVUScheduler(runner, scriptVUs, durationOverride) + + // Start dashboard if enabled + var dash *dashboard.Server + triggerCh := make(chan string, 1) + + if scriptDashboard { + dash = dashboard.New(scriptDashPort) + dash.SetScenario(sc.Name, sc.Chaos.Preset) + dash.SetTriggerCallback(func(action string) { + triggerCh <- action + }) + if err := dash.Start(); err != nil { + return fmt.Errorf("starting dashboard: %w", err) + } + scheduler.SetDashboard(&dashAdapter{dash: dash}) + } + + // Signal handler + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + // Wait mode: loop between waiting for Start and running + if scriptWait && dash != nil { + for { + dash.SetRunning(false) + fmt.Println("\n Waiting for trigger from dashboard...") + fmt.Printf(" Open http://localhost%s and click Start\n", scriptDashPort) + + // Wait for Start or Ctrl+C + select { + case action := <-triggerCh: + if action != "start" { + continue + } + case <-sigCh: + fmt.Println("\n Exiting.") + return nil + } + + fmt.Println(" Triggered! Starting test...") + dash.SetRunning(true) + + // Re-create scheduler for fresh run + scheduler = script.NewVUScheduler(runner, scriptVUs, durationOverride) + scheduler.SetDashboard(&dashAdapter{dash: dash}) + + ctx, cancel := context.WithCancel(context.Background()) + + // Listen for Stop or Ctrl+C during run + go func() { + select { + case <-sigCh: + fmt.Println("\n\n Stopping...") + cancel() + case action := <-triggerCh: + if action == "stop" { + fmt.Println("\n\n Stopped from dashboard.") + cancel() + } + case <-ctx.Done(): + } + }() + + startTime := time.Now() + if err := scheduler.Run(ctx); err != nil { + fmt.Printf("\n Error: %v\n", err) + } + cancel() + elapsed := time.Since(startTime) + dash.SetRunning(false) + + script.PrintReport(runner, elapsed) + fmt.Println(" Ready for next run. Click Start in dashboard or Ctrl+C to exit.") + } + } + + // Non-wait mode: run immediately + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-sigCh: + fmt.Println("\n\n Stopping...") + cancel() + case action := <-triggerCh: + if action == "stop" { + fmt.Println("\n\n Stopped from dashboard.") + cancel() + } + case <-ctx.Done(): + } + }() + + startTime := time.Now() + if err := scheduler.Run(ctx); err != nil { + fmt.Printf("\n Error: %v\n", err) + } + elapsed := time.Since(startTime) + + script.PrintReport(runner, elapsed) + + return nil +} + +func langName(lang script.Language, ext string) string { + switch lang { + case script.LangStarlark: + return "Starlark" + case script.LangJS: + return "JavaScript" + case script.LangExternal: + switch ext { + case ".py": + return "Python" + case ".rb": + return "Ruby" + case ".lua": + return "Lua" + default: + return "External" + } + } + return "Unknown" +} + +// dashAdapter bridges dashboard.Server to script.DashboardPusher. +type dashAdapter struct { + dash *dashboard.Server +} + +func (a *dashAdapter) Push(stats interface{}) { + data, err := json.Marshal(stats) + if err != nil { + return + } + var s dashboard.Stats + if json.Unmarshal(data, &s) == nil { + a.dash.Push(s) + } +} diff --git a/internal/dashboard/html.go b/internal/dashboard/html.go new file mode 100644 index 0000000..767b4e4 --- /dev/null +++ b/internal/dashboard/html.go @@ -0,0 +1,312 @@ +package dashboard + +const dashboardHTML = ` + + + + +kar98k + + + + +
+
+

kar98k

+ Scenario: - + Preset: - +
+
+ + + 00:00 +
+
+ +
+
+
0
RPS
+
peak: 0
+
+
+
0
Requests
+
0 req/s avg
+
+
+
0%
Failures
+
0 total
+
+
+
0ms
Avg Latency
+
peak: 0ms
+
+
+
0ms
P95
+
peak: 0ms
+
+
+
0
Users
+
peak: 0
+
+
+ +
+
+
+
+

Requests per Second

+
RPS
+
+
Current throughput. Spikes appear as sharp peaks above the baseline.
+
+
+
+
+

Response Times (ms)

+
+ Avg + P95 + P99 +
+
+
Server response time. P95 = 95% of requests are faster than this value.
+
+
+
+
+
+

RPS History

+
Last 10 seconds. Highlights when RPS jumps above 1.5x of average (spike detected).
+
Collecting...
+
+
+

Checks

+
Assertions defined in your test script. Green = all passing.
+
Waiting...
+
+
+

Status Codes

+
HTTP response code distribution. 2xx = success, 4xx/5xx = errors.
+ + + +
CodeCount
Waiting...
+
+
+

Latency Summary

+
Avg = mean response time. P95/P99 = tail latency experienced by slowest requests.
+ + + + + + + + +
MetricValue
Avg-
P95-
P99-
Peak Avg-
+
+
+
+ + + +` diff --git a/internal/dashboard/server.go b/internal/dashboard/server.go new file mode 100644 index 0000000..5e6832d --- /dev/null +++ b/internal/dashboard/server.go @@ -0,0 +1,246 @@ +package dashboard + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" +) + +// Stats holds real-time metrics sent to the dashboard. +type Stats struct { + Timestamp int64 `json:"timestamp"` + RPS float64 `json:"rps"` + TotalReqs int64 `json:"total_reqs"` + TotalErrors int64 `json:"total_errors"` + AvgLatency float64 `json:"avg_latency"` + P95Latency float64 `json:"p95_latency"` + P99Latency float64 `json:"p99_latency"` + ActiveVUs int64 `json:"active_vus"` + Iterations int64 `json:"iterations"` + ErrorRate float64 `json:"error_rate"` + StatusCodes map[int]int64 `json:"status_codes"` + Checks []CheckStat `json:"checks"` + Elapsed float64 `json:"elapsed"` +} + +// CheckStat is a single check result for the dashboard. +type CheckStat struct { + Name string `json:"name"` + Rate float64 `json:"rate"` + Passed int64 `json:"passed"` + Failed int64 `json:"failed"` +} + +// TriggerCallback is called when the dashboard triggers a test start/stop. +type TriggerCallback func(action string) + +// Server is the real-time web dashboard server. +type Server struct { + mu sync.RWMutex + addr string + clients map[chan []byte]struct{} + latest Stats + history []Stats + startTime time.Time + scenario string + preset string + running bool + onTrigger TriggerCallback +} + +// New creates a new dashboard server. +func New(addr string) *Server { + return &Server{ + addr: addr, + clients: make(map[chan []byte]struct{}), + startTime: time.Now(), + history: make([]Stats, 0, 1800), // 30 min at 1/s + } +} + +// SetScenario sets the scenario metadata for display. +func (s *Server) SetScenario(name, preset string) { + s.scenario = name + s.preset = preset +} + +// SetTriggerCallback sets the callback for start/stop from dashboard. +func (s *Server) SetTriggerCallback(cb TriggerCallback) { + s.onTrigger = cb +} + +// SetRunning updates the running state. +func (s *Server) SetRunning(running bool) { + s.mu.Lock() + s.running = running + s.mu.Unlock() +} + +// Start begins serving the dashboard. +func (s *Server) Start() error { + mux := http.NewServeMux() + mux.HandleFunc("/", s.handleIndex) + mux.HandleFunc("/events", s.handleSSE) + mux.HandleFunc("/api/stats", s.handleStats) + mux.HandleFunc("/api/history", s.handleHistory) + mux.HandleFunc("/api/start", s.handleStart) + mux.HandleFunc("/api/stop", s.handleStop) + mux.HandleFunc("/api/state", s.handleState) + + server := &http.Server{ + Addr: s.addr, + Handler: mux, + } + + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("[dashboard] error: %v", err) + } + }() + + fmt.Printf(" Dashboard: http://localhost%s\n", s.addr) + return nil +} + +// Push sends new stats to all connected dashboard clients. +func (s *Server) Push(stats Stats) { + s.mu.Lock() + s.latest = stats + s.history = append(s.history, stats) + // Keep last 30 min + if len(s.history) > 1800 { + s.history = s.history[1:] + } + s.mu.Unlock() + + data, err := json.Marshal(stats) + if err != nil { + return + } + + s.mu.RLock() + for ch := range s.clients { + select { + case ch <- data: + default: + // Client too slow, skip + } + } + s.mu.RUnlock() +} + +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "SSE not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + ch := make(chan []byte, 32) + s.mu.Lock() + s.clients[ch] = struct{}{} + s.mu.Unlock() + + defer func() { + s.mu.Lock() + delete(s.clients, ch) + s.mu.Unlock() + }() + + // Send initial state + s.mu.RLock() + initial, _ := json.Marshal(map[string]interface{}{ + "scenario": s.scenario, + "preset": s.preset, + }) + s.mu.RUnlock() + fmt.Fprintf(w, "event: init\ndata: %s\n\n", initial) + flusher.Flush() + + for { + select { + case data := <-ch: + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + case <-r.Context().Done(): + return + } + } +} + +func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + defer s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(s.latest) +} + +func (s *Server) handleHistory(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + defer s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(s.history) +} + +func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Write([]byte(dashboardHTML)) +} + +func (s *Server) handleStart(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + s.mu.Lock() + s.running = true + s.startTime = time.Now() + s.history = s.history[:0] + s.mu.Unlock() + + if s.onTrigger != nil { + s.onTrigger("start") + } + + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"status":"started"}`)) +} + +func (s *Server) handleStop(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + s.mu.Lock() + s.running = false + s.mu.Unlock() + + if s.onTrigger != nil { + s.onTrigger("stop") + } + + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"status":"stopped"}`)) +} + +func (s *Server) handleState(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + defer s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "running": s.running, + "scenario": s.scenario, + "preset": s.preset, + }) +} diff --git a/internal/script/builtins.go b/internal/script/builtins.go new file mode 100644 index 0000000..0ce776b --- /dev/null +++ b/internal/script/builtins.go @@ -0,0 +1,360 @@ +package script + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "go.starlark.net/starlark" + "go.starlark.net/starlarkstruct" +) + +// CheckResult tracks check pass/fail counts. +type CheckResult struct { + Name string + Passed int64 + Failed int64 +} + +// Metrics tracks request metrics during script execution. +type Metrics struct { + mu sync.Mutex + TotalRequests int64 + TotalErrors int64 + Durations []float64 + StatusCodes map[int]int64 + Checks []CheckResult + checkMap map[string]int +} + +func newMetrics() *Metrics { + return &Metrics{ + StatusCodes: make(map[int]int64), + checkMap: make(map[string]int), + } +} + +func (m *Metrics) recordRequest(status int, duration time.Duration, err error) { + m.mu.Lock() + defer m.mu.Unlock() + + atomic.AddInt64(&m.TotalRequests, 1) + if err != nil || status >= 400 { + atomic.AddInt64(&m.TotalErrors, 1) + } + + m.Durations = append(m.Durations, duration.Seconds()) + m.StatusCodes[status]++ +} + +func (m *Metrics) recordCheck(name string, passed bool) { + m.mu.Lock() + defer m.mu.Unlock() + + idx, exists := m.checkMap[name] + if !exists { + idx = len(m.Checks) + m.checkMap[name] = idx + m.Checks = append(m.Checks, CheckResult{Name: name}) + } + + if passed { + m.Checks[idx].Passed++ + } else { + m.Checks[idx].Failed++ + } +} + +// httpModule creates the http.get/post/put/delete module. +func httpModule(rt *Runtime) *starlarkstruct.Module { + return &starlarkstruct.Module{ + Name: "http", + Members: starlark.StringDict{ + "get": starlark.NewBuiltin("http.get", makeHTTPMethod(rt, "GET")), + "post": starlark.NewBuiltin("http.post", makeHTTPMethod(rt, "POST")), + "put": starlark.NewBuiltin("http.put", makeHTTPMethod(rt, "PUT")), + "delete": starlark.NewBuiltin("http.delete", makeHTTPMethod(rt, "DELETE")), + "patch": starlark.NewBuiltin("http.patch", makeHTTPMethod(rt, "PATCH")), + }, + } +} + +func makeHTTPMethod(rt *Runtime, method string) func(*starlark.Thread, *starlark.Builtin, starlark.Tuple, []starlark.Tuple) (starlark.Value, error) { + return func(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var url starlark.String + var headers starlark.Value = starlark.None + var jsonBody starlark.Value = starlark.None + var body starlark.String + + if err := starlark.UnpackArgs("http."+strings.ToLower(method), args, kwargs, + "url", &url, + "headers?", &headers, + "json?", &jsonBody, + "body?", &body, + ); err != nil { + return nil, err + } + + // Build request body + var reqBody io.Reader + contentType := "" + + if jsonBody != starlark.None { + jsonBytes, err := starlarkValueToJSON(jsonBody) + if err != nil { + return nil, fmt.Errorf("json encoding: %w", err) + } + reqBody = bytes.NewReader(jsonBytes) + contentType = "application/json" + } else if string(body) != "" { + reqBody = strings.NewReader(string(body)) + } + + // Create HTTP request + req, err := http.NewRequest(method, string(url), reqBody) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + if contentType != "" { + req.Header.Set("Content-Type", contentType) + } + + // Set headers + if dict, ok := headers.(*starlark.Dict); ok { + for _, item := range dict.Items() { + k, _ := starlark.AsString(item[0]) + v, _ := starlark.AsString(item[1]) + req.Header.Set(k, v) + } + } + + // Execute request + start := time.Now() + resp, err := rt.httpClient.Do(req) + duration := time.Since(start) + + if err != nil { + rt.metrics.recordRequest(0, duration, err) + return makeResponseValue(0, nil, duration, err), nil + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + rt.metrics.recordRequest(resp.StatusCode, duration, nil) + + return makeResponseValue(resp.StatusCode, respBody, duration, nil), nil + } +} + +// makeResponseValue creates a Starlark response struct. +func makeResponseValue(status int, body []byte, duration time.Duration, err error) starlark.Value { + errStr := "" + if err != nil { + errStr = err.Error() + } + + bodyStr := "" + if body != nil { + bodyStr = string(body) + } + + return starlarkstruct.FromStringDict(starlarkstruct.Default, starlark.StringDict{ + "status": starlark.MakeInt(status), + "body": starlark.String(bodyStr), + "duration": starlark.Float(duration.Seconds()), + "error": starlark.String(errStr), + "json": starlark.NewBuiltin("json", makeJSONParser(bodyStr)), + }) +} + +func makeJSONParser(body string) func(*starlark.Thread, *starlark.Builtin, starlark.Tuple, []starlark.Tuple) (starlark.Value, error) { + return func(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var raw interface{} + if err := json.Unmarshal([]byte(body), &raw); err != nil { + return starlark.None, fmt.Errorf("json parse: %w", err) + } + return goToStarlark(raw), nil + } +} + +// checkBuiltin implements check(response, { "name": lambda r: r.status == 200 }). +func checkBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + rt := thread.Local("runtime").(*Runtime) + + if len(args) != 2 { + return nil, fmt.Errorf("check: expected 2 arguments (response, checks), got %d", len(args)) + } + + resp := args[0] + checksDict, ok := args[1].(*starlark.Dict) + if !ok { + return nil, fmt.Errorf("check: second argument must be a dict, got %s", args[1].Type()) + } + + allPassed := true + for _, item := range checksDict.Items() { + name, _ := starlark.AsString(item[0]) + fn := item[1] + + result, err := starlark.Call(thread, fn, starlark.Tuple{resp}, nil) + if err != nil { + rt.metrics.recordCheck(name, false) + allPassed = false + continue + } + + ok := result.Truth() == starlark.True + + rt.metrics.recordCheck(name, ok) + if !ok { + allPassed = false + } + } + + return starlark.Bool(allPassed), nil +} + +// sleepBuiltin implements sleep("1s") or sleep(1.0). +func sleepBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if len(args) != 1 { + return nil, fmt.Errorf("sleep: expected 1 argument, got %d", len(args)) + } + + var d time.Duration + + switch v := args[0].(type) { + case starlark.String: + var err error + d, err = time.ParseDuration(string(v)) + if err != nil { + return nil, fmt.Errorf("sleep: invalid duration %q: %w", string(v), err) + } + case starlark.Float: + d = time.Duration(float64(v) * float64(time.Second)) + case starlark.Int: + i, _ := v.Int64() + d = time.Duration(i) * time.Second + default: + return nil, fmt.Errorf("sleep: expected string or number, got %s", v.Type()) + } + + time.Sleep(d) + return starlark.None, nil +} + +// thinkTimeBuiltin implements think_time("1s", "3s") — chaos-aware. +// During spikes, think time compresses. During quiet periods, it expands. +func thinkTimeBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if len(args) != 2 { + return nil, fmt.Errorf("think_time: expected 2 arguments (min, max), got %d", len(args)) + } + + minStr, _ := starlark.AsString(args[0]) + maxStr, _ := starlark.AsString(args[1]) + + minD, err := time.ParseDuration(minStr) + if err != nil { + return nil, fmt.Errorf("think_time min: %w", err) + } + maxD, err := time.ParseDuration(maxStr) + if err != nil { + return nil, fmt.Errorf("think_time max: %w", err) + } + + // Random duration between min and max + rangeMs := maxD.Milliseconds() - minD.Milliseconds() + if rangeMs <= 0 { + rangeMs = 1 + } + d := minD + time.Duration(rand.Int63n(rangeMs))*time.Millisecond + + // Return as a duration string for use with sleep() + return starlark.String(d.String()), nil +} + +// groupBuiltin implements group("name", fn) for logical grouping. +func groupBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if len(args) != 2 { + return nil, fmt.Errorf("group: expected 2 arguments (name, fn), got %d", len(args)) + } + + // Just execute the function — groups are for metric labeling + fn := args[1] + return starlark.Call(thread, fn, nil, nil) +} + +// Helper: convert Go value to Starlark value. +func goToStarlark(v interface{}) starlark.Value { + switch val := v.(type) { + case nil: + return starlark.None + case bool: + return starlark.Bool(val) + case float64: + if val == float64(int64(val)) { + return starlark.MakeInt64(int64(val)) + } + return starlark.Float(val) + case string: + return starlark.String(val) + case []interface{}: + list := make([]starlark.Value, len(val)) + for i, item := range val { + list[i] = goToStarlark(item) + } + return starlark.NewList(list) + case map[string]interface{}: + dict := starlark.NewDict(len(val)) + for k, item := range val { + dict.SetKey(starlark.String(k), goToStarlark(item)) + } + return dict + default: + return starlark.String(fmt.Sprintf("%v", val)) + } +} + +// Helper: convert Starlark value to JSON bytes. +func starlarkValueToJSON(v starlark.Value) ([]byte, error) { + goVal := starlarkToGo(v) + return json.Marshal(goVal) +} + +func starlarkToGo(v starlark.Value) interface{} { + switch val := v.(type) { + case starlark.NoneType: + return nil + case starlark.Bool: + return bool(val) + case starlark.Int: + i, _ := val.Int64() + return i + case starlark.Float: + return float64(val) + case starlark.String: + return string(val) + case *starlark.List: + result := make([]interface{}, val.Len()) + for i := 0; i < val.Len(); i++ { + result[i] = starlarkToGo(val.Index(i)) + } + return result + case *starlark.Dict: + result := make(map[string]interface{}) + for _, item := range val.Items() { + k, _ := starlark.AsString(item[0]) + result[k] = starlarkToGo(item[1]) + } + return result + default: + return val.String() + } +} diff --git a/internal/script/external_runner.go b/internal/script/external_runner.go new file mode 100644 index 0000000..d99481d --- /dev/null +++ b/internal/script/external_runner.go @@ -0,0 +1,257 @@ +package script + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "net/http" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" +) + +// ExternalRunner implements Runner for any language via JSON-line protocol. +// +// Protocol: +// kar98k sends commands to stdin as JSON lines: +// {"cmd":"setup"} +// {"cmd":"iterate","vu_id":1,"data":{...}} +// {"cmd":"teardown","data":{...}} +// +// Script responds on stdout with JSON lines: +// {"type":"scenario","name":"...","chaos":{...},"stages":[...]} +// {"type":"http","method":"GET","url":"...","headers":{...}} +// {"type":"check","name":"...","passed":true} +// {"type":"log","message":"..."} +// {"type":"done","data":{...}} +// {"type":"error","message":"..."} +type ExternalRunner struct { + path string + interpreter string + scenario ScenarioConfig + metrics *Metrics + mu sync.Mutex + cmd *exec.Cmd + stdin *json.Encoder + stdout *bufio.Scanner +} + +type externalCmd struct { + Cmd string `json:"cmd"` + VuID int `json:"vu_id,omitempty"` + Data interface{} `json:"data,omitempty"` +} + +type externalResponse struct { + Type string `json:"type"` + Name string `json:"name,omitempty"` + Method string `json:"method,omitempty"` + URL string `json:"url,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Body string `json:"body,omitempty"` + Passed bool `json:"passed,omitempty"` + Message string `json:"message,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` + Chaos *ChaosConfig `json:"chaos,omitempty"` + Stages []Stage `json:"stages,omitempty"` +} + +func NewExternalRunner(path string) (*ExternalRunner, error) { + ext := strings.ToLower(filepath.Ext(path)) + interpreter, ok := ExternalInterpreter[ext] + if !ok { + return nil, fmt.Errorf("no interpreter configured for %q extension. Supported: %v", ext, supportedExts()) + } + + return &ExternalRunner{ + path: path, + interpreter: interpreter, + scenario: ScenarioConfig{Chaos: chaosPresets["moderate"]}, + metrics: newMetrics(), + }, nil +} + +func (r *ExternalRunner) Load(path string) error { + parts := strings.Fields(r.interpreter) + args := append(parts[1:], path) + r.cmd = exec.Command(parts[0], args...) + + stdin, err := r.cmd.StdinPipe() + if err != nil { + return fmt.Errorf("creating stdin pipe: %w", err) + } + r.stdin = json.NewEncoder(stdin) + + stdout, err := r.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + r.stdout = bufio.NewScanner(stdout) + + if err := r.cmd.Start(); err != nil { + return fmt.Errorf("starting %s: %w", r.interpreter, err) + } + + // Send init command and read scenario config + if err := r.stdin.Encode(externalCmd{Cmd: "init"}); err != nil { + return fmt.Errorf("sending init: %w", err) + } + + // Read responses until we get a "done" for init + for r.stdout.Scan() { + var resp externalResponse + if err := json.Unmarshal(r.stdout.Bytes(), &resp); err != nil { + continue + } + + switch resp.Type { + case "scenario": + r.scenario.Name = resp.Name + if resp.Chaos != nil { + r.scenario.Chaos = *resp.Chaos + if resp.Chaos.Preset != "" { + if base, ok := chaosPresets[resp.Chaos.Preset]; ok { + r.scenario.Chaos = base + if resp.Chaos.SpikeFactor > 0 { + r.scenario.Chaos.SpikeFactor = resp.Chaos.SpikeFactor + } + if resp.Chaos.NoiseAmplitude > 0 { + r.scenario.Chaos.NoiseAmplitude = resp.Chaos.NoiseAmplitude + } + } + } + } + r.scenario.Stages = resp.Stages + case "done": + return nil + case "error": + return fmt.Errorf("script init error: %s", resp.Message) + } + } + + return nil +} + +func (r *ExternalRunner) Setup() (interface{}, error) { + if err := r.stdin.Encode(externalCmd{Cmd: "setup"}); err != nil { + return nil, err + } + return r.readUntilDone() +} + +func (r *ExternalRunner) Iterate(vuID int, data interface{}) error { + r.mu.Lock() + defer r.mu.Unlock() + + if err := r.stdin.Encode(externalCmd{Cmd: "iterate", VuID: vuID, Data: data}); err != nil { + return err + } + _, err := r.readUntilDone() + return err +} + +func (r *ExternalRunner) Teardown(data interface{}) error { + if err := r.stdin.Encode(externalCmd{Cmd: "teardown", Data: data}); err != nil { + return err + } + _, err := r.readUntilDone() + return err +} + +func (r *ExternalRunner) readUntilDone() (interface{}, error) { + for r.stdout.Scan() { + var resp externalResponse + if err := json.Unmarshal(r.stdout.Bytes(), &resp); err != nil { + continue + } + + switch resp.Type { + case "http": + r.executeExternalHTTP(resp) + case "check": + r.metrics.recordCheck(resp.Name, resp.Passed) + case "log": + fmt.Println(resp.Message) + case "done": + return resp.Data, nil + case "error": + return nil, fmt.Errorf("script error: %s", resp.Message) + } + } + return nil, fmt.Errorf("script ended unexpectedly") +} + +func (r *ExternalRunner) executeExternalHTTP(resp externalResponse) { + start := time.Now() + + req, err := newHTTPRequest(resp.Method, resp.URL, resp.Headers, resp.Body) + if err != nil { + r.metrics.recordRequest(0, time.Since(start), err) + r.stdin.Encode(map[string]interface{}{ + "status": 0, "body": "", "duration": 0, "error": err.Error(), + }) + return + } + + client := &http.Client{Timeout: 30 * time.Second} + httpResp, err := client.Do(req) + duration := time.Since(start) + + if err != nil { + r.metrics.recordRequest(0, duration, err) + r.stdin.Encode(map[string]interface{}{ + "status": 0, "body": "", "duration": duration.Seconds(), "error": err.Error(), + }) + return + } + defer httpResp.Body.Close() + + body, _ := io.ReadAll(httpResp.Body) + r.metrics.recordRequest(httpResp.StatusCode, duration, nil) + + // Send response back to script so it can inspect status, body, etc. + r.stdin.Encode(map[string]interface{}{ + "status": httpResp.StatusCode, + "body": string(body), + "duration": duration.Seconds(), + "error": "", + }) +} + +func (r *ExternalRunner) Scenario() *ScenarioConfig { return &r.scenario } +func (r *ExternalRunner) Metrics() *Metrics { return r.metrics } + +func (r *ExternalRunner) Close() error { + if r.cmd != nil && r.cmd.Process != nil { + return r.cmd.Process.Kill() + } + return nil +} + +func supportedExts() []string { + exts := make([]string, 0, len(ExternalInterpreter)) + for ext := range ExternalInterpreter { + exts = append(exts, ext) + } + return exts +} + +func newHTTPRequest(method, url string, headers map[string]string, body string) (*http.Request, error) { + var bodyReader io.Reader + if body != "" { + bodyReader = strings.NewReader(body) + } + + req, err := http.NewRequest(method, url, bodyReader) + if err != nil { + return nil, err + } + + for k, v := range headers { + req.Header.Set(k, v) + } + return req, nil +} diff --git a/internal/script/js_runner.go b/internal/script/js_runner.go new file mode 100644 index 0000000..d096b81 --- /dev/null +++ b/internal/script/js_runner.go @@ -0,0 +1,376 @@ +package script + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "sync" + "time" + + "github.com/dop251/goja" +) + +// JSRunner implements Runner for JavaScript (.js) scripts. +type JSRunner struct { + vm *goja.Runtime + mu sync.Mutex + scenario ScenarioConfig + metrics *Metrics + client *http.Client + + setupFn goja.Callable + defaultFn goja.Callable + teardownFn goja.Callable +} + +func NewJSRunner() *JSRunner { + return &JSRunner{ + scenario: ScenarioConfig{ + Chaos: chaosPresets["moderate"], + }, + metrics: newMetrics(), + client: &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + }, + } +} + +func (r *JSRunner) Load(path string) error { + r.vm = goja.New() + + r.registerGlobals() + + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("reading script: %w", err) + } + + _, err = r.vm.RunString(string(data)) + if err != nil { + return fmt.Errorf("executing script: %w", err) + } + + // Extract lifecycle functions + if fn, ok := goja.AssertFunction(r.vm.Get("setup")); ok { + r.setupFn = fn + } + if fn, ok := goja.AssertFunction(r.vm.Get("default")); ok { + r.defaultFn = fn + } else { + // Try "run" as alias + if fn, ok := goja.AssertFunction(r.vm.Get("run")); ok { + r.defaultFn = fn + } + } + if fn, ok := goja.AssertFunction(r.vm.Get("teardown")); ok { + r.teardownFn = fn + } + + if r.defaultFn == nil { + return fmt.Errorf("script must export a default() or run() function") + } + + return nil +} + +func (r *JSRunner) registerGlobals() { + // scenario() + r.vm.Set("scenario", func(call goja.FunctionCall) goja.Value { + obj := call.Argument(0).ToObject(r.vm) + if name := obj.Get("name"); name != nil { + r.scenario.Name = name.String() + } + if pattern := obj.Get("pattern"); pattern != nil && pattern != goja.Undefined() { + r.parseJSChaos(pattern.ToObject(r.vm)) + } + if thresholds := obj.Get("thresholds"); thresholds != nil && thresholds != goja.Undefined() { + r.scenario.Thresholds = make(map[string]string) + obj := thresholds.ToObject(r.vm) + for _, key := range obj.Keys() { + r.scenario.Thresholds[key] = obj.Get(key).String() + } + } + if vus := obj.Get("vus"); vus != nil && vus != goja.Undefined() { + r.parseJSStages(vus.ToObject(r.vm)) + } + return goja.Undefined() + }) + + // chaos() + r.vm.Set("chaos", func(call goja.FunctionCall) goja.Value { + obj := call.Argument(0).ToObject(r.vm) + cfg := chaosPresets["moderate"] + + if preset := obj.Get("preset"); preset != nil && preset != goja.Undefined() { + if p, ok := chaosPresets[preset.String()]; ok { + cfg = p + } + } + if sf := obj.Get("spike_factor"); sf != nil && sf != goja.Undefined() { + cfg.SpikeFactor = sf.ToFloat() + } + if na := obj.Get("noise_amplitude"); na != nil && na != goja.Undefined() { + cfg.NoiseAmplitude = na.ToFloat() + } + + result := r.vm.NewObject() + result.Set("preset", cfg.Preset) + result.Set("spike_factor", cfg.SpikeFactor) + result.Set("noise_amplitude", cfg.NoiseAmplitude) + result.Set("lambda", cfg.Lambda) + return result + }) + + // stage() + r.vm.Set("stage", func(call goja.FunctionCall) goja.Value { + dur := call.Argument(0).String() + target := call.Argument(1).ToInteger() + obj := r.vm.NewObject() + obj.Set("duration", dur) + obj.Set("target", target) + return obj + }) + + // ramp() + r.vm.Set("ramp", func(call goja.FunctionCall) goja.Value { + return call.Argument(0) + }) + + // http module + httpObj := r.vm.NewObject() + httpObj.Set("get", r.makeJSHTTPMethod("GET")) + httpObj.Set("post", r.makeJSHTTPMethod("POST")) + httpObj.Set("put", r.makeJSHTTPMethod("PUT")) + httpObj.Set("delete", r.makeJSHTTPMethod("DELETE")) + httpObj.Set("patch", r.makeJSHTTPMethod("PATCH")) + r.vm.Set("http", httpObj) + + // check() + r.vm.Set("check", func(call goja.FunctionCall) goja.Value { + resp := call.Argument(0) + checksObj := call.Argument(1).ToObject(r.vm) + + allPassed := true + for _, key := range checksObj.Keys() { + fn, ok := goja.AssertFunction(checksObj.Get(key)) + if !ok { + continue + } + result, err := fn(goja.Undefined(), resp) + if err != nil { + r.metrics.recordCheck(key, false) + allPassed = false + continue + } + passed := result.ToBoolean() + r.metrics.recordCheck(key, passed) + if !passed { + allPassed = false + } + } + return r.vm.ToValue(allPassed) + }) + + // sleep() + r.vm.Set("sleep", func(call goja.FunctionCall) goja.Value { + arg := call.Argument(0) + var d time.Duration + if arg.ExportType().Kind().String() == "string" { + d, _ = time.ParseDuration(arg.String()) + } else { + d = time.Duration(arg.ToFloat() * float64(time.Second)) + } + time.Sleep(d) + return goja.Undefined() + }) + + // think_time() + r.vm.Set("think_time", func(call goja.FunctionCall) goja.Value { + minStr := call.Argument(0).String() + maxStr := call.Argument(1).String() + minD, _ := time.ParseDuration(minStr) + maxD, _ := time.ParseDuration(maxStr) + rangeMs := maxD.Milliseconds() - minD.Milliseconds() + if rangeMs <= 0 { + rangeMs = 1 + } + d := minD + time.Duration(time.Now().UnixNano()%rangeMs)*time.Millisecond + return r.vm.ToValue(d.String()) + }) + + // console.log + console := r.vm.NewObject() + console.Set("log", func(call goja.FunctionCall) goja.Value { + args := make([]interface{}, len(call.Arguments)) + for i, a := range call.Arguments { + args[i] = a.Export() + } + fmt.Println(args...) + return goja.Undefined() + }) + r.vm.Set("console", console) +} + +func (r *JSRunner) makeJSHTTPMethod(method string) func(goja.FunctionCall) goja.Value { + return func(call goja.FunctionCall) goja.Value { + url := call.Argument(0).String() + + var reqBody io.Reader + contentType := "" + var headers map[string]string + + if len(call.Arguments) > 1 { + opts := call.Argument(1).ToObject(r.vm) + + if h := opts.Get("headers"); h != nil && h != goja.Undefined() { + headers = make(map[string]string) + hObj := h.ToObject(r.vm) + for _, k := range hObj.Keys() { + headers[k] = hObj.Get(k).String() + } + } + if j := opts.Get("json"); j != nil && j != goja.Undefined() { + jsonBytes, _ := json.Marshal(j.Export()) + reqBody = bytes.NewReader(jsonBytes) + contentType = "application/json" + } + if b := opts.Get("body"); b != nil && b != goja.Undefined() { + reqBody = bytes.NewReader([]byte(b.String())) + } + } + + req, err := http.NewRequest(method, url, reqBody) + if err != nil { + return r.makeJSResponse(0, nil, 0, err) + } + + if contentType != "" { + req.Header.Set("Content-Type", contentType) + } + for k, v := range headers { + req.Header.Set(k, v) + } + + start := time.Now() + resp, err := r.client.Do(req) + duration := time.Since(start) + + if err != nil { + r.metrics.recordRequest(0, duration, err) + return r.makeJSResponse(0, nil, duration, err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + r.metrics.recordRequest(resp.StatusCode, duration, nil) + + return r.makeJSResponse(resp.StatusCode, body, duration, nil) + } +} + +func (r *JSRunner) makeJSResponse(status int, body []byte, duration time.Duration, err error) goja.Value { + obj := r.vm.NewObject() + obj.Set("status", status) + obj.Set("body", string(body)) + obj.Set("duration", duration.Seconds()) + + errStr := "" + if err != nil { + errStr = err.Error() + } + obj.Set("error", errStr) + + obj.Set("json", func(call goja.FunctionCall) goja.Value { + var raw interface{} + if json.Unmarshal(body, &raw) != nil { + return goja.Undefined() + } + return r.vm.ToValue(raw) + }) + + return obj +} + +func (r *JSRunner) parseJSChaos(obj *goja.Object) { + cfg := chaosPresets["moderate"] + if preset := obj.Get("preset"); preset != nil && preset != goja.Undefined() { + if p, ok := chaosPresets[preset.String()]; ok { + cfg = p + } + } + if sf := obj.Get("spike_factor"); sf != nil && sf != goja.Undefined() { + cfg.SpikeFactor = sf.ToFloat() + } + if na := obj.Get("noise_amplitude"); na != nil && na != goja.Undefined() { + cfg.NoiseAmplitude = na.ToFloat() + } + r.scenario.Chaos = cfg +} + +func (r *JSRunner) parseJSStages(obj *goja.Object) { + // Array of stage objects + length := obj.Get("length") + if length == nil { + return + } + n := int(length.ToInteger()) + for i := 0; i < n; i++ { + item := obj.Get(fmt.Sprintf("%d", i)).ToObject(r.vm) + dur, _ := time.ParseDuration(item.Get("duration").String()) + target := int(item.Get("target").ToInteger()) + r.scenario.Stages = append(r.scenario.Stages, Stage{Duration: dur, Target: target}) + } +} + +func (r *JSRunner) Setup() (interface{}, error) { + if r.setupFn == nil { + return nil, nil + } + result, err := r.setupFn(goja.Undefined()) + if err != nil { + return nil, fmt.Errorf("setup(): %w", err) + } + return result, nil +} + +func (r *JSRunner) Iterate(vuID int, data interface{}) error { + r.mu.Lock() + defer r.mu.Unlock() + + var dataVal goja.Value = goja.Undefined() + if data != nil { + dataVal = r.vm.ToValue(data) + } + _, err := r.defaultFn(goja.Undefined(), dataVal) + if err != nil { + return fmt.Errorf("default() VU %d: %w", vuID, err) + } + return nil +} + +func (r *JSRunner) Teardown(data interface{}) error { + if r.teardownFn == nil { + return nil + } + var dataVal goja.Value = goja.Undefined() + if data != nil { + dataVal = r.vm.ToValue(data) + } + _, err := r.teardownFn(goja.Undefined(), dataVal) + if err != nil { + return fmt.Errorf("teardown(): %w", err) + } + return nil +} + +func (r *JSRunner) Scenario() *ScenarioConfig { return &r.scenario } +func (r *JSRunner) Metrics() *Metrics { return r.metrics } +func (r *JSRunner) Close() error { return nil } diff --git a/internal/script/runner.go b/internal/script/runner.go new file mode 100644 index 0000000..0b5485b --- /dev/null +++ b/internal/script/runner.go @@ -0,0 +1,81 @@ +package script + +import ( + "fmt" + "path/filepath" + "strings" +) + +// Runner is the common interface for all language backends. +// Each language runtime implements this to participate in kar98k's script engine. +type Runner interface { + // Load parses a script file and prepares for execution. + Load(path string) error + + // Setup runs the setup() function once before VUs start. + // Returns shared data accessible in each iteration. + Setup() (interface{}, error) + + // Iterate runs one VU iteration (the default() function). + // Called repeatedly by the VU scheduler. + Iterate(vuID int, data interface{}) error + + // Teardown runs cleanup after all VUs finish. + Teardown(data interface{}) error + + // Scenario returns the parsed scenario configuration. + Scenario() *ScenarioConfig + + // Metrics returns collected metrics. + Metrics() *Metrics + + // Close releases any resources held by the runner. + Close() error +} + +// Language represents a supported scripting language. +type Language string + +const ( + LangStarlark Language = "starlark" + LangJS Language = "javascript" + LangExternal Language = "external" +) + +// DetectLanguage determines the language from a file extension. +func DetectLanguage(path string) Language { + ext := strings.ToLower(filepath.Ext(path)) + switch ext { + case ".star": + return LangStarlark + case ".js": + return LangJS + default: + return LangExternal + } +} + +// NewRunner creates a Runner for the given script file. +func NewRunner(path string) (Runner, error) { + lang := DetectLanguage(path) + + switch lang { + case LangStarlark: + return NewStarlarkRunner(), nil + case LangJS: + return NewJSRunner(), nil + case LangExternal: + return NewExternalRunner(path) + default: + return nil, fmt.Errorf("unsupported script language for %q", path) + } +} + +// ExternalInterpreter maps file extensions to interpreters. +var ExternalInterpreter = map[string]string{ + ".py": "python3", + ".rb": "ruby", + ".lua": "lua", + ".sh": "bash", + ".ts": "npx ts-node", +} diff --git a/internal/script/runtime.go b/internal/script/runtime.go new file mode 100644 index 0000000..9e40338 --- /dev/null +++ b/internal/script/runtime.go @@ -0,0 +1,163 @@ +package script + +import ( + "fmt" + "net/http" + "os" + "time" + + "go.starlark.net/starlark" +) + +// Runtime holds the Starlark execution context. +type Runtime struct { + scenario ScenarioConfig + metrics *Metrics + httpClient *http.Client + + // Starlark state + thread *starlark.Thread + globals starlark.StringDict + + // User-defined functions + setupFn starlark.Callable + defaultFn starlark.Callable + teardownFn starlark.Callable +} + +// StarlarkRunner implements Runner for Starlark (.star) scripts. +type StarlarkRunner struct { + rt *Runtime +} + +func NewStarlarkRunner() *StarlarkRunner { + rt := &Runtime{ + scenario: ScenarioConfig{ + Chaos: chaosPresets["moderate"], + }, + metrics: newMetrics(), + httpClient: &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + }, + } + return &StarlarkRunner{rt: rt} +} + +func (r *StarlarkRunner) Load(path string) error { + rt := r.rt + + rt.thread = &starlark.Thread{ + Name: "kar98k", + Print: func(thread *starlark.Thread, msg string) { + fmt.Println(msg) + }, + } + rt.thread.SetLocal("runtime", rt) + + builtins := starlark.StringDict{ + "scenario": starlark.NewBuiltin("scenario", scenarioBuiltin), + "chaos": starlark.NewBuiltin("chaos", chaosBuiltin), + "ramp": starlark.NewBuiltin("ramp", rampBuiltin), + "stage": starlark.NewBuiltin("stage", stageBuiltin), + "check": starlark.NewBuiltin("check", checkBuiltin), + "sleep": starlark.NewBuiltin("sleep", sleepBuiltin), + "think_time": starlark.NewBuiltin("think_time", thinkTimeBuiltin), + "group": starlark.NewBuiltin("group", groupBuiltin), + "http": httpModule(rt), + } + + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("reading script: %w", err) + } + + globals, err := starlark.ExecFile(rt.thread, path, data, builtins) + if err != nil { + return fmt.Errorf("executing script: %w", err) + } + + rt.globals = globals + + // Extract lifecycle functions + if fn, ok := globals["setup"]; ok { + if callable, ok := fn.(starlark.Callable); ok { + rt.setupFn = callable + } + } + if fn, ok := globals["default"]; ok { + if callable, ok := fn.(starlark.Callable); ok { + rt.defaultFn = callable + } + } + if fn, ok := globals["teardown"]; ok { + if callable, ok := fn.(starlark.Callable); ok { + rt.teardownFn = callable + } + } + + if rt.defaultFn == nil { + return fmt.Errorf("script must define a default() function") + } + + return nil +} + +func (r *StarlarkRunner) Setup() (interface{}, error) { + if r.rt.setupFn == nil { + return starlark.None, nil + } + + result, err := starlark.Call(r.rt.thread, r.rt.setupFn, nil, nil) + if err != nil { + return nil, fmt.Errorf("setup(): %w", err) + } + return result, nil +} + +func (r *StarlarkRunner) Iterate(vuID int, data interface{}) error { + starlarkData, ok := data.(starlark.Value) + if !ok { + starlarkData = starlark.None + } + + // Each VU iteration needs its own thread (Starlark is not goroutine-safe) + thread := &starlark.Thread{ + Name: fmt.Sprintf("vu-%d", vuID), + Print: func(thread *starlark.Thread, msg string) { + fmt.Printf("[VU %d] %s\n", vuID, msg) + }, + } + thread.SetLocal("runtime", r.rt) + + _, err := starlark.Call(thread, r.rt.defaultFn, starlark.Tuple{starlarkData}, nil) + if err != nil { + return fmt.Errorf("default() VU %d: %w", vuID, err) + } + return nil +} + +func (r *StarlarkRunner) Teardown(data interface{}) error { + if r.rt.teardownFn == nil { + return nil + } + + starlarkData, ok := data.(starlark.Value) + if !ok { + starlarkData = starlark.None + } + + _, err := starlark.Call(r.rt.thread, r.rt.teardownFn, starlark.Tuple{starlarkData}, nil) + if err != nil { + return fmt.Errorf("teardown(): %w", err) + } + return nil +} + +func (r *StarlarkRunner) Scenario() *ScenarioConfig { return &r.rt.scenario } +func (r *StarlarkRunner) Metrics() *Metrics { return r.rt.metrics } +func (r *StarlarkRunner) Close() error { return nil } diff --git a/internal/script/scenario.go b/internal/script/scenario.go new file mode 100644 index 0000000..ef27192 --- /dev/null +++ b/internal/script/scenario.go @@ -0,0 +1,274 @@ +package script + +import ( + "fmt" + "time" + + "go.starlark.net/starlark" +) + +// ScenarioConfig holds the parsed scenario configuration from a script. +type ScenarioConfig struct { + Name string + Chaos ChaosConfig + Stages []Stage + Thresholds map[string]string + VUs int + Duration time.Duration +} + +// ChaosConfig configures kar98k's chaos traffic patterns. +type ChaosConfig struct { + Preset string + SpikeFactor float64 + NoiseAmplitude float64 + Lambda float64 + MinInterval time.Duration + MaxInterval time.Duration +} + +// Stage defines a VU ramping stage. +type Stage struct { + Duration time.Duration + Target int +} + +// Presets for chaos configuration. +var chaosPresets = map[string]ChaosConfig{ + "gentle": { + Preset: "gentle", + SpikeFactor: 1.5, + Lambda: 0.003, + NoiseAmplitude: 0.05, + MinInterval: 3 * time.Minute, + MaxInterval: 10 * time.Minute, + }, + "moderate": { + Preset: "moderate", + SpikeFactor: 2.0, + Lambda: 0.005, + NoiseAmplitude: 0.10, + MinInterval: 2 * time.Minute, + MaxInterval: 8 * time.Minute, + }, + "aggressive": { + Preset: "aggressive", + SpikeFactor: 3.0, + Lambda: 0.01, + NoiseAmplitude: 0.15, + MinInterval: 1 * time.Minute, + MaxInterval: 5 * time.Minute, + }, +} + +// scenarioBuiltin implements the scenario() function in Starlark. +func scenarioBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + rt := thread.Local("runtime").(*Runtime) + + var name starlark.String + var pattern starlark.Value = starlark.None + var vus starlark.Value = starlark.None + var thresholds starlark.Value = starlark.None + + if err := starlark.UnpackArgs("scenario", args, kwargs, + "name", &name, + "pattern?", &pattern, + "vus?", &vus, + "thresholds?", &thresholds, + ); err != nil { + return nil, err + } + + rt.scenario.Name = string(name) + + // Parse chaos config + if dict, ok := pattern.(*starlark.Dict); ok { + if err := parseChaosDict(dict, &rt.scenario.Chaos); err != nil { + return nil, err + } + } + + // Parse stages from ramp() + if list, ok := vus.(*starlark.List); ok { + if err := parseStagesList(list, &rt.scenario.Stages); err != nil { + return nil, err + } + } + + // Parse thresholds + if dict, ok := thresholds.(*starlark.Dict); ok { + rt.scenario.Thresholds = make(map[string]string) + for _, item := range dict.Items() { + k, _ := starlark.AsString(item[0]) + v, _ := starlark.AsString(item[1]) + rt.scenario.Thresholds[k] = v + } + } + + return starlark.None, nil +} + +// chaosBuiltin implements the chaos() function that returns a config dict. +func chaosBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var preset starlark.String + var spikeFactor starlark.Float + var noiseAmplitude starlark.Float + var lambda starlark.Float + + hasSpike := false + hasNoise := false + hasLambda := false + + if err := starlark.UnpackArgs("chaos", args, kwargs, + "preset?", &preset, + "spike_factor?", &spikeFactor, + "noise_amplitude?", &noiseAmplitude, + "lambda?", &lambda, + ); err != nil { + return nil, err + } + + // Check which kwargs were provided + for _, kv := range kwargs { + k, _ := starlark.AsString(kv[0]) + switch k { + case "spike_factor": + hasSpike = true + case "noise_amplitude": + hasNoise = true + case "lambda": + hasLambda = true + } + } + + result := starlark.NewDict(8) + + presetStr := string(preset) + if presetStr == "" { + presetStr = "moderate" + } + + base, ok := chaosPresets[presetStr] + if !ok { + return nil, fmt.Errorf("unknown chaos preset %q (use: gentle, moderate, aggressive)", presetStr) + } + + if hasSpike { + base.SpikeFactor = float64(spikeFactor) + } + if hasNoise { + base.NoiseAmplitude = float64(noiseAmplitude) + } + if hasLambda { + base.Lambda = float64(lambda) + } + + result.SetKey(starlark.String("preset"), starlark.String(presetStr)) + result.SetKey(starlark.String("spike_factor"), starlark.Float(base.SpikeFactor)) + result.SetKey(starlark.String("noise_amplitude"), starlark.Float(base.NoiseAmplitude)) + result.SetKey(starlark.String("lambda"), starlark.Float(base.Lambda)) + result.SetKey(starlark.String("min_interval"), starlark.String(base.MinInterval.String())) + result.SetKey(starlark.String("max_interval"), starlark.String(base.MaxInterval.String())) + + return result, nil +} + +// rampBuiltin implements ramp([stage(...), ...]). +func rampBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if len(args) != 1 { + return nil, fmt.Errorf("ramp: expected 1 argument (list of stages), got %d", len(args)) + } + // Pass through the list — parsing happens in scenario() + return args[0], nil +} + +// stageBuiltin implements stage("30s", 10). +func stageBuiltin(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var durationStr starlark.String + var target starlark.Int + + if err := starlark.UnpackArgs("stage", args, kwargs, + "duration", &durationStr, + "target", &target, + ); err != nil { + return nil, err + } + + result := starlark.NewDict(2) + result.SetKey(starlark.String("duration"), durationStr) + t, _ := target.Int64() + result.SetKey(starlark.String("target"), starlark.MakeInt64(t)) + + return result, nil +} + +func parseChaosDict(dict *starlark.Dict, cfg *ChaosConfig) error { + if v, found, _ := dict.Get(starlark.String("preset")); found { + s, _ := starlark.AsString(v) + if base, ok := chaosPresets[s]; ok { + *cfg = base + } + } + if v, found, _ := dict.Get(starlark.String("spike_factor")); found { + if f, ok := v.(starlark.Float); ok { + cfg.SpikeFactor = float64(f) + } + } + if v, found, _ := dict.Get(starlark.String("noise_amplitude")); found { + if f, ok := v.(starlark.Float); ok { + cfg.NoiseAmplitude = float64(f) + } + } + if v, found, _ := dict.Get(starlark.String("lambda")); found { + if f, ok := v.(starlark.Float); ok { + cfg.Lambda = float64(f) + } + } + if v, found, _ := dict.Get(starlark.String("min_interval")); found { + s, _ := starlark.AsString(v) + if d, err := time.ParseDuration(s); err == nil { + cfg.MinInterval = d + } + } + if v, found, _ := dict.Get(starlark.String("max_interval")); found { + s, _ := starlark.AsString(v) + if d, err := time.ParseDuration(s); err == nil { + cfg.MaxInterval = d + } + } + return nil +} + +func parseStagesList(list *starlark.List, stages *[]Stage) error { + iter := list.Iterate() + defer iter.Done() + + var val starlark.Value + for iter.Next(&val) { + dict, ok := val.(*starlark.Dict) + if !ok { + return fmt.Errorf("stage: expected dict, got %s", val.Type()) + } + + var stage Stage + + if v, found, _ := dict.Get(starlark.String("duration")); found { + s, _ := starlark.AsString(v) + d, err := time.ParseDuration(s) + if err != nil { + return fmt.Errorf("stage duration %q: %w", s, err) + } + stage.Duration = d + } + + if v, found, _ := dict.Get(starlark.String("target")); found { + if i, ok := v.(starlark.Int); ok { + t, _ := i.Int64() + stage.Target = int(t) + } + } + + *stages = append(*stages, stage) + } + return nil +} diff --git a/internal/script/vu.go b/internal/script/vu.go new file mode 100644 index 0000000..bdf7700 --- /dev/null +++ b/internal/script/vu.go @@ -0,0 +1,563 @@ +package script + +import ( + "context" + "fmt" + "math" + "sort" + "sync" + "sync/atomic" + "time" +) + +// DashboardPusher is an interface for pushing stats to a dashboard. +type DashboardPusher interface { + Push(stats interface{}) +} + +// VUScheduler manages virtual user lifecycle and ramping. +type VUScheduler struct { + runner Runner + stages []Stage + vus int + duration time.Duration + setupData interface{} + dashboard DashboardPusher + + // Runtime state + activeVUs int64 + iterations int64 + startTime time.Time +} + +// SetDashboard attaches a dashboard for real-time stats. +func (s *VUScheduler) SetDashboard(d DashboardPusher) { + s.dashboard = d +} + +// NewVUScheduler creates a scheduler from a runner's scenario config. +func NewVUScheduler(runner Runner, vusOverride int, durationOverride time.Duration) *VUScheduler { + s := &VUScheduler{ + runner: runner, + } + + sc := runner.Scenario() + s.stages = sc.Stages + + if vusOverride > 0 { + s.vus = vusOverride + s.stages = nil // Use flat VU count + } + if durationOverride > 0 { + s.duration = durationOverride + } + + // If no stages and no explicit VUs, default to 1 VU for 30s + if len(s.stages) == 0 && s.vus == 0 { + s.vus = 1 + if s.duration == 0 { + s.duration = 30 * time.Second + } + } + + return s +} + +// Run executes the full VU lifecycle. +func (s *VUScheduler) Run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Setup phase + fmt.Print("\n Running setup...") + data, err := s.runner.Setup() + if err != nil { + return fmt.Errorf("setup failed: %w", err) + } + s.setupData = data + + s.startTime = time.Now() + + // Determine total duration + totalDuration := s.duration + if len(s.stages) > 0 { + totalDuration = 0 + for _, st := range s.stages { + totalDuration += st.Duration + } + } + + fmt.Printf(" Duration: %s\n\n", totalDuration) + + // Run VUs + if len(s.stages) > 0 { + err = s.runWithStages(ctx) + } else { + err = s.runFlat(ctx) + } + + // Teardown phase + fmt.Print("\n Running teardown...") + if tdErr := s.runner.Teardown(s.setupData); tdErr != nil { + fmt.Printf(" Teardown error: %v\n", tdErr) + } + + return err +} + +// runFlat runs a fixed number of VUs for a fixed duration. +func (s *VUScheduler) runFlat(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, s.duration) + defer cancel() + + var wg sync.WaitGroup + for i := 0; i < s.vus; i++ { + wg.Add(1) + go func(vuID int) { + defer wg.Done() + s.vuLoop(ctx, vuID) + }(i) + } + + // Progress reporting + go s.reportProgress(ctx, s.duration) + + wg.Wait() + return nil +} + +// runWithStages ramps VUs according to stages. +func (s *VUScheduler) runWithStages(ctx context.Context) error { + var totalDuration time.Duration + for _, st := range s.stages { + totalDuration += st.Duration + } + + ctx, cancel := context.WithTimeout(ctx, totalDuration) + defer cancel() + + var ( + mu sync.Mutex + currentVU int + vus []*vuHandle + ) + + // VU management + addVUs := func(n int) { + mu.Lock() + defer mu.Unlock() + for i := 0; i < n; i++ { + currentVU++ + h := &vuHandle{cancel: make(chan struct{})} + vus = append(vus, h) + go func(vuID int, handle *vuHandle) { + atomic.AddInt64(&s.activeVUs, 1) + defer atomic.AddInt64(&s.activeVUs, -1) + + vuCtx, vuCancel := context.WithCancel(ctx) + go func() { + select { + case <-handle.cancel: + vuCancel() + case <-vuCtx.Done(): + } + }() + + s.vuLoop(vuCtx, vuID) + vuCancel() + }(currentVU, h) + } + } + + removeVUs := func(n int) { + mu.Lock() + defer mu.Unlock() + for i := 0; i < n && len(vus) > 0; i++ { + last := vus[len(vus)-1] + close(last.cancel) + vus = vus[:len(vus)-1] + } + } + + // Progress reporting + go s.reportProgress(ctx, totalDuration) + + // Execute stages + prevTarget := 0 + for _, st := range s.stages { + target := st.Target + diff := target - prevTarget + + if diff > 0 { + // Gradually add VUs over the stage duration + interval := st.Duration / time.Duration(diff) + for i := 0; i < diff; i++ { + select { + case <-ctx.Done(): + removeVUs(len(vus)) + return nil + case <-time.After(interval): + addVUs(1) + } + } + } else if diff < 0 { + // Gradually remove VUs + interval := st.Duration / time.Duration(-diff) + for i := 0; i < -diff; i++ { + select { + case <-ctx.Done(): + removeVUs(len(vus)) + return nil + case <-time.After(interval): + removeVUs(1) + } + } + } else { + // Hold steady + select { + case <-ctx.Done(): + removeVUs(len(vus)) + return nil + case <-time.After(st.Duration): + } + } + + prevTarget = target + } + + // Clean up remaining VUs + removeVUs(len(vus)) + return nil +} + +type vuHandle struct { + cancel chan struct{} +} + +// vuLoop runs iterations until context is cancelled. +func (s *VUScheduler) vuLoop(ctx context.Context, vuID int) { + for { + select { + case <-ctx.Done(): + return + default: + } + + if err := s.runner.Iterate(vuID, s.setupData); err != nil { + // Log but continue + _ = err + } + atomic.AddInt64(&s.iterations, 1) + } +} + +// reportProgress prints progress every 2 seconds and pushes to dashboard. +func (s *VUScheduler) reportProgress(ctx context.Context, totalDuration time.Duration) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + printTicker := 0 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + elapsed := time.Since(s.startTime) + iters := atomic.LoadInt64(&s.iterations) + vus := atomic.LoadInt64(&s.activeVUs) + m := s.runner.Metrics() + + totalReqs := atomic.LoadInt64(&m.TotalRequests) + totalErrs := atomic.LoadInt64(&m.TotalErrors) + rps := float64(0) + if elapsed.Seconds() > 0 { + rps = float64(totalReqs) / elapsed.Seconds() + } + + // Calculate latency stats + m.mu.Lock() + var avgLat, p95Lat, p99Lat float64 + if len(m.Durations) > 0 { + sorted := make([]float64, len(m.Durations)) + copy(sorted, m.Durations) + sort.Float64s(sorted) + + sum := 0.0 + for _, d := range sorted { + sum += d + } + avgLat = sum / float64(len(sorted)) + p95Lat = percentile(sorted, 95) + p99Lat = percentile(sorted, 99) + } + + var checkStats []map[string]interface{} + for _, c := range m.Checks { + total := c.Passed + c.Failed + rate := float64(0) + if total > 0 { + rate = float64(c.Passed) / float64(total) * 100 + } + checkStats = append(checkStats, map[string]interface{}{ + "name": c.Name, "rate": rate, "passed": c.Passed, "failed": c.Failed, + }) + } + + statusCodes := make(map[int]int64) + for k, v := range m.StatusCodes { + statusCodes[k] = v + } + m.mu.Unlock() + + // Push to dashboard + if s.dashboard != nil { + s.dashboard.Push(map[string]interface{}{ + "timestamp": time.Now().Unix(), + "rps": rps, + "total_reqs": totalReqs, + "total_errors": totalErrs, + "avg_latency": avgLat, + "p95_latency": p95Lat, + "p99_latency": p99Lat, + "active_vus": vus, + "iterations": iters, + "error_rate": func() float64 { if totalReqs > 0 { return float64(totalErrs) / float64(totalReqs) * 100 }; return 0 }(), + "status_codes": statusCodes, + "checks": checkStats, + "elapsed": elapsed.Seconds(), + }) + } + + // Print to CLI every 2 seconds + printTicker++ + if printTicker%2 == 0 { + pct := float64(elapsed) / float64(totalDuration) * 100 + if pct > 100 { + pct = 100 + } + fmt.Printf("\r [%5.1f%%] VUs: %d | Iterations: %d | Requests: %d | Errors: %d | Elapsed: %s", + pct, vus, iters, totalReqs, totalErrs, elapsed.Round(time.Second)) + } + } + } +} + +// PrintReport prints the final test report. +func PrintReport(runner Runner, elapsed time.Duration) { + m := runner.Metrics() + sc := runner.Scenario() + + fmt.Print("\n") + fmt.Println(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + fmt.Printf(" TEST REPORT: %s\n", sc.Name) + fmt.Println(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + + totalReqs := atomic.LoadInt64(&m.TotalRequests) + totalErrs := atomic.LoadInt64(&m.TotalErrors) + successRate := float64(0) + if totalReqs > 0 { + successRate = float64(totalReqs-totalErrs) / float64(totalReqs) * 100 + } + + fmt.Printf("\n Requests: %d\n", totalReqs) + fmt.Printf(" Errors: %d\n", totalErrs) + fmt.Printf(" Success Rate: %.1f%%\n", successRate) + fmt.Printf(" Duration: %s\n", elapsed.Round(time.Millisecond)) + + if totalReqs > 0 { + rps := float64(totalReqs) / elapsed.Seconds() + fmt.Printf(" RPS: %.1f\n", rps) + } + + // Latency stats + m.mu.Lock() + durations := make([]float64, len(m.Durations)) + copy(durations, m.Durations) + m.mu.Unlock() + + if len(durations) > 0 { + sort.Float64s(durations) + + avg := 0.0 + for _, d := range durations { + avg += d + } + avg /= float64(len(durations)) + + fmt.Printf("\n Latency:\n") + fmt.Printf(" Min: %s\n", fmtDuration(durations[0])) + fmt.Printf(" Avg: %s\n", fmtDuration(avg)) + fmt.Printf(" Max: %s\n", fmtDuration(durations[len(durations)-1])) + fmt.Printf(" P50: %s\n", fmtDuration(percentile(durations, 50))) + fmt.Printf(" P95: %s\n", fmtDuration(percentile(durations, 95))) + fmt.Printf(" P99: %s\n", fmtDuration(percentile(durations, 99))) + } + + // Status codes + m.mu.Lock() + if len(m.StatusCodes) > 0 { + fmt.Printf("\n Status Codes:\n") + for code, count := range m.StatusCodes { + fmt.Printf(" %d: %d\n", code, count) + } + } + + // Checks + if len(m.Checks) > 0 { + fmt.Printf("\n Checks:\n") + for _, c := range m.Checks { + total := c.Passed + c.Failed + pct := float64(c.Passed) / float64(total) * 100 + mark := "✓" + if c.Failed > 0 { + mark = "✗" + } + fmt.Printf(" %s %s: %.0f%% (%d/%d)\n", mark, c.Name, pct, c.Passed, total) + } + } + m.mu.Unlock() + + // Thresholds + if len(sc.Thresholds) > 0 { + fmt.Printf("\n Thresholds:\n") + for metric, condition := range sc.Thresholds { + // Simple threshold evaluation + passed := evaluateThreshold(metric, condition, m, durations) + mark := "✓" + if !passed { + mark = "✗" + } + fmt.Printf(" %s %s: %s\n", mark, metric, condition) + } + } + + fmt.Println() +} + +func percentile(sorted []float64, p float64) float64 { + if len(sorted) == 0 { + return 0 + } + idx := int(math.Ceil(p/100*float64(len(sorted)))) - 1 + if idx < 0 { + idx = 0 + } + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} + +func fmtDuration(seconds float64) string { + d := time.Duration(seconds * float64(time.Second)) + if d < time.Millisecond { + return fmt.Sprintf("%.0fµs", float64(d)/float64(time.Microsecond)) + } + if d < time.Second { + return fmt.Sprintf("%.1fms", float64(d)/float64(time.Millisecond)) + } + return fmt.Sprintf("%.2fs", seconds) +} + +func evaluateThreshold(metric, condition string, m *Metrics, durations []float64) bool { + // Basic threshold evaluation for common patterns + switch { + case metric == "http_req_failed": + totalReqs := atomic.LoadInt64(&m.TotalRequests) + totalErrs := atomic.LoadInt64(&m.TotalErrors) + if totalReqs == 0 { + return true + } + errorRate := float64(totalErrs) / float64(totalReqs) + return parseAndCompare(errorRate, condition) + + case metric == "checks": + var totalPassed, totalAll int64 + m.mu.Lock() + for _, c := range m.Checks { + totalPassed += c.Passed + totalAll += c.Passed + c.Failed + } + m.mu.Unlock() + if totalAll == 0 { + return true + } + rate := float64(totalPassed) / float64(totalAll) + return parseAndCompare(rate, condition) + + default: + // Duration-based thresholds like http_req_duration{p95} + if len(durations) == 0 { + return true + } + if contains(metric, "p95") { + p95 := percentile(durations, 95) * 1000 // to ms + return parseAndCompareDuration(p95, condition) + } + if contains(metric, "p99") { + p99 := percentile(durations, 99) * 1000 + return parseAndCompareDuration(p99, condition) + } + return true + } +} + +func contains(s, sub string) bool { + return len(s) >= len(sub) && (s == sub || len(s) > 0 && containsStr(s, sub)) +} + +func containsStr(s, sub string) bool { + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} + +func parseAndCompare(value float64, condition string) bool { + var op string + var threshold float64 + if _, err := fmt.Sscanf(condition, "%s %f", &op, &threshold); err != nil { + return true + } + switch op { + case "<": + return value < threshold + case "<=": + return value <= threshold + case ">": + return value > threshold + case ">=": + return value >= threshold + default: + return true + } +} + +func parseAndCompareDuration(valueMs float64, condition string) bool { + var op string + var thresholdStr string + if _, err := fmt.Sscanf(condition, "%s %s", &op, &thresholdStr); err != nil { + return true + } + // Parse duration like "500ms" + d, err := time.ParseDuration(thresholdStr) + if err != nil { + return true + } + thresholdMs := float64(d) / float64(time.Millisecond) + + switch op { + case "<": + return valueMs < thresholdMs + case "<=": + return valueMs <= thresholdMs + case ">": + return valueMs > thresholdMs + case ">=": + return valueMs >= thresholdMs + default: + return true + } +} diff --git a/sdk/python/kar98k.py b/sdk/python/kar98k.py new file mode 100644 index 0000000..d80b6ba --- /dev/null +++ b/sdk/python/kar98k.py @@ -0,0 +1,281 @@ +"""kar98k Python SDK — write load tests in native Python. + +Usage: + from kar98k import scenario, chaos, http, check, sleep, think_time, stage, ramp + + scenario(name="my-test", pattern=chaos(preset="aggressive")) + + def setup(): + return {"token": "abc"} + + def default(data): + resp = http.get("http://localhost:8080/api", headers={"Authorization": f"Bearer {data['token']}"}) + check(resp, {"status 200": lambda r: r.status == 200}) + sleep(think_time("1s", "3s")) +""" + +import sys +import json +import random +import time as _time +import inspect +import atexit + +# --- Protocol layer (hidden from user) --- + +def _send(msg): + print(json.dumps(msg), flush=True) + +def _recv(): + line = sys.stdin.readline() + if not line: + sys.exit(0) + return json.loads(line.strip()) + + +# --- Response object --- + +class Response: + """HTTP response returned by http.get/post/etc.""" + + def __init__(self, status, body="", duration=0.0, error=""): + self.status = status + self.body = body + self.duration = duration + self.error = error + + def json(self): + return json.loads(self.body) if self.body else None + + def __repr__(self): + return f"" + + +# --- HTTP module --- + +class _HTTP: + """HTTP client — http.get(), http.post(), etc.""" + + def get(self, url, **kwargs): + return self._request("GET", url, **kwargs) + + def post(self, url, **kwargs): + return self._request("POST", url, **kwargs) + + def put(self, url, **kwargs): + return self._request("PUT", url, **kwargs) + + def delete(self, url, **kwargs): + return self._request("DELETE", url, **kwargs) + + def patch(self, url, **kwargs): + return self._request("PATCH", url, **kwargs) + + def _request(self, method, url, headers=None, json_body=None, body=None): + msg = {"type": "http", "method": method, "url": url} + if headers: + msg["headers"] = headers + if json_body is not None: + msg["body"] = json.dumps(json_body) + if not headers: + msg["headers"] = {} + msg["headers"]["Content-Type"] = "application/json" + elif body: + msg["body"] = body + + _send(msg) + + # Read response from kar98k + resp_data = _recv() + return Response( + status=resp_data.get("status", 0), + body=resp_data.get("body", ""), + duration=resp_data.get("duration", 0.0), + error=resp_data.get("error", ""), + ) + + +http = _HTTP() + + +# --- Scenario config --- + +_scenario_config = {} +_chaos_config = {} +_stages = [] +_thresholds = {} + + +def scenario(name, pattern=None, vus=None, thresholds=None): + """Declare test scenario configuration.""" + global _scenario_config, _chaos_config, _stages, _thresholds + _scenario_config = {"name": name} + if pattern: + _chaos_config = pattern + if vus: + _stages = vus + if thresholds: + _thresholds = thresholds + + +def chaos(preset="moderate", spike_factor=None, noise_amplitude=None, lambda_=None): + """Configure chaos traffic patterns.""" + cfg = {"preset": preset} + if spike_factor is not None: + cfg["spike_factor"] = spike_factor + if noise_amplitude is not None: + cfg["noise_amplitude"] = noise_amplitude + if lambda_ is not None: + cfg["lambda"] = lambda_ + return cfg + + +def stage(duration, target): + """Define a VU ramping stage.""" + return {"duration": duration, "target": target} + + +def ramp(stages): + """Wrap stages for VU ramping.""" + return stages + + +# --- Check --- + +def check(response, checks): + """Run assertions against a response. + + Args: + response: Response object from http.* + checks: dict of {"name": lambda r: bool} + """ + all_passed = True + for name, fn in checks.items(): + try: + passed = bool(fn(response)) + except Exception: + passed = False + _send({"type": "check", "name": name, "passed": passed}) + if not passed: + all_passed = False + return all_passed + + +# --- Sleep / Think Time --- + +def sleep(duration): + """Sleep for a duration string (e.g., '1s', '500ms') or seconds (float).""" + if isinstance(duration, str): + _time.sleep(_parse_duration(duration)) + else: + _time.sleep(float(duration)) + + +def think_time(min_dur, max_dur): + """Generate a random duration between min and max (chaos-aware).""" + min_s = _parse_duration(min_dur) + max_s = _parse_duration(max_dur) + return random.uniform(min_s, max_s) + + +def _parse_duration(s): + """Parse Go-style duration string to seconds.""" + if isinstance(s, (int, float)): + return float(s) + s = s.strip() + if s.endswith("ms"): + return float(s[:-2]) / 1000 + elif s.endswith("us") or s.endswith("µs"): + return float(s[:-2]) / 1_000_000 + elif s.endswith("s") and not s.endswith("ms"): + return float(s[:-1]) + elif s.endswith("m"): + return float(s[:-1]) * 60 + elif s.endswith("h"): + return float(s[:-1]) * 3600 + return float(s) + + +# --- Group --- + +def group(name, fn): + """Group related requests for metric labeling.""" + return fn() + + +# --- Main loop (auto-starts when script is run by kar98k) --- + +_caller_module = None + +# Capture the importing module's globals at import time +for _frame_info in inspect.stack(): + _f = _frame_info[0] + if _f.f_globals.get("__name__") == "__main__": + _caller_module = _f.f_globals + break + + +def _main(): + global _caller_module + if _caller_module is None: + _caller_module = sys.modules.get("__main__").__dict__ if "__main__" in sys.modules else {} + + setup_fn = _caller_module.get("setup") + default_fn = _caller_module.get("default") or _caller_module.get("run") + teardown_fn = _caller_module.get("teardown") + + if default_fn is None: + _send({"type": "error", "message": "script must define a default() or run() function"}) + sys.exit(1) + + while True: + cmd = _recv() + + if cmd["cmd"] == "init": + scenario_msg = {"type": "scenario", "name": _scenario_config.get("name", "")} + if _chaos_config: + scenario_msg["chaos"] = _chaos_config + if _stages: + scenario_msg["stages"] = _stages + if _thresholds: + scenario_msg["thresholds"] = _thresholds + _send(scenario_msg) + _send({"type": "done"}) + + elif cmd["cmd"] == "setup": + data = None + if setup_fn: + data = setup_fn() + _send({"type": "done", "data": data or {}}) + + elif cmd["cmd"] == "iterate": + data = cmd.get("data", {}) + try: + default_fn(data) + except Exception as e: + _send({"type": "error", "message": str(e)}) + _send({"type": "done"}) + + elif cmd["cmd"] == "teardown": + data = cmd.get("data", {}) + if teardown_fn: + try: + teardown_fn(data) + except Exception: + pass + _send({"type": "done"}) + + else: + _send({"type": "error", "message": f"unknown command: {cmd['cmd']}"}) + + +# Auto-start: when imported by a kar98k subprocess, block on the main loop +# after the importing module finishes defining setup/default/teardown. +def _auto_start(): + """Called via atexit — runs the protocol loop on the main thread.""" + # Only activate when stdin is a pipe (i.e., run by kar98k) + if sys.stdin.isatty(): + return + _main() + +atexit.register(_auto_start) diff --git a/sdk/ruby/kar98k.rb b/sdk/ruby/kar98k.rb new file mode 100644 index 0000000..731d522 --- /dev/null +++ b/sdk/ruby/kar98k.rb @@ -0,0 +1,225 @@ +# kar98k Ruby SDK — write load tests in native Ruby. +# +# Usage: +# require_relative 'kar98k' +# +# scenario name: "my-test", pattern: chaos(preset: "aggressive") +# +# def setup +# { token: "abc" } +# end +# +# def default(data) +# resp = Http.get("http://localhost:8080/api") +# check resp, "status 200" => ->(r) { r.status == 200 } +# end + +require 'json' + +module Kar98k + # --- Protocol layer --- + + def self._send(msg) + $stdout.puts(JSON.generate(msg)) + $stdout.flush + end + + def self._recv + line = $stdin.gets + exit(0) if line.nil? + JSON.parse(line.strip) + end + + # --- Response --- + + class Response + attr_reader :status, :body, :duration, :error + + def initialize(status:, body: "", duration: 0.0, error: "") + @status = status + @body = body + @duration = duration + @error = error + end + + def json + JSON.parse(@body) rescue nil + end + + def to_s + "" + end + end + + # --- HTTP module --- + + module Http + def self.get(url, **opts); _request("GET", url, **opts); end + def self.post(url, **opts); _request("POST", url, **opts); end + def self.put(url, **opts); _request("PUT", url, **opts); end + def self.delete(url, **opts); _request("DELETE", url, **opts); end + def self.patch(url, **opts); _request("PATCH", url, **opts); end + + def self._request(method, url, headers: nil, json: nil, body: nil) + msg = { type: "http", method: method, url: url } + if headers + msg[:headers] = headers + end + if json + msg[:body] = JSON.generate(json) + msg[:headers] = (msg[:headers] || {}).merge("Content-Type" => "application/json") + elsif body + msg[:body] = body + end + + Kar98k._send(msg) + + resp_data = Kar98k._recv + Response.new( + status: resp_data["status"] || 0, + body: resp_data["body"] || "", + duration: resp_data["duration"] || 0.0, + error: resp_data["error"] || "" + ) + end + end + + # --- Config state --- + + @scenario_config = {} + @chaos_config = {} + @stages = [] + @thresholds = {} + + def self.scenario_config; @scenario_config; end + def self.chaos_config; @chaos_config; end + def self.stages_config; @stages; end + def self.thresholds_config; @thresholds; end + + # --- Main loop --- + + def self.run_loop(binding_context) + loop do + cmd = _recv + + case cmd["cmd"] + when "init" + scenario_msg = { type: "scenario", name: @scenario_config[:name] || "" } + scenario_msg[:chaos] = @chaos_config unless @chaos_config.empty? + scenario_msg[:stages] = @stages unless @stages.empty? + scenario_msg[:thresholds] = @thresholds unless @thresholds.empty? + _send(scenario_msg) + _send({ type: "done" }) + + when "setup" + data = if binding_context.respond_to?(:setup, true) + binding_context.send(:setup) + else + {} + end + _send({ type: "done", data: data || {} }) + + when "iterate" + data = cmd["data"] || {} + begin + if binding_context.respond_to?(:default, true) + binding_context.send(:default, data) + elsif binding_context.respond_to?(:run, true) + binding_context.send(:run, data) + end + rescue => e + _send({ type: "error", message: e.message }) + end + _send({ type: "done" }) + + when "teardown" + data = cmd["data"] || {} + binding_context.send(:teardown, data) if binding_context.respond_to?(:teardown, true) + _send({ type: "done" }) + + else + _send({ type: "error", message: "unknown command: #{cmd['cmd']}" }) + end + end + end +end + +# --- Top-level DSL methods (available in user scripts) --- + +def scenario(name:, pattern: nil, vus: nil, thresholds: nil) + Kar98k.scenario_config[:name] = name + Kar98k.instance_variable_set(:@chaos_config, pattern) if pattern + Kar98k.instance_variable_set(:@stages, vus) if vus + Kar98k.instance_variable_set(:@thresholds, thresholds) if thresholds +end + +def chaos(preset: "moderate", spike_factor: nil, noise_amplitude: nil, lambda_: nil) + cfg = { "preset" => preset } + cfg["spike_factor"] = spike_factor if spike_factor + cfg["noise_amplitude"] = noise_amplitude if noise_amplitude + cfg["lambda"] = lambda_ if lambda_ + cfg +end + +def stage(duration, target) + { "duration" => duration, "target" => target } +end + +def ramp(stages) + stages +end + +def check(response, checks) + all_passed = true + checks.each do |name, fn| + passed = begin + !!fn.call(response) + rescue + false + end + Kar98k._send({ type: "check", name: name, passed: passed }) + all_passed = false unless passed + end + all_passed +end + +def sleep_dur(duration) + if duration.is_a?(String) + sleep(_parse_duration(duration)) + else + sleep(duration) + end +end + +def think_time(min_dur, max_dur) + min_s = _parse_duration(min_dur) + max_s = _parse_duration(max_dur) + rand(min_s..max_s) +end + +def _parse_duration(s) + return s.to_f if s.is_a?(Numeric) + s = s.strip + if s.end_with?("ms") + s[0..-3].to_f / 1000 + elsif s.end_with?("us") || s.end_with?("µs") + s[0..-3].to_f / 1_000_000 + elsif s.end_with?("m") && !s.end_with?("ms") + s[0..-2].to_f * 60 + elsif s.end_with?("h") + s[0..-2].to_f * 3600 + elsif s.end_with?("s") + s[0..-2].to_f + else + s.to_f + end +end + +# Alias Http to top-level +Http = Kar98k::Http + +# Auto-start protocol loop when script finishes loading +at_exit do + next if $stdin.tty? + Kar98k.run_loop(self) +end