Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cmd/bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# InceptionDB Bench Tool

## How to use

Compile and run the command.

## Test inserts

```sh
GOEXPERIMENT=jsonv2 go run . --test insert --n 2_000_000 --workers 16
```

## Test patch

```sh
GOEXPERIMENT=jsonv2 go run . --test patch --n 100_000 --workers 16
```
71 changes: 71 additions & 0 deletions cmd/bench/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/fulldump/inceptiondb/bootstrap"
"github.com/fulldump/inceptiondb/configuration"
)

type JSON = map[string]any

func Parallel(workers int, f func()) {
wg := &sync.WaitGroup{}
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
f()
}()
}
wg.Wait()
}

func TempDir() (string, func()) {
dir, err := os.MkdirTemp("", "inceptiondb_bench_*")
if err != nil {
panic("Could not create temp directory: " + err.Error())
}

cleanup := func() {
os.RemoveAll(dir)
}

return dir, cleanup
}

func CreateCollection(base string) string {

name := "col-" + strconv.FormatInt(time.Now().UnixNano(), 10)

payload, _ := json.Marshal(JSON{"name": name})

req, _ := http.NewRequest("POST", base+"/v1/collections", bytes.NewReader(payload))
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()

io.Copy(os.Stdout, resp.Body)

return name
}

func CreateServer(c *Config) (start, stop func()) {
dir, cleanup := TempDir()
cleanups = append(cleanups, cleanup)

conf := configuration.Default()
conf.Dir = dir
c.Base = "http://" + conf.HttpAddr

return bootstrap.Bootstrap(conf)
}
47 changes: 47 additions & 0 deletions cmd/bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"fmt"
"log"
"strings"

"github.com/fulldump/goconfig"
)

type Config struct {
Test string `usage:"name of the test: ALL | INSERT | PATCH"`
Base string `usage:"base URL"`
N int64 `usage:"number of documents"`
Workers int `usage:"number of workers"`
}

var cleanups []func()

func main() {

defer func() {
fmt.Println("Cleaning up...")
for _, cleanup := range cleanups {
cleanup()
}
}()

c := Config{
Test: "patch",
Base: "",
N: 1_000_000,
Workers: 16,
}
goconfig.Read(&c)

switch strings.ToUpper(c.Test) {
case "ALL":
case "INSERT":
TestInsert(c)
case "PATCH":
TestPatch(c)
default:
log.Fatalf("Unknown test %s", c.Test)
}

}
82 changes: 82 additions & 0 deletions cmd/bench/test_insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"bufio"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
)

func TestInsert(c Config) {

if c.Base == "" {
start, stop := CreateServer(&c)
defer stop()
go start()
}

collection := CreateCollection(c.Base)

payload := strings.Repeat("fake ", 0)
_ = payload

client := &http.Client{
Transport: &http.Transport{
MaxConnsPerHost: 1024,
MaxIdleConnsPerHost: 1024,
MaxIdleConns: 1024,
},
}

items := c.N

go func() {
for {
fmt.Println("items:", items)
time.Sleep(1 * time.Second)
}
}()

t0 := time.Now()
Parallel(c.Workers, func() {

r, w := io.Pipe()

wb := bufio.NewWriterSize(w, 1*1024*1024)

go func() {
for {
n := atomic.AddInt64(&items, -1)
if n < 0 {
break
}
fmt.Fprintf(wb, "{\"id\":%d,\"n\":\"%d\"}\n", n, n)
}
wb.Flush()
w.Close()
}()

req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collection+":insert", r)
if err != nil {
fmt.Println("ERROR: new request:", err.Error())
os.Exit(3)
}

resp, err := client.Do(req)
if err != nil {
fmt.Println("ERROR: do request:", err.Error())
os.Exit(4)
}
io.Copy(io.Discard, resp.Body)
})

took := time.Since(t0)
fmt.Println("sent:", c.N)
fmt.Println("took:", took)
fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds())

}
125 changes: 125 additions & 0 deletions cmd/bench/test_patch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/fulldump/inceptiondb/bootstrap"
"github.com/fulldump/inceptiondb/collection"
"github.com/fulldump/inceptiondb/configuration"
)

func TestPatch(c Config) {

createServer := c.Base == ""

var start, stop func()
var dataDir string
if createServer {
dir, cleanup := TempDir()
dataDir = dir
cleanups = append(cleanups, cleanup)

conf := configuration.Default()
conf.Dir = dir
c.Base = "http://" + conf.HttpAddr

start, stop = bootstrap.Bootstrap(conf)
go start()
}

collectionName := CreateCollection(c.Base)

transport := &http.Transport{
MaxConnsPerHost: 1024,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}
defer transport.CloseIdleConnections()

client := &http.Client{
Transport: transport,
Timeout: 10 * time.Second,
}

{
fmt.Println("Preload documents...")
r, w := io.Pipe()

encoder := json.NewEncoder(w)
go func() {
for i := int64(0); i < c.N; i++ {
encoder.Encode(JSON{
"id": strconv.FormatInt(i, 10),
"value": 0,
"worker": i % int64(c.Workers),
})
}
w.Close()
}()

req, err := http.NewRequest("POST", c.Base+"/v1/collections/"+collectionName+":insert", r)
if err != nil {
fmt.Println("ERROR: new request:", err.Error())
os.Exit(3)
}

resp, err := client.Do(req)
if err != nil {
fmt.Println("ERROR: do request:", err.Error())
os.Exit(4)
}
io.Copy(io.Discard, resp.Body)
}

patchURL := fmt.Sprintf("%s/v1/collections/%s:patch", c.Base, collectionName)

t0 := time.Now()
worker := int64(-1)
Parallel(c.Workers, func() {
w := atomic.AddInt64(&worker, 1)

body := fmt.Sprintf(`{"filter":{"worker":%d},"patch":{"value":%d},"limit":-1}`, w, 1000+worker)
req, err := http.NewRequest(http.MethodPost, patchURL, strings.NewReader(body))
if err != nil {
fmt.Println("ERROR: new request:", err.Error())
}
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
if err != nil {
fmt.Println("ERROR: do request:", err.Error())
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode != http.StatusOK {
fmt.Println("ERROR: bad status:", resp.Status)
}
})

took := time.Since(t0)
fmt.Println("sent:", c.N)
fmt.Println("took:", took)
fmt.Printf("Throughput: %.2f rows/sec\n", float64(c.N)/took.Seconds())

if !createServer {
return
}

stop() // Stop the server

t1 := time.Now()
collection.OpenCollection(path.Join(dataDir, collectionName))
tookOpen := time.Since(t1)
fmt.Println("open took:", tookOpen)
fmt.Printf("Throughput Open: %.2f rows/sec\n", float64(c.N)/tookOpen.Seconds())
}
24 changes: 0 additions & 24 deletions cmd/streamtest/jsonv2_test.go

This file was deleted.

Loading
Loading