diff --git a/flake.nix b/flake.nix index e6c063d..31e61f9 100644 --- a/flake.nix +++ b/flake.nix @@ -48,7 +48,7 @@ cargoLock = { lockFile = ./Cargo.lock; outputHashes = { - "iroh-proxy-utils-0.1.0" = "sha256-ZV71q22zCWBqFdrc0jzkwyQdVc/H0r0BBB6dKrNARr8="; + "iroh-proxy-utils-0.1.0" = "sha256-gBEL8FiUhHZ6fI4R/EmngZp+eUR/PcwHYYby9YpIUk8="; "dioxus-primitives-0.0.1" = "sha256-gN0cb0Icp0S/Oi7eddfwfoN9PHhdlID2BKzdeP5j8PM="; }; }; @@ -83,7 +83,7 @@ cargoLock = { lockFile = ./Cargo.lock; outputHashes = { - "iroh-proxy-utils-0.1.0" = "sha256-ZV71q22zCWBqFdrc0jzkwyQdVc/H0r0BBB6dKrNARr8="; + "iroh-proxy-utils-0.1.0" = "sha256-gBEL8FiUhHZ6fI4R/EmngZp+eUR/PcwHYYby9YpIUk8="; "dioxus-primitives-0.0.1" = "sha256-gN0cb0Icp0S/Oi7eddfwfoN9PHhdlID2BKzdeP5j8PM="; }; }; diff --git a/lib/src/node.rs b/lib/src/node.rs index c99ff1f..4dbb3ea 100644 --- a/lib/src/node.rs +++ b/lib/src/node.rs @@ -1,23 +1,21 @@ -use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; +use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc}; use iroh::{ Endpoint, EndpointId, SecretKey, discovery::dns::DnsDiscovery, endpoint::default_relay_mode, protocol::Router, }; use iroh_n0des::ApiSecret; -use iroh_proxy_utils::{ALPN as IROH_HTTP_CONNECT_ALPN, HttpProxyRequest, HttpProxyRequestKind}; +use iroh_proxy_utils::upstream::UpstreamMetrics; +use iroh_proxy_utils::{ + ALPN as IROH_HTTP_CONNECT_ALPN, Authority, HttpProxyRequest, HttpProxyRequestKind, +}; use iroh_proxy_utils::{ downstream::{DownstreamProxy, EndpointAuthority, ProxyMode}, upstream::{AuthError, AuthHandler, UpstreamProxy}, }; use iroh_relay::dns::{DnsProtocol, DnsResolver}; use n0_error::{Result, StackResultExt, StdResultExt}; -use n0_future::task::AbortOnDropHandle; -use tokio::{ - net::TcpListener, - sync::{broadcast, futures::Notified}, - task::JoinHandle, -}; +use tokio::{net::TcpListener, sync::futures::Notified, task::JoinHandle}; use tracing::{Instrument, debug, error_span, info, instrument, warn}; use crate::{ProxyState, Repo, StateWrapper, TcpProxyData, config::Config}; @@ -47,9 +45,8 @@ pub struct ListenNode { router: Router, state: StateWrapper, repo: Repo, + metrics: Arc, _n0des: Option>, - metrics_tx: broadcast::Sender, - _metrics_task: Arc>, } impl ListenNode { @@ -70,43 +67,17 @@ impl ListenNode { let state = repo.load_state().await?; let upstream_proxy = UpstreamProxy::new(state.clone())?; + let metrics = upstream_proxy.metrics(); let router = Router::builder(endpoint) .accept(IROH_HTTP_CONNECT_ALPN, upstream_proxy) .spawn(); - let (metrics_tx, _) = broadcast::channel(1); - - let metrics_update_interval = Duration::from_millis(100); - let metrics_task = tokio::spawn( - { - let endpoint = router.endpoint().clone(); - let metrics_tx = metrics_tx.clone(); - async move { - loop { - let metrics = endpoint.metrics(); - let recv_total = metrics.magicsock.recv_data_ipv4.get() - + metrics.magicsock.recv_data_ipv6.get() - + metrics.magicsock.recv_data_relay.get(); - let send_total = metrics.magicsock.send_data.get(); - let update = MetricsUpdate { - send: send_total, - recv: recv_total, - }; - metrics_tx.send(update).ok(); - n0_future::time::sleep(metrics_update_interval).await; - } - } - } - .instrument(error_span!("metrics")), - ); - let this = Self { repo, router, state, - metrics_tx, - _metrics_task: Arc::new(AbortOnDropHandle::new(metrics_task)), + metrics, _n0des: n0des, }; Ok(this) @@ -120,8 +91,8 @@ impl ListenNode { &self.state } - pub fn metrics(&self) -> broadcast::Receiver { - self.metrics_tx.subscribe() + pub fn metrics(&self) -> &Arc { + &self.metrics } pub fn proxies(&self) -> Vec { @@ -220,9 +191,9 @@ impl AuthHandler for StateWrapper { } } HttpProxyRequestKind::Absolute { target, .. } => { - let target_str = target.to_string(); - if let Some((host, port)) = parse_host_port_from_url(&target_str) { - if self.tcp_proxy_exists(&host, port) { + // Parse host:port from absolute URL (e.g., "http://localhost:5173/path") + if let Ok(authority) = Authority::from_absolute_uri(&target) { + if self.tcp_proxy_exists(&authority.host, authority.port) { Ok(()) } else { Err(AuthError::Forbidden) @@ -236,30 +207,6 @@ impl AuthHandler for StateWrapper { } } -/// Parse host and port from an absolute URL (e.g., "http://localhost:5173/path") -fn parse_host_port_from_url(url: &str) -> Option<(String, u16)> { - // Remove scheme - let without_scheme = url - .strip_prefix("http://") - .or_else(|| url.strip_prefix("https://"))?; - - // Split off the path - let authority = without_scheme.split('/').next()?; - - // Split host and port - if let Some((host, port_str)) = authority.rsplit_once(':') { - let port = port_str.parse().ok()?; - Some((host.to_string(), port)) - } else { - // Default ports - if url.starts_with("https://") { - Some((authority.to_string(), 443)) - } else { - Some((authority.to_string(), 80)) - } - } -} - #[derive(Debug, Clone)] pub struct ConnectNode { endpoint: Endpoint, diff --git a/lib/src/tunnels.rs b/lib/src/tunnels.rs index fe18053..1615fb9 100644 --- a/lib/src/tunnels.rs +++ b/lib/src/tunnels.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap}; +use iroh_proxy_utils::Authority; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams}; use kube::{Api, ResourceExt}; @@ -63,6 +64,16 @@ pub struct TunnelSummary { pub programmed: bool, } +impl TunnelSummary { + // TODO(Frando): this should all be cleared up and use more common types instead of + // converting around wildly. + pub fn origin_authority(&self) -> Option { + TcpProxyData::from_host_port_str(&strip_scheme(&self.endpoint)) + .ok() + .map(Authority::from) + } +} + #[derive(Debug, Clone)] pub struct TunnelDeleteOutcome { pub project_id: String, diff --git a/ui/src/components/bandwidth_timeseries_chart.rs b/ui/src/components/bandwidth_timeseries_chart.rs deleted file mode 100644 index 86bd6c7..0000000 --- a/ui/src/components/bandwidth_timeseries_chart.rs +++ /dev/null @@ -1,239 +0,0 @@ -// use chrono::{DateTime, Local}; -// use dioxus::prelude::*; -// use crate::util::humanize_bytes; - -// #[derive(Debug, Clone, PartialEq)] -// pub struct ChartData { -// pub ts: DateTime, -// pub send: u64, -// pub recv: u64, -// } - -// impl std::default::Default for ChartData { -// fn default() -> Self { -// Self { -// ts: Local::now(), -// send: 0, -// recv: 0, -// } -// } -// } - -// #[derive(PartialEq, Clone, Props)] -// pub struct BwTsChartProps { -// pub data: Vec, -// } -// #[component] -// pub fn BwTsChart(props: BwTsChartProps) -> Element { -// let data = &props.data; - -// // Chart dimensions -// let width = 800.0; -// let height = 300.0; -// let padding = 60.0; -// let chart_width = width - padding * 2.0; -// let chart_height = height - padding * 2.0; - -// // Find max value for scaling -// let max_value = data.iter().map(|d| d.send.max(d.recv)).max().unwrap_or(0) as f64; - -// // Generate paths -// let send_path = generate_path(data, |d| d.send, chart_width, chart_height, max_value); -// let recv_path = generate_path(data, |d| d.recv, chart_width, chart_height, max_value); - -// // Generate Y-axis labels (5 ticks) -// let y_labels: Vec<_> = (0..=4) -// .map(|i| { -// let value = (max_value as f64 / 4.0 * (4 - i) as f64) as u64; -// let y = padding + (chart_height / 4.0 * i as f64); -// (humanize_bytes(value), y) -// }) -// .collect(); - -// // Generate X-axis labels (show every ~20th data point, max 6 labels) -// let x_labels: Vec<_> = if !data.is_empty() { -// let step = (data.len() / 5).max(1); -// data.iter() -// .enumerate() -// .step_by(step) -// .map(|(i, point)| { -// let x = padding + (i as f64 / (data.len() - 1).max(1) as f64) * chart_width; -// let time_str = point.ts.format("%H:%M:%S").to_string(); -// (time_str, x) -// }) -// .collect() -// } else { -// vec![] -// }; - -// rsx! { -// div { -// class: "p-4", -// h2 { -// class: "text-xl font-bold mb-4", -// "Bandwidth" -// } - -// if data.is_empty() { -// div { -// class: "text-gray-500 text-center py-8", -// "No data available" -// } -// } else { -// div { -// class: "flex gap-4 mb-2", -// div { -// class: "flex items-center gap-2", -// div { -// class: "w-4 h-0.5", -// style: "text-color: #3b82f6;", -// } -// span { -// class: "text-sm", -// "Send" -// } -// } -// div { -// class: "flex items-center gap-2", -// div { -// class: "w-4 h-0.5", -// style: "background-color: #10b981;", -// } -// span { -// class: "text-sm", -// "Receive" -// } -// } -// } - -// svg { -// width: "{width}", -// height: "{height}", -// view_box: "0 0 {width} {height}", - -// // Y-axis -// line { -// x1: "{padding}", -// y1: "{padding}", -// x2: "{padding}", -// y2: "{height - padding}", -// stroke: "#666", -// stroke_width: "1", -// } - -// // X-axis -// line { -// x1: "{padding}", -// y1: "{height - padding}", -// x2: "{width - padding}", -// y2: "{height - padding}", -// stroke: "#666", -// stroke_width: "1", -// } - -// // Y-axis labels and grid lines -// for (label, y) in y_labels { -// g { -// // Grid line -// line { -// x1: "{padding}", -// y1: "{y}", -// x2: "{width - padding}", -// y2: "{y}", -// stroke: "#333", -// stroke_width: "0.5", -// stroke_dasharray: "2,2", -// } -// // Label -// text { -// x: "{padding - 10.0}", -// y: "{y + 5.0}", -// text_anchor: "end", -// font_size: "12", -// fill: "#999", -// "{label}" -// } -// } -// } - -// // X-axis labels -// for (label, x) in x_labels { -// text { -// x: "{x}", -// y: "{height - padding + 20.0}", -// text_anchor: "middle", -// font_size: "10", -// fill: "#999", -// "{label}" -// } -// } - -// // Chart area group -// g { -// transform: "translate({padding}, {padding})", - -// // Send line (blue) -// path { -// d: "{send_path}", -// fill: "none", -// stroke: "#3b82f6", -// stroke_width: "2", -// } - -// // Receive line (green) -// path { -// d: "{recv_path}", -// fill: "none", -// stroke: "#10b981", -// stroke_width: "2", -// } -// } - -// // X-axis label -// text { -// x: "{width / 2.0}", -// y: "{height - 10.0}", -// text_anchor: "middle", -// font_size: "12", -// fill: "#999", -// "Time" -// } -// } -// } -// } -// } -// } - -// // Generate SVG path for a line -// fn generate_path( -// data: &[ChartData], -// get_value: fn(&ChartData) -> u64, -// width: f64, -// height: f64, -// max_value: f64, -// ) -> String { -// if data.is_empty() { -// return String::new(); -// } - -// let points: Vec = data -// .iter() -// .enumerate() -// .map(|(i, point)| { -// let x = (i as f64 / (data.len() - 1).max(1) as f64) * width; -// let value = get_value(point) as f64; -// let y = if max_value > 0.0 { -// height - (value / max_value * height) -// } else { -// height -// }; -// format!("{},{}", x, y) -// }) -// .collect(); - -// if points.is_empty() { -// return String::new(); -// } - -// format!("M {}", points.join(" L ")) -// } diff --git a/ui/src/components/mod.rs b/ui/src/components/mod.rs index a1f8705..59e0937 100644 --- a/ui/src/components/mod.rs +++ b/ui/src/components/mod.rs @@ -3,7 +3,6 @@ //! component to be used in our app. mod add_tunnel_dialog; -mod bandwidth_timeseries_chart; mod button; mod delete_tunnel_dialog; mod head; diff --git a/ui/src/views/tunnel_bandwidth.rs b/ui/src/views/tunnel_bandwidth.rs index dfc6e2b..a891baf 100644 --- a/ui/src/views/tunnel_bandwidth.rs +++ b/ui/src/views/tunnel_bandwidth.rs @@ -1,6 +1,9 @@ +use std::{sync::Arc, time::Duration}; + use chrono::{DateTime, Local}; use dioxus::prelude::*; use lib::TunnelSummary; +use tokio::sync::Notify; use super::{OpenEditTunnelDialog, TunnelCard}; use crate::{ @@ -10,6 +13,8 @@ use crate::{ Route, }; +const SAMPLING_INTERVAL: Duration = Duration::from_millis(100); + #[derive(Debug, Clone, PartialEq)] struct RatePoint { ts: DateTime, @@ -32,14 +37,17 @@ pub fn TunnelBandwidth(id: String) -> Element { let mut points = use_signal(Vec::::new); let mut latest_send = use_signal(|| 0u64); let mut latest_recv = use_signal(|| 0u64); + let notify_loaded = Arc::new(Notify::new()); // Load tunnel metadata and keep it in sync when state updates (e.g. after edit/save). let state_for_future = state.clone(); use_future({ let id = id.clone(); + let notify_loaded = notify_loaded.clone(); move || { let id = id.clone(); let state = state_for_future.clone(); + let notify_loaded = notify_loaded.clone(); async move { let refresh = state.tunnel_refresh(); @@ -55,6 +63,7 @@ pub fn TunnelBandwidth(id: String) -> Element { title.set(tunnel.label.clone()); codename.set(tunnel.id.clone()); tunnel_loaded.set(Some(tunnel)); + notify_loaded.notify_waiters(); } Ok(None) => { loading.set(false); @@ -74,8 +83,9 @@ pub fn TunnelBandwidth(id: String) -> Element { use_future(move || { let state = consume_context::(); + let notify_loaded = notify_loaded.clone(); async move { - let mut metrics_sub = state.node().listen.metrics(); + let metrics = state.node().listen.metrics().clone(); // We compute bytes/sec over the interval between *plotted* samples (not per-metric tick), // otherwise bursty traffic can happen between samples and we'd plot a flatline. @@ -90,17 +100,35 @@ pub fn TunnelBandwidth(id: String) -> Element { // higher = more responsive, lower = smoother let alpha: f64 = 0.12; - while let Ok(metric) = metrics_sub.recv().await { - let now = std::time::Instant::now(); + loop { + let notified = notify_loaded.notified(); + let Some(tunnel) = tunnel_loaded() else { + notified.await; + continue; + }; + let Some(authority) = tunnel.origin_authority() else { + warn!(?tunnel, "failed to parse authority from tunnel summary"); + break; + }; + + let (send, recv) = match metrics.get(&authority) { + Some(m) => (m.bytes_from_origin(), m.bytes_to_origin()), + None => (0, 0), + }; + // First metric just initializes the baseline. + let now = std::time::Instant::now(); let (Some(prev_send), Some(prev_recv)) = (last_sample_send, last_sample_recv) else { - last_sample_send = Some(metric.send); - last_sample_recv = Some(metric.recv); + last_sample_send = Some(send); + last_sample_recv = Some(recv); last_sample_at = now; continue; }; + tokio::time::sleep(SAMPLING_INTERVAL).await; + let now = std::time::Instant::now(); + // Downsample to ~2Hz so the UI stays smooth. let dt = now.duration_since(last_sample_at); if dt < std::time::Duration::from_millis(650) { @@ -108,8 +136,8 @@ pub fn TunnelBandwidth(id: String) -> Element { } let dt_s = dt.as_secs_f64().max(0.001); - let raw_send = (metric.send.saturating_sub(prev_send)) as f64 / dt_s; - let raw_recv = (metric.recv.saturating_sub(prev_recv)) as f64 / dt_s; + let raw_send = (send.saturating_sub(prev_send)) as f64 / dt_s; + let raw_recv = (recv.saturating_sub(prev_recv)) as f64 / dt_s; // EMA update ema_send = if ema_send == 0.0 { @@ -142,8 +170,8 @@ pub fn TunnelBandwidth(id: String) -> Element { } points.set(next); - last_sample_send = Some(metric.send); - last_sample_recv = Some(metric.recv); + last_sample_send = Some(send); + last_sample_recv = Some(recv); last_sample_at = now; } } @@ -353,23 +381,37 @@ pub fn TunnelBandwidth(id: String) -> Element { } } +/// Format a byte value for the Y-axis so all labels use the same unit (B or KB). +fn format_axis_bytes(val: u64, max_v: f64) -> String { + if val == 0 { + return "0 B".to_string(); + } + if max_v >= 1024.0 { + let kb = val as f64 / 1024.0; + format!("{:.1} KB", kb) + } else { + format!("{} B", val) + } +} + #[component] fn BandwidthChart(points: Vec) -> Element { - // Render with a fixed viewBox but scale to the container width to avoid overflow. - // Give the left axis more room so labels don't get clipped. + // Fixed viewBox; SVG scales to container. Left padding gives Y-axis labels room so they don't clip. let width = 860.0; let height = 400.0; - let padding_x = 52.0; + let padding_x = 76.0; let padding_y = 22.0; let w = width - padding_x * 2.0; let h = height - padding_y * 2.0; + // Data max; use a minimum display scale so the Y-axis doesn't collapse to "0 B" everywhere when idle. let max_v = points .iter() .map(|p| p.send_per_s.max(p.recv_per_s)) .max() .unwrap_or(0) .max(1) as f64; + let display_max = max_v.max(10.0); #[derive(Clone, Copy)] struct Pt { @@ -433,7 +475,7 @@ fn BandwidthChart(points: Vec) -> Element { .map(|(i, p)| { let x = (i as f64 / (points.len().saturating_sub(1).max(1) as f64)) * w; let v = get(p) as f64; - let y = h - (v / max_v * h); + let y = h - (v / display_max * h); Pt { x, y } }) .collect(); @@ -458,13 +500,14 @@ fn BandwidthChart(points: Vec) -> Element { let send_color = "#BF9595"; let recv_color = "#4D6356"; - let y_ticks = 2; + // Y-axis ticks: use display_max so idle (max_v < 10) still shows distinct labels (e.g. 10, 7, 5, 2, 0 B). + let y_ticks = 4; let mut y_labels = Vec::new(); for i in 0..=y_ticks { let frac = i as f64 / y_ticks as f64; let y = padding_y + frac * h; - let val = ((1.0 - frac) * max_v) as u64; - y_labels.push((humanize_bytes(val), y)); + let val = ((1.0 - frac) * display_max) as u64; + y_labels.push((format_axis_bytes(val, display_max), y)); } rsx! { @@ -520,7 +563,7 @@ fn BandwidthChart(points: Vec) -> Element { stroke: "none", } - // grid + y labels + // grid + y labels (text_anchor end = right edge of text at x; dominant-baseline = align with line) for (label , y) in y_labels { line { x1: "{padding_x}", @@ -532,11 +575,12 @@ fn BandwidthChart(points: Vec) -> Element { stroke_dasharray: "10 10", } text { - x: "{padding_x - 12.0}", - y: "{y + 4.0}", + x: "{padding_x - 8.0}", + y: "{y}", text_anchor: "end", - font_size: "17", - fill: "#94a3b8", + dominant_baseline: "middle", + font_size: "15", + fill: "#64748b", "{label}" } }