From 193f8567aca07fdaa59bd254b32ef9802f80a9e0 Mon Sep 17 00:00:00 2001 From: Mehrdad Mahabadi Date: Fri, 13 Feb 2026 22:39:53 +0330 Subject: [PATCH] feat: add support for bulk loading --- compactor.go | 18 ++-- flusher.go | 45 +++++----- tree/tree.go | 164 +++++++++++++++++++++++++++++++++ tree/tree_test.go | 224 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 420 insertions(+), 31 deletions(-) diff --git a/compactor.go b/compactor.go index f7602c1..7d820d8 100644 --- a/compactor.go +++ b/compactor.go @@ -686,6 +686,7 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b var entryCount int64 var totalSize int64 + var entries []tree.KeyValue // collect for bulk load (merge output is sorted by key) for { var minKey []byte @@ -728,7 +729,7 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b return fmt.Errorf("no valid latest state found for key %s", minKey) } - // Insert if not a tombstone + // Collect entry if not a tombstone (merge output is sorted, so we bulk load at the end) if latest != nil && latest.ValueBlockID != -1 { // Read the value from the original VLog @@ -744,18 +745,13 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b return fmt.Errorf("failed to write to VLog: %w", err) } - // Create new KLog entry + // Create new KLog entry and collect for bulk load klogEntry := &KLogEntry{ Key: latest.Key, Timestamp: latest.Timestamp, ValueBlockID: vID, } - - // Insert into new KLog - if err := bt.Put(latest.Key, klogEntry); err != nil { - return fmt.Errorf("failed to insert into KLog: %w", err) - } - + entries = append(entries, tree.KeyValue{Key: latest.Key, Value: klogEntry}) entryCount++ totalSize += int64(len(latest.Key) + len(val)) } @@ -810,6 +806,12 @@ func (compactor *Compactor) mergeSSTables(sstables []*SSTable, klogBm, vlogBm *b } } + if len(entries) > 0 { + if err := bt.BulkPutSorted(entries); err != nil { + return fmt.Errorf("failed to bulk insert into output KLog: %w", err) + } + } + output.EntryCount = int(entryCount) output.Size = totalSize diff --git a/flusher.go b/flusher.go index ead3042..b10c2c4 100644 --- a/flusher.go +++ b/flusher.go @@ -202,6 +202,8 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error { flusher.db.log(fmt.Sprintf("Starting to flush memtable to SSTable %d", sstable.Id)) + // Collect (key, KLogEntry) from memtable iterator (sorted by key); then bulk load into B-tree + var entries []tree.KeyValue for { key, value, ts, ok := iter.Next() if !ok { @@ -210,37 +212,34 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error { // Check if this is a deletion marker if value == nil { - // Write a deletion marker to the SSTable - klogEntry := &KLogEntry{ - Key: key, - Timestamp: ts, - ValueBlockID: -1, // Special marker for deletion - } - - err = t.Put(key, klogEntry) // Insert deletion marker into B-tree - if err != nil { - return fmt.Errorf("failed to insert deletion marker into B-tree: %w", err) - } + entries = append(entries, tree.KeyValue{ + Key: key, + Value: &KLogEntry{ + Key: key, + Timestamp: ts, + ValueBlockID: -1, // Special marker for deletion + }, + }) } else { // Viewable value, write it to the VLog id, err := vlogBm.Append(value[:]) if err != nil { return fmt.Errorf("failed to write VLog: %w", err) } - - klogEntry := &KLogEntry{ - Key: key, - Timestamp: ts, - ValueBlockID: id, - } - - // Insert the KLog entry into the B-tree - err = t.Put(key, klogEntry) - if err != nil { - return fmt.Errorf("failed to insert KLog entry into B-tree: %w", err) - } + entries = append(entries, tree.KeyValue{ + Key: key, + Value: &KLogEntry{ + Key: key, + Timestamp: ts, + ValueBlockID: id, + }, + }) } + } + err = t.BulkPutSorted(entries) + if err != nil { + return fmt.Errorf("failed to bulk insert into B-tree: %w", err) } flusher.db.log(fmt.Sprintf("Finished flushing memtable to SSTable %d", sstable.Id)) diff --git a/tree/tree.go b/tree/tree.go index 3a21375..73e87be 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -183,6 +183,170 @@ func (bt *BTree) saveMetadata() error { return err } +// KeyValue is a key-value pair for bulk loading. Used by BulkPutSorted. +type KeyValue struct { + Key []byte + Value interface{} +} + +// BulkPutSorted builds the B-tree from a sorted slice of key-value pairs in one bottom-up pass. +// Entries must be sorted by key (bytes.Compare(entries[i].Key, entries[i+1].Key) <= 0). +// Intended for a newly created empty B-tree (e.g. when flushing a memtable). +// If the tree already has keys, behavior is undefined; the existing structure may be replaced. +// Empty entries: no-op, current empty root is kept. +func (bt *BTree) BulkPutSorted(entries []KeyValue) error { + if bt.metadata == nil || bt.metadata.RootBlockID == -1 { + return errors.New("tree not initialized") + } + if len(entries) == 0 { + return nil + } + + root, err := bt.loadNode(bt.metadata.RootBlockID) + if err != nil { + return err + } + // Allow bulk load only when root is empty (new tree) + if len(root.Keys) > 0 { + return errors.New("BulkPutSorted requires an empty tree") + } + + maxLeafKeys := 2*bt.metadata.Order - 1 + order := bt.metadata.Order + + // Build leaves: partition entries into chunks of maxLeafKeys + var leafBlockIDs []int64 + var levelChildren []childInfo // blockID + first key/value for building parent level + + for off := 0; off < len(entries); off += maxLeafKeys { + end := off + maxLeafKeys + if end > len(entries) { + end = len(entries) + } + chunk := entries[off:end] + + keys := make([][]byte, len(chunk)) + values := make([]interface{}, len(chunk)) + for i, kv := range chunk { + keys[i] = kv.Key + values[i] = kv.Value + } + + leaf := &Node{ + BlockID: -1, + IsLeaf: true, + Keys: keys, + Values: values, + Children: make([]int64, 0), + Parent: -1, + NextLeaf: -1, + PrevLeaf: -1, + } + + blockID, err := bt.storeNode(leaf) + if err != nil { + return err + } + leafBlockIDs = append(leafBlockIDs, blockID) + levelChildren = append(levelChildren, childInfo{ + blockID: blockID, + firstKey: chunk[0].Key, + firstVal: chunk[0].Value, + firstKeyB: chunk[0].Key, // separator in parent + }) + } + + // Link leaves: NextLeaf / PrevLeaf + for i := 0; i < len(leafBlockIDs); i++ { + leaf, err := bt.loadNode(leafBlockIDs[i]) + if err != nil { + return err + } + if i > 0 { + leaf.PrevLeaf = leafBlockIDs[i-1] + } + if i < len(leafBlockIDs)-1 { + leaf.NextLeaf = leafBlockIDs[i+1] + } + if _, err := bt.storeNode(leaf); err != nil { + return err + } + } + + // Build internal layers until one root remains + maxChildren := 2 * order + for len(levelChildren) > 1 { + var nextLevel []childInfo + for i := 0; i < len(levelChildren); i += maxChildren { + end := i + maxChildren + if end > len(levelChildren) { + end = len(levelChildren) + } + group := levelChildren[i:end] + + // Internal node: Children = group block IDs, Keys = separators (first key of 2nd, 3rd, ... child) + childBlockIDs := make([]int64, len(group)) + keys := make([][]byte, 0, len(group)-1) + values := make([]interface{}, 0, len(group)-1) + for j, c := range group { + childBlockIDs[j] = c.blockID + if j > 0 { + keys = append(keys, c.firstKeyB) + values = append(values, c.firstVal) + } + } + + internal := &Node{ + BlockID: -1, + IsLeaf: false, + Keys: keys, + Values: values, + Children: childBlockIDs, + Parent: -1, + NextLeaf: -1, + PrevLeaf: -1, + } + + parentID, err := bt.storeNode(internal) + if err != nil { + return err + } + + // Set Parent on each child + for _, c := range group { + child, err := bt.loadNode(c.blockID) + if err != nil { + return err + } + child.Parent = parentID + if _, err := bt.storeNode(child); err != nil { + return err + } + } + + nextLevel = append(nextLevel, childInfo{ + blockID: parentID, + firstKey: group[0].firstKey, + firstVal: group[0].firstVal, + firstKeyB: group[0].firstKey, + }) + } + levelChildren = nextLevel + } + + newRootID := levelChildren[0].blockID + bt.metadata.RootBlockID = newRootID + return bt.saveMetadata() +} + +// childInfo holds block ID and first key/value for a node when building internal layers +type childInfo struct { + blockID int64 + firstKey []byte + firstVal interface{} + firstKeyB []byte // key to use as separator in parent +} + // Put adds a key-value pair to the B-tree, updates if the key already exists func (bt *BTree) Put(key []byte, value interface{}) error { diff --git a/tree/tree_test.go b/tree/tree_test.go index 9a5ef31..532bb7f 100644 --- a/tree/tree_test.go +++ b/tree/tree_test.go @@ -16,6 +16,7 @@ package tree import ( + "bytes" "fmt" "github.com/wildcatdb/wildcat/v2/blockmanager" "math/rand" @@ -249,6 +250,229 @@ func TestBTreeLargeDataset(t *testing.T) { } } +func TestBTreeBulkPutSortedEmpty(t *testing.T) { + tree, cleanup := setupTestBTree(t, 3) + defer cleanup() + + err := tree.BulkPutSorted(nil) + if err != nil { + t.Fatalf("BulkPutSorted(nil) failed: %v", err) + } + err = tree.BulkPutSorted([]KeyValue{}) + if err != nil { + t.Fatalf("BulkPutSorted(empty) failed: %v", err) + } + // Tree should still be empty (Get any key -> not found) + _, ok, _ := tree.Get([]byte("x")) + if ok { + t.Error("expected no key after empty bulk load") + } +} + +func TestBTreeBulkPutSortedNonEmptyTreeReturnsError(t *testing.T) { + tree, cleanup := setupTestBTree(t, 3) + defer cleanup() + + _ = tree.Put([]byte("a"), "a") + entries := []KeyValue{{Key: []byte("b"), Value: "b"}} + err := tree.BulkPutSorted(entries) + if err == nil { + t.Error("expected BulkPutSorted on non-empty tree to return error") + } +} + +func TestBTreeBulkPutSortedOneEntry(t *testing.T) { + tree, cleanup := setupTestBTree(t, 3) + defer cleanup() + + entries := []KeyValue{ + {Key: []byte("k"), Value: "v"}, + } + err := tree.BulkPutSorted(entries) + if err != nil { + t.Fatalf("BulkPutSorted failed: %v", err) + } + val, ok, err := tree.Get([]byte("k")) + if err != nil || !ok { + t.Fatalf("Get failed: ok=%v err=%v", ok, err) + } + if val.(string) != "v" { + t.Errorf("expected value v, got %v", val) + } + iter, err := tree.Iterator(true) + if err != nil { + t.Fatalf("Iterator failed: %v", err) + } + if !iter.Valid() || string(iter.Key()) != "k" || iter.Value().(string) != "v" { + t.Errorf("iterator: key=%s value=%v", iter.Key(), iter.Value()) + } +} + +func TestBTreeBulkPutSortedOneLeafFull(t *testing.T) { + // order 3 -> max 5 keys per leaf; fill one leaf + tree, cleanup := setupTestBTree(t, 3) + defer cleanup() + + var entries []KeyValue + for i := 0; i < 5; i++ { + k := fmt.Sprintf("key%02d", i) + entries = append(entries, KeyValue{Key: []byte(k), Value: k + "_val"}) + } + err := tree.BulkPutSorted(entries) + if err != nil { + t.Fatalf("BulkPutSorted failed: %v", err) + } + for i := 0; i < 5; i++ { + k := fmt.Sprintf("key%02d", i) + val, ok, err := tree.Get([]byte(k)) + if err != nil || !ok { + t.Errorf("Get(%s) failed: ok=%v err=%v", k, ok, err) + continue + } + if val.(string) != k+"_val" { + t.Errorf("Get(%s): expected %s_val, got %v", k, k, val) + } + } + n := 0 + iter, _ := tree.Iterator(true) + for iter.Valid() { + n++ + iter.Next() + } + if n != 5 { + t.Errorf("expected 5 keys from iterator, got %d", n) + } +} + +func TestBTreeBulkPutSortedMultipleLeaves(t *testing.T) { + tree, cleanup := setupTestBTree(t, 3) + defer cleanup() + + // More than one leaf: order 3 -> 5 keys/leaf; use 12 keys -> 3 leaves (5+5+2) + var entries []KeyValue + for i := 0; i < 12; i++ { + k := fmt.Sprintf("k%03d", i) + entries = append(entries, KeyValue{Key: []byte(k), Value: k}) + } + err := tree.BulkPutSorted(entries) + if err != nil { + t.Fatalf("BulkPutSorted failed: %v", err) + } + // Get at boundaries + for _, idx := range []int{0, 4, 5, 9, 10, 11} { + k := fmt.Sprintf("k%03d", idx) + val, ok, err := tree.Get([]byte(k)) + if err != nil || !ok { + t.Errorf("Get(%s) failed: ok=%v err=%v", k, ok, err) + continue + } + if val.(string) != k { + t.Errorf("Get(%s): got %v", k, val) + } + } + // Range iterator [k002, k008] inclusive + iter, err := tree.RangeIterator([]byte("k002"), []byte("k008"), true) + if err != nil { + t.Fatalf("RangeIterator failed: %v", err) + } + var got []string + for iter.Valid() { + got = append(got, string(iter.Key())) + iter.Next() + } + expected := []string{"k002", "k003", "k004", "k005", "k006", "k007", "k008"} + if len(got) != len(expected) { + t.Errorf("range: expected %v, got %v", expected, got) + } else { + for i := range expected { + if got[i] != expected[i] { + t.Errorf("range at %d: expected %s, got %s", i, expected[i], got[i]) + } + } + } + // Prefix iterator (if keys share prefix) + prefixIter, err := tree.PrefixIterator([]byte("k01"), true) + if err != nil { + t.Fatalf("PrefixIterator failed: %v", err) + } + got = nil + for prefixIter.Valid() { + got = append(got, string(prefixIter.Key())) + prefixIter.Next() + } + expectedPrefix := []string{"k010", "k011"} + if len(got) != len(expectedPrefix) { + t.Errorf("prefix: expected %v, got %v", expectedPrefix, got) + } else { + for i := range expectedPrefix { + if got[i] != expectedPrefix[i] { + t.Errorf("prefix at %d: expected %s, got %s", i, expectedPrefix[i], got[i]) + } + } + } +} + +func TestBTreeBulkPutSortedVsSequentialPut(t *testing.T) { + order := 5 + N := 50 + var sortedEntries []KeyValue + for i := 0; i < N; i++ { + k := fmt.Sprintf("key_%04d", i) + sortedEntries = append(sortedEntries, KeyValue{Key: []byte(k), Value: "val_" + k}) + } + // Already sorted + + // Tree 1: BulkPutSorted + tree1, cleanup1 := setupTestBTree(t, order) + defer cleanup1() + err := tree1.BulkPutSorted(sortedEntries) + if err != nil { + t.Fatalf("BulkPutSorted failed: %v", err) + } + + // Tree 2: sequential Put (same order) + tree2, cleanup2 := setupTestBTree(t, order) + defer cleanup2() + for _, e := range sortedEntries { + err := tree2.Put(e.Key, e.Value) + if err != nil { + t.Fatalf("Put failed: %v", err) + } + } + + // Same keys and values + for i := 0; i < N; i++ { + k := fmt.Sprintf("key_%04d", i) + key := []byte(k) + v1, ok1, _ := tree1.Get(key) + v2, ok2, _ := tree2.Get(key) + if !ok1 || !ok2 { + t.Errorf("key %s: bulk ok=%v seq ok=%v", k, ok1, ok2) + continue + } + if v1.(string) != v2.(string) { + t.Errorf("key %s: bulk=%v seq=%v", k, v1, v2) + } + } + + // Same iteration order (ascending) + it1, _ := tree1.Iterator(true) + it2, _ := tree2.Iterator(true) + for it1.Valid() && it2.Valid() { + if !bytes.Equal(it1.Key(), it2.Key()) { + t.Errorf("iterator key mismatch: %s vs %s", it1.Key(), it2.Key()) + } + if it1.Value().(string) != it2.Value().(string) { + t.Errorf("iterator value mismatch at key %s", it1.Key()) + } + it1.Next() + it2.Next() + } + if it1.Valid() != it2.Valid() { + t.Error("iterator length mismatch") + } +} + func TestBTreeRangeIteratorAscending(t *testing.T) { tree, cleanup := setupTestBTree(t, 3) defer cleanup()