diff --git a/Cargo.toml b/Cargo.toml index e66617493..755030069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "clift", "fastn", "fastn-builtins", + "fastn-context", "fastn-core", "fastn-daemon", "fastn-ds", @@ -89,6 +90,7 @@ enum-iterator = "0.6" enum-iterator-derive = "0.6" env_logger = "0.11" fastn-builtins.path = "fastn-builtins" +fastn-context.path = "fastn-context" fastn-core.path = "fastn-core" fastn-ds.path = "fastn-ds" fastn-daemon.path = "fastn-daemon" diff --git a/fastn-context/Cargo.toml b/fastn-context/Cargo.toml new file mode 100644 index 000000000..191a7f8cd --- /dev/null +++ b/fastn-context/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "fastn-context" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +rust-version.workspace = true + +[dependencies] +tokio.workspace = true +tokio-util = { version = "0.7", features = ["sync"] } +eyre.workspace = true \ No newline at end of file diff --git a/fastn-context/NEXT-complete-design.md b/fastn-context/NEXT-complete-design.md new file mode 100644 index 000000000..eb48306d9 --- /dev/null +++ b/fastn-context/NEXT-complete-design.md @@ -0,0 +1,844 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. 
+ +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. 
Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; 
+ + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! 
{ + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. 
**Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When 
lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold 
time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: 
fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests 
(987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +⏳ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ 
Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + 
pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "⚠️".red())?; + for warning in &self.warnings { + let icon = match 
warning.severity { + WarningSeverity::Info => "ℹ️", + WarningSeverity::Warning => "⚠️", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... 
# Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, P2P, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. 
+ +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, 
active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/NEXT-counters.md b/fastn-context/NEXT-counters.md new file mode 100644 index 000000000..f240c3135 --- /dev/null +++ b/fastn-context/NEXT-counters.md @@ -0,0 +1,37 @@ +# NEXT: Global Counter Storage System + +Features for persistent counter tracking with dotted path keys. 
+ +## Counter Types + +- **Total counters** - Historical cumulative counts (persist after context drops) +- **Live counters** - Current active operations (auto-decremented on drop) + +## Global Storage + +```rust +// Dotted path keys in global HashMap +"global.connections" -> 1,247 +"global.remote-access.alice@bv478gen.commands" -> 45 +"global.http-proxy.requests" -> 1,013 +``` + +## API + +```rust +impl Context { + pub fn increment_total(&self, counter: &str); + pub fn increment_live(&self, counter: &str); + pub fn decrement_live(&self, counter: &str); + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +## Integration + +- fastn-p2p automatically tracks connection/request counters +- Counters survive context drops via global storage +- Hierarchical aggregation possible via path prefix matching + +**Implementation**: After basic Context + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-locks.md b/fastn-context/NEXT-locks.md new file mode 100644 index 000000000..9853624da --- /dev/null +++ b/fastn-context/NEXT-locks.md @@ -0,0 +1,32 @@ +# NEXT: Named Locks and Deadlock Detection + +Features for named lock monitoring and deadlock detection. 
+ +## Named Lock Types + +- `ContextMutex` - Named mutex with timing tracking +- `ContextRwLock` - Named read-write lock +- `ContextSemaphore` - Named semaphore with permit tracking + +## Lock Monitoring + +- Track who holds what locks and for how long +- Track who's waiting for locks and wait times +- Automatic deadlock risk detection +- Lock status in context tree display + +## Usage Pattern + +```rust +// Create named locks within context +let user_lock = ctx.mutex("user-data-lock", UserData::new()); + +// Lock operations automatically tracked +let guard = user_lock.lock().await; +// Shows in status: "HOLDS user-data-lock (2.3s)" + +// Waiting operations tracked +// Shows in status: "WAITING user-data-lock (5.1s) ⚠️ STUCK" +``` + +**Implementation**: After basic Context system + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-metrics-and-data.md b/fastn-context/NEXT-metrics-and-data.md new file mode 100644 index 000000000..c1950c559 --- /dev/null +++ b/fastn-context/NEXT-metrics-and-data.md @@ -0,0 +1,68 @@ +# NEXT: Metrics and Data Storage + +Features for storing metrics and arbitrary data on contexts for debugging and monitoring. 
+ +## Metric Storage + +```rust +impl Context { + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); +} + +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Data Storage + +```rust +impl Context { + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option<serde_json::Value>; +} +``` + +## Builder Pattern Integration + +```rust +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; +} +``` + +## Usage Examples + +```rust +// Add metrics during task execution +task_ctx.add_metric("commands_executed", MetricValue::Counter(cmd_count)); +task_ctx.add_metric("response_time", MetricValue::Duration(elapsed)); + +// Store debugging data +task_ctx.set_data("last_command", serde_json::Value::String("ls -la".to_string())); +task_ctx.set_data("exit_code", serde_json::Value::Number(0.into())); + +// Pre-configure context with builder +ctx.child("remote-shell-handler") + .with_data("peer", serde_json::Value::String(alice_id52)) + .with_data("shell", serde_json::Value::String("bash".to_string())) + .with_metric("commands_executed", MetricValue::Counter(0)) + .spawn(|task_ctx| async move { + // Task starts with pre-configured data and metrics + }); +``` + +**Implementation**: After basic Context tree structure is working. 
\ No newline at end of file diff --git a/fastn-context/NEXT-monitoring.md b/fastn-context/NEXT-monitoring.md new file mode 100644 index 000000000..644271d75 --- /dev/null +++ b/fastn-context/NEXT-monitoring.md @@ -0,0 +1,34 @@ +# NEXT: Comprehensive Monitoring System + +Features planned for future implementation after basic Context system is working. + +## Status Trees and Monitoring + +- Hierarchical status display with timing +- ANSI-formatted status output +- Event-driven status updates +- System metrics integration (CPU, RAM, disk, network) +- P2P status distribution (`fastn status <remote>`) + +## Counter Management + +- Global counter storage with dotted paths +- Total vs live counter tracking +- Automatic counter integration +- Hierarchical counter aggregation + +## Named Locks + +- ContextMutex, ContextRwLock, ContextSemaphore +- Deadlock detection and timing +- Lock status in monitoring tree +- Wait time tracking + +## Advanced Features + +- Builder pattern: `ctx.child("name").with_data().spawn()` +- Metric types and data storage +- HTTP status endpoints +- Status streaming API + +**Implementation**: After basic Context + fastn-p2p integration is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-operation-tracking.md b/fastn-context/NEXT-operation-tracking.md new file mode 100644 index 000000000..bfce50f23 --- /dev/null +++ b/fastn-context/NEXT-operation-tracking.md @@ -0,0 +1,84 @@ +# NEXT: Operation Tracking for Precise Debugging + +Features for tracking exactly where tasks are stuck using named await and select operations.
+ +## Named Await Operations + +```rust +/// Track what operation a context is waiting for +let result = fastn_context::await!("waiting-for-response", some_operation()); +// No .await suffix - the macro handles it + +// Examples +let data = fastn_context::await!("reading-file", tokio::fs::read("config.toml")); +let response = fastn_context::await!("http-request", client.get(url).send()); +let connection = fastn_context::await!("database-connect", db.connect()); +``` + +## Simple Select Tracking + +```rust +/// Track that context is stuck on select (no branch naming needed) +fastn_context::select! { + _ = task_ctx.wait() => println!("Cancelled"), + _ = stream.read() => println!("Data received"), + _ = database.query() => println!("Query complete"), +} +// Records: "stuck on select" +``` + +## Status Display with Operation Names + +``` +Global Context (2h 15m uptime) +├── Remote Access Listener +│ ├── alice@bv478gen (23m connected) +│ │ ├── stdout-handler (stuck on: "reading-stream" 12.3s) ⚠️ STUCK +│ │ └── stderr-stream (stuck on: "select" 8.1s) +│ └── bob@p2nd7avq (stuck on: "database-query" 0.2s) ✅ ACTIVE +└── HTTP Proxy (stuck on: "select" 0.1s) ✅ ACTIVE +``` + +## Design Principles + +### Single Operation per Context +- **Good design**: One await/select per context encourages proper task breakdown +- **Multiple selects**: Suggests need for child contexts instead + +```rust +// ❌ Complex - hard to debug where it's stuck +fastn_context::select! { /* 5 different operations */ } +fastn_context::select! { /* 3 more operations */ } + +// ✅ Clear - each operation has its own context +ctx.spawn_child("network-handler", |ctx| async move { + fastn_context::select!
{ /* network operations */ } +}); +ctx.spawn_child("database-handler", |ctx| async move { + let result = fastn_context::await!("user-query", db.get_user(id)); +}); +``` + +### Automatic Operation Tracking + +```rust +// Context automatically tracks current operation +pub struct Context { + current_operation: std::sync::Arc<std::sync::Mutex<Option<String>>>, + operation_started: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>, +} + +// Status can show: +// - What operation is running +// - How long it's been running +// - If it's stuck (running too long) +``` + +## Benefits + +1. **Precise debugging** - Know exactly where each task is stuck +2. **Performance insights** - See which operations take too long +3. **Design enforcement** - Encourages proper context decomposition +4. **Production monitoring** - Real-time operation visibility + +**Implementation**: After basic Context + monitoring system is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-status-distribution.md b/fastn-context/NEXT-status-distribution.md new file mode 100644 index 000000000..f44ecefb9 --- /dev/null +++ b/fastn-context/NEXT-status-distribution.md @@ -0,0 +1,36 @@ +# NEXT: P2P Status Distribution + +Features for distributed status monitoring over P2P network.
+ +## Remote Status Access + +```bash +# Remote machine status over P2P +fastn status alice # Status from machine with alias "alice" +fastn status alice -w # Watch remote machine in real-time +fastn status alice,bob,prod # Multiple machines +``` + +## P2P Integration + +- Uses secure fastn remote access (same as `fastn rshell`) +- Status transmitted over encrypted P2P connections +- Requires target machine in `remote-access/config.toml` +- Real-time streaming for watch mode + +## Protocol + +```rust +// Built-in status commands +StatusRequest -> Status // One-time snapshot +StatusStreamProtocol -> Stream<Status> // Real-time updates +``` + +## Benefits + +- **Distributed monitoring** - Monitor entire fastn network from any machine +- **Secure access** - Uses same permissions as remote shell +- **No HTTP servers** - Uses P2P infrastructure only +- **Real-time** - Event-driven updates across network + +**Implementation**: After P2P streaming API + basic status system. \ No newline at end of file diff --git a/fastn-context/README-FULL.md b/fastn-context/README-FULL.md new file mode 100644 index 000000000..eb48306d9 --- /dev/null +++ b/fastn-context/README-FULL.md @@ -0,0 +1,844 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts.
+ +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. 
Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; 
+ + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! 
{ + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. 
**Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When 
lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold 
time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: 
fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests 
(987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +⏳ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ 
Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + 
pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "⚠️".red())?; + for warning in &self.warnings { + let icon = match 
warning.severity { + WarningSeverity::Info => "ℹ️", + WarningSeverity::Warning => "⚠️", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... 
# Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. 
+ +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, 
active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. 
\ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md new file mode 100644 index 000000000..41bcd1b39 --- /dev/null +++ b/fastn-context/README.md @@ -0,0 +1,234 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. 
Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + // Private: parent, children, cancellation_token +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Spawn task with named child context (common case shortcut) + pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle + where + F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for spawning +} + +impl ContextBuilder { + /// Spawn task with this child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; +``` + + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Simple named task spawning (common case) +let ctx = fastn_context::global(); // or passed as parameter +ctx.spawn_child("background-task", |task_ctx| async move { + // Simple background task with explicit context + println!("Running in context: {}", task_ctx.name); +}); + +// Alternative: builder pattern for simple case +ctx.child("background-task") + .spawn(|task_ctx| async move { + // Same result, different 
syntax + println!("Running in context: {}", task_ctx.name); + }); +``` + +### Cancellation Handling + +```rust +// Task waits for context cancellation +ctx.spawn_child("shell-handler", |task_ctx| async move { + println!("Shell handler starting: {}", task_ctx.name); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } +}); +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Named child spawn for debugging + ctx.spawn_child("command-executor", |task_ctx| async move { + println!("Executing command in context: {}", task_ctx.name); + let result = execute_command(&session.protocol.command).await; + println!("Command completed with: {:?}", result); + }); +} +``` + +## Main Function Integration + +The main macro sets up the global context and provides comprehensive configuration: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.spawn_child("startup", |startup_ctx| async move { + println!("Application starting: {}", startup_ctx.name); + // Application initialization + }); +} +``` + +### Configuration Options + +```rust +#[fastn_context::main( + // Logging configuration + logging = true, // Default: true - simple logging setup + + // Shutdown behavior + shutdown_mode = "single_ctrl_c", // Default: "single_ctrl_c" + shutdown_timeout = "30s", // Default: "30s" - graceful shutdown timeout + + // Double Ctrl+C specific (only when shutdown_mode = 
"double_ctrl_c") + double_ctrl_c_window = "2s", // Default: "2s" - time window for second Ctrl+C + status_fn = my_status_printer, // Required for double_ctrl_c mode +)] +async fn main() -> eyre::Result<()> { + // Your application code +} + +// Status function (required for double_ctrl_c mode) +async fn my_status_printer() { + println!("=== Application Status ==="); + // Custom status logic - access global registries, counters, etc. + println!("Active services: {}", get_active_service_count()); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - Status trees show exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. 
+ +## Future Features + +See NEXT-*.md files for planned enhancements: + +- **NEXT-metrics-and-data.md**: Metric storage and arbitrary data on contexts +- **NEXT-monitoring.md**: Status trees, timing, system metrics monitoring +- **NEXT-locks.md**: Named locks and deadlock detection +- **NEXT-counters.md**: Global counter storage with dotted paths +- **NEXT-status-distribution.md**: P2P distributed status access diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs new file mode 100644 index 000000000..9d9de4b8a --- /dev/null +++ b/fastn-context/examples/minimal_test.rs @@ -0,0 +1,56 @@ +/// Test the minimal fastn-context API needed for fastn-p2p integration +/// This validates our basic Context design before implementation + +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + println!("Testing minimal fastn-context API..."); + + // Global context should be automatically available + let global_ctx = fastn_context::global(); + println!("Global context created: {}", global_ctx.name); + + // Test basic child creation with builder pattern + let service_ctx = global_ctx.child("test-service"); + println!("Service context created: {}", service_ctx.name); + + // Test simple task spawning with shortcut + service_ctx.spawn_child("simple-task", |task_ctx| async move { + println!("Task 1 running with explicit context: {}", task_ctx.name); + + // Simulate some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + println!("Task 1 completed"); + + // Context explicitly available for basic operations + println!("Task context available: {}", task_ctx.name); + }); + + // Test builder pattern with explicit context passing + service_ctx.child("test-session") + .spawn(|task_ctx| async move { + println!("Task 2 running with explicit context: {}", task_ctx.name); + + // Test cancellation handling with explicit context + tokio::select! 
{ + _ = task_ctx.wait() => { + println!("Task 2 cancelled by explicit context"); + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => { + println!("Task 2 completed normally"); + } + } + }); + + // Let tasks run briefly + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + // Test cancellation + println!("Cancelling service context..."); + service_ctx.cancel(); + + // Brief delay to see cancellation effects + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + println!("Minimal API test completed!"); + Ok(()) +} \ No newline at end of file diff --git a/v0.5/fastn-p2p/README.md b/v0.5/fastn-p2p/README.md index 8498f58e6..df4d98a19 100644 --- a/v0.5/fastn-p2p/README.md +++ b/v0.5/fastn-p2p/README.md @@ -4,12 +4,28 @@ This crate provides a high-level, type-safe API for P2P communication in the fas ## Design Philosophy -- **Type Safety First**: All communication uses strongly-typed REQUEST/RESPONSE/ERROR contracts +- **Type Safety First**: All communication uses strongly-typed protocol contracts - **Minimal Surface Area**: Only essential APIs are exposed to reduce complexity - **Bug Prevention**: API design makes common mistakes impossible or unlikely - **Ergonomic**: High-level APIs handle boilerplate automatically - **Zero Boilerplate**: Main functions are clean with automatic setup/teardown +## Core API Patterns + +fastn-p2p provides three main communication patterns: + +### 1. **Simple Request/Response** - `client::call()` + +For simple RPC-style communication with typed request/response. + +### 2. **Streaming Sessions** - `client::connect()` + +For complex interactions requiring bidirectional streaming with optional side channels. + +### 3. **Server Listening** - `server::listen()` + +For accepting incoming connections and handling both patterns. 
+ ## Quick Start ### Application Entry Point @@ -23,7 +39,7 @@ async fn main() -> eyre::Result<()> { match cli.command { Command::Serve { port } => { - fastn_p2p::spawn(start_server(port)); + fastn_p2p::spawn(start_server()); } Command::Client { target } => { fastn_p2p::spawn(run_client(target)); @@ -82,6 +98,7 @@ The `logging` parameter supports multiple formats: ``` **Logging Examples:** + - **Production**: `logging = true` or `logging = "info"` - **Development**: `logging = "debug"` - **Deep debugging**: `logging = "fastn_p2p=trace,fastn_net=trace"` @@ -106,34 +123,146 @@ cargo run # Uses macro parameter as fallback ``` **Priority Order:** + 1. **`RUST_LOG` environment variable** (highest priority) 2. **Macro `logging` parameter** (fallback) 3. **Default `"info"`** (lowest priority) **Special Cases:** + ```rust // Even logging = false can be overridden for debugging #[fastn_p2p::main(logging = false)] ``` + ```bash RUST_LOG=debug cargo run # Still enables logging despite logging = false ``` This design allows developers to debug any application by setting `RUST_LOG` without modifying source code. +## API Reference + +### 1. Simple Request/Response - `client::call()` + +For RPC-style communication with typed request/response patterns: + +```rust +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoRequest { + message: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoResponse { + reply: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EchoError { + reason: String, +} + +// Client side - must specify protocol (same as server) +let response: Result = fastn_p2p::client::call( + my_key, + target_id52, + EchoProtocol, // Must match server's protocol + EchoRequest { message: "Hello!".to_string() } +).await?; + +match response { + Ok(resp) => println!("Got reply: {}", resp.reply), + Err(err) => println!("Server error: {}", err.reason), +} +``` + +### 2. 
Streaming Sessions - `client::connect()` + +For complex interactions requiring persistent bidirectional communication with optional side channels: + +```rust +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct RemoteShellProtocol { + shell: String, + args: Vec, +} + +// Client side - establish streaming session +let mut session = fastn_p2p::client::connect( + my_key, + target_id52, + RemoteShellProtocol { + shell: "bash".to_string(), + args: vec![], + } +).await?; + +// Direct access to main streams +fastn_p2p::spawn(pipe_stdin_to_remote(session.stdin)); +fastn_p2p::spawn(pipe_stdout_from_remote(session.stdout)); + +// Optional: accept side channels back from server (iroh terminology) +if let Ok(stderr_stream) = session.accept_uni().await { + fastn_p2p::spawn(pipe_stderr_from_remote(stderr_stream)); +} +``` + +### 3. Server Listening - `server::listen()` + +For accepting both call() and connect() patterns with unified Session handling: + +```rust +// Server side - listen for incoming connections +let mut listener = fastn_p2p::server::listen!(my_key, &[ + EchoProtocol, + RemoteShellProtocol::default(), +]).await?; + +while let Some(session) = listener.next().await { + match session.protocol { + EchoProtocol => { + // RPC pattern - convert Session to Request + let request = session.into_request(); + request.handle(|input: EchoRequest| async move { + Ok::(EchoResponse { + reply: format!("Echo: {}", input.message) + }) + }).await?; + } + RemoteShellProtocol { .. 
} => { + // Streaming pattern - use Session directly + let mut shell_session = session; + + // Direct access to streams (public fields) + fastn_p2p::spawn(handle_stdin(shell_session.recv)); + fastn_p2p::spawn(handle_stdout(shell_session.send)); + + // Optional: open stderr channel back to client + if let Ok(stderr) = shell_session.open_uni().await { + fastn_p2p::spawn(handle_stderr(stderr)); + } + } + } +} +``` + #### Shutdown Modes **Single Ctrl+C Mode (Default):** + ```rust #[fastn_p2p::main(shutdown_mode = "single_ctrl_c")] async fn main() -> eyre::Result<()> { ... } ``` + - Ctrl+C immediately triggers graceful shutdown - Wait `shutdown_timeout` for tasks to complete - Force exit if timeout exceeded - Simple and predictable for most applications **Double Ctrl+C Mode:** + ```rust #[fastn_p2p::main( shutdown_mode = "double_ctrl_c", @@ -147,6 +276,7 @@ async fn print_status() { println!("Services: {} active", get_service_count()); } ``` + - First Ctrl+C calls `status_fn` and waits for second Ctrl+C - Second Ctrl+C within `double_ctrl_c_window` triggers shutdown - If no second Ctrl+C, continues running normally @@ -386,4 +516,295 @@ See the `/tests` directory for complete working examples: For advanced use cases that need direct access to `fastn_net::Graceful`, you can still access it through `fastn_p2p::globals`, but this is discouraged for most applications. -The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability. \ No newline at end of file +The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability. 
+ +## Complete API Reference + +### Client API + +```rust +pub mod client { + /// Simple request/response communication + pub async fn call( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL, + request: REQUEST + ) -> Result, CallError>; + + /// Establish streaming session + pub async fn connect( + our_key: fastn_id52::SecretKey, + target: fastn_id52::PublicKey, + protocol: PROTOCOL + ) -> Result; + + /// Client-side streaming session + pub struct Session { + pub stdin: iroh::endpoint::SendStream, // To server + pub stdout: iroh::endpoint::RecvStream, // From server + } + + impl Session { + /// Accept unidirectional stream back from server (e.g., stderr) + pub async fn accept_uni(&mut self) -> Result; + + /// Accept bidirectional stream back from server + pub async fn accept_bi(&mut self) -> Result<(iroh::endpoint::RecvStream, iroh::endpoint::SendStream), ConnectionError>; + } +} +``` + +### Server API + +```rust +pub mod server { + /// Listen for incoming connections + pub async fn listen( + our_key: fastn_id52::SecretKey, + protocols: &[PROTOCOL] + ) -> impl futures_core::stream::Stream>; + + /// Server-side session (handles both RPC and streaming) + pub struct Session { + pub protocol: PROTOCOL, // Protocol negotiated + pub send: iroh::endpoint::SendStream, // To client (stdout) + pub recv: iroh::endpoint::RecvStream, // From client (stdin) + // private: peer + } + + impl Session { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey; + + /// Convert to Request for RPC handling (consumes Session) + pub fn into_request(self) -> Request; + + /// Open unidirectional stream back to the client (e.g., stderr) + pub async fn open_uni(&mut self) -> Result; + + /// Open bidirectional stream back to the client + pub async fn open_bi(&mut self) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream), ConnectionError>; + } + + /// Request handle for RPC-style communication + pub struct Request { + 
pub protocol: PROTOCOL, + // private: peer, send, recv + } + + impl Request { + /// Get the peer's public key + pub fn peer(&self) -> &fastn_id52::PublicKey; + + /// Read and deserialize JSON request, get response handle + pub async fn get_input(self) -> Result<(INPUT, ResponseHandle), GetInputError>; + + /// Handle request with async closure (automatic serialization) + pub async fn handle( + self, + handler: F + ) -> Result<(), HandleRequestError>; + } +} +``` + +### Global Coordination + +```rust +/// Spawn task with graceful shutdown tracking +pub fn spawn(task: F) -> tokio::task::JoinHandle; + +/// Check for graceful shutdown signal +pub async fn cancelled(); + +/// Trigger graceful shutdown +pub async fn shutdown() -> eyre::Result<()>; +``` + +### Macros + +```rust +/// Main function macro with automatic setup +#[fastn_p2p::main] +#[fastn_p2p::main(logging = "debug", shutdown_mode = "double_ctrl_c")] +``` + +## Use Cases Analysis + +This section analyzes various future use cases to validate the API design and identify areas needing additional consideration. 
+ +### ✅ **Remote Shell/Command Execution** +**Pattern**: Streaming sessions with optional stderr +**Implementation**: +- `client::connect(target, RemoteShellProtocol)` establishes session +- Use `stdin`/`stdout` for main interaction +- Server calls `session.open_uni()` for stderr when needed +- **Status**: Fully supported by current design + +### ✅ **Audio/Video/Text Chat** +**Pattern**: Single connection with multiple stream types +**Implementation**: +- `client::connect(target, ChatProtocol)` establishes base session +- Use `session.stdin`/`stdout` for text messages +- Server/client use `open_bi()` for audio streams, `open_uni()` for video +- Protocol negotiates capabilities (audio/video support) +- **Status**: Well supported - unlimited side channels enable this + +### ✅ **TCP Proxy (Port Forwarding)** +**Pattern**: Bridge local TCP socket to P2P session +**Implementation**: +- `client::connect(target, TcpProxyProtocol { port: 8080 })` +- Bridge local TCP socket ↔ `session.stdin`/`stdout` +- Server bridges session streams ↔ local TCP socket to target port +- **Status**: Trivial - direct stream bridging + +### 🤔 **HTTP Proxy (Multiple Requests per Connection)** +**Pattern**: Reuse connection for multiple HTTP requests +**Implementation Options**: +1. **New session per request** - High overhead but simple +2. **Session reuse** - `open_bi()` for each HTTP request within same session +3. 
**Request multiplexing** - Send multiple HTTP requests over same streams + +**Considerations Needed**: +- Session reuse patterns in API design +- Request correlation/multiplexing within sessions +- Connection pooling for efficiency + +### ✅ **KVM (Remote Desktop/Input)** +**Pattern**: Multiple unidirectional event streams +**Implementation**: +- `client::connect(target, KvmProtocol)` establishes session +- Server uses `open_uni()` for screen updates (high bandwidth) +- Client uses `open_uni()` for mouse/keyboard events (low latency) +- Multiple streams allow prioritization (input vs display) +- **Status**: Well supported - side channels handle different data types + +### 🤔 **File Transfer (SCP-like)** +**Pattern**: Large file transfers with progress +**Implementation**: +- `client::connect(target, FileTransferProtocol)` +- Use `stdin`/`stdout` for file data +- Use `open_uni()` for progress updates, metadata +- **Considerations**: Large stream handling, resume capability + +## Design Gaps Identified + +### 1. **Hierarchical Cancellation** ⚠️ +**Current**: Global `fastn_p2p::cancelled()` affects entire application +**Need**: Tree-based cancellation to shut down specific features +```rust +// Proposed +let cancellation_token = fastn_p2p::CancellationToken::new(); +let listener = fastn_p2p::server::listen(key, protocols) + .with_cancellation(cancellation_token.clone()).await?; + +// Later: cancel just this listener +cancellation_token.cancel(); +``` + +### 2. **Hierarchical Status/Metrics** ⚠️ +**Current**: Basic connection counting +**Need**: Tree-based status with timing and nested metrics +```rust +// Proposed +fastn_p2p::status() -> StatusTree +├── Application (45.2s uptime) +├── Remote Access (32.1s active) +│ ├── alice connection (12.3s, 2 streams) +│ └── bob connection (5.7s, 1 stream) +└── HTTP Proxy (45.2s active, 234 requests handled) +``` + +### 3. 
**Session Reuse Patterns** ⚠️ +**Current**: Each `connect()` creates new session +**Need**: Patterns for reusing sessions for multiple operations (HTTP-like) +```rust +// May need +session.create_request_channel().await? // For HTTP-like patterns +``` + +## Context: Hierarchical Cancellation and Metrics + +To address the hierarchical gaps, fastn-p2p will include a `Context` system that provides tree-based cancellation, metrics, and status tracking. + +### Context API + +```rust +pub struct Context { + // Identity and timing + name: String, + created_at: std::time::Instant, + + // Tree structure + children: std::sync::Arc>>>, + + // Functionality + cancellation: tokio_util::sync::CancellationToken, + metrics: std::sync::Arc>>, + data: std::sync::Arc>>, +} + +impl Context { + /// Create new child context with given name + pub fn create_child(&self, name: &str) -> std::sync::Arc; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Spawn task that inherits this context's cancellation + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle; + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); +} + +/// Global status function - prints context tree +pub fn status() -> StatusTree; +``` + +### Automatic Context Creation + +```rust +// Contexts are auto-created and passed to handlers: + +// 1. Global context (created by #[fastn_p2p::main]) +// 2. Listener context (created by server::listen()) +// 3. 
Session context (created per connection) + +async fn handle_remote_shell(session: server::Session) { + let ctx = session.context(); // Auto-created session context + + // Spawned tasks inherit session's cancellation automatically + ctx.spawn(handle_stdout(session.send)); + ctx.spawn(handle_stdin(session.recv)); + + // When session ends or is cancelled, all spawned tasks stop +} +``` + +### Status Tree Output + +``` +$ fastn status +Application Context (45.2s uptime) +├── Remote Access Listener (32.1s active, 2 connections) +│ ├── alice@bv478gen... (12.3s, 2 streams active) +│ │ ├── stdout handler (12.3s) +│ │ └── stderr stream (8.1s) +│ └── bob@p2nd7avq... (5.7s, 1 stream active) +│ └── shell session (5.7s) +├── HTTP Proxy Listener (45.2s active, 234 requests) +│ └── connection pool (15 active connections) +└── Chat Service (12.1s active, 3 users) + ├── alice chat (8.2s, text + audio) + └── bob chat (3.1s, text only) +``` + +**Recommendation**: Start with the current Session/Request API, then add Context as an enhancement layer that threads through the existing design.