diff --git a/workflow/test_util/Cargo.toml b/workflow/test_util/Cargo.toml index d2cf5a6e..ed3ba6dd 100644 --- a/workflow/test_util/Cargo.toml +++ b/workflow/test_util/Cargo.toml @@ -44,4 +44,4 @@ openwhisk-client-rust = { git = "https://github.com/HugoByte/openwhisk-client-ru wiremock = "0.5.17" async-std = { version = "1.12.0", features = ["attributes"] } workflow_macro = {path = "../workflow_macro"} -dyn-clone = "1.0.7" \ No newline at end of file +dyn-clone = "1.0.7" diff --git a/workflow/test_util/src/lib.rs b/workflow/test_util/src/lib.rs index d061b0eb..b04e04d3 100644 --- a/workflow/test_util/src/lib.rs +++ b/workflow/test_util/src/lib.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +mod types; #[cfg(test)] mod wasi_http; @@ -202,6 +203,7 @@ mod tests { #[cfg(test)] mod flow_macro_tests { + use super::*; use dyn_clone::{clone_trait_object, DynClone}; use serde_json::Value; use std::fmt::Debug; @@ -210,7 +212,10 @@ mod tests { pub trait Execute: Debug + DynClone { fn execute(&mut self) -> Result<(), String>; fn get_task_output(&self) -> Value; + fn set_result_output(&mut self, inp: Value); fn set_output_to_task(&mut self, inp: Value); + fn get_action_name(&self) -> String; + fn get_json_string(&self) -> String; } clone_trait_object!(Execute); @@ -219,6 +224,7 @@ mod tests { pub struct WorkflowGraph { edges: Vec<(usize, usize)>, nodes: Vec>, + state_manager: types::StateManager, } impl WorkflowGraph { @@ -226,6 +232,7 @@ mod tests { WorkflowGraph { nodes: Vec::with_capacity(size), edges: Vec::new(), + state_manager: types::StateManager::default(), } } } @@ -256,6 +263,18 @@ mod tests { fn set_output_to_task(&mut self, inp: Value) { todo!() } + + fn get_action_name(&self) -> String { + todo!() + } + + fn get_json_string(&self) -> String { + todo!() + } + + fn set_result_output(&mut self, inp: Value) { + todo!() + } } let node = Action::default(); let mut workflow = WorkflowGraph::new(5); diff --git a/workflow/test_util/src/types.rs b/workflow/test_util/src/types.rs new file mode 100644 index 00000000..386590b8 --- /dev/null +++ b/workflow/test_util/src/types.rs @@ -0,0 +1,36 @@ +use super::*; + +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +enum ExecutionState { + #[default] + Init, + Running, + Paused, + Failed, + Success, +} + +#[allow(unused)] +#[derive(Default, Debug)] +pub struct StateManager { + action_name: String, + task_index: isize, + execution_state: ExecutionState, + output: Option, + error: Option, +} + +#[allow(unused)] +impl StateManager { + pub fn update_running(&mut self, action_name: &str, task_index: isize) { + todo!() + } + + pub fn update_success(&mut self, output: Value) { + todo!() + } + + pub fn update_err(&mut self, error: &str) { + todo!() + } +} diff --git a/workflow/workflow_macro/src/lib.rs b/workflow/workflow_macro/src/lib.rs index 863ccfeb..e0371dad 100644 --- a/workflow/workflow_macro/src/lib.rs +++ b/workflow/workflow_macro/src/lib.rs @@ -18,9 +18,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { let methods = quote! { - - impl #workflow { - + impl #workflow{ pub fn node_count(&self) -> usize { self.nodes.len() } @@ -53,81 +51,69 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream { (0..self.node_count()).collect::>() } - pub fn init(&mut self) -> Result<&mut Self,String> { - match self.get_task_as_mut(0).execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err) - } + pub fn run(&mut self, task_index: usize) -> Result<&mut Self, String> { + let len = self.nodes.len() - 1; - } - pub fn term(&mut self, task_index: Option) -> Result { - - match task_index { - Some(index) => { - let previous_index = (index - 1).try_into().unwrap(); - let previous_task = self.get_task(previous_index); - let previous_task_output = previous_task.get_task_output(); - let current_task = self.get_task_as_mut(index); - current_task.set_output_to_task(previous_task_output); - match current_task.execute(){ - Ok(()) => Ok(current_task.get_task_output()), - Err(err) => Err(err), - } - - }, - None => { - let len = self.node_count(); - Ok(self.get_task(len-1).get_task_output()) - }, - } - - } - - pub fn pipe(&mut self, task_index: usize) -> Result<&mut Self,String> { - let mut list = Vec::new(); - let edges_list = self.edges.clone(); - edges_list.iter().for_each(|(source, destination)| { - if destination == &task_index { - list.push(source) - } - }); - let mut res: Vec = Vec::new(); - match list.len() { - 0 => { - match self.get_task_as_mut(task_index).execute(){ - - Ok(()) => Ok(self), - Err(err) => Err(err), + let task = self.get_task(task_index); + let action_name = task.get_action_name(); + self.state_manager.update_running(&action_name, task_index as isize); + let result = { + let mut list = Vec::new(); + let edges_list = self.edges.clone(); + edges_list.iter().for_each(|(source, destination)| { + if destination == &task_index { + list.push(source) } + }); + + match list.len() { + 0 => { + let task = self.get_task_as_mut(task_index); + match task.execute() { + Ok(()) => Ok(task.get_task_output()), + Err(err) => Err(err), + } }, - 1 => { - let previous_task_output = self.get_task(*list[0]).get_task_output(); - let current_task = self.get_task_as_mut(task_index); - current_task.set_output_to_task(previous_task_output); - match current_task.execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err), + 1 => { + let previous_task_output = self.get_task(*list[0]).get_task_output(); + let current_task = self.get_task_as_mut(task_index); + current_task.set_output_to_task(previous_task_output); + match current_task.execute() { + Ok(()) => Ok(current_task.get_task_output()), + Err(err) => Err(err), + } + } + _ => { + let mut outputs: Vec = list + .iter() + .map(|index| { + let previous_task = self.get_task(**index); + let previous_task_output = previous_task.get_task_output(); + previous_task_output + }) + .collect(); + + let outputs: Value = outputs.into(); + let current_task = self.get_task_as_mut(task_index); + current_task.set_output_to_task(outputs); + + match current_task.execute() { + Ok(()) => Ok(current_task.get_task_output()), + Err(err) => Err(err), + } } } - _ => { - res = list - .iter() - .map(|index| { - let previous_task = self.get_task(**index); - let previous_task_output = previous_task.get_task_output(); - previous_task_output - }) - .collect(); - - let s: Value = res.into(); - let current_task = self.get_task_as_mut(task_index); - current_task.set_output_to_task(s); - - match current_task.execute(){ - Ok(()) => Ok(self), - Err(err) => Err(err), - } + }; + + match result { + Ok(output) => { + self.state_manager.update_success(output); + Ok(self) + } + Err(err) => { + self.state_manager.update_err(&err); + Err(err) } } }