diff --git a/crates/flux-network/Cargo.toml b/crates/flux-network/Cargo.toml index c1e6125..6cebc19 100644 --- a/crates/flux-network/Cargo.toml +++ b/crates/flux-network/Cargo.toml @@ -8,6 +8,7 @@ keywords = ["low-latency", "performance", "communication"] repository = "https://github.com/gattaca-com/flux" [dependencies] +flux.workspace = true flux-communication.workspace = true flux-timing.workspace = true flux-utils.workspace = true @@ -17,5 +18,7 @@ tracing.workspace = true wincode = { workspace = true, optional = true } [dev-dependencies] +spine-derive.workspace = true +tempfile.workspace = true wincode-derive = { workspace = true } wincode = { workspace = true } diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index 8532c35..04117a6 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -1,11 +1,12 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; +use flux::spine::{SpineProducerWithDCache, SpineProducers}; use flux_timing::{Duration, Nanos, Repeater}; -use flux_utils::{DCache, safe_panic}; +use flux_utils::{DCachePtr, safe_panic}; use mio::{Events, Interest, Poll, Token, event::Event, net::TcpListener}; use tracing::{debug, error, warn}; -use crate::tcp::{ConnState, MessagePayload, TcpStream, TcpTelemetry, stream::set_socket_buf_size}; +use crate::tcp::{ConnState, TcpStream, TcpTelemetry, stream::set_socket_buf_size}; #[derive(Clone, Copy, Debug)] #[repr(u8)] @@ -28,8 +29,12 @@ pub enum ConnectionVariant { Listener(TcpListener), } -/// Event emitted by [`TcpConnector::poll_with`] for each notable IO occurrence. -pub enum PollEvent<'a> { +/// Event emitted by [`TcpConnector::poll_with`] and +/// [`TcpConnector::poll_with_produce`] for each notable IO occurrence. +/// +/// For [`poll_with`]: `Payload = &'a [u8]`. +/// For [`poll_with_produce`]: `Payload = Result`. +pub enum PollEvent { /// A new connection was accepted from a listener. /// /// - `listener`: token of the listening socket that accepted @@ -43,7 +48,7 @@ pub enum PollEvent<'a> { /// A connection was closed (by the remote or due to an IO error). Disconnect { token: Token }, /// A complete framed message was received. - Message { token: Token, payload: MessagePayload<'a>, send_ts: Nanos }, + Message { token: Token, payload: Payload, send_ts: Nanos }, } struct ConnectionManager { @@ -53,7 +58,7 @@ struct ConnectionManager { on_connect_msg: Option>, telemetry: TcpTelemetry, socket_buf_size: Option, - dcache: Option>, + dcache: Option, // Always only outbound/client side connection streams to_be_reconnected: Vec<(Token, ConnectionVariant)>, @@ -314,7 +319,7 @@ impl ConnectionManager { #[inline] fn handle_event(&mut self, e: &Event, handler: &mut F) where - F: for<'a> FnMut(PollEvent<'a>), + F: for<'a> FnMut(PollEvent<&'a [u8]>), { let event_token = e.token(); let Some(stream_id) = self.conns.iter().position(|(t, _)| t == &event_token) else { @@ -330,8 +335,8 @@ impl ConnectionManager { self.poll.registry(), e, self.dcache.as_deref(), - &mut |token, payload, send_ts| { - handler(PollEvent::Message { token, payload, send_ts }); + &mut |token, bytes, send_ts| { + handler(PollEvent::Message { token, payload: bytes, send_ts }); }, ) == ConnState::Disconnected { @@ -390,6 +395,97 @@ impl ConnectionManager { } } } + + #[inline] + fn handle_event_produce( + &mut self, + e: &Event, + parse: &mut G, + produce: &mut P, + handler: &mut F, + ) where + T: 'static + Copy, + G: FnMut(Token, &[u8]) -> Result, + P: SpineProducers + AsRef>, + F: FnMut(PollEvent>), + { + let event_token = e.token(); + let Some(stream_id) = self.conns.iter().position(|(t, _)| t == &event_token) else { + safe_panic!("got event for unknown token"); + return; + }; + + loop { + match &mut self.conns[stream_id].1 { + ConnectionVariant::Outbound(tcp_connection) | + ConnectionVariant::Inbound(tcp_connection) => { + let dcache = + self.dcache.as_deref().expect("dcache required for poll_with_produce"); + if tcp_connection.poll_with_produce( + self.poll.registry(), + e, + dcache, + parse, + produce, + &mut |token, result, send_ts| { + handler(PollEvent::Message { token, payload: result, send_ts }); + }, + ) == ConnState::Disconnected + { + handler(PollEvent::Disconnect { token: event_token }); + self.disconnect_at_index(stream_id); + } + return; + } + ConnectionVariant::Listener(tcp_listener) => { + if let Ok((mut stream, addr)) = tcp_listener.accept() { + tracing::info!(?addr, "client connected"); + if let Some(size) = self.socket_buf_size { + set_socket_buf_size(&stream, size); + } + let token = Token(self.next_token); + if let Err(e) = + self.poll.registry().register(&mut stream, token, Interest::READABLE) + { + error!("couldn't register client {e}"); + let _ = stream.shutdown(std::net::Shutdown::Both); + continue; + }; + if let Err(e) = stream.set_nodelay(true) { + error!("couldn't set nodelay on stream to {addr}: {e}"); + continue; + } + let mut conn = TcpStream::from_stream_with_telemetry( + stream, + token, + addr, + self.telemetry, + self.dcache.is_some(), + ); + if let Some(msg) = &self.on_connect_msg && + conn.write_or_enqueue_with( + self.poll.registry(), + |buf: &mut Vec| { + buf.extend_from_slice(msg); + }, + ) == ConnState::Disconnected + { + continue; + } + handler(PollEvent::Accept { + listener: event_token, + stream: token, + peer_addr: addr, + }); + self.conns.push((token, ConnectionVariant::Inbound(conn))); + self.next_token += 1; + } else { + return; + } + } + } + } + } } /// Non-blocking TCP connector/acceptor built on `mio`. @@ -456,9 +552,9 @@ impl TcpConnector { self } - /// Attaches a dcache writer as the shared receive buffer for all streams. - pub fn with_dcache(mut self, writer: Arc) -> Self { - self.conn_mgr.dcache = Some(writer); + /// Attaches a dcache as the shared receive buffer for all streams. + pub fn with_dcache(mut self, dcache: DCachePtr) -> Self { + self.conn_mgr.dcache = Some(dcache); self } @@ -486,7 +582,7 @@ impl TcpConnector { #[inline] pub fn poll_with(&mut self, mut handler: F) -> bool where - F: for<'a> FnMut(PollEvent<'a>), + F: for<'a> FnMut(PollEvent<&'a [u8]>), { self.conn_mgr.maybe_reconnect(); for token in self.conn_mgr.reconnected_to.drain(..) { @@ -506,6 +602,43 @@ impl TcpConnector { o } + /// Like [`poll_with`] but for dcache-backed streams. For each received + /// message, applies `parse` to the payload bytes; if `Ok`, calls + /// `produce(t, send_ts)`. The handler receives + /// `PollEvent::Message { payload: Result, .. }`. + /// + /// # Panics + /// Panics if no dcache was configured via [`with_dcache`]. + #[inline] + pub fn poll_with_produce( + &mut self, + parse: &mut G, + produce: &mut P, + mut handler: F, + ) -> bool + where + T: 'static + Copy, + G: FnMut(Token, &[u8]) -> Result, + P: SpineProducers + AsRef>, + F: FnMut(PollEvent>), + { + self.conn_mgr.maybe_reconnect(); + for token in self.conn_mgr.reconnected_to.drain(..) { + handler(PollEvent::Reconnect { token }); + } + if let Err(e) = self.conn_mgr.poll.poll(&mut self.events, Some(std::time::Duration::ZERO)) { + safe_panic!("got error polling {e}"); + return false; + } + let mut o = false; + for e in self.events.iter() { + o = true; + self.conn_mgr.handle_event_produce(e, parse, produce, &mut handler); + } + self.conn_mgr.flush_backlogs(); + o + } + /// Writes immediately or enqueues bytes for later sending. /// /// `serialise` is called with a mutable send buffer and must return the diff --git a/crates/flux-network/src/tcp/mod.rs b/crates/flux-network/src/tcp/mod.rs index 8a01d56..0a5d24a 100644 --- a/crates/flux-network/src/tcp/mod.rs +++ b/crates/flux-network/src/tcp/mod.rs @@ -2,4 +2,4 @@ mod connector; mod stream; pub use connector::{PollEvent, SendBehavior, TcpConnector}; -pub use stream::{ConnState, MessagePayload, TcpStream, TcpTelemetry}; +pub use stream::{ConnState, TcpStream, TcpTelemetry}; diff --git a/crates/flux-network/src/tcp/stream.rs b/crates/flux-network/src/tcp/stream.rs index 6793884..5ed0b43 100644 --- a/crates/flux-network/src/tcp/stream.rs +++ b/crates/flux-network/src/tcp/stream.rs @@ -4,6 +4,7 @@ use std::{ net::SocketAddr, }; +use flux::spine::{SpineProducerWithDCache, SpineProducers}; use flux_communication::Timer; use flux_timing::Nanos; use flux_utils::{DCache, DCacheRef}; @@ -15,7 +16,7 @@ enum RxBuf { use mio::{Interest, Registry, Token, event::Event}; use tracing::{debug, warn}; -pub enum MessagePayload<'a> { +enum MessagePayload<'a> { Raw(&'a [u8]), Cached(DCacheRef), } @@ -198,9 +199,9 @@ impl TcpStream { /// Poll socket and calls `on_msg` for every fully assembled frame. /// - /// When no DCache is set, `payload` is [`MessagePayload::Raw`] and the - /// slice is only valid for the duration of the callback. When DCache is - /// set, `payload` is [`MessagePayload::Cached`] and the ref may be kept. + /// The byte slice passed to `on_msg` is only valid for the duration of the + /// callback. Use with non-dcache connectors; for dcache use + /// [`poll_with_produce`]. #[inline] pub fn poll_with( &mut self, @@ -210,13 +211,69 @@ impl TcpStream { on_msg: &mut F, ) -> ConnState where - F: for<'a> FnMut(Token, MessagePayload<'a>, Nanos), + F: for<'a> FnMut(Token, &'a [u8], Nanos), { if ev.is_readable() { loop { match self.read_frame(dcache) { - ReadOutcome::PayloadDone { payload, send_ts } => { - on_msg(ev.token(), payload, send_ts); + ReadOutcome::PayloadDone { payload: MessagePayload::Raw(bytes), send_ts } => { + on_msg(ev.token(), bytes, send_ts); + } + ReadOutcome::PayloadDone { payload: MessagePayload::Cached(_), .. } => { + flux_utils::safe_panic!( + "poll_with called on dcache stream; use poll_with_produce" + ); + } + ReadOutcome::WouldBlock => break, + ReadOutcome::Disconnected => return ConnState::Disconnected, + } + } + } + + if ev.is_writable() && self.drain_backlog(registry) == ConnState::Disconnected { + return ConnState::Disconnected; + } + + ConnState::Alive + } + + /// Like [`poll_with`] but for dcache-backed streams. Applies `parse` to + /// each payload's bytes and calls `produce(t, send_ts)` on `Ok`. Use with + /// dcache connectors; for raw use [`poll_with`]. + #[inline] + pub fn poll_with_produce( + &mut self, + registry: &Registry, + ev: &Event, + dcache: &DCache, + parse: &mut G, + produce: &mut P, + on_msg: &mut F, + ) -> ConnState + where + T: 'static + Copy, + G: FnMut(Token, &[u8]) -> Result, + P: SpineProducers + AsRef>, + F: FnMut(Token, Result, Nanos), + { + if ev.is_readable() { + loop { + match self.read_frame(Some(dcache)) { + ReadOutcome::PayloadDone { payload: MessagePayload::Raw(_), .. } => { + flux_utils::safe_panic!( + "poll_with_produce called on non-dcache stream; use poll_with" + ); + } + ReadOutcome::PayloadDone { payload: MessagePayload::Cached(dref), send_ts } => { + match dcache.map(dref, |bytes| parse(self.token, bytes)) { + Ok(result) => { + if let Ok(t) = &result { + produce.produce_with_dref(*t, dref, send_ts); + } + on_msg(ev.token(), result, send_ts); + } + Err(e) => warn!("dcache map failed: {e}"), + } } ReadOutcome::WouldBlock => break, ReadOutcome::Disconnected => return ConnState::Disconnected, diff --git a/crates/flux-network/tests/tcp_broadcast_burst.rs b/crates/flux-network/tests/tcp_broadcast_burst.rs index 4b3267d..a0564c3 100644 --- a/crates/flux-network/tests/tcp_broadcast_burst.rs +++ b/crates/flux-network/tests/tcp_broadcast_burst.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use flux_network::tcp::{MessagePayload, PollEvent, SendBehavior, TcpConnector}; +use flux_network::tcp::{PollEvent, SendBehavior, TcpConnector}; const NUM_RECEIVERS: usize = 4; const BURST_SIZE: usize = 20; @@ -26,10 +26,8 @@ fn spawn_receiver(addr: SocketAddr) -> thread::JoinHandle>> { while !disconnected && std::time::Instant::now() < deadline { conn.poll_with(|event| match event { - PollEvent::Message { payload, .. } => { - if let MessagePayload::Raw(bytes) = payload { - frames.push(bytes.to_vec()); - } + PollEvent::Message { payload: bytes, .. } => { + frames.push(bytes.to_vec()); } PollEvent::Disconnect { .. } => { disconnected = true; diff --git a/crates/flux-network/tests/tcp_dcache.rs b/crates/flux-network/tests/tcp_dcache.rs index 078a9d6..71b278a 100644 --- a/crates/flux-network/tests/tcp_dcache.rs +++ b/crates/flux-network/tests/tcp_dcache.rs @@ -1,62 +1,143 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, thread, time::{Duration, Instant}, }; -use flux_network::tcp::{MessagePayload, PollEvent, SendBehavior, TcpConnector}; -use flux_utils::DCache; +use flux::{ + communication::{ShmemData, cleanup_shmem}, + spine::{DCacheRead, ScopedSpine, SpineAdapter, SpineProducerWithDCache}, + tile::{Tile, TileConfig, TileInfo, attach_tile}, +}; +use flux_network::tcp::{PollEvent, SendBehavior, TcpConnector}; +use spine_derive::from_spine; + +#[derive(Clone, Copy, Debug, Default)] +#[repr(C)] +struct Payload([u8; 8]); + +#[from_spine("tcp-dcache-test")] +#[derive(Debug)] +struct TcpDcacheSpine { + pub tile_info: ShmemData, + #[queue(mtu(64))] + pub msg: SpineQueue, +} + +const BIND_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 24713); + +struct NetworkTile { + conn: Option, + ready: Arc, +} + +impl Tile for NetworkTile { + fn try_init(&mut self, adapter: &mut SpineAdapter) -> bool { + let sp: &SpineProducerWithDCache = adapter.producers.as_ref(); + let mut conn = TcpConnector::default().with_dcache(sp.dcache_ptr()); + conn.listen_at(BIND_ADDR).unwrap(); + self.conn = Some(conn); + self.ready.store(true, Ordering::Release); + true + } + + fn loop_body(&mut self, adapter: &mut SpineAdapter) { + let Some(conn) = &mut self.conn else { return }; + conn.poll_with_produce( + &mut |_, bytes| -> Result { + Ok(Payload(bytes.try_into().map_err(|_| ())?)) + }, + &mut adapter.producers, + |_: PollEvent<_>| {}, + ); + } +} + +struct ReaderTile { + received: Arc>>, + count: usize, + expected: usize, +} + +impl Tile for ReaderTile { + fn loop_body(&mut self, adapter: &mut SpineAdapter) { + let received = &self.received; + let count = &mut self.count; + adapter.consume_with_dcache::( + |msg, bytes| assert_eq!(&msg.0[..], bytes), + |result, _| { + if let DCacheRead::Ok((msg, ())) = result { + received.lock().unwrap().push(msg); + *count += 1; + } + }, + ); + if self.count >= self.expected { + adapter.request_stop_scope(); + } + } +} -/// Two inbound connections both write into the same connector dcache. -/// Verifies that both payloads are readable via the single shared reader. +/// Two TCP streams into the same dcache-backed spine queue. +/// Verifies dcache bytes match the queue message (same shmem region). #[test] fn dcache_multi_stream() { - let bind_addr = SocketAddr::from((IpAddr::V4(Ipv4Addr::LOCALHOST), 24712)); + let tmp = tempfile::tempdir().unwrap(); + let base = tmp.path(); - const MSG_A: &[u8] = b"stream-a"; - const MSG_B: &[u8] = b"stream-b"; - const CAPACITY: usize = 4096; + let mut spine = TcpDcacheSpine::new_with_base_dir(base, None); - let writer = DCache::new(CAPACITY); - let reader = writer.clone(); + const MSG_A: &[u8; 8] = b"stream-a"; + const MSG_B: &[u8; 8] = b"stream-b"; - let server = thread::spawn(move || { - let mut conn = TcpConnector::default().with_dcache(writer); - conn.listen_at(bind_addr).unwrap(); + let ready = Arc::new(AtomicBool::new(false)); + let received: Arc>> = Arc::new(Mutex::new(Vec::new())); - let mut accepted = 0usize; - let mut received = Vec::new(); + let ready_c = ready.clone(); + let client = thread::spawn(move || { let deadline = Instant::now() + Duration::from_secs(5); - while (accepted < 2 || received.len() < 2) && Instant::now() < deadline { - conn.poll_with(|ev| match ev { - PollEvent::Accept { .. } => accepted += 1, - PollEvent::Message { payload: MessagePayload::Cached(r), .. } => { - received.push(reader.map(r, |b| b.to_vec()).unwrap()); - } - _ => {} - }); - thread::sleep(Duration::from_micros(50)); + while !ready_c.load(Ordering::Acquire) && Instant::now() < deadline { + thread::sleep(Duration::from_millis(1)); } - assert_eq!(received.len(), 2); - assert!(received.contains(&MSG_A.to_vec())); - assert!(received.contains(&MSG_B.to_vec())); - }); - - for msg in [MSG_A, MSG_B] { - thread::spawn(move || { - thread::sleep(Duration::from_millis(10)); + for msg in [MSG_A, MSG_B] { let mut conn = TcpConnector::default(); - let tok = conn.connect(bind_addr).unwrap(); + let tok = conn.connect(BIND_ADDR).unwrap(); conn.write_or_enqueue_with(SendBehavior::Single(tok), |buf| { buf.extend_from_slice(msg); }); - let deadline = Instant::now() + Duration::from_secs(5); - while Instant::now() < deadline { + let flush = Instant::now() + Duration::from_millis(100); + while Instant::now() < flush { conn.poll_with(|_| {}); thread::sleep(Duration::from_micros(50)); } - }); - } + } + }); - server.join().unwrap(); + thread::scope(|scope| { + let mut scoped = ScopedSpine::new(&mut spine, scope, None, None); + attach_tile( + NetworkTile { conn: None, ready: ready.clone() }, + &mut scoped, + TileConfig::background(None, Some(flux::timing::Duration::from_millis(1))), + ); + attach_tile( + ReaderTile { received: received.clone(), count: 0, expected: 2 }, + &mut scoped, + TileConfig::background(None, None), + ); + }); + + client.join().unwrap(); + + { + let guard = received.lock().unwrap(); + assert_eq!(guard.len(), 2); + assert!(guard.iter().any(|p| p.0 == *MSG_A)); + assert!(guard.iter().any(|p| p.0 == *MSG_B)); + } + cleanup_shmem(base); } diff --git a/crates/flux-network/tests/tcp_roundtrip.rs b/crates/flux-network/tests/tcp_roundtrip.rs index 9f2fd55..9cbf960 100644 --- a/crates/flux-network/tests/tcp_roundtrip.rs +++ b/crates/flux-network/tests/tcp_roundtrip.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use flux_network::tcp::{MessagePayload, PollEvent, SendBehavior, TcpConnector}; +use flux_network::tcp::{PollEvent, SendBehavior, TcpConnector}; use wincode_derive::{SchemaRead, SchemaWrite}; #[derive(Debug, PartialEq, SchemaRead, SchemaWrite)] @@ -40,12 +40,10 @@ fn tcp_roundtrip() { let mut recv = None; loop { listener.poll_with(|event| { - if let PollEvent::Message { token, payload, .. } = event { + if let PollEvent::Message { token, payload: bytes, .. } = event { assert_eq!(token, stream_token); - if let MessagePayload::Raw(bytes) = payload { - let msg: TestMsg = wincode::deserialize(bytes).unwrap(); - recv = Some(msg); - } + let msg: TestMsg = wincode::deserialize(bytes).unwrap(); + recv = Some(msg); } }); if recv.is_some() { @@ -77,11 +75,9 @@ fn tcp_roundtrip() { let mut recv = None; loop { conn.poll_with(|event| { - if let PollEvent::Message { payload, .. } = event { - if let MessagePayload::Raw(bytes) = payload { - let msg: TestMsg = wincode::deserialize(bytes).unwrap(); - recv = Some(msg); - } + if let PollEvent::Message { payload: bytes, .. } = event { + let msg: TestMsg = wincode::deserialize(bytes).unwrap(); + recv = Some(msg); } }); if recv.is_some() { diff --git a/crates/flux/Cargo.toml b/crates/flux/Cargo.toml index f320e83..a2a3a83 100644 --- a/crates/flux/Cargo.toml +++ b/crates/flux/Cargo.toml @@ -11,7 +11,6 @@ repository = "https://github.com/gattaca-com/flux" flux-communication.workspace = true flux-timing.workspace = true flux-utils.workspace = true -flux-network.workspace = true spine-derive.workspace = true type-hash.workspace = true type-hash-derive.workspace = true @@ -39,5 +38,4 @@ wincode = [ "dep:wincode-derive", "flux-utils/wincode", "flux-timing/wincode", - "flux-network/wincode", ] diff --git a/crates/flux/src/lib.rs b/crates/flux/src/lib.rs index f65c9a8..03ff5ef 100644 --- a/crates/flux/src/lib.rs +++ b/crates/flux/src/lib.rs @@ -7,7 +7,6 @@ mod timer; pub use core_affinity; pub use flux_communication as communication; -pub use flux_network as network; pub use flux_timing as timing; pub use flux_utils as utils; pub use spine_derive; diff --git a/crates/flux/src/spine/adapter.rs b/crates/flux/src/spine/adapter.rs index e919ff9..7459889 100644 --- a/crates/flux/src/spine/adapter.rs +++ b/crates/flux/src/spine/adapter.rs @@ -8,8 +8,8 @@ use signal_hook::consts::SIGINT; use crate::{ spine::{ - DCacheMsg, DCacheRead, FluxSpine, SpineConsumer, SpineProducer, SpineProducerWithDCache, - SpineProducers, + DCacheRead, FluxSpine, SpineConsumer, SpineDCacheConsumer, SpineProducer, + SpineProducerWithDCache, SpineProducers, }, tile::Tile, }; @@ -101,15 +101,10 @@ impl SpineAdapter { pub fn produce_with_dcache(&mut self, data: T, payload: Option<(usize, F)>) where T: 'static + Copy, - S::Producers: SpineProducers + AsMut>, + S::Producers: SpineProducers + AsRef>, F: FnOnce(&mut [u8]), { - let ts = self.producers.timestamp().with_new_publish_delta(); - let p: &mut SpineProducerWithDCache = self.producers.as_mut(); - let dref = - payload.map(|(len, write)| p.dcache.write(len, write).expect("dcache write failed")); - let msg = InternalMessage::new(ts, DCacheMsg::new(data, dref)); - p.inner.produce_without_first(&msg); + self.producers.produce_with_dcache(data, payload); self.did_work = true; } @@ -206,61 +201,84 @@ impl SpineAdapter { } #[inline] - pub fn consume_with_dcache(&mut self, mut read: F) -> DCacheRead + pub fn consume_with_dcache(&mut self, mut read: F, mut handle: G) where T: 'static + Copy, - S::Consumers: AsMut>>, - F: FnMut(&[u8]) -> R, + S::Consumers: AsMut>, + S::Producers: SpineProducers, + F: FnMut(T, &[u8]) -> R, + G: FnMut(DCacheRead, &mut S::Producers), { - let c: &mut SpineConsumer> = self.consumers.as_mut(); - let result = c.consume_with_dcache(&mut read); - self.did_work |= !matches!(result, DCacheRead::Empty); - result + let c: &mut SpineDCacheConsumer = self.consumers.as_mut(); + loop { + match c.consume(&mut self.producers, &mut read) { + DCacheRead::Empty => break, + result => { + self.did_work = true; + handle(result, &mut self.producers); + } + } + } } #[inline] - pub fn consume_with_dcache_collaborative(&mut self, mut read: F) -> DCacheRead + pub fn consume_with_dcache_collaborative(&mut self, mut read: F, mut handle: G) where T: 'static + Copy, - S::Consumers: AsMut>>, + S::Consumers: AsMut>, + S::Producers: SpineProducers, F: FnMut(T, &[u8]) -> R, + G: FnMut(DCacheRead, &mut S::Producers), { - let c: &mut SpineConsumer> = self.consumers.as_mut(); - let result = c.consume_with_dcache_collaborative(&mut read); - self.did_work |= !matches!(result, DCacheRead::Empty); - result + let c: &mut SpineDCacheConsumer = self.consumers.as_mut(); + match c.consume_collaborative(&mut self.producers, &mut read) { + result => { + self.did_work = true; + handle(result, &mut self.producers); + } + } } #[inline] - pub fn consume_with_dcache_internal_message( - &mut self, - mut read: F, - ) -> DCacheRead, R> + pub fn consume_with_dcache_internal_message(&mut self, mut read: F, mut handle: G) where T: 'static + Copy, - S::Consumers: AsMut>>, + S::Consumers: AsMut>, + S::Producers: SpineProducers, F: FnMut(&InternalMessage, &[u8]) -> R, + G: FnMut(DCacheRead, R>, &mut S::Producers), { - let c: &mut SpineConsumer> = self.consumers.as_mut(); - let result = c.consume_with_dcache_internal_message(&mut read); - self.did_work |= !matches!(result, DCacheRead::Empty); - result + let c: &mut SpineDCacheConsumer = self.consumers.as_mut(); + loop { + match c.consume_internal_message(&mut self.producers, &mut read) { + DCacheRead::Empty => break, + result => { + self.did_work = true; + handle(result, &mut self.producers); + } + } + } } #[inline] - pub fn consume_with_dcache_collaborative_internal_message( + pub fn consume_with_dcache_collaborative_internal_message( &mut self, mut read: F, - ) -> DCacheRead, R> - where + mut handle: G, + ) where T: 'static + Copy, - S::Consumers: AsMut>>, + S::Consumers: AsMut>, + S::Producers: SpineProducers, F: FnMut(&InternalMessage, &[u8]) -> R, + G: FnMut(DCacheRead, R>, &mut S::Producers), { - let c: &mut SpineConsumer> = self.consumers.as_mut(); - let result = c.consume_with_dcache_collaborative_internal_message(&mut read); - self.did_work |= !matches!(result, DCacheRead::Empty); - result + let c: &mut SpineDCacheConsumer = self.consumers.as_mut(); + match c.consume_collaborative_internal_message(&mut self.producers, &mut read) { + result => { + self.did_work = true; + handle(result, &mut self.producers); + } + } } /// Override the collaborative group label for queue `T`. By default each @@ -281,6 +299,14 @@ impl SpineAdapter { c.inner.set_collaborative_group(group_label); } + pub fn set_collaborative_group_dcache(&mut self, group_label: &'static str) + where + S::Consumers: AsMut>, + { + let c: &mut SpineDCacheConsumer = self.consumers.as_mut(); + c.inner.set_collaborative_group(group_label); + } + #[inline] pub fn consume_internal_message(&mut self, mut f: F) where diff --git a/crates/flux/src/spine/consumer.rs b/crates/flux/src/spine/consumer.rs index bbc8929..68bc902 100644 --- a/crates/flux/src/spine/consumer.rs +++ b/crates/flux/src/spine/consumer.rs @@ -1,48 +1,19 @@ use std::{ops::Deref, path::Path}; use flux_timing::InternalMessage; -use flux_utils::{DCachePtr, DCacheRef, safe_panic, short_typename}; +use flux_utils::{DCachePtr, short_typename}; use crate::{ Timer, communication::{ReadError, queue}, - spine::{FluxSpine, SpineProducers, SpineQueue}, + spine::{DCacheMsg, FluxSpine, SpineProducers, SpineQueue}, tile::Tile, }; -#[derive(Debug)] -pub enum DCacheRead { - Ok((T, R)), - /// Message consumed but no dcache ref present; payload not read. - NoRef(T), - /// Queue was empty. - Empty, - /// Consumer got sped past. - SpedPast, - /// A message was dequeued but the payload could not be safely read - /// (producer lapped the consumer in either the queue seqlock or dcache). - Lost(T), -} - -/// Queue element wrapper for dcache-backed queues. Produced by -/// [`SpineProducerWithDCache`]; never constructed directly by users. -#[derive(Clone, Copy, Debug)] -pub struct DCacheMsg { - pub data: T, - dref: DCacheRef, -} - -impl DCacheMsg { - pub(crate) fn new(data: T, dref: Option) -> Self { - Self { data, dref: dref.unwrap_or(DCacheRef::NONE) } - } -} - #[derive(Clone, Copy, Debug)] pub struct SpineConsumer { timer: Timer, pub inner: queue::Consumer>, - dcache: Option, } impl Deref for SpineConsumer { @@ -54,6 +25,24 @@ impl Deref for SpineConsumer { } impl SpineConsumer { + #[inline] + pub fn attach(base_dir: D, tile: &Tl, queue: SpineQueue) -> Self + where + D: AsRef, + S: FluxSpine, + Tl: Tile, + { + let label: &'static str = Box::leak(tile.name().as_str().to_owned().into_boxed_str()); + + let timer = Timer::new_with_base_dir( + base_dir, + S::app_name(), + format!("{}-{}", tile.name(), short_typename::()), + ); + + Self { timer, inner: queue::Consumer::new(queue, label) } + } + #[inline] pub fn consume(&mut self, producers: &mut P, mut f: F) -> bool where @@ -227,13 +216,64 @@ impl SpineConsumer { } } -impl SpineConsumer> { +#[derive(Debug)] +pub enum DCacheRead { + Ok((T, R)), + /// Message consumed but no dcache ref present; payload not read. + NoRef(T), + /// Queue was empty. + Empty, + /// Consumer got sped past. + SpedPast, + /// A message was dequeued but the payload could not be safely read + /// (producer lapped the consumer in either the queue seqlock or dcache). + Lost(T), +} + +#[derive(Clone, Copy, Debug)] +pub struct SpineDCacheConsumer { + timer: Timer, + pub inner: queue::Consumer>>, + dcache: DCachePtr, +} + +impl Deref for SpineDCacheConsumer { + type Target = queue::Consumer>>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl SpineDCacheConsumer { + #[inline] + pub fn attach( + base_dir: D, + tile: &Tl, + queue: SpineQueue>, + dcache: DCachePtr, + ) -> Self + where + D: AsRef, + S: FluxSpine, + Tl: Tile, + { + let label: &'static str = Box::leak(tile.name().as_str().to_owned().into_boxed_str()); + let timer = Timer::new_with_base_dir( + base_dir, + S::app_name(), + format!("{}-{}", tile.name(), short_typename::()), + ); + Self { timer, inner: queue::Consumer::new(queue, label), dcache } + } + #[inline] - pub fn consume_with_dcache(&mut self, mut read: F) -> DCacheRead + pub fn consume(&mut self, producers: &mut P, mut read: F) -> DCacheRead where - F: FnMut(&[u8]) -> R, + P: SpineProducers, + F: FnMut(T, &[u8]) -> R, { - match self.consume_with_dcache_internal_message(|_msg, payload| read(payload)) { + match self.consume_internal_message(producers, |msg, payload| read(**msg, payload)) { DCacheRead::Ok((msg, r)) => DCacheRead::Ok((msg.into_data(), r)), DCacheRead::Lost(msg) => DCacheRead::Lost(msg.into_data()), DCacheRead::NoRef(msg) => DCacheRead::NoRef(msg.into_data()), @@ -243,12 +283,17 @@ impl SpineConsumer> { } #[inline] - pub fn consume_with_dcache_collaborative(&mut self, mut read: F) -> DCacheRead + pub fn consume_collaborative( + &mut self, + producers: &mut P, + mut read: F, + ) -> DCacheRead where + P: SpineProducers, F: FnMut(T, &[u8]) -> R, { match self - .consume_with_dcache_collaborative_internal_message(|msg, payload| read(**msg, payload)) + .consume_collaborative_internal_message(producers, |msg, payload| read(**msg, payload)) { DCacheRead::Ok((msg, r)) => DCacheRead::Ok((msg.into_data(), r)), DCacheRead::Lost(msg) => DCacheRead::Lost(msg.into_data()), @@ -259,32 +304,33 @@ impl SpineConsumer> { } #[inline] - pub fn consume_with_dcache_collaborative_internal_message( + pub fn consume_collaborative_internal_message( &mut self, + producers: &mut P, mut read: F, ) -> DCacheRead, R> where + P: SpineProducers, F: FnMut(&InternalMessage, &[u8]) -> R, { match self.inner.try_consume_with_epoch_collaborative() { Ok((&msg, slot_pos, slot_ver)) => { + let ingestion_t = msg.ingestion_time(); + *producers.timestamp_mut().ingestion_t_mut() = ingestion_t; let dref = msg.data().dref; if dref.is_none() { return DCacheRead::NoRef(msg.with_data(msg.data().data)); } - let Some(dc) = self.dcache else { - safe_panic!( - "dcache-backed consumer has no dcache handle; use attach_with_dcache" - ); - return DCacheRead::NoRef(msg.with_data(msg.data().data)); - }; let user_msg = msg.with_data(msg.data().data); - let Ok(extracted) = dc.map(dref, |payload| read(&user_msg, payload)) else { + self.timer.start(); + let Ok(extracted) = self.dcache.map(dref, |payload| read(&user_msg, payload)) + else { return DCacheRead::Lost(user_msg); }; if self.inner.slot_version(slot_pos) != slot_ver { return DCacheRead::Lost(user_msg); } + self.timer.record_processing_and_latency_from(ingestion_t.into()); DCacheRead::Ok((user_msg, extracted)) } Err(ReadError::SpedPast) => { @@ -296,33 +342,34 @@ impl SpineConsumer> { } #[inline] - pub fn consume_with_dcache_internal_message( + pub fn consume_internal_message( &mut self, + producers: &mut P, mut read: F, ) -> DCacheRead, R> where + P: SpineProducers, F: FnMut(&InternalMessage, &[u8]) -> R, { loop { match self.inner.try_consume_with_epoch() { Ok((&msg, slot_pos, slot_ver)) => { + let ingestion_t = msg.ingestion_time(); + *producers.timestamp_mut().ingestion_t_mut() = ingestion_t; let dref = msg.data().dref; if dref.is_none() { return DCacheRead::NoRef(msg.with_data(msg.data().data)); } - let Some(dc) = self.dcache else { - safe_panic!( - "dcache-backed consumer has no dcache handle; use attach_with_dcache" - ); - return DCacheRead::NoRef(msg.with_data(msg.data().data)); - }; let user_msg = msg.with_data(msg.data().data); - let Ok(extracted) = dc.map(dref, |payload| read(&user_msg, payload)) else { + self.timer.start(); + let Ok(extracted) = self.dcache.map(dref, |payload| read(&user_msg, payload)) + else { return DCacheRead::Lost(user_msg); }; if self.inner.slot_version(slot_pos) != slot_ver { return DCacheRead::Lost(user_msg); } + self.timer.record_processing_and_latency_from(ingestion_t.into()); return DCacheRead::Ok((user_msg, extracted)); } Err(ReadError::SpedPast) => { @@ -333,40 +380,3 @@ impl SpineConsumer> { } } } - -impl SpineConsumer { - #[inline] - pub fn attach(base_dir: D, tile: &Tl, queue: SpineQueue) -> Self - where - D: AsRef, - S: FluxSpine, - Tl: Tile, - { - let label: &'static str = Box::leak(tile.name().as_str().to_owned().into_boxed_str()); - - let timer = Timer::new_with_base_dir( - base_dir, - S::app_name(), - format!("{}-{}", tile.name(), short_typename::()), - ); - - Self { timer, inner: queue::Consumer::new(queue, label), dcache: None } - } - - #[inline] - pub fn attach_with_dcache( - base_dir: D, - tile: &Tl, - queue: SpineQueue, - dcache: DCachePtr, - ) -> Self - where - D: AsRef, - S: FluxSpine, - Tl: Tile, - { - let mut s = Self::attach::(base_dir, tile, queue); - s.dcache = Some(dcache); - s - } -} diff --git a/crates/flux/src/spine/mod.rs b/crates/flux/src/spine/mod.rs index 4bef1b2..423d686 100644 --- a/crates/flux/src/spine/mod.rs +++ b/crates/flux/src/spine/mod.rs @@ -6,11 +6,11 @@ mod standalone_producer; use std::path::Path; pub use adapter::SpineAdapter; -pub use consumer::{DCacheMsg, DCacheRead, SpineConsumer}; -use flux_timing::{IngestionTime, InternalMessage, TrackingTimestamp}; -use flux_utils::{DCachePtr, directories::shmem_dir}; +pub use consumer::{DCacheRead, SpineConsumer, SpineDCacheConsumer}; +use flux_timing::{IngestionTime, InternalMessage, Nanos, TrackingTimestamp}; +use flux_utils::{DCachePtr, DCacheRef, directories::shmem_dir}; pub use scoped::ScopedSpine; -pub use standalone_producer::StandaloneProducer; +pub use standalone_producer::{StandaloneDCacheProducer, StandaloneProducer}; use crate::{ communication::queue::{self}, @@ -20,6 +20,20 @@ use crate::{ pub type SpineProducer = queue::Producer>; pub type SpineQueue = queue::Queue>; +/// Wire type for dcache-backed queues. Internal to the spine; users see `T` +/// and `&[u8]` at consume sites. +#[derive(Clone, Copy, Debug)] +pub struct DCacheMsg { + pub data: T, + dref: DCacheRef, +} + +impl DCacheMsg { + pub(crate) fn new(data: T, dref: Option) -> Self { + Self { data, dref: dref.unwrap_or(DCacheRef::NONE) } + } +} + #[derive(Clone, Copy, Debug)] pub struct SpineProducerWithDCache { pub(crate) inner: SpineProducer>, @@ -30,6 +44,10 @@ impl SpineProducerWithDCache { pub fn new(queue: SpineQueue>, dcache: DCachePtr) -> Self { Self { inner: queue::Producer::from(queue), dcache } } + + pub fn dcache_ptr(&self) -> DCachePtr { + self.dcache + } } impl AsRef>> for SpineProducerWithDCache { @@ -70,6 +88,47 @@ pub trait SpineProducers { { self.as_ref().produce_without_first(msg); } + + fn produce_with_dcache( + &self, + data: T, + payload: Option<(usize, F)>, + ) where + Self: AsRef>, + { + let ts = self.timestamp().with_new_publish_delta(); + let p: &SpineProducerWithDCache = self.as_ref(); + let dref = + payload.map(|(len, write)| p.dcache.write(len, write).expect("dcache write failed")); + let msg = InternalMessage::new(ts, DCacheMsg::new(data, dref)); + p.inner.produce_without_first(&msg); + } + + fn produce_with_dref(&self, data: T, dref: DCacheRef, send_ts: Nanos) + where + Self: AsRef>, + { + let ts = self.timestamp().with_ingestion_t(send_ts.into()); + let p: &SpineProducerWithDCache = self.as_ref(); + let msg = InternalMessage::new(ts, DCacheMsg::new(data, Some(dref))); + p.inner.produce_without_first(&msg); + } + + fn produce_with_dcache_and_ingestion( + &self, + data: T, + payload: Option<(usize, F)>, + ingestion_t: IngestionTime, + ) where + Self: AsRef>, + { + let ts = self.timestamp().with_ingestion_t(ingestion_t); + let p: &SpineProducerWithDCache = self.as_ref(); + let dref = + payload.map(|(len, write)| p.dcache.write(len, write).expect("dcache write failed")); + let msg = InternalMessage::new(ts, DCacheMsg::new(data, dref)); + p.inner.produce_without_first(&msg); + } } pub trait FluxSpine: Sized + Send { @@ -97,13 +156,13 @@ pub trait FluxSpine: Sized + Send { fn standalone_dcache_producer_for( &mut self, name: TileName, - ) -> StandaloneProducer> + ) -> StandaloneDCacheProducer where Self: HasDCacheQueue, { let id = self.register_tile(name); let (queue, dcache) = self.dcache_queue_and_ptr(); - StandaloneProducer::new_with_dcache(queue, dcache, id) + StandaloneDCacheProducer::new(queue, dcache, id) } /// Removes all files related to a given spine. Does not clear the shared diff --git a/crates/flux/src/spine/standalone_producer.rs b/crates/flux/src/spine/standalone_producer.rs index b9ca980..9d2f74e 100644 --- a/crates/flux/src/spine/standalone_producer.rs +++ b/crates/flux/src/spine/standalone_producer.rs @@ -1,68 +1,50 @@ -use flux_timing::{TrackingTimestamp, UNREGISTERED_TILE_ID}; +use flux_timing::{IngestionTime, InternalMessage, TrackingTimestamp}; use flux_utils::DCachePtr; -use super::{DCacheMsg, SpineProducer, SpineProducers, SpineQueue}; +use super::{DCacheMsg, SpineProducer, SpineProducerWithDCache, SpineQueue}; use crate::communication::queue; #[derive(Debug)] pub struct StandaloneProducer { producer: SpineProducer, timestamp: TrackingTimestamp, - dcache: Option, } impl StandaloneProducer { pub(super) fn new(queue: SpineQueue, tile_id: u16) -> Self { - Self { - producer: queue::Producer::from(queue), - timestamp: TrackingTimestamp::new(tile_id), - dcache: None, - } - } - - pub(super) fn new_with_dcache(queue: SpineQueue, dcache: DCachePtr, tile_id: u16) -> Self { - Self { - producer: queue::Producer::from(queue), - timestamp: TrackingTimestamp::new(tile_id), - dcache: Some(dcache), - } - } -} - -impl SpineProducers for StandaloneProducer { - fn timestamp(&self) -> &TrackingTimestamp { - &self.timestamp + Self { producer: queue::Producer::from(queue), timestamp: TrackingTimestamp::new(tile_id) } } - fn timestamp_mut(&mut self) -> &mut TrackingTimestamp { - &mut self.timestamp + pub fn produce_with_ingestion(&self, d: T, ingestion_t: IngestionTime) { + let msg = InternalMessage::new(self.timestamp.with_ingestion_t(ingestion_t), d); + self.producer.produce_without_first(&msg); } } -impl AsRef> for StandaloneProducer { - fn as_ref(&self) -> &SpineProducer { - &self.producer - } +#[derive(Debug)] +pub struct StandaloneDCacheProducer { + inner: SpineProducerWithDCache, + timestamp: TrackingTimestamp, } -impl From> for StandaloneProducer { - fn from(queue: SpineQueue) -> Self { - Self::new(queue, UNREGISTERED_TILE_ID) +impl StandaloneDCacheProducer { + pub(super) fn new(queue: SpineQueue>, dcache: DCachePtr, tile_id: u16) -> Self { + Self { + inner: SpineProducerWithDCache::new(queue, dcache), + timestamp: TrackingTimestamp::new(tile_id), + } } -} -impl StandaloneProducer> { - pub fn produce_with_dcache( - &mut self, + pub fn produce_with_ingestion( + &self, data: T, payload: Option<(usize, F)>, + ingestion_t: IngestionTime, ) { - let dref = payload.map(|(len, write)| { - self.dcache - .expect("producer has no dcache") - .write(len, write) - .expect("dcache write failed") - }); - self.produce(DCacheMsg::new(data, dref)); + let ts = self.timestamp.with_ingestion_t(ingestion_t); + let dref = + payload.map(|(len, write)| self.inner.dcache.write(len, write).expect("dcache write")); + let msg = InternalMessage::new(ts, DCacheMsg::new(data, dref)); + self.inner.inner.produce_without_first(&msg); } } diff --git a/crates/spine-derive/src/lib.rs b/crates/spine-derive/src/lib.rs index faa3a61..75e11ee 100644 --- a/crates/spine-derive/src/lib.rs +++ b/crates/spine-derive/src/lib.rs @@ -187,14 +187,14 @@ pub fn from_spine(attr: TokenStream, item: TokenStream) -> TokenStream { let dcache_ident = format_ident!("{}_dcache", field_ident); consumer_fields.push(quote! { - pub #field_ident : ::flux::spine::SpineConsumer<::flux::spine::DCacheMsg<#inner_ty>> + pub #field_ident : ::flux::spine::SpineDCacheConsumer<#inner_ty> }); producer_fields.push(quote! { pub #field_ident : ::flux::spine::SpineProducerWithDCache<#inner_ty> }); consumer_init.push(quote! { - #field_ident : ::flux::spine::SpineConsumer::attach_with_dcache::<_, #struct_ident, _>( + #field_ident : ::flux::spine::SpineDCacheConsumer::attach::<_, #struct_ident, _>( &spine.base_dir, tile, spine.#field_ident, spine.#dcache_ident) }); producer_init.push(quote! { @@ -210,27 +210,27 @@ pub fn from_spine(attr: TokenStream, item: TokenStream) -> TokenStream { self.#field_ident.as_ref() } } - impl AsMut<::flux::spine::SpineProducerWithDCache<#inner_ty>> + impl AsRef<::flux::spine::SpineProducerWithDCache<#inner_ty>> for #producers_ident { - fn as_mut(&mut self) -> &mut ::flux::spine::SpineProducerWithDCache<#inner_ty> { - &mut self.#field_ident + fn as_ref(&self) -> &::flux::spine::SpineProducerWithDCache<#inner_ty> { + &self.#field_ident } } }); as_mut_impls.push(quote! { - impl AsMut<::flux::spine::SpineConsumer<::flux::spine::DCacheMsg<#inner_ty>>> + impl AsMut<::flux::spine::SpineDCacheConsumer<#inner_ty>> for #consumers_ident { - fn as_mut(&mut self) -> &mut ::flux::spine::SpineConsumer<::flux::spine::DCacheMsg<#inner_ty>> { + fn as_mut(&mut self) -> &mut ::flux::spine::SpineDCacheConsumer<#inner_ty> { &mut self.#field_ident } } - impl AsRef<::flux::spine::SpineConsumer<::flux::spine::DCacheMsg<#inner_ty>>> + impl AsRef<::flux::spine::SpineDCacheConsumer<#inner_ty>> for #consumers_ident { - fn as_ref(&self) -> &::flux::spine::SpineConsumer<::flux::spine::DCacheMsg<#inner_ty>> { + fn as_ref(&self) -> &::flux::spine::SpineDCacheConsumer<#inner_ty> { &self.#field_ident } }