From e6c4e34429be19fdd4c3b9b51848a2958d5602aa Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Tue, 19 Mar 2024 15:40:04 +0530 Subject: [PATCH 1/4] chore: implement internal state manager in boilerplate --- echo-library/src/boilerplate/src/common.rs | 51 ++++++--- echo-library/src/boilerplate/src/lib.rs | 2 + echo-library/src/boilerplate/src/macros.rs | 59 +++++----- .../src/boilerplate/src/state_manager.rs | 106 ++++++++++++++++++ echo-library/src/boilerplate/src/traits.rs | 10 +- echo-library/src/common/composer.rs | 4 + 6 files changed, 185 insertions(+), 47 deletions(-) create mode 100644 echo-library/src/boilerplate/src/state_manager.rs diff --git a/echo-library/src/boilerplate/src/common.rs b/echo-library/src/boilerplate/src/common.rs index c368a04..5efece5 100644 --- a/echo-library/src/boilerplate/src/common.rs +++ b/echo-library/src/boilerplate/src/common.rs @@ -1,10 +1,14 @@ #![allow(unused_imports)] -use paste::paste; use super::*; +use alloc::task; +use paste::paste; +use workflow_macro::Flow; + #[derive(Debug, Flow)] pub struct WorkflowGraph { edges: Vec<(usize, usize)>, nodes: Vec>, + pub state_manager: StateManager, } impl WorkflowGraph { @@ -12,6 +16,7 @@ impl WorkflowGraph { WorkflowGraph { nodes: Vec::with_capacity(size), edges: Vec::new(), + state_manager: StateManager::init(), } } } @@ -20,22 +25,34 @@ impl WorkflowGraph { macro_rules! impl_execute_trait { ($ ($struct : ty), *) => { - paste!{ - $( impl Execute for $struct { - fn execute(&mut self) -> Result<(),String>{ - self.run() - } + paste!{$( + impl Execute for $struct { + fn execute(&mut self) -> Result<(),String>{ + self.run() + } - fn get_task_output(&self) -> Value { - self.output().clone().into() - } + fn get_task_output(&self) -> Value { + self.output().clone().into() + } + + fn set_output_to_task(&mut self, input: Value) { + self.setter(input) + } + + fn get_action_name(&self) -> String{ + self.action_name.clone() + } + + fn get_json_string(&self) -> String{ + serde_json::to_string(&self).unwrap() + } + + fn set_result_output(&mut self, inp: Value) { + self.set_result_output(inp) + } - fn set_output_to_task(&mut self, input: Value) { - self.setter(input) - } - } - )* } + )*} }; } @@ -65,6 +82,12 @@ pub unsafe extern "C" fn free_memory(ptr: *mut u8, size: u32, alignment: u32) { extern "C" { pub fn set_output(ptr: i32, size: i32); } + +#[link(wasm_import_module = "host")] +extern "C" { + pub fn set_state(ptr: i32, size: i32); +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Output { pub result: Value, diff --git a/echo-library/src/boilerplate/src/lib.rs b/echo-library/src/boilerplate/src/lib.rs index bae5683..45e38b0 100644 --- a/echo-library/src/boilerplate/src/lib.rs +++ b/echo-library/src/boilerplate/src/lib.rs @@ -29,6 +29,8 @@ use workflow_macro::Flow; extern crate alloc; use codec::{Decode, Encode}; use core::alloc::Layout; +mod state_manager; +use state_manager::*; #[no_mangle] pub fn _start(ptr: *mut u8, length: i32) { diff --git a/echo-library/src/boilerplate/src/macros.rs b/echo-library/src/boilerplate/src/macros.rs index f348df4..2bb7b8d 100644 --- a/echo-library/src/boilerplate/src/macros.rs +++ b/echo-library/src/boilerplate/src/macros.rs @@ -10,7 +10,7 @@ macro_rules! make_input_struct { [$($der:ident),*] ) => { #[derive($($der),*)] - pub struct $x { + pub struct $x { $( $(#[serde(default=$default_derive)])? $visibility $element: $ty @@ -42,6 +42,11 @@ macro_rules! make_main_struct { pub fn output(&self) -> Value { self.$output_field.clone() } + + pub fn set_result_output(&mut self, inp: Value) { + self.$output_field = inp + } + } } } @@ -61,7 +66,7 @@ macro_rules! impl_new { ..Default::default() }, ..Default::default() - } + } } } }; @@ -79,7 +84,7 @@ macro_rules! impl_new { ..Default::default() }, ..Default::default() - } + } } } } @@ -106,29 +111,27 @@ macro_rules! impl_setter { macro_rules! impl_map_setter { ( $name:ty, - $element:ident : $key:expr, + $element:ident : $key:expr, $typ_name : ty, $out:expr ) => { impl $name { pub fn setter(&mut self, val: Value) { - - let value = val.get($key).unwrap(); - let value = serde_json::from_value::>(value.clone()).unwrap(); - let mut map: HashMap<_, _> = value - .iter() - .map(|x| { - self.input.$element = x.to_owned() as $typ_name; - self.run(); - (x.to_owned(), self.output.get($out).unwrap().to_owned()) - }) - .collect(); - self.mapout = to_value(map).unwrap(); - + let value = val.get($key).unwrap(); + let value = serde_json::from_value::>(value.clone()).unwrap(); + let mut map: HashMap<_, _> = value + .iter() + .map(|x| { + self.input.$element = x.to_owned() as $typ_name; + self.run(); + (x.to_owned(), self.output.get($out).unwrap().to_owned()) + }) + .collect(); + self.mapout = to_value(map).unwrap(); } } - } - } + }; +} #[macro_export] macro_rules! impl_concat_setter { @@ -136,18 +139,17 @@ macro_rules! impl_concat_setter { $name:ty, $input:ident ) => { - impl $name{ + impl $name { pub fn setter(&mut self, val: Value) { - - let val: Vec = serde_json::from_value(val).unwrap(); - let res = join_hashmap( - serde_json::from_value(val[0].to_owned()).unwrap(), - serde_json::from_value(val[1].to_owned()).unwrap(), - ); - self.input.$input = res; + let val: Vec = serde_json::from_value(val).unwrap(); + let res = join_hashmap( + serde_json::from_value(val[0].to_owned()).unwrap(), + serde_json::from_value(val[1].to_owned()).unwrap(), + ); + self.input.$input = res; } } - } + }; } #[allow(unused)] @@ -176,4 +178,3 @@ macro_rules! impl_combine_setter { } } } - diff --git a/echo-library/src/boilerplate/src/state_manager.rs b/echo-library/src/boilerplate/src/state_manager.rs new file mode 100644 index 0000000..48bd54f --- /dev/null +++ b/echo-library/src/boilerplate/src/state_manager.rs @@ -0,0 +1,106 @@ +use super::*; +use crate::WorkflowGraph; +use core::default; + +#[derive(Debug, Serialize, Deserialize, Clone)] +enum ExecutionState { + Init, + Running, + Paused, + Failed, + Success, +} + +impl Default for ExecutionState { + fn default() -> Self { + ExecutionState::Running + } +} + +#[derive(Default, Debug)] +pub struct StateManager{ + action_name: String, + task_index: isize, + execution_state: ExecutionState, + output: Option, + error: Option, +} + +impl StateManager { + + fn update_state_data(&self) { + let state_data: serde_json::Value = serde_json::json!( + { + "action_name": self.action_name, + "task_index": self.task_index, + "execution_state": self.execution_state, + "output": self.output, + "error": self.error + } + ); + + let serialized = serde_json::to_vec(&state_data).unwrap(); + let size = serialized.len() as i32; + let ptr = serialized.as_ptr(); + + std::mem::forget(ptr); + + unsafe { + super::set_state(ptr as i32, size); + } + } + + pub fn init() -> Self { + let state_data = StateManager { + action_name: "Initializing Workflow".to_string(), + execution_state: ExecutionState::Init, + output: None, + task_index: -1, + error: None, + }; + + state_data.update_state_data(); + state_data + } + + pub fn update_workflow_initialized(&mut self) { + self.execution_state = ExecutionState::Success; + self.task_index = -1; + self.error = None; + self.update_state_data(); + } + + pub fn update_running(&mut self, action_name: &str, task_index: isize) { + self.action_name = action_name.to_string(); + self.task_index = task_index; + self.execution_state = ExecutionState::Running; + self.output = None; + self.update_state_data(); + } + + pub fn update_pause(&mut self) { + self.execution_state = ExecutionState::Paused; + self.update_state_data(); + } + + pub fn update_success(&mut self, output: Value) { + self.output = Some(output); + self.execution_state = ExecutionState::Success; + self.update_state_data(); + } + + pub fn update_restore_success(&mut self, action_name: &str, task_index: isize, output: Value) { + self.action_name = action_name.to_string(); + self.task_index = task_index; + self.execution_state = ExecutionState::Success; + self.output = Some(output); + self.update_state_data(); + } + + pub fn update_err(&mut self, error: &str) { + self.execution_state = ExecutionState::Failed; + self.error = Some(error.to_string()); + self.update_state_data(); + } + +} diff --git a/echo-library/src/boilerplate/src/traits.rs b/echo-library/src/boilerplate/src/traits.rs index 4905593..d85c1ca 100644 --- a/echo-library/src/boilerplate/src/traits.rs +++ b/echo-library/src/boilerplate/src/traits.rs @@ -1,10 +1,12 @@ - use super::*; -pub trait Execute : Debug + DynClone { - fn execute(&mut self)-> Result<(),String>; - fn get_task_output(&self)->Value; +pub trait Execute: Debug + DynClone { + fn execute(&mut self) -> Result<(), String>; + fn get_task_output(&self) -> Value; + fn set_result_output(&mut self, inp: Value); fn set_output_to_task(&mut self, inp: Value); + fn get_action_name(&self) -> String; + fn get_json_string(&self) -> String; } clone_trait_object!(Execute); diff --git a/echo-library/src/common/composer.rs b/echo-library/src/common/composer.rs index e1804ad..b2a28e2 100644 --- a/echo-library/src/common/composer.rs +++ b/echo-library/src/common/composer.rs @@ -13,6 +13,7 @@ const COMMON: &str = include_str!("../boilerplate/src/common.rs"); const LIB: &str = include_str!("../boilerplate/src/lib.rs"); const TRAIT: &str = include_str!("../boilerplate/src/traits.rs"); const MACROS: &str = include_str!("../boilerplate/src/macros.rs"); +const STATE_MANAGER: &str = include_str!("../boilerplate/src/state_manager.rs"); const CARGO: &str = include_str!("../boilerplate/Cargo.toml"); #[derive(Debug, ProvidesStaticType, Default)] @@ -136,6 +137,9 @@ impl Composer { let temp_path = src_curr.as_path().join("macros.rs"); std::fs::write(temp_path, MACROS)?; + let temp_path = src_curr.as_path().join("state_manager.rs"); + std::fs::write(temp_path, STATE_MANAGER)?; + let cargo_path = curr.join("Cargo.toml"); std::fs::write(cargo_path.clone(), CARGO)?; From d11735077b3a71008ed49f2f821a3636734346e1 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Tue, 19 Mar 2024 18:08:25 +0530 Subject: [PATCH 2/4] chore: implement state manager in boilerplate --- echo-library/src/boilerplate/Cargo.toml | 2 +- echo-library/src/common/parse_module.rs | 287 +++--------------------- 2 files changed, 38 insertions(+), 251 deletions(-) diff --git a/echo-library/src/boilerplate/Cargo.toml b/echo-library/src/boilerplate/Cargo.toml index de86c6f..5c8a3c3 100644 --- a/echo-library/src/boilerplate/Cargo.toml +++ b/echo-library/src/boilerplate/Cargo.toml @@ -23,7 +23,7 @@ derive-enum-from-into = "0.1.1" serde_derive = "1.0.192" paste = "1.0.7" dyn-clone = "1.0.7" -workflow_macro = "0.0.3" +workflow_macro = {git="https://github.com/HugoByte/aurras", branch = "feat/modify-workflow-macro-for-state-management", package = "workflow_macro"} openwhisk-rust = "0.1.2" serde_json = { version = "1.0", features = ["raw_value"] } serde = { version = "1.0.192", features = ["derive"] } diff --git a/echo-library/src/common/parse_module.rs b/echo-library/src/common/parse_module.rs index 3e258b8..cc14ca8 100644 --- a/echo-library/src/common/parse_module.rs +++ b/echo-library/src/common/parse_module.rs @@ -23,10 +23,15 @@ pub fn get_task_kind(kind: &str) -> Result { fn get_main_method_code_template(tasks_length: usize) -> String { format!( "#[allow(dead_code, unused)] -pub fn main(args: Value) -> Result {{ + pub fn main(args: Value) -> Result {{ const LIMIT: usize = {tasks_length}; let mut workflow = WorkflowGraph::new(LIMIT); - let input: Input = serde_json::from_value(args).map_err(|e| e.to_string())?; + workflow.state_manager.update_workflow_initialized(); + + let input: Input = + serde_json::from_value(args.get(\"data\").unwrap().clone()).map_err(|e| e.to_string())?; + let prev_output: Vec = serde_json::from_value(args.get(\"prev_output\").unwrap().clone()) + .map_err(|e| e.to_string())?; " ) } @@ -53,7 +58,6 @@ pub fn get_attributes(attributes: &HashMap) -> String { format!("[{}]", build_string.join(",")) } - fn get_default_value_functions_code(workflow: &Workflow) -> String { let mut default_value_functions = String::new(); @@ -369,6 +373,10 @@ fn get_add_nodes_code(flow: &Vec) -> String { } fn get_add_edges_code(workflow: &Workflow, flow: &Vec) -> Result { + if flow.len() == 1 { + return Ok("".to_string()); + } + let mut add_edges_code = "workflow.add_edges(&[\n".to_string(); for index in 0..flow.len() - 1 { @@ -394,43 +402,6 @@ fn get_add_edges_code(workflow: &Workflow, flow: &Vec) -> Result) -> Result { - let mut execute_code = "let result = workflow\n.init()?".to_string(); - - for task_index in 0..flow.len() - 1 { - execute_code = if task_index + 1 == flow.len() - 1 { - match workflow - .tasks - .get(&flow[task_index + 1]) - .unwrap() - .depend_on - .len() - { - 0 | 1 => { - format!( - "{execute_code}\n.term(Some({}_index))?;", - flow[task_index + 1].to_case(Case::Snake) - ) - } - - _ => { - format!( - "{execute_code}\n.pipe({}_index)?\n.term(None)?;", - flow[task_index + 1].to_case(Case::Snake) - ) - } - } - } else { - format!( - "{execute_code}\n.pipe({}_index)?", - flow[task_index + 1].to_case(Case::Snake) - ) - } - } - - Ok(execute_code) -} - /// Generates Rust code to add workflow nodes and edges /// /// # Arguments @@ -448,24 +419,29 @@ fn get_workflow_nodes_and_edges_code(workflow: &Workflow) -> Result Result { + pub fn main(args: Value) -> Result { const LIMIT: usize = 4; let mut workflow = WorkflowGraph::new(LIMIT); - let input: Input = serde_json::from_value(args).map_err(|e| e.to_string())?; + workflow.state_manager.update_workflow_initialized(); + + let input: Input = + serde_json::from_value(args.get(\"data\").unwrap().clone()).map_err(|e| e.to_string())?; + let prev_output: Vec = serde_json::from_value(args.get(\"prev_output\").unwrap().clone()) + .map_err(|e| e.to_string())?; " ); } @@ -665,7 +646,6 @@ pub fn main(args: Value) -> Result { let output = get_attributes(&attributes); assert_eq!(output, "[Key:\"value\"]"); - } #[test] @@ -1020,197 +1000,4 @@ impl_new!( || output == "impl_execute_trait!(Task1,Task0);" ); } - - #[test] - fn test_get_add_nodes_code() { - let flow = vec![ - "task0".to_string(), - "task2".to_string(), - "task1".to_string(), - "task4".to_string(), - "task3".to_string(), - ]; - - let output = get_add_nodes_code(&flow); - - assert_eq!( - output, - "\ -let task_0_index = workflow.add_node(Box::new(task_0)); -let task_2_index = workflow.add_node(Box::new(task_2)); -let task_1_index = workflow.add_node(Box::new(task_1)); -let task_4_index = workflow.add_node(Box::new(task_4)); -let task_3_index = workflow.add_node(Box::new(task_3)); -" - ) - } - - #[test] - fn test_get_add_edges_code() { - let task0 = Task { - action_name: "task0".to_string(), - ..Default::default() - }; - let task1 = Task { - action_name: "task1".to_string(), - depend_on: vec![Depend { - task_name: "task0".to_string(), - ..Default::default() - }], - ..Default::default() - }; - - let task2 = Task { - action_name: "task2".to_string(), - depend_on: vec![ - Depend { - task_name: "task1".to_string(), - ..Default::default() - }, - Depend { - task_name: "task0".to_string(), - ..Default::default() - }, - ], - ..Default::default() - }; - - let task3 = Task { - action_name: "task3".to_string(), - depend_on: vec![Depend { - task_name: "task2".to_string(), - ..Default::default() - }], - ..Default::default() - }; - - let task4 = Task { - action_name: "task4".to_string(), - depend_on: vec![ - Depend { - task_name: "task3".to_string(), - ..Default::default() - }, - Depend { - task_name: "task2".to_string(), - ..Default::default() - }, - ], - ..Default::default() - }; - - let mut tasks = HashMap::new(); - tasks.insert("task0".to_string(), task0); - tasks.insert("task1".to_string(), task1); - tasks.insert("task2".to_string(), task2); - tasks.insert("task3".to_string(), task3); - tasks.insert("task4".to_string(), task4); - - let workflow = Workflow { - name: "test-workflow".to_string(), - version: "0.0.1".to_string(), - tasks, - }; - - let flow = workflow.get_flow(); - - let output = get_add_edges_code(&workflow, &flow); - - assert_eq!( - output.unwrap(), - "\ -workflow.add_edges(&[ -(task_0_index, task_1_index), -(task_1_index, task_2_index), -(task_0_index, task_2_index), -(task_2_index, task_3_index), -(task_3_index, task_4_index), -(task_2_index, task_4_index), -]);" - ); - } - - #[test] - fn test_get_add_execute_workflow_code() { - let task0 = Task { - action_name: "task0".to_string(), - ..Default::default() - }; - let task1 = Task { - action_name: "task1".to_string(), - depend_on: vec![Depend { - task_name: "task0".to_string(), - ..Default::default() - }], - ..Default::default() - }; - - let task2 = Task { - action_name: "task2".to_string(), - depend_on: vec![ - Depend { - task_name: "task1".to_string(), - ..Default::default() - }, - Depend { - task_name: "task0".to_string(), - ..Default::default() - }, - ], - ..Default::default() - }; - - let task3 = Task { - action_name: "task3".to_string(), - depend_on: vec![Depend { - task_name: "task2".to_string(), - ..Default::default() - }], - ..Default::default() - }; - - let task4 = Task { - action_name: "task4".to_string(), - depend_on: vec![ - Depend { - task_name: "task3".to_string(), - ..Default::default() - }, - Depend { - task_name: "task2".to_string(), - ..Default::default() - }, - ], - ..Default::default() - }; - - let mut tasks = HashMap::new(); - tasks.insert("task0".to_string(), task0); - tasks.insert("task1".to_string(), task1); - tasks.insert("task2".to_string(), task2); - tasks.insert("task3".to_string(), task3); - tasks.insert("task4".to_string(), task4); - - let workflow = Workflow { - name: "test-workflow".to_string(), - version: "0.0.1".to_string(), - tasks, - }; - - let flow = workflow.get_flow(); - - let output = get_add_execute_workflow_code(&workflow, &flow); - - assert_eq!( - output.unwrap(), - "\ -let result = workflow -.init()? -.pipe(task_1_index)? -.pipe(task_2_index)? -.pipe(task_3_index)? -.pipe(task_4_index)? -.term(None)?;" - ); - } } From 1e892a1d2e0975a599fc485d6684ac080d78fc88 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Wed, 24 Apr 2024 17:08:26 +0530 Subject: [PATCH 3/4] chore: make execution state as init --- echo-library/src/boilerplate/src/state_manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/echo-library/src/boilerplate/src/state_manager.rs b/echo-library/src/boilerplate/src/state_manager.rs index 48bd54f..630dc62 100644 --- a/echo-library/src/boilerplate/src/state_manager.rs +++ b/echo-library/src/boilerplate/src/state_manager.rs @@ -13,7 +13,7 @@ enum ExecutionState { impl Default for ExecutionState { fn default() -> Self { - ExecutionState::Running + ExecutionState::Init } } @@ -102,5 +102,4 @@ impl StateManager { self.error = Some(error.to_string()); self.update_state_data(); } - } From 3a4d981e19f2ecbe7b69c2b7519c94d6bd9dc3d1 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Mon, 29 Apr 2024 12:23:17 +0530 Subject: [PATCH 4/4] refactor: change variable name 'i' to 'index' and 'val' to 'value' to increase readability --- echo-library/src/boilerplate/src/macros.rs | 12 ++++++------ echo-library/src/common/parse_module.rs | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/echo-library/src/boilerplate/src/macros.rs b/echo-library/src/boilerplate/src/macros.rs index 2bb7b8d..b45ad2f 100644 --- a/echo-library/src/boilerplate/src/macros.rs +++ b/echo-library/src/boilerplate/src/macros.rs @@ -140,13 +140,13 @@ macro_rules! impl_concat_setter { $input:ident ) => { impl $name { - pub fn setter(&mut self, val: Value) { - let val: Vec = serde_json::from_value(val).unwrap(); - let res = join_hashmap( - serde_json::from_value(val[0].to_owned()).unwrap(), - serde_json::from_value(val[1].to_owned()).unwrap(), + pub fn setter(&mut self, value: Value) { + let value: Vec = serde_json::from_value(value).unwrap(); + let response = join_hashmap( + serde_json::from_value(value[0].to_owned()).unwrap(), + serde_json::from_value(value[1].to_owned()).unwrap(), ); - self.input.$input = res; + self.input.$input = response; } } }; diff --git a/echo-library/src/common/parse_module.rs b/echo-library/src/common/parse_module.rs index cc14ca8..e5ae730 100644 --- a/echo-library/src/common/parse_module.rs +++ b/echo-library/src/common/parse_module.rs @@ -422,15 +422,15 @@ fn get_workflow_nodes_and_edges_code(workflow: &Workflow) -> Result