From fa4b327bd932af83135378d34bcbe05607eb8fdc Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Sat, 12 Jul 2025 21:03:48 +0200 Subject: [PATCH 01/14] - --- Cargo.toml | 4 ++ src/write_ahead_log/mod.rs | 3 +- src/write_ahead_log/segment_wal.rs | 85 ++++++++++++++++++++++++++ src/write_ahead_log/write_ahead_log.rs | 2 +- tests/write_ahead_log.rs | 0 tests/write_ahead_log_test.rs | 8 +++ 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 src/write_ahead_log/segment_wal.rs delete mode 100644 tests/write_ahead_log.rs create mode 100644 tests/write_ahead_log_test.rs diff --git a/Cargo.toml b/Cargo.toml index c66312c..5e1d427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,3 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] + + +[dev-dependencies] +tempfile = "3.20.0" \ No newline at end of file diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 394fe71..81dee1a 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1 +1,2 @@ -pub mod write_ahead_log; \ No newline at end of file +pub mod write_ahead_log; +pub mod segment_wal; \ No newline at end of file diff --git a/src/write_ahead_log/segment_wal.rs b/src/write_ahead_log/segment_wal.rs new file mode 100644 index 0000000..1ffef3d --- /dev/null +++ b/src/write_ahead_log/segment_wal.rs @@ -0,0 +1,85 @@ +use std::{io::{Error, SeekFrom}, path::PathBuf}; +use crate::write_ahead_log::write_ahead_log::WriteAheadLog; + +const WAL_SIGNATURE: [u8; 24] = *b"SegmentWal_2025.7.12____"; +const SEGMENT_FILE_SIGNATURE: [u8; 8] = *b"Log_File"; +const DATA_FILE_SIGNATURE: [u8; 8] = *b"Data____"; + +pub struct SegmentWal { + dir_path: PathBuf, + data_file: std::fs::File, +} + +enum LogOperation { + Read(u64), + Write(Vec), + Seek(SeekFrom), + StreamLen(u64), + StreamPosition(u64), + SetLen(u64), +} + +struct LogEntry { + log_sequence_number: u64, + operation: LogOperation, + checksum: u64, + persistent: bool, +} + + +impl SegmentWal { + pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); + + let data_file_path = dir_path.join("wal.data"); + let data_file = std::fs::File::create(&data_file_path) + .expect("Failed to create data file"); + + //get current system time for the log file name + let log_file_name = format!("segment_{}.log", 1); + + Self { + dir_path, + data_file, + } + } +} + +impl WriteAheadLog for SegmentWal { + fn read(&mut self, size: u64) -> Result, Error> { + // Boilerplate implementation + Ok(vec![0; size as usize]) // Return a vector of zeros of the requested size + } + + fn write(&mut self, buf: Vec) -> Result<(), Error> { + // Boilerplate implementation + Ok(()) + } + + fn seek(&mut self, pos: SeekFrom) -> Result<(), Error> { + // Boilerplate implementation + Ok(()) + } + + fn stream_len(&mut self) -> Result { + // Boilerplate implementation + Ok(0) + } + + fn stream_position(&mut self) -> Result { + // Boilerplate implementation + Ok(0) + } + + fn atomic_checkpoint(&mut self) -> Result<(), Error> { + // Boilerplate implementation + Ok(()) + } + + fn set_len(&mut self, size: u64) -> Result<(), Error> { + // Boilerplate implementation + Ok(()) + } +} diff --git a/src/write_ahead_log/write_ahead_log.rs b/src/write_ahead_log/write_ahead_log.rs index 889f45d..4bfb786 100644 --- a/src/write_ahead_log/write_ahead_log.rs +++ b/src/write_ahead_log/write_ahead_log.rs @@ -2,7 +2,7 @@ use std::io::{Error, SeekFrom}; pub trait WriteAheadLog { - fn read(&mut self) -> Result, Error>; + fn read(&mut self, size: u64) -> Result, Error>; fn write(&mut self, buf: Vec) -> Result<(), Error>; diff --git a/tests/write_ahead_log.rs b/tests/write_ahead_log.rs deleted file mode 100644 index e69de29..0000000 diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs new file mode 100644 index 0000000..c95158c --- /dev/null +++ b/tests/write_ahead_log_test.rs @@ -0,0 +1,8 @@ +use core_data::write_ahead_log::segment_wal::SegmentWal; + +#[test] +fn test_segment_wal() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + +} From b457201690d4b68fbe3096c5d1005be4c6c64329 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Sun, 13 Jul 2025 00:34:28 +0200 Subject: [PATCH 02/14] - --- src/write_ahead_log/copy_wal.rs | 101 ++++++++++++++++++++++++++++ src/write_ahead_log/copy_wal_log.rs | 0 src/write_ahead_log/mod.rs | 2 +- src/write_ahead_log/segment_wal.rs | 85 ----------------------- tests/write_ahead_log_test.rs | 4 +- 5 files changed, 105 insertions(+), 87 deletions(-) create mode 100644 src/write_ahead_log/copy_wal.rs create mode 100644 src/write_ahead_log/copy_wal_log.rs delete mode 100644 src/write_ahead_log/segment_wal.rs diff --git a/src/write_ahead_log/copy_wal.rs b/src/write_ahead_log/copy_wal.rs new file mode 100644 index 0000000..80e9bdf --- /dev/null +++ b/src/write_ahead_log/copy_wal.rs @@ -0,0 +1,101 @@ +use std::{io::{Read, Seek, Write}, path::PathBuf}; + +use crate::write_ahead_log::write_ahead_log::WriteAheadLog; + +const WAL_SIGNATURE: [u8; 24] = *b"COPY_WAL_2025_7_12______"; +const DATA_FILE_SIGNATURE: [u8; 8] = *b"DATA____"; +const COPY_FILE_SIGNATURE: [u8; 8] = *b"COPY____"; +const LOG_FILE_SIGNATURE: [u8; 8] = *b"LOG_____"; + +pub struct CopyWal { + dir_path: PathBuf, + data_file: std::fs::File, + copy_file: std::fs::File, + log_file: std::fs::File, +} + +impl CopyWal { + pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); + + let data_file_path = dir_path.join("wal.data"); + let mut data_file = std::fs::File::create(&data_file_path) + .expect("Failed to create data file"); + + let copy_file_path = dir_path.join("wal.copy"); + let mut copy_file = std::fs::File::create(©_file_path) + .expect("Failed to create copy file"); + + let log_file_path = dir_path.join("wal.log"); + let mut log_file = std::fs::File::create(&log_file_path) + .expect("Failed to create log file"); + + data_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write data file signature"); + data_file.write_all(&DATA_FILE_SIGNATURE) + .expect("Failed to write data file signature"); + + copy_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write copy file signature"); + copy_file.write_all(©_FILE_SIGNATURE) + .expect("Failed to write copy file signature"); + + log_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write log file signature"); + log_file.write_all(&LOG_FILE_SIGNATURE) + .expect("Failed to write log file signature"); + + Self { + dir_path, + data_file, + copy_file, + log_file, + } + } +} + +impl WriteAheadLog for CopyWal { + fn read(&mut self, size: u64) -> Result, std::io::Error> { + let mut buffer = Vec::with_capacity(size as usize); + self.copy_file + .read_exact(&mut buffer)?; + Ok(buffer) + } + + fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { + self.copy_file + .write_all(buf.as_slice())?; + Ok(()) + } + + fn seek(&mut self, pos: std::io::SeekFrom) -> Result<(), std::io::Error> { + self.copy_file + .seek(pos)?; + Ok(()) + } + + fn stream_len(&mut self) -> Result { + let len = self.copy_file + .metadata()? + .len(); + Ok(len) + } + + fn stream_position(&mut self) -> Result { + let pos = self.copy_file + .stream_position()?; + Ok(pos) + } + + fn set_len(&mut self, size: u64) -> Result<(), std::io::Error> { + self.copy_file + .set_len(size)?; + Ok(()) + } + + fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { + todo!() + } +} \ No newline at end of file diff --git a/src/write_ahead_log/copy_wal_log.rs b/src/write_ahead_log/copy_wal_log.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 81dee1a..976f89e 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1,2 +1,2 @@ pub mod write_ahead_log; -pub mod segment_wal; \ No newline at end of file +pub mod copy_wal; \ No newline at end of file diff --git a/src/write_ahead_log/segment_wal.rs b/src/write_ahead_log/segment_wal.rs deleted file mode 100644 index 1ffef3d..0000000 --- a/src/write_ahead_log/segment_wal.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::{io::{Error, SeekFrom}, path::PathBuf}; -use crate::write_ahead_log::write_ahead_log::WriteAheadLog; - -const WAL_SIGNATURE: [u8; 24] = *b"SegmentWal_2025.7.12____"; -const SEGMENT_FILE_SIGNATURE: [u8; 8] = *b"Log_File"; -const DATA_FILE_SIGNATURE: [u8; 8] = *b"Data____"; - -pub struct SegmentWal { - dir_path: PathBuf, - data_file: std::fs::File, -} - -enum LogOperation { - Read(u64), - Write(Vec), - Seek(SeekFrom), - StreamLen(u64), - StreamPosition(u64), - SetLen(u64), -} - -struct LogEntry { - log_sequence_number: u64, - operation: LogOperation, - checksum: u64, - persistent: bool, -} - - -impl SegmentWal { - pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { - assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); - assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); - assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); - - let data_file_path = dir_path.join("wal.data"); - let data_file = std::fs::File::create(&data_file_path) - .expect("Failed to create data file"); - - //get current system time for the log file name - let log_file_name = format!("segment_{}.log", 1); - - Self { - dir_path, - data_file, - } - } -} - -impl WriteAheadLog for SegmentWal { - fn read(&mut self, size: u64) -> Result, Error> { - // Boilerplate implementation - Ok(vec![0; size as usize]) // Return a vector of zeros of the requested size - } - - fn write(&mut self, buf: Vec) -> Result<(), Error> { - // Boilerplate implementation - Ok(()) - } - - fn seek(&mut self, pos: SeekFrom) -> Result<(), Error> { - // Boilerplate implementation - Ok(()) - } - - fn stream_len(&mut self) -> Result { - // Boilerplate implementation - Ok(0) - } - - fn stream_position(&mut self) -> Result { - // Boilerplate implementation - Ok(0) - } - - fn atomic_checkpoint(&mut self) -> Result<(), Error> { - // Boilerplate implementation - Ok(()) - } - - fn set_len(&mut self, size: u64) -> Result<(), Error> { - // Boilerplate implementation - Ok(()) - } -} diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs index c95158c..06edb38 100644 --- a/tests/write_ahead_log_test.rs +++ b/tests/write_ahead_log_test.rs @@ -1,8 +1,10 @@ -use core_data::write_ahead_log::segment_wal::SegmentWal; +use core_data::write_ahead_log::copy_wal::{CopyWal}; #[test] fn test_segment_wal() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); + + } From a6c942069ff5d2d2a25f318eccd05e17a2c38c21 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Sun, 13 Jul 2025 14:50:31 +0200 Subject: [PATCH 03/14] - --- src/write_ahead_log/copy_wal.rs | 101 -------------- src/write_ahead_log/copy_wal_log.rs | 0 src/write_ahead_log/mod.rs | 2 +- src/write_ahead_log/simple_wal.rs | 208 ++++++++++++++++++++++++++++ tests/write_ahead_log_test.rs | 2 +- 5 files changed, 210 insertions(+), 103 deletions(-) delete mode 100644 src/write_ahead_log/copy_wal.rs delete mode 100644 src/write_ahead_log/copy_wal_log.rs create mode 100644 src/write_ahead_log/simple_wal.rs diff --git a/src/write_ahead_log/copy_wal.rs b/src/write_ahead_log/copy_wal.rs deleted file mode 100644 index 80e9bdf..0000000 --- a/src/write_ahead_log/copy_wal.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::{io::{Read, Seek, Write}, path::PathBuf}; - -use crate::write_ahead_log::write_ahead_log::WriteAheadLog; - -const WAL_SIGNATURE: [u8; 24] = *b"COPY_WAL_2025_7_12______"; -const DATA_FILE_SIGNATURE: [u8; 8] = *b"DATA____"; -const COPY_FILE_SIGNATURE: [u8; 8] = *b"COPY____"; -const LOG_FILE_SIGNATURE: [u8; 8] = *b"LOG_____"; - -pub struct CopyWal { - dir_path: PathBuf, - data_file: std::fs::File, - copy_file: std::fs::File, - log_file: std::fs::File, -} - -impl CopyWal { - pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { - assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); - assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); - assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); - - let data_file_path = dir_path.join("wal.data"); - let mut data_file = std::fs::File::create(&data_file_path) - .expect("Failed to create data file"); - - let copy_file_path = dir_path.join("wal.copy"); - let mut copy_file = std::fs::File::create(©_file_path) - .expect("Failed to create copy file"); - - let log_file_path = dir_path.join("wal.log"); - let mut log_file = std::fs::File::create(&log_file_path) - .expect("Failed to create log file"); - - data_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write data file signature"); - data_file.write_all(&DATA_FILE_SIGNATURE) - .expect("Failed to write data file signature"); - - copy_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write copy file signature"); - copy_file.write_all(©_FILE_SIGNATURE) - .expect("Failed to write copy file signature"); - - log_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write log file signature"); - log_file.write_all(&LOG_FILE_SIGNATURE) - .expect("Failed to write log file signature"); - - Self { - dir_path, - data_file, - copy_file, - log_file, - } - } -} - -impl WriteAheadLog for CopyWal { - fn read(&mut self, size: u64) -> Result, std::io::Error> { - let mut buffer = Vec::with_capacity(size as usize); - self.copy_file - .read_exact(&mut buffer)?; - Ok(buffer) - } - - fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { - self.copy_file - .write_all(buf.as_slice())?; - Ok(()) - } - - fn seek(&mut self, pos: std::io::SeekFrom) -> Result<(), std::io::Error> { - self.copy_file - .seek(pos)?; - Ok(()) - } - - fn stream_len(&mut self) -> Result { - let len = self.copy_file - .metadata()? - .len(); - Ok(len) - } - - fn stream_position(&mut self) -> Result { - let pos = self.copy_file - .stream_position()?; - Ok(pos) - } - - fn set_len(&mut self, size: u64) -> Result<(), std::io::Error> { - self.copy_file - .set_len(size)?; - Ok(()) - } - - fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { - todo!() - } -} \ No newline at end of file diff --git a/src/write_ahead_log/copy_wal_log.rs b/src/write_ahead_log/copy_wal_log.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 976f89e..9a3c118 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1,2 +1,2 @@ pub mod write_ahead_log; -pub mod copy_wal; \ No newline at end of file +pub mod simple_wal; \ No newline at end of file diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs new file mode 100644 index 0000000..2b21351 --- /dev/null +++ b/src/write_ahead_log/simple_wal.rs @@ -0,0 +1,208 @@ +//! This WAL implementation consists of 4 files: +//! - wal.tick and wal.tock contain the persistent data and always one of them is the fallback and the other is the operational file. +//! - wal.log contains log entries since the last checkpoint +//! - wal.meta contains a signature that indicates the current operational file +//! +//! On every write operation, the following steps are performed: +//! 1. A log entry is created and written to the log file. Recovery Type A +//! 2. Then the data is written to the current operational file. Recovery Type A +//! +//! On checkpoint, the following steps are performed: +//! 1. The current operational file in wal.meta is switched to the fallback. Recovery Type B +//! 2. Iterate over the log file and apply all log entries to the new operational file. Recovery Type A +//! 3. Erase all log entries in the log file. Recovery Type A +//! +//! After every numbered step, fsync syscall is used to ensure durability. +//! +//! The Recovery Type for each step specifies how a faulty state is detected if the system crashes during this step and how the system can recover from it: +//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not empty and the operational file might be missing. The system can recover by deleting the current operational file, generating a new one as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature and wal.tick might be missing. The system can recover by deleting wal.tick and generating it again as a copy of wal.tock. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, the same recovery can be performed again. + +use std::{io::{Read, Seek, Write}, path::PathBuf}; + +use crate::write_ahead_log::write_ahead_log::WriteAheadLog; + +const WAL_SIGNATURE: [u8; 24] = *b"SIMPLE_WAL_2025_7_13____"; +const TOCK_FILE_SIGNATURE: [u8; 8] = *b"TOCK____"; +const TICK_FILE_SIGNATURE: [u8; 8] = *b"TICK____"; +const LOG_FILE_SIGNATURE: [u8; 8] = *b"LOG_____"; +const META_FILE_SIGNATURE: [u8; 8] = *b"META____"; + +pub struct SimpleWal { + tick_file: std::fs::File, + tock_file: std::fs::File, + log_file: std::fs::File, + meta_file: std::fs::File, +} + +impl SimpleWal { + + pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); + + let tick_file_path = dir_path.join("wal.tick"); + let mut tick_file = std::fs::File::create(&tick_file_path) + .expect("Failed to create tick file"); + + let tock_file_path = dir_path.join("wal.tock"); + let mut tock_file = std::fs::File::create(&tock_file_path) + .expect("Failed to create tock file"); + + let log_file_path = dir_path.join("wal.log"); + let mut log_file = std::fs::File::create(&log_file_path) + .expect("Failed to create log file"); + + let meta_file_path = dir_path.join("wal.meta"); + let mut meta_file = std::fs::File::create(&meta_file_path) + .expect("Failed to create meta file"); + + tick_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write tick file signature"); + tick_file.write_all(&TICK_FILE_SIGNATURE) + .expect("Failed to write tick file signature"); + + tock_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write tock file signature"); + tock_file.write_all(&TOCK_FILE_SIGNATURE) + .expect("Failed to write tock file signature"); + + log_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write log file signature"); + log_file.write_all(&LOG_FILE_SIGNATURE) + .expect("Failed to write log file signature"); + + meta_file.write_all(&WAL_SIGNATURE) + .expect("Failed to write meta file signature"); + meta_file.write_all(&META_FILE_SIGNATURE) + .expect("Failed to write meta file signature"); + + Self { + tick_file, + tock_file, + log_file, + meta_file, + } + } + + fn get_current_operational_file(&mut self) -> &std::fs::File { + // Seek to the beginning of the meta file + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + + // Read 32 bytes to check if it is a real meta file + let mut signature : [u8; 32] = [0; 32]; + self.meta_file + .read_exact(&mut signature) + .expect("Failed to read meta file signature"); + // Check if the signature matches + assert!(signature.starts_with(&WAL_SIGNATURE) && signature[24..32] == META_FILE_SIGNATURE, + "Invalid meta file signature"); + // Read the next 32 bytes to determine the current operational file: all zeros means tick file, all ones means tock file + let mut operational_file_indicator: [u8; 32] = [0; 32]; + self.meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + if operational_file_indicator.iter().all(|&x| x == 0) { + return &self.tick_file; + } else if operational_file_indicator.iter().all(|&x| x == 1) { + return &self.tock_file; + } else { + panic!("Invalid operational file indicator"); + } + } + + fn write_log_entry(&mut self, stream_pos: u64, data: &Vec){ + self.log_file + .write_all(&stream_pos.to_le_bytes()) + .expect("Failed to write stream position to log file"); + self.log_file + .write_all(data.len().to_le_bytes().as_slice()) + .expect("Failed to write data length to log file"); + self.log_file + .write_all(&data) + .expect("Failed to write data to log file"); + } + + fn read_log_entry(&mut self) -> (u64, Vec) { + let mut stream_pos_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut stream_pos_bytes) + .expect("Failed to read stream position"); + let stream_pos = u64::from_le_bytes(stream_pos_bytes); + + let mut length_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut length_bytes) + .expect("Failed to read length"); + let length = usize::from_le_bytes(length_bytes); + + let mut data = vec![0u8; length]; + self.log_file + .read_exact(&mut data) + .expect("Failed to read data"); + + (stream_pos, data) + } + +} + +impl WriteAheadLog for SimpleWal { + fn read(&mut self, size: u64) -> Result, std::io::Error> { + let mut buffer = Vec::with_capacity(size as usize); + self.get_current_operational_file() + .read_exact(&mut buffer)?; + Ok(buffer) + } + + fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { + // 1. A log entry is created and written to the log file. + let stream_pos = self.get_current_operational_file() + .stream_position() + .expect("Failed to get stream position"); + self.write_log_entry(stream_pos, &buf); + // 2. Then the data is written to the current operational file. + self.get_current_operational_file() + .write_all(buf.as_slice()) + .expect("Failed to write data to operational file"); + Ok(()) + } + + fn seek(&mut self, pos: std::io::SeekFrom) -> Result<(), std::io::Error> { + self.get_current_operational_file() + .seek(pos)?; + Ok(()) + } + + fn stream_len(&mut self) -> Result { + let len = self.get_current_operational_file() + .metadata()? + .len(); + Ok(len) + } + + fn stream_position(&mut self) -> Result { + let pos = self.get_current_operational_file() + .stream_position()?; + Ok(pos) + } + + fn set_len(&mut self, size: u64) -> Result<(), std::io::Error> { + self.get_current_operational_file() + .set_len(size)?; + Ok(()) + } + + fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { + //seek to the beginning of the meta file + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + //c + + + Ok(()) + } +} \ No newline at end of file diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs index 06edb38..db4667e 100644 --- a/tests/write_ahead_log_test.rs +++ b/tests/write_ahead_log_test.rs @@ -1,4 +1,4 @@ -use core_data::write_ahead_log::copy_wal::{CopyWal}; +use core_data::write_ahead_log::simple_wal::{SimpleWal}; #[test] fn test_segment_wal() { From 0e45cb6192ff3966c740236405dcd2864bae6d6d Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Sun, 13 Jul 2025 14:47:55 +0000 Subject: [PATCH 04/14] - --- src/write_ahead_log/simple_wal.rs | 73 +++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 2b21351..007b4c4 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -78,6 +78,9 @@ impl SimpleWal { meta_file.write_all(&META_FILE_SIGNATURE) .expect("Failed to write meta file signature"); + meta_file.write_all(&[0; 32]) // 32 bytes of zeros for the operational file indicator + .expect("Failed to write operational file indicator"); + Self { tick_file, tock_file, @@ -163,10 +166,16 @@ impl WriteAheadLog for SimpleWal { .stream_position() .expect("Failed to get stream position"); self.write_log_entry(stream_pos, &buf); + self.log_file + .sync_all() + .expect("Failed to sync log file"); // 2. Then the data is written to the current operational file. self.get_current_operational_file() .write_all(buf.as_slice()) .expect("Failed to write data to operational file"); + self.get_current_operational_file() + .sync_all() + .expect("Failed to sync operational file"); Ok(()) } @@ -196,13 +205,69 @@ impl WriteAheadLog for SimpleWal { } fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { - //seek to the beginning of the meta file + // 1. The current operational file in wal.meta is switched to the fallback. self.meta_file .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in meta file"); - //c - - + // first read the signature to check if it is a real meta file + let mut signature: [u8; 32] = [0; 32]; + self.meta_file + .read_exact(&mut signature) + .expect("Failed to read meta file signature"); + // Check if the signature matches + assert!(signature.starts_with(&WAL_SIGNATURE) && signature[24..32] == META_FILE_SIGNATURE, + "Invalid meta file signature"); + // then read the current operational file indicator and switch it + let mut operational_file_indicator = [0u8; 32]; + self.meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + if operational_file_indicator.iter().all(|&x| x == 0) { + operational_file_indicator.fill(1); + } else if operational_file_indicator.iter().all(|&x| x == 1) { + operational_file_indicator.fill(0); + } else { + panic!("Invalid operational file indicator"); + } + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + self.meta_file + .write_all(&WAL_SIGNATURE) + .expect("Failed to write meta file signature"); + self.meta_file + .write_all(&META_FILE_SIGNATURE) + .expect("Failed to write meta file signature"); + self.meta_file + .write_all(&operational_file_indicator) + .expect("Failed to write operational file indicator"); + self.meta_file + .sync_all() + .expect("Failed to sync meta file"); + // 2. Iterate over the log file and apply all log entries to the new operational file. + self.log_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in log file"); + while self.log_file.stream_position().unwrap() < self.log_file.metadata().unwrap().len() { + let (stream_pos, data) = self.read_log_entry(); + // Write the data to the current operational file at the correct position + self.get_current_operational_file() + .seek(std::io::SeekFrom::Start(stream_pos)) + .expect("Failed to seek in operational file"); + self.get_current_operational_file() + .write_all(&data) + .expect("Failed to write data to operational file"); + } + self.get_current_operational_file() + .sync_all() + .expect("Failed to sync operational file"); + // 3. Erase all log entries in the log file. + self.log_file + .set_len(0) + .expect("Failed to erase log file"); + self.log_file + .sync_all() + .expect("Failed to sync log file"); Ok(()) } } \ No newline at end of file From be1e32deaf85b782dd3e00eca7d73074a24984e1 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Sun, 13 Jul 2025 15:27:47 +0000 Subject: [PATCH 05/14] - --- src/write_ahead_log/simple_wal.rs | 89 ++++++----------------------- tests/write_ahead_log_test.rs | 94 ++++++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 74 deletions(-) diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 007b4c4..7ecc5a4 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -22,12 +22,6 @@ use std::{io::{Read, Seek, Write}, path::PathBuf}; use crate::write_ahead_log::write_ahead_log::WriteAheadLog; -const WAL_SIGNATURE: [u8; 24] = *b"SIMPLE_WAL_2025_7_13____"; -const TOCK_FILE_SIGNATURE: [u8; 8] = *b"TOCK____"; -const TICK_FILE_SIGNATURE: [u8; 8] = *b"TICK____"; -const LOG_FILE_SIGNATURE: [u8; 8] = *b"LOG_____"; -const META_FILE_SIGNATURE: [u8; 8] = *b"META____"; - pub struct SimpleWal { tick_file: std::fs::File, tock_file: std::fs::File, @@ -43,40 +37,16 @@ impl SimpleWal { assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); let tick_file_path = dir_path.join("wal.tick"); - let mut tick_file = std::fs::File::create(&tick_file_path) - .expect("Failed to create tick file"); + let tick_file = create_file_with_permissions(&tick_file_path); let tock_file_path = dir_path.join("wal.tock"); - let mut tock_file = std::fs::File::create(&tock_file_path) - .expect("Failed to create tock file"); + let tock_file = create_file_with_permissions(&tock_file_path); let log_file_path = dir_path.join("wal.log"); - let mut log_file = std::fs::File::create(&log_file_path) - .expect("Failed to create log file"); + let log_file = create_file_with_permissions(&log_file_path); let meta_file_path = dir_path.join("wal.meta"); - let mut meta_file = std::fs::File::create(&meta_file_path) - .expect("Failed to create meta file"); - - tick_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write tick file signature"); - tick_file.write_all(&TICK_FILE_SIGNATURE) - .expect("Failed to write tick file signature"); - - tock_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write tock file signature"); - tock_file.write_all(&TOCK_FILE_SIGNATURE) - .expect("Failed to write tock file signature"); - - log_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write log file signature"); - log_file.write_all(&LOG_FILE_SIGNATURE) - .expect("Failed to write log file signature"); - - meta_file.write_all(&WAL_SIGNATURE) - .expect("Failed to write meta file signature"); - meta_file.write_all(&META_FILE_SIGNATURE) - .expect("Failed to write meta file signature"); + let mut meta_file = create_file_with_permissions(&meta_file_path) meta_file.write_all(&[0; 32]) // 32 bytes of zeros for the operational file indicator .expect("Failed to write operational file indicator"); @@ -88,22 +58,14 @@ impl SimpleWal { meta_file, } } + + fn get_current_operational_file(&mut self) -> &std::fs::File { - // Seek to the beginning of the meta file self.meta_file .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in meta file"); - // Read 32 bytes to check if it is a real meta file - let mut signature : [u8; 32] = [0; 32]; - self.meta_file - .read_exact(&mut signature) - .expect("Failed to read meta file signature"); - // Check if the signature matches - assert!(signature.starts_with(&WAL_SIGNATURE) && signature[24..32] == META_FILE_SIGNATURE, - "Invalid meta file signature"); - // Read the next 32 bytes to determine the current operational file: all zeros means tick file, all ones means tock file let mut operational_file_indicator: [u8; 32] = [0; 32]; self.meta_file .read_exact(&mut operational_file_indicator) @@ -149,7 +111,6 @@ impl SimpleWal { (stream_pos, data) } - } impl WriteAheadLog for SimpleWal { @@ -161,7 +122,6 @@ impl WriteAheadLog for SimpleWal { } fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { - // 1. A log entry is created and written to the log file. let stream_pos = self.get_current_operational_file() .stream_position() .expect("Failed to get stream position"); @@ -169,7 +129,6 @@ impl WriteAheadLog for SimpleWal { self.log_file .sync_all() .expect("Failed to sync log file"); - // 2. Then the data is written to the current operational file. self.get_current_operational_file() .write_all(buf.as_slice()) .expect("Failed to write data to operational file"); @@ -205,19 +164,6 @@ impl WriteAheadLog for SimpleWal { } fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { - // 1. The current operational file in wal.meta is switched to the fallback. - self.meta_file - .seek(std::io::SeekFrom::Start(0)) - .expect("Failed to seek in meta file"); - // first read the signature to check if it is a real meta file - let mut signature: [u8; 32] = [0; 32]; - self.meta_file - .read_exact(&mut signature) - .expect("Failed to read meta file signature"); - // Check if the signature matches - assert!(signature.starts_with(&WAL_SIGNATURE) && signature[24..32] == META_FILE_SIGNATURE, - "Invalid meta file signature"); - // then read the current operational file indicator and switch it let mut operational_file_indicator = [0u8; 32]; self.meta_file .read_exact(&mut operational_file_indicator) @@ -229,28 +175,18 @@ impl WriteAheadLog for SimpleWal { } else { panic!("Invalid operational file indicator"); } - self.meta_file - .seek(std::io::SeekFrom::Start(0)) - .expect("Failed to seek in meta file"); - self.meta_file - .write_all(&WAL_SIGNATURE) - .expect("Failed to write meta file signature"); - self.meta_file - .write_all(&META_FILE_SIGNATURE) - .expect("Failed to write meta file signature"); self.meta_file .write_all(&operational_file_indicator) .expect("Failed to write operational file indicator"); self.meta_file .sync_all() .expect("Failed to sync meta file"); - // 2. Iterate over the log file and apply all log entries to the new operational file. + self.log_file .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in log file"); while self.log_file.stream_position().unwrap() < self.log_file.metadata().unwrap().len() { let (stream_pos, data) = self.read_log_entry(); - // Write the data to the current operational file at the correct position self.get_current_operational_file() .seek(std::io::SeekFrom::Start(stream_pos)) .expect("Failed to seek in operational file"); @@ -261,7 +197,7 @@ impl WriteAheadLog for SimpleWal { self.get_current_operational_file() .sync_all() .expect("Failed to sync operational file"); - // 3. Erase all log entries in the log file. + self.log_file .set_len(0) .expect("Failed to erase log file"); @@ -270,4 +206,13 @@ impl WriteAheadLog for SimpleWal { .expect("Failed to sync log file"); Ok(()) } +} + +fn create_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .expect("Failed to create one of the WAL files") } \ No newline at end of file diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs index db4667e..1118371 100644 --- a/tests/write_ahead_log_test.rs +++ b/tests/write_ahead_log_test.rs @@ -1,10 +1,100 @@ -use core_data::write_ahead_log::simple_wal::{SimpleWal}; +use core_data::write_ahead_log::{simple_wal::SimpleWal, write_ahead_log::WriteAheadLog}; +use std::io::{SeekFrom}; #[test] -fn test_segment_wal() { +fn test_read() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![1, 2, 3, 4]).expect("write failed"); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + assert_eq!(data, vec![1, 2, 3, 4]); +} + +#[test] +fn test_write() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![5, 6, 7, 8]).expect("write failed"); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + + assert_eq!(data, vec![5, 6, 7, 8]); +} + +#[test] +fn test_seek() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![9, 10, 11, 12]).expect("write failed"); + wal.seek(SeekFrom::Start(2)).expect("seek failed"); + let data = wal.read(2).expect("read failed"); + + assert_eq!(data, vec![11, 12]); +} + +#[test] +fn test_stream_len() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![13, 14, 15, 16]).expect("write failed"); + let len = wal.stream_len().expect("stream_len failed"); + + assert_eq!(len, 4); +} + +#[test] +fn test_stream_position() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![17, 18, 19, 20]).expect("write failed"); + wal.seek(SeekFrom::Start(2)).expect("seek failed"); + let pos = wal.stream_position().expect("stream_position failed"); + + assert_eq!(pos, 2); +} + +#[test] +fn test_atomic_checkpoint() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![21, 22, 23, 24]).expect("write failed"); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + + assert_eq!(data, vec![21, 22, 23, 24]); +} + +#[test] +fn test_set_len() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![25, 26, 27, 28]).expect("write failed"); + wal.set_len(2).expect("set_len failed"); + let len = wal.stream_len().expect("stream_len failed"); + + assert_eq!(len, 2); } From 5fbd244ad8e52e8c7ad07ce2f653c25e1b3ef74b Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Mon, 14 Jul 2025 18:43:51 +0200 Subject: [PATCH 06/14] - --- src/write_ahead_log/mod.rs | 3 +- src/write_ahead_log/simple_wal.rs | 194 +++++++++++++----- src/write_ahead_log/simple_wal_recovery.rs | 93 +++++++++ ...e_ahead_log_test.rs => simple_wal_test.rs} | 48 ++++- 4 files changed, 279 insertions(+), 59 deletions(-) create mode 100644 src/write_ahead_log/simple_wal_recovery.rs rename tests/{write_ahead_log_test.rs => simple_wal_test.rs} (56%) diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 9a3c118..6b96af5 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1,2 +1,3 @@ pub mod write_ahead_log; -pub mod simple_wal; \ No newline at end of file +pub mod simple_wal; +pub mod simple_wal_recovery; \ No newline at end of file diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 7ecc5a4..1419833 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -3,9 +3,9 @@ //! - wal.log contains log entries since the last checkpoint //! - wal.meta contains a signature that indicates the current operational file //! -//! On every write operation, the following steps are performed: +//! On every write operation or set_len, the following steps are performed: //! 1. A log entry is created and written to the log file. Recovery Type A -//! 2. Then the data is written to the current operational file. Recovery Type A +//! 2. Then the operation is persited on the operational file. Recovery Type A //! //! On checkpoint, the following steps are performed: //! 1. The current operational file in wal.meta is switched to the fallback. Recovery Type B @@ -15,18 +15,34 @@ //! After every numbered step, fsync syscall is used to ensure durability. //! //! The Recovery Type for each step specifies how a faulty state is detected if the system crashes during this step and how the system can recover from it: -//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not empty and the operational file might be missing. The system can recover by deleting the current operational file, generating a new one as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. -//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature and wal.tick might be missing. The system can recover by deleting wal.tick and generating it again as a copy of wal.tock. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not. The system can recover by erasing the current operational file, generating it again as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature. The system can recover by erasing wal.tick and generating it again as a copy of wal.tock. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, the same recovery can be performed again. -use std::{io::{Read, Seek, Write}, path::PathBuf}; +use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; use crate::write_ahead_log::write_ahead_log::WriteAheadLog; pub struct SimpleWal { - tick_file: std::fs::File, - tock_file: std::fs::File, - log_file: std::fs::File, - meta_file: std::fs::File, + pub(crate) tick_file: std::fs::File, + pub(crate) tock_file: std::fs::File, + pub(crate) log_file: std::fs::File, + pub(crate) meta_file: std::fs::File, +} + +enum LogEntry{ + Write(u64, Vec), + SetLen(u64), +} + +impl LogEntry { + fn get_data(&self) -> &Vec { + if let LogEntry::Write(_, data) = self { + data + } else { + panic!("LogEntry does not contain data"); + } + } + } impl SimpleWal { @@ -46,10 +62,9 @@ impl SimpleWal { let log_file = create_file_with_permissions(&log_file_path); let meta_file_path = dir_path.join("wal.meta"); - let mut meta_file = create_file_with_permissions(&meta_file_path) - - meta_file.write_all(&[0; 32]) // 32 bytes of zeros for the operational file indicator - .expect("Failed to write operational file indicator"); + let mut meta_file = create_file_with_permissions(&meta_file_path); + meta_file.write_all(&[0; 32]).expect("Failed to write operational file indicator"); // 32 bytes of zeros for the operational file indicator + meta_file.sync_all().expect("Failed to sync meta file"); Self { tick_file, @@ -59,13 +74,10 @@ impl SimpleWal { } } - - - fn get_current_operational_file(&mut self) -> &std::fs::File { + fn get_current_operational_file(&mut self) -> &File { self.meta_file .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in meta file"); - let mut operational_file_indicator: [u8; 32] = [0; 32]; self.meta_file .read_exact(&mut operational_file_indicator) @@ -78,59 +90,98 @@ impl SimpleWal { panic!("Invalid operational file indicator"); } } + - fn write_log_entry(&mut self, stream_pos: u64, data: &Vec){ - self.log_file - .write_all(&stream_pos.to_le_bytes()) - .expect("Failed to write stream position to log file"); - self.log_file - .write_all(data.len().to_le_bytes().as_slice()) - .expect("Failed to write data length to log file"); - self.log_file - .write_all(&data) - .expect("Failed to write data to log file"); + fn write_log_entry(&mut self, log_entry: &LogEntry) { + match log_entry { + LogEntry::Write(stream_pos, data) => { + self.log_file + .write_all(b"WR") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&stream_pos.to_le_bytes()) + .expect("Failed to write stream position to log file"); + self.log_file + .write_all(data.len().to_le_bytes().as_slice()) + .expect("Failed to write data length to log file"); + self.log_file + .write_all(data) + .expect("Failed to write data to log file"); + } + LogEntry::SetLen(size) => { + self.log_file + .write_all(b"SL") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&size.to_le_bytes()) + .expect("Failed to write size to log file"); + } + } } - fn read_log_entry(&mut self) -> (u64, Vec) { - let mut stream_pos_bytes = [0u8; 8]; + fn read_log_entry(&mut self) -> LogEntry { + + let mut opcode = [0u8; 2]; self.log_file - .read_exact(&mut stream_pos_bytes) - .expect("Failed to read stream position"); - let stream_pos = u64::from_le_bytes(stream_pos_bytes); + .read_exact(&mut opcode) + .expect("Failed to read log entry opcode"); - let mut length_bytes = [0u8; 8]; - self.log_file - .read_exact(&mut length_bytes) - .expect("Failed to read length"); - let length = usize::from_le_bytes(length_bytes); + match &opcode { + b"WR" => { // Write operation + let mut stream_pos_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut stream_pos_bytes) + .expect("Failed to read stream position from log file"); + let stream_pos = u64::from_le_bytes(stream_pos_bytes); - let mut data = vec![0u8; length]; - self.log_file - .read_exact(&mut data) - .expect("Failed to read data"); + let mut data_length_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut data_length_bytes) + .expect("Failed to read data length from log file"); + let data_length = u64::from_le_bytes(data_length_bytes) as usize; + + let mut data = vec![0u8; data_length]; + self.log_file + .read_exact(&mut data) + .expect("Failed to read data from log file"); - (stream_pos, data) + LogEntry::Write(stream_pos, data) + } + b"SL" => { // SetLen operation + let mut size_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut size_bytes) + .expect("Failed to read size from log file"); + let size = u64::from_le_bytes(size_bytes); + LogEntry::SetLen(size) + } + _ => panic!("Unknown log entry opcode encountered"), + } + } } impl WriteAheadLog for SimpleWal { fn read(&mut self, size: u64) -> Result, std::io::Error> { - let mut buffer = Vec::with_capacity(size as usize); + let mut buffer = vec![0u8; size as usize]; self.get_current_operational_file() .read_exact(&mut buffer)?; Ok(buffer) } fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { + // 1. A log entry is created and written to the log file let stream_pos = self.get_current_operational_file() .stream_position() .expect("Failed to get stream position"); - self.write_log_entry(stream_pos, &buf); + let log_entry = LogEntry::Write(stream_pos, buf); + self.write_log_entry(&log_entry); self.log_file .sync_all() .expect("Failed to sync log file"); + // 2. Then the data is written to the current operational file self.get_current_operational_file() - .write_all(buf.as_slice()) + .write_all(log_entry.get_data()) .expect("Failed to write data to operational file"); self.get_current_operational_file() .sync_all() @@ -158,13 +209,27 @@ impl WriteAheadLog for SimpleWal { } fn set_len(&mut self, size: u64) -> Result<(), std::io::Error> { + // 1. A log entry is created and written to the log file + let log_entry = LogEntry::SetLen(size); + self.write_log_entry(&log_entry); + self.log_file + .sync_all() + .expect("Failed to sync log file"); + // 2. Then the size is set in the current operational file + self.get_current_operational_file() + .set_len(size) + .expect("Failed to set length of operational file"); self.get_current_operational_file() - .set_len(size)?; + .sync_all() + .expect("Failed to sync operational file"); Ok(()) } fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { let mut operational_file_indicator = [0u8; 32]; + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); self.meta_file .read_exact(&mut operational_file_indicator) .expect("Failed to read operational file indicator"); @@ -175,6 +240,9 @@ impl WriteAheadLog for SimpleWal { } else { panic!("Invalid operational file indicator"); } + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); self.meta_file .write_all(&operational_file_indicator) .expect("Failed to write operational file indicator"); @@ -186,18 +254,29 @@ impl WriteAheadLog for SimpleWal { .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in log file"); while self.log_file.stream_position().unwrap() < self.log_file.metadata().unwrap().len() { - let (stream_pos, data) = self.read_log_entry(); - self.get_current_operational_file() - .seek(std::io::SeekFrom::Start(stream_pos)) - .expect("Failed to seek in operational file"); - self.get_current_operational_file() - .write_all(&data) - .expect("Failed to write data to operational file"); + let log_entry = self.read_log_entry(); + match log_entry { + LogEntry::Write(stream_pos, data) => { + self.get_current_operational_file() + .seek(std::io::SeekFrom::Start(stream_pos)) + .expect("Failed to seek in operational file"); + self.get_current_operational_file() + .write_all(&data) + .expect("Failed to write data to operational file"); + } + LogEntry::SetLen(size) => { + self.get_current_operational_file() + .set_len(size) + .expect("Failed to set length of operational file"); + } + } } self.get_current_operational_file() .sync_all() .expect("Failed to sync operational file"); - + self.log_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in log file"); self.log_file .set_len(0) .expect("Failed to erase log file"); @@ -210,9 +289,14 @@ impl WriteAheadLog for SimpleWal { fn create_file_with_permissions(path: &PathBuf) -> std::fs::File { std::fs::OpenOptions::new() + .read(true) .write(true) .create(true) .truncate(true) .open(path) .expect("Failed to create one of the WAL files") -} \ No newline at end of file +} + + + + diff --git a/src/write_ahead_log/simple_wal_recovery.rs b/src/write_ahead_log/simple_wal_recovery.rs new file mode 100644 index 0000000..ba5fbb6 --- /dev/null +++ b/src/write_ahead_log/simple_wal_recovery.rs @@ -0,0 +1,93 @@ +use std::{fs::File, io::{Read, Seek}, path::PathBuf}; + +use crate::write_ahead_log::simple_wal::SimpleWal; + +impl SimpleWal { + + + /// Opens an existing WAL at the specified directory. + /// + /// Recovery will be attempted which means that this function call can take a long time if the WAL is in a faulty state. + /// If the WAL is in a valid state, it will return immediately and if the WAL can not be recovered, it will panic. + pub fn open_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + + assert!(dir_path.join("wal.tick").exists(), "wal.tick does not exist in directory: {:?}", dir_path); + assert!(dir_path.join("wal.tock").exists(), "wal.tock does not exist in directory: {:?}", dir_path); + + assert!(dir_path.join("wal.log").exists(), "wal.log does not exist in directory: {:?}", dir_path); + assert!(dir_path.join("wal.meta").exists(), "wal.meta does not exist in directory: {:?}", dir_path); + + let tick_file_path = dir_path.join("wal.tick"); + let tick_file = open_file_with_permissions(&tick_file_path); + + let tock_file_path = dir_path.join("wal.tock"); + let tock_file = open_file_with_permissions(&tock_file_path); + + let log_file_path = dir_path.join("wal.log"); + let log_file = open_file_with_permissions(&log_file_path); + + let meta_file_path = dir_path.join("wal.meta"); + let mut meta_file = open_file_with_permissions(&meta_file_path); + + let mut operational_file_indicator = [0u8; 32]; + meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + let valid_indicator = operational_file_indicator.iter().all(|&x| x == 0) || operational_file_indicator.iter().all(|&x| x == 1); + let log_file_len = log_file.metadata().expect("Failed to get log file metadata").len(); + + + if valid_indicator && log_file_len > 0 { + // If the operational file indicator is valid but the log file is not empty, do a Recovery Type A + // 1. Copy the the fallback to the operational file + let (operational_file, fallback_file) = self.get_operational_and_fallback_file(); + + io::copy(fallback_file, operational_file) + .expect("Failed to copy fallback file to operational file"); + operational_file.sync_all() + .expect("Failed to sync operational file after copying fallback"); + } else if !valid_indicator && dir_path.join("wal.tick").exists() { + // If the operational file indicator is not valid and wal.tick exists, do a Recovery Type B + todo!("Recovery Type B: Implement the recovery logic for type B"); + } + + return Self { + tick_file, + tock_file, + log_file, + meta_file, + }; + } + + fn get_operational_and_fallback_file(&mut self) -> (&File, &File) { + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + let mut operational_file_indicator: [u8; 32] = [0; 32]; + self.meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + if operational_file_indicator.iter().all(|&x| x == 0) { + return (&self.tick_file, &self.tock_file); + } else if operational_file_indicator.iter().all(|&x| x == 1) { + return (&self.tock_file, &self.tick_file); + } else { + panic!("Invalid operational file indicator"); + } + } +} + + + +fn open_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(path) + .expect("Failed to open one of the WAL files") +} \ No newline at end of file diff --git a/tests/write_ahead_log_test.rs b/tests/simple_wal_test.rs similarity index 56% rename from tests/write_ahead_log_test.rs rename to tests/simple_wal_test.rs index 1118371..ed72821 100644 --- a/tests/write_ahead_log_test.rs +++ b/tests/simple_wal_test.rs @@ -1,5 +1,38 @@ use core_data::write_ahead_log::{simple_wal::SimpleWal, write_ahead_log::WriteAheadLog}; -use std::io::{SeekFrom}; +use std::{io::SeekFrom, path::Path}; + +fn _print_all_file_content(temp_path: &Path){ + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + println!("wal.tick content: {:?}", tick_content); + println!("wal.tock content: {:?}", tock_content); + println!("wal.log content: {:?}", log_content); + println!("wal.meta content: {:?}", meta_content); +} + +#[test] +fn test_new_wal_at_directory() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let _ = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + assert!(temp_path.join("wal.tick").exists()); + assert!(temp_path.join("wal.tock").exists()); + assert!(temp_path.join("wal.log").exists()); + assert!(temp_path.join("wal.meta").exists()); + + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + + assert!(tick_content.is_empty(), "wal.tick should be empty"); + assert!(tock_content.is_empty(), "wal.tock should be empty"); + assert!(log_content.is_empty(), "wal.log should be empty"); + assert_eq!(meta_content.len(), 32, "wal.meta should contain 32 bytes of zeros"); + assert!(meta_content.iter().all(|&x| x == 0), "wal.meta should contain 32 bytes of zeros"); +} #[test] fn test_read() { @@ -76,7 +109,6 @@ fn test_atomic_checkpoint() { let temp_path = temp_dir.path(); let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![21, 22, 23, 24]).expect("write failed"); wal.atomic_checkpoint().expect("atomic_checkpoint failed"); wal.seek(SeekFrom::Start(0)).expect("seek failed"); @@ -93,8 +125,18 @@ fn test_set_len() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![25, 26, 27, 28]).expect("write failed"); + let len = wal.stream_len().expect("stream_len failed"); + assert_eq!(len, 4); + wal.set_len(3).expect("set_len failed"); + let len = wal.stream_len().expect("stream_len failed"); + assert_eq!(len, 3); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + let len = wal.stream_len().expect("stream_len failed"); + assert_eq!(len, 3); wal.set_len(2).expect("set_len failed"); let len = wal.stream_len().expect("stream_len failed"); - + assert_eq!(len, 2); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 2); } From 9bb1c3bff558ca20ce4b1a0c7d7919b2c28cf445 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Mon, 14 Jul 2025 19:43:51 +0200 Subject: [PATCH 07/14] - --- src/write_ahead_log/simple_wal_recovery.rs | 75 +++++++++++++--------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/src/write_ahead_log/simple_wal_recovery.rs b/src/write_ahead_log/simple_wal_recovery.rs index ba5fbb6..474bd43 100644 --- a/src/write_ahead_log/simple_wal_recovery.rs +++ b/src/write_ahead_log/simple_wal_recovery.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::{Read, Seek}, path::PathBuf}; +use std::{fs::File, io::{self, Read, Seek, SeekFrom, Write}, path::PathBuf}; use crate::write_ahead_log::simple_wal::SimpleWal; @@ -20,10 +20,10 @@ impl SimpleWal { assert!(dir_path.join("wal.meta").exists(), "wal.meta does not exist in directory: {:?}", dir_path); let tick_file_path = dir_path.join("wal.tick"); - let tick_file = open_file_with_permissions(&tick_file_path); + let mut tick_file = open_file_with_permissions(&tick_file_path); let tock_file_path = dir_path.join("wal.tock"); - let tock_file = open_file_with_permissions(&tock_file_path); + let mut tock_file = open_file_with_permissions(&tock_file_path); let log_file_path = dir_path.join("wal.log"); let log_file = open_file_with_permissions(&log_file_path); @@ -44,16 +44,48 @@ impl SimpleWal { if valid_indicator && log_file_len > 0 { // If the operational file indicator is valid but the log file is not empty, do a Recovery Type A - // 1. Copy the the fallback to the operational file - let (operational_file, fallback_file) = self.get_operational_and_fallback_file(); - - io::copy(fallback_file, operational_file) - .expect("Failed to copy fallback file to operational file"); - operational_file.sync_all() - .expect("Failed to sync operational file after copying fallback"); + // 1. Copy the the fallback to the operational file + let operational_file: &mut File; + let fallback_file: &mut File; + if operational_file_indicator.iter().all(|&x| x == 0) { + operational_file = &mut tick_file; + fallback_file = &mut tock_file; + } else { + operational_file = &mut tock_file; + fallback_file = &mut tick_file; + } + operational_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in operational file"); + fallback_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in fallback file"); + operational_file.set_len(0).expect("Failed to erase operational file"); + io::copy(fallback_file, operational_file).expect("Failed to copy fallback file to operational file"); + operational_file.sync_all().expect("Failed to sync operational file"); + // 2. Erase the log file + log_file.set_len(0).expect("Failed to erase log file"); + log_file.sync_all().expect("Failed to sync log file"); } else if !valid_indicator && dir_path.join("wal.tick").exists() { - // If the operational file indicator is not valid and wal.tick exists, do a Recovery Type B - todo!("Recovery Type B: Implement the recovery logic for type B"); + // If the operational file indicator is not valid, do a Recovery Type B + // 1. Copy the tock file to the tick file + tock_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in tock file"); + tick_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in tick file"); + tick_file.set_len(0).expect("Failed to erase tick file"); + io::copy(&mut tock_file, &mut tick_file).expect("Failed to copy tock file to tick file"); + tick_file.sync_all().expect("Failed to sync tick file"); + // 2. Update the operational file indicator in the meta file (write all ones) + meta_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + meta_file + .write_all(&[1u8; 32]) + .expect("Failed to write operational file indicator"); + meta_file.sync_all().expect("Failed to sync meta file"); } return Self { @@ -64,26 +96,9 @@ impl SimpleWal { }; } - fn get_operational_and_fallback_file(&mut self) -> (&File, &File) { - self.meta_file - .seek(std::io::SeekFrom::Start(0)) - .expect("Failed to seek in meta file"); - let mut operational_file_indicator: [u8; 32] = [0; 32]; - self.meta_file - .read_exact(&mut operational_file_indicator) - .expect("Failed to read operational file indicator"); - if operational_file_indicator.iter().all(|&x| x == 0) { - return (&self.tick_file, &self.tock_file); - } else if operational_file_indicator.iter().all(|&x| x == 1) { - return (&self.tock_file, &self.tick_file); - } else { - panic!("Invalid operational file indicator"); - } - } + } - - fn open_file_with_permissions(path: &PathBuf) -> std::fs::File { std::fs::OpenOptions::new() .read(true) From 0e3b59931c2b7bd7848a737ffc6ef37f5d9901cc Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Tue, 15 Jul 2025 15:05:10 +0200 Subject: [PATCH 08/14] - --- src/write_ahead_log/simple_wal.rs | 4 +- src/write_ahead_log/simple_wal_recovery.rs | 3 + tests/simple_wal_test.rs | 75 ++++++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 1419833..56738b3 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -15,8 +15,8 @@ //! After every numbered step, fsync syscall is used to ensure durability. //! //! The Recovery Type for each step specifies how a faulty state is detected if the system crashes during this step and how the system can recover from it: -//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not. The system can recover by erasing the current operational file, generating it again as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. -//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature. The system can recover by erasing wal.tick and generating it again as a copy of wal.tock. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not empty. The system can recover by erasing the current operational file, generating it again as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature. The system can recover by erasing wal.tick and generating it again as a copy of wal.tock. Then the log file is erased. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, recovery is still possible. use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; diff --git a/src/write_ahead_log/simple_wal_recovery.rs b/src/write_ahead_log/simple_wal_recovery.rs index 474bd43..cda7ad1 100644 --- a/src/write_ahead_log/simple_wal_recovery.rs +++ b/src/write_ahead_log/simple_wal_recovery.rs @@ -86,6 +86,9 @@ impl SimpleWal { .write_all(&[1u8; 32]) .expect("Failed to write operational file indicator"); meta_file.sync_all().expect("Failed to sync meta file"); + // 3. Erase the log file + log_file.set_len(0).expect("Failed to erase log file"); + log_file.sync_all().expect("Failed to sync log file"); } return Self { diff --git a/tests/simple_wal_test.rs b/tests/simple_wal_test.rs index ed72821..30c10dc 100644 --- a/tests/simple_wal_test.rs +++ b/tests/simple_wal_test.rs @@ -12,6 +12,20 @@ fn _print_all_file_content(temp_path: &Path){ println!("wal.meta content: {:?}", meta_content); } +fn health_check(temp_path: &Path) { + // Check if the log file is empty + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + assert!(log_content.is_empty(), "wal.log should be empty after recovery"); + // Check if the meta file is valid + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + assert_eq!(meta_content.len(), 32, "wal.meta should contain 32 bytes"); + assert!(meta_content.iter().all(|&x| x == 1) || meta_content.iter().all(|&x| x == 0), "wal.meta should contain all ones or all zeros after recovery"); + // Check if tick and tock files are identical + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + assert_eq!(tick_content, tock_content, "wal.tick and wal.tock should be identical after recovery"); +} + #[test] fn test_new_wal_at_directory() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); @@ -140,3 +154,64 @@ fn test_set_len() { let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 2); } + +#[test] +fn test_open_wal_at_directory() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![29, 30, 31, 32]).expect("write failed"); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + drop(wal); + let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + assert_eq!(data, vec![29, 30, 31, 32]); +} + +#[test] +fn test_recovery_type_a() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![33, 34, 35, 36]).expect("write failed"); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + wal.write(vec![37, 38, 39, 40]).expect("write failed"); + + // Simulate a crash by dropping the wal before the atomic checkpoint wich results in a non empty log file and a recovery type A + drop(wal); + let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + health_check(temp_path); + // Check if the content matches the checkpoint + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + assert_eq!(data, vec![33, 34, 35, 36]); +} + +#[test] +fn test_recovery_type_b() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![41, 42, 43, 44]).expect("write failed"); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + wal.write(vec![45, 46, 47, 48]).expect("write failed"); + wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + drop(wal); + + //now we simulate a crash during the atomic checkpoint by writing the meta file to half ones and half zeros + //and the log file to a non empty state + std::fs::write(temp_path.join("wal.meta"), [1u8; 16].iter().chain([0u8; 16].iter()).cloned().collect::>()).expect("failed to write wal.meta"); + std::fs::write(temp_path.join("wal.log"), [49u8, 50u8, 51u8, 52u8]).expect("failed to write wal.log"); + + let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + health_check(temp_path); + // Check if the content matches one of the checkpoints + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + let data = wal.read(4).expect("read failed"); + assert!(data == vec![41, 42, 43, 44] || data == vec![45, 46, 47, 48], "data should match one of the checkpoints"); +} + + From f65bacb1db8625fe72249cfffe647445a202754a Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Tue, 15 Jul 2025 15:24:41 +0200 Subject: [PATCH 09/14] - --- src/write_ahead_log/simple_wal.rs | 9 ++---- src/write_ahead_log/write_ahead_log.rs | 6 ++-- tests/simple_wal_test.rs | 42 +++++++++++++------------- 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 56738b3..38170be 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -169,7 +169,7 @@ impl WriteAheadLog for SimpleWal { Ok(buffer) } - fn write(&mut self, buf: Vec) -> Result<(), std::io::Error> { + fn write(&mut self, buf: Vec){ // 1. A log entry is created and written to the log file let stream_pos = self.get_current_operational_file() .stream_position() @@ -186,7 +186,6 @@ impl WriteAheadLog for SimpleWal { self.get_current_operational_file() .sync_all() .expect("Failed to sync operational file"); - Ok(()) } fn seek(&mut self, pos: std::io::SeekFrom) -> Result<(), std::io::Error> { @@ -208,7 +207,7 @@ impl WriteAheadLog for SimpleWal { Ok(pos) } - fn set_len(&mut self, size: u64) -> Result<(), std::io::Error> { + fn set_len(&mut self, size: u64){ // 1. A log entry is created and written to the log file let log_entry = LogEntry::SetLen(size); self.write_log_entry(&log_entry); @@ -222,10 +221,9 @@ impl WriteAheadLog for SimpleWal { self.get_current_operational_file() .sync_all() .expect("Failed to sync operational file"); - Ok(()) } - fn atomic_checkpoint(&mut self) -> Result<(), std::io::Error> { + fn atomic_checkpoint(&mut self){ let mut operational_file_indicator = [0u8; 32]; self.meta_file .seek(std::io::SeekFrom::Start(0)) @@ -283,7 +281,6 @@ impl WriteAheadLog for SimpleWal { self.log_file .sync_all() .expect("Failed to sync log file"); - Ok(()) } } diff --git a/src/write_ahead_log/write_ahead_log.rs b/src/write_ahead_log/write_ahead_log.rs index 4bfb786..be47562 100644 --- a/src/write_ahead_log/write_ahead_log.rs +++ b/src/write_ahead_log/write_ahead_log.rs @@ -4,7 +4,7 @@ pub trait WriteAheadLog { fn read(&mut self, size: u64) -> Result, Error>; - fn write(&mut self, buf: Vec) -> Result<(), Error>; + fn write(&mut self, buf: Vec); fn seek(&mut self, pos: SeekFrom) -> Result<(), Error>; @@ -12,7 +12,7 @@ pub trait WriteAheadLog { fn stream_position(&mut self) -> Result; - fn atomic_checkpoint(&mut self) -> Result<(), Error>; + fn atomic_checkpoint(&mut self); - fn set_len(&mut self, size: u64) -> Result<(), Error>; + fn set_len(&mut self, size: u64); } \ No newline at end of file diff --git a/tests/simple_wal_test.rs b/tests/simple_wal_test.rs index 30c10dc..af31e0d 100644 --- a/tests/simple_wal_test.rs +++ b/tests/simple_wal_test.rs @@ -55,7 +55,7 @@ fn test_read() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![1, 2, 3, 4]).expect("write failed"); + wal.write(vec![1, 2, 3, 4]); wal.seek(SeekFrom::Start(0)).expect("seek failed"); let data = wal.read(4).expect("read failed"); @@ -69,7 +69,7 @@ fn test_write() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![5, 6, 7, 8]).expect("write failed"); + wal.write(vec![5, 6, 7, 8]); wal.seek(SeekFrom::Start(0)).expect("seek failed"); let data = wal.read(4).expect("read failed"); @@ -83,7 +83,7 @@ fn test_seek() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![9, 10, 11, 12]).expect("write failed"); + wal.write(vec![9, 10, 11, 12]); wal.seek(SeekFrom::Start(2)).expect("seek failed"); let data = wal.read(2).expect("read failed"); @@ -97,7 +97,7 @@ fn test_stream_len() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![13, 14, 15, 16]).expect("write failed"); + wal.write(vec![13, 14, 15, 16]); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 4); @@ -110,7 +110,7 @@ fn test_stream_position() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![17, 18, 19, 20]).expect("write failed"); + wal.write(vec![17, 18, 19, 20]); wal.seek(SeekFrom::Start(2)).expect("seek failed"); let pos = wal.stream_position().expect("stream_position failed"); @@ -123,8 +123,8 @@ fn test_atomic_checkpoint() { let temp_path = temp_dir.path(); let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![21, 22, 23, 24]).expect("write failed"); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.write(vec![21, 22, 23, 24]); + wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)).expect("seek failed"); let data = wal.read(4).expect("read failed"); @@ -138,19 +138,19 @@ fn test_set_len() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![25, 26, 27, 28]).expect("write failed"); + wal.write(vec![25, 26, 27, 28]); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 4); - wal.set_len(3).expect("set_len failed"); + wal.set_len(3); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 3); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.atomic_checkpoint(); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 3); - wal.set_len(2).expect("set_len failed"); + wal.set_len(2); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 2); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.atomic_checkpoint(); let len = wal.stream_len().expect("stream_len failed"); assert_eq!(len, 2); } @@ -160,8 +160,8 @@ fn test_open_wal_at_directory() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![29, 30, 31, 32]).expect("write failed"); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.write(vec![29, 30, 31, 32]); + wal.atomic_checkpoint(); drop(wal); let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); wal.seek(SeekFrom::Start(0)).expect("seek failed"); @@ -174,10 +174,10 @@ fn test_recovery_type_a() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![33, 34, 35, 36]).expect("write failed"); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.write(vec![33, 34, 35, 36]); + wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)).expect("seek failed"); - wal.write(vec![37, 38, 39, 40]).expect("write failed"); + wal.write(vec![37, 38, 39, 40]); // Simulate a crash by dropping the wal before the atomic checkpoint wich results in a non empty log file and a recovery type A drop(wal); @@ -194,11 +194,11 @@ fn test_recovery_type_b() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); - wal.write(vec![41, 42, 43, 44]).expect("write failed"); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.write(vec![41, 42, 43, 44]); + wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)).expect("seek failed"); - wal.write(vec![45, 46, 47, 48]).expect("write failed"); - wal.atomic_checkpoint().expect("atomic_checkpoint failed"); + wal.write(vec![45, 46, 47, 48]); + wal.atomic_checkpoint(); drop(wal); //now we simulate a crash during the atomic checkpoint by writing the meta file to half ones and half zeros From 02b3fdae08da0fd7b51a6b37e452a71584e8f48e Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Tue, 15 Jul 2025 18:57:31 +0200 Subject: [PATCH 10/14] - --- Cargo.toml | 3 +- src/write_ahead_log/simple_wal.rs | 12 +++- tests/simple_wal_test.rs | 112 ++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5e1d427..caa4831 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ edition = "2024" [dev-dependencies] -tempfile = "3.20.0" \ No newline at end of file +tempfile = "3.20.0" +rand = "0.9.1" \ No newline at end of file diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index 38170be..a074004 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -224,6 +224,11 @@ impl WriteAheadLog for SimpleWal { } fn atomic_checkpoint(&mut self){ + // save the currrent seek position of the operational file + let current_seek_pos = self.get_current_operational_file() + .stream_position() + .expect("Failed to get current seek position"); + // 1. The current operational file in wal.meta is switched to the fallback let mut operational_file_indicator = [0u8; 32]; self.meta_file .seek(std::io::SeekFrom::Start(0)) @@ -247,7 +252,7 @@ impl WriteAheadLog for SimpleWal { self.meta_file .sync_all() .expect("Failed to sync meta file"); - + // 2. Iterate over the log file and apply all log entries to the new operational file self.log_file .seek(std::io::SeekFrom::Start(0)) .expect("Failed to seek in log file"); @@ -272,6 +277,7 @@ impl WriteAheadLog for SimpleWal { self.get_current_operational_file() .sync_all() .expect("Failed to sync operational file"); + // 3. Erase all log entries in the log file self.log_file .seek(SeekFrom::Start(0)) .expect("Failed to seek in log file"); @@ -281,6 +287,10 @@ impl WriteAheadLog for SimpleWal { self.log_file .sync_all() .expect("Failed to sync log file"); + // Restore the seek position of the operational file + self.get_current_operational_file() + .seek(SeekFrom::Start(current_seek_pos)) + .expect("Failed to restore seek position in operational file"); } } diff --git a/tests/simple_wal_test.rs b/tests/simple_wal_test.rs index af31e0d..c442d64 100644 --- a/tests/simple_wal_test.rs +++ b/tests/simple_wal_test.rs @@ -1,5 +1,8 @@ use core_data::write_ahead_log::{simple_wal::SimpleWal, write_ahead_log::WriteAheadLog}; +use rand::SeedableRng; use std::{io::SeekFrom, path::Path}; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Seek, Write}; fn _print_all_file_content(temp_path: &Path){ let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); @@ -215,3 +218,112 @@ fn test_recovery_type_b() { } +//a function that is testing every operation by repeatedly writing and reading data between checkpoints while keeping a pseudo file in memory for comparison +#[test] +fn test_all_operations() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + + let comp_file_path = temp_path.join("comp.test"); + let mut comp_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&comp_file_path) + .expect("failed to create comp.test"); + + let seed: [u8; 32] = [ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, + 25, 26, 27, 28, 29, 30, 31, 32, + ]; + + let mut rng = rand::rngs::StdRng::from_seed(seed); + + for _ in 0..100 { + for _ in 0..10 { + let operation = rand::Rng::random_range(&mut rng, 0..7); + match operation { + 0 => { + // Write + let data: Vec = (0..10).map(|_| rand::Rng::random_range(&mut rng, 0..255)).collect(); + wal.write(data.clone()); + comp_file.write_all(&data).expect("write failed"); + } + 1 => { + // Read + let wal_size = wal.stream_len().expect("stream_len failed"); + let mut wal_stream_pos = wal.stream_position().expect("stream_position failed"); + if wal_stream_pos >= wal_size { + //seek to the beginning of the file + wal.seek(SeekFrom::Start(0)).expect("seek failed"); + comp_file.seek(SeekFrom::Start(0)).expect("seek failed"); + wal_stream_pos = 0; + } + let max_read_size = wal_size - wal_stream_pos; + let percentage: f64 = rand::Rng::random_range(&mut rng, 0.0..100.0); + let size = ((percentage as u64) * max_read_size) / 100; + let data = wal.read(size).expect(&format!("read failed, wal_size: {}, wal_stream_pos: {}, size: {}", wal_size, wal_stream_pos, size)); + let mut pseudo_data = vec![0u8; size as usize]; + comp_file.read_exact(&mut pseudo_data).expect("read failed"); + assert_eq!(data, pseudo_data, "data should match comp.test data"); + } + 2 => { + // Seek + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + let pos = rand::Rng::random_range(&mut rng, 0..=comp_file_len); + wal.seek(SeekFrom::Start(pos)).expect("seek failed"); + comp_file.seek(SeekFrom::Start(pos)).expect("seek failed"); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(wal.stream_position().expect("stream_position failed"), comp_file_seek_pos, "stream_position should match comp.test seek position"); + } + 3 => { + // StreamLen + let len = wal.stream_len().expect("stream_len failed"); + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + assert_eq!(len, comp_file_len, "stream_len should match comp.test length"); + } + 4 => { + // StreamPosition + let pos = wal.stream_position().expect("stream_position failed"); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(pos, comp_file_seek_pos, "stream_position should match comp.test position"); + } + 5 => { + // AtomicCheckpoint + wal.atomic_checkpoint(); + + health_check(temp_path); + let file_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let mut comp_content = Vec::new(); + let mut comp_file_check = File::open(&comp_file_path).expect("failed to open comp.test"); + comp_file_check.read_to_end(&mut comp_content).expect("failed to read comp.test"); + assert_eq!(file_content, comp_content, "wal.tick should match comp.test content after atomic checkpoint"); + //check if the stream positions and lengths are still equal + let wal_stream_pos = wal.stream_position().expect("stream_position failed"); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(wal_stream_pos, comp_file_seek_pos, "stream_position should match comp.test position after atomic checkpoint"); + let wal_len = wal.stream_len().expect("stream_len failed"); + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + assert_eq!(wal_len, comp_file_len, "stream_len should match comp.test length after atomic checkpoint"); + } + 6 => { + // SetLen + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + let new_len = rand::Rng::random_range(&mut rng, 0..=comp_file_len); + wal.set_len(new_len); + comp_file.set_len(new_len).expect("set_len failed"); + } + _ => unreachable!(), + } + } + } + wal.atomic_checkpoint(); + health_check(temp_path); + let file_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let mut comp_content = Vec::new(); + let mut comp_file_check = File::open(&comp_file_path).expect("failed to open comp.test"); + comp_file_check.read_to_end(&mut comp_content).expect("failed to read comp.test"); + assert_eq!(file_content, comp_content, "wal.tick should match comp.test content after all operations"); +} \ No newline at end of file From aa96ac18c3387754d119c56d860208c76eae3175 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:09:48 +0200 Subject: [PATCH 11/14] - --- src/write_ahead_log/simple_wal.rs | 26 +++++----- src/write_ahead_log/write_ahead_log.rs | 10 ++-- tests/simple_wal_test.rs | 68 +++++++++++++------------- 3 files changed, 53 insertions(+), 51 deletions(-) diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/simple_wal.rs index a074004..8f291a1 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/simple_wal.rs @@ -162,11 +162,11 @@ impl SimpleWal { } impl WriteAheadLog for SimpleWal { - fn read(&mut self, size: u64) -> Result, std::io::Error> { + fn read(&mut self, size: u64) -> Vec { let mut buffer = vec![0u8; size as usize]; self.get_current_operational_file() - .read_exact(&mut buffer)?; - Ok(buffer) + .read_exact(&mut buffer).expect("Failed to read data from operational file during a WAL read operation"); + buffer } fn write(&mut self, buf: Vec){ @@ -188,23 +188,25 @@ impl WriteAheadLog for SimpleWal { .expect("Failed to sync operational file"); } - fn seek(&mut self, pos: std::io::SeekFrom) -> Result<(), std::io::Error> { + fn seek(&mut self, pos: std::io::SeekFrom){ self.get_current_operational_file() - .seek(pos)?; - Ok(()) + .seek(pos) + .expect("Failed to seek in operational file during a WAL seek operation"); } - fn stream_len(&mut self) -> Result { + fn stream_len(&mut self) -> u64{ let len = self.get_current_operational_file() - .metadata()? + .metadata() + .expect("Failed to get metadata of operational file during a WAL stream_len operation") .len(); - Ok(len) + return len; } - fn stream_position(&mut self) -> Result { + fn stream_position(&mut self) -> u64{ let pos = self.get_current_operational_file() - .stream_position()?; - Ok(pos) + .stream_position() + .expect("Failed to get stream position of operational file during a WAL stream_position operation"); + return pos; } fn set_len(&mut self, size: u64){ diff --git a/src/write_ahead_log/write_ahead_log.rs b/src/write_ahead_log/write_ahead_log.rs index be47562..87d3d2b 100644 --- a/src/write_ahead_log/write_ahead_log.rs +++ b/src/write_ahead_log/write_ahead_log.rs @@ -1,16 +1,16 @@ -use std::io::{Error, SeekFrom}; +use std::io::{SeekFrom}; pub trait WriteAheadLog { - fn read(&mut self, size: u64) -> Result, Error>; + fn read(&mut self, size: u64) -> Vec; fn write(&mut self, buf: Vec); - fn seek(&mut self, pos: SeekFrom) -> Result<(), Error>; + fn seek(&mut self, pos: SeekFrom); - fn stream_len(&mut self) -> Result; + fn stream_len(&mut self) -> u64; - fn stream_position(&mut self) -> Result; + fn stream_position(&mut self) -> u64; fn atomic_checkpoint(&mut self); diff --git a/tests/simple_wal_test.rs b/tests/simple_wal_test.rs index c442d64..21f83be 100644 --- a/tests/simple_wal_test.rs +++ b/tests/simple_wal_test.rs @@ -59,8 +59,8 @@ fn test_read() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![1, 2, 3, 4]); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert_eq!(data, vec![1, 2, 3, 4]); } @@ -73,8 +73,8 @@ fn test_write() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![5, 6, 7, 8]); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert_eq!(data, vec![5, 6, 7, 8]); } @@ -87,8 +87,8 @@ fn test_seek() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![9, 10, 11, 12]); - wal.seek(SeekFrom::Start(2)).expect("seek failed"); - let data = wal.read(2).expect("read failed"); + wal.seek(SeekFrom::Start(2)); + let data = wal.read(2); assert_eq!(data, vec![11, 12]); } @@ -101,7 +101,7 @@ fn test_stream_len() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![13, 14, 15, 16]); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 4); } @@ -114,8 +114,8 @@ fn test_stream_position() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![17, 18, 19, 20]); - wal.seek(SeekFrom::Start(2)).expect("seek failed"); - let pos = wal.stream_position().expect("stream_position failed"); + wal.seek(SeekFrom::Start(2)); + let pos = wal.stream_position(); assert_eq!(pos, 2); } @@ -128,8 +128,8 @@ fn test_atomic_checkpoint() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![21, 22, 23, 24]); wal.atomic_checkpoint(); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert_eq!(data, vec![21, 22, 23, 24]); } @@ -142,19 +142,19 @@ fn test_set_len() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![25, 26, 27, 28]); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 4); wal.set_len(3); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 3); wal.atomic_checkpoint(); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 3); wal.set_len(2); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 2); wal.atomic_checkpoint(); - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); assert_eq!(len, 2); } @@ -167,8 +167,8 @@ fn test_open_wal_at_directory() { wal.atomic_checkpoint(); drop(wal); let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert_eq!(data, vec![29, 30, 31, 32]); } @@ -179,7 +179,7 @@ fn test_recovery_type_a() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![33, 34, 35, 36]); wal.atomic_checkpoint(); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); + wal.seek(SeekFrom::Start(0)); wal.write(vec![37, 38, 39, 40]); // Simulate a crash by dropping the wal before the atomic checkpoint wich results in a non empty log file and a recovery type A @@ -187,8 +187,8 @@ fn test_recovery_type_a() { let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); health_check(temp_path); // Check if the content matches the checkpoint - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert_eq!(data, vec![33, 34, 35, 36]); } @@ -199,7 +199,7 @@ fn test_recovery_type_b() { let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![41, 42, 43, 44]); wal.atomic_checkpoint(); - wal.seek(SeekFrom::Start(0)).expect("seek failed"); + wal.seek(SeekFrom::Start(0)); wal.write(vec![45, 46, 47, 48]); wal.atomic_checkpoint(); drop(wal); @@ -212,8 +212,8 @@ fn test_recovery_type_b() { let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); health_check(temp_path); // Check if the content matches one of the checkpoints - wal.seek(SeekFrom::Start(0)).expect("seek failed"); - let data = wal.read(4).expect("read failed"); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); assert!(data == vec![41, 42, 43, 44] || data == vec![45, 46, 47, 48], "data should match one of the checkpoints"); } @@ -253,18 +253,18 @@ fn test_all_operations() { } 1 => { // Read - let wal_size = wal.stream_len().expect("stream_len failed"); - let mut wal_stream_pos = wal.stream_position().expect("stream_position failed"); + let wal_size = wal.stream_len(); + let mut wal_stream_pos = wal.stream_position(); if wal_stream_pos >= wal_size { //seek to the beginning of the file - wal.seek(SeekFrom::Start(0)).expect("seek failed"); + wal.seek(SeekFrom::Start(0)); comp_file.seek(SeekFrom::Start(0)).expect("seek failed"); wal_stream_pos = 0; } let max_read_size = wal_size - wal_stream_pos; let percentage: f64 = rand::Rng::random_range(&mut rng, 0.0..100.0); let size = ((percentage as u64) * max_read_size) / 100; - let data = wal.read(size).expect(&format!("read failed, wal_size: {}, wal_stream_pos: {}, size: {}", wal_size, wal_stream_pos, size)); + let data = wal.read(size); let mut pseudo_data = vec![0u8; size as usize]; comp_file.read_exact(&mut pseudo_data).expect("read failed"); assert_eq!(data, pseudo_data, "data should match comp.test data"); @@ -273,20 +273,20 @@ fn test_all_operations() { // Seek let comp_file_len = comp_file.metadata().expect("metadata failed").len(); let pos = rand::Rng::random_range(&mut rng, 0..=comp_file_len); - wal.seek(SeekFrom::Start(pos)).expect("seek failed"); + wal.seek(SeekFrom::Start(pos)); comp_file.seek(SeekFrom::Start(pos)).expect("seek failed"); let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); - assert_eq!(wal.stream_position().expect("stream_position failed"), comp_file_seek_pos, "stream_position should match comp.test seek position"); + assert_eq!(wal.stream_position(), comp_file_seek_pos, "stream_position should match comp.test seek position"); } 3 => { // StreamLen - let len = wal.stream_len().expect("stream_len failed"); + let len = wal.stream_len(); let comp_file_len = comp_file.metadata().expect("metadata failed").len(); assert_eq!(len, comp_file_len, "stream_len should match comp.test length"); } 4 => { // StreamPosition - let pos = wal.stream_position().expect("stream_position failed"); + let pos = wal.stream_position(); let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); assert_eq!(pos, comp_file_seek_pos, "stream_position should match comp.test position"); } @@ -301,10 +301,10 @@ fn test_all_operations() { comp_file_check.read_to_end(&mut comp_content).expect("failed to read comp.test"); assert_eq!(file_content, comp_content, "wal.tick should match comp.test content after atomic checkpoint"); //check if the stream positions and lengths are still equal - let wal_stream_pos = wal.stream_position().expect("stream_position failed"); + let wal_stream_pos = wal.stream_position(); let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); assert_eq!(wal_stream_pos, comp_file_seek_pos, "stream_position should match comp.test position after atomic checkpoint"); - let wal_len = wal.stream_len().expect("stream_len failed"); + let wal_len = wal.stream_len(); let comp_file_len = comp_file.metadata().expect("metadata failed").len(); assert_eq!(wal_len, comp_file_len, "stream_len should match comp.test length after atomic checkpoint"); } From c515c03006f6b7d6814be89387f5ff0ff2c5de64 Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Fri, 18 Jul 2025 17:12:43 +0200 Subject: [PATCH 12/14] - --- src/write_ahead_log/mod.rs | 4 +-- ...mple_wal.rs => write_ahead_log_default.rs} | 6 ++-- ...rs => write_ahead_log_default_recovery.rs} | 4 +-- ...le_wal_test.rs => write_ahead_log_test.rs} | 32 +++++++++---------- 4 files changed, 23 insertions(+), 23 deletions(-) rename src/write_ahead_log/{simple_wal.rs => write_ahead_log_default.rs} (99%) rename src/write_ahead_log/{simple_wal_recovery.rs => write_ahead_log_default_recovery.rs} (98%) rename tests/{simple_wal_test.rs => write_ahead_log_test.rs} (90%) diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 6b96af5..f97c503 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1,3 +1,3 @@ pub mod write_ahead_log; -pub mod simple_wal; -pub mod simple_wal_recovery; \ No newline at end of file +pub mod write_ahead_log_default; +pub mod write_ahead_log_default_recovery; \ No newline at end of file diff --git a/src/write_ahead_log/simple_wal.rs b/src/write_ahead_log/write_ahead_log_default.rs similarity index 99% rename from src/write_ahead_log/simple_wal.rs rename to src/write_ahead_log/write_ahead_log_default.rs index 8f291a1..516f383 100644 --- a/src/write_ahead_log/simple_wal.rs +++ b/src/write_ahead_log/write_ahead_log_default.rs @@ -22,7 +22,7 @@ use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; use crate::write_ahead_log::write_ahead_log::WriteAheadLog; -pub struct SimpleWal { +pub struct WriteAheadLogDefault { pub(crate) tick_file: std::fs::File, pub(crate) tock_file: std::fs::File, pub(crate) log_file: std::fs::File, @@ -45,7 +45,7 @@ impl LogEntry { } -impl SimpleWal { +impl WriteAheadLogDefault { pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); @@ -161,7 +161,7 @@ impl SimpleWal { } } -impl WriteAheadLog for SimpleWal { +impl WriteAheadLog for WriteAheadLogDefault { fn read(&mut self, size: u64) -> Vec { let mut buffer = vec![0u8; size as usize]; self.get_current_operational_file() diff --git a/src/write_ahead_log/simple_wal_recovery.rs b/src/write_ahead_log/write_ahead_log_default_recovery.rs similarity index 98% rename from src/write_ahead_log/simple_wal_recovery.rs rename to src/write_ahead_log/write_ahead_log_default_recovery.rs index cda7ad1..d8de4bc 100644 --- a/src/write_ahead_log/simple_wal_recovery.rs +++ b/src/write_ahead_log/write_ahead_log_default_recovery.rs @@ -1,8 +1,8 @@ use std::{fs::File, io::{self, Read, Seek, SeekFrom, Write}, path::PathBuf}; -use crate::write_ahead_log::simple_wal::SimpleWal; +use crate::write_ahead_log::write_ahead_log_default::WriteAheadLogDefault; -impl SimpleWal { +impl WriteAheadLogDefault { /// Opens an existing WAL at the specified directory. diff --git a/tests/simple_wal_test.rs b/tests/write_ahead_log_test.rs similarity index 90% rename from tests/simple_wal_test.rs rename to tests/write_ahead_log_test.rs index 21f83be..d981711 100644 --- a/tests/simple_wal_test.rs +++ b/tests/write_ahead_log_test.rs @@ -1,4 +1,4 @@ -use core_data::write_ahead_log::{simple_wal::SimpleWal, write_ahead_log::WriteAheadLog}; +use core_data::write_ahead_log::{write_ahead_log_default::WriteAheadLogDefault, write_ahead_log::WriteAheadLog}; use rand::SeedableRng; use std::{io::SeekFrom, path::Path}; use std::fs::{File, OpenOptions}; @@ -33,7 +33,7 @@ fn health_check(temp_path: &Path) { fn test_new_wal_at_directory() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let _ = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let _ = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); assert!(temp_path.join("wal.tick").exists()); assert!(temp_path.join("wal.tock").exists()); assert!(temp_path.join("wal.log").exists()); @@ -56,7 +56,7 @@ fn test_read() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![1, 2, 3, 4]); wal.seek(SeekFrom::Start(0)); @@ -70,7 +70,7 @@ fn test_write() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![5, 6, 7, 8]); wal.seek(SeekFrom::Start(0)); @@ -84,7 +84,7 @@ fn test_seek() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![9, 10, 11, 12]); wal.seek(SeekFrom::Start(2)); @@ -98,7 +98,7 @@ fn test_stream_len() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![13, 14, 15, 16]); let len = wal.stream_len(); @@ -111,7 +111,7 @@ fn test_stream_position() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![17, 18, 19, 20]); wal.seek(SeekFrom::Start(2)); @@ -125,7 +125,7 @@ fn test_atomic_checkpoint() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![21, 22, 23, 24]); wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)); @@ -139,7 +139,7 @@ fn test_set_len() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![25, 26, 27, 28]); let len = wal.stream_len(); @@ -162,11 +162,11 @@ fn test_set_len() { fn test_open_wal_at_directory() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![29, 30, 31, 32]); wal.atomic_checkpoint(); drop(wal); - let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); wal.seek(SeekFrom::Start(0)); let data = wal.read(4); assert_eq!(data, vec![29, 30, 31, 32]); @@ -176,7 +176,7 @@ fn test_open_wal_at_directory() { fn test_recovery_type_a() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![33, 34, 35, 36]); wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)); @@ -184,7 +184,7 @@ fn test_recovery_type_a() { // Simulate a crash by dropping the wal before the atomic checkpoint wich results in a non empty log file and a recovery type A drop(wal); - let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); health_check(temp_path); // Check if the content matches the checkpoint wal.seek(SeekFrom::Start(0)); @@ -196,7 +196,7 @@ fn test_recovery_type_a() { fn test_recovery_type_b() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); wal.write(vec![41, 42, 43, 44]); wal.atomic_checkpoint(); wal.seek(SeekFrom::Start(0)); @@ -209,7 +209,7 @@ fn test_recovery_type_b() { std::fs::write(temp_path.join("wal.meta"), [1u8; 16].iter().chain([0u8; 16].iter()).cloned().collect::>()).expect("failed to write wal.meta"); std::fs::write(temp_path.join("wal.log"), [49u8, 50u8, 51u8, 52u8]).expect("failed to write wal.log"); - let mut wal = SimpleWal::open_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); health_check(temp_path); // Check if the content matches one of the checkpoints wal.seek(SeekFrom::Start(0)); @@ -223,7 +223,7 @@ fn test_recovery_type_b() { fn test_all_operations() { let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); let temp_path = temp_dir.path(); - let mut wal = SimpleWal::new_wal_at_directory(temp_path.to_path_buf()); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); let comp_file_path = temp_path.join("comp.test"); let mut comp_file = OpenOptions::new() From dee2e8dbf60f02a7d7a1a4617a33707f3dc9848f Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Fri, 18 Jul 2025 20:23:37 +0200 Subject: [PATCH 13/14] - --- src/write_ahead_log/write_ahead_log_default.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/write_ahead_log/write_ahead_log_default.rs b/src/write_ahead_log/write_ahead_log_default.rs index 516f383..77ba2b7 100644 --- a/src/write_ahead_log/write_ahead_log_default.rs +++ b/src/write_ahead_log/write_ahead_log_default.rs @@ -23,10 +23,10 @@ use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; use crate::write_ahead_log::write_ahead_log::WriteAheadLog; pub struct WriteAheadLogDefault { - pub(crate) tick_file: std::fs::File, - pub(crate) tock_file: std::fs::File, - pub(crate) log_file: std::fs::File, - pub(crate) meta_file: std::fs::File, + pub(in crate::write_ahead_log) tick_file: std::fs::File, + pub(in crate::write_ahead_log) tock_file: std::fs::File, + pub(in crate::write_ahead_log) log_file: std::fs::File, + pub(in crate::write_ahead_log) meta_file: std::fs::File, } enum LogEntry{ From 1d1b4527312c4d53afe17cbfc3723a13760f9d4c Mon Sep 17 00:00:00 2001 From: Marvin Mielchen <100961250+marvinmielchen@users.noreply.github.com> Date: Fri, 18 Jul 2025 20:47:32 +0200 Subject: [PATCH 14/14] - --- src/write_ahead_log/mod.rs | 8 +- ...te_ahead_log_default.rs => wal_default.rs} | 96 +------------------ src/write_ahead_log/wal_default_file_io.rs | 20 ++++ src/write_ahead_log/wal_default_log_entry.rs | 89 +++++++++++++++++ ...lt_recovery.rs => wal_default_recovery.rs} | 9 +- .../{write_ahead_log.rs => wal_trait.rs} | 0 tests/write_ahead_log_test.rs | 2 +- 7 files changed, 118 insertions(+), 106 deletions(-) rename src/write_ahead_log/{write_ahead_log_default.rs => wal_default.rs} (74%) create mode 100644 src/write_ahead_log/wal_default_file_io.rs create mode 100644 src/write_ahead_log/wal_default_log_entry.rs rename src/write_ahead_log/{write_ahead_log_default_recovery.rs => wal_default_recovery.rs} (94%) rename src/write_ahead_log/{write_ahead_log.rs => wal_trait.rs} (100%) diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index f97c503..57d180f 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1,3 +1,5 @@ -pub mod write_ahead_log; -pub mod write_ahead_log_default; -pub mod write_ahead_log_default_recovery; \ No newline at end of file +pub mod wal_trait; +pub mod wal_default; +pub mod wal_default_recovery; +pub mod wal_default_file_io; +pub mod wal_default_log_entry; \ No newline at end of file diff --git a/src/write_ahead_log/write_ahead_log_default.rs b/src/write_ahead_log/wal_default.rs similarity index 74% rename from src/write_ahead_log/write_ahead_log_default.rs rename to src/write_ahead_log/wal_default.rs index 77ba2b7..92c46ba 100644 --- a/src/write_ahead_log/write_ahead_log_default.rs +++ b/src/write_ahead_log/wal_default.rs @@ -20,7 +20,8 @@ use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; -use crate::write_ahead_log::write_ahead_log::WriteAheadLog; +use crate::write_ahead_log::{wal_default_log_entry::LogEntry, wal_trait::WriteAheadLog}; +use crate::write_ahead_log::wal_default_file_io::create_file_with_permissions; pub struct WriteAheadLogDefault { pub(in crate::write_ahead_log) tick_file: std::fs::File, @@ -29,22 +30,6 @@ pub struct WriteAheadLogDefault { pub(in crate::write_ahead_log) meta_file: std::fs::File, } -enum LogEntry{ - Write(u64, Vec), - SetLen(u64), -} - -impl LogEntry { - fn get_data(&self) -> &Vec { - if let LogEntry::Write(_, data) = self { - data - } else { - panic!("LogEntry does not contain data"); - } - } - -} - impl WriteAheadLogDefault { pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { @@ -91,74 +76,6 @@ impl WriteAheadLogDefault { } } - - fn write_log_entry(&mut self, log_entry: &LogEntry) { - match log_entry { - LogEntry::Write(stream_pos, data) => { - self.log_file - .write_all(b"WR") - .expect("Failed to write log entry opcode"); - self.log_file - .write_all(&stream_pos.to_le_bytes()) - .expect("Failed to write stream position to log file"); - self.log_file - .write_all(data.len().to_le_bytes().as_slice()) - .expect("Failed to write data length to log file"); - self.log_file - .write_all(data) - .expect("Failed to write data to log file"); - } - LogEntry::SetLen(size) => { - self.log_file - .write_all(b"SL") - .expect("Failed to write log entry opcode"); - self.log_file - .write_all(&size.to_le_bytes()) - .expect("Failed to write size to log file"); - } - } - } - - fn read_log_entry(&mut self) -> LogEntry { - - let mut opcode = [0u8; 2]; - self.log_file - .read_exact(&mut opcode) - .expect("Failed to read log entry opcode"); - - match &opcode { - b"WR" => { // Write operation - let mut stream_pos_bytes = [0u8; 8]; - self.log_file - .read_exact(&mut stream_pos_bytes) - .expect("Failed to read stream position from log file"); - let stream_pos = u64::from_le_bytes(stream_pos_bytes); - - let mut data_length_bytes = [0u8; 8]; - self.log_file - .read_exact(&mut data_length_bytes) - .expect("Failed to read data length from log file"); - let data_length = u64::from_le_bytes(data_length_bytes) as usize; - - let mut data = vec![0u8; data_length]; - self.log_file - .read_exact(&mut data) - .expect("Failed to read data from log file"); - - LogEntry::Write(stream_pos, data) - } - b"SL" => { // SetLen operation - let mut size_bytes = [0u8; 8]; - self.log_file - .read_exact(&mut size_bytes) - .expect("Failed to read size from log file"); - let size = u64::from_le_bytes(size_bytes); - LogEntry::SetLen(size) - } - _ => panic!("Unknown log entry opcode encountered"), - } - - } } impl WriteAheadLog for WriteAheadLogDefault { @@ -296,15 +213,6 @@ impl WriteAheadLog for WriteAheadLogDefault { } } -fn create_file_with_permissions(path: &PathBuf) -> std::fs::File { - std::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(path) - .expect("Failed to create one of the WAL files") -} diff --git a/src/write_ahead_log/wal_default_file_io.rs b/src/write_ahead_log/wal_default_file_io.rs new file mode 100644 index 0000000..a427e41 --- /dev/null +++ b/src/write_ahead_log/wal_default_file_io.rs @@ -0,0 +1,20 @@ +use std::path::PathBuf; + +pub(in crate::write_ahead_log) fn create_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(path) + .expect("Failed to create one of the WAL files") +} + + +pub(in crate::write_ahead_log) fn open_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(path) + .expect("Failed to open one of the WAL files") +} \ No newline at end of file diff --git a/src/write_ahead_log/wal_default_log_entry.rs b/src/write_ahead_log/wal_default_log_entry.rs new file mode 100644 index 0000000..d7ae873 --- /dev/null +++ b/src/write_ahead_log/wal_default_log_entry.rs @@ -0,0 +1,89 @@ +use std::io::{Read, Write}; + +use crate::write_ahead_log::wal_default::WriteAheadLogDefault; + +pub(in crate::write_ahead_log) enum LogEntry{ + Write(u64, Vec), + SetLen(u64), +} + +impl LogEntry { + pub(in crate::write_ahead_log) fn get_data(&self) -> &Vec { + if let LogEntry::Write(_, data) = self { + data + } else { + panic!("LogEntry does not contain data"); + } + } +} + +impl WriteAheadLogDefault { + + pub(in crate::write_ahead_log) fn write_log_entry(&mut self, log_entry: &LogEntry) { + match log_entry { + LogEntry::Write(stream_pos, data) => { + self.log_file + .write_all(b"WR") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&stream_pos.to_le_bytes()) + .expect("Failed to write stream position to log file"); + self.log_file + .write_all(data.len().to_le_bytes().as_slice()) + .expect("Failed to write data length to log file"); + self.log_file + .write_all(data) + .expect("Failed to write data to log file"); + } + LogEntry::SetLen(size) => { + self.log_file + .write_all(b"SL") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&size.to_le_bytes()) + .expect("Failed to write size to log file"); + } + } + } + + pub(in crate::write_ahead_log) fn read_log_entry(&mut self) -> LogEntry { + + let mut opcode = [0u8; 2]; + self.log_file + .read_exact(&mut opcode) + .expect("Failed to read log entry opcode"); + + match &opcode { + b"WR" => { // Write operation + let mut stream_pos_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut stream_pos_bytes) + .expect("Failed to read stream position from log file"); + let stream_pos = u64::from_le_bytes(stream_pos_bytes); + + let mut data_length_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut data_length_bytes) + .expect("Failed to read data length from log file"); + let data_length = u64::from_le_bytes(data_length_bytes) as usize; + + let mut data = vec![0u8; data_length]; + self.log_file + .read_exact(&mut data) + .expect("Failed to read data from log file"); + + LogEntry::Write(stream_pos, data) + } + b"SL" => { // SetLen operation + let mut size_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut size_bytes) + .expect("Failed to read size from log file"); + let size = u64::from_le_bytes(size_bytes); + LogEntry::SetLen(size) + } + _ => panic!("Unknown log entry opcode encountered"), + } + } + +} \ No newline at end of file diff --git a/src/write_ahead_log/write_ahead_log_default_recovery.rs b/src/write_ahead_log/wal_default_recovery.rs similarity index 94% rename from src/write_ahead_log/write_ahead_log_default_recovery.rs rename to src/write_ahead_log/wal_default_recovery.rs index d8de4bc..33ce7c1 100644 --- a/src/write_ahead_log/write_ahead_log_default_recovery.rs +++ b/src/write_ahead_log/wal_default_recovery.rs @@ -1,6 +1,6 @@ use std::{fs::File, io::{self, Read, Seek, SeekFrom, Write}, path::PathBuf}; -use crate::write_ahead_log::write_ahead_log_default::WriteAheadLogDefault; +use crate::write_ahead_log::{wal_default_file_io::open_file_with_permissions, wal_default::WriteAheadLogDefault}; impl WriteAheadLogDefault { @@ -102,10 +102,3 @@ impl WriteAheadLogDefault { } -fn open_file_with_permissions(path: &PathBuf) -> std::fs::File { - std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(path) - .expect("Failed to open one of the WAL files") -} \ No newline at end of file diff --git a/src/write_ahead_log/write_ahead_log.rs b/src/write_ahead_log/wal_trait.rs similarity index 100% rename from src/write_ahead_log/write_ahead_log.rs rename to src/write_ahead_log/wal_trait.rs diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs index d981711..327a2ee 100644 --- a/tests/write_ahead_log_test.rs +++ b/tests/write_ahead_log_test.rs @@ -1,4 +1,4 @@ -use core_data::write_ahead_log::{write_ahead_log_default::WriteAheadLogDefault, write_ahead_log::WriteAheadLog}; +use core_data::write_ahead_log::{wal_default::WriteAheadLogDefault, wal_trait::WriteAheadLog}; use rand::SeedableRng; use std::{io::SeekFrom, path::Path}; use std::fs::{File, OpenOptions};