diff --git a/Cargo.lock b/Cargo.lock index adb7ff3bd..a45da0651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2039,6 +2039,7 @@ version = "0.1.0" dependencies = [ "async-stream", "eyre", + "fastn-context", "fastn-id52", "fastn-net", "fastn-p2p-macros", diff --git a/v0.5/Cargo.lock b/v0.5/Cargo.lock index d117d3e11..cb15cc272 100644 --- a/v0.5/Cargo.lock +++ b/v0.5/Cargo.lock @@ -1974,6 +1974,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "fastn-context" +version = "0.1.0" +dependencies = [ + "fastn-context-macros", + "tokio", + "tokio-util", +] + +[[package]] +name = "fastn-context-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "fastn-continuation" version = "0.1.0" @@ -2117,6 +2135,7 @@ dependencies = [ "async-stream", "enum-display-derive", "eyre", + "fastn-context", "fastn-id52", "fastn-net", "fastn-p2p-macros", diff --git a/v0.5/fastn-p2p/Cargo.toml b/v0.5/fastn-p2p/Cargo.toml index 5fdadb24f..ef421515c 100644 --- a/v0.5/fastn-p2p/Cargo.toml +++ b/v0.5/fastn-p2p/Cargo.toml @@ -21,6 +21,9 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +# Context integration +fastn-context = { path = "../../fastn-context" } + # Re-export proc macros from separate crate fastn-p2p-macros = { path = "../fastn-p2p-macros", version = "0.1.0" } diff --git a/v0.5/fastn-p2p/src/client.rs b/v0.5/fastn-p2p/src/client.rs index 1d15c8652..d8044bbbd 100644 --- a/v0.5/fastn-p2p/src/client.rs +++ b/v0.5/fastn-p2p/src/client.rs @@ -1,57 +1,95 @@ -/// Error type for call function +/// Client-side P2P communication +/// +/// This module provides client APIs for establishing both simple request/response +/// connections and complex streaming sessions with remote P2P endpoints. + +/// Simple request/response communication (existing functionality) +pub async fn call( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL, + request: REQUEST +) -> Result, CallError> +where + PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, + REQUEST: serde::Serialize + for<'de> serde::Deserialize<'de>, + RESPONSE: serde::Serialize + for<'de> serde::Deserialize<'de>, + ERROR: serde::Serialize + for<'de> serde::Deserialize<'de>, +{ + // TODO: Implement using existing fastn-p2p call infrastructure + todo!("Client call to {target} with protocol {protocol:?}") +} + +/// Establish streaming P2P session (new functionality) +pub async fn connect( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL +) -> Result +where + PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, +{ + // TODO: Implement streaming connection establishment + todo!("Connect to {target} with protocol {protocol:?} using {}", our_key.id52()) +} + +/// Client-side streaming session +pub struct Session { + /// Input stream to server + pub stdin: iroh::endpoint::SendStream, + /// Output stream from server + pub stdout: iroh::endpoint::RecvStream, + // TODO: Add context integration + // context: std::sync::Arc, +} + +impl Session { + /// Accept unidirectional stream back from server (e.g., stderr) + pub async fn accept_uni(&mut self) -> Result { + // TODO: Accept incoming unidirectional stream from server + todo!("Accept unidirectional stream from server") + } + + /// Accept bidirectional stream back from server + pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError> { + // TODO: Accept incoming bidirectional stream from server + todo!("Accept bidirectional stream from server") + } +} + +/// Errors for client operations #[derive(Debug, thiserror::Error)] pub enum CallError { - #[error("Failed to establish P2P stream: {source}")] + #[error("Connection failed: {source}")] + Connection { source: eyre::Error }, + + #[error("Request/response error: {source}")] + RequestResponse { source: eyre::Error }, + + #[error("Serialization error: {source}")] + Serialization { source: serde_json::Error }, + + #[error("Endpoint error: {source}")] Endpoint { source: eyre::Error }, - - #[error("Failed to establish P2P stream: {source}")] + + #[error("Stream error: {source}")] Stream { source: eyre::Error }, - - #[error("Failed to serialize request: {source}")] - Serialization { source: serde_json::Error }, - - #[error("Failed to send request: {source}")] + + #[error("Send error: {source}")] Send { source: eyre::Error }, - - #[error("Failed to receive response: {source}")] + + #[error("Receive error: {source}")] Receive { source: eyre::Error }, - - #[error("Failed to deserialize response: {source}")] + + #[error("Deserialization error: {source}")] Deserialization { source: serde_json::Error }, } -/// Make a P2P call using global singletons -/// -/// This is the main function end users should use. It automatically uses -/// the global connection pool and graceful shutdown coordinator. -/// -/// # Example -/// -/// ```rust,ignore -/// let result: Result = fastn_p2p::call( -/// secret_key, &target, protocol, request -/// ).await?; -/// ``` -pub async fn call( - sender: fastn_id52::SecretKey, - target: &fastn_id52::PublicKey, - protocol: P, - input: INPUT, -) -> Result, CallError> -where - P: serde::Serialize - + for<'de> serde::Deserialize<'de> - + Clone - + PartialEq - + std::fmt::Display - + std::fmt::Debug - + Send - + Sync - + 'static, - INPUT: serde::Serialize, - OUTPUT: for<'de> serde::Deserialize<'de>, - ERROR: for<'de> serde::Deserialize<'de>, -{ - // Delegate to coordination module which has strict singleton access control - crate::coordination::internal_call(sender, target, protocol, input).await -} +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("Failed to establish streaming connection: {source}")] + Connection { source: eyre::Error }, + + #[error("Stream error: {source}")] + Stream { source: eyre::Error }, +} \ No newline at end of file diff --git a/v0.5/fastn-p2p/src/lib.rs b/v0.5/fastn-p2p/src/lib.rs index a0940be15..01bcecfdb 100644 --- a/v0.5/fastn-p2p/src/lib.rs +++ b/v0.5/fastn-p2p/src/lib.rs @@ -31,11 +31,13 @@ extern crate self as fastn_p2p; -mod client; mod coordination; mod globals; mod macros; -mod server; + +// Export client and server modules (new modular API) +pub mod client; +pub mod server; // Re-export essential types from fastn-net that users need pub use fastn_net::{Graceful, Protocol}; @@ -48,12 +50,10 @@ pub use fastn_p2p_macros::main; pub use coordination::{cancelled, shutdown, spawn}; pub use globals::{graceful, pool}; -// Client API - clean, simple naming (only expose simple version) +// Legacy top-level exports (for backward compatibility) pub use client::{CallError, call}; - -// Server API - clean, simple naming pub use server::{ GetInputError, HandleRequestError, ListenerAlreadyActiveError, ListenerNotFoundError, Request, ResponseHandle, SendError, active_listener_count, active_listeners, is_listening, listen, - stop_listening, + stop_listening, Session, }; diff --git a/v0.5/fastn-p2p/src/server/mod.rs b/v0.5/fastn-p2p/src/server/mod.rs index c882759b7..2d481dc32 100644 --- a/v0.5/fastn-p2p/src/server/mod.rs +++ b/v0.5/fastn-p2p/src/server/mod.rs @@ -6,6 +6,7 @@ pub mod handle; pub mod listener; pub mod management; pub mod request; +pub mod session; // Public API exports - no use statements, direct qualification pub use handle::{ResponseHandle, SendError}; @@ -15,3 +16,4 @@ pub use management::{ is_listening, stop_listening, }; pub use request::{GetInputError, HandleRequestError, Request}; +pub use session::Session; diff --git a/v0.5/fastn-p2p/src/server/session.rs b/v0.5/fastn-p2p/src/server/session.rs new file mode 100644 index 000000000..13e12a552 --- /dev/null +++ b/v0.5/fastn-p2p/src/server/session.rs @@ -0,0 +1,61 @@ +/// Server-side streaming session (handles both RPC and streaming) +pub struct Session { + /// Protocol negotiated with client + pub protocol: PROTOCOL, + /// Stream to client (stdout) + pub send: iroh::endpoint::SendStream, + /// Stream from client (stdin) + pub recv: iroh::endpoint::RecvStream, + /// Peer's public key + peer: fastn_id52::PublicKey, + /// Context for this session (integration with fastn-context) + context: std::sync::Arc, +} + +impl Session { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey { + &self.peer + } + + /// Get the context for this session + pub fn context(&self) -> &std::sync::Arc { + &self.context + } + + /// Convert to Request for RPC handling (consumes Session) + pub fn into_request(self) -> super::request::Request { + // TODO: Convert Session to Request for RPC pattern + todo!("Convert Session to Request for RPC handling") + } + + /// Open unidirectional stream back to client (e.g., stderr) + pub async fn open_uni(&mut self) -> Result { + // TODO: Open unidirectional stream to client + todo!("Open unidirectional stream back to client") + } + + /// Open bidirectional stream back to client + pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), crate::client::ConnectionError> { + // TODO: Open bidirectional stream to client + todo!("Open bidirectional stream back to client") + } +} + +/// Create a new Session (used internally by listener) +pub(crate) fn create_session( + protocol: PROTOCOL, + send: iroh::endpoint::SendStream, + recv: iroh::endpoint::RecvStream, + peer: fastn_id52::PublicKey, + parent_context: &std::sync::Arc, +) -> Session { + // Use parent context for now (can create child context later) + Session { + protocol, + send, + recv, + peer, + context: parent_context.clone(), + } +} \ No newline at end of file