Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ eyre = "0.6.12"
aws-sdk-s3 = "=1.35.0"
flate2 = "1.0"

flux = { git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" }
flux-network = { package = "flux-network", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" }
flux-utils = { package = "flux-utils", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" }
flux-type-hash = { package = "type-hash", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" }
flux-type-hash-derive = { package = "type-hash-derive", git = "https://github.com/gattaca-com/flux", rev = "67322430e54abe9250bfafdc42ef11fde3405cfd" }
flux = { git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" }
flux-network = { package = "flux-network", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" }
flux-utils = { package = "flux-utils", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" }
flux-type-hash = { package = "type-hash", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" }
flux-type-hash-derive = { package = "type-hash-derive", git = "https://github.com/gattaca-com/flux", rev = "e70d4a2c77758e21b2a8c025419ea867f5fa86c3" }

futures = "0.3"
futures-util = { version = "0.3", features = ["compat"] }
Expand Down
1 change: 1 addition & 0 deletions crates/relay/src/api/builder/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl<A: Api> BuilderApi<A> {
Ok(dref) => api.producer.produce_with_ingestion(
NewBidSubmission {
dref,
payload_offset: 0,
header,
submission_ref: SubmissionRef::Http(future_ix),
trace,
Expand Down
3 changes: 2 additions & 1 deletion crates/relay/src/bid_decoder/tile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl Tile<HelixSpine> for DecoderTile {
adapter.consume_internal_message(
|new_bid: &mut InternalMessage<NewBidSubmission>, producers| match self.submissions.map(
new_bid.dref,
|payload| {
|full_payload| {
let payload = &full_payload[new_bid.payload_offset..];
let sent_at = new_bid.tracking_timestamp().publish_t();
Self::handle_block_submission(
&self.cache,
Expand Down
1 change: 1 addition & 0 deletions crates/relay/src/spine/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
#[derive(Debug, Clone, Copy)]
pub struct NewBidSubmission {
pub dref: DCacheRef,
pub payload_offset: usize,
pub submission_ref: SubmissionRef,
pub header: InternalBidSubmissionHeader,
pub trace: SubmissionTrace,
Expand Down
69 changes: 39 additions & 30 deletions crates/relay/src/tcp_bid_recv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use dashmap::DashMap;
use flux::{spine::FluxSpine, tile::Tile, timing::Nanos};
use flux_network::{
Token,
tcp::{PollEvent, SendBehavior, TcpConnector, TcpTelemetry},
tcp::{MessagePayload, PollEvent, SendBehavior, TcpConnector, TcpTelemetry},
};
use flux_utils::DCache;
use helix_common::{SubmissionTrace, metrics::SUB_CLIENT_TO_SERVER_LATENCY, utils::utcnow_ns};
Expand Down Expand Up @@ -53,7 +53,8 @@ impl BidSubmissionTcpListener {
) -> Self {
let mut listener = TcpConnector::default()
.with_telemetry(TcpTelemetry::Enabled { app_name: HelixSpine::app_name() })
.with_socket_buf_size(64 * 1024 * 1024); // 64MB
.with_socket_buf_size(64 * 1024 * 1024) // 64MB
.with_dcache(submissions.clone());

listener.listen_at(listener_addr).expect("failed to initialise the TCP listener");

Expand Down Expand Up @@ -82,18 +83,35 @@ impl Tile<HelixSpine> for BidSubmissionTcpListener {
self.registered.remove(&token);
}
PollEvent::Message { token, payload, send_ts } => {
let MessagePayload::Cached(dref) = payload else {
tracing::error!("expected dcache-backed payload; dcache not configured");
return;
};

if let Some(expected_pubkey) = self.registered.get(&token) {
let header = match BidSubmissionHeader::try_from(payload) {
Ok(header) => header,
Err(e) => {
let header = match self
.submissions
.map(dref, |bytes| BidSubmissionHeader::try_from(bytes))
{
Ok(Ok(header)) => header,
Ok(Err(e)) => {
tracing::error!("{e} failed to parse the bid submission header");
self.submission_errors.push((
token,
None,
None,
BidSubmissionError::ParseError(e),
));

return;
}
Err(e) => {
tracing::error!("{e} dcache read failed for tcp header");
self.submission_errors.push((
token,
None,
None,
BidSubmissionError::InternalError,
));
return;
}
};
Expand All @@ -108,7 +126,6 @@ impl Tile<HelixSpine> for BidSubmissionTcpListener {
block_hash = tracing::field::Empty,
)
.entered();
let body = &payload[BID_SUB_HEADER_SIZE..];

let submission_ref =
SubmissionRef::Tcp { id, token, seq_num: header.sequence_number };
Expand All @@ -124,29 +141,17 @@ impl Tile<HelixSpine> for BidSubmissionTcpListener {
..Default::default()
};

match self.submissions.write(body.len(), |buffer| buffer.copy_from_slice(body))
{
Ok(dref) => adapter.produce(NewBidSubmission {
dref,
header: InternalBidSubmissionHeader::from_tcp_header(id, header),
submission_ref,
trace,
expected_pubkey: Some(*expected_pubkey),
}),
Err(e) => {
tracing::error!("{e} failed to write the bid submission into dcache");
self.submission_errors.push((
token,
Some(header.sequence_number),
Some(id),
BidSubmissionError::InternalError,
));
}
}
adapter.produce(NewBidSubmission {
dref,
payload_offset: BID_SUB_HEADER_SIZE,
header: InternalBidSubmissionHeader::from_tcp_header(id, header),
submission_ref,
trace,
expected_pubkey: Some(*expected_pubkey),
});
} else {
let registration_message = RegistrationMsg::from_ssz_bytes(payload);
match registration_message {
Ok(msg) => {
match self.submissions.map(dref, RegistrationMsg::from_ssz_bytes) {
Ok(Ok(msg)) => {
let api_key = Uuid::from_bytes(msg.api_key).to_string();
if self
.api_key_cache
Expand All @@ -163,10 +168,14 @@ impl Tile<HelixSpine> for BidSubmissionTcpListener {
self.to_disconnect.push(token);
}
}
Err(e) => {
Ok(Err(e)) => {
tracing::error!(err=?e, "invalid registration message");
self.to_disconnect.push(token);
}
Err(e) => {
tracing::error!("{e} dcache read failed for registration message");
self.to_disconnect.push(token);
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/relay/src/tcp_bid_recv/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl S3PayloadSaver {
let header = r.header.to_bytes();
let header_slice = header.as_slice();
let header_len = header_slice.len() as u16;
match submissions.map(r.dref, |payload| {
let payload_offset = r.payload_offset;
match submissions.map(r.dref, |full_payload| {
let payload = &full_payload[payload_offset..];
// format: [u16 LE header_len][header bytes][payload bytes]
let mut buf =
bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len());
Expand Down
Loading