From bb898d0b23fec694255873a47ce28f945c9d1773 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Tue, 19 Mar 2024 16:07:22 +0530 Subject: [PATCH 1/3] chore: implement state management inside workflow macro --- workflow/test_util/Cargo.toml | 2 +- workflow/test_util/src/main.rs | 20 ++++ workflow/test_util/src/types.rs | 63 ++++++++++++ workflow/workflow_macro/src/lib.rs | 148 +++++++++++++---------------- 4 files changed, 152 insertions(+), 81 deletions(-) create mode 100644 workflow/test_util/src/types.rs diff --git a/workflow/test_util/Cargo.toml b/workflow/test_util/Cargo.toml index d2cf5a6e..ed3ba6dd 100644 --- a/workflow/test_util/Cargo.toml +++ b/workflow/test_util/Cargo.toml @@ -44,4 +44,4 @@ openwhisk-client-rust = { git = "https://github.com/HugoByte/openwhisk-client-ru wiremock = "0.5.17" async-std = { version = "1.12.0", features = ["attributes"] } workflow_macro = {path = "../workflow_macro"} -dyn-clone = "1.0.7" \ No newline at end of file +dyn-clone = "1.0.7" diff --git a/workflow/test_util/src/main.rs b/workflow/test_util/src/main.rs index 812e71fd..fa4f8c3f 100644 --- a/workflow/test_util/src/main.rs +++ b/workflow/test_util/src/main.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; #[cfg(test)] mod wasi_http; +mod types; pub mod helper; pub use helper::*; @@ -204,6 +205,7 @@ mod tests { #[cfg(test)] mod flow_macro_tests { + use super::*; use dyn_clone::{clone_trait_object, DynClone}; use serde_json::Value; use std::fmt::Debug; @@ -212,15 +214,20 @@ mod tests { pub trait Execute: Debug + DynClone { fn execute(&mut self) -> Result<(), String>; fn get_task_output(&self) -> Value; + fn set_result_output(&mut self, inp: Value); fn set_output_to_task(&mut self, inp: Value); + fn get_action_name(&self) -> String; + fn get_json_string(&self) -> String; } + clone_trait_object!(Execute); #[derive(Debug, Flow)] #[allow(dead_code)] pub struct WorkflowGraph { edges: Vec<(usize, usize)>, nodes: Vec>, + state_manager: types::StateManager, } impl WorkflowGraph { @@ -228,6 +235,7 @@ mod tests { WorkflowGraph { nodes: Vec::with_capacity(size), edges: Vec::new(), + state_manager: types::StateManager::init(), } } } @@ -258,6 +266,18 @@ mod tests { fn set_output_to_task(&mut self, inp: Value) { todo!() } + + fn get_action_name(&self) -> String{ + todo!() + } + + fn get_json_string(&self) -> String{ + todo!() + } + + fn set_result_output(&mut self, inp: Value) { + todo!() + } } let node = Action::default(); let mut workflow = WorkflowGraph::new(5); diff --git a/workflow/test_util/src/types.rs b/workflow/test_util/src/types.rs new file mode 100644 index 00000000..994cfd23 --- /dev/null +++ b/workflow/test_util/src/types.rs @@ -0,0 +1,63 @@ +use super::*; + +#[derive(Debug, Serialize, Deserialize, Clone)] +enum ExecutionState { + Init, + Running, + Paused, + Failed, + Success, +} + +impl Default for ExecutionState { + fn default() -> Self { + ExecutionState::Running + } +} + +#[allow(unused)] +#[derive(Default, Debug)] +pub struct StateManager{ + action_name: String, + task_index: isize, + execution_state: ExecutionState, + output: Option, + error: Option, +} + +#[allow(unused)] +impl StateManager { + + fn update_state_data(&self) { + todo!() + } + + pub fn init() -> Self { + todo!() + } + + pub fn update_workflow_initialized(&mut self) { + todo!() + } + + pub fn update_running(&mut self, action_name: &str, task_index: isize) { + todo!() + } + + pub fn update_pause(&mut self) { + todo!() + } + + pub fn update_success(&mut self, output: Value) { + todo!() + } + + pub fn update_restore_success(&mut self, action_name: &str, task_index: isize, output: Value) { + todo!() + } + + pub fn update_err(&mut self, error: &str) { + todo!() + } + +} diff --git a/workflow/workflow_macro/src/lib.rs b/workflow/workflow_macro/src/lib.rs index 863ccfeb..d8ec6ecf 100644 --- a/workflow/workflow_macro/src/lib.rs +++ b/workflow/workflow_macro/src/lib.rs @@ -18,116 +18,104 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { let methods = quote! { - - impl #workflow { - + impl #workflow{ pub fn node_count(&self) -> usize { self.nodes.len() } - + pub fn add_node(&mut self, task: Box) -> usize { let len = self.nodes.len(); self.nodes.push(task); len } - + pub fn add_edge(&mut self, parent: usize, child: usize) { self.edges.push((parent, child)); } - + pub fn add_edges(&mut self, edges: &[(usize, usize)]) { edges .iter() .for_each(|(source, destination)| self.add_edge(*source, *destination)); } - + pub fn get_task(&self, index: usize) -> &Box { self.nodes.get(index).unwrap() } - + pub fn get_task_as_mut(&mut self, index: usize) -> &mut Box { self.nodes.get_mut(index).unwrap() } - + pub fn node_indices(&self) -> Vec { (0..self.node_count()).collect::>() } - - pub fn init(&mut self) -> Result<&mut Self,String> { - match self.get_task_as_mut(0).execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err) - } - - } - pub fn term(&mut self, task_index: Option) -> Result { - - match task_index { - Some(index) => { - let previous_index = (index - 1).try_into().unwrap(); - let previous_task = self.get_task(previous_index); - let previous_task_output = previous_task.get_task_output(); - let current_task = self.get_task_as_mut(index); - current_task.set_output_to_task(previous_task_output); - match current_task.execute(){ - Ok(()) => Ok(current_task.get_task_output()), - Err(err) => Err(err), + + pub fn run(&mut self, task_index: usize) -> Result<&mut Self, String> { + let len = self.nodes.len() - 1; + + let task = self.get_task(task_index); + let action_name = task.get_action_name(); + self.state_manager.update_running(&action_name, task_index as isize); + + let result = { + + let mut list = Vec::new(); + let edges_list = self.edges.clone(); + edges_list.iter().for_each(|(source, destination)| { + if destination == &task_index { + list.push(source) } - + }); + + match list.len() { + 0 => { + + let task = self.get_task_as_mut(task_index); + match task.execute() { + Ok(()) => Ok(task.get_task_output()), + Err(err) => Err(err), + } }, - None => { - let len = self.node_count(); - Ok(self.get_task(len-1).get_task_output()) - }, - } - - } - - pub fn pipe(&mut self, task_index: usize) -> Result<&mut Self,String> { - let mut list = Vec::new(); - let edges_list = self.edges.clone(); - edges_list.iter().for_each(|(source, destination)| { - if destination == &task_index { - list.push(source) - } - }); - let mut res: Vec = Vec::new(); - match list.len() { - 0 => { - match self.get_task_as_mut(task_index).execute(){ - - Ok(()) => Ok(self), - Err(err) => Err(err), - + 1 => { + let previous_task_output = self.get_task(*list[0]).get_task_output(); + let current_task = self.get_task_as_mut(task_index); + current_task.set_output_to_task(previous_task_output); + match current_task.execute() { + Ok(()) => Ok(current_task.get_task_output()), + Err(err) => Err(err), + } } - }, - 1 => { - let previous_task_output = self.get_task(*list[0]).get_task_output(); - let current_task = self.get_task_as_mut(task_index); - current_task.set_output_to_task(previous_task_output); - match current_task.execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err), + _ => { + let mut res: Vec = list + .iter() + .map(|index| { + let previous_task = self.get_task(**index); + let previous_task_output = previous_task.get_task_output(); + previous_task_output + }) + .collect(); + + let s: Value = res.into(); + let current_task = self.get_task_as_mut(task_index); + current_task.set_output_to_task(s); + + match current_task.execute() { + Ok(()) => Ok(current_task.get_task_output()), + Err(err) => Err(err), + } } } - _ => { - res = list - .iter() - .map(|index| { - let previous_task = self.get_task(**index); - let previous_task_output = previous_task.get_task_output(); - previous_task_output - }) - .collect(); - - let s: Value = res.into(); - let current_task = self.get_task_as_mut(task_index); - current_task.set_output_to_task(s); - - match current_task.execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err), - } + }; + + match result { + Ok(output) => { + self.state_manager.update_success(output); + Ok(self) + } + Err(err) => { + self.state_manager.update_err(&err); + Err(err) } } } From 78247aa05d0d9b8dd8ae21927ac5d1bd964494cd Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Wed, 24 Apr 2024 17:30:49 +0530 Subject: [PATCH 2/3] chore: remove unwanted method implementations in test util --- workflow/test_util/src/lib.rs | 13 ++++++------- workflow/test_util/src/types.rs | 33 +++------------------------------ 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/workflow/test_util/src/lib.rs b/workflow/test_util/src/lib.rs index 5ae133ad..b04e04d3 100644 --- a/workflow/test_util/src/lib.rs +++ b/workflow/test_util/src/lib.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +mod types; #[cfg(test)] mod wasi_http; -mod types; pub mod helper; pub use helper::*; @@ -218,7 +218,6 @@ mod tests { fn get_json_string(&self) -> String; } - clone_trait_object!(Execute); #[derive(Debug, Flow)] #[allow(dead_code)] @@ -233,7 +232,7 @@ mod tests { WorkflowGraph { nodes: Vec::with_capacity(size), edges: Vec::new(), - state_manager: types::StateManager::init(), + state_manager: types::StateManager::default(), } } } @@ -265,14 +264,14 @@ mod tests { todo!() } - fn get_action_name(&self) -> String{ + fn get_action_name(&self) -> String { todo!() } - - fn get_json_string(&self) -> String{ + + fn get_json_string(&self) -> String { todo!() } - + fn set_result_output(&mut self, inp: Value) { todo!() } diff --git a/workflow/test_util/src/types.rs b/workflow/test_util/src/types.rs index 994cfd23..386590b8 100644 --- a/workflow/test_util/src/types.rs +++ b/workflow/test_util/src/types.rs @@ -1,7 +1,8 @@ use super::*; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Default, Debug, Serialize, Deserialize, Clone)] enum ExecutionState { + #[default] Init, Running, Paused, @@ -9,15 +10,9 @@ enum ExecutionState { Success, } -impl Default for ExecutionState { - fn default() -> Self { - ExecutionState::Running - } -} - #[allow(unused)] #[derive(Default, Debug)] -pub struct StateManager{ +pub struct StateManager { action_name: String, task_index: isize, execution_state: ExecutionState, @@ -27,37 +22,15 @@ pub struct StateManager{ #[allow(unused)] impl StateManager { - - fn update_state_data(&self) { - todo!() - } - - pub fn init() -> Self { - todo!() - } - - pub fn update_workflow_initialized(&mut self) { - todo!() - } - pub fn update_running(&mut self, action_name: &str, task_index: isize) { todo!() } - pub fn update_pause(&mut self) { - todo!() - } - pub fn update_success(&mut self, output: Value) { todo!() } - pub fn update_restore_success(&mut self, action_name: &str, task_index: isize, output: Value) { - todo!() - } - pub fn update_err(&mut self, error: &str) { todo!() } - } From 74776cd0ac981639ab008add12211c04b7ce31ee Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Mon, 29 Apr 2024 11:38:38 +0530 Subject: [PATCH 3/3] refactor: rename variable 's' with 'outputs' for clarity --- workflow/workflow_macro/src/lib.rs | 34 ++++++++++++++---------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/workflow/workflow_macro/src/lib.rs b/workflow/workflow_macro/src/lib.rs index d8ec6ecf..e0371dad 100644 --- a/workflow/workflow_macro/src/lib.rs +++ b/workflow/workflow_macro/src/lib.rs @@ -22,44 +22,43 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { pub fn node_count(&self) -> usize { self.nodes.len() } - + pub fn add_node(&mut self, task: Box) -> usize { let len = self.nodes.len(); self.nodes.push(task); len } - + pub fn add_edge(&mut self, parent: usize, child: usize) { self.edges.push((parent, child)); } - + pub fn add_edges(&mut self, edges: &[(usize, usize)]) { edges .iter() .for_each(|(source, destination)| self.add_edge(*source, *destination)); } - + pub fn get_task(&self, index: usize) -> &Box { self.nodes.get(index).unwrap() } - + pub fn get_task_as_mut(&mut self, index: usize) -> &mut Box { self.nodes.get_mut(index).unwrap() } - + pub fn node_indices(&self) -> Vec { (0..self.node_count()).collect::>() } - + pub fn run(&mut self, task_index: usize) -> Result<&mut Self, String> { let len = self.nodes.len() - 1; - + let task = self.get_task(task_index); let action_name = task.get_action_name(); self.state_manager.update_running(&action_name, task_index as isize); - + let result = { - let mut list = Vec::new(); let edges_list = self.edges.clone(); edges_list.iter().for_each(|(source, destination)| { @@ -67,10 +66,9 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { list.push(source) } }); - + match list.len() { 0 => { - let task = self.get_task_as_mut(task_index); match task.execute() { Ok(()) => Ok(task.get_task_output()), @@ -87,7 +85,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { } } _ => { - let mut res: Vec = list + let mut outputs: Vec = list .iter() .map(|index| { let previous_task = self.get_task(**index); @@ -95,11 +93,11 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { previous_task_output }) .collect(); - - let s: Value = res.into(); + + let outputs: Value = outputs.into(); let current_task = self.get_task_as_mut(task_index); - current_task.set_output_to_task(s); - + current_task.set_output_to_task(outputs); + match current_task.execute() { Ok(()) => Ok(current_task.get_task_output()), Err(err) => Err(err), @@ -107,7 +105,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { } } }; - + match result { Ok(output) => { self.state_manager.update_success(output);