Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/streamtest/streamtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Streamtest(t *testing.T) {

t.SkipNow()

if false {
if true {
conf := configuration.Default()
conf.Dir = t.TempDir()

Expand All @@ -53,8 +53,12 @@ func Test_Streamtest(t *testing.T) {

{
// Create collection
payload := strings.NewReader(`{"name": "streammm"}`)
req, _ := http.NewRequest("POST", base+"/v1/collections", payload)
payload := strings.NewReader(`{
"field": "n",
"name": "my-index",
"type": "map"
}`)
req, _ := http.NewRequest("POST", base+"/v1/collections/streammm:createIndex", payload)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
Expand Down
48 changes: 28 additions & 20 deletions collection/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func OpenCollection(filename string) (*Collection, error) {

switch command.Name {
case "insert":
_, err := collection.addRow(command.Payload)
_, err := collection.addRow(command.Payload, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,13 +181,13 @@ func OpenCollection(filename string) (*Collection, error) {
return collection, nil
}

func (c *Collection) addRow(payload json.RawMessage) (*Row, error) {
func (c *Collection) addRow(payload json.RawMessage, item map[string]any) (*Row, error) {

row := &Row{
Payload: payload,
}

err := indexInsert(c.Indexes, row)
err := indexInsert(c.Indexes, row, item)
if err != nil {
return nil, err
}
Expand All @@ -209,12 +209,6 @@ func (c *Collection) Insert(item map[string]any) (*Row, error) {
auto := atomic.AddInt64(&c.Count, 1)

if c.Defaults != nil {
// item := map[string]any{} // todo: item is shadowed, choose a better name
// err := json.Unmarshal(payload, &item)
// if err != nil {
// return nil, fmt.Errorf("json encode defaults: %w", err)
// }

for k, v := range c.Defaults {
if item[k] != nil {
continue
Expand All @@ -234,13 +228,13 @@ func (c *Collection) Insert(item map[string]any) (*Row, error) {
}
}

payload, err := json.Marshal(item)
payload, err := json2.Marshal(item)
if err != nil {
return nil, fmt.Errorf("json encode payload: %w", err)
}

// Add row
row, err := c.addRow(payload)
row, err := c.addRow(payload, item)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -359,7 +353,9 @@ func (c *Collection) createIndex(name string, options interface{}, persist bool)

// Add all rows to the index
for _, row := range c.Rows {
err := index.AddRow(row)
item := map[string]any{}
json2.Unmarshal(row.Payload, &item)
err := index.AddRow(row, item)
if err != nil {
delete(c.Indexes, name)
return fmt.Errorf("index row: %s, data: %s", err.Error(), string(row.Payload))
Expand Down Expand Up @@ -390,7 +386,12 @@ func (c *Collection) createIndex(name string, options interface{}, persist bool)
return c.EncodeCommand(command)
}

func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) {
func indexInsert(indexes map[string]*collectionIndex, row *Row, item map[string]any) (err error) {

if item == nil {
item = make(map[string]any)
json2.Unmarshal(row.Payload, &item)
}

// Note: rollbacks array should be kept in stack if it is smaller than 65536 bytes, so
// our recommended maximum number of indexes should NOT exceed 8192 indexes
Expand All @@ -403,12 +404,12 @@ func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) {
return
}
for i := 0; i < c; i++ {
rollbacks[i].RemoveRow(row)
rollbacks[i].RemoveRow(row, item)
}
}()

for key, index := range indexes {
err = index.AddRow(row)
err = index.AddRow(row, item)
if err != nil {
return fmt.Errorf("index add '%s': %s", key, err.Error())
}
Expand All @@ -420,9 +421,10 @@ func indexInsert(indexes map[string]*collectionIndex, row *Row) (err error) {
return
}

func indexRemove(indexes map[string]*collectionIndex, row *Row) (err error) {
func indexRemove(indexes map[string]*collectionIndex, row *Row, item map[string]any) (err error) {

for key, index := range indexes {
err = index.RemoveRow(row)
err = index.RemoveRow(row, item)
if err != nil {
// TODO: does this make any sense?
return fmt.Errorf("index remove '%s': %s", key, err.Error())
Expand All @@ -445,14 +447,17 @@ func lockBlock(m *sync.Mutex, f func() error) error {

func (c *Collection) removeByRow(row *Row, persist bool) error { // todo: rename to 'removeRow'

item := map[string]any{}
json2.Unmarshal(row.Payload, &item)

var i int
err := lockBlock(c.rowsMutex, func() error {
i = row.I
if len(c.Rows) <= i {
return fmt.Errorf("row %d does not exist", i)
}

err := indexRemove(c.Indexes, row)
err := indexRemove(c.Indexes, row, item)
if err != nil {
return fmt.Errorf("could not free index")
}
Expand Down Expand Up @@ -514,15 +519,18 @@ func (c *Collection) patchByRow(row *Row, patch interface{}, persist bool) error
return nil
}

item := map[string]any{}
json2.Unmarshal(row.Payload, &item)

// index update
err = indexRemove(c.Indexes, row)
err = indexRemove(c.Indexes, row, item)
if err != nil {
return fmt.Errorf("indexRemove: %w", err)
}

row.Payload = newPayload

err = indexInsert(c.Indexes, row)
err = indexInsert(c.Indexes, row, item)
if err != nil {
return fmt.Errorf("indexInsert: %w", err)
}
Expand Down
24 changes: 12 additions & 12 deletions collection/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,16 +401,16 @@ func TestPersistenceUpdate_TwiceOptimization(t *testing.T) {
}

type MockIndex struct {
AddRowCallback func(row *Row) error
RemoveRowCallback func(row *Row) error
AddRowCallback func(row *Row, item map[string]any) error
RemoveRowCallback func(row *Row, item map[string]any) error
}

func (m *MockIndex) AddRow(row *Row) error {
return m.AddRowCallback(row)
func (m *MockIndex) AddRow(row *Row, item map[string]any) error {
return m.AddRowCallback(row, item)
}

func (m *MockIndex) RemoveRow(row *Row) error {
return m.RemoveRowCallback(row)
func (m *MockIndex) RemoveRow(row *Row, item map[string]any) error {
return m.RemoveRowCallback(row, item)
}

func (m *MockIndex) Traverse(options []byte, f func(row *Row) bool) {
Expand All @@ -425,14 +425,14 @@ func TestIndexInsert_Rollback(t *testing.T) {

newMock := func(name string) Index {
return &MockIndex{
AddRowCallback: func(row *Row) error {
AddRowCallback: func(row *Row, item map[string]any) error {
if len(adds) == 2 {
return errors.New("mock error")
}
adds = append(adds, name)
return nil
},
RemoveRowCallback: func(row *Row) error {
RemoveRowCallback: func(row *Row, item map[string]any) error {
removes = append(removes, name)
return nil
},
Expand All @@ -453,7 +453,7 @@ func TestIndexInsert_Rollback(t *testing.T) {

row := &Row{}

indexInsert(indexes, row)
indexInsert(indexes, row, nil)

AssertEqual(removes, adds)
AssertEqual(len(removes), 2)
Expand All @@ -473,13 +473,13 @@ func TestIndexInsert_Rollback_BlackBox(t *testing.T) {
Payload: []byte(`{"id":"my-id"}`),
}

err1 := indexInsert(indexes, row)
err1 := indexInsert(indexes, row, nil)
AssertNil(err1)

err2 := indexInsert(indexes, row)
err2 := indexInsert(indexes, row, nil)
AssertNotNil(err2)

err3 := indexInsert(indexes, row)
err3 := indexInsert(indexes, row, nil)
AssertNotNil(err3)

}
Expand Down
4 changes: 2 additions & 2 deletions collection/index.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package collection

type Index interface {
AddRow(row *Row) error
RemoveRow(row *Row) error
AddRow(row *Row, item map[string]any) error
RemoveRow(row *Row, item map[string]any) error
Traverse(options []byte, f func(row *Row) bool) // todo: return error?
}
16 changes: 6 additions & 10 deletions collection/indexbtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,17 @@ type IndexBtree struct {
Options *IndexBTreeOptions
}

func (b *IndexBtree) RemoveRow(r *Row) error {
func (b *IndexBtree) RemoveRow(row *Row, item map[string]any) error {

// TODO: duplicated code:
values := []interface{}{}
data := map[string]interface{}{}
json.Unmarshal(r.Payload, &data)

for _, field := range b.Options.Fields {
values = append(values, data[field])
values = append(values, item[field])
}

b.Btree.Delete(&RowOrdered{
Row: r, // probably r is not needed
Row: row, // probably r is not needed
Values: values,
})

Expand Down Expand Up @@ -102,14 +100,12 @@ func NewIndexBTree(options *IndexBTreeOptions) *IndexBtree {
}
}

func (b *IndexBtree) AddRow(r *Row) error {
func (b *IndexBtree) AddRow(row *Row, item map[string]any) error {
var values []interface{}
data := map[string]interface{}{}
json.Unmarshal(r.Payload, &data)

for _, field := range b.Options.Fields {
field = strings.TrimPrefix(field, "-")
value, exists := data[field]
value, exists := item[field]
if exists {
values = append(values, value)
continue
Expand All @@ -134,7 +130,7 @@ func (b *IndexBtree) AddRow(r *Row) error {
}

b.Btree.ReplaceOrInsert(&RowOrdered{
Row: r,
Row: row,
Values: values,
})

Expand Down
27 changes: 16 additions & 11 deletions collection/indexbtree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ func Test_IndexBTree_HappyPath(t *testing.T) {

n := 4
for i := 0; i < n; i++ {
data, _ := json.Marshal(JSON{
item := JSON{
"id": float64(i),
})
}
data, _ := json.Marshal(item)
index.AddRow(&Row{
Payload: data,
})
}, item)
}

{
Expand Down Expand Up @@ -70,10 +71,12 @@ func TestIndexBtree_AddRow_Sparse(t *testing.T) {
}

for i, document := range payloads {
item := JSON{}
json.Unmarshal([]byte(document), &item)
err := index.AddRow(&Row{
I: i,
Payload: json.RawMessage(document),
})
}, item)
biff.AssertNil(err)
}

Expand Down Expand Up @@ -101,20 +104,22 @@ func TestIndexBtree_RemoveRow(t *testing.T) {
}

for _, document := range documents {
item := JSON{}
json.Unmarshal([]byte(document), &item)
err := index.AddRow(&Row{
Payload: json.RawMessage(document),
})
}, item)
biff.AssertNil(err)
}

errFirst := index.RemoveRow(&Row{
Payload: json.RawMessage(`{"name":"b"}`),
})
}, JSON{"name": "b"})
biff.AssertNil(errFirst)

errSecond := index.RemoveRow(&Row{
Payload: json.RawMessage(`{"name":"b"}`),
})
}, JSON{"name": "b"})
biff.AssertNil(errSecond)

expectedDocuments := []string{
Expand All @@ -139,13 +144,13 @@ func TestIndexBtree_AddRow_NonSparse(t *testing.T) {
// Insert defined field
errValid := index.AddRow(&Row{
Payload: json.RawMessage(`{"name":"Fulanez", "year":1986}`),
})
}, JSON{"name": "Fulanez", "year": 1986})
biff.AssertNil(errValid)

// Insert undefined field
errInvalid := index.AddRow(&Row{
Payload: json.RawMessage(`{"name":"Fulanez"}`),
})
}, JSON{"name": "Fulanez"})
biff.AssertEqual(errInvalid.Error(), "field 'year' not defined")
}

Expand All @@ -159,13 +164,13 @@ func TestIndexBtree_AddRow_Conflict(t *testing.T) {
// Insert first
errValid := index.AddRow(&Row{
Payload: json.RawMessage(`{"product_code":1,"product_category":"cat1"}`),
})
}, JSON{"product_code": 1, "product_category": "cat1"})
biff.AssertNil(errValid)

// Insert same value
errConflict := index.AddRow(&Row{
Payload: json.RawMessage(`{"product_code":1,"product_category":"cat1"}`),
})
}, JSON{"product_code": 1, "product_category": "cat1"})
biff.AssertEqual(errConflict.Error(), "key (product_code:1,product_category:cat1) already exists")
}

Expand Down
Loading
Loading