diff --git a/.gitignore b/.gitignore index e02456b..742cb0b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +tests/test-workdir/* target/ .idea *.iml diff --git a/Cargo.lock b/Cargo.lock index 7b7d603..b6a5e1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -614,6 +614,18 @@ dependencies = [ "synstructure", ] +[[package]] +name = "filetime" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d34cfa13a63ae058bfa601fe9e313bbdb3746427c1459185464ce0fcf62e1e8" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.2.5", + "winapi 0.3.9", +] + [[package]] name = "fixedbitset" version = "0.2.0" @@ -676,6 +688,25 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" +dependencies = [ + "bitflags", + "fsevent-sys", +] + +[[package]] +name = "fsevent-sys" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +dependencies = [ + "libc", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -1020,6 +1051,26 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inotify" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.9" @@ -1093,6 +1144,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.86" @@ -1215,6 +1272,18 @@ dependencies = [ "winapi 0.2.8", ] +[[package]] +name = "mio-extras" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" +dependencies = [ + "lazycell", + "log", + "mio", + "slab", +] + [[package]] name = "mio-uds" version = "0.6.8" @@ -1255,6 +1324,24 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "notify" +version = "4.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80ae4a7688d1fab81c5bf19c64fc8db920be8d519ce6336ed4e7efe024724dbd" +dependencies = [ + "bitflags", + "filetime", + "fsevent", + "fsevent-sys", + "inotify", + "libc", + "mio", + "mio-extras", + "walkdir", + "winapi 0.3.9", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1386,7 +1473,7 @@ checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" dependencies = [ "instant", "lock_api 0.4.2", - "parking_lot_core 0.8.2", + "parking_lot_core 0.8.3", ] [[package]] @@ -1405,14 +1492,14 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.1.57", + "redox_syscall 0.2.5", "smallvec", "winapi 0.3.9", ] @@ -1672,7 +1759,7 @@ checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", "rand_chacha 0.3.0", - "rand_core 0.6.1", + "rand_core 0.6.2", "rand_hc 0.3.0", ] @@ -1703,7 +1790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] @@ -1732,9 +1819,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" +checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ "getrandom 0.2.2", ] @@ -1763,7 +1850,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] @@ -1836,9 +1923,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "redox_syscall" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ec8ca9416c5ea37062b502703cd7fcb207736bc294f6e0cf367ac6fc234570" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" dependencies = [ "bitflags", ] @@ -1901,6 +1988,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -1912,9 +2008,18 @@ name = "secp256k1" version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2932dc07acd2066ff2e3921a4419606b220ba6cd03a9935123856cc534877056" +dependencies = [ + "secp256k1-sys 0.1.2", +] + +[[package]] +name = "secp256k1" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6179428c22c73ac0fbb7b5579a56353ce78ba29759b3b8575183336ea74cdfb" dependencies = [ "rand 0.6.5", - "secp256k1-sys", + "secp256k1-sys 0.3.0", "serde", ] @@ -1927,6 +2032,15 @@ dependencies = [ "cc", ] +[[package]] +name = "secp256k1-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11553d210db090930f4432bea123b31f70bbf693ace14504ea2a35e796c28dd2" +dependencies = [ + "cc", +] + [[package]] name = "semver" version = "0.10.0" @@ -2196,7 +2310,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "rand 0.8.3", - "redox_syscall 0.2.4", + "redox_syscall 0.2.5", "remove_dir_all", "winapi 0.3.9", ] @@ -2376,11 +2490,11 @@ dependencies = [ [[package]] name = "tracing-futures" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" 
+checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 0.4.27", + "pin-project 1.0.5", "tracing", ] @@ -2522,6 +2636,17 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "walkdir" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d" +dependencies = [ + "same-file", + "winapi 0.3.9", + "winapi-util", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -2644,7 +2769,7 @@ dependencies = [ "mime", "openssl", "rand 0.6.5", - "secp256k1", + "secp256k1 0.19.0", "serde", "serde_json", "serde_qs", @@ -2665,7 +2790,7 @@ dependencies = [ "hex", "openssl", "rand 0.7.3", - "secp256k1", + "secp256k1 0.19.0", "serde", "serde_json", "strum 0.19.5", @@ -2777,13 +2902,16 @@ dependencies = [ "futures-util", "gftp", "log", + "notify", "pin-project 1.0.5", - "secp256k1", + "secp256k1 0.17.2", "semver 0.10.0", "serde", "serde_json", "sha3 0.9.1", "structopt", + "tempfile", + "thiserror", "tokio", "url", "ya-agreement-utils", diff --git a/Cargo.toml b/Cargo.toml index d4e2046..83acf70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,24 +18,28 @@ ya-agreement-utils = "0.2" actix = "0.9" actix-rt = "1.0" -anyhow = "1.0.28" +anyhow = "^1.0" bigdecimal = "0.1.0" -chrono = "0.4.10" -dotenv = "0.15.0" -env_logger = "0.6" -futures = "0.3" +chrono = { version = "0.4.10", features = ["clock"] } +dotenv = "^0.15.0" +env_logger = "^0.6" +futures = "^0.3" futures-core = "0.3.8" futures-util = "0.3.7" -log = "0.4" +log = "^0.4" pin-project = "1.0.2" secp256k1 = "0.17" -semver = "0.10.0" -serde = "1.0.118" -serde_json = "1.0" +semver = "^0.10.0" +serde = "^1.0" +serde_json = "^1.0" sha3 = "0.9.1" +tempfile = "^3.1.0" +thiserror = "^1.0" tokio = { version = "0.2.10", features = ["fs"] } url = "2.1.1" +notify = "4.0.12" + [dev-dependencies] structopt = "0.3" tokio = { version = "0.2.10", features = ["macros"] } @@ -54,4 +58,4 @@ enable-all-features = true #ya-service-bus = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "6b494e17d7a662e0b710af8c5a2e99ab4007fdb9"} #ya-sb-proto = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "6b494e17d7a662e0b710af8c5a2e99ab4007fdb9"} ##ya-sb-router = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "6b494e17d7a662e0b710af8c5a2e99ab4007fdb9"} -#ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "6b494e17d7a662e0b710af8c5a2e99ab4007fdb9"} \ No newline at end of file +#ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "6b494e17d7a662e0b710af8c5a2e99ab4007fdb9"} diff --git a/src/rest.rs b/src/rest.rs index 4efb182..5b0c936 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -1,9 +1,14 @@ pub mod activity; mod async_drop; mod market; +mod sgx; pub mod streaming; +mod transfer; pub use activity::{Activity, Credentials, Event as BatchEvent, ExeScriptCommand, RunningBatch}; +pub use transfer::{ + FileTransferError, JsonTransferError, TransferCmdError, TransferInternalError, Transfers, +}; pub use ya_client::web::{WebClient, WebClientBuilder}; use futures::prelude::*; @@ -36,6 +41,15 @@ impl Session { .await } + /// Creates Activity object for existing Activity. Created object + /// isn't owner of Activity and won't destroy it on Drop. Use for Debugging. 
+ pub async fn attach_to_activity( + &self, + activity_id: &str, + ) -> anyhow::Result { + activity::DefaultActivity::attach_to_activity(self.client.interface()?, activity_id) + } + pub async fn create_secure_activity( &self, agreement: &market::Agreement, diff --git a/src/rest/activity.rs b/src/rest/activity.rs index 766826a..ece1152 100644 --- a/src/rest/activity.rs +++ b/src/rest/activity.rs @@ -1,18 +1,21 @@ use anyhow::{anyhow, Context, Result}; - -use crate::rest::async_drop::{CancelableDropList, DropList}; use futures::future::LocalBoxFuture; use futures::prelude::*; -use futures::stream::LocalBoxStream; use futures::{FutureExt, StreamExt}; use std::sync::Arc; + +use crate::rest::async_drop::DropList; use ya_client::activity::ActivityRequestorApi; pub use ya_client::activity::SecureActivityRequestorApi; pub use ya_client::model::activity::Credentials; pub use ya_client::model::activity::ExeScriptCommand; -use ya_client::model::activity::ExeScriptRequest; +use ya_client::model::activity::{ + ActivityState, ActivityUsage, ExeScriptCommandState, ExeScriptRequest, +}; use ya_client::model::activity::{CommandResult, ExeScriptCommandResult}; +pub use super::sgx::{SgxActivity, SgxBatch}; + #[derive(Debug)] pub enum Event { StepSuccess { @@ -72,6 +75,28 @@ impl DefaultActivity { }) } + /// Debug function, that allows to attach to existing Activity. + pub(crate) fn attach_to_activity( + api: ActivityRequestorApi, + activity_id: &str, + ) -> anyhow::Result { + Ok(Self { + api, + activity_id: activity_id.to_string(), + drop_list: None, + }) + } + + /// Debug function to attach to existing batch. + pub fn attach_to_batch(&self, batch_id: &str) -> DefaultBatch { + DefaultBatch { + batch_id: batch_id.to_string(), + commands: Arc::from(vec![]), + api: self.api.clone(), + activity_id: self.activity_id.clone(), + } + } + pub async fn execute_commands( &self, commands: Vec, @@ -95,6 +120,51 @@ impl DefaultActivity { .try_collect() .await } + + pub async fn get_state(&self) -> anyhow::Result { + Ok(self + .api + .state() + .get_state(&self.activity_id) + .await + .map_err(|e| { + anyhow!( + "Failed to get state for activity [{}]. {}", + &self.activity_id, + e + ) + })?) + } + + pub async fn get_running_command(&self) -> anyhow::Result { + Ok(self + .api + .state() + .get_running_command(&self.activity_id) + .await + .map_err(|e| { + anyhow!( + "Failed to get running command for activity [{}]. {}", + &self.activity_id, + e + ) + })?) + } + + pub async fn get_usage(&self) -> anyhow::Result { + Ok(self + .api + .state() + .get_usage(&self.activity_id) + .await + .map_err(|e| { + anyhow!( + "Failed to get usage for activity [{}]. {}", + &self.activity_id, + e + ) + })?) 
+ } } impl Drop for DefaultActivity { @@ -169,7 +239,7 @@ pub struct DefaultBatch { commands: Arc<[ExeScriptCommand]>, } -fn generate_events( +pub(crate) fn generate_events( generator: Generator, commands: Arc<[ExeScriptCommand]>, ) -> impl Stream> @@ -258,132 +328,3 @@ impl RunningBatch for DefaultBatch { .boxed_local() } } - -pub struct SgxActivity { - secure_api: SecureActivityRequestorApi, - api: ActivityRequestorApi, - activity_id: String, - drop_list: CancelableDropList, -} - -impl Drop for SgxActivity { - fn drop(&mut self) { - if let Some(ref drop_list) = self.drop_list.take() { - let api = self.api.clone(); - let id = self.activity_id.clone(); - drop_list.async_drop(async move { - api.control() - .destroy_activity(&id) - .await - .with_context(|| format!("Failed to auto destroy Activity: {:?}", id))?; - log::debug!(target:"yarapi::drop", "Activity {:?} destroyed", id); - Ok(()) - }) - } - } -} - -impl SgxActivity { - pub(crate) async fn create( - api: ActivityRequestorApi, - agreement_id: &str, - drop_list: CancelableDropList, - ) -> Result { - let secure_api = api - .control() - .create_secure_activity(agreement_id) - .await - .with_context(|| { - format!("failed to create activity for agreement {:?}", agreement_id) - })?; - let activity_id = secure_api.activity_id(); - - Ok(Self { - api, - secure_api, - activity_id, - drop_list, - }) - } -} - -impl Activity for SgxActivity { - type RunningBatch = SgxBatch; - - fn id(&self) -> &str { - &self.activity_id - } - - fn exec( - &self, - commands: Vec, - ) -> LocalBoxFuture<'static, Result> { - let api = self.secure_api.clone(); - async move { - let batch_commands = commands.clone().into(); - let batch_id = api.exec(commands).await?; - Ok(SgxBatch { - api, - batch_id, - commands: batch_commands, - }) - } - .boxed_local() - } - - fn credentials(&self) -> Option { - Some(self.secure_api.proof()) - } - - fn destroy(&self) -> LocalBoxFuture<'static, Result<()>> { - let api = self.api.clone(); - let activity_id = self.activity_id.clone(); - async move { - api.control() - .destroy_activity(&activity_id) - .await - .with_context(|| format!("failed to destroy sgx activity: {:?}", activity_id)) - } - .boxed_local() - } -} - -pub struct SgxBatch { - api: SecureActivityRequestorApi, - batch_id: String, - commands: Arc<[ExeScriptCommand]>, -} - -impl RunningBatch for SgxBatch { - fn id(&self) -> &str { - &self.batch_id - } - - fn commands(&self) -> Vec { - self.commands.iter().cloned().collect() - } - - fn events(&self) -> LocalBoxStream<'static, Result> { - let api = self.api.clone(); - let batch_id: Arc = self.batch_id.clone().into(); - - generate_events( - move |idx| { - let api = api.clone(); - let batch_id = batch_id.clone(); - async move { - loop { - match api.get_exec_batch_results(&batch_id, Some(10.0), idx).await { - Ok(v) => return Ok(v), - Err(ya_client::Error::TimeoutError { .. 
}) => (), - Err(ya_client::Error::InternalError(ref msg)) if msg == "Timeout" => (), - Err(e) => return Err(e.into()), - } - } - } - }, - self.commands.clone(), - ) - .boxed_local() - } -} diff --git a/src/rest/market.rs b/src/rest/market.rs index e2da3b5..ea1ca9d 100644 --- a/src/rest/market.rs +++ b/src/rest/market.rs @@ -1,14 +1,15 @@ use anyhow::{anyhow, bail, Context}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use futures::prelude::*; use futures::TryStreamExt; use std::sync::Arc; use tokio::sync::mpsc; use crate::rest::async_drop::{CancelableDropList, DropList}; +use std::collections::HashSet; +use std::fmt::Display; use ya_client::market::MarketRequestorApi; -use ya_client::model::market::NewDemand; -use ya_client::model::market::{AgreementProposal, RequestorEvent}; +use ya_client::model::market::{NewDemand, AgreementEventType, AgreementOperationEvent, AgreementProposal, RequestorEvent,}; use ya_client::model::NodeId; use ya_client::web::WebClient; @@ -28,7 +29,7 @@ impl AsRef for SubscriptionId { } pub struct Market { - api: MarketRequestorApi, + pub api: MarketRequestorApi, drop_list: DropList, } @@ -70,6 +71,84 @@ impl Market { pub fn subscriptions(&self) -> impl Stream> { stream::empty() } + + /// Lists all Agreements for identity, that is currently used. + /// You can filter Agreements using AppSessionId parameter. + pub async fn list_agreements( + &self, + since: &DateTime, + app_session_id: Option, + ) -> anyhow::Result> + where + Tz: TimeZone, + Tz::Offset: Display, + { + Ok(self + .list_agreement_events(since, app_session_id) + .await? + .into_iter() + .filter_map(|event| match event.event_type { + AgreementEventType::AgreementApprovedEvent => Some(event.agreement_id), + _ => None, + }) + .collect()) + } + + pub async fn list_active_agreements( + &self, + since: &DateTime, + app_session_id: Option, + ) -> anyhow::Result> + where + Tz: TimeZone, + Tz::Offset: Display, + { + let mut agreements = HashSet::new(); + self.list_agreement_events(since, app_session_id) + .await? + .into_iter() + .for_each(|event| { + match event.event_type { + AgreementEventType::AgreementApprovedEvent => { + agreements.insert(event.agreement_id) + } + AgreementEventType::AgreementTerminatedEvent { .. 
} => { + agreements.remove(&event.agreement_id) + } + _ => false, + }; + () + }); + + Ok(agreements.into_iter().collect()) + } + + pub async fn list_agreement_events( + &self, + since: &DateTime, + app_session_id: Option, + ) -> anyhow::Result> + where + Tz: TimeZone, + Tz::Offset: Display, + { + let mut since = since.with_timezone(&Utc); + let mut events = vec![]; + + loop { + let new_events = self + .api + .collect_agreement_events(Some(0.0), Some(&since), Some(30), app_session_id.clone()) + .await?; + + if new_events.is_empty() { + return Ok(events); + } + + since = new_events.last().unwrap().event_date; + events.extend(new_events); + } + } } #[derive(Clone)] diff --git a/src/rest/sgx.rs b/src/rest/sgx.rs new file mode 100644 index 0000000..404ce2e --- /dev/null +++ b/src/rest/sgx.rs @@ -0,0 +1,143 @@ +use anyhow::{Context, Result}; +use futures::future::LocalBoxFuture; +use futures::stream::LocalBoxStream; +use futures::{FutureExt, StreamExt}; +use std::sync::Arc; + +use crate::rest::activity::{generate_events, Event}; +use crate::rest::async_drop::CancelableDropList; +use crate::rest::{Activity, RunningBatch}; + +use ya_client::activity::ActivityRequestorApi; +pub use ya_client::activity::SecureActivityRequestorApi; +pub use ya_client::model::activity::Credentials; +pub use ya_client::model::activity::ExeScriptCommand; + +pub struct SgxActivity { + secure_api: SecureActivityRequestorApi, + api: ActivityRequestorApi, + activity_id: String, + drop_list: CancelableDropList, +} + +impl Drop for SgxActivity { + fn drop(&mut self) { + if let Some(ref drop_list) = self.drop_list.take() { + let api = self.api.clone(); + let id = self.activity_id.clone(); + drop_list.async_drop(async move { + api.control() + .destroy_activity(&id) + .await + .with_context(|| format!("Failed to auto destroy Activity: {:?}", id))?; + log::debug!(target:"yarapi::drop", "Activity {:?} destroyed", id); + Ok(()) + }) + } + } +} + +impl SgxActivity { + pub(crate) async fn create( + api: ActivityRequestorApi, + agreement_id: &str, + drop_list: CancelableDropList, + ) -> Result { + let secure_api = api + .control() + .create_secure_activity(agreement_id) + .await + .with_context(|| { + format!("failed to create activity for agreement {:?}", agreement_id) + })?; + let activity_id = secure_api.activity_id(); + + Ok(Self { + api, + secure_api, + activity_id, + drop_list, + }) + } +} + +impl Activity for SgxActivity { + type RunningBatch = SgxBatch; + + fn id(&self) -> &str { + &self.activity_id + } + + fn exec( + &self, + commands: Vec, + ) -> LocalBoxFuture<'static, Result> { + let api = self.secure_api.clone(); + async move { + let batch_commands = commands.clone().into(); + let batch_id = api.exec(commands).await?; + Ok(SgxBatch { + api, + batch_id, + commands: batch_commands, + }) + } + .boxed_local() + } + + fn credentials(&self) -> Option { + Some(self.secure_api.proof()) + } + + fn destroy(&self) -> LocalBoxFuture<'static, Result<()>> { + let api = self.api.clone(); + let activity_id = self.activity_id.clone(); + async move { + api.control() + .destroy_activity(&activity_id) + .await + .with_context(|| format!("failed to destroy sgx activity: {:?}", activity_id)) + } + .boxed_local() + } +} + +pub struct SgxBatch { + api: SecureActivityRequestorApi, + batch_id: String, + commands: Arc<[ExeScriptCommand]>, +} + +impl RunningBatch for SgxBatch { + fn id(&self) -> &str { + &self.batch_id + } + + fn commands(&self) -> Vec { + self.commands.iter().cloned().collect() + } + + fn events(&self) -> LocalBoxStream<'static, 
Result> { + let api = self.api.clone(); + let batch_id: Arc = self.batch_id.clone().into(); + + generate_events( + move |idx| { + let api = api.clone(); + let batch_id = batch_id.clone(); + async move { + loop { + match api.get_exec_batch_results(&batch_id, Some(10.0), idx).await { + Ok(v) => return Ok(v), + Err(ya_client::Error::TimeoutError { .. }) => (), + Err(ya_client::Error::InternalError(ref msg)) if msg == "Timeout" => (), + Err(e) => return Err(e.into()), + } + } + } + }, + self.commands.clone(), + ) + .boxed_local() + } +} diff --git a/src/rest/streaming.rs b/src/rest/streaming.rs index 3fbdb39..ccac7df 100644 --- a/src/rest/streaming.rs +++ b/src/rest/streaming.rs @@ -1,5 +1,4 @@ mod batch; -mod capture_messages; mod forward_to_file; mod forward_to_std; mod messaging; @@ -10,4 +9,4 @@ pub use result_stream::ResultStream; pub use ya_client::model::activity::{CommandOutput, RuntimeEvent, RuntimeEventKind}; -pub use messaging::{send_to_guest, ExeUnitMessage}; +pub use messaging::{send_to_guest, ExeUnitMessage, MessagingExeUnit, MessagingRequestor}; diff --git a/src/rest/streaming/batch.rs b/src/rest/streaming/batch.rs index 3e9da8a..573ae0e 100644 --- a/src/rest/streaming/batch.rs +++ b/src/rest/streaming/batch.rs @@ -1,9 +1,10 @@ use anyhow::{anyhow, Result}; use futures::prelude::*; use futures::FutureExt; +use std::io::Write; use std::sync::Arc; -use crate::rest::activity::DefaultActivity; +use crate::rest::activity::{DefaultActivity, DefaultBatch}; use crate::rest::{Activity, RunningBatch}; use ya_client::activity::ActivityRequestorApi; @@ -71,6 +72,21 @@ impl StreamingBatch { } } } + + pub fn debug(self, filename: &str) -> anyhow::Result { + let debug_content = format!( + "ACTIVITY_ID={}\n\ + BATCH_ID={}\n", + &self.activity_id, &self.batch_id + ); + + std::fs::File::create(filename) + .map_err(|e| anyhow!("Failed to create debug file {}. {}", filename, e))? + .write_all(debug_content.as_bytes()) + .map_err(|e| anyhow!("Failed to write to debug file {}. 
{}", filename, e))?; + + Ok(self) + } } impl StreamingActivity for DefaultActivity { @@ -79,15 +95,7 @@ impl StreamingActivity for DefaultActivity { commands: Vec, ) -> future::LocalBoxFuture<'static, Result> { let batch_fut = self.exec(commands); - async move { - batch_fut.await.map(|batch| StreamingBatch { - batch_id: batch.id().to_string(), - commands: Arc::from(batch.commands()), - api: batch.api, - activity_id: batch.activity_id, - }) - } - .boxed_local() + async move { batch_fut.await.map(|batch| StreamingBatch::from(batch)) }.boxed_local() } fn run_streaming( @@ -112,3 +120,14 @@ impl StreamingActivity for DefaultActivity { self.exec_streaming(commands) } } + +impl From for StreamingBatch { + fn from(batch: DefaultBatch) -> Self { + StreamingBatch { + batch_id: batch.id().to_string(), + commands: Arc::from(batch.commands()), + api: batch.api, + activity_id: batch.activity_id, + } + } +} diff --git a/src/rest/streaming/messaging.rs b/src/rest/streaming/messaging.rs index 6c4764e..228c111 100644 --- a/src/rest/streaming/messaging.rs +++ b/src/rest/streaming/messaging.rs @@ -1,25 +1,11 @@ +pub mod capture; +mod exeunit; +mod requestor; + use serde::de::DeserializeOwned; use serde::Serialize; -use std::io::{self, Write}; - -pub trait ExeUnitMessage: Serialize + DeserializeOwned + Send + Sync {} - -pub fn encode_message(msg: &impl ExeUnitMessage) -> anyhow::Result> { - let mut data = serde_json::to_vec(msg)?; - // Add control characters - data.insert(0, 0x02 as u8); - data.push(0x03 as u8); +pub use exeunit::{encode_message, send_to_guest, MessagingExeUnit}; +pub use requestor::MessagingRequestor; - Ok(data) -} - -pub fn send_to_guest(msg: &impl ExeUnitMessage) -> anyhow::Result<()> { - let data = encode_message(msg)?; - - // Write atomically to stdout. - let mut stdout = io::stdout(); - //let mut stdout = stdout.lock(); - stdout.write(data.as_ref())?; - Ok(()) -} +pub trait ExeUnitMessage: Serialize + DeserializeOwned + Send + Sync {} diff --git a/src/rest/streaming/capture_messages.rs b/src/rest/streaming/messaging/capture.rs similarity index 99% rename from src/rest/streaming/capture_messages.rs rename to src/rest/streaming/messaging/capture.rs index d4dca3b..bc0f66f 100644 --- a/src/rest/streaming/capture_messages.rs +++ b/src/rest/streaming/messaging/capture.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc; use ya_client::model::activity::{CommandOutput, RuntimeEvent, RuntimeEventKind}; -use super::messaging::ExeUnitMessage; +use super::ExeUnitMessage; struct MessageProcessor { notifier: mpsc::UnboundedSender, diff --git a/src/rest/streaming/messaging/exeunit.rs b/src/rest/streaming/messaging/exeunit.rs new file mode 100644 index 0000000..becfca4 --- /dev/null +++ b/src/rest/streaming/messaging/exeunit.rs @@ -0,0 +1,128 @@ +use anyhow::anyhow; +use notify::{raw_watcher, Op, RecursiveMode, Watcher}; +use std::fs; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +use super::ExeUnitMessage; + +pub fn encode_message(msg: &impl ExeUnitMessage) -> anyhow::Result> { + let mut data = serde_json::to_vec(msg)?; + + // Add control characters + data.insert(0, 0x02 as u8); + data.push(0x03 as u8); + + Ok(data) +} + +pub fn send_to_guest(msg: &impl ExeUnitMessage) -> anyhow::Result<()> { + let data = encode_message(msg)?; + + // Write atomically to stdout. 
+ let mut stdout = io::stdout(); + //let mut stdout = stdout.lock(); + stdout.write(data.as_ref())?; + Ok(()) +} + +pub struct MessagingExeUnit { + new_message_notifier: broadcast::Sender, +} + +impl MessagingExeUnit { + pub fn new(tracked_dir: &Path) -> anyhow::Result> { + std::fs::create_dir_all(&tracked_dir).map_err(|e| { + anyhow!( + "Can't create directory [{}] for messages. {}", + &tracked_dir.display(), + e + ) + })?; + + let sender = spawn_file_notifier(&tracked_dir)?; + + Ok(Arc::new(MessagingExeUnit { + new_message_notifier: sender, + })) + } + + pub fn listen(&self) -> mpsc::UnboundedReceiver { + let (msg_sender, msg_receiver) = mpsc::unbounded_channel(); + let mut file_receiver = self.new_message_notifier.subscribe(); + + let future = async move { + while let Ok(path) = file_receiver.recv().await { + if let Some(content) = fs::read_to_string(&path) + .map_err(|e| { + log::warn!( + "[Messaging] Can't load msg from file: '{}'. {}", + &path.display(), + e + ) + }) + .ok() + { + serde_json::from_slice::(&content.as_bytes()) + .map_err(|e| { + log::warn!( + "Can't deserialize message from file '{}'. {}", + &path.display(), + e + ) + }) + .map(|msg| msg_sender.send(msg)) + .ok(); + } + } + }; + tokio::spawn(future); + return msg_receiver; + } + + pub fn send(&self, msg: &impl ExeUnitMessage) -> anyhow::Result<()> { + send_to_guest(msg) + } +} + +fn spawn_file_notifier(tracked_dir: &Path) -> anyhow::Result> { + let (event_sender, _) = broadcast::channel(150); + let sender = event_sender.clone(); + + let (watcher_sender, watcher_receiver) = std::sync::mpsc::channel(); + let mut watcher = + raw_watcher(watcher_sender).map_err(|e| anyhow!("Initializing watcher failed. {}", e))?; + + watcher + .watch(&tracked_dir, RecursiveMode::NonRecursive) + .map_err(|e| { + anyhow!( + "Starting watching directory '{}' failed. {}", + &tracked_dir.display(), + e + ) + })?; + + std::thread::spawn(move || { + // Take ownership of watcher. + let _watcher = watcher; + + while let Ok(event) = watcher_receiver.recv() { + match event.op { + Ok(Op::CLOSE_WRITE) => { + let path = match event.path { + Some(path) => path, + None => continue, + }; + event_sender.send(path).ok(); + } + _ => (), + } + } + }); + + Ok(sender) +} diff --git a/src/rest/streaming/messaging/requestor.rs b/src/rest/streaming/messaging/requestor.rs new file mode 100644 index 0000000..cd3ef42 --- /dev/null +++ b/src/rest/streaming/messaging/requestor.rs @@ -0,0 +1,34 @@ +use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use super::ExeUnitMessage; +use crate::rest::Transfers; + +#[derive(Clone)] +pub struct MessagingRequestor { + activity: Arc, + msg_counter: Arc, + tracked_dir: PathBuf, +} + +impl MessagingRequestor { + pub fn new( + activity: Arc, + tracked_dir: &Path, + ) -> MessagingRequestor { + MessagingRequestor { + activity, + msg_counter: Arc::new(AtomicUsize::new(0)), + tracked_dir: tracked_dir.to_path_buf(), + } + } + + pub async fn send(&self, msg: &impl ExeUnitMessage) -> anyhow::Result<()> { + let filename = format!("msg-{}", self.msg_counter.fetch_add(1, Ordering::SeqCst)); + let path = self.tracked_dir.join(filename); + + Ok(self.activity.send_json(&path, &msg).await?) 
+ } +} diff --git a/src/rest/streaming/result_stream.rs b/src/rest/streaming/result_stream.rs index 6f39fd1..33a8efe 100644 --- a/src/rest/streaming/result_stream.rs +++ b/src/rest/streaming/result_stream.rs @@ -2,9 +2,9 @@ use futures::prelude::*; use std::path::Path; use tokio::sync::mpsc; -use super::capture_messages::CaptureMessages; use super::forward_to_file::ForwardToFile; use super::forward_to_std::ForwardStd; +use super::messaging::capture::CaptureMessages; use super::messaging::ExeUnitMessage; use ya_client::model::activity::RuntimeEvent; @@ -27,6 +27,8 @@ pub trait ResultStream: Stream { ForwardToFile::new(self, stdout, stderr) } + /// Uses passed channel to notify about messages discovered in stream. + /// Recognized messages are removed from stream. fn capture_messages( self, notifier: mpsc::UnboundedSender, diff --git a/src/rest/transfer.rs b/src/rest/transfer.rs new file mode 100644 index 0000000..2de13ec --- /dev/null +++ b/src/rest/transfer.rs @@ -0,0 +1,205 @@ +use anyhow::*; +use futures::prelude::*; +use futures::TryStreamExt; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::io::BufReader; +use std::path::Path; +use url::Url; + +use crate::rest::activity::Event; +use crate::rest::{Activity, ExeScriptCommand, RunningBatch}; + +#[derive(thiserror::Error, Debug)] +pub enum FileTransferError { + #[error(transparent)] + Transfer(#[from] TransferCmdError), + #[error("[{path}] is invalid URL. {e}")] + BadUrl { path: String, e: String }, + #[error("Gftp error transferring from [{src}] to [{dest}]: {e}")] + Gftp { + src: String, + dest: String, + e: String, + }, +} + +#[derive(thiserror::Error, Debug)] +pub enum TransferCmdError { + #[error("Error transferring file from [{src}] to [{dest}]. Error: {e}")] + CommandError { + src: String, + dest: String, + e: String, + }, +} + +#[derive(thiserror::Error, Debug)] +pub enum JsonTransferError { + #[error("Error sending json. {0}")] + Transfer(#[from] FileTransferError), + #[error("Failed to serialize object to temp file. {0}")] + Serialization(String), + #[error("Failed to deserialize object from temp file. {0}")] + Deserialization(String), + #[error("Error sending json. {0}")] + Internal(#[from] TransferInternalError), +} + +#[derive(thiserror::Error, Debug)] +pub enum TransferInternalError { + #[error("Failed to create temporary file. {0}")] + CreatingTemporary(String), + #[error("Can't persist temporary file. {0}")] + PersistTemporary(String), +} + +pub trait Transfers: Activity { + fn send_file<'a>( + &'a self, + src: &Path, + dest: &Path, + ) -> future::LocalBoxFuture<'a, Result<(), FileTransferError>> { + let src = src.to_path_buf(); + let dest = format!("container:{}", dest.display()); + + async move { + let dest = Url::parse(&dest).map_err(|e| FileTransferError::BadUrl { + path: dest.clone(), + e: e.to_string(), + })?; + + let src = gftp::publish(&src) + .await + .map_err(|e| FileTransferError::Gftp { + src: src.to_string_lossy().to_string(), + dest: dest.to_string(), + e: e.to_string(), + })?; + + self.transfer(&src, &dest) + .await + .map_err(FileTransferError::from) + } + .boxed_local() + } + + fn send_json<'a, T: Serialize>( + &'a self, + dest: &Path, + to_serialize: &'a T, + ) -> future::LocalBoxFuture<'a, Result<(), JsonTransferError>> { + let dest = dest.to_path_buf(); + async move { + let (file, file_path) = tempfile::NamedTempFile::new() + .map_err(|e| TransferInternalError::CreatingTemporary(e.to_string()))? 
+                .keep()
+                .map_err(|e| TransferInternalError::PersistTemporary(e.to_string()))?;
+
+            serde_json::to_writer(file, to_serialize)
+                .map_err(|e| JsonTransferError::Serialization(e.to_string()))?;
+
+            Ok(self.send_file(&file_path, &dest).await?)
+        }
+        .boxed_local()
+    }
+
+    fn download_file<'a>(
+        &'a self,
+        src: &Path,
+        dest: &Path,
+    ) -> future::LocalBoxFuture<'a, Result<(), FileTransferError>> {
+        let dest = dest.to_path_buf();
+        let src = format!("container:{}", src.display());
+
+        async move {
+            let src = Url::parse(&src).map_err(|e| FileTransferError::BadUrl {
+                path: src.to_string(),
+                e: e.to_string(),
+            })?;
+
+            let dest = gftp::open_for_upload(&dest)
+                .await
+                .map_err(|e| FileTransferError::Gftp {
+                    src: src.to_string(),
+                    dest: dest.to_string_lossy().to_string(),
+                    e: e.to_string(),
+                })?;
+
+            Ok(self.transfer(&src, &dest).await?)
+        }
+        .boxed_local()
+    }
+
+    fn download_json<T: DeserializeOwned>(
+        &self,
+        src: &Path,
+    ) -> future::LocalBoxFuture<Result<T, JsonTransferError>> {
+        let src = src.to_path_buf();
+        async move {
+            let (file, file_path) = tempfile::NamedTempFile::new()
+                .map_err(|e| TransferInternalError::CreatingTemporary(e.to_string()))?
+                .keep()
+                .map_err(|e| TransferInternalError::PersistTemporary(e.to_string()))?;
+
+            self.download_file(&src, &file_path).await?;
+
+            let reader = BufReader::new(file);
+            serde_json::from_reader(reader)
+                .map_err(|e| JsonTransferError::Deserialization(e.to_string()))
+        }
+        .boxed_local()
+    }
+
+    fn transfer<'a>(
+        &'a self,
+        src: &Url,
+        dest: &Url,
+    ) -> future::LocalBoxFuture<'a, Result<(), TransferCmdError>> {
+        let src = src.to_string();
+        let dest = dest.to_string();
+
+        async move {
+            let commands = vec![ExeScriptCommand::Transfer {
+                from: src.clone(),
+                to: dest.clone(),
+                args: Default::default(),
+            }];
+
+            if let Err(e) = self.execute_commands(commands).await {
+                let error = TransferCmdError::CommandError {
+                    src: src.clone(),
+                    dest: dest.clone(),
+                    e: e.to_string(),
+                };
+                log::error!("{}", &error);
+                return Err(error);
+            }
+            Ok(())
+        }
+        .boxed_local()
+    }
+
+    fn execute_commands(
+        &self,
+        commands: Vec<ExeScriptCommand>,
+    ) -> future::LocalBoxFuture<Result<Vec<String>>> {
+        let batch = self.exec(commands);
+        async move {
+            batch
+                .await?
+                .events()
+                .and_then(|event| match event {
+                    Event::StepFailed { message } => {
+                        future::err::<String, _>(anyhow!("Step failed: {}", message))
+                    }
+                    Event::StepSuccess { output, .. } => future::ok(output),
+                })
+                .try_collect()
+                .await
+        }
+        .boxed_local()
+    }
+}
+
+impl<T> Transfers for T where T: Activity {}
diff --git a/tests/test_messaging.rs b/tests/test_messaging.rs
new file mode 100644
index 0000000..c1d482a
--- /dev/null
+++ b/tests/test_messaging.rs
@@ -0,0 +1,56 @@
+mod testing;
+
+use testing::prepare_test_dir;
+
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::path::Path;
+
+use std::time::Duration;
+use yarapi::rest::streaming::{ExeUnitMessage, MessagingExeUnit};
+
+#[derive(Serialize, Deserialize)]
+pub enum Messages {
+    Progress(f64),
+    Info(String),
+}
+
+impl ExeUnitMessage for Messages {}
+
+fn emulate_send(msg: impl ExeUnitMessage, dir: &Path, idx: u32) -> anyhow::Result<()> {
+    let path = dir.join(format!("msg-{}.json", idx));
+    let file = File::create(path)?;
+
+    Ok(serde_json::to_writer(file, &msg)?)
+}
+
+#[actix_rt::test]
+async fn test_messaging_to_exeunit() -> anyhow::Result<()> {
+    let dir = prepare_test_dir("test_messaging_to_exeunit")?;
+    env_logger::init();
+
+    let receiver = MessagingExeUnit::new(&dir).unwrap();
+    let mut events = receiver.listen::<Messages>();
+
+    emulate_send(Messages::Progress(0.02), &dir, 1).unwrap();
+
+    let msg = tokio::time::timeout(Duration::from_millis(500), events.recv())
+        .await?
+        .unwrap();
+    match msg {
+        Messages::Progress(progress) => assert_eq!(progress, 0.02),
+        _ => panic!("Expected Messages::Progress"),
+    };
+
+    emulate_send(Messages::Progress(0.06), &dir, 2).unwrap();
+
+    let msg = tokio::time::timeout(Duration::from_millis(500), events.recv())
+        .await?
+        .unwrap();
+    match msg {
+        Messages::Progress(progress) => assert_eq!(progress, 0.06),
+        _ => panic!("Expected Messages::Progress"),
+    };
+
+    Ok(())
+}
diff --git a/tests/testing.rs b/tests/testing.rs
new file mode 100644
index 0000000..bcc11a6
--- /dev/null
+++ b/tests/testing.rs
@@ -0,0 +1,28 @@
+use anyhow::{Context, Result};
+use std::fs;
+use std::path::PathBuf;
+
+fn test_data_dir() -> PathBuf {
+    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("tests")
+        .join("test-workdir")
+}
+
+fn escape_path(path: &str) -> String {
+    // Windows can't handle colons
+    path.replace("::", "_").to_string()
+}
+
+pub fn prepare_test_dir(dir_name: &str) -> Result<PathBuf> {
+    let test_dir: PathBuf = test_data_dir().join(escape_path(dir_name).as_str());
+
+    log::info!("Preparing test directory: {}", test_dir.display());
+
+    if test_dir.exists() {
+        fs::remove_dir_all(&test_dir)
+            .with_context(|| format!("Removing test directory: {}", test_dir.display()))?;
+    }
+    fs::create_dir_all(&test_dir)
+        .with_context(|| format!("Creating test directory: {}", test_dir.display()))?;
+    Ok(test_dir)
+}
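
The messaging flow added in this patch works in two halves: the requestor uses the blanket `Transfers` impl to drop a JSON message file into a directory inside the container (`MessagingRequestor` automates the `msg-{counter}` naming on top of `send_json`), while the guest binary runs `MessagingExeUnit`, which watches that directory with `notify` and yields deserialized messages. The sketch below shows how the two halves are meant to compose; it is illustrative only — the `/golem/input/messages` path, the `Control` type, and both helper functions are assumptions, not part of this patch, and the surrounding activity setup is elided.

```rust
use serde::{Deserialize, Serialize};
use std::path::Path;

use yarapi::rest::streaming::{ExeUnitMessage, MessagingExeUnit};
use yarapi::rest::Transfers;

#[derive(Serialize, Deserialize)]
enum Control {
    Start { task: String },
    Stop,
}

impl ExeUnitMessage for Control {}

// Requestor half: any `Activity` gets `send_json` through the blanket
// `impl<T> Transfers for T`. The container path is an assumption for this sketch.
async fn requestor_side(activity: &impl Transfers) -> anyhow::Result<()> {
    activity
        .send_json(Path::new("/golem/input/messages/msg-0"), &Control::Stop)
        .await?;
    Ok(())
}

// Exe-unit (guest) half: watch the same directory and react to incoming messages.
async fn exeunit_side() -> anyhow::Result<()> {
    let messaging = MessagingExeUnit::new(Path::new("/golem/input/messages"))?;
    let mut messages = messaging.listen::<Control>();

    while let Some(msg) = messages.recv().await {
        match msg {
            Control::Start { task } => log::info!("Starting task: {}", task),
            Control::Stop => break,
        }
    }
    Ok(())
}
```

The new `test_messaging_to_exeunit` test above exercises only the exe-unit half by writing message files into the watched directory directly.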
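
The attach helpers (`Session::attach_to_activity`, `DefaultActivity::attach_to_batch`, `StreamingBatch::debug`) let a separate process reconnect to a batch that is already running and inspect it without taking ownership, so nothing gets destroyed on drop. A hedged sketch of that debugging workflow follows; it assumes `Session` is the existing public session type in `yarapi::rest`, the id values would normally come from the `ACTIVITY_ID`/`BATCH_ID` file written by `StreamingBatch::debug`, and `{:?}` formatting of the returned `ya_client` state models is assumed to be available.

```rust
use yarapi::rest::Session;

async fn inspect_running(
    session: &Session,
    activity_id: &str,
    batch_id: &str,
) -> anyhow::Result<()> {
    // Attaching does not take ownership: dropping `activity` won't destroy it on the provider.
    let activity = session.attach_to_activity(activity_id).await?;
    let _batch = activity.attach_to_batch(batch_id);

    log::info!("state: {:?}", activity.get_state().await?);
    log::info!("running command: {:?}", activity.get_running_command().await?);
    log::info!("usage: {:?}", activity.get_usage().await?);
    Ok(())
}
```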