diff --git a/Cargo.toml b/Cargo.toml
index 578eaa9..59c6730 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -75,7 +75,7 @@ metrics = "0.24"
 metrics-exporter-prometheus = "0.16"
 
 [features]
-default = ["nats", "webhook", "oidc"]
+default = ["nats", "webhook", "oidc", "mesh-broker-client"]
 nats = ["dep:async-nats"]
 kafka = ["dep:rdkafka"]
 webhook = ["dep:reqwest"]
@@ -83,6 +83,7 @@ oidc = ["dep:reqwest"]
 postgres = ["dep:sqlx"]
 aws-kms = ["dep:aws-sdk-kms", "dep:aws-config"]
 vault = ["dep:reqwest", "dep:base64"]
+mesh-broker-client = ["dep:reqwest"]
 full = ["nats", "kafka", "webhook", "oidc", "postgres", "aws-kms", "vault"]
 loadtest = ["dep:reqwest", "dep:tempfile"]
diff --git a/src/api/formations.rs b/src/api/formations.rs
index 4481d03..96f945c 100644
--- a/src/api/formations.rs
+++ b/src/api/formations.rs
@@ -141,10 +141,29 @@ async fn list_documents(
         None => return Ok(Json(Vec::new())),
     };
 
-    // Query the broker's document store using the app_id as the collection name
-    let docs = match broker.list_documents(&app_id).await {
-        Some(d) => d,
-        None => return Ok(Json(Vec::new())),
+    // First prefer the historical gateway convention of using the app_id as
+    // the collection name. If the broker doesn't expose that collection,
+    // aggregate the default PEAT/tropiOS collections so the endpoint still
+    // surfaces live mesh documents.
+    let docs = if let Some(d) = broker.list_documents(&app_id).await {
+        d
+    } else {
+        let mut aggregated = Vec::new();
+        for collection in ["cot-broadcast", "contacts", "markers", "missions"] {
+            if let Some(mut docs) = broker.list_documents(collection).await {
+                for doc in &mut docs {
+                    if let Some(obj) = doc.as_object_mut() {
+                        obj.entry("_collection".to_string())
+                            .or_insert_with(|| serde_json::Value::String(collection.to_string()));
+                    }
+                }
+                aggregated.extend(docs);
+            }
+        }
+        if aggregated.is_empty() {
+            return Ok(Json(Vec::new()));
+        }
+        aggregated
     };
 
     let summaries: Vec<_> = docs
@@ -152,6 +171,7 @@
         .map(|val| {
             let doc_id = val
                 .get("_id")
+                .or_else(|| val.get("uid"))
                 .and_then(|v| v.as_str())
                 .unwrap_or("unknown")
                 .to_string();
diff --git a/src/api/mod.rs b/src/api/mod.rs
index 40b18a8..f8bcec6 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -15,6 +15,7 @@ use tower_http::services::{ServeDir, ServeFile};
 
 use crate::cdc::CdcEngine;
 use crate::tenant::TenantManager;
+use formations::MeshStateRegistry;
 
 /// Install the global Prometheus metrics recorder and return a handle for
 /// rendering the /metrics endpoint. Safe to call multiple times — only the
@@ -38,7 +39,23 @@ pub fn router(
     ui_dir: Option<&str>,
     admin_token: Option<String>,
 ) -> Router {
-    let r = app_authenticated(tenant_mgr.clone(), admin_token);
+    router_with_mesh(
+        tenant_mgr,
+        cdc_engine,
+        ui_dir,
+        admin_token,
+        MeshStateRegistry::new(),
+    )
+}
+
+pub fn router_with_mesh(
+    tenant_mgr: TenantManager,
+    cdc_engine: CdcEngine,
+    ui_dir: Option<&str>,
+    admin_token: Option<String>,
+    mesh: MeshStateRegistry,
+) -> Router {
+    let r = app_authenticated_with_mesh(tenant_mgr.clone(), admin_token, mesh);
 
     #[cfg(feature = "loadtest")]
     let r = r.nest("/orgs", cdc_test::router(tenant_mgr, cdc_engine));
@@ -58,13 +75,21 @@
 
 /// Build the application router without admin auth (dev/test convenience).
 pub fn app(tenant_mgr: TenantManager) -> Router {
-    app_authenticated(tenant_mgr, None)
+    app_authenticated_with_mesh(tenant_mgr, None, MeshStateRegistry::new())
 }
 
 /// Build the application router with optional admin token enforcement.
 pub fn app_authenticated(tenant_mgr: TenantManager, admin_token: Option<String>) -> Router {
+    app_authenticated_with_mesh(tenant_mgr, admin_token, MeshStateRegistry::new())
+}
+
+pub fn app_authenticated_with_mesh(
+    tenant_mgr: TenantManager,
+    admin_token: Option<String>,
+    mesh: MeshStateRegistry,
+) -> Router {
     let prometheus_handle = install_prometheus_recorder();
-    app_with_metrics(tenant_mgr, prometheus_handle, admin_token)
+    app_with_metrics_and_mesh(tenant_mgr, prometheus_handle, admin_token, mesh)
 }
 
 /// Build the application router with an explicit PrometheusHandle.
@@ -72,6 +97,20 @@ pub fn app_with_metrics(
     tenant_mgr: TenantManager,
     prometheus_handle: PrometheusHandle,
     admin_token: Option<String>,
+) -> Router {
+    app_with_metrics_and_mesh(
+        tenant_mgr,
+        prometheus_handle,
+        admin_token,
+        MeshStateRegistry::new(),
+    )
+}
+
+pub fn app_with_metrics_and_mesh(
+    tenant_mgr: TenantManager,
+    prometheus_handle: PrometheusHandle,
+    admin_token: Option<String>,
+    mesh: MeshStateRegistry,
 ) -> Router {
     // Admin routes — protected by bearer token when PEAT_ADMIN_TOKEN is set
     let admin_routes = Router::new()
@@ -79,7 +118,10 @@ pub fn app_with_metrics(
         .nest("/orgs", tokens::router(tenant_mgr.clone()))
         .nest("/orgs", sinks::router(tenant_mgr.clone()))
         .nest("/orgs", identity::router(tenant_mgr.clone()))
-        .nest("/orgs", formations::router(tenant_mgr.clone()))
+        .nest(
+            "/orgs",
+            formations::router_with_mesh(tenant_mgr.clone(), mesh),
+        )
         .layer(axum::middleware::from_fn_with_state(
             admin_token,
             auth::require_admin_token,
diff --git a/src/config.rs b/src/config.rs
index 0606860..62796d0 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -26,6 +26,11 @@ pub struct GatewayConfig {
     pub vault_token: Option<String>,
     /// Vault Transit secret engine key name.
     pub vault_transit_key: Option<String>,
+    /// External peat-mesh broker mappings that should be surfaced as live
+    /// formation state inside the gateway.
+    pub mesh_brokers: Vec<MeshBrokerMapping>,
+    /// Poll interval for remote broker refreshes.
+    pub mesh_poll_interval_ms: u64,
 }
 
 #[derive(Debug, Clone, Deserialize)]
@@ -42,6 +47,14 @@ pub struct CdcConfig {
     pub kafka_brokers: Option<String>,
 }
 
+#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
+pub struct MeshBrokerMapping {
+    pub org_id: String,
+    pub app_id: String,
+    pub base_url: String,
+    pub collections: Vec<String>,
+}
+
 impl GatewayConfig {
     pub fn from_env() -> Result<Self> {
         let storage = match env::var("PEAT_STORAGE_BACKEND")
@@ -75,6 +88,88 @@ impl GatewayConfig {
             vault_addr: env::var("PEAT_VAULT_ADDR").ok(),
             vault_token: env::var("PEAT_VAULT_TOKEN").ok(),
             vault_transit_key: env::var("PEAT_VAULT_TRANSIT_KEY").ok(),
+            mesh_brokers: parse_mesh_broker_mappings(env::var("PEAT_MESH_BROKERS").ok())?,
+            mesh_poll_interval_ms: env::var("PEAT_MESH_POLL_INTERVAL_MS")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .unwrap_or(5_000),
         })
     }
 }
+
+fn parse_mesh_broker_mappings(raw: Option<String>) -> Result<Vec<MeshBrokerMapping>> {
+    let Some(raw) = raw else {
+        return Ok(vec![]);
+    };
+
+    let mut mappings = Vec::new();
+    for entry in raw.split(';').map(str::trim).filter(|s| !s.is_empty()) {
+        let mut parts = entry.split('|').map(str::trim);
+        let org_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
+            anyhow::anyhow!("missing org_id in PEAT_MESH_BROKERS entry '{entry}'")
+        })?;
+        let app_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
+            anyhow::anyhow!("missing app_id in PEAT_MESH_BROKERS entry '{entry}'")
+        })?;
+        let base_url = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
+            anyhow::anyhow!("missing base_url in PEAT_MESH_BROKERS entry '{entry}'")
+        })?;
+        let collections = parts
+            .next()
+            .map(|segment| {
+                segment
+                    .split(',')
+                    .map(str::trim)
+                    .filter(|s| !s.is_empty())
+                    .map(ToOwned::to_owned)
+                    .collect::<Vec<_>>()
+            })
+            .filter(|v| !v.is_empty())
+            .unwrap_or_else(default_mesh_collections);
+
+        mappings.push(MeshBrokerMapping {
+            org_id: org_id.to_string(),
+            app_id: app_id.to_string(),
+            base_url: base_url.trim_end_matches('/').to_string(),
+            collections,
+        });
+    }
+
+    Ok(mappings)
+}
+
+fn default_mesh_collections() -> Vec<String> {
+    vec![
+        "cot-broadcast".into(),
+        "contacts".into(),
+        "markers".into(),
+        "missions".into(),
+    ]
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parses_mesh_broker_mappings() {
+        let mappings = parse_mesh_broker_mappings(Some(
+            "acme|mesh-a|http://127.0.0.1:9001|cot-broadcast,markers;bravo|mesh-b|http://mesh:8081|"
+                .into(),
+        ))
+        .unwrap();
+
+        assert_eq!(mappings.len(), 2);
+        assert_eq!(mappings[0].org_id, "acme");
+        assert_eq!(mappings[0].app_id, "mesh-a");
+        assert_eq!(mappings[0].base_url, "http://127.0.0.1:9001");
+        assert_eq!(mappings[0].collections, vec!["cot-broadcast", "markers"]);
+        assert_eq!(mappings[1].collections, default_mesh_collections());
+    }
+
+    #[test]
+    fn rejects_invalid_mesh_broker_entry() {
+        let err = parse_mesh_broker_mappings(Some("acme|missing".into())).unwrap_err();
+        assert!(err.to_string().contains("base_url"));
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 3876eec..0b399ae 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,6 +5,8 @@ pub mod cdc;
 pub mod cli;
 pub mod config;
 pub mod crypto;
+#[cfg(feature = "mesh-broker-client")]
+pub mod mesh_ingest;
 pub mod storage;
 pub mod tenant;
diff --git a/src/main.rs b/src/main.rs
index b9d7bb1..e37e8b5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,6 +4,8 @@ use tracing::info;
 use tracing_subscriber::EnvFilter;
 
 use peat_gateway::{api, cdc, cli, config, tenant};
+#[cfg(feature = "mesh-broker-client")]
+use peat_gateway::{api::formations::MeshStateRegistry, mesh_ingest::MeshIngestManager};
 
 #[derive(Parser)]
 #[command(
@@ -83,6 +85,33 @@ async fn serve(config: &config::GatewayConfig) -> Result<()> {
     let tenant_mgr = tenant::TenantManager::new(config).await?;
     let cdc_engine = cdc::CdcEngine::new(config, tenant_mgr.clone()).await?;
+
+    #[cfg(feature = "mesh-broker-client")]
+    let mesh_registry = {
+        let registry = MeshStateRegistry::new();
+        if !config.mesh_brokers.is_empty() {
+            let manager = MeshIngestManager::new(
+                registry.clone(),
+                std::time::Duration::from_millis(config.mesh_poll_interval_ms),
+            )
+            .with_cdc(cdc_engine.clone());
+            for mapping in config.mesh_brokers.clone() {
+                manager.register_remote_broker(mapping).await;
+            }
+        }
+        registry
+    };
+
+    #[cfg(feature = "mesh-broker-client")]
+    let app = api::router_with_mesh(
+        tenant_mgr,
+        cdc_engine,
+        config.ui_dir.as_deref(),
+        config.admin_token.clone(),
+        mesh_registry,
+    );
+
+    #[cfg(not(feature = "mesh-broker-client"))]
     let app = api::router(
         tenant_mgr,
         cdc_engine,
         config.ui_dir.as_deref(),
diff --git a/src/mesh_ingest.rs b/src/mesh_ingest.rs
new file mode 100644
index 0000000..c6e7276
--- /dev/null
+++ b/src/mesh_ingest.rs
@@ -0,0 +1,465 @@
+use std::collections::{HashMap, HashSet};
+use std::hash::{Hash, Hasher};
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use anyhow::Result;
+use peat_mesh::broker::{
+    MeshBrokerState, MeshEvent, MeshNodeInfo, PeerSummary, ReadinessCheck, ReadinessResponse,
+    TopologySummary,
+};
+use reqwest::Client;
+use serde::Deserialize;
+use serde_json::Value;
+use tokio::sync::broadcast;
+use tracing::{debug, warn};
+
+use crate::api::formations::MeshStateRegistry;
+use crate::cdc::CdcEngine;
+use crate::config::MeshBrokerMapping;
+use crate::tenant::models::CdcEvent;
+
+#[derive(Clone)]
+pub struct MeshIngestManager {
+    registry: MeshStateRegistry,
+    client: Client,
+    poll_interval: Duration,
+    cdc_engine: Option<CdcEngine>,
+}
+
+impl MeshIngestManager {
+    pub fn new(registry: MeshStateRegistry, poll_interval: Duration) -> Self {
+        Self {
+            registry,
+            client: Client::new(),
+            poll_interval,
+            cdc_engine: None,
+        }
+    }
+
+    /// Attach a CDC engine so document changes detected during polling are
+    /// forwarded to all configured sinks (e.g. the trop-server webhook).
+    pub fn with_cdc(mut self, engine: CdcEngine) -> Self {
+        self.cdc_engine = Some(engine);
+        self
+    }
+
+    pub async fn register_remote_broker(&self, mapping: MeshBrokerMapping) {
+        let state = Arc::new(RemoteBrokerState::new(
+            self.client.clone(),
+            mapping.clone(),
+            self.poll_interval,
+            self.cdc_engine.clone(),
+        ));
+
+        state.spawn_refresh_loop();
+        self.registry
+            .register(mapping.org_id, mapping.app_id, state)
+            .await;
+    }
+}
+
+#[derive(Clone)]
+struct Snapshot {
+    node_info: MeshNodeInfo,
+    topology: TopologySummary,
+    readiness: ReadinessResponse,
+    peers: Vec<PeerSummary>,
+    documents: HashMap<String, Vec<Value>>,
+    last_sync_ms: Option<u64>,
+    last_error: Option<String>,
+}
+
+impl Snapshot {
+    fn new(node_id: String) -> Self {
+        Self {
+            node_info: MeshNodeInfo {
+                node_id: node_id.clone(),
+                uptime_secs: 0,
+                version: "remote-broker-pending".into(),
+            },
+            topology: TopologySummary {
+                peer_count: 0,
+                role: "standalone".into(),
+                hierarchy_level: 0,
+            },
+            readiness: ReadinessResponse {
+                ready: false,
+                node_id,
+                checks: vec![ReadinessCheck {
+                    name: "remote-broker".into(),
+                    ready: false,
+                    message: Some("waiting for first successful poll".into()),
+                }],
+            },
+            peers: vec![],
+            documents: HashMap::new(),
+            last_sync_ms: None,
+            last_error: None,
+        }
+    }
+}
+
+pub struct RemoteBrokerState {
+    client: Client,
+    mapping: MeshBrokerMapping,
+    poll_interval: Duration,
+    snapshot: Arc<RwLock<Snapshot>>,
+    events_tx: broadcast::Sender<MeshEvent>,
+    cdc_engine: Option<CdcEngine>,
+}
+
+impl RemoteBrokerState {
+    pub fn new(
+        client: Client,
+        mapping: MeshBrokerMapping,
+        poll_interval: Duration,
+        cdc_engine: Option<CdcEngine>,
+    ) -> Self {
+        let (events_tx, _) = broadcast::channel(256);
+        let snapshot = Arc::new(RwLock::new(Snapshot::new(format!(
+            "{}:{}",
+            mapping.org_id, mapping.app_id
+        ))));
+
+        Self {
+            client,
+            mapping,
+            poll_interval,
+            snapshot,
+            events_tx,
+            cdc_engine,
+        }
+    }
+
+    pub fn spawn_refresh_loop(self: &Arc<Self>) {
+        let this = Arc::clone(self);
+        tokio::spawn(async move {
+            if let Err(err) = this.refresh_once().await {
+                this.record_error(err.to_string());
+            }
+
+            let mut ticker = tokio::time::interval(this.poll_interval);
+            loop {
+                ticker.tick().await;
+                if let Err(err) = this.refresh_once().await {
+                    warn!(
+                        org_id = %this.mapping.org_id,
+                        app_id = %this.mapping.app_id,
+                        broker = %this.mapping.base_url,
+                        error = %err,
+                        "remote mesh broker refresh failed"
+                    );
+                    this.record_error(err.to_string());
+                }
+            }
+        });
+    }
+
+    async fn refresh_once(&self) -> Result<()> {
+        let node_info: MeshNodeInfo = self.get_json("/api/v1/node").await?;
+        let topology: TopologySummary = self.get_json("/api/v1/topology").await?;
+        let readiness = match self.get_json::<ReadinessResponse>("/api/v1/ready").await {
+            Ok(ready) => ready,
+            Err(err) => ReadinessResponse {
+                ready: false,
+                node_id: node_info.node_id.clone(),
+                checks: vec![ReadinessCheck {
+                    name: "remote-broker".into(),
+                    ready: false,
+                    message: Some(format!("ready probe failed: {err}")),
+                }],
+            },
+        };
+        let peers_resp: PeersEnvelope = self.get_json("/api/v1/peers").await?;
+
+        let mut documents = HashMap::new();
+        for collection in &self.mapping.collections {
+            match self
+                .get_json::<DocumentsEnvelope>(&format!("/api/v1/documents/{collection}"))
+                .await
+            {
+                Ok(resp) => {
+                    documents.insert(collection.clone(), resp.documents);
+                }
+                Err(err) => {
+                    debug!(
+                        org_id = %self.mapping.org_id,
+                        app_id = %self.mapping.app_id,
+                        collection = %collection,
+                        broker = %self.mapping.base_url,
+                        error = %err,
+                        "remote broker collection unavailable"
+                    );
+                }
+            }
+        }
+
+        let last_sync_ms = now_ms();
+        let new_snapshot = Snapshot {
+            node_info,
+            topology,
+            readiness,
+            peers: peers_resp.peers,
+            documents,
+            last_sync_ms: Some(last_sync_ms),
+            last_error: None,
+        };
+
+        // Publish CDC events BEFORE swapping snapshot so old state is still readable
+        if let Some(ref engine) = self.cdc_engine {
+            self.publish_cdc_diffs(engine, &new_snapshot, last_sync_ms).await;
+        }
+
+        self.emit_diffs(&new_snapshot);
+        let mut snapshot = self.snapshot.write().unwrap_or_else(|e| e.into_inner());
+        *snapshot = new_snapshot;
+        Ok(())
+    }
+
+    /// Compare current snapshot to `new_snapshot` and publish a `CdcEvent` for
+    /// every document that is new or whose content has changed.
+    async fn publish_cdc_diffs(
+        &self,
+        engine: &CdcEngine,
+        new_snapshot: &Snapshot,
+        timestamp_ms: u64,
+    ) {
+        let old_docs = self
+            .snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .documents
+            .clone();
+
+        for (collection, new_docs) in &new_snapshot.documents {
+            let old_by_id: HashMap<String, u64> = old_docs
+                .get(collection)
+                .into_iter()
+                .flatten()
+                .filter_map(|d| document_id(d).map(|id| (id, doc_content_hash(d))))
+                .collect();
+
+            for doc in new_docs {
+                let Some(doc_id) = document_id(doc) else {
+                    continue;
+                };
+
+                let new_hash = doc_content_hash(doc);
+                let old_hash = old_by_id.get(&doc_id).copied();
+
+                // Skip if unchanged
+                if old_hash == Some(new_hash) {
+                    continue;
+                }
+
+                let change_hash = format!("{new_hash:016x}");
+                let actor_id = doc
+                    .get("_actor")
+                    .and_then(|v| v.as_str())
+                    .unwrap_or("remote-broker")
+                    .to_string();
+
+                let event = CdcEvent {
+                    org_id: self.mapping.org_id.clone(),
+                    app_id: self.mapping.app_id.clone(),
+                    document_id: doc_id.clone(),
+                    change_hash,
+                    actor_id,
+                    timestamp_ms,
+                    patches: doc.clone(),
+                };
+
+                if let Err(e) = engine.publish(&event).await {
+                    warn!(
+                        org_id = %self.mapping.org_id,
+                        app_id = %self.mapping.app_id,
+                        doc_id = %doc_id,
+                        collection = %collection,
+                        error = %e,
+                        "Failed to publish CDC event for remote broker document"
+                    );
+                }
+            }
+        }
+    }
+
+    async fn get_json<T: for<'de> Deserialize<'de>>(&self, path: &str) -> Result<T> {
+        let url = format!("{}{}", self.mapping.base_url, path);
+        let response = self.client.get(url).send().await?.error_for_status()?;
+        Ok(response.json::<T>().await?)
+    }
+
+    fn emit_diffs(&self, new_snapshot: &Snapshot) {
+        let old_snapshot = self
+            .snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .clone();
+
+        let old_peers: HashSet<String> = old_snapshot.peers.into_iter().map(|p| p.id).collect();
+        let new_peers: HashSet<String> = new_snapshot.peers.iter().map(|p| p.id.clone()).collect();
+
+        for peer_id in new_peers.difference(&old_peers) {
+            let _ = self.events_tx.send(MeshEvent::PeerConnected {
+                peer_id: peer_id.clone(),
+            });
+        }
+        for peer_id in old_peers.difference(&new_peers) {
+            let _ = self.events_tx.send(MeshEvent::PeerDisconnected {
+                peer_id: peer_id.clone(),
+                reason: "missing from remote broker snapshot".into(),
+            });
+        }
+
+        if old_snapshot.topology.peer_count != new_snapshot.topology.peer_count
+            || old_snapshot.topology.role != new_snapshot.topology.role
+        {
+            let _ = self.events_tx.send(MeshEvent::TopologyChanged {
+                new_role: new_snapshot.topology.role.clone(),
+                peer_count: new_snapshot.topology.peer_count,
+            });
+        }
+
+        for (collection, docs) in &new_snapshot.documents {
+            let old_ids: HashSet<String> = old_snapshot
+                .documents
+                .get(collection)
+                .into_iter()
+                .flatten()
+                .filter_map(document_id)
+                .collect();
+            let new_ids: HashSet<String> = docs.iter().filter_map(document_id).collect();
+
+            for doc_id in new_ids.difference(&old_ids) {
+                let _ = self.events_tx.send(MeshEvent::SyncEvent {
+                    collection: collection.clone(),
+                    doc_id: doc_id.clone(),
+                    action: "upsert".into(),
+                });
+            }
+        }
+    }
+
+    fn record_error(&self, error: String) {
+        let mut snapshot = self.snapshot.write().unwrap_or_else(|e| e.into_inner());
+        snapshot.last_error = Some(error.clone());
+        snapshot.readiness = ReadinessResponse {
+            ready: false,
+            node_id: snapshot.node_info.node_id.clone(),
+            checks: vec![ReadinessCheck {
+                name: "remote-broker".into(),
+                ready: false,
+                message: Some(error),
+            }],
+        };
+    }
+}
+
+#[async_trait::async_trait]
+impl MeshBrokerState for RemoteBrokerState {
+    fn node_info(&self) -> MeshNodeInfo {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .node_info
+            .clone()
+    }
+
+    async fn list_peers(&self) -> Vec<PeerSummary> {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .peers
+            .clone()
+    }
+
+    async fn get_peer(&self, id: &str) -> Option<PeerSummary> {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .peers
+            .iter()
+            .find(|peer| peer.id == id)
+            .cloned()
+    }
+
+    fn topology(&self) -> TopologySummary {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .topology
+            .clone()
+    }
+
+    fn subscribe_events(&self) -> broadcast::Receiver<MeshEvent> {
+        self.events_tx.subscribe()
+    }
+
+    fn readiness(&self) -> ReadinessResponse {
+        let snapshot = self.snapshot.read().unwrap_or_else(|e| e.into_inner());
+        let mut readiness = snapshot.readiness.clone();
+        if let Some(last_sync_ms) = snapshot.last_sync_ms {
+            readiness.checks.push(ReadinessCheck {
+                name: "last-sync-ms".into(),
+                ready: true,
+                message: Some(last_sync_ms.to_string()),
+            });
+        }
+        readiness
+    }
+
+    async fn list_documents(&self, collection: &str) -> Option<Vec<Value>> {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .documents
+            .get(collection)
+            .cloned()
+    }
+
+    async fn get_document(&self, collection: &str, id: &str) -> Option<Value> {
+        self.snapshot
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .documents
+            .get(collection)
+            .and_then(|docs| {
+                docs.iter()
+                    .find(|doc| document_id(doc).as_deref() == Some(id))
+                    .cloned()
+            })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct PeersEnvelope {
+    peers: Vec<PeerSummary>,
+}
+
+#[derive(Debug, Deserialize)]
+struct DocumentsEnvelope {
+    documents: Vec<Value>,
+}
+
+fn document_id(doc: &Value) -> Option<String> {
+    doc.get("_id")
+        .or_else(|| doc.get("uid"))
+        .and_then(|value| value.as_str())
+        .map(ToOwned::to_owned)
+}
+
+fn doc_content_hash(doc: &Value) -> u64 {
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    if let Ok(s) = serde_json::to_string(doc) {
+        s.hash(&mut hasher);
+    }
+    hasher.finish()
+}
+
+fn now_ms() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis() as u64
+}
diff --git a/tests/remote_broker_integration_tests.rs b/tests/remote_broker_integration_tests.rs
new file mode 100644
index 0000000..bbe936b
--- /dev/null
+++ b/tests/remote_broker_integration_tests.rs
@@ -0,0 +1,204 @@
+use std::net::SocketAddr;
+
+use axum::{extract::State, routing::get, Json, Router};
+use peat_gateway::api;
+use peat_gateway::api::formations::MeshStateRegistry;
+use peat_gateway::config::{CdcConfig, GatewayConfig, MeshBrokerMapping, StorageConfig};
+use peat_gateway::mesh_ingest::MeshIngestManager;
+use peat_gateway::tenant::models::EnrollmentPolicy;
+use peat_gateway::tenant::TenantManager;
+use reqwest::Client;
+use serde_json::{json, Value};
+
+#[derive(Clone)]
+struct FakeBrokerState {
+    node: Value,
+    topology: Value,
+    ready: Value,
+    peers: Value,
+    contacts: Value,
+    markers: Value,
+}
+
+async fn fake_node(State(state): State<FakeBrokerState>) -> Json<Value> {
+    Json(state.node)
+}
+
+async fn fake_topology(State(state): State<FakeBrokerState>) -> Json<Value> {
+    Json(state.topology)
+}
+
+async fn fake_ready(State(state): State<FakeBrokerState>) -> Json<Value> {
+    Json(state.ready)
+}
+
+async fn fake_peers(State(state): State<FakeBrokerState>) -> Json<Value> {
+    Json(state.peers)
+}
+
+async fn fake_documents(
+    State(state): State<FakeBrokerState>,
+    axum::extract::Path(collection): axum::extract::Path<String>,
+) -> Result<Json<Value>, axum::http::StatusCode> {
+    match collection.as_str() {
+        "contacts" => Ok(Json(state.contacts)),
+        "markers" => Ok(Json(state.markers)),
+        _ => Err(axum::http::StatusCode::NOT_FOUND),
+    }
+}
+
+async fn spawn_fake_broker() -> (String, tokio::task::JoinHandle<()>) {
+    let state = FakeBrokerState {
+        node: json!({
+            "node_id": "fake-broker-node",
+            "uptime_secs": 12,
+            "version": "0.5.2"
+        }),
+        topology: json!({
+            "peer_count": 1,
+            "role": "standalone",
+            "hierarchy_level": 0
+        }),
+        ready: json!({
+            "ready": true,
+            "node_id": "fake-broker-node",
+            "checks": [{"name": "remote-broker", "ready": true, "message": "ok"}]
+        }),
+        peers: json!({
+            "peers": [{
+                "id": "peer-ios-1",
+                "connected": true,
+                "state": "connected",
+                "rtt_ms": 7
+            }],
+            "count": 1
+        }),
+        contacts: json!({
+            "collection": "contacts",
+            "count": 1,
+            "documents": [{
+                "_id": "contact-1",
+                "uid": "contact-1",
+                "callsign": "Alpha",
+                "lat": 52.1,
+                "lon": 21.0
+            }]
+        }),
+        markers: json!({
+            "collection": "markers",
+            "count": 1,
+            "documents": [{
+                "_id": "marker-1",
+                "uid": "marker-1",
+                "type": "b-m-p-s-p-loc",
+                "lat": 52.2,
+                "lon": 21.1
+            }]
+        }),
+    };
+
+    let app = Router::new()
+        .route("/api/v1/node", get(fake_node))
+        .route("/api/v1/topology", get(fake_topology))
+        .route("/api/v1/ready", get(fake_ready))
+        .route("/api/v1/peers", get(fake_peers))
+        .route("/api/v1/documents/:collection", get(fake_documents))
+        .with_state(state);
+
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr: SocketAddr = listener.local_addr().unwrap();
+    let base = format!("http://{}", addr);
+    let handle = tokio::spawn(async move {
+        axum::serve(listener, app).await.unwrap();
+    });
+
+    (base, handle)
+}
+
+async fn spawn_gateway_with_remote_broker(
+    broker_url: &str,
+) -> (Client, String, tempfile::TempDir, TenantManager) {
+    let dir = tempfile::tempdir().unwrap();
+    let db_path = dir.path().join("test.redb");
+
+    let config = GatewayConfig {
+        bind_addr: "127.0.0.1:0".into(),
+        storage: StorageConfig::Redb {
+            path: db_path.to_str().unwrap().into(),
+        },
+        cdc: CdcConfig {
+            nats_url: None,
+            kafka_brokers: None,
+        },
+        ui_dir: None,
+        kek: None,
+        kms_key_arn: None,
+        admin_token: None,
+        vault_addr: None,
+        vault_token: None,
+        vault_transit_key: None,
+        mesh_brokers: vec![],
+        mesh_poll_interval_ms: 25,
+    };
+
+    let tenant_mgr = TenantManager::new(&config).await.unwrap();
+    let registry = MeshStateRegistry::new();
+    let manager = MeshIngestManager::new(registry.clone(), std::time::Duration::from_millis(25));
+    manager
+        .register_remote_broker(MeshBrokerMapping {
+            org_id: "acme".into(),
+            app_id: "myapp".into(),
+            base_url: broker_url.into(),
+            collections: vec!["contacts".into(), "markers".into()],
+        })
+        .await;
+
+    let app = api::app_authenticated_with_mesh(tenant_mgr.clone(), None, registry);
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr: SocketAddr = listener.local_addr().unwrap();
+    tokio::spawn(async move {
+        axum::serve(listener, app).await.unwrap();
+    });
+
+    let client = Client::new();
+    (client, format!("http://{}", addr), dir, tenant_mgr)
+}
+
+#[tokio::test]
+async fn gateway_surfaces_remote_broker_peers_and_documents() {
+    let (broker_url, _broker_task) = spawn_fake_broker().await;
+    let (client, base, _dir, mgr) = spawn_gateway_with_remote_broker(&broker_url).await;
+
+    mgr.create_org("acme".into(), "Acme".into()).await.unwrap();
+    mgr.create_formation("acme", "myapp".into(), EnrollmentPolicy::Open)
+        .await
+        .unwrap();
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let peers_resp = client
+        .get(format!("{base}/orgs/acme/formations/myapp/peers"))
+        .send()
+        .await
+        .unwrap();
+    assert!(peers_resp.status().is_success());
+    let peers: Vec<Value> = peers_resp.json().await.unwrap();
+    assert_eq!(peers.len(), 1);
+    assert_eq!(peers[0]["peer_id"], "peer-ios-1");
+    assert_eq!(peers[0]["status"], "Connected");
+
+    let docs_resp = client
+        .get(format!("{base}/orgs/acme/formations/myapp/documents"))
+        .send()
+        .await
+        .unwrap();
+    assert!(docs_resp.status().is_success());
+    let docs: Vec<Value> = docs_resp.json().await.unwrap();
+    assert_eq!(docs.len(), 2);
+    let ids: Vec<String> = docs
+        .iter()
+        .filter_map(|doc| doc["doc_id"].as_str().map(ToOwned::to_owned))
+        .collect();
+    assert!(ids.contains(&"contact-1".to_string()));
+    assert!(ids.contains(&"marker-1".to_string()));
+}