Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
build:
env:
RUST_VERSION: 1.65.0
RUST_VERSION: 1.92.0
runs-on: ubuntu-latest
steps:
- name: Install protoc
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ members = [
"coordinator_ctrl",
]

resolver = "2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
6 changes: 3 additions & 3 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "coordinator"
version = "0.1.0"
edition = "2018"
authors = ["Srinath Setty <srinath@microsoft.com>", "Sudheesh Singanamalla <t-sudheeshs@microsoft.com>"]
edition = "2024"
authors = ["Srinath Setty <srinath@microsoft.com>", "Weidong Cui <weidong.cui@microsoft.com>", "Sebastian Angel <sebastian.angel@cis.upenn.edu>", "Sudheesh Singanamalla <t-sudheeshs@microsoft.com>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -29,4 +29,4 @@ rand = "0.8.4"

[build-dependencies]
tonic-build = "0.8.2"
prost-build = "0.11.1"
prost-build = "0.11.1"
110 changes: 46 additions & 64 deletions coordinator/src/coordinator_state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::errors::CoordinatorError;
use ledger::{
compute_aggregated_block_hash, compute_cut_diffs, compute_max_cut,
Block, CustomSerde, EndorserHostnames, Handle, MetaBlock, NimbleDigest, NimbleHashTrait, Nonce,
Nonces, Receipt, Receipts, VerifierState, compute_aggregated_block_hash, compute_cut_diffs,
compute_max_cut,
errors::VerificationError,
signature::{PublicKey, PublicKeyTrait},
Block, CustomSerde, EndorserHostnames, Handle, MetaBlock, NimbleDigest, NimbleHashTrait, Nonce,
Nonces, Receipt, Receipts, VerifierState,
};
use rand::random;
use std::{
Expand All @@ -14,14 +14,14 @@ use std::{
sync::{Arc, RwLock},
};
use store::ledger::{
azure_table::TableLedgerStore, filestore::FileStore, in_memory::InMemoryLedgerStore,
mongodb_cosmos::MongoCosmosLedgerStore, LedgerEntry, LedgerStore,
LedgerEntry, LedgerStore, azure_table::TableLedgerStore, filestore::FileStore,
in_memory::InMemoryLedgerStore, mongodb_cosmos::MongoCosmosLedgerStore,
};
use store::{errors::LedgerStoreError, errors::StorageError};
use tokio::sync::mpsc;
use tonic::{
transport::{Channel, Endpoint},
Code, Status,
transport::{Channel, Endpoint},
};

use ledger::endorser_proto;
Expand Down Expand Up @@ -333,8 +333,7 @@ async fn update_endorser(
};

let res = Receipt::from_bytes(&receipt);
if res.is_ok() {
let receipt_rs = res.unwrap();
if let Ok(receipt_rs) = res {
let mut receipts = Receipts::new();
receipts.add(&receipt_rs);
let res = ledger_store
Expand Down Expand Up @@ -630,7 +629,8 @@ impl CoordinatorState {
let mut endorsers = EndorserHostnames::new();

for (pk, uri) in &endorser_hostnames {
let pks = self.connect_endorsers(&[uri.clone()]).await;
let pks = self.connect_endorsers(std::slice::from_ref(uri)).await;

if pks.len() == 1 && pks[0].0 == *pk {
endorsers.push((pk.clone(), uri.clone()));
}
Expand Down Expand Up @@ -666,10 +666,7 @@ impl CoordinatorState {

pub fn get_endorser_pks(&self) -> Vec<Vec<u8>> {
if let Ok(conn_map_rd) = self.conn_map.read() {
conn_map_rd
.iter()
.map(|(pk, _endorser)| pk.clone())
.collect::<Vec<Vec<u8>>>()
conn_map_rd.keys().cloned().collect::<Vec<Vec<u8>>>()
} else {
eprintln!("Failed to acquire read lock");
Vec::new()
Expand All @@ -679,8 +676,8 @@ impl CoordinatorState {
pub fn get_endorser_uris(&self) -> Vec<String> {
if let Ok(conn_map_rd) = self.conn_map.read() {
conn_map_rd
.iter()
.map(|(_pk, endorser)| endorser.uri.clone())
.values()
.map(|endorser| endorser.uri.clone())
.collect::<Vec<String>>()
} else {
eprintln!("Failed to acquire read lock");
Expand Down Expand Up @@ -984,10 +981,10 @@ impl CoordinatorState {
match res {
Ok(receipt_rs) => {
receipts.add(&receipt_rs);
if let Ok(vs) = self.verifier_state.read() {
if receipts.check_quorum(&vs).is_ok() {
return Ok(receipts);
}
if let Ok(vs) = self.verifier_state.read()
&& receipts.check_quorum(&vs).is_ok()
{
return Ok(receipts);
}
},
Err(error) => eprintln!("Failed to parse a receipt ({:?})", error),
Expand Down Expand Up @@ -1144,10 +1141,10 @@ impl CoordinatorState {
Ok(receipt) => match Receipt::from_bytes(&receipt) {
Ok(receipt_rs) => {
receipts.add(&receipt_rs);
if let Ok(vs) = self.verifier_state.read() {
if receipts.check_quorum(&vs).is_ok() {
return Ok(receipts);
}
if let Ok(vs) = self.verifier_state.read()
&& receipts.check_quorum(&vs).is_ok()
{
return Ok(receipts);
}
},
Err(error) => {
Expand Down Expand Up @@ -1307,14 +1304,12 @@ impl CoordinatorState {
max_height = height;
}
receipts.add(&receipt_rs);
if let Ok(vs) = self.verifier_state.read() {
if let Ok(_h) = receipts.check_quorum(&vs) {
if let Ok(block_rs) = Block::from_bytes(&block) {
if let Ok(nonces_rs) = Nonces::from_bytes(&nonces) {
return Ok(LedgerEntry::new(block_rs, receipts, Some(nonces_rs)));
}
}
}
if let Ok(vs) = self.verifier_state.read()
&& let Ok(_h) = receipts.check_quorum(&vs)
&& let Ok(block_rs) = Block::from_bytes(&block)
&& let Ok(nonces_rs) = Nonces::from_bytes(&nonces)
{
return Ok(LedgerEntry::new(block_rs, receipts, Some(nonces_rs)));
}
},
Err(error) => {
Expand Down Expand Up @@ -1503,10 +1498,10 @@ impl CoordinatorState {
// Read the current ledger tail
let res = self.ledger_store.read_view_ledger_tail().await;

if res.is_err() {
if let Err(e) = res {
eprintln!(
"Failed to read from the view ledger in the ledger store ({:?})",
res.unwrap_err()
e
);
return Err(CoordinatorError::FailedToCallLedgerStore);
}
Expand Down Expand Up @@ -1622,10 +1617,10 @@ impl CoordinatorState {
.ledger_store
.attach_view_ledger_receipts(view_ledger_height, &receipts)
.await;
if res.is_err() {
if let Err(e) = res {
eprintln!(
"Failed to attach view ledger receipt in the ledger store ({:?})",
res.unwrap_err()
e
);
return Err(CoordinatorError::FailedToCallLedgerStore);
}
Expand All @@ -1637,14 +1632,10 @@ impl CoordinatorState {
if cut_diff.low == cut_diff.high {
continue;
}
let mut block_hashes: Vec<Vec<u8>> =
Vec::with_capacity((cut_diff.high - cut_diff.low) as usize);
let mut block_hashes: Vec<Vec<u8>> = Vec::with_capacity(cut_diff.high - cut_diff.low);
let h = NimbleDigest::from_bytes(&cut_diff.handle).unwrap();
for index in (cut_diff.low + 1)..=cut_diff.high {
let res = self
.ledger_store
.read_ledger_by_index(&h, index as usize)
.await;
let res = self.ledger_store.read_ledger_by_index(&h, index).await;
if let Err(e) = res {
eprintln!("Failed to read the ledger store {:?}", e);
return Err(CoordinatorError::FailedToCallLedgerStore);
Expand Down Expand Up @@ -1723,11 +1714,8 @@ impl CoordinatorState {
.ledger_store
.create_ledger(&handle, genesis_block.clone())
.await;
if res.is_err() {
eprintln!(
"Failed to create ledger in the ledger store ({:?})",
res.unwrap_err()
);
if let Err(e) = res {
eprintln!("Failed to create ledger in the ledger store ({:?})", e);
return Err(CoordinatorError::FailedToCreateLedger);
}

Expand All @@ -1740,9 +1728,9 @@ impl CoordinatorState {
let res = self
.endorser_create_ledger(&endorsers, &handle, &block_hash, genesis_block)
.await;
if res.is_err() {
eprintln!("Failed to create ledger in endorsers ({:?})", res);
return Err(res.unwrap_err());
if let Err(e) = res {
eprintln!("Failed to create ledger in endorsers ({:?})", e);
return Err(e);
}
res.unwrap()
};
Expand Down Expand Up @@ -1781,11 +1769,8 @@ impl CoordinatorState {
.ledger_store
.append_ledger(&handle, &data_block, expected_height)
.await;
if res.is_err() {
eprintln!(
"Failed to append to the ledger in the ledger store {:?}",
res.unwrap_err()
);
if let Err(e) = res {
eprintln!("Failed to append to the ledger in the ledger store {:?}", e);
return Err(CoordinatorError::FailedToAppendLedger);
}

Expand All @@ -1811,9 +1796,9 @@ impl CoordinatorState {
nonces,
)
.await;
if res.is_err() {
eprintln!("Failed to append to the ledger in endorsers {:?}", res);
return Err(res.unwrap_err());
if let Err(e) = res {
eprintln!("Failed to append to the ledger in endorsers {:?}", e);
return Err(e);
}
res.unwrap()
};
Expand All @@ -1822,10 +1807,10 @@ impl CoordinatorState {
.ledger_store
.attach_ledger_receipts(&handle, expected_height, &receipts)
.await;
if res.is_err() {
if let Err(e) = res {
eprintln!(
"Failed to attach ledger receipt to the ledger store ({:?})",
res.unwrap_err()
e
);
return Err(CoordinatorError::FailedToAttachReceipt);
}
Expand Down Expand Up @@ -1887,11 +1872,8 @@ impl CoordinatorState {
CoordinatorError::FailedToObtainQuorum => {
if !nonce_attached {
let res = self.ledger_store.attach_ledger_nonce(&handle, &nonce).await;
if res.is_err() {
eprintln!(
"Failed to attach the nonce for reading ledger tail {:?}",
res.unwrap_err()
);
if let Err(e) = res {
eprintln!("Failed to attach the nonce for reading ledger tail {:?}", e);
return Err(CoordinatorError::FailedToAttachNonce);
}
nonce_attached = true;
Expand Down
Loading