From 6ede139c97ffd64e2f571636ce296f4fbfc580b7 Mon Sep 17 00:00:00 2001 From: fulldump Date: Sat, 18 Oct 2025 04:53:52 +0200 Subject: [PATCH] optimize index operations by reusing the item (map[string]any) --- cmd/streamtest/streamtest_test.go | 10 +++++-- collection/collection.go | 48 ++++++++++++++++++------------- collection/collection_test.go | 24 ++++++++-------- collection/index.go | 4 +-- collection/indexbtree.go | 16 ++++------- collection/indexbtree_test.go | 27 ++++++++++------- collection/indexmap.go | 17 ++--------- collection/indexmap_test.go | 23 +++++++++------ collection/indexsyncmap.go | 26 +++++++---------- 9 files changed, 99 insertions(+), 96 deletions(-) diff --git a/cmd/streamtest/streamtest_test.go b/cmd/streamtest/streamtest_test.go index 6a7b569..f9eb3ab 100644 --- a/cmd/streamtest/streamtest_test.go +++ b/cmd/streamtest/streamtest_test.go @@ -38,7 +38,7 @@ func Test_Streamtest(t *testing.T) { t.SkipNow() - if false { + if true { conf := configuration.Default() conf.Dir = t.TempDir() @@ -53,8 +53,12 @@ func Test_Streamtest(t *testing.T) { { // Create collection - payload := strings.NewReader(`{"name": "streammm"}`) - req, _ := http.NewRequest("POST", base+"/v1/collections", payload) + payload := strings.NewReader(`{ + "field": "n", + "name": "my-index", + "type": "map" + }`) + req, _ := http.NewRequest("POST", base+"/v1/collections/streammm:createIndex", payload) resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) diff --git a/collection/collection.go b/collection/collection.go index 88ee2e9..d1e2857 100644 --- a/collection/collection.go +++ b/collection/collection.go @@ -107,7 +107,7 @@ func OpenCollection(filename string) (*Collection, error) { switch command.Name { case "insert": - _, err := collection.addRow(command.Payload) + _, err := collection.addRow(command.Payload, nil) if err != nil { return nil, err } @@ -181,13 +181,13 @@ func OpenCollection(filename string) (*Collection, error) { return collection, nil } -func (c *Collection) addRow(payload json.RawMessage) (*Row, error) { +func (c *Collection) addRow(payload json.RawMessage, item map[string]any) (*Row, error) { row := &Row{ Payload: payload, } - err := indexInsert(c.Indexes, row) + err := indexInsert(c.Indexes, row, item) if err != nil { return nil, err } @@ -209,12 +209,6 @@ func (c *Collection) Insert(item map[string]any) (*Row, error) { auto := atomic.AddInt64(&c.Count, 1) if c.Defaults != nil { - // item := map[string]any{} // todo: item is shadowed, choose a better name - // err := json.Unmarshal(payload, &item) - // if err != nil { - // return nil, fmt.Errorf("json encode defaults: %w", err) - // } - for k, v := range c.Defaults { if item[k] != nil { continue @@ -234,13 +228,13 @@ func (c *Collection) Insert(item map[string]any) (*Row, error) { } } - payload, err := json.Marshal(item) + payload, err := json2.Marshal(item) if err != nil { return nil, fmt.Errorf("json encode payload: %w", err) } // Add row - row, err := c.addRow(payload) + row, err := c.addRow(payload, item) if err != nil { return nil, err } @@ -359,7 +353,9 @@ func (c *Collection) createIndex(name string, options interface{}, persist bool) // Add all rows to the index for _, row := range c.Rows { - err := index.AddRow(row) + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + err := index.AddRow(row, item) if err != nil { delete(c.Indexes, name) return fmt.Errorf("index row: %s, data: %s", err.Error(), string(row.Payload)) @@ -390,7 +386,12 @@ func (c *Collection) createIndex(name string, options interface{}, persist bool) return c.EncodeCommand(command) } -func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) { +func indexInsert(indexes map[string]*collectionIndex, row *Row, item map[string]any) (err error) { + + if item == nil { + item = make(map[string]any) + json2.Unmarshal(row.Payload, &item) + } // Note: rollbacks array should be kept in stack if it is smaller than 65536 bytes, so // our recommended maximum number of indexes should NOT exceed 8192 indexes @@ -403,12 +404,12 @@ func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) { return } for i := 0; i < c; i++ { - rollbacks[i].RemoveRow(row) + rollbacks[i].RemoveRow(row, item) } }() for key, index := range indexes { - err = index.AddRow(row) + err = index.AddRow(row, item) if err != nil { return fmt.Errorf("index add '%s': %s", key, err.Error()) } @@ -420,9 +421,10 @@ func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) { return } -func indexRemove(indexes map[string]*collectionIndex, row *Row) (err error) { +func indexRemove(indexes map[string]*collectionIndex, row *Row, item map[string]any) (err error) { + for key, index := range indexes { - err = index.RemoveRow(row) + err = index.RemoveRow(row, item) if err != nil { // TODO: does this make any sense? return fmt.Errorf("index remove '%s': %s", key, err.Error()) @@ -445,6 +447,9 @@ func lockBlock(m *sync.Mutex, f func() error) error { func (c *Collection) removeByRow(row *Row, persist bool) error { // todo: rename to 'removeRow' + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + var i int err := lockBlock(c.rowsMutex, func() error { i = row.I @@ -452,7 +457,7 @@ func (c *Collection) removeByRow(row *Row, persist bool) error { // todo: rename return fmt.Errorf("row %d does not exist", i) } - err := indexRemove(c.Indexes, row) + err := indexRemove(c.Indexes, row, item) if err != nil { return fmt.Errorf("could not free index") } @@ -514,15 +519,18 @@ func (c *Collection) patchByRow(row *Row, patch interface{}, persist bool) error return nil } + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + // index update - err = indexRemove(c.Indexes, row) + err = indexRemove(c.Indexes, row, item) if err != nil { return fmt.Errorf("indexRemove: %w", err) } row.Payload = newPayload - err = indexInsert(c.Indexes, row) + err = indexInsert(c.Indexes, row, item) if err != nil { return fmt.Errorf("indexInsert: %w", err) } diff --git a/collection/collection_test.go b/collection/collection_test.go index a5ef181..6092a27 100644 --- a/collection/collection_test.go +++ b/collection/collection_test.go @@ -401,16 +401,16 @@ func TestPersistenceUpdate_TwiceOptimization(t *testing.T) { } type MockIndex struct { - AddRowCallback func(row *Row) error - RemoveRowCallback func(row *Row) error + AddRowCallback func(row *Row, item map[string]any) error + RemoveRowCallback func(row *Row, item map[string]any) error } -func (m *MockIndex) AddRow(row *Row) error { - return m.AddRowCallback(row) +func (m *MockIndex) AddRow(row *Row, item map[string]any) error { + return m.AddRowCallback(row, item) } -func (m *MockIndex) RemoveRow(row *Row) error { - return m.RemoveRowCallback(row) +func (m *MockIndex) RemoveRow(row *Row, item map[string]any) error { + return m.RemoveRowCallback(row, item) } func (m *MockIndex) Traverse(options []byte, f func(row *Row) bool) { @@ -425,14 +425,14 @@ func TestIndexInsert_Rollback(t *testing.T) { newMock := func(name string) Index { return &MockIndex{ - AddRowCallback: func(row *Row) error { + AddRowCallback: func(row *Row, item map[string]any) error { if len(adds) == 2 { return errors.New("mock error") } adds = append(adds, name) return nil }, - RemoveRowCallback: func(row *Row) error { + RemoveRowCallback: func(row *Row, item map[string]any) error { removes = append(removes, name) return nil }, @@ -453,7 +453,7 @@ func TestIndexInsert_Rollback(t *testing.T) { row := &Row{} - indexInsert(indexes, row) + indexInsert(indexes, row, nil) AssertEqual(removes, adds) AssertEqual(len(removes), 2) @@ -473,13 +473,13 @@ func TestIndexInsert_Rollback_BlackBox(t *testing.T) { Payload: []byte(`{"id":"my-id"}`), } - err1 := indexInsert(indexes, row) + err1 := indexInsert(indexes, row, nil) AssertNil(err1) - err2 := indexInsert(indexes, row) + err2 := indexInsert(indexes, row, nil) AssertNotNil(err2) - err3 := indexInsert(indexes, row) + err3 := indexInsert(indexes, row, nil) AssertNotNil(err3) } diff --git a/collection/index.go b/collection/index.go index 7517d49..bf596c9 100644 --- a/collection/index.go +++ b/collection/index.go @@ -1,7 +1,7 @@ package collection type Index interface { - AddRow(row *Row) error - RemoveRow(row *Row) error + AddRow(row *Row, item map[string]any) error + RemoveRow(row *Row, item map[string]any) error Traverse(options []byte, f func(row *Row) bool) // todo: return error? } diff --git a/collection/indexbtree.go b/collection/indexbtree.go index 9548319..d684e12 100644 --- a/collection/indexbtree.go +++ b/collection/indexbtree.go @@ -14,19 +14,17 @@ type IndexBtree struct { Options *IndexBTreeOptions } -func (b *IndexBtree) RemoveRow(r *Row) error { +func (b *IndexBtree) RemoveRow(row *Row, item map[string]any) error { // TODO: duplicated code: values := []interface{}{} - data := map[string]interface{}{} - json.Unmarshal(r.Payload, &data) for _, field := range b.Options.Fields { - values = append(values, data[field]) + values = append(values, item[field]) } b.Btree.Delete(&RowOrdered{ - Row: r, // probably r is not needed + Row: row, // probably r is not needed Values: values, }) @@ -102,14 +100,12 @@ func NewIndexBTree(options *IndexBTreeOptions) *IndexBtree { } } -func (b *IndexBtree) AddRow(r *Row) error { +func (b *IndexBtree) AddRow(row *Row, item map[string]any) error { var values []interface{} - data := map[string]interface{}{} - json.Unmarshal(r.Payload, &data) for _, field := range b.Options.Fields { field = strings.TrimPrefix(field, "-") - value, exists := data[field] + value, exists := item[field] if exists { values = append(values, value) continue @@ -134,7 +130,7 @@ func (b *IndexBtree) AddRow(r *Row) error { } b.Btree.ReplaceOrInsert(&RowOrdered{ - Row: r, + Row: row, Values: values, }) diff --git a/collection/indexbtree_test.go b/collection/indexbtree_test.go index 9290577..5504bf1 100644 --- a/collection/indexbtree_test.go +++ b/collection/indexbtree_test.go @@ -22,12 +22,13 @@ func Test_IndexBTree_HappyPath(t *testing.T) { n := 4 for i := 0; i < n; i++ { - data, _ := json.Marshal(JSON{ + item := JSON{ "id": float64(i), - }) + } + data, _ := json.Marshal(item) index.AddRow(&Row{ Payload: data, - }) + }, item) } { @@ -70,10 +71,12 @@ func TestIndexBtree_AddRow_Sparse(t *testing.T) { } for i, document := range payloads { + item := JSON{} + json.Unmarshal([]byte(document), &item) err := index.AddRow(&Row{ I: i, Payload: json.RawMessage(document), - }) + }, item) biff.AssertNil(err) } @@ -101,20 +104,22 @@ func TestIndexBtree_RemoveRow(t *testing.T) { } for _, document := range documents { + item := JSON{} + json.Unmarshal([]byte(document), &item) err := index.AddRow(&Row{ Payload: json.RawMessage(document), - }) + }, item) biff.AssertNil(err) } errFirst := index.RemoveRow(&Row{ Payload: json.RawMessage(`{"name":"b"}`), - }) + }, JSON{"name": "b"}) biff.AssertNil(errFirst) errSecond := index.RemoveRow(&Row{ Payload: json.RawMessage(`{"name":"b"}`), - }) + }, JSON{"name": "b"}) biff.AssertNil(errSecond) expectedDocuments := []string{ @@ -139,13 +144,13 @@ func TestIndexBtree_AddRow_NonSparse(t *testing.T) { // Insert defined field errValid := index.AddRow(&Row{ Payload: json.RawMessage(`{"name":"Fulanez", "year":1986}`), - }) + }, JSON{"name": "Fulanez", "year": 1986}) biff.AssertNil(errValid) // Insert undefined field errInvalid := index.AddRow(&Row{ Payload: json.RawMessage(`{"name":"Fulanez"}`), - }) + }, JSON{"name": "Fulanez"}) biff.AssertEqual(errInvalid.Error(), "field 'year' not defined") } @@ -159,13 +164,13 @@ func TestIndexBtree_AddRow_Conflict(t *testing.T) { // Insert first errValid := index.AddRow(&Row{ Payload: json.RawMessage(`{"product_code":1,"product_category":"cat1"}`), - }) + }, JSON{"product_code": 1, "product_category": "cat1"}) biff.AssertNil(errValid) // Insert same value errConflict := index.AddRow(&Row{ Payload: json.RawMessage(`{"product_code":1,"product_category":"cat1"}`), - }) + }, JSON{"product_code": 1, "product_category": "cat1"}) biff.AssertEqual(errConflict.Error(), "key (product_code:1,product_category:cat1) already exists") } diff --git a/collection/indexmap.go b/collection/indexmap.go index 77901f6..51dc798 100644 --- a/collection/indexmap.go +++ b/collection/indexmap.go @@ -21,14 +21,7 @@ func NewIndexMap(options *IndexMapOptions) *IndexMap { } } -func (i *IndexMap) RemoveRow(row *Row) error { - - item := map[string]interface{}{} - - err := json.Unmarshal(row.Payload, &item) - if err != nil { - return fmt.Errorf("unmarshal: %w", err) - } +func (i *IndexMap) RemoveRow(row *Row, item map[string]any) error { field := i.Options.Field entries := i.Entries @@ -55,13 +48,7 @@ func (i *IndexMap) RemoveRow(row *Row) error { return nil } -func (i *IndexMap) AddRow(row *Row) error { - - item := map[string]interface{}{} - err := json.Unmarshal(row.Payload, &item) - if err != nil { - return fmt.Errorf("unmarshal: %w", err) - } +func (i *IndexMap) AddRow(row *Row, item map[string]any) error { field := i.Options.Field diff --git a/collection/indexmap_test.go b/collection/indexmap_test.go index 7187572..1d31ba3 100644 --- a/collection/indexmap_test.go +++ b/collection/indexmap_test.go @@ -1,6 +1,7 @@ package collection import ( + json2 "encoding/json/v2" "fmt" "testing" ) @@ -28,7 +29,7 @@ func BenchmarkIndexMap_AddRow(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { row := &Row{Payload: payloads[i]} - if err := index.AddRow(row); err != nil { + if err := index.AddRow(row, nil); err != nil { b.Fatalf("AddRow error: %v", err) } } @@ -47,7 +48,9 @@ func BenchmarkIndexSyncMap_AddRow(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { row := &Row{Payload: payloads[i]} - if err := index.AddRow(row); err != nil { + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + if err := index.AddRow(row, item); err != nil { b.Fatalf("AddRow error: %v", err) } } @@ -65,7 +68,7 @@ func BenchmarkIndexMap_RemoveRow(b *testing.B) { for i := 0; i < b.N; i++ { key := fmt.Sprintf("key-%d", i) row := &Row{Payload: createPayload(key, options.Field)} - if err := index.AddRow(row); err != nil { + if err := index.AddRow(row, nil); err != nil { b.Fatalf("AddRow error: %v", err) } } @@ -80,7 +83,7 @@ func BenchmarkIndexMap_RemoveRow(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { row := &Row{Payload: payloads[i]} - if err := index.RemoveRow(row); err != nil { + if err := index.RemoveRow(row, nil); err != nil { b.Fatalf("RemoveRow error: %v", err) } } @@ -93,7 +96,9 @@ func BenchmarkIndexSyncMap_RemoveRow(b *testing.B) { for i := 0; i < b.N; i++ { key := fmt.Sprintf("key-%d", i) row := &Row{Payload: createPayload(key, options.Field)} - if err := index.AddRow(row); err != nil { + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + if err := index.AddRow(row, item); err != nil { b.Fatalf("AddRow error: %v", err) } } @@ -107,7 +112,7 @@ func BenchmarkIndexSyncMap_RemoveRow(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { row := &Row{Payload: payloads[i]} - if err := index.RemoveRow(row); err != nil { + if err := index.RemoveRow(row, nil); err != nil { b.Fatalf("RemoveRow error: %v", err) } } @@ -126,7 +131,7 @@ func BenchmarkIndexMap_Traverse(b *testing.B) { for i := 0; i < numRows; i++ { key := fmt.Sprintf("key-%d", i) row := &Row{Payload: createPayload(key, options.Field)} - if err := index.AddRow(row); err != nil { + if err := index.AddRow(row, nil); err != nil { b.Fatalf("AddRow error: %v", err) } } @@ -149,7 +154,9 @@ func BenchmarkIndexSyncMap_Traverse(b *testing.B) { for i := 0; i < numRows; i++ { key := fmt.Sprintf("key-%d", i) row := &Row{Payload: createPayload(key, options.Field)} - if err := index.AddRow(row); err != nil { + item := map[string]any{} + json2.Unmarshal(row.Payload, &item) + if err := index.AddRow(row, item); err != nil { b.Fatalf("AddRow error: %v", err) } } diff --git a/collection/indexsyncmap.go b/collection/indexsyncmap.go index 032a5e1..be2d1a7 100644 --- a/collection/indexsyncmap.go +++ b/collection/indexsyncmap.go @@ -19,14 +19,7 @@ func NewIndexSyncMap(options *IndexMapOptions) *IndexSyncMap { } } -func (i *IndexSyncMap) RemoveRow(row *Row) error { - - item := map[string]interface{}{} - - err := json.Unmarshal(row.Payload, &item) - if err != nil { - return fmt.Errorf("unmarshal: %w", err) - } +func (i *IndexSyncMap) RemoveRow(row *Row, item map[string]any) error { field := i.Options.Field entries := i.Entries @@ -53,13 +46,7 @@ func (i *IndexSyncMap) RemoveRow(row *Row) error { return nil } -func (i *IndexSyncMap) AddRow(row *Row) error { - - item := map[string]interface{}{} - err := json.Unmarshal(row.Payload, &item) - if err != nil { - return fmt.Errorf("unmarshal: %w", err) - } +func (i *IndexSyncMap) AddRow(row *Row, item map[string]any) error { field := i.Options.Field @@ -81,6 +68,15 @@ func (i *IndexSyncMap) AddRow(row *Row) error { return fmt.Errorf("index conflict: field '%s' with value '%s'", field, value) } entries.Store(value, row) + case []string: + for _, s := range value { + if _, exists := entries.Load(s); exists { + return fmt.Errorf("index conflict: field '%s' with value '%s'", field, value) + } + } + for _, s := range value { + entries.Store(s, row) + } case []interface{}: for _, v := range value { s := v.(string) // TODO: handle this casting error