From 25e5417b59eb33b26475d73557775f225f293e26 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 18 Dec 2024 00:12:36 +0200 Subject: [PATCH 1/6] kv: add iterator types --- kinode/src/kv.rs | 147 +++++++++++++++++++++++++++++++++++++++++++++++ lib/src/kv.rs | 10 ++++ 2 files changed, 157 insertions(+) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index a08c132a3..0fa79980f 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -6,6 +6,7 @@ use lib::types::core::{ PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID, KV_PROCESS_ID, }; +use rand::random; use rocksdb::OptimisticTransactionDB; use std::{ collections::{HashMap, VecDeque}, @@ -24,6 +25,13 @@ struct KvState { /// access order of dbs, used to cull if we hit the fds limit access_order: Arc>>, txs: Arc>)>>>, + /// track active iterators: (package_id, db_name) -> (iterator_id -> current position) + iterators: Arc< + DashMap< + (PackageId, String), + DashMap>, // Store last seen key instead of iterator + >, + >, fds_limit: u64, } @@ -42,6 +50,7 @@ impl KvState { open_kvs: Arc::new(DashMap::new()), access_order: Arc::new(Mutex::new(UniqueQueue::new())), txs: Arc::new(DashMap::new()), + iterators: Arc::new(DashMap::new()), fds_limit: 10, } } @@ -98,6 +107,108 @@ impl KvState { self.remove_db(key.0, key.1).await; } } + + async fn handle_iter_start( + &mut self, + package_id: PackageId, + db: String, + prefix: Option>, + ) -> Result { + let db_key = (package_id.clone(), db.clone()); + let _db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + + // Generate a random iterator ID and ensure it's unique + let iterators = self + .iterators + .entry(db_key.clone()) + .or_insert_with(|| DashMap::new()); + + let mut iterator_id = random::(); + while iterators.contains_key(&iterator_id) { + iterator_id = random::(); + } + + // Store the starting position (prefix or empty vec for start) + iterators.insert(iterator_id, prefix.unwrap_or_default()); + + Ok(iterator_id) + } + + async fn handle_iter_next( + &mut self, + package_id: PackageId, + db: String, + iterator_id: u64, + count: u64, + ) -> Result<(Vec<(Vec, Vec)>, bool), KvError> { + let db_key = (package_id.clone(), db.clone()); + let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + + let db_iters = self.iterators.get(&db_key).ok_or(KvError::NoDb)?; + let last_key = db_iters + .get(&iterator_id) + .ok_or(KvError::NoIterator)? + .clone(); + + let mut entries = Vec::new(); + let mut done = true; + + // Create a fresh iterator starting from our last position + let mode = if last_key.is_empty() { + rocksdb::IteratorMode::Start + } else { + rocksdb::IteratorMode::From(&last_key, rocksdb::Direction::Forward) + }; + + let mut iter = db.iterator(mode); + let mut count_remaining = count; + + while let Some(item) = iter.next() { + if count_remaining == 0 { + done = false; + break; + } + + match item { + Ok((key, value)) => { + let key_vec = key.to_vec(); + if !key_vec.starts_with(&last_key) && !last_key.is_empty() { + // We've moved past our prefix + break; + } + entries.push((key_vec.clone(), value.to_vec())); + if let Some(mut last_key_entry) = db_iters.get_mut(&iterator_id) { + *last_key_entry = key_vec; + } + count_remaining -= 1; + } + Err(e) => { + return Err(KvError::RocksDBError { + action: "iter_next".to_string(), + error: e.to_string(), + }); + } + } + } + + Ok((entries, done)) + } + + async fn handle_iter_close( + &mut self, + package_id: PackageId, + db: String, + iterator_id: u64, + ) -> Result<(), KvError> { + let db_key = (package_id, db); + if let Some(db_iters) = self.iterators.get_mut(&db_key) { + db_iters.remove(&iterator_id); + if db_iters.is_empty() { + self.iterators.remove(&db_key); + } + } + Ok(()) + } } pub async fn kv( @@ -379,6 +490,39 @@ async fn handle_request( } (serde_json::to_vec(&KvResponse::Ok).unwrap(), None) } + KvAction::IterStart { prefix } => { + let iterator_id = state + .handle_iter_start( + request.package_id.clone(), + request.db.clone(), + prefix.clone(), + ) + .await?; + ( + serde_json::to_vec(&KvResponse::IterStart { iterator_id }).unwrap(), + None, + ) + } + KvAction::IterNext { iterator_id, count } => { + let (entries, done) = state + .handle_iter_next( + request.package_id.clone(), + request.db.clone(), + *iterator_id, + *count, + ) + .await?; + ( + serde_json::to_vec(&KvResponse::IterNext { done }).unwrap(), + Some(serde_json::to_vec(&entries).unwrap()), + ) + } + KvAction::IterClose { iterator_id } => { + state + .handle_iter_close(request.package_id.clone(), request.db.clone(), *iterator_id) + .await?; + (serde_json::to_vec(&KvResponse::Ok).unwrap(), None) + } }; if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { @@ -534,6 +678,9 @@ async fn check_caps( Ok(()) } KvAction::Backup { .. } => Ok(()), + KvAction::IterStart { .. } => Ok(()), + KvAction::IterNext { .. } => Ok(()), + KvAction::IterClose { .. } => Ok(()), } } diff --git a/lib/src/kv.rs b/lib/src/kv.rs index 24974bb38..3118e8f5b 100644 --- a/lib/src/kv.rs +++ b/lib/src/kv.rs @@ -20,6 +20,10 @@ pub enum KvAction { BeginTx, Commit { tx_id: u64 }, Backup, + // Iterator operations + IterStart { prefix: Option> }, + IterNext { iterator_id: u64, count: u64 }, + IterClose { iterator_id: u64 }, } #[derive(Debug, Serialize, Deserialize)] @@ -28,6 +32,10 @@ pub enum KvResponse { BeginTx { tx_id: u64 }, Get { key: Vec }, Err { error: KvError }, + // Iterator responses + IterStart { iterator_id: u64 }, + IterNext { done: bool }, + IterClose { iterator_id: u64 }, } #[derive(Debug, Serialize, Deserialize, Error)] @@ -38,6 +46,8 @@ pub enum KvError { KeyNotFound, #[error("no Tx found")] NoTx, + #[error("Iterator not found")] + NoIterator, #[error("No capability: {error}")] NoCap { error: String }, #[error("rocksdb internal error: {error}")] From cc0e06598a21478ca3a919d34b3c5bda1bd86e26 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 19 Dec 2024 23:55:30 +0200 Subject: [PATCH 2/6] kv: interface cleanup --- lib/src/kv.rs | 110 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 9 deletions(-) diff --git a/lib/src/kv.rs b/lib/src/kv.rs index 3118e8f5b..df390bd5b 100644 --- a/lib/src/kv.rs +++ b/lib/src/kv.rs @@ -10,51 +10,143 @@ pub struct KvRequest { pub action: KvAction, } +/// IPC Action format, representing operations that can be performed on the key-value runtime module. +/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module. #[derive(Debug, Serialize, Deserialize, Clone)] pub enum KvAction { + /// Opens an existing key-value database or creates a new one if it doesn't exist. Open, + /// Permanently deletes the entire key-value database. RemoveDb, + /// Sets a value for the specified key in the database. + /// + /// # Parameters + /// * `key` - The key as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Set { key: Vec, tx_id: Option }, + /// Deletes a key-value pair from the database. + /// + /// # Parameters + /// * `key` - The key to delete as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Delete { key: Vec, tx_id: Option }, + /// Retrieves the value associated with the specified key. + /// + /// # Parameters + /// * `key` - The key to look up as a byte vector Get { key: Vec }, + /// Begins a new transaction for atomic operations. BeginTx, + /// Commits all operations in the specified transaction. + /// + /// # Parameters + /// * `tx_id` - The ID of the transaction to commit Commit { tx_id: u64 }, + /// Creates a backup of the database. Backup, - // Iterator operations + /// Starts an iterator over the database contents. + /// + /// # Parameters + /// * `prefix` - Optional byte vector to filter keys by prefix IterStart { prefix: Option> }, + /// Advances the iterator and returns the next batch of items. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to advance + /// * `count` - Maximum number of items to return IterNext { iterator_id: u64, count: u64 }, + /// Closes an active iterator. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to close IterClose { iterator_id: u64 }, } +/// Response types for key-value store operations. +/// These responses are returned after processing a KvAction request. #[derive(Debug, Serialize, Deserialize)] pub enum KvResponse { + /// Indicates successful completion of an operation. Ok, + /// Returns the transaction ID for a newly created transaction. + /// + /// # Fields + /// * `tx_id` - The ID of the newly created transaction BeginTx { tx_id: u64 }, + /// Returns the key that was retrieved from the database. + /// + /// # Fields + /// * `key` - The retrieved key as a byte vector Get { key: Vec }, + /// Indicates an error occurred during the operation. + /// + /// # Fields + /// * `error` - The specific error that occurred Err { error: KvError }, - // Iterator responses + /// Returns the ID of a newly created iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the created iterator IterStart { iterator_id: u64 }, + /// Indicates whether the iterator has more items. + /// + /// # Fields + /// * `done` - True if there are no more items to iterate over IterNext { done: bool }, + /// Confirms the closure of an iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the closed iterator IterClose { iterator_id: u64 }, } +/// Errors that can occur during key-value store operations. +/// These errors are returned as part of `KvResponse::Err` when an operation fails. #[derive(Debug, Serialize, Deserialize, Error)] pub enum KvError { - #[error("DbDoesNotExist")] + /// The requested database does not exist. + #[error("Database does not exist")] NoDb, - #[error("KeyNotFound")] + + /// The requested key was not found in the database. + #[error("Key not found in database")] KeyNotFound, - #[error("no Tx found")] + + /// No active transaction found for the given transaction ID. + #[error("Transaction not found")] NoTx, + + /// The specified iterator was not found. #[error("Iterator not found")] NoIterator, - #[error("No capability: {error}")] + + /// The operation requires capabilities that the caller doesn't have. + /// + /// # Fields + /// * `error` - Description of the missing capability or permission + #[error("Missing required capability: {error}")] NoCap { error: String }, - #[error("rocksdb internal error: {error}")] + + /// An internal RocksDB error occurred during the operation. + /// + /// # Fields + /// * `action` - The operation that was being performed + /// * `error` - The specific error message from RocksDB + #[error("RocksDB error during {action}: {error}")] RocksDBError { action: String, error: String }, - #[error("input bytes/json/key error: {error}")] + + /// Error parsing or processing input data. + /// + /// # Fields + /// * `error` - Description of what was invalid about the input + #[error("Invalid input: {error}")] InputError { error: String }, - #[error("IO error: {error}")] + + /// An I/O error occurred during the operation. + /// + /// # Fields + /// * `error` - Description of the I/O error + #[error("I/O error: {error}")] IOError { error: String }, } From cc56a19311be53b966982cedc29c47260ca5c306 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 00:01:25 +0200 Subject: [PATCH 3/6] kv: close iterator if done --- kinode/src/kv.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index 0fa79980f..faed2b93b 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -191,6 +191,16 @@ impl KvState { } } + // if we're done, automatically close the iterator + if done { + if let Some(db_iters) = self.iterators.get_mut(&db_key) { + db_iters.remove(&iterator_id); + if db_iters.is_empty() { + self.iterators.remove(&db_key); + } + } + } + Ok((entries, done)) } From 4e170fb0bf1c37d86a4d6476064cc704b18a6d57 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 00:07:37 +0200 Subject: [PATCH 4/6] kv: comment tweak --- lib/src/kv.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/src/kv.rs b/lib/src/kv.rs index df390bd5b..4146d6648 100644 --- a/lib/src/kv.rs +++ b/lib/src/kv.rs @@ -2,7 +2,9 @@ use crate::types::core::{CapMessage, PackageId}; use serde::{Deserialize, Serialize}; use thiserror::Error; -/// IPC Request format for the kv:distro:sys runtime module. +/// Actions are sent to a specific key value database, `db` is the name, +/// `package_id` is the [`PackageId`]. Capabilities are checked, you can access another process's +/// database if it has given you the [`crate::Capability`]. #[derive(Debug, Serialize, Deserialize)] pub struct KvRequest { pub package_id: PackageId, From 45fc56b3793347b09c5c6e8824762640ccf06cca Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 02:39:07 +0200 Subject: [PATCH 5/6] kv: iterator tweaks.. --- kinode/src/kv.rs | 123 ++++++++++++++++++++--------------------------- 1 file changed, 52 insertions(+), 71 deletions(-) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index faed2b93b..25a83de5a 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -25,13 +25,8 @@ struct KvState { /// access order of dbs, used to cull if we hit the fds limit access_order: Arc>>, txs: Arc>)>>>, - /// track active iterators: (package_id, db_name) -> (iterator_id -> current position) - iterators: Arc< - DashMap< - (PackageId, String), - DashMap>, // Store last seen key instead of iterator - >, - >, + /// track active iterators: (package_id, db_name, iterator_id) -> (prefix, current_key) + iterators: Arc, Vec)>>, fds_limit: u64, } @@ -114,22 +109,26 @@ impl KvState { db: String, prefix: Option>, ) -> Result { + // Ensure DB exists let db_key = (package_id.clone(), db.clone()); - let _db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; - - // Generate a random iterator ID and ensure it's unique - let iterators = self - .iterators - .entry(db_key.clone()) - .or_insert_with(|| DashMap::new()); + if !self.open_kvs.contains_key(&db_key) { + return Err(KvError::NoDb); + } + // Generate unique iterator ID let mut iterator_id = random::(); - while iterators.contains_key(&iterator_id) { + while self + .iterators + .contains_key(&(package_id.clone(), db.clone(), iterator_id)) + { iterator_id = random::(); } - // Store the starting position (prefix or empty vec for start) - iterators.insert(iterator_id, prefix.unwrap_or_default()); + // Store initial state: (prefix, current_key) + self.iterators.insert( + (package_id, db, iterator_id), + (prefix.unwrap_or_default(), Vec::new()), + ); Ok(iterator_id) } @@ -141,67 +140,54 @@ impl KvState { iterator_id: u64, count: u64, ) -> Result<(Vec<(Vec, Vec)>, bool), KvError> { - let db_key = (package_id.clone(), db.clone()); - let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + let iter_key = (package_id.clone(), db.clone(), iterator_id); + let db_key = (package_id, db); - let db_iters = self.iterators.get(&db_key).ok_or(KvError::NoDb)?; - let last_key = db_iters - .get(&iterator_id) + // Get DB and iterator state + let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + let (prefix, current_key) = self + .iterators + .get(&iter_key) .ok_or(KvError::NoIterator)? .clone(); let mut entries = Vec::new(); - let mut done = true; - // Create a fresh iterator starting from our last position - let mode = if last_key.is_empty() { - rocksdb::IteratorMode::Start + // Create the appropriate iterator + let mut iter = if !prefix.is_empty() { + if !current_key.is_empty() { + db.iterator(rocksdb::IteratorMode::From( + ¤t_key, + rocksdb::Direction::Forward, + )) + } else { + db.prefix_iterator(&prefix) + } } else { - rocksdb::IteratorMode::From(&last_key, rocksdb::Direction::Forward) + db.iterator(rocksdb::IteratorMode::Start) }; - let mut iter = db.iterator(mode); - let mut count_remaining = count; + let mut items_collected = 0; + let mut last_key = None; - while let Some(item) = iter.next() { - if count_remaining == 0 { - done = false; - break; - } + while let Some(Ok((key, value))) = iter.next() { + let key_vec = key.to_vec(); + entries.push((key_vec.clone(), value.to_vec())); + last_key = Some(key_vec); + items_collected += 1; - match item { - Ok((key, value)) => { - let key_vec = key.to_vec(); - if !key_vec.starts_with(&last_key) && !last_key.is_empty() { - // We've moved past our prefix - break; - } - entries.push((key_vec.clone(), value.to_vec())); - if let Some(mut last_key_entry) = db_iters.get_mut(&iterator_id) { - *last_key_entry = key_vec; - } - count_remaining -= 1; - } - Err(e) => { - return Err(KvError::RocksDBError { - action: "iter_next".to_string(), - error: e.to_string(), - }); + if items_collected >= count { + // Not done, save the last key + if let Some(last_key) = last_key { + self.iterators.insert(iter_key, (prefix, last_key)); } + return Ok((entries, false)); } } - // if we're done, automatically close the iterator - if done { - if let Some(db_iters) = self.iterators.get_mut(&db_key) { - db_iters.remove(&iterator_id); - if db_iters.is_empty() { - self.iterators.remove(&db_key); - } - } - } - - Ok((entries, done)) + // We've exhausted the iterator + self.iterators.remove(&iter_key); + Ok((entries, true)) } async fn handle_iter_close( @@ -210,13 +196,8 @@ impl KvState { db: String, iterator_id: u64, ) -> Result<(), KvError> { - let db_key = (package_id, db); - if let Some(db_iters) = self.iterators.get_mut(&db_key) { - db_iters.remove(&iterator_id); - if db_iters.is_empty() { - self.iterators.remove(&db_key); - } - } + let iter_key = (package_id, db, iterator_id); + self.iterators.remove(&iter_key); Ok(()) } } @@ -344,7 +325,7 @@ async fn handle_request( let request: KvRequest = match serde_json::from_slice(&body) { Ok(r) => r, Err(e) => { - println!("kv: got invalid Request: {}", e); + // println!("kv: got invalid Request: {}", e); return Err(KvError::InputError { error: "didn't serialize to KvAction.".into(), }); From 97d8d822bae79effe01bf0cbd194773e96d9762c Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 15:08:53 +0200 Subject: [PATCH 6/6] kv: fixes --- kinode/src/kv.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index 25a83de5a..7539d4c1a 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -153,7 +153,7 @@ impl KvState { let mut entries = Vec::new(); - // Create the appropriate iterator + // create iterator based on whether we're doing prefix iteration let mut iter = if !prefix.is_empty() { if !current_key.is_empty() { db.iterator(rocksdb::IteratorMode::From( @@ -164,28 +164,41 @@ impl KvState { db.prefix_iterator(&prefix) } } else { - db.iterator(rocksdb::IteratorMode::Start) + if !current_key.is_empty() { + db.iterator(rocksdb::IteratorMode::From( + ¤t_key, + rocksdb::Direction::Forward, + )) + } else { + db.iterator(rocksdb::IteratorMode::Start) + } }; let mut items_collected = 0; - let mut last_key = None; + // collect entries until we hit our batch size while let Some(Ok((key, value))) = iter.next() { let key_vec = key.to_vec(); + // if we have a prefix, check that the key still starts with it + if !prefix.is_empty() { + if key_vec.len() < prefix.len() || !key_vec.starts_with(&prefix) { + // we've moved past our prefix range, we're done + self.iterators.remove(&iter_key); + return Ok((entries, true)); + } + } + entries.push((key_vec.clone(), value.to_vec())); - last_key = Some(key_vec); items_collected += 1; if items_collected >= count { - // Not done, save the last key - if let Some(last_key) = last_key { - self.iterators.insert(iter_key, (prefix, last_key)); - } + // not done, save last key for next batch + self.iterators.insert(iter_key, (prefix, key_vec)); return Ok((entries, false)); } } - // We've exhausted the iterator + // no more entries, clean up iterator state self.iterators.remove(&iter_key); Ok((entries, true)) } @@ -324,7 +337,7 @@ async fn handle_request( let request: KvRequest = match serde_json::from_slice(&body) { Ok(r) => r, - Err(e) => { + Err(_e) => { // println!("kv: got invalid Request: {}", e); return Err(KvError::InputError { error: "didn't serialize to KvAction.".into(),