diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index c64e1452..827a8e7d 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -19,7 +19,7 @@ use dotenv::dotenv; #[tokio::main] async fn main() { dotenv().ok(); - let db = CoreStorage::new("runtime").unwrap(); + let db = CoreStorage::new("runtime-db").unwrap(); let logger = CoreLogger::new(Some("./ssb-consumer.log")); let state_manager = GlobalState::new(logger.clone()); @@ -47,9 +47,6 @@ async fn main() { .await .unwrap(); - let logger = ssb_context.clone().lock().unwrap().get_logger().clone(); - logger.info("consumer successfully started✅"); - client .live_feed_with_execution_of_workflow(true, ssb_context) .await @@ -89,7 +86,7 @@ fn handle_client(mut stream: TcpStream, ctx: Arc>) { logger.info("Data Deserialized"); let db = ctx.get_db(); - db.insert(&body.pub_id.clone(), body).unwrap(); + db.insert_request_body(&body.pub_id.clone(), body).unwrap(); logger.info("Data inserted successfully"); // Respond to the client (optional) diff --git a/runtime/lite/src/modules/common/mod.rs b/runtime/lite/src/modules/common/mod.rs index cc4fd5c5..5a3de043 100644 --- a/runtime/lite/src/modules/common/mod.rs +++ b/runtime/lite/src/modules/common/mod.rs @@ -6,7 +6,7 @@ pub struct RequestBody { pub wasm: Vec, pub invite: String, pub pub_id: String, - pub allowed_hosts : Option>, + pub allowed_hosts: Option>, pub input: Value, } @@ -24,4 +24,4 @@ pub fn combine_values(dest: &mut serde_json::Value, src: &serde_json::Value) { } (_, _) => panic!("update_with only works with two serde_json::Value::Object s"), } -} \ No newline at end of file +} diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 8f8c328a..39738487 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -1,7 +1,10 @@ use std::sync::{Arc, Mutex}; use crate::{ - modules::logger::Logger, modules::storage::Storage, modules::wasmtime_wasi_module, Ctx, + modules::{ + logger::Logger, state_manager::GlobalStateManager, storage::Storage, wasmtime_wasi_module, + }, + Ctx, }; use super::*; @@ -80,9 +83,9 @@ impl Client { let server_pk = ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key"); - let server_ipport = format!("{}:{}", ip, port); + let server_ip_port = format!("{}:{}", ip, port); - let mut socket = TcpStream::connect(server_ipport).await?; + let mut socket = TcpStream::connect(server_ip_port).await?; let handshake = handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), server_pk).await?; @@ -187,7 +190,13 @@ impl Client { Ok(()) } - pub async fn publish_event(&mut self, event: &str, section: &str, content: &str, mentions: Option>) -> Result<()> { + pub async fn publish_event( + &mut self, + event: &str, + section: &str, + content: &str, + mentions: Option>, + ) -> Result<()> { let _req_id = self .api .publish_req_send(TypedMessage::Event { @@ -217,7 +226,8 @@ impl Client { let (id, msg) = self.rpc_reader.recv().await?; let ctx = ctx.lock().unwrap(); let db = ctx.get_db(); - let logger = ctx.get_logger(); + let mut logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); if id == req_no { match msg { @@ -234,23 +244,31 @@ impl Client { match serde_json::from_str::(&x.text) { Ok(mut event) => { - - logger.info(&format!("Event: {:#?}", event)); - match db.get(&x.mentions.unwrap()[0].link) { + match db.get_request_body( + &x.mentions.unwrap()[0].link, + ) { Ok(body) => { let data = serde_json::json!({ "data" : crate::common::combine_values(&mut event, &body.input), "allowed_hosts": body.allowed_hosts }); + + let workflow_index = ctx + .get_state_manager() + .new_workflow(0, "hello"); + let _ = wasmtime_wasi_module::run_workflow( + &mut state_manager, + &mut logger, serde_json::to_value(data) .unwrap(), body.wasm, - 0, - "hello", + workflow_index, + false, + None ); } Err(e) => logger.error(&e.to_string()), diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index baf1beea..f18d0484 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -6,7 +6,7 @@ mod tests { use serde_json::{json, Value}; // ssb-server should keep running for testing - /* configure the env variables such as ssb-sercret file path, ip and port where + /* configure the env variables such as ssb-secret file path, ip and port where ssb-server is running in .env file */ // use `cargo test -- --ignored` command for testing diff --git a/runtime/lite/src/modules/logger/logger.rs b/runtime/lite/src/modules/logger/logger.rs index 1b2eed59..83736b9d 100644 --- a/runtime/lite/src/modules/logger/logger.rs +++ b/runtime/lite/src/modules/logger/logger.rs @@ -28,14 +28,12 @@ impl CoreLogger { let file = match log_file { Some(file) => OpenOptions::new() .write(true) - .append(true) .create(true) .open(file) .unwrap(), None => OpenOptions::new() .create(true) .write(true) - .append(true) .open("./workflows.log") .unwrap(), }; diff --git a/runtime/lite/src/modules/logger/tests.rs b/runtime/lite/src/modules/logger/tests.rs index 0a21a4c5..31c2ff61 100644 --- a/runtime/lite/src/modules/logger/tests.rs +++ b/runtime/lite/src/modules/logger/tests.rs @@ -27,7 +27,7 @@ fn test_writing_to_log_file() { } #[test] -fn test_logger_in_multi_threads(){ +fn test_logger_in_multi_threads() { let logger = CoreLogger::new(Some("test3.log")); let mut handles = vec![]; @@ -44,4 +44,4 @@ fn test_logger_in_multi_threads(){ } fs::remove_file("test3.log").unwrap(); -} \ No newline at end of file +} diff --git a/runtime/lite/src/modules/state_manager/mod.rs b/runtime/lite/src/modules/state_manager/mod.rs index 88b777db..f9e73a40 100644 --- a/runtime/lite/src/modules/state_manager/mod.rs +++ b/runtime/lite/src/modules/state_manager/mod.rs @@ -5,12 +5,15 @@ use serde_json::Value; pub mod traits; pub mod types; +pub use logger::{CoreLogger, Logger}; pub use traits::*; pub use types::*; -pub use logger::{CoreLogger, Logger}; use super::logger; +#[cfg(test)] +mod tests; +#[derive(Debug)] pub struct GlobalState { workflows: Vec, logger: U, @@ -25,12 +28,13 @@ impl GlobalState { } } -impl GlobalStateManager for GlobalState -{ - fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) { +impl GlobalStateManager for GlobalState { + fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize { self.workflows .push(WorkflowState::new(workflow_id, workflow_name)); - self.logger.info(&format!("[new workflow created with id:{}]", workflow_id)); + self.logger + .info(&format!("[new workflow created with id:{}]", workflow_id)); + self.workflows.len() - 1 } fn get_state_data(&self, workflow_index: usize) -> Result> { @@ -45,9 +49,15 @@ impl GlobalStateManager for GlobalState if self.workflows.len() <= workflow_index { Err(anyhow!("index out of bound")) } else { - self.logger.info(&format!("[workflow:{} starting...]", self.workflows[workflow_index].get_id())); + self.logger.info(&format!( + "[workflow:{} starting...]", + self.workflows[workflow_index].get_id() + )); self.workflows[workflow_index].update_running()?; - self.logger.info(&format!("[workflow:{} running]", self.workflows[workflow_index].get_id())); + self.logger.info(&format!( + "[workflow:{} running]", + self.workflows[workflow_index].get_id() + )); Ok(()) } } @@ -57,7 +67,10 @@ impl GlobalStateManager for GlobalState Err(anyhow!("index out of bound")) } else { self.workflows[workflow_index].update_paused(output)?; - self.logger.warn(&format!("[workflow:{} paused]", self.workflows[workflow_index].get_id())); + self.logger.warn(&format!( + "[workflow:{} paused]", + self.workflows[workflow_index].get_id() + )); Ok(()) } } @@ -67,163 +80,37 @@ impl GlobalStateManager for GlobalState workflow_index: usize, result: Value, is_success: bool, + is_cached: bool ) -> Result<()> { if self.workflows.len() <= workflow_index { Err(anyhow!("index out of bound")) } else { self.workflows[workflow_index].update_result(result.clone(), is_success)?; - if is_success{ - self.logger.info(&format!("[workflow:{} execution success✅]", self.workflows[workflow_index].get_id())); - }else{ + if is_success { + + if is_cached { + self.logger.warn(&format!( + "[workflow:{} cached result used]", + self.workflows[workflow_index].get_id() + )); + } + + self.logger.info(&format!( + "[workflow:{} execution success✅]", + self.workflows[workflow_index].get_id() + )); + } else { let id = self.workflows[workflow_index].get_id(); - self.logger.error(&format!("[workflow:{} execution failed❌]", id)); - let result: String = serde_json::from_value(result.get("Err").unwrap().clone()).unwrap(); - self.logger.error(&format!("[workflow:{} result: {}]", id, result)); + self.logger + .error(&format!("[workflow:{} execution failed❌]", id)); + let result: String = + serde_json::from_value(result.get("Err").unwrap().clone()).unwrap(); + self.logger + .error(&format!("[workflow:{} result: {}]", id, result)); } Ok(()) } } - - } - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_new_workflow() { - let logger = CoreLogger::new(Some("./test_log_1.log")); - let mut global_state= GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - let new_workflow_state = WorkflowState::new(0, "test_workflow"); - assert_eq!(global_state.workflows[0], new_workflow_state); - std::fs::remove_file("./test_log_1.log").unwrap() - } - - #[test] - fn test_get_state_data_pass() { - let logger = CoreLogger::new(Some("./test_log_2.log")); - let mut global_state = GlobalState::new(logger); - - global_state.new_workflow(0, "test_workflow"); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Init); - assert!(state_data.get_result().is_err()); - - - global_state.update_running(0).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Running); - assert!(state_data.get_result().is_err()); - - // without result - global_state.update_paused(0, None).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - assert!(state_data.get_result().is_err()); - - - global_state.update_running(0).unwrap(); - // with result - let data = Value::String("some result".to_string()); - global_state.update_paused(0, Some(data.clone())).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - assert_eq!(state_data.get_result().unwrap(), data); - - global_state.update_running(0).unwrap(); - global_state.update_result(0, data.clone(), true).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Success); - assert_eq!(state_data.get_result().unwrap(), data); - std::fs::remove_file("./test_log_2.log").unwrap() - } - - #[test] - #[should_panic="index out of bound"] - fn test_get_state_data_fail(){ - let logger = CoreLogger::new(Some("./test_log_3.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_3.log").unwrap(); - global_state.get_state_data(1).unwrap(); - } - - #[test] - fn test_update_running_pass(){ - let logger = CoreLogger::new(Some("./test_log_4.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_running(0).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_execution_state(), ExecutionState::Running); - std::fs::remove_file("./test_log_4.log").unwrap() - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_running_fail(){ - let logger = CoreLogger::new(Some("./test_log_5.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_5.log").unwrap(); - global_state.update_running(1).unwrap(); - } - - #[test] - fn test_update_paused_pass(){ - let logger = CoreLogger::new(Some("./test_log_6.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_paused(0, None).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - std::fs::remove_file("./test_log_6.log").unwrap(); - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_paused_fail(){ - let logger = CoreLogger::new(Some("./test_log_7.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_7.log").unwrap(); - global_state.update_paused(1, None).unwrap(); - } - - #[test] - fn test_update_result_pass(){ - let logger = CoreLogger::new(Some("./test_log_8.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_running(0).unwrap(); - let data = Value::String("some result".to_string()); - global_state.update_result(0, data.clone(), true).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - std::fs::remove_file("./test_log_8.log").unwrap(); - assert_eq!(state_data.get_result().unwrap(), data); - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_result_fail(){ - let logger = CoreLogger::new(Some("./test_log_9.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_9.log").unwrap(); - global_state.update_result(1, Value::Null, true).unwrap(); - } - -} \ No newline at end of file diff --git a/runtime/lite/src/modules/state_manager/tests.rs b/runtime/lite/src/modules/state_manager/tests.rs new file mode 100644 index 00000000..b7503678 --- /dev/null +++ b/runtime/lite/src/modules/state_manager/tests.rs @@ -0,0 +1,132 @@ +use super::*; +#[test] +fn test_new_workflow() { + let logger = CoreLogger::new(Some("./test_log_1.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + let new_workflow_state = WorkflowState::new(0, "test_workflow"); + assert_eq!(global_state.workflows[0], new_workflow_state); + std::fs::remove_file("./test_log_1.log").unwrap() +} + +#[test] +fn test_get_state_data_pass() { + let logger = CoreLogger::new(Some("./test_log_2.log")); + let mut global_state = GlobalState::new(logger); + + global_state.new_workflow(0, "test_workflow"); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Init); + assert!(state_data.get_result().is_err()); + + global_state.update_running(0).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Running); + assert!(state_data.get_result().is_err()); + + // without result + global_state.update_paused(0, None).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + assert!(state_data.get_result().is_err()); + + global_state.update_running(0).unwrap(); + // with result + let data = Value::String("some result".to_string()); + global_state.update_paused(0, Some(data.clone())).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + assert_eq!(state_data.get_result().unwrap(), data); + + global_state.update_running(0).unwrap(); + global_state.update_result(0, data.clone(), true, false).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Success); + assert_eq!(state_data.get_result().unwrap(), data); + std::fs::remove_file("./test_log_2.log").unwrap() +} + +#[test] +#[should_panic = "index out of bound"] +fn test_get_state_data_fail() { + let logger = CoreLogger::new(Some("./test_log_3.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_3.log").unwrap(); + global_state.get_state_data(1).unwrap(); +} + +#[test] +fn test_update_running_pass() { + let logger = CoreLogger::new(Some("./test_log_4.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_running(0).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_execution_state(), ExecutionState::Running); + std::fs::remove_file("./test_log_4.log").unwrap() +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_running_fail() { + let logger = CoreLogger::new(Some("./test_log_5.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_5.log").unwrap(); + global_state.update_running(1).unwrap(); +} + +#[test] +fn test_update_paused_pass() { + let logger = CoreLogger::new(Some("./test_log_6.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_paused(0, None).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + std::fs::remove_file("./test_log_6.log").unwrap(); +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_paused_fail() { + let logger = CoreLogger::new(Some("./test_log_7.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_7.log").unwrap(); + global_state.update_paused(1, None).unwrap(); +} + +#[test] +fn test_update_result_pass() { + let logger = CoreLogger::new(Some("./test_log_8.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_running(0).unwrap(); + let data = Value::String("some result".to_string()); + global_state.update_result(0, data.clone(), true, false).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + std::fs::remove_file("./test_log_8.log").unwrap(); + assert_eq!(state_data.get_result().unwrap(), data); +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_result_fail() { + let logger = CoreLogger::new(Some("./test_log_9.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_9.log").unwrap(); + global_state.update_result(1, Value::Null, true, false).unwrap(); +} diff --git a/runtime/lite/src/modules/state_manager/traits.rs b/runtime/lite/src/modules/state_manager/traits.rs index eaee9490..66fc9da5 100644 --- a/runtime/lite/src/modules/state_manager/traits.rs +++ b/runtime/lite/src/modules/state_manager/traits.rs @@ -12,9 +12,9 @@ pub trait WorkflowStateManager{ } pub trait GlobalStateManager { - fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str); // returns index(used as id also) + fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize; // returns index(used as id also) fn get_state_data(&self, workflow_index: usize) -> Result>; fn update_running(&mut self, workflow_index: usize) -> Result<()>; fn update_paused(&mut self, workflow_index: usize, output: Option) -> Result<()>; - fn update_result(&mut self, workflow_index: usize, result:Value, is_success: bool) -> Result<()>; + fn update_result(&mut self, workflow_index: usize, result:Value, is_success: bool, is_cached: bool) -> Result<()>; } diff --git a/runtime/lite/src/modules/state_manager/types.rs b/runtime/lite/src/modules/state_manager/types.rs index 9bcbbabe..0e7aef96 100644 --- a/runtime/lite/src/modules/state_manager/types.rs +++ b/runtime/lite/src/modules/state_manager/types.rs @@ -8,6 +8,7 @@ pub enum ExecutionState { Paused, Failed, Success, + Cached, } #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] @@ -37,6 +38,8 @@ impl WorkflowStateManager for WorkflowState { self.get_result() )), + ExecutionState::Cached => Err(anyhow!("workflow already cached")), + ExecutionState::Running => Err(anyhow!("workflow execution already in progress!")), ExecutionState::Init | ExecutionState::Paused => { @@ -57,7 +60,7 @@ impl WorkflowStateManager for WorkflowState { Err(anyhow!("workflow does not executed! execution_state: Init")) } - ExecutionState::Running | ExecutionState::Paused => { + ExecutionState::Running | ExecutionState::Paused | ExecutionState::Cached => { if is_success { self.execution_state = ExecutionState::Success; } else { @@ -76,6 +79,8 @@ impl WorkflowStateManager for WorkflowState { self.get_result() )), + ExecutionState::Cached => Err(anyhow!("workflow already cached")), + ExecutionState::Paused => Err(anyhow!("workflow already paused")), ExecutionState::Init | ExecutionState::Running => { @@ -108,7 +113,7 @@ impl WorkflowStateManager for WorkflowState { ExecutionState::Running => Err(anyhow!("execution in-progress")), ExecutionState::Paused => Err(anyhow!("execution paused")), ExecutionState::Failed => Ok(false), - ExecutionState::Success => Ok(true), + ExecutionState::Success | ExecutionState::Cached => Ok(true), } } @@ -120,7 +125,7 @@ impl WorkflowStateManager for WorkflowState { Some(value) => Ok(value.clone()), None => Err(anyhow!("no result is stored!")), }, - ExecutionState::Failed | ExecutionState::Success => Ok(self.result.clone().unwrap()), + ExecutionState::Failed | ExecutionState::Success | ExecutionState::Cached => Ok(self.result.clone().unwrap()), } } } diff --git a/runtime/lite/src/modules/storage/storage.rs b/runtime/lite/src/modules/storage/storage.rs index 1843e26e..5b1ddbfc 100644 --- a/runtime/lite/src/modules/storage/storage.rs +++ b/runtime/lite/src/modules/storage/storage.rs @@ -36,7 +36,7 @@ impl From for CustomError { impl From for io::Error { fn from(error: CustomError) -> Self { - match error { + match error { CustomError::RocksDB(e) => io::Error::new(io::ErrorKind::Other, e), CustomError::Io(e) => e, CustomError::Custom(e) => io::Error::new(io::ErrorKind::Other, e), @@ -94,8 +94,7 @@ impl Storage for CoreStorage { /// /// The `set_data` function returns a `Result<(), Error>`. fn set_data(&self, key: &str, value: Vec) -> Result<(), CustomError> { - let serialized_value = value; - self.db.put(key, serialized_value)?; + self.db.put(key, value)?; Ok(()) } @@ -137,25 +136,6 @@ impl Storage for CoreStorage { Ok(()) } - /// The function `store_wasm` stores a WebAssembly binary file in a key-value database. - /// - /// Arguments: - /// - /// * `key`: The `key` parameter is a reference to a string that represents the key under which the - /// WebAssembly binary will be stored in the database. - /// * `wasm_path`: The `wasm_path` parameter in the `store_wasm` function represents the file path - /// to the WebAssembly (Wasm) file that you want to store in the database. This function reads the - /// contents of the Wasm file as bytes and stores them in the database with the specified key. - /// - /// Returns: - /// - /// The `store_wasm` function is returning a `Result<(), Error>`. - fn store_wasm(&self, key: &str, wasm: &[u8]) -> Result<(), CustomError> { - self.db.put(key, wasm)?; - - Ok(()) - } - /// The function `get_wasm` retrieves a WebAssembly module from a database using a given key. /// /// Arguments: @@ -168,24 +148,14 @@ impl Storage for CoreStorage { /// The `get_wasm` function returns a `Result` containing either a vector of `u8` bytes or an /// `Error`. - fn get_wasm(&self, key: &str) -> Result, CustomError> { - match self.db.get(key) { - Ok(Some(retrieved_wasm_bytes)) => Ok(retrieved_wasm_bytes), - Ok(None) => Err(CustomError::Custom(format!( - "WASM module not found with key: {:?}", - key - ))), - Err(err) => Err(CustomError::RocksDB(err)), - } - } - - fn insert(&self, key: &str, value: crate::common::RequestBody) -> Result<(), CustomError> { + fn insert_request_body(&self, key: &str, value: crate::common::RequestBody) -> Result<(), CustomError> { let bytes = serde_json::to_vec(&value).map_err(|e| CustomError::Custom(e.to_string()))?; self.db .put(key, bytes) .map_err(|e| CustomError::Custom(e.to_string())) } - fn get(&self, key: &str) -> Result { + + fn get_request_body(&self, key: &str) -> Result { let res = self .db .get(key) diff --git a/runtime/lite/src/modules/storage/test.rs b/runtime/lite/src/modules/storage/test.rs index 7c8f2e7e..7e5270e5 100644 --- a/runtime/lite/src/modules/storage/test.rs +++ b/runtime/lite/src/modules/storage/test.rs @@ -92,35 +92,4 @@ mod tests { fs::remove_dir_all(std::path::Path::new("test5")).unwrap(); result.unwrap(); } - - /// The test function stores a WebAssembly file in a database and then retrieves it to compare with - /// the original file. - #[test] - fn test_store_and_get_wasm() { - let core_storage = CoreStorage::new("test8").unwrap(); - let wasm_bytes = vec![0x00, 0x61, 0x01]; - - let key = "boilerplate"; - core_storage.store_wasm(key, &wasm_bytes).unwrap(); - - let retrieved_wasm = core_storage.get_wasm(key).unwrap(); - fs::remove_dir_all(std::path::Path::new("test8")).unwrap(); - assert_eq!(retrieved_wasm, wasm_bytes) - } - - /// The test function is checking if an error is raised when trying to retrieve a WebAssembly module - /// with a different key than the one it was stored with. - #[test] - #[should_panic] - fn test_get_wasm_with_different_key() { - let core_storage = CoreStorage::new("test9").unwrap(); - let wasm_bytes = vec![0x00, 0x61, 0x01]; - - let key = "boilerplate"; - core_storage.store_wasm(key, &wasm_bytes).unwrap(); - - let result = core_storage.get_wasm("hello"); - fs::remove_dir_all(std::path::Path::new("test9")).unwrap(); - result.unwrap(); - } } diff --git a/runtime/lite/src/modules/storage/traits.rs b/runtime/lite/src/modules/storage/traits.rs index 15176bc7..49409641 100644 --- a/runtime/lite/src/modules/storage/traits.rs +++ b/runtime/lite/src/modules/storage/traits.rs @@ -5,8 +5,6 @@ pub trait Storage { fn set_data(&self, key: &str, value: Vec) -> Result<(), CustomError>; fn modify_data(&self, key: &str, value: Vec) -> Result<(), CustomError>; fn delete_data(&self, key: &str) -> Result<(), CustomError>; - fn store_wasm(&self, key: &str, wasm : &[u8]) -> Result<(), CustomError>; - fn get_wasm(&self, key: &str) -> Result, CustomError>; - fn insert(&self, key: &str, value: RequestBody) -> Result<(), CustomError>; - fn get(&self, key: &str) -> Result; + fn insert_request_body(&self, key: &str, value: RequestBody) -> Result<(), CustomError>; + fn get_request_body(&self, key: &str) -> Result; } diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/help.rs b/runtime/lite/src/modules/wasmtime_wasi_module/help.rs deleted file mode 100644 index 9fdbc87e..00000000 --- a/runtime/lite/src/modules/wasmtime_wasi_module/help.rs +++ /dev/null @@ -1,11 +0,0 @@ -// use wiremock::MockServer; - -// async fn create_server(add: &str) -> MockServer { -// let listener = std::net::TcpListener::bind(add).unwrap(); -// MockServer::builder().listener(listener).start().await -// } - -// pub async fn post(address: &str) -> MockServer { -// let server = create_server(address).await; -// server -// } diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs index e6a283a8..9adf2ae7 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs @@ -1,19 +1,17 @@ -use serde::{Deserialize, Serialize}; -use serde_json::Value; -pub mod help; -pub use help::*; -mod tests; - use crate::modules::state_manager::{ ExecutionState, GlobalState, GlobalStateManager, WorkflowState, }; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use sha256::digest; +use std::sync::MutexGuard; use std::sync::{Arc, Mutex}; +mod tests; mod types; pub use types::*; -use logger::{CoreLogger, Logger}; +use logger::Logger; use rocksdb::DB; use wasi_common::WasiCtx; use wasi_experimental_http_wasmtime::{HttpCtx, HttpState}; @@ -23,23 +21,25 @@ use wasmtime_wasi::sync::WasiCtxBuilder; use super::logger; -#[allow(dead_code)] -fn run_workflow_helper( +pub fn run_workflow( + state_manager: &mut MutexGuard>, + logger: &mut MutexGuard, data: Value, wasm_file: Vec, - hash_key: String, - state_manager: &mut GlobalState, workflow_index: usize, - restart: bool, // ignores the cache - logger: U, + ignore_cache: bool, // ignores the cache + pause_at: Option, ) -> Result { - let id = state_manager + let workflow_id = state_manager .get_state_data(workflow_index) .unwrap() .get_id(); - let cache = DB::open_default(format!("./.cache/{:?}", id)).unwrap(); - let prev_internal_state_data = if !restart { + let hash_key = digest(format!("{:?}{:?}", data, workflow_id)); + + let cache = DB::open_default(format!("./.cache/{:?}", workflow_id)).unwrap(); + + let prev_internal_state_data = if !ignore_cache { let prev_internal_state_data: Value = match cache.get(hash_key.as_bytes()).unwrap() { Some(data) => serde_json::from_slice(&data).unwrap(), None => serde_json::json!([]), @@ -48,9 +48,8 @@ fn run_workflow_helper( // returns the main output without passing the state data to the workflow if let Some(output) = prev_internal_state_data.get("success") { state_manager.update_running(workflow_index).unwrap(); - logger.warn(&format!("[workflow:{id} cached result used]")); state_manager - .update_result(workflow_index, output.clone(), true) + .update_result(workflow_index, output.clone(), true, true) .unwrap(); return Ok(serde_json::from_value(output.clone()).unwrap()); } @@ -64,9 +63,9 @@ fn run_workflow_helper( let mut input: MainInput = serde_json::from_value(data).unwrap(); input.data = if prev_internal_state_data.is_some() { - serde_json::json!({"data": input.data, "prev_output": prev_internal_state_data}) + serde_json::json!({"data": input.data, "prev_output": prev_internal_state_data, "pause_at": pause_at}) } else { - serde_json::json!({"data": input.data, "prev_output": []}) + serde_json::json!({"data": input.data, "prev_output": [], "pause_at": pause_at}) }; let engine = Engine::default(); @@ -120,7 +119,7 @@ fn run_workflow_helper( let output_2: Arc>> = Arc::new(Mutex::new(Vec::new())); let output_ = output_2.clone(); - let logger_cln = Arc::new(Mutex::new(logger)); + let logger_cln = Arc::new(Mutex::new(logger.clone())); linker .func_wrap( @@ -142,21 +141,34 @@ fn run_workflow_helper( ExecutionState::Init => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}...] ]", - id, task_state_data.action_name + workflow_id, task_state_data.action_name )); } ExecutionState::Running => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}:{}] running]", - id, task_state_data.task_index, task_state_data.action_name + workflow_id, + task_state_data.task_index, + task_state_data.action_name )); } ExecutionState::Paused => { logger_cln.lock().unwrap().warn(&format!( "[workflow:{:?} task[{}:{}] paused]", - id, task_state_data.task_index, task_state_data.action_name + workflow_id, + task_state_data.task_index, + task_state_data.action_name + )); + } + + ExecutionState::Cached => { + logger_cln.lock().unwrap().warn(&format!( + "[workflow:{:?} task[{}:{}] cached result used]", + workflow_id, + task_state_data.task_index, + task_state_data.action_name )); } @@ -167,14 +179,14 @@ fn run_workflow_helper( -1 => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}] success]", - id, task_state_data.action_name + workflow_id, task_state_data.action_name )); } _ => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}:{}] success]", - id, + workflow_id, task_state_data.task_index, task_state_data.action_name )); @@ -188,7 +200,7 @@ fn run_workflow_helper( ExecutionState::Failed => { logger_cln.lock().unwrap().error(&format!( "[workflow:{:?} task[{}:{}] failed[{}]]", - id, + workflow_id, task_state_data.task_index, task_state_data.action_name, task_state_data.error.unwrap() @@ -262,15 +274,27 @@ fn run_workflow_helper( if res.result.get("Err").is_some() { state_manager - .update_result(workflow_index, res.result.clone(), false) + .update_result(workflow_index, res.result.clone(), false, false) .unwrap(); + let mut bytes: Vec = Vec::new(); + serde_json::to_writer(&mut bytes, &state_output).unwrap(); + cache.put(hash_key.as_bytes(), bytes).unwrap(); + } else if res + .result + .get("Ok") + .unwrap() + .get("workflow_paused") + .is_some() + { + state_manager.update_paused(workflow_index, None).unwrap(); + let mut bytes: Vec = Vec::new(); serde_json::to_writer(&mut bytes, &state_output).unwrap(); cache.put(hash_key.as_bytes(), bytes).unwrap(); } else { state_manager - .update_result(workflow_index, res.result.clone(), true) + .update_result(workflow_index, res.result.clone(), true, false) .unwrap(); let state_result = serde_json::json!({ "success" : res }); @@ -281,26 +305,3 @@ fn run_workflow_helper( Ok(res) } - -pub fn run_workflow( - data: Value, - wasm_file: Vec, - workflow_id: usize, - workflow_name: &str, -) -> Result { - let logger = CoreLogger::new(Some("./workflow.log")); - let mut state_manager = GlobalState::new(logger.clone()); - - state_manager.new_workflow(workflow_id, workflow_name); - - let digest = digest(format!("{:?}{:?}", data, workflow_name)); - run_workflow_helper( - data, - wasm_file, - digest, - &mut state_manager, - 0, - false, - logger, - ) -} diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs b/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs index dfef33d9..78807960 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs @@ -1,43 +1,152 @@ -#[async_std::test] -async fn test_hello_world() { - let path = std::env::var("WORKFLOW_WASM") - .unwrap_or("../../workflow/examples/hello_world.wasm".to_string()); +#[cfg(test)] +mod tests { - let wasm = std::fs::read(&path).unwrap(); + use crate::context::Ctx; + use crate::logger::CoreLogger; + use crate::state_manager::{GlobalState, GlobalStateManager}; + use crate::storage::CoreStorage; + use crate::wasmtime_wasi_module::run_workflow; - let input = serde_json::json!({ - "allowed_hosts": [], - "data": { - "hello" : "world" - } - }); + #[async_std::test] + async fn test_hello_world() { + let logger = CoreLogger::new(Some("./workflow-1.log")); + let ctx = crate::context::Context::new( + logger.clone(), + CoreStorage::new("workflow_db_1").unwrap(), + GlobalState::new(logger), + ); - let result = super::run_workflow(input, wasm, 1,"hello_world").unwrap(); + let path = std::env::var("WORKFLOW_WASM") + .unwrap_or("../../workflow/examples/hello_world.wasm".to_string()); - assert!(result.result.to_string().contains("Hello")); -} + let wasm = std::fs::read(&path).unwrap(); + + let input = serde_json::json!({ + "allowed_hosts": [], + "data": { + "hello" : "world" + } + }); + + let mut logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); + + let index = state_manager.new_workflow(1, "hello_world"); + + let result = run_workflow( + &mut state_manager, + &mut logger, + input.clone(), + wasm, + index, + true, + None, + ) + .unwrap(); + + assert!(result.result.to_string().contains("Hello")); + } + + #[async_std::test] + async fn test_employee_salary() { + let logger = CoreLogger::new(Some("./workflow-2.log")); + let ctx = crate::context::Context::new( + logger.clone(), + CoreStorage::new("workflow_db_2").unwrap(), + GlobalState::new(logger), + ); + + let path = std::env::var("WORKFLOW_WASM") + .unwrap_or("../../workflow/examples/employee_salary_state_managed.wasm".to_string()); + let wasm = std::fs::read(&path).unwrap(); + + let server = test_util::post("127.0.0.1:1234").await; + let input = serde_json::json!({ + "allowed_hosts": [ + server.uri() + ], + "data": { + "role":"Software Developer", + } + }); + + let mut logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); + + let index = state_manager.new_workflow(2, "employee_salary"); + + let result = run_workflow( + &mut state_manager, + &mut logger, + input.clone(), + wasm, + index, + true, + None, + ) + .unwrap(); + assert!(result + .result + .to_string() + .contains("Salary creditted for emp id 1 from Hugobyte")); + } + + #[async_std::test] + async fn test_pause_resume_employee_salary() { + let logger = CoreLogger::new(Some("./workflow-3.log")); + let ctx = crate::context::Context::new( + logger.clone(), + CoreStorage::new("workflow_db_3").unwrap(), + GlobalState::new(logger), + ); + + let path = std::env::var("WORKFLOW_WASM") + .unwrap_or("../../workflow/examples/employee_salary_state_paused.wasm".to_string()); + let wasm = std::fs::read(&path).unwrap(); + + let server = test_util::post("127.0.0.1:1235").await; + let input = serde_json::json!({ + "allowed_hosts": [ + server.uri() + ], + "data": { + "role":"Software Developer", + } + }); + + let mut logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); + + let index = state_manager.new_workflow(3, "employee_salary"); + + let result = run_workflow( + &mut state_manager, + &mut logger, + input.clone(), + wasm.clone(), + index, + true, // ignore cache + Some(2), // pause at task 2 + ) + .unwrap(); + + let expected_result = serde_json::json!({ "Ok" : { "workflow_paused" : true }}); + assert_eq!(result.result, expected_result); + + let result = run_workflow( + &mut state_manager, + &mut logger, + input.clone(), + wasm, + index, + false, // don't ignore cache + None, // don't pause + ) + .unwrap(); -#[async_std::test] -async fn test_employee_salary() { - - let path = std::env::var("WORKFLOW_WASM").unwrap_or( - "../../workflow/examples/employee_salary_state_managed.wasm".to_string(), - ); - let wasm = std::fs::read(&path).unwrap(); - - let server = test_util::post("127.0.0.1:1234").await; - let input = serde_json::json!({ - "allowed_hosts": [ - server.uri() - ], - "data": { - "role":"Software Developer", - } - }); - - let result = super::run_workflow(input.clone(), wasm, 2, "employee_salary").unwrap(); - assert!(result - .result - .to_string() - .contains("Salary creditted for emp id 1 from Hugobyte")); + assert!(result + .result + .to_string() + .contains("Salary creditted for emp id 1 from Hugobyte")); + } } diff --git a/workflow/examples/employee_salary_state_paused.wasm b/workflow/examples/employee_salary_state_paused.wasm new file mode 100755 index 00000000..380eb7e3 Binary files /dev/null and b/workflow/examples/employee_salary_state_paused.wasm differ