diff --git a/Cargo.lock b/Cargo.lock index 05b922e..92c014e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2164,6 +2164,22 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" +[[package]] +name = "testing-scenarios" +version = "0.1.0" +dependencies = [ + "anyhow", + "assert_fs", + "blake2", + "file-hashing", + "messages", + "myceli", + "rand", + "tracing", + "tracing-subscriber", + "transports", +] + [[package]] name = "thiserror" version = "1.0.40" diff --git a/Cargo.toml b/Cargo.toml index 77224f1..cb6f0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "local-storage", "messages", "myceli", + "testing/testing-scenarios", "transports" ] @@ -57,4 +58,5 @@ tracing-subscriber = { version = "0.3.14", default-features = false, features = ipfs-unixfs = { path = "ipfs-unixfs" } local-storage = { path = "local-storage" } messages = { path = "messages" } +myceli = { path = "myceli" } transports = { path = "transports" } diff --git a/local-storage/src/provider.rs b/local-storage/src/provider.rs index 6ea747b..cd5639a 100644 --- a/local-storage/src/provider.rs +++ b/local-storage/src/provider.rs @@ -303,9 +303,9 @@ impl StorageProvider for SqliteStorageProvider { WITH RECURSIVE cids(x) AS ( VALUES(?1) UNION - SELECT block_cid FROM links JOIN cids ON root_cid=x + SELECT block_cid FROM links JOIN cids ON root_cid=x WHERE block_id IS NOT null ) - SELECT x FROM cids; + SELECT cid FROM blocks WHERE cid in cids ", )? .query_map([cid], |row| { @@ -524,4 +524,43 @@ pub mod tests { 0 ); } + + #[test] + pub fn test_verify_get_all_cids() { + let harness = TestHarness::new(); + + let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00")); + let cid_str = cid.to_string(); + let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11")); + let child_cid_str = block_cid.to_string(); + + let other_child_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11")); + + let block = StoredBlock { + cid: cid_str.to_string(), + data: vec![], + links: vec![block_cid.to_string(), other_child_cid.to_string()], + filename: None, + }; + + let child_block = StoredBlock { + cid: block_cid.to_string(), + data: b"101293910101".to_vec(), + links: vec![], + filename: None, + }; + + harness.provider.import_block(&block).unwrap(); + + let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap(); + assert_eq!(dag_cids, vec![cid_str.to_string()]); + + harness.provider.import_block(&child_block).unwrap(); + + let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap(); + assert_eq!( + dag_cids, + vec![child_cid_str.to_string(), cid_str.to_string()] + ); + } } diff --git a/local-storage/src/storage.rs b/local-storage/src/storage.rs index a6a04aa..0af6231 100644 --- a/local-storage/src/storage.rs +++ b/local-storage/src/storage.rs @@ -38,6 +38,7 @@ impl Storage { Ok(blocks) }); let blocks = blocks?; + info!("FileBuilder found {} blocks in {path:?}", blocks.len()); let mut root_cid: Option = None; blocks.iter().for_each(|b| { @@ -66,6 +67,7 @@ impl Storage { if blocks.len() == 1 { if let Some(first) = blocks.first() { root_cid = Some(first.cid().to_string()); + info!("set final root {root_cid:?}"); } } if let Some(root_cid) = root_cid { @@ -144,6 +146,29 @@ impl Storage { self.provider .get_dag_blocks_by_window(cid, offset, window_size) } + + pub fn get_last_dag_cid(&self, cid: &str) -> Result { + let dag_cids = self.get_all_dag_cids(cid)?; + match dag_cids.last() { + Some(cid) => Ok(cid.to_owned()), + None => bail!("No last cid found for dag {cid}"), + } + } + + // Given a root CID, a number of CIDs, approximate the window we should be in + // pub fn find_dag_window(&self, root: &str, cid_count: u32, window_size: u32) -> Result { + + // let all_cids = self.get_all_dag_cids(root)?; + // let chunks = all_cids.chunks(window_size as usize); + // let mut window_num = 0; + // for c in chunks { + // if c.contains(&child.to_string()) { + // return Ok(window_num); + // } + // window_num += 1; + // } + // bail!("Failed to find child cid {child} in dag {root}"); + // } } #[cfg(test)] @@ -174,6 +199,46 @@ pub mod tests { } } + fn generate_stored_blocks(num_blocks: u16) -> Result> { + const CHUNK_SIZE: u16 = 20; + let data_size = CHUNK_SIZE * num_blocks; + let mut data = Vec::::new(); + data.resize(data_size.into(), 1); + thread_rng().fill_bytes(&mut data); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let blocks = rt.block_on(async { + let file: File = FileBuilder::new() + .content_bytes(data) + .name("testfile") + .fixed_chunker(CHUNK_SIZE.into()) + .build() + .await + .unwrap(); + let blocks: Vec<_> = file.encode().await.unwrap().try_collect().await.unwrap(); + blocks + }); + let mut stored_blocks = vec![]; + + blocks.iter().for_each(|b| { + let links = b + .links() + .iter() + .map(|c| c.to_string()) + .collect::>(); + let stored = StoredBlock { + cid: b.cid().to_string(), + data: b.data().to_vec(), + links, + filename: None, + }; + + stored_blocks.push(stored); + }); + + Ok(stored_blocks) + } + #[test] pub fn test_import_path_to_storage_single_block() { let harness = TestHarness::new(); @@ -320,6 +385,38 @@ pub mod tests { assert_eq!(blocks.len(), cids.len()); } + #[test] + pub fn test_get_all_dag_cids() { + let harness = TestHarness::new(); + + let mut dag_blocks = generate_stored_blocks(50).unwrap(); + let total_block_count = dag_blocks.len(); + + let root = dag_blocks.pop().unwrap(); + + harness.storage.import_block(&root).unwrap(); + + let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap(); + assert_eq!(dag_cids.len(), 1); + + for _ in (1..10) { + harness + .storage + .import_block(&dag_blocks.pop().unwrap()) + .unwrap(); + } + + let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap(); + assert_eq!(dag_cids.len(), 10); + + while let Some(block) = dag_blocks.pop() { + harness.storage.import_block(&block).unwrap() + } + + let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap(); + assert_eq!(dag_cids.len(), total_block_count); + } + // TODO: duplicated data is not being handled correctly right now, need to fix this // #[test] // pub fn export_from_storage_various_file_sizes_duplicated_data() { diff --git a/messages/src/api.rs b/messages/src/api.rs index d65bf7a..c111576 100644 --- a/messages/src/api.rs +++ b/messages/src/api.rs @@ -82,10 +82,22 @@ pub enum ApplicationAPI { }, // Resumes the transmission of all dags which may be paused ResumeTransmitAllDags, + // Resumes the transmission of a dag from a prior session, given the last received CID + // for determining where to restart the transmission + ResumePriorDagTransmit { + cid: String, + num_received_cids: u32, + retries: u8, + }, /// Listens on address for data and writes out files received Receive { listen_addr: String, }, + /// Commands a node to request another node at target_addr to resume dag transfer + RequestResumeDagTransfer { + cid: String, + target_addr: String, + }, /// Request Available Blocks RequestAvailableBlocks, /// Advertise all available blocks by CID @@ -125,6 +137,8 @@ pub enum ApplicationAPI { AvailableDags { dags: Vec, }, + /// Asks IPFS instance to terminate + Terminate, // TODO: Implement later // Information about the next pass used for calculating // data transfer parameters diff --git a/messages/src/protocol.rs b/messages/src/protocol.rs index ea05f3b..337f2da 100644 --- a/messages/src/protocol.rs +++ b/messages/src/protocol.rs @@ -45,6 +45,14 @@ pub enum DataProtocol { target_addr: String, retries: u8, }, + // Resumes the transmission of a dag which isn't currently tracked in sessions + // This accounts for resuming after restarting of transmitter + ResumePriorDagTransmit { + cid: String, + num_received_cids: u32, + target_addr: String, + retries: u8, + }, // Resumes the transmission of a dag which may have run out of retries or // paused due to connectivity lost ResumeTransmitDag { @@ -66,4 +74,6 @@ pub enum DataProtocol { cid: String, blocks: Vec, }, + // Used by listener to terminate shipper on program exit + Terminate, } diff --git a/myceli/src/handlers.rs b/myceli/src/handlers.rs index 89e8520..561bc4c 100644 --- a/myceli/src/handlers.rs +++ b/myceli/src/handlers.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{bail, Result}; use local_storage::storage::Storage; use messages::{ApplicationAPI, DagInfo, DataProtocol, Message}; use std::path::PathBuf; @@ -77,6 +77,11 @@ pub fn get_available_dags(storage: Rc) -> Result { })) } +pub fn get_last_dag_cid(cid: &str, storage: Rc) -> Result { + let last_dag_cid = storage.get_last_dag_cid(cid)?; + Ok(last_dag_cid) +} + #[cfg(test)] pub mod tests { use super::*; diff --git a/myceli/src/listener.rs b/myceli/src/listener.rs index 8857755..793d258 100644 --- a/myceli/src/listener.rs +++ b/myceli/src/listener.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::rc::Rc; use std::sync::mpsc::{self, Sender}; use std::sync::{Arc, Mutex}; -use std::thread::spawn; +use std::thread::{spawn, JoinHandle}; use tracing::{debug, error, info}; use transports::Transport; @@ -19,6 +19,7 @@ pub struct Listener { transport: Arc, connected: Arc>, radio_address: Option, + shipper_handle: Option>, } impl Listener { @@ -39,6 +40,7 @@ impl Listener { transport, connected: Arc::new(Mutex::new(true)), radio_address, + shipper_handle: None, }) } @@ -55,7 +57,7 @@ impl Listener { let shipper_transport = Arc::clone(&self.transport); let initial_connected = Arc::clone(&self.connected); let shipper_radio = self.radio_address.clone(); - spawn(move || { + self.shipper_handle = Some(spawn(move || { let mut shipper = Shipper::new( &shipper_storage_path, shipper_receiver, @@ -69,7 +71,7 @@ impl Listener { ) .expect("Shipper creation failed"); shipper.receive_msg_loop(); - }); + })); loop { match self.transport.receive() { @@ -79,7 +81,14 @@ impl Listener { } else { sender_addr.to_owned() }; - match self.handle_message(message, &target_addr, shipper_sender.clone()) { + match self.handle_message(message, &sender_addr, shipper_sender.clone()) { + Ok(Some(Message::ApplicationAPI(ApplicationAPI::Terminate))) => { + info!("Received termination command, exiting listener"); + if let Some(handle) = self.shipper_handle.take() { + handle.join().unwrap(); + } + return Ok(()); + } Ok(Some(resp)) => { if let Err(e) = self.transmit_response(resp, &target_addr) { error!("TransmitResponse error: {e}"); @@ -216,6 +225,43 @@ impl Listener { Message::ApplicationAPI(ApplicationAPI::RequestAvailableDags) => { Some(handlers::get_available_dags(self.storage.clone())?) } + Message::ApplicationAPI(ApplicationAPI::Terminate) => { + shipper_sender.send((DataProtocol::Terminate, sender_addr.to_string()))?; + Some(Message::ApplicationAPI(ApplicationAPI::Terminate)) + } + Message::ApplicationAPI(ApplicationAPI::RequestResumeDagTransfer { + cid, + target_addr, + }) => { + let all_cids = self.storage.get_all_dag_cids(&cid)?; + println!("found cids for {cid}, {all_cids:?}"); + let num_received_cids = self.storage.get_all_dag_cids(&cid)?.len() as u32; + self.transmit_response( + Message::ApplicationAPI(ApplicationAPI::ResumePriorDagTransmit { + cid, + num_received_cids, + retries: 5, + }), + &target_addr, + )?; + None + } + Message::ApplicationAPI(ApplicationAPI::ResumePriorDagTransmit { + cid, + num_received_cids, + retries, + }) => { + shipper_sender.send(( + DataProtocol::ResumePriorDagTransmit { + cid, + num_received_cids, + target_addr: sender_addr.to_string(), + retries, + }, + sender_addr.to_string(), + ))?; + None + } // Default case for valid messages which don't have handling code implemented yet message => { info!("Received message: {:?}", message); diff --git a/myceli/src/shipper.rs b/myceli/src/shipper.rs index 50196bc..4ff249c 100644 --- a/myceli/src/shipper.rs +++ b/myceli/src/shipper.rs @@ -78,6 +78,10 @@ impl Shipper { pub fn receive_msg_loop(&mut self) { loop { if let Ok((message, sender_addr)) = self.receiver.recv() { + if let DataProtocol::Terminate = message { + info!("Shipper received terminate signal, exiting"); + break; + } if let Err(e) = self.receive(message, &sender_addr) { error!("{e:?}"); } @@ -181,18 +185,42 @@ impl Shipper { self.resume_all_dag_window_sessions()?; } } + DataProtocol::ResumePriorDagTransmit { + cid, + num_received_cids, + target_addr, + retries, + } => { + // Create dag transmit session with cid, target_addr, retries + // Determine last window using last_received_cid and set in dag transmission session + // If connected then kick off transmission for session + let window_num = (num_received_cids / self.window_size); + // self.storage + // .find_dag_window(&cid, &last_received_cid, self.window_size)?; + self.create_dag_window_session(&cid, retries, &target_addr, window_num); + if *self.connected.lock().unwrap() { + self.resume_dag_window_session(&cid)?; + } + } + DataProtocol::Terminate => {} } Ok(()) } // Helper function for adding a new session to the session list - fn open_dag_window_session(&mut self, cid: &str, retries: u8, target_addr: &str) { + fn create_dag_window_session( + &mut self, + cid: &str, + retries: u8, + target_addr: &str, + window_num: u32, + ) { self.window_sessions .entry(cid.to_string()) .or_insert(WindowSession { max_retries: retries, remaining_window_retries: retries, - window_num: 0, + window_num, target_addr: target_addr.to_string(), }); } @@ -310,10 +338,10 @@ impl Shipper { if *self.connected.lock().unwrap() { self.dag_window_session_run(cid, 0, target_addr)?; let retries = if retries == 0 { 0 } else { retries - 1 }; - self.open_dag_window_session(cid, retries, target_addr); + self.create_dag_window_session(cid, retries, target_addr, 0); self.start_dag_window_retry_timeout(cid); } else { - self.open_dag_window_session(cid, retries, target_addr); + self.create_dag_window_session(cid, retries, target_addr, 0); } Ok(()) diff --git a/testing/testing-scenarios/Cargo.toml b/testing/testing-scenarios/Cargo.toml new file mode 100644 index 0000000..76293d2 --- /dev/null +++ b/testing/testing-scenarios/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "testing-scenarios" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +file-hashing.workspace = true +myceli.workspace = true +transports.workspace = true +messages.workspace = true +assert_fs.workspace = true +blake2.workspace = true +rand.workspace = true +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter", "registry"] } \ No newline at end of file diff --git a/testing/testing-scenarios/src/main.rs b/testing/testing-scenarios/src/main.rs new file mode 100644 index 0000000..7b6bf4e --- /dev/null +++ b/testing/testing-scenarios/src/main.rs @@ -0,0 +1,40 @@ +mod plans; +mod utils; + +use plans::*; +use std::time::Instant; + +fn main() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .init(); + let testing_list: Vec<(&str, fn())> = vec![ + // ("Transmit 5kb Dag", test_transmit_5kb_dag), + // ("Transmit 500kb Dag", test_transmit_500kb_dag), + // ("Transmit 5mb Dag", test_transmit_5mb_dag), + // ( + // "Transmit 20mb Dag, 5s passes", + // test_transmit_20mb_dag_over_5s_passes, + // ), + // ( + // "Transmit 20mb Dag, 5s passes, receiver off outside of pass", + // test_transmit_20mb_dag_over_5s_passes_receiver_off, + // ), + ( + "Transmit 20mb Dag, 5s passes, transmitter off outside of pass", + test_transmit_20mb_dag_over_5s_passes_transmitter_off, + ), + ]; + + for (name, test_fn) in testing_list { + println!("Running: {name}"); + let start = Instant::now(); + let result = std::panic::catch_unwind(test_fn); + let end = start.elapsed(); + if result.is_err() { + println!("Test failed after {end:.2?}\n"); + } else { + println!("Test passed after {end:.2?}!\n"); + } + } +} diff --git a/testing/testing-scenarios/src/plans/mod.rs b/testing/testing-scenarios/src/plans/mod.rs new file mode 100644 index 0000000..5b76b4f --- /dev/null +++ b/testing/testing-scenarios/src/plans/mod.rs @@ -0,0 +1,423 @@ +use crate::utils::*; +use messages::{ApplicationAPI, Message}; +use std::thread::sleep; +use std::time::Duration; + +pub fn test_transmit_5kb_dag() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 5).unwrap(); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + wait_receiving_done(&receiver, &mut controller); + + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + assert_eq!(receiver_blocks, transmitter_blocks); + + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} + +pub fn test_transmit_500kb_dag() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 500).unwrap(); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + wait_receiving_done(&receiver, &mut controller); + + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + assert_eq!(receiver_blocks, transmitter_blocks); + + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} + +pub fn test_transmit_5mb_dag() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 1024 * 5).unwrap(); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + wait_receiving_done(&receiver, &mut controller); + + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + assert_eq!(receiver_blocks, transmitter_blocks); + + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} + +pub fn test_transmit_20mb_dag_over_5s_passes() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 1024 * 20).unwrap(); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + // Begin transmission + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + // Begin pass cycle, capping out at 10 passes + for _ in 1..10 { + // Begin pass + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: true }), + &transmitter.listen_addr, + ); + + // wait for 5s pass + sleep(Duration::from_secs(5)); + + // End pass + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: false }), + &transmitter.listen_addr, + ); + + // wait 1s inter-pass period + sleep(Duration::from_secs(1)); + + // Check if transfer completed in prior pass + if controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ) == Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid.to_string(), + result: "Dag is valid".to_string(), + }) { + break; + } + } + + // Verify complete file was passes complete + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + assert_eq!(receiver_blocks, transmitter_blocks); + + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} + +pub fn test_transmit_20mb_dag_over_5s_passes_receiver_off() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 1024 * 20).unwrap(); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + // Begin transmission + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + // Begin pass cycle, capping out at 10 passes + for _ in 1..10 { + receiver.start().unwrap(); + // Begin pass + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: true }), + &transmitter.listen_addr, + ); + + // wait for 5s pass + sleep(Duration::from_secs(5)); + + // End pass + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: false }), + &transmitter.listen_addr, + ); + + // wait 1s inter-pass period + sleep(Duration::from_secs(1)); + + // Check if transfer completed in prior pass + if controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ) == Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid.to_string(), + result: "Dag is valid".to_string(), + }) { + break; + } + + // Terminate listener + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::Terminate), + &receiver.listen_addr, + ); + receiver.stop(); + } + + // Verify complete file was passes complete + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + assert_eq!(receiver_blocks, transmitter_blocks); + + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} + +pub fn test_transmit_20mb_dag_over_5s_passes_transmitter_off() { + let (mut transmitter, mut receiver, mut controller) = testing_setup(); + + transmitter.start().unwrap(); + receiver.start().unwrap(); + + let test_file_path = transmitter.generate_file(1024 * 1024 * 20).unwrap(); + println!("Sending import-file {} to transmitter", &test_file_path); + let resp = controller.send_and_recv( + &transmitter.listen_addr, + Message::import_file(&test_file_path), + ); + let root_cid = match resp { + Message::ApplicationAPI(ApplicationAPI::FileImported { cid, .. }) => cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + // Begin transmission + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 5), + &transmitter.listen_addr, + ); + + sleep(Duration::from_secs(5)); + + // Begin pass cycle, capping out at 5 passes + for _ in 1..5 { + // End pass by terminating transmitter and disconnecting receiver + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::Terminate), + &transmitter.listen_addr, + ); + transmitter.stop(); + + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: false }), + &receiver.listen_addr, + ); + println!("Sending validate-dag"); + // Check if transfer completed in prior pass + let resp = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + println!("got validate dag resp {resp:?}"); + if resp + == Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid.to_string(), + result: "Dag is valid".to_string(), + }) + { + break; + } + + // wait 1s inter-pass period + sleep(Duration::from_secs(1)); + + // Start next pass by starting transmitter and connecting receiver + transmitter.start().unwrap(); + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::SetConnected { connected: true }), + &receiver.listen_addr, + ); + controller.send_msg( + Message::ApplicationAPI(ApplicationAPI::RequestResumeDagTransfer { + cid: root_cid.to_string(), + target_addr: transmitter.listen_addr.to_string(), + }), + &receiver.listen_addr, + ); + + // wait for 5s pass + sleep(Duration::from_secs(5)); + } + println!("Sending request-available-blocks to receiver"); + // Verify complete file was passes complete + let receiver_blocks = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); + println!("Sending request-available-blocks to transmitter"); + let transmitter_blocks = controller.send_and_recv( + &transmitter.listen_addr, + Message::request_available_blocks(), + ); + + // assert_eq!(receiver_blocks.len(), transmitter_blocks.len()); + println!("Sending validate-dag to receiver"); + let receiver_validate_dag = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::ValidateDag { + cid: root_cid.to_string(), + }), + ); + assert_eq!( + receiver_validate_dag, + Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { + cid: root_cid, + result: "Dag is valid".to_string() + }) + ); +} diff --git a/testing/testing-scenarios/src/utils.rs b/testing/testing-scenarios/src/utils.rs new file mode 100644 index 0000000..2be56a7 --- /dev/null +++ b/testing/testing-scenarios/src/utils.rs @@ -0,0 +1,173 @@ +use anyhow::Result; +use assert_fs::fixture::ChildPath; +use assert_fs::{fixture::FileWriteBin, fixture::PathChild, TempDir}; +use blake2::{Blake2s256, Digest}; +use file_hashing::get_hash_file; +use messages::Message; +use myceli::listener::Listener; +use rand::{thread_rng, Rng, RngCore}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::path::PathBuf; +use std::sync::Arc; +use std::thread::{sleep, spawn}; +use std::time::Duration; +use transports::{Transport, UdpTransport}; + +pub fn testing_setup() -> (TestListener, TestListener, TestController) { + let transmitter = TestListener::new(); + let receiver = TestListener::new(); + let controller = TestController::new(); + (transmitter, receiver, controller) +} + +pub fn wait_receiving_done(receiver: &TestListener, controller: &mut TestController) { + let mut prev_num_blocks = 0; + let mut num_retries = 0; + + loop { + let current_blocks = + if let Message::ApplicationAPI(messages::ApplicationAPI::AvailableBlocks { cids }) = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()) + { + cids + } else { + panic!("Failed to get correct response to blocks request"); + }; + let current_num_blocks = current_blocks.len(); + if current_num_blocks > prev_num_blocks { + prev_num_blocks = current_num_blocks; + num_retries = 0; + } else { + if num_retries > 10 { + break; + } + num_retries += 1; + } + sleep(Duration::from_millis(100)); + } +} + +pub struct TestListener { + pub listen_addr: String, + pub test_dir: TempDir, + thread_handle: Option>, + db_path: ChildPath, +} + +impl TestListener { + pub fn new() -> TestListener { + let test_dir = TempDir::new().unwrap(); + let mut rng = thread_rng(); + let port_num = rng.gen_range(6000..9000); + let listen_addr = format!("127.0.0.1:{port_num}"); + let db_path = test_dir.child("storage.db"); + + TestListener { + listen_addr, + test_dir, + db_path, + thread_handle: None, + } + } + + pub fn start(&mut self) -> Result<()> { + if self.thread_handle.is_none() { + let thread_listen_addr = self + .listen_addr + .to_owned() + .to_socket_addrs() + .map(|mut i| i.next().unwrap()) + .unwrap(); + + let thread_db_path = self.db_path.to_owned(); + + self.thread_handle = Some(spawn(move || { + start_listener_thread(thread_listen_addr, thread_db_path) + })); + + // A little wait so the listener can get listening + sleep(Duration::from_millis(50)); + } + Ok(()) + } + + pub fn stop(&mut self) { + if let Some(handle) = self.thread_handle.take() { + handle.join().unwrap(); + } + } + + pub fn generate_file(&self, size: u32) -> Result { + let mut data = Vec::::new(); + data.resize(size as usize, 1); + thread_rng().fill_bytes(&mut data); + + let tmp_file = self.test_dir.child("test.file"); + tmp_file.write_binary(&data)?; + Ok(tmp_file.path().to_str().unwrap().to_owned()) + } +} + +fn start_listener_thread(listen_addr: SocketAddr, db_path: PathBuf) { + let db_path = db_path.to_str().unwrap(); + let listen_addr_str = listen_addr.to_string(); + let mut transport = UdpTransport::new(&listen_addr_str, 512, None).unwrap(); + transport + .set_read_timeout(Some(Duration::from_millis(500))) + .unwrap(); + transport.set_max_read_attempts(Some(10)); + let transport = Arc::new(transport); + let mut listener = Listener::new(&listen_addr, db_path, transport, 1024 * 3, None).unwrap(); + listener + .start(10, 5, 1024 * 3) + .expect("Error encountered in listener"); +} + +pub struct TestController { + pub transport: UdpTransport, +} + +impl TestController { + pub fn new() -> Self { + let mut transport = UdpTransport::new("127.0.0.1:0", 512, None).unwrap(); + transport + .set_read_timeout(Some(Duration::from_millis(50))) + .unwrap(); + transport.set_max_read_attempts(Some(1)); + TestController { transport } + } + + pub fn send_and_recv(&mut self, target_addr: &str, message: Message) -> Message { + println!("\n\t#\tSending a msg to {}, will expect a response: {:?}\n", target_addr, &message); + self.send_msg(message, target_addr); + let mut retries = 0; + loop { + if let Ok(msg) = self.recv_msg() { + println!("\t#\tGot the expected response: {:?}\n\n", &msg); + return msg; + } + if retries > 50 { + panic!("Send recv failed"); + } + retries += 1; + sleep(Duration::from_secs(1)); + } + } + + pub fn send_msg(&self, message: Message, target_addr: &str) { + self.transport + .send(message, target_addr) + .expect("Transport send failed"); + } + + pub fn recv_msg(&mut self) -> Result { + let (msg, _) = self.transport.receive()?; + Ok(msg) + } +} + +pub fn hash_file(path_str: &str) -> String { + let path = PathBuf::from(path_str); + let mut hash = Blake2s256::new(); + get_hash_file(path, &mut hash).unwrap() +}