Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions v0.5/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions v0.5/fastn-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true

# Context integration
fastn-context = { path = "../../fastn-context" }

# Re-export proc macros from separate crate
fastn-p2p-macros = { path = "../fastn-p2p-macros", version = "0.1.0" }

Expand Down
134 changes: 86 additions & 48 deletions v0.5/fastn-p2p/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,95 @@
/// Error type for call function
/// Client-side P2P communication
///
/// This module provides client APIs for establishing both simple request/response
/// connections and complex streaming sessions with remote P2P endpoints.

/// Simple request/response communication (existing functionality)
pub async fn call<PROTOCOL, REQUEST, RESPONSE, ERROR>(
our_key: fastn_id52::SecretKey,
target: fastn_id52::PublicKey,
protocol: PROTOCOL,
request: REQUEST
) -> Result<Result<RESPONSE, ERROR>, CallError>
where
PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug,
REQUEST: serde::Serialize + for<'de> serde::Deserialize<'de>,
RESPONSE: serde::Serialize + for<'de> serde::Deserialize<'de>,
ERROR: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
// TODO: Implement using existing fastn-p2p call infrastructure
todo!("Client call to {target} with protocol {protocol:?}")
}

/// Establish streaming P2P session (new functionality)
pub async fn connect<PROTOCOL>(
our_key: fastn_id52::SecretKey,
target: fastn_id52::PublicKey,
protocol: PROTOCOL
) -> Result<Session, ConnectionError>
where
PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug,
{
// TODO: Implement streaming connection establishment
todo!("Connect to {target} with protocol {protocol:?} using {}", our_key.id52())
}

/// Client-side streaming session
pub struct Session {
/// Input stream to server
pub stdin: iroh::endpoint::SendStream,
/// Output stream from server
pub stdout: iroh::endpoint::RecvStream,
// TODO: Add context integration
// context: std::sync::Arc<fastn_context::Context>,
}

impl Session {
/// Accept unidirectional stream back from server (e.g., stderr)
pub async fn accept_uni(&mut self) -> Result<iroh::endpoint::RecvStream, ConnectionError> {
// TODO: Accept incoming unidirectional stream from server
todo!("Accept unidirectional stream from server")
}

/// Accept bidirectional stream back from server
pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError> {
// TODO: Accept incoming bidirectional stream from server
todo!("Accept bidirectional stream from server")
}
}

/// Errors for client operations
#[derive(Debug, thiserror::Error)]
pub enum CallError {
#[error("Failed to establish P2P stream: {source}")]
#[error("Connection failed: {source}")]
Connection { source: eyre::Error },

#[error("Request/response error: {source}")]
RequestResponse { source: eyre::Error },

#[error("Serialization error: {source}")]
Serialization { source: serde_json::Error },

#[error("Endpoint error: {source}")]
Endpoint { source: eyre::Error },

#[error("Failed to establish P2P stream: {source}")]
#[error("Stream error: {source}")]
Stream { source: eyre::Error },

#[error("Failed to serialize request: {source}")]
Serialization { source: serde_json::Error },

#[error("Failed to send request: {source}")]

#[error("Send error: {source}")]
Send { source: eyre::Error },

#[error("Failed to receive response: {source}")]
#[error("Receive error: {source}")]
Receive { source: eyre::Error },

#[error("Failed to deserialize response: {source}")]
#[error("Deserialization error: {source}")]
Deserialization { source: serde_json::Error },
}

/// Make a P2P call using global singletons
///
/// This is the main function end users should use. It automatically uses
/// the global connection pool and graceful shutdown coordinator.
///
/// # Example
///
/// ```rust,ignore
/// let result: Result<MyResponse, MyError> = fastn_p2p::call(
/// secret_key, &target, protocol, request
/// ).await?;
/// ```
pub async fn call<P, INPUT, OUTPUT, ERROR>(
sender: fastn_id52::SecretKey,
target: &fastn_id52::PublicKey,
protocol: P,
input: INPUT,
) -> Result<Result<OUTPUT, ERROR>, CallError>
where
P: serde::Serialize
+ for<'de> serde::Deserialize<'de>
+ Clone
+ PartialEq
+ std::fmt::Display
+ std::fmt::Debug
+ Send
+ Sync
+ 'static,
INPUT: serde::Serialize,
OUTPUT: for<'de> serde::Deserialize<'de>,
ERROR: for<'de> serde::Deserialize<'de>,
{
// Delegate to coordination module which has strict singleton access control
crate::coordination::internal_call(sender, target, protocol, input).await
}
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("Failed to establish streaming connection: {source}")]
Connection { source: eyre::Error },

