diff --git a/carlog/carlog.go b/carlog/carlog.go index 2d0beaa..1860b2a 100644 --- a/carlog/carlog.go +++ b/carlog/carlog.go @@ -485,7 +485,7 @@ func (j *CarLog) fixLevelIndex(h Head, w WritableIndex) error { err := j.iterate(h.RetiredAt, func(off int64, length uint64, c cid.Cid, data []byte) error { mhsBuf = append(mhsBuf, c.Hash()) - offsBuf = append(offsBuf, makeOffsetLen(off, int(length))) + offsBuf = append(offsBuf, MakeOffsetLen(off, int(length))) done++ @@ -625,7 +625,7 @@ func (j *CarLog) Put(c []mh.Multihash, b []blocks.Block) error { // todo use a buffer with fixed cid prefix to avoid allocs bcid := cid.NewCidV1(cid.Raw, c[i]).Bytes() - offsets[i] = makeOffsetLen(j.dataLen, len(bcid)+len(blk.RawData())) + offsets[i] = MakeOffsetLen(j.dataLen, len(bcid)+len(blk.RawData())) n, err := j.ldWrite(bcid, blk.RawData()) if err != nil { @@ -773,7 +773,7 @@ func (j *CarLog) View(c []mh.Multihash, cb func(cidx int, found bool, data []byt continue } - off, entLen := fromOffsetLen(locs[i]) + off, entLen := FromOffsetLen(locs[i]) if entLen > len(entBuf) { // expand buffer to next power of two if needed @@ -855,7 +855,7 @@ func (j *CarLog) viewExternal(c []mh.Multihash, cb func(cidx int, found bool, da continue } - off, entLen := fromOffsetLen(locs[i]) + off, entLen := FromOffsetLen(locs[i]) if entLen > len(entBuf) { // expand buffer to next power of two if needed @@ -1478,7 +1478,7 @@ func (c *carIdxSource) List(f func(c mh.Multihash, offs []int64) error) error { return xerrors.Errorf("decode block cid: %w", err) } - err = f(c.Hash(), []int64{makeOffsetLen(at, len(d))}) + err = f(c.Hash(), []int64{MakeOffsetLen(at, len(d))}) if err != nil { return err } @@ -1736,11 +1736,11 @@ func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) const MaxEntryLen = 1 << (64 - 40) -func makeOffsetLen(off int64, length int) int64 { +func MakeOffsetLen(off int64, length int) int64 { return (int64(length) << 40) | (off & 0xFFFF_FFFF_FF) } -func 
fromOffsetLen(offlen int64) (int64, int) { +func FromOffsetLen(offlen int64) (int64, int) { return offlen & 0xFFFF_FFFF_FF, int(offlen >> 40) } diff --git a/carlog/idx_level.go b/carlog/idx_level.go index 80b7ab6..7682c66 100644 --- a/carlog/idx_level.go +++ b/carlog/idx_level.go @@ -136,7 +136,7 @@ func (l *LevelDBIndex) ToTruncate(atOrAbove int64) ([]multihash.Multihash, error if len(it.Value()) != 8 { return nil, xerrors.Errorf("invalid value length") } - offs, _ := fromOffsetLen(int64(binary.LittleEndian.Uint64(it.Value()))) + offs, _ := FromOffsetLen(int64(binary.LittleEndian.Uint64(it.Value()))) if offs >= atOrAbove { keyCopy := make([]byte, len(it.Key())) copy(keyCopy, it.Key()) diff --git a/go.mod b/go.mod index acd02ce..9676ea1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/aws/aws-sdk-go v1.44.269 + github.com/cheggaaa/pb v1.0.29 github.com/cockroachdb/pebble v0.0.0-20230503034834-93b977533929 github.com/fatih/color v1.13.0 github.com/filecoin-project/go-address v1.1.0 @@ -45,6 +46,7 @@ require ( github.com/raulk/go-watchdog v1.3.0 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 + github.com/urfave/cli/v2 v2.25.5 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa go.uber.org/multierr v1.11.0 @@ -171,6 +173,7 @@ require ( github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-runewidth v0.0.10 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.54 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect @@ -203,12 +206,12 @@ require ( github.com/quic-go/quic-go v0.33.0 // indirect github.com/quic-go/webtransport-go v0.5.3 // indirect github.com/raulk/clock v1.1.0 // 
indirect + github.com/rivo/uniseg v0.1.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/urfave/cli/v2 v2.25.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.0.1 // indirect github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba // indirect diff --git a/go.sum b/go.sum index 94d4164..4790853 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/cheggaaa/pb v1.0.29 h1:FckUN5ngEk2LpvuG0fw1GEFx6LtyY2pWI/Z2QgCnEYo= +github.com/cheggaaa/pb v1.0.29/go.mod h1:W40334L7FMC5JKWldsTWbdGjLo0RxUKK73K+TuPxX30= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -203,6 +205,7 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0 
h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -1043,6 +1046,7 @@ github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -1052,6 +1056,7 @@ github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= @@ -1059,6 +1064,9 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg= +github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= @@ -1315,6 +1323,8 @@ github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtB github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/ributil/levelcache/levelcache.go b/ributil/levelcache/levelcache.go new file mode 100644 index 0000000..4b5e19f --- /dev/null +++ b/ributil/levelcache/levelcache.go @@ -0,0 +1,62 @@ +package levelcache + +import ( + "github.com/lotus-web3/ribs/ributil/logcache" + "github.com/lotus-web3/ribs/ributil/mlru" + "github.com/multiformats/go-multihash" + "sync" +) + +type mhStr string + +type LevelCache struct { + // root dir + // Contents: + // 
/l0.lru + // /l0.car + // /l0.offs + // ... + root string + + // settings + l0size int64 + levelExpansion int + + // mlru group + lrugroup *mlru.LRUGroup + + // persistent top level lru + // (the top layer is special because it's the only actively mutated layer) + topLru *mlru.MLRU[mhStr, bool] + topLog *logcache.LogCache + + // levels, with, at least for now, one logcache+mlru per level + levels []*cacheLevel + + // compaction stuff + compactLk sync.RWMutex + + // todo mem tier + // temp memory cache used during compaction + compactCache map[mhStr][]byte +} + +type cacheLevel struct { + lru *mlru.MLRU[mhStr, bool] + log *logcache.LogCache +} + +func (c *LevelCache) compactTop() error { + c.compactLk.Lock() + defer c.compactLk.Unlock() + + c.levels[0].lru.EvictStats() +} + +func (c *LevelCache) Put(k multihash.Multihash, v []byte) error { + +} + +func (c *LevelCache) Get(k multihash.Multihash, cb func([]byte) error) error { + +} diff --git a/ributil/logcache/logcache.go b/ributil/logcache/logcache.go new file mode 100644 index 0000000..50028cf --- /dev/null +++ b/ributil/logcache/logcache.go @@ -0,0 +1,280 @@ +package logcache + +import ( + "encoding/binary" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car" + "github.com/lotus-web3/ribs/carlog" + "golang.org/x/xerrors" + "io" + "os" + "path/filepath" + "sync" + + "github.com/multiformats/go-multihash" +) + +const ( + carExt = ".car" + idxExt = ".offs" +) + +type mhStr string + +type ReaderAtWriter interface { + io.ReaderAt + io.Writer +} + +// LogCache is a cache of logs, backed by a car file and an index file +// The car file contains a dummy header, and a sequence of blocks stored with raw cids +// the index is in-memory + on disk, contains a set of dataLen/offset/multihash entries +type LogCache struct { + carFile *os.File + + // index file + // [[i24:dataLen][i48:offset]:i64][mhLen:i8][multihash]... 
+ indexFile *os.File + + // todo use some peekable buffered writer for this + carBuf, idxBuf ReaderAtWriter + carAt int64 + + indexLk sync.Mutex + index map[mhStr]uint64 + + // entirely separate write mutex because reads happen on read-only data + readLk sync.RWMutex + writeLk sync.Mutex +} + +func Open(basePath string) (lc *LogCache, err error) { + lc = &LogCache{ + index: map[mhStr]uint64{}, + } + + dir, base := filepath.Split(basePath) + + // open car file + lc.carFile, err = os.OpenFile(filepath.Join(dir, base+carExt), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, xerrors.Errorf("open cache car: %w", err) + } + lc.carBuf = lc.carFile + + st, err := lc.carFile.Stat() + if err != nil { + return nil, xerrors.Errorf("stat cache car: %w", err) + } + + if st.Size() == 0 { + // new car, write a dummy header + + head := &car.CarHeader{ + Roots: nil, + Version: 1, + } + + err = car.WriteHeader(head, lc.carFile) + if err != nil { + return nil, xerrors.Errorf("write cache car header: %w", err) + } + + st, err = lc.carFile.Stat() + if err != nil { + return nil, xerrors.Errorf("stat cache car: %w", err) + } + } + + lc.carAt = st.Size() + + // open index file + lc.indexFile, err = os.OpenFile(filepath.Join(dir, base+idxExt), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, xerrors.Errorf("open cache index: %w", err) + } + lc.idxBuf = lc.indexFile + + var entBuf [512]byte + var lastOffLen uint64 + + // load index file into index + for { + // read the 9 entry bytes + n, err := lc.indexFile.Read(entBuf[:9]) + if err != nil { + if err == io.EOF && n == 0 { + break + } + + // todo handle truncated index + return nil, xerrors.Errorf("reading index header entry: %w", err) + } + + lastOffLen = binary.LittleEndian.Uint64(entBuf[:8]) + + hlen := entBuf[8] + + // read hash + n, err = lc.indexFile.Read(entBuf[:hlen]) + if err != nil { + return nil, xerrors.Errorf("read index entry key: %w", err) + } + + lc.index[mhStr(entBuf[:hlen])] = lastOffLen + } + + if 
lastOffLen > 0 { // first entry is never at 0 because that's where the car header is + offset, length := carlog.FromOffsetLen(int64(lastOffLen)) + + expectedLen := offset + int64(length) + int64(binary.PutUvarint(entBuf[:], uint64(length))) + if st.Size() != expectedLen { + // todo truncate car file (or the index.. or both) + return nil, xerrors.Errorf("car file is truncated, expected %d bytes, got %d", expectedLen, st.Size()) + } + } + + _, err = lc.carFile.Seek(0, io.SeekEnd) + if err != nil { + return nil, xerrors.Errorf("seek to end of car file: %w", err) + } + + _, err = lc.indexFile.Seek(0, io.SeekEnd) + if err != nil { + return nil, xerrors.Errorf("seek to end of index file: %w", err) + } + + return lc, nil +} + +// Put returns an error if the multihash is already stored in the cache, otherwise +// it appends the entry to the cache car, then appends an index entry +func (lc *LogCache) Put(mh multihash.Multihash, data []byte) (err error) { + lc.writeLk.Lock() + defer lc.writeLk.Unlock() + + bcid := cid.NewCidV1(cid.Raw, mh).Bytes() + + wrote, err := lc.ldWrite(bcid, data) + if err != nil { + return err + } + + offLen := uint64(carlog.MakeOffsetLen(lc.carAt, int(wrote))) + lc.carAt += wrote + + lc.readLk.Lock() + lc.index[mhStr(mh)] = offLen + lc.readLk.Unlock() + + var idxBuf [512]byte + binary.LittleEndian.PutUint64(idxBuf[:8], offLen) + idxBuf[8] = byte(len(mh)) + n := copy(idxBuf[9:], mh) + if n != int(idxBuf[8]) { + return xerrors.Errorf("copied unexpected number of bytes when writing cache entry multihash; max hash len is 255 bytes") + } + + _, err = lc.idxBuf.Write(idxBuf[:n+9]) + if err != nil { + return xerrors.Errorf("writing cache index entry: %w", err) + } + + return nil +} + +func (lc *LogCache) ldWrite(d ...[]byte) (int64, error) { + var sum uint64 + for _, s := range d { + sum += uint64(len(s)) + } + + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, sum) + + _, err := lc.carBuf.Write(buf[:n]) + if err != nil { + // todo 
flag as corrupt + return 0, err + } + + for _, s := range d { + _, err = lc.carBuf.Write(s) + if err != nil { + // todo flag as corrupt + return 0, err + } + } + + return int64(sum), nil +} + +func (lc *LogCache) Get(mh multihash.Multihash, cb func([]byte) error) error { + lc.readLk.RLock() + atOffLen, found := lc.index[mhStr(mh)] + lc.readLk.RUnlock() + + if !found { + return nil + } + + off, length := carlog.FromOffsetLen(int64(atOffLen)) + + buf := make([]byte, length+binary.MaxVarintLen64) // todo pool! + vlen := binary.PutUvarint(buf, uint64(length)) + + _, err := lc.carBuf.ReadAt(buf[:length+vlen], off) + if err != nil { + return xerrors.Errorf("read cache car data: %w", err) + } + + // buf is [len:varint][cid][data] + + // read length + n, l := binary.Uvarint(buf) // todo skip this + if uint64(length) != n { + return xerrors.Errorf("index entry len mismatch") + } + + cl, _, err := cid.CidFromBytes(buf[l:]) // todo more optimized skip read + if err != nil { + return xerrors.Errorf("read cache car cid: %w", err) + } + + return cb(buf[cl+l : l+int(n)]) +} + +func (lc *LogCache) Flush() error { + lc.writeLk.Lock() + defer lc.writeLk.Unlock() + + // todo sync idx / car Bufs once those are actually buffers + + if err := lc.carFile.Sync(); err != nil { + return xerrors.Errorf("sync car: %w", err) + } + + if err := lc.indexFile.Sync(); err != nil { + return xerrors.Errorf("sync index: %w", err) + } + + return nil +} + +// Close closes the opened car and index files. 
+func (lc *LogCache) Close() error { + lc.writeLk.Lock() + defer lc.writeLk.Unlock() + + // todo sync idx / car Bufs once those are actually buffers + + if err := lc.carFile.Close(); err != nil { + return xerrors.Errorf("close car file: %w", err) + } + + if err := lc.indexFile.Close(); err != nil { + return xerrors.Errorf("close index file: %w", err) + } + + return nil +} diff --git a/ributil/logcache/logcache_test.go b/ributil/logcache/logcache_test.go new file mode 100644 index 0000000..f03a24d --- /dev/null +++ b/ributil/logcache/logcache_test.go @@ -0,0 +1,107 @@ +package logcache + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +func TestOpen(t *testing.T) { + // Create a temporary directory for testing + dir, err := ioutil.TempDir("", "logcache") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + // Testing opening a new LogCache + lc, err := Open(dir) + assert.NoError(t, err) + assert.NotNil(t, lc) +} + +func TestPut(t *testing.T) { + dir, err := ioutil.TempDir("", "logcache") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + lc, err := Open(dir) + assert.NoError(t, err) + + // Create a sample multihash + mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1) + assert.NoError(t, err) + + // Testing putting a new entry in the cache + err = lc.Put(mh, []byte("data")) + assert.NoError(t, err) +} + +func TestGet(t *testing.T) { + dir, err := ioutil.TempDir("", "logcache") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + lc, err := Open(dir) + assert.NoError(t, err) + + mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1) + assert.NoError(t, err) + + err = lc.Put(mh, []byte("data")) + assert.NoError(t, err) + + // Testing getting an entry from the cache + err = lc.Get(mh, func(data []byte) error { + assert.Equal(t, []byte("data"), data) + return nil + }) + assert.NoError(t, err) +} + +func TestFlush(t *testing.T) { + dir, err := 
ioutil.TempDir("", "logcache") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + lc, err := Open(dir) + assert.NoError(t, err) + + // Testing flushing the cache + err = lc.Flush() + assert.NoError(t, err) +} + +func TestOpenPutCloseOpenGet(t *testing.T) { + dir, err := ioutil.TempDir("", "logcache") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + // First Open + lc, err := Open(dir) + assert.NoError(t, err) + + // Create a sample multihash + mh, err := multihash.Sum([]byte("hello"), multihash.SHA2_256, -1) + assert.NoError(t, err) + + // Put data + err = lc.Put(mh, []byte("data")) + assert.NoError(t, err) + + // Close the LogCache + err = lc.Close() + assert.NoError(t, err) + + // Second Open + lc, err = Open(dir) + assert.NoError(t, err) + + // Get data + err = lc.Get(mh, func(data []byte) error { + assert.Equal(t, []byte("data"), data) + return nil + }) + assert.NoError(t, err) +} diff --git a/ributil/mlru/mlru.go b/ributil/mlru/mlru.go new file mode 100644 index 0000000..e18f01d --- /dev/null +++ b/ributil/mlru/mlru.go @@ -0,0 +1,258 @@ +// Package mlru provides a thread-safe, mergeable least recently used (LRU) cache. +package mlru + +import ( + "errors" + "sync" + "sync/atomic" +) + +// LRUGroup contains a shared counter used for indexing entries. +type LRUGroup struct { + counter atomic.Int64 +} + +// MLRU represents a mergeable least recently used (LRU) cache. +type MLRU[K comparable, V any] struct { + group *LRUGroup + mu sync.Mutex // Mutex to ensure thread safety + valid bool // Flag to check if the cache is valid + + currentSize, capacity int64 + newest, oldest *entry[K, V] + keys map[K]*entry[K, V] +} + +// entry represents an entry in the LRU cache. +type entry[K comparable, V any] struct { + key K + newer, older *entry[K, V] + + index int64 + value V +} + +// NewMLRU creates a new MLRU cache with the specified group and capacity. 
+func NewMLRU[K comparable, V any](group *LRUGroup, capacity int64) *MLRU[K, V] { + return &MLRU[K, V]{ + group: group, + valid: true, + capacity: capacity, + keys: make(map[K]*entry[K, V]), + } +} + +// evictLast removes the oldest entry from the cache. +func (l *MLRU[K, V]) evictLast() (evicted *entry[K, V]) { + if l.oldest != nil { + evicted = l.oldest + + delete(l.keys, l.oldest.key) + if l.currentSize == 1 { + // If size is one, set newest and oldest to nil after eviction. + l.newest, l.oldest = nil, nil // TODO NOT COVERED + } else { + l.oldest = l.oldest.newer + if l.oldest != nil { + l.oldest.older = nil + } + } + l.currentSize-- + } + + return +} + +// normal lru would just evict the oldest entry. This is part of levelcache, +// where instead of evicting on put, we will merge this layer into the lower layer +// which is where the eviction happens. +var ErrCacheFull = errors.New("cache is full") + +// Put adds a new entry to the cache or updates an existing entry. +// It evicts the oldest entry if the cache is at full capacity. +func (l *MLRU[K, V]) Put(key K, value V) error { + l.mu.Lock() + defer l.mu.Unlock() + + if !l.valid { + return errors.New("invalid cache") // TODO NOT COVERED + } + + if existing, ok := l.keys[key]; ok { + // Allow updating the value and move the entry to the front. 
+ existing.value = value + if existing.newer != nil { + existing.newer.older = existing.older + } + if existing.older != nil { + existing.older.newer = existing.newer // TODO NOT COVERED + } + existing.older = l.newest + l.newest.newer = existing + l.newest = existing + return nil + } + + if l.currentSize >= l.capacity { + return ErrCacheFull + } + index := l.group.counter.Add(1) + newEntry := &entry[K, V]{key: key, index: index, value: value} + l.keys[key] = newEntry + if l.newest == nil { + l.newest, l.oldest = newEntry, newEntry + } else { + newEntry.older = l.newest + l.newest.newer = newEntry + l.newest = newEntry + } + l.currentSize++ + return nil +} + +// Get retrieves an entry from the cache and moves it to the front. +// Returns an error if the key does not exist or the cache is invalid. +func (l *MLRU[K, V]) Get(key K) (V, error) { + l.mu.Lock() + defer l.mu.Unlock() + + if !l.valid { + var zero V + return zero, errors.New("invalid cache") + } + + entry, exists := l.keys[key] + if !exists { + var zero V + return zero, errors.New("key does not exist") + } + // Move the accessed entry to the front. + if entry.newer != nil { + entry.newer.older = entry.older + } else { + // already newest + return entry.value, nil + } + if entry.older != nil { + entry.older.newer = entry.newer + } else { + // If the entry was the oldest, update l.oldest. + l.oldest = entry.newer + } + entry.older = l.newest + entry.newer = nil + l.newest.newer = entry + l.newest = entry + return entry.value, nil +} + +// Merge merges another MLRU cache into the current cache. +// The other cache is invalidated after merging. +// Returns an error if either cache is invalid. 
+func (l *MLRU[K, V]) Merge(other *MLRU[K, V]) error { + if other == nil { + return errors.New("invalid cache") + } + if l == other { + return errors.New("cannot merge cache with itself") + } + + l.mu.Lock() + other.mu.Lock() + defer l.mu.Unlock() + defer other.mu.Unlock() + + if !l.valid || !other.valid { + return errors.New("invalid cache") + } + + if l.capacity == 0 { + return nil // wat + } + + // Invalidating the other cache after merging. + other.valid = false + + entOther := other.newest + entHere := l.newest + + toIter := l.capacity // tracks how many entries have been processed + + for entOther != nil && toIter > 0 { + // rewind to entOther.index + // atIndex will be decreasing as we iterate because we're viewing older-and-older entries + atIndex := int64(-1) + if entHere != nil { + atIndex = entHere.index + } + + if entOther.index < atIndex { + // need to rewind entHere more + if entHere == nil { + // wut + return errors.New("somehow other ent counter is lt -1") + } + + // basically we accept the entry from the cache at this position + entHere = entHere.older + toIter-- + continue + } + + if l.currentSize >= l.capacity { + + // If already at capacity, evict the oldest entry to make room. 
+ evicted := l.evictLast() + + if evicted == entHere { + // now we're inserting the oldest element + entHere = nil + } + } + + olderOther := entOther.older + + l.keys[entOther.key] = entOther + l.currentSize++ + toIter-- + + // now the entHere is OLDER than entOther, so we need to insert entOther + // as the newer element to entHere + + // first make sure that entOther pointers aren't pointing at anything we really + // don't want them to point at + entOther.older, entOther.newer = entHere, nil + + if entHere == nil { + // if entHere is nil, we are inserting the oldest element + entOther.newer = l.oldest + l.oldest = entOther + if entOther.newer != nil { + // there may not have been any older elements + // (this probably means that we're also inserting the newest element, + // and this probably could be in the if body below) + entOther.newer.older = entOther + } + if l.newest == nil { + // and apparently this is also the newest element + l.newest = entOther + } + } else { + if entHere.key == entOther.key { + return errors.New("can't merge caches with duplicate keys") + } + + // we are inserting an element newer to entHere + entOther.newer = entHere.newer + entHere.newer = entOther + if l.newest == entHere { + l.newest = entOther + } + } + + // take next entOther + entOther = olderOther + } + + return nil +} diff --git a/ributil/mlru/mlru_test.go b/ributil/mlru/mlru_test.go new file mode 100644 index 0000000..c47033c --- /dev/null +++ b/ributil/mlru/mlru_test.go @@ -0,0 +1,389 @@ +package mlru + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sync/atomic" + "testing" +) + +func TestPutAndGet(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + val, err := cache.Get(1) + assert.NoError(t, err) + assert.Equal(t, 100, val) +} + +func TestEvictLast(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := 
NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + err = cache.Put(2, 200) + assert.NoError(t, err) + err = cache.Put(3, 300) + assert.Equal(t, ErrCacheFull, err) + + _, err = cache.Get(3) + assert.Error(t, err) // The key 3 should not have been put + + val, err := cache.Get(2) + assert.NoError(t, err) + assert.Equal(t, 200, val) + + val, err = cache.Get(1) + assert.NoError(t, err) + assert.Equal(t, 100, val) +} + +func TestUpdateExistingKey(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} + cache := NewMLRU[int, int](group, 2) + + err := cache.Put(1, 100) + assert.NoError(t, err) + err = cache.Put(1, 150) + assert.NoError(t, err) + + val, err := cache.Get(1) + assert.NoError(t, err) + assert.Equal(t, 150, val) +} + +func TestMergeCaches(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} // Shared group + cache1 := NewMLRU[int, int](group, 2) + err := cache1.Put(1, 100) + assert.NoError(t, err) + err = cache1.Put(2, 200) + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) // Sharing the same group + err = cache2.Put(3, 300) + assert.NoError(t, err) + err = cache2.Put(4, 400) + assert.NoError(t, err) + + err = cache1.Merge(cache2) + assert.NoError(t, err) + + val, err := cache1.Get(3) + assert.NoError(t, err) + assert.Equal(t, 300, val) + + val, err = cache1.Get(4) + assert.NoError(t, err) + assert.Equal(t, 400, val) + + _, err = cache2.Get(3) + assert.Error(t, err) // cache2 should be invalid after the merge +} + +func TestOrderedMerge(t *testing.T) { + group := &LRUGroup{counter: atomic.Int64{}} // Shared group + cache1 := NewMLRU[int, int](group, 2) + err := cache1.Put(1, 100) // v1 into cache1 + assert.NoError(t, err) + + cache2 := NewMLRU[int, int](group, 2) // Sharing the same group + err = cache2.Put(3, 300) // v3 into cache2 + assert.NoError(t, err) + + err = cache1.Put(2, 200) // v2 into cache1 + assert.NoError(t, err) + + err = cache2.Put(4, 400) // v4 into cache2 + assert.NoError(t, 
// NOTE(review): this chunk opens mid-function — the statements below are the
// tail of a merge test whose signature precedes this view; they are kept
// verbatim rather than restyled.
err)

	err = cache1.Merge(cache2)
	assert.NoError(t, err)

	// Expecting v2, v4 in cache1
	val, err := cache1.Get(2)
	assert.NoError(t, err)
	assert.Equal(t, 200, val)

	val, err = cache1.Get(4)
	assert.NoError(t, err)
	assert.Equal(t, 400, val)

	// v1 and v3 should not be in cache1 after the merge
	_, err = cache1.Get(1)
	assert.Error(t, err)

	_, err = cache1.Get(3)
	assert.Error(t, err)

	// cache2 should be invalid after the merge
	_, err = cache2.Get(3)
	assert.Error(t, err)
}

// TestEvictLastWithOneElement checks that a single-slot cache refuses a second
// insert with ErrCacheFull instead of evicting the only element.
func TestEvictLastWithOneElement(t *testing.T) {
	g := &LRUGroup{counter: atomic.Int64{}}
	c := NewMLRU[int, int](g, 1)

	assert.NoError(t, c.Put(1, 100))
	// The second Put must report that the cache is full.
	assert.EqualError(t, c.Put(2, 200), ErrCacheFull.Error())
}

// TestInvalidCachePut verifies that a cache consumed by Merge is invalidated
// and rejects further Puts.
func TestInvalidCachePut(t *testing.T) {
	g := &LRUGroup{counter: atomic.Int64{}}
	dst := NewMLRU[int, int](g, 2)
	src := NewMLRU[int, int](g, 2)

	// Merging invalidates the source cache.
	assert.NoError(t, dst.Merge(src))

	err := src.Put(1, 100)
	assert.Error(t, err) // the invalidated cache must refuse the Put
	assert.Equal(t, "invalid cache", err.Error())
}

// TestMergeWithMovingToNext merges a source holding an older entry into a
// destination holding a newer one; both keys must survive.
func TestMergeWithMovingToNext(t *testing.T) {
	g := &LRUGroup{counter: atomic.Int64{}}
	dst := NewMLRU[int, int](g, 2)
	assert.NoError(t, dst.Put(2, 200)) // most recent entry in the destination

	src := NewMLRU[int, int](g, 2)
	assert.NoError(t, src.Put(1, 100)) // older entry in the source

	assert.NoError(t, dst.Merge(src))

	_, err := dst.Get(2)
	assert.NoError(t, err) // key 2 must be present after the merge
	_, err = dst.Get(1)
	assert.NoError(t, err) // key 1 must be present after the merge
}

// TestMergeWithIncreasingCurrentSize merges two entries from a smaller source
// into a capacity-4 destination and checks that all four entries fit.
func TestMergeWithIncreasingCurrentSize(t *testing.T) {
	g := &LRUGroup{counter: atomic.Int64{}}
	dst := NewMLRU[int, int](g, 4)
	assert.NoError(t, dst.Put(3, 300)) // fresh entry in the destination
	assert.NoError(t, dst.Put(4, 400)) // fresh entry in the destination

	src := NewMLRU[int, int](g, 2)
	assert.NoError(t, src.Put(1, 100)) // older entry in the source
	assert.NoError(t, src.Put(2, 200)) // older entry in the source

	assert.NoError(t, dst.Merge(src))

	// Capacity 4 holds everything, so every key must still resolve.
	for _, k := range []int{3, 4, 1, 2} {
		_, err := dst.Get(k)
		assert.NoError(t, err)
	}

	assert.Equal(t, int64(4), dst.currentSize) // size reflects all merged entries
}

// TestMergeWithNilCurrentEntry merges a populated source into a completely
// empty destination.
func TestMergeWithNilCurrentEntry(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)
	src := NewMLRU[string, string](g, 2)

	// Populate only the source.
	assert.NoError(t, src.Put("k1", "v1"))
	assert.NoError(t, src.Put("k2", "v2"))

	assert.NoError(t, dst.Merge(src))

	// The destination must now serve the source's values.
	got, err := dst.Get("k1")
	assert.NoError(t, err)
	assert.Equal(t, "v1", got)

	got, err = dst.Get("k2")
	assert.NoError(t, err)
	assert.Equal(t, "v2", got)
}

// TestMergeAndUpdatePointers merges two populated caches and checks that every
// entry is reachable from the destination afterwards.
func TestMergeAndUpdatePointers(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 3)
	src := NewMLRU[string, string](g, 3)

	assert.NoError(t, dst.Put("k1", "v1"))
	assert.NoError(t, dst.Put("k2", "v2"))
	assert.NoError(t, src.Put("k3", "v3"))

	assert.NoError(t, dst.Merge(src))

	// Capacity 3 holds all entries from both caches.
	for _, kv := range [][2]string{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}} {
		got, err := dst.Get(kv[0])
		assert.NoError(t, err)
		assert.Equal(t, kv[1], got)
	}
}

// TestMergeAndEvict merges two full capacity-2 caches and verifies that the
// two oldest keys are evicted and the newest/oldest entry chain stays
// consistent before and after lookups.
func TestMergeAndEvict(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)
	src := NewMLRU[string, string](g, 2)

	assert.NoError(t, dst.Put("k1", "v1"))
	assert.NoError(t, dst.Put("k2", "v2"))
	assert.NoError(t, src.Put("k3", "v3"))
	assert.NoError(t, src.Put("k4", "v4"))

	assert.NoError(t, dst.Merge(src))

	// The doubly-linked chain must read k4 (newest) <-> k3 (oldest).
	checkChain := func() {
		require.Equal(t, "k4", dst.newest.key)
		require.Equal(t, "k3", dst.newest.older.key)
		require.Nil(t, dst.newest.newer)
		require.Equal(t, "k3", dst.oldest.key)
		require.Equal(t, "k4", dst.oldest.newer.key)
		require.Nil(t, dst.oldest.older)
	}
	checkChain()

	// k1 and k2 were the oldest entries and must have been evicted.
	_, err := dst.Get("k1")
	assert.Error(t, err)
	_, err = dst.Get("k2")
	assert.Error(t, err)

	got, err := dst.Get("k3")
	assert.NoError(t, err)
	assert.Equal(t, "v3", got)

	got, err = dst.Get("k4")
	assert.NoError(t, err)
	assert.Equal(t, "v4", got)

	// The chain must be unchanged by the lookups, and both ends terminate.
	checkChain()
	require.Nil(t, dst.newest.older.older)
	require.Nil(t, dst.oldest.newer.newer)
}

// TestMergeWithInvalidCache ensures merging from an explicitly invalidated
// source fails with "invalid cache".
func TestMergeWithInvalidCache(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)
	src := NewMLRU[string, string](g, 2)

	assert.NoError(t, src.Put("k1", "v1"))
	assert.NoError(t, src.Put("k2", "v2"))

	// Mark the source invalid by hand.
	src.valid = false

	err := dst.Merge(src)
	assert.Error(t, err)
	assert.Equal(t, "invalid cache", err.Error())
}

// TestMergeWithNilCache ensures merging with a nil source fails with
// "invalid cache".
func TestMergeWithNilCache(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)

	err := dst.Merge(nil)
	assert.Error(t, err)
	assert.Equal(t, "invalid cache", err.Error())
}

// TestMergeWithMoreRecentOther merges a two-entry source into a one-entry
// destination of capacity 2; the destination's older key must be evicted.
func TestMergeWithMoreRecentOther(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)
	src := NewMLRU[string, string](g, 2)

	assert.NoError(t, dst.Put("k1", "v1"))
	assert.NoError(t, src.Put("k2", "v2"))
	assert.NoError(t, src.Put("k3", "v3"))

	assert.NoError(t, dst.Merge(src))

	// k1 is the oldest of the three and must have been evicted.
	_, err := dst.Get("k1")
	assert.Error(t, err)

	got, err := dst.Get("k2")
	assert.NoError(t, err)
	assert.Equal(t, "v2", got)

	got, err = dst.Get("k3")
	assert.NoError(t, err)
	assert.Equal(t, "v3", got)
}

// TestMergeWithEmptyOther merges an empty source and verifies the destination
// keeps its existing entry.
func TestMergeWithEmptyOther(t *testing.T) {
	g := &LRUGroup{}
	dst := NewMLRU[string, string](g, 2)
	src := NewMLRU[string, string](g, 2)

	assert.NoError(t, dst.Put("k1", "v1"))

	assert.NoError(t, dst.Merge(src))

	got, err := dst.Get("k1")
	assert.NoError(t, err)
	assert.Equal(t, "v1", got)
}