From d1a6dc1f45ddf866ad2c67daed4af8bae8dbada1 Mon Sep 17 00:00:00 2001 From: Mike Young Date: Sun, 25 Jan 2026 03:39:22 +0000 Subject: [PATCH 1/4] Draft trait for a network service --- core/src/net/mod.rs | 3 +- core/src/net/service.rs | 63 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 core/src/net/service.rs diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index 66141a6..c43a1fe 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -3,4 +3,5 @@ pub mod error; pub mod packet; pub mod types; pub mod client; -pub mod session_store; \ No newline at end of file +pub mod session_store; +pub mod service; \ No newline at end of file diff --git a/core/src/net/service.rs b/core/src/net/service.rs new file mode 100644 index 0000000..2098f58 --- /dev/null +++ b/core/src/net/service.rs @@ -0,0 +1,63 @@ +use tokio_util::sync::CancellationToken; +use tokio::task::JoinError; +use thiserror::Error; +use crate::{net::{session::NetSession,error::{NetError,CryptoError}},message::{IncomingMessage,OutgoingMessage,Payload},peer::{PeerId},utils::{SerdeError},channels::{ChannelError}}; + +#[derive(Debug,Error)] +pub enum NetServError { + #[error(transparent)] + Serde(#[from] SerdeError), + + #[error(transparent)] + Channel(#[from] ChannelError), + + #[error(transparent)] + Crypto(#[from] CryptoError), + + #[error(transparent)] + Net(#[from] NetError), + + #[error(transparent)] + Join(#[from] JoinError), + + #[error("client not found")] + ClientNotFound, + + #[error("sessions not found")] + SessionsNotFound, + + #[error("Session already exists")] + SessionAlreadyExists, + + #[error("peer not found")] + PeerNotFound, + +} + + + +pub trait NetService{ + // Interface for handling connections out from a single peer/client + + type Error: Into; + + // Add fully formed sessions to the service + // Deal with handshakes prior to adding into service + // Fail on adding a session if the peer already exists + fn add_session(&mut self, client: (PeerId,NetSession)) -> Result<(), Self::Error>; + + // Drop a session and return it + // Don't close it, hand it back to the caller to let them handle it + fn drop_session(&mut self, peer: &PeerId) -> Result; + + // Listen for incoming messages from all peers + fn listen(&self, token: CancellationToken) -> impl Future> + Send; + + // Broadcast messages to all sessions + // Responsible for encrypting for each peer + fn broadcast(&self, msg: Payload, token: CancellationToken) -> impl Future> + Send; + + // Transmit messages to a specific session + // OutgoingMessage has its own PeerID + fn transmit(&self, msg: OutgoingMessage, token: CancellationToken) -> impl Future> + Send; +} \ No newline at end of file From 5398a343a7b3b5240e7c6618823075610019ab9d Mon Sep 17 00:00:00 2001 From: Mike Young Date: Sat, 7 Mar 2026 00:38:30 +0000 Subject: [PATCH 2/4] WIP --- core/Cargo.lock | 95 +++++++++- core/Cargo.toml | 4 +- core/src/net/ip.rs | 24 +++ core/src/net/service.rs | 16 +- core/src/payload/dht.rs | 4 +- core/src/payload/mod.rs | 8 +- core/src/payload/pow.rs | 6 +- core/src/payload/tag.rs | 4 +- core/src/pow.rs | 2 +- core/src/tag/tag.rs | 2 +- core/src/transport/error.rs | 25 ++- core/src/transport/mod.rs | 1 + core/src/transport/tcp.rs | 334 ++++++++++++++++++++++++++++++++++++ 13 files changed, 498 insertions(+), 27 deletions(-) create mode 100644 core/src/net/ip.rs create mode 100644 core/src/transport/tcp.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index fd21648..2d980a3 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -350,7 +350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -613,7 +613,7 @@ checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -786,7 +786,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -881,6 +881,16 @@ version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spin" version = "0.9.8" @@ -933,7 +943,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -967,8 +977,9 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", + "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1055,6 +1066,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -1064,6 +1084,71 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index ab8e52b..ec89c04 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,10 +30,10 @@ pqc_kyber = { version = "0.7.1", features = ["std", "kyber512"] } rand_chacha = "0.3.1" pqc_dilithium_edit = "0.2.0" async-trait = "0.1.89" -tokio = { version = "1.48.0", features = ["sync", "rt", "macros", "rt-multi-thread", "time", "fs", "io-util", "signal"] } +tokio = { version = "1.48.0", features = ["sync", "rt", "macros", "rt-multi-thread", "time", "fs", "io-util", "signal", "net"] } bytes = { version = "1.11.0", features = ["serde"] } serde = { version = "1.0.228", features = ["derive"] } -postcard = { version = "1.0", features = ["alloc"] } +postcard = { version = "1.0", features = ["alloc", "use-std"] } heapless = "0.9.2" tokio-macros = "2.6.0" aes-gcm = { version= "0.10.3", features = ["rand_core"] } diff --git a/core/src/net/ip.rs b/core/src/net/ip.rs new file mode 100644 index 0000000..6a46665 --- /dev/null +++ b/core/src/net/ip.rs @@ -0,0 +1,24 @@ +use thiserror::Error; + + +pub struct IPV4{ + addr: [u8; 4], + port: u16 +} + +#[derive(Error, Debug)] +pub enum AddressParseError { + #[error("Invalid address format")] + InvalidAddress, + #[error("Invalid port number")] + InvalidPort, + + #[error(transparent)] + ParseIntError(#[from] std::num::ParseIntError), +} + +implement IPV4 { + pub fn from_string(s: &str) -> Result { + + } +} \ No newline at end of file diff --git a/core/src/net/service.rs b/core/src/net/service.rs index f5b1238..28d2831 100644 --- a/core/src/net/service.rs +++ b/core/src/net/service.rs @@ -1,7 +1,7 @@ use tokio_util::sync::CancellationToken; use tokio::task::JoinError; use thiserror::Error; -use crate::{net::{session::NetSession,error::{NetError,CryptoError}},message::{IncomingMessage,OutgoingMessage},payload::Payload,peer::{PeerId},utils::{SerdeError,ChannelError}}; +use crate::{net::{session::NetSession,error::{NetError,CryptoError}},message::{IncomingMessage,OutgoingMessage},payload::Payload,peer::{Peer,PeerId},utils::{SerdeError,ChannelError}, transport::{TransportError}}; #[derive(Debug,Error)] pub enum NetServError { @@ -20,6 +20,9 @@ pub enum NetServError { #[error(transparent)] Join(#[from] JoinError), + #[error(transparent)] + Transport(#[from] TransportError), + #[error("client not found")] ClientNotFound, @@ -44,11 +47,12 @@ pub trait NetService{ // Add fully formed sessions to the service // Deal with handshakes prior to adding into service // Fail on adding a session if the peer already exists - fn add_session(&mut self, client: (PeerId,NetSession)) -> Result<(), Self::Error>; + fn add_session(&mut self, client: (Peer,NetSession)) -> impl Future> + Send; + // It is possible that you'd want multiple sessions between two peers. Currently this would be an error. + // If not changed now, it will be hard to fix in the future. - // Drop a session and return it - // Don't close it, hand it back to the caller to let them handle it - fn drop_session(&mut self, peer: &PeerId) -> Result; + // Close a session and drop it from the table + fn drop_session(&mut self, peer: &PeerId) -> Result<(), Self::Error>; // Listen for incoming messages from all peers fn listen(&self, token: CancellationToken) -> impl Future> + Send; @@ -59,5 +63,5 @@ pub trait NetService{ // Transmit messages to a specific session // OutgoingMessage has its own PeerID - fn transmit(&self, msg: OutgoingMessage, token: CancellationToken) -> impl Future> + Send; + fn transmit(&self, msg: Payload,target: Peer, token: CancellationToken) -> impl Future> + Send; } \ No newline at end of file diff --git a/core/src/payload/dht.rs b/core/src/payload/dht.rs index 384e9f8..140c65c 100644 --- a/core/src/payload/dht.rs +++ b/core/src/payload/dht.rs @@ -3,13 +3,13 @@ use serde::{Deserialize, Serialize}; use crate::{dht::CID, payload::{Query, QueryError, Reply, ReplyError, TryFromQuery, TryFromReply}}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum DhtQuery { Get(CID), Put(Bytes), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum DhtReply { Return(Option), } diff --git a/core/src/payload/mod.rs b/core/src/payload/mod.rs index 72dc564..7de9fb4 100644 --- a/core/src/payload/mod.rs +++ b/core/src/payload/mod.rs @@ -9,20 +9,20 @@ pub use dht::*; use serde::{Deserialize, Serialize}; use thiserror::Error; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum Payload { Query(Query), Reply(Reply), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum Query { Pow(PowQuery), Tag(TagQuery), Dht(DhtQuery), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum Reply { Empty, Ok, @@ -46,7 +46,7 @@ pub enum ReplyError { pub trait TryFromQuery: TryFrom {} pub trait TryFromReply: TryFrom {} -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] #[repr(u16)] pub enum Action { PublishTag = 1, diff --git a/core/src/payload/pow.rs b/core/src/payload/pow.rs index fc063f3..6f4d812 100644 --- a/core/src/payload/pow.rs +++ b/core/src/payload/pow.rs @@ -3,18 +3,18 @@ use thiserror::Error; use crate::{payload::{Action, Query, QueryError, Reply, ReplyError, TryFromQuery, TryFromReply}, pow::Pow}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum PowQuery { Get(Action), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug,PartialEq,Clone)] pub enum PowReply { Require(Pow), Err(PowReplyErr), } -#[derive(Serialize, Deserialize, Debug, Error)] +#[derive(Serialize, Deserialize, Debug, Error, PartialEq,Clone)] pub enum PowReplyErr { #[error("Incorrect nonce")] IncorrectNonce, diff --git a/core/src/payload/tag.rs b/core/src/payload/tag.rs index b01f7ad..49e0673 100644 --- a/core/src/payload/tag.rs +++ b/core/src/payload/tag.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::{payload::{Query, QueryError, Reply, ReplyError, TryFromQuery, TryFromReply}, pow::Pow, tag::Tag}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum TagQuery { Get, Publish { @@ -12,7 +12,7 @@ pub enum TagQuery { }, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub enum TagReply { Return(Vec), } diff --git a/core/src/pow.rs b/core/src/pow.rs index f8b1a20..6ebe616 100644 --- a/core/src/pow.rs +++ b/core/src/pow.rs @@ -13,7 +13,7 @@ fn pow_input(secret: &[u8; 32], timestamp: u64, action: Action, random: &[u8; 16 hasher.finalize().into() } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq,Clone)] pub struct Pow { pub input: [u8; 32], pub timestamp: u64, diff --git a/core/src/tag/tag.rs b/core/src/tag/tag.rs index 601547c..38fb582 100644 --- a/core/src/tag/tag.rs +++ b/core/src/tag/tag.rs @@ -8,7 +8,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use crate::{VERSION, tag::TagError, utils::{self, deserialize, random_bytes, serialize}}; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Tag { nonce: [u8; 12], content: Vec, diff --git a/core/src/transport/error.rs b/core/src/transport/error.rs index eecb6e1..893fe13 100644 --- a/core/src/transport/error.rs +++ b/core/src/transport/error.rs @@ -1,6 +1,7 @@ use thiserror::Error; +use tokio::task::JoinError; -use crate::{net::{CryptoError, NetError}, utils::{ChannelError, SerdeError}}; +use crate::{net::{CryptoError, NetError}, utils::{ChannelError, SerdeError},peer::{PeerId}}; #[derive(Debug, Error)] pub enum MockTransportError { @@ -24,4 +25,26 @@ pub enum MockTransportError { #[error("peer not found")] PeerNotFound, +} + +#[derive(Debug, Error)] +pub enum TransportError { + #[error("request was cancelled by token")] + Cancelled, + + // Consider returning peer ID as data in this error + #[error("session not found")] + SessionNotFound, + + #[error("peer already connected")] + PeerAlreadyConnected, + + #[error(transparent)] + Serialization(#[from] postcard::Error), + + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + Join(#[from] JoinError), } \ No newline at end of file diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 1ae1f7a..7e1bd39 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -2,6 +2,7 @@ mod mock; mod error; mod controls; mod participant; +mod tcp; pub use mock::*; pub use error::*; diff --git a/core/src/transport/tcp.rs b/core/src/transport/tcp.rs new file mode 100644 index 0000000..e382490 --- /dev/null +++ b/core/src/transport/tcp.rs @@ -0,0 +1,334 @@ +use tokio_util::sync::CancellationToken; + +use tokio::net::{TcpStream, TcpListener, ToSocketAddrs}; + +use std::{collections::HashMap}; + +use postcard::{to_slice,from_bytes}; + +use crate::{net::{NetService, NetSession, NetClient},payload::{Payload}, transport::TransportError, peer::{Peer,PeerId}, message::{IncomingMessage,OutgoingMessage}}; + +const BUFSIZE: usize = 256; +pub struct TcpTransport{ + sessions: HashMap, + client: NetClient, + listener: TcpListener, +} + +impl TcpTransport { + pub async fn bind(client: NetClient, addr: T) -> Result { + // Create a listener on given IP + let listener = TcpListener::bind(addr).await?; + Ok(TcpTransport { + sessions: HashMap::new(), + client, + listener, + }) + } + +} + +impl NetService for TcpTransport { + type Error = TransportError; + + async fn add_session(&mut self, client: (Peer, NetSession)) -> Result<(), Self::Error> { + if self.sessions.contains_key(&client.0.id) { + return Err(TransportError::PeerAlreadyConnected) + } + let stream = TcpStream::connect(client.0.address).await?; + self.sessions.insert(client.0.id, (client.1, stream)); + Ok(()) + } + + fn drop_session(&mut self, peer: &PeerId) -> Result<(), Self::Error> { + self.sessions.remove(peer).ok_or(TransportError::SessionNotFound)?; + Ok(()) + } + + async fn listen(&self, token: CancellationToken) -> Result { + loop { + tokio::select!{ + _ = token.cancelled() => return Err(TransportError::Cancelled), + } + } + todo!(); + } + + async fn broadcast(&self, msg: Payload, token: CancellationToken) -> Result<(), Self::Error> { + loop { + tokio::select!{ + _ = token.cancelled() => return Err(TransportError::Cancelled), + } + } + todo!(); + } + + async fn transmit(&self, msg: Payload, target: Peer, token: CancellationToken) -> Result<(), Self::Error> { + todo!(); + // Need to implement encyrption using NetSession + // Need to get a new NetSession by doing a handshake with the target peer + // I have a NetClient. + // My NetClient can do an encryption handshake with a target NetIdentity + // Then I accept the handshake with my NetClient and get a NetSession + // NetSession can then encrypt and decrypt. + // But my target needs the same NetSession - how does it get that? + // If this is Diffie-Hellman then both sides have a NetClient, both sides send their own public key in the form of a NetIdentity. + // The handshake then creates a shared secret through DH magic + // The accept creates a net session? + + // So I need to: + // 1. Send my NetIdentity to the target peer + // 2. Await receiving a NetIdentity - error if it is not a valid NetIdentity + // 3. Create a NetSession using the received NetIdentity + // 1a. Do steps 1-3 also on the target peer as well + // 4. Encrypt the message using the NetSession + // 5. Send the encrypted message to the target peer + // 6. Decrypt the message using the NetSession + // 7. Process the message + tokio::select!{ + _ = token.cancelled() => return Err(TransportError::Cancelled), + r = async { + let stream = TcpStream::connect(target.address).await?; + let mut buf = [0u8; BUFSIZE]; + let data = to_slice(&msg, &mut buf)?; + loop { + stream.writable().await?; + + match stream.try_write(data) { + Ok(_) => break, + Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => panic!("Would block"), + Err(e) => return Err(e.into()), + } + } + return Ok(()); + } => return r, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{payload::{Query,TagQuery,Reply,Action}, tag::{Tag,TagPayload}, pow::{Pow}}; + use std::sync::mpsc::{channel,Sender}; + use tokio::time::timeout; + use std::{thread,time::{Duration}}; + use std::io::Read; + + // Timeout for async function calls + const TIMEOUT: Duration = Duration::from_millis(10); + // struct size is 200 + const BUFSIZE: usize = 256; + + // Create a generic transport for testing + async fn ephemeral_transport() -> TcpTransport { + TcpTransport::bind(NetClient::Ephemeral, "127.0.0.1:0").await.unwrap() + } + + #[tokio::test] + async fn test_bind() { + // This test needs to use concrete addresses (non port 0) to check it fails correctly + let client = NetClient::Ephemeral; + let addr = "127.0.0.1:80"; + let transport = TcpTransport::bind(client, addr).await; + + let static_client = NetClient::from_seed([1u8; 32]); + let addr = "127.0.0.1:8080"; + let transport_2 = TcpTransport::bind(static_client, addr).await; + + let client = NetClient::from_seed([1u8; 32]); + let transport_3 = TcpTransport::bind(client, addr).await; + + + assert!(transport.is_ok(), "Bind should succeed with Ephemeral client but got: {}",transport.err().unwrap()); + assert!(transport_2.is_ok(), "Bind should succeed with Static client but got: {}",transport_2.err().unwrap()); + assert!(transport_3.is_err(), "Bind should fail with duplicate address but got success"); + match transport_3.err().unwrap() { + TransportError::IO(_) => {}, + err @ _ => panic!("Expected PeerAlreadyConnected error, got: {}", err) + } + } + + #[tokio::test] + async fn test_add_session() { + let mut transport = ephemeral_transport().await; + let client = NetClient::from_seed([1u8;32]); + + + // Start a listener and move it into a thread after getting the port assigned by the OS. + // Consider making the thread cancellable. + // Consider making the listener more flexible as a double. + let srv = TcpListener::bind("127.0.0.1:0").await.expect("Failed to start test server"); + let addr = srv.local_addr().expect("Failed to get local address of thread"); + tokio::spawn(async move { + srv.accept().await + }); + + // First add should succeed - unique Peer ID + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + let session = NetSession::new([0u8;32],0u64); + let expect_ok = transport.add_session((peer, session)).await; + assert!(expect_ok.is_ok(),"Failed to add session: {}",expect_ok.err().unwrap()); + assert_eq!(transport.sessions.len(), 1); + + // Creating a new Peer with the same peer ID should fail when added. + // See comment in NetService trait about whether this behaviour should be changed. + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + let session = NetSession::new([1u8;32],1u64); + let expect_fail = transport.add_session((peer,session)).await; + assert!(expect_fail.is_err(),"Expected failure adding second entry with same peer ID"); + } + + #[tokio::test] + async fn test_drop_session() { + let srv = TcpListener::bind("127.0.0.1:0").await.expect("Failed to start test server"); + let addr = srv.local_addr().expect("Failed to get local address of thread"); + // sessions: HashMap, + + // Create a transport client with 3 sessions + let mut transport = ephemeral_transport().await; + let mut ids = vec!(); + for i in 0..3 { + let client = NetClient::from_seed([i as u8;32]); + let net_session = NetSession::new([i as u8;32],i as u64); + let stream = TcpStream::connect(addr).await.expect("Failed to connect to test server on {addr}"); + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + transport.sessions.insert(peer.id,(net_session,stream)); + ids.push(peer.id); + } + + assert_eq!(transport.sessions.len(),3,"Expected 3 elements in starting transport"); + + for i in (0..3).rev() { + let expect_ok = transport.drop_session(&ids.pop().unwrap()); + assert!(expect_ok.is_ok()); + assert_eq!(transport.sessions.len(),i,"Failed to remove session {i}"); + } + + let client = NetClient::from_seed([5 as u8;32]); + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + let expect_err = transport.drop_session(&peer.id); + + assert!(expect_err.is_err(),"Expected failure removing non-existent session"); + + } + + #[tokio::test] + async fn test_transmit(){ + // Start a listener and move it into a thread after getting the port assigned by the OS. + // Send a channel into the thread to read success/failure of expected values + // Give it a handler function checking expected results and sending true/false over a channel + + let client = NetClient::from_seed([1u8;32]); + let (send,results) = channel::(); + let expect_messages = sample_messages(); + let send_messages = expect_messages.clone(); + + let srv = std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to start test server"); + let addr = srv.local_addr().expect("Failed to get local address of thread"); + + std::thread::spawn(move || { + for (stream,msg) in srv.incoming().zip(expect_messages.into_iter()) { + expect_message_tcp( + stream.expect("Failed to get incoming stream"), + msg, + send.clone()); + } + }); + + let transport = ephemeral_transport().await; + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + for (i,msg) in send_messages.into_iter().enumerate(){ + let cncl = CancellationToken::new(); + let res = timeout( + TIMEOUT, + transport.transmit(msg,peer.clone(), cncl) + ).await; + assert!(res.is_ok(),"Failed to transmit message {}: {}",i,res.err().unwrap()); + let got_msg = results.recv().expect("Failed to check receive result"); + assert!(got_msg,"Message {} did not match expected",i); + } + + } + + #[tokio::test] + async fn test_listen(){ + let mut transport = ephemeral_transport(); + let client = NetClient::from_seed([1u8;32]); + + + // Start a server that will transmit messages + let srv = TcpListener::bind("127.0.0.1:0").await.expect("Failed to start test server"); + let addr = srv.local_addr().expect("Failed to get local address of thread"); + thread::spawn(async move || { + match srv.accept().await { + Ok(_) => {}, + Err(e) => panic!("Failed to accept test server request: {}",e), + } + }); + todo!(); + } + + // Helper to test receiving messages over TCP + fn expect_message_tcp(mut stream: std::net::TcpStream, expect_message: Payload, results: Sender) { + 'receive: loop { + let mut buf = [0u8;BUFSIZE]; + match stream.read(&mut buf) { + Ok(0) => break, + Ok(_) => {}, + Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => { + continue; + }, + Err(e) => { + eprintln!("Error reading message: {}", e); + results.send(false).expect("Failed to send test failure due to TCP read"); + return + } + } + + match from_bytes::(&buf) { + Ok(msg) => { + if msg == expect_message { + // Golden path + results.send(true).expect("Failed to send test success"); + } else { + // Bad deserialization + eprintln!("Bad message. Got: \n{msg:?}\n expected: \n{expect_message:?}"); + results.send(false).expect("Failed to send test failure due to deserialization"); + break; + } + }, + Err(e) => { + match e { + postcard::Error::DeserializeUnexpectedEnd => { + // Loop again to get more data + continue 'receive; + }, + _ => { + results.send(false).expect("Failed to send test failure"); + eprintln!("Error reading message: {}", e); + break; + } + } + } + } + } + } + + // Helper to generate sample messages for testing + fn sample_messages() -> Vec { + let first_message = Payload::Query(Query::Tag(TagQuery::Get)); + let second_message = Payload::Reply(Reply::Ok); + let (test_tag, test_pow) = {( + Tag::new(&[7u8;32],TagPayload{data:vec!()}).expect("Failed to generate test tag"), + Pow::new(&[4u8;32],Action::PublishTag, 213u8) + )}; + let third_message = Payload::Query(Query::Tag(TagQuery::Publish{tag: test_tag, pow: test_pow, nonce: 17u64})); + vec!( + second_message, + first_message, + third_message + ) + } + +} \ No newline at end of file From a116571082000550589599c527fe7f9dcff85fb7 Mon Sep 17 00:00:00 2001 From: Mike Young Date: Sun, 15 Mar 2026 04:58:03 +0000 Subject: [PATCH 3/4] Spawn independent tasks when adding sessions - allowing listen to pick off a channel --- core/Cargo.toml | 2 +- core/src/net/service.rs | 2 +- core/src/transport/error.rs | 25 +++- core/src/transport/tcp.rs | 264 ++++++++++++++++++++++++------------ 4 files changed, 200 insertions(+), 93 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index ec89c04..7b9f42f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -37,7 +37,7 @@ postcard = { version = "1.0", features = ["alloc", "use-std"] } heapless = "0.9.2" tokio-macros = "2.6.0" aes-gcm = { version= "0.10.3", features = ["rand_core"] } -tokio-util = "0.7.17" +tokio-util = { version = "0.7.17", features = ["join-map", "rt"] } thiserror = "2.0.17" serde-big-array = "0.5.1" tokio-stream = "0.1.18" diff --git a/core/src/net/service.rs b/core/src/net/service.rs index 1285963..ccb2354 100644 --- a/core/src/net/service.rs +++ b/core/src/net/service.rs @@ -52,7 +52,7 @@ pub trait NetService{ // If not changed now, it will be hard to fix in the future. // Close a session and drop it from the table - fn drop_session(&mut self, peer: &PeerId) -> Result<(), Self::Error>; + fn drop_session(&mut self, peer: &PeerId) -> impl Future> + Send; // Listen for incoming messages from all peers fn listen(&mut self, token: CancellationToken) -> impl Future> + Send; diff --git a/core/src/transport/error.rs b/core/src/transport/error.rs index c646c32..94fbbbd 100644 --- a/core/src/transport/error.rs +++ b/core/src/transport/error.rs @@ -1,7 +1,7 @@ use thiserror::Error; -use tokio::task::JoinError; +use tokio::{task::JoinError,sync::mpsc::error::{SendError},net::tcp::OwnedReadHalf}; -use crate::{net::{SessionManagerDispatcherError,CryptoError,NetError}, utils::ChannelError}; +use crate::{net::{SessionManagerDispatcherError,CryptoError,NetError,Message}, utils::ChannelError,peer::{PeerId}}; #[derive(Debug, Error)] pub enum MockTransportError { @@ -20,26 +20,41 @@ pub enum MockTransportError { #[derive(Debug, Error)] pub enum TransportError { + // Make PeerId optional so that callers can add information without having to pass PeerId down to every function + #[error("session's read task has been cancelled")] + ReadCancelled(OwnedReadHalf,Option), + #[error("request was cancelled by token")] Cancelled, // Consider returning peer ID as data in this error #[error("session not found")] - SessionNotFound, + SessionNotFound(Option), + + // Consider returning peer ID as data in this error + #[error("connection not found in active map")] + ConnectionNotInMap(Option), // Consider returning peer ID as data in this error #[error("no sessions available")] NoSessions, + // Consider returning peer ID as data in this error + #[error("message channel has been closed")] + MessageChannelClosed, + #[error("peer already connected")] - PeerAlreadyConnected, + PeerAlreadyConnected(Option), #[error("connection closed")] - ConnectionClosed, + ConnectionClosed(Option), #[error(transparent)] Serialization(#[from] postcard::Error), + #[error(transparent)] + Reading(#[from] SendError::<(PeerId,Message)>), + #[error(transparent)] IO(#[from] std::io::Error), diff --git a/core/src/transport/tcp.rs b/core/src/transport/tcp.rs index b4672a8..7e87efb 100644 --- a/core/src/transport/tcp.rs +++ b/core/src/transport/tcp.rs @@ -1,17 +1,23 @@ -use tokio_util::sync::CancellationToken; +use tokio_util::{sync::CancellationToken,task::JoinMap}; -use tokio::{net::{TcpStream, TcpListener, ToSocketAddrs}}; +use tokio::{net::{TcpStream, TcpListener, ToSocketAddrs, tcp::{OwnedReadHalf, OwnedWriteHalf}}, sync::{mpsc::{channel,Sender, Receiver}}, io::{AsyncReadExt}}; use std::{collections::HashMap}; -use postcard::{to_slice,from_bytes}; +use postcard::{to_stdvec,take_from_bytes,from_bytes}; use crate::{net::{ActiveSession, PendingSession, NetClient,NetService,Message},payload::{Payload}, transport::TransportError, peer::{Peer,PeerId}, message::{IncomingMessage,OutgoingMessage}}; +const CHANNELSIZE: usize = 128; +// Max message len current 226 const BUFSIZE: usize = 256; pub struct TcpTransport{ + incoming_messages: Receiver<(PeerId,Message)>, + message_sender: Sender<(PeerId, Message)>, + active_conns: JoinMap>, + cancel_tokens: HashMap, sessions: HashMap, - streams: HashMap, + write_streams: HashMap, client: NetClient, listener: TcpListener, // Used to establish a new session - not in trait currently - todo } @@ -20,83 +26,160 @@ impl TcpTransport { pub async fn bind(client: NetClient, addr: T) -> Result { // Create a listener on given IP let listener = TcpListener::bind(addr).await?; + let (sender,receiver) = channel(CHANNELSIZE); Ok(TcpTransport { + incoming_messages: receiver, + message_sender: sender, + active_conns: JoinMap::new(), + cancel_tokens: HashMap::new(), sessions: HashMap::new(), - streams: HashMap::new(), + write_streams: HashMap::new(), client, listener, }) } } +async fn read_from_stream(stream: &mut OwnedReadHalf) -> Result, TransportError> { + let res = loop { + stream.readable().await?; + let mut buf = Vec::with_capacity(BUFSIZE); + let res = stream.read_buf(&mut buf).await; + match res { + Ok(0) => { + return Err(TransportError::ConnectionClosed(None)); + }, + Ok(_) => { + break Ok(buf); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => return Err(e.into()), + } + }; + res +} + +async fn read_message(stream: &mut OwnedReadHalf,id: PeerId, results: &Sender::<(PeerId,Message)>) -> Result<(), TransportError> { + let mut msg_buf = vec!(); + loop { + let data = read_from_stream(stream).await.map_err(|e| { + match e { + // Add peer Id info + TransportError::ConnectionClosed(None) => { + TransportError::ConnectionClosed(Some(id)) + }, + _ => e, + } + })?; + msg_buf.extend_from_slice(&data); + let rem = match take_from_bytes::(&msg_buf) { + Ok((msg,rem)) => { + results.send((id,msg)).await?; + rem.to_owned() + }, + Err(e) => { + match e { + postcard::Error::DeserializeUnexpectedEnd => { + // Buffer not large enough - continue reading + continue + }, + _ => return Err(e.into()), + } + } + }; + msg_buf = rem; + } +} impl NetService for TcpTransport { type Error = TransportError; async fn add_session(&mut self, client: (Peer, ActiveSession)) -> Result<(), Self::Error> { if self.sessions.contains_key(&client.0.id) { - return Err(TransportError::PeerAlreadyConnected) + return Err(TransportError::PeerAlreadyConnected(Some(client.0.id))); } - let stream = TcpStream::connect(client.0.address).await?; + let (mut reader,writer) = TcpStream::connect(client.0.address).await?.into_split(); self.sessions.insert(client.0.id, client.1); - self.streams.insert(client.0.id,stream); + self.write_streams.insert(client.0.id,writer); + + let cncl = CancellationToken::new(); + let cncl_task = cncl.clone(); + let send = self.message_sender.clone(); + self.active_conns.spawn(client.0.id,async move { + // Use cancellation token instead of handle abortion to retrieve the OwnedReadHalf on cancel + loop { + tokio::select!{ + _ = cncl_task.cancelled() => return Err(TransportError::ReadCancelled(reader,Some(client.0.id))), + out = read_message(&mut reader,client.0.id,&send) => { + match out { + Ok(_) => {}, + Err(e) => { + return Err(e); + } + } + }, + } + } + }); + self.cancel_tokens.insert(client.0.id, cncl); Ok(()) } - fn drop_session(&mut self, peer: &PeerId) -> Result<(), Self::Error> { - self.sessions.remove(peer).ok_or(TransportError::SessionNotFound)?; - self.streams.remove(peer).ok_or(TransportError::SessionNotFound)?; + async fn drop_session(&mut self, peer: &PeerId) -> Result<(), Self::Error> { + self.sessions.remove(peer).ok_or(TransportError::SessionNotFound(Some(*peer)))?; + self.write_streams.remove(peer).ok_or(TransportError::SessionNotFound(Some(*peer)))?; + let aborted = self.active_conns.abort(peer); + if !aborted { + return Err(TransportError::ConnectionNotInMap(Some(*peer))) + }; + self.cancel_tokens.remove(peer); Ok(()) } async fn listen(&mut self, token: CancellationToken) -> Result { - let mut tasks= vec!(); - for (id,stream) in self.streams.iter() { - let fut = Box::pin(async { - // let mut session = session_mutex.lock().await; - let res = loop { - stream.readable().await?; - let mut buf = [0u8; BUFSIZE]; - match stream.try_read(&mut buf) { - Ok(0) => break Err(TransportError::ConnectionClosed), - Ok(_) => { - break Ok((*id,buf)); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - Err(e) => return Err::<(PeerId,[u8;BUFSIZE]),TransportError>(e.into()), - } - }; - res - }); - tasks.push(fut); - }; - let message = loop { - let futs = futures::future::select_all(tasks); + loop { tokio::select!{ - _ = token.cancelled() => return Err(TransportError::Cancelled), - (res,_,tasks_rem) = futs => { - tasks = tasks_rem; - match res { - Ok((id,buf)) => { - let session = self.sessions.get_mut(&id).ok_or(TransportError::SessionNotFound)?; - let encrypted = from_bytes::(&buf)?; + _ = token.cancelled() => { return Err(TransportError::Cancelled); }, + Some((id,res)) = self.active_conns.join_next() => { + match res { + Ok(r) => { + match r { + Ok(_) => {}, // Session ended gracefully, ignore it + Err(e) => { + match self.drop_session(&id).await { + Ok(_) => {}, + Err(e) => { + match e { + // join_next removes the connection from the map + // drop_session would return an error that we can ignore + // We still want to know if other parts of drop_session fail + TransportError::ConnectionNotInMap(_) => {}, + _ => return Err(e) + } + } + } + return Err(e); + } + } + }, + // We don't want to stop listening if the connection had previously been cancelled and is hanging around + Err(e) if e.is_cancelled() => continue, + Err(e) => return Err(e.into()) + } + } + msg = self.incoming_messages.recv() => { + if let Some((id,encrypted)) = msg { + let session = self.sessions.get_mut(&id).ok_or(TransportError::SessionNotFound(Some(id)))?; let decrypted = session.receive(encrypted)?; - let received = from_bytes::(&decrypted)?; - break Ok(IncomingMessage::receive(id,received)) - }, - Err(e) => { - match e { - TransportError::ConnectionClosed => { - continue; - }, - e => break Err(e), - } + let outgoing = from_bytes::(&decrypted)?; + return Ok(IncomingMessage::receive(id, outgoing)) + } else { + // Channel has been closed + return Err(TransportError::MessageChannelClosed) } } } - }; - }; - message + } } async fn broadcast(&mut self, msg: Payload, token: CancellationToken) -> Result<(), Self::Error> { @@ -109,30 +192,26 @@ impl NetService for TcpTransport { } async fn transmit(&mut self, msg: Payload, target: PeerId, token: CancellationToken) -> Result<(), Self::Error> { - let session = self.sessions.get_mut(&target).ok_or(TransportError::SessionNotFound)?; - let stream = self.streams.get(&target).ok_or(TransportError::SessionNotFound)?; + let session = self.sessions.get_mut(&target).ok_or(TransportError::SessionNotFound(Some(target)))?; + let stream = self.write_streams.get(&target).ok_or(TransportError::SessionNotFound(Some(target)))?; let res = tokio::select!{ _ = token.cancelled() => { Err(TransportError::Cancelled) }, _ = async { - let mut buf = [0u8; BUFSIZE]; // Serialize once to get a format that can be encrypted - let data = to_slice(&msg, &mut buf)?; - let encrypted_data = session.send(data)?; - // Empty buffer - Message is concrete and owned so we no longer need to keep the old buffer around - let mut buf = [0u8; BUFSIZE]; + let data = to_stdvec(&msg)?; + let encrypted_data = session.send(&data)?; // Serialize again to get a sendable stream - let serialized_data = to_slice(&encrypted_data, &mut buf)?; + let serialized_data = to_stdvec(&encrypted_data)?; loop { stream.writable().await?; - match stream.try_write(serialized_data) { + match stream.try_write(&serialized_data) { Ok(_) => break, Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => continue, Err(e) => return Err(e.into()), }; - } Ok::<(),TransportError>(()) } => Ok(()), @@ -200,8 +279,6 @@ mod test { // Start a listener and move it into a thread after getting the port assigned by the OS. - // Consider making the thread cancellable. - // Consider making the listener more flexible as a double. let srv = TcpListener::bind("127.0.0.1:0").await.expect("Failed to start test server"); let addr = srv.local_addr().expect("Failed to get local address of thread"); tokio::spawn(async move { @@ -229,35 +306,41 @@ mod test { async fn test_drop_session() { let srv = TcpListener::bind("127.0.0.1:0").await.expect("Failed to start test server"); let addr = srv.local_addr().expect("Failed to get local address of thread"); - // sessions: HashMap, // Create a transport client with 3 sessions let mut transport = ephemeral_transport().await; let mut ids = vec!(); for i in 0..3 { let client = NetClient::from_seed([i as u8;32]); + let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); + let pend_session = PendingSession::new([i as u8;32],Some(i as u64)); let net_session = pend_session.activate(None).expect("Failed to active test session"); - let stream = TcpStream::connect(addr).await.expect("Failed to connect to test server on {addr}"); - let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); transport.sessions.insert(peer.id,net_session); - transport.streams.insert(peer.id,stream); + + let (_,write_stream) = TcpStream::connect(addr).await.expect("Failed to connect to test server on {addr}").into_split(); + transport.write_streams.insert(peer.id,write_stream); + transport.active_conns.spawn(peer.id, async {Ok(())}); + transport.cancel_tokens.insert(peer.id, CancellationToken::new()); + + ids.push(peer.id); } assert_eq!(transport.sessions.len(),3,"Expected 3 elements in starting transport"); - assert_eq!(transport.streams.len(),3,"Expected 3 elements in starting transport"); + assert_eq!(transport.write_streams.len(),3,"Expected 3 elements in starting transport"); + assert_eq!(transport.active_conns.len(),3,"Expected 3 elements in starting transport"); for i in (0..3).rev() { - let expect_ok = transport.drop_session(&ids.pop().unwrap()); + let expect_ok = transport.drop_session(&ids.pop().unwrap()).await; assert!(expect_ok.is_ok()); - assert_eq!(transport.sessions.len(),i,"Failed to remove session {i}"); - assert_eq!(transport.streams.len(),i,"Failed to remove session {i}"); + assert_eq!(transport.sessions.len(),i,"Failed to remove session {i} from sessions"); + assert_eq!(transport.write_streams.len(),i,"Failed to remove session {i} from write_streams"); } let client = NetClient::from_seed([5 as u8;32]); let peer = Peer::new(client.identity().expect("Expect static identity"), addr.to_string()); - let expect_err = transport.drop_session(&peer.id); + let expect_err = transport.drop_session(&peer.id).await; assert!(expect_err.is_err(),"Expected failure removing non-existent session"); @@ -320,7 +403,9 @@ mod test { let mut expect_messages = vec!(); // Set up test clients to listen to - send a different message from each one - for (i,msg) in sample_messages().into_iter().enumerate() { + let sample_messages = sample_messages(); + let num_messages = sample_messages.len(); + for (i,msg) in sample_messages.into_iter().enumerate() { let (mut sender_session,receiver_session) = gen_shared_sessions([i as u8;32],i as u64 + 12); let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to start test server"); @@ -328,7 +413,7 @@ mod test { let client = NetClient::from_seed([i as u8;32]); let peer = Peer::new(client.identity().expect("Expect static ID"), addr.to_string()); - + expect_messages.push( IncomingMessage{ from: peer.id.clone(), @@ -344,24 +429,27 @@ mod test { }; transport.add_session((peer,receiver_session)).await.expect("Failed to add test session to transport"); + let handle = std::thread::spawn(move || { let (mut stream, _) = listener.accept().expect("Failed to accept incoming stream"); - let mut buf = [0; BUFSIZE]; - let msg_bytes = to_slice(&send_message, &mut buf).expect("Failed to serialize message"); + let msg_bytes = to_stdvec(&send_message).expect("Failed to serialize message"); let encrypt = sender_session.send(&msg_bytes).expect("Failed to encrypt message"); - let mut buf = [0; BUFSIZE]; - let serialized = to_slice(&encrypt, &mut buf).expect("Failed to serialize encrypted message"); - stream.write_all(serialized).expect("Failed to write to stream"); + let serialized = to_stdvec(&encrypt).expect("Failed to serialize encrypted message"); + stream.write_all(&serialized).expect("Failed to write to stream"); + stream }); threads.push(handle); + + } - + + let mut streams = vec!(); for thread in threads { - thread.join().expect("Failed to join thread"); + streams.push(thread.join().expect("Failed to join thread")); } let mut got_messages = vec!(); - for _ in 0..3 { + for _ in 0..num_messages { let cncl = CancellationToken::new(); let msg = transport.listen(cncl).await.expect("Failed to listen for messages"); got_messages.push(msg); @@ -371,6 +459,8 @@ mod test { assert!(got_messages.contains(&msg), "Received message did not match expected" ); } + drop(streams); + } #[tokio::test] @@ -528,10 +618,12 @@ mod test { Pow::new(&[4u8;32],Action::PublishTag, 213u8) )}; let third_message = Payload::Query(Query::Tag(TagQuery::Publish{tag: test_tag, pow: test_pow, nonce: 17u64})); + let fourth_message = Payload::Reply(Reply::Ok); vec!( - second_message, first_message, - third_message + second_message, + third_message, + fourth_message ) } From 232baa61a74453dec503d2e6f11ad52ce660b7ea Mon Sep 17 00:00:00 2001 From: Mike Young Date: Sun, 22 Mar 2026 04:14:12 +0000 Subject: [PATCH 4/4] High ports for test. Replace Vec with bytes::BufMut --- core/src/transport/tcp.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/transport/tcp.rs b/core/src/transport/tcp.rs index 7e87efb..f9d0b09 100644 --- a/core/src/transport/tcp.rs +++ b/core/src/transport/tcp.rs @@ -4,6 +4,8 @@ use tokio::{net::{TcpStream, TcpListener, ToSocketAddrs, tcp::{OwnedReadHalf, Ow use std::{collections::HashMap}; +use bytes::BytesMut; + use postcard::{to_stdvec,take_from_bytes,from_bytes}; use crate::{net::{ActiveSession, PendingSession, NetClient,NetService,Message},payload::{Payload}, transport::TransportError, peer::{Peer,PeerId}, message::{IncomingMessage,OutgoingMessage}}; @@ -40,10 +42,10 @@ impl TcpTransport { } } -async fn read_from_stream(stream: &mut OwnedReadHalf) -> Result, TransportError> { +async fn read_from_stream(stream: &mut OwnedReadHalf) -> Result { let res = loop { stream.readable().await?; - let mut buf = Vec::with_capacity(BUFSIZE); + let mut buf = BytesMut::with_capacity(BUFSIZE); let res = stream.read_buf(&mut buf).await; match res { Ok(0) => { @@ -60,7 +62,7 @@ async fn read_from_stream(stream: &mut OwnedReadHalf) -> Result, Transpo } async fn read_message(stream: &mut OwnedReadHalf,id: PeerId, results: &Sender::<(PeerId,Message)>) -> Result<(), TransportError> { - let mut msg_buf = vec!(); + let mut msg_buf = BytesMut::with_capacity(BUFSIZE); loop { let data = read_from_stream(stream).await.map_err(|e| { match e { @@ -75,7 +77,7 @@ async fn read_message(stream: &mut OwnedReadHalf,id: PeerId, results: &Sender::< let rem = match take_from_bytes::(&msg_buf) { Ok((msg,rem)) => { results.send((id,msg)).await?; - rem.to_owned() + BytesMut::from(rem) }, Err(e) => { match e { @@ -87,6 +89,8 @@ async fn read_message(stream: &mut OwnedReadHalf,id: PeerId, results: &Sender::< } } }; + // take_from_bytes cannot mutate data so msg_buf will still contain the used + remaining bytes + // replace it with the remaining bytes msg_buf = rem; } } @@ -252,11 +256,11 @@ mod test { async fn test_bind() { // This test needs to use concrete addresses (non port 0) to check it fails correctly let client = NetClient::Ephemeral; - let addr = "127.0.0.1:80"; + let addr = "127.0.0.1:30000"; let transport = TcpTransport::bind(client, addr).await; let static_client = NetClient::from_seed([1u8; 32]); - let addr = "127.0.0.1:8080"; + let addr = "127.0.0.1:30030"; let transport_2 = TcpTransport::bind(static_client, addr).await; let client = NetClient::from_seed([1u8; 32]);