diff --git a/cmd/bench/README.md b/cmd/bench/README.md new file mode 100644 index 0000000..2c174a6 --- /dev/null +++ b/cmd/bench/README.md @@ -0,0 +1,17 @@ +# InceptionDB Bench Tool + +## How to use + +Compile and run the command. + +## Test inserts + +```sh +GOEXPERIMENT=jsonv2 go run . --test insert --n 2_000_000 --workers 16 +``` + +## Test patch + +```sh +GOEXPERIMENT=jsonv2 go run . --test patch --n 100_000 --workers 16 +``` \ No newline at end of file diff --git a/cmd/bench/helpers.go b/cmd/bench/helpers.go new file mode 100644 index 0000000..c06dea9 --- /dev/null +++ b/cmd/bench/helpers.go @@ -0,0 +1,71 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "os" + "strconv" + "sync" + "time" + + "github.com/fulldump/inceptiondb/bootstrap" + "github.com/fulldump/inceptiondb/configuration" +) + +type JSON = map[string]any + +func Parallel(workers int, f func()) { + wg := &sync.WaitGroup{} + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + f() + }() + } + wg.Wait() +} + +func TempDir() (string, func()) { + dir, err := os.MkdirTemp("", "inceptiondb_bench_*") + if err != nil { + panic("Could not create temp directory: " + err.Error()) + } + + cleanup := func() { + os.RemoveAll(dir) + } + + return dir, cleanup +} + +func CreateCollection(base string) string { + + name := "col-" + strconv.FormatInt(time.Now().UnixNano(), 10) + + payload, _ := json.Marshal(JSON{"name": name}) + + req, _ := http.NewRequest("POST", base+"/v1/collections", bytes.NewReader(payload)) + resp, err := http.DefaultClient.Do(req) + if err != nil { + panic(err) + } + defer resp.Body.Close() + + io.Copy(os.Stdout, resp.Body) + + return name +} + +func CreateServer(c *Config) (start, stop func()) { + dir, cleanup := TempDir() + cleanups = append(cleanups, cleanup) + + conf := configuration.Default() + conf.Dir = dir + c.Base = "http://" + conf.HttpAddr + + return bootstrap.Bootstrap(conf) +} diff --git a/cmd/bench/main.go b/cmd/bench/main.go new file mode 100644 index 0000000..162767a --- /dev/null +++ b/cmd/bench/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "log" + "strings" + + "github.com/fulldump/goconfig" +) + +type Config struct { + Test string `usage:"name of the test: ALL | INSERT | PATCH"` + Base string `usage:"base URL"` + N int64 `usage:"number of documents"` + Workers int `usage:"number of workers"` +} + +var cleanups []func() + +func main() { + + defer func() { + fmt.Println("Cleaning up...") + for _, cleanup := range cleanups { + cleanup() + } + }() + + c := Config{ + Test: "patch", + Base: "", + N: 1_000_000, + Workers: 16, + } + goconfig.Read(&c) + + switch strings.ToUpper(c.Test) { + case "ALL": + case "INSERT": + TestInsert(c) + case "PATCH": + TestPatch(c) + default: + log.Fatalf("Unknown test %s", c.Test) + } + +} diff --git a/cmd/bench/test_insert.go b/cmd/bench/test_insert.go new file mode 100644 index 0000000..2835835 --- /dev/null +++ b/cmd/bench/test_insert.go @@ -0,0 +1,82 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "net/http" + "os" + "strings" + "sync/atomic" + "time" +) + +func TestInsert(c Config) { + + if c.Base == "" { + start, stop := CreateServer(&c) + defer stop() + go start() + } + + collection := CreateCollection(c.Base) + + payload := strings.Repeat("fake ", 0) + _ = payload + + client := &http.Client{ + Transport: &http.Transport{ + MaxConnsPerHost: 1024, + MaxIdleConnsPerHost: 1024, + MaxIdleConns: 1024, + }, + } + + items := c.N + + go func() { + for { + fmt.Println("items:", items) + time.Sleep(1 * time.Second) + } + }() + + t0 := time.Now() + Parallel(c.Workers, func() { + + r, w := io.Pipe() + + wb := bufio.NewWriterSize(w, 1*1024*1024) + + go func() { + for { + n := atomic.AddInt64(&items, -1) + if n < 0 { + break + } + fmt.Fprintf(wb, "{\"id\":%d,\"n\":\"%d\"}\n", n, n) + } + wb.Flush() + w.Close() + }() + + req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collection+":insert", r) + if err != nil { + fmt.Println("ERROR: new request:", err.Error()) + os.Exit(3) + } + + resp, err := client.Do(req) + if err != nil { + fmt.Println("ERROR: do request:", err.Error()) + os.Exit(4) + } + io.Copy(io.Discard, resp.Body) + }) + + took := time.Since(t0) + fmt.Println("sent:", c.N) + fmt.Println("took:", took) + fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds()) + +} diff --git a/cmd/bench/test_patch.go b/cmd/bench/test_patch.go new file mode 100644 index 0000000..e0287c5 --- /dev/null +++ b/cmd/bench/test_patch.go @@ -0,0 +1,125 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/fulldump/inceptiondb/bootstrap" + "github.com/fulldump/inceptiondb/collection" + "github.com/fulldump/inceptiondb/configuration" +) + +func TestPatch(c Config) { + + createServer := c.Base == "" + + var start, stop func() + var dataDir string + if createServer { + dir, cleanup := TempDir() + dataDir = dir + cleanups = append(cleanups, cleanup) + + conf := configuration.Default() + conf.Dir = dir + c.Base = "http://" + conf.HttpAddr + + start, stop = bootstrap.Bootstrap(conf) + go start() + } + + collectionName := CreateCollection(c.Base) + + transport := &http.Transport{ + MaxConnsPerHost: 1024, + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + } + defer transport.CloseIdleConnections() + + client := &http.Client{ + Transport: transport, + Timeout: 10 * time.Second, + } + + { + fmt.Println("Preload documents...") + r, w := io.Pipe() + + encoder := json.NewEncoder(w) + go func() { + for i := int64(0); i < c.N; i++ { + encoder.Encode(JSON{ + "id": strconv.FormatInt(i, 10), + "value": 0, + "worker": i % int64(c.Workers), + }) + } + w.Close() + }() + + req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collectionName+":insert", r) + if err != nil { + fmt.Println("ERROR: new request:", err.Error()) + os.Exit(3) + } + + resp, err := client.Do(req) + if err != nil { + fmt.Println("ERROR: do request:", err.Error()) + os.Exit(4) + } + io.Copy(io.Discard, resp.Body) + } + + patchURL := fmt.Sprintf("%s/v1/collections/%s:patch", c.Base, collectionName) + + t0 := time.Now() + worker := int64(-1) + Parallel(c.Workers, func() { + w := atomic.AddInt64(&worker, 1) + + body := fmt.Sprintf(`{"filter":{"worker":%d},"patch":{"value":%d},"limit":-1}`, w, 1000+worker) + req, err := http.NewRequest(http.MethodPost, patchURL, strings.NewReader(body)) + if err != nil { + fmt.Println("ERROR: new request:", err.Error()) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + fmt.Println("ERROR: do request:", err.Error()) + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Println("ERROR: bad status:", resp.Status) + } + }) + + took := time.Since(t0) + fmt.Println("sent:", c.N) + fmt.Println("took:", took) + fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds()) + + if !createServer { + return + } + + stop() // Stop the server + + t1 := time.Now() + collection.OpenCollection(path.Join(dataDir, collectionName)) + tookOpen := time.Since(t1) + fmt.Println("open took:", tookOpen) + fmt.Printf("Throughput Open: %.2f rows/sec\n", float64(c.N)/tookOpen.Seconds()) +} diff --git a/cmd/streamtest/jsonv2_test.go b/cmd/streamtest/jsonv2_test.go deleted file mode 100644 index e4fea62..0000000 --- a/cmd/streamtest/jsonv2_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package streamtest - -import ( - "bytes" - "encoding/json/jsontext" - json2 "encoding/json/v2" - "testing" - - "github.com/fulldump/biff" -) - -func Test_Jsonv2(t *testing.T) { - b := bytes.NewBufferString(`{"hello":"world"}`) - jsonDecoder := jsontext.NewDecoder(b) - greeting := struct { - Hello string `json:"hello"` - }{ - Hello: "Trololo", - } - err := json2.UnmarshalDecode(jsonDecoder, &greeting) - - biff.AssertNil(err) - biff.AssertEqual(greeting.Hello, "world") -} diff --git a/cmd/streamtest/streamtest_test.go b/cmd/streamtest/streamtest_test.go deleted file mode 100644 index 6a7b569..0000000 --- a/cmd/streamtest/streamtest_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package streamtest - -import ( - "bufio" - "fmt" - "io" - "log" - "net/http" - "os" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/fulldump/inceptiondb/bootstrap" - "github.com/fulldump/inceptiondb/configuration" -) - -type Item struct { - Id int64 `json:"id"` - Payload string `json:"payload"` -} - -func Parallel(workers int, f func()) { - wg := &sync.WaitGroup{} - for i := 0; i < workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - f() - }() - } - wg.Wait() -} - -func Test_Streamtest(t *testing.T) { - - t.SkipNow() - - if false { - conf := configuration.Default() - conf.Dir = t.TempDir() - - start, stop := bootstrap.Bootstrap(conf) - defer stop() - - go start() - } - - base := "https://inceptiondb.io" - base = "http://localhost:8080" - - { - // Create collection - payload := strings.NewReader(`{"name": "streammm"}`) - req, _ := http.NewRequest("POST", base+"/v1/collections", payload) - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Fatal(err) - } - fmt.Println(string(body)) - } - - counter := int64(0) - t0 := time.Now() - load_per_worker := 100_000 - - payload := strings.Repeat("fake ", 0) - _ = payload - - c := &http.Client{ - Transport: &http.Transport{ - MaxConnsPerHost: 1024, - MaxIdleConnsPerHost: 1024, - MaxIdleConns: 1024, - }, - } - - Parallel(16, func() { - - r, w := io.Pipe() - - wb := bufio.NewWriterSize(w, 1*1024*1024) - - go func() { - // e := json.NewEncoder(w) - for i := 0; i < load_per_worker; i++ { - // e.Encode(Item{ - // Id: atomic.AddInt64(&counter, 1), - // Payload: payload, - // }) - n := atomic.AddInt64(&counter, 1) - fmt.Fprintf(wb, "{\"id\":%d,\"n\":\"%d\"}\n", n, n) - } - wb.Flush() - w.Close() - }() - - { - req, err := http.NewRequest("POST", base+"/v1/collections/streammm:insert", r) - if err != nil { - fmt.Println("ERROR: new request:", err.Error()) - os.Exit(3) - } - - resp, err := c.Do(req) - if err != nil { - fmt.Println("ERROR: do request:", err.Error()) - os.Exit(4) - } - io.Copy(io.Discard, resp.Body) - } - }) - - took := time.Since(t0) - fmt.Println("received:", counter) - fmt.Println("took:", took) - fmt.Printf("Throughput: %.2f rows/sec\n", float64(counter)/took.Seconds()) - -}