From dc98d8ced20f863e99c2cba2ac5d2d699b3208b8 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Fri, 13 Feb 2026 18:43:07 -0500 Subject: [PATCH] issue #151 -- preserve tombstones during merges and fix deletion semantics ** add canDropTombstones to mergeSSTables; compute it in callers and preserve tombstones when overlapping SSTables exist ** fix Delete() to always create tombstone nodes (insert via CAS when key is missing from the skiplist) ** make tombstones visible: ** findVisibleVersion() now returns Delete versions ** Iterator.Next() yields deleted entries with nil data for the flusher ** include tombstone keys in SSTable metadata: ** add GetMinKey / GetMaxKey ** update flusher and EntryCount to account for tombstones ** update Go module dependencies ** corrected Windows race by waiting in ForceFlush() for in-progress background flushes to complete ** rid magic numbers, and strings from code --- authors | 3 +- bloomfilter/bloomfilter.go | 18 ++- compactor.go | 121 +++++++++++++++---- compactor_test.go | 233 +++++++++++++++++++++++++++++++++++++ db.go | 46 ++++++-- flusher.go | 25 ++-- go.mod | 2 +- memtable.go | 9 +- skiplist/skiplist.go | 188 ++++++++++++++++++++++-------- skiplist/skiplist_test.go | 7 +- sstable.go | 11 +- txn.go | 9 +- 12 files changed, 554 insertions(+), 118 deletions(-) diff --git a/authors b/authors index 87609ef..45c3f16 100644 --- a/authors +++ b/authors @@ -1 +1,2 @@ -Alex Gaetano Padula \ No newline at end of file +Alex Gaetano Padula +Mehrdad Aksari \ No newline at end of file diff --git a/bloomfilter/bloomfilter.go b/bloomfilter/bloomfilter.go index 2f008a0..21ce91f 100644 --- a/bloomfilter/bloomfilter.go +++ b/bloomfilter/bloomfilter.go @@ -22,6 +22,14 @@ import ( "math" ) +const ( + LowFPRThreshold = 0.01 // False positive rate below which extra space is allocated + LowFPRSizeMultiplier = 1.2 // Multiplier for extra space when FPR is below threshold (20% extra) + SmallDataThreshold = 8 // Byte count below which extra hash mixing is applied + HashMixRightShift = 13 // Right shift for hash mixing of small data + HashMixLeftShift = 37 // Left shift for hash mixing of small data +) + // BloomFilter struct represents a Bloom filter type BloomFilter struct { Bitset []int8 // Bitset, each int8 can store 8 bits @@ -43,9 +51,8 @@ func New(expectedItems uint, falsePositiveRate float64) (*BloomFilter, error) { // Calculate optimal size and add a safety margin for low FPR cases size := optimalSize(expectedItems, falsePositiveRate) - if falsePositiveRate < 0.01 { - // Add 20% extra space for very low FPR targets - size = uint(float64(size) * 1.2) + if falsePositiveRate < LowFPRThreshold { + size = uint(float64(size) * LowFPRSizeMultiplier) } // Make size a prime number (or at least odd) to improve hash distribution @@ -133,9 +140,8 @@ func (bf *BloomFilter) getTwoHashes(data []byte) (uint64, uint64, error) { // It's possible for small data inputs, FNV hashes might be too similar.. // Thus we add an extra mixing step if data is small - if len(data) < 8 { - // Mix h1 and h2 with different patterns - h2 = h2 ^ (h1 >> 13) ^ (h1 << 37) + if len(data) < SmallDataThreshold { + h2 = h2 ^ (h1 >> HashMixRightShift) ^ (h1 << HashMixLeftShift) } return h1, h2, nil diff --git a/compactor.go b/compactor.go index 7d820d8..e8303f3 100644 --- a/compactor.go +++ b/compactor.go @@ -4,15 +4,16 @@ import ( "bytes" "errors" "fmt" - "github.com/wildcatdb/wildcat/v2/blockmanager" - "github.com/wildcatdb/wildcat/v2/tree" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" "os" "sort" "sync" "sync/atomic" "time" + + "github.com/wildcatdb/wildcat/v2/blockmanager" + "github.com/wildcatdb/wildcat/v2/tree" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" ) // compactorJob represents a scheduled compaction @@ -93,7 +94,7 @@ func (compactor *Compactor) checkAndScheduleCompactions() { // Check if last level needs partitioning first lastLevelIdx := len(*levels) - 1 - if lastLevelIdx >= 2 { + if lastLevelIdx >= MinLevelsForPartitioning { lastLevel := (*levels)[lastLevelIdx] if compactor.shouldPartitionLastLevel(lastLevel) { compactor.scheduleLastLevelPartitioning(lastLevel, lastLevelIdx) @@ -122,8 +123,8 @@ func (compactor *Compactor) checkAndScheduleCompactions() { score := sizeScore*compactor.db.opts.CompactionScoreSizeWeight + countScore*compactor.db.opts.CompactionScoreCountWeight // Schedule compaction if score exceeds threshold - if score > 1.0 { - if levelIdx < 2 { + if score > CompactionScoreThreshold { + if levelIdx < SizeTieredMaxLevelIdx { compactor.scheduleSizeTieredCompaction(level, levelIdx, score) } else { compactor.scheduleLeveledCompaction(level, levelIdx, score) @@ -156,8 +157,8 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx } } - if len(availableTables) < 2 { - compactor.db.log(fmt.Sprintf("Insufficient available SSTables for compaction on level %d (%d available, need at least 2)", levelNum, len(availableTables))) + if len(availableTables) < MinSSTablesToCompact { + compactor.db.log(fmt.Sprintf("Insufficient available SSTables for compaction on level %d (%d available, need at least %d)", levelNum, len(availableTables), MinSSTablesToCompact)) return } @@ -179,7 +180,7 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx j++ } - if len(similarSized) >= 2 { + if len(similarSized) >= MinSSTablesToCompact { selectedTables = similarSized break } @@ -188,11 +189,11 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx } // If we couldn't find similar-sized tables, just take the smallest available ones - if len(selectedTables) < 2 && len(availableTables) >= 2 { + if len(selectedTables) < MinSSTablesToCompact && len(availableTables) >= MinSSTablesToCompact { selectedTables = availableTables[:min(compactor.db.opts.CompactionBatchSize, len(availableTables))] } - if len(selectedTables) >= 2 { + if len(selectedTables) >= MinSSTablesToCompact { var reservedTables []*SSTable for _, table := range selectedTables { if atomic.CompareAndSwapInt32(&table.isMerging, 0, 1) { @@ -204,7 +205,7 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx } // Only proceed if we have at least 2 reserved tables - if len(reservedTables) >= 2 { + if len(reservedTables) >= MinSSTablesToCompact { compactor.compactionQueue = append(compactor.compactionQueue, &compactorJob{ levelIdx: levelIdx, priority: score, @@ -460,8 +461,12 @@ func (compactor *Compactor) compactSSTables(sstables []*SSTable, sourceLevelIdx, } }(vlogBm) + // Determine if tombstones can be safely dropped + dropTombstones := compactor.canDropTombstones(sstables) + compactor.db.log(fmt.Sprintf("Compaction dropTombstones=%v for %d SSTables", dropTombstones, len(sstables))) + // Merge the SSTables - err = compactor.mergeSSTables(sstables, klogBm, vlogBm, newSSTable) + err = compactor.mergeSSTables(sstables, klogBm, vlogBm, newSSTable, dropTombstones) if err != nil { _ = os.Remove(klogTmpPath) _ = os.Remove(vlogTmpPath) @@ -594,9 +599,11 @@ func (compactor *Compactor) compactSSTables(sstables []*SSTable, sourceLevelIdx, return nil } -// mergeSSTables merges multiple SSTables into a new SSTable -func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *blockmanager.BlockManager, output *SSTable) error { - readTs := time.Now().UnixNano() + 10000000000 +// mergeSSTables merges multiple SSTables into a new SSTable. +// When dropTombstones is false, tombstone entries are preserved in the output to prevent +// deleted keys from reappearing if older versions exist in SSTables outside the merge set. +func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *blockmanager.BlockManager, output *SSTable, dropTombstones bool) error { + readTs := time.Now().UnixNano() + FarFutureOffsetNs var iters []*iterState @@ -729,10 +736,8 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b return fmt.Errorf("no valid latest state found for key %s", minKey) } - // Collect entry if not a tombstone (merge output is sorted, so we bulk load at the end) - if latest != nil && latest.ValueBlockID != -1 { - - // Read the value from the original VLog + if latest != nil && latest.ValueBlockID != TombstoneBlockID { + // Non-tombstone entry: copy value to new VLog and record in KLog val, _, err := latestState.vlogBm.Read(latest.ValueBlockID) if err != nil { return fmt.Errorf("failed to read value: %w", err) @@ -755,6 +760,16 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b entryCount++ totalSize += int64(len(latest.Key) + len(val)) } + } else if latest != nil && latest.ValueBlockID == TombstoneBlockID && !dropTombstones { + // Tombstone entry: preserve it if there may be older versions in other SSTables + klogEntry := &KLogEntry{ + Key: latest.Key, + Timestamp: latest.Timestamp, + ValueBlockID: TombstoneBlockID, + } + entries = append(entries, tree.KeyValue{Key: latest.Key, Value: klogEntry}) + entryCount++ + totalSize += int64(len(latest.Key)) } // Advance all iterators with that key @@ -884,14 +899,14 @@ func (compactor *Compactor) scheduleLastLevelPartitioning(lastLevel *Level, leve // Create high-priority partitioning job compactor.compactionQueue = append(compactor.compactionQueue, &compactorJob{ levelIdx: levelIdx, - priority: 10.0, // Highest priority + priority: PartitioningJobPriority, ssTables: reservedTables, targetIdx: -1, // Special marker for partitioning job inProgress: false, }) compactor.db.log(fmt.Sprintf("Scheduled partitioning job with %d SSTables (%.2f MB)", - len(reservedTables), float64(selectedSize)/(1024*1024))) + len(reservedTables), float64(selectedSize)/BytesPerMB)) } // shouldPartitionLastLevel checks if the last level should be partitioned @@ -1096,8 +1111,11 @@ func (compactor *Compactor) redistributeToLevel(tables []*SSTable, targetLevelId } }(vlogBm) + // Determine if tombstones can be safely dropped + dropTombstones := compactor.canDropTombstones(tables) + // Merge the SSTables - err = compactor.mergeSSTables(tables, klogBm, vlogBm, newSSTable) + err = compactor.mergeSSTables(tables, klogBm, vlogBm, newSSTable, dropTombstones) if err != nil { _ = os.Remove(klogPath) _ = os.Remove(vlogPath) @@ -1198,7 +1216,7 @@ func (compactor *Compactor) removeSSTablesFromLevel(tablesToRemove []*SSTable, l } compactor.db.log(fmt.Sprintf("Removed %d SSTables from level %d, freed %.2f MB", - len(tablesToRemove), levelNum, float64(totalRemovedSize)/(1024*1024))) + len(tablesToRemove), levelNum, float64(totalRemovedSize)/BytesPerMB)) return nil } @@ -1235,6 +1253,59 @@ func (compactor *Compactor) shouldCompact() bool { return false } +// canDropTombstones checks whether tombstones can be safely dropped during a merge. +// Tombstones can only be dropped if no SSTable outside the merge set (across all levels) +// has a key range that overlaps with the merge's key range. If any such SSTable exists, +// tombstones must be preserved to prevent deleted keys from reappearing (zombie entries). +func (compactor *Compactor) canDropTombstones(mergingTables []*SSTable) bool { + // Build a set of merging table IDs for fast lookup + mergingIDs := make(map[int64]bool, len(mergingTables)) + for _, t := range mergingTables { + mergingIDs[t.Id] = true + } + + // Compute the overall key range of the merge set + var mergeMin, mergeMax []byte + for _, t := range mergingTables { + if mergeMin == nil || bytes.Compare(t.Min, mergeMin) < 0 { + mergeMin = t.Min + } + if mergeMax == nil || bytes.Compare(t.Max, mergeMax) > 0 { + mergeMax = t.Max + } + } + + if mergeMin == nil || mergeMax == nil { + return true // No key range to check + } + + // Check all SSTables across all levels for overlapping key ranges + levels := compactor.db.levels.Load() + if levels == nil { + return true + } + + for _, level := range *levels { + sstables := level.sstables.Load() + if sstables == nil { + continue + } + + for _, sst := range *sstables { + if mergingIDs[sst.Id] { + continue // Skip SSTables in the merge set + } + + // Check for key range overlap: sst.Max >= mergeMin && sst.Min <= mergeMax + if bytes.Compare(sst.Max, mergeMin) >= 0 && bytes.Compare(sst.Min, mergeMax) <= 0 { + return false // Overlapping SSTable found, cannot drop tombstones + } + } + } + + return true // No overlapping SSTables, safe to drop tombstones +} + // getOrOpenBM retrieves a BlockManager from the LRU cache or opens it if not found func getOrOpenBM(db *DB, path string) (*blockmanager.BlockManager, error) { var ok bool diff --git a/compactor_test.go b/compactor_test.go index fdc24a7..3b4db42 100644 --- a/compactor_test.go +++ b/compactor_test.go @@ -2217,3 +2217,236 @@ func TestCompactor_CooldownPeriod(t *testing.T) { } db.compactor.scoreLock.Unlock() } + +// TestZombieEntry_DirectReproduction verifies that a deleted key does not reappear +// when SSTables containing the original value and its tombstone are compacted separately. +// This is a regression test for the zombie entry bug in size-tiered compaction. +func TestZombieEntry_DirectReproduction(t *testing.T) { + dir, err := os.MkdirTemp("", "zombie_direct_test") + if err != nil { + t.Fatalf("Failed to create temp directory: %v", err) + } + defer os.RemoveAll(dir) + + opts := &Options{ + Directory: dir, + WriteBufferSize: 8 * 1024, + SyncOption: SyncNone, + STDOutLogging: true, + } + + db, err := Open(opts) + if err != nil { + t.Fatalf("Failed to open database: %v", err) + } + defer db.Close() + + targetKey := []byte("direct_zombie_key") + originalValue := []byte("alive") + + // Step 1: Create SSTable A with target key + err = db.Update(func(txn *Txn) error { + return txn.Put(targetKey, originalValue) + }) + if err != nil { + t.Fatalf("Put failed: %v", err) + } + db.ForceFlush() + time.Sleep(200 * time.Millisecond) + + // Get SSTable A reference + levels := db.levels.Load() + l1 := (*levels)[0] + sstablesL1 := l1.sstables.Load() + sstA := (*sstablesL1)[0] + sstAId := sstA.Id + t.Logf("SSTable A: ID=%d, Size=%d", sstA.Id, sstA.Size) + + // Step 2: Delete key and create SSTable B with lots of padding + err = db.Update(func(txn *Txn) error { + if err := txn.Delete(targetKey); err != nil { + return err + } + // Add padding to make it larger (creates size disparity) + for i := 0; i < 100; i++ { + if err := txn.Put([]byte(fmt.Sprintf("pad%04d", i)), make([]byte, 50)); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatalf("Delete/padding failed: %v", err) + } + db.ForceFlush() + time.Sleep(200 * time.Millisecond) + + // Find SSTable B + sstablesL1 = l1.sstables.Load() + var sstB *SSTable + var sstBId int64 + for _, sst := range *sstablesL1 { + if sst.Id != sstAId && (sstB == nil || sst.Size > sstB.Size) { + sstB = sst + sstBId = sst.Id + } + } + t.Logf("SSTable B: ID=%d, Size=%d", sstB.Id, sstB.Size) + t.Logf("Size ratio B/A: %.2f (threshold is 1.5)", float64(sstB.Size)/float64(sstA.Size)) + + // Step 3: Compact SSTable A alone to L2 + sstablesL1 = l1.sstables.Load() + var freshA *SSTable + for _, sst := range *sstablesL1 { + if sst.Id == sstAId { + freshA = sst + break + } + } + atomic.StoreInt32(&freshA.isMerging, 1) + err = db.compactor.compactSSTables([]*SSTable{freshA}, 0, 1) + if err != nil { + t.Fatalf("Compaction of A failed: %v", err) + } + time.Sleep(200 * time.Millisecond) + + // Step 4: Compact SSTable B alone to L2 + sstablesL1 = l1.sstables.Load() + var freshB *SSTable + for _, sst := range *sstablesL1 { + if sst.Id == sstBId { + freshB = sst + break + } + } + if freshB != nil { + atomic.StoreInt32(&freshB.isMerging, 1) + db.compactor.compactSSTables([]*SSTable{freshB}, 0, 1) + } + time.Sleep(200 * time.Millisecond) + + // Step 5: Check for zombie - the deleted key must NOT reappear + var val []byte + var getErr error + db.View(func(txn *Txn) error { + val, getErr = txn.Get(targetKey) + return nil + }) + + if getErr == nil && val != nil { + t.Errorf("ZOMBIE DETECTED: Expected key to be deleted, but got: %s", string(val)) + } else { + t.Logf("PASS: Key correctly remains deleted after separate compactions") + } +} + +// TestZombieEntry_SizeTieredCompactionVulnerability tests a more realistic scenario with +// multiple SSTables where size-tiered grouping could separate tombstones from their targets. +func TestZombieEntry_SizeTieredCompactionVulnerability(t *testing.T) { + dir, err := os.MkdirTemp("", "zombie_entry_test") + if err != nil { + t.Fatalf("Failed to create temp directory: %v", err) + } + defer os.RemoveAll(dir) + + opts := &Options{ + Directory: dir, + WriteBufferSize: 512, + SyncOption: SyncNone, + CompactionSizeThreshold: 4, + CompactionBatchSize: 2, + CompactionSizeTieredSimilarityRatio: 1.5, + CompactionCooldownPeriod: 50 * time.Millisecond, + CompactorTickerInterval: 100 * time.Millisecond, + } + + db, err := Open(opts) + if err != nil { + t.Fatalf("Failed to open database: %v", err) + } + defer db.Close() + + targetKey := []byte("zombie_key") + originalValue := []byte("i_should_be_dead") + + // Step 1: Create small SSTable with target key + db.Update(func(txn *Txn) error { + return txn.Put(targetKey, originalValue) + }) + db.ForceFlush() + time.Sleep(100 * time.Millisecond) + + levels := db.levels.Load() + level1 := (*levels)[0] + sstableA := (*level1.sstables.Load())[0] + t.Logf("SSTable A: ID=%d, Size=%d bytes", sstableA.Id, sstableA.Size) + + // Step 2: Create medium-sized SSTables (similar size group) + for batch := 0; batch < 3; batch++ { + db.Update(func(txn *Txn) error { + for i := 0; i < 20; i++ { + key := []byte(fmt.Sprintf("medium_batch%d_key%03d", batch, i)) + value := make([]byte, 50) + txn.Put(key, value) + } + return nil + }) + db.ForceFlush() + time.Sleep(100 * time.Millisecond) + } + + // Step 3: Delete target key and create LARGE SSTable + db.Update(func(txn *Txn) error { + txn.Delete(targetKey) + for i := 0; i < 150; i++ { + key := []byte(fmt.Sprintf("padding_key_%04d", i)) + value := make([]byte, 100) + txn.Put(key, value) + } + return nil + }) + db.ForceFlush() + time.Sleep(100 * time.Millisecond) + + // Find large SSTable with tombstone + sstablesAfterLarge := level1.sstables.Load() + var sstableLarge *SSTable + for _, sst := range *sstablesAfterLarge { + if sst.Id != sstableA.Id && (sstableLarge == nil || sst.Size > sstableLarge.Size) { + sstableLarge = sst + } + } + + ratio := float64(sstableLarge.Size) / float64(sstableA.Size) + t.Logf("SSTable Large: ID=%d, Size=%d bytes", sstableLarge.Id, sstableLarge.Size) + t.Logf("Size ratio: %.2f (threshold is 1.5)", ratio) + + // Step 4: Compact large SSTable (with tombstone) to L2 + atomic.StoreInt32(&sstableLarge.isMerging, 1) + db.compactor.compactSSTables([]*SSTable{sstableLarge}, 0, 1) + time.Sleep(200 * time.Millisecond) + + // Step 5: Compact small SSTable (with value) to L2 + sstablesL1 := level1.sstables.Load() + for _, sst := range *sstablesL1 { + if sst.Id == sstableA.Id { + atomic.StoreInt32(&sst.isMerging, 1) + db.compactor.compactSSTables([]*SSTable{sst}, 0, 1) + break + } + } + time.Sleep(200 * time.Millisecond) + + // Step 6: Check for zombie - the deleted key must NOT reappear + var val []byte + db.View(func(txn *Txn) error { + val, _ = txn.Get(targetKey) + return nil + }) + + if val != nil { + t.Errorf("ZOMBIE DETECTED: Expected key to be deleted, but got: %s", string(val)) + } else { + t.Logf("PASS: Key correctly remains deleted after size-disparate compactions") + } +} diff --git a/db.go b/db.go index 2db02e7..d186f8e 100644 --- a/db.go +++ b/db.go @@ -18,10 +18,6 @@ package wildcat import ( "errors" "fmt" - "github.com/wildcatdb/wildcat/v2/blockmanager" - "github.com/wildcatdb/wildcat/v2/buffer" - "github.com/wildcatdb/wildcat/v2/lru" - "github.com/wildcatdb/wildcat/v2/skiplist" "math" "os" "sort" @@ -30,6 +26,11 @@ import ( "sync/atomic" "time" "unicode/utf8" + + "github.com/wildcatdb/wildcat/v2/blockmanager" + "github.com/wildcatdb/wildcat/v2/buffer" + "github.com/wildcatdb/wildcat/v2/lru" + "github.com/wildcatdb/wildcat/v2/skiplist" ) // SyncOption is a block manager sync option that can be set for a *DB instance. @@ -53,6 +54,22 @@ const ( TempFileExtension = ".tmp" // Temporary file extension for intermediate files ) +// Internal constants used across the codebase to avoid magic numbers +const ( + FarFutureOffsetNs = 10_000_000_000 // 10 seconds in nanoseconds; added to time.Now().UnixNano() to scan all versions + TombstoneBlockID int64 = -1 // Sentinel ValueBlockID indicating a deletion marker in KLogEntry + FlushTargetLevel = 1 // Memtables are always flushed to level 1 (L0 is the active memtable) + CompactionScoreThreshold = 1.0 // Compaction score above which a level triggers compaction + PartitioningJobPriority = 10.0 // Highest priority value for last-level partitioning jobs + MinSSTablesToCompact = 2 // Minimum number of SSTables required to perform a compaction + SizeTieredMaxLevelIdx = 2 // Levels with index < this use size-tiered compaction; others use leveled + MinLevelsForPartitioning = 2 // Minimum last-level index required before partitioning is considered + MinBTreeOrder = 2 // Minimum allowed B-tree order for SSTables + BytesPerMB = 1024 * 1024 // Bytes in a megabyte, used for human-readable log output + JitterFraction = 4 // Divides backoff to produce ±25% jitter + JitterCoinFlip = 2 // Used with rand.Intn for 50/50 jitter direction +) + // Defaults const ( DefaultWriteBufferSize = 64 * 1024 * 1024 // Default write buffer size @@ -360,7 +377,7 @@ func (opts *Options) setDefaults() error { opts.SSTableBTreeOrder = DefaultSSTableBTreeOrder } else { // Ensure the B-tree order is reasonable - if opts.SSTableBTreeOrder < 2 { + if opts.SSTableBTreeOrder < MinBTreeOrder { return fmt.Errorf("SSTable B-tree order must be at least 2, got %d", opts.SSTableBTreeOrder) } @@ -743,7 +760,7 @@ func (db *DB) reinstate() error { db.log(fmt.Sprintf("Recovered %d transactions total, %d committed, %d active", len(globalTxnMap), committedCount, activeCount)) db.log(fmt.Sprintf("Active memtable size: %d bytes with %d entries", - atomic.LoadInt64(&activeMemt.size), activeMemt.skiplist.Count(time.Now().UnixNano()+10000000000))) + atomic.LoadInt64(&activeMemt.size), activeMemt.skiplist.Count(time.Now().UnixNano()+FarFutureOffsetNs))) return nil } @@ -877,6 +894,15 @@ func (db *DB) ForceFlush() error { return errors.New("database is nil") } + // Wait for any in-progress background flush to complete. + // The background flusher may have already dequeued a memtable from the + // immutable queue but not yet finished writing its SSTable. Without this + // wait, ForceFlush could return before that SSTable is added to level 1, + // causing reads to miss recently committed data. + for db.flusher.flushing.Load() != nil { + time.Sleep(1 * time.Millisecond) + } + // Force flush all memtables err := db.flusher.flushMemtable(db.memtable.Load().(*Memtable)) if err != nil { @@ -908,19 +934,19 @@ func (db *DB) totalEntries() int64 { // Count entries in the active memtable if activeMemt, ok := db.memtable.Load().(*Memtable); ok { - total += int64(activeMemt.skiplist.Count(time.Now().UnixNano() + 10000000000)) + total += int64(activeMemt.skiplist.Count(time.Now().UnixNano() + FarFutureOffsetNs)) } // We need to check if we have a flushing memtable fmem := db.flusher.flushing.Load() if fmem != nil { - total += int64(fmem.skiplist.Count(time.Now().UnixNano() + 10000000000)) + total += int64(fmem.skiplist.Count(time.Now().UnixNano() + FarFutureOffsetNs)) } // Count entries in immutable memtables db.flusher.immutable.ForEach(func(item interface{}) bool { if memt, ok := item.(*Memtable); ok { - total += int64(memt.skiplist.Count(time.Now().UnixNano() + 10000000000)) + total += int64(memt.skiplist.Count(time.Now().UnixNano() + FarFutureOffsetNs)) } return true // Continue iteration }) @@ -1021,7 +1047,7 @@ func (db *DB) Stats() string { }, Values: []any{ atomic.LoadInt64(&db.memtable.Load().(*Memtable).size), - db.memtable.Load().(*Memtable).skiplist.Count(time.Now().UnixNano() + 10000000000), + db.memtable.Load().(*Memtable).skiplist.Count(time.Now().UnixNano() + FarFutureOffsetNs), db.txnBuffer.Count(), len(db.flusher.immutable.List()), func() int { sstables := 0 levels := db.levels.Load() diff --git a/flusher.go b/flusher.go index b10c2c4..74665b8 100644 --- a/flusher.go +++ b/flusher.go @@ -2,13 +2,14 @@ package wildcat import ( "fmt" + "os" + "sync/atomic" + "time" + "github.com/wildcatdb/wildcat/v2/blockmanager" "github.com/wildcatdb/wildcat/v2/queue" "github.com/wildcatdb/wildcat/v2/skiplist" "github.com/wildcatdb/wildcat/v2/tree" - "os" - "sync/atomic" - "time" ) // Flusher is responsible for queuing and flushing memtables to disk @@ -108,7 +109,7 @@ func (flusher *Flusher) backgroundProcess() { // flushMemtable flushes a memtable to disk as an SSTable at level 1 func (flusher *Flusher) flushMemtable(memt *Memtable) error { - maxTimestamp := time.Now().UnixNano() + 10000000000 // Far in the future + maxTimestamp := time.Now().UnixNano() + FarFutureOffsetNs entryCount := memt.skiplist.Count(maxTimestamp) deletionCount := memt.skiplist.DeleteCount(maxTimestamp) @@ -128,19 +129,22 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error { sstable := &SSTable{ Id: flusher.db.sstIdGenerator.nextID(), db: flusher.db, - Level: 1, // We always flush to level 1, L0 is active memtable + Level: FlushTargetLevel, } // Use max timestamp to ensure we get all keys when finding min/max - maxPossibleTs := time.Now().UnixNano() + 10000000000 // Far in the future + maxPossibleTs := time.Now().UnixNano() + FarFutureOffsetNs // Min and max keys are for sstable metadata - minKey, _, exists := memt.skiplist.GetMin(maxPossibleTs) + // Use GetMinKey/GetMaxKey to include tombstone (delete) keys in the range, + // ensuring the SSTable bounds correctly cover all entries for proper lookups + // and overlap detection during compaction. + minKey, exists := memt.skiplist.GetMinKey(maxPossibleTs) if exists { sstable.Min = minKey } - maxKey, _, exists := memt.skiplist.GetMax(maxPossibleTs) + maxKey, exists := memt.skiplist.GetMaxKey(maxPossibleTs) if exists { sstable.Max = maxKey } @@ -151,7 +155,8 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error { sstable.Size = atomic.LoadInt64(&memt.size) // Use max timestamp to get a count of all entries regardless of version - sstable.EntryCount = memt.skiplist.Count(maxPossibleTs) + // Include both writes and deletions for accurate entry count + sstable.EntryCount = memt.skiplist.Count(maxPossibleTs) + memt.skiplist.DeleteCount(maxPossibleTs) sstable.Timestamp = latestTs // We create new sstable files (.klog and .vlog) here @@ -217,7 +222,7 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error { Value: &KLogEntry{ Key: key, Timestamp: ts, - ValueBlockID: -1, // Special marker for deletion + ValueBlockID: TombstoneBlockID, }, }) } else { diff --git a/go.mod b/go.mod index 11f72b7..3113e2f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/wildcatdb/wildcat/v2 -go 1.25 +go 1.26 require go.mongodb.org/mongo-driver v1.17.3 diff --git a/memtable.go b/memtable.go index aa2a5ab..c972a07 100644 --- a/memtable.go +++ b/memtable.go @@ -2,12 +2,13 @@ package wildcat import ( "fmt" - "github.com/wildcatdb/wildcat/v2/blockmanager" - "github.com/wildcatdb/wildcat/v2/bloomfilter" - "github.com/wildcatdb/wildcat/v2/skiplist" "os" "sync/atomic" "time" + + "github.com/wildcatdb/wildcat/v2/blockmanager" + "github.com/wildcatdb/wildcat/v2/bloomfilter" + "github.com/wildcatdb/wildcat/v2/skiplist" ) // A memtable contains a skiplist and a write-ahead log (WAL) for durability, they are paired. @@ -140,7 +141,7 @@ func (memtable *Memtable) replay(activeTxns *[]*Txn) error { // createBloomFilter Creates a bloom filter from skiplist func (memtable *Memtable) createBloomFilter(entries int64) (*bloomfilter.BloomFilter, error) { - maxPossibleTs := time.Now().UnixNano() + 10000000000 // Far in the future + maxPossibleTs := time.Now().UnixNano() + FarFutureOffsetNs iter, err := memtable.skiplist.NewIterator(nil, maxPossibleTs) if err != nil { return nil, err diff --git a/skiplist/skiplist.go b/skiplist/skiplist.go index 4b6087d..b70102f 100644 --- a/skiplist/skiplist.go +++ b/skiplist/skiplist.go @@ -39,16 +39,16 @@ const ( // ValueVersion represents a single version of a value in MVCC type ValueVersion struct { - Data []byte // Value data - Timestamp int64 // Version timestamp - Type ValueVersionType // Type of version (write or delete) + Data []byte // Value data + Timestamp int64 // Version timestamp + Type ValueVersionType // Type of version (write or delete) Next atomic.Pointer[ValueVersion] // Atomic pointer to the previous version } // Node represents a node in the skip list type Node struct { forward [MaxLevel]atomic.Pointer[Node] // Array of atomic pointers to Node - backward atomic.Pointer[Node] // Atomic pointer to the previous node + backward atomic.Pointer[Node] // Atomic pointer to the previous node key []byte // Key used for searches versions atomic.Pointer[ValueVersion] // Atomic pointer to the head of version chain } @@ -158,7 +158,9 @@ func (n *Node) getLatestVersion() *ValueVersion { return n.versions.Load() } -// findVisibleVersion finds the latest version visible at the given timestamp +// findVisibleVersion finds the latest version visible at the given timestamp. +// Returns the version even if it is a Delete marker. Callers must check +// version.Type to distinguish between live values and tombstones. func (n *Node) findVisibleVersion(readTimestamp int64) *ValueVersion { if n == nil { return nil @@ -170,12 +172,6 @@ func (n *Node) findVisibleVersion(readTimestamp int64) *ValueVersion { // Traverse the version chain atomically for version != nil { if version.Timestamp <= readTimestamp { - // Check if it's a delete version - if version.Type == Delete { - // This key was deleted at or before our read timestamp - return nil - } - // Found a valid version return version } // Try older version - atomic load @@ -382,58 +378,110 @@ func (sl *SkipList) Put(searchKey []byte, newValue []byte, writeTimestamp int64) } // Delete adds a delete marker in the version chain at the given timestamp -// This follows MVCC principles where deletes are just another version +// This follows MVCC principles where deletes are just another version. +// If the key does not exist in the skip list, a new node is created with +// a delete marker to ensure the tombstone is persisted through flush to SSTable. func (sl *SkipList) Delete(searchKey []byte, deleteTimestamp int64) bool { - var prev *Node - var curr *Node + topLevel := 0 - prev = sl.header + for { + var existingNode *Node + var update [MaxLevel]*Node + var prev *Node + var curr *Node - // Get current level - currentLevel := sl.getLevel() + prev = sl.header + currentLevel := sl.getLevel() - // For each level, starting at the highest level in the list - for i := currentLevel - 1; i >= 0; i-- { - // Get the next node (atomic load) - curr = prev.forward[i].Load() + // Navigate to the target position + for i := currentLevel - 1; i >= 0; i-- { + curr = prev.forward[i].Load() + for curr != nil { + if sl.comparator(curr.key, searchKey) >= 0 { + break + } + prev = curr + curr = curr.forward[i].Load() + } + update[i] = prev + } - // Traverse the current level + // Check for existing key at level 0 + curr = prev.forward[0].Load() for curr != nil { - - // If the current key is greater or equal, stop traversing this level - if sl.comparator(curr.key, searchKey) >= 0 { + cmp := sl.comparator(curr.key, searchKey) + if cmp == 0 { + existingNode = curr + break + } + if cmp > 0 { break } - - // Move forward prev = curr - curr = curr.forward[i].Load() + curr = curr.forward[0].Load() + update[0] = prev } - } - // Check bottom level for exact match - curr = prev.forward[0].Load() + // If key exists, add delete version to existing node + if existingNode != nil { + existingNode.addVersion(nil, deleteTimestamp, Delete) + return true + } - // Search for the node at level 0 - for curr != nil { + // Key not found — insert a new node with a Delete version. + // This is essential for keys previously flushed to SSTables: + // without a tombstone in the memtable, the delete would be lost on flush. + if topLevel == 0 { + topLevel = sl.randomLevel() + } - // Check if we found the key - cmp := sl.comparator(curr.key, searchKey) - if cmp == 0 { - curr.addVersion(nil, deleteTimestamp, Delete) - return true + if topLevel > currentLevel { + for i := currentLevel; i < topLevel; i++ { + update[i] = sl.header + } + sl.level.CompareAndSwap(currentLevel, topLevel) } - // If we've gone too far, the key doesn't exist - if cmp > 0 { - return false + keyClone := make([]byte, len(searchKey)) + copy(keyClone, searchKey) + + newNode := &Node{ + key: keyClone, } + newNode.addVersion(nil, deleteTimestamp, Delete) - // Move to the next node - curr = curr.forward[0].Load() - } + insertSuccess := true + for i := 0; i < topLevel; i++ { + for { + next := update[i] + if next == nil { + insertSuccess = false + break + } + nextNode := next.forward[i].Load() + newNode.forward[i].Store(nextNode) + if next.forward[i].CompareAndSwap(nextNode, newNode) { + if i == 0 { + if nextNode != nil { + nextNode.backward.Store(newNode) + } + newNode.backward.Store(next) + } + break + } + insertSuccess = false + break + } + if !insertSuccess { + break + } + } - return false + if insertSuccess { + return true + } + // Retry if insertion failed due to concurrent modifications + } } // NewIterator creates a new iterator starting at the given key @@ -504,8 +552,9 @@ func (it *Iterator) Value() ([]byte, int64, bool) { return nil, 0, false } -// Next moves the iterator to the next node and returns the key and visible version -// Returns nil, nil, false if there are no more nodes or no visible versions +// Next moves the iterator to the next node and returns the key and visible version. +// Delete markers are returned with nil data so callers (e.g. the flusher) can +// persist tombstones. Nodes with no visible version are skipped. func (it *Iterator) Next() ([]byte, []byte, int64, bool) { if it.current == nil { return nil, nil, 0, false @@ -517,7 +566,7 @@ func (it *Iterator) Next() ([]byte, []byte, int64, bool) { // Return the key and visible version at the read timestamp if it.current != nil { version := it.current.findVisibleVersion(it.readTimestamp) - if version != nil && version.Type != Delete { + if version != nil { return it.current.key, version.Data, version.Timestamp, true } @@ -643,6 +692,48 @@ func (sl *SkipList) GetMax(readTimestamp int64) ([]byte, []byte, bool) { return nil, nil, false } +// GetMinKey retrieves the minimum key from the skip list, including delete markers. +// This is used for SSTable metadata to ensure tombstone keys are included in the key range. +func (sl *SkipList) GetMinKey(readTimestamp int64) ([]byte, bool) { + curr := sl.header + curr = curr.forward[0].Load() + + for curr != nil { + version := curr.findVisibleVersion(readTimestamp) + if version != nil { + return curr.key, true + } + curr = curr.forward[0].Load() + } + + return nil, false +} + +// GetMaxKey retrieves the maximum key from the skip list, including delete markers. +// This is used for SSTable metadata to ensure tombstone keys are included in the key range. +func (sl *SkipList) GetMaxKey(readTimestamp int64) ([]byte, bool) { + curr := sl.header + var lastKey []byte + + for { + curr = curr.forward[0].Load() + if curr == nil { + break + } + + version := curr.findVisibleVersion(readTimestamp) + if version != nil { + lastKey = curr.key + } + } + + if lastKey != nil { + return lastKey, true + } + + return nil, false +} + // Count returns the number of entries visible at the given timestamp func (sl *SkipList) Count(readTimestamp int64) int { curr := sl.header @@ -1227,4 +1318,3 @@ func (it *RangeIterator) ToFirst() { it.current = nil } - diff --git a/skiplist/skiplist_test.go b/skiplist/skiplist_test.go index c490aa8..a5a084b 100644 --- a/skiplist/skiplist_test.go +++ b/skiplist/skiplist_test.go @@ -1188,11 +1188,12 @@ func TestGetLatestTimestamp(t *testing.T) { t.Errorf("Expected latest timestamp %d after delete with older timestamp, got %d", baseTS+400, latestTS) } - // Delete non-existent key should not affect timestamp + // Delete non-existent key now inserts a tombstone node, updating the timestamp. + // This is required so tombstones for previously-flushed keys are persisted to SSTables. sl.Delete([]byte("nonexistent"), baseTS+500) latestTS = sl.GetLatestTimestamp() - if latestTS != baseTS+400 { - t.Errorf("Expected latest timestamp %d after deleting non-existent key, got %d", baseTS+400, latestTS) + if latestTS != baseTS+500 { + t.Errorf("Expected latest timestamp %d after deleting non-existent key, got %d", baseTS+500, latestTS) } // Add another write operation with the highest timestamp yet diff --git a/sstable.go b/sstable.go index 48587f0..460fc6d 100644 --- a/sstable.go +++ b/sstable.go @@ -3,15 +3,16 @@ package wildcat import ( "bytes" "fmt" + "os" + "strconv" + "strings" + "sync/atomic" + "github.com/wildcatdb/wildcat/v2/blockmanager" "github.com/wildcatdb/wildcat/v2/bloomfilter" "github.com/wildcatdb/wildcat/v2/tree" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "os" - "strconv" - "strings" - "sync/atomic" ) // SSTable represents a sorted string table @@ -144,7 +145,7 @@ func (sst *SSTable) get(key []byte, readTimestamp int64) ([]byte, int64) { // Only return if this version is visible to the read timestamp if entry.Timestamp <= readTimestamp { - if entry.ValueBlockID == -1 { + if entry.ValueBlockID == TombstoneBlockID { return nil, entry.Timestamp // Return nil value but valid timestamp for deletion } v := sst.readValueFromVLog(entry.ValueBlockID) diff --git a/txn.go b/txn.go index efb022b..12e1659 100644 --- a/txn.go +++ b/txn.go @@ -3,8 +3,6 @@ package wildcat import ( "errors" "fmt" - "github.com/wildcatdb/wildcat/v2/blockmanager" - "github.com/wildcatdb/wildcat/v2/tree" "math/rand" "os" "strings" @@ -12,6 +10,9 @@ import ( "sync/atomic" "syscall" "time" + + "github.com/wildcatdb/wildcat/v2/blockmanager" + "github.com/wildcatdb/wildcat/v2/tree" ) // Txn represents a transaction in a Wildcat DB instance @@ -64,8 +65,8 @@ func (db *DB) Begin() (*Txn, error) { } // Add jitter (± 25% randomization) - jitter := time.Duration(rand.Int63n(int64(sleepTime / 4))) - if rand.Intn(2) == 0 { + jitter := time.Duration(rand.Int63n(int64(sleepTime / JitterFraction))) + if rand.Intn(JitterCoinFlip) == 0 { sleepTime += jitter } else { sleepTime -= jitter