diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 9276b4790..0d0bfb731 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -43,6 +43,7 @@ rara-sessions.workspace = true rara-skills.workspace = true rara-soul.workspace = true rara-symphony.workspace = true +rara-vault = { workspace = true } reqwest = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true diff --git a/crates/app/src/config_sync.rs b/crates/app/src/config_sync.rs index 93f90bc68..6a946f12e 100644 --- a/crates/app/src/config_sync.rs +++ b/crates/app/src/config_sync.rs @@ -28,11 +28,12 @@ use std::{ Arc, atomic::{AtomicU64, Ordering}, }, - time::Duration, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use rara_domain_shared::settings::SettingsProvider; +use rara_vault::VaultClient; use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -43,10 +44,12 @@ const DEBOUNCE_MS: u64 = 1500; /// Bidirectional sync between config.yaml and the settings KV store. pub struct ConfigFileSync { - settings: Arc, - app_config: Arc>, - config_path: PathBuf, - last_written_hash: Arc, + settings: Arc, + app_config: Arc>, + config_path: PathBuf, + last_written_hash: Arc, + vault_client: Option>, + last_vault_push_ms: Arc, } fn content_hash(content: &str) -> u64 { @@ -61,14 +64,19 @@ impl ConfigFileSync { settings: Arc, app_config: AppConfig, config_path: PathBuf, + vault_client: Option>, + last_vault_push_ms: Arc, ) -> anyhow::Result { let sync = Self { settings, app_config: Arc::new(RwLock::new(app_config)), config_path, last_written_hash: Arc::new(AtomicU64::new(0)), + vault_client, + last_vault_push_ms, }; - sync.sync_from_file().await?; + let initial_config = sync.app_config.read().await.clone(); + sync.sync_from_app_config(&initial_config).await?; Ok(sync) } @@ -77,6 +85,10 @@ impl ConfigFileSync { async fn sync_from_file(&self) -> anyhow::Result<()> { let content = tokio::fs::read_to_string(&self.config_path).await?; let new_config: AppConfig = serde_yaml::from_str(&content)?; + self.sync_from_app_config(&new_config).await + } + + async fn sync_from_app_config(&self, new_config: &AppConfig) -> anyhow::Result<()> { let pairs = flatten::flatten_config_sections(&new_config); if !pairs.is_empty() { let patches = pairs.into_iter().map(|(k, v)| (k, Some(v))).collect(); @@ -84,10 +96,10 @@ impl ConfigFileSync { } { let mut cfg = self.app_config.write().await; - cfg.llm = new_config.llm; - cfg.telegram = new_config.telegram; - cfg.composio = new_config.composio; - cfg.knowledge = new_config.knowledge; + cfg.llm = new_config.llm.clone(); + cfg.telegram = new_config.telegram.clone(); + cfg.composio = new_config.composio.clone(); + cfg.knowledge = new_config.knowledge.clone(); } info!("config.yaml synced to settings store"); Ok(()) @@ -106,10 +118,27 @@ impl ConfigFileSync { cfg.knowledge = knowledge; serde_yaml::to_string(&*cfg)? }; + let vault_pairs = { + let cfg = self.app_config.read().await; + flatten::flatten_config_sections(&cfg) + }; let hash = content_hash(&yaml); tokio::fs::write(&self.config_path, &yaml).await?; self.last_written_hash.store(hash, Ordering::Relaxed); + + if let Some(ref client) = self.vault_client { + if let Err(e) = client.push_changes(vault_pairs).await { + warn!(error = %e, "failed to push settings to vault"); + } else { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.last_vault_push_ms.store(now, Ordering::Relaxed); + debug!("settings pushed to vault"); + } + } debug!("settings written back to config.yaml"); Ok(()) } @@ -247,7 +276,10 @@ impl ConfigFileSync { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::sync::{ + Arc, + atomic::AtomicU64, + }; use super::ConfigFileSync; use crate::AppConfig; @@ -299,6 +331,8 @@ knowledge: gateway: repo_url: "https://github.com/example/repo" + bot_token: "telegram-gateway-token" + chat_id: 123456 "#; #[tokio::test] @@ -357,9 +391,15 @@ composio: tokio::fs::write(&config_path, yaml).await.unwrap(); let config: AppConfig = serde_yaml::from_str(yaml).unwrap(); - let _sync = ConfigFileSync::new(settings_provider.clone(), config, config_path) - .await - .unwrap(); + let _sync = ConfigFileSync::new( + settings_provider.clone(), + config, + config_path, + None, + Arc::new(AtomicU64::new(0)), + ) + .await + .unwrap(); // Verify KV store was populated assert_eq!( @@ -394,6 +434,157 @@ composio: ); } + #[tokio::test] + async fn initial_sync_prefers_bootstrapped_config() { + use rara_domain_shared::settings::SettingsProvider; + + let tmp_dir = tempfile::tempdir().unwrap(); + let db_path = tmp_dir.path().join("test.db"); + let db_url = format!("sqlite:{}?mode=rwc", db_path.display()); + + let pool = sqlx::sqlite::SqlitePoolOptions::new() + .max_connections(1) + .connect(&db_url) + .await + .unwrap(); + sqlx::migrate!("../rara-model/migrations") + .run(&pool) + .await + .unwrap(); + + let db_store = yunara_store::db::DBStore::new(pool.clone()); + let kv = db_store.kv_store(); + let settings_svc = rara_backend_admin::settings::SettingsSvc::load(kv, pool) + .await + .unwrap(); + let settings_provider: Arc = Arc::new(settings_svc); + + let config_path = tmp_dir.path().join("config.yaml"); + let yaml = r#" +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +users: + - name: test + role: root + platforms: [] +mita: + heartbeat_interval: "30m" +llm: + default_provider: "local-provider" + providers: + local-provider: + base_url: "http://localhost:1234" + api_key: "local-key" + default_model: "local-model" +"#; + tokio::fs::write(&config_path, yaml).await.unwrap(); + + let mut config: AppConfig = serde_yaml::from_str(yaml).unwrap(); + config.llm.as_mut().unwrap().default_provider = Some("vault-provider".into()); + config.llm.as_mut().unwrap().providers.insert( + "vault-provider".into(), + crate::flatten::ProviderConfig { + base_url: Some("http://vault:1234".into()), + api_key: Some("vault-key".into()), + default_model: Some("vault-model".into()), + fallback_models: None, + }, + ); + + let _sync = ConfigFileSync::new( + settings_provider.clone(), + config, + config_path, + None, + Arc::new(AtomicU64::new(0)), + ) + .await + .unwrap(); + + assert_eq!( + settings_provider + .get("llm.default_provider") + .await + .as_deref(), + Some("vault-provider"), + ); + assert_eq!( + settings_provider + .get("llm.providers.vault-provider.base_url") + .await + .as_deref(), + Some("http://vault:1234"), + ); + } + + #[tokio::test] + async fn writeback_without_vault_still_works() { + use rara_domain_shared::settings::SettingsProvider; + + let tmp_dir = tempfile::tempdir().unwrap(); + let db_path = tmp_dir.path().join("test.db"); + let db_url = format!("sqlite:{}?mode=rwc", db_path.display()); + + let pool = sqlx::sqlite::SqlitePoolOptions::new() + .max_connections(1) + .connect(&db_url) + .await + .unwrap(); + sqlx::migrate!("../rara-model/migrations") + .run(&pool) + .await + .unwrap(); + + let db_store = yunara_store::db::DBStore::new(pool.clone()); + let kv = db_store.kv_store(); + let settings_svc = rara_backend_admin::settings::SettingsSvc::load(kv, pool) + .await + .unwrap(); + let settings_provider: Arc = Arc::new(settings_svc); + + let config_path = tmp_dir.path().join("config.yaml"); + let yaml = r#" +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +users: + - name: test + role: root + platforms: [] +mita: + heartbeat_interval: "30m" +llm: + default_provider: "test-provider" + providers: + test-provider: + base_url: "http://localhost:1234" + api_key: "test-key" + default_model: "test-model" +"#; + tokio::fs::write(&config_path, yaml).await.unwrap(); + + let config: AppConfig = serde_yaml::from_str(yaml).unwrap(); + let sync = ConfigFileSync::new( + settings_provider.clone(), + config, + config_path.clone(), + None, + Arc::new(AtomicU64::new(0)), + ) + .await + .unwrap(); + + sync.writeback_to_file().await.unwrap(); + + let content = tokio::fs::read_to_string(&config_path).await.unwrap(); + assert!(content.contains("test-provider")); + } + #[test] fn appconfig_yaml_roundtrip() { let config: AppConfig = serde_yaml::from_str(TEST_YAML).expect("TEST_YAML should parse"); @@ -477,4 +668,75 @@ composio: .and_then(|t| t.allowed_group_chat_id.as_deref()), ); } + + #[test] + fn appconfig_with_vault_yaml_roundtrip() { + let yaml = r#" +users: + - name: "testuser" + role: root + platforms: + - type: telegram + user_id: "12345" +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +mita: + heartbeat_interval: "30m" +vault: + address: "http://10.0.0.1:30820" + mount_path: "secret/rara" + auth: + method: approle + role_id_file: "/etc/rara/vault-role-id" + secret_id_file: "/etc/rara/vault-secret-id" + watch_interval: "30s" + timeout: "5s" + fallback_to_local: true +"#; + let config: AppConfig = serde_yaml::from_str(yaml).expect("should parse with vault"); + assert_eq!( + config.vault.as_ref().map(|vault| vault.address.as_str()), + Some("http://10.0.0.1:30820"), + ); + assert_eq!( + config.vault.as_ref().map(|vault| vault.fallback_to_local), + Some(true), + ); + let serialized = serde_yaml::to_string(&config).unwrap(); + let reparsed: serde_yaml::Value = serde_yaml::from_str(&serialized).unwrap(); + + assert_eq!( + reparsed["vault"]["address"].as_str(), + Some("http://10.0.0.1:30820"), + ); + assert_eq!(reparsed["vault"]["fallback_to_local"].as_bool(), Some(true)); + } + + #[test] + fn appconfig_without_vault_parses() { + let yaml = r#" +users: + - name: "testuser" + role: root + platforms: + - type: telegram + user_id: "12345" +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +mita: + heartbeat_interval: "30m" +"#; + let config: AppConfig = serde_yaml::from_str(yaml).expect("should parse without vault"); + assert!(config.vault.is_none()); + let serialized = serde_yaml::to_string(&config).unwrap(); + let reparsed: serde_yaml::Value = serde_yaml::from_str(&serialized).unwrap(); + + assert!(reparsed.get("vault").is_none()); + } } diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index dc91e2370..217c1025f 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -18,6 +18,8 @@ mod context_mode; pub mod flatten; pub mod gateway; mod tools; +mod vault_bootstrap; +mod vault_watcher; use std::{ path::{Path, PathBuf}, @@ -37,7 +39,7 @@ use rara_server::{ http::{RestServerConfig, health_routes, start_rest_server}, }; use serde::{Deserialize, Serialize}; -use snafu::{ResultExt, Whatever}; +use snafu::{ResultExt, Whatever, whatever}; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -96,6 +98,9 @@ pub struct AppConfig { /// Symphony autonomous coding agent orchestrator (optional). #[serde(default, skip_serializing_if = "Option::is_none")] pub symphony: Option, + /// Vault configuration center (optional). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub vault: Option, } /// Configuration for the Mita background proactive agent. @@ -253,6 +258,27 @@ pub async fn start_with_options( options: StartOptions, ) -> Result { info!("Initializing job application"); + let mut config = config; + + // Build a single VaultClient for the entire app lifetime (if configured). + let vault_client: Option> = + build_vault_client(&config).await; + + // Bootstrap: pull dynamic config from Vault and merge into AppConfig. + if let Some(ref client) = vault_client { + match vault_bootstrap::pull_and_merge(&mut config, client).await { + Ok(true) => info!("Vault config merged into AppConfig"), + Ok(false) => {} + Err(e) => { + let fallback = config.vault.as_ref().is_some_and(|v| v.fallback_to_local); + if fallback { + warn!(error = %e, "Vault pull failed, falling back to local config"); + } else { + whatever!("Vault bootstrap failed: {e}"); + } + } + } + } let db_store = init_infra(&config) .await @@ -268,16 +294,25 @@ pub async fn start_with_options( Arc::new(settings_svc.clone()); info!("Runtime settings service loaded"); - // Resolve config file path (same logic as AppConfig::new) + // Resolve config file path: prefer local ./config.yaml if it exists, + // otherwise fall back to the global config path. This mirrors the + // load priority in AppConfig::new(). let config_path = { - let mut path = std::env::current_dir().unwrap_or_default(); - path.push("config.yaml"); - path + let local = std::env::current_dir() + .unwrap_or_default() + .join("config.yaml"); + if local.is_file() { local } else { rara_paths::config_file().clone() } }; - let config_file_sync = - config_sync::ConfigFileSync::new(settings_provider.clone(), config.clone(), config_path) - .await - .whatever_context("Failed to initialize config file sync")?; + let last_vault_push_ms = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let config_file_sync = config_sync::ConfigFileSync::new( + settings_provider.clone(), + config.clone(), + config_path, + vault_client.clone(), + last_vault_push_ms.clone(), + ) + .await + .whatever_context("Failed to initialize config file sync")?; let rara = crate::boot::boot(pool.clone(), settings_provider.clone(), &config.users) .await @@ -372,6 +407,18 @@ pub async fn start_with_options( config_file_sync.start(cancel).await; }); } + if let Some(ref client) = vault_client { + if let Some(ref vault_config) = config.vault { + vault_watcher::spawn_vault_watcher( + client.clone(), + vault_config.watch_interval, + settings_provider.clone(), + last_vault_push_ms.clone(), + cancellation_token.clone(), + ); + info!("Vault config watcher started"); + } + } let (_kernel_arc, kernel_handle) = kernel.start(cancellation_token.clone()); @@ -678,6 +725,36 @@ async fn init_infra(config: &AppConfig) -> Result { Ok(db_store) } +async fn build_vault_client(config: &AppConfig) -> Option> { + let vault_config = config.vault.as_ref()?; + let client = match rara_vault::VaultClient::new(vault_config.clone()) { + Ok(client) => Arc::new(client), + Err(e) => { + if vault_config.fallback_to_local { + warn!(error = %e, "failed to build vault client, falling back to local config"); + return None; + } + error!(error = %e, "failed to build vault client"); + return None; + } + }; + + match client.login().await { + Ok(()) => { + info!("Vault client authenticated"); + Some(client) + } + Err(e) => { + if vault_config.fallback_to_local { + warn!(error = %e, "vault login failed, falling back to local config"); + } else { + error!(error = %e, "vault login failed"); + } + None + } + } +} + // --------------------------------------------------------------------------- // AppHandle // --------------------------------------------------------------------------- diff --git a/crates/app/src/vault_bootstrap.rs b/crates/app/src/vault_bootstrap.rs new file mode 100644 index 000000000..753d6b94d --- /dev/null +++ b/crates/app/src/vault_bootstrap.rs @@ -0,0 +1,147 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Vault bootstrap: pull config from Vault at startup and merge into AppConfig. + +use std::collections::HashMap; + +use rara_vault::VaultClient; +use tracing::info; + +use crate::AppConfig; + +/// Pull config from Vault via the already-authenticated client and merge +/// dynamic sections into AppConfig. +/// +/// Returns `Ok(true)` if values were merged, `Ok(false)` if Vault returned +/// no data. +pub async fn pull_and_merge( + config: &mut AppConfig, + client: &VaultClient, +) -> Result { + let pairs = client.pull_all().await?; + if pairs.is_empty() { + return Ok(false); + } + merge_vault_pairs_into_config(config, &pairs); + info!(count = pairs.len(), "vault config pulled and merged"); + Ok(true) +} + +fn merge_vault_pairs_into_config(config: &mut AppConfig, pairs: &[(String, String)]) { + let settings_map: HashMap = pairs.iter().cloned().collect(); + let (llm, telegram, composio, knowledge) = + crate::flatten::unflatten_from_settings(&settings_map); + + if llm.is_some() { + config.llm = llm; + } + if telegram.is_some() { + config.telegram = telegram; + } + if composio.is_some() { + config.composio = composio; + } + if knowledge.is_some() { + config.knowledge = knowledge; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn merge_vault_pairs_overrides_llm() { + let yaml = r#" +users: + - name: test + role: root + platforms: [] +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +mita: + heartbeat_interval: "30m" +llm: + default_provider: "local-ollama" +"#; + let mut config: AppConfig = serde_yaml::from_str(yaml).unwrap(); + assert_eq!( + config.llm.as_ref().unwrap().default_provider.as_deref(), + Some("local-ollama") + ); + + let pairs = vec![ + ( + "llm.default_provider".to_string(), + "vault-provider".to_string(), + ), + ( + "llm.providers.vault-provider.base_url".to_string(), + "http://vault:1234".to_string(), + ), + ( + "llm.providers.vault-provider.api_key".to_string(), + "sk-vault".to_string(), + ), + ( + "llm.providers.vault-provider.default_model".to_string(), + "gpt-4".to_string(), + ), + ]; + merge_vault_pairs_into_config(&mut config, &pairs); + + assert_eq!( + config.llm.as_ref().unwrap().default_provider.as_deref(), + Some("vault-provider") + ); + assert!( + config + .llm + .as_ref() + .unwrap() + .providers + .contains_key("vault-provider") + ); + } + + #[test] + fn merge_vault_pairs_does_not_override_when_empty() { + let yaml = r#" +users: + - name: test + role: root + platforms: [] +http: + bind_address: "127.0.0.1:25555" +grpc: + bind_address: "127.0.0.1:50051" + server_address: "127.0.0.1:50051" +mita: + heartbeat_interval: "30m" +llm: + default_provider: "local-ollama" +"#; + let mut config: AppConfig = serde_yaml::from_str(yaml).unwrap(); + merge_vault_pairs_into_config(&mut config, &[]); + assert_eq!( + config.llm.as_ref().unwrap().default_provider.as_deref(), + Some("local-ollama") + ); + } + +} diff --git a/crates/app/src/vault_watcher.rs b/crates/app/src/vault_watcher.rs new file mode 100644 index 000000000..ed9bd5e1c --- /dev/null +++ b/crates/app/src/vault_watcher.rs @@ -0,0 +1,226 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Background task that polls Vault for config changes and syncs to settings +//! KV. + +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; + +use anyhow::Context; +use rara_domain_shared::settings::SettingsProvider; +use rara_vault::VaultClient; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; + +/// Grace period after a config-sync push during which the watcher will +/// update its version cache but skip applying changes to the settings +/// store, preventing push → pull → push echo loops. +const ECHO_GRACE_MS: u64 = 5_000; + +pub fn spawn_vault_watcher( + client: Arc, + interval: Duration, + settings: Arc, + last_vault_push_ms: Arc, + cancel: CancellationToken, +) { + tokio::spawn(async move { + let mut last_versions = HashMap::new(); + match fetch_versions(&client).await { + Ok(versions) => { + last_versions = versions; + } + Err(error) => { + warn!(error = %error, "vault watcher: failed to read initial metadata"); + } + } + + let mut ticker = tokio::time::interval(interval); + ticker.tick().await; + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!("vault watcher stopped"); + break; + } + _ = ticker.tick() => { + if let Err(error) = poll_and_sync( + &client, + &settings, + &mut last_versions, + &last_vault_push_ms, + ).await { + warn!(error = %error, "vault watcher: poll failed, will retry next interval"); + } + } + } + } + }); +} + +fn build_settings_patches( + current: &HashMap, + vault_pairs: &[(String, String)], +) -> HashMap> { + let mut patches = HashMap::new(); + for (key, value) in vault_pairs { + match current.get(key) { + Some(existing) if existing == value => {} + _ => { + patches.insert(key.clone(), Some(value.clone())); + } + } + } + patches +} + +fn versions_changed(previous: &HashMap, current: &HashMap) -> bool { + previous != current +} + +async fn poll_and_sync( + client: &VaultClient, + settings: &Arc, + last_versions: &mut HashMap, + last_vault_push_ms: &AtomicU64, +) -> anyhow::Result<()> { + ensure_authenticated(client).await?; + + let current_versions = fetch_versions(client).await?; + if !versions_changed(last_versions, ¤t_versions) { + debug!("vault watcher: no metadata changes detected"); + return Ok(()); + } + + // If config_sync recently pushed to Vault, the version bump is our + // own echo. Update the cache but skip applying to settings. + let push_ago = epoch_ms().saturating_sub(last_vault_push_ms.load(Ordering::Relaxed)); + if push_ago < ECHO_GRACE_MS { + debug!("vault watcher: version change within echo grace window, skipping apply"); + *last_versions = current_versions; + return Ok(()); + } + + let vault_pairs = client.pull_all().await?; + let current_settings = settings.list().await; + let patches = build_settings_patches(¤t_settings, &vault_pairs); + if patches.is_empty() { + debug!("vault watcher: metadata changed but settings payload is unchanged"); + } else { + info!( + count = patches.len(), + "vault watcher: applying config changes from vault" + ); + settings + .batch_update(patches) + .await + .context("failed to apply vault settings patches")?; + } + + *last_versions = current_versions; + Ok(()) +} + +fn epoch_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +async fn ensure_authenticated(client: &VaultClient) -> Result<(), rara_vault::VaultError> { + if client.token_needs_renewal().await { + match client.renew_token().await { + Ok(()) => {} + Err(error) => { + warn!(error = %error, "vault watcher: token renewal failed, re-logging in"); + client.login().await?; + } + } + } + Ok(()) +} + +async fn fetch_versions( + client: &VaultClient, +) -> Result, rara_vault::VaultError> { + let mut versions = HashMap::new(); + for prefix in ["config", "secrets"] { + for key in client.list_secrets(prefix).await? { + if key.ends_with('/') { + continue; + } + let path = format!("{prefix}/{key}"); + let metadata = client.get_metadata(&path).await?; + versions.insert(path, metadata.version); + } + } + Ok(versions) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn build_settings_patches_updates_changed_values() { + let current = HashMap::from([ + ("llm.default_provider".to_string(), "local".to_string()), + ( + "llm.providers.local.base_url".to_string(), + "http://localhost:11434".to_string(), + ), + ]); + let vault_pairs = vec![ + ("llm.default_provider".to_string(), "vault".to_string()), + ( + "llm.providers.vault.base_url".to_string(), + "http://vault:1234".to_string(), + ), + ]; + + let patches = build_settings_patches(¤t, &vault_pairs); + + assert_eq!( + patches.get("llm.default_provider"), + Some(&Some("vault".to_string())), + ); + assert_eq!( + patches.get("llm.providers.vault.base_url"), + Some(&Some("http://vault:1234".to_string())), + ); + } + + #[test] + fn versions_changed_detects_metadata_updates() { + let previous = HashMap::from([ + ("config/llm".to_string(), 1_u64), + ("secrets/telegram".to_string(), 1_u64), + ]); + let current = HashMap::from([ + ("config/llm".to_string(), 2_u64), + ("secrets/telegram".to_string(), 1_u64), + ]); + + assert!(versions_changed(&previous, ¤t)); + } +} diff --git a/crates/rara-vault/Cargo.toml b/crates/rara-vault/Cargo.toml index 5cd9d2589..c88d39753 100644 --- a/crates/rara-vault/Cargo.toml +++ b/crates/rara-vault/Cargo.toml @@ -8,17 +8,17 @@ repository.workspace = true publish = false [dependencies] +humantime-serde = "1" reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -humantime-serde = "1" [dev-dependencies] -tokio = { workspace = true, features = ["full", "macros"] } serde_yaml = { workspace = true } +tokio = { workspace = true, features = ["full", "macros"] } [lints] workspace = true diff --git a/crates/rara-vault/src/client.rs b/crates/rara-vault/src/client.rs index 7b84afbed..bc00772e0 100644 --- a/crates/rara-vault/src/client.rs +++ b/crates/rara-vault/src/client.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use reqwest::Client; use serde::{Deserialize, Serialize}; @@ -21,8 +20,10 @@ use snafu::ResultExt; use tokio::sync::RwLock; use tracing::{debug, warn}; -use crate::config::VaultConfig; -use crate::error::{self, VaultError}; +use crate::{ + config::VaultConfig, + error::{self, VaultError}, +}; // --------------------------------------------------------------------------- // Vault API response types @@ -37,17 +38,17 @@ pub struct KvV2ReadResponse { /// Inner data envelope of a KV v2 read response. #[derive(Debug, Deserialize)] pub struct KvV2Data { - pub data: HashMap, + pub data: HashMap, pub metadata: KvV2Metadata, } /// Metadata attached to a KV v2 secret version. #[derive(Debug, Deserialize)] pub struct KvV2Metadata { - pub version: u64, + pub version: u64, pub created_time: String, #[serde(default)] - pub destroyed: bool, + pub destroyed: bool, } /// Response from `LIST` on the metadata endpoint. @@ -70,7 +71,7 @@ struct AuthResponse { #[derive(Debug, Deserialize)] struct AuthData { - client_token: String, + client_token: String, lease_duration: u64, } @@ -93,17 +94,17 @@ struct KvV2WriteBody { /// Internal token state. struct TokenState { - token: String, + token: String, /// Seconds remaining when the token was acquired. lease_duration: u64, /// When the token was acquired (monotonic). - acquired_at: tokio::time::Instant, + acquired_at: tokio::time::Instant, } /// HTTP client for HashiCorp Vault KV v2 with AppRole authentication. pub struct VaultClient { - config: VaultConfig, - http: Client, + config: VaultConfig, + http: Client, token_state: Arc>>, } @@ -159,7 +160,7 @@ impl VaultClient { if !status.is_success() { let msg = extract_error_message(resp).await; return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -172,9 +173,9 @@ impl VaultClient { let mut state = self.token_state.write().await; *state = Some(TokenState { - token: auth_resp.auth.client_token, + token: auth_resp.auth.client_token, lease_duration: auth_resp.auth.lease_duration, - acquired_at: tokio::time::Instant::now(), + acquired_at: tokio::time::Instant::now(), }); Ok(()) } @@ -197,7 +198,7 @@ impl VaultClient { let msg = extract_error_message(resp).await; warn!(status = status.as_u16(), msg, "Token renewal failed"); return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -205,9 +206,9 @@ impl VaultClient { let auth_resp: AuthResponse = resp.json().await.context(error::ConnectionSnafu)?; let mut state = self.token_state.write().await; *state = Some(TokenState { - token: auth_resp.auth.client_token, + token: auth_resp.auth.client_token, lease_duration: auth_resp.auth.lease_duration, - acquired_at: tokio::time::Instant::now(), + acquired_at: tokio::time::Instant::now(), }); debug!("Vault token renewed"); Ok(()) @@ -254,7 +255,7 @@ impl VaultClient { if !status.is_success() { let msg = extract_error_message(resp).await; return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -273,7 +274,10 @@ impl VaultClient { let resp = self .http - .request(reqwest::Method::from_bytes(b"LIST").expect("valid method"), &url) + .request( + reqwest::Method::from_bytes(b"LIST").expect("valid method"), + &url, + ) .header("X-Vault-Token", &token) .send() .await @@ -286,7 +290,7 @@ impl VaultClient { if !status.is_success() { let msg = extract_error_message(resp).await; return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -319,12 +323,17 @@ impl VaultClient { match self.read_secret(&vault_path).await { Ok(resp) => { let settings_prefix = vault_key_to_settings_prefix(key); - flatten_value(&settings_prefix, &serde_json::Value::Object( - resp.data.data.into_iter().collect(), - ), &mut pairs); + flatten_value( + &settings_prefix, + &serde_json::Value::Object(resp.data.data.into_iter().collect()), + &mut pairs, + ); } Err(VaultError::NotFound { .. }) => { - debug!(path = vault_path, "secret not found during pull_all, skipping"); + debug!( + path = vault_path, + "secret not found during pull_all, skipping" + ); } Err(e) => return Err(e), } @@ -364,7 +373,7 @@ impl VaultClient { if !status.is_success() { let msg = extract_error_message(resp).await; return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -414,7 +423,7 @@ impl VaultClient { if !status.is_success() { let msg = extract_error_message(resp).await; return Err(VaultError::Api { - status: status.as_u16(), + status: status.as_u16(), message: msg, }); } @@ -426,16 +435,15 @@ impl VaultClient { #[derive(Deserialize)] struct MetadataInner { current_version: u64, - created_time: String, + created_time: String, } let body = resp.text().await.context(error::ConnectionSnafu)?; - let meta: MetadataResp = - serde_json::from_str(&body).context(error::DeserializeSnafu)?; + let meta: MetadataResp = serde_json::from_str(&body).context(error::DeserializeSnafu)?; Ok(KvV2Metadata { - version: meta.data.current_version, + version: meta.data.current_version, created_time: meta.data.created_time, - destroyed: false, + destroyed: false, }) } @@ -537,9 +545,9 @@ pub(crate) fn flatten_value( serde_json::Value::Array(arr) => { let joined: Vec = arr .iter() - .filter_map(|v| match v { - serde_json::Value::String(s) => Some(s.clone()), - other => Some(other.to_string()), + .map(|v| match v { + serde_json::Value::String(s) => s.clone(), + other => other.to_string(), }) .collect(); out.push((prefix.to_string(), joined.join(","))); @@ -601,7 +609,11 @@ pub(crate) fn unflatten_to_vault_paths( for (key, value) in pairs { if let Some(dot_pos) = key.find('.') { let vault_key = settings_prefix_to_vault_key(&key); - let vault_prefix = if is_secret_key(&key) { "secrets" } else { "config" }; + let vault_prefix = if is_secret_key(&key) { + "secrets" + } else { + "config" + }; let path = format!("{vault_prefix}/{vault_key}"); // Strip the settings prefix to get the field path within @@ -609,7 +621,9 @@ pub(crate) fn unflatten_to_vault_paths( // vault_key is "knowledge", so we need the part after // "memory.knowledge." → "embedding_model". let settings_prefix = vault_key_to_settings_prefix(vault_key); - let rest = key.strip_prefix(&format!("{settings_prefix}.")).unwrap_or(&key[dot_pos + 1..]); + let rest = key + .strip_prefix(&format!("{settings_prefix}.")) + .unwrap_or(&key[dot_pos + 1..]); let entry = grouped.entry(path).or_default(); set_nested_value(entry, rest, serde_json::Value::String(value)); @@ -636,10 +650,8 @@ fn set_nested_value( .entry(parts[0].to_string()) .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new())); if let serde_json::Value::Object(inner) = entry { - let mut inner_map: HashMap = inner - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); + let mut inner_map: HashMap = + inner.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); set_nested_value(&mut inner_map, parts[1], value); *entry = serde_json::Value::Object(inner_map.into_iter().collect()); } @@ -676,9 +688,7 @@ impl VaultClient { /// Build the URL for the AppRole login endpoint (exposed for unit tests). #[cfg(test)] - fn login_url(&self) -> String { - format!("{}/v1/auth/approle/login", self.config.address) - } + fn login_url(&self) -> String { format!("{}/v1/auth/approle/login", self.config.address) } } // --------------------------------------------------------------------------- @@ -687,21 +697,22 @@ impl VaultClient { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::config::{VaultAuthConfig, VaultConfig}; - use std::time::Duration; fn test_config() -> VaultConfig { VaultConfig { - address: "http://10.0.0.5:30820".into(), - mount_path: "secret/rara".into(), - auth: VaultAuthConfig { - method: "approle".into(), - role_id_file: "/etc/rara/vault-role-id".into(), + address: "http://10.0.0.5:30820".into(), + mount_path: "secret/rara".into(), + auth: VaultAuthConfig { + method: "approle".into(), + role_id_file: "/etc/rara/vault-role-id".into(), secret_id_file: "/etc/rara/vault-secret-id".into(), }, - watch_interval: Duration::from_secs(30), - timeout: Duration::from_secs(5), + watch_interval: Duration::from_secs(30), + timeout: Duration::from_secs(5), fallback_to_local: true, } } @@ -754,10 +765,7 @@ mod tests { pairs.sort(); assert_eq!(pairs.len(), 2); - assert!(pairs.contains(&( - "llm.providers.openrouter.api_key".into(), - "sk-xxx".into() - ))); + assert!(pairs.contains(&("llm.providers.openrouter.api_key".into(), "sk-xxx".into()))); assert!(pairs.contains(&( "llm.providers.openrouter.base_url".into(), "https://openrouter.ai/api/v1".into() @@ -788,10 +796,7 @@ mod tests { ("http.bind_address".into(), "127.0.0.1:25555".into()), ("http.port".into(), "8080".into()), // api_key should go to secrets/, not config/ - ( - "llm.providers.openrouter.api_key".into(), - "sk-xxx".into(), - ), + ("llm.providers.openrouter.api_key".into(), "sk-xxx".into()), // base_url is not a secret, goes to config/ ( "llm.providers.openrouter.base_url".into(), @@ -827,7 +832,9 @@ mod tests { let openrouter = providers.get("openrouter").expect("openrouter"); assert_eq!( openrouter.get("base_url"), - Some(&serde_json::Value::String("https://openrouter.ai/api/v1".into())) + Some(&serde_json::Value::String( + "https://openrouter.ai/api/v1".into() + )) ); // telegram.bot_token → secrets/telegram @@ -876,8 +883,14 @@ mod tests { #[test] fn vault_key_mapping_roundtrip() { // knowledge ↔ memory.knowledge - assert_eq!(vault_key_to_settings_prefix("knowledge"), "memory.knowledge"); - assert_eq!(settings_prefix_to_vault_key("memory.knowledge.embedding_model"), "knowledge"); + assert_eq!( + vault_key_to_settings_prefix("knowledge"), + "memory.knowledge" + ); + assert_eq!( + settings_prefix_to_vault_key("memory.knowledge.embedding_model"), + "knowledge" + ); // Most keys are identity mappings assert_eq!(vault_key_to_settings_prefix("http"), "http"); @@ -889,11 +902,7 @@ mod tests { #[test] fn set_nested_creates_structure() { let mut map = HashMap::new(); - set_nested_value( - &mut map, - "a.b.c", - serde_json::Value::String("deep".into()), - ); + set_nested_value(&mut map, "a.b.c", serde_json::Value::String("deep".into())); let a = map.get("a").expect("a"); let b = a.get("b").expect("b"); diff --git a/crates/rara-vault/src/config.rs b/crates/rara-vault/src/config.rs index a931dee16..9ee1af9c8 100644 --- a/crates/rara-vault/src/config.rs +++ b/crates/rara-vault/src/config.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::PathBuf; -use std::time::Duration; +use std::{path::PathBuf, time::Duration}; use serde::{Deserialize, Serialize}; @@ -37,7 +36,6 @@ pub struct VaultConfig { pub address: String, /// KV v2 mount path, e.g. `"secret/rara"`. - #[serde(default = "default_mount_path")] pub mount_path: String, /// Authentication configuration. @@ -60,7 +58,6 @@ pub struct VaultConfig { pub timeout: Duration, /// Whether to fall back to local config when Vault is unreachable. - #[serde(default = "default_fallback")] pub fallback_to_local: bool, } @@ -78,25 +75,11 @@ pub struct VaultAuthConfig { pub secret_id_file: PathBuf, } -fn default_mount_path() -> String { - "secret/rara".into() -} - -fn default_watch_interval() -> Duration { - Duration::from_secs(30) -} +fn default_watch_interval() -> Duration { Duration::from_secs(30) } -fn default_timeout() -> Duration { - Duration::from_secs(5) -} +fn default_timeout() -> Duration { Duration::from_secs(5) } -fn default_fallback() -> bool { - true -} - -fn default_auth_method() -> String { - "approle".into() -} +fn default_auth_method() -> String { "approle".into() } #[cfg(test)] mod tests { @@ -105,15 +88,15 @@ mod tests { #[test] fn serde_roundtrip() { let config = VaultConfig { - address: "http://10.0.0.5:30820".into(), - mount_path: "secret/rara".into(), - auth: VaultAuthConfig { - method: "approle".into(), - role_id_file: "/etc/rara/vault-role-id".into(), + address: "http://10.0.0.5:30820".into(), + mount_path: "secret/rara".into(), + auth: VaultAuthConfig { + method: "approle".into(), + role_id_file: "/etc/rara/vault-role-id".into(), secret_id_file: "/etc/rara/vault-secret-id".into(), }, - watch_interval: Duration::from_secs(30), - timeout: Duration::from_secs(5), + watch_interval: Duration::from_secs(30), + timeout: Duration::from_secs(5), fallback_to_local: true, }; @@ -134,15 +117,17 @@ mod tests { fn deserialize_with_defaults() { let yaml = r#" address: "http://localhost:8200" +mount_path: "secret/data" +fallback_to_local: false auth: role_id_file: /tmp/role-id secret_id_file: /tmp/secret-id "#; let config: VaultConfig = serde_yaml::from_str(yaml).expect("deserialize"); - assert_eq!(config.mount_path, "secret/rara"); + assert_eq!(config.mount_path, "secret/data"); assert_eq!(config.watch_interval, Duration::from_secs(30)); assert_eq!(config.timeout, Duration::from_secs(5)); - assert!(config.fallback_to_local); + assert!(!config.fallback_to_local); assert_eq!(config.auth.method, "approle"); } } diff --git a/crates/rara-vault/src/error.rs b/crates/rara-vault/src/error.rs index 7b4b64627..fe5238e9a 100644 --- a/crates/rara-vault/src/error.rs +++ b/crates/rara-vault/src/error.rs @@ -26,7 +26,7 @@ pub enum VaultError { #[snafu(display("Failed to read auth credential file {path}: {source}"))] CredentialFile { - path: String, + path: String, source: std::io::Error, },