diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index a08c132a3..7539d4c1a 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -6,6 +6,7 @@ use lib::types::core::{ PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID, KV_PROCESS_ID, }; +use rand::random; use rocksdb::OptimisticTransactionDB; use std::{ collections::{HashMap, VecDeque}, @@ -24,6 +25,8 @@ struct KvState { /// access order of dbs, used to cull if we hit the fds limit access_order: Arc>>, txs: Arc>)>>>, + /// track active iterators: (package_id, db_name, iterator_id) -> (prefix, current_key) + iterators: Arc, Vec)>>, fds_limit: u64, } @@ -42,6 +45,7 @@ impl KvState { open_kvs: Arc::new(DashMap::new()), access_order: Arc::new(Mutex::new(UniqueQueue::new())), txs: Arc::new(DashMap::new()), + iterators: Arc::new(DashMap::new()), fds_limit: 10, } } @@ -98,6 +102,117 @@ impl KvState { self.remove_db(key.0, key.1).await; } } + + async fn handle_iter_start( + &mut self, + package_id: PackageId, + db: String, + prefix: Option>, + ) -> Result { + // Ensure DB exists + let db_key = (package_id.clone(), db.clone()); + if !self.open_kvs.contains_key(&db_key) { + return Err(KvError::NoDb); + } + + // Generate unique iterator ID + let mut iterator_id = random::(); + while self + .iterators + .contains_key(&(package_id.clone(), db.clone(), iterator_id)) + { + iterator_id = random::(); + } + + // Store initial state: (prefix, current_key) + self.iterators.insert( + (package_id, db, iterator_id), + (prefix.unwrap_or_default(), Vec::new()), + ); + + Ok(iterator_id) + } + + async fn handle_iter_next( + &mut self, + package_id: PackageId, + db: String, + iterator_id: u64, + count: u64, + ) -> Result<(Vec<(Vec, Vec)>, bool), KvError> { + let iter_key = (package_id.clone(), db.clone(), iterator_id); + let db_key = (package_id, db); + + // Get DB and iterator state + let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?; + let (prefix, current_key) = self + .iterators + .get(&iter_key) + .ok_or(KvError::NoIterator)? + .clone(); + + let mut entries = Vec::new(); + + // create iterator based on whether we're doing prefix iteration + let mut iter = if !prefix.is_empty() { + if !current_key.is_empty() { + db.iterator(rocksdb::IteratorMode::From( + ¤t_key, + rocksdb::Direction::Forward, + )) + } else { + db.prefix_iterator(&prefix) + } + } else { + if !current_key.is_empty() { + db.iterator(rocksdb::IteratorMode::From( + ¤t_key, + rocksdb::Direction::Forward, + )) + } else { + db.iterator(rocksdb::IteratorMode::Start) + } + }; + + let mut items_collected = 0; + + // collect entries until we hit our batch size + while let Some(Ok((key, value))) = iter.next() { + let key_vec = key.to_vec(); + // if we have a prefix, check that the key still starts with it + if !prefix.is_empty() { + if key_vec.len() < prefix.len() || !key_vec.starts_with(&prefix) { + // we've moved past our prefix range, we're done + self.iterators.remove(&iter_key); + return Ok((entries, true)); + } + } + + entries.push((key_vec.clone(), value.to_vec())); + items_collected += 1; + + if items_collected >= count { + // not done, save last key for next batch + self.iterators.insert(iter_key, (prefix, key_vec)); + return Ok((entries, false)); + } + } + + // no more entries, clean up iterator state + self.iterators.remove(&iter_key); + Ok((entries, true)) + } + + async fn handle_iter_close( + &mut self, + package_id: PackageId, + db: String, + iterator_id: u64, + ) -> Result<(), KvError> { + let iter_key = (package_id, db, iterator_id); + self.iterators.remove(&iter_key); + Ok(()) + } } pub async fn kv( @@ -222,8 +337,8 @@ async fn handle_request( let request: KvRequest = match serde_json::from_slice(&body) { Ok(r) => r, - Err(e) => { - println!("kv: got invalid Request: {}", e); + Err(_e) => { + // println!("kv: got invalid Request: {}", e); return Err(KvError::InputError { error: "didn't serialize to KvAction.".into(), }); @@ -379,6 +494,39 @@ async fn handle_request( } (serde_json::to_vec(&KvResponse::Ok).unwrap(), None) } + KvAction::IterStart { prefix } => { + let iterator_id = state + .handle_iter_start( + request.package_id.clone(), + request.db.clone(), + prefix.clone(), + ) + .await?; + ( + serde_json::to_vec(&KvResponse::IterStart { iterator_id }).unwrap(), + None, + ) + } + KvAction::IterNext { iterator_id, count } => { + let (entries, done) = state + .handle_iter_next( + request.package_id.clone(), + request.db.clone(), + *iterator_id, + *count, + ) + .await?; + ( + serde_json::to_vec(&KvResponse::IterNext { done }).unwrap(), + Some(serde_json::to_vec(&entries).unwrap()), + ) + } + KvAction::IterClose { iterator_id } => { + state + .handle_iter_close(request.package_id.clone(), request.db.clone(), *iterator_id) + .await?; + (serde_json::to_vec(&KvResponse::Ok).unwrap(), None) + } }; if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { @@ -534,6 +682,9 @@ async fn check_caps( Ok(()) } KvAction::Backup { .. } => Ok(()), + KvAction::IterStart { .. } => Ok(()), + KvAction::IterNext { .. } => Ok(()), + KvAction::IterClose { .. } => Ok(()), } } diff --git a/lib/src/kv.rs b/lib/src/kv.rs index 24974bb38..4146d6648 100644 --- a/lib/src/kv.rs +++ b/lib/src/kv.rs @@ -2,7 +2,9 @@ use crate::types::core::{CapMessage, PackageId}; use serde::{Deserialize, Serialize}; use thiserror::Error; -/// IPC Request format for the kv:distro:sys runtime module. +/// Actions are sent to a specific key value database, `db` is the name, +/// `package_id` is the [`PackageId`]. Capabilities are checked, you can access another process's +/// database if it has given you the [`crate::Capability`]. #[derive(Debug, Serialize, Deserialize)] pub struct KvRequest { pub package_id: PackageId, @@ -10,41 +12,143 @@ pub struct KvRequest { pub action: KvAction, } +/// IPC Action format, representing operations that can be performed on the key-value runtime module. +/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module. #[derive(Debug, Serialize, Deserialize, Clone)] pub enum KvAction { + /// Opens an existing key-value database or creates a new one if it doesn't exist. Open, + /// Permanently deletes the entire key-value database. RemoveDb, + /// Sets a value for the specified key in the database. + /// + /// # Parameters + /// * `key` - The key as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Set { key: Vec, tx_id: Option }, + /// Deletes a key-value pair from the database. + /// + /// # Parameters + /// * `key` - The key to delete as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Delete { key: Vec, tx_id: Option }, + /// Retrieves the value associated with the specified key. + /// + /// # Parameters + /// * `key` - The key to look up as a byte vector Get { key: Vec }, + /// Begins a new transaction for atomic operations. BeginTx, + /// Commits all operations in the specified transaction. + /// + /// # Parameters + /// * `tx_id` - The ID of the transaction to commit Commit { tx_id: u64 }, + /// Creates a backup of the database. Backup, + /// Starts an iterator over the database contents. + /// + /// # Parameters + /// * `prefix` - Optional byte vector to filter keys by prefix + IterStart { prefix: Option> }, + /// Advances the iterator and returns the next batch of items. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to advance + /// * `count` - Maximum number of items to return + IterNext { iterator_id: u64, count: u64 }, + /// Closes an active iterator. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to close + IterClose { iterator_id: u64 }, } +/// Response types for key-value store operations. +/// These responses are returned after processing a KvAction request. #[derive(Debug, Serialize, Deserialize)] pub enum KvResponse { + /// Indicates successful completion of an operation. Ok, + /// Returns the transaction ID for a newly created transaction. + /// + /// # Fields + /// * `tx_id` - The ID of the newly created transaction BeginTx { tx_id: u64 }, + /// Returns the key that was retrieved from the database. + /// + /// # Fields + /// * `key` - The retrieved key as a byte vector Get { key: Vec }, + /// Indicates an error occurred during the operation. + /// + /// # Fields + /// * `error` - The specific error that occurred Err { error: KvError }, + /// Returns the ID of a newly created iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the created iterator + IterStart { iterator_id: u64 }, + /// Indicates whether the iterator has more items. + /// + /// # Fields + /// * `done` - True if there are no more items to iterate over + IterNext { done: bool }, + /// Confirms the closure of an iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the closed iterator + IterClose { iterator_id: u64 }, } +/// Errors that can occur during key-value store operations. +/// These errors are returned as part of `KvResponse::Err` when an operation fails. #[derive(Debug, Serialize, Deserialize, Error)] pub enum KvError { - #[error("DbDoesNotExist")] + /// The requested database does not exist. + #[error("Database does not exist")] NoDb, - #[error("KeyNotFound")] + + /// The requested key was not found in the database. + #[error("Key not found in database")] KeyNotFound, - #[error("no Tx found")] + + /// No active transaction found for the given transaction ID. + #[error("Transaction not found")] NoTx, - #[error("No capability: {error}")] + + /// The specified iterator was not found. + #[error("Iterator not found")] + NoIterator, + + /// The operation requires capabilities that the caller doesn't have. + /// + /// # Fields + /// * `error` - Description of the missing capability or permission + #[error("Missing required capability: {error}")] NoCap { error: String }, - #[error("rocksdb internal error: {error}")] + + /// An internal RocksDB error occurred during the operation. + /// + /// # Fields + /// * `action` - The operation that was being performed + /// * `error` - The specific error message from RocksDB + #[error("RocksDB error during {action}: {error}")] RocksDBError { action: String, error: String }, - #[error("input bytes/json/key error: {error}")] + + /// Error parsing or processing input data. + /// + /// # Fields + /// * `error` - Description of what was invalid about the input + #[error("Invalid input: {error}")] InputError { error: String }, - #[error("IO error: {error}")] + + /// An I/O error occurred during the operation. + /// + /// # Fields + /// * `error` - Description of the I/O error + #[error("I/O error: {error}")] IOError { error: String }, }