From c44249e71cc471c652b2758d6fdef52d26bdcbe5 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 13:50:58 +0530 Subject: [PATCH 1/9] docs: design complete fastn-p2p API with client/server modules and streaming sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Document three core patterns: client::call(), client::connect(), server::listen() - Design unified server::Session that handles both RPC and streaming - Add server::Session -> server::Request conversion with into_request() - Specify public fields for direct stream access (send, recv, stdin, stdout) - Include complete API reference with all methods and signatures - Use fully qualified paths throughout examples (no use statements) - Add protocol consistency requirements (client and server must use same protocol) - Clarify stream directions with "back to client" terminology Key design decisions: - server::listen() returns Session stream - Session can convert to Request for RPC or use streams directly - Client Session accepts streams from server, Server Session opens streams to client - Public fields eliminate mutability conflicts with accessor methods Ready for implementation phase after design agreement. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- v0.5/fastn-p2p/README.md | 248 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 245 insertions(+), 3 deletions(-) diff --git a/v0.5/fastn-p2p/README.md b/v0.5/fastn-p2p/README.md index 8498f58e6..1d26b294e 100644 --- a/v0.5/fastn-p2p/README.md +++ b/v0.5/fastn-p2p/README.md @@ -4,12 +4,28 @@ This crate provides a high-level, type-safe API for P2P communication in the fas ## Design Philosophy -- **Type Safety First**: All communication uses strongly-typed REQUEST/RESPONSE/ERROR contracts +- **Type Safety First**: All communication uses strongly-typed protocol contracts - **Minimal Surface Area**: Only essential APIs are exposed to reduce complexity - **Bug Prevention**: API design makes common mistakes impossible or unlikely - **Ergonomic**: High-level APIs handle boilerplate automatically - **Zero Boilerplate**: Main functions are clean with automatic setup/teardown +## Core API Patterns + +fastn-p2p provides three main communication patterns: + +### 1. **Simple Request/Response** - `client::call()` + +For simple RPC-style communication with typed request/response. + +### 2. **Streaming Sessions** - `client::connect()` + +For complex interactions requiring bidirectional streaming with optional side channels. + +### 3. **Server Listening** - `server::listen()` + +For accepting incoming connections and handling both patterns. + ## Quick Start ### Application Entry Point @@ -23,7 +39,7 @@ async fn main() -> eyre::Result<()> { match cli.command { Command::Serve { port } => { - fastn_p2p::spawn(start_server(port)); + fastn_p2p::spawn(start_server()); } Command::Client { target } => { fastn_p2p::spawn(run_client(target)); @@ -82,6 +98,7 @@ The `logging` parameter supports multiple formats: ``` **Logging Examples:** + - **Production**: `logging = true` or `logging = "info"` - **Development**: `logging = "debug"` - **Deep debugging**: `logging = "fastn_p2p=trace,fastn_net=trace"` @@ -106,34 +123,146 @@ cargo run # Uses macro parameter as fallback ``` **Priority Order:** + 1. **`RUST_LOG` environment variable** (highest priority) 2. **Macro `logging` parameter** (fallback) 3. **Default `"info"`** (lowest priority) **Special Cases:** + ```rust // Even logging = false can be overridden for debugging #[fastn_p2p::main(logging = false)] ``` + ```bash RUST_LOG=debug cargo run # Still enables logging despite logging = false ``` This design allows developers to debug any application by setting `RUST_LOG` without modifying source code. +## API Reference + +### 1. Simple Request/Response - `client::call()` + +For RPC-style communication with typed request/response patterns: + +```rust +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoRequest { + message: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoResponse { + reply: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoError { + reason: String, +} + +// Client side - must specify protocol (same as server) +let response: Result = fastn_p2p::client::call( + my_key, + target_id52, + EchoProtocol, // Must match server's protocol + EchoRequest { message: "Hello!".to_string() } +).await?; + +match response { + Ok(resp) => println!("Got reply: {}", resp.reply), + Err(err) => println!("Server error: {}", err.reason), +} +``` + +### 2. Streaming Sessions - `client::connect()` + +For complex interactions requiring persistent bidirectional communication with optional side channels: + +```rust +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct RemoteShellProtocol { + shell: String, + args: Vec, +} + +// Client side - establish streaming session +let mut session = fastn_p2p::client::connect( + my_key, + target_id52, + RemoteShellProtocol { + shell: "bash".to_string(), + args: vec![], + } +).await?; + +// Direct access to main streams +fastn_p2p::spawn(pipe_stdin_to_remote(session.stdin)); +fastn_p2p::spawn(pipe_stdout_from_remote(session.stdout)); + +// Optional: accept side channels back from server (iroh terminology) +if let Ok(stderr_stream) = session.accept_uni().await { + fastn_p2p::spawn(pipe_stderr_from_remote(stderr_stream)); +} +``` + +### 3. Server Listening - `server::listen()` + +For accepting both call() and connect() patterns with unified Session handling: + +```rust +// Server side - listen for incoming connections +let mut listener = fastn_p2p::server::listen!(my_key, &[ + EchoProtocol, + RemoteShellProtocol::default(), +]).await?; + +while let Some(session) = listener.next().await { + match session.protocol { + EchoProtocol => { + // RPC pattern - convert Session to Request + let request = session.into_request(); + request.handle(|input: EchoRequest| async move { + Ok::(EchoResponse { + reply: format!("Echo: {}", input.message) + }) + }).await?; + } + RemoteShellProtocol { .. } => { + // Streaming pattern - use Session directly + let mut shell_session = session; + + // Direct access to streams (public fields) + fastn_p2p::spawn(handle_stdin(shell_session.recv)); + fastn_p2p::spawn(handle_stdout(shell_session.send)); + + // Optional: open stderr channel back to client + if let Ok(stderr) = shell_session.open_uni().await { + fastn_p2p::spawn(handle_stderr(stderr)); + } + } + } +} +``` + #### Shutdown Modes **Single Ctrl+C Mode (Default):** + ```rust #[fastn_p2p::main(shutdown_mode = "single_ctrl_c")] async fn main() -> eyre::Result<()> { ... } ``` + - Ctrl+C immediately triggers graceful shutdown - Wait `shutdown_timeout` for tasks to complete - Force exit if timeout exceeded - Simple and predictable for most applications **Double Ctrl+C Mode:** + ```rust #[fastn_p2p::main( shutdown_mode = "double_ctrl_c", @@ -147,6 +276,7 @@ async fn print_status() { println!("Services: {} active", get_service_count()); } ``` + - First Ctrl+C calls `status_fn` and waits for second Ctrl+C - Second Ctrl+C within `double_ctrl_c_window` triggers shutdown - If no second Ctrl+C, continues running normally @@ -386,4 +516,116 @@ See the `/tests` directory for complete working examples: For advanced use cases that need direct access to `fastn_net::Graceful`, you can still access it through `fastn_p2p::globals`, but this is discouraged for most applications. -The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability. \ No newline at end of file +The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability. + +## Complete API Reference + +### Client API + +```rust +pub mod client { + /// Simple request/response communication + pub async fn call( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL, + request: REQUEST + ) -> Result, CallError>; + + /// Establish streaming session + pub async fn connect( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL + ) -> Result; + + /// Client-side streaming session + pub struct Session { + pub stdin: iroh::endpoint::SendStream, // To server + pub stdout: iroh::endpoint::RecvStream, // From server + } + + impl Session { + /// Accept unidirectional stream back from server (e.g., stderr) + pub async fn accept_uni(&mut self) -> Result; + + /// Accept bidirectional stream back from server + pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError>; + } +} +``` + +### Server API + +```rust +pub mod server { + /// Listen for incoming connections + pub async fn listen( + our_key: fastn_id52::SecretKey, + protocols: &[PROTOCOL] + ) -> impl futures_core::stream::Stream>; + + /// Server-side session (handles both RPC and streaming) + pub struct Session { + pub protocol: PROTOCOL, // Protocol negotiated + pub send: iroh::endpoint::SendStream, // To client (stdout) + pub recv: iroh::endpoint::RecvStream, // From client (stdin) + // private: peer + } + + impl Session { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey; + + /// Convert to Request for RPC handling (consumes Session) + pub fn into_request(self) -> Request; + + /// Open unidirectional stream back to the client (e.g., stderr) + pub async fn open_uni(&mut self) -> Result; + + /// Open bidirectional stream to back to the client + pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), ConnectionError>; + } + + /// Request handle for RPC-style communication + pub struct Request { + pub protocol: PROTOCOL, + // private: peer, send, recv + } + + impl Request { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey; + + /// Read and deserialize JSON request, get response handle + pub async fn get_input(self) -> Result<(INPUT, ResponseHandle), GetInputError>; + + /// Handle request with async closure (automatic serialization) + pub async fn handle( + self, + handler: F + ) -> Result<(), HandleRequestError>; + } +} +``` + +### Global Coordination + +```rust +/// Spawn task with graceful shutdown tracking +pub fn spawn(task: F) -> tokio::task::JoinHandle; + +/// Check for graceful shutdown signal +pub async fn cancelled(); + +/// Trigger graceful shutdown +pub async fn shutdown() -> eyre::Result<()>; +``` + +### Macros + +```rust +/// Main function macro with automatic setup +#[fastn_p2p::main] +#[fastn_p2p::main(logging = "debug", shutdown_mode = "double_ctrl_c")] +``` From 2676f20f38cb94fb09b54a5e8c7c01a70dfd289e Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 15:32:09 +0530 Subject: [PATCH 2/9] docs: add persistent global counter storage with dotted path keys to fastn-context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add global counter storage system that survives context drops - Design dotted path keys: "global.service.session.task.counter_name" - Add path tracking to Context structure with full_path field - Document persistent total counters vs auto-cleanup live counters - Add counter storage examples with hierarchical path structures - Ensure historical metrics persist beyond context lifecycles - Enable hierarchical counter aggregation via path-based queries Counter examples: - "global.connections" (app-level) - "global.remote-access.alice@bv478gen.commands" (session-level) - "global.http-proxy.requests" (service-level) Global HashMap storage ensures metrics survive context drops while maintaining hierarchical structure for status display and aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- v0.5/fastn-context/README.md | 842 +++++++++++++++++++++++++++++++++++ 1 file changed, 842 insertions(+) create mode 100644 v0.5/fastn-context/README.md diff --git a/v0.5/fastn-context/README.md b/v0.5/fastn-context/README.md new file mode 100644 index 000000000..bce17f2a4 --- /dev/null +++ b/v0.5/fastn-context/README.md @@ -0,0 +1,842 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; + + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) âš ī¸ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +âŗ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) âš ī¸ STUCK + +âš ī¸ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +âŗ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "âŗ".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "âš ī¸".red())?; + for warning in &self.warnings { + let icon = match warning.severity { + WarningSeverity::Info => "â„šī¸", + WarningSeverity::Warning => "âš ī¸", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... # Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. + +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) âš ī¸ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +âŗ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) âš ī¸ STUCK + +âš ī¸ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file From c7fb9f6598d435f72edddc303198ac58a8d48f82 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 15:33:41 +0530 Subject: [PATCH 3/9] docs: add note about iterative design documentation in fastn-context README --- v0.5/fastn-context/README.md | 2 + v0.5/fastn-p2p/README.md | 179 ++++++++++++++++++++++++++++++++++ v0.5/fastn-p2p/src/command.rs | 99 +++++++++++++++++++ v0.5/fastn-p2p/src/lib.rs | 4 + 4 files changed, 284 insertions(+) create mode 100644 v0.5/fastn-p2p/src/command.rs diff --git a/v0.5/fastn-context/README.md b/v0.5/fastn-context/README.md index bce17f2a4..eb48306d9 100644 --- a/v0.5/fastn-context/README.md +++ b/v0.5/fastn-context/README.md @@ -2,6 +2,8 @@ This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. + ## Design Philosophy - **Hierarchical Structure**: Applications naturally form trees of operations diff --git a/v0.5/fastn-p2p/README.md b/v0.5/fastn-p2p/README.md index 1d26b294e..df4d98a19 100644 --- a/v0.5/fastn-p2p/README.md +++ b/v0.5/fastn-p2p/README.md @@ -629,3 +629,182 @@ pub async fn shutdown() -> eyre::Result<()>; #[fastn_p2p::main] #[fastn_p2p::main(logging = "debug", shutdown_mode = "double_ctrl_c")] ``` + +## Use Cases Analysis + +This section analyzes various future use cases to validate the API design and identify areas needing additional consideration. + +### ✅ **Remote Shell/Command Execution** +**Pattern**: Streaming sessions with optional stderr +**Implementation**: +- `client::connect(target, RemoteShellProtocol)` establishes session +- Use `stdin`/`stdout` for main interaction +- Server calls `session.open_uni()` for stderr when needed +- **Status**: Fully supported by current design + +### ✅ **Audio/Video/Text Chat** +**Pattern**: Single connection with multiple stream types +**Implementation**: +- `client::connect(target, ChatProtocol)` establishes base session +- Use `session.stdin`/`stdout` for text messages +- Server/client use `open_bi()` for audio streams, `open_uni()` for video +- Protocol negotiates capabilities (audio/video support) +- **Status**: Well supported - unlimited side channels enable this + +### ✅ **TCP Proxy (Port Forwarding)** +**Pattern**: Bridge local TCP socket to P2P session +**Implementation**: +- `client::connect(target, TcpProxyProtocol { port: 8080 })` +- Bridge local TCP socket ↔ `session.stdin`/`stdout` +- Server bridges session streams ↔ local TCP socket to target port +- **Status**: Trivial - direct stream bridging + +### 🤔 **HTTP Proxy (Multiple Requests per Connection)** +**Pattern**: Reuse connection for multiple HTTP requests +**Implementation Options**: +1. **New session per request** - High overhead but simple +2. **Session reuse** - `open_bi()` for each HTTP request within same session +3. **Request multiplexing** - Send multiple HTTP requests over same streams + +**Considerations Needed**: +- Session reuse patterns in API design +- Request correlation/multiplexing within sessions +- Connection pooling for efficiency + +### ✅ **KVM (Remote Desktop/Input)** +**Pattern**: Multiple unidirectional event streams +**Implementation**: +- `client::connect(target, KvmProtocol)` establishes session +- Server uses `open_uni()` for screen updates (high bandwidth) +- Client uses `open_uni()` for mouse/keyboard events (low latency) +- Multiple streams allow prioritization (input vs display) +- **Status**: Well supported - side channels handle different data types + +### 🤔 **File Transfer (SCP-like)** +**Pattern**: Large file transfers with progress +**Implementation**: +- `client::connect(target, FileTransferProtocol)` +- Use `stdin`/`stdout` for file data +- Use `open_uni()` for progress updates, metadata +- **Considerations**: Large stream handling, resume capability + +## Design Gaps Identified + +### 1. **Hierarchical Cancellation** âš ī¸ +**Current**: Global `fastn_p2p::cancelled()` affects entire application +**Need**: Tree-based cancellation to shut down specific features +```rust +// Proposed +let cancellation_token = fastn_p2p::CancellationToken::new(); +let listener = fastn_p2p::server::listen(key, protocols) + .with_cancellation(cancellation_token.clone()).await?; + +// Later: cancel just this listener +cancellation_token.cancel(); +``` + +### 2. **Hierarchical Status/Metrics** âš ī¸ +**Current**: Basic connection counting +**Need**: Tree-based status with timing and nested metrics +```rust +// Proposed +fastn_p2p::status() -> StatusTree +├── Application (45.2s uptime) +├── Remote Access (32.1s active) +│ ├── alice connection (12.3s, 2 streams) +│ └── bob connection (5.7s, 1 stream) +├── HTTP Proxy (45.2s active, 234 requests handled) +``` + +### 3. **Session Reuse Patterns** âš ī¸ +**Current**: Each `connect()` creates new session +**Need**: Patterns for reusing sessions for multiple operations (HTTP-like) +```rust +// May need +session.create_request_channel().await? // For HTTP-like patterns +``` + +## Context: Hierarchical Cancellation and Metrics + +To address the hierarchical gaps, fastn-p2p will include a `Context` system that provides tree-based cancellation, metrics, and status tracking. + +### Context API + +```rust +pub struct Context { + // Identity and timing + name: String, + created_at: std::time::Instant, + + // Tree structure + children: std::sync::Arc>>>, + + // Functionality + cancellation: tokio_util::sync::CancellationToken, + metrics: std::sync::Arc>>, + data: std::sync::Arc>>, +} + +impl Context { + /// Create new child context with given name + pub fn create_child(&self, name: &str) -> std::sync::Arc; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Spawn task that inherits this context's cancellation + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle; + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); +} + +/// Global status function - prints context tree +pub fn status() -> StatusTree; +``` + +### Automatic Context Creation + +```rust +// Contexts are auto-created and passed to handlers: + +// 1. Global context (created by #[fastn_p2p::main]) +// 2. Listener context (created by server::listen()) +// 3. Session context (created per connection) + +async fn handle_remote_shell(session: server::Session) { + let ctx = session.context(); // Auto-created session context + + // Spawned tasks inherit session's cancellation automatically + ctx.spawn(handle_stdout(session.send)); + ctx.spawn(handle_stdin(session.recv)); + + // When session ends or is cancelled, all spawned tasks stop +} +``` + +### Status Tree Output + +``` +$ fastn status +Application Context (45.2s uptime) +├── Remote Access Listener (32.1s active, 2 connections) +│ ├── alice@bv478gen... (12.3s, 2 streams active) +│ │ ├── stdout handler (12.3s) +│ │ └── stderr stream (8.1s) +│ └── bob@p2nd7avq... (5.7s, 1 stream active) +│ └── shell session (5.7s) +├── HTTP Proxy Listener (45.2s active, 234 requests) +│ └── connection pool (15 active connections) +└── Chat Service (12.1s active, 3 users) + ├── alice chat (8.2s, text + audio) + └── bob chat (3.1s, text only) +``` + +**Recommendation**: Start with current Session/Request API, add Context as enhancement layer that threads through the existing design. diff --git a/v0.5/fastn-p2p/src/command.rs b/v0.5/fastn-p2p/src/command.rs new file mode 100644 index 000000000..612d0ff40 --- /dev/null +++ b/v0.5/fastn-p2p/src/command.rs @@ -0,0 +1,99 @@ +/// P2P streaming connections with three-stream protocol +/// +/// This module provides networking APIs for establishing streaming P2P +/// connections with stdin/stdout/stderr-like semantics. + +/// A P2P streaming session with main bidirectional stream and optional side channels +/// +/// Simple abstraction: you get the main streams, create side channels on demand. +/// Protocol-specific communication happens over the available streams. +pub struct Session { + /// The protocol data negotiated for this session + pub protocol: PROTOCOL, + /// Input stream: Client → Server + pub stdin: iroh::endpoint::RecvStream, + /// Output stream: Server → Client + pub stdout: iroh::endpoint::SendStream, + // TODO: Store underlying iroh connection for side channel creation + // iroh_connection: iroh::Connection, +} + +impl Session { + /// Open unidirectional stream (matches iroh terminology) + pub async fn open_uni(&mut self) -> Result { + // TODO: Use iroh_connection.open_uni() + todo!("Open unidirectional stream") + } + + /// Open bidirectional stream (matches iroh terminology) + pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), ConnectionError> { + // TODO: Use iroh_connection.open_bi() + todo!("Open bidirectional stream") + } + + /// Accept incoming unidirectional stream (matches iroh terminology) + pub async fn accept_uni(&mut self) -> Result { + // TODO: Use iroh_connection.accept_uni() + todo!("Accept unidirectional stream") + } + + /// Accept incoming bidirectional stream (matches iroh terminology) + pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError> { + // TODO: Use iroh_connection.accept_bi() + todo!("Accept bidirectional stream") + } +} + +/// Errors related to P2P connections +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("Failed to establish command connection: {source}")] + Connection { source: eyre::Error }, + + #[error("Failed to send command: {source}")] + Send { source: eyre::Error }, + + #[error("Failed to receive from command: {source}")] + Receive { source: eyre::Error }, + + #[error("Command serialization error: {source}")] + Serialization { source: serde_json::Error }, + + #[error("Exit code protocol error: {message}")] + ExitProtocol { message: String }, +} + +/// Establish a streaming P2P connection (client-side) +/// +/// Returns a Connection with direct access to stdin/stdout/stderr streams. +/// You handle the streams however you want - no magic abstractions. +/// +/// # Example +/// +/// ```rust,ignore +/// #[derive(serde::Serialize, serde::Deserialize, Debug)] +/// struct RemoteShellProtocol { +/// shell: String, +/// args: Vec, +/// } +/// +/// let mut conn = fastn_p2p::connect(our_key, target_key, RemoteShellProtocol { +/// shell: "bash".to_string(), +/// args: vec![], +/// }).await?; +/// +/// // Use the streams directly - no lookups needed +/// tokio::spawn(pipe_stdout(conn.stdout)); +/// tokio::spawn(pipe_stderr(conn.stderr)); +/// ``` +pub async fn connect( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL +) -> Result, ConnectionError> +where + PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, +{ + // TODO: Implement streaming connection over P2P + todo!("Connect to {target} with protocol {protocol:?} using {}", our_key.id52()) +} \ No newline at end of file diff --git a/v0.5/fastn-p2p/src/lib.rs b/v0.5/fastn-p2p/src/lib.rs index a0940be15..0e4618678 100644 --- a/v0.5/fastn-p2p/src/lib.rs +++ b/v0.5/fastn-p2p/src/lib.rs @@ -32,6 +32,7 @@ extern crate self as fastn_p2p; mod client; +mod command; mod coordination; mod globals; mod macros; @@ -51,6 +52,9 @@ pub use globals::{graceful, pool}; // Client API - clean, simple naming (only expose simple version) pub use client::{CallError, call}; +// Streaming API - P2P sessions with stdin/stdout streams and optional side channels +pub use command::{Session, ConnectionError, connect}; + // Server API - clean, simple naming pub use server::{ GetInputError, HandleRequestError, ListenerAlreadyActiveError, ListenerNotFoundError, Request, From 85ee3663c20cc9513c9f2b6c4ea225e00e6a7f86 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 15:37:37 +0530 Subject: [PATCH 4/9] cleanup: remove WIP fastn-p2p code files, keep only README designs - Remove incomplete command.rs file that was accidentally committed - Revert lib.rs to clean state without WIP command exports - Preserve complete fastn-context README design - Keep fastn-p2p README design for reference - Ready to begin implementation phase with clean slate --- v0.5/fastn-p2p/src/command.rs | 99 ----------------------------------- v0.5/fastn-p2p/src/lib.rs | 4 -- 2 files changed, 103 deletions(-) delete mode 100644 v0.5/fastn-p2p/src/command.rs diff --git a/v0.5/fastn-p2p/src/command.rs b/v0.5/fastn-p2p/src/command.rs deleted file mode 100644 index 612d0ff40..000000000 --- a/v0.5/fastn-p2p/src/command.rs +++ /dev/null @@ -1,99 +0,0 @@ -/// P2P streaming connections with three-stream protocol -/// -/// This module provides networking APIs for establishing streaming P2P -/// connections with stdin/stdout/stderr-like semantics. - -/// A P2P streaming session with main bidirectional stream and optional side channels -/// -/// Simple abstraction: you get the main streams, create side channels on demand. -/// Protocol-specific communication happens over the available streams. -pub struct Session { - /// The protocol data negotiated for this session - pub protocol: PROTOCOL, - /// Input stream: Client → Server - pub stdin: iroh::endpoint::RecvStream, - /// Output stream: Server → Client - pub stdout: iroh::endpoint::SendStream, - // TODO: Store underlying iroh connection for side channel creation - // iroh_connection: iroh::Connection, -} - -impl Session { - /// Open unidirectional stream (matches iroh terminology) - pub async fn open_uni(&mut self) -> Result { - // TODO: Use iroh_connection.open_uni() - todo!("Open unidirectional stream") - } - - /// Open bidirectional stream (matches iroh terminology) - pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), ConnectionError> { - // TODO: Use iroh_connection.open_bi() - todo!("Open bidirectional stream") - } - - /// Accept incoming unidirectional stream (matches iroh terminology) - pub async fn accept_uni(&mut self) -> Result { - // TODO: Use iroh_connection.accept_uni() - todo!("Accept unidirectional stream") - } - - /// Accept incoming bidirectional stream (matches iroh terminology) - pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError> { - // TODO: Use iroh_connection.accept_bi() - todo!("Accept bidirectional stream") - } -} - -/// Errors related to P2P connections -#[derive(Debug, thiserror::Error)] -pub enum ConnectionError { - #[error("Failed to establish command connection: {source}")] - Connection { source: eyre::Error }, - - #[error("Failed to send command: {source}")] - Send { source: eyre::Error }, - - #[error("Failed to receive from command: {source}")] - Receive { source: eyre::Error }, - - #[error("Command serialization error: {source}")] - Serialization { source: serde_json::Error }, - - #[error("Exit code protocol error: {message}")] - ExitProtocol { message: String }, -} - -/// Establish a streaming P2P connection (client-side) -/// -/// Returns a Connection with direct access to stdin/stdout/stderr streams. -/// You handle the streams however you want - no magic abstractions. -/// -/// # Example -/// -/// ```rust,ignore -/// #[derive(serde::Serialize, serde::Deserialize, Debug)] -/// struct RemoteShellProtocol { -/// shell: String, -/// args: Vec, -/// } -/// -/// let mut conn = fastn_p2p::connect(our_key, target_key, RemoteShellProtocol { -/// shell: "bash".to_string(), -/// args: vec![], -/// }).await?; -/// -/// // Use the streams directly - no lookups needed -/// tokio::spawn(pipe_stdout(conn.stdout)); -/// tokio::spawn(pipe_stderr(conn.stderr)); -/// ``` -pub async fn connect( - our_key: fastn_id52::SecretKey, - target: fastn_id52::PublicKey, - protocol: PROTOCOL -) -> Result, ConnectionError> -where - PROTOCOL: serde::Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, -{ - // TODO: Implement streaming connection over P2P - todo!("Connect to {target} with protocol {protocol:?} using {}", our_key.id52()) -} \ No newline at end of file diff --git a/v0.5/fastn-p2p/src/lib.rs b/v0.5/fastn-p2p/src/lib.rs index 0e4618678..a0940be15 100644 --- a/v0.5/fastn-p2p/src/lib.rs +++ b/v0.5/fastn-p2p/src/lib.rs @@ -32,7 +32,6 @@ extern crate self as fastn_p2p; mod client; -mod command; mod coordination; mod globals; mod macros; @@ -52,9 +51,6 @@ pub use globals::{graceful, pool}; // Client API - clean, simple naming (only expose simple version) pub use client::{CallError, call}; -// Streaming API - P2P sessions with stdin/stdout streams and optional side channels -pub use command::{Session, ConnectionError, connect}; - // Server API - clean, simple naming pub use server::{ GetInputError, HandleRequestError, ListenerAlreadyActiveError, ListenerNotFoundError, Request, From decdb7281573c98590dca7fb3598a50fb4c684b3 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 16:01:54 +0530 Subject: [PATCH 5/9] refactor: organize fastn-context documentation and create minimal implementation scope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move fastn-context to repo root (not v0.5) for ecosystem-wide availability - Split comprehensive README into focused current vs future features - Create README.md with minimal Context API needed for fastn-p2p integration - Split future features into focused NEXT-*.md files: - NEXT-monitoring.md: Status trees, timing, system metrics - NEXT-locks.md: Named locks and deadlock detection - NEXT-counters.md: Global counter storage with dotted paths - NEXT-status-distribution.md: P2P status access - NEXT-complete-design.md: Full original design reference - Add workspace integration for fastn-context crate - Create minimal test example showing required API surface - Add basic Cargo.toml with minimal dependencies This focuses implementation on just what's needed for remote shell functionality: - Basic Context tree structure - Hierarchical cancellation - Task spawning with inheritance - Integration points for fastn-p2p Comprehensive monitoring features deferred to future PRs after core functionality proven. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- Cargo.toml | 2 + fastn-context/Cargo.toml | 15 +++ .../NEXT-complete-design.md | 0 fastn-context/NEXT-counters.md | 37 ++++++++ fastn-context/NEXT-locks.md | 32 +++++++ fastn-context/NEXT-monitoring.md | 34 +++++++ fastn-context/NEXT-status-distribution.md | 36 +++++++ fastn-context/README.md | 93 +++++++++++++++++++ fastn-context/examples/minimal_test.rs | 53 +++++++++++ 9 files changed, 302 insertions(+) create mode 100644 fastn-context/Cargo.toml rename v0.5/fastn-context/README.md => fastn-context/NEXT-complete-design.md (100%) create mode 100644 fastn-context/NEXT-counters.md create mode 100644 fastn-context/NEXT-locks.md create mode 100644 fastn-context/NEXT-monitoring.md create mode 100644 fastn-context/NEXT-status-distribution.md create mode 100644 fastn-context/README.md create mode 100644 fastn-context/examples/minimal_test.rs diff --git a/Cargo.toml b/Cargo.toml index e66617493..755030069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "clift", "fastn", "fastn-builtins", + "fastn-context", "fastn-core", "fastn-daemon", "fastn-ds", @@ -89,6 +90,7 @@ enum-iterator = "0.6" enum-iterator-derive = "0.6" env_logger = "0.11" fastn-builtins.path = "fastn-builtins" +fastn-context.path = "fastn-context" fastn-core.path = "fastn-core" fastn-ds.path = "fastn-ds" fastn-daemon.path = "fastn-daemon" diff --git a/fastn-context/Cargo.toml b/fastn-context/Cargo.toml new file mode 100644 index 000000000..191a7f8cd --- /dev/null +++ b/fastn-context/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "fastn-context" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +rust-version.workspace = true + +[dependencies] +tokio.workspace = true +tokio-util = { version = "0.7", features = ["sync"] } +eyre.workspace = true \ No newline at end of file diff --git a/v0.5/fastn-context/README.md b/fastn-context/NEXT-complete-design.md similarity index 100% rename from v0.5/fastn-context/README.md rename to fastn-context/NEXT-complete-design.md diff --git a/fastn-context/NEXT-counters.md b/fastn-context/NEXT-counters.md new file mode 100644 index 000000000..f240c3135 --- /dev/null +++ b/fastn-context/NEXT-counters.md @@ -0,0 +1,37 @@ +# NEXT: Global Counter Storage System + +Features for persistent counter tracking with dotted path keys. + +## Counter Types + +- **Total counters** - Historical cumulative counts (persist after context drops) +- **Live counters** - Current active operations (auto-decremented on drop) + +## Global Storage + +```rust +// Dotted path keys in global HashMap +"global.connections" -> 1,247 +"global.remote-access.alice@bv478gen.commands" -> 45 +"global.http-proxy.requests" -> 1,013 +``` + +## API + +```rust +impl Context { + pub fn increment_total(&self, counter: &str); + pub fn increment_live(&self, counter: &str); + pub fn decrement_live(&self, counter: &str); + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +## Integration + +- fastn-p2p automatically tracks connection/request counters +- Counters survive context drops via global storage +- Hierarchical aggregation possible via path prefix matching + +**Implementation**: After basic Context + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-locks.md b/fastn-context/NEXT-locks.md new file mode 100644 index 000000000..9853624da --- /dev/null +++ b/fastn-context/NEXT-locks.md @@ -0,0 +1,32 @@ +# NEXT: Named Locks and Deadlock Detection + +Features for named lock monitoring and deadlock detection. + +## Named Lock Types + +- `ContextMutex` - Named mutex with timing tracking +- `ContextRwLock` - Named read-write lock +- `ContextSemaphore` - Named semaphore with permit tracking + +## Lock Monitoring + +- Track who holds what locks and for how long +- Track who's waiting for locks and wait times +- Automatic deadlock risk detection +- Lock status in context tree display + +## Usage Pattern + +```rust +// Create named locks within context +let user_lock = ctx.mutex("user-data-lock", UserData::new()); + +// Lock operations automatically tracked +let guard = user_lock.lock().await; +// Shows in status: "HOLDS user-data-lock (2.3s)" + +// Waiting operations tracked +// Shows in status: "WAITING user-data-lock (5.1s) âš ī¸ STUCK" +``` + +**Implementation**: After basic Context system + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-monitoring.md b/fastn-context/NEXT-monitoring.md new file mode 100644 index 000000000..644271d75 --- /dev/null +++ b/fastn-context/NEXT-monitoring.md @@ -0,0 +1,34 @@ +# NEXT: Comprehensive Monitoring System + +Features planned for future implementation after basic Context system is working. + +## Status Trees and Monitoring + +- Hierarchical status display with timing +- ANSI-formatted status output +- Event-driven status updates +- System metrics integration (CPU, RAM, disk, network) +- P2P status distribution (`fastn status `) + +## Counter Management + +- Global counter storage with dotted paths +- Total vs live counter tracking +- Automatic counter integration +- Hierarchical counter aggregation + +## Named Locks + +- ContextMutex, ContextRwLock, ContextSemaphore +- Deadlock detection and timing +- Lock status in monitoring tree +- Wait time tracking + +## Advanced Features + +- Builder pattern: `ctx.child("name").with_data().spawn()` +- Metric types and data storage +- HTTP status endpoints +- Status streaming API + +**Implementation**: After basic Context + fastn-p2p integration is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-status-distribution.md b/fastn-context/NEXT-status-distribution.md new file mode 100644 index 000000000..f44ecefb9 --- /dev/null +++ b/fastn-context/NEXT-status-distribution.md @@ -0,0 +1,36 @@ +# NEXT: P2P Status Distribution + +Features for distributed status monitoring over P2P network. + +## Remote Status Access + +```bash +# Remote machine status over P2P +fastn status alice # Status from machine with alias "alice" +fastn status alice -w # Watch remote machine in real-time +fastn status alice,bob,prod # Multiple machines +``` + +## P2P Integration + +- Uses secure fastn remote access (same as `fastn rshell`) +- Status transmitted over encrypted P2P connections +- Requires target machine in `remote-access/config.toml` +- Real-time streaming for watch mode + +## Protocol + +```rust +// Built-in status commands +StatusRequest -> Status // One-time snapshot +StatusStreamProtocol -> Stream // Real-time updates +``` + +## Benefits + +- **Distributed monitoring** - Monitor entire fastn network from any machine +- **Secure access** - Uses same permissions as remote shell +- **No HTTP servers** - Uses P2P infrastructure only +- **Real-time** - Event-driven updates across network + +**Implementation**: After P2P streaming API + basic status system. \ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md new file mode 100644 index 000000000..bfe814f6c --- /dev/null +++ b/fastn-context/README.md @@ -0,0 +1,93 @@ +# fastn-context: Hierarchical Application Context (Minimal Implementation) + +This crate provides basic hierarchical context system for fastn applications, enabling tree-based cancellation and task management. This is the minimal implementation needed for fastn-p2p integration. + +## Design Philosophy + +- **Hierarchical Structure**: Applications form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Minimal Surface**: Only essential features for P2P integration + +## Current API (Minimal) + +### Core Context + +```rust +pub struct Context { + pub name: String, + // private: parent, children, cancellation_token +} + +impl Context { + /// Create new root context (used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context + pub fn child(&self, name: &str) -> std::sync::Arc; + + /// Spawn task with context inheritance + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children + pub fn cancel(&self); +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context +pub fn current() -> std::sync::Arc; +``` + +### Main Function Macro + +```rust +/// Main function with automatic context setup +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + let ctx = fastn_context::global(); + + ctx.spawn(async { + // Task inherits global context cancellation + }); +} +``` + +## Usage for fastn-p2p Integration + +```rust +// fastn-p2p sessions will provide context access +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Context provided by fastn-p2p + + // Spawn tasks with inherited cancellation + ctx.spawn(handle_stdout(session.send)); + ctx.spawn(handle_stdin(session.recv)); + + // Tasks automatically cancelled when session context cancelled +} +``` + +## Scope + +This minimal implementation provides: +- ✅ Basic context tree structure +- ✅ Hierarchical cancellation +- ✅ Task spawning with inheritance +- ✅ Integration points for fastn-p2p + +**Not included** (see NEXT-*.md files): +- Detailed metrics and counters +- Named locks and deadlock detection +- System monitoring +- Status trees and HTTP dashboard +- Advanced timing and monitoring \ No newline at end of file diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs new file mode 100644 index 000000000..c722be362 --- /dev/null +++ b/fastn-context/examples/minimal_test.rs @@ -0,0 +1,53 @@ +/// Test the minimal fastn-context API needed for fastn-p2p integration +/// This validates our basic Context design before implementation + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + println!("Testing minimal fastn-context API..."); + + // Global context should be automatically available + let global_ctx = fastn_context::global(); + println!("Global context created: {}", global_ctx.name); + + // Test basic child creation + let service_ctx = global_ctx.child("test-service"); + println!("Service context created: {}", service_ctx.name); + + // Test task spawning with context inheritance + service_ctx.spawn(async { + println!("Task 1 running in service context"); + + // Simulate some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + println!("Task 1 completed"); + }); + + // Test nested contexts + let session_ctx = service_ctx.child("test-session"); + session_ctx.spawn(async { + println!("Task 2 running in session context"); + + // Test cancellation handling + tokio::select! { + _ = fastn_context::current().wait() => { + println!("Task 2 cancelled by context"); + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => { + println!("Task 2 completed normally"); + } + } + }); + + // Let tasks run briefly + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + // Test cancellation + println!("Cancelling service context..."); + service_ctx.cancel(); + + // Brief delay to see cancellation effects + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + println!("Minimal API test completed!"); + Ok(()) +} \ No newline at end of file From 5ef6007dcd33d60ade122f02806ae652af6dea84 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 17:05:23 +0530 Subject: [PATCH 6/9] docs: restore complete fastn-context README with all essential current-scope content MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restore Context Tree Structure and Automatic Context Creation sections - Add back complete Context API with metrics, data storage, and builder pattern - Restore ContextBuilder fluent API with with_data() and with_metric() methods - Include detailed main macro configuration options (logging, shutdown modes) - Add back integration examples showing fastn-p2p usage patterns - Restore usage patterns for simple vs detailed task spawning - Include design benefits and debugging rationale - Reference NEXT-*.md files for future planned features All important current implementation details restored from original comprehensive design. No content lost - proper categorization between current API and future enhancements. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- fastn-context/README-FULL.md | 844 +++++++++++++++++++++++++++++++++++ fastn-context/README.md | 246 ++++++++-- 2 files changed, 1050 insertions(+), 40 deletions(-) create mode 100644 fastn-context/README-FULL.md diff --git a/fastn-context/README-FULL.md b/fastn-context/README-FULL.md new file mode 100644 index 000000000..eb48306d9 --- /dev/null +++ b/fastn-context/README-FULL.md @@ -0,0 +1,844 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; + + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) âš ī¸ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +âŗ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) âš ī¸ STUCK + +âš ī¸ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +âŗ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "âŗ".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "âš ī¸".red())?; + for warning in &self.warnings { + let icon = match warning.severity { + WarningSeverity::Info => "â„šī¸", + WarningSeverity::Warning => "âš ī¸", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... # Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. + +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── âŗ WAITING "session-output-lock" (8.1s waiting) âš ī¸ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) âš ī¸ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +âŗ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) âš ī¸ STUCK + +âš ī¸ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md index bfe814f6c..fc9103eb9 100644 --- a/fastn-context/README.md +++ b/fastn-context/README.md @@ -1,39 +1,114 @@ -# fastn-context: Hierarchical Application Context (Minimal Implementation) +# fastn-context: Hierarchical Application Context for Debugging and Operations -This crate provides basic hierarchical context system for fastn applications, enabling tree-based cancellation and task management. This is the minimal implementation needed for fastn-p2p integration. +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. ## Design Philosophy -- **Hierarchical Structure**: Applications form trees of operations -- **Automatic Inheritance**: Child contexts inherit cancellation from parents +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents - **Zero Boilerplate**: Context trees build themselves as applications run -- **Minimal Surface**: Only essential features for P2P integration +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed -## Current API (Minimal) +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference ### Core Context ```rust pub struct Context { + /// Context name for debugging/status pub name: String, - // private: parent, children, cancellation_token + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data } impl Context { - /// Create new root context (used by main macro) + /// Create new root context (typically only used by main macro) pub fn new(name: &str) -> std::sync::Arc; - /// Create child context - pub fn child(&self, name: &str) -> std::sync::Arc; + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; - /// Spawn task with context inheritance - pub fn spawn(&self, task: F) -> tokio::task::JoinHandle; + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; /// Wait for cancellation signal pub async fn wait(&self); - /// Cancel this context and all children + /// Cancel this context and all children recursively pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; + + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; } ``` @@ -43,51 +118,142 @@ impl Context { /// Get the global application context pub fn global() -> std::sync::Arc; -/// Get current task's context +/// Get current task's context (thread-local or task-local) pub fn current() -> std::sync::Arc; ``` -### Main Function Macro +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro sets up the global context and provides comprehensive configuration: ```rust -/// Main function with automatic context setup #[fastn_context::main] async fn main() -> eyre::Result<()> { // Global context automatically created and available - let ctx = fastn_context::global(); - ctx.spawn(async { - // Task inherits global context cancellation - }); + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); } ``` -## Usage for fastn-p2p Integration +### Configuration Options ```rust -// fastn-p2p sessions will provide context access -async fn handle_remote_shell(session: fastn_p2p::server::Session) { - let ctx = session.context(); // Context provided by fastn-p2p +#[fastn_context::main( + // Logging configuration + logging = true, // Default: true - simple logging setup - // Spawn tasks with inherited cancellation - ctx.spawn(handle_stdout(session.send)); - ctx.spawn(handle_stdin(session.recv)); + // Shutdown behavior + shutdown_mode = "single_ctrl_c", // Default: "single_ctrl_c" + shutdown_timeout = "30s", // Default: "30s" - graceful shutdown timeout - // Tasks automatically cancelled when session context cancelled + // Double Ctrl+C specific (only when shutdown_mode = "double_ctrl_c") + double_ctrl_c_window = "2s", // Default: "2s" - time window for second Ctrl+C + status_fn = my_status_printer, // Required for double_ctrl_c mode +)] +async fn main() -> eyre::Result<()> { + // Your application code +} + +// Status function (required for double_ctrl_c mode) +async fn my_status_printer() { + println!("=== Application Status ==="); + // Custom status logic - access global registries, counters, etc. + println!("Active services: {}", get_active_service_count()); } ``` -## Scope +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - Status trees show exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. -This minimal implementation provides: -- ✅ Basic context tree structure -- ✅ Hierarchical cancellation -- ✅ Task spawning with inheritance -- ✅ Integration points for fastn-p2p +## Future Features -**Not included** (see NEXT-*.md files): -- Detailed metrics and counters -- Named locks and deadlock detection -- System monitoring -- Status trees and HTTP dashboard -- Advanced timing and monitoring \ No newline at end of file +See NEXT-*.md files for planned enhancements: +- **NEXT-monitoring.md**: Status trees, timing, system metrics monitoring +- **NEXT-locks.md**: Named locks and deadlock detection +- **NEXT-counters.md**: Global counter storage with dotted paths +- **NEXT-status-distribution.md**: P2P distributed status access \ No newline at end of file From a814f97c0b2c78c1bd9279fbc0c452ba6b64f7c7 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 17:09:35 +0530 Subject: [PATCH 7/9] refactor: update fastn-context to use explicit context passing instead of current() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove fastn_context::current() function from API to avoid hidden dependencies - Update test example to use explicit context passing patterns - Show two explicit patterns: context cloning and builder pattern with context parameter - Update README examples to use explicit context instead of current() - Remove thread-local/task-local context access from design Explicit context approach benefits: - Clear dependencies - functions declare context needs explicitly - Better testability - easy to provide specific context for tests - No magic behavior - context comes from explicit sources - Reviewable code - easy to see context usage patterns Will prove if explicit approach works well or if current() convenience needed later. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- fastn-context/README.md | 13 ++++----- fastn-context/examples/minimal_test.rs | 40 +++++++++++++++----------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/fastn-context/README.md b/fastn-context/README.md index fc9103eb9..4751f1e17 100644 --- a/fastn-context/README.md +++ b/fastn-context/README.md @@ -117,9 +117,6 @@ impl ContextBuilder { ```rust /// Get the global application context pub fn global() -> std::sync::Arc; - -/// Get current task's context (thread-local or task-local) -pub fn current() -> std::sync::Arc; ``` ### Metric Types @@ -140,10 +137,12 @@ pub enum MetricValue { ### Simple Task Spawning ```rust -// Inherit current context (no child creation) -let ctx = fastn_context::current(); -ctx.spawn(async { - // Simple background task +// Use explicit context (no child creation) +let ctx = fastn_context::global(); // or passed as parameter +let ctx_clone = ctx.clone(); +ctx.spawn(async move { + // Simple background task with explicit context + ctx_clone.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); }); ``` diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs index c722be362..d36afb1ba 100644 --- a/fastn-context/examples/minimal_test.rs +++ b/fastn-context/examples/minimal_test.rs @@ -9,34 +9,40 @@ async fn main() -> eyre::Result<()> { let global_ctx = fastn_context::global(); println!("Global context created: {}", global_ctx.name); - // Test basic child creation + // Test basic child creation with builder pattern let service_ctx = global_ctx.child("test-service"); println!("Service context created: {}", service_ctx.name); - // Test task spawning with context inheritance - service_ctx.spawn(async { + // Test simple task spawning with explicit context + let service_ctx_clone = service_ctx.clone(); + service_ctx.spawn(async move { println!("Task 1 running in service context"); // Simulate some work tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; println!("Task 1 completed"); + + // Context explicitly available + service_ctx_clone.add_metric("task1_completed", fastn_context::MetricValue::Counter(1)); }); - // Test nested contexts - let session_ctx = service_ctx.child("test-session"); - session_ctx.spawn(async { - println!("Task 2 running in session context"); - - // Test cancellation handling - tokio::select! { - _ = fastn_context::current().wait() => { - println!("Task 2 cancelled by context"); + // Test builder pattern with explicit context passing + service_ctx.child("test-session") + .with_data("session_type", serde_json::Value::String("test".to_string())) + .spawn(|task_ctx| async move { + println!("Task 2 running with explicit context: {}", task_ctx.name); + + // Test cancellation handling with explicit context + tokio::select! { + _ = task_ctx.wait() => { + println!("Task 2 cancelled by explicit context"); + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => { + println!("Task 2 completed normally"); + task_ctx.set_data("completion_status", serde_json::Value::String("success".to_string())); + } } - _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => { - println!("Task 2 completed normally"); - } - } - }); + }); // Let tasks run briefly tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; From 611e55af80fa6c64df98f22d0e903f088abf5e60 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 17:17:56 +0530 Subject: [PATCH 8/9] feat: add spawn_child() shortcut method to fastn-context API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add spawn_child(name, |ctx|) as clean shortcut for common named task pattern - Update README to show three spawning patterns: - spawn() - inherit current context (no child) - spawn_child(name, |ctx|) - common case shortcut - child(name).with_*().spawn(|ctx|) - full builder pattern - Update test example to use spawn_child() instead of verbose cloning - Show both shortcut and builder pattern alternatives in documentation The spawn_child() shortcut eliminates verbose context cloning while providing named tasks essential for debugging. Most common pattern for everyday use. Three-tier API provides appropriate complexity levels: - Simple inheritance for basic tasks - Named children for debuggable tasks - Full builder for complex initialization 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- fastn-context/README.md | 20 ++++++++++++++++---- fastn-context/examples/minimal_test.rs | 9 ++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/fastn-context/README.md b/fastn-context/README.md index 4751f1e17..db1f9fc60 100644 --- a/fastn-context/README.md +++ b/fastn-context/README.md @@ -75,6 +75,12 @@ impl Context { pub fn spawn(&self, task: F) -> tokio::task::JoinHandle where F: std::future::Future + Send + 'static; + /// Spawn task with named child context (common case shortcut) + pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle + where + F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static; + /// Wait for cancellation signal pub async fn wait(&self); @@ -137,13 +143,19 @@ pub enum MetricValue { ### Simple Task Spawning ```rust -// Use explicit context (no child creation) +// Simple named task spawning (common case) let ctx = fastn_context::global(); // or passed as parameter -let ctx_clone = ctx.clone(); -ctx.spawn(async move { +ctx.spawn_child("background-task", |task_ctx| async move { // Simple background task with explicit context - ctx_clone.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); + task_ctx.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); }); + +// Alternative: builder pattern for simple case +ctx.child("background-task") + .spawn(|task_ctx| async move { + // Same result, different syntax + task_ctx.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); + }); ``` ### Detailed Task Spawning diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs index d36afb1ba..dc5c58274 100644 --- a/fastn-context/examples/minimal_test.rs +++ b/fastn-context/examples/minimal_test.rs @@ -13,17 +13,16 @@ async fn main() -> eyre::Result<()> { let service_ctx = global_ctx.child("test-service"); println!("Service context created: {}", service_ctx.name); - // Test simple task spawning with explicit context - let service_ctx_clone = service_ctx.clone(); - service_ctx.spawn(async move { - println!("Task 1 running in service context"); + // Test simple task spawning with shortcut + service_ctx.spawn_child("simple-task", |task_ctx| async move { + println!("Task 1 running with explicit context: {}", task_ctx.name); // Simulate some work tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; println!("Task 1 completed"); // Context explicitly available - service_ctx_clone.add_metric("task1_completed", fastn_context::MetricValue::Counter(1)); + task_ctx.add_metric("task1_completed", fastn_context::MetricValue::Counter(1)); }); // Test builder pattern with explicit context passing From eb135de9127340e6100698eaeac36f34407c4983 Mon Sep 17 00:00:00 2001 From: Amit Upadhyay Date: Tue, 16 Sep 2025 17:42:43 +0530 Subject: [PATCH 9/9] feat: add operation tracking design and clean minimal fastn-context README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create NEXT-operation-tracking.md with named await/select macro design - Design fastn_context::await!() macro for tracking stuck operations - Design fastn_context::select!() macro for select operation tracking - Remove all metrics and data storage from current README scope - Clean up test examples to focus only on context trees and cancellation - Simplify Context struct to just name and cancellation (no metrics/data) - Update ContextBuilder to remove with_data/with_metric methods Current README now focused only on core context functionality: - Context tree structure with parent/child relationships - Hierarchical cancellation with wait/cancel - Three spawning patterns: spawn, spawn_child, child().spawn - Integration patterns for fastn-p2p All advanced features properly moved to NEXT-*.md files for future implementation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- fastn-context/NEXT-metrics-and-data.md | 68 +++++++++++++++ fastn-context/NEXT-operation-tracking.md | 84 +++++++++++++++++++ fastn-context/README.md | 100 ++++++++--------------- fastn-context/examples/minimal_test.rs | 6 +- 4 files changed, 186 insertions(+), 72 deletions(-) create mode 100644 fastn-context/NEXT-metrics-and-data.md create mode 100644 fastn-context/NEXT-operation-tracking.md diff --git a/fastn-context/NEXT-metrics-and-data.md b/fastn-context/NEXT-metrics-and-data.md new file mode 100644 index 000000000..c1950c559 --- /dev/null +++ b/fastn-context/NEXT-metrics-and-data.md @@ -0,0 +1,68 @@ +# NEXT: Metrics and Data Storage + +Features for storing metrics and arbitrary data on contexts for debugging and monitoring. + +## Metric Storage + +```rust +impl Context { + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); +} + +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Data Storage + +```rust +impl Context { + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; +} +``` + +## Builder Pattern Integration + +```rust +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; +} +``` + +## Usage Examples + +```rust +// Add metrics during task execution +task_ctx.add_metric("commands_executed", MetricValue::Counter(cmd_count)); +task_ctx.add_metric("response_time", MetricValue::Duration(elapsed)); + +// Store debugging data +task_ctx.set_data("last_command", serde_json::Value::String("ls -la".to_string())); +task_ctx.set_data("exit_code", serde_json::Value::Number(0.into())); + +// Pre-configure context with builder +ctx.child("remote-shell-handler") + .with_data("peer", serde_json::Value::String(alice_id52)) + .with_data("shell", serde_json::Value::String("bash".to_string())) + .with_metric("commands_executed", MetricValue::Counter(0)) + .spawn(|task_ctx| async move { + // Task starts with pre-configured data and metrics + }); +``` + +**Implementation**: After basic Context tree structure is working. \ No newline at end of file diff --git a/fastn-context/NEXT-operation-tracking.md b/fastn-context/NEXT-operation-tracking.md new file mode 100644 index 000000000..bfce50f23 --- /dev/null +++ b/fastn-context/NEXT-operation-tracking.md @@ -0,0 +1,84 @@ +# NEXT: Operation Tracking for Precise Debugging + +Features for tracking exactly where tasks are stuck using named await and select operations. + +## Named Await Operations + +```rust +/// Track what operation a context is waiting for +let result = fastn_context::await!("waiting-for-response", some_operation()); +// No .await suffix - the macro handles it + +// Examples +let data = fastn_context::await!("reading-file", std::fs::read("config.toml")); +let response = fastn_context::await!("http-request", client.get(url).send()); +let connection = fastn_context::await!("database-connect", db.connect()); +``` + +## Simple Select Tracking + +```rust +/// Track that context is stuck on select (no branch naming needed) +fastn_context::select! { + _ = task_ctx.wait() => println!("Cancelled"), + _ = stream.read() => println!("Data received"), + _ = database.query() => println!("Query complete"), +} +// Records: "stuck on select" +``` + +## Status Display with Operation Names + +``` +Global Context (2h 15m uptime) +├── Remote Access Listener +│ ├── alice@bv478gen (23m connected) +│ │ ├── stdout-handler (stuck on: "reading-stream" 12.3s) âš ī¸ STUCK +│ │ └── stderr-stream (stuck on: "select" 8.1s) +│ └── bob@p2nd7avq (stuck on: "database-query" 0.2s) ✅ ACTIVE +└── HTTP Proxy (stuck on: "select" 0.1s) ✅ ACTIVE +``` + +## Design Principles + +### Single Operation per Context +- **Good design**: One await/select per context encourages proper task breakdown +- **Multiple selects**: Suggests need for child contexts instead + +```rust +// ❌ Complex - hard to debug where it's stuck +fastn_context::select! { /* 5 different operations */ } +fastn_context::select! { /* 3 more operations */ } + +// ✅ Clear - each operation has its own context +ctx.spawn_child("network-handler", |ctx| async move { + fastn_context::select! { /* network operations */ } +}); +ctx.spawn_child("database-handler", |ctx| async move { + let result = fastn_context::await!("user-query", db.get_user(id)); +}); +``` + +### Automatic Operation Tracking + +```rust +// Context automatically tracks current operation +pub struct Context { + current_operation: std::sync::Arc>>, + operation_started: std::sync::Arc>>, +} + +// Status can show: +// - What operation is running +// - How long it's been running +// - If it's stuck (running too long) +``` + +## Benefits + +1. **Precise debugging** - Know exactly where each task is stuck +2. **Performance insights** - See which operations take too long +3. **Design enforcement** - Encourages proper context decomposition +4. **Production monitoring** - Real-time operation visibility + +**Implementation**: After basic Context + monitoring system is complete. \ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md index db1f9fc60..41bcd1b39 100644 --- a/fastn-context/README.md +++ b/fastn-context/README.md @@ -58,10 +58,7 @@ pub struct Context { /// Context name for debugging/status pub name: String, - /// When this context was created - pub created_at: std::time::Instant, - - // Private: parent, children, cancellation, metrics, data + // Private: parent, children, cancellation_token } impl Context { @@ -86,15 +83,6 @@ impl Context { /// Cancel this context and all children recursively pub fn cancel(&self); - - /// Add metric data for status reporting - pub fn add_metric(&self, key: &str, value: MetricValue); - - /// Store arbitrary data on this context - pub fn set_data(&self, key: &str, value: serde_json::Value); - - /// Get stored data - pub fn get_data(&self, key: &str) -> Option; } ``` @@ -102,17 +90,11 @@ impl Context { ```rust pub struct ContextBuilder { - // Pre-created child context ready for configuration + // Pre-created child context ready for spawning } impl ContextBuilder { - /// Add initial data to context - pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; - - /// Add initial metric to context - pub fn with_metric(self, key: &str, value: MetricValue) -> Self; - - /// Spawn task with this configured child context + /// Spawn task with this child context pub fn spawn(self, task: F) -> tokio::task::JoinHandle where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; } @@ -125,18 +107,6 @@ impl ContextBuilder { pub fn global() -> std::sync::Arc; ``` -### Metric Types - -```rust -#[derive(Debug, Clone)] -pub enum MetricValue { - Counter(u64), - Gauge(f64), - Duration(std::time::Duration), - Text(String), - Bytes(u64), -} -``` ## Usage Patterns @@ -147,40 +117,34 @@ pub enum MetricValue { let ctx = fastn_context::global(); // or passed as parameter ctx.spawn_child("background-task", |task_ctx| async move { // Simple background task with explicit context - task_ctx.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); + println!("Running in context: {}", task_ctx.name); }); // Alternative: builder pattern for simple case ctx.child("background-task") .spawn(|task_ctx| async move { // Same result, different syntax - task_ctx.add_metric("task_completed", fastn_context::MetricValue::Counter(1)); + println!("Running in context: {}", task_ctx.name); }); ``` -### Detailed Task Spawning +### Cancellation Handling ```rust -// Create child context with debugging info -ctx.child("remote-shell-handler") - .with_data("peer", alice_id52) - .with_data("shell", "bash") - .with_metric("commands_executed", 0) - .spawn(|task_ctx| async move { - // Task can update its own context - task_ctx.add_metric("commands_executed", cmd_count); - task_ctx.set_data("last_command", "ls -la"); - - // Task waits for its own cancellation - tokio::select! { - _ = task_ctx.wait() => { - println!("Shell handler cancelled"); - } - _ = handle_shell_session() => { - println!("Shell session completed"); - } +// Task waits for context cancellation +ctx.spawn_child("shell-handler", |task_ctx| async move { + println!("Shell handler starting: {}", task_ctx.name); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); } - }); + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } +}); ``` ## Integration with fastn-p2p @@ -195,13 +159,12 @@ async fn handle_remote_shell(session: fastn_p2p::server::Session eyre::Result<()> { // Global context automatically created and available let ctx = fastn_context::global(); - ctx.child("startup") - .with_data("version", env!("CARGO_PKG_VERSION")) - .spawn(|_| async { - // Application initialization - }); + ctx.spawn_child("startup", |startup_ctx| async move { + println!("Application starting: {}", startup_ctx.name); + // Application initialization + }); } ``` @@ -264,7 +226,9 @@ async fn my_status_printer() { ## Future Features See NEXT-*.md files for planned enhancements: + +- **NEXT-metrics-and-data.md**: Metric storage and arbitrary data on contexts - **NEXT-monitoring.md**: Status trees, timing, system metrics monitoring - **NEXT-locks.md**: Named locks and deadlock detection - **NEXT-counters.md**: Global counter storage with dotted paths -- **NEXT-status-distribution.md**: P2P distributed status access \ No newline at end of file +- **NEXT-status-distribution.md**: P2P distributed status access diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs index dc5c58274..9d9de4b8a 100644 --- a/fastn-context/examples/minimal_test.rs +++ b/fastn-context/examples/minimal_test.rs @@ -21,13 +21,12 @@ async fn main() -> eyre::Result<()> { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; println!("Task 1 completed"); - // Context explicitly available - task_ctx.add_metric("task1_completed", fastn_context::MetricValue::Counter(1)); + // Context explicitly available for basic operations + println!("Task context available: {}", task_ctx.name); }); // Test builder pattern with explicit context passing service_ctx.child("test-session") - .with_data("session_type", serde_json::Value::String("test".to_string())) .spawn(|task_ctx| async move { println!("Task 2 running with explicit context: {}", task_ctx.name); @@ -38,7 +37,6 @@ async fn main() -> eyre::Result<()> { } _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => { println!("Task 2 completed normally"); - task_ctx.set_data("completion_status", serde_json::Value::String("success".to_string())); } } });