From 1f5fcf74946a4371a6c6bc799eed4a6cd4355f82 Mon Sep 17 00:00:00 2001 From: alfredomoraleja Date: Sat, 18 Oct 2025 09:33:48 +0200 Subject: [PATCH 1/2] Add patch benchmark harness --- cmd/patchbench/patchbench_test.go | 155 ++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 cmd/patchbench/patchbench_test.go diff --git a/cmd/patchbench/patchbench_test.go b/cmd/patchbench/patchbench_test.go new file mode 100644 index 0000000..debd36b --- /dev/null +++ b/cmd/patchbench/patchbench_test.go @@ -0,0 +1,155 @@ +package patchbench + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/fulldump/inceptiondb/bootstrap" + "github.com/fulldump/inceptiondb/configuration" +) + +func BenchmarkPatch(b *testing.B) { + b.ReportAllocs() + + conf := configuration.Default() + conf.Dir = b.TempDir() + conf.HttpAddr = "127.0.0.1:18080" + conf.ShowBanner = false + + start, stop := bootstrap.Bootstrap(conf) + defer stop() + + go start() + + baseURL := "http://" + conf.HttpAddr + collectionName := "patch-benchmark" + + transport := &http.Transport{ + MaxConnsPerHost: 1024, + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + } + defer transport.CloseIdleConnections() + + client := &http.Client{ + Transport: transport, + Timeout: 10 * time.Second, + } + + ensureCollection(b, client, baseURL, collectionName) + + const datasetSize = 1024 + preloadDocuments(b, client, baseURL, collectionName, datasetSize) + + patchURL := fmt.Sprintf("%s/v1/collections/%s:patch", baseURL, collectionName) + + var opCounter int64 + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + op := atomic.AddInt64(&opCounter, 1) + targetID := int(op % datasetSize) + patchValue := op + + body := fmt.Sprintf(`{"filter":{"id":"%s"},"patch":{"value":%d}}`, strconv.Itoa(targetID), patchValue) + req, err := http.NewRequest(http.MethodPost, patchURL, strings.NewReader(body)) + if err != 
nil { + b.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + b.Fatalf("do request: %v", err) + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b.Fatalf("unexpected status: %d", resp.StatusCode) + } + } + }) +} + +func ensureCollection(b *testing.B, client *http.Client, baseURL, name string) { + b.Helper() + + endpoint := baseURL + "/v1/collections" + payload := fmt.Sprintf(`{"name":"%s"}`, name) + + var lastErr error + for i := 0; i < 100; i++ { + req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(payload)) + if err != nil { + b.Fatalf("ensure collection request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + lastErr = err + time.Sleep(100 * time.Millisecond) + continue + } + + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusConflict { + return + } + + lastErr = fmt.Errorf("unexpected status %d", resp.StatusCode) + time.Sleep(100 * time.Millisecond) + } + + if lastErr != nil { + b.Fatalf("ensure collection: %v", lastErr) + } + b.Fatalf("ensure collection: timeout waiting for server") +} + +func preloadDocuments(b *testing.B, client *http.Client, baseURL, collection string, size int) { + b.Helper() + + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + for i := 0; i < size; i++ { + doc := map[string]interface{}{ + "id": strconv.Itoa(i), + "value": 0, + } + if err := enc.Encode(doc); err != nil { + b.Fatalf("marshal doc: %v", err) + } + } + + endpoint := fmt.Sprintf("%s/v1/collections/%s:insert", baseURL, collection) + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(buf.Bytes())) + if err != nil { + b.Fatalf("preload request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + 
resp, err := client.Do(req) + if err != nil { + b.Fatalf("preload request: %v", err) + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + b.Fatalf("preload unexpected status: %d", resp.StatusCode) + } +} From ba574c71ff99962e889787633bd8e26dfc6fe4ac Mon Sep 17 00:00:00 2001 From: fulldump Date: Sun, 19 Oct 2025 16:34:09 +0200 Subject: [PATCH 2/2] refactor benchmark --- cmd/bench/README.md | 17 ++++ cmd/bench/helpers.go | 71 ++++++++++++++ cmd/bench/main.go | 47 +++++++++ cmd/bench/test_insert.go | 82 ++++++++++++++++ cmd/bench/test_patch.go | 125 ++++++++++++++++++++++++ cmd/patchbench/patchbench_test.go | 155 ------------------------------ cmd/streamtest/jsonv2_test.go | 24 ----- cmd/streamtest/streamtest_test.go | 127 ------------------------ 8 files changed, 342 insertions(+), 306 deletions(-) create mode 100644 cmd/bench/README.md create mode 100644 cmd/bench/helpers.go create mode 100644 cmd/bench/main.go create mode 100644 cmd/bench/test_insert.go create mode 100644 cmd/bench/test_patch.go delete mode 100644 cmd/patchbench/patchbench_test.go delete mode 100644 cmd/streamtest/jsonv2_test.go delete mode 100644 cmd/streamtest/streamtest_test.go diff --git a/cmd/bench/README.md b/cmd/bench/README.md new file mode 100644 index 0000000..2c174a6 --- /dev/null +++ b/cmd/bench/README.md @@ -0,0 +1,17 @@ +# InceptionDB Bench Tool + +## How to use + +Compile and run the command. + +## Test inserts + +```sh +GOEXPERIMENT=jsonv2 go run . --test insert --n 2_000_000 --workers 16 +``` + +## Test patch + +```sh +GOEXPERIMENT=jsonv2 go run . 
--test patch --n 100_000 --workers 16
+```
\ No newline at end of file
diff --git a/cmd/bench/helpers.go b/cmd/bench/helpers.go
new file mode 100644
index 0000000..c06dea9
--- /dev/null
+++ b/cmd/bench/helpers.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"io"
+	"net/http"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/fulldump/inceptiondb/bootstrap"
+	"github.com/fulldump/inceptiondb/configuration"
+)
+
+// JSON is a shorthand for a generic JSON object.
+type JSON = map[string]any
+
+// Parallel runs f concurrently in workers goroutines and returns once every
+// call has finished.
+func Parallel(workers int, f func()) {
+	wg := &sync.WaitGroup{}
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			f()
+		}()
+	}
+	wg.Wait()
+}
+
+// TempDir creates a fresh temporary directory and returns its path together
+// with a function that removes it. It panics if the directory cannot be
+// created.
+func TempDir() (string, func()) {
+	dir, err := os.MkdirTemp("", "inceptiondb_bench_*")
+	if err != nil {
+		panic("Could not create temp directory: " + err.Error())
+	}
+
+	cleanup := func() {
+		os.RemoveAll(dir)
+	}
+
+	return dir, cleanup
+}
+
+// CreateCollection creates a uniquely named collection on the server at base
+// and returns its name. The server response is echoed to stdout.
+//
+// Callers start the embedded server in a goroutine right before calling this
+// function, so the listener may not be accepting connections yet: transport
+// errors are retried for up to ~10 seconds. It panics if the server stays
+// unreachable or answers with a non-2xx status.
+func CreateCollection(base string) string {
+
+	name := "col-" + strconv.FormatInt(time.Now().UnixNano(), 10)
+
+	payload, err := json.Marshal(JSON{"name": name})
+	if err != nil {
+		panic(err)
+	}
+
+	const (
+		attempts = 100
+		backoff  = 100 * time.Millisecond
+	)
+
+	var resp *http.Response
+	for i := 0; i < attempts; i++ {
+		// A request body cannot be replayed, so build a new request per attempt.
+		req, err := http.NewRequest("POST", base+"/v1/collections", bytes.NewReader(payload))
+		if err != nil {
+			panic(err)
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err = http.DefaultClient.Do(req)
+		if err == nil {
+			break
+		}
+		if i == attempts-1 {
+			panic(err)
+		}
+		time.Sleep(backoff)
+	}
+	defer resp.Body.Close()
+
+	io.Copy(os.Stdout, resp.Body)
+
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		panic("create collection: unexpected status " + resp.Status)
+	}
+
+	return name
+}
+
+// CreateServer prepares an embedded InceptionDB instance backed by a
+// temporary data directory, stores its base URL in c.Base and returns the
+// start and stop functions produced by bootstrap.Bootstrap. Removal of the
+// directory is registered in the package-level cleanups list.
+func CreateServer(c *Config) (start, stop func()) {
+	dir, cleanup := TempDir()
+	cleanups = append(cleanups, cleanup)
+
+	conf := configuration.Default()
+	conf.Dir = dir
+	c.Base = "http://" + conf.HttpAddr
+
+	return bootstrap.Bootstrap(conf)
+}
diff --git a/cmd/bench/main.go b/cmd/bench/main.go
new file mode 100644
index 0000000..162767a
--- /dev/null
+++ b/cmd/bench/main.go
@@ -0,0 +1,56 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"strings"
+
+	"github.com/fulldump/goconfig"
+)
+
+// Config holds the command-line options of the bench tool.
+type Config struct {
+	Test    string `usage:"name of the test: ALL | INSERT | PATCH"`
+	Base    string `usage:"base URL"`
+	N       int64  `usage:"number of documents"`
+	Workers int    `usage:"number of workers"`
+}
+
+// cleanups collects functions that main runs on return, such as the removal
+// of temporary data directories.
+var cleanups []func()
+
+// main reads the configuration and runs the selected test; the registered
+// cleanups run when it returns.
+func main() {
+
+	defer func() {
+		fmt.Println("Cleaning up...")
+		for _, cleanup := range cleanups {
+			cleanup()
+		}
+	}()
+
+	c := Config{
+		Test:    "patch",
+		Base:    "",
+		N:       1_000_000,
+		Workers: 16,
+	}
+	goconfig.Read(&c)
+
+	switch strings.ToUpper(c.Test) {
+	case "ALL":
+		// Config is passed by value, so each test starts from the Base given
+		// on the command line and boots its own embedded server if it is empty.
+		TestInsert(c)
+		TestPatch(c)
+	case "INSERT":
+		TestInsert(c)
+	case "PATCH":
+		TestPatch(c)
+	default:
+		log.Fatalf("Unknown test %s", c.Test)
+	}
+
+}
diff --git a/cmd/bench/test_insert.go b/cmd/bench/test_insert.go
new file mode 100644
index 0000000..2835835
--- /dev/null
+++ b/cmd/bench/test_insert.go
@@ -0,0 +1,106 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strings"
+	"sync/atomic"
+	"time"
+)
+
+// TestInsert measures streaming insert throughput. c.Workers concurrent
+// requests draw from a shared pool of c.N documents and stream them to the
+// :insert endpoint of a newly created collection. When c.Base is empty an
+// embedded server is started for the duration of the test.
+func TestInsert(c Config) {
+
+	if c.Base == "" {
+		start, stop := CreateServer(&c)
+		defer stop()
+		go start()
+	}
+
+	collection := CreateCollection(c.Base)
+
+	// Placeholder for an optional per-document payload; it is empty and unused.
+	payload := strings.Repeat("fake ", 0)
+	_ = payload
+
+	transport := &http.Transport{
+		MaxConnsPerHost:     1024,
+		MaxIdleConnsPerHost: 1024,
+		MaxIdleConns:        1024,
+	}
+	defer transport.CloseIdleConnections()
+
+	client := &http.Client{Transport: transport}
+
+	// items is the number of documents still to be sent; workers decrement it
+	// concurrently, so every access goes through sync/atomic.
+	items := c.N
+
+	// Report progress once per second until the test returns.
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+		for {
+			fmt.Println("items:", atomic.LoadInt64(&items))
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+			}
+		}
+	}()
+
+	t0 := time.Now()
+	Parallel(c.Workers, func() {
+
+		r, w := io.Pipe()
+
+		wb := bufio.NewWriterSize(w, 1*1024*1024)
+
+		go func() {
+			for {
+				n := atomic.AddInt64(&items, -1)
+				if n < 0 {
+					break
+				}
+				fmt.Fprintf(wb, "{\"id\":%d,\"n\":\"%d\"}\n", n, n)
+			}
+			// A nil error closes the pipe with EOF; a failed flush makes the
+			// request fail instead of silently sending a truncated stream.
+			w.CloseWithError(wb.Flush())
+		}()
+
+		req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collection+":insert", r)
+		if err != nil {
+			fmt.Println("ERROR: new request:", err.Error())
+			os.Exit(3)
+		}
+
+		resp, err := client.Do(req)
+		if err != nil {
+			fmt.Println("ERROR: do request:", err.Error())
+			os.Exit(4)
+		}
+		// Drain and close the body so the connection can be reused.
+		defer resp.Body.Close()
+		io.Copy(io.Discard, resp.Body)
+
+		if resp.StatusCode < 200 || resp.StatusCode > 299 {
+			fmt.Println("ERROR: bad status:", resp.Status)
+		}
+	})
+
+	took := time.Since(t0)
+	fmt.Println("sent:", c.N)
+	fmt.Println("took:", took)
+	fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds())
+
+}
diff --git 
a/cmd/bench/test_patch.go b/cmd/bench/test_patch.go
new file mode 100644
index 0000000..e0287c5
--- /dev/null
+++ b/cmd/bench/test_patch.go
@@ -0,0 +1,159 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"path"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"github.com/fulldump/inceptiondb/bootstrap"
+	"github.com/fulldump/inceptiondb/collection"
+	"github.com/fulldump/inceptiondb/configuration"
+)
+
+// TestPatch measures bulk patch throughput. It preloads c.N documents, each
+// tagged with a worker number, then runs c.Workers goroutines that each send
+// one patch request for the documents tagged with their number. When c.Base
+// is empty an embedded server is started; after the run it is stopped and the
+// time needed to reopen the collection from disk is reported as well.
+func TestPatch(c Config) {
+
+	// The preload computes i % c.Workers, which panics when c.Workers is 0.
+	if c.Workers < 1 {
+		fmt.Println("ERROR: workers must be at least 1")
+		return
+	}
+
+	createServer := c.Base == ""
+
+	var start, stop func()
+	var dataDir string
+	if createServer {
+		dir, cleanup := TempDir()
+		dataDir = dir
+		cleanups = append(cleanups, cleanup)
+
+		conf := configuration.Default()
+		conf.Dir = dir
+		c.Base = "http://" + conf.HttpAddr
+
+		start, stop = bootstrap.Bootstrap(conf)
+		go start()
+	}
+
+	collectionName := CreateCollection(c.Base)
+
+	transport := &http.Transport{
+		MaxConnsPerHost:     1024,
+		MaxIdleConns:        1024,
+		MaxIdleConnsPerHost: 1024,
+	}
+	defer transport.CloseIdleConnections()
+
+	client := &http.Client{
+		Transport: transport,
+		Timeout:   10 * time.Second,
+	}
+
+	{
+		fmt.Println("Preload documents...")
+		r, w := io.Pipe()
+
+		encoder := json.NewEncoder(w)
+		go func() {
+			for i := int64(0); i < c.N; i++ {
+				err := encoder.Encode(JSON{
+					"id":     strconv.FormatInt(i, 10),
+					"value":  0,
+					"worker": i % int64(c.Workers),
+				})
+				if err != nil {
+					// Fail the upload rather than preload a partial dataset.
+					w.CloseWithError(err)
+					return
+				}
+			}
+			w.Close()
+		}()
+
+		req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collectionName+":insert", r)
+		if err != nil {
+			fmt.Println("ERROR: new request:", err.Error())
+			os.Exit(3)
+		}
+
+		// http.Client.Timeout bounds the whole exchange, so streaming a large
+		// dataset through the 10s benchmark client would be cut short. Use a
+		// client without a deadline that shares the same transport.
+		preloadClient := &http.Client{Transport: transport}
+
+		resp, err := preloadClient.Do(req)
+		if err != nil {
+			fmt.Println("ERROR: do request:", err.Error())
+			os.Exit(4)
+		}
+		io.Copy(io.Discard, resp.Body)
+		resp.Body.Close()
+
+		if resp.StatusCode < 200 || resp.StatusCode > 299 {
+			fmt.Println("ERROR: preload bad status:", resp.Status)
+			return
+		}
+	}
+
+	patchURL := fmt.Sprintf("%s/v1/collections/%s:patch", c.Base, collectionName)
+
+	t0 := time.Now()
+	worker := int64(-1)
+	Parallel(c.Workers, func() {
+		w := atomic.AddInt64(&worker, 1)
+
+		// Build the body from the local w only: reading the shared counter
+		// here would race with the other workers' increments.
+		body := fmt.Sprintf(`{"filter":{"worker":%d},"patch":{"value":%d},"limit":-1}`, w, 1000+w)
+		req, err := http.NewRequest(http.MethodPost, patchURL, strings.NewReader(body))
+		if err != nil {
+			fmt.Println("ERROR: new request:", err.Error())
+			return
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err := client.Do(req)
+		if err != nil {
+			// resp is nil here; bail out instead of dereferencing it.
+			fmt.Println("ERROR: do request:", err.Error())
+			return
+		}
+		io.Copy(io.Discard, resp.Body)
+		resp.Body.Close()
+
+		if resp.StatusCode != http.StatusOK {
+			fmt.Println("ERROR: bad status:", resp.Status)
+		}
+	})
+
+	took := time.Since(t0)
+	fmt.Println("sent:", c.N)
+	fmt.Println("took:", took)
+	fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds())
+
+	if !createServer {
+		return
+	}
+
+	stop() // Stop the server
+
+	t1 := time.Now()
+	// NOTE(review): the result of OpenCollection is discarded — if it
+	// returns an error, it should be checked before reporting a timing.
+	collection.OpenCollection(path.Join(dataDir, collectionName))
+	tookOpen := time.Since(t1)
+	fmt.Println("open took:", tookOpen)
+	fmt.Printf("Throughput Open: %.2f rows/sec\n", float64(c.N)/tookOpen.Seconds())
+}
diff --git a/cmd/patchbench/patchbench_test.go b/cmd/patchbench/patchbench_test.go deleted file mode 100644 index debd36b..0000000 --- a/cmd/patchbench/patchbench_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package patchbench - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "strings" - "sync/atomic" - "testing" - "time" - - "github.com/fulldump/inceptiondb/bootstrap" - "github.com/fulldump/inceptiondb/configuration" -) - -func BenchmarkPatch(b *testing.B) { - b.ReportAllocs() - - conf := configuration.Default() - conf.Dir = b.TempDir() - conf.HttpAddr = "127.0.0.1:18080" - conf.ShowBanner = false - - start, stop := bootstrap.Bootstrap(conf) - defer stop() - - go start() - - baseURL := "http://" + conf.HttpAddr - collectionName := "patch-benchmark" - - transport := &http.Transport{ - MaxConnsPerHost: 1024, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - } - defer transport.CloseIdleConnections() - - client := &http.Client{ - Transport: transport, - Timeout: 10 * time.Second, - } - - ensureCollection(b, client, baseURL, 
collectionName) - - const datasetSize = 1024 - preloadDocuments(b, client, baseURL, collectionName, datasetSize) - - patchURL := fmt.Sprintf("%s/v1/collections/%s:patch", baseURL, collectionName) - - var opCounter int64 - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - op := atomic.AddInt64(&opCounter, 1) - targetID := int(op % datasetSize) - patchValue := op - - body := fmt.Sprintf(`{"filter":{"id":"%s"},"patch":{"value":%d}}`, strconv.Itoa(targetID), patchValue) - req, err := http.NewRequest(http.MethodPost, patchURL, strings.NewReader(body)) - if err != nil { - b.Fatalf("new request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - b.Fatalf("do request: %v", err) - } - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b.Fatalf("unexpected status: %d", resp.StatusCode) - } - } - }) -} - -func ensureCollection(b *testing.B, client *http.Client, baseURL, name string) { - b.Helper() - - endpoint := baseURL + "/v1/collections" - payload := fmt.Sprintf(`{"name":"%s"}`, name) - - var lastErr error - for i := 0; i < 100; i++ { - req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(payload)) - if err != nil { - b.Fatalf("ensure collection request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - lastErr = err - time.Sleep(100 * time.Millisecond) - continue - } - - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusConflict { - return - } - - lastErr = fmt.Errorf("unexpected status %d", resp.StatusCode) - time.Sleep(100 * time.Millisecond) - } - - if lastErr != nil { - b.Fatalf("ensure collection: %v", lastErr) - } - b.Fatalf("ensure collection: timeout waiting for server") -} - -func preloadDocuments(b *testing.B, client 
*http.Client, baseURL, collection string, size int) { - b.Helper() - - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - for i := 0; i < size; i++ { - doc := map[string]interface{}{ - "id": strconv.Itoa(i), - "value": 0, - } - if err := enc.Encode(doc); err != nil { - b.Fatalf("marshal doc: %v", err) - } - } - - endpoint := fmt.Sprintf("%s/v1/collections/%s:insert", baseURL, collection) - req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(buf.Bytes())) - if err != nil { - b.Fatalf("preload request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - b.Fatalf("preload request: %v", err) - } - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode != http.StatusCreated { - b.Fatalf("preload unexpected status: %d", resp.StatusCode) - } -} diff --git a/cmd/streamtest/jsonv2_test.go b/cmd/streamtest/jsonv2_test.go deleted file mode 100644 index e4fea62..0000000 --- a/cmd/streamtest/jsonv2_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package streamtest - -import ( - "bytes" - "encoding/json/jsontext" - json2 "encoding/json/v2" - "testing" - - "github.com/fulldump/biff" -) - -func Test_Jsonv2(t *testing.T) { - b := bytes.NewBufferString(`{"hello":"world"}`) - jsonDecoder := jsontext.NewDecoder(b) - greeting := struct { - Hello string `json:"hello"` - }{ - Hello: "Trololo", - } - err := json2.UnmarshalDecode(jsonDecoder, &greeting) - - biff.AssertNil(err) - biff.AssertEqual(greeting.Hello, "world") -} diff --git a/cmd/streamtest/streamtest_test.go b/cmd/streamtest/streamtest_test.go deleted file mode 100644 index 6a7b569..0000000 --- a/cmd/streamtest/streamtest_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package streamtest - -import ( - "bufio" - "fmt" - "io" - "log" - "net/http" - "os" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/fulldump/inceptiondb/bootstrap" - "github.com/fulldump/inceptiondb/configuration" -) - -type Item struct { - Id 
int64 `json:"id"` - Payload string `json:"payload"` -} - -func Parallel(workers int, f func()) { - wg := &sync.WaitGroup{} - for i := 0; i < workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - f() - }() - } - wg.Wait() -} - -func Test_Streamtest(t *testing.T) { - - t.SkipNow() - - if false { - conf := configuration.Default() - conf.Dir = t.TempDir() - - start, stop := bootstrap.Bootstrap(conf) - defer stop() - - go start() - } - - base := "https://inceptiondb.io" - base = "http://localhost:8080" - - { - // Create collection - payload := strings.NewReader(`{"name": "streammm"}`) - req, _ := http.NewRequest("POST", base+"/v1/collections", payload) - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Fatal(err) - } - fmt.Println(string(body)) - } - - counter := int64(0) - t0 := time.Now() - load_per_worker := 100_000 - - payload := strings.Repeat("fake ", 0) - _ = payload - - c := &http.Client{ - Transport: &http.Transport{ - MaxConnsPerHost: 1024, - MaxIdleConnsPerHost: 1024, - MaxIdleConns: 1024, - }, - } - - Parallel(16, func() { - - r, w := io.Pipe() - - wb := bufio.NewWriterSize(w, 1*1024*1024) - - go func() { - // e := json.NewEncoder(w) - for i := 0; i < load_per_worker; i++ { - // e.Encode(Item{ - // Id: atomic.AddInt64(&counter, 1), - // Payload: payload, - // }) - n := atomic.AddInt64(&counter, 1) - fmt.Fprintf(wb, "{\"id\":%d,\"n\":\"%d\"}\n", n, n) - } - wb.Flush() - w.Close() - }() - - { - req, err := http.NewRequest("POST", base+"/v1/collections/streammm:insert", r) - if err != nil { - fmt.Println("ERROR: new request:", err.Error()) - os.Exit(3) - } - - resp, err := c.Do(req) - if err != nil { - fmt.Println("ERROR: do request:", err.Error()) - os.Exit(4) - } - io.Copy(io.Discard, resp.Body) - } - }) - - took := time.Since(t0) - fmt.Println("received:", counter) - fmt.Println("took:", took) - fmt.Printf("Throughput: %.2f 
rows/sec\n", float64(counter)/took.Seconds()) - -}