From e0d302284d8bac9f8ba891e46f928327362e4a25 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Tue, 25 Nov 2025 13:54:21 +0000 Subject: [PATCH 01/14] draft --- Makefile | 6 +++-- based/Cargo.lock | 27 +++++++++++++++++++ based/bin/rpc/Cargo.toml | 28 ++++++++++++++++++++ based/bin/rpc/src/cli.rs | 53 ++++++++++++++++++++++++++++++++++++++ based/bin/rpc/src/main.rs | 16 ++++++++++++ based/bin/rpc/src/types.rs | 0 6 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 based/bin/rpc/Cargo.toml create mode 100644 based/bin/rpc/src/cli.rs create mode 100644 based/bin/rpc/src/main.rs create mode 100644 based/bin/rpc/src/types.rs diff --git a/Makefile b/Makefile index cd2ab2dd8..f0138b018 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,10 @@ L2_CHAIN_ID?=$(shell \ L2_CHAIN_ID_HEX:=$(shell printf "0x%064x" $(L2_CHAIN_ID)) PORTAL?=http://0.0.0.0:8080 TXPROXY?=http://0.0.0.0:8090 -L1_RPC_URL?=http://3.84.162.42:8545 -L1_BEACON_RPC_URL?=http://3.84.162.42:5051 +# L1_RPC_URL?=http://3.84.162.42:8545 +# L1_BEACON_RPC_URL?=http://3.84.162.42:5051 +L1_RPC_URL?=https://ethereum-sepolia-rpc.publicnode.com +L1_BEACON_RPC_URL?=https://ethereum-sepolia-beacon-api.publicnode.com PUBLIC_IP?=$(shell curl ifconfig.me) # if GATEWAY_SEQUENCING_KEY is set, use that one, otherwise key_to_address will generate a new one GATEWAY_SEQUENCING_KEY ?= $(shell \ diff --git a/based/Cargo.lock b/based/Cargo.lock index af514c813..bb4fa099d 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1590,6 +1590,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "based-rpc" +version = "0.1.0" +dependencies = [ + "alloy-eips", + "alloy-primitives", + "alloy-rpc-types", + "bop-common", + "bop-metrics", + "clap", + "eyre", + "futures", + "jsonrpsee", + "op-alloy-rpc-types", + "op-alloy-rpc-types-engine", + "parking_lot", + "reqwest", + "reth-rpc-layer", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tower", + "tower-http", + "tracing", +] + [[package]] name = "based-txproxy" version = "0.1.0" diff --git a/based/bin/rpc/Cargo.toml b/based/bin/rpc/Cargo.toml new file mode 100644 index 000000000..3da20a82d --- /dev/null +++ b/based/bin/rpc/Cargo.toml @@ -0,0 +1,28 @@ +[package] +edition.workspace = true +name = "based-rpc" +rust-version.workspace = true +version.workspace = true + +[dependencies] +alloy-eips.workspace = true +alloy-primitives.workspace = true +alloy-rpc-types.workspace = true +bop-common.workspace = true +bop-metrics.workspace = true +clap.workspace = true +eyre.workspace = true +futures.workspace = true +jsonrpsee.workspace = true +op-alloy-rpc-types.workspace = true +op-alloy-rpc-types-engine.workspace = true +parking_lot.workspace = true +reqwest.workspace = true +reth-rpc-layer.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +tower.workspace = true +tower-http.workspace = true +tracing.workspace = true diff --git a/based/bin/rpc/src/cli.rs b/based/bin/rpc/src/cli.rs new file mode 100644 index 000000000..f766dcfe3 --- /dev/null +++ b/based/bin/rpc/src/cli.rs @@ -0,0 +1,53 @@ +use std::path::PathBuf; + +use bop_common::config::{LoggingConfig, LoggingFlags}; +use clap::Parser; +use tracing::level_filters::LevelFilter; + +#[derive(Parser, Debug, Clone)] +#[command(version, about, name = "based-rpc")] +pub struct RpcArgs { + /// The port to run the rpc on + #[arg(long = "port", default_value_t = 10545)] + pub port: u16, + + /// path to the json file containing the frag stream urls + #[arg(long = "fragstream_urls")] + pub fragstream_urls_path: PathBuf, + + /// Enable debug logging + #[arg(long)] + pub debug: bool, + /// Enable trace logging + #[arg(long)] + pub trace: bool, + /// Enable file logging + #[arg(long = "log.disable_file_logging", action = clap::ArgAction::SetFalse, default_value_t = true)] + pub file_logging: bool, + /// Prefix of log files + #[arg(long = "log.prefix", default_value = "bop-txproxy.log")] + pub log_prefix: String, + /// Path for log files + #[arg(long = "log.dir", default_value = "/tmp")] + pub log_dir: PathBuf, + /// Maximum number of log files + #[arg(long = "log.max_files", default_value_t = 100)] + pub log_max_files: usize, +} + +impl From<&RpcArgs> for LoggingConfig { + fn from(args: &RpcArgs) -> Self { + Self { + level: args + .trace + .then_some(LevelFilter::TRACE) + .or(args.debug.then_some(LevelFilter::DEBUG)) + .unwrap_or(LevelFilter::INFO), + flags: if args.file_logging { LoggingFlags::all() } else { LoggingFlags::StdOut }, + prefix: args.file_logging.then(|| args.log_prefix.clone()), + max_files: args.log_max_files, + path: args.log_dir.clone(), + filters: None, + } + } +} diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs new file mode 100644 index 000000000..9c5857d1a --- /dev/null +++ b/based/bin/rpc/src/main.rs @@ -0,0 +1,16 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use bop_common::utils::init_tracing; +use clap::Parser; +use cli::RpcArgs; +mod cli; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let args = RpcArgs::parse(); + let _guard = init_tracing((&args).into()); + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), args.port); + + todo!() +} diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs new file mode 100644 index 000000000..e69de29bb From f68b12dfe1636287ff34ae3280748db71e5260c7 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Tue, 25 Nov 2025 19:08:30 +0000 Subject: [PATCH 02/14] works --- based/Cargo.lock | 8 + based/Cargo.toml | 3 + based/bin/rpc/Cargo.toml | 8 + based/bin/rpc/src/cli.rs | 20 +- based/bin/rpc/src/main.rs | 560 ++++++++++++++++++++++++++++++- based/bin/rpc/src/middleware.rs | 101 ++++++ based/bin/rpc/src/types.rs | 38 +++ based/bin/txspammer/Cargo.toml | 4 +- based/bin/txspammer/src/main.rs | 6 +- based/crates/common/src/p2p.rs | 20 +- based/crates/rpc/src/gossiper.rs | 7 +- 11 files changed, 752 insertions(+), 23 deletions(-) create mode 100644 based/bin/rpc/src/middleware.rs diff --git a/based/Cargo.lock b/based/Cargo.lock index bb4fa099d..b2cbff036 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1595,14 +1595,20 @@ name = "based-rpc" version = "0.1.0" dependencies = [ "alloy-eips", + "alloy-network", "alloy-primitives", + "alloy-provider", "alloy-rpc-types", "bop-common", "bop-metrics", "clap", + "crossbeam-channel", "eyre", "futures", + "futures-util", + "http", "jsonrpsee", + "op-alloy-network", "op-alloy-rpc-types", "op-alloy-rpc-types-engine", "parking_lot", @@ -1612,9 +1618,11 @@ dependencies = [ "serde_json", "thiserror 2.0.12", "tokio", + "tokio-websockets", "tower", "tower-http", "tracing", + "tracing-subscriber 0.3.20", ] [[package]] diff --git a/based/Cargo.toml b/based/Cargo.toml index e5df44acc..75383c468 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -41,6 +41,7 @@ either = "1.15.0" ethereum_ssz = "0.9.0" eyre = "0.6.12" futures = "0.3.31" +futures-util = "0.3.31" hickory-resolver = "=0.25.0-alpha.5" # Use the exact version reth expects http = "1.3.1" hyper = "1.5.2" @@ -59,6 +60,7 @@ op-alloy-rpc-types = "0.22.0" op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false, features = [ "serde", ] } +op-alloy-provider = "0.22.0" op-revm = "12.0.1" parking_lot = "0.12.3" paste = "0.1.18" @@ -124,6 +126,7 @@ strum_macros = "0.24" tabwriter = "1.4.1" thiserror = "2.0.11" tokio = { version = "1.43.0", features = ["full"] } +tokio-websockets = { version = "0.12.1", features = ["client", "openssl", "rand", "server"] } toml = "0.8.19" tower = { version = "0.5", features = ["timeout"] } tower-http = { version = "0.6", features = ["cors"] } diff --git a/based/bin/rpc/Cargo.toml b/based/bin/rpc/Cargo.toml index 3da20a82d..8e58859c6 100644 --- a/based/bin/rpc/Cargo.toml +++ b/based/bin/rpc/Cargo.toml @@ -8,6 +8,8 @@ version.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rpc-types.workspace = true +alloy-provider.workspace = true +alloy-network.workspace = true bop-common.workspace = true bop-metrics.workspace = true clap.workspace = true @@ -16,6 +18,7 @@ futures.workspace = true jsonrpsee.workspace = true op-alloy-rpc-types.workspace = true op-alloy-rpc-types-engine.workspace = true +op-alloy-network.workspace = true parking_lot.workspace = true reqwest.workspace = true reth-rpc-layer.workspace = true @@ -26,3 +29,8 @@ tokio.workspace = true tower.workspace = true tower-http.workspace = true tracing.workspace = true +tokio-websockets.workspace = true +http.workspace = true +futures-util.workspace = true +crossbeam-channel.workspace = true +tracing-subscriber.workspace = true \ No newline at end of file diff --git a/based/bin/rpc/src/cli.rs b/based/bin/rpc/src/cli.rs index f766dcfe3..5b18a9e87 100644 --- a/based/bin/rpc/src/cli.rs +++ b/based/bin/rpc/src/cli.rs @@ -8,12 +8,24 @@ use tracing::level_filters::LevelFilter; #[command(version, about, name = "based-rpc")] pub struct RpcArgs { /// The port to run the rpc on - #[arg(long = "port", default_value_t = 10545)] + #[arg(long = "port", default_value_t = 7545)] pub port: u16, - /// path to the json file containing the frag stream urls - #[arg(long = "fragstream_urls")] - pub fragstream_urls_path: PathBuf, + /// ws url of the frag stream + #[arg(long = "frag.url", default_value = "ws://0.0.0.0:9999/state_stream")] + pub frag_url: String, + + /// ws url of eth rpc + #[arg(long = "eth.ws.url", default_value = "ws://0.0.0.0:8546")] + pub eth_ws_url: String, + + /// http url of eth rpc + #[arg(long = "eth.http.url", default_value = "http://0.0.0.0:8545")] + pub eth_http_url: String, + + /// tx receiver url + #[arg(long = "sequencer.url", default_value = "http://0.0.0.0:8545")] + pub tx_receiver_url: Option, /// Enable debug logging #[arg(long)] diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs index 9c5857d1a..98e919231 100644 --- a/based/bin/rpc/src/main.rs +++ b/based/bin/rpc/src/main.rs @@ -1,16 +1,568 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::{ + collections::VecDeque, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + thread, + time::{Duration, Instant}, +}; -use bop_common::utils::init_tracing; +use alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_network::ReceiptResponse; +use alloy_primitives::{ + Address, B256, Bytes, U64, U256, + map::foldhash::{HashMap, HashMapExt}, +}; +use alloy_provider::{Provider, ProviderBuilder, RootProvider, WsConnect, fillers::RecommendedFillers}; +use alloy_rpc_types::Header; +use bop_common::{ + api::OpRpcBlock, + p2p::{EnvV0, FragV0, SealV0, SignedVersionedMessage, StateUpdate, VersionedMessage}, + utils::{init_tracing, wait_for_signal}, +}; use clap::Parser; use cli::RpcArgs; +use crossbeam_channel::Sender; +use eyre::Result; +use futures_util::stream::StreamExt; +use http::Uri; +use jsonrpsee::{ + core::{RpcResult, async_trait}, http_client::HttpClientBuilder, server::{ServerBuilder, ServerConfigBuilder}, types::ErrorObject, ws_client::RpcServiceBuilder +}; +use op_alloy_network::Optimism; +use op_alloy_rpc_types::OpTransactionReceipt; +use parking_lot::RwLock; +use reqwest::Url; +use tower::ServiceBuilder; +use tower_http::cors::{Any, CorsLayer}; +use std::str::FromStr; +use tokio::time::interval; +use tokio_websockets::ClientBuilder; +use tracing::{debug, error, info, warn}; + +use crate::{middleware::{EthApiProxy, RpcClient}, types::EthApiServer}; + mod cli; +mod types; +mod middleware; + +type OpRootProvider = RootProvider; + +struct UnsealedBlock { + env: EnvV0, + current_frag: Option, + transaction_count_diff: HashMap, + receipts: HashMap, + balances: HashMap, + seal: Option, +} + +impl UnsealedBlock { + fn apply_frag(&mut self, frag: FragV0, state_update: Option) { + if self.current_frag.is_none() { + if frag.seq != 0 { + error!("expected first frag to have seq 0 but got seq {}", frag.seq); + return; + } + } else { + let current_frag = self.current_frag.as_ref().unwrap(); + let expected_seq = current_frag.seq + 1; + if expected_seq != frag.seq { + error!("expected frag seq {} but got seq {}", expected_seq, frag.seq); + return; + } + } + if self.seal.is_some() { + error!("trying to apply frag after seal"); + return; + } + + self.current_frag = Some(frag); + + if let Some(state_update) = state_update { + for (_tx_hash, receipt) in state_update.receipts.iter() { + let sender = receipt.from(); + self.transaction_count_diff.entry(sender).and_modify(|count| *count += 1).or_insert(1); + } + self.receipts.extend(state_update.receipts); + self.balances.extend(state_update.balances); + } + } + + fn apply_seal(&mut self, seal: SealV0) { + self.seal = Some(seal); + } + + fn get_transaction_count_diff(&self, address: Address) -> Option { + self.transaction_count_diff.get(&address).cloned() + } + + fn get_receipt(&self, tx_hash: B256) -> Option { + self.receipts.get(&tx_hash).cloned() + } + + fn get_balance(&self, address: Address) -> Option { + self.balances.get(&address).cloned() + } +} + +struct UnsealedBlockStack { + blocks: VecDeque, + root_provider_block_number: Option, +} + +impl UnsealedBlockStack { + fn new() -> Self { + Self { blocks: VecDeque::new(), root_provider_block_number: None } + } + + fn get_transaction_count_diff(&self, address: Address) -> u64 { + let mut total_diff = 0; + for block in self.blocks.iter().rev() { + total_diff += block.get_transaction_count_diff(address).unwrap_or(0); + } + total_diff + } + + fn get_receipt(&self, tx_hash: B256) -> Option { + for block in self.blocks.iter().rev() { + if let Some(receipt) = block.get_receipt(tx_hash) { + return Some(receipt); + } + } + None + } + + fn get_balance(&self, address: Address) -> Option { + for block in self.blocks.iter().rev() { + if let Some(balance) = block.get_balance(address) { + return Some(balance); + } + } + None + } + + fn block_number(&self) -> Option { + if let Some(block) = self.blocks.back() { + return Some(block.env.number); + } + if let Some(root_provider_block_number) = self.root_provider_block_number { + return Some(root_provider_block_number); + } + None + } +} + +pub fn spawn_receipt_listener_frag_stream(frag_url: &str, message_tx: Sender) { + let frag_url = frag_url.to_string(); + tokio::spawn(async move { + loop { + let uri = Uri::from_str(&frag_url).expect("invalid frag stream url"); + let maybe_client = ClientBuilder::from_uri(uri).connect().await; + let Ok((mut client, _)) = maybe_client else { + error!("failed to connect to frag stream, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + while let Some(Ok(msg)) = client.next().await { + let stream_data = msg.as_text().and_then(|s| serde_json::from_str::(s).ok()); + if let Some(msg) = stream_data { + message_tx.send(msg).expect("failed to send message"); + } + } + error!("frag stream closed, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); +} + +#[derive(Clone)] +struct Server { + unsealed_stack: Arc>, + provider: OpRootProvider, + tx_receiver_provider: OpRootProvider, +} + +impl Server { + fn new(provider: OpRootProvider, tx_receiver_provider: OpRootProvider) -> Self { + Self { unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), provider, tx_receiver_provider } + } + + fn on_env(&self, env: EnvV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() || unsealed_block.blocks.back().unwrap().env.number + 1 == env.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.push_back(UnsealedBlock { + env, + current_frag: None, + transaction_count_diff: HashMap::new(), + receipts: HashMap::new(), + balances: HashMap::new(), + seal: None, + }); + }); + } else { + error!("expected block number"); + } + } + + fn on_header(&self, header: Header) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + while !unsealed_block.blocks.is_empty() && unsealed_block.blocks.front().unwrap().env.number <= header.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.pop_front(); + blocks.root_provider_block_number = Some(header.number); + }); + } + } + + fn on_seal(&self, seal: SealV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + // error!("trying to seal a block but there is no unsealed block"); + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != seal.block_number { + error!( + "trying to seal block number {} but the last unsealed block is number {}", + seal.block_number, last_block.env.number + ); + return; + } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_seal(seal); + }); + } + + fn on_frag(&self, frag: FragV0, state_update: Option) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != frag.block_number { + error!( + "trying to apply frag for block number {} but the last unsealed block is number {}", + frag.block_number, last_block.env.number + ); + return; + } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); + }); + } + + async fn get_transaction_count(&self, address: Address) -> Result { + let stack = self.unsealed_stack.read(); + let transaction_count_diff = stack.get_transaction_count_diff(address); + let root_provider_block_number = stack.root_provider_block_number; + if let Some(root_provider_block_number) = root_provider_block_number { + let transaction_count = self + .provider + .get_transaction_count(address) + .block_id(BlockId::number(root_provider_block_number)) + .await?; + return Ok(transaction_count_diff + transaction_count); + } + let transaction_count = self.provider.get_transaction_count(address).await?; + Ok(transaction_count_diff + transaction_count) + } + + async fn get_balance(&self, address: Address) -> Result { + let stack = self.unsealed_stack.read(); + let balance = stack.get_balance(address); + if let Some(balance) = balance { + return Ok(balance); + } + let balance = self.provider.get_balance(address).await?; + Ok(balance) + } + + async fn get_receipt(&self, hash: B256) -> Result> { + let stack = self.unsealed_stack.read(); + if let Some(receipt) = stack.get_receipt(hash) { + return Ok(Some(receipt)); + } + let receipt = self.tx_receiver_provider.get_transaction_receipt(hash).await?; + Ok(receipt) + } -#[tokio::main] + async fn block_number(&self) -> Result { + let stack = self.unsealed_stack.read(); + if let Some(block_number) = stack.block_number() { + return Ok(block_number); + } else { + return Ok(self.provider.get_block_number().await?); + } + } +} + +pub fn spawn_block_listener(provider: OpRootProvider, block_tx: Sender
) { + tokio::spawn(async move { + loop { + info!("Attempting to subscribe to L1 block headers..."); + let sub_result = provider.subscribe_blocks().await; + + let mut block_stream = match sub_result { + Ok(sub) => { + info!("Successfully subscribed to L1 block headers."); + sub.into_stream() + } + Err(e) => { + error!(error = %e, "Failed to subscribe to L1 blocks, retrying in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + while let Some(header) = block_stream.next().await { + block_tx.send(header).expect("failed to send block header"); + } + warn!("header stream ended. Attempting to resubscribe..."); + panic!("WS connection dropped. Restart the process."); // TODO: handle reconnection + // properly + } + }); +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> eyre::Result<()> { let args = RpcArgs::parse(); let _guard = init_tracing((&args).into()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), args.port); - todo!() + let (block_tx, block_rx) = crossbeam_channel::bounded(100); + let (message_tx, message_rx) = crossbeam_channel::bounded(100); + + let eth_ws_url = args.eth_ws_url.clone(); + let provider_with_filler = ProviderBuilder::<_, _, Optimism>::default() + .connect_ws(WsConnect::new(Url::parse(ð_ws_url).unwrap())) + .await + .expect("failed to connect to eth rpc"); + + let provider = provider_with_filler.root(); + + spawn_receipt_listener_frag_stream(&args.frag_url.as_str(), message_tx); + spawn_block_listener(provider.clone(), block_tx); + + let tx_receiver_provider = match args.tx_receiver_url { + Some(url) => { + let parsed_url = Url::parse(&url).expect("invalid tx receiver url"); + let provider_with_filler = match parsed_url.scheme() { + "ws" | "wss" => ProviderBuilder::<_, _, Optimism>::default() + .connect_ws(WsConnect::new(parsed_url)) + .await + .expect("failed to connect to tx receiver via ws"), + "http" | "https" => ProviderBuilder::<_, _, Optimism>::default().connect_http(parsed_url), + _ => panic!("unsupported URL scheme for tx receiver: {}", parsed_url.scheme()), + }; + provider_with_filler.root().clone() + } + None => provider.clone(), + }; + + let server_obj = Server::new(provider.clone(), tx_receiver_provider); + + let server = server_obj.clone(); + thread::spawn(move || { + loop { + let mut should_sleep = true; + + while let Ok(msg) = message_rx.try_recv() { + match msg.message { + VersionedMessage::FragV0(frag) => { + debug!("got frag: block number {} seq {}", frag.block_number, frag.seq); + server.on_frag(frag, msg.state_update); + } + VersionedMessage::SealV0(seal) => { + debug!("got seal: block number {}", seal.block_number); + server.on_seal(seal); + if msg.state_update.is_some() { + error!("seal message should not contain state update"); + } + } + VersionedMessage::EnvV0(env) => { + debug!("got env: block number {}", env.number); + server.on_env(env); + if msg.state_update.is_some() { + error!("env message should not contain state update"); + } + } + _ => { + warn!("unsupported message type: {:?}", msg.message); + if msg.state_update.is_some() { + error!("unsupported message type should not contain state update"); + } + } + } + should_sleep = false; + } + + while let Ok(header) = block_rx.try_recv() { + debug!("got block header: block number {}", header.number); + server.on_header(header); + should_sleep = false; + } + + if should_sleep { + thread::sleep(Duration::from_millis(1)); + } + } + }); + + let server = server_obj.clone(); + tokio::spawn(async move { + let address_to_check = Address::from_str("0x4D36DE6a194dDF98EE57323CfA3A45351d35e442").unwrap(); + let mut interval = interval(Duration::from_secs_f64(0.1)); + loop { + let transaction_count = server.get_transaction_count(address_to_check).await.unwrap(); + let balance = server.get_balance(address_to_check).await.unwrap(); + let block_number = server.block_number().await.unwrap(); + info!("block number: {} count: {} balance: {:?}", block_number, transaction_count, balance); + interval.tick().await; + } + }); + + // temp: remove when factoring out the portal + let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors_middleware = ServiceBuilder::new().layer(cors); + + let rpc_middleware = RpcServiceBuilder::new().layer_fn(move |s| EthApiProxy { + inner: s, + geth_client: create_client(Url::parse(args.eth_http_url.as_str()).unwrap(), Duration::from_secs(2)).unwrap(), + }); + + let rpc_server = ServerBuilder::default() + .set_config(ServerConfigBuilder::new().max_request_body_size(u32::MAX).max_response_body_size(u32::MAX).build()) + .set_rpc_middleware(rpc_middleware) + .set_http_middleware(cors_middleware) + .build(addr) + .await?; + + let mut module = EthApiServer::into_rpc(server_obj); + let server_handle = rpc_server.start(module); + + + tokio::select! { + _ = server_handle.stopped() => { + error!("server stopped"); + } + + _ = wait_for_signal() => { + info!("received signal, shutting down"); + } + } + + Ok(()) +} + +#[async_trait] +impl EthApiServer for Server { + async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { + let hash = self.tx_receiver_provider.send_raw_transaction(&bytes).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to send transaction", + Some(e.to_string()), + ) + })?; + Ok(*hash.tx_hash()) + } + + async fn transaction_receipt(&self, hash: B256) -> RpcResult> { + let receipt = self.get_receipt(hash).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction receipt", + Some(e.to_string()), + ) + })?; + Ok(receipt) + } + + async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult> { + todo!() + } + + async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult> { + todo!() + } + + async fn block_number(&self) -> RpcResult { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + + async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult { + match block_number { + Some(BlockId::Number(BlockNumberOrTag::Pending)) => { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + _ => { + let transaction_count = self + .provider + .get_transaction_count(address) + .block_id(block_number.unwrap_or_default()) + .await + .map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction count", + Some(e.to_string()), + ) + })?; + Ok(U256::from(transaction_count)) + } + } + } + + async fn balance(&self, address: Address, block_number: Option) -> RpcResult { + match block_number { + Some(BlockId::Number(BlockNumberOrTag::Pending)) => { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + _ => { + let balance = + self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err( + |e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get balance", + Some(e.to_string()), + ) + }, + )?; + Ok(U256::from(balance)) + } + } + } } + +pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { + let client = HttpClientBuilder::default() + .max_request_size(u32::MAX) + .max_response_size(u32::MAX) + .request_timeout(timeout.into()) + .build(url)?; + Ok(client) +} \ No newline at end of file diff --git a/based/bin/rpc/src/middleware.rs b/based/bin/rpc/src/middleware.rs new file mode 100644 index 000000000..0607cbde8 --- /dev/null +++ b/based/bin/rpc/src/middleware.rs @@ -0,0 +1,101 @@ +use bop_common::{ + communication::Producer, + metrics::{Counter, Metric, MetricsUpdate}, + utils::uuid, +}; +use futures::FutureExt; +use jsonrpsee::{ + MethodResponse, + core::{ + ClientError, + client::ClientT, + middleware::{Batch, Notification}, + traits::ToRpcParams, + }, + server::middleware::rpc::RpcServiceT, + types::{ErrorObject, Params, Request, ResponsePayload, error::INTERNAL_ERROR_CODE}, +}; +use serde_json::value::RawValue; +use tracing::{debug, error}; + +pub type RpcClient = jsonrpsee::http_client::HttpClient; + +#[derive(Clone)] +pub struct EthApiProxy { + pub inner: S, + pub geth_client: RpcClient, +} + +const SUPPORTED_METHODS: &[&str] = &[ + "eth_sendRawTransaction", + "eth_getTransactionReceipt", + "eth_getBlockByNumber", + "eth_getBlockByHash", + "eth_blockNumber", + "eth_getTransactionCount", + "eth_getBalance", +]; + +impl RpcServiceT for EthApiProxy +where + S: RpcServiceT + Send + Sync + Clone + 'static, +{ + type BatchResponse = S::BatchResponse; + type MethodResponse = S::MethodResponse; + type NotificationResponse = S::NotificationResponse; + + #[tracing::instrument(skip_all, name = "middleware")] + fn call<'a>(&self, req: Request<'a>) -> impl Future + Send + 'a { + let inner = self.inner.clone(); + let fallback_client = self.geth_client.clone(); + + async move { + if SUPPORTED_METHODS.contains(&req.method_name()) { + inner.call(req).await + } else { + external_call(fallback_client.clone(), &req).await + } + } + .boxed() + } + + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + self.inner.batch(batch) + } + + fn notification<'a>(&self, n: Notification<'a>) -> impl Future + Send + 'a { + self.inner.notification(n) + } +} + +struct WrapParams<'a>(Params<'a>); +impl ToRpcParams for WrapParams<'_> { + fn to_rpc_params(self) -> Result>, serde_json::Error> { + self.0.as_str().map(String::from).map(RawValue::from_string).transpose() + } +} + +async fn external_call(client: S, req: &Request<'_>) -> MethodResponse +where + S: ClientT + Send + Sync + 'static, +{ + let r: Result = + client.request(req.method_name(), WrapParams(req.params())).await; + match r { + Ok(value) => { + let payload = ResponsePayload::success(&value); + debug!(method = %req.method_name(), "Forwarding request to client"); + MethodResponse::response(req.id.clone(), payload.into(), 4_000_000_000usize) + } + Err(err) => { + error!(error = %err, "Error calling client"); + match err { + ClientError::Call(e) => MethodResponse::error(req.id.clone(), e), + _ => MethodResponse::error( + req.id.clone(), + ErrorObject::owned(INTERNAL_ERROR_CODE, "client error".to_string(), Some(err.to_string())), + ), + } + } + } +} diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index e69de29bb..df550af6a 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -0,0 +1,38 @@ +use alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_primitives::{Address, B256, Bytes, U256}; +use bop_common::api::OpRpcBlock; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use op_alloy_rpc_types::OpTransactionReceipt; + + +// taken from https://github.com/gattaca-com/based-op/blob/397d48b73d088f40721ae0ba002d251dcf6f38cc/based/crates/common/src/api.rs#L91-L128 +#[rpc(client, server, namespace = "eth")] +pub trait EthApi { + /// Sends signed transaction, returning its hash + #[method(name = "sendRawTransaction")] + async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; + + /// Returns the receipt of a transaction by transaction hash + #[method(name = "getTransactionReceipt")] + async fn transaction_receipt(&self, hash: B256) -> RpcResult>; + + /// Returns a block with a given identifier + #[method(name = "getBlockByNumber")] + async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult>; + + /// Returns information about a block by hash. + #[method(name = "getBlockByHash")] + async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult>; + + /// Returns the number of most recent block + #[method(name = "blockNumber")] + async fn block_number(&self) -> RpcResult; + + /// Returns the nonce of a given address at a given block number. + #[method(name = "getTransactionCount")] + async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult; + + /// Returns the balance of the account of given address. + #[method(name = "getBalance")] + async fn balance(&self, address: Address, block_number: Option) -> RpcResult; +} diff --git a/based/bin/txspammer/Cargo.toml b/based/bin/txspammer/Cargo.toml index b22212d90..5f88f97c9 100644 --- a/based/bin/txspammer/Cargo.toml +++ b/based/bin/txspammer/Cargo.toml @@ -18,7 +18,7 @@ bop-metrics.workspace = true clap.workspace = true eyre.workspace = true futures.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true http.workspace = true jsonrpsee.workspace = true op-alloy-rpc-types.workspace = true @@ -31,7 +31,7 @@ serde.workspace = true serde_json.workspace = true thiserror.workspace = true tokio.workspace = true -tokio-websockets = { version = "0.12.1", features = ["client", "openssl", "rand", "server"] } +tokio-websockets.workspace = true tower.workspace = true tower-http.workspace = true tracing.workspace = true diff --git a/based/bin/txspammer/src/main.rs b/based/bin/txspammer/src/main.rs index c41fb5ce9..66b29f387 100644 --- a/based/bin/txspammer/src/main.rs +++ b/based/bin/txspammer/src/main.rs @@ -219,7 +219,11 @@ impl TxSpammer { let _ = loop { match provider.get_transaction_receipt(tx_hash).await { Ok(Some(receipt)) => break receipt, - _ => { + Err(e) => { + warn!("failed to get transaction receipt: {}", e); + sleep(Duration::from_millis(5)).await; + } + Ok(None) => { sleep(Duration::from_millis(5)).await; } } diff --git a/based/crates/common/src/p2p.rs b/based/crates/common/src/p2p.rs index aed897d32..affc34da4 100644 --- a/based/crates/common/src/p2p.rs +++ b/based/crates/common/src/p2p.rs @@ -17,17 +17,17 @@ pub type ExtraData = VariableList; #[derive(Debug, Clone, PartialEq, Eq, TreeHash, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EnvV0 { - number: u64, - parent_hash: B256, - beneficiary: Address, - timestamp: u64, - gas_limit: u64, - basefee: u64, - difficulty: U256, - prevrandao: B256, + pub number: u64, + pub parent_hash: B256, + pub beneficiary: Address, + pub timestamp: u64, + pub gas_limit: u64, + pub basefee: u64, + pub difficulty: U256, + pub prevrandao: B256, #[serde(with = "ssz_types::serde_utils::hex_var_list")] - extra_data: ExtraData, - parent_beacon_block_root: B256, + pub extra_data: ExtraData, + pub parent_beacon_block_root: B256, } impl EnvV0 { diff --git a/based/crates/rpc/src/gossiper.rs b/based/crates/rpc/src/gossiper.rs index 8a539a3ef..79dffffb6 100644 --- a/based/crates/rpc/src/gossiper.rs +++ b/based/crates/rpc/src/gossiper.rs @@ -37,8 +37,11 @@ impl Gossiper { let signed = msg.to_signed(&self.signer); let payload = signed.to_json(); - if matches!(signed.message, p2p::VersionedMessage::FragV0(_)) && self.frag_broadcast.send(signed).is_err() { - tracing::debug!("broadcast of frag failed") + // if matches!(signed.message, p2p::VersionedMessage::FragV0(_)) && self.frag_broadcast.send(signed).is_err() { + // tracing::debug!("broadcast of frag failed") + // } + if self.frag_broadcast.send(signed).is_err() { + tracing::debug!("broadcast of message failed") } let res = match self.client.post(self.target_rpc.clone()).json(&payload).send() { From 4649f699f1fb404a871285a476ded3bd8eff77ab Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 11:48:56 +0000 Subject: [PATCH 03/14] clean up --- based/bin/rpc/src/listener.rs | 62 ++++ based/bin/rpc/src/main.rs | 438 ++-------------------------- based/bin/rpc/src/middleware.rs | 9 +- based/bin/rpc/src/server.rs | 258 ++++++++++++++++ based/bin/rpc/src/types.rs | 18 +- based/bin/rpc/src/unsealed_block.rs | 112 +++++++ 6 files changed, 464 insertions(+), 433 deletions(-) create mode 100644 based/bin/rpc/src/listener.rs create mode 100644 based/bin/rpc/src/server.rs create mode 100644 based/bin/rpc/src/unsealed_block.rs diff --git a/based/bin/rpc/src/listener.rs b/based/bin/rpc/src/listener.rs new file mode 100644 index 000000000..02c2b4f1e --- /dev/null +++ b/based/bin/rpc/src/listener.rs @@ -0,0 +1,62 @@ +use std::{str::FromStr, time::Duration}; + +use alloy_provider::Provider; +use alloy_rpc_types::Header; +use bop_common::p2p::SignedVersionedMessage; +use crossbeam_channel::Sender; +use futures_util::stream::StreamExt; +use http::Uri; +use tokio_websockets::ClientBuilder; +use tracing::{error, info, warn}; + +use crate::types::OpRootProvider; + +pub fn spawn_block_listener(provider: OpRootProvider, block_tx: Sender
) { + tokio::spawn(async move { + loop { + info!("Attempting to subscribe to L1 block headers..."); + let sub_result = provider.subscribe_blocks().await; + + let mut block_stream = match sub_result { + Ok(sub) => { + info!("Successfully subscribed to L1 block headers."); + sub.into_stream() + } + Err(e) => { + error!(error = %e, "Failed to subscribe to L1 blocks, retrying in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + while let Some(header) = block_stream.next().await { + block_tx.send(header).expect("failed to send block header"); + } + warn!("header stream ended. Attempting to resubscribe..."); + panic!("WS connection dropped. Restart the process."); // TODO: handle reconnection + // properly + } + }); +} + +pub fn spawn_receipt_listener_frag_stream(frag_url: &str, message_tx: Sender) { + let frag_url = frag_url.to_string(); + tokio::spawn(async move { + loop { + let uri = Uri::from_str(&frag_url).expect("invalid frag stream url"); + let maybe_client = ClientBuilder::from_uri(uri).connect().await; + let Ok((mut client, _)) = maybe_client else { + error!("failed to connect to frag stream, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + while let Some(Ok(msg)) = client.next().await { + let stream_data = msg.as_text().and_then(|s| serde_json::from_str::(s).ok()); + if let Some(msg) = stream_data { + message_tx.send(msg).expect("failed to send message"); + } + } + error!("frag stream closed, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); +} diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs index 98e919231..ba8ac16b4 100644 --- a/based/bin/rpc/src/main.rs +++ b/based/bin/rpc/src/main.rs @@ -1,328 +1,42 @@ use std::{ - collections::VecDeque, net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, + str::FromStr, thread, - time::{Duration, Instant}, + time::Duration, }; -use alloy_eips::{BlockId, BlockNumberOrTag}; -use alloy_network::ReceiptResponse; -use alloy_primitives::{ - Address, B256, Bytes, U64, U256, - map::foldhash::{HashMap, HashMapExt}, -}; -use alloy_provider::{Provider, ProviderBuilder, RootProvider, WsConnect, fillers::RecommendedFillers}; -use alloy_rpc_types::Header; +use alloy_primitives::Address; +use alloy_provider::{Provider, ProviderBuilder, WsConnect}; use bop_common::{ - api::OpRpcBlock, - p2p::{EnvV0, FragV0, SealV0, SignedVersionedMessage, StateUpdate, VersionedMessage}, + p2p::VersionedMessage, utils::{init_tracing, wait_for_signal}, }; use clap::Parser; use cli::RpcArgs; -use crossbeam_channel::Sender; -use eyre::Result; -use futures_util::stream::StreamExt; -use http::Uri; use jsonrpsee::{ - core::{RpcResult, async_trait}, http_client::HttpClientBuilder, server::{ServerBuilder, ServerConfigBuilder}, types::ErrorObject, ws_client::RpcServiceBuilder + server::{ServerBuilder, ServerConfigBuilder}, + ws_client::RpcServiceBuilder, }; use op_alloy_network::Optimism; -use op_alloy_rpc_types::OpTransactionReceipt; -use parking_lot::RwLock; use reqwest::Url; +use tokio::time::interval; use tower::ServiceBuilder; use tower_http::cors::{Any, CorsLayer}; -use std::str::FromStr; -use tokio::time::interval; -use tokio_websockets::ClientBuilder; use tracing::{debug, error, info, warn}; -use crate::{middleware::{EthApiProxy, RpcClient}, types::EthApiServer}; +use crate::{ + listener::{spawn_block_listener, spawn_receipt_listener_frag_stream}, + middleware::EthApiProxy, + server::{Server, create_client}, + types::EthApiServer, +}; mod cli; -mod types; +mod listener; mod middleware; - -type OpRootProvider = RootProvider; - -struct UnsealedBlock { - env: EnvV0, - current_frag: Option, - transaction_count_diff: HashMap, - receipts: HashMap, - balances: HashMap, - seal: Option, -} - -impl UnsealedBlock { - fn apply_frag(&mut self, frag: FragV0, state_update: Option) { - if self.current_frag.is_none() { - if frag.seq != 0 { - error!("expected first frag to have seq 0 but got seq {}", frag.seq); - return; - } - } else { - let current_frag = self.current_frag.as_ref().unwrap(); - let expected_seq = current_frag.seq + 1; - if expected_seq != frag.seq { - error!("expected frag seq {} but got seq {}", expected_seq, frag.seq); - return; - } - } - if self.seal.is_some() { - error!("trying to apply frag after seal"); - return; - } - - self.current_frag = Some(frag); - - if let Some(state_update) = state_update { - for (_tx_hash, receipt) in state_update.receipts.iter() { - let sender = receipt.from(); - self.transaction_count_diff.entry(sender).and_modify(|count| *count += 1).or_insert(1); - } - self.receipts.extend(state_update.receipts); - self.balances.extend(state_update.balances); - } - } - - fn apply_seal(&mut self, seal: SealV0) { - self.seal = Some(seal); - } - - fn get_transaction_count_diff(&self, address: Address) -> Option { - self.transaction_count_diff.get(&address).cloned() - } - - fn get_receipt(&self, tx_hash: B256) -> Option { - self.receipts.get(&tx_hash).cloned() - } - - fn get_balance(&self, address: Address) -> Option { - self.balances.get(&address).cloned() - } -} - -struct UnsealedBlockStack { - blocks: VecDeque, - root_provider_block_number: Option, -} - -impl UnsealedBlockStack { - fn new() -> Self { - Self { blocks: VecDeque::new(), root_provider_block_number: None } - } - - fn get_transaction_count_diff(&self, address: Address) -> u64 { - let mut total_diff = 0; - for block in self.blocks.iter().rev() { - total_diff += block.get_transaction_count_diff(address).unwrap_or(0); - } - total_diff - } - - fn get_receipt(&self, tx_hash: B256) -> Option { - for block in self.blocks.iter().rev() { - if let Some(receipt) = block.get_receipt(tx_hash) { - return Some(receipt); - } - } - None - } - - fn get_balance(&self, address: Address) -> Option { - for block in self.blocks.iter().rev() { - if let Some(balance) = block.get_balance(address) { - return Some(balance); - } - } - None - } - - fn block_number(&self) -> Option { - if let Some(block) = self.blocks.back() { - return Some(block.env.number); - } - if let Some(root_provider_block_number) = self.root_provider_block_number { - return Some(root_provider_block_number); - } - None - } -} - -pub fn spawn_receipt_listener_frag_stream(frag_url: &str, message_tx: Sender) { - let frag_url = frag_url.to_string(); - tokio::spawn(async move { - loop { - let uri = Uri::from_str(&frag_url).expect("invalid frag stream url"); - let maybe_client = ClientBuilder::from_uri(uri).connect().await; - let Ok((mut client, _)) = maybe_client else { - error!("failed to connect to frag stream, reconnecting..."); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - }; - while let Some(Ok(msg)) = client.next().await { - let stream_data = msg.as_text().and_then(|s| serde_json::from_str::(s).ok()); - if let Some(msg) = stream_data { - message_tx.send(msg).expect("failed to send message"); - } - } - error!("frag stream closed, reconnecting..."); - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); -} - -#[derive(Clone)] -struct Server { - unsealed_stack: Arc>, - provider: OpRootProvider, - tx_receiver_provider: OpRootProvider, -} - -impl Server { - fn new(provider: OpRootProvider, tx_receiver_provider: OpRootProvider) -> Self { - Self { unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), provider, tx_receiver_provider } - } - - fn on_env(&self, env: EnvV0) { - let mut unsealed_block = self.unsealed_stack.upgradable_read(); - if unsealed_block.blocks.is_empty() || unsealed_block.blocks.back().unwrap().env.number + 1 == env.number { - unsealed_block.with_upgraded(|blocks| { - blocks.blocks.push_back(UnsealedBlock { - env, - current_frag: None, - transaction_count_diff: HashMap::new(), - receipts: HashMap::new(), - balances: HashMap::new(), - seal: None, - }); - }); - } else { - error!("expected block number"); - } - } - - fn on_header(&self, header: Header) { - let mut unsealed_block = self.unsealed_stack.upgradable_read(); - while !unsealed_block.blocks.is_empty() && unsealed_block.blocks.front().unwrap().env.number <= header.number { - unsealed_block.with_upgraded(|blocks| { - blocks.blocks.pop_front(); - blocks.root_provider_block_number = Some(header.number); - }); - } - } - - fn on_seal(&self, seal: SealV0) { - let mut unsealed_block = self.unsealed_stack.upgradable_read(); - if unsealed_block.blocks.is_empty() { - // error!("trying to seal a block but there is no unsealed block"); - return; - } - let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty - if last_block.env.number != seal.block_number { - error!( - "trying to seal block number {} but the last unsealed block is number {}", - seal.block_number, last_block.env.number - ); - return; - } - unsealed_block.with_upgraded(|blocks| { - blocks.blocks.back_mut().unwrap().apply_seal(seal); - }); - } - - fn on_frag(&self, frag: FragV0, state_update: Option) { - let mut unsealed_block = self.unsealed_stack.upgradable_read(); - if unsealed_block.blocks.is_empty() { - return; - } - let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty - if last_block.env.number != frag.block_number { - error!( - "trying to apply frag for block number {} but the last unsealed block is number {}", - frag.block_number, last_block.env.number - ); - return; - } - unsealed_block.with_upgraded(|blocks| { - blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); - }); - } - - async fn get_transaction_count(&self, address: Address) -> Result { - let stack = self.unsealed_stack.read(); - let transaction_count_diff = stack.get_transaction_count_diff(address); - let root_provider_block_number = stack.root_provider_block_number; - if let Some(root_provider_block_number) = root_provider_block_number { - let transaction_count = self - .provider - .get_transaction_count(address) - .block_id(BlockId::number(root_provider_block_number)) - .await?; - return Ok(transaction_count_diff + transaction_count); - } - let transaction_count = self.provider.get_transaction_count(address).await?; - Ok(transaction_count_diff + transaction_count) - } - - async fn get_balance(&self, address: Address) -> Result { - let stack = self.unsealed_stack.read(); - let balance = stack.get_balance(address); - if let Some(balance) = balance { - return Ok(balance); - } - let balance = self.provider.get_balance(address).await?; - Ok(balance) - } - - async fn get_receipt(&self, hash: B256) -> Result> { - let stack = self.unsealed_stack.read(); - if let Some(receipt) = stack.get_receipt(hash) { - return Ok(Some(receipt)); - } - let receipt = self.tx_receiver_provider.get_transaction_receipt(hash).await?; - Ok(receipt) - } - - async fn block_number(&self) -> Result { - let stack = self.unsealed_stack.read(); - if let Some(block_number) = stack.block_number() { - return Ok(block_number); - } else { - return Ok(self.provider.get_block_number().await?); - } - } -} - -pub fn spawn_block_listener(provider: OpRootProvider, block_tx: Sender
) { - tokio::spawn(async move { - loop { - info!("Attempting to subscribe to L1 block headers..."); - let sub_result = provider.subscribe_blocks().await; - - let mut block_stream = match sub_result { - Ok(sub) => { - info!("Successfully subscribed to L1 block headers."); - sub.into_stream() - } - Err(e) => { - error!(error = %e, "Failed to subscribe to L1 blocks, retrying in 5s"); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - while let Some(header) = block_stream.next().await { - block_tx.send(header).expect("failed to send block header"); - } - warn!("header stream ended. Attempting to resubscribe..."); - panic!("WS connection dropped. Restart the process."); // TODO: handle reconnection - // properly - } - }); -} +mod server; +mod types; +mod unsealed_block; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> eyre::Result<()> { @@ -342,7 +56,7 @@ async fn main() -> eyre::Result<()> { let provider = provider_with_filler.root(); - spawn_receipt_listener_frag_stream(&args.frag_url.as_str(), message_tx); + spawn_receipt_listener_frag_stream(args.frag_url.as_str(), message_tx); spawn_block_listener(provider.clone(), block_tx); let tx_receiver_provider = match args.tx_receiver_url { @@ -439,10 +153,9 @@ async fn main() -> eyre::Result<()> { .build(addr) .await?; - let mut module = EthApiServer::into_rpc(server_obj); + let module = EthApiServer::into_rpc(server_obj); let server_handle = rpc_server.start(module); - tokio::select! { _ = server_handle.stopped() => { error!("server stopped"); @@ -455,114 +168,3 @@ async fn main() -> eyre::Result<()> { Ok(()) } - -#[async_trait] -impl EthApiServer for Server { - async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { - let hash = self.tx_receiver_provider.send_raw_transaction(&bytes).await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to send transaction", - Some(e.to_string()), - ) - })?; - Ok(*hash.tx_hash()) - } - - async fn transaction_receipt(&self, hash: B256) -> RpcResult> { - let receipt = self.get_receipt(hash).await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get transaction receipt", - Some(e.to_string()), - ) - })?; - Ok(receipt) - } - - async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult> { - todo!() - } - - async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult> { - todo!() - } - - async fn block_number(&self) -> RpcResult { - let block_number = self.block_number().await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get block number", - Some(e.to_string()), - ) - })?; - Ok(U256::from(block_number)) - } - - async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult { - match block_number { - Some(BlockId::Number(BlockNumberOrTag::Pending)) => { - let block_number = self.block_number().await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get block number", - Some(e.to_string()), - ) - })?; - Ok(U256::from(block_number)) - } - _ => { - let transaction_count = self - .provider - .get_transaction_count(address) - .block_id(block_number.unwrap_or_default()) - .await - .map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get transaction count", - Some(e.to_string()), - ) - })?; - Ok(U256::from(transaction_count)) - } - } - } - - async fn balance(&self, address: Address, block_number: Option) -> RpcResult { - match block_number { - Some(BlockId::Number(BlockNumberOrTag::Pending)) => { - let block_number = self.block_number().await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get block number", - Some(e.to_string()), - ) - })?; - Ok(U256::from(block_number)) - } - _ => { - let balance = - self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err( - |e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get balance", - Some(e.to_string()), - ) - }, - )?; - Ok(U256::from(balance)) - } - } - } -} - -pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { - let client = HttpClientBuilder::default() - .max_request_size(u32::MAX) - .max_response_size(u32::MAX) - .request_timeout(timeout.into()) - .build(url)?; - Ok(client) -} \ No newline at end of file diff --git a/based/bin/rpc/src/middleware.rs b/based/bin/rpc/src/middleware.rs index 0607cbde8..e0505d12d 100644 --- a/based/bin/rpc/src/middleware.rs +++ b/based/bin/rpc/src/middleware.rs @@ -1,8 +1,3 @@ -use bop_common::{ - communication::Producer, - metrics::{Counter, Metric, MetricsUpdate}, - utils::uuid, -}; use futures::FutureExt; use jsonrpsee::{ MethodResponse, @@ -29,8 +24,8 @@ pub struct EthApiProxy { const SUPPORTED_METHODS: &[&str] = &[ "eth_sendRawTransaction", "eth_getTransactionReceipt", - "eth_getBlockByNumber", - "eth_getBlockByHash", + // "eth_getBlockByNumber", + // "eth_getBlockByHash", "eth_blockNumber", "eth_getTransactionCount", "eth_getBalance", diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs new file mode 100644 index 000000000..3390659f3 --- /dev/null +++ b/based/bin/rpc/src/server.rs @@ -0,0 +1,258 @@ +use std::{sync::Arc, time::Duration}; + +use alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_primitives::{ + Address, B256, Bytes, U256, + map::foldhash::{HashMap, HashMapExt}, +}; +use alloy_provider::Provider; +use alloy_rpc_types::Header; +use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; +use eyre::Result; +use jsonrpsee::{ + core::{RpcResult, async_trait}, + http_client::HttpClientBuilder, + types::ErrorObject, +}; +use op_alloy_rpc_types::OpTransactionReceipt; +use parking_lot::RwLock; +use reqwest::Url; +use tracing::error; + +use crate::{ + middleware::RpcClient, + types::{EthApiServer, OpRootProvider}, + unsealed_block::{UnsealedBlock, UnsealedBlockStack}, +}; + +#[derive(Clone)] +pub struct Server { + unsealed_stack: Arc>, + provider: OpRootProvider, + tx_receiver_provider: OpRootProvider, +} + +impl Server { + pub fn new(provider: OpRootProvider, tx_receiver_provider: OpRootProvider) -> Self { + Self { unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), provider, tx_receiver_provider } + } + + pub fn on_env(&self, env: EnvV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() || unsealed_block.blocks.back().unwrap().env.number + 1 == env.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.push_back(UnsealedBlock { + env, + current_frag: None, + transaction_count_diff: HashMap::new(), + receipts: HashMap::new(), + balances: HashMap::new(), + seal: None, + }); + }); + } else { + error!("expected block number"); + } + } + + pub fn on_header(&self, header: Header) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + while !unsealed_block.blocks.is_empty() && unsealed_block.blocks.front().unwrap().env.number <= header.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.pop_front(); + blocks.root_provider_block_number = Some(header.number); + }); + } + } + + pub fn on_seal(&self, seal: SealV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + // error!("trying to seal a block but there is no unsealed block"); + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != seal.block_number { + error!( + "trying to seal block number {} but the last unsealed block is number {}", + seal.block_number, last_block.env.number + ); + return; + } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_seal(seal); + }); + } + + pub fn on_frag(&self, frag: FragV0, state_update: Option) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != frag.block_number { + error!( + "trying to apply frag for block number {} but the last unsealed block is number {}", + frag.block_number, last_block.env.number + ); + return; + } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); + }); + } + + pub async fn get_transaction_count(&self, address: Address) -> Result { + let (transaction_count_diff, root_provider_block_number) = { + let stack = self.unsealed_stack.read(); + (stack.get_transaction_count_diff(address), stack.root_provider_block_number) + }; + + if let Some(root_provider_block_number) = root_provider_block_number { + let transaction_count = self + .provider + .get_transaction_count(address) + .block_id(BlockId::number(root_provider_block_number)) + .await?; + return Ok(transaction_count_diff + transaction_count); + } + let transaction_count = self.provider.get_transaction_count(address).await?; + Ok(transaction_count_diff + transaction_count) + } + + pub async fn get_balance(&self, address: Address) -> Result { + let balance = self.unsealed_stack.read().get_balance(address); + if let Some(balance) = balance { + return Ok(balance); + } + let balance = self.provider.get_balance(address).await?; + Ok(balance) + } + + pub async fn get_receipt(&self, hash: B256) -> Result> { + if let Some(receipt) = self.unsealed_stack.read().get_receipt(hash) { + return Ok(Some(receipt)); + } + let receipt = self.tx_receiver_provider.get_transaction_receipt(hash).await?; + Ok(receipt) + } + + pub async fn block_number(&self) -> Result { + if let Some(block_number) = self.unsealed_stack.read().block_number() { + Ok(block_number) + } else { + Ok(self.provider.get_block_number().await?) + } + } +} + +#[async_trait] +impl EthApiServer for Server { + async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { + let hash = self.tx_receiver_provider.send_raw_transaction(&bytes).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to send transaction", + Some(e.to_string()), + ) + })?; + Ok(*hash.tx_hash()) + } + + async fn transaction_receipt(&self, hash: B256) -> RpcResult> { + let receipt = self.get_receipt(hash).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction receipt", + Some(e.to_string()), + ) + })?; + Ok(receipt) + } + + // async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult> { + // todo!() + // } + + // async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult> { + // todo!() + // } + + async fn block_number(&self) -> RpcResult { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + + async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult { + match block_number { + Some(BlockId::Number(BlockNumberOrTag::Pending)) => { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + _ => { + let transaction_count = self + .provider + .get_transaction_count(address) + .block_id(block_number.unwrap_or_default()) + .await + .map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction count", + Some(e.to_string()), + ) + })?; + Ok(U256::from(transaction_count)) + } + } + } + + async fn balance(&self, address: Address, block_number: Option) -> RpcResult { + match block_number { + Some(BlockId::Number(BlockNumberOrTag::Pending)) => { + let block_number = self.block_number().await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + ) + })?; + Ok(U256::from(block_number)) + } + _ => { + let balance = + self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err( + |e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get balance", + Some(e.to_string()), + ) + }, + )?; + Ok(U256::from(balance)) + } + } + } +} + +pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { + let client = HttpClientBuilder::default() + .max_request_size(u32::MAX) + .max_response_size(u32::MAX) + .request_timeout(timeout) + .build(url)?; + Ok(client) +} diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index df550af6a..25600898b 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -1,9 +1,11 @@ -use alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_eips::BlockId; use alloy_primitives::{Address, B256, Bytes, U256}; -use bop_common::api::OpRpcBlock; +use alloy_provider::RootProvider; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionReceipt; +pub type OpRootProvider = RootProvider; // taken from https://github.com/gattaca-com/based-op/blob/397d48b73d088f40721ae0ba002d251dcf6f38cc/based/crates/common/src/api.rs#L91-L128 #[rpc(client, server, namespace = "eth")] @@ -16,13 +18,13 @@ pub trait EthApi { #[method(name = "getTransactionReceipt")] async fn transaction_receipt(&self, hash: B256) -> RpcResult>; - /// Returns a block with a given identifier - #[method(name = "getBlockByNumber")] - async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult>; + // /// Returns a block with a given identifier + // #[method(name = "getBlockByNumber")] + // async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult>; - /// Returns information about a block by hash. - #[method(name = "getBlockByHash")] - async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult>; + // /// Returns information about a block by hash. + // #[method(name = "getBlockByHash")] + // async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult>; /// Returns the number of most recent block #[method(name = "blockNumber")] diff --git a/based/bin/rpc/src/unsealed_block.rs b/based/bin/rpc/src/unsealed_block.rs new file mode 100644 index 000000000..d3a7bbe5b --- /dev/null +++ b/based/bin/rpc/src/unsealed_block.rs @@ -0,0 +1,112 @@ +use std::collections::VecDeque; + +use alloy_network::ReceiptResponse; +use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; +use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; +use op_alloy_rpc_types::OpTransactionReceipt; +use tracing::error; + +pub struct UnsealedBlock { + pub env: EnvV0, + pub current_frag: Option, + pub transaction_count_diff: HashMap, + pub receipts: HashMap, + pub balances: HashMap, + pub seal: Option, +} + +impl UnsealedBlock { + pub fn apply_frag(&mut self, frag: FragV0, state_update: Option) { + if self.current_frag.is_none() { + if frag.seq != 0 { + error!("expected first frag to have seq 0 but got seq {}", frag.seq); + return; + } + } else { + let current_frag = self.current_frag.as_ref().unwrap(); + let expected_seq = current_frag.seq + 1; + if expected_seq != frag.seq { + error!("expected frag seq {} but got seq {}", expected_seq, frag.seq); + return; + } + } + if self.seal.is_some() { + error!("trying to apply frag after seal"); + return; + } + + self.current_frag = Some(frag); + + if let Some(state_update) = state_update { + for (_tx_hash, receipt) in state_update.receipts.iter() { + let sender = receipt.from(); + self.transaction_count_diff.entry(sender).and_modify(|count| *count += 1).or_insert(1); + } + self.receipts.extend(state_update.receipts); + self.balances.extend(state_update.balances); + } + } + + pub fn apply_seal(&mut self, seal: SealV0) { + self.seal = Some(seal); + } + + pub fn get_transaction_count_diff(&self, address: Address) -> Option { + self.transaction_count_diff.get(&address).cloned() + } + + pub fn get_receipt(&self, tx_hash: B256) -> Option { + self.receipts.get(&tx_hash).cloned() + } + + pub fn get_balance(&self, address: Address) -> Option { + self.balances.get(&address).cloned() + } +} + +pub struct UnsealedBlockStack { + pub blocks: VecDeque, + pub root_provider_block_number: Option, +} + +impl UnsealedBlockStack { + pub fn new() -> Self { + Self { blocks: VecDeque::new(), root_provider_block_number: None } + } + + pub fn get_transaction_count_diff(&self, address: Address) -> u64 { + let mut total_diff = 0; + for block in self.blocks.iter().rev() { + total_diff += block.get_transaction_count_diff(address).unwrap_or(0); + } + total_diff + } + + pub fn get_receipt(&self, tx_hash: B256) -> Option { + for block in self.blocks.iter().rev() { + if let Some(receipt) = block.get_receipt(tx_hash) { + return Some(receipt); + } + } + None + } + + pub fn get_balance(&self, address: Address) -> Option { + for block in self.blocks.iter().rev() { + if let Some(balance) = block.get_balance(address) { + return Some(balance); + } + } + None + } + + pub fn block_number(&self) -> Option { + if let Some(block) = self.blocks.back() { + return Some(block.env.number); + } + if let Some(root_provider_block_number) = self.root_provider_block_number { + return Some(root_provider_block_number); + } + None + } +} From 06597ce625dfbbc13778616c4e488779b245642a Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:18:22 +0000 Subject: [PATCH 04/14] eth_call works --- based/bin/rpc/src/main.rs | 2 +- based/bin/rpc/src/middleware.rs | 1 + based/bin/rpc/src/server.rs | 107 ++++++++++++++---- based/bin/rpc/src/types.rs | 12 +- based/bin/rpc/src/unsealed_block.rs | 29 ++++- based/crates/common/src/db/frag.rs | 2 +- based/crates/common/src/p2p.rs | 10 ++ .../sequencer/src/sorting/frag_sequence.rs | 25 ++-- fetching_balance.py | 91 ++++++++++++--- 9 files changed, 227 insertions(+), 52 deletions(-) diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs index ba8ac16b4..df9a6c44c 100644 --- a/based/bin/rpc/src/main.rs +++ b/based/bin/rpc/src/main.rs @@ -126,7 +126,7 @@ async fn main() -> eyre::Result<()> { let server = server_obj.clone(); tokio::spawn(async move { - let address_to_check = Address::from_str("0x4D36DE6a194dDF98EE57323CfA3A45351d35e442").unwrap(); + let address_to_check = Address::from_str("0x0E2d15588e765f0ba315313C726041EA124e36CB").unwrap(); let mut interval = interval(Duration::from_secs_f64(0.1)); loop { let transaction_count = server.get_transaction_count(address_to_check).await.unwrap(); diff --git a/based/bin/rpc/src/middleware.rs b/based/bin/rpc/src/middleware.rs index e0505d12d..57a38e965 100644 --- a/based/bin/rpc/src/middleware.rs +++ b/based/bin/rpc/src/middleware.rs @@ -29,6 +29,7 @@ const SUPPORTED_METHODS: &[&str] = &[ "eth_blockNumber", "eth_getTransactionCount", "eth_getBalance", + "eth_call", ]; impl RpcServiceT for EthApiProxy diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 3390659f3..25ead8060 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -6,15 +6,18 @@ use alloy_primitives::{ map::foldhash::{HashMap, HashMapExt}, }; use alloy_provider::Provider; -use alloy_rpc_types::Header; -use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; +use alloy_rpc_types::{ + BlockOverrides, Header, + state::{AccountOverride, StateOverride}, +}; +use bop_common::p2p::{DetailedStateChange, EnvV0, FragV0, SealV0, StateUpdate}; use eyre::Result; use jsonrpsee::{ core::{RpcResult, async_trait}, http_client::HttpClientBuilder, types::ErrorObject, }; -use op_alloy_rpc_types::OpTransactionReceipt; +use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; use parking_lot::RwLock; use reqwest::Url; use tracing::error; @@ -47,6 +50,7 @@ impl Server { transaction_count_diff: HashMap::new(), receipts: HashMap::new(), balances: HashMap::new(), + state_changes: HashMap::new(), seal: None, }); }); @@ -144,6 +148,10 @@ impl Server { Ok(self.provider.get_block_number().await?) } } + + pub fn get_state_changes(&self) -> HashMap { + self.unsealed_stack.read().get_state_changes() + } } #[async_trait] @@ -220,31 +228,80 @@ impl EthApiServer for Server { } async fn balance(&self, address: Address, block_number: Option) -> RpcResult { - match block_number { - Some(BlockId::Number(BlockNumberOrTag::Pending)) => { - let block_number = self.block_number().await.map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get block number", - Some(e.to_string()), - ) - })?; - Ok(U256::from(block_number)) + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if let Ok(balance) = self.get_balance(address).await { + return Ok(U256::from(balance)); } - _ => { - let balance = - self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err( - |e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get balance", - Some(e.to_string()), - ) - }, - )?; - Ok(U256::from(balance)) + } + + let balance = + self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get balance", + Some(e.to_string()), + ) + })?; + Ok(U256::from(balance)) + } + + async fn call( + &self, + transaction: OpTransactionRequest, + block_number: Option, + state_overrides: Option, + block_overrides: Option>, + ) -> RpcResult { + if block_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Block overrides are not supported", + Some("Block overrides are not supported".to_string()), + )) + } + + if state_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "State overrides are not supported", + Some("State overrides are not supported".to_string()), + )) + } + + let mut state_overrides_full = StateOverride::default(); + + let state_overrides_unsealed_block = self.get_state_changes(); + for (address, state_change) in state_overrides_unsealed_block.iter() { + let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); + account.balance = Some(state_change.balance); + account.nonce = Some(state_change.nonce); + if !state_change.storage.is_empty() { + if account.state_diff.is_none() { + account.state_diff = Some(Default::default()); + } + let account_storage = account.state_diff.as_mut().unwrap(); + for (slot, value) in state_change.storage.iter() { + account_storage.insert((*slot).into(), (*value).into()); + } } } + + // info!("overrides: {:?}", state_overrides_full); + + let result = self + .provider + .call(transaction) + .block(block_number.unwrap_or_default()) + .overrides(state_overrides_full) + .await; + match result { + Ok(result) => Ok(result), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to call", + Some(e.to_string()), + )), + } } } diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index 25600898b..acdfafd8d 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -1,9 +1,10 @@ use alloy_eips::BlockId; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_provider::RootProvider; +use alloy_rpc_types::{BlockOverrides, state::StateOverride}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use op_alloy_network::Optimism; -use op_alloy_rpc_types::OpTransactionReceipt; +use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; pub type OpRootProvider = RootProvider; @@ -37,4 +38,13 @@ pub trait EthApi { /// Returns the balance of the account of given address. #[method(name = "getBalance")] async fn balance(&self, address: Address, block_number: Option) -> RpcResult; + + #[method(name = "call")] + async fn call( + &self, + transaction: OpTransactionRequest, + block_number: Option, + state_overrides: Option, + block_overrides: Option>, + ) -> RpcResult; } diff --git a/based/bin/rpc/src/unsealed_block.rs b/based/bin/rpc/src/unsealed_block.rs index d3a7bbe5b..1b3f111e7 100644 --- a/based/bin/rpc/src/unsealed_block.rs +++ b/based/bin/rpc/src/unsealed_block.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use alloy_network::ReceiptResponse; use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; -use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; +use bop_common::p2p::{DetailedStateChange, EnvV0, FragV0, SealV0, StateUpdate}; use op_alloy_rpc_types::OpTransactionReceipt; use tracing::error; @@ -12,6 +12,7 @@ pub struct UnsealedBlock { pub transaction_count_diff: HashMap, pub receipts: HashMap, pub balances: HashMap, + pub state_changes: HashMap, pub seal: Option, } @@ -44,6 +45,15 @@ impl UnsealedBlock { } self.receipts.extend(state_update.receipts); self.balances.extend(state_update.balances); + + if let Some(state_changes) = state_update.state_changes { + for (address, state_change) in state_changes.iter() { + let account = self.state_changes.entry(*address).or_default(); + account.balance = state_change.balance; + account.nonce = state_change.nonce; + account.storage.extend(state_change.storage.iter()); + } + } } } @@ -62,6 +72,10 @@ impl UnsealedBlock { pub fn get_balance(&self, address: Address) -> Option { self.balances.get(&address).cloned() } + + pub fn get_state_changes(&self) -> HashMap { + self.state_changes.clone() + } } pub struct UnsealedBlockStack { @@ -109,4 +123,17 @@ impl UnsealedBlockStack { } None } + + pub fn get_state_changes(&self) -> HashMap { + let mut state_changes = HashMap::default(); + for block in self.blocks.iter() { + for (address, state_change) in block.get_state_changes().iter() { + let account = state_changes.entry(*address).or_insert_with(DetailedStateChange::default); + account.balance = state_change.balance; + account.nonce = state_change.nonce; + account.storage.extend(state_change.storage.iter()); + } + } + state_changes + } } diff --git a/based/crates/common/src/db/frag.rs b/based/crates/common/src/db/frag.rs index 738cbec9d..e0b122927 100644 --- a/based/crates/common/src/db/frag.rs +++ b/based/crates/common/src/db/frag.rs @@ -51,7 +51,7 @@ impl DBFrag { let mut guard = self.db.write(); for t in txs { - let evm_state = std::mem::take(&mut t.result_and_state.state); + let evm_state = t.result_and_state.state.clone(); for a in evm_state.keys() { let _ = guard.load_cache_account(*a); } diff --git a/based/crates/common/src/p2p.rs b/based/crates/common/src/p2p.rs index affc34da4..a4e4a7a74 100644 --- a/based/crates/common/src/p2p.rs +++ b/based/crates/common/src/p2p.rs @@ -159,6 +159,16 @@ pub struct StateUpdate { pub receipts: HashMap, /// Updated balances for all accounts in txs in FragV0 pub balances: HashMap, + /// Includes storage + #[serde(skip_serializing_if = "Option::is_none")] + pub state_changes: Option>, +} + +#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct DetailedStateChange { + pub balance: U256, + pub nonce: u64, + pub storage: HashMap, } impl SignedVersionedMessage { diff --git a/based/crates/sequencer/src/sorting/frag_sequence.rs b/based/crates/sequencer/src/sorting/frag_sequence.rs index 5c184c4ae..aa12d8dc6 100644 --- a/based/crates/sequencer/src/sorting/frag_sequence.rs +++ b/based/crates/sequencer/src/sorting/frag_sequence.rs @@ -2,7 +2,7 @@ use alloy_consensus::proofs::ordered_trie_root_with_encoder; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{Bloom, U256}; use bop_common::{ - p2p::{FragV0, StateUpdate, Transaction, Transactions}, + p2p::{DetailedStateChange, FragV0, StateUpdate, Transaction, Transactions}, telemetry::TelemetryUpdate, time::Instant, transaction::SimulatedTx, @@ -12,7 +12,7 @@ use revm_primitives::{ B256, Bytes, map::foldhash::{HashMap, HashMapExt}, }; -use tracing::debug; +use tracing::{debug, info}; use super::{SortingData, sorting_data::SortingTelemetry}; use crate::context::SequencerContext; @@ -82,6 +82,7 @@ impl FragSequence { let mut txs = Vec::with_capacity(in_sort.txs.len()); let mut receipts = HashMap::with_capacity(in_sort.txs.len()); let mut balances = HashMap::with_capacity(in_sort.txs.len()); + let mut state_changes = HashMap::default(); let mut in_sort_da_used = 0; let in_sort_txs = in_sort.txs.len(); @@ -100,10 +101,18 @@ impl FragSequence { self.txs.len() as u64, ), ); - let address = tx.sender(); - let balance = - in_sort.db.basic_ref(address).map(|a| a.map(|a| a.balance).unwrap_or_default()).unwrap_or(U256::ZERO); - balances.insert(address, balance); + + for (address, account) in tx.result_and_state.state.iter() { + let state_change: &mut DetailedStateChange = state_changes.entry(*address).or_default(); + state_change.balance = account.info.balance; + state_change.nonce = account.info.nonce; + for (slot, value) in account.storage.iter() { + let slot: U256 = *slot; + let value: U256 = value.present_value(); + state_change.storage.insert(slot, value); + } + balances.insert(*address, account.info.balance); + } self.txs.push(tx); } @@ -120,7 +129,7 @@ impl FragSequence { txs: Transactions::from(txs), blob_gas_used: 0, }; - let state_update = StateUpdate { receipts, balances }; + let state_update = StateUpdate { receipts, balances, state_changes: Some(state_changes) }; TelemetryUpdate::send( uuid, @@ -296,7 +305,7 @@ mod tests { let (_frag, _, _sorting_db) = ctx.seal_frag(sorting_db, &mut seq); // Seal the block - let (_seal, payload) = ctx.seal_block(seq); + let (_seal, payload) = ctx.seal_block(seq, None); assert_eq!(block.hash_slow(), payload.execution_payload.payload_inner.payload_inner.payload_inner.block_hash); } } diff --git a/fetching_balance.py b/fetching_balance.py index ab6207949..500882844 100644 --- a/fetching_balance.py +++ b/fetching_balance.py @@ -1,29 +1,90 @@ import requests import time -previous_balance = 0 -previous_at = 0 -while True: +previous_balance = None +previous_at = time.time() + +url = "http://localhost:7545" + +token_address = "0x71a49e7ff0865d2de258e720782951879645df1b" +token_holder_address = "0x47Bae705382e91664F369d79aD8EcB8fDF23D355" +address = "0x0E2d15588e765f0ba315313C726041EA124e36CB" + +def fetch_erc20_balance(address: str, token_address: str) -> float: + # Remove '0x' prefix and pad address to 64 hex characters (32 bytes) + address_clean = address.lower().replace('0x', '') + address_padded = address_clean.zfill(64) + + # balanceOf(address) function selector is 0x70a08231 + # Format: function_selector (4 bytes) + address (32 bytes) + data = f"0x70a08231{address_padded}" + body = { "jsonrpc": "2.0", "id": 1, - "method": "eth_getBalance", + "method": "eth_call", "params": [ - "0x4D36DE6a194dDF98EE57323CfA3A45351d35e442", - "latest" + {"to": token_address, "data": data}, + "pending" ], } + + response = requests.post(url, json=body, timeout=5) + response.raise_for_status() + + result = response.json() + + # Check for RPC errors + if 'error' in result: + raise Exception(f"RPC error: {result['error']}") + + result_hex = result.get('result', '0x0') + if not result_hex or result_hex == '0x': + return 0.0 + + balance = int(result_hex, 16) / 1e18 + return balance + +def fetch_balance(address: str) -> float: + body = { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getBalance", + "params": [address, "pending"], + } + response = requests.post(url, json=body, timeout=5) + response.raise_for_status() + + result = response.json() + + # Check for RPC errors + if 'error' in result: + raise Exception(f"RPC error: {result['error']}") + + result_hex = result.get('result', '0x0') + if not result_hex or result_hex == '0x': + return 0.0 + + balance = int(result_hex, 16) / 1e18 + return balance + +while True: try: - response = requests.post("http://localhost:8645", json=body) - balance = int(response.json().get('result', '0x0'), 16) / 1e18 + # balance = fetch_balance(address) + balance = fetch_erc20_balance(token_holder_address, token_address) + + + if previous_balance is None or balance != previous_balance: + elapsed = time.time() - previous_at if previous_balance is not None else 0 + previous_at = time.time() + if previous_balance is None: + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Initial balance: {balance:.10f} ETH") + else: + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Balance changed: {previous_balance:.10f} ETH -> {balance:.10f} ETH (elapsed: {elapsed*1000:.1f}ms)") + previous_balance = balance except Exception as e: - balance = 0 print(f"Error fetching balance: {e}") - - if balance != previous_balance: - elapsed = time.time() - previous_at - previous_at = time.time() - print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Balance changed: {previous_balance:.10f} ETH -> {balance:.10f} ETH (elapsed: {elapsed*1000:.1f}ms)") - previous_balance = balance + time.sleep(1) # Wait longer on error + continue time.sleep(1/60) \ No newline at end of file From a09e6d14fb4f39469e0c2a7eddf658782f7442fa Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:25:05 +0000 Subject: [PATCH 05/14] pending --- based/bin/rpc/src/server.rs | 104 +++++++++++++++++++++--------------- based/bin/rpc/src/types.rs | 2 +- 2 files changed, 61 insertions(+), 45 deletions(-) diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 25ead8060..00b4e06d7 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -250,57 +250,73 @@ impl EthApiServer for Server { transaction: OpTransactionRequest, block_number: Option, state_overrides: Option, - block_overrides: Option>, + block_overrides: Option, ) -> RpcResult { - if block_overrides.is_some() { - return Err(ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Block overrides are not supported", - Some("Block overrides are not supported".to_string()), - )) - } + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if block_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Block overrides are not supported", + Some("Block overrides are not supported".to_string()), + )); + } - if state_overrides.is_some() { - return Err(ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "State overrides are not supported", - Some("State overrides are not supported".to_string()), - )) - } + if state_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "State overrides are not supported", + Some("State overrides are not supported".to_string()), + )); + } - let mut state_overrides_full = StateOverride::default(); + let mut state_overrides_full = StateOverride::default(); - let state_overrides_unsealed_block = self.get_state_changes(); - for (address, state_change) in state_overrides_unsealed_block.iter() { - let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); - account.balance = Some(state_change.balance); - account.nonce = Some(state_change.nonce); - if !state_change.storage.is_empty() { - if account.state_diff.is_none() { - account.state_diff = Some(Default::default()); - } - let account_storage = account.state_diff.as_mut().unwrap(); - for (slot, value) in state_change.storage.iter() { - account_storage.insert((*slot).into(), (*value).into()); + let state_overrides_unsealed_block = self.get_state_changes(); + for (address, state_change) in state_overrides_unsealed_block.iter() { + let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); + account.balance = Some(state_change.balance); + account.nonce = Some(state_change.nonce); + if !state_change.storage.is_empty() { + if account.state_diff.is_none() { + account.state_diff = Some(Default::default()); + } + let account_storage = account.state_diff.as_mut().unwrap(); + for (slot, value) in state_change.storage.iter() { + account_storage.insert((*slot).into(), (*value).into()); + } } } - } - - // info!("overrides: {:?}", state_overrides_full); - let result = self - .provider - .call(transaction) - .block(block_number.unwrap_or_default()) - .overrides(state_overrides_full) - .await; - match result { - Ok(result) => Ok(result), - Err(e) => Err(ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to call", - Some(e.to_string()), - )), + let result = self + .provider + .call(transaction) + .block(block_number.unwrap_or_default()) + .overrides(state_overrides_full) + .await; + match result { + Ok(result) => Ok(result), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to call", + Some(e.to_string()), + )), + } + } else { + let request = self + .provider + .call(transaction) + .block(block_number.unwrap_or_default()) + .overrides_opt(state_overrides) + .with_block_overrides_opt(block_overrides); + let result = request.await; + match result { + Ok(result) => Ok(result), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to call", + Some(e.to_string()), + )), + } } } } diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index acdfafd8d..ae8a50b55 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -45,6 +45,6 @@ pub trait EthApi { transaction: OpTransactionRequest, block_number: Option, state_overrides: Option, - block_overrides: Option>, + block_overrides: Option, ) -> RpcResult; } From de4cd1c987b85777c09865b3de658b7e6ffe0e13 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:27:30 +0000 Subject: [PATCH 06/14] remove comment --- based/crates/rpc/src/gossiper.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/based/crates/rpc/src/gossiper.rs b/based/crates/rpc/src/gossiper.rs index 79dffffb6..38912d078 100644 --- a/based/crates/rpc/src/gossiper.rs +++ b/based/crates/rpc/src/gossiper.rs @@ -37,9 +37,6 @@ impl Gossiper { let signed = msg.to_signed(&self.signer); let payload = signed.to_json(); - // if matches!(signed.message, p2p::VersionedMessage::FragV0(_)) && self.frag_broadcast.send(signed).is_err() { - // tracing::debug!("broadcast of frag failed") - // } if self.frag_broadcast.send(signed).is_err() { tracing::debug!("broadcast of message failed") } From 595ae418057e0e235626b6f09d317f04a4e3e96a Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:28:24 +0000 Subject: [PATCH 07/14] make file public node --- Makefile | 2 -- 1 file changed, 2 deletions(-) diff --git a/Makefile b/Makefile index f0138b018..a3c9b922f 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,6 @@ L2_CHAIN_ID?=$(shell \ L2_CHAIN_ID_HEX:=$(shell printf "0x%064x" $(L2_CHAIN_ID)) PORTAL?=http://0.0.0.0:8080 TXPROXY?=http://0.0.0.0:8090 -# L1_RPC_URL?=http://3.84.162.42:8545 -# L1_BEACON_RPC_URL?=http://3.84.162.42:5051 L1_RPC_URL?=https://ethereum-sepolia-rpc.publicnode.com L1_BEACON_RPC_URL?=https://ethereum-sepolia-beacon-api.publicnode.com PUBLIC_IP?=$(shell curl ifconfig.me) From 5e54db82f6557da99b9fd736199a93d555c9594d Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:35:08 +0000 Subject: [PATCH 08/14] fmt clippy --- based/crates/sequencer/src/sorting/frag_sequence.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/based/crates/sequencer/src/sorting/frag_sequence.rs b/based/crates/sequencer/src/sorting/frag_sequence.rs index aa12d8dc6..2686cd43a 100644 --- a/based/crates/sequencer/src/sorting/frag_sequence.rs +++ b/based/crates/sequencer/src/sorting/frag_sequence.rs @@ -12,7 +12,7 @@ use revm_primitives::{ B256, Bytes, map::foldhash::{HashMap, HashMapExt}, }; -use tracing::{debug, info}; +use tracing::debug; use super::{SortingData, sorting_data::SortingTelemetry}; use crate::context::SequencerContext; From 4f1076c11fc142f53297e6f6c240962f9692401d Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:44:21 +0000 Subject: [PATCH 09/14] ensure override on the correct block --- based/bin/rpc/src/server.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 00b4e06d7..e2ce5e822 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -149,6 +149,10 @@ impl Server { } } + pub fn base_block_number(&self) -> Option { + self.unsealed_stack.read().root_provider_block_number + } + pub fn get_state_changes(&self) -> HashMap { self.unsealed_stack.read().get_state_changes() } @@ -271,6 +275,7 @@ impl EthApiServer for Server { let mut state_overrides_full = StateOverride::default(); + let base_block_number = self.base_block_number().unwrap_or_default(); let state_overrides_unsealed_block = self.get_state_changes(); for (address, state_change) in state_overrides_unsealed_block.iter() { let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); @@ -287,10 +292,11 @@ impl EthApiServer for Server { } } + let result = self .provider .call(transaction) - .block(block_number.unwrap_or_default()) + .block(BlockId::number(base_block_number)) .overrides(state_overrides_full) .await; match result { From b28149c6a0d78f9246ef73799ca96f40b8be14c4 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Thu, 27 Nov 2025 19:45:34 +0000 Subject: [PATCH 10/14] fmt --- based/bin/rpc/src/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index e2ce5e822..4ba808b58 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -292,7 +292,6 @@ impl EthApiServer for Server { } } - let result = self .provider .call(transaction) From 2914d4529ca16569835782cb55a4fba148aed0c7 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Fri, 28 Nov 2025 10:19:32 +0000 Subject: [PATCH 11/14] clean up --- based/bin/rpc/src/server.rs | 99 +++++++++++------------------ based/bin/rpc/src/unsealed_block.rs | 26 +++++++- 2 files changed, 61 insertions(+), 64 deletions(-) diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 4ba808b58..e0dd90907 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -6,11 +6,8 @@ use alloy_primitives::{ map::foldhash::{HashMap, HashMapExt}, }; use alloy_provider::Provider; -use alloy_rpc_types::{ - BlockOverrides, Header, - state::{AccountOverride, StateOverride}, -}; -use bop_common::p2p::{DetailedStateChange, EnvV0, FragV0, SealV0, StateUpdate}; +use alloy_rpc_types::{BlockOverrides, Header, state::StateOverride}; +use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; use eyre::Result; use jsonrpsee::{ core::{RpcResult, async_trait}, @@ -103,6 +100,7 @@ impl Server { } unsealed_block.with_upgraded(|blocks| { blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); + blocks.rebuild_overrides(); }); } @@ -153,33 +151,34 @@ impl Server { self.unsealed_stack.read().root_provider_block_number } - pub fn get_state_changes(&self) -> HashMap { - self.unsealed_stack.read().get_state_changes() + pub fn get_state_overrides(&self) -> StateOverride { + self.unsealed_stack.read().overrides.clone() } } #[async_trait] impl EthApiServer for Server { async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { - let hash = self.tx_receiver_provider.send_raw_transaction(&bytes).await.map_err(|e| { - ErrorObject::owned( + match self.tx_receiver_provider.send_raw_transaction(&bytes).await { + Ok(pending_tx) => Ok(*pending_tx.tx_hash()), + Err(e) => Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, "Failed to send transaction", Some(e.to_string()), - ) - })?; - Ok(*hash.tx_hash()) + )), + } } async fn transaction_receipt(&self, hash: B256) -> RpcResult> { - let receipt = self.get_receipt(hash).await.map_err(|e| { - ErrorObject::owned( + match self.get_receipt(hash).await { + Ok(Some(receipt)) => Ok(Some(receipt)), + Ok(None) => Ok(None), + Err(e) => Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, "Failed to get transaction receipt", Some(e.to_string()), - ) - })?; - Ok(receipt) + )), + } } // async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult> { @@ -191,44 +190,34 @@ impl EthApiServer for Server { // } async fn block_number(&self) -> RpcResult { - let block_number = self.block_number().await.map_err(|e| { - ErrorObject::owned( + match self.block_number().await { + Ok(block_number) => Ok(U256::from(block_number)), + Err(e) => Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, "Failed to get block number", Some(e.to_string()), - ) - })?; - Ok(U256::from(block_number)) + )), + } } async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult { - match block_number { - Some(BlockId::Number(BlockNumberOrTag::Pending)) => { - let block_number = self.block_number().await.map_err(|e| { + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if let Ok(transaction_count) = self.get_transaction_count(address).await { + return Ok(U256::from(transaction_count)); + } + } + + let transaction_count = + self.provider.get_transaction_count(address).block_id(block_number.unwrap_or_default()).await.map_err( + |e| { ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get block number", + "Failed to get transaction count", Some(e.to_string()), ) - })?; - Ok(U256::from(block_number)) - } - _ => { - let transaction_count = self - .provider - .get_transaction_count(address) - .block_id(block_number.unwrap_or_default()) - .await - .map_err(|e| { - ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - "Failed to get transaction count", - Some(e.to_string()), - ) - })?; - Ok(U256::from(transaction_count)) - } - } + }, + )?; + Ok(U256::from(transaction_count)) } async fn balance(&self, address: Address, block_number: Option) -> RpcResult { @@ -273,30 +262,14 @@ impl EthApiServer for Server { )); } - let mut state_overrides_full = StateOverride::default(); - let base_block_number = self.base_block_number().unwrap_or_default(); - let state_overrides_unsealed_block = self.get_state_changes(); - for (address, state_change) in state_overrides_unsealed_block.iter() { - let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); - account.balance = Some(state_change.balance); - account.nonce = Some(state_change.nonce); - if !state_change.storage.is_empty() { - if account.state_diff.is_none() { - account.state_diff = Some(Default::default()); - } - let account_storage = account.state_diff.as_mut().unwrap(); - for (slot, value) in state_change.storage.iter() { - account_storage.insert((*slot).into(), (*value).into()); - } - } - } + let state_overrides = self.get_state_overrides(); let result = self .provider .call(transaction) .block(BlockId::number(base_block_number)) - .overrides(state_overrides_full) + .overrides(state_overrides) .await; match result { Ok(result) => Ok(result), diff --git a/based/bin/rpc/src/unsealed_block.rs b/based/bin/rpc/src/unsealed_block.rs index 1b3f111e7..119b1911b 100644 --- a/based/bin/rpc/src/unsealed_block.rs +++ b/based/bin/rpc/src/unsealed_block.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use alloy_network::ReceiptResponse; use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; +use alloy_rpc_types::state::{AccountOverride, StateOverride}; use bop_common::p2p::{DetailedStateChange, EnvV0, FragV0, SealV0, StateUpdate}; use op_alloy_rpc_types::OpTransactionReceipt; use tracing::error; @@ -81,11 +82,12 @@ impl UnsealedBlock { pub struct UnsealedBlockStack { pub blocks: VecDeque, pub root_provider_block_number: Option, + pub overrides: StateOverride, } impl UnsealedBlockStack { pub fn new() -> Self { - Self { blocks: VecDeque::new(), root_provider_block_number: None } + Self { blocks: VecDeque::new(), root_provider_block_number: None, overrides: StateOverride::default() } } pub fn get_transaction_count_diff(&self, address: Address) -> u64 { @@ -136,4 +138,26 @@ impl UnsealedBlockStack { } state_changes } + + pub fn rebuild_overrides(&mut self) { + let mut state_overrides_full = StateOverride::default(); + + let state_overrides_unsealed_block = self.get_state_changes(); + for (address, state_change) in state_overrides_unsealed_block.iter() { + let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); + account.balance = Some(state_change.balance); + account.nonce = Some(state_change.nonce); + if !state_change.storage.is_empty() { + if account.state_diff.is_none() { + account.state_diff = Some(Default::default()); + } + let account_storage = account.state_diff.as_mut().unwrap(); + for (slot, value) in state_change.storage.iter() { + account_storage.insert((*slot).into(), (*value).into()); + } + } + } + + self.overrides = state_overrides_full; + } } From df46031749e2f4e7f98908212a5a98ff8e3e4e6c Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Fri, 28 Nov 2025 10:30:06 +0000 Subject: [PATCH 12/14] tight couple between base number and state_override --- based/bin/rpc/src/server.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index e0dd90907..7ba0d5f48 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -61,6 +61,7 @@ impl Server { while !unsealed_block.blocks.is_empty() && unsealed_block.blocks.front().unwrap().env.number <= header.number { unsealed_block.with_upgraded(|blocks| { blocks.blocks.pop_front(); + blocks.rebuild_overrides(); blocks.root_provider_block_number = Some(header.number); }); } @@ -147,12 +148,13 @@ impl Server { } } - pub fn base_block_number(&self) -> Option { - self.unsealed_stack.read().root_provider_block_number - } - - pub fn get_state_overrides(&self) -> StateOverride { - self.unsealed_stack.read().overrides.clone() + pub fn get_state_overrides(&self) -> (BlockNumberOrTag, StateOverride) { + let unsealed_block = self.unsealed_stack.read(); + if let Some(base_block_number) = unsealed_block.root_provider_block_number { + (BlockNumberOrTag::Number(base_block_number), unsealed_block.overrides.clone()) + } else { + (BlockNumberOrTag::Latest, Default::default()) + } } } @@ -262,13 +264,12 @@ impl EthApiServer for Server { )); } - let base_block_number = self.base_block_number().unwrap_or_default(); - let state_overrides = self.get_state_overrides(); + let (base_block_number, state_overrides) = self.get_state_overrides(); let result = self .provider .call(transaction) - .block(BlockId::number(base_block_number)) + .block(BlockId::Number(base_block_number)) .overrides(state_overrides) .await; match result { From fbdf16d26d144904fe4d90b289a8f678a19f128d Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Mon, 1 Dec 2025 11:20:00 +0000 Subject: [PATCH 13/14] tx send sync --- based/Cargo.lock | 7 +++ based/bin/rpc/Cargo.toml | 3 +- based/bin/rpc/src/main.rs | 24 ++++++--- based/bin/rpc/src/server.rs | 83 +++++++++++++++++++++++++++-- based/bin/rpc/src/types.rs | 4 ++ based/bin/rpc/src/unsealed_block.rs | 12 ++--- 6 files changed, 115 insertions(+), 18 deletions(-) diff --git a/based/Cargo.lock b/based/Cargo.lock index b2cbff036..8592bfa6b 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1623,6 +1623,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber 0.3.20", + "ttlhashmap", ] [[package]] @@ -11583,6 +11584,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "ttlhashmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ddd0e3ea228684ba88409bd2987421f96130063f68ae29c0ac322219be83f2" + [[package]] name = "tui-big-text" version = "0.7.1" diff --git a/based/bin/rpc/Cargo.toml b/based/bin/rpc/Cargo.toml index 8e58859c6..421187e99 100644 --- a/based/bin/rpc/Cargo.toml +++ b/based/bin/rpc/Cargo.toml @@ -33,4 +33,5 @@ tokio-websockets.workspace = true http.workspace = true futures-util.workspace = true crossbeam-channel.workspace = true -tracing-subscriber.workspace = true \ No newline at end of file +tracing-subscriber.workspace = true +ttlhashmap = "0.1.0" diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs index df9a6c44c..277d12344 100644 --- a/based/bin/rpc/src/main.rs +++ b/based/bin/rpc/src/main.rs @@ -124,16 +124,26 @@ async fn main() -> eyre::Result<()> { } }); + if cfg!(debug_assertions) { + let server = server_obj.clone(); + tokio::spawn(async move { + let address_to_check = Address::from_str("0x0E2d15588e765f0ba315313C726041EA124e36CB").unwrap(); + let mut interval = interval(Duration::from_secs_f64(0.1)); + loop { + let transaction_count = server.get_transaction_count(address_to_check).await.unwrap(); + let balance = server.get_balance(address_to_check).await.unwrap(); + let block_number = server.block_number().await.unwrap(); + info!("block number: {} count: {} balance: {:?}", block_number, transaction_count, balance); + interval.tick().await; + } + }); + } + let server = server_obj.clone(); tokio::spawn(async move { - let address_to_check = Address::from_str("0x0E2d15588e765f0ba315313C726041EA124e36CB").unwrap(); - let mut interval = interval(Duration::from_secs_f64(0.1)); loop { - let transaction_count = server.get_transaction_count(address_to_check).await.unwrap(); - let balance = server.get_balance(address_to_check).await.unwrap(); - let block_number = server.block_number().await.unwrap(); - info!("block number: {} count: {} balance: {:?}", block_number, transaction_count, balance); - interval.tick().await; + server.cleanup_tx_send_sync_waiter(); + tokio::time::sleep(Duration::from_secs(1)).await; } }); diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 7ba0d5f48..7695cb0ab 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{ @@ -17,7 +20,9 @@ use jsonrpsee::{ use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; use parking_lot::RwLock; use reqwest::Url; -use tracing::error; +use tokio::sync::oneshot; +use tracing::{debug, error}; +use ttlhashmap::TtlHashMap; use crate::{ middleware::RpcClient, @@ -30,11 +35,17 @@ pub struct Server { unsealed_stack: Arc>, provider: OpRootProvider, tx_receiver_provider: OpRootProvider, + tx_send_sync_waiter: Arc>>>, } impl Server { pub fn new(provider: OpRootProvider, tx_receiver_provider: OpRootProvider) -> Self { - Self { unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), provider, tx_receiver_provider } + Self { + unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), + provider, + tx_receiver_provider, + tx_send_sync_waiter: Arc::new(RwLock::new(TtlHashMap::new(Duration::from_millis(1000)))), /* TODO: make this configurable */ + } } pub fn on_env(&self, env: EnvV0) { @@ -99,6 +110,20 @@ impl Server { ); return; } + + if let Some(state_update) = &state_update { + let mut tx_send_sync_waiter = self.tx_send_sync_waiter.upgradable_read(); + for (tx_hash, receipt) in state_update.receipts.iter() { + let tx_hash_b256 = B256::from(*tx_hash); + if tx_send_sync_waiter.contains_key(&tx_hash_b256) { + tx_send_sync_waiter.with_upgraded(|waiter| { + let sender = waiter.remove(&tx_hash_b256).unwrap(); // unwrap is safe because we just checked that the key exists + let _ = sender.send(receipt.clone()); + }); + } + } + } + unsealed_block.with_upgraded(|blocks| { blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); blocks.rebuild_overrides(); @@ -156,6 +181,10 @@ impl Server { (BlockNumberOrTag::Latest, Default::default()) } } + + pub fn cleanup_tx_send_sync_waiter(&self) { + self.tx_send_sync_waiter.write().cleanup(); + } } #[async_trait] @@ -171,6 +200,54 @@ impl EthApiServer for Server { } } + async fn send_raw_transaction_sync(&self, bytes: Bytes, timeout_ms: u64) -> RpcResult { + if timeout_ms > 3000 { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Timeout is too long, maximum is 3000ms", + Some("Timeout is too long, maximum is 3000ms".to_string()), + )); + } + + let start = Instant::now(); + + let pending_tx = match self.tx_receiver_provider.send_raw_transaction(&bytes).await { + Ok(pending_tx) => pending_tx, + Err(e) => { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to send transaction", + Some(e.to_string()), + )) + } + }; + + let tx_hash = *pending_tx.tx_hash(); + + let (tx_send_sync_waiter_tx, tx_send_sync_waiter_rx) = oneshot::channel(); + self.tx_send_sync_waiter.write().insert(tx_hash, tx_send_sync_waiter_tx); + + match tx_send_sync_waiter_rx.await { + Ok(receipt) => return Ok(receipt), + _ => { + debug!("waiting for transaction receipt, waiter dropped"); + } + } + + while start.elapsed() < Duration::from_millis(timeout_ms) { + if let Ok(Some(receipt)) = self.get_receipt(tx_hash).await { + return Ok(receipt); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Timeout waiting for transaction receipt", + Some("Timeout waiting for transaction receipt".to_string()), + )) + } + async fn transaction_receipt(&self, hash: B256) -> RpcResult> { match self.get_receipt(hash).await { Ok(Some(receipt)) => Ok(Some(receipt)), diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index ae8a50b55..5739f9d50 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -15,6 +15,10 @@ pub trait EthApi { #[method(name = "sendRawTransaction")] async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; + /// Sends signed transaction, waiting for it to be mined and returning the receipt + #[method(name = "sendRawTransactionSync")] + async fn send_raw_transaction_sync(&self, bytes: Bytes, timeout_ms: u64) -> RpcResult; + /// Returns the receipt of a transaction by transaction hash #[method(name = "getTransactionReceipt")] async fn transaction_receipt(&self, hash: B256) -> RpcResult>; diff --git a/based/bin/rpc/src/unsealed_block.rs b/based/bin/rpc/src/unsealed_block.rs index 119b1911b..7a9bd5894 100644 --- a/based/bin/rpc/src/unsealed_block.rs +++ b/based/bin/rpc/src/unsealed_block.rs @@ -19,19 +19,17 @@ pub struct UnsealedBlock { impl UnsealedBlock { pub fn apply_frag(&mut self, frag: FragV0, state_update: Option) { - if self.current_frag.is_none() { - if frag.seq != 0 { - error!("expected first frag to have seq 0 but got seq {}", frag.seq); - return; - } - } else { - let current_frag = self.current_frag.as_ref().unwrap(); + if let Some(current_frag) = &self.current_frag { let expected_seq = current_frag.seq + 1; if expected_seq != frag.seq { error!("expected frag seq {} but got seq {}", expected_seq, frag.seq); return; } + } else if frag.seq != 0 { + error!("expected first frag to have seq 0 but got seq {}", frag.seq); + return; } + if self.seal.is_some() { error!("trying to apply frag after seal"); return; From 464feb8baa2f9d20b27d5c6ea57926069ccf7d47 Mon Sep 17 00:00:00 2001 From: Suthiwat Date: Mon, 1 Dec 2025 13:05:40 +0000 Subject: [PATCH 14/14] send sync --- based/Cargo.lock | 108 ++++++++++++++++++----------- based/bin/rpc/src/cli.rs | 2 +- based/bin/rpc/src/middleware.rs | 1 + based/bin/rpc/src/server.rs | 57 ++++++++++----- based/bin/rpc/src/types.rs | 6 +- based/bin/txspammer/Cargo.toml | 4 +- based/bin/txspammer/src/account.rs | 35 +++++++++- based/bin/txspammer/src/cli.rs | 4 ++ based/bin/txspammer/src/main.rs | 53 +++++++++++++- 9 files changed, 206 insertions(+), 64 deletions(-) diff --git a/based/Cargo.lock b/based/Cargo.lock index 8592bfa6b..e162f759c 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3abecb92ba478a285fbf5689100dbafe4003ded4a09bf4b5ef62cca87cd4f79e" +checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370" dependencies = [ "alloy-eips", "alloy-primitives", @@ -122,6 +122,7 @@ dependencies = [ "alloy-tx-macros", "arbitrary", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -137,9 +138,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e864d4f11d1fb8d3ac2fd8f3a15f1ee46d55ec6d116b342ed1b2cb737f25894" +checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0" dependencies = [ "alloy-consensus", "alloy-eips", @@ -184,26 +185,28 @@ dependencies = [ [[package]] name = "alloy-eip2930" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b82752a889170df67bbb36d42ca63c531eb16274f0d7299ae2a680facba17bd" +checksum = "9441120fa82df73e8959ae0e4ab8ade03de2aaae61be313fbf5746277847ce25" dependencies = [ "alloy-primitives", "alloy-rlp", "arbitrary", + "borsh", "rand 0.8.5", "serde", ] [[package]] name = "alloy-eip7702" -version = "0.6.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d4769c6ffddca380b0070d71c8b7f30bed375543fe76bb2f74ec0acf4b7cd16" +checksum = "2919c5a56a1007492da313e7a3b6d45ef5edc5d33416fdec63c0d7a2702a0d20" dependencies = [ "alloy-primitives", "alloy-rlp", "arbitrary", + "borsh", "k256", "rand 0.8.5", "serde", @@ -213,9 +216,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07d9a64522a0db6ebcc4ff9c904e329e77dd737c2c25d30f1bdc32ca6c6ce334" +checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -225,6 +228,7 @@ dependencies = [ "alloy-serde", "arbitrary", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -301,9 +305,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87b774478fcc616993e97659697f3e3c7988fdad598e46ee0ed11209cd0d8ee" +checksum = "003f46c54f22854a32b9cc7972660a476968008ad505427eabab49225309ec40" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -316,9 +320,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d6ed73d440bae8f27771b7cd507fa8f10f19ddf0b8f67e7622a52e0dbf798e" +checksum = "4f4029954d9406a40979f3a3b46950928a0fdcfe3ea8a9b0c17490d57e8aa0e3" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -342,9 +346,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219dccd2cf753a43bd9b0fbb7771a16927ffdb56e43e3a15755bef1a74d614aa" +checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb" dependencies = [ "alloy-consensus", "alloy-eips", @@ -415,9 +419,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0ef8cbc2b68e2512acf04b2d296c05c98a661bc460462add6414528f4ff3d9b" +checksum = "d369e12c92870d069e0c9dc5350377067af8a056e29e3badf8446099d7e00889" dependencies = [ "alloy-chains", "alloy-consensus", @@ -457,9 +461,9 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be028fb1c6c173f5765d0baa3580a11d69826ea89fe00ee5c9d7eddb2c3509cd" +checksum = "f77d20cdbb68a614c7a86b3ffef607b37d087bb47a03c58f4c3f8f99bc3ace3b" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -501,9 +505,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0f67d1e655ed93efca217213340d21cce982333cc44a1d918af9150952ef66" +checksum = "31c89883fe6b7381744cbe80fef638ac488ead4f1956a4278956a1362c71cd2e" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -564,9 +568,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "425e14ee32eb8b7edd6a2247fe0ed640785e6eba75af27db27f1e6220c15ef0d" +checksum = "b43c1622aac2508d528743fd4cfdac1dea92d5a8fa894038488ff7edd0af0b32" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -627,9 +631,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0185f68a0f8391ab996d335a887087d7ccdbc97952efab3516f6307d456ba2cd" +checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -690,9 +694,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596cfa360922ba9af901cc7370c68640e4f72adb6df0ab064de32f21fec498d7" +checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529" dependencies = [ "alloy-primitives", "arbitrary", @@ -702,9 +706,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f06333680d04370c8ed3a6b0eccff384e422c3d8e6b19e61fedc3a9f0ab7743" +checksum = "ecc39ad2c0a3d2da8891f4081565780703a593f090f768f884049aa3aa929cbc" dependencies = [ "alloy-primitives", "async-trait", @@ -806,9 +810,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55bbdcee53e4e3857b5ddbc2986ebe9c2ab5f352ec285cb0da04c1e8f2ca9c18" +checksum = "cae82426d98f8bc18f53c5223862907cac30ab8fc5e4cd2bb50808e6d3ab43d8" dependencies = [ "alloy-json-rpc", "auto_impl", @@ -829,9 +833,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793967215109b4a334047c810ed6db5e873ad3ea07f65cc02202bd4b810d9615" +checksum = "90aa6825760905898c106aba9c804b131816a15041523e80b6d4fe7af6380ada" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -844,9 +848,9 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e182e5ae0c4858bb87df23ebfe31018d7e51fe1a264b8a8a2b26932cb04861" +checksum = "6ace83a4a6bb896e5894c3479042e6ba78aa5271dde599aa8c36a021d49cc8cc" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -864,9 +868,9 @@ dependencies = [ [[package]] name = "alloy-transport-ws" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e9dc891c80d6216003d4b04f0a7463015d0873d36e4ac2ec0bcc9196aa4ea7" +checksum = "86c9ab4c199e3a8f3520b60ba81aa67bb21fed9ed0d8304e0569094d0758a56f" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -902,9 +906,9 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab54221eccefa254ce9f65b079c097b1796e48c21c7ce358230f8988d75392fb" +checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -2099,6 +2103,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "borsh" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1da5ab77c1437701eeff7c88d968729e7766172279eab0676857b3d63af7a6f" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0686c856aa6aac0c4498f936d7d6a02df690f614c03e4d906d1018062b5c5e2c" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "boyer-moore-magiclen" version = "0.2.20" @@ -11644,7 +11671,6 @@ dependencies = [ "alloy-signer", "alloy-signer-local", "bop-common", - "bop-metrics", "clap", "eyre", "futures", diff --git a/based/bin/rpc/src/cli.rs b/based/bin/rpc/src/cli.rs index 5b18a9e87..ae1767096 100644 --- a/based/bin/rpc/src/cli.rs +++ b/based/bin/rpc/src/cli.rs @@ -24,7 +24,7 @@ pub struct RpcArgs { pub eth_http_url: String, /// tx receiver url - #[arg(long = "sequencer.url", default_value = "http://0.0.0.0:8545")] + #[arg(long = "sequencer.url")] pub tx_receiver_url: Option, /// Enable debug logging diff --git a/based/bin/rpc/src/middleware.rs b/based/bin/rpc/src/middleware.rs index 57a38e965..19e314fab 100644 --- a/based/bin/rpc/src/middleware.rs +++ b/based/bin/rpc/src/middleware.rs @@ -23,6 +23,7 @@ pub struct EthApiProxy { const SUPPORTED_METHODS: &[&str] = &[ "eth_sendRawTransaction", + "eth_sendRawTransactionSync", "eth_getTransactionReceipt", // "eth_getBlockByNumber", // "eth_getBlockByHash", diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs index 7695cb0ab..bd848a4aa 100644 --- a/based/bin/rpc/src/server.rs +++ b/based/bin/rpc/src/server.rs @@ -10,7 +10,10 @@ use alloy_primitives::{ }; use alloy_provider::Provider; use alloy_rpc_types::{BlockOverrides, Header, state::StateOverride}; -use bop_common::p2p::{EnvV0, FragV0, SealV0, StateUpdate}; +use bop_common::{ + p2p::{EnvV0, FragV0, SealV0, StateUpdate}, + transaction::Transaction, +}; use eyre::Result; use jsonrpsee::{ core::{RpcResult, async_trait}, @@ -98,6 +101,7 @@ impl Server { } pub fn on_frag(&self, frag: FragV0, state_update: Option) { + self.notify_tx_send_sync_waiter(&state_update); let mut unsealed_block = self.unsealed_stack.upgradable_read(); if unsealed_block.blocks.is_empty() { return; @@ -111,6 +115,13 @@ impl Server { return; } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); + blocks.rebuild_overrides(); + }); + } + + pub fn notify_tx_send_sync_waiter(&self, state_update: &Option) { if let Some(state_update) = &state_update { let mut tx_send_sync_waiter = self.tx_send_sync_waiter.upgradable_read(); for (tx_hash, receipt) in state_update.receipts.iter() { @@ -123,11 +134,6 @@ impl Server { } } } - - unsealed_block.with_upgraded(|blocks| { - blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); - blocks.rebuild_overrides(); - }); } pub async fn get_transaction_count(&self, address: Address) -> Result { @@ -191,7 +197,10 @@ impl Server { impl EthApiServer for Server { async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { match self.tx_receiver_provider.send_raw_transaction(&bytes).await { - Ok(pending_tx) => Ok(*pending_tx.tx_hash()), + Ok(pending_tx) => { + debug!("sent transaction with hash {}", *pending_tx.tx_hash()); + Ok(*pending_tx.tx_hash()) + } Err(e) => Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, "Failed to send transaction", @@ -200,7 +209,12 @@ impl EthApiServer for Server { } } - async fn send_raw_transaction_sync(&self, bytes: Bytes, timeout_ms: u64) -> RpcResult { + async fn send_raw_transaction_sync( + &self, + bytes: Bytes, + maybe_timeout_ms: Option, + ) -> RpcResult { + let timeout_ms = maybe_timeout_ms.unwrap_or(3000); if timeout_ms > 3000 { return Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, @@ -209,10 +223,24 @@ impl EthApiServer for Server { )); } - let start = Instant::now(); + let tx_hash = match Transaction::decode(bytes.clone()) { + Ok(tx) => tx.tx_hash(), + Err(e) => { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INVALID_PARAMS_CODE, + "Invalid transaction bytes", + Some(e.to_string()), + )); + } + }; + let (tx_send_sync_waiter_tx, tx_send_sync_waiter_rx) = oneshot::channel(); + self.tx_send_sync_waiter.write().insert(tx_hash, tx_send_sync_waiter_tx); - let pending_tx = match self.tx_receiver_provider.send_raw_transaction(&bytes).await { - Ok(pending_tx) => pending_tx, + let start = Instant::now(); + match self.tx_receiver_provider.send_raw_transaction(&bytes).await { + Ok(_pending_tx) => { + // Transaction sent successfully, tx_hash already calculated above + } Err(e) => { return Err(ErrorObject::owned( jsonrpsee::types::error::INTERNAL_ERROR_CODE, @@ -220,12 +248,9 @@ impl EthApiServer for Server { Some(e.to_string()), )) } - }; - - let tx_hash = *pending_tx.tx_hash(); + } - let (tx_send_sync_waiter_tx, tx_send_sync_waiter_rx) = oneshot::channel(); - self.tx_send_sync_waiter.write().insert(tx_hash, tx_send_sync_waiter_tx); + debug!("sent transaction sync with hash {}", tx_hash); match tx_send_sync_waiter_rx.await { Ok(receipt) => return Ok(receipt), diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs index 5739f9d50..868773ca2 100644 --- a/based/bin/rpc/src/types.rs +++ b/based/bin/rpc/src/types.rs @@ -17,7 +17,11 @@ pub trait EthApi { /// Sends signed transaction, waiting for it to be mined and returning the receipt #[method(name = "sendRawTransactionSync")] - async fn send_raw_transaction_sync(&self, bytes: Bytes, timeout_ms: u64) -> RpcResult; + async fn send_raw_transaction_sync( + &self, + bytes: Bytes, + maybe_timeout_ms: Option, + ) -> RpcResult; /// Returns the receipt of a transaction by transaction hash #[method(name = "getTransactionReceipt")] diff --git a/based/bin/txspammer/Cargo.toml b/based/bin/txspammer/Cargo.toml index 5f88f97c9..fcf7891c6 100644 --- a/based/bin/txspammer/Cargo.toml +++ b/based/bin/txspammer/Cargo.toml @@ -8,13 +8,13 @@ version.workspace = true alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true -alloy-provider.workspace = true +alloy-provider = "1.1.2" # Need this for eth_sendRawTransactionSync alloy-rpc-types.workspace = true alloy-signer.workspace = true alloy-signer-local.workspace = true bop-common.workspace = true -bop-metrics.workspace = true + clap.workspace = true eyre.workspace = true futures.workspace = true diff --git a/based/bin/txspammer/src/account.rs b/based/bin/txspammer/src/account.rs index e60ef40ce..993203f53 100644 --- a/based/bin/txspammer/src/account.rs +++ b/based/bin/txspammer/src/account.rs @@ -2,7 +2,7 @@ use alloy_consensus::{SignableTransaction, TxEip1559, TxEnvelope}; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{B256, Bytes, U256}; use alloy_provider::{Provider, RootProvider}; -use alloy_rpc_types::AccessList; +use alloy_rpc_types::{AccessList, TransactionReceipt}; use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; @@ -90,6 +90,39 @@ impl Account { Ok(*tx.tx_hash()) } + pub async fn transfer_sync( + &mut self, + to: &mut Account, + spec: &TxSpec, + provider: &RootProvider, + sequencer: &Option, + ) -> eyre::Result { + let tx = TxEip1559 { + chain_id: spec.chain_id, + nonce: self.nonce, + gas_limit: spec.gas_limit, + max_fee_per_gas: spec.max_fee_per_gas, + max_priority_fee_per_gas: spec.max_priority_fee_per_gas, + to: to.signer.address().into(), + value: spec.value, + access_list: AccessList::default(), + input: Bytes::new(), + }; + + let sig = self.signer.sign_hash_sync(&tx.signature_hash()).unwrap(); + let tx: TxEnvelope = tx.into_signed(sig).into(); + let encoded = tx.encoded_2718(); + let provider_to_use = sequencer.as_ref().unwrap_or(provider); + + let receipt = provider_to_use.send_raw_transaction_sync(&encoded).await.unwrap(); + + self.nonce += 1; + self.balance -= spec.value + U256::from(spec.gas_limit) * U256::from(spec.max_fee_per_gas); + to.balance += spec.value; + + Ok(receipt) + } + pub async fn _self_transfer( &mut self, spec: &TxSpec, diff --git a/based/bin/txspammer/src/cli.rs b/based/bin/txspammer/src/cli.rs index 95d3b105d..c0014afff 100644 --- a/based/bin/txspammer/src/cli.rs +++ b/based/bin/txspammer/src/cli.rs @@ -40,6 +40,10 @@ pub struct TxSpammerArgs { /// Max priority fee per gas (miner tip) for EIP-1559 transactions (in Wei). #[arg(long = "max_priority_fee_per_gas", default_value_t = 20)] pub max_priority_fee_per_gas: u128, + /// Send eth_sendRawTransactionSync. this sends the transaction and waits for the receipt over and over to test + /// inclusion latency. + #[arg(long = "send_sync", action = clap::ArgAction::SetTrue)] + pub send_sync: bool, // --- Network Configuration --- /// The JSON-RPC URL of the Ethereum node (HTTP or WebSocket). diff --git a/based/bin/txspammer/src/main.rs b/based/bin/txspammer/src/main.rs index 66b29f387..1b42b5031 100644 --- a/based/bin/txspammer/src/main.rs +++ b/based/bin/txspammer/src/main.rs @@ -39,6 +39,7 @@ struct TxSpammer { sequencer: Option, root_account: Account, target_accounts: Vec, + send_sync_account: Option, tx_spec: TxSpec, args: TxSpammerArgs, request_rx: Option>, @@ -90,6 +91,7 @@ impl TxSpammer { sequencer, root_account, target_accounts: Vec::new(), + send_sync_account: None, tx_spec, args, request_rx: Some(request_rx), @@ -130,6 +132,12 @@ impl TxSpammer { } self.target_accounts = target_accounts; + + if self.args.send_sync { + let mut account = Account::new(account_generator.next()); + account.refresh(provider).await.expect("failed to fetch account nonce and balance"); + self.send_sync_account = Some(account); + } } pub async fn fund_target_accounts(&mut self) { @@ -139,7 +147,7 @@ impl TxSpammer { let threshold = parse_ether("0.01").unwrap(); // Fund target accounts - for account in self.target_accounts.iter_mut() { + for account in self.target_accounts.iter_mut().chain(self.send_sync_account.iter_mut()) { let amount_to_fund = funding_amount.saturating_sub(account.balance); if amount_to_fund.is_zero() || amount_to_fund < threshold { continue; @@ -166,7 +174,7 @@ impl TxSpammer { sleep(std::time::Duration::from_secs(1)).await; // Verifying account states - for account in self.target_accounts.iter_mut() { + for account in self.target_accounts.iter_mut().chain(self.send_sync_account.iter_mut()) { account.refresh(provider).await.expect("failed to fetch account nonce and balance"); if funding_amount.saturating_sub(account.balance) > threshold { panic!( @@ -178,6 +186,7 @@ impl TxSpammer { print!("Verifying {:?} balance: {} eth \r", account.signer.address(), format_ether(account.balance)); stdout().flush().unwrap(); } + info!("All {} target accounts are funded and ready. ", self.target_accounts.len()); } @@ -306,6 +315,42 @@ impl TxSpammer { }); } } + + pub fn spawn_sync_spammer(&self) { + let Some(mut send_sync_account) = self.send_sync_account.clone() else { + return; + }; + let full_provider = self.full_provider.clone(); + let tx_spec = self.tx_spec.clone(); + let mut accounts_clone = self.target_accounts.clone(); + let mut rag2 = rand::rngs::StdRng::seed_from_u64(Instant::now().elapsed().as_nanos() as u64); + let n = accounts_clone.len(); + let mut latencies = Percentile::new(); + let mut log_timer = Instant::now(); + tokio::spawn(async move { + loop { + let to = &mut accounts_clone[rag2.random_range(0..n)]; + let start_sending_at = Instant::now(); + send_sync_account.transfer_sync(to, &tx_spec, &full_provider, &None).await.expect("failed to send tx"); + let latency = start_sending_at.elapsed(); + latencies.add(latency.as_secs_f64()); + + if log_timer.elapsed() > Duration::from_secs(5) { + let p50 = latencies.percentile(50.0).unwrap_or(0.0); + let p95 = latencies.percentile(95.0).unwrap_or(0.0); + let p99 = latencies.percentile(99.0).unwrap_or(0.0); + info!( + "Send sync: Last 5s: {} tx confirmed, Latency P50: {:.2}s, P95: {:.2}s, P99: {:.2}s", + latencies.len(), + p50, + p95, + p99 + ); + log_timer = Instant::now(); + } + } + }); + } } #[tokio::main] @@ -341,6 +386,10 @@ async fn main() -> eyre::Result<()> { spammer.spawn_stats_logger(); spammer.spawn_spammer(); + if spammer.args.send_sync { + spammer.spawn_sync_spammer(); + } + tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c"); info!("Received Ctrl-C, shutting down...");