Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion workflow/test_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ openwhisk-client-rust = { git = "https://github.com/HugoByte/openwhisk-client-ru
wiremock = "0.5.17"
async-std = { version = "1.12.0", features = ["attributes"] }
workflow_macro = {path = "../workflow_macro"}
dyn-clone = "1.0.7"
dyn-clone = "1.0.7"
19 changes: 19 additions & 0 deletions workflow/test_util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
mod types;
#[cfg(test)]
mod wasi_http;

Expand Down Expand Up @@ -202,6 +203,7 @@ mod tests {

#[cfg(test)]
mod flow_macro_tests {
use super::*;
use dyn_clone::{clone_trait_object, DynClone};
use serde_json::Value;
use std::fmt::Debug;
Expand All @@ -210,7 +212,10 @@ mod tests {
pub trait Execute: Debug + DynClone {
fn execute(&mut self) -> Result<(), String>;
fn get_task_output(&self) -> Value;
fn set_result_output(&mut self, inp: Value);
fn set_output_to_task(&mut self, inp: Value);
fn get_action_name(&self) -> String;
fn get_json_string(&self) -> String;
}

clone_trait_object!(Execute);
Expand All @@ -219,13 +224,15 @@ mod tests {
pub struct WorkflowGraph {
edges: Vec<(usize, usize)>,
nodes: Vec<Box<dyn Execute>>,
state_manager: types::StateManager,
}

impl WorkflowGraph {
pub fn new(size: usize) -> Self {
WorkflowGraph {
nodes: Vec::with_capacity(size),
edges: Vec::new(),
state_manager: types::StateManager::default(),
}
}
}
Expand Down Expand Up @@ -256,6 +263,18 @@ mod tests {
fn set_output_to_task(&mut self, inp: Value) {
todo!()
}

fn get_action_name(&self) -> String {
todo!()
}

fn get_json_string(&self) -> String {
todo!()
}

fn set_result_output(&mut self, inp: Value) {
todo!()
}
}
let node = Action::default();
let mut workflow = WorkflowGraph::new(5);
Expand Down
36 changes: 36 additions & 0 deletions workflow/test_util/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::*;

#[derive(Default, Debug, Serialize, Deserialize, Clone)]
enum ExecutionState {
#[default]
Init,
Running,
Paused,
Failed,
Success,
}

#[allow(unused)]
#[derive(Default, Debug)]
pub struct StateManager {
action_name: String,
task_index: isize,
execution_state: ExecutionState,
output: Option<Value>,
error: Option<String>,
}

#[allow(unused)]
impl StateManager {
pub fn update_running(&mut self, action_name: &str, task_index: isize) {
todo!()
}

pub fn update_success(&mut self, output: Value) {
todo!()
}

pub fn update_err(&mut self, error: &str) {
todo!()
}
}
130 changes: 58 additions & 72 deletions workflow/workflow_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream {

let methods = quote! {


impl #workflow {

impl #workflow{
pub fn node_count(&self) -> usize {
self.nodes.len()
}
Expand Down Expand Up @@ -53,81 +51,69 @@ fn impl_workflow(ast: DeriveInput) -> TokenStream {
(0..self.node_count()).collect::<Vec<_>>()
}

pub fn init(&mut self) -> Result<&mut Self,String> {
match self.get_task_as_mut(0).execute(){
Ok(()) => Ok(self),
Err(err) => Err(err)
}
pub fn run(&mut self, task_index: usize) -> Result<&mut Self, String> {
let len = self.nodes.len() - 1;

}
pub fn term(&mut self, task_index: Option<usize>) -> Result<Value,String> {

match task_index {
Some(index) => {
let previous_index = (index - 1).try_into().unwrap();
let previous_task = self.get_task(previous_index);
let previous_task_output = previous_task.get_task_output();
let current_task = self.get_task_as_mut(index);
current_task.set_output_to_task(previous_task_output);
match current_task.execute(){
Ok(()) => Ok(current_task.get_task_output()),
Err(err) => Err(err),
}

},
None => {
let len = self.node_count();
Ok(self.get_task(len-1).get_task_output())
},
}

}

pub fn pipe(&mut self, task_index: usize) -> Result<&mut Self,String> {
let mut list = Vec::new();
let edges_list = self.edges.clone();
edges_list.iter().for_each(|(source, destination)| {
if destination == &task_index {
list.push(source)
}
});
let mut res: Vec<Value> = Vec::new();
match list.len() {
0 => {
match self.get_task_as_mut(task_index).execute(){

Ok(()) => Ok(self),
Err(err) => Err(err),
let task = self.get_task(task_index);
let action_name = task.get_action_name();
self.state_manager.update_running(&action_name, task_index as isize);

let result = {
let mut list = Vec::new();
let edges_list = self.edges.clone();
edges_list.iter().for_each(|(source, destination)| {
if destination == &task_index {
list.push(source)
}
});

match list.len() {
0 => {
let task = self.get_task_as_mut(task_index);
match task.execute() {
Ok(()) => Ok(task.get_task_output()),
Err(err) => Err(err),
}
},
1 => {
let previous_task_output = self.get_task(*list[0]).get_task_output();
let current_task = self.get_task_as_mut(task_index);
current_task.set_output_to_task(previous_task_output);
match current_task.execute(){
Ok(()) => Ok(self),
Err(err) => Err(err),
1 => {
let previous_task_output = self.get_task(*list[0]).get_task_output();
let current_task = self.get_task_as_mut(task_index);
current_task.set_output_to_task(previous_task_output);
match current_task.execute() {
Ok(()) => Ok(current_task.get_task_output()),
Err(err) => Err(err),
}
}
_ => {
let mut outputs: Vec<Value> = list
.iter()
.map(|index| {
let previous_task = self.get_task(**index);
let previous_task_output = previous_task.get_task_output();
previous_task_output
})
.collect();

let outputs: Value = outputs.into();
let current_task = self.get_task_as_mut(task_index);
current_task.set_output_to_task(outputs);

match current_task.execute() {
Ok(()) => Ok(current_task.get_task_output()),
Err(err) => Err(err),
}
}
}
_ => {
res = list
.iter()
.map(|index| {
let previous_task = self.get_task(**index);
let previous_task_output = previous_task.get_task_output();
previous_task_output
})
.collect();

let s: Value = res.into();
let current_task = self.get_task_as_mut(task_index);
current_task.set_output_to_task(s);

match current_task.execute(){
Ok(()) => Ok(self),
Err(err) => Err(err),
}
};

match result {
Ok(output) => {
self.state_manager.update_success(output);
Ok(self)
}
Err(err) => {
self.state_manager.update_err(&err);
Err(err)
}
}
}
Expand Down