Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions engine/src/b_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl PageVersion {
/// ancestor nodes and metadata page.
struct PessimisticPath {
latch_stack: Vec<LatchHandle>,
leaf_page_id: PageId,
leaf_node_latch: LeafNodeLatch,
metadata_page: Option<PinnedWritePage>,
}

Expand Down Expand Up @@ -146,12 +146,12 @@ impl LatchHandle {
impl PessimisticPath {
fn new(
latch_stack: Vec<LatchHandle>,
leaf_page_id: PageId,
leaf_node_latch: LeafNodeLatch,
metadata_page: Option<PinnedWritePage>,
) -> Self {
Self {
latch_stack,
leaf_page_id,
leaf_node_latch,
metadata_page,
}
}
Expand Down Expand Up @@ -682,7 +682,10 @@ impl BTree {
NodeType::Leaf => {
return Ok(PessimisticPath::new(
latch_stack,
current_page_id,
LeafNodeLatch {
page_id: current_page_id,
node: BTreeLeafNode::<PinnedWritePage>::new(page)?,
},
metadata_page,
));
}
Expand All @@ -692,15 +695,13 @@ impl BTree {

/// Performs a pessimistic insert, splitting nodes as necessary.
fn insert_pessimistic(&self, key: &[u8], record_pointer: RecordPtr) -> Result<(), BTreeError> {
let path = self.traverse_pessimistic(key, |node| Ok(node.can_fit_another()?))?;

let mut leaf = self.pin_leaf_for_write(path.leaf_page_id)?;
let mut path = self.traverse_pessimistic(key, |node| Ok(node.can_fit_another()?))?;

match leaf.insert(key, record_pointer)? {
match path.leaf_node_latch.node.insert(key, record_pointer)? {
NodeInsertResult::Success => Ok(()),
NodeInsertResult::PageFull => self.split_and_propagate(
path.latch_stack,
(path.leaf_page_id, leaf),
path.leaf_node_latch,
key,
record_pointer,
path.metadata_page,
Expand Down Expand Up @@ -744,12 +745,12 @@ impl BTree {
fn split_and_propagate(
&self,
internal_nodes: Vec<LatchHandle>,
leaf_node: (PageId, BTreeLeafNode<PinnedWritePage>),
leaf_node: LeafNodeLatch,
key: &[u8],
record_pointer: RecordPtr,
metadata_page: Option<PinnedWritePage>,
) -> Result<(), BTreeError> {
let (leaf_page_id, leaf_node) = leaf_node;
let (leaf_page_id, leaf_node) = (leaf_node.page_id, leaf_node.node);
// Split the leaf and get separator + new leaf id
let (separator_key, new_leaf_id) =
self.split_leaf(leaf_page_id, leaf_node, key, record_pointer)?;
Expand Down Expand Up @@ -957,19 +958,14 @@ impl BTree {
fn delete_pessimistic(&self, key: &[u8]) -> Result<(), BTreeError> {
// Keep the full ancestor path latched during delete so we can safely perform
// redistributions/merges that bubble up the tree (avoids losing structural context).
let path =
let mut path =
self.traverse_pessimistic(key, |node| Ok(node.will_not_underflow_after_delete()?))?;

let mut leaf = self.pin_leaf_for_write(path.leaf_page_id)?;

match leaf.delete(key)? {
match path.leaf_node_latch.node.delete(key)? {
NodeDeleteResult::Success => Ok(()),
NodeDeleteResult::SuccessUnderflow => self.redistribute_or_merge(
path.latch_stack,
LeafNodeLatch {
page_id: path.leaf_page_id,
node: leaf,
},
path.leaf_node_latch,
path.metadata_page,
),
NodeDeleteResult::KeyDoesNotExist => Err(BTreeError::KeyForDeleteNotFound),
Expand Down Expand Up @@ -2535,7 +2531,7 @@ mod test {
let (cache, file_key, _temp_dir) = setup_test_cache();
let btree = Arc::new(create_empty_btree(cache, file_key).unwrap());

let num_keys = 2000; // Use more keys to ensure multi-level tree
let num_keys = 5000; // Use more keys to ensure multi-level tree

// Pre-populate
for i in 0..num_keys {
Expand Down
53 changes: 43 additions & 10 deletions engine/src/b_tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,28 @@ where

pub fn batch_insert(&mut self, insert_values: Vec<Vec<u8>>) -> Result<(), BTreeNodeError> {
for (index, value) in insert_values.iter().enumerate() {
self.slotted_page
.insert_at(value.as_slice(), index as SlotId)?;
let pos = index as SlotId;
let res = self.slotted_page.insert_at(value.as_slice(), pos)?;
match res {
InsertResult::Success(_) => {}
InsertResult::NeedsDefragmentation => {
self.slotted_page.compact_records()?;
let res2 = self.slotted_page.insert_at(value.as_slice(), pos)?;
match res2 {
InsertResult::Success(_) => {}
InsertResult::NeedsDefragmentation | InsertResult::PageFull => {
return Err(BTreeNodeError::CorruptNode {
reason: "batch_insert failed after defragmentation".to_string(),
});
}
}
}
InsertResult::PageFull => {
return Err(BTreeNodeError::CorruptNode {
reason: "batch_insert: page unexpectedly full".to_string(),
});
}
}
}
Ok(())
}
Expand Down Expand Up @@ -504,16 +524,28 @@ where
) -> Result<(), BTreeNodeError> {
let child_ptr = self.get_child_ptr(slot_id)?;

self.slotted_page.delete(slot_id)?;

let mut buffer = Vec::new();
buffer.extend_from_slice(new_key);
child_ptr.serialize(&mut buffer);

let result = self.slotted_page.insert_at(&buffer, slot_id)?;
self.handle_insert_result(result, &buffer, slot_id)?;

Ok(())
match self.slotted_page.update(slot_id, &buffer)? {
UpdateResult::Success => Ok(()),
UpdateResult::NeedsDefragmentation => {
self.slotted_page.compact_records()?;
match self.slotted_page.update(slot_id, &buffer)? {
UpdateResult::Success => Ok(()),
UpdateResult::NeedsDefragmentation | UpdateResult::PageFull => {
Err(BTreeNodeError::CorruptNode {
reason: "update_separator_at_slot failed after defragmentation"
.to_string(),
})
}
}
}
UpdateResult::PageFull => Err(BTreeNodeError::CorruptNode {
reason: "update_separator_at_slot: page unexpectedly full".to_string(),
}),
}
}

pub fn split_keys(&mut self) -> Result<(Vec<Vec<u8>>, Vec<u8>), BTreeNodeError> {
Expand Down Expand Up @@ -861,12 +893,13 @@ where
let serialized_record_ptr_size = size_of::<PageId>() + size_of::<SlotId>();
let key_bytes_end = record.len() - serialized_record_ptr_size;
let (key, _) = record.split_at(key_bytes_end);

let position = match self.search(key)? {
LeafNodeSearchResult::Found { .. } => return Ok(NodeInsertResult::KeyAlreadyExists),
LeafNodeSearchResult::NotFoundLeaf { insert_slot_id } => insert_slot_id,
};
self.slotted_page.insert_at(record, position)?;
Ok(NodeInsertResult::Success)
let insert_result = self.slotted_page.insert_at(record, position)?;
self.handle_insert_result(insert_result, record, position)
}

pub(crate) fn update_record_ptr(
Expand Down
Loading