Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rs/ethereum/cketh/minter/src/deposit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub async fn update_last_observed_block_number() -> Option<BlockNumber> {
match read_state(rpc_client)
.get_block_by_number(block_height.clone())
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.reduce_with_strategy(NoReduction)
{
Expand Down Expand Up @@ -257,7 +257,7 @@ where
.with_response_size_estimate(
ETH_GET_LOGS_INITIAL_RESPONSE_SIZE_ESTIMATE + HEADER_SIZE_LIMIT,
)
.send()
.try_send()
.await
.reduce_with_strategy(NoReduction)
.map(<S::Parser>::parse_all_logs);
Expand Down
56 changes: 44 additions & 12 deletions rs/ethereum/cketh/minter/src/eth_rpc_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use evm_rpc_types::{
RpcError, RpcService as EvmRpcService, RpcServices as EvmRpcServices,
};
use ic_canister_log::log;
use ic_canister_runtime::IcRuntime;
use ic_canister_runtime::{IcError, IcRuntime};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
Expand Down Expand Up @@ -140,20 +140,34 @@ impl<T: PartialEq> MultiCallResults<T> {
let distinct_errors: BTreeSet<_> = self.errors.values().collect();
match distinct_errors.len() {
0 => panic!("BUG: expect errors should be non-empty"),
1 => {
MultiCallError::ConsistentError(distinct_errors.into_iter().next().unwrap().clone())
}
1 => MultiCallError::ConsistentError(ConsistentError::EvmRpc(
distinct_errors.into_iter().next().unwrap().clone(),
)),
_ => MultiCallError::InconsistentResults(self),
}
}
}

#[derive(Eq, PartialEq, Debug)]
pub enum MultiCallError<T> {
ConsistentError(RpcError),
ConsistentError(ConsistentError),
InconsistentResults(MultiCallResults<T>),
}

#[derive(Eq, PartialEq, Debug)]
pub enum ConsistentError {
/// Error coming from the client talking to the EVM RPC canister
Client(IcError),
/// Error coming from the EVM RPC canister itself.
EvmRpc(RpcError),
}

impl From<RpcError> for ConsistentError {
fn from(error: RpcError) -> Self {
Self::EvmRpc(error)
}
}

pub trait ReductionStrategy<T> {
fn reduce(&self, results: EvmMultiRpcResult<T>) -> Result<T, MultiCallError<T>>;
}
Expand Down Expand Up @@ -235,7 +249,9 @@ where
F: Fn(MultiCallResults<T>) -> Result<T, MultiCallError<T>>,
{
match result {
EvmMultiRpcResult::Consistent(result) => result.map_err(MultiCallError::ConsistentError),
EvmMultiRpcResult::Consistent(result) => {
result.map_err(|e| MultiCallError::ConsistentError(e.into()))
}
EvmMultiRpcResult::Inconsistent(results) => {
reduce(MultiCallResults::from_non_empty_iter(results))
}
Expand All @@ -249,25 +265,41 @@ pub trait ToReducedWithStrategy<T> {
) -> Result<T, MultiCallError<T>>;
}

impl<T> ToReducedWithStrategy<T> for EvmMultiRpcResult<T> {
impl<T> ToReducedWithStrategy<T> for Result<EvmMultiRpcResult<T>, IcError> {
fn reduce_with_strategy(
self,
strategy: impl ReductionStrategy<T>,
) -> Result<T, MultiCallError<T>> {
strategy.reduce(self)
match self {
Ok(result) => strategy.reduce(result),
Err(error) => Err(MultiCallError::from_client_error(error)),
}
}
}

impl<T> MultiCallError<T> {
pub fn from_client_error(error: IcError) -> Self {
MultiCallError::ConsistentError(ConsistentError::Client(error))
}

pub fn has_http_outcall_error_matching<P: Fn(&HttpOutcallError) -> bool>(
&self,
predicate: P,
) -> bool {
match self {
MultiCallError::ConsistentError(RpcError::HttpOutcallError(error)) => predicate(error),
MultiCallError::ConsistentError(RpcError::JsonRpcError { .. }) => false,
MultiCallError::ConsistentError(RpcError::ProviderError(_)) => false,
MultiCallError::ConsistentError(RpcError::ValidationError(_)) => false,
MultiCallError::ConsistentError(ConsistentError::Client(_)) => false,
MultiCallError::ConsistentError(ConsistentError::EvmRpc(
RpcError::HttpOutcallError(error),
)) => predicate(error),
MultiCallError::ConsistentError(ConsistentError::EvmRpc(RpcError::JsonRpcError {
..
})) => false,
MultiCallError::ConsistentError(ConsistentError::EvmRpc(RpcError::ProviderError(
_,
))) => false,
MultiCallError::ConsistentError(ConsistentError::EvmRpc(
RpcError::ValidationError(_),
)) => false,
MultiCallError::InconsistentResults(results) => {
results
.errors
Expand Down
21 changes: 16 additions & 5 deletions rs/ethereum/cketh/minter/src/eth_rpc_client/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
eth_rpc::Hash,
eth_rpc_client::{
MinByKey, MultiCallError, MultiCallResults, StrictMajorityByKey, ToReducedWithStrategy,
ConsistentError, MinByKey, MultiCallError, MultiCallResults, StrictMajorityByKey,
ToReducedWithStrategy,
responses::{TransactionReceipt, TransactionStatus},
},
numeric::{BlockNumber, GasAmount, TransactionCount, WeiPerGas},
Expand All @@ -21,6 +22,7 @@ const LLAMA_NODES: EvmRpcService = EvmRpcService::EthMainnet(EthMainnetService::

mod multi_call_results {
use super::*;
use crate::eth_rpc_client::ReductionStrategy;

mod reduce_with_min_by_key {
use super::*;
Expand Down Expand Up @@ -278,10 +280,10 @@ mod multi_call_results {
proptest! {
#[test]
fn should_not_match_when_consistent_json_rpc_error(code in any::<i64>(), message in ".*") {
let error: MultiCallError<String> = MultiCallError::ConsistentError(RpcError::JsonRpcError(JsonRpcError {
let error: MultiCallError<String> = MultiCallError::ConsistentError(ConsistentError::EvmRpc(RpcError::JsonRpcError(JsonRpcError {
code,
message,
}));
})));
let always_true = |_outcall_error: &HttpOutcallError| true;

assert!(!error.has_http_outcall_error_matching(always_true));
Expand All @@ -291,10 +293,10 @@ mod multi_call_results {
#[test]
fn should_match_when_consistent_http_outcall_error() {
let error: MultiCallError<String> = MultiCallError::ConsistentError(
RpcError::HttpOutcallError(HttpOutcallError::IcError {
ConsistentError::EvmRpc(RpcError::HttpOutcallError(HttpOutcallError::IcError {
code: LegacyRejectionCode::SysTransient,
message: "message".to_string(),
}),
})),
);
let always_true = |_outcall_error: &HttpOutcallError| true;
let always_false = |_outcall_error: &HttpOutcallError| false;
Expand Down Expand Up @@ -337,6 +339,15 @@ mod multi_call_results {
assert!(error_with_outcall_error.has_http_outcall_error_matching(always_true));
}
}

impl<T> ToReducedWithStrategy<T> for MultiRpcResult<T> {
fn reduce_with_strategy(
self,
strategy: impl ReductionStrategy<T>,
) -> Result<T, MultiCallError<T>> {
strategy.reduce(self)
}
}
}

mod eth_get_transaction_receipt {
Expand Down
2 changes: 1 addition & 1 deletion rs/ethereum/cketh/minter/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ pub async fn lazy_refresh_gas_fee_estimate() -> Option<GasFeeEstimate> {
.fee_history((5_u8, BlockTag::Latest))
.with_reward_percentiles(vec![20])
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.reduce_with_strategy(StrictMajorityByKey::new(|fee_history: &FeeHistory| {
Nat::from(fee_history.oldest_block.clone())
Expand Down
12 changes: 6 additions & 6 deletions rs/ethereum/cketh/minter/src/withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ async fn latest_transaction_count() -> Option<TransactionCount> {
match read_state(rpc_client)
.get_transaction_count((minter_address().await.into_bytes(), BlockTag::Latest))
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.map(TransactionCount::from)
.map(|res| res.map(TransactionCount::from))
.reduce_with_strategy(MinByKey::new(|count: &TransactionCount| *count))
{
Ok(transaction_count) => Some(transaction_count),
Expand Down Expand Up @@ -354,7 +354,7 @@ async fn send_transactions_batch(latest_transaction_count: Option<TransactionCou
rpc_client
.send_raw_transaction(tx.raw_transaction_hex())
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.reduce_with_strategy(AnyOf)
}))
Expand Down Expand Up @@ -401,7 +401,7 @@ async fn finalize_transactions_batch() {
rpc_client
.get_transaction_receipt(*hash)
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.reduce_with_strategy(NoReduction)
}))
Expand Down Expand Up @@ -472,8 +472,8 @@ async fn finalized_transaction_count() -> Result<TransactionCount, MultiCallErro
read_state(rpc_client)
.get_transaction_count((minter_address().await.into_bytes(), BlockTag::Finalized))
.with_cycles(MIN_ATTACHED_CYCLES)
.send()
.try_send()
.await
.map(TransactionCount::from)
.map(|res| res.map(TransactionCount::from))
.reduce_with_strategy(NoReduction)
}
24 changes: 24 additions & 0 deletions rs/ethereum/cketh/minter/tests/cketh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,4 +1341,28 @@ mod cketh_evm_rpc {
.build()
.expect_rpc_calls(&cketh);
}

#[test]
fn should_not_panic_when_evm_rpc_canister_is_stopped() {
let cketh = CkEthSetup::default();
// The minter starts right away by scraping the logs,
// which leads the state machine to panick if we were to stop directly the EVM RPC canister.
// So we first stop the minter to start fresh.
cketh.stop_minter();
cketh
.env
.stop_canister(cketh.evm_rpc_id)
.expect("Failed to stop EVM RPC canister");
cketh.start_minter();

cketh.env.advance_time(SCRAPING_ETH_LOGS_INTERVAL);

for _ in 0..10 {
cketh.env.tick();
let logs = cketh.minter_canister_logs();
if let Some(panicking_log) = logs.iter().find(|l| l.content.contains("ic0.trap")) {
panic!("Minter panicked: {}", panicking_log.content);
}
}
}
}
24 changes: 23 additions & 1 deletion rs/ethereum/cketh/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ impl CkEthSetup {
.unwrap(),
)
.unwrap();
let minter_id = install_minter(&env, ledger_id, minter_id, evm_rpc_id);
install_evm_rpc(&env, evm_rpc_id);
let minter_id = install_minter(&env, ledger_id, minter_id, evm_rpc_id);

let caller = PrincipalId::new_user_test_id(DEFAULT_PRINCIPAL_ID);
let cketh = Self {
Expand Down Expand Up @@ -696,6 +696,21 @@ impl CkEthSetup {
)
.unwrap()
}

pub fn minter_canister_logs(&self) -> Vec<CanisterLog> {
let log = self.env.canister_log(self.minter_id);

let mut records = log.records().iter().collect::<Vec<_>>();
records.sort_by(|a, b| a.idx.cmp(&b.idx));
records
.into_iter()
.map(|log| CanisterLog {
timestamp_nanos: log.timestamp_nanos,
idx: log.idx,
content: String::from_utf8_lossy(&log.content).to_string(),
})
.collect()
}
}

pub fn format_ethereum_address_to_eip_55(address: &str) -> String {
Expand Down Expand Up @@ -782,3 +797,10 @@ pub struct LedgerBalance {
pub account: Account,
pub balance: Nat,
}

#[derive(Debug)]
pub struct CanisterLog {
pub timestamp_nanos: u64,
pub idx: u64,
pub content: String,
}
Loading