diff --git a/Cargo.lock b/Cargo.lock index 75dc5b2a..64ccd6ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4896,7 +4896,7 @@ dependencies = [ [[package]] name = "flux" version = "0.0.38" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "bitcode", "core_affinity", @@ -4919,7 +4919,7 @@ dependencies = [ [[package]] name = "flux-communication" version = "0.0.27" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "directories", "flux-timing", @@ -4934,7 +4934,7 @@ dependencies = [ [[package]] name = "flux-network" version = "0.0.8" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "flux-communication", "flux-timing", @@ -4947,7 +4947,7 @@ dependencies = [ [[package]] name = "flux-timing" version = "0.0.16" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "bitcode", "chrono", @@ -4965,7 +4965,7 @@ dependencies = [ [[package]] name = "flux-utils" version = "0.0.23" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "core_affinity", "directories", @@ -6202,7 +6202,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.62.2", + "windows-core 0.57.0", ] [[package]] @@ -13422,7 +13422,7 @@ dependencies = [ [[package]] name = "spine-derive" version = "0.0.10" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "proc-macro2", "quote", @@ -14765,12 +14765,12 @@ dependencies = [ [[package]] name = "type-hash" version = "0.0.1" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" [[package]] name = "type-hash-derive" version = "0.0.4" -source = "git+https://github.com/gattaca-com/flux?rev=67322430e54abe9250bfafdc42ef11fde3405cfd#67322430e54abe9250bfafdc42ef11fde3405cfd" +source = "git+https://github.com/gattaca-com/flux?rev=e70d4a2c77758e21b2a8c025419ea867f5fa86c3#e70d4a2c77758e21b2a8c025419ea867f5fa86c3" dependencies = [ "proc-macro-crate", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 6f973d9f..07202d6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,11 +43,11 @@ eyre = "0.6.12" aws-sdk-s3 = "=1.35.0" flate2 = "1.0" -flux = { git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" } -flux-network = { package = "flux-network", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" } -flux-utils = { package = "flux-utils", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" } -flux-type-hash = { package = "type-hash", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" } -flux-type-hash-derive = { package = "type-hash-derive", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" } +flux = { git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" } +flux-network = { package = "flux-network", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" } +flux-utils = { package = "flux-utils", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" } +flux-type-hash = { package = "type-hash", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" } +flux-type-hash-derive = { package = "type-hash-derive", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" } futures = "0.3" futures-util = { version = "0.3", features = ["compat"] } diff --git a/crates/relay/src/api/builder/submit_block.rs b/crates/relay/src/api/builder/submit_block.rs index e7ab7a8f..af4990ea 100644 --- a/crates/relay/src/api/builder/submit_block.rs +++ b/crates/relay/src/api/builder/submit_block.rs @@ -62,6 +62,7 @@ impl BuilderApi { Ok(dref) => api.producer.produce_with_ingestion( NewBidSubmission { dref, + payload_offset: 0, header, submission_ref: SubmissionRef::Http(future_ix), trace, diff --git a/crates/relay/src/bid_decoder/tile.rs b/crates/relay/src/bid_decoder/tile.rs index df9c8737..a29f6751 100644 --- a/crates/relay/src/bid_decoder/tile.rs +++ b/crates/relay/src/bid_decoder/tile.rs @@ -52,7 +52,8 @@ impl Tile for DecoderTile { adapter.consume_internal_message( |new_bid: &mut InternalMessage, producers| match self.submissions.map( new_bid.dref, - |payload| { + |full_payload| { + let payload = &full_payload[new_bid.payload_offset..]; let sent_at = new_bid.tracking_timestamp().publish_t(); Self::handle_block_submission( &self.cache, diff --git a/crates/relay/src/spine/messages.rs b/crates/relay/src/spine/messages.rs index 55eae64e..c9808f25 100644 --- a/crates/relay/src/spine/messages.rs +++ b/crates/relay/src/spine/messages.rs @@ -12,6 +12,7 @@ use crate::{ #[derive(Debug, Clone, Copy)] pub struct NewBidSubmission { pub dref: DCacheRef, + pub payload_offset: usize, pub submission_ref: SubmissionRef, pub header: InternalBidSubmissionHeader, pub trace: SubmissionTrace, diff --git a/crates/relay/src/tcp_bid_recv/mod.rs b/crates/relay/src/tcp_bid_recv/mod.rs index f6e88fd8..3a9a3666 100644 --- a/crates/relay/src/tcp_bid_recv/mod.rs +++ b/crates/relay/src/tcp_bid_recv/mod.rs @@ -4,7 +4,7 @@ use dashmap::DashMap; use flux::{spine::FluxSpine, tile::Tile, timing::Nanos}; use flux_network::{ Token, - tcp::{PollEvent, SendBehavior, TcpConnector, TcpTelemetry}, + tcp::{MessagePayload, PollEvent, SendBehavior, TcpConnector, TcpTelemetry}, }; use flux_utils::DCache; use helix_common::{SubmissionTrace, metrics::SUB_CLIENT_TO_SERVER_LATENCY, utils::utcnow_ns}; @@ -53,7 +53,8 @@ impl BidSubmissionTcpListener { ) -> Self { let mut listener = TcpConnector::default() .with_telemetry(TcpTelemetry::Enabled { app_name: HelixSpine::app_name() }) - .with_socket_buf_size(64 * 1024 * 1024); // 64MB + .with_socket_buf_size(64 * 1024 * 1024) // 64MB + .with_dcache(submissions.clone()); listener.listen_at(listener_addr).expect("failed to initialise the TCP listener"); @@ -82,10 +83,18 @@ impl Tile for BidSubmissionTcpListener { self.registered.remove(&token); } PollEvent::Message { token, payload, send_ts } => { + let MessagePayload::Cached(dref) = payload else { + tracing::error!("expected dcache-backed payload; dcache not configured"); + return; + }; + if let Some(expected_pubkey) = self.registered.get(&token) { - let header = match BidSubmissionHeader::try_from(payload) { - Ok(header) => header, - Err(e) => { + let header = match self + .submissions + .map(dref, |bytes| BidSubmissionHeader::try_from(bytes)) + { + Ok(Ok(header)) => header, + Ok(Err(e)) => { tracing::error!("{e} failed to parse the bid submission header"); self.submission_errors.push(( token, @@ -93,7 +102,16 @@ impl Tile for BidSubmissionTcpListener { None, BidSubmissionError::ParseError(e), )); - + return; + } + Err(e) => { + tracing::error!("{e} dcache read failed for tcp header"); + self.submission_errors.push(( + token, + None, + None, + BidSubmissionError::InternalError, + )); return; } }; @@ -108,7 +126,6 @@ impl Tile for BidSubmissionTcpListener { block_hash = tracing::field::Empty, ) .entered(); - let body = &payload[BID_SUB_HEADER_SIZE..]; let submission_ref = SubmissionRef::Tcp { id, token, seq_num: header.sequence_number }; @@ -124,29 +141,17 @@ impl Tile for BidSubmissionTcpListener { ..Default::default() }; - match self.submissions.write(body.len(), |buffer| buffer.copy_from_slice(body)) - { - Ok(dref) => adapter.produce(NewBidSubmission { - dref, - header: InternalBidSubmissionHeader::from_tcp_header(id, header), - submission_ref, - trace, - expected_pubkey: Some(*expected_pubkey), - }), - Err(e) => { - tracing::error!("{e} failed to write the bid submission into dcache"); - self.submission_errors.push(( - token, - Some(header.sequence_number), - Some(id), - BidSubmissionError::InternalError, - )); - } - } + adapter.produce(NewBidSubmission { + dref, + payload_offset: BID_SUB_HEADER_SIZE, + header: InternalBidSubmissionHeader::from_tcp_header(id, header), + submission_ref, + trace, + expected_pubkey: Some(*expected_pubkey), + }); } else { - let registration_message = RegistrationMsg::from_ssz_bytes(payload); - match registration_message { - Ok(msg) => { + match self.submissions.map(dref, RegistrationMsg::from_ssz_bytes) { + Ok(Ok(msg)) => { let api_key = Uuid::from_bytes(msg.api_key).to_string(); if self .api_key_cache @@ -163,10 +168,14 @@ impl Tile for BidSubmissionTcpListener { self.to_disconnect.push(token); } } - Err(e) => { + Ok(Err(e)) => { tracing::error!(err=?e, "invalid registration message"); self.to_disconnect.push(token); } + Err(e) => { + tracing::error!("{e} dcache read failed for registration message"); + self.to_disconnect.push(token); + } } } } diff --git a/crates/relay/src/tcp_bid_recv/s3.rs b/crates/relay/src/tcp_bid_recv/s3.rs index dcd107c2..53ff041d 100644 --- a/crates/relay/src/tcp_bid_recv/s3.rs +++ b/crates/relay/src/tcp_bid_recv/s3.rs @@ -43,7 +43,9 @@ impl S3PayloadSaver { let header = r.header.to_bytes(); let header_slice = header.as_slice(); let header_len = header_slice.len() as u16; - match submissions.map(r.dref, |payload| { + let payload_offset = r.payload_offset; + match submissions.map(r.dref, |full_payload| { + let payload = &full_payload[payload_offset..]; // format: [u16 LE header_len][header bytes][payload bytes] let mut buf = bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len());