Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b

var entryCount int64
var totalSize int64
var entries []tree.KeyValue // collect for bulk load (merge output is sorted by key)

for {
var minKey []byte
Expand Down Expand Up @@ -728,7 +729,7 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b
return fmt.Errorf("no valid latest state found for key %s", minKey)
}

// Insert if not a tombstone
// Collect entry if not a tombstone (merge output is sorted, so we bulk load at the end)
if latest != nil && latest.ValueBlockID != -1 {

// Read the value from the original VLog
Expand All @@ -744,18 +745,13 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b
return fmt.Errorf("failed to write to VLog: %w", err)
}

// Create new KLog entry
// Create new KLog entry and collect for bulk load
klogEntry := &KLogEntry{
Key: latest.Key,
Timestamp: latest.Timestamp,
ValueBlockID: vID,
}

// Insert into new KLog
if err := bt.Put(latest.Key, klogEntry); err != nil {
return fmt.Errorf("failed to insert into KLog: %w", err)
}

entries = append(entries, tree.KeyValue{Key: latest.Key, Value: klogEntry})
entryCount++
totalSize += int64(len(latest.Key) + len(val))
}
Expand Down Expand Up @@ -810,6 +806,12 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b
}
}

if len(entries) > 0 {
if err := bt.BulkPutSorted(entries); err != nil {
return fmt.Errorf("failed to bulk insert into output KLog: %w", err)
}
}

output.EntryCount = int(entryCount)
output.Size = totalSize

