diff --git a/engine/src/b_tree.rs b/engine/src/b_tree.rs index 8613c7b..de7cbab 100644 --- a/engine/src/b_tree.rs +++ b/engine/src/b_tree.rs @@ -750,13 +750,32 @@ impl BTree { record_pointer: RecordPtr, metadata_page: Option, ) -> Result<(), BTreeError> { + // We need to store all the latches to nodes we modify during split propagation to + // write them atomically to WAL at the end. + let mut dropped_page_latches: Vec = Vec::new(); + let (leaf_page_id, leaf_node) = (leaf_node.page_id, leaf_node.node); // Split the leaf and get separator + new leaf id - let (separator_key, new_leaf_id) = - self.split_leaf(leaf_page_id, leaf_node, key, record_pointer)?; + let (separator_key, new_leaf_id) = self.split_leaf( + leaf_page_id, + leaf_node, + key, + record_pointer, + &mut dropped_page_latches, + )?; // Propagate separator up the stack (or create new root if stack empty) - self.propagate_separator_up(internal_nodes, separator_key, new_leaf_id, metadata_page) + self.propagate_separator_up( + internal_nodes, + separator_key, + new_leaf_id, + metadata_page, + &mut dropped_page_latches, + )?; + + self.cache.drop_write_pages(dropped_page_latches); + + Ok(()) } /// Splits the provided leaf node and creates a new one with the split keys. Returns a pair @@ -767,6 +786,7 @@ impl BTree { mut leaf_node: BTreeLeafNode, key: &[u8], record_pointer: RecordPtr, + dropped_page_latches: &mut Vec, ) -> Result<(Vec, PageId), BTreeError> { // Split keys of the leaf. let (records_to_move, separator_key) = leaf_node.split_keys()?; @@ -794,6 +814,9 @@ impl BTree { // Bump version. self.update_structural_version(leaf_page_id); + dropped_page_latches.push(leaf_node.into_page()); + dropped_page_latches.push(new_leaf_node.into_page()); + Ok((separator_key, new_leaf_id)) } @@ -805,6 +828,7 @@ impl BTree { mut current_separator_key: Vec, mut child_page_id: PageId, metadata_page: Option, + dropped_page_latches: &mut Vec, ) -> Result<(), BTreeError> { // If there are no parents, create new root. if internal_nodes.is_empty() { @@ -816,6 +840,7 @@ impl BTree { current_separator_key, child_page_id, metadata, + dropped_page_latches, ); } @@ -862,6 +887,10 @@ impl BTree { // Bump version for parent to indicate structural change. self.update_structural_version(parent_page_id); + // Store modified pages for WAL write later. + dropped_page_latches.push(parent_node.into_page()); + dropped_page_latches.push(new_internal_node.into_page()); + // The new separator and new child id will be propagated up current_separator_key = new_separator; child_page_id = new_internal_id; @@ -876,6 +905,7 @@ impl BTree { current_separator_key, child_page_id, metadata, + dropped_page_latches, ); } } @@ -892,6 +922,7 @@ impl BTree { separator_key: Vec, right_child_id: PageId, mut metadata_page: PinnedWritePage, + dropped_page_latches: &mut Vec, ) -> Result<(), BTreeError> { let (new_root_id, mut new_root) = self.allocate_and_init_internal(left_child_id)?; @@ -902,6 +933,10 @@ impl BTree { let metadata = BTreeMetadata::new(new_root_id); metadata.save_to_page(&mut metadata_page); + // Store modified pages for WAL write later. + dropped_page_latches.push(new_root.into_page()); + dropped_page_latches.push(metadata_page); + Ok(()) } @@ -981,6 +1016,10 @@ impl BTree { mut leaf_latch: LeafNodeLatch, metadata_page: Option, ) -> Result<(), BTreeError> { + // We need to store all the latches to nodes we modify during merge/redistribute to + // write them atomically to WAL at the end. + let mut dropped_page_latches: Vec = Vec::new(); + // We need the parent for separator updates. let parent = match internal_nodes.pop() { Some(p) => p, @@ -1019,6 +1058,13 @@ impl BTree { self.update_structural_version(parent_id); self.update_structural_version(right_sibling_id); + + // Store modified pages for WAL write. + dropped_page_latches.push(leaf_latch.node.into_page()); + dropped_page_latches.push(right_sibling.into_page()); + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); + return Ok(()); } @@ -1038,6 +1084,7 @@ impl BTree { node: right_sibling, }, ctx, + dropped_page_latches, ); } } @@ -1067,6 +1114,13 @@ impl BTree { leaf_latch .node .insert_record(redistributed_record.as_slice())?; + + // Store modified pages for WAL write. + dropped_page_latches.push(leaf_latch.node.into_page()); + dropped_page_latches.push(left_sibling.into_page()); + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); + return Ok(()); } @@ -1086,6 +1140,7 @@ impl BTree { node: left_sibling, }, ctx, + dropped_page_latches, ) } @@ -1095,7 +1150,8 @@ impl BTree { &self, mut current_node_latch: LeafNodeLatch, right_sibling_latch: LeafNodeLatch, - mut ctx: MergeContext, + ctx: MergeContext, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { // Move all keys from right sibling to node. let keys_to_move = right_sibling_latch.node.get_all_records()?; @@ -1117,15 +1173,41 @@ impl BTree { .slot_id() .expect("next_child_pos must be AfterSlot"); - ctx.parent_node.delete_at(slot_id)?; - self.update_structural_version(ctx.parent_id); + // Destructure ctx to extract components + let MergeContext { + parent_id, + mut parent_node, + child_pos, + internal_nodes, + metadata_page, + } = ctx; + + parent_node.delete_at(slot_id)?; + self.update_structural_version(parent_id); + // Free the right sibling page self.free_latch(right_sibling_latch)?; - if ctx.parent_node.is_underflow()? { - self.handle_internal_underflow(ctx)?; + // Check underflow BEFORE moving parent_node to vector + let parent_has_underflow = parent_node.is_underflow()?; + + // Store modified current node page + dropped_page_latches.push(current_node_latch.node.into_page()); + + if parent_has_underflow { + let new_ctx = MergeContext { + parent_id, + parent_node, + child_pos, + internal_nodes, + metadata_page, + }; + return self.handle_internal_underflow(new_ctx, dropped_page_latches); } + // No underflow, so we can write all pages atomically + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); Ok(()) } @@ -1135,7 +1217,8 @@ impl BTree { &self, current_node_latch: LeafNodeLatch, mut left_sibling_latch: LeafNodeLatch, - mut ctx: MergeContext, + ctx: MergeContext, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { let keys_to_move = current_node_latch.node.get_all_records()?; for key in keys_to_move { @@ -1151,18 +1234,41 @@ impl BTree { .child_pos .slot_id() .expect("child_pos must be AfterSlot for merge_with_left_sibling"); - ctx.parent_node.delete_at(slot_id)?; - self.update_structural_version(ctx.parent_id); + + let MergeContext { + parent_id, + mut parent_node, + child_pos, + internal_nodes, + metadata_page, + } = ctx; + + parent_node.delete_at(slot_id)?; + self.update_structural_version(parent_id); self.free_latch(current_node_latch)?; - if ctx.parent_node.is_underflow()? { - self.handle_internal_underflow(ctx)?; + let parent_has_underflow = parent_node.is_underflow()?; + + // Store modified left sibling page + dropped_page_latches.push(left_sibling_latch.node.into_page()); + + if parent_has_underflow { + let new_ctx = MergeContext { + parent_id, + parent_node, + child_pos, + internal_nodes, + metadata_page, + }; + return self.handle_internal_underflow(new_ctx, dropped_page_latches); } + // No underflow, so we can write all pages atomically + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); Ok(()) } - /// Frees the latch's page at disk level and removes it from structural version numbers map. fn free_latch(&self, latch: impl Latch) -> Result<(), BTreeError> { self.structural_version_numbers.remove(&latch.page_id()); @@ -1172,7 +1278,11 @@ impl BTree { /// Takes care of restructuring an internal node if it has an underflow. Works similarly to /// [`redistribute_or_merge`] just for internal nodes. - fn handle_internal_underflow(&self, mut ctx: MergeContext) -> Result<(), BTreeError> { + fn handle_internal_underflow( + &self, + mut ctx: MergeContext, + dropped_page_latches: Vec, + ) -> Result<(), BTreeError> { // The node that has an underflow (it was the parent when handling the leaf). let underflow_id = ctx.parent_id; let underflow_node = ctx.parent_node; @@ -1186,6 +1296,7 @@ impl BTree { node: underflow_node, }, ctx.metadata_page.unwrap(), + dropped_page_latches, ); } Some(handle) => handle, @@ -1222,6 +1333,7 @@ impl BTree { node: parent_node, }, separator_slot, + dropped_page_latches, ); } @@ -1249,6 +1361,7 @@ impl BTree { }, separator_slot, new_ctx, + dropped_page_latches, ); } } @@ -1278,6 +1391,7 @@ impl BTree { node: parent_node, }, separator_slot, + dropped_page_latches, ); } @@ -1299,6 +1413,7 @@ impl BTree { node: left_sibling, }, new_ctx, + dropped_page_latches, ) } @@ -1312,6 +1427,7 @@ impl BTree { mut right_sibling_latch: InternalNodeLatch, mut parent_latch: InternalNodeLatch, separator_slot: SlotId, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { // Get current separator from parent let separator_key = parent_latch.node.get_key(separator_slot)?.to_vec(); @@ -1335,6 +1451,12 @@ impl BTree { self.update_structural_version(right_sibling_latch.page_id); self.update_structural_version(parent_latch.page_id); + // Store modified pages for WAL write. + dropped_page_latches.push(underflow_node_latch.node.into_page()); + dropped_page_latches.push(right_sibling_latch.node.into_page()); + dropped_page_latches.push(parent_latch.node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); + Ok(()) } @@ -1348,6 +1470,7 @@ impl BTree { mut left_sibling_latch: InternalNodeLatch, mut parent_latch: InternalNodeLatch, separator_slot: SlotId, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { // Get current separator from parent let separator_key = parent_latch.node.get_key(separator_slot)?.to_vec(); @@ -1369,6 +1492,12 @@ impl BTree { self.update_structural_version(left_sibling_latch.page_id); self.update_structural_version(parent_latch.page_id); + // Store modified pages for WAL write. + dropped_page_latches.push(underflow_node_latch.node.into_page()); + dropped_page_latches.push(left_sibling_latch.node.into_page()); + dropped_page_latches.push(parent_latch.node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); + Ok(()) } @@ -1381,9 +1510,18 @@ impl BTree { mut underflow_node_latch: InternalNodeLatch, right_sibling_latch: InternalNodeLatch, separator_slot: SlotId, - mut ctx: MergeContext, + ctx: MergeContext, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { - let separator_key = ctx.parent_node.get_key(separator_slot)?.to_vec(); + let MergeContext { + parent_id, + mut parent_node, + child_pos, + internal_nodes, + metadata_page, + } = ctx; + + let separator_key = parent_node.get_key(separator_slot)?.to_vec(); let right_leftmost = right_sibling_latch .node @@ -1397,14 +1535,30 @@ impl BTree { let right_records = right_sibling_latch.node.get_all_records()?; self.insert_internal_records(&mut underflow_node_latch.node, right_records)?; - ctx.parent_node.delete_at(separator_slot)?; - self.update_structural_version(ctx.parent_id); + parent_node.delete_at(separator_slot)?; + self.update_structural_version(parent_id); + self.free_latch(right_sibling_latch)?; - if ctx.parent_node.is_underflow()? { - return self.handle_internal_underflow(ctx); + let parent_has_underflow = parent_node.is_underflow()?; + + // Store modified underflow node page + dropped_page_latches.push(underflow_node_latch.node.into_page()); + + if parent_has_underflow { + let new_ctx = MergeContext { + parent_id, + parent_node, + child_pos, + internal_nodes, + metadata_page, + }; + return self.handle_internal_underflow(new_ctx, dropped_page_latches); } + // No underflow, so we can write all pages atomically + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); Ok(()) } @@ -1416,14 +1570,23 @@ impl BTree { &self, underflow_node_latch: InternalNodeLatch, mut left_sibling_latch: InternalNodeLatch, - mut ctx: MergeContext, + ctx: MergeContext, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { - let separator_slot = ctx + let slot_id = ctx .child_pos .slot_id() .expect("child_pos must be AfterSlot for merge_with_left_sibling"); - let separator_key = ctx.parent_node.get_key(separator_slot)?.to_vec(); + let MergeContext { + parent_id, + mut parent_node, + child_pos, + internal_nodes, + metadata_page, + } = ctx; + + let separator_key = parent_node.get_key(slot_id)?.to_vec(); let underflow_leftmost = underflow_node_latch .node @@ -1437,14 +1600,30 @@ impl BTree { let underflow_records = underflow_node_latch.node.get_all_records()?; self.insert_internal_records(&mut left_sibling_latch.node, underflow_records)?; - ctx.parent_node.delete_at(separator_slot)?; - self.update_structural_version(ctx.parent_id); + parent_node.delete_at(slot_id)?; + self.update_structural_version(parent_id); + self.free_latch(underflow_node_latch)?; - if ctx.parent_node.is_underflow()? { - return self.handle_internal_underflow(ctx); + let parent_has_underflow = parent_node.is_underflow()?; + + // Store modified left sibling page + dropped_page_latches.push(left_sibling_latch.node.into_page()); + + if parent_has_underflow { + let new_ctx = MergeContext { + parent_id, + parent_node, + child_pos, + internal_nodes, + metadata_page, + }; + return self.handle_internal_underflow(new_ctx, dropped_page_latches); } + // No underflow, so we can write all pages atomically + dropped_page_latches.push(parent_node.into_page()); + self.cache.drop_write_pages(dropped_page_latches); Ok(()) } @@ -1453,6 +1632,7 @@ impl BTree { &self, root_latch: InternalNodeLatch, mut metadata_page: PinnedWritePage, + mut dropped_page_latches: Vec, ) -> Result<(), BTreeError> { // If root has more than one child we can keep it with the underflow. if root_latch.node.num_children()? == 1 { @@ -1463,9 +1643,18 @@ impl BTree { let metadata = BTreeMetadata::new(child_id); metadata.save_to_page(&mut metadata_page); + // Store modified metadata page + dropped_page_latches.push(metadata_page); + + // Free the old root page self.free_latch(root_latch)?; + } else { + // Root still has multiple children; ensure the modified root page is also flushed. + dropped_page_latches.push(root_latch.node.into_page()); } + // Write all modified pages atomically to WAL + self.cache.drop_write_pages(dropped_page_latches); Ok(()) } diff --git a/storage/src/cache.rs b/storage/src/cache.rs index eed09dd..6a19369 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -478,7 +478,7 @@ impl Cache { /// Consumes `pages` and send all their diffs to wal as [`MultiPageOperation`]. /// - /// /// This helper is intended for callers that want to explicitly flush a batch of + /// This helper is intended for callers that want to explicitly flush a batch of /// [`PinnedWritePage`]s at once, instead of relying on each page being dropped /// independently. This could be the case if you performed one logical operation that /// required changes in more than one page (e.g. merge in [`BTree`]).