68 changes: 67 additions & 1 deletion kv/src/db/core.rs
@@ -11,6 +11,7 @@ use crate::error::DbError;
use crate::flush::{flush_memtable_to_disk, flush_worker, FlushMessage, FlushQueue};
use crate::sst::SSTReader;
use crate::storage::Memtable;
use crate::transaction::Transaction;
use crate::wal::reader::{WalEntry, WalReader};
use crate::wal::thread::{wal_thread, WalMessage};
use crossbeam_channel::Sender;
@@ -130,7 +131,7 @@ impl Db {
}
}

let global_sequence = Arc::new(AtomicU64::new(max_seq + 1));
let global_sequence = Arc::new(AtomicU64::new(max_seq.saturating_add(1)));

let wal_path = dir.join("wal.log");

@@ -154,6 +155,15 @@ impl Db {
})
}

pub fn begin(&self) -> Transaction<'_> {
Transaction::new(self.global_sequence.load(Ordering::Acquire), self)
}

// allocate a new sequence number for transaction commits or other operations
pub(crate) fn next_sequence(&self) -> u64 {
self.global_sequence.fetch_add(1, Ordering::SeqCst)
}

// writes go to the memtable
// if the memtable reaches its configured max size, that memtable is frozen and a new empty memtable
// takes its place
@@ -177,6 +187,22 @@ impl Db {
Ok(())
}

// put, but at a caller-supplied seq
// used in transactions
pub fn put_seq(&self, key: &[u8], val: &[u8], seq: u64) -> Result<()> {
let _ = self.wal_sender.send(WalMessage::Append(WalEntry {
seq,
key: key.to_vec(),
val: val.to_vec(),
}));
let memtable = self.memtable.load();
memtable.put(key.to_vec(), val.to_vec(), seq);

self.flush_if_needed();

Ok(())
}

// first check the mutable memtable that receives current writes,
// then check the two immutable memtables,
// and if the key is still not found, fall back to the SSTs
Expand Down Expand Up @@ -219,13 +245,53 @@ impl Db {
Ok(None)
}

pub fn get_seq(&self, key: &[u8], seq: u64) -> Result<Option<Vec<u8>>> {
let memtable = self.memtable.load();

if let Some(val) = memtable.get_seq(key, seq) {
if val.is_empty() {
return Ok(None);
}
return Ok(Some(val));
}

let immutables = self.immutable_memtables.load();
for imt in immutables.iter().rev() {
if let Some(val) = imt.get_seq(key, seq) {
if val.is_empty() {
return Ok(None);
}
return Ok(Some(val));
}
}

let ssts = self.sstables.load();

for sst in ssts.iter() {
if let Some(val) = sst.get_seq(key, seq)? {
if val.is_empty() {
return Ok(None);
}
return Ok(Some(val));
}
}

Ok(None)
}
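
A minimal usage sketch of the snapshot rule these two methods implement. The constructor name Db::open, the dir path, and the key/value literals are assumptions not shown in this diff, and it is assumed the memtable's get_seq applies the same seq < snapshot_seq check as the SST reader further down:

let db = Db::open(dir)?;
db.put_seq(b"user:1", b"alice", db.next_sequence())?;

let snapshot = db.next_sequence(); // every write so far has seq < snapshot
db.put_seq(b"user:1", b"bob", db.next_sequence())?; // newer version, invisible to the snapshot

assert_eq!(db.get_seq(b"user:1", snapshot)?, Some(b"alice".to_vec()));
assert_eq!(db.get(b"user:1")?, Some(b"bob".to_vec())); // a plain get still sees the latest version

begin() hands this same kind of snapshot to a Transaction: it captures the current global_sequence, so reads made through the transaction presumably observe only versions written before it started.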

// deletion is not done in place; instead a tombstone (i.e. an empty value) is written for that
// particular key. After compaction the old entries that carried a value are removed, and the
// empty-value entry is removed as well
pub fn del(&self, key: &[u8]) -> Result<()> {
self.put(key, &[])
}
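
A small sketch of the tombstone behaviour described above, assuming the plain get applies the same empty-value check that get_seq shows (the key and value are illustrative):

db.put(b"user:1", b"alice")?;
db.del(b"user:1")?; // writes an empty value for the key at a newer sequence
assert_eq!(db.get(b"user:1")?, None); // the read path reports the tombstone as "not found"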

// deletion with a particular seq
// used in transactions
pub fn del_seq(&self, key: &[u8], seq: u64) -> Result<()> {
self.put_seq(key, &[], seq)
}

pub fn flush_if_needed(&self) {
let memtable = self.memtable.load();
// memtables are configured to be of a certain max size to cap the memory usage after that
1 change: 1 addition & 0 deletions kv/src/lib.rs
@@ -2,6 +2,7 @@ pub mod db;
pub mod error;
pub mod sst;
pub mod storage;
pub mod transaction;
pub mod wal;

mod compaction;
3 changes: 2 additions & 1 deletion kv/src/sst/mod.rs
@@ -47,7 +47,7 @@ pub use reader::SSTReader;
pub use writer::SSTWriter;

pub const BLOCK_SIZE: usize = 16 * 1024;
pub const FOOTER_SIZE: usize = 44;
pub const FOOTER_SIZE: usize = 52;
pub const MAGIC: u64 = 0x4B45594C54_u64;

#[derive(Debug, Error)]
@@ -71,6 +71,7 @@ pub struct Footer {
pub index_offset: u64,
pub bloom_offset: u64,
pub num_entries: u64,
pub min_sequence: u64,
pub max_sequence: u64,
}
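
The extra u64 is exactly the 8 bytes by which FOOTER_SIZE grows from 44 to 52; a sketch of the arithmetic, matching the field offsets the reader parses below (the const assertion is only an illustration, not part of the change):

// magic (8) + version (4) + index_offset (8) + bloom_offset (8)
// + num_entries (8) + min_sequence (8) + max_sequence (8) = 52 bytes
const _: () = assert!(FOOTER_SIZE == 8 + 4 + 5 * 8);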

183 changes: 177 additions & 6 deletions kv/src/sst/reader.rs
@@ -11,6 +11,7 @@ pub struct SSTReader {
pub(super) mmap: Mmap,
pub(super) block_indexes: Arc<Vec<BlockIndex>>,
bloom_filter: Arc<BloomFilter>,
min_sequence: u64,
max_sequence: u64,
}

@@ -30,13 +31,15 @@ impl SSTReader {

let block_indexes = Self::read_index_block(&mmap, footer.index_offset)?;
let bloom_filter = super::bloom::read_bloom_filter(&mmap, footer.bloom_offset)?;
let min_sequence = footer.min_sequence;
let max_sequence = footer.max_sequence;

Ok(Self {
path,
mmap,
block_indexes: Arc::new(block_indexes),
bloom_filter: Arc::new(bloom_filter),
min_sequence,
max_sequence,
})
}
@@ -48,7 +51,8 @@ impl SSTReader {
// 12..20 index_offset (u64)
// 20..28 bloom_offset (u64)
// 28..36 num_entries (u64)
// 36..44 max_sequence (u64)
// 36..44 min_sequence (u64)
// 44..52 max_sequence (u64)
debug_assert!(bytes.len() == FOOTER_SIZE);

let magic = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
@@ -60,14 +64,16 @@
let index_offset = u64::from_le_bytes(bytes[12..20].try_into().unwrap());
let bloom_offset = u64::from_le_bytes(bytes[20..28].try_into().unwrap());
let num_entries = u64::from_le_bytes(bytes[28..36].try_into().unwrap());
let max_sequence = u64::from_le_bytes(bytes[36..44].try_into().unwrap());
let min_sequence = u64::from_le_bytes(bytes[36..44].try_into().unwrap());
let max_sequence = u64::from_le_bytes(bytes[44..52].try_into().unwrap());

Ok(Footer {
magic,
version,
index_offset,
bloom_offset,
num_entries,
min_sequence,
max_sequence,
})
}
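
For reference, a sketch of the write side of the same 52-byte layout; the actual SSTWriter changes are not part of this excerpt, and it is assumed here that version is the u32 at bytes 8..12, as the reader's offsets imply:

fn encode_footer(f: &Footer) -> [u8; FOOTER_SIZE] {
    let mut buf = [0u8; FOOTER_SIZE];
    buf[0..8].copy_from_slice(&f.magic.to_le_bytes());
    buf[8..12].copy_from_slice(&f.version.to_le_bytes());
    buf[12..20].copy_from_slice(&f.index_offset.to_le_bytes());
    buf[20..28].copy_from_slice(&f.bloom_offset.to_le_bytes());
    buf[28..36].copy_from_slice(&f.num_entries.to_le_bytes());
    buf[36..44].copy_from_slice(&f.min_sequence.to_le_bytes());
    buf[44..52].copy_from_slice(&f.max_sequence.to_le_bytes());
    buf
}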
@@ -169,9 +175,53 @@ impl SSTReader {
Ok(None)
}

/// Search for a key within a specific block.
pub fn get_seq(&self, key: &[u8], snapshot_seq: u64) -> Result<Option<Vec<u8>>> {
// Quick check: if the snapshot is at or below this SST's min sequence, nothing here is visible
if snapshot_seq <= self.min_sequence {
return Ok(None);
}

if !self.bloom_filter.might_contain(key) {
return Ok(None);
}

// If snapshot is after all data in this SST, use regular get
if snapshot_seq > self.max_sequence {
return self.get(key);
}

let block_idx = match self
.block_indexes
.binary_search_by(|idx| idx.first_key.as_ref().cmp(key))
{
Ok(i) => i,
Err(0) => return Ok(None),
Err(i) => i - 1,
};

let start_idx = if block_idx > 0 { block_idx - 1 } else { 0 };

let mut best: Option<(u64, Option<Vec<u8>>)> = None;

for idx in start_idx..=block_idx {
if let Some((ent_seq, ent_val)) =
self.search_block_seq(self.block_indexes[idx].offset, key, snapshot_seq)?
{
if best.is_none() || ent_seq > best.as_ref().unwrap().0 {
best = Some((ent_seq, ent_val));
}
}
}
match best {
Some((_, Some(v))) => Ok(Some(v)),
Some((_, None)) => Ok(None),
None => Ok(None),
}
}

/// search for a key within a specific block.
///
/// Returns:
/// returns:
/// - `Ok(Some(val))` if key found with a non-empty value
/// - `Ok(None)` if key found with empty value (tombstone/deleted)
/// - `Err(SSTError::NotFound)` if key not in this block
Expand Down Expand Up @@ -267,7 +317,7 @@ impl SSTReader {
// entries are sorted by key first, then by sequence (descending), so we need
// to find the first occurrence of this key (which has the highest seq).

// Move backward to find the first occurrence of this key
// move backward to find the first occurrence of this key
while pos > 0 {
let prev_entry = &entries[pos - 1];
let prev_key = &block_data
@@ -279,7 +329,6 @@
}
}

// pow pos points to the first occurrence, which has the highest sequence number
let entry = &entries[pos];
let entry_val = &block_data[entry.val_start..entry.val_start + entry.val_len];

@@ -294,10 +343,131 @@
}
}

fn search_block_seq(
&self,
offset: u64,
key: &[u8],
snapshot_seq: u64,
) -> Result<Option<(u64, Option<Vec<u8>>)>> {
let block_data = {
let mut pos = offset as usize;
let block_len =
u32::from_le_bytes(self.mmap[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;

let block_data = &self.mmap[pos..pos + block_len];
pos += block_len;

let crc = u32::from_le_bytes(self.mmap[pos..pos + 4].try_into().unwrap());
let mut hasher = Hasher::new();
hasher.update(block_data);
if hasher.finalize() != crc {
return Err(SSTError::Corrupt);
}

block_data
};

#[derive(Clone, Debug)]
struct Entry {
seq: u64,
key_start: usize,
key_len: usize,
val_start: usize,
val_len: usize,
}

let mut entries = Vec::new();
let mut idx = 0;
let len = block_data.len();

while idx < len {
if idx + 6 > len {
break;
}

let key_len = u16::from_le_bytes(block_data[idx..idx + 2].try_into().unwrap()) as usize;
idx += 2;

let val_len = u32::from_le_bytes(block_data[idx..idx + 4].try_into().unwrap()) as usize;
idx += 4;

if idx + key_len + 8 + val_len > len {
break;
}

let key_start = idx;
idx += key_len;

let seq = u64::from_le_bytes(block_data[idx..idx + 8].try_into().unwrap());
idx += 8;

let val_start = idx;
idx += val_len;

entries.push(Entry {
seq,
key_start,
key_len,
val_start,
val_len,
});
}

// binary search for key
let pos = match entries.binary_search_by(|e| {
let entry_key = &block_data[e.key_start..e.key_start + e.key_len];
entry_key.cmp(key)
}) {
Ok(pos) => pos,
Err(_) => return Ok(None),
};

let mut first = pos;
while first > 0 {
let pk = &block_data[entries[first - 1].key_start
..entries[first - 1].key_start + entries[first - 1].key_len];
if pk == key {
first -= 1;
} else {
break;
}
}

// iterate over this key's entries, which are stored with seq in descending order
let mut i = first;
while i < entries.len() {
let e = &entries[i];
let entry_key = &block_data[e.key_start..e.key_start + e.key_len];
if entry_key != key {
break;
}

// for snapshot isolation
// only return entries with seq < snapshot_seq (strict inequality)
if e.seq < snapshot_seq {
let val_slice = &block_data[e.val_start..e.val_start + e.val_len];
let val_opt = if val_slice.is_empty() {
None
} else {
Some(val_slice.to_vec())
};
return Ok(Some((e.seq, val_opt)));
}

i += 1;
}

Ok(None)
}

pub fn path(&self) -> &Path {
&self.path
}

pub fn min_sequence(&self) -> u64 {
self.min_sequence
}
pub fn max_sequence(&self) -> u64 {
self.max_sequence
}
@@ -315,6 +485,7 @@ impl Clone for SSTReader {
},
block_indexes: Arc::clone(&self.block_indexes),
bloom_filter: Arc::clone(&self.bloom_filter),
min_sequence: self.min_sequence,
max_sequence: self.max_sequence,
}
}