Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ metrics = "0.24"
metrics-exporter-prometheus = "0.16"

[features]
default = ["nats", "webhook", "oidc"]
default = ["nats", "webhook", "oidc", "mesh-broker-client"]
nats = ["dep:async-nats"]
kafka = ["dep:rdkafka"]
webhook = ["dep:reqwest"]
oidc = ["dep:reqwest"]
postgres = ["dep:sqlx"]
aws-kms = ["dep:aws-sdk-kms", "dep:aws-config"]
vault = ["dep:reqwest", "dep:base64"]
mesh-broker-client = ["dep:reqwest"]
full = ["nats", "kafka", "webhook", "oidc", "postgres", "aws-kms", "vault"]
loadtest = ["dep:reqwest", "dep:tempfile"]

Expand Down
28 changes: 24 additions & 4 deletions src/api/formations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,37 @@ async fn list_documents(
None => return Ok(Json(Vec::new())),
};

// Query the broker's document store using the app_id as the collection name
let docs = match broker.list_documents(&app_id).await {
Some(d) => d,
None => return Ok(Json(Vec::new())),
// First prefer the historical gateway convention of using the app_id as
// the collection name. If the broker doesn't expose that collection,
// aggregate the default PEAT/tropiOS collections so the endpoint still
// surfaces live mesh documents.
let docs = if let Some(d) = broker.list_documents(&app_id).await {
d
} else {
let mut aggregated = Vec::new();
for collection in ["cot-broadcast", "contacts", "markers", "missions"] {
if let Some(mut docs) = broker.list_documents(collection).await {
for doc in &mut docs {
if let Some(obj) = doc.as_object_mut() {
obj.entry("_collection".to_string())
.or_insert_with(|| serde_json::Value::String(collection.to_string()));
}
}
aggregated.extend(docs);
}
}
if aggregated.is_empty() {
return Ok(Json(Vec::new()));
}
aggregated
};

let summaries: Vec<DocumentSummary> = docs
.into_iter()
.map(|val| {
let doc_id = val
.get("_id")
.or_else(|| val.get("uid"))
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
Expand Down
50 changes: 46 additions & 4 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tower_http::services::{ServeDir, ServeFile};

use crate::cdc::CdcEngine;
use crate::tenant::TenantManager;
use formations::MeshStateRegistry;

/// Install the global Prometheus metrics recorder and return a handle for
/// rendering the /metrics endpoint. Safe to call multiple times — only the
Expand All @@ -38,7 +39,23 @@ pub fn router(
ui_dir: Option<&str>,
admin_token: Option<String>,
) -> Router {
let r = app_authenticated(tenant_mgr.clone(), admin_token);
router_with_mesh(
tenant_mgr,
cdc_engine,
ui_dir,
admin_token,
MeshStateRegistry::new(),
)
}

pub fn router_with_mesh(
tenant_mgr: TenantManager,
cdc_engine: CdcEngine,
ui_dir: Option<&str>,
admin_token: Option<String>,
mesh: MeshStateRegistry,
) -> Router {
let r = app_authenticated_with_mesh(tenant_mgr.clone(), admin_token, mesh);

#[cfg(feature = "loadtest")]
let r = r.nest("/orgs", cdc_test::router(tenant_mgr, cdc_engine));
Expand All @@ -58,28 +75,53 @@ pub fn router(

/// Build the application router without admin auth (dev/test convenience).
pub fn app(tenant_mgr: TenantManager) -> Router {
app_authenticated(tenant_mgr, None)
app_authenticated_with_mesh(tenant_mgr, None, MeshStateRegistry::new())
}

/// Build the application router with optional admin token enforcement.
pub fn app_authenticated(tenant_mgr: TenantManager, admin_token: Option<String>) -> Router {
app_authenticated_with_mesh(tenant_mgr, admin_token, MeshStateRegistry::new())
}

pub fn app_authenticated_with_mesh(
tenant_mgr: TenantManager,
admin_token: Option<String>,
mesh: MeshStateRegistry,
) -> Router {
let prometheus_handle = install_prometheus_recorder();
app_with_metrics(tenant_mgr, prometheus_handle, admin_token)
app_with_metrics_and_mesh(tenant_mgr, prometheus_handle, admin_token, mesh)
}

/// Build the application router with an explicit PrometheusHandle.
pub fn app_with_metrics(
tenant_mgr: TenantManager,
prometheus_handle: PrometheusHandle,
admin_token: Option<String>,
) -> Router {
app_with_metrics_and_mesh(
tenant_mgr,
prometheus_handle,
admin_token,
MeshStateRegistry::new(),
)
}

pub fn app_with_metrics_and_mesh(
tenant_mgr: TenantManager,
prometheus_handle: PrometheusHandle,
admin_token: Option<String>,
mesh: MeshStateRegistry,
) -> Router {
// Admin routes — protected by bearer token when PEAT_ADMIN_TOKEN is set
let admin_routes = Router::new()
.nest("/orgs", orgs::router(tenant_mgr.clone()))
.nest("/orgs", tokens::router(tenant_mgr.clone()))
.nest("/orgs", sinks::router(tenant_mgr.clone()))
.nest("/orgs", identity::router(tenant_mgr.clone()))
.nest("/orgs", formations::router(tenant_mgr.clone()))
.nest(
"/orgs",
formations::router_with_mesh(tenant_mgr.clone(), mesh),
)
.layer(axum::middleware::from_fn_with_state(
admin_token,
auth::require_admin_token,
Expand Down
95 changes: 95 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ pub struct GatewayConfig {
pub vault_token: Option<String>,
/// Vault Transit secret engine key name.
pub vault_transit_key: Option<String>,
/// External peat-mesh broker mappings that should be surfaced as live
/// formation state inside the gateway.
pub mesh_brokers: Vec<MeshBrokerMapping>,
/// Poll interval for remote broker refreshes.
pub mesh_poll_interval_ms: u64,
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -42,6 +47,14 @@ pub struct CdcConfig {
pub kafka_brokers: Option<String>,
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct MeshBrokerMapping {
pub org_id: String,
pub app_id: String,
pub base_url: String,
pub collections: Vec<String>,
}

impl GatewayConfig {
pub fn from_env() -> Result<Self> {
let storage = match env::var("PEAT_STORAGE_BACKEND")
Expand Down Expand Up @@ -75,6 +88,88 @@ impl GatewayConfig {
vault_addr: env::var("PEAT_VAULT_ADDR").ok(),
vault_token: env::var("PEAT_VAULT_TOKEN").ok(),
vault_transit_key: env::var("PEAT_VAULT_TRANSIT_KEY").ok(),
mesh_brokers: parse_mesh_broker_mappings(env::var("PEAT_MESH_BROKERS").ok())?,
mesh_poll_interval_ms: env::var("PEAT_MESH_POLL_INTERVAL_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5_000),
})
}
}

fn parse_mesh_broker_mappings(raw: Option<String>) -> Result<Vec<MeshBrokerMapping>> {
let Some(raw) = raw else {
return Ok(vec![]);
};

let mut mappings = Vec::new();
for entry in raw.split(';').map(str::trim).filter(|s| !s.is_empty()) {
let mut parts = entry.split('|').map(str::trim);
let org_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
anyhow::anyhow!("missing org_id in PEAT_MESH_BROKERS entry '{entry}'")
})?;
let app_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
anyhow::anyhow!("missing app_id in PEAT_MESH_BROKERS entry '{entry}'")
})?;
let base_url = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
anyhow::anyhow!("missing base_url in PEAT_MESH_BROKERS entry '{entry}'")
})?;
let collections = parts
.next()
.map(|segment| {
segment
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
})
.filter(|v| !v.is_empty())
.unwrap_or_else(default_mesh_collections);

mappings.push(MeshBrokerMapping {
org_id: org_id.to_string(),
app_id: app_id.to_string(),
base_url: base_url.trim_end_matches('/').to_string(),
collections,
});
}

Ok(mappings)
}

fn default_mesh_collections() -> Vec<String> {
vec![
"cot-broadcast".into(),
"contacts".into(),
"markers".into(),
"missions".into(),
]
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parses_mesh_broker_mappings() {
let mappings = parse_mesh_broker_mappings(Some(
"acme|mesh-a|http://127.0.0.1:9001|cot-broadcast,markers;bravo|mesh-b|http://mesh:8081|"
.into(),
))
.unwrap();

assert_eq!(mappings.len(), 2);
assert_eq!(mappings[0].org_id, "acme");
assert_eq!(mappings[0].app_id, "mesh-a");
assert_eq!(mappings[0].base_url, "http://127.0.0.1:9001");
assert_eq!(mappings[0].collections, vec!["cot-broadcast", "markers"]);
assert_eq!(mappings[1].collections, default_mesh_collections());
}

#[test]
fn rejects_invalid_mesh_broker_entry() {
let err = parse_mesh_broker_mappings(Some("acme|missing".into())).unwrap_err();
assert!(err.to_string().contains("base_url"));
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub mod cdc;
pub mod cli;
pub mod config;
pub mod crypto;
#[cfg(feature = "mesh-broker-client")]
pub mod mesh_ingest;
pub mod storage;
pub mod tenant;

Expand Down
29 changes: 29 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use tracing::info;
use tracing_subscriber::EnvFilter;

use peat_gateway::{api, cdc, cli, config, tenant};
#[cfg(feature = "mesh-broker-client")]
use peat_gateway::{api::formations::MeshStateRegistry, mesh_ingest::MeshIngestManager};

#[derive(Parser)]
#[command(
Expand Down Expand Up @@ -83,6 +85,33 @@ async fn serve(config: &config::GatewayConfig) -> Result<()> {

let tenant_mgr = tenant::TenantManager::new(config).await?;
let cdc_engine = cdc::CdcEngine::new(config, tenant_mgr.clone()).await?;

#[cfg(feature = "mesh-broker-client")]
let mesh_registry = {
let registry = MeshStateRegistry::new();
if !config.mesh_brokers.is_empty() {
let manager = MeshIngestManager::new(
registry.clone(),
std::time::Duration::from_millis(config.mesh_poll_interval_ms),
)
.with_cdc(cdc_engine.clone());
for mapping in config.mesh_brokers.clone() {
manager.register_remote_broker(mapping).await;
}
}
registry
};

#[cfg(feature = "mesh-broker-client")]
let app = api::router_with_mesh(
tenant_mgr,
cdc_engine,
config.ui_dir.as_deref(),
config.admin_token.clone(),
mesh_registry,
);

#[cfg(not(feature = "mesh-broker-client"))]
let app = api::router(
tenant_mgr,
cdc_engine,
Expand Down
Loading