From a31079cb60f81b9d24fc6bde589370f27f3dfa0e Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Wed, 31 Dec 2025 00:02:46 +0100 Subject: [PATCH 1/8] Add types for transporting Wal stuff from server to cache without exposing all of it --- server/Cargo.toml | 1 + storage/src/write_ahead_log.rs | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/server/Cargo.toml b/server/Cargo.toml index 857367d..23335f6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -8,6 +8,7 @@ rust-version.workspace = true executor = { path = "../executor" } metadata = { path = "../metadata" } engine = { path = "../engine" } +storage = { path = "../storage" } protocol = { path = "../protocol" } types = { path = "../types" } storage = { path = "../storage" } diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 2c58024..6e5c20f 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -38,6 +38,21 @@ pub struct StartHandle { pub(crate) redo_records: Vec<(Lsn, WalRecordData)>, } +/// Handle containing WAL client and redo records for passing to Executor/Cache. +pub struct WalHandle { + pub(crate) wal_client: WalClient, + pub(crate) redo_records: Vec<(Lsn, WalRecordData)>, +} + +impl WalHandle { + fn new(wal_client: WalClient, redo_records: Vec<(Lsn, WalRecordData)>) -> Self { + Self { + wal_client, + redo_records, + } + } +} + impl StartHandle { fn new( handle: JoinHandle<()>, @@ -51,6 +66,13 @@ impl StartHandle { } } + /// Consumes StartHandle and returns WalHandle for passing to Executor/Cache. + /// The JoinHandle remains in StartHandle for cleanup. + pub fn into_wal_handle(self) -> (WalHandle, JoinHandle<()>) { + let wal_handle = WalHandle::new(self.wal_client, self.redo_records); + (wal_handle, self.handle) + } + /// Waits for the WAL thread to finish (when either WalClient is dropped or an error occurs) /// and joins it. 
pub fn join(self) -> thread::Result<()> { From ffd383910f16ffb041f903556f211a71809822c8 Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Wed, 31 Dec 2025 22:33:32 +0100 Subject: [PATCH 2/8] Use crossbeam channels instead of std::mpsc --- Cargo.lock | 2 +- server/Cargo.toml | 2 -- storage/Cargo.toml | 2 +- storage/src/background_worker.rs | 9 +++++---- storage/src/cache.rs | 10 +++++----- storage/src/files_manager.rs | 11 ++++++----- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37a66a2..f373815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -998,7 +998,6 @@ dependencies = [ "parking_lot", "protocol", "rkyv", - "serde", "serde_json", "storage", "thiserror", @@ -1043,6 +1042,7 @@ name = "storage" version = "0.1.0" dependencies = [ "bytemuck", + "crossbeam", "dashmap", "log", "lru", diff --git a/server/Cargo.toml b/server/Cargo.toml index 23335f6..8027f04 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,11 +11,9 @@ engine = { path = "../engine" } storage = { path = "../storage" } protocol = { path = "../protocol" } types = { path = "../types" } -storage = { path = "../storage" } tokio.workspace = true thiserror.workspace = true -serde.workspace = true serde_json.workspace = true dashmap.workspace = true parking_lot.workspace = true diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 047dc36..58a0e48 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -11,7 +11,7 @@ thiserror.workspace = true log.workspace = true parking_lot.workspace = true dashmap.workspace = true - +crossbeam.workspace = true lru = "0.16.0" [dev-dependencies] diff --git a/storage/src/background_worker.rs b/storage/src/background_worker.rs index 08b9841..425c36d 100644 --- a/storage/src/background_worker.rs +++ b/storage/src/background_worker.rs @@ -1,5 +1,6 @@ -use std::{mem, sync::mpsc, thread}; +use std::{mem, thread}; +use crossbeam::channel; use thiserror::Error; /// Error for [`BackgroundWorker`] related 
operations. @@ -19,19 +20,19 @@ pub struct BackgroundWorkerHandle { /// Handle to the [`BackgroundWorker`]'s thread. handle: thread::JoinHandle<()>, /// Sender end of the channel used for shutting down [`BackgroundWorker`]. - shutdown: Option>, + shutdown: Option>, } impl BackgroundWorkerHandle { /// Creates new [`BackgroundWorkerHandle`]. - pub(crate) fn new(handle: thread::JoinHandle<()>, shutdown: mpsc::Sender<()>) -> Self { + pub(crate) fn new(handle: thread::JoinHandle<()>, shutdown: channel::Sender<()>) -> Self { BackgroundWorkerHandle { handle, shutdown: Some(shutdown), } } - /// Sends signal via [`mpsc::Channel`] to [`BackgroundWorkerHandle::handle`] to shutdown. + /// Sends signal via channel to [`BackgroundWorkerHandle::handle`] to shutdown. /// This function can only be called once for the whole lifetime of this struct. pub fn shutdown(&mut self) -> Result<(), BackgroundWorkerError> { let tx = mem::take(&mut self.shutdown); diff --git a/storage/src/cache.rs b/storage/src/cache.rs index f611aa0..f309a4b 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -3,12 +3,12 @@ use std::{ sync::{ Arc, atomic::{AtomicBool, AtomicUsize, Ordering}, - mpsc, }, thread, time::Duration, }; +use crossbeam::channel; use dashmap::{DashMap, Entry}; use log::{error, info, warn}; use lru::LruCache; @@ -601,7 +601,7 @@ impl Drop for Cache { struct BackgroundCacheCleaner { cache: Arc, cleanup_interval: Duration, - shutdown: mpsc::Receiver<()>, + shutdown: channel::Receiver<()>, } struct BackgroundCacheCleanerParams { @@ -614,7 +614,7 @@ impl BackgroundWorker for BackgroundCacheCleaner { fn start(params: Self::BackgroundWorkerParams) -> BackgroundWorkerHandle { info!("Starting cache background cleaner"); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = channel::unbounded(); let cleaner = BackgroundCacheCleaner { cache: params.cache, cleanup_interval: params.cleanup_interval, @@ -636,13 +636,13 @@ impl BackgroundCacheCleaner { info!("Shutting down cache background 
cleaner"); break; } - Err(mpsc::RecvTimeoutError::Timeout) => { + Err(channel::RecvTimeoutError::Timeout) => { info!("Cache background cleaner - syncing frames and lru"); if let Err(e) = self.sync_frames_and_lru() { error!("failed to sync frames and lru: {e}") } } - Err(mpsc::RecvTimeoutError::Disconnected) => { + Err(channel::RecvTimeoutError::Disconnected) => { // Sender dropped - trying to shutdown anyway. info!("Shutting down cache background cleaner (cancellation channel dropped)"); break; diff --git a/storage/src/files_manager.rs b/storage/src/files_manager.rs index 27849c7..e4a6694 100644 --- a/storage/src/files_manager.rs +++ b/storage/src/files_manager.rs @@ -2,11 +2,12 @@ use crate::background_worker::{BackgroundWorker, BackgroundWorkerHandle}; use crate::paged_file::{PagedFile, PagedFileError}; +use crossbeam::channel; use dashmap::DashMap; use log::{error, info}; use parking_lot::Mutex; use std::path::{Path, PathBuf}; -use std::sync::{Arc, mpsc}; +use std::sync::Arc; use std::thread; use std::time::Duration; use thiserror::Error; @@ -139,7 +140,7 @@ impl FilesManager { pub(crate) struct BackgroundFilesManagerCleaner { files_manager: Arc, cleanup_interval: Duration, - shutdown: mpsc::Receiver<()>, + shutdown: channel::Receiver<()>, } pub(crate) struct BackgroundFilesManagerCleanerParams { @@ -152,7 +153,7 @@ impl BackgroundWorker for BackgroundFilesManagerCleaner { fn start(params: Self::BackgroundWorkerParams) -> BackgroundWorkerHandle { info!("Starting files manager background cleaner"); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = channel::unbounded(); let cleaner = BackgroundFilesManagerCleaner { files_manager: params.files_manager, cleanup_interval: params.cleanup_interval, @@ -174,13 +175,13 @@ impl BackgroundFilesManagerCleaner { info!("Shutting down files manager background cleaner"); break; } - Err(mpsc::RecvTimeoutError::Timeout) => { + Err(channel::RecvTimeoutError::Timeout) => { info!("Files manager background cleaner - truncating 
files"); if let Err(e) = self.truncate_files() { error!("failed to truncate files: {e}") } } - Err(mpsc::RecvTimeoutError::Disconnected) => { + Err(channel::RecvTimeoutError::Disconnected) => { // Sender dropped - trying to shutdown anyway. info!( "Shutting down files manager background cleaner (cancellation channel dropped)" From 22091e4ab379cccbf011554bd85942a673eeeb93 Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Wed, 31 Dec 2025 23:31:21 +0100 Subject: [PATCH 3/8] Add automatic flushing every so often,modify operations to take in PageRef instead of PageId and convert WAL into BackgroundWorker type shit --- storage/src/cache.rs | 26 +++ storage/src/files_manager.rs | 88 ++++++++ storage/src/write_ahead_log.rs | 396 +++++++++++++++++++++------------ 3 files changed, 367 insertions(+), 143 deletions(-) diff --git a/storage/src/cache.rs b/storage/src/cache.rs index f309a4b..f3e59d2 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -21,6 +21,7 @@ use crate::{ page_diff::PageDiff, paged_file::{Lsn, Page, PageId, PagedFile, PagedFileError, get_page_lsn, set_page_lsn}, }; +use types::serialization::DbSerializable; /// Structure for referring to single page in the file. 
#[derive(Debug, PartialEq, Eq, Hash, Clone)] @@ -43,6 +44,31 @@ impl FilePageRef { } } +impl DbSerializable for FilePageRef { + fn serialize(&self, buffer: &mut Vec) { + self.file_key.serialize(buffer); + self.page_id.serialize(buffer); + } + + fn serialize_into(&self, buffer: &mut [u8]) { + let file_key_size = self.file_key.size_serialized(); + self.file_key.serialize_into(&mut buffer[0..file_key_size]); + self.page_id.serialize_into(&mut buffer[file_key_size..]); + } + + fn deserialize( + data: &[u8], + ) -> Result<(Self, &[u8]), types::serialization::DbSerializationError> { + let (file_key, rest) = FileKey::deserialize(data)?; + let (page_id, rest) = PageId::deserialize(rest)?; + Ok((FilePageRef { page_id, file_key }, rest)) + } + + fn size_serialized(&self) -> usize { + self.file_key.size_serialized() + self.page_id.size_serialized() + } +} + /// Wrapper around the [`Page`] and its metadata used for concurrent usage. pub(crate) struct PageFrame { /// Page id and file identifier - unique per frame. 
diff --git a/storage/src/files_manager.rs b/storage/src/files_manager.rs index e4a6694..91d3400 100644 --- a/storage/src/files_manager.rs +++ b/storage/src/files_manager.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use thiserror::Error; +use types::serialization::DbSerializable; /// Represents possible file types inside a table directory (refer to `docs/file_structure.md` for more /// details) @@ -20,6 +21,40 @@ enum FileType { Index, } +impl DbSerializable for FileType { + fn serialize(&self, buffer: &mut Vec) { + let type_id: u8 = match self { + FileType::Data => 0, + FileType::Index => 1, + }; + type_id.serialize(buffer); + } + + fn serialize_into(&self, buffer: &mut [u8]) { + let type_id: u8 = match self { + FileType::Data => 0, + FileType::Index => 1, + }; + type_id.serialize_into(buffer); + } + + fn deserialize( + data: &[u8], + ) -> Result<(Self, &[u8]), types::serialization::DbSerializationError> { + let (type_id, rest) = u8::deserialize(data)?; + let file_type = match type_id { + 0 => FileType::Data, + 1 => FileType::Index, + _ => return Err(types::serialization::DbSerializationError::FailedToDeserialize), + }; + Ok((file_type, rest)) + } + + fn size_serialized(&self) -> usize { + size_of::() + } +} + /// Helper type created to make referring to files easier and cleaner. 
#[derive(Eq, Hash, PartialEq, Clone, Debug)] pub struct FileKey { @@ -60,6 +95,59 @@ impl FileKey { } } +impl DbSerializable for FileKey { + fn serialize(&self, buffer: &mut Vec) { + let name_bytes = self.table_name.as_bytes(); + (name_bytes.len() as u16).serialize(buffer); + buffer.extend_from_slice(name_bytes); + self.file_type.serialize(buffer); + } + + fn serialize_into(&self, buffer: &mut [u8]) { + let name_bytes = self.table_name.as_bytes(); + let name_len = name_bytes.len(); + + (name_len as u16).serialize_into(&mut buffer[0..size_of::()]); + + buffer[size_of::()..size_of::() + name_len].copy_from_slice(name_bytes); + + self.file_type.serialize_into(&mut buffer[2 + name_len..]); + } + + fn deserialize( + data: &[u8], + ) -> Result<(Self, &[u8]), types::serialization::DbSerializationError> { + let (name_len, rest) = u16::deserialize(data)?; + let name_len = name_len as usize; + + if rest.len() < name_len { + return Err(types::serialization::DbSerializationError::UnexpectedEnd { + expected: name_len, + actual: rest.len(), + }); + } + + let table_name = std::str::from_utf8(&rest[..name_len]) + .map_err(|_| types::serialization::DbSerializationError::FailedToDeserialize)? + .to_string(); + + let rest = &rest[name_len..]; + let (file_type, rest) = FileType::deserialize(rest)?; + + Ok(( + FileKey { + table_name, + file_type, + }, + rest, + )) + } + + fn size_serialized(&self) -> usize { + size_of::() + self.table_name.len() + size_of::() + } +} + /// Responsible for storing and distributing [`PagedFile`]s of a single database /// to higher level components. 
pub struct FilesManager { diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 6e5c20f..58c1687 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -1,13 +1,16 @@ -use crate::page_diff::PageDiff; -use crate::paged_file::{AtomicLsn, Lsn, PageId}; +use crate::background_worker::BackgroundWorkerHandle; +use crate::cache::FilePageRef; +use crate::page_diff::PageDiff; +use crate::paged_file::{AtomicLsn, Lsn}; +use crossbeam::channel; +use crossbeam::select; use log::error; use std::fs::File; use std::io::{BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::sync::atomic::Ordering; -use std::sync::mpsc::SyncSender; -use std::sync::{Arc, mpsc}; -use std::thread::JoinHandle; +use std::time::Duration; use std::{io, thread, time}; use thiserror::Error; use types::serialization::DbSerializable; @@ -32,12 +35,6 @@ struct RecoveryResult { pub last_lsn: Lsn, } -pub struct StartHandle { - handle: JoinHandle<()>, - pub(crate) wal_client: WalClient, - pub(crate) redo_records: Vec<(Lsn, WalRecordData)>, -} - /// Handle containing WAL client and redo records for passing to Executor/Cache. pub struct WalHandle { pub(crate) wal_client: WalClient, @@ -53,72 +50,67 @@ impl WalHandle { } } -impl StartHandle { - fn new( - handle: JoinHandle<()>, - wal_client: WalClient, - redo_records: Vec<(Lsn, WalRecordData)>, - ) -> Self { - Self { - handle, - wal_client, - redo_records, - } - } - - /// Consumes StartHandle and returns WalHandle for passing to Executor/Cache. - /// The JoinHandle remains in StartHandle for cleanup. - pub fn into_wal_handle(self) -> (WalHandle, JoinHandle<()>) { - let wal_handle = WalHandle::new(self.wal_client, self.redo_records); - (wal_handle, self.handle) - } - - /// Waits for the WAL thread to finish (when either WalClient is dropped or an error occurs) - /// and joins it. 
- pub fn join(self) -> thread::Result<()> { - self.handle.join() - } -} - struct WalManager { /// The highest LSN assigned so far. current_lsn: Lsn, /// The highest LSN that has been flushed to disk. Atomic for cross-thread access. flushed_lsn: Arc, /// Receiver for incoming WAL requests. - recv: mpsc::Receiver, + recv: channel::Receiver, /// Path to the file where WAL records are stored. log_path: PathBuf, /// Buffered writer for the WAL file. log_file: BufWriter, + /// Receiver for shutdown signal. + shutdown: channel::Receiver<()>, + /// Interval for automatic flush. + flush_interval: Duration, + /// Maximum number of unflushed records before forcing a flush. + max_unflushed_records: u64, } /// Spawns the WAL manager thread and returns a handle for interaction. /// This is the main entry point for using WAL. -pub fn spawn_wal(log_path: impl AsRef) -> Result { - let (sender, recv) = mpsc::sync_channel(1024); - - let (mut manager, recovery, flushed_lsn) = WalManager::with_recovery(recv, log_path)?; +pub fn spawn_wal( + log_path: impl AsRef, + flush_interval: Duration, + max_unflushed_records: u64, +) -> Result<(WalHandle, BackgroundWorkerHandle), WalError> { + let (record_sender, record_receiver) = channel::bounded(1024); + let (shutdown_sender, shutdown_receiver) = channel::unbounded(); + + let (mut manager, recovery, flushed_lsn) = WalManager::with_recovery( + record_receiver, + shutdown_receiver, + log_path, + flush_interval, + max_unflushed_records, + )?; let handle = thread::spawn(move || { manager.run(); }); let client = WalClient { - sender, + sender: record_sender, flushed_lsn, }; - let handle = StartHandle::new(handle, client, recovery.redo_records); - Ok(handle) + let wal_handle = WalHandle::new(client, recovery.redo_records); + let bg_handle = BackgroundWorkerHandle::new(handle, shutdown_sender); + + Ok((wal_handle, bg_handle)) } impl WalManager { /// Creates WalManager after recovering from existing log file. 
/// Returns the manager, recovery result, and shared flushed_lsn. fn with_recovery( - recv: mpsc::Receiver, + recv: channel::Receiver, + shutdown: channel::Receiver<()>, log_path: impl AsRef, + flush_interval: Duration, + max_unflushed_records: u64, ) -> Result<(Self, RecoveryResult, Arc), WalError> { let path = log_path.as_ref(); let recovery = Self::recover_from_log(path)?; @@ -133,19 +125,52 @@ impl WalManager { recv, log_path: path.to_path_buf(), log_file: BufWriter::new(log_file), + shutdown, + max_unflushed_records, + flush_interval, }; Ok((manager, recovery, flushed_lsn)) } fn run(&mut self) { - while let Ok(req) = self.recv.recv() { - if let Err(err) = self.handle_request(req) { - error!("WAL error occurred: {}", err); + loop { + select! { + recv(self.recv) -> msg => { + match msg { + Ok(req) => { + if let Err(err) = self.handle_request(req) { + error!("WAL error occurred: {}", err); + } + } + Err(_) => { + // Channel closed, shutdown gracefully + self.shutdown_gracefully(); + break; + } + } + } + recv(self.shutdown) -> _ => { + self.shutdown_gracefully(); + break; + } + default(self.flush_interval) => { + if self.current_lsn > self.flushed_lsn.load(Ordering::Acquire) + && let Err(e) = self.flush_internal() { + error!("Periodic WAL flush failed: {}", e); + } + } } } } + fn shutdown_gracefully(&mut self) { + // Flush any remaining records before shutting down + if let Err(e) = self.flush_internal() { + error!("Failed to flush WAL during shutdown: {}", e); + } + } + fn handle_request(&mut self, req: WalRequest) -> Result<(), WalError> { match req { WalRequest::Write(record) => self.handle_write(record), @@ -158,6 +183,9 @@ impl WalManager { match self.append_record(record.data) { Ok(lsn) => { let _ = record.send.send(Some(lsn)); + if lsn - self.flushed_lsn.load(Ordering::Acquire) >= self.max_unflushed_records { + self.flush_internal()?; + } Ok(()) } Err(err) => { @@ -175,7 +203,7 @@ impl WalManager { Ok(record_lsn) } - fn handle_flush(&mut self, sender: 
SyncSender) -> Result<(), WalError> { + fn handle_flush(&mut self, sender: channel::Sender) -> Result<(), WalError> { let result = self.flush_internal(); let _ = sender.send(result.is_ok()); result @@ -188,7 +216,7 @@ impl WalManager { Ok(()) } - fn handle_checkpoint(&mut self, sender: SyncSender>) -> Result<(), WalError> { + fn handle_checkpoint(&mut self, sender: channel::Sender>) -> Result<(), WalError> { let result = self.perform_checkpoint(); let lsn = match &result { Ok(lsn) => Some(*lsn), @@ -326,14 +354,18 @@ impl WalManager { /// Represents a request to the WAL manager. enum WalRequest { Write(WalRecord), - ForceFlush { sender: SyncSender }, - Checkpoint { sender: SyncSender> }, + ForceFlush { + sender: channel::Sender, + }, + Checkpoint { + sender: channel::Sender>, + }, } /// Represents a WAL record write request along with a channel to send back the assigned LSN. struct WalRecord { data: WalRecordData, - send: SyncSender>, + send: channel::Sender>, } /// Represents the data contained in a WAL record. Can be single-page if only one page is modified, @@ -348,10 +380,27 @@ pub(crate) enum WalRecordData { /// Represents a single page operation in WAL. pub(crate) struct SinglePageOperation { - page_id: PageId, + file_page_ref: FilePageRef, diff: PageDiff, } +impl SinglePageOperation { + pub fn new(file_page_ref: FilePageRef, diff: PageDiff) -> Self { + Self { + file_page_ref, + diff, + } + } + + pub fn file_page_ref(&self) -> &FilePageRef { + &self.file_page_ref + } + + pub fn diff(&self) -> &PageDiff { + &self.diff + } +} + /// Guard for temporary files that ensures deletion on drop unless committed. struct TempFileGuard { path: PathBuf, @@ -382,7 +431,7 @@ impl Drop for TempFileGuard { /// 3. Record type (u8) /// 4. Number of page operations (if multipage) /// 5. For each page operation: -/// a. Page ID (u32) +/// a. PageRef (u32 + FileKey) /// b. Number of diffs (u16) /// c. For each diff: /// i. 
Offset (u16) @@ -409,13 +458,13 @@ impl WalRecordData { match &self { WalRecordData::SinglePageOperation(op) => { - op.page_id.serialize(&mut buffer); + op.file_page_ref.serialize(&mut buffer); op.diff.serialize(&mut buffer); } WalRecordData::MultiPageOperation(ops) => { (ops.len() as u16).serialize(&mut buffer); for op in ops { - op.page_id.serialize(&mut buffer); + op.file_page_ref.serialize(&mut buffer); op.diff.serialize(&mut buffer); } } @@ -451,10 +500,10 @@ impl WalRecordData { } fn deserialize_single_page_op(data: &[u8], lsn: Lsn) -> Result { - let (page_id, rest) = - PageId::deserialize(data).map_err(|err| WalError::CorruptedRecord { + let (file_page_ref, rest) = + FilePageRef::deserialize(data).map_err(|err| WalError::CorruptedRecord { lsn, - reason: format!("failed to deserialize PageId: {}", err), + reason: format!("failed to deserialize FilePageRef: {}", err), })?; let (diff, _) = PageDiff::deserialize(rest).map_err(|e| WalError::CorruptedRecord { @@ -463,7 +512,7 @@ impl WalRecordData { })?; Ok(WalRecordData::SinglePageOperation(SinglePageOperation { - page_id, + file_page_ref, diff, })) } @@ -477,10 +526,10 @@ impl WalRecordData { let mut ops = Vec::with_capacity(count as usize); for _ in 0..count { - let (page_id, rest) = - PageId::deserialize(data).map_err(|e| WalError::CorruptedRecord { + let (file_page_ref, rest) = + FilePageRef::deserialize(data).map_err(|e| WalError::CorruptedRecord { lsn, - reason: format!("Failed to deserialize PageId: {}", e), + reason: format!("Failed to deserialize FilePageRef: {}", e), })?; data = rest; @@ -491,7 +540,10 @@ impl WalRecordData { })?; data = rest; - ops.push(SinglePageOperation { page_id, diff }); + ops.push(SinglePageOperation { + file_page_ref, + diff, + }); } Ok(WalRecordData::MultiPageOperation(ops)) @@ -510,17 +562,20 @@ impl WalRecordData { /// Client handle for interacting with the WAL from other threads. /// This is the public API for WAL operations. 
pub(crate) struct WalClient { - sender: SyncSender, + sender: channel::Sender, flushed_lsn: Arc, } impl WalClient { /// Writes a single page operation to WAL and returns the assigned LSN. /// Returns None if the write operation failed. - pub(crate) fn write_single(&self, page_id: PageId, diff: PageDiff) -> Option { - let (send, recv) = mpsc::sync_channel(1); + pub(crate) fn write_single(&self, file_page_ref: FilePageRef, diff: PageDiff) -> Option { + let (send, recv) = channel::bounded(1); let record = WalRecord { - data: WalRecordData::SinglePageOperation(SinglePageOperation { page_id, diff }), + data: WalRecordData::SinglePageOperation(SinglePageOperation { + file_page_ref, + diff, + }), send, }; self.sender.send(WalRequest::Write(record)).ok()?; @@ -529,11 +584,14 @@ impl WalClient { /// Writes multiple page operations as single record to WAL and returns the assigned LSN. /// Returns None if the write operation failed. - pub(crate) fn write_multi(&self, ops: Vec<(PageId, PageDiff)>) -> Option { - let (send, recv) = mpsc::sync_channel(1); + pub(crate) fn write_multi(&self, ops: Vec<(FilePageRef, PageDiff)>) -> Option { + let (send, recv) = channel::bounded(1); let ops = ops .into_iter() - .map(|(page_id, diff)| SinglePageOperation { page_id, diff }) + .map(|(file_page_ref, diff)| SinglePageOperation { + file_page_ref, + diff, + }) .collect(); let record = WalRecord { data: WalRecordData::MultiPageOperation(ops), @@ -546,7 +604,7 @@ impl WalClient { /// Forces a flush of all pending WAL records to disk. /// Returns true if flush succeeded. pub(crate) fn flush(&self) -> bool { - let (sender, recv) = mpsc::sync_channel(1); + let (sender, recv) = channel::bounded(1); if self.sender.send(WalRequest::ForceFlush { sender }).is_err() { return false; } @@ -556,7 +614,7 @@ impl WalClient { /// Requests a checkpoint. Caller must ensure all dirty pages have been flushed first. /// Returns the checkpoint LSN on success. 
pub(crate) fn checkpoint(&self) -> Option { - let (sender, recv) = mpsc::sync_channel(1); + let (sender, recv) = channel::bounded(1); self.sender.send(WalRequest::Checkpoint { sender }).ok()?; recv.recv().ok()? } @@ -570,15 +628,30 @@ impl WalClient { #[cfg(test)] mod tests { use super::*; + use crate::files_manager::FileKey; + use crate::paged_file::PageId; use std::thread; use tempfile::tempdir; + /// Interval for automatic WAL flush (in milliseconds). + const FLUSH_INTERVAL_MS: Duration = Duration::from_millis(100); + + /// Maximum number of unflushed records before forcing a flush. + const MAX_UNFLUSHED_RECORDS: u64 = 1024; + // ==================== Serialization / Deserialization ==================== + fn make_test_file_page_ref(page_id: PageId) -> FilePageRef { + FilePageRef::new(page_id, FileKey::data("test_table")) + } + fn make_single_page_op(page_id: PageId, data: Vec) -> WalRecordData { let mut diff = PageDiff::default(); diff.write_at(0, data); - WalRecordData::SinglePageOperation(SinglePageOperation { page_id, diff }) + WalRecordData::SinglePageOperation(SinglePageOperation { + file_page_ref: make_test_file_page_ref(page_id), + diff, + }) } fn make_multi_page_op(ops: Vec<(PageId, Vec)>) -> WalRecordData { @@ -587,7 +660,10 @@ mod tests { .map(|(page_id, data)| { let mut diff = PageDiff::default(); diff.write_at(0, data); - SinglePageOperation { page_id, diff } + SinglePageOperation { + file_page_ref: make_test_file_page_ref(page_id), + diff, + } }) .collect(); WalRecordData::MultiPageOperation(ops) @@ -607,7 +683,8 @@ mod tests { match result { WalRecordData::SinglePageOperation(op) => { - assert_eq!(op.page_id, 42); + assert_eq!(op.file_page_ref.page_id(), 42); + assert_eq!(op.file_page_ref.file_key().file_name(), "test_table.tbl"); } _ => panic!("expected SinglePageOperation"), } @@ -621,9 +698,9 @@ mod tests { match result { WalRecordData::MultiPageOperation(ops) => { assert_eq!(ops.len(), 3); - assert_eq!(ops[0].page_id, 1); - 
assert_eq!(ops[1].page_id, 2); - assert_eq!(ops[2].page_id, 3); + assert_eq!(ops[0].file_page_ref.page_id(), 1); + assert_eq!(ops[1].file_page_ref.page_id(), 2); + assert_eq!(ops[2].file_page_ref.page_id(), 3); } _ => panic!("expected MultiPageOperation"), } @@ -769,113 +846,131 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).expect("spawn_wal should succeed"); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).expect("spawn_wal should succeed"); assert!(handle.redo_records.is_empty()); - drop(handle.wal_client); - handle - .handle - .join() - .expect("WAL thread should join cleanly"); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn write_single_returns_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff = PageDiff::default(); diff.write_at(0, vec![1, 2, 3]); - let lsn = handle.wal_client.write_single(1, diff); + let lsn = handle + .wal_client + .write_single(make_test_file_page_ref(1), diff); assert_eq!(lsn, Some(1)); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn write_single_increments_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); - - let lsn1 = handle.wal_client.write_single(1, PageDiff::default()); - let lsn2 = handle.wal_client.write_single(2, PageDiff::default()); - let lsn3 = handle.wal_client.write_single(3, PageDiff::default()); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + + let lsn1 = handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); + let lsn2 = handle + .wal_client + 
.write_single(make_test_file_page_ref(2), PageDiff::default()); + let lsn3 = handle + .wal_client + .write_single(make_test_file_page_ref(3), PageDiff::default()); assert_eq!(lsn1, Some(1)); assert_eq!(lsn2, Some(2)); assert_eq!(lsn3, Some(3)); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn write_multi_returns_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff1 = PageDiff::default(); diff1.write_at(0, vec![1]); let mut diff2 = PageDiff::default(); diff2.write_at(0, vec![2]); - let lsn = handle.wal_client.write_multi(vec![(1, diff1), (2, diff2)]); + let lsn = handle.wal_client.write_multi(vec![ + (make_test_file_page_ref(1), diff1), + (make_test_file_page_ref(2), diff2), + ]); assert_eq!(lsn, Some(1)); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn flush_updates_flushed_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); assert_eq!(handle.wal_client.flushed_lsn(), 0); - handle.wal_client.write_single(1, PageDiff::default()); - handle.wal_client.write_single(2, PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(2), PageDiff::default()); let success = handle.wal_client.flush(); assert!(success); assert_eq!(handle.wal_client.flushed_lsn(), 2); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn checkpoint_returns_lsn() { let dir = tempdir().unwrap(); let log_path = 
dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); - handle.wal_client.write_single(1, PageDiff::default()); - handle.wal_client.write_single(2, PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(2), PageDiff::default()); let checkpoint_lsn = handle.wal_client.checkpoint(); assert_eq!(checkpoint_lsn, Some(3)); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] fn checkpoint_truncates_wal_file() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Write several records for _ in 0..10 { - handle.wal_client.write_single(1, PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); } handle.wal_client.flush(); @@ -888,8 +983,8 @@ mod tests { // After checkpoint, file should be smaller (contains only checkpoint record) assert!(size_after < size_before); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } #[test] @@ -899,28 +994,36 @@ mod tests { // First session: write records and checkpoint { - let handle = spawn_wal(&log_path).unwrap(); - handle.wal_client.write_single(1, PageDiff::default()); - handle.wal_client.write_single(2, PageDiff::default()); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(2), PageDiff::default()); handle.wal_client.checkpoint(); - 
handle.wal_client.write_single(3, PageDiff::default()); - handle.wal_client.write_single(4, PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(3), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(4), PageDiff::default()); handle.wal_client.flush(); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } // Second session: recover { - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Should only have records after checkpoint (LSN 4 and 5) assert_eq!(handle.redo_records.len(), 2); assert_eq!(handle.redo_records[0].0, 4); assert_eq!(handle.redo_records[1].0, 5); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } } @@ -931,24 +1034,30 @@ mod tests { // First session { - let handle = spawn_wal(&log_path).unwrap(); - handle.wal_client.write_single(1, PageDiff::default()); - handle.wal_client.write_single(2, PageDiff::default()); - handle.wal_client.write_single(3, PageDiff::default()); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + handle + .wal_client + .write_single(make_test_file_page_ref(1), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(2), PageDiff::default()); + handle + .wal_client + .write_single(make_test_file_page_ref(3), PageDiff::default()); handle.wal_client.flush(); - drop(handle.wal_client); - handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } // Second session { - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // flushed_lsn should be set to last_lsn from recovery assert_eq!(handle.wal_client.flushed_lsn(), 3); - drop(handle.wal_client); - 
handle.handle.join().unwrap(); + drop(handle); + let _ = bg_handle.shutdown(); } } @@ -971,7 +1080,7 @@ mod tests { fn multiple_writers_concurrent() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Simulate how Cache would share WalClient via Arc let shared = SharedWalClient::new(handle.wal_client); @@ -981,7 +1090,8 @@ mod tests { let client = shared.clone_arc(); thread::spawn(move || { for _ in 0..10 { - let lsn = client.write_single(i, PageDiff::default()); + let lsn = + client.write_single(make_test_file_page_ref(i), PageDiff::default()); assert!(lsn.is_some()); } }) @@ -996,21 +1106,21 @@ mod tests { assert_eq!(shared.0.flushed_lsn(), 100); drop(shared); - handle.handle.join().unwrap(); + let _ = bg_handle.shutdown(); } #[test] fn arc_shared_client_sees_same_flushed_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let handle = spawn_wal(&log_path).unwrap(); + let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let shared = SharedWalClient::new(handle.wal_client); let client1 = shared.clone_arc(); let client2 = shared.clone_arc(); - client1.write_single(1, PageDiff::default()); - client1.write_single(2, PageDiff::default()); + client1.write_single(make_test_file_page_ref(1), PageDiff::default()); + client1.write_single(make_test_file_page_ref(2), PageDiff::default()); client1.flush(); // Both Arc references should see the same flushed_lsn @@ -1020,7 +1130,7 @@ mod tests { drop(client1); drop(client2); drop(shared); - handle.handle.join().unwrap(); + let _ = bg_handle.shutdown(); } // ==================== Error Cases ==================== From 3291f813ead4c81c4a1fa78f055b5bf601510112 Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 01:08:38 +0100 Subject: [PATCH 4/8] Add optional 
WAL to cache --- engine/src/b_tree.rs | 2 +- engine/src/heap_file.rs | 2 +- executor/src/consts.rs | 6 +++ executor/src/lib.rs | 21 ++++++++-- storage/src/cache.rs | 73 +++++++++++++++++++++------------- storage/src/write_ahead_log.rs | 42 ++++++++++++------- 6 files changed, 99 insertions(+), 47 deletions(-) diff --git a/engine/src/b_tree.rs b/engine/src/b_tree.rs index 00d5d68..0bbc0f5 100644 --- a/engine/src/b_tree.rs +++ b/engine/src/b_tree.rs @@ -1569,7 +1569,7 @@ mod test { fs::create_dir_all(&db_dir).unwrap(); let files_manager = Arc::new(FilesManager::new(db_dir).unwrap()); - let cache = Cache::new(2000, files_manager.clone()); + let cache = Cache::new(2000, files_manager.clone(), None); // Use a unique file name for each test to avoid conflicts let file_key = FileKey::index(format!( diff --git a/engine/src/heap_file.rs b/engine/src/heap_file.rs index ed5f50c..31fd39a 100644 --- a/engine/src/heap_file.rs +++ b/engine/src/heap_file.rs @@ -2238,7 +2238,7 @@ mod tests { fs::create_dir_all(&db_dir).unwrap(); let files_manager = Arc::new(FilesManager::new(db_dir).unwrap()); - let cache = Cache::new(100, files_manager.clone()); + let cache = Cache::new(100, files_manager.clone(), None); // Use a unique file name for each test to avoid conflicts let file_key = FileKey::data(format!( diff --git a/executor/src/consts.rs b/executor/src/consts.rs index 412f5ce..b17802d 100644 --- a/executor/src/consts.rs +++ b/executor/src/consts.rs @@ -8,3 +8,9 @@ pub(crate) const CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(120); /// Interval at which the files manager performs background cleanup (truncate files, removing unused pages). pub(crate) const FILES_MANAGER_CLEANUP_INTERVAL: Duration = Duration::from_secs(300); + +/// Interval for automatic WAL flush (in milliseconds). +pub(crate) const WAL_FLUSH_INTERVAL_MS: Duration = Duration::from_millis(100); + +/// Maximum number of unflushed records before forcing a flush. 
+pub(crate) const WAL_MAX_UNFLUSHED_RECORDS: u64 = 1024; diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 64cfccb..ff3cc20 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -20,6 +20,7 @@ use engine::heap_file::{HeapFile, HeapFileFactory}; use metadata::catalog::Catalog; use parking_lot::RwLock; use planner::{query_plan::StatementPlan, resolved_tree::ResolvedTree}; +use storage::write_ahead_log::{WalError, spawn_wal}; use storage::{ background_worker::BackgroundWorkerHandle, cache::Cache, @@ -39,13 +40,15 @@ pub struct Executor { pub enum ExecutorError { #[error("Cannot open files manager: {0}")] CannotOpenFilesManager(#[from] FilesManagerError), + #[error("Cannot open write-ahead log: {0}")] + CannotOpenWAL(#[from] WalError), } impl Executor { /// Creates new [`Executor`] for database at `database_path`. pub fn new(database_path: impl AsRef, catalog: Catalog) -> Result { let files = Arc::new(FilesManager::new(database_path)?); - let cache = Cache::new(consts::CACHE_SIZE, files); + let cache = Cache::new(consts::CACHE_SIZE, files, None); let catalog = Arc::new(RwLock::new(catalog)); Ok(Executor { heap_files: DashMap::new(), @@ -62,15 +65,23 @@ impl Executor { catalog: Catalog, ) -> Result<(Self, Vec), ExecutorError> { let (files, files_background_worker) = FilesManager::with_background_cleaner( - database_path, + database_path.as_ref(), consts::FILES_MANAGER_CLEANUP_INTERVAL, )?; + let (wal_handle, wal_background_worker) = spawn_wal( + database_path, + consts::WAL_FLUSH_INTERVAL_MS, + consts::WAL_MAX_UNFLUSHED_RECORDS, + )?; + let (cache, cache_background_worker) = Cache::with_background_cleaner( consts::CACHE_SIZE, files, consts::CACHE_CLEANUP_INTERVAL, + Some(wal_handle), ); + let catalog = Arc::new(RwLock::new(catalog)); let executor = Executor { heap_files: DashMap::new(), @@ -78,7 +89,11 @@ impl Executor { cache, catalog, }; - let workers = vec![cache_background_worker, files_background_worker]; + let workers = vec![ + 
cache_background_worker, + files_background_worker, + wal_background_worker, + ]; Ok((executor, workers)) } diff --git a/storage/src/cache.rs b/storage/src/cache.rs index f3e59d2..d3c5fd1 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -15,6 +15,7 @@ use lru::LruCache; use parking_lot::{MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use thiserror::Error; +use crate::write_ahead_log::{WalClient, WalHandle}; use crate::{ background_worker::{BackgroundWorker, BackgroundWorkerHandle}, files_manager::{FileKey, FilesManager, FilesManagerError}, @@ -292,16 +293,24 @@ pub struct Cache { lru: Arc>>, /// Pointer to [`FilesManager`], used for file operations when page must be loaded from/flushed to disk. files: Arc, + /// Client for writing to WAL. + wal_client: Option, + /// Maximum capacity of the cache (number of frames). capacity: usize, } impl Cache { /// Creates new [`Cache`] that handles frames for single database. - pub fn new(capacity: usize, files: Arc) -> Arc { + pub fn new( + capacity: usize, + files: Arc, + wal_client: Option, + ) -> Arc { Arc::new(Self { frames: DashMap::with_capacity(capacity), lru: Arc::new(RwLock::new(LruCache::new(NonZero::new(capacity).unwrap()))), files, + wal_client, capacity, }) } @@ -311,12 +320,20 @@ impl Cache { capacity: usize, files: Arc, cleanup_interval: Duration, + wal_handle: Option, ) -> (Arc, BackgroundWorkerHandle) { - let cache = Self::new(capacity, files); + let (wal_client, redo_records) = match wal_handle { + Some(handle) => (Some(handle.wal_client), Some(handle.redo_records)), + None => (None, None), + }; + let cache = Self::new(capacity, files, wal_client); let cleaner = BackgroundCacheCleaner::start(BackgroundCacheCleanerParams { cache: cache.clone(), cleanup_interval, }); + if let Some(redo_records) = redo_records { + // TODO: Apply redo records to cache + } (cache, cleaner) } @@ -787,7 +804,7 @@ mod tests { let file_key = FileKey::data("table1"); let page_id = alloc_page_with_u64(&files, 
&file_key, 7); - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); let id = FilePageRef { page_id, @@ -808,7 +825,7 @@ mod tests { let file_key = FileKey::data("table1"); let page_id = alloc_page_with_u64(&files, &file_key, 7); - let cache = Cache::new(2, files.clone()); + let cache = Cache::new(2, files.clone(), None); let id = FilePageRef { page_id, @@ -839,7 +856,7 @@ mod tests { let page_id2 = alloc_page_with_u64(&files, &file_key, 2); let page_id3 = alloc_page_with_u64(&files, &file_key, 3); - let cache = Cache::new(3, files.clone()); + let cache = Cache::new(3, files.clone(), None); let id1 = FilePageRef { page_id: page_id1, @@ -883,7 +900,7 @@ mod tests { let page_id2 = alloc_page_with_u64(&files, &file_key, 2); let page_id3 = alloc_page_with_u64(&files, &file_key, 3); - let cache = Cache::new(3, files.clone()); + let cache = Cache::new(3, files.clone(), None); let id1 = FilePageRef { page_id: page_id1, @@ -922,7 +939,7 @@ mod tests { let file_key = FileKey::data("table1"); let page_id = alloc_page_with_u64(&files, &file_key, 0xdeadbeefu64); - let cache = Cache::new(2, files.clone()); + let cache = Cache::new(2, files.clone(), None); let id = FilePageRef { page_id, @@ -982,7 +999,7 @@ mod tests { let id2 = alloc_page_with_u64(&files, &file_key, 102); let id3 = alloc_page_with_u64(&files, &file_key, 103); - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); let fp1 = FilePageRef { page_id: id1, @@ -1076,7 +1093,7 @@ mod tests { let files = create_files_manager(); let file_key = FileKey::data("table1"); let pid = alloc_page_with_u64(&files, &file_key, 0x1111); - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); let id = FilePageRef { page_id: pid, file_key: file_key.clone(), @@ -1110,7 +1127,7 @@ mod tests { let files = create_files_manager(); let file_key = FileKey::data("table1"); - let cache = Cache::new(1, files.clone()); + let cache 
= Cache::new(1, files.clone(), None); // Allocate page via cache let (mut w, pid) = cache.allocate_page(&file_key).expect("allocate_page"); @@ -1145,7 +1162,7 @@ mod tests { page_id: pid, file_key: fk.clone(), }; - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); let writers = 4; let start = Arc::new(Barrier::new(writers + 1)); @@ -1192,7 +1209,7 @@ mod tests { let files = create_files_manager(); let file_key = FileKey::data("table_alloc"); - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); let (mut pinned, _) = cache .allocate_page(&file_key) @@ -1228,7 +1245,7 @@ mod tests { file_key: fk.clone(), }; - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); { let h = spawn_check_page(cache.clone(), id.clone(), 0xAA); @@ -1254,7 +1271,7 @@ mod tests { let pid2 = alloc_page_with_u64(&files, &file_key, 2); let pid3 = alloc_page_with_u64(&files, &file_key, 3); - let cache = Cache::new(2, files.clone()); + let cache = Cache::new(2, files.clone(), None); let fp1 = FilePageRef { page_id: pid1, @@ -1306,7 +1323,7 @@ mod tests { // start cache with a short cleanup interval let (cache, mut cleaner) = - Cache::with_background_cleaner(2, files.clone(), Duration::from_millis(50)); + Cache::with_background_cleaner(2, files.clone(), Duration::from_millis(50), None); // create a frame and insert it directly into frames map WITHOUT adding to LRU let page: Page = [0u8; 4096]; @@ -1343,7 +1360,7 @@ mod tests { }; let (cache, mut cleaner) = - Cache::with_background_cleaner(2, files.clone(), Duration::from_millis(50)); + Cache::with_background_cleaner(2, files.clone(), Duration::from_millis(50), None); let page: Page = [0u8; 4096]; let frame = Arc::new(PageFrame::new(id.clone(), page)); @@ -1389,7 +1406,7 @@ mod tests { }; let (cache, mut cleaner) = - Cache::with_background_cleaner(3, files.clone(), Duration::from_millis(50)); + Cache::with_background_cleaner(3, 
files.clone(), Duration::from_millis(50), None); let page: Page = [0u8; 4096]; @@ -1437,7 +1454,7 @@ mod tests { file_key: file_key.clone(), }; - let cache = Cache::new(1, files.clone()); + let cache = Cache::new(1, files.clone(), None); { let mut w = cache.pin_write(&id).expect("pin_write failed"); @@ -1463,7 +1480,7 @@ mod tests { let pid2 = alloc_page_with_u64(&files, &file_key, 0x2222); let pid3 = alloc_page_with_u64(&files, &file_key, 0x3333); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id1 = FilePageRef { page_id: pid1, @@ -1518,7 +1535,7 @@ mod tests { let pid1 = alloc_page_with_u64(&files, &file_key1, 0x1111); let pid2 = alloc_page_with_u64(&files, &file_key2, 0x2222); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id1 = FilePageRef { page_id: pid1, @@ -1554,7 +1571,7 @@ mod tests { let file_key = FileKey::data("table_remove_dirty"); let pid = alloc_page_with_u64(&files, &file_key, 0x1111); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id = FilePageRef { page_id: pid, @@ -1594,7 +1611,7 @@ mod tests { let files = create_files_manager(); let file_key = FileKey::data("table_empty"); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); // Try to remove pages from a file that has no cached pages cache @@ -1610,7 +1627,7 @@ mod tests { let file_key = FileKey::data("table_remove_held"); let pid = alloc_page_with_u64(&files, &file_key, 0x1111); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id = FilePageRef { page_id: pid, @@ -1664,7 +1681,7 @@ mod tests { let pid1 = alloc_page_with_u64(&files, &file_key, 0x1111); let pid2 = alloc_page_with_u64(&files, &file_key, 0x2222); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id1 = FilePageRef { page_id: pid1, @@ -1715,7 
+1732,7 @@ mod tests { let pid1 = alloc_page_with_u64(&files, &file_key1, 0x1111); let pid2 = alloc_page_with_u64(&files, &file_key2, 0x2222); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id1 = FilePageRef { page_id: pid1, @@ -1771,7 +1788,7 @@ mod tests { let file_key = FileKey::data("table_remove_held_flush"); let pid = alloc_page_with_u64(&files, &file_key, 0x1111); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id = FilePageRef { page_id: pid, @@ -1820,7 +1837,7 @@ mod tests { let files = create_files_manager(); let file_key = FileKey::data("table_empty_remove"); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); // Remove file that has no cached pages cache @@ -1839,7 +1856,7 @@ mod tests { let pid1 = alloc_page_with_u64(&files, &file_key1, 0x1111); let pid2 = alloc_page_with_u64(&files, &file_key2, 0x2222); - let cache = Cache::new(5, files.clone()); + let cache = Cache::new(5, files.clone(), None); let id1 = FilePageRef { page_id: pid1, diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 58c1687..82cbce4 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -561,7 +561,7 @@ impl WalRecordData { /// Client handle for interacting with the WAL from other threads. /// This is the public API for WAL operations. 
-pub(crate) struct WalClient { +pub struct WalClient { sender: channel::Sender, flushed_lsn: Arc, } @@ -846,7 +846,9 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).expect("spawn_wal should succeed"); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS) + .expect("spawn_wal should succeed"); assert!(handle.redo_records.is_empty()); drop(handle); @@ -857,7 +859,8 @@ mod tests { fn write_single_returns_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff = PageDiff::default(); diff.write_at(0, vec![1, 2, 3]); @@ -875,7 +878,8 @@ mod tests { fn write_single_increments_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let lsn1 = handle .wal_client @@ -899,7 +903,8 @@ mod tests { fn write_multi_returns_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff1 = PageDiff::default(); diff1.write_at(0, vec![1]); @@ -920,7 +925,8 @@ mod tests { fn flush_updates_flushed_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, 
mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); assert_eq!(handle.wal_client.flushed_lsn(), 0); @@ -943,7 +949,8 @@ mod tests { fn checkpoint_returns_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client @@ -964,7 +971,8 @@ mod tests { fn checkpoint_truncates_wal_file() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Write several records for _ in 0..10 { @@ -994,7 +1002,8 @@ mod tests { // First session: write records and checkpoint { - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), PageDiff::default()); @@ -1015,7 +1024,8 @@ mod tests { // Second session: recover { - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Should only have records after checkpoint (LSN 4 and 5) assert_eq!(handle.redo_records.len(), 2); @@ -1034,7 +1044,8 @@ mod tests { // First session { - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), 
PageDiff::default()); @@ -1051,7 +1062,8 @@ mod tests { // Second session { - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // flushed_lsn should be set to last_lsn from recovery assert_eq!(handle.wal_client.flushed_lsn(), 3); @@ -1080,7 +1092,8 @@ mod tests { fn multiple_writers_concurrent() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Simulate how Cache would share WalClient via Arc let shared = SharedWalClient::new(handle.wal_client); @@ -1113,7 +1126,8 @@ mod tests { fn arc_shared_client_sees_same_flushed_lsn() { let dir = tempdir().unwrap(); let log_path = dir.path().join("wal.log"); - let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let (handle, mut bg_handle) = + spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let shared = SharedWalClient::new(handle.wal_client); let client1 = shared.clone_arc(); From a0a847005cf74c6349afb862261445f6e9e482cb Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 01:21:57 +0100 Subject: [PATCH 5/8] Fix WAL not appending its log file name to the given directory path, and fix executor tests that expected 2 background workers --- executor/src/lib.rs | 4 ++-- storage/src/write_ahead_log.rs | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/executor/src/lib.rs b/executor/src/lib.rs index ff3cc20..54875ae 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -1383,8 +1383,8 @@ mod tests { let (executor, mut workers) = Executor::with_background_workers(db_path, catalog) .expect("with_background_workers should succeed");
- // We expect two background workers (cache and files manager) - assert_eq!(workers.len(), 2); + // We expect three background workers (cache, files manager, and WAL) + assert_eq!(workers.len(), 3); // Shutdown and join all workers to ensure threads are started and can be stopped. while let Some(mut handle) = workers.pop() { diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 82cbce4..f07ea83 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -27,6 +27,8 @@ pub enum WalError { RecoveryFailed(String), } +const WAL_LOG_FILE_NAME: &str = "wal.log"; + /// Result of WAL recovery process. struct RecoveryResult { /// Records that need to be redone (after last checkpoint). @@ -72,13 +74,15 @@ struct WalManager { /// Spawns the WAL manager thread and returns a handle for interaction. /// This is the main entry point for using WAL. pub fn spawn_wal( - log_path: impl AsRef, + log_dir_path: impl AsRef, flush_interval: Duration, max_unflushed_records: u64, ) -> Result<(WalHandle, BackgroundWorkerHandle), WalError> { let (record_sender, record_receiver) = channel::bounded(1024); let (shutdown_sender, shutdown_receiver) = channel::unbounded(); + let log_path = log_dir_path.as_ref().join(WAL_LOG_FILE_NAME); + let (mut manager, recovery, flushed_lsn) = WalManager::with_recovery( record_receiver, shutdown_receiver, From daf1d9e6c372902bdbab48b9915e2fb627403be2 Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 01:34:24 +0100 Subject: [PATCH 6/8] Fix WAL path issues in tests --- storage/src/write_ahead_log.rs | 63 ++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index f07ea83..678b76e 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -786,15 +786,16 @@ mod tests { #[test] fn recover_multiple_records() { let dir = tempdir().unwrap(); - let log_path =
dir.path().join("wal.log"); - let mut file = File::create(&log_path).unwrap(); + let log_path = dir.path(); + let file_path = log_path.join(WAL_LOG_FILE_NAME); + let mut file = File::create(&file_path).unwrap(); write_record_to_file(&mut file, make_single_page_op(1, vec![1]), 1); write_record_to_file(&mut file, make_single_page_op(2, vec![2]), 2); write_record_to_file(&mut file, make_single_page_op(3, vec![3]), 3); drop(file); - let result = WalManager::recover_from_log(&log_path).unwrap(); + let result = WalManager::recover_from_log(&file_path).unwrap(); assert_eq!(result.redo_records.len(), 3); assert_eq!(result.last_lsn, 3); @@ -803,8 +804,9 @@ mod tests { #[test] fn recover_clears_records_on_checkpoint() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); - let mut file = File::create(&log_path).unwrap(); + let log_path = dir.path(); + let file_path = log_path.join(WAL_LOG_FILE_NAME); + let mut file = File::create(&file_path).unwrap(); write_record_to_file(&mut file, make_single_page_op(1, vec![1]), 1); write_record_to_file(&mut file, make_single_page_op(2, vec![2]), 2); @@ -816,7 +818,7 @@ mod tests { write_record_to_file(&mut file, make_single_page_op(3, vec![3]), 4); drop(file); - let result = WalManager::recover_from_log(&log_path).unwrap(); + let result = WalManager::recover_from_log(&file_path).unwrap(); // Only record after checkpoint should be in redo_records assert_eq!(result.redo_records.len(), 1); @@ -827,8 +829,10 @@ mod tests { #[test] fn recover_with_only_checkpoint() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); - let mut file = File::create(&log_path).unwrap(); + let log_path = dir.path(); + let (_, _) = spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let file_path = log_path.join(WAL_LOG_FILE_NAME); + let mut file = File::create(&file_path).unwrap(); write_record_to_file( &mut file, @@ -837,7 +841,7 @@ mod tests { ); drop(file); - let result = 
WalManager::recover_from_log(&log_path).unwrap(); + let result = WalManager::recover_from_log(&file_path).unwrap(); assert!(result.redo_records.is_empty()); assert_eq!(result.last_lsn, 10); @@ -848,11 +852,9 @@ mod tests { #[test] fn spawn_wal_returns_valid_handle() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); - + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS) - .expect("spawn_wal should succeed"); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); assert!(handle.redo_records.is_empty()); drop(handle); @@ -862,9 +864,9 @@ mod tests { #[test] fn write_single_returns_lsn() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff = PageDiff::default(); diff.write_at(0, vec![1, 2, 3]); @@ -881,9 +883,9 @@ mod tests { #[test] fn write_single_increments_lsn() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let lsn1 = handle .wal_client @@ -906,9 +908,9 @@ mod tests { #[test] fn write_multi_returns_lsn() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let mut diff1 = PageDiff::default(); diff1.write_at(0, vec![1]); @@ -928,7 +930,7 @@ mod tests { #[test] fn flush_updates_flushed_lsn() { let dir = tempdir().unwrap(); - let log_path = 
dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); @@ -952,7 +954,7 @@ mod tests { #[test] fn checkpoint_returns_lsn() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); @@ -974,9 +976,10 @@ mod tests { #[test] fn checkpoint_truncates_wal_file() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + let file_path = log_path.join(WAL_LOG_FILE_NAME); // Write several records for _ in 0..10 { @@ -986,11 +989,11 @@ mod tests { } handle.wal_client.flush(); - let size_before = std::fs::metadata(&log_path).unwrap().len(); + let size_before = std::fs::metadata(&file_path).unwrap().len(); handle.wal_client.checkpoint(); - let size_after = std::fs::metadata(&log_path).unwrap().len(); + let size_after = std::fs::metadata(&file_path).unwrap().len(); // After checkpoint, file should be smaller (contains only checkpoint record) assert!(size_after < size_before); @@ -1002,7 +1005,7 @@ mod tests { #[test] fn recovery_after_checkpoint_only_contains_post_checkpoint_records() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); // First session: write records and checkpoint { @@ -1044,7 +1047,7 @@ mod tests { #[test] fn flushed_lsn_persists_across_recovery() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); // First session { @@ -1095,7 +1098,7 @@ mod tests { #[test] fn multiple_writers_concurrent() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let 
log_path = dir.path(); let (handle, mut bg_handle) = spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); @@ -1129,9 +1132,9 @@ mod tests { #[test] fn arc_shared_client_sees_same_flushed_lsn() { let dir = tempdir().unwrap(); - let log_path = dir.path().join("wal.log"); + let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); let shared = SharedWalClient::new(handle.wal_client); let client1 = shared.clone_arc(); From 296ff262915aac8b7f20e1eca95806f573a0957d Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 01:37:15 +0100 Subject: [PATCH 7/8] Fix copilot review --- storage/src/files_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/src/files_manager.rs b/storage/src/files_manager.rs index 91d3400..592f71b 100644 --- a/storage/src/files_manager.rs +++ b/storage/src/files_manager.rs @@ -111,7 +111,8 @@ impl DbSerializable for FileKey { buffer[size_of::()..size_of::() + name_len].copy_from_slice(name_bytes); - self.file_type.serialize_into(&mut buffer[2 + name_len..]); + self.file_type + .serialize_into(&mut buffer[size_of::() + name_len..]); } fn deserialize( From 9bafbb06e370f3ea042bf0be4408358753512b09 Mon Sep 17 00:00:00 2001 From: Aleksander Bielicki Date: Thu, 1 Jan 2026 15:49:19 +0100 Subject: [PATCH 8/8] Use repr u8 --- storage/src/files_manager.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/storage/src/files_manager.rs b/storage/src/files_manager.rs index 592f71b..5ef3204 100644 --- a/storage/src/files_manager.rs +++ b/storage/src/files_manager.rs @@ -11,11 +11,12 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use thiserror::Error; -use types::serialization::DbSerializable; +use types::serialization::{DbSerializable, DbSerializationError}; /// Represents possible 
file types inside a table directory (refer to `docs/file_structure.md` for more /// details) -#[derive(Eq, PartialEq, Hash, Clone, Debug)] +#[derive(Eq, PartialEq, Hash, Clone, Debug, Copy)] +#[repr(u8)] enum FileType { Data, Index, @@ -23,30 +24,22 @@ enum FileType { impl DbSerializable for FileType { fn serialize(&self, buffer: &mut Vec) { - let type_id: u8 = match self { - FileType::Data => 0, - FileType::Index => 1, - }; - type_id.serialize(buffer); + (*self as u8).serialize(buffer); } fn serialize_into(&self, buffer: &mut [u8]) { - let type_id: u8 = match self { - FileType::Data => 0, - FileType::Index => 1, - }; - type_id.serialize_into(buffer); + (*self as u8).serialize_into(buffer); } - fn deserialize( - data: &[u8], - ) -> Result<(Self, &[u8]), types::serialization::DbSerializationError> { - let (type_id, rest) = u8::deserialize(data)?; - let file_type = match type_id { + fn deserialize(data: &[u8]) -> Result<(Self, &[u8]), DbSerializationError> { + let (value, rest) = u8::deserialize(data)?; + + let file_type = match value { 0 => FileType::Data, 1 => FileType::Index, - _ => return Err(types::serialization::DbSerializationError::FailedToDeserialize), + _ => return Err(DbSerializationError::FailedToDeserialize), }; + Ok((file_type, rest)) }