From 78eb9fd93ca24d0025ed874b8416beda6db19092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Sep 2023 09:23:05 +0200 Subject: [PATCH 01/10] wip mlru/logcache --- ributil/logcache/logcache.go | 61 ++++++++++++ ributil/mlru/mlru.go | 187 +++++++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+) create mode 100644 ributil/logcache/logcache.go create mode 100644 ributil/mlru/mlru.go diff --git a/ributil/logcache/logcache.go b/ributil/logcache/logcache.go new file mode 100644 index 0000000..103d932 --- /dev/null +++ b/ributil/logcache/logcache.go @@ -0,0 +1,61 @@ +package logcache + +import ( + "bufio" + "os" + "sync" + + "github.com/multiformats/go-multihash" +) + +const ( + carExt = ".car" + idxExt = ".offs" +) + +type mhStr string + +type LogCache struct { + carFile os.File + + // index file + // [[i24:dataLen][i48:offset]:i64][mhLen:i8][multihash]... + indexFile os.File + + carBuf, idxBuf bufio.ReaderAt + + indexLk sync.Mutex + index map[mhStr]int64 + + // entirely separate write mutex because reads happen on read-only data + read sync.RWMutex + write sync.Mutex +} + +func Open(basePath string) (*LogCache, error) { + // open car file + + // open index file + + // load index file into index + + // check that last index file points to a valid car entry, and that there's no + // data at the end of carFile + // - if there is trailer data in the car, truncate it + // - if the index points beyond the end of the car file, scan the index for the + // last entry which is within the car, truncate the index, then read the car (if last entry is truncated, do truncate it) +} + +// Put returns an error if the multihash is already stored in the cache, otherwise +// it appends the entry to the cache car, then appends an index entry +func (lc *LogCache) Put(mh multihash.Multihash, data []byte) (err error) { + +} + +func (lc *LogCache) Get(mh multihash.Multihash, cb func([]byte) error) error { + +} + +func (lc *LogCache) Flush() error { + +} diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go new file mode 100644 index 0000000..1dffe80 --- /dev/null +++ b/ributil/mlru/mlru.go @@ -0,0 +1,187 @@ +package mlru + +import ( + "errors" + "sync" + "sync/atomic" +) + +type LRUGroup struct { + counter atomic.Int64 +} + +type MLRU[K comparable, V any] struct { + group *LRUGroup + mu sync.Mutex // Mutex to ensure thread safety + valid bool // Flag to check if the cache is valid + + currentSize, capacity int64 + first, last *entry[K, V] + keys map[K]*entry[K, V] +} + +type entry[K comparable, V any] struct { + key K + prev, next *entry[K, V] + + index int64 + value V +} + +func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { + return &MLRU[K, V]{ + group: group, + valid: true, + capacity: capacity, + keys: make(map[K]*entry[K, V]), + } +} + +func (l *MLRU[K, V]) evictLast() { + if l.last != nil { + delete(l.keys, l.last.key) + if l.currentSize == 1 { + // If size is one, set first and last to nil after eviction + l.first, l.last = nil, nil + } else { + l.last = l.last.prev + if l.last != nil { + l.last.next = nil + } + } + l.currentSize-- + } +} + +func (l *MLRU[K, V]) Put(key K, value V) error { + l.mu.Lock() + defer l.mu.Unlock() + + if !l.valid { + return errors.New("invalid cache") + } + + if existing, ok := l.keys[key]; ok { + // Allow updating the value and move the entry to the front + existing.value = value + if existing.prev != nil { + existing.prev.next = existing.next + } + if existing.next != nil { + existing.next.prev = 
existing.prev + } + existing.next = l.first + l.first.prev = existing + l.first = existing + return nil + } + + if l.currentSize >= l.capacity { + l.evictLast() + } + index := l.group.counter.Add(1) + newEntry := &entry[K, V]{key: key, index: index, value: value} + l.keys[key] = newEntry + if l.first == nil { + l.first, l.last = newEntry, newEntry + } else { + newEntry.next = l.first + l.first.prev = newEntry + l.first = newEntry + } + l.currentSize++ + return nil +} + +func (l *MLRU[K, V]) Get(key K) (V, error) { + l.mu.Lock() + defer l.mu.Unlock() + + if !l.valid { + var zero V + return zero, errors.New("invalid cache") + } + + entry, exists := l.keys[key] + if !exists { + var zero V + return zero, errors.New("key does not exist") + } + // Move the accessed entry to the front. + if entry.prev != nil { + entry.prev.next = entry.next + } + if entry.next != nil { + entry.next.prev = entry.prev + } else { + // If the entry was the last, update l.last + l.last = entry.prev + } + entry.next = l.first + l.first.prev = entry + l.first = entry + return entry.value, nil +} + +func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { + l.mu.Lock() + other.mu.Lock() + defer l.mu.Unlock() + defer other.mu.Unlock() + + if !l.valid || !other.valid { + return errors.New("invalid cache") + } + + // Invalidating the other cache after merging. + other.valid = false + + curEntOther := other.first + curEntHere := l.first + + for { + if curEntHere.index > curEntOther.index { + if curEntHere.next == nil { + // At the end of the list, append the rest of the other list until at capacity + for l.currentSize < l.capacity && curEntOther != nil { + l.currentSize++ + curEntHere.next = curEntOther + curEntOther.prev = curEntHere + curEntHere = curEntOther + curEntOther = curEntOther.next + } + break + } + + // Move to the next here, which will have a lower index + curEntHere = curEntHere.next + continue + } + + // Here the other entry has a higher index, insert it before the current entry in this list + if l.currentSize >= l.capacity { + // If already at capacity, evict the last entry to make room + l.evictLast() + } + newEntry := curEntOther + curEntOther = curEntOther.next // move to the next in other list + + // Insert newEntry before curEntHere in this list + newEntry.next = curEntHere + newEntry.prev = curEntHere.prev + if curEntHere.prev != nil { + curEntHere.prev.next = newEntry + } else { + // Updating the first entry if needed + l.first = newEntry + } + curEntHere.prev = newEntry + l.currentSize++ + l.keys[newEntry.key] = newEntry + + if curEntOther == nil { + break + } + } + return nil +} From 7f73e15b787abd0b54c616e5cfc89684ebcf3297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Sep 2023 09:32:23 +0200 Subject: [PATCH 02/10] mlru: comments, tests --- ributil/mlru/mlru.go | 33 +++++++--- ributil/mlru/mlru_test.go | 124 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 10 deletions(-) create mode 100644 ributil/mlru/mlru_test.go diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go index 1dffe80..7990f8a 100644 --- a/ributil/mlru/mlru.go +++ b/ributil/mlru/mlru.go @@ -1,3 +1,4 @@ +// Package mlru provides a thread-safe, mergeable least recently used (LRU) cache. package mlru import ( @@ -6,10 +7,12 @@ import ( "sync/atomic" ) +// LRUGroup contains a shared counter used for indexing entries. type LRUGroup struct { counter atomic.Int64 } +// MLRU represents a mergeable least recently used (LRU) cache. 
type MLRU[K comparable, V any] struct { group *LRUGroup mu sync.Mutex // Mutex to ensure thread safety @@ -20,6 +23,7 @@ type MLRU[K comparable, V any] struct { keys map[K]*entry[K, V] } +// entry represents an entry in the LRU cache. type entry[K comparable, V any] struct { key K prev, next *entry[K, V] @@ -28,6 +32,7 @@ type entry[K comparable, V any] struct { value V } +// NewMLRU creates a new MLRU cache with the specified group and capacity. func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { return &MLRU[K, V]{ group: group, @@ -37,11 +42,12 @@ func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { } } +// evictLast removes the last entry from the cache. func (l *MLRU[K, V]) evictLast() { if l.last != nil { delete(l.keys, l.last.key) if l.currentSize == 1 { - // If size is one, set first and last to nil after eviction + // If size is one, set first and last to nil after eviction. l.first, l.last = nil, nil } else { l.last = l.last.prev @@ -53,6 +59,8 @@ func (l *MLRU[K, V]) evictLast() { } } +// Put adds a new entry to the cache or updates an existing entry. +// It evicts the last entry if the cache is at full capacity. func (l *MLRU[K, V]) Put(key K, value V) error { l.mu.Lock() defer l.mu.Unlock() @@ -62,7 +70,7 @@ func (l *MLRU[K, V]) Put(key K, value V) error { } if existing, ok := l.keys[key]; ok { - // Allow updating the value and move the entry to the front + // Allow updating the value and move the entry to the front. existing.value = value if existing.prev != nil { existing.prev.next = existing.next @@ -93,6 +101,8 @@ func (l *MLRU[K, V]) Put(key K, value V) error { return nil } +// Get retrieves an entry from the cache and moves it to the front. +// Returns an error if the key does not exist or the cache is invalid. func (l *MLRU[K, V]) Get(key K) (V, error) { l.mu.Lock() defer l.mu.Unlock() @@ -114,7 +124,7 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { if entry.next != nil { entry.next.prev = entry.prev } else { - // If the entry was the last, update l.last + // If the entry was the last, update l.last. l.last = entry.prev } entry.next = l.first @@ -123,6 +133,9 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { return entry.value, nil } +// Merge merges another MLRU cache into the current cache. +// The other cache is invalidated after merging. +// Returns an error if either cache is invalid. func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { l.mu.Lock() other.mu.Lock() @@ -142,7 +155,7 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { for { if curEntHere.index > curEntOther.index { if curEntHere.next == nil { - // At the end of the list, append the rest of the other list until at capacity + // At the end of the list, append the rest of the other list until at capacity. for l.currentSize < l.capacity && curEntOther != nil { l.currentSize++ curEntHere.next = curEntOther @@ -153,26 +166,26 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { break } - // Move to the next here, which will have a lower index + // Move to the next here, which will have a lower index. curEntHere = curEntHere.next continue } - // Here the other entry has a higher index, insert it before the current entry in this list + // Here the other entry has a higher index, insert it before the current entry in this list. if l.currentSize >= l.capacity { - // If already at capacity, evict the last entry to make room + // If already at capacity, evict the last entry to make room. 
l.evictLast() } newEntry := curEntOther - curEntOther = curEntOther.next // move to the next in other list + curEntOther = curEntOther.next // move to the next in other list. - // Insert newEntry before curEntHere in this list + // Insert newEntry before curEntHere in this list. newEntry.next = curEntHere newEntry.prev = curEntHere.prev if curEntHere.prev != nil { curEntHere.prev.next = newEntry } else { - // Updating the first entry if needed + // Updating the first entry if needed. l.first = newEntry } curEntHere.prev = newEntry diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go new file mode 100644 index 0000000..aa79ad2 --- /dev/null +++ b/ributil/mlru/mlru_test.go @@ -0,0 +1,124 @@ +package mlru + +import ( + "github.com/stretchr/testify/assert" + "sync/atomic" + "testing" +) + +func TestPutAndGet(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + val, err := cache.Get(1) + assert.NoError(t, err) + assert.Equal(t, 100, val) +} + +func TestEvictLast(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + err = cache.Put(2, 200) + assert.NoError(t, err) + err = cache.Put(3, 300) + assert.NoError(t, err) + + _, err = cache.Get(1) + assert.Error(t, err) // The key 1 should have been evicted + + val, err := cache.Get(2) + assert.NoError(t, err) + assert.Equal(t, 200, val) + + val, err = cache.Get(3) + assert.NoError(t, err) + assert.Equal(t, 300, val) +} + +func TestUpdateExistingKey(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + err = cache.Put(1, 150) + assert.NoError(t, err) + + val, err := cache.Get(1) + assert.NoError(t, err) + assert.Equal(t, 150, val) +} + +func TestMergeCaches(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} // Shared group + cache1 := NewMLRU[int, int](group, 2) + err := cache1.Put(1, 100) + assert.NoError(t, err) + err = cache1.Put(2, 200) + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) // Sharing the same group + err = cache2.Put(3, 300) + assert.NoError(t, err) + err = cache2.Put(4, 400) + assert.NoError(t, err) + + err = cache1.Merge(cache2) + assert.NoError(t, err) + + val, err := cache1.Get(3) + assert.NoError(t, err) + assert.Equal(t, 300, val) + + val, err = cache1.Get(4) + assert.NoError(t, err) + assert.Equal(t, 400, val) + + _, err = cache2.Get(3) + assert.Error(t, err) // cache2 should be invalid after the merge +} + +func TestOrderedMerge(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} // Shared group + cache1 := NewMLRU[int, int](group, 2) + err := cache1.Put(1, 100) // v1 into cache1 + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) // Sharing the same group + err = cache2.Put(3, 300) // v3 into cache2 + assert.NoError(t, err) + + err = cache1.Put(2, 200) // v2 into cache1 + assert.NoError(t, err) + + err = cache2.Put(4, 400) // v4 into cache2 + assert.NoError(t, err) + + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // Expecting v2, v4 in cache1 + val, err := cache1.Get(2) + assert.NoError(t, err) + assert.Equal(t, 200, val) + + val, err = cache1.Get(4) + assert.NoError(t, err) + assert.Equal(t, 400, val) + + // v1 and v3 should not be in cache1 after the merge + _, err = cache1.Get(1) + assert.Error(t, err) + + _, err = cache1.Get(3) + 
assert.Error(t, err) + + // cache2 should be invalid after the merge + _, err = cache2.Get(3) + assert.Error(t, err) +} From 56d3c7064729a6e1b7d8b76878d3831125a3a542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Sep 2023 09:53:43 +0200 Subject: [PATCH 03/10] mlru: merge fixes and tests --- ributil/mlru/mlru.go | 28 +++-- ributil/mlru/mlru_test.go | 249 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 267 insertions(+), 10 deletions(-) diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go index 7990f8a..bdfb8f5 100644 --- a/ributil/mlru/mlru.go +++ b/ributil/mlru/mlru.go @@ -48,7 +48,7 @@ func (l *MLRU[K, V]) evictLast() { delete(l.keys, l.last.key) if l.currentSize == 1 { // If size is one, set first and last to nil after eviction. - l.first, l.last = nil, nil + l.first, l.last = nil, nil // TODO NOT COVERED } else { l.last = l.last.prev if l.last != nil { @@ -66,7 +66,7 @@ func (l *MLRU[K, V]) Put(key K, value V) error { defer l.mu.Unlock() if !l.valid { - return errors.New("invalid cache") + return errors.New("invalid cache") // TODO NOT COVERED } if existing, ok := l.keys[key]; ok { @@ -76,7 +76,7 @@ func (l *MLRU[K, V]) Put(key K, value V) error { existing.prev.next = existing.next } if existing.next != nil { - existing.next.prev = existing.prev + existing.next.prev = existing.prev // TODO NOT COVERED } existing.next = l.first l.first.prev = existing @@ -152,20 +152,31 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { curEntOther := other.first curEntHere := l.first - for { - if curEntHere.index > curEntOther.index { + for curEntOther != nil { + if curEntHere == nil || curEntHere.index > curEntOther.index { + if curEntHere == nil { + // if curEntHere is nil, assume the list is empty and populate it with curEntOther + l.first = curEntOther + l.last = curEntOther + curEntOther = curEntOther.next + // Update the keys map with the new entry. + l.keys[curEntOther.key] = curEntOther + continue + } if curEntHere.next == nil { // At the end of the list, append the rest of the other list until at capacity. for l.currentSize < l.capacity && curEntOther != nil { l.currentSize++ curEntHere.next = curEntOther curEntOther.prev = curEntHere + // Update the last pointer and the keys map. + l.last = curEntOther + l.keys[curEntOther.key] = curEntOther curEntHere = curEntOther curEntOther = curEntOther.next } break } - // Move to the next here, which will have a lower index. curEntHere = curEntHere.next continue @@ -190,11 +201,8 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { } curEntHere.prev = newEntry l.currentSize++ + // Update the keys map with the new entry. 
l.keys[newEntry.key] = newEntry - - if curEntOther == nil { - break - } } return nil } diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go index aa79ad2..839f5fe 100644 --- a/ributil/mlru/mlru_test.go +++ b/ributil/mlru/mlru_test.go @@ -122,3 +122,252 @@ func TestOrderedMerge(t *testing.T) { _, err = cache2.Get(3) assert.Error(t, err) } + +func TestEvictLastWithOneElement(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 1) + + err := cache.Put(1, 100) + assert.NoError(t, err) + err = cache.Put(2, 200) // This should evict key 1 + assert.NoError(t, err) + + _, err = cache.Get(1) + assert.Error(t, err) // The key 1 should have been evicted +} + +func TestInvalidCachePut(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache1 := NewMLRU[int, int](group, 2) + cache2 := NewMLRU[int, int](group, 2) + + err := cache1.Merge(cache2) // This will invalidate cache2 + assert.NoError(t, err) + + err = cache2.Put(1, 100) + assert.Error(t, err) // Should return error as the cache is invalid + assert.Equal(t, "invalid cache", err.Error()) +} + +func TestMergeWithMovingToNext(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache1 := NewMLRU[int, int](group, 2) + err := cache1.Put(2, 200) // most recent entry in cache1 + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) + err = cache2.Put(1, 100) // older entry in cache2 + assert.NoError(t, err) + + err = cache1.Merge(cache2) + assert.NoError(t, err) + + _, err = cache1.Get(2) + assert.NoError(t, err) // The key 2 should be in cache1 + + _, err = cache1.Get(1) + assert.NoError(t, err) // The key 1 should be in cache1 +} + +func TestMergeWithIncreasingCurrentSize(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache1 := NewMLRU[int, int](group, 4) + err := cache1.Put(3, 300) // fresh entry in cache1 + assert.NoError(t, err) + err = cache1.Put(4, 400) // fresh entry in cache1 + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) + err = cache2.Put(1, 100) // older entry in cache2 + assert.NoError(t, err) + err = cache2.Put(2, 200) // older entry in cache2 + assert.NoError(t, err) + + err = cache1.Merge(cache2) + assert.NoError(t, err) + + _, err = cache1.Get(3) + assert.NoError(t, err) // The key 3 should be in cache1 + + _, err = cache1.Get(4) + assert.NoError(t, err) // The key 4 should be in cache1 + + _, err = cache1.Get(1) + assert.NoError(t, err) // The key 1 should be in cache1 + + _, err = cache1.Get(2) + assert.NoError(t, err) // The key 2 should be in cache1 + + assert.Equal(t, int64(4), cache1.currentSize) // The currentSize should be 4 +} + +func TestMergeWithNilCurrentEntry(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + cache2 := NewMLRU[string, string](group, 2) + + // Put some values in cache2 + err := cache2.Put("k1", "v1") + assert.NoError(t, err) + err = cache2.Put("k2", "v2") + assert.NoError(t, err) + + // Merging empty cache1 with cache2 + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // cache1 should have the values from cache2 + value, err := cache1.Get("k1") + assert.NoError(t, err) + assert.Equal(t, "v1", value) + + value, err = cache1.Get("k2") + assert.NoError(t, err) + assert.Equal(t, "v2", value) +} + +func TestMergeAndUpdatePointers(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 3) + cache2 := NewMLRU[string, string](group, 3) + + // Put some values in both caches + err := cache1.Put("k1", "v1") + 
assert.NoError(t, err) + err = cache1.Put("k2", "v2") + assert.NoError(t, err) + err = cache2.Put("k3", "v3") + assert.NoError(t, err) + + // Merging cache1 with cache2 + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // cache1 should have the values from both caches + value, err := cache1.Get("k1") + assert.NoError(t, err) + assert.Equal(t, "v1", value) + + value, err = cache1.Get("k2") + assert.NoError(t, err) + assert.Equal(t, "v2", value) + + value, err = cache1.Get("k3") + assert.NoError(t, err) + assert.Equal(t, "v3", value) +} + +func TestMergeAndEvict(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + cache2 := NewMLRU[string, string](group, 2) + + // Put some values in both caches + err := cache1.Put("k1", "v1") + assert.NoError(t, err) + err = cache1.Put("k2", "v2") + assert.NoError(t, err) + err = cache2.Put("k3", "v3") + assert.NoError(t, err) + err = cache2.Put("k4", "v4") + assert.NoError(t, err) + + // Merging cache1 with cache2 + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // cache1 should have the most recent values from both caches + _, err = cache1.Get("k1") + assert.Error(t, err) // "k1" should have been evicted + + value, err := cache1.Get("k2") + assert.NoError(t, err) + assert.Equal(t, "v2", value) + + value, err = cache1.Get("k3") + assert.NoError(t, err) + assert.Equal(t, "v3", value) + + value, err = cache1.Get("k4") + assert.NoError(t, err) + assert.Equal(t, "v4", value) +} + +func TestMergeWithInvalidCache(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + cache2 := NewMLRU[string, string](group, 2) + + // Put some values in cache2 + err := cache2.Put("k1", "v1") + assert.NoError(t, err) + err = cache2.Put("k2", "v2") + assert.NoError(t, err) + + // Invalidate cache2 + cache2.valid = false + + // Merging cache1 with invalid cache2 should result in an error + err = cache1.Merge(cache2) + assert.Error(t, err) + assert.Equal(t, "invalid cache", err.Error()) +} + +func TestMergeWithNilCache(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + + // Merging cache1 with nil cache should result in an error + err := cache1.Merge(nil) + assert.Error(t, err) + assert.Equal(t, "invalid cache", err.Error()) +} + +func TestMergeWithMoreRecentOther(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + cache2 := NewMLRU[string, string](group, 2) + + // Put some values in both caches + err := cache1.Put("k1", "v1") + assert.NoError(t, err) + err = cache2.Put("k2", "v2") + assert.NoError(t, err) + err = cache2.Put("k3", "v3") + assert.NoError(t, err) + + // Merging cache1 with cache2 + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // cache1 should have the most recent values from cache2 + _, err = cache1.Get("k1") + assert.Error(t, err) // "k1" should have been evicted + + value, err := cache1.Get("k2") + assert.NoError(t, err) + assert.Equal(t, "v2", value) + + value, err = cache1.Get("k3") + assert.NoError(t, err) + assert.Equal(t, "v3", value) +} + +func TestMergeWithEmptyOther(t *testing.T) { + group := &LRUGroup{} + cache1 := NewMLRU[string, string](group, 2) + cache2 := NewMLRU[string, string](group, 2) + + // Put some values in cache1 + err := cache1.Put("k1", "v1") + assert.NoError(t, err) + + // Merging cache1 with empty cache2 + err = cache1.Merge(cache2) + assert.NoError(t, err) + + // cache1 should still have the values from before + value, err := cache1.Get("k1") + assert.NoError(t, err) + 
assert.Equal(t, "v1", value) +} From c48e645edfd3b36b7e916c1c934dc445ef4424c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Sep 2023 12:00:10 +0200 Subject: [PATCH 04/10] mlru: refactor names so that they make more sense --- ributil/mlru/mlru.go | 145 +++++++++++++++++++++----------------- ributil/mlru/mlru_test.go | 15 +++- 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go index bdfb8f5..d2e2934 100644 --- a/ributil/mlru/mlru.go +++ b/ributil/mlru/mlru.go @@ -19,14 +19,14 @@ type MLRU[K comparable, V any] struct { valid bool // Flag to check if the cache is valid currentSize, capacity int64 - first, last *entry[K, V] + newest, oldest *entry[K, V] keys map[K]*entry[K, V] } // entry represents an entry in the LRU cache. type entry[K comparable, V any] struct { - key K - prev, next *entry[K, V] + key K + newer, older *entry[K, V] index int64 value V @@ -42,17 +42,17 @@ func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { } } -// evictLast removes the last entry from the cache. +// evictLast removes the oldest entry from the cache. func (l *MLRU[K, V]) evictLast() { - if l.last != nil { - delete(l.keys, l.last.key) + if l.oldest != nil { + delete(l.keys, l.oldest.key) if l.currentSize == 1 { - // If size is one, set first and last to nil after eviction. - l.first, l.last = nil, nil // TODO NOT COVERED + // If size is one, set newest and oldest to nil after eviction. + l.newest, l.oldest = nil, nil // TODO NOT COVERED } else { - l.last = l.last.prev - if l.last != nil { - l.last.next = nil + l.oldest = l.oldest.newer + if l.oldest != nil { + l.oldest.older = nil } } l.currentSize-- @@ -60,7 +60,7 @@ func (l *MLRU[K, V]) evictLast() { } // Put adds a new entry to the cache or updates an existing entry. -// It evicts the last entry if the cache is at full capacity. +// It evicts the oldest entry if the cache is at full capacity. func (l *MLRU[K, V]) Put(key K, value V) error { l.mu.Lock() defer l.mu.Unlock() @@ -72,15 +72,15 @@ func (l *MLRU[K, V]) Put(key K, value V) error { if existing, ok := l.keys[key]; ok { // Allow updating the value and move the entry to the front. existing.value = value - if existing.prev != nil { - existing.prev.next = existing.next + if existing.newer != nil { + existing.newer.older = existing.older } - if existing.next != nil { - existing.next.prev = existing.prev // TODO NOT COVERED + if existing.older != nil { + existing.older.newer = existing.newer // TODO NOT COVERED } - existing.next = l.first - l.first.prev = existing - l.first = existing + existing.older = l.newest + l.newest.newer = existing + l.newest = existing return nil } @@ -90,12 +90,12 @@ func (l *MLRU[K, V]) Put(key K, value V) error { index := l.group.counter.Add(1) newEntry := &entry[K, V]{key: key, index: index, value: value} l.keys[key] = newEntry - if l.first == nil { - l.first, l.last = newEntry, newEntry + if l.newest == nil { + l.newest, l.oldest = newEntry, newEntry } else { - newEntry.next = l.first - l.first.prev = newEntry - l.first = newEntry + newEntry.older = l.newest + l.newest.newer = newEntry + l.newest = newEntry } l.currentSize++ return nil @@ -118,18 +118,18 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { return zero, errors.New("key does not exist") } // Move the accessed entry to the front. 
- if entry.prev != nil { - entry.prev.next = entry.next + if entry.newer != nil { + entry.newer.older = entry.older } - if entry.next != nil { - entry.next.prev = entry.prev + if entry.older != nil { + entry.older.newer = entry.newer } else { - // If the entry was the last, update l.last. - l.last = entry.prev + // If the entry was the oldest, update l.oldest. + l.oldest = entry.newer } - entry.next = l.first - l.first.prev = entry - l.first = entry + entry.older = l.newest + l.newest.newer = entry + l.newest = entry return entry.value, nil } @@ -137,6 +137,13 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { // The other cache is invalidated after merging. // Returns an error if either cache is invalid. func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { + if other == nil { + return errors.New("invalid cache") + } + if l == other { + return errors.New("cannot merge cache with itself") + } + l.mu.Lock() other.mu.Lock() defer l.mu.Unlock() @@ -146,60 +153,66 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { return errors.New("invalid cache") } + if l.capacity == 0 { + return nil // wat + } + // Invalidating the other cache after merging. other.valid = false - curEntOther := other.first - curEntHere := l.first + newestEntOther := other.newest + newestEntHere := l.newest - for curEntOther != nil { - if curEntHere == nil || curEntHere.index > curEntOther.index { - if curEntHere == nil { - // if curEntHere is nil, assume the list is empty and populate it with curEntOther - l.first = curEntOther - l.last = curEntOther - curEntOther = curEntOther.next + for newestEntOther != nil { + if newestEntHere == nil || newestEntHere.index > newestEntOther.index { + if newestEntHere == nil { + // if newestEntHere is nil, assume the list is empty and populate it with newestEntOther + l.newest = newestEntOther + l.oldest = newestEntOther // Update the keys map with the new entry. - l.keys[curEntOther.key] = curEntOther + l.keys[newestEntOther.key] = newestEntOther + + newestEntHere = newestEntOther + newestEntOther = newestEntOther.older continue } - if curEntHere.next == nil { + if newestEntHere.older == nil { // At the end of the list, append the rest of the other list until at capacity. - for l.currentSize < l.capacity && curEntOther != nil { + for l.currentSize < l.capacity && newestEntOther != nil { l.currentSize++ - curEntHere.next = curEntOther - curEntOther.prev = curEntHere - // Update the last pointer and the keys map. - l.last = curEntOther - l.keys[curEntOther.key] = curEntOther - curEntHere = curEntOther - curEntOther = curEntOther.next + newestEntHere.older = newestEntOther + newestEntOther.newer = newestEntHere + // Update the oldest pointer and the keys map. + l.oldest = newestEntOther + l.keys[newestEntOther.key] = newestEntOther + newestEntHere = newestEntOther + newestEntOther = newestEntOther.older } break } - // Move to the next here, which will have a lower index. - curEntHere = curEntHere.next + // Move to the older here, which will have a lower index. + newestEntHere = newestEntHere.older continue } // Here the other entry has a higher index, insert it before the current entry in this list. if l.currentSize >= l.capacity { - // If already at capacity, evict the last entry to make room. + // If already at capacity, evict the oldest entry to make room. l.evictLast() } - newEntry := curEntOther - curEntOther = curEntOther.next // move to the next in other list. - - // Insert newEntry before curEntHere in this list. 
- newEntry.next = curEntHere - newEntry.prev = curEntHere.prev - if curEntHere.prev != nil { - curEntHere.prev.next = newEntry + newEntry := newestEntOther + newestEntOther = newestEntOther.older // move to the older in other list. + + // Insert newEntry before newestEntHere in this list. + newEntry.older = newestEntHere + newEntry.newer = newestEntHere.newer + if newestEntHere.newer != nil { + newestEntHere.newer.older = newEntry } else { - // Updating the first entry if needed. - l.first = newEntry + // Updating the newest entry if needed. + l.newest = newEntry } - curEntHere.prev = newEntry + newestEntHere.newer = newEntry l.currentSize++ // Update the keys map with the new entry. l.keys[newEntry.key] = newEntry diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go index 839f5fe..8993886 100644 --- a/ributil/mlru/mlru_test.go +++ b/ributil/mlru/mlru_test.go @@ -2,6 +2,7 @@ package mlru import ( "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "sync/atomic" "testing" ) @@ -281,8 +282,7 @@ func TestMergeAndEvict(t *testing.T) { assert.Error(t, err) // "k1" should have been evicted value, err := cache1.Get("k2") - assert.NoError(t, err) - assert.Equal(t, "v2", value) + assert.Error(t, err) // "k2" should have been evicted value, err = cache1.Get("k3") assert.NoError(t, err) @@ -291,6 +291,17 @@ func TestMergeAndEvict(t *testing.T) { value, err = cache1.Get("k4") assert.NoError(t, err) assert.Equal(t, "v4", value) + + // validate entry chain + require.Equal(t, "k4", cache1.newest.key) + require.Equal(t, "k3", cache1.newest.older.key) + require.Nil(t, cache1.newest.newer) + require.Equal(t, "k3", cache1.oldest.key) + require.Equal(t, "k4", cache1.oldest.newer.key) + require.Nil(t, cache1.oldest.older) + + require.Nil(t, cache1.newest.older.older) + require.Nil(t, cache1.oldest.newer.newer) } func TestMergeWithInvalidCache(t *testing.T) { From 4ff256b2019cf321daa6fd06ab1cbac8d023514c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Sep 2023 13:09:49 +0200 Subject: [PATCH 05/10] mlru: Proper Merge, passes tests! --- ributil/mlru/mlru.go | 128 ++++++++++++++++++++++++-------------- ributil/mlru/mlru_test.go | 8 +++ 2 files changed, 88 insertions(+), 48 deletions(-) diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go index d2e2934..8b7651a 100644 --- a/ributil/mlru/mlru.go +++ b/ributil/mlru/mlru.go @@ -43,8 +43,10 @@ func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { } // evictLast removes the oldest entry from the cache. -func (l *MLRU[K, V]) evictLast() { +func (l *MLRU[K, V]) evictLast() (evicted *entry[K, V]) { if l.oldest != nil { + evicted = l.oldest + delete(l.keys, l.oldest.key) if l.currentSize == 1 { // If size is one, set newest and oldest to nil after eviction. @@ -57,6 +59,8 @@ func (l *MLRU[K, V]) evictLast() { } l.currentSize-- } + + return } // Put adds a new entry to the cache or updates an existing entry. @@ -120,6 +124,9 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { // Move the accessed entry to the front. 
if entry.newer != nil { entry.newer.older = entry.older + } else { + // already newest + return entry.value, nil } if entry.older != nil { entry.older.newer = entry.newer @@ -128,6 +135,7 @@ func (l *MLRU[K, V]) Get(key K) (V, error) { l.oldest = entry.newer } entry.older = l.newest + entry.newer = nil l.newest.newer = entry l.newest = entry return entry.value, nil @@ -160,62 +168,86 @@ func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { // Invalidating the other cache after merging. other.valid = false - newestEntOther := other.newest - newestEntHere := l.newest - - for newestEntOther != nil { - if newestEntHere == nil || newestEntHere.index > newestEntOther.index { - if newestEntHere == nil { - // if newestEntHere is nil, assume the list is empty and populate it with newestEntOther - l.newest = newestEntOther - l.oldest = newestEntOther - // Update the keys map with the new entry. - l.keys[newestEntOther.key] = newestEntOther - - newestEntHere = newestEntOther - newestEntOther = newestEntOther.older - continue - } - if newestEntHere.older == nil { - // At the end of the list, append the rest of the other list until at capacity. - for l.currentSize < l.capacity && newestEntOther != nil { - l.currentSize++ - newestEntHere.older = newestEntOther - newestEntOther.newer = newestEntHere - // Update the oldest pointer and the keys map. - l.oldest = newestEntOther - l.keys[newestEntOther.key] = newestEntOther - newestEntHere = newestEntOther - newestEntOther = newestEntOther.older - } - break + entOther := other.newest + entHere := l.newest + + toIter := l.capacity // tracks how many entries have been processed + + for entOther != nil && toIter > 0 { + // rewind to entOther.index + // atIndex will be decreasing as we iterate because we're viewing older-and-older entries + atIndex := int64(-1) + if entHere != nil { + atIndex = entHere.index + } + + if entOther.index < atIndex { + // need to rewind entHere more + if entHere == nil { + // wut + return errors.New("somehow other ent counter is lt -1") } - // Move to the older here, which will have a lower index. - newestEntHere = newestEntHere.older + + // basically we accept the entry from the cache at this position + entHere = entHere.older + toIter-- continue } - // Here the other entry has a higher index, insert it before the current entry in this list. if l.currentSize >= l.capacity { + // If already at capacity, evict the oldest entry to make room. - l.evictLast() + evicted := l.evictLast() + + if evicted == entHere { + // now we're inserting the oldest element + entHere = nil + } } - newEntry := newestEntOther - newestEntOther = newestEntOther.older // move to the older in other list. - - // Insert newEntry before newestEntHere in this list. 
- newEntry.older = newestEntHere - newEntry.newer = newestEntHere.newer - if newestEntHere.newer != nil { - newestEntHere.newer.older = newEntry + + olderOther := entOther.older + + l.keys[entOther.key] = entOther + l.currentSize++ + toIter-- + + // now the entHere is OLDER than entOther, so we need to insert entOther + // as the newer element to entHere + + // first make sure that entOther pointers aren't pointing at anything we really + // don't want them to point at + entOther.older, entOther.newer = entHere, nil + + if entHere == nil { + // if entHere is nil, we are inserting the oldest element + entOther.newer = l.oldest + l.oldest = entOther + if entOther.newer != nil { + // there may not have been any older elements + // (this probably means that we're also inserting the newest element, + // and this probably could be in the if body below) + entOther.newer.older = entOther + } + if l.newest == nil { + // and apparently this is also the newest element + l.newest = entOther + } } else { - // Updating the newest entry if needed. - l.newest = newEntry + if entHere.key == entOther.key { + return errors.New("can't merge caches with duplicate keys") + } + + // we are inserting an element newer to entHere + entOther.newer = entHere.newer + entHere.newer = entOther + if l.newest == entHere { + l.newest = entOther + } } - newestEntHere.newer = newEntry - l.currentSize++ - // Update the keys map with the new entry. - l.keys[newEntry.key] = newEntry + + // take next entOther + entOther = olderOther } + return nil } diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go index 8993886..689e01f 100644 --- a/ributil/mlru/mlru_test.go +++ b/ributil/mlru/mlru_test.go @@ -277,6 +277,14 @@ func TestMergeAndEvict(t *testing.T) { err = cache1.Merge(cache2) assert.NoError(t, err) + // validate entry chain + require.Equal(t, "k4", cache1.newest.key) + require.Equal(t, "k3", cache1.newest.older.key) + require.Nil(t, cache1.newest.newer) + require.Equal(t, "k3", cache1.oldest.key) + require.Equal(t, "k4", cache1.oldest.newer.key) + require.Nil(t, cache1.oldest.older) + // cache1 should have the most recent values from both caches _, err = cache1.Get("k1") assert.Error(t, err) // "k1" should have been evicted From 93fbb2b55cbd4eced22d104c3e4a83689ca2bd18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 27 Sep 2023 10:54:35 +0000 Subject: [PATCH 06/10] some logcache hacking --- carlog/carlog.go | 14 +-- carlog/idx_level.go | 2 +- ributil/levelcache/levelcache.go | 19 ++++ ributil/logcache/logcache.go | 144 +++++++++++++++++++++++++++++-- 4 files changed, 163 insertions(+), 16 deletions(-) create mode 100644 ributil/levelcache/levelcache.go diff --git a/carlog/carlog.go b/carlog/carlog.go index 2d0beaa..1860b2a 100644 --- a/carlog/carlog.go +++ b/carlog/carlog.go @@ -485,7 +485,7 @@ func (j *CarLog) fixLevelIndex(h Head, w WritableIndex) error { err := j.iterate(h.RetiredAt, func(off int64, length uint64, c cid.Cid, data []byte) error { mhsBuf = append(mhsBuf, c.Hash()) - offsBuf = append(offsBuf, makeOffsetLen(off, int(length))) + offsBuf = append(offsBuf, MakeOffsetLen(off, int(length))) done++ @@ -625,7 +625,7 @@ func (j *CarLog) Put(c []mh.Multihash, b []blocks.Block) error { // todo use a buffer with fixed cid prefix to avoid allocs bcid := cid.NewCidV1(cid.Raw, c[i]).Bytes() - offsets[i] = makeOffsetLen(j.dataLen, len(bcid)+len(blk.RawData())) + offsets[i] = MakeOffsetLen(j.dataLen, len(bcid)+len(blk.RawData())) n, err := j.ldWrite(bcid, blk.RawData()) if err != nil 
{ @@ -773,7 +773,7 @@ func (j *CarLog) View(c []mh.Multihash, cb func(cidx int, found bool, data []byt continue } - off, entLen := fromOffsetLen(locs[i]) + off, entLen := FromOffsetLen(locs[i]) if entLen > len(entBuf) { // expand buffer to next power of two if needed @@ -855,7 +855,7 @@ func (j *CarLog) viewExternal(c []mh.Multihash, cb func(cidx int, found bool, da continue } - off, entLen := fromOffsetLen(locs[i]) + off, entLen := FromOffsetLen(locs[i]) if entLen > len(entBuf) { // expand buffer to next power of two if needed @@ -1478,7 +1478,7 @@ func (c *carIdxSource) List(f func(c mh.Multihash, offs []int64) error) error { return xerrors.Errorf("decode block cid: %w", err) } - err = f(c.Hash(), []int64{makeOffsetLen(at, len(d))}) + err = f(c.Hash(), []int64{MakeOffsetLen(at, len(d))}) if err != nil { return err } @@ -1736,11 +1736,11 @@ func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) const MaxEntryLen = 1 << (64 - 40) -func makeOffsetLen(off int64, length int) int64 { +func MakeOffsetLen(off int64, length int) int64 { return (int64(length) << 40) | (off & 0xFFFF_FFFF_FF) } -func fromOffsetLen(offlen int64) (int64, int) { +func FromOffsetLen(offlen int64) (int64, int) { return offlen & 0xFFFF_FFFF_FF, int(offlen >> 40) } diff --git a/carlog/idx_level.go b/carlog/idx_level.go index 80b7ab6..7682c66 100644 --- a/carlog/idx_level.go +++ b/carlog/idx_level.go @@ -136,7 +136,7 @@ func (l *LevelDBIndex) ToTruncate(atOrAbove int64) ([]multihash.Multihash, error if len(it.Value()) != 8 { return nil, xerrors.Errorf("invalid value length") } - offs, _ := fromOffsetLen(int64(binary.LittleEndian.Uint64(it.Value()))) + offs, _ := FromOffsetLen(int64(binary.LittleEndian.Uint64(it.Value()))) if offs >= atOrAbove { keyCopy := make([]byte, len(it.Key())) copy(keyCopy, it.Key()) diff --git a/ributil/levelcache/levelcache.go b/ributil/levelcache/levelcache.go new file mode 100644 index 0000000..4d31a0f --- /dev/null +++ b/ributil/levelcache/levelcache.go @@ -0,0 +1,19 @@ +package levelcache + +import "github.com/multiformats/go-multihash" + +type LevelCache struct { + // mlru group + + // persistent top level lru + + // levels, with, at least for now, one logcache+mlru per level +} + +func (c *LevelCache) Put(k multihash.Multihash, v []byte) error { + +} + +func (c *LevelCache) Get(k multihash.Multihash, cb func([]byte) error) error { + +} diff --git a/ributil/logcache/logcache.go b/ributil/logcache/logcache.go index 103d932..26014a4 100644 --- a/ributil/logcache/logcache.go +++ b/ributil/logcache/logcache.go @@ -1,8 +1,13 @@ package logcache import ( - "bufio" + "encoding/binary" + "github.com/ipfs/go-cid" + "github.com/lotus-web3/ribs/carlog" + "golang.org/x/xerrors" + "io" "os" + "path/filepath" "sync" "github.com/multiformats/go-multihash" @@ -15,29 +20,81 @@ const ( type mhStr string +type ReaderAtWriter interface { + io.ReaderAt + io.Writer +} + type LogCache struct { - carFile os.File + carFile *os.File // index file // [[i24:dataLen][i48:offset]:i64][mhLen:i8][multihash]... 
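	// i.e. each index entry is 8 bytes holding the packed dataLen/offset word
	// (little-endian, as produced by carlog.MakeOffsetLen), one byte with the
	// multihash length, then the multihash bytes themselves.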
- indexFile os.File + indexFile *os.File - carBuf, idxBuf bufio.ReaderAt + // todo use some peekable buffered writer for this + carBuf, idxBuf ReaderAtWriter + carAt int64 indexLk sync.Mutex - index map[mhStr]int64 + index map[mhStr]uint64 // entirely separate write mutex because reads happen on read-only data - read sync.RWMutex - write sync.Mutex + readLk sync.RWMutex + writeLk sync.Mutex } -func Open(basePath string) (*LogCache, error) { +func Open(basePath string) (lc *LogCache, err error) { + lc = &LogCache{ + index: map[mhStr]uint64{}, + } + + dir, base := filepath.Split(basePath) + // open car file + lc.carFile, err = os.Open(filepath.Join(dir, base+carExt)) + if err != nil { + return nil, xerrors.Errorf("open cache car: %w", err) + } + + // todo if this is a newly created car, write a dummy header + + lc.carAt // open index file + idxFile, err := os.Open(filepath.Join(dir, base+idxExt)) + if err != nil { + return nil, xerrors.Errorf("open cache index: %w", err) + } + lc.idxBuf = idxFile + + var entBuf [512]byte // load index file into index + for { + // read the 9 header bytes + n, err := idxFile.Read(entBuf[:9]) + if err != nil { + if err == io.EOF && n == 0 { + break + } + + // todo handle truncated index + return nil, xerrors.Errorf("reading index header entry: %w", err) + } + + offLen := binary.LittleEndian.Uint64(entBuf[:8]) + + hlen := entBuf[8] + + // read hash + n, err = idxFile.Read(entBuf[:hlen]) + if err != nil { + return nil, xerrors.Errorf("read index entry key: %w", err) + } + + lc.index[mhStr(entBuf[:hlen])] = offLen + } // check that last index file points to a valid car entry, and that there's no // data at the end of carFile @@ -49,11 +106,82 @@ func Open(basePath string) (*LogCache, error) { // Put returns an error if the multihash is already stored in the cache, otherwise // it appends the entry to the cache car, then appends an index entry func (lc *LogCache) Put(mh multihash.Multihash, data []byte) (err error) { + lc.writeLk.Lock() + defer lc.writeLk.Unlock() + bcid := cid.NewCidV1(cid.Raw, mh).Bytes() + + wrote, err := lc.ldWrite(bcid, data) + if err != nil { + return err + } + + offLen := uint64(carlog.MakeOffsetLen(lc.carAt, int(wrote))) + lc.carAt += wrote + + lc.readLk.Lock() + lc.index[mhStr(mh)] = offLen + lc.readLk.Unlock() + + var idxBuf [512]byte + binary.LittleEndian.PutUint64(idxBuf[:8], offLen) + idxBuf[9] = byte(len(mh)) + n := copy(idxBuf[9:], mh) + if n != int(idxBuf[9]) { + return xerrors.Errorf("copied unexpected number of bytes when writing cache entry multihash; max hash len is 255 bytes") + } + + _, err = lc.idxBuf.Write(idxBuf[:n+9]) + if err != nil { + return xerrors.Errorf("writing cache index entry: %w", err) + } + + return nil +} + +func (lc *LogCache) ldWrite(d ...[]byte) (int64, error) { + var sum uint64 + for _, s := range d { + sum += uint64(len(s)) + } + + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, sum) + + _, err := lc.carBuf.Write(buf[:n]) + if err != nil { + // todo flag as corrupt + return 0, err + } + + for _, s := range d { + _, err = lc.carBuf.Write(s) + if err != nil { + // todo flag as corrupt + return 0, err + } + } + + return int64(sum), nil } func (lc *LogCache) Get(mh multihash.Multihash, cb func([]byte) error) error { + lc.readLk.RLock() + atOffLen, found := lc.index[mhStr(mh)] + lc.readLk.RUnlock() + + if !found { + return nil + } + + off, length := carlog.FromOffsetLen(int64(atOffLen)) + + buf := make([]byte, length+binary.MaxVarintLen64) // todo pool! 
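	// length covers only the cid+data payload recorded in the index; the extra
	// binary.MaxVarintLen64 leaves room for the uvarint length prefix that
	// ldWrite places in front of every car entry.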
+ _, err := lc.carBuf.ReadAt(buf, off) + if err != nil { + return xerrors.Errorf("read cache car data: %w", err) + } } func (lc *LogCache) Flush() error { From 6861a51013c05fe47b034fb735b424c117d90bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Sep 2023 13:24:44 +0000 Subject: [PATCH 07/10] logcache: passing tests --- ributil/logcache/logcache.go | 127 +++++++++++++++++++++++++----- ributil/logcache/logcache_test.go | 106 +++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 18 deletions(-) create mode 100644 ributil/logcache/logcache_test.go diff --git a/ributil/logcache/logcache.go b/ributil/logcache/logcache.go index 26014a4..50028cf 100644 --- a/ributil/logcache/logcache.go +++ b/ributil/logcache/logcache.go @@ -3,6 +3,7 @@ package logcache import ( "encoding/binary" "github.com/ipfs/go-cid" + "github.com/ipld/go-car" "github.com/lotus-web3/ribs/carlog" "golang.org/x/xerrors" "io" @@ -25,6 +26,9 @@ type ReaderAtWriter interface { io.Writer } +// LogCache is a cache of logs, backed by a car file and an index file +// The car file contains a dummy header, and a sequence of blocks stored with raw cids +// the index is in-memory + on disk, contains a set of dataLen/offset/multihash entries type LogCache struct { carFile *os.File @@ -52,28 +56,52 @@ func Open(basePath string) (lc *LogCache, err error) { dir, base := filepath.Split(basePath) // open car file - lc.carFile, err = os.Open(filepath.Join(dir, base+carExt)) + lc.carFile, err = os.OpenFile(filepath.Join(dir, base+carExt), os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, xerrors.Errorf("open cache car: %w", err) } + lc.carBuf = lc.carFile - // todo if this is a newly created car, write a dummy header + st, err := lc.carFile.Stat() + if err != nil { + return nil, xerrors.Errorf("stat cache car: %w", err) + } + + if st.Size() == 0 { + // new car, write a dummy header + + head := &car.CarHeader{ + Roots: nil, + Version: 1, + } + + err = car.WriteHeader(head, lc.carFile) + if err != nil { + return nil, xerrors.Errorf("write cache car header: %w", err) + } + + st, err = lc.carFile.Stat() + if err != nil { + return nil, xerrors.Errorf("stat cache car: %w", err) + } + } - lc.carAt + lc.carAt = st.Size() // open index file - idxFile, err := os.Open(filepath.Join(dir, base+idxExt)) + lc.indexFile, err = os.OpenFile(filepath.Join(dir, base+idxExt), os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, xerrors.Errorf("open cache index: %w", err) } - lc.idxBuf = idxFile + lc.idxBuf = lc.indexFile var entBuf [512]byte + var lastOffLen uint64 // load index file into index for { - // read the 9 header bytes - n, err := idxFile.Read(entBuf[:9]) + // read the 9 entry bytes + n, err := lc.indexFile.Read(entBuf[:9]) if err != nil { if err == io.EOF && n == 0 { break @@ -83,24 +111,40 @@ func Open(basePath string) (lc *LogCache, err error) { return nil, xerrors.Errorf("reading index header entry: %w", err) } - offLen := binary.LittleEndian.Uint64(entBuf[:8]) + lastOffLen = binary.LittleEndian.Uint64(entBuf[:8]) hlen := entBuf[8] // read hash - n, err = idxFile.Read(entBuf[:hlen]) + n, err = lc.indexFile.Read(entBuf[:hlen]) if err != nil { return nil, xerrors.Errorf("read index entry key: %w", err) } - lc.index[mhStr(entBuf[:hlen])] = offLen + lc.index[mhStr(entBuf[:hlen])] = lastOffLen } - // check that last index file points to a valid car entry, and that there's no - // data at the end of carFile - // - if there is trailer data in the car, truncate it - // - if the index points beyond the end of 
the car file, scan the index for the - // last entry which is within the car, truncate the index, then read the car (if last entry is truncated, do truncate it) + if lastOffLen > 0 { // first entry is never at 0 because that's where the car header is + offset, length := carlog.FromOffsetLen(int64(lastOffLen)) + + expectedLen := offset + int64(length) + int64(binary.PutUvarint(entBuf[:], uint64(length))) + if st.Size() != expectedLen { + // todo truncate car file (or the index.. or both) + return nil, xerrors.Errorf("car file is truncated, expected %d bytes, got %d", expectedLen, st.Size()) + } + } + + _, err = lc.carFile.Seek(0, io.SeekEnd) + if err != nil { + return nil, xerrors.Errorf("seek to end of car file: %w", err) + } + + _, err = lc.indexFile.Seek(0, io.SeekEnd) + if err != nil { + return nil, xerrors.Errorf("seek to end of index file: %w", err) + } + + return lc, nil } // Put returns an error if the multihash is already stored in the cache, otherwise @@ -125,9 +169,9 @@ func (lc *LogCache) Put(mh multihash.Multihash, data []byte) (err error) { var idxBuf [512]byte binary.LittleEndian.PutUint64(idxBuf[:8], offLen) - idxBuf[9] = byte(len(mh)) + idxBuf[8] = byte(len(mh)) n := copy(idxBuf[9:], mh) - if n != int(idxBuf[9]) { + if n != int(idxBuf[8]) { return xerrors.Errorf("copied unexpected number of bytes when writing cache entry multihash; max hash len is 255 bytes") } @@ -177,13 +221,60 @@ func (lc *LogCache) Get(mh multihash.Multihash, cb func([]byte) error) error { off, length := carlog.FromOffsetLen(int64(atOffLen)) buf := make([]byte, length+binary.MaxVarintLen64) // todo pool! + vlen := binary.PutUvarint(buf, uint64(length)) - _, err := lc.carBuf.ReadAt(buf, off) + _, err := lc.carBuf.ReadAt(buf[:length+vlen], off) if err != nil { return xerrors.Errorf("read cache car data: %w", err) } + + // buf is [len:varint][cid][data] + + // read length + n, l := binary.Uvarint(buf) // todo skip this + if uint64(length) != n { + return xerrors.Errorf("index entry len mismatch") + } + + cl, _, err := cid.CidFromBytes(buf[l:]) // todo more optimized skip read + if err != nil { + return xerrors.Errorf("read cache car cid: %w", err) + } + + return cb(buf[cl+l : l+int(n)]) } func (lc *LogCache) Flush() error { + lc.writeLk.Lock() + defer lc.writeLk.Unlock() + + // todo sync idx / car Bufs once those are actually buffers + + if err := lc.carFile.Sync(); err != nil { + return xerrors.Errorf("sync car: %w", err) + } + + if err := lc.indexFile.Sync(); err != nil { + return xerrors.Errorf("sync index: %w", err) + } + return nil +} + +// Close closes the opened car and index files. 
+func (lc *LogCache) Close() error {
+	lc.writeLk.Lock()
+	defer lc.writeLk.Unlock()
+
+	// todo sync idx / car Bufs once those are actually buffers
+
+	if err := lc.carFile.Close(); err != nil {
+		return xerrors.Errorf("close car file: %w", err)
+	}
+
+	if err := lc.indexFile.Close(); err != nil {
+		return xerrors.Errorf("close index file: %w", err)
+	}
+
+	return nil
 }
diff --git a/ributil/logcache/logcache_test.go b/ributil/logcache/logcache_test.go
new file mode 100644
index 0000000..2add23e
--- /dev/null
+++ b/ributil/logcache/logcache_test.go
@@ -0,0 +1,106 @@
+package logcache
+
+import (
+	"github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/assert"
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func TestOpen(t *testing.T) {
+	// Create a temporary directory for testing
+	dir, err := ioutil.TempDir("", "logcache")
+	assert.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	// Testing opening a new LogCache
+	lc, err := Open(dir)
+	assert.NoError(t, err)
+	assert.NotNil(t, lc)
+}
+
+func TestPut(t *testing.T) {
+	dir, err := ioutil.TempDir("", "logcache")
+	assert.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	lc, err := Open(dir)
+	assert.NoError(t, err)
+
+	// Create a sample multihash
+	mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1)
+	assert.NoError(t, err)
+
+	// Testing putting a new entry in the cache
+	err = lc.Put(mh, []byte("data"))
+	assert.NoError(t, err)
+}
+
+func TestGet(t *testing.T) {
+	dir, err := ioutil.TempDir("", "logcache")
+	assert.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	lc, err := Open(dir)
+	assert.NoError(t, err)
+
+	mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1)
+	assert.NoError(t, err)
+
+	err = lc.Put(mh, []byte("data"))
+	assert.NoError(t, err)
+
+	// Testing getting an entry from the cache
+	err = lc.Get(mh, func(data []byte) error {
+		assert.Equal(t, []byte("data"), data)
+		return nil
+	})
+	assert.NoError(t, err)
+}
+
+func TestFlush(t *testing.T) {
+	dir, err := ioutil.TempDir("", "logcache")
+	assert.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	lc, err := Open(dir)
+	assert.NoError(t, err)
+
+	// Testing flushing the cache
+	err = lc.Flush()
+	assert.NoError(t, err)
+}
+
+func TestOpenPutCloseOpenGet(t *testing.T) {
+	dir, err := ioutil.TempDir("", "logcache")
+	assert.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	// First Open
+	lc, err := Open(dir)
+	assert.NoError(t, err)
+
+	// Create a sample multihash
+	mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1)
+	assert.NoError(t, err)
+
+	// Put data
+	err = lc.Put(mh, []byte("data"))
+	assert.NoError(t, err)
+
+	// Close the LogCache
+	err = lc.Close()
+	assert.NoError(t, err)
+
+	// Second Open
+	lc, err = Open(dir)
+	assert.NoError(t, err)
+
+	// Get data
+	err = lc.Get(mh, func(data []byte) error {
+		assert.Equal(t, []byte("data"), data)
+		return nil
+	})
+	assert.NoError(t, err)
+}

From 35aa87054075f2151b5aa365205599651ed3e7f9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 28 Sep 2023 13:24:52 +0000
Subject: [PATCH 08/10] mod tidy
---
 go.mod |  5 ++++-
 go.sum | 10 ++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/go.mod b/go.mod
index acd02ce..9676ea1 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.20
 
 require (
 	github.com/aws/aws-sdk-go v1.44.269
+	github.com/cheggaaa/pb v1.0.29
 	github.com/cockroachdb/pebble v0.0.0-20230503034834-93b977533929
 	github.com/fatih/color v1.13.0
 	github.com/filecoin-project/go-address v1.1.0
@@ -45,6 +46,7 @@ require (
 	github.com/raulk/go-watchdog v1.3.0
 	github.com/stretchr/testify v1.8.4
 	github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
+	github.com/urfave/cli/v2 v2.25.5
 	github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
 	github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa
 	go.uber.org/multierr v1.11.0
@@ -171,6 +173,7 @@
 	github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.19 // indirect
+	github.com/mattn/go-runewidth v0.0.10 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
 	github.com/miekg/dns v1.1.54 // indirect
 	github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
@@ -203,12 +206,12 @@
 	github.com/quic-go/quic-go v0.33.0 // indirect
 	github.com/quic-go/webtransport-go v0.5.3 // indirect
 	github.com/raulk/clock v1.1.0 // indirect
+	github.com/rivo/uniseg v0.1.0 // indirect
 	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	github.com/rs/cors v1.7.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/shirou/gopsutil v2.18.12+incompatible // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
-	github.com/urfave/cli/v2 v2.25.5 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	github.com/valyala/fasttemplate v1.0.1 // indirect
 	github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba // indirect
diff --git a/go.sum b/go.sum
index 94d4164..4790853 100644
--- a/go.sum
+++ b/go.sum
@@ -111,6 +111,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
+github.com/cheggaaa/pb v1.0.29 h1:FckUN5ngEk2LpvuG0fw1GEFx6LtyY2pWI/Z2QgCnEYo=
+github.com/cheggaaa/pb v1.0.29/go.mod h1:W40334L7FMC5JKWldsTWbdGjLo0RxUKK73K+TuPxX30=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -203,6 +205,7 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
 github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
 github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
 github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
@@ -1043,6 +1046,7 @@ github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
+github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
 github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
 github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
@@ -1052,6 +1056,7 @@ github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
+github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
@@ -1059,6 +1064,9 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
 github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
 github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg=
+github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
 github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
 github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
 github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
@@ -1315,6 +1323,8 @@ github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtB
 github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
+github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=

From 9e7656aa5765150f4e566ee037f4c8d461c56680 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 28 Sep 2023 13:51:38 +0000
Subject: [PATCH 09/10] levelcache: wip
---
 ributil/levelcache/levelcache.go  | 45 ++++++++++++++++++++++++++++++-
 ributil/logcache/logcache_test.go |  5 ++--
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/ributil/levelcache/levelcache.go b/ributil/levelcache/levelcache.go
index 4d31a0f..4b5e19f 100644
--- a/ributil/levelcache/levelcache.go
+++ b/ributil/levelcache/levelcache.go
@@ -1,13 +1,56 @@
 package levelcache
 
-import "github.com/multiformats/go-multihash"
+import (
+	"github.com/lotus-web3/ribs/ributil/logcache"
+	"github.com/lotus-web3/ribs/ributil/mlru"
+	"github.com/multiformats/go-multihash"
+	"sync"
+)
+
+type mhStr string
 
 type LevelCache struct {
+	// root dir
+	// Contents:
+	//  /l0.lru
+	//  /l0.car
+	//  /l0.offs
+	//  ...
+	root string
+
+	// settings
+	l0size         int64
+	levelExpansion int
+
+	// mlru group
+	lrugroup *mlru.LRUGroup
+
 	// persistent top level lru
+	// (the top layer is special because it's the only actively mutated layer)
+	topLru *mlru.MLRU[mhStr, bool]
+	topLog *logcache.LogCache
 
 	// levels, with, at least for now, one logcache+mlru per level
+	levels []*cacheLevel
+
+	// compaction stuff
+	compactLk sync.RWMutex
+
+	// todo mem tier
+
+	// temp memory cache used during compaction
+	compactCache map[mhStr][]byte
+}
+
+type cacheLevel struct {
+	lru *mlru.MLRU[mhStr, bool]
+	log *logcache.LogCache
+}
+
+func (c *LevelCache) compactTop() error {
+	c.compactLk.Lock()
+	defer c.compactLk.Unlock()
+
+	c.levels[0].lru.EvictStats()
 }
 
 func (c *LevelCache) Put(k multihash.Multihash, v []byte) error {
diff --git a/ributil/logcache/logcache_test.go b/ributil/logcache/logcache_test.go
index 2add23e..f03a24d 100644
--- a/ributil/logcache/logcache_test.go
+++ b/ributil/logcache/logcache_test.go
@@ -1,11 +1,12 @@
 package logcache
 
 import (
-	"github.com/multiformats/go-multihash"
-	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"os"
 	"testing"
+
+	"github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestOpen(t *testing.T) {

From 6dcbb9c569438dda4937f3c2ca92ba1b5a580383 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Tue, 3 Oct 2023 13:21:45 +0200
Subject: [PATCH 10/10] mlru: Error on full cache put
---
 ributil/mlru/mlru.go      |  7 ++++++-
 ributil/mlru/mlru_test.go | 17 +++++++----------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go
index 8b7651a..e18f01d 100644
--- a/ributil/mlru/mlru.go
+++ b/ributil/mlru/mlru.go
@@ -63,6 +63,11 @@ func (l *MLRU[K, V]) evictLast() (evicted *entry[K, V]) {
 	return
 }
 
+// normal lru would just evict the oldest entry. This is part of levelcache,
+// where instead of evicting on put, we will merge this layer into the lower layer
+// which is where the eviction happens.
+var ErrCacheFull = errors.New("cache is full")
+
 // Put adds a new entry to the cache or updates an existing entry.
 // It evicts the oldest entry if the cache is at full capacity.
 func (l *MLRU[K, V]) Put(key K, value V) error {
@@ -89,7 +94,7 @@ func (l *MLRU[K, V]) Put(key K, value V) error {
 	}
 
 	if l.currentSize >= l.capacity {
-		l.evictLast()
+		return ErrCacheFull
 	}
 	index := l.group.counter.Add(1)
 	newEntry := &entry[K, V]{key: key, index: index, value: value}
diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go
index 689e01f..c47033c 100644
--- a/ributil/mlru/mlru_test.go
+++ b/ributil/mlru/mlru_test.go
@@ -27,18 +27,18 @@ func TestEvictLast(t *testing.T) {
 	err = cache.Put(2, 200)
 	assert.NoError(t, err)
 	err = cache.Put(3, 300)
-	assert.NoError(t, err)
+	assert.Equal(t, ErrCacheFull, err)
 
-	_, err = cache.Get(1)
-	assert.Error(t, err) // The key 1 should have been evicted
+	_, err = cache.Get(3)
+	assert.Error(t, err) // The key 3 should not have been put
 
 	val, err := cache.Get(2)
 	assert.NoError(t, err)
 	assert.Equal(t, 200, val)
 
-	val, err = cache.Get(3)
+	val, err = cache.Get(1)
 	assert.NoError(t, err)
-	assert.Equal(t, 300, val)
+	assert.Equal(t, 100, val)
 }
 
 func TestUpdateExistingKey(t *testing.T) {
@@ -130,11 +130,8 @@ func TestEvictLastWithOneElement(t *testing.T) {
 	err := cache.Put(1, 100)
 	assert.NoError(t, err)
 
-	err = cache.Put(2, 200) // This should evict key 1
-	assert.NoError(t, err)
-
-	_, err = cache.Get(1)
-	assert.Error(t, err) // The key 1 should have been evicted
+	err = cache.Put(2, 200) // This should say the cache is full
+	assert.EqualError(t, err, ErrCacheFull.Error())
 }
 
 func TestInvalidCachePut(t *testing.T) {
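
A note on the [PATCH 10/10] semantics: MLRU.Put no longer evicts the oldest entry when the cache is at capacity; it returns ErrCacheFull, and eviction is deferred to the point where the full layer is merged into the layer below (see the comment added above ErrCacheFull). The sketch below illustrates that caller-side flow using only the mlru API that appears in this series (NewMLRU, Put, Get, Merge, ErrCacheFull). The two-layer setup, the capacities, the seed entry, and the start-a-fresh-top-and-retry wiring are assumptions made for this example; they are not code from the patches and may differ from how levelcache ends up wiring compactTop.

package main

import (
	"errors"
	"fmt"

	"github.com/lotus-web3/ribs/ributil/mlru"
)

func main() {
	group := &mlru.LRUGroup{}

	// A small, actively written "top" layer and a larger "lower" layer.
	lower := mlru.NewMLRU[string, int](group, 8)
	top := mlru.NewMLRU[string, int](group, 2)

	_ = lower.Put("seed", 0) // keep the lower layer non-empty before merging
	_ = top.Put("a", 1)
	_ = top.Put("b", 2)

	// With this patch, Put on a full cache fails instead of evicting.
	if err := top.Put("c", 3); errors.Is(err, mlru.ErrCacheFull) {
		// Merge the full top layer into the lower layer (which is where
		// eviction happens), then start a fresh top layer and retry the Put.
		if err := lower.Merge(top); err != nil {
			panic(err)
		}
		top = mlru.NewMLRU[string, int](group, 2)
		_ = top.Put("c", 3)
	}

	v, err := lower.Get("a") // entries from the old top layer now live in lower
	fmt.Println(v, err)      // 1 <nil>
}

Merging invalidates the source cache, which is why a fresh top MLRU is created before retrying; that mirrors the compaction direction described in the ErrCacheFull comment.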