Expand Down
45 changes: 22 additions & 23 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error {

flusher.db.log(fmt.Sprintf("Starting to flush memtable to SSTable %d", sstable.Id))

// Collect (key, KLogEntry) from memtable iterator (sorted by key); then bulk load into B-tree
var entries []tree.KeyValue
for {
key, value, ts, ok := iter.Next()
if !ok {
Expand All @@ -210,37 +212,34 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error {

// Check if this is a deletion marker
if value == nil {
// Write a deletion marker to the SSTable
klogEntry := &KLogEntry{
Key: key,
Timestamp: ts,
ValueBlockID: -1, // Special marker for deletion
}

err = t.Put(key, klogEntry) // Insert deletion marker into B-tree
if err != nil {
return fmt.Errorf("failed to insert deletion marker into B-tree: %w", err)
}
entries = append(entries, tree.KeyValue{
Key: key,
Value: &KLogEntry{
Key: key,
Timestamp: ts,
ValueBlockID: -1, // Special marker for deletion
},
})
} else {
// Viewable value, write it to the VLog
id, err := vlogBm.Append(value[:])
if err != nil {
return fmt.Errorf("failed to write VLog: %w", err)
}

klogEntry := &KLogEntry{
Key: key,
Timestamp: ts,
ValueBlockID: id,
}

// Insert the KLog entry into the B-tree
err = t.Put(key, klogEntry)
if err != nil {
return fmt.Errorf("failed to insert KLog entry into B-tree: %w", err)
}
entries = append(entries, tree.KeyValue{
Key: key,
Value: &KLogEntry{
Key: key,
Timestamp: ts,
ValueBlockID: id,
},
})
}
}

err = t.BulkPutSorted(entries)
if err != nil {
return fmt.Errorf("failed to bulk insert into B-tree: %w", err)
}

flusher.db.log(fmt.Sprintf("Finished flushing memtable to SSTable %d", sstable.Id))
Expand Down
164 changes: 164 additions & 0 deletions tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,170 @@ func (bt *BTree) saveMetadata() error {
return err
}

// KeyValue is a key-value pair for bulk loading. Used by BulkPutSorted.
type KeyValue struct {
Key []byte
Value interface{}
}

// BulkPutSorted builds the B-tree from a sorted slice of key-value pairs in one bottom-up pass.
// Entries must be sorted by key (bytes.Compare(entries[i].Key, entries[i+1].Key) <= 0).
// Intended for a newly created empty B-tree (e.g. when flushing a memtable).
// If the tree already has keys, behavior is undefined; the existing structure may be replaced.
// Empty entries: no-op, current empty root is kept.
func (bt *BTree) BulkPutSorted(entries []KeyValue) error {
if bt.metadata == nil || bt.metadata.RootBlockID == -1 {
return errors.New("tree not initialized")
}
if len(entries) == 0 {
return nil
}

root, err := bt.loadNode(bt.metadata.RootBlockID)
if err != nil {
return err
}
// Allow bulk load only when root is empty (new tree)
if len(root.Keys) > 0 {
return errors.New("BulkPutSorted requires an empty tree")
}

maxLeafKeys := 2*bt.metadata.Order - 1
order := bt.metadata.Order

// Build leaves: partition entries into chunks of maxLeafKeys
var leafBlockIDs []int64
var levelChildren []childInfo // blockID + first key/value for building parent level

for off := 0; off < len(entries); off += maxLeafKeys {
end := off + maxLeafKeys
if end > len(entries) {
end = len(entries)
}
chunk := entries[off:end]

keys := make([][]byte, len(chunk))
values := make([]interface{}, len(chunk))
for i, kv := range chunk {
keys[i] = kv.Key
values[i] = kv.Value
}

leaf := &Node{
BlockID: -1,
IsLeaf: true,
Keys: keys,
Values: values,
Children: make([]int64, 0),
Parent: -1,
NextLeaf: -1,
PrevLeaf: -1,
}

blockID, err := bt.storeNode(leaf)
if err != nil {
return err
}
leafBlockIDs = append(leafBlockIDs, blockID)
levelChildren = append(levelChildren, childInfo{
blockID: blockID,
firstKey: chunk[0].Key,
firstVal: chunk[0].Value,
firstKeyB: chunk[0].Key, // separator in parent
})
}

// Link leaves: NextLeaf / PrevLeaf
for i := 0; i < len(leafBlockIDs); i++ {
leaf, err := bt.loadNode(leafBlockIDs[i])
if err != nil {
return err
}
if i > 0 {
leaf.PrevLeaf = leafBlockIDs[i-1]
}
if i < len(leafBlockIDs)-1 {
leaf.NextLeaf = leafBlockIDs[i+1]
}
if _, err := bt.storeNode(leaf); err != nil {
return err
}
}

// Build internal layers until one root remains
maxChildren := 2 * order
for len(levelChildren) > 1 {
var nextLevel []childInfo
for i := 0; i < len(levelChildren); i += maxChildren {
end := i + maxChildren
if end > len(levelChildren) {
end = len(levelChildren)
}
group := levelChildren[i:end]

// Internal node: Children = group block IDs, Keys = separators (first key of 2nd, 3rd, ... child)
childBlockIDs := make([]int64, len(group))
keys := make([][]byte, 0, len(group)-1)
values := make([]interface{}, 0, len(group)-1)
for j, c := range group {
childBlockIDs[j] = c.blockID
if j > 0 {
keys = append(keys, c.firstKeyB)
values = append(values, c.firstVal)
}
}

internal := &Node{
BlockID: -1,
IsLeaf: false,
Keys: keys,
Values: values,
Children: childBlockIDs,
Parent: -1,
NextLeaf: -1,
PrevLeaf: -1,
}

parentID, err := bt.storeNode(internal)
if err != nil {
return err
}

// Set Parent on each child
for _, c := range group {
child, err := bt.loadNode(c.blockID)
if err != nil {
return err
}
child.Parent = parentID
if _, err := bt.storeNode(child); err != nil {
return err
}
}

nextLevel = append(nextLevel, childInfo{
blockID: parentID,
firstKey: group[0].firstKey,
firstVal: group[0].firstVal,
firstKeyB: group[0].firstKey,
})
}
levelChildren = nextLevel
}

newRootID := levelChildren[0].blockID
bt.metadata.RootBlockID = newRootID
return bt.saveMetadata()
}

// childInfo holds block ID and first key/value for a node when building internal layers
type childInfo struct {
blockID int64
firstKey []byte
firstVal interface{}
firstKeyB []byte // key to use as separator in parent
}

// Put adds a key-value pair to the B-tree, updates if the key already exists
func (bt *BTree) Put(key []byte, value interface{}) error {

Expand Down
Loading