#[error("Stream error: {source}")]
Stream { source: eyre::Error },
}
12 changes: 6 additions & 6 deletions v0.5/fastn-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@

extern crate self as fastn_p2p;

mod client;
mod coordination;
mod globals;
mod macros;
mod server;

// Export client and server modules (new modular API)
pub mod client;
pub mod server;

// Re-export essential types from fastn-net that users need
pub use fastn_net::{Graceful, Protocol};
Expand All @@ -48,12 +50,10 @@ pub use fastn_p2p_macros::main;
pub use coordination::{cancelled, shutdown, spawn};
pub use globals::{graceful, pool};

// Client API - clean, simple naming (only expose simple version)
// Legacy top-level exports (for backward compatibility)
pub use client::{CallError, call};

// Server API - clean, simple naming
pub use server::{
GetInputError, HandleRequestError, ListenerAlreadyActiveError, ListenerNotFoundError, Request,
ResponseHandle, SendError, active_listener_count, active_listeners, is_listening, listen,
stop_listening,
stop_listening, Session,
};
2 changes: 2 additions & 0 deletions v0.5/fastn-p2p/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod handle;
pub mod listener;
pub mod management;
pub mod request;
pub mod session;

// Public API exports - no use statements, direct qualification
pub use handle::{ResponseHandle, SendError};
Expand All @@ -15,3 +16,4 @@ pub use management::{
is_listening, stop_listening,
};
pub use request::{GetInputError, HandleRequestError, Request};
pub use session::Session;
61 changes: 61 additions & 0 deletions v0.5/fastn-p2p/src/server/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/// Server-side streaming session (handles both RPC and streaming)
pub struct Session<PROTOCOL> {
/// Protocol negotiated with client
pub protocol: PROTOCOL,
/// Stream to client (stdout)
pub send: iroh::endpoint::SendStream,
/// Stream from client (stdin)
pub recv: iroh::endpoint::RecvStream,
/// Peer's public key
peer: fastn_id52::PublicKey,
/// Context for this session (integration with fastn-context)
context: std::sync::Arc<fastn_context::Context>,
}

impl<PROTOCOL> Session<PROTOCOL> {
/// Get the peer's public key
pub fn peer(&self) -> &fastn_id52::PublicKey {
&self.peer
}

/// Get the context for this session
pub fn context(&self) -> &std::sync::Arc<fastn_context::Context> {
&self.context
}

/// Convert to Request for RPC handling (consumes Session)
pub fn into_request(self) -> super::request::Request<PROTOCOL> {
// TODO: Convert Session to Request for RPC pattern
todo!("Convert Session to Request for RPC handling")
}

/// Open unidirectional stream back to client (e.g., stderr)
pub async fn open_uni(&mut self) -> Result<iroh::endpoint::SendStream, crate::client::ConnectionError> {
// TODO: Open unidirectional stream to client
todo!("Open unidirectional stream back to client")
}

/// Open bidirectional stream back to client
pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), crate::client::ConnectionError> {
// TODO: Open bidirectional stream to client
todo!("Open bidirectional stream back to client")
}
}

/// Create a new Session (used internally by listener)
pub(crate) fn create_session<PROTOCOL>(
protocol: PROTOCOL,
send: iroh::endpoint::SendStream,
recv: iroh::endpoint::RecvStream,
peer: fastn_id52::PublicKey,
parent_context: &std::sync::Arc<fastn_context::Context>,
) -> Session<PROTOCOL> {
// Use parent context for now (can create child context later)
Session {
protocol,
send,
recv,
peer,
context: parent_context.clone(),
}
}
Loading