Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion authors
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Alex Gaetano Padula <me@alexpadula.com>
Alex Gaetano Padula <me@alexpadula.com>
Mehrdad Aksari <https://mehrdad3301.github.io/>
18 changes: 12 additions & 6 deletions bloomfilter/bloomfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ import (
"math"
)

const (
LowFPRThreshold = 0.01 // False positive rate below which extra space is allocated
LowFPRSizeMultiplier = 1.2 // Multiplier for extra space when FPR is below threshold (20% extra)
SmallDataThreshold = 8 // Byte count below which extra hash mixing is applied
HashMixRightShift = 13 // Right shift for hash mixing of small data
HashMixLeftShift = 37 // Left shift for hash mixing of small data
)

// BloomFilter struct represents a Bloom filter
type BloomFilter struct {
Bitset []int8 // Bitset, each int8 can store 8 bits
Expand All @@ -43,9 +51,8 @@ func New(expectedItems uint, falsePositiveRate float64) (*BloomFilter, error) {

// Calculate optimal size and add a safety margin for low FPR cases
size := optimalSize(expectedItems, falsePositiveRate)
if falsePositiveRate < 0.01 {
// Add 20% extra space for very low FPR targets
size = uint(float64(size) * 1.2)
if falsePositiveRate < LowFPRThreshold {
size = uint(float64(size) * LowFPRSizeMultiplier)
}

// Make size a prime number (or at least odd) to improve hash distribution
Expand Down Expand Up @@ -133,9 +140,8 @@ func (bf *BloomFilter) getTwoHashes(data []byte) (uint64, uint64, error) {

// It's possible for small data inputs, FNV hashes might be too similar..
// Thus we add an extra mixing step if data is small
if len(data) < 8 {
// Mix h1 and h2 with different patterns
h2 = h2 ^ (h1 >> 13) ^ (h1 << 37)
if len(data) < SmallDataThreshold {
h2 = h2 ^ (h1 >> HashMixRightShift) ^ (h1 << HashMixLeftShift)
}

return h1, h2, nil
Expand Down
121 changes: 96 additions & 25 deletions compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"bytes"
"errors"
"fmt"
"github.com/wildcatdb/wildcat/v2/blockmanager"
"github.com/wildcatdb/wildcat/v2/tree"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"os"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/wildcatdb/wildcat/v2/blockmanager"
"github.com/wildcatdb/wildcat/v2/tree"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

// compactorJob represents a scheduled compaction
Expand Down Expand Up @@ -93,7 +94,7 @@ func (compactor *Compactor) checkAndScheduleCompactions() {

// Check if last level needs partitioning first
lastLevelIdx := len(*levels) - 1
if lastLevelIdx >= 2 {
if lastLevelIdx >= MinLevelsForPartitioning {
lastLevel := (*levels)[lastLevelIdx]
if compactor.shouldPartitionLastLevel(lastLevel) {
compactor.scheduleLastLevelPartitioning(lastLevel, lastLevelIdx)
Expand Down Expand Up @@ -122,8 +123,8 @@ func (compactor *Compactor) checkAndScheduleCompactions() {
score := sizeScore*compactor.db.opts.CompactionScoreSizeWeight + countScore*compactor.db.opts.CompactionScoreCountWeight

// Schedule compaction if score exceeds threshold
if score > 1.0 {
if levelIdx < 2 {
if score > CompactionScoreThreshold {
if levelIdx < SizeTieredMaxLevelIdx {
compactor.scheduleSizeTieredCompaction(level, levelIdx, score)
} else {
compactor.scheduleLeveledCompaction(level, levelIdx, score)
Expand Down Expand Up @@ -156,8 +157,8 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx
}
}

if len(availableTables) < 2 {
compactor.db.log(fmt.Sprintf("Insufficient available SSTables for compaction on level %d (%d available, need at least 2)", levelNum, len(availableTables)))
if len(availableTables) < MinSSTablesToCompact {
compactor.db.log(fmt.Sprintf("Insufficient available SSTables for compaction on level %d (%d available, need at least %d)", levelNum, len(availableTables), MinSSTablesToCompact))
return
}

Expand All @@ -179,7 +180,7 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx
j++
}

if len(similarSized) >= 2 {
if len(similarSized) >= MinSSTablesToCompact {
selectedTables = similarSized
break
}
Expand All @@ -188,11 +189,11 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx
}

// If we couldn't find similar-sized tables, just take the smallest available ones
if len(selectedTables) < 2 && len(availableTables) >= 2 {
if len(selectedTables) < MinSSTablesToCompact && len(availableTables) >= MinSSTablesToCompact {
selectedTables = availableTables[:min(compactor.db.opts.CompactionBatchSize, len(availableTables))]
}

if len(selectedTables) >= 2 {
if len(selectedTables) >= MinSSTablesToCompact {
var reservedTables []*SSTable
for _, table := range selectedTables {
if atomic.CompareAndSwapInt32(&table.isMerging, 0, 1) {
Expand All @@ -204,7 +205,7 @@ func (compactor *Compactor) scheduleSizeTieredCompaction(level *Level, levelIdx
}

// Only proceed if we have at least 2 reserved tables
if len(reservedTables) >= 2 {
if len(reservedTables) >= MinSSTablesToCompact {
compactor.compactionQueue = append(compactor.compactionQueue, &compactorJob{
levelIdx: levelIdx,
priority: score,
Expand Down Expand Up @@ -460,8 +461,12 @@ func (compactor *Compactor) compactSSTables(sstables []*SSTable, sourceLevelIdx,
}
}(vlogBm)

// Determine if tombstones can be safely dropped
dropTombstones := compactor.canDropTombstones(sstables)
compactor.db.log(fmt.Sprintf("Compaction dropTombstones=%v for %d SSTables", dropTombstones, len(sstables)))

// Merge the SSTables
err = compactor.mergeSSTables(sstables, klogBm, vlogBm, newSSTable)
err = compactor.mergeSSTables(sstables, klogBm, vlogBm, newSSTable, dropTombstones)
if err != nil {
_ = os.Remove(klogTmpPath)
_ = os.Remove(vlogTmpPath)
Expand Down Expand Up @@ -594,9 +599,11 @@ func (compactor *Compactor) compactSSTables(sstables []*SSTable, sourceLevelIdx,
return nil
}

// mergeSSTables merges multiple SSTables into a new SSTable
func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *blockmanager.BlockManager, output *SSTable) error {
readTs := time.Now().UnixNano() + 10000000000
// mergeSSTables merges multiple SSTables into a new SSTable.
// When dropTombstones is false, tombstone entries are preserved in the output to prevent
// deleted keys from reappearing if older versions exist in SSTables outside the merge set.
func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *blockmanager.BlockManager, output *SSTable, dropTombstones bool) error {
readTs := time.Now().UnixNano() + FarFutureOffsetNs

var iters []*iterState

Expand Down Expand Up @@ -729,10 +736,8 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b
return fmt.Errorf("no valid latest state found for key %s", minKey)
}

// Collect entry if not a tombstone (merge output is sorted, so we bulk load at the end)
if latest != nil && latest.ValueBlockID != -1 {

// Read the value from the original VLog
if latest != nil && latest.ValueBlockID != TombstoneBlockID {
// Non-tombstone entry: copy value to new VLog and record in KLog
val, _, err := latestState.vlogBm.Read(latest.ValueBlockID)
if err != nil {
return fmt.Errorf("failed to read value: %w", err)
Expand All @@ -755,6 +760,16 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b
entryCount++
totalSize += int64(len(latest.Key) + len(val))
}
} else if latest != nil && latest.ValueBlockID == TombstoneBlockID && !dropTombstones {
// Tombstone entry: preserve it if there may be older versions in other SSTables
klogEntry := &KLogEntry{
Key: latest.Key,
Timestamp: latest.Timestamp,
ValueBlockID: TombstoneBlockID,
}
entries = append(entries, tree.KeyValue{Key: latest.Key, Value: klogEntry})
entryCount++
totalSize += int64(len(latest.Key))
}

// Advance all iterators with that key
Expand Down Expand Up @@ -884,14 +899,14 @@ func (compactor *Compactor) scheduleLastLevelPartitioning(lastLevel *Level, leve
// Create high-priority partitioning job
compactor.compactionQueue = append(compactor.compactionQueue, &compactorJob{
levelIdx: levelIdx,
priority: 10.0, // Highest priority
priority: PartitioningJobPriority,
ssTables: reservedTables,
targetIdx: -1, // Special marker for partitioning job
inProgress: false,
})

compactor.db.log(fmt.Sprintf("Scheduled partitioning job with %d SSTables (%.2f MB)",
len(reservedTables), float64(selectedSize)/(1024*1024)))
len(reservedTables), float64(selectedSize)/BytesPerMB))
}

// shouldPartitionLastLevel checks if the last level should be partitioned
Expand Down Expand Up @@ -1096,8 +1111,11 @@ func (compactor *Compactor) redistributeToLevel(tables []*SSTable, targetLevelId
}
}(vlogBm)

// Determine if tombstones can be safely dropped
dropTombstones := compactor.canDropTombstones(tables)

// Merge the SSTables
err = compactor.mergeSSTables(tables, klogBm, vlogBm, newSSTable)
err = compactor.mergeSSTables(tables, klogBm, vlogBm, newSSTable, dropTombstones)
if err != nil {
_ = os.Remove(klogPath)
_ = os.Remove(vlogPath)
Expand Down Expand Up @@ -1198,7 +1216,7 @@ func (compactor *Compactor) removeSSTablesFromLevel(tablesToRemove []*SSTable, l
}

compactor.db.log(fmt.Sprintf("Removed %d SSTables from level %d, freed %.2f MB",
len(tablesToRemove), levelNum, float64(totalRemovedSize)/(1024*1024)))
len(tablesToRemove), levelNum, float64(totalRemovedSize)/BytesPerMB))

return nil
}
Expand Down Expand Up @@ -1235,6 +1253,59 @@ func (compactor *Compactor) shouldCompact() bool {
return false
}

// canDropTombstones checks whether tombstones can be safely dropped during a merge.
// Tombstones can only be dropped if no SSTable outside the merge set (across all levels)
// has a key range that overlaps with the merge's key range. If any such SSTable exists,
// tombstones must be preserved to prevent deleted keys from reappearing (zombie entries).
func (compactor *Compactor) canDropTombstones(mergingTables []*SSTable) bool {
// Build a set of merging table IDs for fast lookup
mergingIDs := make(map[int64]bool, len(mergingTables))
for _, t := range mergingTables {
mergingIDs[t.Id] = true
}

// Compute the overall key range of the merge set
var mergeMin, mergeMax []byte
for _, t := range mergingTables {
if mergeMin == nil || bytes.Compare(t.Min, mergeMin) < 0 {
mergeMin = t.Min
}
if mergeMax == nil || bytes.Compare(t.Max, mergeMax) > 0 {
mergeMax = t.Max
}
}

if mergeMin == nil || mergeMax == nil {
return true // No key range to check
}

// Check all SSTables across all levels for overlapping key ranges
levels := compactor.db.levels.Load()
if levels == nil {
return true
}

for _, level := range *levels {
sstables := level.sstables.Load()
if sstables == nil {
continue
}

for _, sst := range *sstables {
if mergingIDs[sst.Id] {
continue // Skip SSTables in the merge set
}

// Check for key range overlap: sst.Max >= mergeMin && sst.Min <= mergeMax
if bytes.Compare(sst.Max, mergeMin) >= 0 && bytes.Compare(sst.Min, mergeMax) <= 0 {
return false // Overlapping SSTable found, cannot drop tombstones
}
}
}

return true // No overlapping SSTables, safe to drop tombstones
}

// getOrOpenBM retrieves a BlockManager from the LRU cache or opens it if not found
func getOrOpenBM(db *DB, path string) (*blockmanager.BlockManager, error) {
var ok bool
Expand Down
Loading