diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..ae9f574
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,2 @@
+# Make all line endings LF
+* text=auto
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
new file mode 100644
index 0000000..2bdb7ca
--- /dev/null
+++ b/.github/workflows/rust_build.yml
@@ -0,0 +1,39 @@
+name: Rust CI
+
+on:
+  push:
+    branches:
+      - "*" # Trigger for all branches
+  pull_request:
+    branches:
+      - "*" # Trigger for all branches
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Install protoc
+        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable # Use stable Rust; change if needed
+
+      - name: Cache Cargo
+        uses: actions/cache@v3
+        with:
+          path: ~/.cargo
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-
+
+      - name: Build with Cargo
+        run: cargo build --verbose
diff --git a/.gitignore b/.gitignore
index 6985cf1..e097d36 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# pycache
+experiments/__pycache__/*
+
 # Generated by Cargo
 # will have compiled files and executables
 debug/
diff --git a/README.md b/README.md
index 3d09404..7f87e3a 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,9 @@ running the binary and with the `--help` flag.
   -s "memory" # use "table" to use Azure table instead and provide the following
   -a AZURE_STORAGE_ACCOUNT_NAME
   -k AZURE_STORAGE_MASTER_KEY
+  -f The maximum number of times an endorser can fail a ping before it is considered dead (default: 3)
+  -i The interval, in seconds, at which endorsers are pinged (default: 10)
+  --request-timeout The time, in seconds, after which a ping is considered failed (default: 10)
 ```
 
 Below is a helper tool to interact with the coordinator. After you
diff --git a/SUPPORT.md b/SUPPORT.md
index 37ad5c4..5096e3a 100644
--- a/SUPPORT.md
+++ b/SUPPORT.md
@@ -1,13 +1,13 @@
-# Support
-
-## How to file issues and get help
-
-This project uses GitHub Issues to track bugs and feature requests. Please search the existing
-issues before filing new issues to avoid duplicates. For new issues, file your bug or
-feature request as a new Issue.
-
-For help and questions about using this project, please open an issue on GitHub.
-
-## Microsoft Support Policy
-
-Support for this **PROJECT or PRODUCT** is limited to the resources listed above.
+# Support
+
+## How to file issues and get help
+
+This project uses GitHub Issues to track bugs and feature requests. Please search the existing
+issues before filing new issues to avoid duplicates. For new issues, file your bug or
+feature request as a new Issue.
+
+For help and questions about using this project, please open an issue on GitHub.
+
+## Microsoft Support Policy
+
+Support for this **PROJECT or PRODUCT** is limited to the resources listed above.
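The coordinator-side changes below introduce a liveness-ping protocol: the coordinator periodically sends each endorser a fresh 16-byte random nonce, the endorser signs it with its identity key, and the coordinator counts a failure unless the returned identity matches the endorser's known public key and the signature over the nonce verifies. The following is a condensed sketch of a single round-trip, assembled from the types this diff introduces (`PingReq`/`PingResp`, `IdSig`); the `ping_once` helper itself is hypothetical and not part of the PR, and the timeouts are hard-coded for brevity:

```rust
use ledger::{endorser_proto, IdSig};
use std::time::Duration;
use tonic::transport::Endpoint;

// Hypothetical helper: one ping round-trip, mirroring ping_all_endorsers below.
async fn ping_once(
  uri: String,
  expected_pk: Vec<u8>, // the endorser's public key, as stored in the conn_map
) -> Result<bool, Box<dyn std::error::Error>> {
  // A fresh random 16-byte nonce, as produced by generate_secure_nonce_bytes(16)
  let nonce: Vec<u8> = (0..16).map(|_| rand::random::<u8>()).collect();

  let channel = Endpoint::from_shared(uri)?
    .connect_timeout(Duration::from_secs(10))
    .timeout(Duration::from_secs(10))
    .connect()
    .await?;
  let mut client = endorser_proto::endorser_call_client::EndorserCallClient::new(channel);

  let resp = client
    .ping(tonic::Request::new(endorser_proto::PingReq {
      nonce: nonce.clone(),
    }))
    .await?
    .into_inner();

  // Accept the ping only if the identity matches and the signature over the
  // nonce verifies; otherwise the coordinator counts a failure for this endorser.
  match IdSig::from_bytes(&resp.id_sig) {
    Ok(id_sig) => Ok(*id_sig.get_id() == expected_pk && id_sig.verify(&nonce).is_ok()),
    Err(_) => Ok(false),
  }
}
```

In the PR itself, a failed round-trip is routed through `endorser_ping_failed`, which increments the per-endorser failure counter and, the first time the counter crosses `MAX_FAILURES`, the global `DEAD_ENDORSERS` count.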
diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml
index 148d29b..a5aabc6 100644
--- a/coordinator/Cargo.toml
+++ b/coordinator/Cargo.toml
@@ -23,10 +23,14 @@ base64-url = "1.4.13"
 serde_derive = { version = "1.0" }
 serde_json = "1.0"
 rand = "0.8.4"
+clokwerk = "0.4.0"
+time = "0.3.37"
+log = "0.4.14"
+async-lock = "3.4.0"
 
 [dev-dependencies]
 rand = "0.8.4"
 
 [build-dependencies]
 tonic-build = "0.8.2"
-prost-build = "0.11.1"
\ No newline at end of file
+prost-build = "0.11.1"
diff --git a/coordinator/src/coordinator_state.rs b/coordinator/src/coordinator_state.rs
index fa10db1..20084a7 100644
--- a/coordinator/src/coordinator_state.rs
+++ b/coordinator/src/coordinator_state.rs
@@ -3,15 +3,20 @@ use ledger::{
   compute_aggregated_block_hash, compute_cut_diffs, compute_max_cut,
   errors::VerificationError,
   signature::{PublicKey, PublicKeyTrait},
-  Block, CustomSerde, EndorserHostnames, Handle, MetaBlock, NimbleDigest, NimbleHashTrait, Nonce,
-  Nonces, Receipt, Receipts, VerifierState,
+  Block, CustomSerde, EndorserHostnames, Handle, IdSig, MetaBlock, NimbleDigest, NimbleHashTrait,
+  Nonce, Nonces, Receipt, Receipts, VerifierState,
 };
-use rand::random;
+use log::error;
+use rand::{random, Rng};
 use std::{
   collections::{HashMap, HashSet},
   convert::TryInto,
   ops::Deref,
-  sync::{Arc, RwLock},
+  sync::{
+    atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering::SeqCst},
+    Arc, RwLock,
+  },
+  time::Duration,
 };
 use store::ledger::{
   azure_table::TableLedgerStore, filestore::FileStore, in_memory::InMemoryLedgerStore,
@@ -24,32 +29,51 @@ use tonic::{
   Code, Status,
 };
 
+use clokwerk::TimeUnits;
 use ledger::endorser_proto;
 
+//use tracing::{error, info};
+//use tracing_subscriber;
+
 const DEFAULT_NUM_GRPC_CHANNELS: usize = 1; // the default number of GRPC channels
 
+enum EndorserUsageState {
+  Uninitialized,
+  Initialized,
+  Active,
+  Finalized,
+}
+
 struct EndorserClients {
   clients: Vec<EndorserCallClient<Channel>>,
   uri: String,
+  failures: u64,
+  usage_state: EndorserUsageState,
 }
 
 type EndorserConnMap = HashMap<Vec<u8>, EndorserClients>;
 
 type LedgerStoreRef = Arc<Box<dyn LedgerStore + Send + Sync>>;
 
+#[derive(Clone)]
 pub struct CoordinatorState {
   pub(crate) ledger_store: LedgerStoreRef,
   conn_map: Arc<RwLock<EndorserConnMap>>,
   verifier_state: Arc<RwLock<VerifierState>>,
   num_grpc_channels: usize,
+  _used_nonces: Arc<RwLock<HashSet<Vec<u8>>>>,
 }
 
 const ENDORSER_MPSC_CHANNEL_BUFFER: usize = 8; // limited by the number of endorsers
 const ENDORSER_CONNECT_TIMEOUT: u64 = 10; // seconds: the connect timeout to endorsers
-const ENDORSER_REQUEST_TIMEOUT: u64 = 10; // seconds: the request timeout to endorsers
 const ATTESTATION_STR: &str = "THIS IS A PLACE HOLDER FOR ATTESTATION";
 
+static DEAD_ENDORSERS: AtomicUsize = AtomicUsize::new(0); // the number of currently dead endorsers
+static MAX_FAILURES: AtomicU64 = AtomicU64::new(3);
+static ENDORSER_REQUEST_TIMEOUT: AtomicU64 = AtomicU64::new(10);
+static PING_INTERVAL: AtomicU32 = AtomicU32::new(10); // seconds
+
 async fn get_public_key_with_retry(
   endorser_client: &mut endorser_proto::endorser_call_client::EndorserCallClient<Channel>,
   request: endorser_proto::GetPublicKeyReq,
@@ -76,6 +100,32 @@ async fn get_public_key_with_retry(
   }
 }
 
+async fn get_ping_with_retry(
+  endorser_client: &mut endorser_proto::endorser_call_client::EndorserCallClient<Channel>,
+  request: endorser_proto::PingReq,
+) -> Result<tonic::Response<endorser_proto::PingResp>, Status> {
+  loop {
+    let res = endorser_client
+      .ping(tonic::Request::new(request.clone()))
+      .await;
+    match res {
+      Ok(resp) => {
+        return Ok(resp);
+      },
+      Err(status) => {
+        match status.code() {
+          Code::ResourceExhausted => {
+            continue;
+          },
+          _ => {
+            return Err(status);
+          },
+        };
+      },
+    };
+  }
+}
+
 async fn
new_ledger_with_retry( endorser_client: &mut endorser_proto::endorser_call_client::EndorserCallClient, request: endorser_proto::NewLedgerReq, @@ -443,6 +493,17 @@ fn process_error( } impl CoordinatorState { + /// Creates a new instance of `CoordinatorState`. + /// + /// # Arguments + /// + /// * `ledger_store_type` - The type of ledger store to use. + /// * `args` - A map of arguments for the ledger store. + /// * `num_grpc_channels_opt` - An optional number of gRPC channels. + /// + /// # Returns + /// + /// A result containing the new `CoordinatorState` or a `CoordinatorError`. pub async fn new( ledger_store_type: &str, args: &HashMap, @@ -458,24 +519,28 @@ impl CoordinatorState { conn_map: Arc::new(RwLock::new(HashMap::new())), verifier_state: Arc::new(RwLock::new(VerifierState::new())), num_grpc_channels, + _used_nonces: Arc::new(RwLock::new(HashSet::new())), }, "table" => CoordinatorState { ledger_store: Arc::new(Box::new(TableLedgerStore::new(args).await.unwrap())), conn_map: Arc::new(RwLock::new(HashMap::new())), verifier_state: Arc::new(RwLock::new(VerifierState::new())), num_grpc_channels, + _used_nonces: Arc::new(RwLock::new(HashSet::new())), }, "filestore" => CoordinatorState { ledger_store: Arc::new(Box::new(FileStore::new(args).await.unwrap())), conn_map: Arc::new(RwLock::new(HashMap::new())), verifier_state: Arc::new(RwLock::new(VerifierState::new())), num_grpc_channels, + _used_nonces: Arc::new(RwLock::new(HashSet::new())), }, _ => CoordinatorState { ledger_store: Arc::new(Box::new(InMemoryLedgerStore::new())), conn_map: Arc::new(RwLock::new(HashMap::new())), verifier_state: Arc::new(RwLock::new(VerifierState::new())), num_grpc_channels, + _used_nonces: Arc::new(RwLock::new(HashSet::new())), }, }; @@ -613,6 +678,34 @@ impl CoordinatorState { Ok(coordinator) } + /// Starts the auto scheduler for pinging endorsers. + pub async fn start_auto_scheduler(self: Arc) { + let mut scheduler = clokwerk::AsyncScheduler::new(); + scheduler + .every(PING_INTERVAL.load(SeqCst).seconds()) + .run(move || { + let value = self.clone(); + async move { value.ping_all_endorsers().await } + }); + + tokio::spawn(async move { + loop { + scheduler.run_pending().await; + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + println!("Started the scheduler"); + } + + /// Connects to existing endorsers using the view ledger block. + /// + /// # Arguments + /// + /// * `view_ledger_block` - The view ledger block. + /// + /// # Returns + /// + /// A result containing the endorser hostnames or a `CoordinatorError`. async fn connect_to_existing_endorsers( &self, view_ledger_block: &[u8], @@ -639,6 +732,15 @@ impl CoordinatorState { Ok(endorsers) } + /// Gets the endorser client for the given public key. + /// + /// # Arguments + /// + /// * `pk` - The public key of the endorser. + /// + /// # Returns + /// + /// An optional tuple containing the endorser client and URI. fn get_endorser_client( &self, pk: &[u8], @@ -664,6 +766,11 @@ impl CoordinatorState { } } + /// Gets the public keys of all endorsers. + /// + /// # Returns + /// + /// A vector of public keys. pub fn get_endorser_pks(&self) -> Vec> { if let Ok(conn_map_rd) = self.conn_map.read() { conn_map_rd @@ -676,6 +783,11 @@ impl CoordinatorState { } } + /// Gets the URIs of all endorsers. + /// + /// # Returns + /// + /// A vector of URIs. pub fn get_endorser_uris(&self) -> Vec { if let Ok(conn_map_rd) = self.conn_map.read() { conn_map_rd @@ -688,6 +800,11 @@ impl CoordinatorState { } } + /// Gets the hostnames of all endorsers. 
+ /// + /// # Returns + /// + /// A vector of endorser hostnames. fn get_endorser_hostnames(&self) -> EndorserHostnames { if let Ok(conn_map_rd) = self.conn_map.read() { conn_map_rd @@ -700,6 +817,15 @@ impl CoordinatorState { } } + /// Gets the public key of an endorser by hostname. + /// + /// # Arguments + /// + /// * `hostname` - The hostname of the endorser. + /// + /// # Returns + /// + /// An optional public key. pub fn get_endorser_pk(&self, hostname: &str) -> Option> { if let Ok(conn_map_rd) = self.conn_map.read() { for (pk, endorser) in conn_map_rd.iter() { @@ -711,6 +837,15 @@ impl CoordinatorState { None } + /// Connects to the given endorsers. + /// + /// # Arguments + /// + /// * `hostnames` - The hostnames of the endorsers. + /// + /// # Returns + /// + /// A vector of endorser hostnames. pub async fn connect_endorsers(&self, hostnames: &[String]) -> EndorserHostnames { let (mpsc_tx, mut mpsc_rx) = mpsc::channel(ENDORSER_MPSC_CHANNEL_BUFFER); for hostname in hostnames { @@ -723,8 +858,9 @@ impl CoordinatorState { if let Ok(endorser_endpoint) = res { let endorser_endpoint = endorser_endpoint .connect_timeout(std::time::Duration::from_secs(ENDORSER_CONNECT_TIMEOUT)); - let endorser_endpoint = - endorser_endpoint.timeout(std::time::Duration::from_secs(ENDORSER_REQUEST_TIMEOUT)); + let endorser_endpoint = endorser_endpoint.timeout(std::time::Duration::from_secs( + ENDORSER_REQUEST_TIMEOUT.load(SeqCst), + )); let res = endorser_endpoint.connect().await; if let Ok(channel) = res { let mut client = @@ -774,6 +910,8 @@ impl CoordinatorState { let mut endorser_clients = EndorserClients { clients: Vec::new(), uri: endorser, + failures: 0, + usage_state: EndorserUsageState::Uninitialized, }; endorser_clients.clients.push(client); conn_map_wr.insert(pk, endorser_clients); @@ -783,7 +921,7 @@ impl CoordinatorState { }, }; } else { - eprintln!("Failed to acquire the write lock"); + eprintln!("Failed to acquire the conn_map write lock"); } } } @@ -791,6 +929,11 @@ impl CoordinatorState { endorser_hostnames } + /// Disconnects the given endorsers. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to disconnect. pub async fn disconnect_endorsers(&self, endorsers: &EndorserHostnames) { if let Ok(mut conn_map_wr) = self.conn_map.write() { for (pk, uri) in endorsers { @@ -810,6 +953,16 @@ impl CoordinatorState { } } + /// Filters the endorsers based on the view ledger height. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to filter. + /// * `view_ledger_height` - The height of the view ledger. + /// + /// # Returns + /// + /// A result indicating success or a `CoordinatorError`. async fn filter_endorsers( &self, endorsers: &EndorserHostnames, @@ -871,6 +1024,20 @@ impl CoordinatorState { Ok(()) } + /// Initializes the state of the endorsers. + /// + /// # Arguments + /// + /// * `group_identity` - The group identity of the endorsers. + /// * `endorsers` - The endorsers to initialize. + /// * `ledger_tail_map` - The ledger tail map. + /// * `view_tail_metablock` - The tail metablock of the view ledger. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A `Receipts` object containing the receipts. 
async fn endorser_initialize_state( &self, group_identity: &NimbleDigest, @@ -917,7 +1084,18 @@ impl CoordinatorState { let endorser_proto::InitializeStateResp { receipt } = resp.into_inner(); let res = Receipt::from_bytes(&receipt); match res { - Ok(receipt_rs) => receipts.add(&receipt_rs), + Ok(receipt_rs) => { + receipts.add(&receipt_rs); + if let Ok(mut conn_map_wr) = self.conn_map.write() { + let e = conn_map_wr.get_mut(&pk_bytes); + match e { + None => eprintln!("Couldn't find Endorser in conn_map"), + Some(v) => v.usage_state = EndorserUsageState::Initialized, + } + } else { + eprintln!("Couldn't get write lock on conn_map"); + } + }, Err(error) => eprintln!("Failed to parse a receipt ({:?})", error), } }, @@ -940,6 +1118,18 @@ impl CoordinatorState { receipts } + /// Creates a new ledger with the given handle, block hash, and block. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to create the ledger. + /// * `ledger_handle` - The handle of the ledger. + /// * `ledger_block_hash` - The hash of the block. + /// * `ledger_block` - The block to add to the ledger. + /// + /// # Returns + /// + /// A result containing the receipts or a `CoordinatorError`. async fn endorser_create_ledger( &self, endorsers: &[Vec], @@ -1014,6 +1204,20 @@ impl CoordinatorState { Ok(receipts) } + /// Appends a block to the ledger with the given handle, block hash, expected height, block, and nonces. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to append the ledger. + /// * `ledger_handle` - The handle of the ledger. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// * `block` - The block to append to the ledger. + /// * `nonces` - The nonces to use for appending the block. + /// + /// # Returns + /// + /// A result containing the receipts or a `CoordinatorError`. pub async fn endorser_append_ledger( &self, endorsers: &[Vec], @@ -1169,6 +1373,14 @@ impl CoordinatorState { Ok(receipts) } + /// Updates the ledger for the given endorsers. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to update the ledger. + /// * `ledger_handle` - The handle of the ledger. + /// * `max_height` - The maximum height of the ledger. + /// * `endorser_height_map` - A map of endorser heights. async fn endorser_update_ledger( &self, endorsers: &[Vec], @@ -1233,6 +1445,17 @@ impl CoordinatorState { } } + /// Reads the tail of the ledger for the given endorsers. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to read the ledger tail. + /// * `ledger_handle` - The handle of the ledger. + /// * `client_nonce` - The nonce to use for reading the ledger tail. + /// + /// # Returns + /// + /// A result containing the ledger entry or a `CoordinatorError`. async fn endorser_read_ledger_tail( &self, endorsers: &[Vec], @@ -1341,6 +1564,17 @@ impl CoordinatorState { Err(CoordinatorError::FailedToObtainQuorum) } + /// Finalizes the state of the endorsers. + /// + /// # Arguments + /// + /// * `endorsers` - The endorsers to finalize the state. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A tuple containing the receipts and ledger tail maps. 
 async fn endorser_finalize_state(
    &self,
    endorsers: &EndorserHostnames,
@@ -1388,6 +1622,14 @@ impl CoordinatorState {
       let receipt_rs = match res {
         Ok(receipt_rs) => {
           receipts.add(&receipt_rs);
+          if let Ok(mut conn_map_wr) = self.conn_map.write() {
+            match conn_map_wr.get_mut(&pk_bytes) {
+              None => eprintln!("Endorser wasn't in conn_map during finalization."),
+              Some(e) => e.usage_state = EndorserUsageState::Finalized,
+            }
+          } else {
+            eprintln!("Couldn't get write lock on conn_map");
+          }
           receipt_rs
         },
         Err(error) => {
@@ -1417,6 +1659,20 @@ impl CoordinatorState {
     (receipts, ledger_tail_maps)
   }
 
+  /// Verifies the view change for the given endorsers.
+  ///
+  /// # Arguments
+  ///
+  /// * `endorsers` - The endorsers to verify the view change.
+  /// * `old_config` - The old configuration.
+  /// * `new_config` - The new configuration.
+  /// * `ledger_tail_maps` - The ledger tail maps.
+  /// * `ledger_chunks` - The ledger chunks.
+  /// * `receipts` - The receipts.
+  ///
+  /// # Returns
+  ///
+  /// The number of verified endorsers.
   async fn endorser_verify_view_change(
     &self,
     endorsers: &EndorserHostnames,
@@ -1460,9 +1716,23 @@ impl CoordinatorState {
 
     let mut num_verified_endorers = 0;
 
+    // TODO: Better error handling here
     while let Some((endorser, pk_bytes, res)) = mpsc_rx.recv().await {
       match res {
         Ok(_resp) => {
+          if let Ok(mut conn_map_wr) = self.conn_map.write() {
+            let e = conn_map_wr.get_mut(&pk_bytes);
+            match e {
+              None => {
+                eprintln!("Couldn't find endorser in conn_map");
+              },
+              Some(v) => {
+                v.usage_state = EndorserUsageState::Active;
+              },
+            }
+          } else {
+            eprintln!("Couldn't get write lock on conn_map");
+          }
           num_verified_endorers += 1;
         },
         Err(status) => {
@@ -1476,19 +1746,79 @@ impl CoordinatorState {
         },
       }
     }
-
     num_verified_endorers
   }
 
+  /// Replaces the endorsers with the given hostnames.
+  ///
+  /// # Arguments
+  ///
+  /// * `hostnames` - The hostnames of the new endorsers.
+  ///
+  /// # Returns
+  ///
+  /// A result indicating success or a `CoordinatorError`.
   pub async fn replace_endorsers(&self, hostnames: &[String]) -> Result<(), CoordinatorError> {
-    let existing_endorsers = self.get_endorser_hostnames();
+    let existing_endorsers = self.get_endorser_uris();
+
+    // Check if hostnames contains endorsers that are not in existing_endorsers.
+    // If yes, connect to those and then continue.
+    // Once done, select the new endorser quorum from the conn_map and reconfigure.
+
+    if !hostnames.is_empty() {
+      // Filter out the endorsers that haven't been connected to yet, and connect to them.
+ let mut added_endorsers: Vec = hostnames.to_vec(); + added_endorsers.retain(|x| !existing_endorsers.contains(x)); + + let added_endorsers = self.connect_endorsers(&added_endorsers).await; + // After the previous ^ line the new endorsers are in the conn_map as uninitialized + if added_endorsers.is_empty() { + // This is not an error as long as there are enough qualified endorsers already connected + println!("New endorsers couldn't be reached"); + } else { + println!("Connected to new endorsers"); + } + } + + // Now all available endorsers are in the conn_map, so we select the new quorum from + // there + + let new_endorsers: EndorserHostnames; + let old_endorsers: EndorserHostnames; - // Connect to new endorsers - let new_endorsers = self.connect_endorsers(hostnames).await; - if new_endorsers.is_empty() { - return Err(CoordinatorError::NoNewEndorsers); + if let Ok(conn_map_rd) = self.conn_map.read() { + new_endorsers = conn_map_rd + .iter() + .filter(|(_pk, endorser)| { + matches!(endorser.usage_state, EndorserUsageState::Uninitialized) + && endorser.failures == 0 + }) + .map(|(pk, endorser)| (pk.clone(), endorser.uri.clone())) + .collect(); + + old_endorsers = conn_map_rd + .iter() + .filter(|(_pk, endorser)| matches!(endorser.usage_state, EndorserUsageState::Active)) + .map(|(pk, endorser)| (pk.clone(), endorser.uri.clone())) + .collect(); + if new_endorsers.is_empty() { + eprintln!("No eligible endorsers"); + return Err(CoordinatorError::FailedToObtainQuorum); + } + } else { + eprintln!("Couldn't get read lock on conn_map"); + return Err(CoordinatorError::FailedToAcquireReadLock); } + for (_pk, uri) in &new_endorsers { + println!("New endorser URI: {}", uri); + } + + DEAD_ENDORSERS.store(0, SeqCst); + + // At this point new_endorsers should contain the hostnames of the new quorum + // and old_endorsers should contain the currently active quorum + // Package the list of endorsers into a genesis block of the view ledger let view_ledger_genesis_block = { let res = bincode::serialize(&new_endorsers); @@ -1499,7 +1829,7 @@ impl CoordinatorState { let block_vec = res.unwrap(); Block::new(&block_vec) }; - + println!("created view ledger genesis block"); // Read the current ledger tail let res = self.ledger_store.read_view_ledger_tail().await; @@ -1510,7 +1840,7 @@ impl CoordinatorState { ); return Err(CoordinatorError::FailedToCallLedgerStore); } - + println!("read view ledger tail"); let (tail, height) = res.unwrap(); // Store the genesis block of the view ledger in the ledger store @@ -1525,12 +1855,12 @@ impl CoordinatorState { ); return Err(CoordinatorError::FailedToCallLedgerStore); } - + println!("appended view ledger genesis block"); let view_ledger_height = res.unwrap(); self .apply_view_change( - &existing_endorsers, + &old_endorsers, &new_endorsers, &tail, &view_ledger_genesis_block, @@ -1539,6 +1869,19 @@ impl CoordinatorState { .await } + /// Applies the view change to the verifier state. + /// + /// # Arguments + /// + /// * `existing_endorsers` - The existing endorsers. + /// * `new_endorsers` - The new endorsers. + /// * `view_ledger_entry` - The view ledger entry. + /// * `view_ledger_genesis_block` - The genesis block of the view ledger. + /// * `view_ledger_height` - The height of the view ledger. + /// + /// # Returns + /// + /// A result indicating success or a `CoordinatorError`. 
async fn apply_view_change( &self, existing_endorsers: &EndorserHostnames, @@ -1564,7 +1907,7 @@ impl CoordinatorState { match res { Ok(metablock) => metablock, Err(_e) => { - eprintln!("faield to retrieve metablock from view receipts"); + eprintln!("failed to retrieve metablock from view receipts"); return Err(CoordinatorError::UnexpectedError); }, } @@ -1674,6 +2017,8 @@ impl CoordinatorState { &receipts, ) .await; + // TODO: Change this line? Would allow to use a smaller quorum if not enough eligible endorsers + // are available if num_verified_endorsers * 2 <= new_endorsers.len() { eprintln!( "insufficient verified endorsers {} * 2 <= {}", @@ -1701,11 +2046,23 @@ impl CoordinatorState { Ok(()) } + /// Resets the ledger store. pub async fn reset_ledger_store(&self) { let res = self.ledger_store.reset_store().await; assert!(res.is_ok()); } + /// Creates a new ledger with the given handle and block. + /// + /// # Arguments + /// + /// * `endorsers_opt` - An optional vector of endorsers. + /// * `handle_bytes` - The handle of the ledger. + /// * `block_bytes` - The block to add to the ledger. + /// + /// # Returns + /// + /// A result containing the receipts or a `CoordinatorError`. pub async fn create_ledger( &self, endorsers_opt: Option>>, @@ -1763,6 +2120,18 @@ impl CoordinatorState { Ok(receipts) } + /// Appends a block to the ledger with the given handle, block, and expected height. + /// + /// # Arguments + /// + /// * `endorsers_opt` - An optional vector of endorsers. + /// * `handle_bytes` - The handle of the ledger. + /// * `block_bytes` - The block to append to the ledger. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A result containing the hash of the nonces and the receipts or a `CoordinatorError`. pub async fn append_ledger( &self, endorsers_opt: Option>>, @@ -1861,6 +2230,16 @@ impl CoordinatorState { } } + /// Reads the tail of the ledger with the given handle and nonce. + /// + /// # Arguments + /// + /// * `handle_bytes` - The handle of the ledger. + /// * `nonce_bytes` - The nonce to use for reading the ledger tail. + /// + /// # Returns + /// + /// A result containing the ledger entry or a `CoordinatorError`. pub async fn read_ledger_tail( &self, handle_bytes: &[u8], @@ -1920,6 +2299,16 @@ impl CoordinatorState { } } + /// Reads a block from the ledger by index. + /// + /// # Arguments + /// + /// * `handle_bytes` - The handle of the ledger. + /// * `index` - The index of the block to read. + /// + /// # Returns + /// + /// A result containing the ledger entry or a `CoordinatorError`. pub async fn read_ledger_by_index( &self, handle_bytes: &[u8], @@ -1939,6 +2328,15 @@ impl CoordinatorState { } } + /// Reads a block from the view ledger by index. + /// + /// # Arguments + /// + /// * `index` - The index of the block to read. + /// + /// # Returns + /// + /// A result containing the ledger entry or a `CoordinatorError`. pub async fn read_view_by_index(&self, index: usize) -> Result { let ledger_entry = { let res = self.ledger_store.read_view_ledger_by_index(index).await; @@ -1951,6 +2349,11 @@ impl CoordinatorState { Ok(ledger_entry) } + /// Reads the tail of the view ledger. + /// + /// # Returns + /// + /// A result containing the ledger entry, height, and attestation string or a `CoordinatorError`. 
 pub async fn read_view_tail(&self) -> Result<(LedgerEntry, usize, Vec<u8>), CoordinatorError> {
     let res = self.ledger_store.read_view_ledger_tail().await;
     if let Err(error) = res {
@@ -1964,4 +2367,287 @@ impl CoordinatorState {
     let (ledger_entry, height) = res.unwrap();
     Ok((ledger_entry, height, ATTESTATION_STR.as_bytes().to_vec()))
   }
+
+  /// Pings all endorsers.
+  pub async fn ping_all_endorsers(self: Arc<Self>) {
+    println!("Pinging all endorsers from coordinator_state");
+    let hostnames = self.get_endorser_hostnames();
+    let (mpsc_tx, mut mpsc_rx) = mpsc::channel(ENDORSER_MPSC_CHANNEL_BUFFER);
+
+    for (pk, hostname) in hostnames {
+      let tx = mpsc_tx.clone();
+      let endorser = hostname.clone();
+      let endorser_key = pk.clone();
+      let conn_map = self.conn_map.clone();
+      let self_c = self.clone();
+
+      let _job = tokio::spawn(async move {
+        let nonce = generate_secure_nonce_bytes(16); // a randomly generated 16-byte nonce
+        // TODO: Save the nonce for replay protection
+        // Create a connection endpoint
+
+        let endpoint = Endpoint::from_shared(endorser.to_string());
+        match endpoint {
+          Ok(endpoint) => {
+            let endpoint = endpoint
+              .connect_timeout(Duration::from_secs(ENDORSER_CONNECT_TIMEOUT))
+              .timeout(Duration::from_secs(ENDORSER_REQUEST_TIMEOUT.load(SeqCst)));
+
+            match endpoint.connect().await {
+              Ok(channel) => {
+                let mut client =
+                  endorser_proto::endorser_call_client::EndorserCallClient::new(channel);
+
+                // Include the nonce in the request
+                let ping_req = endorser_proto::PingReq {
+                  nonce: nonce.clone(), // Send the nonce in the request
+                  ..Default::default() // Set other fields to their default values (in this case, none)
+                };
+
+                // Call the method with retry logic
+                let res = get_ping_with_retry(&mut client, ping_req).await;
+                match res {
+                  Ok(resp) => {
+                    let endorser_proto::PingResp { id_sig } = resp.into_inner();
+                    match IdSig::from_bytes(&id_sig) {
+                      Ok(id_signature) => {
+                        let id_pubkey = id_signature.get_id();
+                        if *id_pubkey != endorser_key {
+                          let error_message = format!(
+                            "Endorser public_key mismatch. Expected {:?}, got {:?}",
+                            endorser_key, id_pubkey
+                          );
+                          self_c
+                            .endorser_ping_failed(endorser.clone(), &error_message, endorser_key)
+                            .await;
+                          return;
+                        }
+
+                        // Verify the signature with the original nonce
+                        if id_signature.verify(&nonce).is_ok() {
+                          // TODO: Replace println with info
+                          println!("Nonce match for endorser: {}", endorser);
+
+                          if let Ok(mut conn_map_wr) = conn_map.write() {
+                            if let Some(endorser_clients) = conn_map_wr.get_mut(&endorser_key) {
+                              if endorser_clients.failures > 0 {
+                                // Only update DEAD_ENDORSERS if the endorser is part of the
+                                // quorum and has previously been marked as unavailable
+                                if endorser_clients.failures > MAX_FAILURES.load(SeqCst)
+                                  && matches!(
+                                    endorser_clients.usage_state,
+                                    EndorserUsageState::Active
+                                  )
+                                {
+                                  DEAD_ENDORSERS.fetch_sub(1, SeqCst);
+                                }
+                                // TODO: Replace println with info
+                                println!(
+                                  "Endorser {} reconnected after {} tries",
+                                  endorser, endorser_clients.failures
+                                );
+                                // Reset failures on success
+                                endorser_clients.failures = 0;
+                              }
+                            } else {
+                              eprintln!("Endorser key not found in conn_map");
+                            }
+                          } else {
+                            eprintln!("Failed to acquire write lock on conn_map");
+                          }
+                        } else {
+                          let error_message = format!(
+                            "Ping signature did not verify for nonce {:?}",
+                            nonce
+                          );
+                          self_c
+                            .endorser_ping_failed(endorser.clone(), &error_message, endorser_key)
+                            .await;
+                        }
+                      },
+                      Err(_) => {
+                        let error_message = String::from("Failed to decode IdSig.");
+                        self_c
+                          .endorser_ping_failed(endorser.clone(), &error_message, endorser_key)
+                          .await;
+                      },
+                    }
+                  },
+                  Err(status) => {
+                    let error_message = format!(
+                      "Failed to connect to the endorser {}: {:?}.",
+                      endorser, status
+                    );
+                    self_c
+                      .endorser_ping_failed(endorser.clone(), &error_message, endorser_key)
+                      .await;
+                  },
+                }
+              },
+              Err(err) => {
+                let error_message =
+                  format!("Failed to connect to the endorser {}: {:?}.", endorser, err);
+                self_c
+                  .endorser_ping_failed(endorser.clone(), &error_message, endorser_key)
+                  .await;
+              },
+            }
+          },
+          Err(err) => {
+            error!(
+              "Failed to resolve the endorser host name {}: {:?}",
+              endorser, err
+            );
+            if tx
+              .send((
+                endorser.clone(),
+                Err::<
+                  (
+                    endorser_proto::endorser_call_client::EndorserCallClient<Channel>,
+                    Vec<u8>,
+                  ),
+                  CoordinatorError,
+                >(CoordinatorError::CannotResolveHostName),
+              ))
+              .await
+              .is_err()
+            {
+              error!("Failed to send failure result for endorser: {}", endorser);
+            }
+          },
+        }
+      });
+    }
+
+    drop(mpsc_tx);
+
+    // Receive results from the channel and process them
+    while let Some((endorser, res)) = mpsc_rx.recv().await {
+      match res {
+        Ok((_client, _pk)) => {
+          // Process the client and public key
+        },
+        Err(_) => {
+          // TODO: Call endorser refresh for "client"
+          error!("Endorser {} needs to be refreshed", endorser);
+        },
+      }
+    }
+  }
+
+  /// Handles the failure of an endorser ping.
+  ///
+  /// # Arguments
+  ///
+  /// * `endorser` - The endorser that failed to respond.
+  /// * `error_message` - The error message.
+  /// * `endorser_key` - The public key of the endorser.
+  pub async fn endorser_ping_failed(
+    self: Arc<Self>,
+    endorser: String,
+    error_message: &str,
+    endorser_key: Vec<u8>,
+  ) {
+    if let Ok(mut conn_map_wr) = self.conn_map.write() {
+      if let Some(endorser_clients) = conn_map_wr.get_mut(&endorser_key) {
+        // Increment the failures count
+        endorser_clients.failures += 1;
+      } else {
+        eprintln!("Endorser key not found in conn_map");
+      }
+    } else {
+      eprintln!("Failed to acquire write lock on conn_map");
+    }
+
+    let mut alive_endorser_percentage = 100;
+
+    if let Ok(conn_map_r) = self.conn_map.read() {
+      if let Some(endorser_clients) = conn_map_r.get(&endorser_key) {
+        // Log the failure
+        // TODO: Replace with warn!
+        println!(
+          "Ping failed for endorser {}. {} pings failed.\n{}",
+          endorser, endorser_clients.failures, error_message
+        );
+
+        // Only update the dead-endorser bookkeeping for active endorsers whose
+        // failure count has crossed the threshold
+        if matches!(endorser_clients.usage_state, EndorserUsageState::Active)
+          && endorser_clients.failures >= MAX_FAILURES.load(SeqCst) + 1
+        {
+          // Increment the dead endorser count only on the first crossing
+          if endorser_clients.failures == MAX_FAILURES.load(SeqCst) + 1 {
+            DEAD_ENDORSERS.fetch_add(1, SeqCst);
+          }
+
+          println!(
+            "Active endorser {} failed more than {} times! Now {} endorsers are dead.",
+            endorser,
+            MAX_FAILURES.load(SeqCst),
+            DEAD_ENDORSERS.load(SeqCst)
+          );
+
+          let active_endorsers_count = conn_map_r
+            .values()
+            .filter(|&e| matches!(e.usage_state, EndorserUsageState::Active))
+            .count();
+          let dead_endorsers_count = DEAD_ENDORSERS.load(SeqCst);
+          println!("Debug: active_endorsers_count = {}", active_endorsers_count);
+          println!("Debug: dead_endorsers_count = {}", dead_endorsers_count);
+          alive_endorser_percentage = 100 - ((dead_endorsers_count * 100) / active_endorsers_count);
+          println!("Debug: {} % alive", alive_endorser_percentage);
+        }
+      } else {
+        eprintln!("Endorser key not found in conn_map");
+      }
+    } else {
+      eprintln!("Failed to acquire read lock on conn_map");
+    }
+
+    println!(
+      "Debug: {} % alive before replace trigger",
+      alive_endorser_percentage
+    );
+  }
+
+  /// Gets the timeout map for the endorsers.
+  ///
+  /// # Returns
+  ///
+  /// A result containing the timeout map or a `CoordinatorError`.
+  pub fn get_timeout_map(&self) -> Result<HashMap<String, u64>, CoordinatorError> {
+    if let Ok(conn_map_rd) = self.conn_map.read() {
+      let mut timeout_map = HashMap::new();
+      for (_pk, endorser_clients) in conn_map_rd.iter() {
+        // Key by the endorser's URI; the value is its current failure count
+        timeout_map.insert(endorser_clients.uri.clone(), endorser_clients.failures);
+      }
+      Ok(timeout_map)
+    } else {
+      eprintln!("Failed to acquire read lock on conn_map");
+      Err(CoordinatorError::FailedToGetTimeoutMap)
+    }
+  }
+
+  /// Overwrites the configuration variables.
+  ///
+  /// # Arguments
+  ///
+  /// * `max_failures` - The maximum number of failures allowed.
+  /// * `request_timeout` - The request timeout in seconds.
+  /// * `ping_interval` - The interval for pinging endorsers in seconds.
+  pub fn overwrite_variables(
+    &self,
+    max_failures: u64,
+    request_timeout: u64,
+    ping_interval: u32,
+  ) {
+    MAX_FAILURES.store(max_failures, SeqCst);
+    ENDORSER_REQUEST_TIMEOUT.store(request_timeout, SeqCst);
+    PING_INTERVAL.store(ping_interval, SeqCst);
+  }
+}
+
+fn generate_secure_nonce_bytes(size: usize) -> Vec<u8> {
+  let mut rng = rand::thread_rng();
+  let nonce: Vec<u8> = (0..size).map(|_| rng.gen()).collect();
+  nonce
+}
diff --git a/coordinator/src/errors.rs b/coordinator/src/errors.rs
index d6e25c5..60adde3 100644
--- a/coordinator/src/errors.rs
+++ b/coordinator/src/errors.rs
@@ -64,4 +64,6 @@ pub enum CoordinatorError {
   FailedToObtainQuorum,
   /// returned if failed to verify view change
   FailedToActivate,
+  /// returned if get timeout map fails
+  FailedToGetTimeoutMap,
 }
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index d86487f..ff8ed29 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -3,9 +3,14 @@ mod errors;
 
 use crate::coordinator_state::CoordinatorState;
 use ledger::CustomSerde;
-use std::{collections::HashMap, sync::Arc};
+use std::{
+  collections::HashMap,
+  sync::{
+    atomic::{AtomicBool, Ordering::SeqCst},
+    Arc,
+  },
+};
 use tonic::{transport::Server, Request, Response, Status};
-
 #[allow(clippy::derive_partial_eq_without_eq)]
 pub mod coordinator_proto {
   tonic::include_proto!("coordinator_proto");
@@ -14,7 +19,8 @@ pub mod coordinator_proto {
 use clap::{App, Arg};
 use coordinator_proto::{
   call_server::{Call, CallServer},
-  AppendReq, AppendResp, NewLedgerReq, NewLedgerResp, ReadByIndexReq, ReadByIndexResp,
+  AddEndorsersReq, AddEndorsersResp, AppendReq, AppendResp, GetTimeoutMapReq, GetTimeoutMapResp,
+  NewLedgerReq, NewLedgerResp, PingAllReq, PingAllResp, ReadByIndexReq, ReadByIndexResp,
   ReadLatestReq, ReadLatestResp, ReadViewByIndexReq, ReadViewByIndexResp, ReadViewTailReq,
   ReadViewTailResp,
 };
@@ -30,11 +36,14 @@ use serde::{Deserialize, Serialize};
 use serde_json::json;
 use tower::ServiceBuilder;
 
+static DEACTIVATE_AUTO_RECONFIG: AtomicBool = AtomicBool::new(false);
+
 pub struct CoordinatorServiceState {
   state: Arc<CoordinatorState>,
 }
 
 impl CoordinatorServiceState {
+  /// Creates a new instance of `CoordinatorServiceState`.
   pub fn new(coordinator: Arc<CoordinatorState>) -> Self {
     CoordinatorServiceState { state: coordinator }
   }
@@ -47,6 +56,7 @@ impl CoordinatorServiceState {
 
 #[tonic::async_trait]
 impl Call for CoordinatorServiceState {
+  /// Creates a new ledger with the given handle and block.
   async fn new_ledger(
     &self,
     req: Request<NewLedgerReq>,
@@ -71,6 +81,7 @@ impl Call for CoordinatorServiceState {
     Ok(Response::new(reply))
   }
 
+  /// Appends a block to the ledger with the given handle, block, and expected height.
   async fn append(&self, request: Request<AppendReq>) -> Result<Response<AppendResp>, Status> {
     let AppendReq {
       handle: handle_bytes,
@@ -95,6 +106,7 @@ impl Call for CoordinatorServiceState {
     Ok(Response::new(reply))
   }
 
+  /// Reads the latest block from the ledger with the given handle and nonce.
   async fn read_latest(
     &self,
     request: Request<ReadLatestReq>,
@@ -122,6 +134,7 @@ impl Call for CoordinatorServiceState {
     Ok(Response::new(reply))
   }
 
+  /// Reads a block from the ledger by index.
   async fn read_by_index(
     &self,
     request: Request<ReadByIndexReq>,
@@ -148,6 +161,7 @@ impl Call for CoordinatorServiceState {
     }
   }
 
+  /// Reads a block from the view ledger by index.
   async fn read_view_by_index(
     &self,
     request: Request<ReadViewByIndexReq>,
@@ -168,6 +182,7 @@ impl Call for CoordinatorServiceState {
     Ok(Response::new(reply))
   }
 
+  /// Reads the tail of the view ledger.
 async fn read_view_tail(
     &self,
     _request: Request<ReadViewTailReq>,
@@ -187,6 +202,58 @@ impl Call for CoordinatorServiceState {
 
     Ok(Response::new(reply))
   }
+
+  /// Pings all endorsers.
+  async fn ping_all_endorsers(
+    &self,
+    _request: Request<PingAllReq>, // Accept the gRPC request
+  ) -> Result<Response<PingAllResp>, Status> {
+    // Call the state method to perform the ping task (no return value)
+    println!("Pinging all endorsers now from main.rs");
+    self.state.clone().ping_all_endorsers().await;
+
+    // Construct and return the PingAllResp
+    let reply = PingAllResp {};
+
+    // Return the response
+    Ok(Response::new(reply))
+  }
+
+  /// Gets the timeout map from the coordinator.
+  async fn get_timeout_map(
+    &self,
+    _request: Request<GetTimeoutMapReq>,
+  ) -> Result<Response<GetTimeoutMapResp>, Status> {
+    let res = self.state.get_timeout_map();
+
+    if res.is_err() {
+      return Err(Status::aborted("Failed to get the timeout map"));
+    }
+
+    let res = res.unwrap();
+
+    let reply = GetTimeoutMapResp { timeout_map: res };
+
+    Ok(Response::new(reply))
+  }
+
+  /// Adds endorsers with the given URIs.
+  async fn add_endorsers(
+    &self,
+    request: Request<AddEndorsersReq>,
+  ) -> Result<Response<AddEndorsersResp>, Status> {
+    let AddEndorsersReq { endorsers } = request.into_inner();
+
+    let endorsers_uris = endorsers
+      .split(';')
+      .filter(|e| !e.is_empty())
+      .map(|e| e.to_string())
+      .collect::<Vec<String>>();
+
+    let _res = self.state.connect_endorsers(&endorsers_uris).await;
+    let reply = AddEndorsersResp {};
+    Ok(Response::new(reply))
+  }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -195,6 +262,7 @@ struct EndorserOpResponse {
   pub pk: String,
 }
 
+/// Retrieves the public key of an endorser.
 async fn get_endorser(
   Path(uri): Path<String>,
   Extension(state): Extension<Arc<CoordinatorState>>,
@@ -234,6 +302,7 @@ async fn get_endorser(
   }
 }
 
+/// Adds a new endorser.
 async fn new_endorser(
   Path(uri): Path<String>,
   Extension(state): Extension<Arc<CoordinatorState>>,
@@ -261,10 +330,14 @@ async fn new_endorser(
     .map(|e| e.to_string())
     .collect::<Vec<String>>();
 
-  let res = state.replace_endorsers(&endorsers).await;
-  if res.is_err() {
-    eprintln!("failed to add the endorser ({:?})", res);
-    return (StatusCode::BAD_REQUEST, Json(json!({})));
+  if DEACTIVATE_AUTO_RECONFIG.load(SeqCst) {
+    let res = state.replace_endorsers(&endorsers).await;
+    if res.is_err() {
+      eprintln!("failed to add the endorser ({:?})", res);
+      return (StatusCode::BAD_REQUEST, Json(json!({})));
+    }
+  } else {
+    let _res = state.connect_endorsers(&endorsers).await;
   }
 
   let pks = state.get_endorser_pks();
@@ -278,6 +351,7 @@ async fn new_endorser(
   (StatusCode::OK, Json(json!(resp)))
 }
 
+/// Deletes an existing endorser.
 async fn delete_endorser(
   Path(uri): Path<String>,
   Extension(state): Extension<Arc<CoordinatorState>>,
@@ -322,6 +396,25 @@ async fn delete_endorser(
   (StatusCode::OK, Json(json!(resp)))
 }
 
+/// Retrieves the timeout map of endorsers.
+async fn get_timeout_map(Extension(state): Extension<Arc<CoordinatorState>>) -> impl IntoResponse {
+  let res = state.get_timeout_map();
+  if res.is_err() {
+    eprintln!("failed to get the timeout map ({:?})", res);
+    return (StatusCode::BAD_REQUEST, Json(json!({})));
+  }
+  (StatusCode::OK, Json(json!(res.unwrap())))
+}
+
+/// Pings all endorsers.
+async fn ping_all_endorsers(
+  Extension(state): Extension<Arc<CoordinatorState>>,
+) -> impl IntoResponse {
+  state.ping_all_endorsers().await;
+  (StatusCode::OK, Json(json!({})))
+}
+
+/// Main function to start the coordinator service.
#[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
   let config = App::new("coordinator")
@@ -395,6 +488,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .long("channels")
         .takes_value(true)
         .help("The number of grpc channels"),
+    )
+    .arg(
+      Arg::with_name("max_failures")
+        .short("f")
+        .long("max-failures")
+        .value_name("COUNT")
+        .help(
+          "Sets the maximum number of allowed ping failures before an endorser is declared dead",
+        )
+        .takes_value(true)
+        .default_value("3"),
+    )
+    .arg(
+      Arg::with_name("request_timeout")
+        .long("request-timeout")
+        .value_name("SECONDS")
+        .help("Sets the request timeout in seconds before a ping is considered failed")
+        .takes_value(true)
+        .default_value("10"),
+    )
+    .arg(
+      Arg::with_name("ping_interval")
+        .short("i")
+        .long("ping-interval")
+        .value_name("SEC")
+        .help("How often to ping endorsers, in seconds")
+        .takes_value(true)
+        .default_value("10"),
     );
 
   let cli_matches = config.get_matches();
@@ -404,6 +525,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
   let store = cli_matches.value_of("store").unwrap();
   let addr = format!("{}:{}", hostname, port_number).parse()?;
   let str_vec: Vec<&str> = cli_matches.values_of("endorser").unwrap().collect();
+
+  let max_failures_str = cli_matches.value_of("max_failures").unwrap();
+  let max_failures = max_failures_str.parse::<u64>().unwrap_or(3).max(1);
+
+  let request_timeout_str = cli_matches.value_of("request_timeout").unwrap();
+  let request_timeout = request_timeout_str.parse::<u64>().unwrap_or(10).max(1);
+
+  let ping_interval_str = cli_matches.value_of("ping_interval").unwrap();
+  let ping_interval = ping_interval_str.parse::<u32>().unwrap_or(10).max(1);
+
+  println!(
+    "Coordinator starting with max_failures: {}, request_timeout: {}, ping_interval: {}",
+    max_failures, request_timeout, ping_interval
+  );
+
   let endorser_hostnames = str_vec
     .iter()
     .filter(|e| !e.is_empty())
@@ -434,6 +570,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
   let res = CoordinatorState::new(store, &ledger_store_args, num_grpc_channels).await;
   assert!(res.is_ok());
   let coordinator = res.unwrap();
+
+  // Apply the CLI ping configuration before connecting to endorsers
+  coordinator.overwrite_variables(max_failures, request_timeout, ping_interval);
 
   if !endorser_hostnames.is_empty() {
     let _ = coordinator.replace_endorsers(&endorser_hostnames).await;
@@ -447,9 +586,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
   let server = CoordinatorServiceState::new(coordinator_ref.clone());
 
+  println!("Pinging all endorsers from main.rs");
+  coordinator_ref.clone().ping_all_endorsers().await;
+
+  coordinator_ref.clone().start_auto_scheduler().await;
   // Start the REST server for management
   let control_server = Router::new()
     .route("/endorsers/:uri", get(get_endorser).put(new_endorser).delete(delete_endorser))
+    .route("/pingallendorsers", get(ping_all_endorsers))
+    .route("/timeoutmap", get(get_timeout_map))
     // Add middleware to all routes
     .layer(
       ServiceBuilder::new()
@@ -483,8 +628,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 mod tests {
   use crate::{
     coordinator_proto::{
-      call_server::Call, AppendReq, AppendResp, NewLedgerReq, NewLedgerResp, ReadByIndexReq,
-      ReadByIndexResp, ReadLatestReq, ReadLatestResp, ReadViewTailReq, ReadViewTailResp,
+      call_server::Call, AppendReq, AppendResp, NewLedgerReq, NewLedgerResp, PingAllReq,
+      ReadByIndexReq, ReadByIndexResp, ReadLatestReq, ReadLatestResp, ReadViewTailReq,
+      ReadViewTailResp,
     },
     CoordinatorServiceState, CoordinatorState,
   };
@@ -611,19 +757,19 @@ mod tests {
     // Launch the endorser
     let endorser = launch_endorser(&endorser_cmd, endorser_args.clone());
-
+
println!("Endorser started"); // Create the coordinator let coordinator = Arc::new( CoordinatorState::new(&store, &ledger_store_args, None) .await .unwrap(), ); - + println!("Coordinator started"); let res = coordinator .replace_endorsers(&["http://[::1]:9090".to_string()]) .await; assert!(res.is_ok()); - + println!("Endorser replaced"); let server = CoordinatorServiceState::new(coordinator); // Initialization: Fetch view ledger to build VerifierState @@ -774,6 +920,8 @@ mod tests { let endorser_args3 = endorser_args.clone() + " -p 9093"; let endorser3 = launch_endorser(&endorser_cmd, endorser_args3); + println!("2 more Endorsers started"); + let res = server .get_state() .replace_endorsers(&[ @@ -964,6 +1112,8 @@ mod tests { let endorser_args6 = endorser_args.clone() + " -p 9096"; let endorser6 = launch_endorser(&endorser_cmd, endorser_args6); + println!("3 more Endorsers started"); + let res = server .get_state() .replace_endorsers(&[ @@ -1190,4 +1340,126 @@ mod tests { println!("endorser5 process ID is {}", endorser5.child.id()); println!("endorser6 process ID is {}", endorser6.child.id()); } + + #[tokio::test] + #[ignore] + async fn test_ping() { + if std::env::var_os("ENDORSER_CMD").is_none() { + panic!("The ENDORSER_CMD environment variable is not specified"); + } + let endorser_cmd = { + match std::env::var_os("ENDORSER_CMD") { + None => panic!("The ENDORSER_CMD environment variable is not specified"), + Some(x) => x, + } + }; + + let endorser_args = { + match std::env::var_os("ENDORSER_ARGS") { + None => String::from(""), + Some(x) => x.into_string().unwrap(), + } + }; + + let store = { + match std::env::var_os("LEDGER_STORE") { + None => String::from("memory"), + Some(x) => x.into_string().unwrap(), + } + }; + + let mut ledger_store_args = HashMap::::new(); + if std::env::var_os("COSMOS_URL").is_some() { + ledger_store_args.insert( + String::from("COSMOS_URL"), + std::env::var_os("COSMOS_URL") + .unwrap() + .into_string() + .unwrap(), + ); + } + + if std::env::var_os("STORAGE_ACCOUNT").is_some() { + ledger_store_args.insert( + String::from("STORAGE_ACCOUNT"), + std::env::var_os("STORAGE_ACCOUNT") + .unwrap() + .into_string() + .unwrap(), + ); + } + + if std::env::var_os("STORAGE_MASTER_KEY").is_some() { + ledger_store_args.insert( + String::from("STORAGE_MASTER_KEY"), + std::env::var_os("STORAGE_MASTER_KEY") + .unwrap() + .into_string() + .unwrap(), + ); + } + + if std::env::var_os("NIMBLE_DB").is_some() { + ledger_store_args.insert( + String::from("NIMBLE_DB"), + std::env::var_os("NIMBLE_DB") + .unwrap() + .into_string() + .unwrap(), + ); + } + + if std::env::var_os("NIMBLE_FSTORE_DIR").is_some() { + ledger_store_args.insert( + String::from("NIMBLE_FSTORE_DIR"), + std::env::var_os("NIMBLE_FSTORE_DIR") + .unwrap() + .into_string() + .unwrap(), + ); + } + + // Launch the endorser + let _endorser = launch_endorser(&endorser_cmd, endorser_args.clone()); + println!("Endorser started"); + // Create the coordinator + let coordinator = Arc::new( + CoordinatorState::new(&store, &ledger_store_args, None) + .await + .unwrap(), + ); + println!("Coordinator started"); + let res = coordinator + .replace_endorsers(&["http://[::1]:9090".to_string()]) + .await; + assert!(res.is_ok()); + println!("Endorser replaced"); + let server = CoordinatorServiceState::new(coordinator); + + // Print the whole timeout_map from the coordinator state + let timeout_map = server.get_state().get_timeout_map(); + println!("Timeout Map: {:?}", timeout_map); + + // Print the whole timeout_map from the coordinator 
state again + let req = tonic::Request::new(PingAllReq {}); + let res = server.ping_all_endorsers(req).await; + assert!(res.is_ok()); + let timeout_map = server.get_state().get_timeout_map(); + println!("Timeout Map after waiting: {:?}", timeout_map); + + let _ = Command::new("pkill") + .arg("-f") + .arg("endorser") + .status() + .expect("failed to execute process"); + + let req1 = tonic::Request::new(PingAllReq {}); + let res1 = server.ping_all_endorsers(req1).await; + assert!(res1.is_ok()); + let timeout_map = server.get_state().get_timeout_map(); + println!( + "Timeout Map after waiting and killing process: {:?}", + timeout_map + ); + } } diff --git a/coordinator_ctrl/src/main.rs b/coordinator_ctrl/src/main.rs index 7a51363..b217a0f 100644 --- a/coordinator_ctrl/src/main.rs +++ b/coordinator_ctrl/src/main.rs @@ -9,6 +9,7 @@ struct EndorserOpResponse { pub pk: String, } +/// Main function to start the coordinator control client. #[tokio::main] async fn main() { let config = App::new("client") @@ -17,7 +18,7 @@ async fn main() { .short("c") .long("coordinator") .help("The hostname of the coordinator") - .default_value("http://127.0.0.1:8090"), + .default_value("http://localhost:8090"), ) .arg( Arg::with_name("add") @@ -39,12 +40,25 @@ async fn main() { .long("get") .takes_value(true) .help("Endorser to read"), + ) + .arg( + Arg::with_name("gettimeoutmap") + .long("gettimeoutmap") + .help("Get the timeout map of endorsers") + .takes_value(false), + ) + .arg( + Arg::with_name("pingallendorsers") + .long("pingallendorsers") + .help("Ping all endorsers") + .takes_value(false), ); let cli_matches = config.get_matches(); let coordinator_addr = cli_matches.value_of("coordinator").unwrap(); let client = reqwest::Client::new(); + // Adds a new endorser. if let Some(x) = cli_matches.value_of("add") { let uri = base64_url::encode(&x); let endorser_url = @@ -66,6 +80,8 @@ async fn main() { }, } } + + // Deletes an existing endorser. if let Some(x) = cli_matches.value_of("delete") { let uri = base64_url::encode(&x); let endorser_url = @@ -83,6 +99,8 @@ async fn main() { }, } } + + // Retrieves information about an endorser. if let Some(x) = cli_matches.value_of("get") { let uri = base64_url::encode(&x); let endorser_url = @@ -100,4 +118,36 @@ async fn main() { }, } } + + // Retrieves the timeout map of endorsers. + if cli_matches.is_present("gettimeoutmap") { + let endorser_url = reqwest::Url::parse(&format!("{}/timeoutmap", coordinator_addr)).unwrap(); + let res = client.get(endorser_url).send().await; + match res { + Ok(resp) => { + assert!(resp.status() == reqwest::StatusCode::OK); + let timeout_map: serde_json::Value = resp.json().await.unwrap(); + println!("Timeout map: {:?}", timeout_map); + }, + Err(error) => { + eprintln!("get_timeout_map failed: {:?}", error); + }, + } + } + + // Pings all endorsers. 
+ if cli_matches.is_present("pingallendorsers") { + let endorser_url = reqwest::Url::parse(&format!("{}/pingallendorsers", coordinator_addr)).unwrap(); + let res = client.get(endorser_url).send().await; + match res { + Ok(resp) => { + assert!(resp.status() == reqwest::StatusCode::OK); + let ping_results: serde_json::Value = resp.json().await.unwrap(); + println!("Ping all endorsers: {:?}", ping_results); + }, + Err(error) => { + eprintln!("ping_all_endorsers failed: {:?}", error); + }, + } + } } diff --git a/endorser/src/endorser_state.rs b/endorser/src/endorser_state.rs index 00c0fcd..332be73 100644 --- a/endorser/src/endorser_state.rs +++ b/endorser/src/endorser_state.rs @@ -45,6 +45,7 @@ pub struct EndorserState { } impl EndorserState { + /// Creates a new instance of `EndorserState`. pub fn new() -> Self { let private_key = PrivateKey::new(); let public_key = private_key.get_public_key().unwrap(); @@ -62,6 +63,19 @@ impl EndorserState { } } + /// Initializes the state of the endorser. + /// + /// # Arguments + /// + /// * `group_identity` - The group identity of the endorser. + /// * `ledger_tail_map` - The ledger tail map. + /// * `view_ledger_tail_metablock` - The tail metablock of the view ledger. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A result containing a receipt or an `EndorserError`. pub fn initialize_state( &self, group_identity: &NimbleDigest, @@ -106,6 +120,17 @@ impl EndorserState { } } + /// Creates a new ledger with the given handle, block hash, and block. + /// + /// # Arguments + /// + /// * `handle` - The handle of the ledger. + /// * `block_hash` - The hash of the block. + /// * `block` - The block to add to the ledger. + /// + /// # Returns + /// + /// A result containing a receipt or an `EndorserError`. pub fn new_ledger( &self, handle: &NimbleDigest, @@ -155,6 +180,16 @@ impl EndorserState { } } + /// Reads the latest block from the ledger with the given handle and nonce. + /// + /// # Arguments + /// + /// * `handle` - The handle of the ledger. + /// * `nonce` - The nonce to use for reading the latest block. + /// + /// # Returns + /// + /// A result containing a tuple of receipt, block, and nonces or an `EndorserError`. pub fn read_latest( &self, handle: &NimbleDigest, @@ -206,6 +241,15 @@ impl EndorserState { } } + /// Gets the height of the ledger with the given handle. + /// + /// # Arguments + /// + /// * `handle` - The handle of the ledger. + /// + /// # Returns + /// + /// A result containing the height of the ledger or an `EndorserError`. pub fn get_height(&self, handle: &NimbleDigest) -> Result { if let Ok(view_ledger_state) = self.view_ledger_state.read() { match view_ledger_state.endorser_mode { @@ -237,6 +281,19 @@ impl EndorserState { } } + /// Appends a block to the ledger with the given handle, block hash, expected height, block, and nonces. + /// + /// # Arguments + /// + /// * `handle` - The handle of the ledger. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// * `block` - The block to append to the ledger. + /// * `nonces` - The nonces to use for appending the block. + /// + /// # Returns + /// + /// A result containing a receipt or an `EndorserError`. pub fn append( &self, handle: &NimbleDigest, @@ -307,10 +364,27 @@ impl EndorserState { } } + /// Retrieves the public key of the endorser. + /// + /// # Returns + /// + /// The public key of the endorser. 
pub fn get_public_key(&self) -> PublicKey { self.public_key.clone() } + /// Appends a block to the view ledger. + /// + /// # Arguments + /// + /// * `view_ledger_state` - The state of the view ledger. + /// * `ledger_tail_map` - The ledger tail map. + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A result containing a receipt or an `EndorserError`. fn append_view_ledger( &self, view_ledger_state: &mut ViewLedgerState, @@ -351,6 +425,16 @@ impl EndorserState { Ok(self.sign_view_ledger(view_ledger_state, ledger_tail_map)) } + /// Signs the view ledger. + /// + /// # Arguments + /// + /// * `view_ledger_state` - The state of the view ledger. + /// * `ledger_tail_map` - The ledger tail map. + /// + /// # Returns + /// + /// A receipt. fn sign_view_ledger( &self, view_ledger_state: &ViewLedgerState, @@ -370,6 +454,11 @@ impl EndorserState { ) } + /// Constructs the ledger tail map. + /// + /// # Returns + /// + /// A result containing the ledger tail map or an `EndorserError`. fn construct_ledger_tail_map(&self) -> Result, EndorserError> { let mut ledger_tail_map = Vec::new(); if let Ok(ledger_tail_map_rd) = self.ledger_tail_map.read() { @@ -393,6 +482,16 @@ impl EndorserState { Ok(ledger_tail_map) } + /// Finalizes the state of the endorser. + /// + /// # Arguments + /// + /// * `block_hash` - The hash of the block. + /// * `expected_height` - The expected height of the ledger. + /// + /// # Returns + /// + /// A result containing a tuple of receipt and ledger tail map or an `EndorserError`. pub fn finalize_state( &self, block_hash: &NimbleDigest, @@ -426,6 +525,11 @@ impl EndorserState { } } + /// Reads the current state of the endorser. + /// + /// # Returns + /// + /// A result containing a tuple of receipt, endorser mode, and ledger tail map or an `EndorserError`. pub fn read_state( &self, ) -> Result<(Receipt, EndorserMode, Vec), EndorserError> { @@ -442,6 +546,19 @@ impl EndorserState { } } + /// Activates the endorser with the given parameters. + /// + /// # Arguments + /// + /// * `old_config` - The old configuration. + /// * `new_config` - The new configuration. + /// * `ledger_tail_maps` - The ledger tail maps. + /// * `ledger_chunks` - The ledger chunks. + /// * `receipts` - The receipts. + /// + /// # Returns + /// + /// A result indicating success or an `EndorserError`. pub fn activate( &self, old_config: &[u8], @@ -485,6 +602,33 @@ impl EndorserState { Err(EndorserError::FailedToAcquireViewLedgerWriteLock) } } + + /// Pings the endorser with the given nonce. + /// + /// # Arguments + /// + /// * `nonce` - The nonce to use for pinging the endorser. + /// + /// # Returns + /// + /// A result containing an `IdSig` or an `EndorserError`. 
+  pub fn ping(&self, nonce: &[u8]) -> Result<IdSig, EndorserError> {
+    if let Ok(view_ledger_state) = self.view_ledger_state.read() {
+      if let EndorserMode::Finalized = view_ledger_state.endorser_mode {
+        // A finalized endorser no longer holds a key for signing
+        return Err(EndorserError::AlreadyFinalized);
+      }
+      let signature = self.private_key.sign(nonce).unwrap();
+      let id_sig = IdSig::new(self.public_key.clone(), signature);
+      Ok(id_sig)
+    } else {
+      Err(EndorserError::FailedToAcquireViewLedgerReadLock)
+    }
+  }
 }
 
 #[cfg(test)]
@@ -729,4 +873,54 @@ mod tests {
       panic!("Signature verification failed when it should not have failed");
     }
   }
+
+  #[test]
+  pub fn check_ping() {
+    let endorser_state = EndorserState::new();
+
+    // The coordinator sends the hashed contents of the configuration to the endorsers
+    // We will pick a dummy view value for testing purposes
+    let view_block_hash = {
+      let t = rand::thread_rng().gen::<[u8; 32]>();
+      let n = NimbleDigest::from_bytes(&t);
+      assert!(n.is_ok(), "This should not have occurred");
+      n.unwrap()
+    };
+
+    // perform a checked addition of height with 1
+    let height_plus_one = {
+      let res = endorser_state
+        .view_ledger_state
+        .read()
+        .expect("failed")
+        .view_ledger_tail_metablock
+        .get_height()
+        .checked_add(1);
+      assert!(res.is_some());
+      res.unwrap()
+    };
+
+    // The coordinator initializes the endorser by calling initialize_state
+    let res = endorser_state.initialize_state(
+      &view_block_hash,
+      &Vec::new(),
+      &MetaBlock::default(),
+      &view_block_hash,
+      height_plus_one,
+    );
+    assert!(res.is_ok());
+
+    // Set the endorser mode directly
+    endorser_state
+      .view_ledger_state
+      .write()
+      .expect("failed to acquire write lock")
+      .endorser_mode = ledger::endorser_proto::EndorserMode::Active;
+
+    let nonce = rand::thread_rng().gen::<[u8; 32]>();
+    let result = endorser_state.ping(&nonce);
+    assert!(result.is_ok(), "Ping should succeed when the endorser is active");
+    let id_sig = result.unwrap();
+    assert!(id_sig.verify(&nonce).is_ok(), "Signature verification failed");
+  }
 }
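Note for reviewers: the nonce in PingReq is a freshness challenge, so a caller must check the returned IdSig against the exact nonce it sent, as the check_ping test above does with id_sig.verify. A minimal caller-side sketch follows; it assumes only that IdSig::from_bytes is the CustomSerde counterpart of the to_bytes call in the gRPC handler below.

  use ledger::{endorser_proto, CustomSerde, IdSig};
  use rand::Rng;

  // Caller-side sketch: challenge one endorser with a fresh nonce and accept
  // the ping only if the returned IdSig verifies over that exact nonce.
  // `IdSig::from_bytes` is assumed as the decoding counterpart of the
  // handler's `to_bytes`; everything else mirrors this patch.
  async fn check_endorser_liveness(
    client: &mut endorser_proto::endorser_call_client::EndorserCallClient<tonic::transport::Channel>,
  ) -> Result<(), Box<dyn std::error::Error>> {
    // A fresh random nonce makes replayed ping responses detectable.
    let nonce = rand::thread_rng().gen::<[u8; 32]>().to_vec();
    let resp = client
      .ping(tonic::Request::new(endorser_proto::PingReq {
        nonce: nonce.clone(),
      }))
      .await?
      .into_inner();
    let id_sig = IdSig::from_bytes(&resp.id_sig).map_err(|_| "malformed id_sig")?;
    id_sig
      .verify(&nonce)
      .map_err(|_| "ping signature did not verify over the nonce we sent")?;
    Ok(())
  }

Without the decode-and-verify step, anything on the network path could answer pings on behalf of a dead endorser.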
diff --git a/endorser/src/main.rs b/endorser/src/main.rs
index 56071d2..0c0d5e4 100644
--- a/endorser/src/main.rs
+++ b/endorser/src/main.rs
@@ -12,7 +12,7 @@ use ledger::endorser_proto::{
   endorser_call_server::{EndorserCall, EndorserCallServer},
   ActivateReq, ActivateResp, AppendReq, AppendResp, FinalizeStateReq, FinalizeStateResp,
   GetPublicKeyReq, GetPublicKeyResp, InitializeStateReq, InitializeStateResp, NewLedgerReq,
-  NewLedgerResp, ReadLatestReq, ReadLatestResp, ReadStateReq, ReadStateResp,
+  NewLedgerResp, PingReq, PingResp, ReadLatestReq, ReadLatestResp, ReadStateReq, ReadStateResp,
 };
 
 pub struct EndorserServiceState {
   state: EndorserState,
 }
 
 impl EndorserServiceState {
+  /// Creates a new instance of `EndorserServiceState`.
   pub fn new() -> Self {
     EndorserServiceState {
       state: EndorserState::new(),
     }
   }
 
+  /// Processes an error and returns a corresponding gRPC `Status`.
+  ///
+  /// # Arguments
+  ///
+  /// * `error` - The error to process.
+  /// * `handle` - An optional handle associated with the error.
+  /// * `default_msg` - A default message to use if the error does not match any known cases.
   fn process_error(
     &self,
     error: EndorserError,
@@ -50,7 +58,7 @@ impl EndorserServiceState {
       EndorserError::LedgerHeightOverflow => Status::out_of_range("Ledger height overflow"),
       EndorserError::InvalidTailHeight => Status::invalid_argument("Invalid ledger height"),
       EndorserError::AlreadyInitialized => {
-        Status::already_exists("Enodrser is already initialized")
+        Status::already_exists("Endorser is already initialized")
       },
       EndorserError::NotInitialized => Status::unimplemented("Endorser is not initialized"),
       EndorserError::AlreadyFinalized => Status::unavailable("Endorser is already finalized"),
@@ -67,6 +75,7 @@ impl Default for EndorserServiceState {
 
 #[tonic::async_trait]
 impl EndorserCall for EndorserServiceState {
+  /// Retrieves the public key of the endorser.
   async fn get_public_key(
     &self,
     _req: Request<GetPublicKeyReq>,
   ) -> Result<Response<GetPublicKeyResp>, Status> {
@@ -80,6 +89,7 @@ impl EndorserCall for EndorserServiceState {
     Ok(Response::new(reply))
   }
 
+  /// Creates a new ledger with the given handle, block hash, and block.
   async fn new_ledger(
     &self,
     req: Request<NewLedgerReq>,
   ) -> Result<Response<NewLedgerResp>, Status> {
@@ -133,6 +143,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Appends a block to the ledger with the given handle, block hash, expected height, block, and nonces.
   async fn append(&self, req: Request<AppendReq>) -> Result<Response<AppendResp>, Status> {
     let AppendReq {
       handle,
@@ -191,6 +202,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Reads the latest block from the ledger with the given handle and nonce.
   async fn read_latest(
     &self,
     request: Request<ReadLatestReq>,
@@ -225,6 +237,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Finalizes the state of the endorser with the given block hash and expected height.
   async fn finalize_state(
     &self,
     req: Request<FinalizeStateReq>,
@@ -250,6 +263,7 @@ impl EndorserCall for EndorserServiceState {
         receipt: receipt.to_bytes().to_vec(),
         ledger_tail_map,
       };
+      println!("Finalized endorser");
       Ok(Response::new(reply))
     },
     Err(error) => {
@@ -263,6 +277,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Initializes the state of the endorser with the given parameters.
   async fn initialize_state(
     &self,
     req: Request<InitializeStateReq>,
@@ -303,6 +318,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Reads the current state of the endorser.
   async fn read_state(
     &self,
     _req: Request<ReadStateReq>,
@@ -329,6 +345,7 @@ impl EndorserCall for EndorserServiceState {
     }
   }
 
+  /// Activates the endorser with the given parameters.
   async fn activate(&self, req: Request<ActivateReq>) -> Result<Response<ActivateResp>, Status> {
     let ActivateReq {
       old_config,
@@ -361,8 +378,32 @@ impl EndorserCall for EndorserServiceState {
       },
     }
   }
+
+  /// Pings the endorser with the given nonce.
+  async fn ping(&self, req: Request<PingReq>) -> Result<Response<PingResp>, Status> {
+    let PingReq { nonce } = req.into_inner();
+    let res = self.state.ping(&nonce);
+
+    match res {
+      Ok(id_sig) => {
+        let reply = PingResp {
+          id_sig: id_sig.to_bytes().to_vec(),
+        };
+        Ok(Response::new(reply))
+      },
+      Err(e) => {
+        let status = self.process_error(
+          e,
+          None,
+          "Failed to compute signature due to an internal error",
+        );
+        Err(status)
+      },
+    }
+  }
 }
 
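The handler half only signs nonces; deciding when an endorser is dead is the caller's job. The coordinator evidently drives this from a periodic task that counts consecutive failures and surfaces them through the timeout map plumbed into the endpoint below. A sketch of such a loop follows, with illustrative names and constants, none of which are taken from this patch; `check_endorser_liveness` is the helper sketched earlier.

  use std::{
    sync::atomic::{AtomicU64, Ordering::SeqCst},
    time::Duration,
  };

  // Hypothetical per-endorser failure counter; in the real coordinator this
  // state feeds the timeout map that GetTimeoutMap exposes further below.
  static FAILED_PINGS: AtomicU64 = AtomicU64::new(0);
  const FAILURE_THRESHOLD: u64 = 3; // illustrative value, not from this patch
  const PING_PERIOD_SECS: u64 = 10; // illustrative value, not from this patch

  async fn ping_loop(
    mut client: endorser_proto::endorser_call_client::EndorserCallClient<tonic::transport::Channel>,
  ) {
    let mut ticker = tokio::time::interval(Duration::from_secs(PING_PERIOD_SECS));
    loop {
      ticker.tick().await;
      match check_endorser_liveness(&mut client).await {
        // A verified ping resets the failure count for this endorser.
        Ok(()) => FAILED_PINGS.store(0, SeqCst),
        // Enough consecutive failures and the endorser is treated as dead.
        Err(_) => {
          if FAILED_PINGS.fetch_add(1, SeqCst) + 1 >= FAILURE_THRESHOLD {
            eprintln!("endorser missed {FAILURE_THRESHOLD} pings in a row; marking it dead");
          }
        },
      }
    }
  }

+/// Main function to start the endorser service.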
#[tokio::main] async fn main() -> Result<(), Box> { let config = App::new("endorser") diff --git a/endpoint/src/errors.rs b/endpoint/src/errors.rs index a491d07..9b0c005 100644 --- a/endpoint/src/errors.rs +++ b/endpoint/src/errors.rs @@ -26,4 +26,10 @@ pub enum EndpointError { FailedToAcquireWriteLock, /// returned if the endpoint fails to apply view change FailedToApplyViewChange, + /// returned if the endpoint fails to get the timeout map + FailedToGetTimeoutMap, + /// returned if failed to ping all endorsers + FailedToPingAllEndorsers, + /// returned if failed to add endorsers + FailedToAddEndorsers, } diff --git a/endpoint/src/lib.rs b/endpoint/src/lib.rs index 114e6f9..40f9937 100644 --- a/endpoint/src/lib.rs +++ b/endpoint/src/lib.rs @@ -13,7 +13,7 @@ pub mod coordinator_proto { use crate::errors::EndpointError; use coordinator_proto::{ call_client::CallClient, AppendReq, AppendResp, NewLedgerReq, NewLedgerResp, ReadLatestReq, - ReadLatestResp, ReadViewByIndexReq, ReadViewByIndexResp, ReadViewTailReq, ReadViewTailResp, + ReadLatestResp, ReadViewByIndexReq, ReadViewByIndexResp, ReadViewTailReq, ReadViewTailResp, GetTimeoutMapReq, GetTimeoutMapResp, PingAllReq, PingAllResp, AddEndorsersReq, AddEndorsersResp }; use ledger::{ errors::VerificationError, @@ -22,8 +22,7 @@ use ledger::{ }; use rand::random; use std::{ - convert::TryFrom, - sync::{Arc, RwLock}, + collections::HashMap, convert::TryFrom, sync::{Arc, RwLock} }; #[allow(dead_code)] @@ -45,6 +44,7 @@ pub struct Connection { } impl Connection { + /// Creates a new connection to the coordinator. pub async fn new( coordinator_endpoint_address: String, num_grpc_channels_opt: Option, @@ -70,6 +70,7 @@ impl Connection { }) } + /// Creates a new ledger with the given handle and block. pub async fn new_ledger(&self, handle: &[u8], block: &[u8]) -> Result, EndpointError> { let req = Request::new(NewLedgerReq { handle: handle.to_vec(), @@ -87,6 +88,7 @@ impl Connection { Ok(receipts) } + /// Appends a block to the ledger with the given handle and expected height. pub async fn append( &self, handle: &[u8], @@ -113,6 +115,7 @@ impl Connection { Ok((hash_nonces, receipts)) } + /// Reads the latest block from the ledger with the given handle and nonce. pub async fn read_latest( &self, handle: &[u8], @@ -137,6 +140,7 @@ impl Connection { Ok((block, nonces, receipts)) } + /// Reads a block from the view ledger by index. pub async fn read_view_by_index( &self, index: usize, @@ -153,6 +157,7 @@ impl Connection { Ok((block, receipts)) } + /// Reads the tail of the view ledger. pub async fn read_view_tail(&self) -> Result<(Vec, Vec, usize, Vec), EndpointError> { let ReadViewTailResp { block, @@ -167,6 +172,50 @@ impl Connection { .into_inner(); Ok((block, receipts, height as usize, attestations)) } + + /// Gets the timeout map from the coordinator. + pub async fn get_timeout_map( + &self, + ) -> Result, EndpointError> { + let GetTimeoutMapResp { + timeout_map, + } = self.clients[random::() % self.num_grpc_channels] + .clone() + .get_timeout_map(GetTimeoutMapReq {}) + .await + .map_err(|_e| EndpointError::FailedToGetTimeoutMap)? + .into_inner(); + Ok(timeout_map) + } + + /// Pings all endorsers. + pub async fn ping_all_endorsers( + &self, + ) -> Result<(), EndpointError> { + let PingAllResp {} = self.clients[random::() % self.num_grpc_channels] + .clone() + .ping_all_endorsers(PingAllReq {}) + .await + .map_err(|_e| EndpointError::FailedToPingAllEndorsers)? + .into_inner(); + Ok(()) + } + + /// Adds endorsers with the given URI. 
+ pub async fn add_endorsers( + &self, + uri: String, + ) -> Result<(), EndpointError> { + let AddEndorsersResp {} = self.clients[random::() % self.num_grpc_channels] + .clone() + .add_endorsers(AddEndorsersReq { + endorsers: uri, + }) + .await + .map_err(|_e| EndpointError::FailedToAddEndorsers)? + .into_inner(); + Ok(()) + } } pub struct EndpointState { @@ -191,6 +240,7 @@ pub enum SignatureFormat { } impl EndpointState { + /// Creates a new endpoint state. pub async fn new( hostname: String, pem_opt: Option, @@ -253,6 +303,7 @@ impl EndpointState { }) } + /// Gets the identity of the endpoint. pub fn get_identity( &self, pkformat: PublicKeyFormat, @@ -268,6 +319,7 @@ impl EndpointState { )) } + /// Updates the view of the endpoint. async fn update_view(&self) -> Result<(), EndpointError> { let start_height = { if let Ok(vs_rd) = self.vs.read() { @@ -302,6 +354,7 @@ impl EndpointState { Ok(()) } + /// Creates a new counter with the given handle, tag, and signature format. pub async fn new_counter( &self, handle: &[u8], @@ -389,6 +442,7 @@ impl EndpointState { Ok(signature) } + /// Increments the counter with the given handle, tag, expected counter, and signature format. pub async fn increment_counter( &self, handle: &[u8], @@ -485,6 +539,7 @@ impl EndpointState { Ok(signature) } + /// Reads the counter with the given handle, nonce, and signature format. pub async fn read_counter( &self, handle: &[u8], @@ -589,4 +644,62 @@ impl EndpointState { // respond to the light client Ok((tag.to_vec(), counter as u64, signature)) } + + /// Gets the timeout map from the coordinator. + pub async fn get_timeout_map( + &self + ) -> Result, EndpointError> { + + + let timeout_map = { + let res = self.conn.get_timeout_map().await; + + if res.is_err() { + return Err(EndpointError::FailedToGetTimeoutMap); + } + res.unwrap() + }; + + // respond to the light client + Ok(timeout_map) + } + + /// Pings all endorsers. + pub async fn ping_all_endorsers( + &self, + ) -> Result<(), EndpointError> { + + + let _block = { + let res = self.conn.ping_all_endorsers().await; + + if res.is_err() { + return Err(EndpointError::FailedToPingAllEndorsers); + } + res.unwrap() + }; + + // respond to the light client + Ok(()) + } + + /// Adds endorsers with the given URI. + pub async fn add_endorsers( + &self, + uri: String, + ) -> Result<(), EndpointError> { + + + let _block = { + let res = self.conn.add_endorsers(uri).await; + + if res.is_err() { + return Err(EndpointError::FailedToAddEndorsers); + } + res.unwrap() + }; + + // respond to the light client + Ok(()) + } } diff --git a/endpoint_rest/src/main.rs b/endpoint_rest/src/main.rs index d0f8b79..b2f3d84 100644 --- a/endpoint_rest/src/main.rs +++ b/endpoint_rest/src/main.rs @@ -4,7 +4,7 @@ use axum::{ extract::{Extension, Path, Query}, http::StatusCode, response::IntoResponse, - routing::get, + routing::{get, put}, Json, Router, }; use axum_server::tls_rustls::RustlsConfig; @@ -16,6 +16,7 @@ use clap::{App, Arg}; use serde::{Deserialize, Serialize}; +/// Main function to start the endpoint service. 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
   let config = App::new("endpoint")
@@ -97,6 +98,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
   // Build our application by composing routes
   let app = Router::new()
     .route("/serviceid", get(get_identity))
+    .route("/timeoutmap", get(get_timeout_map))
+    .route("/pingallendorsers", get(ping_all_endorsers))
+    .route("/addendorsers", put(add_endorsers))
     .route("/counters/:handle", get(read_counter).put(new_counter).post(increment_counter))
     // Add middleware to all routes
     .layer(
@@ -133,6 +137,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
   Ok(())
 }
 
+/// Response structure for the get_identity endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct GetIdentityResponse {
   #[serde(rename = "Identity")]
@@ -141,18 +146,21 @@ struct GetIdentityResponse {
   pub pk: String,
 }
 
+/// Request structure for the new_counter endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct NewCounterRequest {
   #[serde(rename = "Tag")]
   pub tag: String,
 }
 
+/// Response structure for the new_counter endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct NewCounterResponse {
   #[serde(rename = "Signature")]
   pub signature: String,
 }
 
+/// Request structure for the increment_counter endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct IncrementCounterRequest {
   #[serde(rename = "Tag")]
   pub tag: String,
@@ -161,12 +169,14 @@ struct IncrementCounterRequest {
   pub expected_counter: u64,
 }
 
+/// Response structure for the increment_counter endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct IncrementCounterResponse {
   #[serde(rename = "Signature")]
   pub signature: String,
 }
 
+/// Response structure for the read_counter endpoint.
 #[derive(Debug, Serialize, Deserialize)]
 struct ReadCounterResponse {
   #[serde(rename = "Tag")]
   pub tag: String,
@@ -177,6 +187,29 @@ struct ReadCounterResponse {
   pub signature: String,
 }
 
+/// Response structure for the get_timeout_map endpoint.
+#[derive(Debug, Serialize, Deserialize)]
+struct GetTimeoutMapResp {
+  #[serde(rename = "timeout_map")]
+  pub timeout_map: HashMap<String, u64>,
+}
+
+/// Response structure for the ping_all_endorsers endpoint.
+#[derive(Debug, Serialize, Deserialize)]
+struct PingAllResp {
+}
+
+/// Response structure for the add_endorsers endpoint.
+#[derive(Debug, Serialize, Deserialize)]
+struct AddEndorsersResp {
+}
+
+/// Request structure for the add_endorsers endpoint.
+#[derive(Debug, Serialize, Deserialize)]
+struct AddEndorsersRequest {
+}
+
+/// Handler for the get_identity endpoint.
 async fn get_identity(
   Query(params): Query<HashMap<String, String>>,
   Extension(state): Extension<Arc<EndpointState>>,
@@ -203,6 +236,7 @@ async fn get_identity(
   (StatusCode::OK, Json(json!(resp)))
 }
 
+/// Handler for the new_counter endpoint.
 async fn new_counter(
   Path(handle): Path<String>,
   Json(req): Json<NewCounterRequest>,
@@ -246,6 +280,7 @@ async fn new_counter(
   (StatusCode::OK, Json(json!(resp)))
 }
 
+/// Handler for the read_counter endpoint.
 async fn read_counter(
   Path(handle): Path<String>,
   Query(params): Query<HashMap<String, String>>,
@@ -294,6 +329,7 @@ async fn read_counter(
   (StatusCode::OK, Json(json!(resp)))
 }
 
+/// Handler for the increment_counter endpoint.
 async fn increment_counter(
   Path(handle): Path<String>,
   Json(req): Json<IncrementCounterRequest>,
@@ -338,3 +374,74 @@ async fn increment_counter(
 
   (StatusCode::OK, Json(json!(resp)))
 }
+
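Taken together, the three routes form a small control plane over HTTP. They can be driven with reqwest just as the coordinator control tool does for /pingallendorsers earlier in this patch; the sketch below uses a hypothetical endpoint address and endorser URI, and notes that /addendorsers expects the URI list base64url-encoded in the query string, matching the handler that follows.

  // Sketch: drive the new routes with reqwest, as coordinator_ctrl does for
  // /pingallendorsers earlier in this patch. Address and endorser URI are
  // hypothetical.
  async fn exercise_control_routes() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let base = "http://127.0.0.1:8082";

    // Fan a signed ping out to every endorser via the endpoint.
    client.get(format!("{base}/pingallendorsers")).send().await?;

    // Fetch the per-endorser timeout map as JSON.
    let timeout_map: serde_json::Value = client
      .get(format!("{base}/timeoutmap"))
      .send()
      .await?
      .json()
      .await?;
    println!("timeout map: {timeout_map}");

    // Register a backup endorser; the handler below expects the URI list
    // base64url-encoded in the `endorsers` query parameter.
    let endorsers = base64_url::encode("http://127.0.0.1:9094");
    client
      .put(format!("{base}/addendorsers?endorsers={endorsers}"))
      .send()
      .await?;
    Ok(())
  }

+/// Handler for the get_timeout_map endpoint.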
+async fn get_timeout_map( + Extension(state): Extension>, +) -> impl IntoResponse { + + let res = state.get_timeout_map().await; + if res.is_err() { + eprintln!("failed to get the timeout map"); + return (StatusCode::CONFLICT, Json(json!({}))); + } + let timeout_map = res.unwrap(); + + let resp = GetTimeoutMapResp { + timeout_map: timeout_map, + }; + + (StatusCode::OK, Json(json!(resp))) +} + +/// Handler for the ping_all_endorsers endpoint. +async fn ping_all_endorsers( + Extension(state): Extension>, +) -> impl IntoResponse { + + let res = state.ping_all_endorsers().await; + if res.is_err() { + eprintln!("failed to ping all endorsers"); + return (StatusCode::CONFLICT, Json(json!({}))); + } + + let resp = PingAllResp {}; + + (StatusCode::OK, Json(json!(resp))) +} + +/// Handler for the add_endorsers endpoint. +async fn add_endorsers( + Query(params): Query>, + Extension(state): Extension>, +) -> impl IntoResponse { + + if !params.contains_key("endorsers") { + eprintln!("missing a uri endorsers"); + return (StatusCode::BAD_REQUEST, Json(json!({}))); + } + + let res = base64_url::decode(¶ms["endorsers"]); + if res.is_err() { + eprintln!("received no endorsers uri {:?}", res); + return (StatusCode::BAD_REQUEST, Json(json!({}))); + } + let endorsers = res.unwrap(); + let endorsers = endorsers.as_slice(); + let endorsers = std::str::from_utf8(endorsers); + if endorsers.is_err() { + eprintln!("received a bad endorsers uri {:?}", endorsers); + return (StatusCode::BAD_REQUEST, Json(json!({}))); + } + let endorsers = endorsers.unwrap(); + + let res = state.add_endorsers(endorsers.to_string()).await; + if res.is_err() { + eprintln!("failed to add endorsers"); + return (StatusCode::CONFLICT, Json(json!({}))); + } + + let resp = AddEndorsersResp {}; + + (StatusCode::OK, Json(json!(resp))) +} \ No newline at end of file diff --git a/experiments/HadoodBenchmarks.py b/experiments/HadoodBenchmarks.py new file mode 100644 index 0000000..9a53919 --- /dev/null +++ b/experiments/HadoodBenchmarks.py @@ -0,0 +1,84 @@ +import time +from concurrent.futures import ThreadPoolExecutor +import pydoop.hdfs as hdfs + +# Configuration +NR_FILES = 500000 +NR_THREADS = 64 +NR_FILES_PER_DIR = 4 +BASE_DIR = "/benchmark_test" + +# Utility functions for Hadoop operations +def create_file(file_path): + with hdfs.open(file_path, 'w') as f: + f.write("test data") + +def mkdir(dir_path): + hdfs.mkdir(dir_path) + +def open_file(file_path): + with hdfs.open(file_path, 'r') as f: + f.read() + +def delete(file_path): + hdfs.rm(file_path, recursive=True) + +def file_status(file_path): + return hdfs.stat(file_path) + +def rename(src_path, dest_path): + hdfs.rename(src_path, dest_path) + +# Benchmarking function +def benchmark(operation, paths, nr_threads): + start_time = time.time() + with ThreadPoolExecutor(max_workers=nr_threads) as executor: + executor.map(operation, paths) + end_time = time.time() + elapsed_time = end_time - start_time + print(f"{operation.__name__}: {len(paths)} operations in {elapsed_time:.2f} seconds.") + return elapsed_time + +# Main benchmark +def main(): + # Setup paths + directories = [f"{BASE_DIR}/dir_{i}" for i in range(NR_FILES // NR_FILES_PER_DIR)] + file_paths = [f"{dir}/file_{j}" for dir in directories for j in range(NR_FILES_PER_DIR)] + rename_paths = [(file, file + "_renamed") for file in file_paths] + + # Ensure the base directory is clean + if hdfs.path.exists(BASE_DIR): + delete(BASE_DIR) + mkdir(BASE_DIR) + + # Create directories + benchmark(mkdir, directories, NR_THREADS) + + # Create files + 
+    create_time = benchmark(create_file, file_paths, NR_THREADS)
+
+    # Open files
+    open_time = benchmark(open_file, file_paths, NR_THREADS)
+
+    # Retrieve file status
+    status_time = benchmark(file_status, file_paths, NR_THREADS)
+
+    # Rename files
+    rename_time = benchmark(lambda pair: rename(*pair), rename_paths, NR_THREADS)
+
+    # Delete files (by their new names, since the rename step above moved them)
+    delete_time = benchmark(delete, [renamed for _, renamed in rename_paths], NR_THREADS)
+
+    # Delete directories
+    benchmark(delete, directories, NR_THREADS)
+
+    # Summary
+    print("\n--- Benchmark Summary ---")
+    print(f"Create Time: {create_time:.2f}s")
+    print(f"Open Time: {open_time:.2f}s")
+    print(f"FileStatus Time: {status_time:.2f}s")
+    print(f"Rename Time: {rename_time:.2f}s")
+    print(f"Delete Time: {delete_time:.2f}s")
+
+if __name__ == "__main__":
+    main()
diff --git a/experiments/append.lua b/experiments/append.lua
index e2e72d6..2e2e05d 100644
--- a/experiments/append.lua
+++ b/experiments/append.lua
@@ -4,7 +4,7 @@ package.path = current_folder .. "/?.lua;" .. package.path
 local base64url = require("base64url")
 local socket = require("socket")
 local json = require("json")
-local uuid = require("uuid")
+local uuid = require("uuidgen")
 local sha = require("sha2")
 
 time = math.floor(socket.gettime() * 1000)
diff --git a/experiments/config.py b/experiments/config.py
index 319fc31..967f9f8 100644
--- a/experiments/config.py
+++ b/experiments/config.py
@@ -1,10 +1,8 @@
-LOCAL_RUN = False # set to True if you want to run all nodes and experiments locally. Else set to False.
+LOCAL_RUN = False  # set to True if you want to run all nodes and experiments locally. Else set to False.
 # If set to True, you can ignore all the IP addresses and SSH stuff below. They won't be used.
 # You cannot run any of the Azure table experiments locally.
-
 # Set the IPs below and make sure that the machine running this script can ssh into those IPs
-
 # The SSH_IPs are IP addresses that our script can use to SSH to the machines and set things up
 # The LISTEN_IPs are IP addresses on which the machine can listen on a port.
 # For example, these could be private IP addresses in a VNET. In many cases, LISTEN_IPs can just be the SSH_IPs.
@@ -25,7 +23,7 @@ SSH_IP_COORDINATOR = "127.0.0.1"
 LISTEN_IP_COORDINATOR = "127.0.0.1"
 PORT_COORDINATOR = "8080"
 
-PORT_COORDINATOR_CTRL = "8090" # control plane
+PORT_COORDINATOR_CTRL = "8090"  # control plane
 
 SSH_IP_ENDPOINT_1 = "127.0.0.1"
 LISTEN_IP_ENDPOINT_1 = "127.0.0.1"
@@ -35,16 +33,16 @@ LISTEN_IP_ENDPOINT_2 = "127.0.0.1"
 PORT_ENDPOINT_2 = "8082"
 
 
-LISTEN_IP_LOAD_BALANCER = "127.0.0.1" # if no load balancer is available just use one endpoint (ENDPOINT_1)
+LISTEN_IP_LOAD_BALANCER = "127.0.0.1"  # if no load balancer is available just use one endpoint (ENDPOINT_1)
 #                                        and set the LISTEN IP of that endpoint here
-PORT_LOAD_BALANCER = "8082" #if no load balancer is available just use one endpoint (ENDPOINT_1)
-#                            and set the PORT of that endpoint here
-
-SSH_IP_CLIENT = "127.0.0.1" # IP of the machine that will be running our workload generator.
+PORT_LOAD_BALANCER = "8082"  # if no load balancer is available just use one endpoint (ENDPOINT_1)
+                             # and set the PORT of that endpoint here
+SSH_IP_CLIENT = "127.0.0.1"  # IP of the machine that will be running our workload generator.
# If you are going to be running the reconfiguration experiment, set the backup endorsers +# Backup Endorsers for reconfiguration experiment SSH_IP_ENDORSER_4 = "127.0.0.1" LISTEN_IP_ENDORSER_4 = "127.0.0.1" PORT_ENDORSER_4 = "9094" @@ -57,8 +55,8 @@ LISTEN_IP_ENDORSER_6 = "127.0.0.1" PORT_ENDORSER_6 = "9096" - # If you are going to be running the SGX experiment on SGX machines, set the SGX endorsers +# SGX experiment on SGX machines SSH_IP_SGX_ENDORSER_1 = "127.0.0.1" LISTEN_IP_SGX_ENDORSER_1 = "127.0.0.1" PORT_SGX_ENDORSER_1 = "9091" @@ -71,14 +69,13 @@ LISTEN_IP_SGX_ENDORSER_3 = "127.0.0.1" PORT_SGX_ENDORSER_3 = "9093" - # Set the PATHs below to the folder containing the nimble executables (e.g. "/home/user/nimble/target/release") # wrk2 executable, and the directory where the logs and results should be stored. # We assume all of the machines have the same path. NIMBLE_PATH = "/home/user/nimble" NIMBLE_BIN_PATH = NIMBLE_PATH + "/target/release" -WRK2_PATH = NIMBLE_PATH + "/experiments/wrk2" +WRK2_PATH = NIMBLE_PATH + "/experiments/wrk" OUTPUT_FOLDER = NIMBLE_PATH + "/experiments/results" # Set the SSH user for the machines that we will be connecting to. @@ -87,3 +84,5 @@ # To use Azure storage, you need to set the STORAGE_ACCOUNT_NAME and STORAGE_MASTER_KEY environment variables # with the corresponding values that you get from Azure. +STORAGE_ACCOUNT_NAME = "" +STORAGE_MASTER_KEY = "" diff --git a/experiments/create.lua b/experiments/create.lua index a45d9e2..d2d728b 100644 --- a/experiments/create.lua +++ b/experiments/create.lua @@ -4,7 +4,7 @@ package.path = current_folder .. "/?.lua;" .. package.path local base64url = require("base64url") local socket = require("socket") local json = require("json") -local uuid = require("uuid") +local uuid = require("uuidgen") local sha = require("sha2") time = math.floor(socket.gettime() * 1000) diff --git a/experiments/read.lua b/experiments/read.lua index 1d7772a..f76d83a 100644 --- a/experiments/read.lua +++ b/experiments/read.lua @@ -4,7 +4,7 @@ package.path = current_folder .. "/?.lua;" .. 
package.path local base64url = require("base64url") local socket = require("socket") local json = require("json") -local uuid = require("uuid") +local uuid = require("uuidgen") local sha = require("sha2") time = math.floor(socket.gettime() * 1000) math.randomseed(time) diff --git a/experiments/run_3a.py b/experiments/run_3a.py index 154dc7d..22ee8ac 100644 --- a/experiments/run_3a.py +++ b/experiments/run_3a.py @@ -1,9 +1,10 @@ import os -import time -import random -from config import * -from setup_nodes import * +import subprocess +import logging from datetime import datetime +from setup_nodes import * +from config import * # Assuming your configuration is correctly set up + timestamp = time.time() dt_object = datetime.fromtimestamp(timestamp) @@ -13,25 +14,59 @@ NUM_ITERATIONS = 1 LOAD = [50000] #[5000, 10000, 15000, 20000, 25000, 50000, 55000] # requests/sec + +# Setup logging +def setup_logging(log_folder): + # Create log folder if it doesn't exist + if not os.path.exists(log_folder): + os.makedirs(log_folder) + + log_file = os.path.join(log_folder, "experiment.log") + + logging.basicConfig( + filename=log_file, + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s', + ) + def run_3a(time, op, out_folder): + # Setup logging for the experiment + setup_logging(out_folder) + log_dir = os.path.dirname("./logs") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + # Run client (wrk2) for i in LOAD: - cmd = "\'" + WRK2_PATH + "/wrk -t120 -c120 -d" + time + " -R" + str(i) + cmd = "\'" + WRK2_PATH + "/wrk2 -t120 -c120 -d" + time + " -R" + str(i) cmd += " --latency http://" + LISTEN_IP_LOAD_BALANCER + ":" + PORT_LOAD_BALANCER cmd += " -s " + NIMBLE_PATH + "/experiments/" + op + ".lua" cmd += " -- " + str(i) + "req" cmd += " > " + out_folder + op + "-" + str(i) + ".log\'" + + logging.info(f"Executing command: {cmd}") cmd = ssh_cmd(SSH_IP_CLIENT, cmd) print(cmd) - os.system(cmd) + + # Use subprocess to execute the command and capture output + result = subprocess.run(cmd, shell=True, capture_output=True) + + if result.returncode != 0: + logging.error(f"Command failed with return code: {result.returncode}") + logging.error(f"Standard Output: {result.stdout.decode()}") + logging.error(f"Standard Error: {result.stderr.decode()}") + else: + logging.info(f"Command executed successfully. 
Output captured in: {out_folder}{op}-{i}.log") +# Main experiment loop out_folder = OUTPUT_FOLDER + "/" + EXP_NAME + "/" setup_output_folder(SSH_IP_CLIENT, out_folder) + for i in range(NUM_ITERATIONS): teardown(False) setup("", False) @@ -52,4 +87,6 @@ def run_3a(time, op, out_folder): run_3a(duration, operation, out_folder) teardown(False) +print(f"{SSH_IP_CLIENT=}") collect_results(SSH_IP_CLIENT) + diff --git a/experiments/run_3b.py b/experiments/run_3b.py index d7b9325..c5bd589 100644 --- a/experiments/run_3b.py +++ b/experiments/run_3b.py @@ -1,10 +1,18 @@ import os +import subprocess import time import random + +import logging + from config import * from setup_nodes import * from datetime import datetime +RED = "\033[31;1m" # Red and Bold for failure +GREEN = "\033[32;1m" # Green and Bold for success +RESET = "\033[0m" # Reset to default + timestamp = time.time() dt_object = datetime.fromtimestamp(timestamp) dt_string = dt_object.strftime("date-%Y-%m-%d-time-%H-%M-%S") @@ -13,27 +21,58 @@ NUM_ITERATIONS = 1 # Our table implementation can support much higher throughput for reads than create or append -CREATE_APPEND_LOAD = [2000] #[500, 1000, 1500, 2000, 2500] # requests/second -READ_LOAD = [50000] # CREATE_APPEND_LOAD + [10000, 15000, 25000, 50000, 55000] +CREATE_APPEND_LOAD = [50000] # [500, 1000, 1500, 2000, 2500] requests/second +READ_LOAD = [50000] # CREATE_APPEND_LOAD + [10000, 15000, 25000, 50000, 55000] + + +# Setup logging +def setup_logging(log_folder): + if not os.path.exists(log_folder): + os.makedirs(log_folder) + + log_file = os.path.join(log_folder, "experiment.log") + + logging.basicConfig( + filename=log_file, + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s', + ) + def run_3b(time, op, out_folder): load = CREATE_APPEND_LOAD + setup_logging(out_folder) + log_dir = os.path.dirname("./logs") + if not os.path.exists(log_dir): + os.makedirs(log_dir) if op == "read": load = READ_LOAD # Run client (wrk2) for i in load: - cmd = "\'" + WRK2_PATH + "/wrk -t120 -c120 -d" + time + " -R" + str(i) + cmd = "\'" + WRK2_PATH + "/wrk2 -t120 -c120 -d" + time + " -R" + str(i) cmd += " --latency http://" + LISTEN_IP_LOAD_BALANCER + ":" + PORT_LOAD_BALANCER cmd += " -s " + NIMBLE_PATH + "/experiments/" + op + ".lua" cmd += " -- " + str(i) + "req" cmd += " > " + out_folder + op + "-" + str(i) + ".log\'" + logging.info(f"Executing command: {cmd}") + cmd = ssh_cmd(SSH_IP_CLIENT, cmd) print(cmd) - os.system(cmd) + result = subprocess.run(cmd, shell=True, capture_output=True) + + if result.returncode != 0: + logging.error(f"{RED}Command failed with return code: {result.returncode}{RESET}") + logging.error(f"{RED}Standard Output: {result.stdout.decode()}{RESET}") + logging.error(f"{RED}Standard Error: {result.stderr.decode()}{RESET}") + print(f"{RED}An error happened with: {cmd} \nError output: {result.stderr.decode()}\n\n{RESET}") + else: + logging.info(f"{GREEN}Command executed successfully. Output captured in: {out_folder}{op}-{i}.log{RESET}") + print(f"{GREEN}Command executed successfully. 
Output captured in: {out_folder}{op}-{i}.log{RESET}") + if os.environ.get('STORAGE_MASTER_KEY', '') == "" or os.environ.get('STORAGE_ACCOUNT_NAME', '') == "": print("Make sure to set the STORAGE_MASTER_KEY and STORAGE_ACCOUNT_NAME environment variables") @@ -42,8 +81,8 @@ def run_3b(time, op, out_folder): out_folder = OUTPUT_FOLDER + "/" + EXP_NAME + "/" setup_output_folder(SSH_IP_CLIENT, out_folder) -store = " -s table -n nimble" + str(random.randint(1,100000000)) + " -a \"" + os.environ['STORAGE_ACCOUNT_NAME'] + "\"" -store += " -k \"" + os.environ['STORAGE_MASTER_KEY'] + "\"" +store = f" -s table -n nimble{random.randint(1, 100000000)} -a \"{os.environ['STORAGE_ACCOUNT_NAME']}\"" +store += f" -k \"{os.environ['STORAGE_MASTER_KEY']}\"" for i in range(NUM_ITERATIONS): teardown(False) diff --git a/experiments/run_3c.py b/experiments/run_3c.py index 11f5888..fc134d7 100644 --- a/experiments/run_3c.py +++ b/experiments/run_3c.py @@ -1,30 +1,66 @@ import os +import subprocess import time -import random from config import * from setup_nodes import * from datetime import datetime +import logging timestamp = time.time() dt_object = datetime.fromtimestamp(timestamp) dt_string = dt_object.strftime("date-%Y-%m-%d-time-%H-%M-%S") + +def setup_logging(log_folder): + # Create log folder if it doesn't exist + if not os.path.exists(log_folder): + os.makedirs(log_folder) + + log_file = os.path.join(log_folder, "experiment.log") + + logging.basicConfig( + filename=log_file, + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s', + ) + + EXP_NAME = "fig-3c-" + dt_string NUM_ITERATIONS = 1 LOAD = [20000] # [5000, 10000, 15000, 20000, 25000] # requests/sec def run_3c(time, op, out_folder): + setup_logging(out_folder) + log_dir = os.path.dirname("./logs") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + for i in LOAD: - cmd = "\'" + WRK2_PATH + "/wrk -t120 -c120 -d" + time + " -R" + str(i) + cmd = "\'" + WRK2_PATH + "/wrk2 -t120 -c120 -d" + time + " -R" + str(i) cmd += " --latency http://" + LISTEN_IP_LOAD_BALANCER + ":" + PORT_LOAD_BALANCER cmd += " -s " + NIMBLE_PATH + "/experiments/" + op + ".lua" cmd += " -- " + str(i) + "req" cmd += " > " + out_folder + op + "-" + str(i) + ".log\'" + logging.info(f"Executing command: {cmd}") + + cmd = ssh_cmd(SSH_IP_CLIENT, cmd) + + print(cmd) - os.system(cmd) + #os.system(cmd) + result = subprocess.run(cmd, shell=True, capture_output=True) + + if result.returncode != 0: + logging.error(f"Command failed with return code: {result.returncode}") + logging.error(f"Standard Output: {result.stdout.decode()}") + logging.error(f"Standard Error: {result.stderr.decode()}") + else: + logging.info(f"Command executed successfully. 
Output captured in: {out_folder}{op}-{i}.log") + out_folder = OUTPUT_FOLDER + "/" + EXP_NAME + "/" setup_output_folder(SSH_IP_CLIENT, out_folder) diff --git a/experiments/run_4.py b/experiments/run_4.py index feb81a9..b5b9d79 100644 --- a/experiments/run_4.py +++ b/experiments/run_4.py @@ -34,7 +34,8 @@ def reconfigure(out_folder, tcpdump_folder, num): def start_tcp_dump(num, tcpdump_folder): # Stop tcpdump in case it is still running - cmd = "\"sudo pkill tcpdump\"" + # cmd = "\"sudo pkill tcpdump\"" + cmd = "sudo pkill tcpdump" cmd = ssh_cmd(SSH_IP_COORDINATOR, cmd) print(cmd) @@ -45,11 +46,13 @@ def start_tcp_dump(num, tcpdump_folder): # Start tcpdump to collect network traffic to and from all endorsers tcp_file_name = tcpdump_folder + "/" + str(num) + ".pcap" - cmd = "screen -d -m \"sudo tcpdump" + # cmd = "screen -d -m \"sudo tcpdump" + cmd = "screen -d -m sudo tcpdump" for port in endorser_ports: cmd += " tcp dst port " + port + " or tcp src port " + port + " or " cmd = cmd.rsplit(" or ", 1)[0] - cmd += " -w " + tcp_file_name + "\"" + # cmd += " -w " + tcp_file_name + "\"" + cmd += " -w " + tcp_file_name + "" cmd = ssh_cmd(SSH_IP_COORDINATOR, cmd) print(cmd) @@ -58,7 +61,8 @@ def start_tcp_dump(num, tcpdump_folder): def complete_tcp_dump(out_folder, num, file_name): - cmd = "\"sudo pkill tcpdump\"" + # cmd = "\"sudo pkill tcpdump\"" + cmd = "sudo pkill tcpdump" cmd = ssh_cmd(SSH_IP_COORDINATOR, cmd) print(cmd) @@ -68,8 +72,10 @@ def complete_tcp_dump(out_folder, num, file_name): time.sleep(30) # enough time # Parse pcap file and output statistics to log - cmd = "\"bash " + NIMBLE_PATH + "/experiments/tcpdump-stats.sh " + file_name + " > " - cmd += out_folder + "/reconf-bw-" + str(num) + "ledgers.log\"" + # cmd = "\"bash " + NIMBLE_PATH + "/experiments/tcpdump-stats.sh " + file_name + " > " + cmd = "bash "+ NIMBLE_PATH + "/experiments/tcpdump-stats.sh " + file_name + " > " + # cmd += out_folder + "/reconf-bw-" + str(num) + "ledgers.log\"" + cmd += out_folder + "/reconf-bw-" + str(num) + "ledgers.log" cmd = ssh_cmd(SSH_IP_COORDINATOR, cmd) print(cmd) @@ -83,7 +89,7 @@ def create_ledgers(num): duration = str(int(num/rps)) + "s" # Run client (wrk2) to set up the ledgers - cmd = "\'" + WRK2_PATH + "/wrk -t60 -c60 -d" + duration + " -R" + str(rps) + cmd = "\'" + WRK2_PATH + "/wrk2 -t60 -c60 -d" + duration + " -R" + str(rps) cmd += " --latency http://" + LISTEN_IP_LOAD_BALANCER + ":" + PORT_LOAD_BALANCER cmd += " -s " + NIMBLE_PATH + "/experiments/create.lua" cmd += " -- " + str(rps) + "req > /dev/null\'" diff --git a/experiments/setup_nodes.py b/experiments/setup_nodes.py index 16e17b1..e81e75c 100644 --- a/experiments/setup_nodes.py +++ b/experiments/setup_nodes.py @@ -62,7 +62,7 @@ def setup_sgx_endorsers(): def setup_coordinator(store): - coordinator = CMD + "/coordinator -t " + LISTEN_IP_COORDINATOR + " -p " + PORT_COORDINATOR + " -r " + PORT_COORDINATOR_CTRL + coordinator = CMD + "/coordinator -i1 -t " + LISTEN_IP_COORDINATOR + " -p " + PORT_COORDINATOR + " -r " + PORT_COORDINATOR_CTRL coordinator += " -e \"http://" + LISTEN_IP_ENDORSER_1 + ":" + PORT_ENDORSER_1 coordinator += ",http://" + LISTEN_IP_ENDORSER_2 + ":" + PORT_ENDORSER_2 coordinator += ",http://" + LISTEN_IP_ENDORSER_3 + ":" + PORT_ENDORSER_3 @@ -76,7 +76,7 @@ def setup_coordinator(store): time.sleep(5) def setup_coordinator_sgx(store): - coordinator = CMD + "/coordinator -t " + LISTEN_IP_COORDINATOR + " -p " + PORT_COORDINATOR + " -r " + PORT_COORDINATOR_CTRL + coordinator = CMD + "/coordinator -i1 -t " + 
LISTEN_IP_COORDINATOR + " -p " + PORT_COORDINATOR + " -r " + PORT_COORDINATOR_CTRL coordinator += " -e \"http://" + LISTEN_IP_SGX_ENDORSER_1 + ":" + PORT_SGX_ENDORSER_1 coordinator += ",http://" + LISTEN_IP_SGX_ENDORSER_2 + ":" + PORT_SGX_ENDORSER_2 coordinator += ",http://" + LISTEN_IP_SGX_ENDORSER_3 + ":" + PORT_SGX_ENDORSER_3 @@ -200,4 +200,4 @@ def collect_results(ip): else: cmd = "scp -r -i " + SSH_KEY_PATH + " " + SSH_USER + "@" + ip + ":" + OUTPUT_FOLDER + " ./" print(cmd) - os.system(cmd) + os.system(cmd) \ No newline at end of file diff --git a/experiments/testing_ping.py b/experiments/testing_ping.py new file mode 100644 index 0000000..b42aa9a --- /dev/null +++ b/experiments/testing_ping.py @@ -0,0 +1,67 @@ +import os +import subprocess +import logging +from datetime import datetime +from setup_nodes import * +from config import * + +# /home/user/Nimble/target/release/endorser + +# Setup logging +def setup_logging(log_folder): + if not os.path.exists(log_folder): + os.makedirs(log_folder) + + log_file = os.path.join(log_folder, "testing_ping.log") + + logging.basicConfig( + filename=log_file, + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s', + ) + +def run_ping_test(time, out_folder): + setup_logging(out_folder) + log_dir = os.path.dirname("./logs") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + LOAD = [50] + for i in LOAD: + cmd = f"'{WRK2_PATH}/wrk2 -t120 -c120 -d{time} -R{i} --latency http://{LISTEN_IP_LOAD_BALANCER}:{PORT_LOAD_BALANCER}" + cmd += f" -s {NIMBLE_PATH}/experiments/ping.lua -- {i}req > {out_folder}ping-{i}.log'" + + logging.info(f"Executing command: {cmd}") + + cmd = ssh_cmd(SSH_IP_CLIENT, cmd) + + print(cmd) + + result = subprocess.run(cmd, shell=True, capture_output=True) + + if result.returncode != 0: + logging.error(f"Command failed with return code: {result.returncode}") + logging.error(f"Standard Output: {result.stdout.decode()}") + logging.error(f"Standard Error: {result.stderr.decode()}") + else: + logging.info(f"Command executed successfully. 
Output captured in: {out_folder}ping-{i}.log") + +# Main test loop +timestamp = time.time() +dt_object = datetime.fromtimestamp(timestamp) +dt_string = dt_object.strftime("date-%Y-%m-%d-time-%H-%M-%S") + +EXP_NAME = "ping-test-" + dt_string +out_folder = OUTPUT_FOLDER + "/" + EXP_NAME + "/" +setup_output_folder(SSH_IP_CLIENT, out_folder) + +teardown(False) +setup("", False) + +operation = "ping" +duration = "30s" +run_ping_test(duration, out_folder) + +teardown(False) +print(f"{SSH_IP_CLIENT=}") +collect_results(SSH_IP_CLIENT) diff --git a/proto/coordinator.proto b/proto/coordinator.proto index 0585862..d45ea0b 100644 --- a/proto/coordinator.proto +++ b/proto/coordinator.proto @@ -9,6 +9,9 @@ service Call { rpc ReadByIndex(ReadByIndexReq) returns (ReadByIndexResp); rpc ReadViewByIndex(ReadViewByIndexReq) returns (ReadViewByIndexResp); rpc ReadViewTail(ReadViewTailReq) returns (ReadViewTailResp); + rpc PingAllEndorsers(PingAllReq) returns (PingAllResp); + rpc GetTimeoutMap(GetTimeoutMapReq) returns (GetTimeoutMapResp); + rpc AddEndorsers(AddEndorsersReq) returns (AddEndorsersResp); } message NewLedgerReq { @@ -70,4 +73,24 @@ message ReadViewTailResp { bytes receipts = 2; uint64 height = 3; bytes attestations = 4; // TODO: place holder for attestation reports +} + +message PingAllReq { +} + +message PingAllResp { +} + +message GetTimeoutMapReq { +} + +message GetTimeoutMapResp { + map timeout_map = 1; +} + +message AddEndorsersReq { + string endorsers = 1; +} + +message AddEndorsersResp { } \ No newline at end of file diff --git a/proto/endorser.proto b/proto/endorser.proto index e85c068..d82db60 100644 --- a/proto/endorser.proto +++ b/proto/endorser.proto @@ -12,14 +12,12 @@ service EndorserCall { rpc ReadLatest(ReadLatestReq) returns (ReadLatestResp); rpc Append(AppendReq) returns (AppendResp); rpc Activate(ActivateReq) returns (ActivateResp); + rpc Ping(PingReq) returns (PingResp); } -message GetPublicKeyReq { -} +message GetPublicKeyReq {} -message GetPublicKeyResp { - bytes pk = 1; -} +message GetPublicKeyResp { bytes pk = 1; } message NewLedgerReq { bytes handle = 1; @@ -27,9 +25,7 @@ message NewLedgerReq { bytes block = 3; } -message NewLedgerResp { - bytes receipt = 1; -} +message NewLedgerResp { bytes receipt = 1; } message ReadLatestReq { bytes handle = 1; @@ -50,9 +46,7 @@ message AppendReq { bytes nonces = 5; } -message AppendResp { - bytes receipt = 1; -} +message AppendResp { bytes receipt = 1; } message LedgerTailMapEntry { bytes handle = 1; @@ -62,25 +56,23 @@ message LedgerTailMapEntry { bytes nonces = 5; } -message LedgerTailMap { - repeated LedgerTailMapEntry entries = 1; -} +message LedgerTailMap { repeated LedgerTailMapEntry entries = 1; } -// protobuf supports maps (https://developers.google.com/protocol-buffers/docs/proto#maps), -// but it does not allow using bytes as keys in the map -// gRPC messages are limited to 4 MB, which allows about 50+K entries. -// In the future, we can either increase the limit on gRPC messages or switch to gRPC streaming +// protobuf supports maps +// (https://developers.google.com/protocol-buffers/docs/proto#maps), but it does +// not allow using bytes as keys in the map gRPC messages are limited to 4 MB, +// which allows about 50+K entries. 
In the future, we can either increase the
+// limit on gRPC messages or switch to gRPC streaming
 message InitializeStateReq {
   bytes group_identity = 1;
   repeated LedgerTailMapEntry ledger_tail_map = 2; // the list of ledger tails
   bytes view_tail_metablock = 3; // the view ledger tail's metablock
   bytes block_hash = 4; // the block hash of the latest block on the view ledger
-  uint64 expected_height = 5; // the conditional updated height of the latest block on the view ledger
+  uint64 expected_height = 5; // the conditional updated height of the latest
+                              // block on the view ledger
 }
 
-message InitializeStateResp {
-  bytes receipt = 1;
-}
+message InitializeStateResp { bytes receipt = 1; }
 
 message FinalizeStateReq {
   bytes block_hash = 1;
@@ -99,9 +91,7 @@ enum EndorserMode {
   Finalized = 3;
 }
 
-message ReadStateReq {
-
-}
+message ReadStateReq {}
 
 message ReadStateResp {
   bytes receipt = 1;
@@ -124,6 +114,8 @@ message ActivateReq {
   bytes receipts = 5;
 }
 
-message ActivateResp {
+message ActivateResp {}
 
-}
+message PingReq { bytes nonce = 1; }
+
+message PingResp { bytes id_sig = 1; }
diff --git a/proto/endpoint.proto b/proto/endpoint.proto
index ac8de52..f40ace2 100644
--- a/proto/endpoint.proto
+++ b/proto/endpoint.proto
@@ -7,6 +7,9 @@ service Call {
   rpc NewCounter(NewCounterReq) returns (NewCounterResp);
   rpc IncrementCounter(IncrementCounterReq) returns (IncrementCounterResp);
   rpc ReadCounter(ReadCounterReq) returns (ReadCounterResp);
+  rpc PingAllEndorsers(PingAllReq) returns (PingAllResp);
+  rpc GetTimeoutMap(GetTimeoutMapReq) returns (GetTimeoutMapResp);
+  rpc AddEndorsers(AddEndorsersReq) returns (AddEndorsersResp);
 }
 
 message GetIdentityReq {
@@ -45,4 +48,24 @@ message ReadCounterResp {
   bytes tag = 1;
   uint64 counter = 2;
   bytes signature = 3;
-}
\ No newline at end of file
+}
+
+message PingAllReq {
+}
+
+message PingAllResp {
+}
+
+message GetTimeoutMapReq {
+}
+
+message GetTimeoutMapResp {
+  map<string, uint64> timeout_map = 1;
+}
+
+message AddEndorsersReq {
+  string endorsers = 1;
+}
+
+message AddEndorsersResp {
+}
diff --git a/runNNTBenchmark.sh b/runNNTBenchmark.sh
new file mode 100644
index 0000000..b8a148d
--- /dev/null
+++ b/runNNTBenchmark.sh
@@ -0,0 +1,18 @@
+#!/bin/bash -e
+THREADS=64
+FILES=500000
+DIRS=500000
+
+function bench {
+  op=$1
+  echo "Running $op:"
+  hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark -op $*
+}
+
+bench create -threads $THREADS -files $FILES
+bench mkdirs -threads $THREADS -dirs $DIRS
+bench open -threads $THREADS -files $FILES
+bench delete -threads $THREADS -files $FILES
+bench fileStatus -threads $THREADS -files $FILES
+bench rename -threads $THREADS -files $FILES
+bench clean
\ No newline at end of file
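Finally, the same control-plane operations exist as gRPC methods on the endpoint's Call service defined above. A closing sketch, assuming endpoint_proto is the tonic-build output for proto/endpoint.proto and that an endpoint serves gRPC on a hypothetical address:

  use endpoint_proto::{call_client::CallClient, GetTimeoutMapReq, PingAllReq};

  #[tokio::main]
  async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = CallClient::connect("http://127.0.0.1:8081").await?;

    // Trigger a round of signed pings, then read back the per-endorser map.
    client
      .ping_all_endorsers(tonic::Request::new(PingAllReq {}))
      .await?;
    let timeout_map = client
      .get_timeout_map(tonic::Request::new(GetTimeoutMapReq {}))
      .await?
      .into_inner()
      .timeout_map;
    for (endorser, value) in timeout_map {
      println!("{endorser}: {value}");
    }
    Ok(())
  }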