diff --git a/Cargo.lock b/Cargo.lock index d8ee37ad4..adb7ff3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1829,6 +1829,24 @@ dependencies = [ "regex", ] +[[package]] +name = "fastn-context" +version = "0.1.0" +dependencies = [ + "fastn-context-macros", + "tokio", + "tokio-util", +] + +[[package]] +name = "fastn-context-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "fastn-core" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index e66617493..69e5b9b18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ members = [ "clift", "fastn", "fastn-builtins", + "fastn-context", + "fastn-context-macros", "fastn-core", "fastn-daemon", "fastn-ds", @@ -89,6 +91,7 @@ enum-iterator = "0.6" enum-iterator-derive = "0.6" env_logger = "0.11" fastn-builtins.path = "fastn-builtins" +fastn-context.path = "fastn-context" fastn-core.path = "fastn-core" fastn-ds.path = "fastn-ds" fastn-daemon.path = "fastn-daemon" @@ -139,6 +142,7 @@ snafu = "0.8" thiserror = "2" tokio = { version = "1", features = ["full"] } tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-uuid-1"] } +tokio-util = "0.7" tracing = "0.1" url = "2" walkdir = "2" diff --git a/fastn-context-macros/Cargo.toml b/fastn-context-macros/Cargo.toml new file mode 100644 index 000000000..984941ed9 --- /dev/null +++ b/fastn-context-macros/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "fastn-context-macros" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +rust-version.workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +quote = "1" +syn = { version = "2", features = ["full", "extra-traits"] } \ No newline at end of file diff --git a/fastn-context-macros/src/lib.rs b/fastn-context-macros/src/lib.rs new file mode 100644 index 000000000..9bb31b100 --- /dev/null +++ b/fastn-context-macros/src/lib.rs @@ -0,0 +1,35 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{ItemFn, parse_macro_input}; + +/// Main function attribute macro for fastn applications with context support +#[proc_macro_attribute] +pub fn main(_args: TokenStream, input: TokenStream) -> TokenStream { + let input_fn = parse_macro_input!(input as ItemFn); + + let user_fn_name = syn::Ident::new("__fastn_user_main", proc_macro2::Span::call_site()); + let fn_block = &input_fn.block; + let fn_attrs = &input_fn.attrs; + let fn_vis = &input_fn.vis; + + quote! { + #(#fn_attrs)* + #fn_vis fn main() -> std::result::Result<(), Box> { + // Initialize tokio runtime + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(async { + // Global context automatically created + + // Call user's main function + let result = #user_fn_name().await; + + result + }) + } + + async fn #user_fn_name() -> std::result::Result<(), Box> #fn_block + } + .into() +} diff --git a/fastn-context/Cargo.toml b/fastn-context/Cargo.toml new file mode 100644 index 000000000..6278095d5 --- /dev/null +++ b/fastn-context/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "fastn-context" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +rust-version.workspace = true + +[dependencies] +tokio.workspace = true +tokio-util.workspace = true +fastn-context-macros = { path = "../fastn-context-macros" } \ No newline at end of file diff --git a/fastn-context/NEXT-complete-design.md b/fastn-context/NEXT-complete-design.md new file mode 100644 index 000000000..eb48306d9 --- /dev/null +++ b/fastn-context/NEXT-complete-design.md @@ -0,0 +1,844 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; + + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +⏳ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "⚠️".red())?; + for warning in &self.warnings { + let icon = match warning.severity { + WarningSeverity::Info => "ℹ️", + WarningSeverity::Warning => "⚠️", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... # Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. + +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/NEXT-counters.md b/fastn-context/NEXT-counters.md new file mode 100644 index 000000000..f240c3135 --- /dev/null +++ b/fastn-context/NEXT-counters.md @@ -0,0 +1,37 @@ +# NEXT: Global Counter Storage System + +Features for persistent counter tracking with dotted path keys. + +## Counter Types + +- **Total counters** - Historical cumulative counts (persist after context drops) +- **Live counters** - Current active operations (auto-decremented on drop) + +## Global Storage + +```rust +// Dotted path keys in global HashMap +"global.connections" -> 1,247 +"global.remote-access.alice@bv478gen.commands" -> 45 +"global.http-proxy.requests" -> 1,013 +``` + +## API + +```rust +impl Context { + pub fn increment_total(&self, counter: &str); + pub fn increment_live(&self, counter: &str); + pub fn decrement_live(&self, counter: &str); + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +## Integration + +- fastn-p2p automatically tracks connection/request counters +- Counters survive context drops via global storage +- Hierarchical aggregation possible via path prefix matching + +**Implementation**: After basic Context + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-locks.md b/fastn-context/NEXT-locks.md new file mode 100644 index 000000000..9853624da --- /dev/null +++ b/fastn-context/NEXT-locks.md @@ -0,0 +1,32 @@ +# NEXT: Named Locks and Deadlock Detection + +Features for named lock monitoring and deadlock detection. + +## Named Lock Types + +- `ContextMutex` - Named mutex with timing tracking +- `ContextRwLock` - Named read-write lock +- `ContextSemaphore` - Named semaphore with permit tracking + +## Lock Monitoring + +- Track who holds what locks and for how long +- Track who's waiting for locks and wait times +- Automatic deadlock risk detection +- Lock status in context tree display + +## Usage Pattern + +```rust +// Create named locks within context +let user_lock = ctx.mutex("user-data-lock", UserData::new()); + +// Lock operations automatically tracked +let guard = user_lock.lock().await; +// Shows in status: "HOLDS user-data-lock (2.3s)" + +// Waiting operations tracked +// Shows in status: "WAITING user-data-lock (5.1s) ⚠️ STUCK" +``` + +**Implementation**: After basic Context system + monitoring foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-metrics-and-data.md b/fastn-context/NEXT-metrics-and-data.md new file mode 100644 index 000000000..e4ba056e3 --- /dev/null +++ b/fastn-context/NEXT-metrics-and-data.md @@ -0,0 +1,121 @@ +# NEXT: Metrics and Data Storage + +Features for storing metrics and arbitrary data on contexts for debugging and monitoring. + +## Metric Storage + +```rust +impl Context { + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); +} + +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Data Storage + +```rust +impl Context { + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; +} +``` + +## Builder Pattern Integration + +```rust +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; +} +``` + +## Usage Examples + +```rust +// Add metrics during task execution +task_ctx.add_metric("commands_executed", MetricValue::Counter(cmd_count)); +task_ctx.add_metric("response_time", MetricValue::Duration(elapsed)); + +// Store debugging data +task_ctx.set_data("last_command", serde_json::Value::String("ls -la".to_string())); +task_ctx.set_data("exit_code", serde_json::Value::Number(0.into())); + +// Pre-configure context with builder +ctx.child("remote-shell-handler") + .with_data("peer", serde_json::Value::String(alice_id52)) + .with_data("shell", serde_json::Value::String("bash".to_string())) + .with_metric("commands_executed", MetricValue::Counter(0)) + .spawn(|task_ctx| async move { + // Task starts with pre-configured data and metrics + }); +``` + +## Automatic Request Tracking + +For contexts that call `.persist()`, automatic time-windowed counters will be maintained: + +```rust +// Automatic counters for persisted contexts (no manual tracking needed) +// Uses full dotted context path as key + +// When ctx.persist() is called on "global.p2p.alice@bv478gen.stream-123": +// Auto-increments these counters: +"global.p2p.alice@bv478gen.requests_since_start" // Total ever +"global.p2p.alice@bv478gen.requests_last_day" // Last 24 hours +"global.p2p.alice@bv478gen.requests_last_hour" // Last 60 minutes +"global.p2p.alice@bv478gen.requests_last_minute" // Last 60 seconds +"global.p2p.alice@bv478gen.requests_last_second" // Last 1 second + +// Hierarchical aggregation automatically available: +"global.p2p.requests_last_hour" // All P2P requests +"global.requests_last_hour" // All application requests +``` + +### Time Window Implementation + +```rust +// Sliding window counters with efficient circular buffers +// Updated automatically when any context calls persist() + +// Status display shows rates: +✅ global.p2p.alice@bv478gen (23m, active) + Requests: 1,247 total | 234 last hour | 45 last minute | 2/sec current + +// Automatic rate calculation and trending +``` + +### Usage Pattern + +```rust +// P2P stream handler +async fn handle_stream(ctx: Arc) { + // Process stream... + ctx.persist(); // Automatically increments all time window counters + + // No manual counter management needed! + // All metrics tracked automatically by dotted context path +} + +// HTTP request handler +async fn handle_request(ctx: Arc) { + // Process request... + ctx.persist(); // Auto-tracks "global.http.endpoint-xyz.requests_*" +} +``` + +**Implementation**: After basic Context + counter storage foundation. \ No newline at end of file diff --git a/fastn-context/NEXT-monitoring.md b/fastn-context/NEXT-monitoring.md new file mode 100644 index 000000000..644271d75 --- /dev/null +++ b/fastn-context/NEXT-monitoring.md @@ -0,0 +1,34 @@ +# NEXT: Comprehensive Monitoring System + +Features planned for future implementation after basic Context system is working. + +## Status Trees and Monitoring + +- Hierarchical status display with timing +- ANSI-formatted status output +- Event-driven status updates +- System metrics integration (CPU, RAM, disk, network) +- P2P status distribution (`fastn status `) + +## Counter Management + +- Global counter storage with dotted paths +- Total vs live counter tracking +- Automatic counter integration +- Hierarchical counter aggregation + +## Named Locks + +- ContextMutex, ContextRwLock, ContextSemaphore +- Deadlock detection and timing +- Lock status in monitoring tree +- Wait time tracking + +## Advanced Features + +- Builder pattern: `ctx.child("name").with_data().spawn()` +- Metric types and data storage +- HTTP status endpoints +- Status streaming API + +**Implementation**: After basic Context + fastn-p2p integration is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-operation-tracking.md b/fastn-context/NEXT-operation-tracking.md new file mode 100644 index 000000000..bfce50f23 --- /dev/null +++ b/fastn-context/NEXT-operation-tracking.md @@ -0,0 +1,84 @@ +# NEXT: Operation Tracking for Precise Debugging + +Features for tracking exactly where tasks are stuck using named await and select operations. + +## Named Await Operations + +```rust +/// Track what operation a context is waiting for +let result = fastn_context::await!("waiting-for-response", some_operation()); +// No .await suffix - the macro handles it + +// Examples +let data = fastn_context::await!("reading-file", std::fs::read("config.toml")); +let response = fastn_context::await!("http-request", client.get(url).send()); +let connection = fastn_context::await!("database-connect", db.connect()); +``` + +## Simple Select Tracking + +```rust +/// Track that context is stuck on select (no branch naming needed) +fastn_context::select! { + _ = task_ctx.wait() => println!("Cancelled"), + _ = stream.read() => println!("Data received"), + _ = database.query() => println!("Query complete"), +} +// Records: "stuck on select" +``` + +## Status Display with Operation Names + +``` +Global Context (2h 15m uptime) +├── Remote Access Listener +│ ├── alice@bv478gen (23m connected) +│ │ ├── stdout-handler (stuck on: "reading-stream" 12.3s) ⚠️ STUCK +│ │ └── stderr-stream (stuck on: "select" 8.1s) +│ └── bob@p2nd7avq (stuck on: "database-query" 0.2s) ✅ ACTIVE +└── HTTP Proxy (stuck on: "select" 0.1s) ✅ ACTIVE +``` + +## Design Principles + +### Single Operation per Context +- **Good design**: One await/select per context encourages proper task breakdown +- **Multiple selects**: Suggests need for child contexts instead + +```rust +// ❌ Complex - hard to debug where it's stuck +fastn_context::select! { /* 5 different operations */ } +fastn_context::select! { /* 3 more operations */ } + +// ✅ Clear - each operation has its own context +ctx.spawn_child("network-handler", |ctx| async move { + fastn_context::select! { /* network operations */ } +}); +ctx.spawn_child("database-handler", |ctx| async move { + let result = fastn_context::await!("user-query", db.get_user(id)); +}); +``` + +### Automatic Operation Tracking + +```rust +// Context automatically tracks current operation +pub struct Context { + current_operation: std::sync::Arc>>, + operation_started: std::sync::Arc>>, +} + +// Status can show: +// - What operation is running +// - How long it's been running +// - If it's stuck (running too long) +``` + +## Benefits + +1. **Precise debugging** - Know exactly where each task is stuck +2. **Performance insights** - See which operations take too long +3. **Design enforcement** - Encourages proper context decomposition +4. **Production monitoring** - Real-time operation visibility + +**Implementation**: After basic Context + monitoring system is complete. \ No newline at end of file diff --git a/fastn-context/NEXT-status-distribution.md b/fastn-context/NEXT-status-distribution.md new file mode 100644 index 000000000..f44ecefb9 --- /dev/null +++ b/fastn-context/NEXT-status-distribution.md @@ -0,0 +1,36 @@ +# NEXT: P2P Status Distribution + +Features for distributed status monitoring over P2P network. + +## Remote Status Access + +```bash +# Remote machine status over P2P +fastn status alice # Status from machine with alias "alice" +fastn status alice -w # Watch remote machine in real-time +fastn status alice,bob,prod # Multiple machines +``` + +## P2P Integration + +- Uses secure fastn remote access (same as `fastn rshell`) +- Status transmitted over encrypted P2P connections +- Requires target machine in `remote-access/config.toml` +- Real-time streaming for watch mode + +## Protocol + +```rust +// Built-in status commands +StatusRequest -> Status // One-time snapshot +StatusStreamProtocol -> Stream // Real-time updates +``` + +## Benefits + +- **Distributed monitoring** - Monitor entire fastn network from any machine +- **Secure access** - Uses same permissions as remote shell +- **No HTTP servers** - Uses P2P infrastructure only +- **Real-time** - Event-driven updates across network + +**Implementation**: After P2P streaming API + basic status system. \ No newline at end of file diff --git a/fastn-context/README-FULL.md b/fastn-context/README-FULL.md new file mode 100644 index 000000000..eb48306d9 --- /dev/null +++ b/fastn-context/README-FULL.md @@ -0,0 +1,844 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +> **Note**: This README documents the complete design iteratively. Some sections may overlap as features build on each other. The design is internally consistent - later sections refine and extend earlier concepts. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + // Private: parent, children, cancellation, metrics, data +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Wait for cancellation signal + pub async fn wait(&self); + + /// Cancel this context and all children recursively + pub fn cancel(&self); + + /// Add metric data for status reporting + pub fn add_metric(&self, key: &str, value: MetricValue); + + /// Store arbitrary data on this context + pub fn set_data(&self, key: &str, value: serde_json::Value); + + /// Get stored data + pub fn get_data(&self, key: &str) -> Option; + + /// Increment total counter (historical count) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (current active count) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (when operation completes) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for configuration +} + +impl ContextBuilder { + /// Add initial data to context + pub fn with_data(self, key: &str, value: serde_json::Value) -> Self; + + /// Add initial metric to context + pub fn with_metric(self, key: &str, value: MetricValue) -> Self; + + /// Spawn task with this configured child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; + +/// Get current task's context (thread-local or task-local) +pub fn current() -> std::sync::Arc; + +/// Print status tree for debugging +pub fn status() -> StatusTree; +``` + +### Metric Types + +```rust +#[derive(Debug, Clone)] +pub enum MetricValue { + Counter(u64), + Gauge(f64), + Duration(std::time::Duration), + Text(String), + Bytes(u64), +} +``` + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Inherit current context (no child creation) +let ctx = fastn_context::current(); +ctx.spawn(async { + // Simple background task +}); +``` + +### Detailed Task Spawning + +```rust +// Create child context with debugging info +ctx.child("remote-shell-handler") + .with_data("peer", alice_id52) + .with_data("shell", "bash") + .with_metric("commands_executed", 0) + .spawn(|task_ctx| async move { + // Task can update its own context + task_ctx.add_metric("commands_executed", cmd_count); + task_ctx.set_data("last_command", "ls -la"); + + // Task waits for its own cancellation + tokio::select! { + _ = task_ctx.wait() => { + println!("Shell handler cancelled"); + } + _ = handle_shell_session() => { + println!("Shell session completed"); + } + } + }); +``` + +### Status Tree Output + +``` +$ fastn status +Global Context (2h 15m 32s uptime) +├── Remote Access Listener (1h 45m active) +│ ├── alice@bv478gen (23m 12s, bash shell) +│ │ ├── stdout-handler (23m 12s, 15.2MB processed) +│ │ └── stderr-stream (18m 45s, 2.1KB processed) +│ └── bob@p2nd7avq (8m 33s, ls command) +│ └── command-executor (8m 33s, exit pending) +├── HTTP Proxy (2h 15m active) +│ ├── connection-pool (45 active, 1,234 requests) +│ └── request-handler-pool (12 workers active) +└── Chat Service (35m active) + ├── presence-monitor (35m, 15 users tracked) + └── message-relay (35m, 4,567 messages) +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Detailed spawn (creates child for debugging) + ctx.child("command-executor") + .with_data("command", session.protocol.command) + .spawn(|task_ctx| async move { + let result = execute_command(&session.protocol.command).await; + task_ctx.set_data("exit_code", result.code); + }); +} +``` + +## Main Function Integration + +The main macro moves to fastn-context and sets up the global context: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.child("startup") + .with_data("version", env!("CARGO_PKG_VERSION")) + .spawn(|_| async { + // Application initialization + }); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - `fastn status` shows exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Comprehensive Timing and Lock Monitoring + +Every context and operation tracks detailed timing for real-time debugging, including named lock monitoring for deadlock detection. + +### Timing Integration + +```rust +pub struct Context { + pub name: String, + pub created_at: std::time::Instant, // When context started + pub last_activity: std::sync::Arc>, // Last activity + // ... other fields +} + +impl Context { + /// Update last activity timestamp (called automatically by operations) + pub fn touch(&self); + + /// Get how long this context has been alive + pub fn duration(&self) -> std::time::Duration; + + /// Get how long since last activity + pub fn idle_duration(&self) -> std::time::Duration; + + /// Create named mutex within this context + pub fn mutex(&self, name: &str, data: T) -> ContextMutex; + + /// Create named RwLock within this context + pub fn rwlock(&self, name: &str, data: T) -> ContextRwLock; + + /// Create named semaphore within this context + pub fn semaphore(&self, name: &str, permits: usize) -> ContextSemaphore; +} +``` + +### Named Lock Types + +```rust +pub struct ContextMutex { + name: String, + context: std::sync::Arc, + inner: tokio::sync::Mutex, +} + +impl ContextMutex { + /// Lock with automatic status tracking + pub async fn lock(&self) -> ContextMutexGuard; +} + +pub struct ContextMutexGuard { + acquired_at: std::time::Instant, // When lock was acquired + context_name: String, // Which context holds it + lock_name: String, // Lock identifier + // Auto-reports to context status system + // Auto-cleanup on drop +} +``` + +### Detailed Status Output with Comprehensive Timing + +``` +$ fastn status +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Potential Issues: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O +``` + +### Automatic Activity Tracking + +```rust +// All operations automatically maintain timing +ctx.spawn(async { + // ctx.touch() called when task starts + loop { + do_work().await; + ctx.touch(); // Update activity timestamp + } +}); + +// Lock operations update timing automatically +let guard = ctx.mutex("data-lock", data).lock().await; +// Updates: context last_activity, tracks lock hold time + +// Long operations should periodically touch +async fn long_running_task(ctx: std::sync::Arc) { + loop { + process_batch().await; + ctx.touch(); // Show we're still active, not stuck + + tokio::select! { + _ = ctx.wait() => break, // Cancelled + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {} + } + } +} +``` + +This provides **real-time operational debugging** - administrators can instantly identify stuck operations, deadlocked tasks, and performance bottlenecks with precise timing information. + +## Counter Management System + +Every context can track both historical totals and live counts for detailed operational metrics. + +### Global Counter Storage with Dotted Paths + +```rust +pub struct Context { + pub name: String, + pub full_path: String, // "global.remote-access.alice@bv478gen.stdout-handler" + // ... other fields +} + +impl Context { + /// Get full dotted path for this context + pub fn path(&self) -> &str; + + /// Increment total counter (stored in global hashmap by full path) + pub fn increment_total(&self, counter: &str); + + /// Increment live counter (stored in global hashmap by full path) + pub fn increment_live(&self, counter: &str); + + /// Decrement live counter (stored in global hashmap by full path) + pub fn decrement_live(&self, counter: &str); + + /// Get counter values (retrieved from global storage) + pub fn get_total(&self, counter: &str) -> u64; + pub fn get_live(&self, counter: &str) -> u64; +} + +// Global counter storage (persists beyond context lifetimes) +static GLOBAL_COUNTERS: LazyLock>> = ...; + +// Counter keys format: "{context_path}.{counter_name}" +// Examples: +// "global.connections" -> 1,247 +// "global.remote-access.connections" -> 234 +// "global.remote-access.alice@bv478gen.commands" -> 45 +// "global.http-proxy.requests" -> 1,013 +``` + +### Automatic Counter Integration + +```rust +// fastn-p2p automatically maintains connection counters +async fn handle_incoming_connection(session: fastn_p2p::server::Session) { + let ctx = session.context(); + + // Automatically tracked by fastn-p2p: + ctx.increment_total("connections"); // Total connections ever + ctx.increment_live("connections"); // Current active connections + + // Your handler code... + + // When session ends: + ctx.decrement_live("connections"); // Automatically called +} + +// Custom counters for application logic +async fn handle_remote_command(session: server::Session) { + let ctx = session.context(); + + ctx.increment_total("commands"); // Total commands executed + ctx.increment_live("commands"); // Currently executing commands + + let result = execute_command(&session.protocol.command).await; + + ctx.decrement_live("commands"); // Command completed + + if result.success { + ctx.increment_total("successful_commands"); + } else { + ctx.increment_total("failed_commands"); + } +} +``` + +### Enhanced Status Display with Counters + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Total: 1,247 connections, 15,432 requests | Live: 47 connections, 12 active requests +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── Total: 234 connections, 2,156 commands | Live: 2 connections, 3 commands +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── Total: 45 commands (42 success, 3 failed) | Live: 1 command +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ ├── Total: 12 commands (12 success) | Live: 1 command +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── Total: 1,013 requests (987 success, 26 failed) | Live: 45 connections, 8 requests +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── Total: 4,567 messages, 89 users joined | Live: 15 users, 3 conversations + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): ... +⏳ Lock Waiters (1): ... +``` + +### Counter Storage and Paths + +```rust +// Counter keys are automatically generated from context paths: + +// Global level counters +// "global.connections" -> 1,247 total +// "global.live_connections" -> 47 current + +// Service level counters +// "global.remote-access.connections" -> 234 total +// "global.remote-access.live_connections" -> 2 current + +// Session level counters +// "global.remote-access.alice@bv478gen.commands" -> 45 total +// "global.remote-access.alice@bv478gen.live_commands" -> 1 current + +// Task level counters +// "global.remote-access.alice@bv478gen.stdout-handler.bytes_processed" -> 1,234,567 + +// Examples in code: +async fn handle_connection(session: server::Session) { + let ctx = session.context(); // Path: "global.remote-access.alice@bv478gen" + + // These create global entries: + ctx.increment_total("commands"); // Key: "global.remote-access.alice@bv478gen.commands" + ctx.increment_live("commands"); // Key: "global.remote-access.alice@bv478gen.live_commands" + + // Nested task context + ctx.child("stdout-handler").spawn(|task_ctx| async move { + // task_ctx.path() -> "global.remote-access.alice@bv478gen.stdout-handler" + task_ctx.increment_total("bytes_processed"); + }); +} +``` + +### Persistent Counter Benefits + +- **✅ Survives context drops** - Counters stored globally, persist after contexts end +- **✅ Hierarchical aggregation** - Can sum all child counters for parent totals +- **✅ Path-based queries** - Easy to find counters by context path +- **✅ Historical tracking** - Total counters accumulate across all context instances +- **✅ Live tracking** - Live counters automatically decremented when contexts drop + +**Live counters** show current activity (auto-decremented on context drop). +**Total counters** show historical activity (persist forever for trending). +**Global storage** ensures metrics survive context lifecycles. + +## Status Monitoring and HTTP Dashboard + +fastn-context automatically provides multiple ways to access real-time status information for debugging and monitoring. + +### P2P Status Access + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Status automatically available over P2P for remote access + // No HTTP server needed - uses secure P2P connections + + // Your application code... +} +``` + +Status is accessible over the P2P network using the remote access system. + +### Status API Functions + +```rust +/// Get current status snapshot with ANSI formatting +pub fn status() -> Status; + +/// Stream of status updates (max once per second) +pub fn status_stream() -> impl futures_core::stream::Stream; + +/// Get raw status data as structured JSON +pub fn status_json() -> serde_json::Value; +``` + +### Status Type with ANSI Display + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct Status { + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContextStatus { + pub name: String, + pub duration: std::time::Duration, + pub last_activity: std::time::Duration, // Time since last activity + pub children: Vec, + pub metrics: std::collections::HashMap, + pub data: std::collections::HashMap, + pub total_counters: std::collections::HashMap, // Historical counts + pub live_counters: std::collections::HashMap, // Current active counts +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct LockStatus { + pub name: String, + pub held_by_context: String, + pub held_duration: std::time::Duration, + pub lock_type: LockType, // Mutex, RwLock, Semaphore +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct StatusWarning { + pub message: String, + pub context_path: String, + pub severity: WarningSeverity, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub enum WarningSeverity { + Info, // FYI information + Warning, // Potential issue + Critical, // Likely problem +} +``` + +### ANSI-Formatted Display + +```rust +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use colored::*; // For ANSI colors + + // Header with timestamp + writeln!(f, "{}", "fastn Status Dashboard".bold().blue())?; + writeln!(f, "{}", format!("Snapshot: {}", + humantime::format_rfc3339(self.timestamp)).dimmed())?; + writeln!(f)?; + + // Context tree with colors and timing + self.display_context_tree(f, &self.global_context, 0)?; + + // Active locks section + if !self.active_locks.is_empty() { + writeln!(f, "\n{} Active Locks ({}):", "🔒".yellow(), self.active_locks.len())?; + for lock in &self.active_locks { + let duration_str = humantime::format_duration(lock.held_duration); + let color = if lock.held_duration.as_secs() > 10 { + "red" + } else { + "white" + }; + writeln!(f, " - \"{}\" held by {} ({})", + lock.name.cyan(), + lock.held_by_context.white(), + duration_str.color(color))?; + } + } + + // Lock waiters section + if !self.lock_waiters.is_empty() { + writeln!(f, "\n{} Lock Waiters ({}):", "⏳".yellow(), self.lock_waiters.len())?; + for waiter in &self.lock_waiters { + let duration_str = humantime::format_duration(waiter.waiting_duration); + writeln!(f, " - {} waiting for \"{}\" ({})", + waiter.context_name.white(), + waiter.lock_name.cyan(), + duration_str.red())?; + } + } + + // Warnings section + if !self.warnings.is_empty() { + writeln!(f, "\n{} Warnings:", "⚠️".red())?; + for warning in &self.warnings { + let icon = match warning.severity { + WarningSeverity::Info => "ℹ️", + WarningSeverity::Warning => "⚠️", + WarningSeverity::Critical => "🚨", + }; + writeln!(f, " {} {}", icon, warning.message.yellow())?; + } + } + + Ok(()) + } +} +``` + +### Status Stream (Event-Driven Updates) + +```rust +/// Stream provides updates only when context tree actually changes +/// No polling - efficient for long-running monitoring +let mut status_stream = fastn_context::status_stream(); +while let Some(status) = status_stream.next().await { + // Only prints when something actually changes + print!("\x1B[2J\x1B[H"); // Clear screen + println!("{}", status); // Display with colors +} +``` + +### CLI Integration with P2P Status Access + +fastn-context integrates with the main fastn CLI to provide both local and remote status access: + +```bash +# Local machine status +fastn status # One-time snapshot with ANSI colors +fastn status -w # Watch mode (event-driven, no polling) +fastn status --json # JSON output for programmatic use + +# Remote machine status over P2P (requires remote access) +fastn status alice # Status from machine with alias "alice" +fastn status bv478gen... # Status from machine with ID52 +fastn status alice -w # Watch remote machine's status in real-time +fastn status alice --json # Remote machine status as JSON + +# Multiple machines +fastn status alice,bob,prod # Status from multiple machines +``` + +**P2P Status Protocol:** +- Uses secure fastn remote access (same as `fastn rshell`) +- Requires target machine in your `remote-access/config.toml` +- Status data transmitted over encrypted P2P connection +- Real-time streaming for remote watch mode + +### Status Protocol Integration + +Status access integrates seamlessly with fastn's remote access system: + +```rust +// Status is available as a built-in remote command +// When fastn-daemon receives status requests, fastn-context provides the data + +// Server side - automatic status command handling +// fastn-daemon automatically handles: +// - StatusRequest -> returns current Status +// - StatusStreamRequest -> returns real-time Status stream + +// Client side - transparent remote access +fastn status alice // Translates to fastn_p2p::client::call(alice, StatusRequest) +fastn status alice -w // Translates to fastn_p2p::client::connect(alice, StatusStreamProtocol) +``` + +This gives **comprehensive status access** - terminal, HTTP, streaming, and programmatic - all from the same underlying Status structure with rich ANSI formatting for human consumption. + +## System Metrics Monitoring + +fastn-context automatically monitors system resources and integrates them into the status display. + +### Automatic System Monitoring + +```rust +#[derive(Debug, Clone, serde::Serialize)] +pub struct SystemMetrics { + pub cpu_usage_percent: f32, // Current CPU usage + pub memory_used_bytes: u64, // RAM usage + pub memory_total_bytes: u64, // Total RAM + pub disk_used_bytes: u64, // Disk usage + pub disk_total_bytes: u64, // Total disk + pub network_rx_bytes_per_sec: u64, // Network receive rate + pub network_tx_bytes_per_sec: u64, // Network transmit rate + pub load_average: [f32; 3], // 1min, 5min, 15min load + pub uptime: std::time::Duration, // System uptime +} + +// Added to Status structure +pub struct Status { + pub system_metrics: SystemMetrics, // System resource usage + pub global_context: ContextStatus, + pub active_locks: Vec, + pub lock_waiters: Vec, + pub warnings: Vec, + pub timestamp: std::time::SystemTime, +} +``` + +### Efficient Metric Collection + +```rust +// System metrics cached and updated appropriately: +// - CPU usage: Updated every 1 second (smooth average) +// - Memory/disk: Updated every 5 seconds (less volatile) +// - Network rates: Updated every 1 second (calculated from deltas) +// - Load average: Updated every 10 seconds (system provides this) + +// Metrics only recalculated when status is actually requested +// No background polling unless someone is watching +``` + +### Enhanced Status Display with System Info + +``` +$ fastn status +fastn Status Dashboard +System: CPU 12.3% | RAM 2.1GB/16GB (13%) | Disk 45GB/500GB (9%) | Load 0.8,1.2,1.5 +Network: ↓ 125KB/s ↑ 67KB/s | Uptime 5d 12h 45m + +Global Context (2h 15m 32s uptime, active 0.1s ago) +├── Remote Access Listener (1h 45m active, last activity 2.3s ago) +│ ├── alice@bv478gen (23m 12s connected, active 0.5s ago) +│ │ ├── stdout-handler (23m 12s running, CPU active) +│ │ │ └── 🔒 HOLDS "session-output-lock" (12.3s held) +│ │ └── stderr-stream (18m 45s running, idle 8.1s) +│ │ └── ⏳ WAITING "session-output-lock" (8.1s waiting) ⚠️ STUCK +│ └── bob@p2nd7avq (8m 33s connected, active 0.1s ago) +│ └── command-executor (8m 33s running, exit pending) +├── HTTP Proxy (2h 15m active, last request 0.8s ago) +│ ├── connection-pool (2h 15m running, 45 connections, oldest 34m 12s) +│ └── 🔒 HOLDS "pool-resize-lock" (0.2s held) +└── Chat Service (35m active, last message 1.2s ago) + ├── presence-monitor (35m running, heartbeat 30s ago) + └── message-relay (35m running, processing queue) + +🔒 Active Locks (3): + - "session-output-lock" held by alice/stdout-handler (12.3s) ⚠️ LONG HELD + - "user-table-write-lock" held by user-service/db-writer (0.1s) + - "pool-resize-lock" held by http-proxy/connection-pool (0.2s) + +⏳ Lock Waiters (1): + - alice/stderr-stream waiting for "session-output-lock" (8.1s) ⚠️ STUCK + +⚠️ Alerts: + - Long-held lock "session-output-lock" (12.3s) may indicate deadlock + - stderr-stream stuck waiting (8.1s) suggests blocked I/O + - CPU usage normal (12.3%), memory usage low (13%) +``` + +### Watch Mode (`fastn status -w`) + +```rust +// Event-driven updates - only when something changes +// No CPU overhead when system is idle +// Immediately shows when new contexts/locks appear or disappear + +$ fastn status -w +# Screen updates only when: +# - New context created/destroyed +# - Lock acquired/released +# - Significant activity changes +# - System metrics cross thresholds +# - No updates for days if system is stable +``` + +This provides **complete operational visibility** with both application-specific context trees and system resource monitoring, all with efficient event-driven updates instead of wasteful polling. \ No newline at end of file diff --git a/fastn-context/README.md b/fastn-context/README.md new file mode 100644 index 000000000..156f95c46 --- /dev/null +++ b/fastn-context/README.md @@ -0,0 +1,337 @@ +# fastn-context: Hierarchical Application Context for Debugging and Operations + +This crate provides a hierarchical context system for fastn applications, enabling tree-based cancellation, metrics collection, and operational visibility. It forms the operational backbone for all fastn services. + +## Design Philosophy + +- **Hierarchical Structure**: Applications naturally form trees of operations +- **Automatic Inheritance**: Child contexts inherit cancellation and settings from parents +- **Zero Boilerplate**: Context trees build themselves as applications run +- **Production Ready**: Status trees enable debugging of stuck/slow operations +- **Bounded Complexity**: Simple spawn vs detailed child creation as needed + +## Core Concepts + +### Context Tree Structure + +Every fastn application forms a natural hierarchy: + +``` +Global Context (application level) +├── Service Context (e.g., "remote-access-listener") +│ ├── Session Context (e.g., "alice@bv478gen") +│ │ ├── Task Context (e.g., "stdout-handler") +│ │ └── Task Context (e.g., "stderr-stream") +│ └── Session Context (e.g., "bob@p2nd7avq") +├── Service Context (e.g., "http-proxy") +└── Service Context (e.g., "chat-service") +``` + +### Automatic Context Creation + +fastn-context integrates seamlessly with fastn ecosystem: + +```rust +// 1. Global context created by main macro +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically available +} + +// 2. Service contexts created by operations +let listener = fastn_p2p::server::listen(key, protocols).await?; +// Creates child context: "p2p-listener" under global + +// 3. Session contexts created per connection +// Each incoming connection gets child context: "session-{peer_id}" + +// 4. Task contexts created by spawn operations +session_ctx.child("shell-handler").spawn(handle_shell); +``` + +## API Reference + +### Core Context + +```rust +pub struct Context { + /// Context name for debugging/status + pub name: String, + + // Private: parent, children, cancellation_token +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc; + + /// Create child context with given name + pub fn child(&self, name: &str) -> ContextBuilder; + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where F: std::future::Future + Send + 'static; + + /// Spawn task with named child context (common case shortcut) + pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle + where + F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static; + + /// Wait for cancellation signal (returns Future for tokio::select!) + pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_>; + + /// Cancel this context and all children recursively + pub fn cancel(&self); + +} +``` + +### Context Builder + +```rust +pub struct ContextBuilder { + // Pre-created child context ready for spawning +} + +impl ContextBuilder { + /// Spawn task with this child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where F: FnOnce(std::sync::Arc) -> Fut + Send + 'static; +} +``` + +#### Global Access + +```rust +/// Get the global application context +pub fn global() -> std::sync::Arc; +``` + +### Status Display + +```rust +/// Get current status snapshot of entire context tree +pub fn status() -> Status; + +/// Get status including recent completed contexts (distributed tracing) +pub fn status_with_latest() -> Status; + +#[derive(Debug, Clone)] +pub struct Status { + pub global_context: ContextStatus, + pub persisted_contexts: Option>, // Recent completed contexts + pub timestamp: std::time::SystemTime, +} + +#[derive(Debug, Clone)] +pub struct ContextStatus { + pub name: String, + pub is_cancelled: bool, + pub duration: std::time::Duration, + pub children: Vec, +} + +#[derive(Debug, Clone)] +pub struct PersistedContext { + pub name: String, + pub context_path: String, + pub duration: std::time::Duration, + pub completion_time: std::time::SystemTime, + pub success: bool, + pub message: String, +} + +impl std::fmt::Display for Status { + // ANSI-formatted display with tree structure and status icons +} +``` + + +## Usage Patterns + +### Simple Task Spawning + +```rust +// Simple named task spawning (common case) +let ctx = fastn_context::global(); // or passed as parameter +ctx.spawn_child("background-task", |task_ctx| async move { + // Simple background task with explicit context + println!("Running in context: {}", task_ctx.name); +}); + +// Alternative: builder pattern for simple case +ctx.child("background-task") + .spawn(|task_ctx| async move { + // Same result, different syntax + println!("Running in context: {}", task_ctx.name); + }); +``` + +### Status Monitoring + +```rust +// Get current context tree status +let status = fastn_context::status(); +println!("{}", status); + +// Example output: +// fastn Context Status +// ✅ global (2h 15m, active) +// ✅ remote-access-listener (1h 45m, active) +// ✅ alice@bv478gen (23m, active) +// ✅ stdout-handler (23m, active) +// ✅ startup-task (2h 15m, active) + +// With recent completed contexts: +let status = fastn_context::status_with_latest(); +// Shows live contexts + recent completed ones +``` + +### Context Persistence (Distributed Tracing) + +```rust +// P2P stream handler example +async fn handle_p2p_stream(session: Session) { + let ctx = session.context(); // "global.p2p.alice@bv478gen.stream-456" + + let result = process_stream().await; + + // Persist completed context for tracing + ctx.complete_with_status(result.is_ok(), &format!("Processed {} bytes", result.bytes)); + // -> Logs trace, adds to circular buffer, sends to external systems +} + +// HTTP request handler example +async fn handle_http_request(request: HttpRequest) { + let ctx = request.context(); // "global.http.request-789" + + let result = process_request().await; + + // Persist with completion info + ctx.complete_with_status(result.status.is_success(), &result.summary); +} +``` + +### Enhanced Status Display + +``` +$ fastn status --include-latest +✅ global (2h 15m, active) + ✅ p2p-listener (1h 45m, active) + ✅ alice@bv478gen (23m, active, 3 live streams) + +Recent completed contexts (last 10): +- global.p2p.alice@bv478gen.stream-455 (2.3s, success: "Processed 1.2MB") +- global.p2p.bob@p2nd7avq.stream-454 (1.1s, success: "Processed 512KB") +- global.http.request-123 (0.8s, failed: "Database timeout") +- global.p2p.alice@bv478gen.stream-453 (4.1s, success: "Processed 2.1MB") +``` + +### Cancellation Handling + +```rust +// Task waits for context cancellation (proper select pattern) +ctx.spawn_child("shell-handler", |task_ctx| async move { + println!("Shell handler starting: {}", task_ctx.name); + + // Proper cancellation in select (non-blocking) + tokio::select! { + _ = task_ctx.cancelled() => { + println!("Shell handler cancelled"); + } + result = handle_shell_session() => { + println!("Shell session completed: {:?}", result); + } + data = connection.accept() => { + println!("Got connection: {:?}", data); + } + } +}); +``` + +## Integration with fastn-p2p + +fastn-p2p depends on fastn-context and automatically creates context hierarchies: + +```rust +// fastn-p2p sessions provide access to their context +async fn handle_remote_shell(session: fastn_p2p::server::Session) { + let ctx = session.context(); // Auto-created by fastn-p2p + + // Simple spawn (inherits session context) + ctx.spawn(pipe_stdout(session.send)); + + // Named child spawn for debugging + ctx.spawn_child("command-executor", |task_ctx| async move { + println!("Executing command in context: {}", task_ctx.name); + let result = execute_command(&session.protocol.command).await; + println!("Command completed with: {:?}", result); + }); +} +``` + +## Main Function Integration + +The main macro sets up the global context and provides comprehensive configuration: + +```rust +#[fastn_context::main] +async fn main() -> eyre::Result<()> { + // Global context automatically created and available + + let ctx = fastn_context::global(); + ctx.spawn_child("startup", |startup_ctx| async move { + println!("Application starting: {}", startup_ctx.name); + // Application initialization + }); +} +``` + +### Configuration Options + +```rust +#[fastn_context::main( + // Logging configuration + logging = true, // Default: true - simple logging setup + + // Shutdown behavior + shutdown_mode = "single_ctrl_c", // Default: "single_ctrl_c" + shutdown_timeout = "30s", // Default: "30s" - graceful shutdown timeout + + // Double Ctrl+C specific (only when shutdown_mode = "double_ctrl_c") + double_ctrl_c_window = "2s", // Default: "2s" - time window for second Ctrl+C + status_fn = my_status_printer, // Required for double_ctrl_c mode +)] +async fn main() -> eyre::Result<()> { + // Your application code +} + +// Status function (required for double_ctrl_c mode) +async fn my_status_printer() { + println!("=== Application Status ==="); + // Custom status logic - access global registries, counters, etc. + println!("Active services: {}", get_active_service_count()); +} +``` + +## Design Benefits + +1. **Names Required for Debugging** - Every important operation has a name in status tree +2. **Selective Complexity** - Simple spawn vs detailed child creation as needed +3. **Automatic Tree Building** - Context hierarchy builds as application runs +4. **Production Debugging** - Status trees show exactly where system is stuck +5. **Clean Separation** - Context concerns separate from networking concerns +6. **Ecosystem Wide** - All fastn crates can use the same context infrastructure + +**Key Insight**: Names aren't optional - they're essential for production debugging and operational visibility. + +## Future Features + +See NEXT-*.md files for planned enhancements: + +- **NEXT-metrics-and-data.md**: Metric storage and arbitrary data on contexts +- **NEXT-monitoring.md**: Status trees, timing, system metrics monitoring +- **NEXT-locks.md**: Named locks and deadlock detection +- **NEXT-counters.md**: Global counter storage with dotted paths +- **NEXT-status-distribution.md**: P2P distributed status access diff --git a/fastn-context/examples/minimal_test.rs b/fastn-context/examples/minimal_test.rs new file mode 100644 index 000000000..6a6bf7c77 --- /dev/null +++ b/fastn-context/examples/minimal_test.rs @@ -0,0 +1,68 @@ +/// Test the minimal fastn-context API needed for fastn-p2p integration +/// This validates our basic Context design before implementation + +#[fastn_context::main] +async fn main() -> Result<(), Box> { + println!("Testing minimal fastn-context API..."); + + // Global context should be automatically available + let global_ctx = fastn_context::global(); + println!("Global context created: {}", global_ctx.name); + + // Test basic child creation with builder + global_ctx + .child("test-service") + .spawn(|service_ctx| async move { + println!("Service context created: {}", service_ctx.name); + + // Test cancellation with proper select pattern + tokio::select! { + _ = service_ctx.cancelled() => { + println!("Service context cancelled"); + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { + println!("Service context completed"); + } + } + }); + + // Test global context functionality + println!("Global context is cancelled: {}", global_ctx.is_cancelled()); + + // Give tasks time to run and build tree + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // Test status display + println!("\n=== Context Tree Status ==="); + let status = fastn_context::status(); + println!("{}", status); + + // Test persistence functionality + global_ctx.spawn_child("persist-test", |task_ctx| async move { + tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; + task_ctx.persist(); + }); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test status with persisted contexts + println!("\n=== Status with Persisted Contexts ==="); + let status_with_latest = fastn_context::status_with_latest(); + println!("{}", status_with_latest); + + // Test persistence functionality + global_ctx.spawn_child("persist-test", |task_ctx| async move { + tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; + task_ctx.persist(); + }); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test status with persisted contexts + println!("\n=== Status with Persisted Contexts ==="); + let status_with_latest = fastn_context::status_with_latest(); + println!("{}", status_with_latest); + + println!("Basic API test completed!"); + Ok(()) +} diff --git a/fastn-context/src/context.rs b/fastn-context/src/context.rs new file mode 100644 index 000000000..b03421e5e --- /dev/null +++ b/fastn-context/src/context.rs @@ -0,0 +1,158 @@ +/// Hierarchical context for task management and cancellation +pub struct Context { + /// Context name for debugging + pub name: String, + + /// When this context was created + pub created_at: std::time::Instant, + + /// Parent context (None for root) + parent: Option>, + + /// Child contexts + children: std::sync::Arc>>>, + + /// Cancellation token (proper async cancellation) + cancellation_token: tokio_util::sync::CancellationToken, +} + +impl Context { + /// Create new root context (typically only used by main macro) + pub fn new(name: &str) -> std::sync::Arc { + std::sync::Arc::new(Context { + name: name.to_string(), + created_at: std::time::Instant::now(), + parent: None, + children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), + cancellation_token: tokio_util::sync::CancellationToken::new(), + }) + } + + /// Create child context + pub fn child(&self, name: &str) -> ContextBuilder { + let child_context = std::sync::Arc::new(Context { + name: name.to_string(), + created_at: std::time::Instant::now(), + parent: Some(std::sync::Arc::new(self.clone())), + children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), + cancellation_token: self.cancellation_token.child_token(), + }); + + // Add to parent's children list + if let Ok(mut children) = self.children.lock() { + children.push(child_context.clone()); + } + + ContextBuilder { + context: child_context, + } + } + + /// Simple spawn (inherits current context, no child creation) + pub fn spawn(&self, task: F) -> tokio::task::JoinHandle + where + F: std::future::Future + Send + 'static, + F::Output: Send + 'static, + { + tokio::spawn(task) + } + + /// Spawn task with named child context (common case shortcut) + pub fn spawn_child(&self, name: &str, task: F) -> tokio::task::JoinHandle + where + F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static, + Fut::Output: Send + 'static, + { + let child_ctx = self.child(name); + child_ctx.spawn(task) + } + + /// Wait for cancellation signal (for use in tokio::select!) + pub async fn wait(&self) { + // Poll-based future that completes when cancelled + loop { + if self.is_cancelled() { + return; + } + // Yield to allow other tasks to run, then check again + tokio::task::yield_now().await; + } + } + + /// Wait for cancellation signal (returns proper Future for tokio::select!) + pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> { + self.cancellation_token.cancelled() + } + + /// Check if this context is cancelled + pub fn is_cancelled(&self) -> bool { + self.cancellation_token.is_cancelled() + } + + /// Cancel this context and all children recursively + pub fn cancel(&self) { + self.cancellation_token.cancel(); + } + + /// Mark this context for persistence (distributed tracing) + pub fn persist(&self) { + let context_status = self.status(); + crate::status::add_persisted_context(context_status); + } + + /// Get status information for this context and all children + pub fn status(&self) -> crate::status::ContextStatus { + let children = if let Ok(children_lock) = self.children.lock() { + children_lock.iter().map(|child| child.status()).collect() + } else { + Vec::new() + }; + + crate::status::ContextStatus { + name: self.name.clone(), + is_cancelled: self.is_cancelled(), + duration: self.created_at.elapsed(), + children, + } + } +} + +impl Clone for Context { + fn clone(&self) -> Self { + Context { + name: self.name.clone(), + created_at: self.created_at, + parent: self.parent.clone(), + children: self.children.clone(), + cancellation_token: self.cancellation_token.clone(), + } + } +} + +/// Builder for configuring child contexts before spawning +pub struct ContextBuilder { + pub(crate) context: std::sync::Arc, +} + +impl ContextBuilder { + /// Spawn task with this child context + pub fn spawn(self, task: F) -> tokio::task::JoinHandle + where + F: FnOnce(std::sync::Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static, + Fut::Output: Send + 'static, + { + let context = self.context; + tokio::spawn(async move { task(context).await }) + } +} + +/// Global context storage +static GLOBAL_CONTEXT: std::sync::LazyLock> = + std::sync::LazyLock::new(|| Context::new("global")); + +/// Get the global application context +pub fn global() -> std::sync::Arc { + GLOBAL_CONTEXT.clone() +} diff --git a/fastn-context/src/lib.rs b/fastn-context/src/lib.rs new file mode 100644 index 000000000..e49a69954 --- /dev/null +++ b/fastn-context/src/lib.rs @@ -0,0 +1,14 @@ +#![warn(unused_extern_crates)] +#![deny(unused_crate_dependencies)] + +use tokio as _; // used by main macro +use tokio_util as _; // used for cancellation tokens + +mod context; +mod status; + +pub use context::{Context, ContextBuilder, global}; +pub use status::{ContextStatus, Status, status, status_with_latest}; + +// Re-export main macro +pub use fastn_context_macros::main; diff --git a/fastn-context/src/status.rs b/fastn-context/src/status.rs new file mode 100644 index 000000000..17fb73e74 --- /dev/null +++ b/fastn-context/src/status.rs @@ -0,0 +1,145 @@ +/// Status snapshot of the context tree +#[derive(Debug, Clone)] +pub struct Status { + pub global_context: ContextStatus, + pub persisted_contexts: Option>, + pub timestamp: std::time::SystemTime, +} + +/// Status information for a single context +#[derive(Debug, Clone)] +pub struct ContextStatus { + pub name: String, + pub is_cancelled: bool, + pub duration: std::time::Duration, + pub children: Vec, +} + +/// Global storage for persisted contexts (circular buffer) +static PERSISTED_CONTEXTS: std::sync::LazyLock< + std::sync::RwLock>, +> = std::sync::LazyLock::new(|| std::sync::RwLock::new(std::collections::VecDeque::new())); + +/// Maximum number of persisted contexts to keep (configurable via env) +const MAX_PERSISTED_CONTEXTS: usize = 10; // TODO: Make configurable via env var + +/// Add a context to the persisted contexts circular buffer +pub fn add_persisted_context(context_status: ContextStatus) { + if let Ok(mut contexts) = PERSISTED_CONTEXTS.write() { + // Add to front + contexts.push_front(context_status.clone()); + + // Keep only max number + if contexts.len() > MAX_PERSISTED_CONTEXTS { + contexts.pop_back(); + } + } + + // Log as trace event + println!( + "TRACE: {} completed in {:?}", + context_status.name, context_status.duration + ); +} + +/// Get current status snapshot of entire context tree +pub fn status() -> Status { + Status { + global_context: crate::context::global().status(), + persisted_contexts: None, + timestamp: std::time::SystemTime::now(), + } +} + +/// Get status including recent completed contexts (distributed tracing) +pub fn status_with_latest() -> Status { + let persisted = if let Ok(contexts) = PERSISTED_CONTEXTS.read() { + Some(contexts.iter().cloned().collect()) + } else { + None + }; + + Status { + global_context: crate::context::global().status(), + persisted_contexts: persisted, + timestamp: std::time::SystemTime::now(), + } +} + +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "fastn Context Status")?; + writeln!(f, "Snapshot: {:?}", self.timestamp)?; + writeln!(f)?; + + Self::display_context(&self.global_context, f, 0)?; + + // Show persisted contexts if included + if let Some(persisted) = &self.persisted_contexts + && !persisted.is_empty() + { + writeln!(f, "\nRecent completed contexts (last {}):", persisted.len())?; + for ctx in persisted { + let duration_str = if ctx.duration.as_secs() > 60 { + format!( + "{}m {}s", + ctx.duration.as_secs() / 60, + ctx.duration.as_secs() % 60 + ) + } else { + format!("{:.1}s", ctx.duration.as_secs_f64()) + }; + + let status_str = if ctx.is_cancelled { + "cancelled" + } else { + "completed" + }; + writeln!(f, "- {} ({}, {})", ctx.name, duration_str, status_str)?; + } + } + + Ok(()) + } +} + +impl Status { + fn display_context( + ctx: &ContextStatus, + f: &mut std::fmt::Formatter<'_>, + depth: usize, + ) -> std::fmt::Result { + let indent = " ".repeat(depth); + let status_icon = if ctx.is_cancelled { "❌" } else { "✅" }; + + let duration_str = if ctx.duration.as_secs() > 60 { + format!( + "{}m {}s", + ctx.duration.as_secs() / 60, + ctx.duration.as_secs() % 60 + ) + } else { + format!("{:.1}s", ctx.duration.as_secs_f64()) + }; + + writeln!( + f, + "{}{} {} ({}, {})", + indent, + status_icon, + ctx.name, + duration_str, + if ctx.is_cancelled { + "cancelled" + } else { + "active" + } + )?; + + for child in &ctx.children { + Self::display_context(child, f, depth + 1)?; + } + + Ok(()) + } +} diff --git a/v0.5/fastn-p2p/README.md b/v0.5/fastn-p2p/README.md index 8498f58e6..8644ab48b 100644 --- a/v0.5/fastn-p2p/README.md +++ b/v0.5/fastn-p2p/README.md @@ -386,4 +386,4 @@ See the `/tests` directory for complete working examples: For advanced use cases that need direct access to `fastn_net::Graceful`, you can still access it through `fastn_p2p::globals`, but this is discouraged for most applications. -The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability. \ No newline at end of file +The `#[fastn_p2p::main]` approach handles 99% of use cases while providing excellent ergonomics and maintainability.