diff --git a/Cargo.toml b/Cargo.toml index 42b87d1..593588c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ alloy-evm = { package = "alloy-evm", git = "https://github.com/Galxe/alloy-evm", # async futures = "0.3" rand = "0.9" +tokio = { version = "1.44", features = ["sync"] } auto_impl = "1.3" lazy_static = "1.5.0" dashmap = "6.0" diff --git a/src/async_commit.rs b/src/async_commit.rs index c26542f..2007004 100644 --- a/src/async_commit.rs +++ b/src/async_commit.rs @@ -4,8 +4,9 @@ use revm_context::{ result::{EVMError, ExecutionResult, InvalidTransaction, ResultAndState}, }; use revm_primitives::Address; +use tokio::sync::mpsc; -use crate::{GrevmError, ParallelState, TxId}; +use crate::{GrevmError, ParallelState, PrewarmTask, TxId}; use std::cmp::Ordering; /// `StateAsyncCommit` asynchronously finalizes transaction states, @@ -23,6 +24,8 @@ where state: &'a ParallelState, commit_result: Result<(), GrevmError>, disable_nonce_check: bool, + block_number: u64, + prewarm_sender: Option>, } impl<'a, DB> StateAsyncCommit<'a, DB> @@ -34,7 +37,30 @@ where state: &'a ParallelState, disable_nonce_check: bool, ) -> Self { - Self { coinbase, results: vec![], state, commit_result: Ok(()), disable_nonce_check } + Self { + coinbase, + results: vec![], + state, + commit_result: Ok(()), + disable_nonce_check, + block_number: 0, + prewarm_sender: None, + } + } + + /// Sets the block number for prewarm task validation. + pub(crate) fn with_block_number(mut self, block_number: u64) -> Self { + self.block_number = block_number; + self + } + + /// Sets the prewarm sender for MPT prewarming. + pub(crate) fn with_prewarm_sender( + mut self, + sender: mpsc::UnboundedSender, + ) -> Self { + self.prewarm_sender = Some(sender); + self } fn state_mut(&self) -> &mut ParallelState { @@ -69,6 +95,9 @@ where // phase, the system enforces strict nonce monotonicity checks to guarantee transaction // integrity and prevent double-spending attacks. let ResultAndState { result, state, lazy_reward } = result_and_state; + + let state_for_prewarm = self.prewarm_sender.as_ref().map(|_| state.clone()); + if !self.disable_nonce_check { match self.state.basic_ref(tx_env.caller) { Ok(info) => { @@ -118,5 +147,16 @@ where // will read the proper miner state from ParallelState (verified via commit_idx) without // creating artificial dependencies. assert!(self.state_mut().increment_balances(vec![(self.coinbase, lazy_reward)]).is_ok()); + + // Send prewarm task for MPT prewarming if sender is available + if let Some(ref sender) = self.prewarm_sender.as_ref() { + // state_for_prewarm is Some if and only if prewarm_sender is Some + let state = state_for_prewarm + .expect("state_for_prewarm should be Some when prewarm_sender is Some"); + let _ = sender.send(PrewarmTask { + block_number: self.block_number, + evm_state: state, + }); + } } } diff --git a/src/lib.rs b/src/lib.rs index b3c5126..7a383b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,9 +31,18 @@ use lazy_static::lazy_static; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use revm_context::result::{EVMError, ResultAndState}; use revm_primitives::{Address, B256, U256}; -use revm_state::{AccountInfo, Bytecode}; +use revm_state::{AccountInfo, Bytecode, EvmState}; use std::{cmp::min, thread}; +/// Prewarm task containing block number and state changes for MPT prewarming. +#[derive(Debug)] +pub struct PrewarmTask { + /// The block number for validation purposes. + pub block_number: u64, + /// The result and state from transaction execution. + pub evm_state: EvmState, +} + lazy_static! { static ref CONCURRENT_LEVEL: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8); diff --git a/src/scheduler.rs b/src/scheduler.rs index 98f9e91..51513dd 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,8 +1,8 @@ use crate::{ AbortReason, CONCURRENT_LEVEL, FALLBACK_SEQUENTIAL, GrevmError, LocationAndType, MemoryEntry, - ParallelState, ReadVersion, Task, TransactionResult, TransactionStatus, TxId, TxState, - TxVersion, async_commit::StateAsyncCommit, hint::ParallelExecutionHints, storage::CacheDB, - tx_dependency::TxDependency, utils::ContinuousDetectSet, + ParallelState, PrewarmTask, ReadVersion, Task, TransactionResult, TransactionStatus, TxId, + TxState, TxVersion, async_commit::StateAsyncCommit, hint::ParallelExecutionHints, + storage::CacheDB, tx_dependency::TxDependency, utils::ContinuousDetectSet, }; use ahash::{AHashMap as HashMap, AHashSet as HashSet}; use alloy_evm::{EthEvm, Evm, precompiles::PrecompilesMap}; @@ -19,6 +19,7 @@ use revm_context::{ result::{EVMError, ExecutionResult}, }; use revm_inspector::NoOpInspector; +use tokio::sync::mpsc; use std::{ cmp::max, @@ -269,6 +270,7 @@ where abort: AtomicBool, abort_reason: OnceLock, metrics: ExecuteMetricsCollector, + prewarm_sender: Option>, } impl Debug for Scheduler @@ -319,9 +321,16 @@ where abort: AtomicBool::new(false), abort_reason: OnceLock::new(), metrics: ExecuteMetricsCollector::default(), + prewarm_sender: None, } } + /// Sets the prewarm sender for MPT prewarming. + pub fn with_prewarm_sender(mut self, sender: mpsc::UnboundedSender) -> Self { + self.prewarm_sender = Some(sender); + self + } + fn async_finality(&self) { let mut start = Instant::now(); let mut finality_idx = 0; @@ -430,12 +439,15 @@ where if *FALLBACK_SEQUENTIAL { return self.fallback_sequential(); } - let commiter = Mutex::new(StateAsyncCommit::new( - self.env.beneficiary, - &self.state, - self.cfg.disable_nonce_check, - )); - commiter.lock().init().map_err(|e| GrevmError { txid: 0, error: EVMError::Database(e) })?; + let mut commiter = + StateAsyncCommit::new(self.env.beneficiary, &self.state, self.cfg.disable_nonce_check); + let block_number = self.env.number.to::(); + commiter = commiter.with_block_number(block_number); + if let Some(ref sender) = self.prewarm_sender { + commiter = commiter.with_prewarm_sender(sender.clone()); + } + commiter.init().map_err(|e| GrevmError { txid: 0, error: EVMError::Database(e) })?; + let commiter = Mutex::new(commiter); thread::scope(|scope| { scope.spawn(|| { self.async_finality();