diff --git a/Makefile b/Makefile index cd2ab2dd8..a3c9b922f 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,8 @@ L2_CHAIN_ID?=$(shell \ L2_CHAIN_ID_HEX:=$(shell printf "0x%064x" $(L2_CHAIN_ID)) PORTAL?=http://0.0.0.0:8080 TXPROXY?=http://0.0.0.0:8090 -L1_RPC_URL?=http://3.84.162.42:8545 -L1_BEACON_RPC_URL?=http://3.84.162.42:5051 +L1_RPC_URL?=https://ethereum-sepolia-rpc.publicnode.com +L1_BEACON_RPC_URL?=https://ethereum-sepolia-beacon-api.publicnode.com PUBLIC_IP?=$(shell curl ifconfig.me) # if GATEWAY_SEQUENCING_KEY is set, use that one, otherwise key_to_address will generate a new one GATEWAY_SEQUENCING_KEY ?= $(shell \ diff --git a/based/Cargo.lock b/based/Cargo.lock index af514c813..e162f759c 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3abecb92ba478a285fbf5689100dbafe4003ded4a09bf4b5ef62cca87cd4f79e" +checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370" dependencies = [ "alloy-eips", "alloy-primitives", @@ -122,6 +122,7 @@ dependencies = [ "alloy-tx-macros", "arbitrary", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -137,9 +138,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e864d4f11d1fb8d3ac2fd8f3a15f1ee46d55ec6d116b342ed1b2cb737f25894" +checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0" dependencies = [ "alloy-consensus", "alloy-eips", @@ -184,26 +185,28 @@ dependencies = [ [[package]] name = "alloy-eip2930" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b82752a889170df67bbb36d42ca63c531eb16274f0d7299ae2a680facba17bd" +checksum = "9441120fa82df73e8959ae0e4ab8ade03de2aaae61be313fbf5746277847ce25" dependencies = [ "alloy-primitives", "alloy-rlp", "arbitrary", + "borsh", "rand 0.8.5", "serde", ] [[package]] name = "alloy-eip7702" -version = "0.6.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d4769c6ffddca380b0070d71c8b7f30bed375543fe76bb2f74ec0acf4b7cd16" +checksum = "2919c5a56a1007492da313e7a3b6d45ef5edc5d33416fdec63c0d7a2702a0d20" dependencies = [ "alloy-primitives", "alloy-rlp", "arbitrary", + "borsh", "k256", "rand 0.8.5", "serde", @@ -213,9 +216,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07d9a64522a0db6ebcc4ff9c904e329e77dd737c2c25d30f1bdc32ca6c6ce334" +checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -225,6 +228,7 @@ dependencies = [ "alloy-serde", "arbitrary", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -301,9 +305,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87b774478fcc616993e97659697f3e3c7988fdad598e46ee0ed11209cd0d8ee" +checksum = "003f46c54f22854a32b9cc7972660a476968008ad505427eabab49225309ec40" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -316,9 +320,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d6ed73d440bae8f27771b7cd507fa8f10f19ddf0b8f67e7622a52e0dbf798e" +checksum = "4f4029954d9406a40979f3a3b46950928a0fdcfe3ea8a9b0c17490d57e8aa0e3" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -342,9 +346,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219dccd2cf753a43bd9b0fbb7771a16927ffdb56e43e3a15755bef1a74d614aa" +checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb" dependencies = [ "alloy-consensus", "alloy-eips", @@ -415,9 +419,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0ef8cbc2b68e2512acf04b2d296c05c98a661bc460462add6414528f4ff3d9b" +checksum = "d369e12c92870d069e0c9dc5350377067af8a056e29e3badf8446099d7e00889" dependencies = [ "alloy-chains", "alloy-consensus", @@ -457,9 +461,9 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be028fb1c6c173f5765d0baa3580a11d69826ea89fe00ee5c9d7eddb2c3509cd" +checksum = "f77d20cdbb68a614c7a86b3ffef607b37d087bb47a03c58f4c3f8f99bc3ace3b" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -501,9 +505,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0f67d1e655ed93efca217213340d21cce982333cc44a1d918af9150952ef66" +checksum = "31c89883fe6b7381744cbe80fef638ac488ead4f1956a4278956a1362c71cd2e" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -564,9 +568,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "425e14ee32eb8b7edd6a2247fe0ed640785e6eba75af27db27f1e6220c15ef0d" +checksum = "b43c1622aac2508d528743fd4cfdac1dea92d5a8fa894038488ff7edd0af0b32" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -627,9 +631,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0185f68a0f8391ab996d335a887087d7ccdbc97952efab3516f6307d456ba2cd" +checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -690,9 +694,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596cfa360922ba9af901cc7370c68640e4f72adb6df0ab064de32f21fec498d7" +checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529" dependencies = [ "alloy-primitives", "arbitrary", @@ -702,9 +706,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f06333680d04370c8ed3a6b0eccff384e422c3d8e6b19e61fedc3a9f0ab7743" +checksum = "ecc39ad2c0a3d2da8891f4081565780703a593f090f768f884049aa3aa929cbc" dependencies = [ "alloy-primitives", "async-trait", @@ -806,9 +810,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55bbdcee53e4e3857b5ddbc2986ebe9c2ab5f352ec285cb0da04c1e8f2ca9c18" +checksum = "cae82426d98f8bc18f53c5223862907cac30ab8fc5e4cd2bb50808e6d3ab43d8" dependencies = [ "alloy-json-rpc", "auto_impl", @@ -829,9 +833,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793967215109b4a334047c810ed6db5e873ad3ea07f65cc02202bd4b810d9615" +checksum = "90aa6825760905898c106aba9c804b131816a15041523e80b6d4fe7af6380ada" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -844,9 +848,9 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e182e5ae0c4858bb87df23ebfe31018d7e51fe1a264b8a8a2b26932cb04861" +checksum = "6ace83a4a6bb896e5894c3479042e6ba78aa5271dde599aa8c36a021d49cc8cc" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -864,9 +868,9 @@ dependencies = [ [[package]] name = "alloy-transport-ws" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e9dc891c80d6216003d4b04f0a7463015d0873d36e4ac2ec0bcc9196aa4ea7" +checksum = "86c9ab4c199e3a8f3520b60ba81aa67bb21fed9ed0d8304e0569094d0758a56f" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -902,9 +906,9 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.0.42" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab54221eccefa254ce9f65b079c097b1796e48c21c7ce358230f8988d75392fb" +checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -1590,6 +1594,42 @@ dependencies = [ "tracing", ] +[[package]] +name = "based-rpc" +version = "0.1.0" +dependencies = [ + "alloy-eips", + "alloy-network", + "alloy-primitives", + "alloy-provider", + "alloy-rpc-types", + "bop-common", + "bop-metrics", + "clap", + "crossbeam-channel", + "eyre", + "futures", + "futures-util", + "http", + "jsonrpsee", + "op-alloy-network", + "op-alloy-rpc-types", + "op-alloy-rpc-types-engine", + "parking_lot", + "reqwest", + "reth-rpc-layer", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-websockets", + "tower", + "tower-http", + "tracing", + "tracing-subscriber 0.3.20", + "ttlhashmap", +] + [[package]] name = "based-txproxy" version = "0.1.0" @@ -2063,6 +2103,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "borsh" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1da5ab77c1437701eeff7c88d968729e7766172279eab0676857b3d63af7a6f" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0686c856aa6aac0c4498f936d7d6a02df690f614c03e4d906d1018062b5c5e2c" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "boyer-moore-magiclen" version = "0.2.20" @@ -11548,6 +11611,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "ttlhashmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ddd0e3ea228684ba88409bd2987421f96130063f68ae29c0ac322219be83f2" + [[package]] name = "tui-big-text" version = "0.7.1" @@ -11602,7 +11671,6 @@ dependencies = [ "alloy-signer", "alloy-signer-local", "bop-common", - "bop-metrics", "clap", "eyre", "futures", diff --git a/based/Cargo.toml b/based/Cargo.toml index e5df44acc..75383c468 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -41,6 +41,7 @@ either = "1.15.0" ethereum_ssz = "0.9.0" eyre = "0.6.12" futures = "0.3.31" +futures-util = "0.3.31" hickory-resolver = "=0.25.0-alpha.5" # Use the exact version reth expects http = "1.3.1" hyper = "1.5.2" @@ -59,6 +60,7 @@ op-alloy-rpc-types = "0.22.0" op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false, features = [ "serde", ] } +op-alloy-provider = "0.22.0" op-revm = "12.0.1" parking_lot = "0.12.3" paste = "0.1.18" @@ -124,6 +126,7 @@ strum_macros = "0.24" tabwriter = "1.4.1" thiserror = "2.0.11" tokio = { version = "1.43.0", features = ["full"] } +tokio-websockets = { version = "0.12.1", features = ["client", "openssl", "rand", "server"] } toml = "0.8.19" tower = { version = "0.5", features = ["timeout"] } tower-http = { version = "0.6", features = ["cors"] } diff --git a/based/bin/rpc/Cargo.toml b/based/bin/rpc/Cargo.toml new file mode 100644 index 000000000..421187e99 --- /dev/null +++ b/based/bin/rpc/Cargo.toml @@ -0,0 +1,37 @@ +[package] +edition.workspace = true +name = "based-rpc" +rust-version.workspace = true +version.workspace = true + +[dependencies] +alloy-eips.workspace = true +alloy-primitives.workspace = true +alloy-rpc-types.workspace = true +alloy-provider.workspace = true +alloy-network.workspace = true +bop-common.workspace = true +bop-metrics.workspace = true +clap.workspace = true +eyre.workspace = true +futures.workspace = true +jsonrpsee.workspace = true +op-alloy-rpc-types.workspace = true +op-alloy-rpc-types-engine.workspace = true +op-alloy-network.workspace = true +parking_lot.workspace = true +reqwest.workspace = true +reth-rpc-layer.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +tower.workspace = true +tower-http.workspace = true +tracing.workspace = true +tokio-websockets.workspace = true +http.workspace = true +futures-util.workspace = true +crossbeam-channel.workspace = true +tracing-subscriber.workspace = true +ttlhashmap = "0.1.0" diff --git a/based/bin/rpc/src/cli.rs b/based/bin/rpc/src/cli.rs new file mode 100644 index 000000000..ae1767096 --- /dev/null +++ b/based/bin/rpc/src/cli.rs @@ -0,0 +1,65 @@ +use std::path::PathBuf; + +use bop_common::config::{LoggingConfig, LoggingFlags}; +use clap::Parser; +use tracing::level_filters::LevelFilter; + +#[derive(Parser, Debug, Clone)] +#[command(version, about, name = "based-rpc")] +pub struct RpcArgs { + /// The port to run the rpc on + #[arg(long = "port", default_value_t = 7545)] + pub port: u16, + + /// ws url of the frag stream + #[arg(long = "frag.url", default_value = "ws://0.0.0.0:9999/state_stream")] + pub frag_url: String, + + /// ws url of eth rpc + #[arg(long = "eth.ws.url", default_value = "ws://0.0.0.0:8546")] + pub eth_ws_url: String, + + /// http url of eth rpc + #[arg(long = "eth.http.url", default_value = "http://0.0.0.0:8545")] + pub eth_http_url: String, + + /// tx receiver url + #[arg(long = "sequencer.url")] + pub tx_receiver_url: Option, + + /// Enable debug logging + #[arg(long)] + pub debug: bool, + /// Enable trace logging + #[arg(long)] + pub trace: bool, + /// Enable file logging + #[arg(long = "log.disable_file_logging", action = clap::ArgAction::SetFalse, default_value_t = true)] + pub file_logging: bool, + /// Prefix of log files + #[arg(long = "log.prefix", default_value = "bop-txproxy.log")] + pub log_prefix: String, + /// Path for log files + #[arg(long = "log.dir", default_value = "/tmp")] + pub log_dir: PathBuf, + /// Maximum number of log files + #[arg(long = "log.max_files", default_value_t = 100)] + pub log_max_files: usize, +} + +impl From<&RpcArgs> for LoggingConfig { + fn from(args: &RpcArgs) -> Self { + Self { + level: args + .trace + .then_some(LevelFilter::TRACE) + .or(args.debug.then_some(LevelFilter::DEBUG)) + .unwrap_or(LevelFilter::INFO), + flags: if args.file_logging { LoggingFlags::all() } else { LoggingFlags::StdOut }, + prefix: args.file_logging.then(|| args.log_prefix.clone()), + max_files: args.log_max_files, + path: args.log_dir.clone(), + filters: None, + } + } +} diff --git a/based/bin/rpc/src/listener.rs b/based/bin/rpc/src/listener.rs new file mode 100644 index 000000000..02c2b4f1e --- /dev/null +++ b/based/bin/rpc/src/listener.rs @@ -0,0 +1,62 @@ +use std::{str::FromStr, time::Duration}; + +use alloy_provider::Provider; +use alloy_rpc_types::Header; +use bop_common::p2p::SignedVersionedMessage; +use crossbeam_channel::Sender; +use futures_util::stream::StreamExt; +use http::Uri; +use tokio_websockets::ClientBuilder; +use tracing::{error, info, warn}; + +use crate::types::OpRootProvider; + +pub fn spawn_block_listener(provider: OpRootProvider, block_tx: Sender
) { + tokio::spawn(async move { + loop { + info!("Attempting to subscribe to L1 block headers..."); + let sub_result = provider.subscribe_blocks().await; + + let mut block_stream = match sub_result { + Ok(sub) => { + info!("Successfully subscribed to L1 block headers."); + sub.into_stream() + } + Err(e) => { + error!(error = %e, "Failed to subscribe to L1 blocks, retrying in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + while let Some(header) = block_stream.next().await { + block_tx.send(header).expect("failed to send block header"); + } + warn!("header stream ended. Attempting to resubscribe..."); + panic!("WS connection dropped. Restart the process."); // TODO: handle reconnection + // properly + } + }); +} + +pub fn spawn_receipt_listener_frag_stream(frag_url: &str, message_tx: Sender) { + let frag_url = frag_url.to_string(); + tokio::spawn(async move { + loop { + let uri = Uri::from_str(&frag_url).expect("invalid frag stream url"); + let maybe_client = ClientBuilder::from_uri(uri).connect().await; + let Ok((mut client, _)) = maybe_client else { + error!("failed to connect to frag stream, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + while let Some(Ok(msg)) = client.next().await { + let stream_data = msg.as_text().and_then(|s| serde_json::from_str::(s).ok()); + if let Some(msg) = stream_data { + message_tx.send(msg).expect("failed to send message"); + } + } + error!("frag stream closed, reconnecting..."); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); +} diff --git a/based/bin/rpc/src/main.rs b/based/bin/rpc/src/main.rs new file mode 100644 index 000000000..277d12344 --- /dev/null +++ b/based/bin/rpc/src/main.rs @@ -0,0 +1,180 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + str::FromStr, + thread, + time::Duration, +}; + +use alloy_primitives::Address; +use alloy_provider::{Provider, ProviderBuilder, WsConnect}; +use bop_common::{ + p2p::VersionedMessage, + utils::{init_tracing, wait_for_signal}, +}; +use clap::Parser; +use cli::RpcArgs; +use jsonrpsee::{ + server::{ServerBuilder, ServerConfigBuilder}, + ws_client::RpcServiceBuilder, +}; +use op_alloy_network::Optimism; +use reqwest::Url; +use tokio::time::interval; +use tower::ServiceBuilder; +use tower_http::cors::{Any, CorsLayer}; +use tracing::{debug, error, info, warn}; + +use crate::{ + listener::{spawn_block_listener, spawn_receipt_listener_frag_stream}, + middleware::EthApiProxy, + server::{Server, create_client}, + types::EthApiServer, +}; + +mod cli; +mod listener; +mod middleware; +mod server; +mod types; +mod unsealed_block; + +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() -> eyre::Result<()> { + let args = RpcArgs::parse(); + let _guard = init_tracing((&args).into()); + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), args.port); + + let (block_tx, block_rx) = crossbeam_channel::bounded(100); + let (message_tx, message_rx) = crossbeam_channel::bounded(100); + + let eth_ws_url = args.eth_ws_url.clone(); + let provider_with_filler = ProviderBuilder::<_, _, Optimism>::default() + .connect_ws(WsConnect::new(Url::parse(ð_ws_url).unwrap())) + .await + .expect("failed to connect to eth rpc"); + + let provider = provider_with_filler.root(); + + spawn_receipt_listener_frag_stream(args.frag_url.as_str(), message_tx); + spawn_block_listener(provider.clone(), block_tx); + + let tx_receiver_provider = match args.tx_receiver_url { + Some(url) => { + let parsed_url = Url::parse(&url).expect("invalid tx receiver url"); + let provider_with_filler = match parsed_url.scheme() { + "ws" | "wss" => ProviderBuilder::<_, _, Optimism>::default() + .connect_ws(WsConnect::new(parsed_url)) + .await + .expect("failed to connect to tx receiver via ws"), + "http" | "https" => ProviderBuilder::<_, _, Optimism>::default().connect_http(parsed_url), + _ => panic!("unsupported URL scheme for tx receiver: {}", parsed_url.scheme()), + }; + provider_with_filler.root().clone() + } + None => provider.clone(), + }; + + let server_obj = Server::new(provider.clone(), tx_receiver_provider); + + let server = server_obj.clone(); + thread::spawn(move || { + loop { + let mut should_sleep = true; + + while let Ok(msg) = message_rx.try_recv() { + match msg.message { + VersionedMessage::FragV0(frag) => { + debug!("got frag: block number {} seq {}", frag.block_number, frag.seq); + server.on_frag(frag, msg.state_update); + } + VersionedMessage::SealV0(seal) => { + debug!("got seal: block number {}", seal.block_number); + server.on_seal(seal); + if msg.state_update.is_some() { + error!("seal message should not contain state update"); + } + } + VersionedMessage::EnvV0(env) => { + debug!("got env: block number {}", env.number); + server.on_env(env); + if msg.state_update.is_some() { + error!("env message should not contain state update"); + } + } + _ => { + warn!("unsupported message type: {:?}", msg.message); + if msg.state_update.is_some() { + error!("unsupported message type should not contain state update"); + } + } + } + should_sleep = false; + } + + while let Ok(header) = block_rx.try_recv() { + debug!("got block header: block number {}", header.number); + server.on_header(header); + should_sleep = false; + } + + if should_sleep { + thread::sleep(Duration::from_millis(1)); + } + } + }); + + if cfg!(debug_assertions) { + let server = server_obj.clone(); + tokio::spawn(async move { + let address_to_check = Address::from_str("0x0E2d15588e765f0ba315313C726041EA124e36CB").unwrap(); + let mut interval = interval(Duration::from_secs_f64(0.1)); + loop { + let transaction_count = server.get_transaction_count(address_to_check).await.unwrap(); + let balance = server.get_balance(address_to_check).await.unwrap(); + let block_number = server.block_number().await.unwrap(); + info!("block number: {} count: {} balance: {:?}", block_number, transaction_count, balance); + interval.tick().await; + } + }); + } + + let server = server_obj.clone(); + tokio::spawn(async move { + loop { + server.cleanup_tx_send_sync_waiter(); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // temp: remove when factoring out the portal + let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors_middleware = ServiceBuilder::new().layer(cors); + + let rpc_middleware = RpcServiceBuilder::new().layer_fn(move |s| EthApiProxy { + inner: s, + geth_client: create_client(Url::parse(args.eth_http_url.as_str()).unwrap(), Duration::from_secs(2)).unwrap(), + }); + + let rpc_server = ServerBuilder::default() + .set_config(ServerConfigBuilder::new().max_request_body_size(u32::MAX).max_response_body_size(u32::MAX).build()) + .set_rpc_middleware(rpc_middleware) + .set_http_middleware(cors_middleware) + .build(addr) + .await?; + + let module = EthApiServer::into_rpc(server_obj); + let server_handle = rpc_server.start(module); + + tokio::select! { + _ = server_handle.stopped() => { + error!("server stopped"); + } + + _ = wait_for_signal() => { + info!("received signal, shutting down"); + } + } + + Ok(()) +} diff --git a/based/bin/rpc/src/middleware.rs b/based/bin/rpc/src/middleware.rs new file mode 100644 index 000000000..19e314fab --- /dev/null +++ b/based/bin/rpc/src/middleware.rs @@ -0,0 +1,98 @@ +use futures::FutureExt; +use jsonrpsee::{ + MethodResponse, + core::{ + ClientError, + client::ClientT, + middleware::{Batch, Notification}, + traits::ToRpcParams, + }, + server::middleware::rpc::RpcServiceT, + types::{ErrorObject, Params, Request, ResponsePayload, error::INTERNAL_ERROR_CODE}, +}; +use serde_json::value::RawValue; +use tracing::{debug, error}; + +pub type RpcClient = jsonrpsee::http_client::HttpClient; + +#[derive(Clone)] +pub struct EthApiProxy { + pub inner: S, + pub geth_client: RpcClient, +} + +const SUPPORTED_METHODS: &[&str] = &[ + "eth_sendRawTransaction", + "eth_sendRawTransactionSync", + "eth_getTransactionReceipt", + // "eth_getBlockByNumber", + // "eth_getBlockByHash", + "eth_blockNumber", + "eth_getTransactionCount", + "eth_getBalance", + "eth_call", +]; + +impl RpcServiceT for EthApiProxy +where + S: RpcServiceT + Send + Sync + Clone + 'static, +{ + type BatchResponse = S::BatchResponse; + type MethodResponse = S::MethodResponse; + type NotificationResponse = S::NotificationResponse; + + #[tracing::instrument(skip_all, name = "middleware")] + fn call<'a>(&self, req: Request<'a>) -> impl Future + Send + 'a { + let inner = self.inner.clone(); + let fallback_client = self.geth_client.clone(); + + async move { + if SUPPORTED_METHODS.contains(&req.method_name()) { + inner.call(req).await + } else { + external_call(fallback_client.clone(), &req).await + } + } + .boxed() + } + + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + self.inner.batch(batch) + } + + fn notification<'a>(&self, n: Notification<'a>) -> impl Future + Send + 'a { + self.inner.notification(n) + } +} + +struct WrapParams<'a>(Params<'a>); +impl ToRpcParams for WrapParams<'_> { + fn to_rpc_params(self) -> Result>, serde_json::Error> { + self.0.as_str().map(String::from).map(RawValue::from_string).transpose() + } +} + +async fn external_call(client: S, req: &Request<'_>) -> MethodResponse +where + S: ClientT + Send + Sync + 'static, +{ + let r: Result = + client.request(req.method_name(), WrapParams(req.params())).await; + match r { + Ok(value) => { + let payload = ResponsePayload::success(&value); + debug!(method = %req.method_name(), "Forwarding request to client"); + MethodResponse::response(req.id.clone(), payload.into(), 4_000_000_000usize) + } + Err(err) => { + error!(error = %err, "Error calling client"); + match err { + ClientError::Call(e) => MethodResponse::error(req.id.clone(), e), + _ => MethodResponse::error( + req.id.clone(), + ErrorObject::owned(INTERNAL_ERROR_CODE, "client error".to_string(), Some(err.to_string())), + ), + } + } + } +} diff --git a/based/bin/rpc/src/server.rs b/based/bin/rpc/src/server.rs new file mode 100644 index 000000000..bd848a4aa --- /dev/null +++ b/based/bin/rpc/src/server.rs @@ -0,0 +1,412 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_primitives::{ + Address, B256, Bytes, U256, + map::foldhash::{HashMap, HashMapExt}, +}; +use alloy_provider::Provider; +use alloy_rpc_types::{BlockOverrides, Header, state::StateOverride}; +use bop_common::{ + p2p::{EnvV0, FragV0, SealV0, StateUpdate}, + transaction::Transaction, +}; +use eyre::Result; +use jsonrpsee::{ + core::{RpcResult, async_trait}, + http_client::HttpClientBuilder, + types::ErrorObject, +}; +use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; +use parking_lot::RwLock; +use reqwest::Url; +use tokio::sync::oneshot; +use tracing::{debug, error}; +use ttlhashmap::TtlHashMap; + +use crate::{ + middleware::RpcClient, + types::{EthApiServer, OpRootProvider}, + unsealed_block::{UnsealedBlock, UnsealedBlockStack}, +}; + +#[derive(Clone)] +pub struct Server { + unsealed_stack: Arc>, + provider: OpRootProvider, + tx_receiver_provider: OpRootProvider, + tx_send_sync_waiter: Arc>>>, +} + +impl Server { + pub fn new(provider: OpRootProvider, tx_receiver_provider: OpRootProvider) -> Self { + Self { + unsealed_stack: Arc::new(RwLock::new(UnsealedBlockStack::new())), + provider, + tx_receiver_provider, + tx_send_sync_waiter: Arc::new(RwLock::new(TtlHashMap::new(Duration::from_millis(1000)))), /* TODO: make this configurable */ + } + } + + pub fn on_env(&self, env: EnvV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() || unsealed_block.blocks.back().unwrap().env.number + 1 == env.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.push_back(UnsealedBlock { + env, + current_frag: None, + transaction_count_diff: HashMap::new(), + receipts: HashMap::new(), + balances: HashMap::new(), + state_changes: HashMap::new(), + seal: None, + }); + }); + } else { + error!("expected block number"); + } + } + + pub fn on_header(&self, header: Header) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + while !unsealed_block.blocks.is_empty() && unsealed_block.blocks.front().unwrap().env.number <= header.number { + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.pop_front(); + blocks.rebuild_overrides(); + blocks.root_provider_block_number = Some(header.number); + }); + } + } + + pub fn on_seal(&self, seal: SealV0) { + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + // error!("trying to seal a block but there is no unsealed block"); + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != seal.block_number { + error!( + "trying to seal block number {} but the last unsealed block is number {}", + seal.block_number, last_block.env.number + ); + return; + } + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_seal(seal); + }); + } + + pub fn on_frag(&self, frag: FragV0, state_update: Option) { + self.notify_tx_send_sync_waiter(&state_update); + let mut unsealed_block = self.unsealed_stack.upgradable_read(); + if unsealed_block.blocks.is_empty() { + return; + } + let last_block = unsealed_block.blocks.back().unwrap(); // unwrap is safe because we just checked that the stack is not empty + if last_block.env.number != frag.block_number { + error!( + "trying to apply frag for block number {} but the last unsealed block is number {}", + frag.block_number, last_block.env.number + ); + return; + } + + unsealed_block.with_upgraded(|blocks| { + blocks.blocks.back_mut().unwrap().apply_frag(frag, state_update); + blocks.rebuild_overrides(); + }); + } + + pub fn notify_tx_send_sync_waiter(&self, state_update: &Option) { + if let Some(state_update) = &state_update { + let mut tx_send_sync_waiter = self.tx_send_sync_waiter.upgradable_read(); + for (tx_hash, receipt) in state_update.receipts.iter() { + let tx_hash_b256 = B256::from(*tx_hash); + if tx_send_sync_waiter.contains_key(&tx_hash_b256) { + tx_send_sync_waiter.with_upgraded(|waiter| { + let sender = waiter.remove(&tx_hash_b256).unwrap(); // unwrap is safe because we just checked that the key exists + let _ = sender.send(receipt.clone()); + }); + } + } + } + } + + pub async fn get_transaction_count(&self, address: Address) -> Result { + let (transaction_count_diff, root_provider_block_number) = { + let stack = self.unsealed_stack.read(); + (stack.get_transaction_count_diff(address), stack.root_provider_block_number) + }; + + if let Some(root_provider_block_number) = root_provider_block_number { + let transaction_count = self + .provider + .get_transaction_count(address) + .block_id(BlockId::number(root_provider_block_number)) + .await?; + return Ok(transaction_count_diff + transaction_count); + } + let transaction_count = self.provider.get_transaction_count(address).await?; + Ok(transaction_count_diff + transaction_count) + } + + pub async fn get_balance(&self, address: Address) -> Result { + let balance = self.unsealed_stack.read().get_balance(address); + if let Some(balance) = balance { + return Ok(balance); + } + let balance = self.provider.get_balance(address).await?; + Ok(balance) + } + + pub async fn get_receipt(&self, hash: B256) -> Result> { + if let Some(receipt) = self.unsealed_stack.read().get_receipt(hash) { + return Ok(Some(receipt)); + } + let receipt = self.tx_receiver_provider.get_transaction_receipt(hash).await?; + Ok(receipt) + } + + pub async fn block_number(&self) -> Result { + if let Some(block_number) = self.unsealed_stack.read().block_number() { + Ok(block_number) + } else { + Ok(self.provider.get_block_number().await?) + } + } + + pub fn get_state_overrides(&self) -> (BlockNumberOrTag, StateOverride) { + let unsealed_block = self.unsealed_stack.read(); + if let Some(base_block_number) = unsealed_block.root_provider_block_number { + (BlockNumberOrTag::Number(base_block_number), unsealed_block.overrides.clone()) + } else { + (BlockNumberOrTag::Latest, Default::default()) + } + } + + pub fn cleanup_tx_send_sync_waiter(&self) { + self.tx_send_sync_waiter.write().cleanup(); + } +} + +#[async_trait] +impl EthApiServer for Server { + async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { + match self.tx_receiver_provider.send_raw_transaction(&bytes).await { + Ok(pending_tx) => { + debug!("sent transaction with hash {}", *pending_tx.tx_hash()); + Ok(*pending_tx.tx_hash()) + } + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to send transaction", + Some(e.to_string()), + )), + } + } + + async fn send_raw_transaction_sync( + &self, + bytes: Bytes, + maybe_timeout_ms: Option, + ) -> RpcResult { + let timeout_ms = maybe_timeout_ms.unwrap_or(3000); + if timeout_ms > 3000 { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Timeout is too long, maximum is 3000ms", + Some("Timeout is too long, maximum is 3000ms".to_string()), + )); + } + + let tx_hash = match Transaction::decode(bytes.clone()) { + Ok(tx) => tx.tx_hash(), + Err(e) => { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INVALID_PARAMS_CODE, + "Invalid transaction bytes", + Some(e.to_string()), + )); + } + }; + let (tx_send_sync_waiter_tx, tx_send_sync_waiter_rx) = oneshot::channel(); + self.tx_send_sync_waiter.write().insert(tx_hash, tx_send_sync_waiter_tx); + + let start = Instant::now(); + match self.tx_receiver_provider.send_raw_transaction(&bytes).await { + Ok(_pending_tx) => { + // Transaction sent successfully, tx_hash already calculated above + } + Err(e) => { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to send transaction", + Some(e.to_string()), + )) + } + } + + debug!("sent transaction sync with hash {}", tx_hash); + + match tx_send_sync_waiter_rx.await { + Ok(receipt) => return Ok(receipt), + _ => { + debug!("waiting for transaction receipt, waiter dropped"); + } + } + + while start.elapsed() < Duration::from_millis(timeout_ms) { + if let Ok(Some(receipt)) = self.get_receipt(tx_hash).await { + return Ok(receipt); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Timeout waiting for transaction receipt", + Some("Timeout waiting for transaction receipt".to_string()), + )) + } + + async fn transaction_receipt(&self, hash: B256) -> RpcResult> { + match self.get_receipt(hash).await { + Ok(Some(receipt)) => Ok(Some(receipt)), + Ok(None) => Ok(None), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction receipt", + Some(e.to_string()), + )), + } + } + + // async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult> { + // todo!() + // } + + // async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult> { + // todo!() + // } + + async fn block_number(&self) -> RpcResult { + match self.block_number().await { + Ok(block_number) => Ok(U256::from(block_number)), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get block number", + Some(e.to_string()), + )), + } + } + + async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult { + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if let Ok(transaction_count) = self.get_transaction_count(address).await { + return Ok(U256::from(transaction_count)); + } + } + + let transaction_count = + self.provider.get_transaction_count(address).block_id(block_number.unwrap_or_default()).await.map_err( + |e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get transaction count", + Some(e.to_string()), + ) + }, + )?; + Ok(U256::from(transaction_count)) + } + + async fn balance(&self, address: Address, block_number: Option) -> RpcResult { + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if let Ok(balance) = self.get_balance(address).await { + return Ok(U256::from(balance)); + } + } + + let balance = + self.provider.get_balance(address).block_id(block_number.unwrap_or_default()).await.map_err(|e| { + ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to get balance", + Some(e.to_string()), + ) + })?; + Ok(U256::from(balance)) + } + + async fn call( + &self, + transaction: OpTransactionRequest, + block_number: Option, + state_overrides: Option, + block_overrides: Option, + ) -> RpcResult { + if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_number { + if block_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Block overrides are not supported", + Some("Block overrides are not supported".to_string()), + )); + } + + if state_overrides.is_some() { + return Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "State overrides are not supported", + Some("State overrides are not supported".to_string()), + )); + } + + let (base_block_number, state_overrides) = self.get_state_overrides(); + + let result = self + .provider + .call(transaction) + .block(BlockId::Number(base_block_number)) + .overrides(state_overrides) + .await; + match result { + Ok(result) => Ok(result), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to call", + Some(e.to_string()), + )), + } + } else { + let request = self + .provider + .call(transaction) + .block(block_number.unwrap_or_default()) + .overrides_opt(state_overrides) + .with_block_overrides_opt(block_overrides); + let result = request.await; + match result { + Ok(result) => Ok(result), + Err(e) => Err(ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + "Failed to call", + Some(e.to_string()), + )), + } + } + } +} + +pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { + let client = HttpClientBuilder::default() + .max_request_size(u32::MAX) + .max_response_size(u32::MAX) + .request_timeout(timeout) + .build(url)?; + Ok(client) +} diff --git a/based/bin/rpc/src/types.rs b/based/bin/rpc/src/types.rs new file mode 100644 index 000000000..868773ca2 --- /dev/null +++ b/based/bin/rpc/src/types.rs @@ -0,0 +1,58 @@ +use alloy_eips::BlockId; +use alloy_primitives::{Address, B256, Bytes, U256}; +use alloy_provider::RootProvider; +use alloy_rpc_types::{BlockOverrides, state::StateOverride}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use op_alloy_network::Optimism; +use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; + +pub type OpRootProvider = RootProvider; + +// taken from https://github.com/gattaca-com/based-op/blob/397d48b73d088f40721ae0ba002d251dcf6f38cc/based/crates/common/src/api.rs#L91-L128 +#[rpc(client, server, namespace = "eth")] +pub trait EthApi { + /// Sends signed transaction, returning its hash + #[method(name = "sendRawTransaction")] + async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; + + /// Sends signed transaction, waiting for it to be mined and returning the receipt + #[method(name = "sendRawTransactionSync")] + async fn send_raw_transaction_sync( + &self, + bytes: Bytes, + maybe_timeout_ms: Option, + ) -> RpcResult; + + /// Returns the receipt of a transaction by transaction hash + #[method(name = "getTransactionReceipt")] + async fn transaction_receipt(&self, hash: B256) -> RpcResult>; + + // /// Returns a block with a given identifier + // #[method(name = "getBlockByNumber")] + // async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult>; + + // /// Returns information about a block by hash. + // #[method(name = "getBlockByHash")] + // async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult>; + + /// Returns the number of most recent block + #[method(name = "blockNumber")] + async fn block_number(&self) -> RpcResult; + + /// Returns the nonce of a given address at a given block number. + #[method(name = "getTransactionCount")] + async fn transaction_count(&self, address: Address, block_number: Option) -> RpcResult; + + /// Returns the balance of the account of given address. + #[method(name = "getBalance")] + async fn balance(&self, address: Address, block_number: Option) -> RpcResult; + + #[method(name = "call")] + async fn call( + &self, + transaction: OpTransactionRequest, + block_number: Option, + state_overrides: Option, + block_overrides: Option, + ) -> RpcResult; +} diff --git a/based/bin/rpc/src/unsealed_block.rs b/based/bin/rpc/src/unsealed_block.rs new file mode 100644 index 000000000..7a9bd5894 --- /dev/null +++ b/based/bin/rpc/src/unsealed_block.rs @@ -0,0 +1,161 @@ +use std::collections::VecDeque; + +use alloy_network::ReceiptResponse; +use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; +use alloy_rpc_types::state::{AccountOverride, StateOverride}; +use bop_common::p2p::{DetailedStateChange, EnvV0, FragV0, SealV0, StateUpdate}; +use op_alloy_rpc_types::OpTransactionReceipt; +use tracing::error; + +pub struct UnsealedBlock { + pub env: EnvV0, + pub current_frag: Option, + pub transaction_count_diff: HashMap, + pub receipts: HashMap, + pub balances: HashMap, + pub state_changes: HashMap, + pub seal: Option, +} + +impl UnsealedBlock { + pub fn apply_frag(&mut self, frag: FragV0, state_update: Option) { + if let Some(current_frag) = &self.current_frag { + let expected_seq = current_frag.seq + 1; + if expected_seq != frag.seq { + error!("expected frag seq {} but got seq {}", expected_seq, frag.seq); + return; + } + } else if frag.seq != 0 { + error!("expected first frag to have seq 0 but got seq {}", frag.seq); + return; + } + + if self.seal.is_some() { + error!("trying to apply frag after seal"); + return; + } + + self.current_frag = Some(frag); + + if let Some(state_update) = state_update { + for (_tx_hash, receipt) in state_update.receipts.iter() { + let sender = receipt.from(); + self.transaction_count_diff.entry(sender).and_modify(|count| *count += 1).or_insert(1); + } + self.receipts.extend(state_update.receipts); + self.balances.extend(state_update.balances); + + if let Some(state_changes) = state_update.state_changes { + for (address, state_change) in state_changes.iter() { + let account = self.state_changes.entry(*address).or_default(); + account.balance = state_change.balance; + account.nonce = state_change.nonce; + account.storage.extend(state_change.storage.iter()); + } + } + } + } + + pub fn apply_seal(&mut self, seal: SealV0) { + self.seal = Some(seal); + } + + pub fn get_transaction_count_diff(&self, address: Address) -> Option { + self.transaction_count_diff.get(&address).cloned() + } + + pub fn get_receipt(&self, tx_hash: B256) -> Option { + self.receipts.get(&tx_hash).cloned() + } + + pub fn get_balance(&self, address: Address) -> Option { + self.balances.get(&address).cloned() + } + + pub fn get_state_changes(&self) -> HashMap { + self.state_changes.clone() + } +} + +pub struct UnsealedBlockStack { + pub blocks: VecDeque, + pub root_provider_block_number: Option, + pub overrides: StateOverride, +} + +impl UnsealedBlockStack { + pub fn new() -> Self { + Self { blocks: VecDeque::new(), root_provider_block_number: None, overrides: StateOverride::default() } + } + + pub fn get_transaction_count_diff(&self, address: Address) -> u64 { + let mut total_diff = 0; + for block in self.blocks.iter().rev() { + total_diff += block.get_transaction_count_diff(address).unwrap_or(0); + } + total_diff + } + + pub fn get_receipt(&self, tx_hash: B256) -> Option { + for block in self.blocks.iter().rev() { + if let Some(receipt) = block.get_receipt(tx_hash) { + return Some(receipt); + } + } + None + } + + pub fn get_balance(&self, address: Address) -> Option { + for block in self.blocks.iter().rev() { + if let Some(balance) = block.get_balance(address) { + return Some(balance); + } + } + None + } + + pub fn block_number(&self) -> Option { + if let Some(block) = self.blocks.back() { + return Some(block.env.number); + } + if let Some(root_provider_block_number) = self.root_provider_block_number { + return Some(root_provider_block_number); + } + None + } + + pub fn get_state_changes(&self) -> HashMap { + let mut state_changes = HashMap::default(); + for block in self.blocks.iter() { + for (address, state_change) in block.get_state_changes().iter() { + let account = state_changes.entry(*address).or_insert_with(DetailedStateChange::default); + account.balance = state_change.balance; + account.nonce = state_change.nonce; + account.storage.extend(state_change.storage.iter()); + } + } + state_changes + } + + pub fn rebuild_overrides(&mut self) { + let mut state_overrides_full = StateOverride::default(); + + let state_overrides_unsealed_block = self.get_state_changes(); + for (address, state_change) in state_overrides_unsealed_block.iter() { + let account = state_overrides_full.entry(*address).or_insert_with(AccountOverride::default); + account.balance = Some(state_change.balance); + account.nonce = Some(state_change.nonce); + if !state_change.storage.is_empty() { + if account.state_diff.is_none() { + account.state_diff = Some(Default::default()); + } + let account_storage = account.state_diff.as_mut().unwrap(); + for (slot, value) in state_change.storage.iter() { + account_storage.insert((*slot).into(), (*value).into()); + } + } + } + + self.overrides = state_overrides_full; + } +} diff --git a/based/bin/txspammer/Cargo.toml b/based/bin/txspammer/Cargo.toml index b22212d90..fcf7891c6 100644 --- a/based/bin/txspammer/Cargo.toml +++ b/based/bin/txspammer/Cargo.toml @@ -8,17 +8,17 @@ version.workspace = true alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true -alloy-provider.workspace = true +alloy-provider = "1.1.2" # Need this for eth_sendRawTransactionSync alloy-rpc-types.workspace = true alloy-signer.workspace = true alloy-signer-local.workspace = true bop-common.workspace = true -bop-metrics.workspace = true + clap.workspace = true eyre.workspace = true futures.workspace = true -futures-util = "0.3.31" +futures-util.workspace = true http.workspace = true jsonrpsee.workspace = true op-alloy-rpc-types.workspace = true @@ -31,7 +31,7 @@ serde.workspace = true serde_json.workspace = true thiserror.workspace = true tokio.workspace = true -tokio-websockets = { version = "0.12.1", features = ["client", "openssl", "rand", "server"] } +tokio-websockets.workspace = true tower.workspace = true tower-http.workspace = true tracing.workspace = true diff --git a/based/bin/txspammer/src/account.rs b/based/bin/txspammer/src/account.rs index e60ef40ce..993203f53 100644 --- a/based/bin/txspammer/src/account.rs +++ b/based/bin/txspammer/src/account.rs @@ -2,7 +2,7 @@ use alloy_consensus::{SignableTransaction, TxEip1559, TxEnvelope}; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{B256, Bytes, U256}; use alloy_provider::{Provider, RootProvider}; -use alloy_rpc_types::AccessList; +use alloy_rpc_types::{AccessList, TransactionReceipt}; use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; @@ -90,6 +90,39 @@ impl Account { Ok(*tx.tx_hash()) } + pub async fn transfer_sync( + &mut self, + to: &mut Account, + spec: &TxSpec, + provider: &RootProvider, + sequencer: &Option, + ) -> eyre::Result { + let tx = TxEip1559 { + chain_id: spec.chain_id, + nonce: self.nonce, + gas_limit: spec.gas_limit, + max_fee_per_gas: spec.max_fee_per_gas, + max_priority_fee_per_gas: spec.max_priority_fee_per_gas, + to: to.signer.address().into(), + value: spec.value, + access_list: AccessList::default(), + input: Bytes::new(), + }; + + let sig = self.signer.sign_hash_sync(&tx.signature_hash()).unwrap(); + let tx: TxEnvelope = tx.into_signed(sig).into(); + let encoded = tx.encoded_2718(); + let provider_to_use = sequencer.as_ref().unwrap_or(provider); + + let receipt = provider_to_use.send_raw_transaction_sync(&encoded).await.unwrap(); + + self.nonce += 1; + self.balance -= spec.value + U256::from(spec.gas_limit) * U256::from(spec.max_fee_per_gas); + to.balance += spec.value; + + Ok(receipt) + } + pub async fn _self_transfer( &mut self, spec: &TxSpec, diff --git a/based/bin/txspammer/src/cli.rs b/based/bin/txspammer/src/cli.rs index 95d3b105d..c0014afff 100644 --- a/based/bin/txspammer/src/cli.rs +++ b/based/bin/txspammer/src/cli.rs @@ -40,6 +40,10 @@ pub struct TxSpammerArgs { /// Max priority fee per gas (miner tip) for EIP-1559 transactions (in Wei). #[arg(long = "max_priority_fee_per_gas", default_value_t = 20)] pub max_priority_fee_per_gas: u128, + /// Send eth_sendRawTransactionSync. this sends the transaction and waits for the receipt over and over to test + /// inclusion latency. + #[arg(long = "send_sync", action = clap::ArgAction::SetTrue)] + pub send_sync: bool, // --- Network Configuration --- /// The JSON-RPC URL of the Ethereum node (HTTP or WebSocket). diff --git a/based/bin/txspammer/src/main.rs b/based/bin/txspammer/src/main.rs index c41fb5ce9..1b42b5031 100644 --- a/based/bin/txspammer/src/main.rs +++ b/based/bin/txspammer/src/main.rs @@ -39,6 +39,7 @@ struct TxSpammer { sequencer: Option, root_account: Account, target_accounts: Vec, + send_sync_account: Option, tx_spec: TxSpec, args: TxSpammerArgs, request_rx: Option>, @@ -90,6 +91,7 @@ impl TxSpammer { sequencer, root_account, target_accounts: Vec::new(), + send_sync_account: None, tx_spec, args, request_rx: Some(request_rx), @@ -130,6 +132,12 @@ impl TxSpammer { } self.target_accounts = target_accounts; + + if self.args.send_sync { + let mut account = Account::new(account_generator.next()); + account.refresh(provider).await.expect("failed to fetch account nonce and balance"); + self.send_sync_account = Some(account); + } } pub async fn fund_target_accounts(&mut self) { @@ -139,7 +147,7 @@ impl TxSpammer { let threshold = parse_ether("0.01").unwrap(); // Fund target accounts - for account in self.target_accounts.iter_mut() { + for account in self.target_accounts.iter_mut().chain(self.send_sync_account.iter_mut()) { let amount_to_fund = funding_amount.saturating_sub(account.balance); if amount_to_fund.is_zero() || amount_to_fund < threshold { continue; @@ -166,7 +174,7 @@ impl TxSpammer { sleep(std::time::Duration::from_secs(1)).await; // Verifying account states - for account in self.target_accounts.iter_mut() { + for account in self.target_accounts.iter_mut().chain(self.send_sync_account.iter_mut()) { account.refresh(provider).await.expect("failed to fetch account nonce and balance"); if funding_amount.saturating_sub(account.balance) > threshold { panic!( @@ -178,6 +186,7 @@ impl TxSpammer { print!("Verifying {:?} balance: {} eth \r", account.signer.address(), format_ether(account.balance)); stdout().flush().unwrap(); } + info!("All {} target accounts are funded and ready. ", self.target_accounts.len()); } @@ -219,7 +228,11 @@ impl TxSpammer { let _ = loop { match provider.get_transaction_receipt(tx_hash).await { Ok(Some(receipt)) => break receipt, - _ => { + Err(e) => { + warn!("failed to get transaction receipt: {}", e); + sleep(Duration::from_millis(5)).await; + } + Ok(None) => { sleep(Duration::from_millis(5)).await; } } @@ -302,6 +315,42 @@ impl TxSpammer { }); } } + + pub fn spawn_sync_spammer(&self) { + let Some(mut send_sync_account) = self.send_sync_account.clone() else { + return; + }; + let full_provider = self.full_provider.clone(); + let tx_spec = self.tx_spec.clone(); + let mut accounts_clone = self.target_accounts.clone(); + let mut rag2 = rand::rngs::StdRng::seed_from_u64(Instant::now().elapsed().as_nanos() as u64); + let n = accounts_clone.len(); + let mut latencies = Percentile::new(); + let mut log_timer = Instant::now(); + tokio::spawn(async move { + loop { + let to = &mut accounts_clone[rag2.random_range(0..n)]; + let start_sending_at = Instant::now(); + send_sync_account.transfer_sync(to, &tx_spec, &full_provider, &None).await.expect("failed to send tx"); + let latency = start_sending_at.elapsed(); + latencies.add(latency.as_secs_f64()); + + if log_timer.elapsed() > Duration::from_secs(5) { + let p50 = latencies.percentile(50.0).unwrap_or(0.0); + let p95 = latencies.percentile(95.0).unwrap_or(0.0); + let p99 = latencies.percentile(99.0).unwrap_or(0.0); + info!( + "Send sync: Last 5s: {} tx confirmed, Latency P50: {:.2}s, P95: {:.2}s, P99: {:.2}s", + latencies.len(), + p50, + p95, + p99 + ); + log_timer = Instant::now(); + } + } + }); + } } #[tokio::main] @@ -337,6 +386,10 @@ async fn main() -> eyre::Result<()> { spammer.spawn_stats_logger(); spammer.spawn_spammer(); + if spammer.args.send_sync { + spammer.spawn_sync_spammer(); + } + tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c"); info!("Received Ctrl-C, shutting down..."); diff --git a/based/crates/common/src/db/frag.rs b/based/crates/common/src/db/frag.rs index 738cbec9d..e0b122927 100644 --- a/based/crates/common/src/db/frag.rs +++ b/based/crates/common/src/db/frag.rs @@ -51,7 +51,7 @@ impl DBFrag { let mut guard = self.db.write(); for t in txs { - let evm_state = std::mem::take(&mut t.result_and_state.state); + let evm_state = t.result_and_state.state.clone(); for a in evm_state.keys() { let _ = guard.load_cache_account(*a); } diff --git a/based/crates/common/src/p2p.rs b/based/crates/common/src/p2p.rs index aed897d32..a4e4a7a74 100644 --- a/based/crates/common/src/p2p.rs +++ b/based/crates/common/src/p2p.rs @@ -17,17 +17,17 @@ pub type ExtraData = VariableList; #[derive(Debug, Clone, PartialEq, Eq, TreeHash, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EnvV0 { - number: u64, - parent_hash: B256, - beneficiary: Address, - timestamp: u64, - gas_limit: u64, - basefee: u64, - difficulty: U256, - prevrandao: B256, + pub number: u64, + pub parent_hash: B256, + pub beneficiary: Address, + pub timestamp: u64, + pub gas_limit: u64, + pub basefee: u64, + pub difficulty: U256, + pub prevrandao: B256, #[serde(with = "ssz_types::serde_utils::hex_var_list")] - extra_data: ExtraData, - parent_beacon_block_root: B256, + pub extra_data: ExtraData, + pub parent_beacon_block_root: B256, } impl EnvV0 { @@ -159,6 +159,16 @@ pub struct StateUpdate { pub receipts: HashMap, /// Updated balances for all accounts in txs in FragV0 pub balances: HashMap, + /// Includes storage + #[serde(skip_serializing_if = "Option::is_none")] + pub state_changes: Option>, +} + +#[derive(Default, Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct DetailedStateChange { + pub balance: U256, + pub nonce: u64, + pub storage: HashMap, } impl SignedVersionedMessage { diff --git a/based/crates/rpc/src/gossiper.rs b/based/crates/rpc/src/gossiper.rs index 8a539a3ef..38912d078 100644 --- a/based/crates/rpc/src/gossiper.rs +++ b/based/crates/rpc/src/gossiper.rs @@ -37,8 +37,8 @@ impl Gossiper { let signed = msg.to_signed(&self.signer); let payload = signed.to_json(); - if matches!(signed.message, p2p::VersionedMessage::FragV0(_)) && self.frag_broadcast.send(signed).is_err() { - tracing::debug!("broadcast of frag failed") + if self.frag_broadcast.send(signed).is_err() { + tracing::debug!("broadcast of message failed") } let res = match self.client.post(self.target_rpc.clone()).json(&payload).send() { diff --git a/based/crates/sequencer/src/sorting/frag_sequence.rs b/based/crates/sequencer/src/sorting/frag_sequence.rs index 5c184c4ae..2686cd43a 100644 --- a/based/crates/sequencer/src/sorting/frag_sequence.rs +++ b/based/crates/sequencer/src/sorting/frag_sequence.rs @@ -2,7 +2,7 @@ use alloy_consensus::proofs::ordered_trie_root_with_encoder; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{Bloom, U256}; use bop_common::{ - p2p::{FragV0, StateUpdate, Transaction, Transactions}, + p2p::{DetailedStateChange, FragV0, StateUpdate, Transaction, Transactions}, telemetry::TelemetryUpdate, time::Instant, transaction::SimulatedTx, @@ -82,6 +82,7 @@ impl FragSequence { let mut txs = Vec::with_capacity(in_sort.txs.len()); let mut receipts = HashMap::with_capacity(in_sort.txs.len()); let mut balances = HashMap::with_capacity(in_sort.txs.len()); + let mut state_changes = HashMap::default(); let mut in_sort_da_used = 0; let in_sort_txs = in_sort.txs.len(); @@ -100,10 +101,18 @@ impl FragSequence { self.txs.len() as u64, ), ); - let address = tx.sender(); - let balance = - in_sort.db.basic_ref(address).map(|a| a.map(|a| a.balance).unwrap_or_default()).unwrap_or(U256::ZERO); - balances.insert(address, balance); + + for (address, account) in tx.result_and_state.state.iter() { + let state_change: &mut DetailedStateChange = state_changes.entry(*address).or_default(); + state_change.balance = account.info.balance; + state_change.nonce = account.info.nonce; + for (slot, value) in account.storage.iter() { + let slot: U256 = *slot; + let value: U256 = value.present_value(); + state_change.storage.insert(slot, value); + } + balances.insert(*address, account.info.balance); + } self.txs.push(tx); } @@ -120,7 +129,7 @@ impl FragSequence { txs: Transactions::from(txs), blob_gas_used: 0, }; - let state_update = StateUpdate { receipts, balances }; + let state_update = StateUpdate { receipts, balances, state_changes: Some(state_changes) }; TelemetryUpdate::send( uuid, @@ -296,7 +305,7 @@ mod tests { let (_frag, _, _sorting_db) = ctx.seal_frag(sorting_db, &mut seq); // Seal the block - let (_seal, payload) = ctx.seal_block(seq); + let (_seal, payload) = ctx.seal_block(seq, None); assert_eq!(block.hash_slow(), payload.execution_payload.payload_inner.payload_inner.payload_inner.block_hash); } } diff --git a/fetching_balance.py b/fetching_balance.py index ab6207949..500882844 100644 --- a/fetching_balance.py +++ b/fetching_balance.py @@ -1,29 +1,90 @@ import requests import time -previous_balance = 0 -previous_at = 0 -while True: +previous_balance = None +previous_at = time.time() + +url = "http://localhost:7545" + +token_address = "0x71a49e7ff0865d2de258e720782951879645df1b" +token_holder_address = "0x47Bae705382e91664F369d79aD8EcB8fDF23D355" +address = "0x0E2d15588e765f0ba315313C726041EA124e36CB" + +def fetch_erc20_balance(address: str, token_address: str) -> float: + # Remove '0x' prefix and pad address to 64 hex characters (32 bytes) + address_clean = address.lower().replace('0x', '') + address_padded = address_clean.zfill(64) + + # balanceOf(address) function selector is 0x70a08231 + # Format: function_selector (4 bytes) + address (32 bytes) + data = f"0x70a08231{address_padded}" + body = { "jsonrpc": "2.0", "id": 1, - "method": "eth_getBalance", + "method": "eth_call", "params": [ - "0x4D36DE6a194dDF98EE57323CfA3A45351d35e442", - "latest" + {"to": token_address, "data": data}, + "pending" ], } + + response = requests.post(url, json=body, timeout=5) + response.raise_for_status() + + result = response.json() + + # Check for RPC errors + if 'error' in result: + raise Exception(f"RPC error: {result['error']}") + + result_hex = result.get('result', '0x0') + if not result_hex or result_hex == '0x': + return 0.0 + + balance = int(result_hex, 16) / 1e18 + return balance + +def fetch_balance(address: str) -> float: + body = { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getBalance", + "params": [address, "pending"], + } + response = requests.post(url, json=body, timeout=5) + response.raise_for_status() + + result = response.json() + + # Check for RPC errors + if 'error' in result: + raise Exception(f"RPC error: {result['error']}") + + result_hex = result.get('result', '0x0') + if not result_hex or result_hex == '0x': + return 0.0 + + balance = int(result_hex, 16) / 1e18 + return balance + +while True: try: - response = requests.post("http://localhost:8645", json=body) - balance = int(response.json().get('result', '0x0'), 16) / 1e18 + # balance = fetch_balance(address) + balance = fetch_erc20_balance(token_holder_address, token_address) + + + if previous_balance is None or balance != previous_balance: + elapsed = time.time() - previous_at if previous_balance is not None else 0 + previous_at = time.time() + if previous_balance is None: + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Initial balance: {balance:.10f} ETH") + else: + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Balance changed: {previous_balance:.10f} ETH -> {balance:.10f} ETH (elapsed: {elapsed*1000:.1f}ms)") + previous_balance = balance except Exception as e: - balance = 0 print(f"Error fetching balance: {e}") - - if balance != previous_balance: - elapsed = time.time() - previous_at - previous_at = time.time() - print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Balance changed: {previous_balance:.10f} ETH -> {balance:.10f} ETH (elapsed: {elapsed*1000:.1f}ms)") - previous_balance = balance + time.sleep(1) # Wait longer on error + continue time.sleep(1/60) \ No newline at end of file