diff --git a/Cargo.toml b/Cargo.toml index c66312c..caa4831 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,3 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] + + +[dev-dependencies] +tempfile = "3.20.0" +rand = "0.9.1" \ No newline at end of file diff --git a/src/write_ahead_log/mod.rs b/src/write_ahead_log/mod.rs index 394fe71..57d180f 100644 --- a/src/write_ahead_log/mod.rs +++ b/src/write_ahead_log/mod.rs @@ -1 +1,5 @@ -pub mod write_ahead_log; \ No newline at end of file +pub mod wal_trait; +pub mod wal_default; +pub mod wal_default_recovery; +pub mod wal_default_file_io; +pub mod wal_default_log_entry; \ No newline at end of file diff --git a/src/write_ahead_log/wal_default.rs b/src/write_ahead_log/wal_default.rs new file mode 100644 index 0000000..92c46ba --- /dev/null +++ b/src/write_ahead_log/wal_default.rs @@ -0,0 +1,219 @@ +//! This WAL implementation consists of 4 files: +//! - wal.tick and wal.tock contain the persistent data and always one of them is the fallback and the other is the operational file. +//! - wal.log contains log entries since the last checkpoint +//! - wal.meta contains a signature that indicates the current operational file +//! +//! On every write operation or set_len, the following steps are performed: +//! 1. A log entry is created and written to the log file. Recovery Type A +//! 2. Then the operation is persited on the operational file. Recovery Type A +//! +//! On checkpoint, the following steps are performed: +//! 1. The current operational file in wal.meta is switched to the fallback. Recovery Type B +//! 2. Iterate over the log file and apply all log entries to the new operational file. Recovery Type A +//! 3. Erase all log entries in the log file. Recovery Type A +//! +//! After every numbered step, fsync syscall is used to ensure durability. +//! +//! The Recovery Type for each step specifies how a faulty state is detected if the system crashes during this step and how the system can recover from it: +//! - Recovery Type A: A faulty state is detected when wal.meta contains a valid signature but wal.log is not empty. The system can recover by erasing the current operational file, generating it again as a copy of the fallback and afterwards erasing the log file. If this process is interrupted by a crash, the same recovery can be performed again. +//! - Recovery Type B: A faulty state is detected when wal.meta does not contain a valid signature. The system can recover by erasing wal.tick and generating it again as a copy of wal.tock. Then the log file is erased. Then the wal.meta is updated with a new valid signature pointing to wal.tock. If this process is interrupted by a crash, recovery is still possible. + +use std::{fs::File, io::{Read, Seek, SeekFrom, Write}, path::PathBuf}; + +use crate::write_ahead_log::{wal_default_log_entry::LogEntry, wal_trait::WriteAheadLog}; +use crate::write_ahead_log::wal_default_file_io::create_file_with_permissions; + +pub struct WriteAheadLogDefault { + pub(in crate::write_ahead_log) tick_file: std::fs::File, + pub(in crate::write_ahead_log) tock_file: std::fs::File, + pub(in crate::write_ahead_log) log_file: std::fs::File, + pub(in crate::write_ahead_log) meta_file: std::fs::File, +} + +impl WriteAheadLogDefault { + + pub fn new_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + assert!(dir_path.read_dir().expect("Could not read directory").next().is_none(), "Directory is not empty: {:?}", dir_path); + + let tick_file_path = dir_path.join("wal.tick"); + let tick_file = create_file_with_permissions(&tick_file_path); + + let tock_file_path = dir_path.join("wal.tock"); + let tock_file = create_file_with_permissions(&tock_file_path); + + let log_file_path = dir_path.join("wal.log"); + let log_file = create_file_with_permissions(&log_file_path); + + let meta_file_path = dir_path.join("wal.meta"); + let mut meta_file = create_file_with_permissions(&meta_file_path); + meta_file.write_all(&[0; 32]).expect("Failed to write operational file indicator"); // 32 bytes of zeros for the operational file indicator + meta_file.sync_all().expect("Failed to sync meta file"); + + Self { + tick_file, + tock_file, + log_file, + meta_file, + } + } + + fn get_current_operational_file(&mut self) -> &File { + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + let mut operational_file_indicator: [u8; 32] = [0; 32]; + self.meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + if operational_file_indicator.iter().all(|&x| x == 0) { + return &self.tick_file; + } else if operational_file_indicator.iter().all(|&x| x == 1) { + return &self.tock_file; + } else { + panic!("Invalid operational file indicator"); + } + } + +} + +impl WriteAheadLog for WriteAheadLogDefault { + fn read(&mut self, size: u64) -> Vec { + let mut buffer = vec![0u8; size as usize]; + self.get_current_operational_file() + .read_exact(&mut buffer).expect("Failed to read data from operational file during a WAL read operation"); + buffer + } + + fn write(&mut self, buf: Vec){ + // 1. A log entry is created and written to the log file + let stream_pos = self.get_current_operational_file() + .stream_position() + .expect("Failed to get stream position"); + let log_entry = LogEntry::Write(stream_pos, buf); + self.write_log_entry(&log_entry); + self.log_file + .sync_all() + .expect("Failed to sync log file"); + // 2. Then the data is written to the current operational file + self.get_current_operational_file() + .write_all(log_entry.get_data()) + .expect("Failed to write data to operational file"); + self.get_current_operational_file() + .sync_all() + .expect("Failed to sync operational file"); + } + + fn seek(&mut self, pos: std::io::SeekFrom){ + self.get_current_operational_file() + .seek(pos) + .expect("Failed to seek in operational file during a WAL seek operation"); + } + + fn stream_len(&mut self) -> u64{ + let len = self.get_current_operational_file() + .metadata() + .expect("Failed to get metadata of operational file during a WAL stream_len operation") + .len(); + return len; + } + + fn stream_position(&mut self) -> u64{ + let pos = self.get_current_operational_file() + .stream_position() + .expect("Failed to get stream position of operational file during a WAL stream_position operation"); + return pos; + } + + fn set_len(&mut self, size: u64){ + // 1. A log entry is created and written to the log file + let log_entry = LogEntry::SetLen(size); + self.write_log_entry(&log_entry); + self.log_file + .sync_all() + .expect("Failed to sync log file"); + // 2. Then the size is set in the current operational file + self.get_current_operational_file() + .set_len(size) + .expect("Failed to set length of operational file"); + self.get_current_operational_file() + .sync_all() + .expect("Failed to sync operational file"); + } + + fn atomic_checkpoint(&mut self){ + // save the currrent seek position of the operational file + let current_seek_pos = self.get_current_operational_file() + .stream_position() + .expect("Failed to get current seek position"); + // 1. The current operational file in wal.meta is switched to the fallback + let mut operational_file_indicator = [0u8; 32]; + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + self.meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + if operational_file_indicator.iter().all(|&x| x == 0) { + operational_file_indicator.fill(1); + } else if operational_file_indicator.iter().all(|&x| x == 1) { + operational_file_indicator.fill(0); + } else { + panic!("Invalid operational file indicator"); + } + self.meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + self.meta_file + .write_all(&operational_file_indicator) + .expect("Failed to write operational file indicator"); + self.meta_file + .sync_all() + .expect("Failed to sync meta file"); + // 2. Iterate over the log file and apply all log entries to the new operational file + self.log_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in log file"); + while self.log_file.stream_position().unwrap() < self.log_file.metadata().unwrap().len() { + let log_entry = self.read_log_entry(); + match log_entry { + LogEntry::Write(stream_pos, data) => { + self.get_current_operational_file() + .seek(std::io::SeekFrom::Start(stream_pos)) + .expect("Failed to seek in operational file"); + self.get_current_operational_file() + .write_all(&data) + .expect("Failed to write data to operational file"); + } + LogEntry::SetLen(size) => { + self.get_current_operational_file() + .set_len(size) + .expect("Failed to set length of operational file"); + } + } + } + self.get_current_operational_file() + .sync_all() + .expect("Failed to sync operational file"); + // 3. Erase all log entries in the log file + self.log_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in log file"); + self.log_file + .set_len(0) + .expect("Failed to erase log file"); + self.log_file + .sync_all() + .expect("Failed to sync log file"); + // Restore the seek position of the operational file + self.get_current_operational_file() + .seek(SeekFrom::Start(current_seek_pos)) + .expect("Failed to restore seek position in operational file"); + } +} + + + + + diff --git a/src/write_ahead_log/wal_default_file_io.rs b/src/write_ahead_log/wal_default_file_io.rs new file mode 100644 index 0000000..a427e41 --- /dev/null +++ b/src/write_ahead_log/wal_default_file_io.rs @@ -0,0 +1,20 @@ +use std::path::PathBuf; + +pub(in crate::write_ahead_log) fn create_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(path) + .expect("Failed to create one of the WAL files") +} + + +pub(in crate::write_ahead_log) fn open_file_with_permissions(path: &PathBuf) -> std::fs::File { + std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(path) + .expect("Failed to open one of the WAL files") +} \ No newline at end of file diff --git a/src/write_ahead_log/wal_default_log_entry.rs b/src/write_ahead_log/wal_default_log_entry.rs new file mode 100644 index 0000000..d7ae873 --- /dev/null +++ b/src/write_ahead_log/wal_default_log_entry.rs @@ -0,0 +1,89 @@ +use std::io::{Read, Write}; + +use crate::write_ahead_log::wal_default::WriteAheadLogDefault; + +pub(in crate::write_ahead_log) enum LogEntry{ + Write(u64, Vec), + SetLen(u64), +} + +impl LogEntry { + pub(in crate::write_ahead_log) fn get_data(&self) -> &Vec { + if let LogEntry::Write(_, data) = self { + data + } else { + panic!("LogEntry does not contain data"); + } + } +} + +impl WriteAheadLogDefault { + + pub(in crate::write_ahead_log) fn write_log_entry(&mut self, log_entry: &LogEntry) { + match log_entry { + LogEntry::Write(stream_pos, data) => { + self.log_file + .write_all(b"WR") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&stream_pos.to_le_bytes()) + .expect("Failed to write stream position to log file"); + self.log_file + .write_all(data.len().to_le_bytes().as_slice()) + .expect("Failed to write data length to log file"); + self.log_file + .write_all(data) + .expect("Failed to write data to log file"); + } + LogEntry::SetLen(size) => { + self.log_file + .write_all(b"SL") + .expect("Failed to write log entry opcode"); + self.log_file + .write_all(&size.to_le_bytes()) + .expect("Failed to write size to log file"); + } + } + } + + pub(in crate::write_ahead_log) fn read_log_entry(&mut self) -> LogEntry { + + let mut opcode = [0u8; 2]; + self.log_file + .read_exact(&mut opcode) + .expect("Failed to read log entry opcode"); + + match &opcode { + b"WR" => { // Write operation + let mut stream_pos_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut stream_pos_bytes) + .expect("Failed to read stream position from log file"); + let stream_pos = u64::from_le_bytes(stream_pos_bytes); + + let mut data_length_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut data_length_bytes) + .expect("Failed to read data length from log file"); + let data_length = u64::from_le_bytes(data_length_bytes) as usize; + + let mut data = vec![0u8; data_length]; + self.log_file + .read_exact(&mut data) + .expect("Failed to read data from log file"); + + LogEntry::Write(stream_pos, data) + } + b"SL" => { // SetLen operation + let mut size_bytes = [0u8; 8]; + self.log_file + .read_exact(&mut size_bytes) + .expect("Failed to read size from log file"); + let size = u64::from_le_bytes(size_bytes); + LogEntry::SetLen(size) + } + _ => panic!("Unknown log entry opcode encountered"), + } + } + +} \ No newline at end of file diff --git a/src/write_ahead_log/wal_default_recovery.rs b/src/write_ahead_log/wal_default_recovery.rs new file mode 100644 index 0000000..33ce7c1 --- /dev/null +++ b/src/write_ahead_log/wal_default_recovery.rs @@ -0,0 +1,104 @@ +use std::{fs::File, io::{self, Read, Seek, SeekFrom, Write}, path::PathBuf}; + +use crate::write_ahead_log::{wal_default_file_io::open_file_with_permissions, wal_default::WriteAheadLogDefault}; + +impl WriteAheadLogDefault { + + + /// Opens an existing WAL at the specified directory. + /// + /// Recovery will be attempted which means that this function call can take a long time if the WAL is in a faulty state. + /// If the WAL is in a valid state, it will return immediately and if the WAL can not be recovered, it will panic. + pub fn open_wal_at_directory(dir_path: PathBuf) -> Self { + assert!(dir_path.exists(), "Directory does not exist: {:?}", dir_path); + assert!(dir_path.is_dir(), "Path is not a directory: {:?}", dir_path); + + assert!(dir_path.join("wal.tick").exists(), "wal.tick does not exist in directory: {:?}", dir_path); + assert!(dir_path.join("wal.tock").exists(), "wal.tock does not exist in directory: {:?}", dir_path); + + assert!(dir_path.join("wal.log").exists(), "wal.log does not exist in directory: {:?}", dir_path); + assert!(dir_path.join("wal.meta").exists(), "wal.meta does not exist in directory: {:?}", dir_path); + + let tick_file_path = dir_path.join("wal.tick"); + let mut tick_file = open_file_with_permissions(&tick_file_path); + + let tock_file_path = dir_path.join("wal.tock"); + let mut tock_file = open_file_with_permissions(&tock_file_path); + + let log_file_path = dir_path.join("wal.log"); + let log_file = open_file_with_permissions(&log_file_path); + + let meta_file_path = dir_path.join("wal.meta"); + let mut meta_file = open_file_with_permissions(&meta_file_path); + + let mut operational_file_indicator = [0u8; 32]; + meta_file + .seek(std::io::SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + meta_file + .read_exact(&mut operational_file_indicator) + .expect("Failed to read operational file indicator"); + let valid_indicator = operational_file_indicator.iter().all(|&x| x == 0) || operational_file_indicator.iter().all(|&x| x == 1); + let log_file_len = log_file.metadata().expect("Failed to get log file metadata").len(); + + + if valid_indicator && log_file_len > 0 { + // If the operational file indicator is valid but the log file is not empty, do a Recovery Type A + // 1. Copy the the fallback to the operational file + let operational_file: &mut File; + let fallback_file: &mut File; + if operational_file_indicator.iter().all(|&x| x == 0) { + operational_file = &mut tick_file; + fallback_file = &mut tock_file; + } else { + operational_file = &mut tock_file; + fallback_file = &mut tick_file; + } + operational_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in operational file"); + fallback_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in fallback file"); + operational_file.set_len(0).expect("Failed to erase operational file"); + io::copy(fallback_file, operational_file).expect("Failed to copy fallback file to operational file"); + operational_file.sync_all().expect("Failed to sync operational file"); + // 2. Erase the log file + log_file.set_len(0).expect("Failed to erase log file"); + log_file.sync_all().expect("Failed to sync log file"); + } else if !valid_indicator && dir_path.join("wal.tick").exists() { + // If the operational file indicator is not valid, do a Recovery Type B + // 1. Copy the tock file to the tick file + tock_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in tock file"); + tick_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in tick file"); + tick_file.set_len(0).expect("Failed to erase tick file"); + io::copy(&mut tock_file, &mut tick_file).expect("Failed to copy tock file to tick file"); + tick_file.sync_all().expect("Failed to sync tick file"); + // 2. Update the operational file indicator in the meta file (write all ones) + meta_file + .seek(SeekFrom::Start(0)) + .expect("Failed to seek in meta file"); + meta_file + .write_all(&[1u8; 32]) + .expect("Failed to write operational file indicator"); + meta_file.sync_all().expect("Failed to sync meta file"); + // 3. Erase the log file + log_file.set_len(0).expect("Failed to erase log file"); + log_file.sync_all().expect("Failed to sync log file"); + } + + return Self { + tick_file, + tock_file, + log_file, + meta_file, + }; + } + + +} + diff --git a/src/write_ahead_log/wal_trait.rs b/src/write_ahead_log/wal_trait.rs new file mode 100644 index 0000000..87d3d2b --- /dev/null +++ b/src/write_ahead_log/wal_trait.rs @@ -0,0 +1,18 @@ +use std::io::{SeekFrom}; + +pub trait WriteAheadLog { + + fn read(&mut self, size: u64) -> Vec; + + fn write(&mut self, buf: Vec); + + fn seek(&mut self, pos: SeekFrom); + + fn stream_len(&mut self) -> u64; + + fn stream_position(&mut self) -> u64; + + fn atomic_checkpoint(&mut self); + + fn set_len(&mut self, size: u64); +} \ No newline at end of file diff --git a/src/write_ahead_log/write_ahead_log.rs b/src/write_ahead_log/write_ahead_log.rs deleted file mode 100644 index 889f45d..0000000 --- a/src/write_ahead_log/write_ahead_log.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::io::{Error, SeekFrom}; - -pub trait WriteAheadLog { - - fn read(&mut self) -> Result, Error>; - - fn write(&mut self, buf: Vec) -> Result<(), Error>; - - fn seek(&mut self, pos: SeekFrom) -> Result<(), Error>; - - fn stream_len(&mut self) -> Result; - - fn stream_position(&mut self) -> Result; - - fn atomic_checkpoint(&mut self) -> Result<(), Error>; - - fn set_len(&mut self, size: u64) -> Result<(), Error>; -} \ No newline at end of file diff --git a/tests/write_ahead_log.rs b/tests/write_ahead_log.rs deleted file mode 100644 index e69de29..0000000 diff --git a/tests/write_ahead_log_test.rs b/tests/write_ahead_log_test.rs new file mode 100644 index 0000000..327a2ee --- /dev/null +++ b/tests/write_ahead_log_test.rs @@ -0,0 +1,329 @@ +use core_data::write_ahead_log::{wal_default::WriteAheadLogDefault, wal_trait::WriteAheadLog}; +use rand::SeedableRng; +use std::{io::SeekFrom, path::Path}; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Seek, Write}; + +fn _print_all_file_content(temp_path: &Path){ + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + println!("wal.tick content: {:?}", tick_content); + println!("wal.tock content: {:?}", tock_content); + println!("wal.log content: {:?}", log_content); + println!("wal.meta content: {:?}", meta_content); +} + +fn health_check(temp_path: &Path) { + // Check if the log file is empty + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + assert!(log_content.is_empty(), "wal.log should be empty after recovery"); + // Check if the meta file is valid + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + assert_eq!(meta_content.len(), 32, "wal.meta should contain 32 bytes"); + assert!(meta_content.iter().all(|&x| x == 1) || meta_content.iter().all(|&x| x == 0), "wal.meta should contain all ones or all zeros after recovery"); + // Check if tick and tock files are identical + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + assert_eq!(tick_content, tock_content, "wal.tick and wal.tock should be identical after recovery"); +} + +#[test] +fn test_new_wal_at_directory() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let _ = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + assert!(temp_path.join("wal.tick").exists()); + assert!(temp_path.join("wal.tock").exists()); + assert!(temp_path.join("wal.log").exists()); + assert!(temp_path.join("wal.meta").exists()); + + let tick_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let tock_content = std::fs::read(temp_path.join("wal.tock")).expect("failed to read wal.tock"); + let log_content = std::fs::read(temp_path.join("wal.log")).expect("failed to read wal.log"); + let meta_content = std::fs::read(temp_path.join("wal.meta")).expect("failed to read wal.meta"); + + assert!(tick_content.is_empty(), "wal.tick should be empty"); + assert!(tock_content.is_empty(), "wal.tock should be empty"); + assert!(log_content.is_empty(), "wal.log should be empty"); + assert_eq!(meta_content.len(), 32, "wal.meta should contain 32 bytes of zeros"); + assert!(meta_content.iter().all(|&x| x == 0), "wal.meta should contain 32 bytes of zeros"); +} + +#[test] +fn test_read() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![1, 2, 3, 4]); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + + assert_eq!(data, vec![1, 2, 3, 4]); +} + +#[test] +fn test_write() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![5, 6, 7, 8]); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + + assert_eq!(data, vec![5, 6, 7, 8]); +} + +#[test] +fn test_seek() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![9, 10, 11, 12]); + wal.seek(SeekFrom::Start(2)); + let data = wal.read(2); + + assert_eq!(data, vec![11, 12]); +} + +#[test] +fn test_stream_len() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![13, 14, 15, 16]); + let len = wal.stream_len(); + + assert_eq!(len, 4); +} + +#[test] +fn test_stream_position() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![17, 18, 19, 20]); + wal.seek(SeekFrom::Start(2)); + let pos = wal.stream_position(); + + assert_eq!(pos, 2); +} + +#[test] +fn test_atomic_checkpoint() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![21, 22, 23, 24]); + wal.atomic_checkpoint(); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + + assert_eq!(data, vec![21, 22, 23, 24]); +} + +#[test] +fn test_set_len() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + wal.write(vec![25, 26, 27, 28]); + let len = wal.stream_len(); + assert_eq!(len, 4); + wal.set_len(3); + let len = wal.stream_len(); + assert_eq!(len, 3); + wal.atomic_checkpoint(); + let len = wal.stream_len(); + assert_eq!(len, 3); + wal.set_len(2); + let len = wal.stream_len(); + assert_eq!(len, 2); + wal.atomic_checkpoint(); + let len = wal.stream_len(); + assert_eq!(len, 2); +} + +#[test] +fn test_open_wal_at_directory() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![29, 30, 31, 32]); + wal.atomic_checkpoint(); + drop(wal); + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + assert_eq!(data, vec![29, 30, 31, 32]); +} + +#[test] +fn test_recovery_type_a() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![33, 34, 35, 36]); + wal.atomic_checkpoint(); + wal.seek(SeekFrom::Start(0)); + wal.write(vec![37, 38, 39, 40]); + + // Simulate a crash by dropping the wal before the atomic checkpoint wich results in a non empty log file and a recovery type A + drop(wal); + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); + health_check(temp_path); + // Check if the content matches the checkpoint + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + assert_eq!(data, vec![33, 34, 35, 36]); +} + +#[test] +fn test_recovery_type_b() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + wal.write(vec![41, 42, 43, 44]); + wal.atomic_checkpoint(); + wal.seek(SeekFrom::Start(0)); + wal.write(vec![45, 46, 47, 48]); + wal.atomic_checkpoint(); + drop(wal); + + //now we simulate a crash during the atomic checkpoint by writing the meta file to half ones and half zeros + //and the log file to a non empty state + std::fs::write(temp_path.join("wal.meta"), [1u8; 16].iter().chain([0u8; 16].iter()).cloned().collect::>()).expect("failed to write wal.meta"); + std::fs::write(temp_path.join("wal.log"), [49u8, 50u8, 51u8, 52u8]).expect("failed to write wal.log"); + + let mut wal = WriteAheadLogDefault::open_wal_at_directory(temp_path.to_path_buf()); + health_check(temp_path); + // Check if the content matches one of the checkpoints + wal.seek(SeekFrom::Start(0)); + let data = wal.read(4); + assert!(data == vec![41, 42, 43, 44] || data == vec![45, 46, 47, 48], "data should match one of the checkpoints"); +} + + +//a function that is testing every operation by repeatedly writing and reading data between checkpoints while keeping a pseudo file in memory for comparison +#[test] +fn test_all_operations() { + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + let temp_path = temp_dir.path(); + let mut wal = WriteAheadLogDefault::new_wal_at_directory(temp_path.to_path_buf()); + + let comp_file_path = temp_path.join("comp.test"); + let mut comp_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&comp_file_path) + .expect("failed to create comp.test"); + + let seed: [u8; 32] = [ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, + 25, 26, 27, 28, 29, 30, 31, 32, + ]; + + let mut rng = rand::rngs::StdRng::from_seed(seed); + + for _ in 0..100 { + for _ in 0..10 { + let operation = rand::Rng::random_range(&mut rng, 0..7); + match operation { + 0 => { + // Write + let data: Vec = (0..10).map(|_| rand::Rng::random_range(&mut rng, 0..255)).collect(); + wal.write(data.clone()); + comp_file.write_all(&data).expect("write failed"); + } + 1 => { + // Read + let wal_size = wal.stream_len(); + let mut wal_stream_pos = wal.stream_position(); + if wal_stream_pos >= wal_size { + //seek to the beginning of the file + wal.seek(SeekFrom::Start(0)); + comp_file.seek(SeekFrom::Start(0)).expect("seek failed"); + wal_stream_pos = 0; + } + let max_read_size = wal_size - wal_stream_pos; + let percentage: f64 = rand::Rng::random_range(&mut rng, 0.0..100.0); + let size = ((percentage as u64) * max_read_size) / 100; + let data = wal.read(size); + let mut pseudo_data = vec![0u8; size as usize]; + comp_file.read_exact(&mut pseudo_data).expect("read failed"); + assert_eq!(data, pseudo_data, "data should match comp.test data"); + } + 2 => { + // Seek + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + let pos = rand::Rng::random_range(&mut rng, 0..=comp_file_len); + wal.seek(SeekFrom::Start(pos)); + comp_file.seek(SeekFrom::Start(pos)).expect("seek failed"); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(wal.stream_position(), comp_file_seek_pos, "stream_position should match comp.test seek position"); + } + 3 => { + // StreamLen + let len = wal.stream_len(); + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + assert_eq!(len, comp_file_len, "stream_len should match comp.test length"); + } + 4 => { + // StreamPosition + let pos = wal.stream_position(); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(pos, comp_file_seek_pos, "stream_position should match comp.test position"); + } + 5 => { + // AtomicCheckpoint + wal.atomic_checkpoint(); + + health_check(temp_path); + let file_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let mut comp_content = Vec::new(); + let mut comp_file_check = File::open(&comp_file_path).expect("failed to open comp.test"); + comp_file_check.read_to_end(&mut comp_content).expect("failed to read comp.test"); + assert_eq!(file_content, comp_content, "wal.tick should match comp.test content after atomic checkpoint"); + //check if the stream positions and lengths are still equal + let wal_stream_pos = wal.stream_position(); + let comp_file_seek_pos = comp_file.stream_position().expect("stream_position failed"); + assert_eq!(wal_stream_pos, comp_file_seek_pos, "stream_position should match comp.test position after atomic checkpoint"); + let wal_len = wal.stream_len(); + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + assert_eq!(wal_len, comp_file_len, "stream_len should match comp.test length after atomic checkpoint"); + } + 6 => { + // SetLen + let comp_file_len = comp_file.metadata().expect("metadata failed").len(); + let new_len = rand::Rng::random_range(&mut rng, 0..=comp_file_len); + wal.set_len(new_len); + comp_file.set_len(new_len).expect("set_len failed"); + } + _ => unreachable!(), + } + } + } + wal.atomic_checkpoint(); + health_check(temp_path); + let file_content = std::fs::read(temp_path.join("wal.tick")).expect("failed to read wal.tick"); + let mut comp_content = Vec::new(); + let mut comp_file_check = File::open(&comp_file_path).expect("failed to open comp.test"); + comp_file_check.read_to_end(&mut comp_content).expect("failed to read comp.test"); + assert_eq!(file_content, comp_content, "wal.tick should match comp.test content after all operations"); +} \ No newline at end of file