From f8c8b0e13fa8f82f8c8fecb8d2ff64f31a37e8dc Mon Sep 17 00:00:00 2001 From: Daniel Voogsgerd Date: Thu, 31 Oct 2024 11:59:20 +0100 Subject: [PATCH 1/2] refactor(let): Cleaned up the exec ecu code a lot - Remove old commented code - Use more rusty language features (more consise) --- brane-let/src/exec_ecu.rs | 494 +++++--------------------------------- brane-let/src/main.rs | 54 +---- 2 files changed, 62 insertions(+), 486 deletions(-) diff --git a/brane-let/src/exec_ecu.rs b/brane-let/src/exec_ecu.rs index 14439b88..d5884fc3 100644 --- a/brane-let/src/exec_ecu.rs +++ b/brane-let/src/exec_ecu.rs @@ -45,8 +45,6 @@ const PREFIX: &str = "~~>"; /***** ENTRYPOINT *****/ -/// **Edited: working with new callback interface + events.** -/// /// Handles a package containing ExeCUtable code (ECU). /// /// **Arguments** @@ -66,69 +64,19 @@ pub async fn handle( debug!("Executing '{}' (ecu) using arguments:\n{:#?}", function, arguments); // Initialize the package - let (container_info, function) = match initialize(&function, &arguments, &working_dir) { - Ok(results) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.initialized().await { warn!("Could not update driver on Initialized: {}", err); } - // } - - info!("Reached target 'Initialized'"); - results - }, - Err(err) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.initialize_failed(format!("{}", &err)).await { warn!("Could not update driver on InitializeFailed: {}", err); } - // } - return Err(err); - }, - }; + let (container_info, function) = initialize(&function, &arguments, &working_dir)?; + info!("Reached target 'Initialized'"); // Launch the job - let (command, process) = match start(&container_info, &function, &arguments, &working_dir) { - Ok(result) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.started().await { warn!("Could not update driver on Started: {}", err); } - // } - - info!("Reached target 'Started'"); - result - }, - Err(err) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.start_failed(format!("{}", &err)).await { warn!("Could not update driver on StartFailed: {}", err); } - // } - return Err(err); - }, - }; + let (command, process) = start(&container_info, &function, &arguments, &working_dir)?; + info!("Reached target 'Started'"); // Wait until the job is completed - let result = match complete(process).await { - Ok(result) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.completed().await { warn!("Could not update driver on Completed: {}", err); } - // } - - info!("Reached target 'Completed'"); - result - }, - Err(err) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.complete_failed(format!("{}", &err)).await { warn!("Could not update driver on CompleteFailed: {}", err); } - // } - return Err(err); - }, - }; + let result = complete(process).await?; + info!("Reached target 'Completed'"); // Convert the call to a PackageReturn value instead of state - let result = match decode(result, &command.capture) { - Ok(result) => result, - Err(err) => { - // if let Some(callback) = callback { - // if let Err(err) = callback.decode_failed(format!("{}", &err)).await { warn!("Could not update driver on DecodeFailed: {}", err); } - // } - return Err(err); - }, - }; + let result = decode(result, &command.capture)?; info!("Reached target 'Decode'"); // Return the package call result! @@ -140,32 +88,26 @@ pub async fn handle( /***** INITIALIZATION *****/ -/// **Edited: returning LetErrors + now also doing the steps before the specific working dir initialization.** -/// /// Initializes the environment for the nested package by reading the container.yml and preparing the working directory. /// -/// **Arguments** -/// * `function`: The function name to execute in the package. -/// * `arguments`: The arguments, as a map of argument name / value pairs. -/// * `working_dir`: The wokring directory for this package. +/// Arguments: +/// * `function`: The function name to execute in the package. +/// * `arguments`: The arguments, as a map of argument name / value pairs. +/// * `working_dir`: The wokring directory for this package. /// -/// **Returns** -/// * On success, a tuple with (in order): -/// * The LocalContainerInfo struct representing the local_container.yml in this package -/// * The function represented as an Action that we should execute -/// * A list of Parmaters describing the function's _output_ -/// * On failure: -/// * A LetError describing what went wrong. +/// Returns: +/// * On success, a tuple with (in order): +/// * The LocalContainerInfo struct representing the local_container.yml in this package +/// * The function represented as an Action that we should execute +/// * A list of Parmaters describing the function's _output_ +/// * On failure: +/// * A LetError describing what went wrong. fn initialize(function: &str, arguments: &Map, working_dir: &Path) -> Result<(LocalContainerInfo, Action), LetError> { debug!("Reading local_container.yml..."); // Get the container info from the path let container_info_path = working_dir.join("local_container.yml"); - let container_info = match LocalContainerInfo::from_path(container_info_path.clone()) { - Ok(container_info) => container_info, - Err(err) => { - return Err(LetError::LocalContainerInfoError { path: container_info_path, err }); - }, - }; + let container_info = LocalContainerInfo::from_path(container_info_path.clone()) + .map_err(|err| LetError::LocalContainerInfoError { path: container_info_path, err })?; // Resolve the function we're supposed to call let action = match container_info.actions.get(function) { @@ -180,8 +122,6 @@ fn initialize(function: &str, arguments: &Map, working_dir: &Path) -> // Make sure the input matches what we expect assert_input(&function_input, arguments, function, &container_info.name, container_info.kind)?; - - debug!("Preparing working directory..."); let init_sh = working_dir.join("init.sh"); if !init_sh.exists() { @@ -193,12 +133,8 @@ fn initialize(function: &str, arguments: &Map, working_dir: &Path) -> let mut command = Command::new(init_sh); command.stdout(Stdio::piped()); command.stderr(Stdio::piped()); - let result = match command.output() { - Ok(result) => result, - Err(err) => { - return Err(LetError::WorkdirInitLaunchError { command: format!("{command:?}"), err }); - }, - }; + let result = command.output().map_err(|err| LetError::WorkdirInitLaunchError { command: format!("{command:?}"), err })?; + if !result.status.success() { return Err(LetError::WorkdirInitError { command: format!("{command:?}"), @@ -237,23 +173,8 @@ fn start( let entrypoint = &container_info.entrypoint.exec; let command = function.command.clone().unwrap_or_else(|| ActionCommand { args: Default::default(), capture: None }); let entrypoint_path = working_dir.join(entrypoint); - let entrypoint_path = match entrypoint_path.canonicalize() { - Ok(entrypoint_path) => entrypoint_path, - Err(err) => { - return Err(LetError::EntrypointPathError { path: entrypoint_path, err }); - }, - }; + let entrypoint_path = entrypoint_path.canonicalize().map_err(|err| LetError::EntrypointPathError { path: entrypoint_path, err })?; - // Prepare the actual subprocess crate command to execute - // No idea what is happening here precisely, so disabling it until I run into it missing >:) - // let command = if entrypoint_path.is_file() { - // Exec::cmd(entrypoint_path) - // } else { - // let segments = entrypoint.split_whitespace().collect::>(); - // let entrypoint_path = working_dir.join(&segments[0]).canonicalize()?; - - // Exec::cmd(entrypoint_path).args(&segments[1..]) - // }; let mut exec_command = TokioCommand::new(entrypoint_path); // Construct the environment variables @@ -266,19 +187,11 @@ fn start( exec_command.envs(envs); exec_command.stdout(Stdio::piped()); exec_command.stderr(Stdio::piped()); - let process = match exec_command.spawn() { - Ok(process) => process, - Err(err) => { - return Err(LetError::PackageLaunchError { command: format!("{exec_command:?}"), err }); - }, - }; + let process = exec_command.spawn().map_err(|err| LetError::PackageLaunchError { command: format!("{exec_command:?}"), err })?; - // Done, return the process!! Ok((command, process)) } -/// **Edited: now returning LetErrors.** -/// /// Creates a map with enviroment variables for the nested package based on the given arguments. /// /// **Arguments** @@ -288,6 +201,7 @@ fn start( /// A new map with the environment on success, or a LetError on failure. fn construct_envs(variables: &Map) -> Result, LetError> { // Simply add the values one-by-one + // FIXME: Use iterators let mut envs = Map::::new(); for (name, variable) in variables.iter() { // Get an UPPERCASE equivalent of the variable name for proper environment variable naming scheme @@ -298,92 +212,15 @@ fn construct_envs(variables: &Map) -> Result, LetError> { } // Convert the argument's value to some sort of valid string - envs.insert(name.clone(), match serde_json::to_string(variable) { - Ok(value) => value, - Err(err) => { - return Err(LetError::SerializeError { argument: name, data_type: variable.data_type(), err }); - }, - }); - // use FullValue::*; - // match variable { - // Boolean(value) => { envs.insert(name, format!("{}", value)); }, - // Integer(value) => { envs.insert(name, format!("{}", value)); }, - // Real(value) => { envs.insert(name, format!("{}", value)); }, - // String(value) => { envs.insert(name, value.clone()); }, - - // Array(values) => { envs.insert(name.clone(), match serde_json::to_string(values) { - // Ok(value) => value, - // Err(err) => { return Err(LetError::ArraySerializeError{ argument: name, err }); }, - // }); }, - // Instance(c_name, values) => { envs.insert(name.clone(), match serde_json::to_string(values) { - // Ok(value) => value, - // Err(err) => { return Err(LetError::ClassSerializeError{ argument: name, class: c_name.clone(), err }); } - // }); }, - - // // The rest (i.e., Void) is not supported. - // _ => return Err(LetError::UnsupportedType{ argument: name.clone(), elem_type: variable.data_type() }), - // } + envs.insert( + name.clone(), + serde_json::to_string(variable).map_err(|err| LetError::SerializeError { argument: name, data_type: variable.data_type(), err })?, + ); } Ok(envs) } -// /// **Edited: now returning LetErrors + accepting a single basename instead of name + index.** -// /// -// /// Translates a struct to environment variables. -// /// -// /// **Arguments** -// /// * `base_name`: The base name of the struct environment variable, which is either its name or an array element. -// /// * `properties`: The struct's properties. -// /// * `envs`: The resulting dict containing the environment. -// /// -// /// **Returns** -// /// Nothing on success, or a LetError otherwise. -// fn construct_struct_envs( -// base_name: &str, -// properties: &Map, -// envs: &mut Map, -// ) -> Result<(), LetError> { -// // Simply add each property under its own name -// for (key, entry) in properties.iter() { -// // Make sure the field name doesn't already exist -// let field_name = format!("{}_{}", base_name, key); -// if envs.contains_key(&field_name) { return Err(LetError::DuplicateStructArgument{ sname: base_name.to_string(), field: key.clone(), name: field_name }); } - -// // Match on the value type -// let value = match entry { -// Value::Array { entries: _, .. } => { return Err(LetError::UnsupportedStructArray{ name: base_name.to_string(), field: key.clone() }) }, -// Value::Boolean(value) => value.to_string(), -// Value::Integer(value) => value.to_string(), -// Value::Real(value) => value.to_string(), -// Value::Unicode(value) => value.to_string(), -// Value::Struct { data_type, properties } => match data_type.as_str() { -// "Directory" | "File" => { -// // Make sure they have a URL field -// let value = match properties.get("url") { -// Some(value) => value.to_string(), -// None => { return Err(LetError::IllegalNestedURL{ name: base_name.to_string(), field: key.clone() }); } -// }; -// // Construct the nested field name -// let nested_field_name = format!("{}_URL", field_name); -// if envs.contains_key(&nested_field_name) { return Err(LetError::DuplicateStructArgument{ sname: field_name, field: "URL".to_string(), name: nested_field_name }); } -// // Add it! -// envs.insert(nested_field_name, value); -// continue; -// } -// _ => { return Err(LetError::UnsupportedNestedStruct{ name: base_name.to_string(), field: key.clone() }); }, -// }, -// _ => { return Err(LetError::UnsupportedStructField{ name: base_name.to_string(), field: key.clone(), elem_type: entry.data_type() }); }, -// }; - -// // Add the converted value -// envs.insert(field_name, value); -// } - -// // Done! -// Ok(()) -// } - @@ -404,6 +241,7 @@ async fn complete( let mut process = process; // Handle waiting for the subprocess and doing heartbeats in a neat way, using select + // FIXME: We are not doing heartbeats anymore, so I think this can be cleaned up a bit let status = loop { // Prepare the timer let sleep = time::sleep(Duration::from_millis(HEARTBEAT_DELAY)); @@ -416,12 +254,6 @@ async fn complete( Some(status) }, _ = &mut sleep => { - // // Timeout occurred; send the heartbeat and continue - // if let Some(callback) = callback { - // if let Err(err) = callback.heartbeat().await { warn!("Could not update driver on Heartbeat: {}", err); } - // else { debug!("Sent Heartbeat to driver."); } - // } - // Stop without result None }, @@ -433,56 +265,35 @@ async fn complete( } }; - // Match the status result - let status = match status { - Ok(status) => status, - Err(err) => { - return Err(LetError::PackageRunError { err }); - }, - }; + let status = status.map_err(|err| LetError::PackageRunError { err })?; // Try to get stdout and stderr readers - let mut stdout = match process.stdout { - Some(stdout) => stdout, - None => { - return Err(LetError::ClosedStdout); - }, - }; - let mut stderr = match process.stderr { - Some(stderr) => stderr, - None => { - return Err(LetError::ClosedStderr); - }, - }; + let mut stdout = process.stdout.ok_or(LetError::ClosedStdout)?; + let mut stderr = process.stderr.ok_or(LetError::ClosedStderr)?; + // Consume the readers into the raw text let mut stdout_text: Vec = Vec::with_capacity(DEFAULT_STD_BUFFER_SIZE); - let _n_stdout = match stdout.read_to_end(&mut stdout_text).await { - Ok(n_stdout) => n_stdout, - Err(err) => { - return Err(LetError::StdoutReadError { err }); - }, - }; + let _n_stdout = stdout.read_to_end(&mut stdout_text).await.map_err(|err| LetError::StdoutReadError { err })?; + let mut stderr_text: Vec = Vec::with_capacity(DEFAULT_STD_BUFFER_SIZE); - let _n_stderr = match stderr.read_to_end(&mut stderr_text).await { - Ok(n_stderr) => n_stderr, - Err(err) => { - return Err(LetError::StderrReadError { err }); - }, - }; + let _n_stderr = stderr.read_to_end(&mut stderr_text).await.map_err(|err| LetError::StderrReadError { err })?; + // Convert the bytes to text let stdout = String::from_utf8_lossy(&stdout_text).to_string(); let stderr = String::from_utf8_lossy(&stderr_text).to_string(); + let barrier = "-".repeat(80); // Always print stdout/stderr - debug!("Job stdout (unprocessed):\n{}\n{}\n{}\n\n", (0..80).map(|_| '-').collect::(), stdout, (0..80).map(|_| '-').collect::()); - debug!("Job stderr (unprocessed):\n{}\n{}\n{}\n\n", (0..80).map(|_| '-').collect::(), stdout, (0..80).map(|_| '-').collect::()); + debug!("Job stdout (unprocessed):\n{barrier}\n{stdout}\n{barrier}\n\n"); + debug!("Job stderr (unprocessed):\n{barrier}\n{stderr}\n{barrier}\n\n"); // If the process failed, return it does if !status.success() { // Check if it was killed - if status.signal().is_some() { - return Ok(PackageReturnState::Stopped { signal: status.signal().unwrap() }); + if let Some(signal) = status.signal() { + return Ok(PackageReturnState::Stopped { signal }); } + return Ok(PackageReturnState::Failed { code: status.code().unwrap_or(-1), stdout, stderr }); } @@ -490,8 +301,6 @@ async fn complete( Ok(PackageReturnState::Finished { stdout }) } -/// **Edited: returns LetErrors + changed to accept string instead of split stuff.** -/// /// Preprocesses stdout by only leaving the stuff that is relevant for the branelet (i.e., only that which is marked as captured by the mode). /// /// **Arguments** @@ -525,6 +334,15 @@ fn preprocess_stdout(stdout: String, mode: &Option) -> String { captured.push(line); } } + + // FIXME: The following is probably more consise & faster + // captured = stdout + // .lines() + // .skip_while(|line| !line.trim_start().starts_with(MARK_START)) + // .skip(1) + // .take_while(|line| !line.trim_start().starts_with(MARK_END)) + // .inspect(|line| debug!("Captured: {line}")) + // .collect::>(); }, "prefixed" => { for line in stdout.lines() { @@ -534,6 +352,13 @@ fn preprocess_stdout(stdout: String, mode: &Option) -> String { captured.push(trimmed); } } + // FIXME: The following is probably more consise & faster + // captured = stdout + // .lines() + // .filter(|line| line.starts_with(PREFIX)) + // .map(|line| line.trim_start_matches(PREFIX)) + // .inspect(|line| debug!("Captured: {line}")) + // .collect::>(); }, _ => panic!("Encountered illegal capture mode '{}'; this should never happen!", mode), } @@ -564,12 +389,7 @@ fn decode(result: PackageReturnState, mode: &Option) -> Result = match serde_yaml::from_str(&stdout) { - Ok(value) => value, - Err(err) => { - return Err(LetError::DecodeError { stdout, err }); - }, - }; + let output: HashMap = serde_yaml::from_str(&stdout).map_err(|err| LetError::DecodeError { stdout, err })?; // Get the only key if output.len() > 1 { @@ -595,195 +415,3 @@ fn decode(result: PackageReturnState, mode: &Option) -> Result, -// ) -> Result, DecodeError> { -// // Get the hashmap variant of the YAML data -// let map = match value.as_hash() { -// Some(map) => map, -// None => { return Err(DecodeError::NotAHash); } -// }; - -// // Go through the parameters to try to get them all -// let mut output = Map::::new(); -// for p in parameters { -// // Try to get this parameter from the map -// let key = Yaml::from_str(p.name.as_str()); -// let value = &map[&key]; - -// // Match the values -// let value = match value { -// Yaml::Array(elements) => { -// // Get the expected array type as everything before the '[]' in the typename as provided by container.yml -// let n = match p.data_type.find('[') { -// Some(n) => n, -// None => { return Err(DecodeError::OutputTypeMismatch{ name: p.name.clone(), expected: p.data_type.clone(), got: "Array".to_string() }); } -// }; -// let value_type: String = p.data_type.chars().take(n).collect(); - -// // Unwrap the entry values as the expected type -// let mut values = vec![]; -// for element in elements.iter() { -// let variable = unwrap_yaml_value(element, &value_type, &p.name)?; -// values.push(variable); -// } - -// // Return the value as an Array -// let data_type = p.data_type.to_string(); -// FullValue::Array { values } -// } -// Yaml::Hash(_) => unwrap_yaml_struct(value, &p.data_type, types, &p.name)?, -// Yaml::BadValue => { return Err(DecodeError::MissingOutputArgument{ name: p.name.clone() }); } -// _ => unwrap_yaml_value(value, &p.data_type, &p.name)?, -// }; - -// output.insert(p.name.clone(), value); -// } - -// // Done! -// Ok(output) -// } - -// /// **Edited: now returning DecodeErrors.** -// /// -// /// Converts a given Yaml Hash value to a Value struct. -// /// -// /// **Arguments** -// /// * `value`: The YAML value to parse. -// /// * `data_type`: The data type to parse the value as. -// /// * `types`: A list of class types that we know of at the time of parsing. -// /// * `p_name`: The name of the output argument we're currently parsing. Used for writing sensible errors only. -// fn unwrap_yaml_struct( -// value: &Yaml, -// data_type: &str, -// types: &Map, -// p_name: &str, -// ) -> Result { -// // Try to get the class type -// let class_type = match types.get(data_type) { -// Some(class_type) => class_type, -// None => { return Err(DecodeError::UnknownClassType{ name: p_name.to_string(), class_name: data_type.to_string() }); } -// }; -// let mut values = Map::::new(); - -// // Loop through the properties of this class to parse them all -// for p in &class_type.properties { -// // Define the temporary p_name -// let mut class_p_name = String::from(p_name); class_p_name.push('.'); class_p_name.push_str(&p.name); - -// // Get the property -// let prop_value = value[p.name.as_str()].clone(); -// if let Yaml::BadValue = prop_value { return Err(DecodeError::MissingStructProperty{ name: p_name.to_string(), class_name: data_type.to_string(), property_name: p.name.clone() }); } -// let prop = unwrap_yaml_value(&prop_value, &p.data_type, &class_p_name)?; - -// // Insert it into the list -// values.insert(p.name.to_string(), prop); -// } - -// // Return the new struct -// Ok(FullValue::Instance { -// values, -// name : class_type.name, -// }) -// } - -// /// **Edited: now returning DecodeErrors.** -// /// -// /// Converts a given Yaml value to a Value value. -// /// -// /// **Arguments** -// /// * `value`: The YAML value to parse. -// /// * `data_type`: The data type to parse the value as. -// /// * `p_name`: The name of the output argument we're currently parsing. Used for writing sensible errors only. -// fn unwrap_yaml_value( -// value: &Yaml, -// data_type: &str, -// p_name: &str, -// ) -> Result { -// debug!("Unwrapping as {}: {:?} ", data_type, value); - -// // Match on the data type -// let value = match data_type { -// "boolean" => { -// match value.as_bool() { -// Some(value) => FullValue::Boolean(value), -// None => { return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected: data_type.to_string(), got: "Boolean".to_string() }) }, -// } -// } -// "File[]" => { -// // It's an array of files -// if let Yaml::Array(elements) = value { -// // Go through each of the elements, recursing to parse those -// let mut entries = vec![]; -// for element in elements.iter() { -// let variable = unwrap_yaml_value(element, "File", p_name)?; -// entries.push(variable); -// } - -// // Construct an array with the parsed values -// Value::Array { -// data_type: data_type.to_string(), -// entries, -// } -// } else { -// return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected: data_type.to_string(), got: "a non-array".to_string() }); -// } -// } -// "Directory" | "File" => { -// // We expected a string URL now -// let url = match value.as_str() { -// Some(value) => Value::Unicode(String::from(value)), -// None => { -// // Pimp the expected type a little before returning -// let mut expected = String::from(data_type); expected.push_str(" (URL as String)"); -// return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected, got: "a non-string".to_string() }); -// } -// }; - -// // Create a struct to wrap this property -// let mut properties: Map = Default::default(); -// properties.insert(String::from("url"), url); - -// // Return it -// Value::Struct { -// data_type: String::from(data_type), -// properties, -// } -// } -// "integer" => { -// match value.as_i64() { -// Some(value) => Value::Integer(value), -// None => { return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected: data_type.to_string(), got: "a non-integer".to_string() }); } -// } -// } -// "real" => { -// match value.as_f64() { -// Some(value) => Value::Real(value), -// None => { return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected: data_type.to_string(), got: "a non-float".to_string() }); } -// } -// } -// _ => { -// // Otherwise, just get as a string(?) -// match value.as_str() { -// Some(value) => Value::Unicode(String::from(value)), -// None => { return Err(DecodeError::OutputTypeMismatch{ name: p_name.to_string(), expected: data_type.to_string(), got: "a non-string".to_string() }); } -// } -// } -// }; - -// Ok(value) -// } diff --git a/brane-let/src/main.rs b/brane-let/src/main.rs index 770d7147..23d7ea54 100644 --- a/brane-let/src/main.rs +++ b/brane-let/src/main.rs @@ -97,43 +97,11 @@ async fn main() { debug!("BRANELET v{}", env!("CARGO_PKG_VERSION")); debug!("Initializing..."); - // // Mount DFS via JuiceFS. - // if let Some(ref mount_dfs) = opts.mount_dfs { - // debug!("Initializing JuiceFS..."); - // // Try to run the command - // let mut command = Command::new("/juicefs"); - // command.args(vec!["mount", "-d", mount_dfs, "/data"]); - // command.stdout(Stdio::piped()); - // command.stderr(Stdio::piped()); - // debug!(" > Running '{:?}'", &command); - // let output = match command.output() { - // Ok(output) => output, - // Err(err) => { log::error!("{}", LetError::JuiceFSLaunchError{ command: format!("{:?}", command), err }); std::process::exit(-1); } - // }; - - // // Make sure we completed OK - // debug!(" > Return status: {}", output.status); - // if !output.status.success() { - // log::error!("{}", LetError::JuiceFSError{ command: format!("{:?}", command), code: output.status.code().unwrap_or(-1), stdout: String::from_utf8_lossy(&output.stdout).to_string(), stderr: String::from_utf8_lossy(&output.stderr).to_string() }); - // std::process::exit(-1); - // } - // } - // Start redirector in the background, if proxy address is set. if proxy_address.is_some() { warn!("Proxy is not implemented anymore"); } - // // Callbacks may be called at any time of the execution. - // debug!("Initializing callback..."); - // let callback: Option = match callback_to { - // Some(callback_to) => match Callback::new(application_id, location_id, job_id, callback_to).await { - // Ok(callback) => Some(callback), - // Err(err) => { log::error!("Could not setup callback connection: {}", err); std::process::exit(-1); } - // }, - // None => None, - // }; - // Wrap actual execution, so we can always log errors. match run(sub_command).await { Ok(code) => process::exit(code), @@ -159,9 +127,6 @@ async fn run( // callback: Option, ) -> Result { // // We've initialized! - // if let Some(ref mut callback) = callback { - // if let Err(err) = callback.ready().await { log::error!("Could not update driver on Ready: {}", err); } - // } // Switch on the sub_command to do the actual work let output = match sub_command { @@ -177,18 +142,10 @@ async fn run( Ok(output) => output, Err(err) => { let err = LetError::ResultJSONError { value: format!("{result:?}"), err }; - // if let Some(ref mut callback) = callback { - // if let Err(err) = callback.decode_failed(format!("{}", err)).await { log::error!("Could not update driver on DecodeFailed: {}", err); } - // } return Err(err); }, }; - // If that went successfull, output the result in some way - // if let Some(ref mut callback) = callback { - // // Use the callback to report it - // if let Err(err) = callback.finished(output).await { log::error!("Could not update driver on Finished: {}", err); } - // } else { // Print to stdout as (base64-encoded) JSON if std::env::vars().any(|(name, value)| name == OUTPUT_PREFIX_NAME && value == "1") { debug!("Writing output prefix enabled"); @@ -203,11 +160,7 @@ async fn run( Ok(PackageResult::Failed { code, stdout, stderr }) => { // Back it up to the user - // if let Some(ref mut callback) = callback { - // // Use the callback to report it - // if let Err(err) = callback.failed(code, stdout, stderr).await { log::error!("Could not update driver on Failed: {}", err); } - // } else { - // Gnerate the line divider + // Generate the line divider let lines = (0..80).map(|_| '-').collect::(); // Print to stderr log::error!( @@ -227,13 +180,8 @@ async fn run( Ok(PackageResult::Stopped { signal }) => { // Back it up to the user - // if let Some(ref mut callback) = callback { - // // Use the callback to report it - // if let Err(err) = callback.stopped(signal).await { log::error!("Could not update driver on Stopped: {}", err); } - // } else { // Print to stderr log::error!("Internal package call was forcefully stopped with signal {}", signal); - // } Ok(-1) }, From a2424d474b7bc580e11078be9cb49def0e1e0fe6 Mon Sep 17 00:00:00 2001 From: Lut99 Date: Mon, 4 Nov 2024 11:27:38 +0100 Subject: [PATCH 2/2] Fixed 3/4 FIXME's Test pending, need to try out on a VM to get `branelet` container compatability lol --- brane-let/src/exec_ecu.rs | 92 ++++++--------------------------------- brane-let/src/main.rs | 7 +-- 2 files changed, 16 insertions(+), 83 deletions(-) diff --git a/brane-let/src/exec_ecu.rs b/brane-let/src/exec_ecu.rs index d5884fc3..bec00f98 100644 --- a/brane-let/src/exec_ecu.rs +++ b/brane-let/src/exec_ecu.rs @@ -4,7 +4,7 @@ // Created: // 20 Sep 2022, 13:55:30 // Last edited: -// 25 May 2023, 20:43:21 +// 04 Nov 2024, 11:19:13 // Auto updated? // Yes // @@ -16,17 +16,16 @@ use std::collections::HashMap; use std::os::unix::process::ExitStatusExt; use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; +use std::process::{Command, ExitStatus, Stdio}; use brane_exe::FullValue; use log::{debug, info}; use specifications::container::{Action, ActionCommand, LocalContainerInfo}; use tokio::io::AsyncReadExt as _; use tokio::process::{Child as TokioChild, Command as TokioCommand}; -use tokio::time::{self, Duration}; // use crate::callback::Callback; -use crate::common::{HEARTBEAT_DELAY, Map, PackageResult, PackageReturnState, assert_input}; +use crate::common::{Map, PackageResult, PackageReturnState, assert_input}; use crate::errors::LetError; @@ -241,31 +240,7 @@ async fn complete( let mut process = process; // Handle waiting for the subprocess and doing heartbeats in a neat way, using select - // FIXME: We are not doing heartbeats anymore, so I think this can be cleaned up a bit - let status = loop { - // Prepare the timer - let sleep = time::sleep(Duration::from_millis(HEARTBEAT_DELAY)); - tokio::pin!(sleep); - - // Wait for either the timer or the process - let status = tokio::select! { - status = process.wait() => { - // Process is finished! - Some(status) - }, - _ = &mut sleep => { - // Stop without result - None - }, - }; - - // If we have a result, break from the main loop; otherwise, try again - if let Some(status) = status { - break status; - } - }; - - let status = status.map_err(|err| LetError::PackageRunError { err })?; + let status: ExitStatus = process.wait().await.map_err(|err| LetError::PackageRunError { err })?; // Try to get stdout and stderr readers let mut stdout = process.stdout.ok_or(LetError::ClosedStdout)?; @@ -282,8 +257,8 @@ async fn complete( let stdout = String::from_utf8_lossy(&stdout_text).to_string(); let stderr = String::from_utf8_lossy(&stderr_text).to_string(); - let barrier = "-".repeat(80); // Always print stdout/stderr + let barrier = "-".repeat(80); debug!("Job stdout (unprocessed):\n{barrier}\n{stdout}\n{barrier}\n\n"); debug!("Job stderr (unprocessed):\n{barrier}\n{stderr}\n{barrier}\n\n"); @@ -311,59 +286,20 @@ async fn complete( /// The preprocessed stdout. fn preprocess_stdout(stdout: String, mode: &Option) -> String { let mode = mode.clone().unwrap_or_else(|| String::from("complete")); - - let mut captured = Vec::new(); match mode.as_str() { - "complete" => return stdout, - "marked" => { - let mut capture = false; - - for line in stdout.lines() { - if line.trim_start().starts_with(MARK_START) { - capture = true; - continue; - } - - // Stop capturing after observing MARK_END after MARK_START - if capture && line.trim_start().starts_with(MARK_END) { - break; - } - - if capture { - debug!("captured: {}", line); - captured.push(line); - } - } - - // FIXME: The following is probably more consise & faster - // captured = stdout - // .lines() - // .skip_while(|line| !line.trim_start().starts_with(MARK_START)) - // .skip(1) - // .take_while(|line| !line.trim_start().starts_with(MARK_END)) - // .inspect(|line| debug!("Captured: {line}")) - // .collect::>(); - }, + "complete" => stdout, + "marked" => stdout + .lines() + .skip_while(|line| !line.trim_start().starts_with(MARK_START)) + .skip(1) + .take_while(|line| !line.trim_start().starts_with(MARK_END)) + .collect::>() + .join("\n"), "prefixed" => { - for line in stdout.lines() { - if line.starts_with(PREFIX) { - let trimmed = line.trim_start_matches(PREFIX); - debug!("captured: {}", trimmed); - captured.push(trimmed); - } - } - // FIXME: The following is probably more consise & faster - // captured = stdout - // .lines() - // .filter(|line| line.starts_with(PREFIX)) - // .map(|line| line.trim_start_matches(PREFIX)) - // .inspect(|line| debug!("Captured: {line}")) - // .collect::>(); + stdout.lines().filter(|line| line.starts_with(PREFIX)).map(|line| line.trim_start_matches(PREFIX)).collect::>().join("\n") }, _ => panic!("Encountered illegal capture mode '{}'; this should never happen!", mode), } - - captured.join("\n") } diff --git a/brane-let/src/main.rs b/brane-let/src/main.rs index 23d7ea54..c4a4ad45 100644 --- a/brane-let/src/main.rs +++ b/brane-let/src/main.rs @@ -4,7 +4,7 @@ // Created: // 20 Sep 2022, 13:53:43 // Last edited: -// 02 Oct 2023, 17:13:16 +// 04 Nov 2024, 11:13:25 // Auto updated? // Yes // @@ -126,8 +126,6 @@ async fn run( sub_command: SubCommand, // callback: Option, ) -> Result { - // // We've initialized! - // Switch on the sub_command to do the actual work let output = match sub_command { SubCommand::Code { function, arguments, working_dir } => exec_ecu::handle(function, decode_b64(arguments)?, working_dir).await, @@ -161,7 +159,7 @@ async fn run( Ok(PackageResult::Failed { code, stdout, stderr }) => { // Back it up to the user // Generate the line divider - let lines = (0..80).map(|_| '-').collect::(); + let lines = "-".repeat(80); // Print to stderr log::error!( "Internal package call return non-zero exit code {}\n\nstdout:\n{}\n{}\n{}\n\nstderr:\n{}\n{}\n{}\n\n", @@ -173,7 +171,6 @@ async fn run( stderr, &lines ); - // } Ok(code) },