Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 244 additions & 9 deletions collection/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"fmt"
"io"
"os"
"reflect"
"sync"
"sync/atomic"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"

"github.com/fulldump/inceptiondb/utils"
Expand Down Expand Up @@ -495,25 +495,30 @@ func (c *Collection) Patch(row *Row, patch interface{}) error {

func (c *Collection) patchByRow(row *Row, patch interface{}, persist bool) error { // todo: rename to 'patchRow'

patchBytes, err := json.Marshal(patch)
originalValue, err := decodeJSONValue(row.Payload)
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
return fmt.Errorf("decode row payload: %w", err)
}

newPayload, err := jsonpatch.MergePatch(row.Payload, patchBytes)
normalizedPatch, err := normalizeJSONValue(patch)
if err != nil {
return fmt.Errorf("cannot apply patch: %w", err)
return fmt.Errorf("normalize patch: %w", err)
}

diff, err := jsonpatch.CreateMergePatch(row.Payload, newPayload) // todo: optimization: discard operation if empty
newValue, changed, err := applyMergePatchValue(originalValue, normalizedPatch)
if err != nil {
return fmt.Errorf("cannot diff: %w", err)
return fmt.Errorf("cannot apply patch: %w", err)
}

if len(diff) == 2 { // diff == '{}'
if !changed {
return nil
}

newPayload, err := json.Marshal(newValue)
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}

// index update
err = indexRemove(c.Indexes, row)
if err != nil {
Expand All @@ -531,10 +536,15 @@ func (c *Collection) patchByRow(row *Row, patch interface{}, persist bool) error
return nil
}

diffValue, hasDiff := createMergeDiff(originalValue, newValue)
if !hasDiff {
return nil
}

// Persist
payload, err := json.Marshal(map[string]interface{}{
"i": row.I,
"diff": json.RawMessage(diff),
"diff": diffValue,
})
if err != nil {
return err // todo: wrap error
Expand All @@ -550,6 +560,231 @@ func (c *Collection) patchByRow(row *Row, patch interface{}, persist bool) error
return c.EncodeCommand(command)
}

func decodeJSONValue(raw json.RawMessage) (interface{}, error) {

if len(raw) == 0 {
return nil, nil
}

var value interface{}
if err := json.Unmarshal(raw, &value); err != nil {
return nil, err
}
return value, nil
}

func normalizeJSONValue(value interface{}) (interface{}, error) {

switch v := value.(type) {
case json.RawMessage:
var decoded interface{}
if err := json.Unmarshal(v, &decoded); err != nil {
return nil, err
}
return normalizeJSONValue(decoded)
case map[string]interface{}:
normalized := make(map[string]interface{}, len(v))
for key, item := range v {
nv, err := normalizeJSONValue(item)
if err != nil {
return nil, err
}
normalized[key] = nv
}
return normalized, nil
case []interface{}:
normalized := make([]interface{}, len(v))
for i, item := range v {
nv, err := normalizeJSONValue(item)
if err != nil {
return nil, err
}
normalized[i] = nv
}
return normalized, nil
default:
return v, nil
}
}

func applyMergePatchValue(original interface{}, patch interface{}) (interface{}, bool, error) {

switch p := patch.(type) {
case map[string]interface{}:
var originalMap map[string]interface{}
if m, ok := original.(map[string]interface{}); ok {
originalMap = m
}

result := make(map[string]interface{}, len(originalMap)+len(p))
for k, v := range originalMap {
result[k] = cloneJSONValue(v)
}

changed := false
for k, item := range p {
if item == nil {
if _, exists := result[k]; exists {
delete(result, k)
changed = true
}
continue
}

originalValue := interface{}(nil)
if originalMap != nil {
originalValue, _ = originalMap[k]
}

mergedValue, valueChanged, err := applyMergePatchValue(originalValue, item)
if err != nil {
return nil, false, err
}

if originalMap == nil {
changed = true
} else {
if _, exists := originalMap[k]; !exists || valueChanged {
changed = true
}
}

result[k] = mergedValue
}

return result, changed, nil
case []interface{}:
cloned := cloneJSONArray(p)
if current, ok := original.([]interface{}); ok {
if reflect.DeepEqual(current, cloned) {
return cloned, false, nil
}
}
return cloned, true, nil
default:
if reflect.DeepEqual(original, p) {
return cloneJSONValue(p), false, nil
}
return cloneJSONValue(p), true, nil
}
}

func createMergeDiff(original interface{}, modified interface{}) (interface{}, bool) {

switch o := original.(type) {
case map[string]interface{}:
modifiedMap, ok := modified.(map[string]interface{})
if !ok {
if reflect.DeepEqual(original, modified) {
return nil, false
}
return cloneJSONValue(modified), true
}

diff := make(map[string]interface{})
changed := false

for k := range o {
if _, exists := modifiedMap[k]; !exists {
diff[k] = nil
changed = true
}
}

for k, mv := range modifiedMap {
ov, exists := o[k]
if !exists {
diff[k] = cloneJSONValue(mv)
changed = true
continue
}

if om, ok := ov.(map[string]interface{}); ok {
if mm, ok := mv.(map[string]interface{}); ok {
subDiff, subChanged := createMergeDiff(om, mm)
if subChanged {
diff[k] = subDiff
changed = true
}
continue
}
}

if oa, ok := ov.([]interface{}); ok {
if ma, ok := mv.([]interface{}); ok {
if !reflect.DeepEqual(oa, ma) {
diff[k] = cloneJSONValue(mv)
changed = true
}
continue
}
}

if !reflect.DeepEqual(ov, mv) {
diff[k] = cloneJSONValue(mv)
changed = true
}
}

if !changed {
return nil, false
}
return diff, true
case []interface{}:
if ma, ok := modified.([]interface{}); ok {
if reflect.DeepEqual(o, ma) {
return nil, false
}
return cloneJSONValue(ma), true
}
if reflect.DeepEqual(original, modified) {
return nil, false
}
return cloneJSONValue(modified), true
default:
if reflect.DeepEqual(original, modified) {
return nil, false
}
return cloneJSONValue(modified), true
}
}

func cloneJSONValue(value interface{}) interface{} {

switch v := value.(type) {
case map[string]interface{}:
cloned := make(map[string]interface{}, len(v))
for k, item := range v {
cloned[k] = cloneJSONValue(item)
}
return cloned
case []interface{}:
return cloneJSONArray(v)
case json.RawMessage:
if v == nil {
return nil
}
cloned := make(json.RawMessage, len(v))
copy(cloned, v)
return cloned
default:
return v
}
}

func cloneJSONArray(values []interface{}) []interface{} {

if values == nil {
return nil
}

cloned := make([]interface{}, len(values))
for i, item := range values {
cloned[i] = cloneJSONValue(item)
}
return cloned
}

func (c *Collection) Close() error {
{
err := c.buffer.Flush()
Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ go 1.25

require (
github.com/SierraSoftworks/connor v1.0.2
github.com/evanphx/json-patch v0.5.2
github.com/fulldump/apitest v1.3.0
github.com/fulldump/biff v1.3.0
github.com/fulldump/box v0.7.0
github.com/fulldump/goconfig v1.7.1
github.com/google/btree v1.1.3
github.com/google/uuid v1.6.0
)

require github.com/pkg/errors v0.9.1 // indirect
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
github.com/SierraSoftworks/connor v1.0.2 h1:vIPgtPP4rhMT1kaFfj85hV8QEBG67zy7cShOMnEBlVU=
github.com/SierraSoftworks/connor v1.0.2/go.mod h1:hCWEm8Mpp8zrJ++0I4xdo6oNn8cSG4BjcYi4+JgWViM=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fulldump/apitest v1.3.0 h1:BG2Z2iCh5u5m/mpzAnaTDxMno8Iv4jkLoDtI08gFx+8=
github.com/fulldump/apitest v1.3.0/go.mod h1:UZ/2tr5LhMNXZLgEG9tdz+ekUN8JtBHEn84d8zOm5p4=
Expand All @@ -18,13 +16,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
25 changes: 0 additions & 25 deletions vendor/github.com/evanphx/json-patch/LICENSE

This file was deleted.

Loading
Loading