From 14318b5e52056551ca7b47c0d85795898344a07e Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 16:21:29 +0100 Subject: [PATCH] Handle insert results and return leaf in pessimistic path --- engine/src/b_tree.rs | 36 ++++++++++++-------------- engine/src/b_tree_node.rs | 53 +++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 30 deletions(-) diff --git a/engine/src/b_tree.rs b/engine/src/b_tree.rs index 0bbc0f5..6ffc43b 100644 --- a/engine/src/b_tree.rs +++ b/engine/src/b_tree.rs @@ -114,7 +114,7 @@ impl PageVersion { /// ancestor nodes and metadata page. struct PessimisticPath { latch_stack: Vec, - leaf_page_id: PageId, + leaf_node_latch: LeafNodeLatch, metadata_page: Option, } @@ -146,12 +146,12 @@ impl LatchHandle { impl PessimisticPath { fn new( latch_stack: Vec, - leaf_page_id: PageId, + leaf_node_latch: LeafNodeLatch, metadata_page: Option, ) -> Self { Self { latch_stack, - leaf_page_id, + leaf_node_latch, metadata_page, } } @@ -682,7 +682,10 @@ impl BTree { NodeType::Leaf => { return Ok(PessimisticPath::new( latch_stack, - current_page_id, + LeafNodeLatch { + page_id: current_page_id, + node: BTreeLeafNode::::new(page)?, + }, metadata_page, )); } @@ -692,15 +695,13 @@ impl BTree { /// Performs a pessimistic insert, splitting nodes as necessary. fn insert_pessimistic(&self, key: &[u8], record_pointer: RecordPtr) -> Result<(), BTreeError> { - let path = self.traverse_pessimistic(key, |node| Ok(node.can_fit_another()?))?; - - let mut leaf = self.pin_leaf_for_write(path.leaf_page_id)?; + let mut path = self.traverse_pessimistic(key, |node| Ok(node.can_fit_another()?))?; - match leaf.insert(key, record_pointer)? { + match path.leaf_node_latch.node.insert(key, record_pointer)? { NodeInsertResult::Success => Ok(()), NodeInsertResult::PageFull => self.split_and_propagate( path.latch_stack, - (path.leaf_page_id, leaf), + path.leaf_node_latch, key, record_pointer, path.metadata_page, @@ -744,12 +745,12 @@ impl BTree { fn split_and_propagate( &self, internal_nodes: Vec, - leaf_node: (PageId, BTreeLeafNode), + leaf_node: LeafNodeLatch, key: &[u8], record_pointer: RecordPtr, metadata_page: Option, ) -> Result<(), BTreeError> { - let (leaf_page_id, leaf_node) = leaf_node; + let (leaf_page_id, leaf_node) = (leaf_node.page_id, leaf_node.node); // Split the leaf and get separator + new leaf id let (separator_key, new_leaf_id) = self.split_leaf(leaf_page_id, leaf_node, key, record_pointer)?; @@ -957,19 +958,14 @@ impl BTree { fn delete_pessimistic(&self, key: &[u8]) -> Result<(), BTreeError> { // Keep the full ancestor path latched during delete so we can safely perform // redistributions/merges that bubble up the tree (avoids losing structural context). - let path = + let mut path = self.traverse_pessimistic(key, |node| Ok(node.will_not_underflow_after_delete()?))?; - let mut leaf = self.pin_leaf_for_write(path.leaf_page_id)?; - - match leaf.delete(key)? { + match path.leaf_node_latch.node.delete(key)? { NodeDeleteResult::Success => Ok(()), NodeDeleteResult::SuccessUnderflow => self.redistribute_or_merge( path.latch_stack, - LeafNodeLatch { - page_id: path.leaf_page_id, - node: leaf, - }, + path.leaf_node_latch, path.metadata_page, ), NodeDeleteResult::KeyDoesNotExist => Err(BTreeError::KeyForDeleteNotFound), @@ -2535,7 +2531,7 @@ mod test { let (cache, file_key, _temp_dir) = setup_test_cache(); let btree = Arc::new(create_empty_btree(cache, file_key).unwrap()); - let num_keys = 2000; // Use more keys to ensure multi-level tree + let num_keys = 5000; // Use more keys to ensure multi-level tree // Pre-populate for i in 0..num_keys { diff --git a/engine/src/b_tree_node.rs b/engine/src/b_tree_node.rs index eef7f5e..9dbe9ec 100644 --- a/engine/src/b_tree_node.rs +++ b/engine/src/b_tree_node.rs @@ -265,8 +265,28 @@ where pub fn batch_insert(&mut self, insert_values: Vec>) -> Result<(), BTreeNodeError> { for (index, value) in insert_values.iter().enumerate() { - self.slotted_page - .insert_at(value.as_slice(), index as SlotId)?; + let pos = index as SlotId; + let res = self.slotted_page.insert_at(value.as_slice(), pos)?; + match res { + InsertResult::Success(_) => {} + InsertResult::NeedsDefragmentation => { + self.slotted_page.compact_records()?; + let res2 = self.slotted_page.insert_at(value.as_slice(), pos)?; + match res2 { + InsertResult::Success(_) => {} + InsertResult::NeedsDefragmentation | InsertResult::PageFull => { + return Err(BTreeNodeError::CorruptNode { + reason: "batch_insert failed after defragmentation".to_string(), + }); + } + } + } + InsertResult::PageFull => { + return Err(BTreeNodeError::CorruptNode { + reason: "batch_insert: page unexpectedly full".to_string(), + }); + } + } } Ok(()) } @@ -504,16 +524,28 @@ where ) -> Result<(), BTreeNodeError> { let child_ptr = self.get_child_ptr(slot_id)?; - self.slotted_page.delete(slot_id)?; - let mut buffer = Vec::new(); buffer.extend_from_slice(new_key); child_ptr.serialize(&mut buffer); - let result = self.slotted_page.insert_at(&buffer, slot_id)?; - self.handle_insert_result(result, &buffer, slot_id)?; - - Ok(()) + match self.slotted_page.update(slot_id, &buffer)? { + UpdateResult::Success => Ok(()), + UpdateResult::NeedsDefragmentation => { + self.slotted_page.compact_records()?; + match self.slotted_page.update(slot_id, &buffer)? { + UpdateResult::Success => Ok(()), + UpdateResult::NeedsDefragmentation | UpdateResult::PageFull => { + Err(BTreeNodeError::CorruptNode { + reason: "update_separator_at_slot failed after defragmentation" + .to_string(), + }) + } + } + } + UpdateResult::PageFull => Err(BTreeNodeError::CorruptNode { + reason: "update_separator_at_slot: page unexpectedly full".to_string(), + }), + } } pub fn split_keys(&mut self) -> Result<(Vec>, Vec), BTreeNodeError> { @@ -861,12 +893,13 @@ where let serialized_record_ptr_size = size_of::() + size_of::(); let key_bytes_end = record.len() - serialized_record_ptr_size; let (key, _) = record.split_at(key_bytes_end); + let position = match self.search(key)? { LeafNodeSearchResult::Found { .. } => return Ok(NodeInsertResult::KeyAlreadyExists), LeafNodeSearchResult::NotFoundLeaf { insert_slot_id } => insert_slot_id, }; - self.slotted_page.insert_at(record, position)?; - Ok(NodeInsertResult::Success) + let insert_result = self.slotted_page.insert_at(record, position)?; + self.handle_insert_result(insert_result, record, position) } pub(crate) fn update_record_ptr(