Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ alloy-evm = { package = "alloy-evm", git = "https://github.com/Galxe/alloy-evm",
# async
futures = "0.3"
rand = "0.9"
tokio = { version = "1.44", features = ["sync"] }
auto_impl = "1.3"
lazy_static = "1.5.0"
dashmap = "6.0"
Expand Down
44 changes: 42 additions & 2 deletions src/async_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use revm_context::{
result::{EVMError, ExecutionResult, InvalidTransaction, ResultAndState},
};
use revm_primitives::Address;
use tokio::sync::mpsc;

use crate::{GrevmError, ParallelState, TxId};
use crate::{GrevmError, ParallelState, PrewarmTask, TxId};
use std::cmp::Ordering;

/// `StateAsyncCommit` asynchronously finalizes transaction states,
Expand All @@ -23,6 +24,8 @@ where
state: &'a ParallelState<DB>,
commit_result: Result<(), GrevmError<DB::Error>>,
disable_nonce_check: bool,
block_number: u64,
prewarm_sender: Option<mpsc::UnboundedSender<PrewarmTask>>,
}

impl<'a, DB> StateAsyncCommit<'a, DB>
Expand All @@ -34,7 +37,30 @@ where
state: &'a ParallelState<DB>,
disable_nonce_check: bool,
) -> Self {
Self { coinbase, results: vec![], state, commit_result: Ok(()), disable_nonce_check }
Self {
coinbase,
results: vec![],
state,
commit_result: Ok(()),
disable_nonce_check,
block_number: 0,
prewarm_sender: None,
}
}

/// Sets the block number for prewarm task validation.
pub(crate) fn with_block_number(mut self, block_number: u64) -> Self {
self.block_number = block_number;
self
}

/// Sets the prewarm sender for MPT prewarming.
pub(crate) fn with_prewarm_sender(
mut self,
sender: mpsc::UnboundedSender<PrewarmTask>,
) -> Self {
self.prewarm_sender = Some(sender);
self
}

fn state_mut(&self) -> &mut ParallelState<DB> {
Expand Down Expand Up @@ -69,6 +95,9 @@ where
// phase, the system enforces strict nonce monotonicity checks to guarantee transaction
// integrity and prevent double-spending attacks.
let ResultAndState { result, state, lazy_reward } = result_and_state;

let state_for_prewarm = self.prewarm_sender.as_ref().map(|_| state.clone());

if !self.disable_nonce_check {
match self.state.basic_ref(tx_env.caller) {
Ok(info) => {
Expand Down Expand Up @@ -118,5 +147,16 @@ where
// will read the proper miner state from ParallelState (verified via commit_idx) without
// creating artificial dependencies.
assert!(self.state_mut().increment_balances(vec![(self.coinbase, lazy_reward)]).is_ok());

// Send prewarm task for MPT prewarming if sender is available
if let Some(ref sender) = self.prewarm_sender.as_ref() {
// state_for_prewarm is Some if and only if prewarm_sender is Some
let state = state_for_prewarm
.expect("state_for_prewarm should be Some when prewarm_sender is Some");
let _ = sender.send(PrewarmTask {
block_number: self.block_number,
evm_state: state,
});
}
}
}
11 changes: 10 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@ use lazy_static::lazy_static;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use revm_context::result::{EVMError, ResultAndState};
use revm_primitives::{Address, B256, U256};
use revm_state::{AccountInfo, Bytecode};
use revm_state::{AccountInfo, Bytecode, EvmState};
use std::{cmp::min, thread};

/// Prewarm task containing block number and state changes for MPT prewarming.
#[derive(Debug)]
pub struct PrewarmTask {
/// The block number for validation purposes.
pub block_number: u64,
/// The result and state from transaction execution.
pub evm_state: EvmState,
}

lazy_static! {
static ref CONCURRENT_LEVEL: usize =
thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
Expand Down
30 changes: 21 additions & 9 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
AbortReason, CONCURRENT_LEVEL, FALLBACK_SEQUENTIAL, GrevmError, LocationAndType, MemoryEntry,
ParallelState, ReadVersion, Task, TransactionResult, TransactionStatus, TxId, TxState,
TxVersion, async_commit::StateAsyncCommit, hint::ParallelExecutionHints, storage::CacheDB,
tx_dependency::TxDependency, utils::ContinuousDetectSet,
ParallelState, PrewarmTask, ReadVersion, Task, TransactionResult, TransactionStatus, TxId,
TxState, TxVersion, async_commit::StateAsyncCommit, hint::ParallelExecutionHints,
storage::CacheDB, tx_dependency::TxDependency, utils::ContinuousDetectSet,
};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use alloy_evm::{EthEvm, Evm, precompiles::PrecompilesMap};
Expand All @@ -19,6 +19,7 @@ use revm_context::{
result::{EVMError, ExecutionResult},
};
use revm_inspector::NoOpInspector;
use tokio::sync::mpsc;

use std::{
cmp::max,
Expand Down Expand Up @@ -269,6 +270,7 @@ where
abort: AtomicBool,
abort_reason: OnceLock<AbortReason>,
metrics: ExecuteMetricsCollector,
prewarm_sender: Option<mpsc::UnboundedSender<PrewarmTask>>,
}

impl<DB> Debug for Scheduler<DB>
Expand Down Expand Up @@ -319,9 +321,16 @@ where
abort: AtomicBool::new(false),
abort_reason: OnceLock::new(),
metrics: ExecuteMetricsCollector::default(),
prewarm_sender: None,
}
}

/// Sets the prewarm sender for MPT prewarming.
pub fn with_prewarm_sender(mut self, sender: mpsc::UnboundedSender<PrewarmTask>) -> Self {
self.prewarm_sender = Some(sender);
self
}

fn async_finality(&self) {
let mut start = Instant::now();
let mut finality_idx = 0;
Expand Down Expand Up @@ -430,12 +439,15 @@ where
if *FALLBACK_SEQUENTIAL {
return self.fallback_sequential();
}
let commiter = Mutex::new(StateAsyncCommit::new(
self.env.beneficiary,
&self.state,
self.cfg.disable_nonce_check,
));
commiter.lock().init().map_err(|e| GrevmError { txid: 0, error: EVMError::Database(e) })?;
let mut commiter =
StateAsyncCommit::new(self.env.beneficiary, &self.state, self.cfg.disable_nonce_check);
let block_number = self.env.number.to::<u64>();
commiter = commiter.with_block_number(block_number);
if let Some(ref sender) = self.prewarm_sender {
commiter = commiter.with_prewarm_sender(sender.clone());
}
commiter.init().map_err(|e| GrevmError { txid: 0, error: EVMError::Database(e) })?;
let commiter = Mutex::new(commiter);
thread::scope(|scope| {
scope.spawn(|| {
self.async_finality();
Expand Down
Loading