diff --git a/Cargo.toml b/Cargo.toml index 86544f51a..c0016a728 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,7 +132,6 @@ members = [ "crates/extensions/rara-git", "crates/soul", "crates/symphony", - "crates/rara-vault", "crates/rara-dock", "crates/rara-acp", "crates/drivers/browser", @@ -270,7 +269,6 @@ rara-sessions = { path = "crates/sessions" } rara-skills = { path = "crates/skills" } rara-soul = { path = "crates/soul" } rara-symphony = { path = "crates/symphony" } -rara-vault = { path = "crates/rara-vault" } # fff (Fast File Finder) — Cargo git dependencies (not vendored) fff-search = { git = "https://github.com/dmtrKovalenko/fff.nvim", default-features = false } diff --git a/crates/app/src/boot.rs b/crates/app/src/boot.rs index 3d1907a22..c377e734f 100644 --- a/crates/app/src/boot.rs +++ b/crates/app/src/boot.rs @@ -94,6 +94,7 @@ pub struct PlatformBindingConfig { pub(crate) async fn boot( pool: sqlx::SqlitePool, settings_provider: Arc, + settings_svc: Arc, users: &[UserConfig], browser_manager: Option, ) -> Result { @@ -214,6 +215,7 @@ pub(crate) async fn boot( &mut tool_registry, crate::tools::ToolDeps { settings: settings_provider.clone(), + settings_svc: settings_svc.clone(), composio_auth_provider, skill_registry: skill_registry.clone(), mcp_manager: mcp_manager.clone(), diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index dbb7c8253..78d4f0379 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -321,10 +321,9 @@ pub async fn start_with_options( .whatever_context("Failed to initialize infrastructure services")?; let pool = db_store.pool().clone(); - let settings_svc = - rara_backend_admin::settings::SettingsSvc::load(db_store.kv_store(), pool.clone()) - .await - .whatever_context("Failed to initialize runtime settings")?; + let settings_svc = rara_backend_admin::settings::SettingsSvc::load(pool.clone()) + .await + .whatever_context("Failed to initialize runtime settings")?; let settings_provider: Arc = 
Arc::new(settings_svc.clone()); @@ -363,6 +362,7 @@ pub async fn start_with_options( let rara = crate::boot::boot( pool.clone(), settings_provider.clone(), + Arc::new(settings_svc.clone()), &config.users, browser_manager, ) diff --git a/crates/app/src/tools/mod.rs b/crates/app/src/tools/mod.rs index 3d3a632cc..b5702d0c9 100644 --- a/crates/app/src/tools/mod.rs +++ b/crates/app/src/tools/mod.rs @@ -133,6 +133,7 @@ pub fn rara_tool_names() -> Vec { /// Dependencies required to construct all tools. pub struct ToolDeps { pub settings: Arc, + pub settings_svc: Arc, pub composio_auth_provider: Arc, pub skill_registry: rara_skills::registry::InMemoryRegistry, pub mcp_manager: rara_mcp::manager::mgr::McpManager, @@ -194,7 +195,7 @@ pub fn register_all(registry: &mut ToolRegistry, deps: ToolDeps) -> ToolRegistra Arc::new(SendEmailTool::new(deps.settings.clone())), Arc::new(SendFileTool::new()), Arc::new(SetAvatarTool::new(deps.settings.clone())), - Arc::new(SettingsTool::new(deps.settings.clone())), + Arc::new(SettingsTool::new(deps.settings_svc.clone())), // Skill tools Arc::new(ListSkillsTool::new(deps.skill_registry.clone())), Arc::new(CreateSkillTool::new(deps.skill_registry.clone())), diff --git a/crates/app/src/tools/settings.rs b/crates/app/src/tools/settings.rs index 7c5c4e65f..2e6334127 100644 --- a/crates/app/src/tools/settings.rs +++ b/crates/app/src/tools/settings.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; +use rara_backend_admin::settings::SettingsSvc; use rara_domain_shared::settings::SettingsProvider; use rara_kernel::tool::{ToolContext, ToolExecute}; use rara_tool_macro::ToolDef; @@ -26,29 +27,60 @@ use serde_json::{Value, json}; const SENSITIVE_FRAGMENTS: &[&str] = &["api_key", "token", "password", "secret"]; +/// Number of version entries returned by the history action. +const TOOL_HISTORY_LIMIT: i64 = 20; + +/// Number of leading characters shown before masking a sensitive value. 
+const MASK_VISIBLE_LEN: usize = 6; + +/// Available actions for the settings tool. +#[derive(Debug, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +enum SettingsAction { + /// List all settings. + List, + /// Get a single setting by key. + Get, + /// Set a single setting by key. + Set, + /// Show the current settings version number. + Version, + /// Show recent version history. + History, + /// Show a point-in-time snapshot at a given version. + Snapshot, + /// Rollback settings to a given version. + Rollback, +} + #[derive(Debug, Deserialize, JsonSchema)] pub struct SettingsParams { - /// The action to perform: list, get, or set. - action: String, - /// The setting key (required for get and set). - key: Option, + /// The action to perform. + action: SettingsAction, + /// The setting key (required for get/set). + key: Option, /// The value to set (required for set). - value: Option, + value: Option, + /// The version number (required for snapshot/rollback). + version: Option, } /// Agent tool that reads and modifies runtime settings. #[derive(ToolDef)] #[tool( name = "settings", - description = "Read and modify runtime settings. Use 'list' to see all settings, 'get' to \ - read a specific key, 'set' to update a value.", + description = "Read and modify runtime settings. Actions: 'list' to see all, 'get' to read a \ + key, 'set' to update a value, 'version' for current version, 'history' for \ + recent changes, 'snapshot' for point-in-time view, 'rollback' to revert to a \ + version.", tier = "deferred" )] pub struct SettingsTool { - settings: Arc, + settings: Arc, } impl SettingsTool { - pub fn new(settings: Arc) -> Self { Self { settings } } + /// Create a new settings tool backed by the MVCC settings service. 
+ pub fn new(settings: Arc) -> Self { Self { settings } } } #[async_trait] @@ -57,8 +89,8 @@ impl ToolExecute for SettingsTool { type Params = SettingsParams; async fn run(&self, params: SettingsParams, _context: &ToolContext) -> anyhow::Result { - match params.action.as_str() { - "list" => { + match params.action { + SettingsAction::List => { let all = self.settings.list().await; let masked: serde_json::Map = all .into_iter() @@ -69,7 +101,7 @@ impl ToolExecute for SettingsTool { .collect(); Ok(json!({"settings": masked})) } - "get" => { + SettingsAction::Get => { let key = params .key .as_deref() @@ -79,7 +111,7 @@ impl ToolExecute for SettingsTool { None => Ok(json!({"key": key, "value": null})), } } - "set" => { + SettingsAction::Set => { let key = params .key .as_deref() @@ -91,7 +123,47 @@ impl ToolExecute for SettingsTool { self.settings.set(key, value).await?; Ok(json!({"key": key, "updated": true})) } - other => Ok(json!({"error": format!("unknown action: {other}")})), + SettingsAction::Version => { + let ver = self.settings.current_version().await?; + Ok(json!({"version": ver})) + } + SettingsAction::History => { + let entries = self.settings.list_versions(TOOL_HISTORY_LIMIT).await?; + let masked: Vec = entries + .into_iter() + .map(|e| { + let masked_val = e.value.as_deref().map(|v| maybe_mask(&e.key, v)); + json!({ + "version": e.version, + "key": e.key, + "value": masked_val, + "changed_at": e.changed_at, + }) + }) + .collect(); + Ok(json!({"versions": masked})) + } + SettingsAction::Snapshot => { + let ver = params + .version + .ok_or_else(|| anyhow::anyhow!("missing required parameter: version"))?; + let snap = self.settings.snapshot(ver).await?; + let masked: serde_json::Map = snap + .into_iter() + .map(|(k, v)| { + let display = maybe_mask(&k, &v); + (k, Value::String(display)) + }) + .collect(); + Ok(json!({"version": ver, "settings": masked})) + } + SettingsAction::Rollback => { + let ver = params + .version + .ok_or_else(|| 
anyhow::anyhow!("missing required parameter: version"))?; + let new_ver = self.settings.rollback_to(ver).await?; + Ok(json!({"rolled_back_to": ver, "new_version": new_ver})) + } } } } @@ -102,10 +174,13 @@ fn maybe_mask(key: &str, value: &str) -> String { .iter() .any(|frag| key_lower.contains(frag)); if is_sensitive { - if value.len() < 6 { + // Use char iterator to avoid panic on multi-byte UTF-8 values. + let char_count = value.chars().count(); + if char_count < MASK_VISIBLE_LEN { "****".to_owned() } else { - format!("{}****", &value[..6]) + let prefix: String = value.chars().take(MASK_VISIBLE_LEN).collect(); + format!("{prefix}****") } } else { value.to_owned() diff --git a/crates/extensions/backend-admin/AGENT.md b/crates/extensions/backend-admin/AGENT.md index 05dbca69e..8fe3f9eee 100644 --- a/crates/extensions/backend-admin/AGENT.md +++ b/crates/extensions/backend-admin/AGENT.md @@ -8,7 +8,9 @@ Unified HTTP admin routes for all backend subsystems — settings management, mo ### Key modules -- `src/settings/` — Runtime settings CRUD backed by a KV store. `SettingsSvc` loads settings at startup and provides a `SettingsProvider` trait implementation with change notifications via `watch::Receiver`. +- `src/settings/` — Runtime settings CRUD with MVCC versioning. `SettingsSvc` stores settings in a `settings_version` table (append-only log) where every mutation bumps a global version counter. Provides a `SettingsProvider` trait implementation with change notifications via `watch::Receiver`. + - `service.rs` — `SettingsSvc` backed directly by `SqlitePool` (no KVStore dependency). Public API: `get()`, `set()`, `delete()`, `batch_update()`, `current_version()`, `snapshot(version)`, `list_versions(limit)`, `rollback_to(version)`. + - `router.rs` — Axum routes under `/api/v1/settings/`. Version endpoints under `/api/v1/settings/versions/` for listing versions, getting current version, snapshots, and rollback. 
- `src/chat/` — Chat and session HTTP endpoints (list sessions, send messages, stream responses). - `src/kernel/` — Kernel control routes (agent info, execution traces, debug endpoints). - `src/agents/` — Agent manifest listing and management routes. @@ -24,20 +26,30 @@ Unified HTTP admin routes for all backend subsystems — settings management, mo 2. `state.routes()` returns an Axum router with all admin endpoints merged. 3. Routes are mounted into the main HTTP server by `rara-app`. +### Settings MVCC model + +- The `settings_version` table is an append-only log. Each row records one key's change: the version number, the key, the JSON-encoded value (`NULL` for a tombstone), and a `changed_at` timestamp. +- Writes (`set`, `delete`, `batch_update`) bump the global version counter and append one row per changed key — all within a single SQLite transaction. +- Rollback is **forward-only**: `rollback_to(v)` compares the snapshot at version `v` with the current state and appends only the differences (with tombstones for keys absent at `v`) as a new version; if nothing differs, no new version is written. History is never rewritten. +- `SettingsSvc` depends only on `SqlitePool` — it does NOT use `KVStore` or any external store abstraction. + ## Critical Invariants - `SettingsSvc` is the single source of truth for runtime-mutable settings (LLM keys, Telegram tokens, etc.). - Settings changes are broadcast via `tokio::sync::watch` — subscribers get notified of all changes. -- Admin routes should not bypass `SettingsSvc` to read/write settings directly in the KV store. +- Admin routes should not bypass `SettingsSvc` to read/write settings directly in the database. +- Every settings mutation MUST go through the versioned write path — never insert directly into `settings_version`. ## What NOT To Do - Do NOT put repository implementations in this crate — it provides HTTP routes, not data access. - Do NOT hardcode settings values — all mutable config goes through `SettingsSvc`. - Do NOT duplicate route paths — each subsystem owns its own `/api/v1//` namespace. +- Do NOT delete rows from `settings_version` — the table is append-only by design.
+- Do NOT bypass `SettingsSvc` to write settings directly to SQLite — this breaks version consistency and watch notifications. ## Dependencies -**Upstream:** `rara-kernel` (for `KernelHandle`, session/tape types), `rara-skills`, `rara-mcp`, `yunara-store` (KV store), `axum`. +**Upstream:** `rara-kernel` (for `KernelHandle`, session/tape types), `rara-skills`, `rara-mcp`, `axum`, `sqlx`. **Downstream:** `rara-app` (mounts routes into the HTTP server). diff --git a/crates/extensions/backend-admin/src/settings/router.rs b/crates/extensions/backend-admin/src/settings/router.rs index 460c3c877..d11aa7895 100644 --- a/crates/extensions/backend-admin/src/settings/router.rs +++ b/crates/extensions/backend-admin/src/settings/router.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Flat KV settings HTTP API. +//! MVCC-versioned settings HTTP API. //! -//! | Method | Path | Description | -//! |--------|----------------------------|------------------------| -//! | GET | `/api/v1/settings` | list all | -//! | PATCH | `/api/v1/settings` | batch update | -//! | GET | `/api/v1/settings/{*key}` | get one | -//! | PUT | `/api/v1/settings/{*key}` | set one | -//! | DELETE | `/api/v1/settings/{*key}` | delete one | +//! | Method | Path | Description | +//! |--------|-----------------------------------------|------------------------| +//! | GET | `/api/v1/settings` | list all | +//! | PATCH | `/api/v1/settings` | batch update | +//! | GET | `/api/v1/settings/{*key}` | get one | +//! | PUT | `/api/v1/settings/{*key}` | set one | +//! | DELETE | `/api/v1/settings/{*key}` | delete one | +//! | GET | `/api/v1/settings/versions` | list recent versions | +//! | GET | `/api/v1/settings/versions/current` | current version number | +//! | GET | `/api/v1/settings/versions/{n}` | snapshot at version N | +//! 
| POST | `/api/v1/settings/versions/{n}/rollback`| rollback to version N | use std::{collections::HashMap, sync::Arc}; @@ -28,19 +32,45 @@ use axum::{ Json, extract::{Path, State}, http::StatusCode, - routing::get, + routing::{get, post}, }; use rara_domain_shared::settings::SettingsProvider; use utoipa_axum::router::OpenApiRouter; -use crate::settings::SettingsSvc; +use crate::settings::{SettingsSvc, service::VersionEntry}; + +/// Default number of version entries returned by the list endpoint. +const DEFAULT_VERSION_LIMIT: i64 = 100; + +// -- typed response structs for version endpoints -- + +/// Response for the current version endpoint. +#[derive(serde::Serialize)] +struct VersionResponse { + version: i64, +} + +/// Response for the snapshot endpoint. +#[derive(serde::Serialize)] +struct SnapshotResponse { + version: i64, + settings: HashMap, +} + +/// Response for the rollback endpoint. +#[derive(serde::Serialize)] +struct RollbackResponse { + rolled_back_to: i64, + new_version: i64, +} // -- state wrapper -- type SharedProvider = Arc; pub fn routes(svc: SettingsSvc) -> OpenApiRouter { - let provider: SharedProvider = Arc::new(svc); + let svc = Arc::new(svc); + let provider: SharedProvider = svc.clone(); let settings_router = axum::Router::new() .route( @@ -53,7 +83,19 @@ pub fn routes(svc: SettingsSvc) -> OpenApiRouter { ) .with_state(provider); - OpenApiRouter::from(settings_router) + // Version routes use Arc directly — these methods live on the + // concrete type, not the SettingsProvider trait. Nested under a fixed prefix + // so they cannot collide with the `{*key}` wildcard. 
+ let version_router = axum::Router::new() + .route("/", get(list_versions)) + .route("/current", get(get_current_version)) + .route("/{n}", get(snapshot_at_version)) + .route("/{n}/rollback", post(rollback_to_version)) + .with_state(svc); + + let combined = settings_router.nest("/api/v1/settings/versions", version_router); + + OpenApiRouter::from(combined) } // -- request / response types ----------------------------------------------- @@ -115,3 +157,71 @@ async fn batch_update_settings( .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(StatusCode::NO_CONTENT) } + +// -- version handlers +// ----------------------------------------------------------- + +/// List recent version log entries. +async fn list_versions( + State(svc): State>, +) -> Result>, StatusCode> { + svc.list_versions(DEFAULT_VERSION_LIMIT) + .await + .map(Json) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) +} + +/// Return the current global version number. +async fn get_current_version( + State(svc): State>, +) -> Result, StatusCode> { + svc.current_version() + .await + .map(|v| Json(VersionResponse { version: v })) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) +} + +/// Return a point-in-time snapshot of all settings at version `n`. +/// +/// Returns 404 if the requested version does not exist. +async fn snapshot_at_version( + State(svc): State>, + Path(version): Path, +) -> Result, (StatusCode, String)> { + svc.snapshot(version) + .await + .map(|settings| Json(SnapshotResponse { version, settings })) + .map_err(|e| { + let msg = e.to_string(); + if msg.contains("does not exist") { + (StatusCode::NOT_FOUND, msg) + } else { + (StatusCode::INTERNAL_SERVER_ERROR, msg) + } + }) +} + +/// Rollback settings to the state at version `n` (creates a new version). +/// +/// Returns 404 if the target version does not exist. 
+async fn rollback_to_version( + State(svc): State>, + Path(version): Path, +) -> Result, (StatusCode, String)> { + svc.rollback_to(version) + .await + .map(|new_version| { + Json(RollbackResponse { + rolled_back_to: version, + new_version, + }) + }) + .map_err(|e| { + let msg = e.to_string(); + if msg.contains("does not exist") { + (StatusCode::NOT_FOUND, msg) + } else { + (StatusCode::INTERNAL_SERVER_ERROR, msg) + } + }) +} diff --git a/crates/extensions/backend-admin/src/settings/service.rs b/crates/extensions/backend-admin/src/settings/service.rs index 79f091df5..1d28ecc68 100644 --- a/crates/extensions/backend-admin/src/settings/service.rs +++ b/crates/extensions/backend-admin/src/settings/service.rs @@ -12,38 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Runtime-changeable settings backed by flat KV store. +//! Runtime-changeable settings backed by the MVCC `settings_version` table. //! -//! Each setting is stored as a separate row in `kv_table` with a -//! `settings.` prefix (e.g. `settings.llm.provider`). +//! Each mutation appends a row with a monotonically increasing global version. +//! `value = NULL` represents a tombstone (deleted key). Current state is the +//! snapshot at the maximum version. use std::{collections::HashMap, sync::Arc}; +use rara_domain_shared::settings::SettingsProvider; use snafu::Whatever; -use sqlx::SqlitePool; +use sqlx::{QueryBuilder, Sqlite, SqlitePool}; use tokio::sync::watch; -use yunara_store::KVStore; +use tracing::warn; -/// Internal prefix applied to all settings keys in the KV store. +/// Internal prefix applied to all settings keys in the version table. const PREFIX: &str = "settings."; -/// Service that manages flat KV settings with PostgreSQL persistence. +/// Settings service backed entirely by the MVCC `settings_version` table. /// /// Implements -/// [`SettingsProvider`](rara_domain_shared::settings::SettingsProvider). 
+/// [`SettingsProvider`]. #[derive(Clone)] pub struct SettingsSvc { - kv: KVStore, pool: SqlitePool, tx: Arc>, } impl SettingsSvc { - /// Load settings from the flat KV store. - pub async fn load(kv: KVStore, pool: SqlitePool) -> Result { + /// Load settings service with the given database pool. + pub async fn load(pool: SqlitePool) -> Result { let (tx, _rx) = watch::channel(()); Ok(Self { - kv, pool, tx: Arc::new(tx), }) @@ -51,76 +51,388 @@ impl SettingsSvc { /// Notify subscribers after a mutation. fn notify(&self) { let _ = self.tx.send(()); } + + /// Bump version counter and append entries in a single transaction. + /// + /// Returns the new version number. If `entries` is empty, returns the + /// current version without writing anything. + async fn versioned_write(&self, entries: Vec<(String, Option)>) -> anyhow::Result { + if entries.is_empty() { + return self.current_version().await; + } + let mut tx = self.pool.begin().await?; + let (ver,): (i64,) = sqlx::query_as( + "UPDATE settings_version_counter SET current = current + 1 WHERE id = 1 RETURNING \ + current", + ) + .fetch_one(&mut *tx) + .await?; + + let mut builder = + QueryBuilder::::new("INSERT INTO settings_version (version, key, value) "); + builder.push_values(&entries, |mut row, (key, value)| { + row.push_bind(ver).push_bind(key).push_bind(value); + }); + builder.build().execute(&mut *tx).await?; + tx.commit().await?; + Ok(ver) + } + + /// Read the current global version number. + pub async fn current_version(&self) -> anyhow::Result { + let (ver,): (i64,) = + sqlx::query_as("SELECT current FROM settings_version_counter WHERE id = 1") + .fetch_one(&self.pool) + .await?; + Ok(ver) + } + + /// Snapshot all settings at a given version (MVCC point-in-time read). + /// + /// Fails if `version` is outside the valid range `[0, current]`. 
+ pub async fn snapshot(&self, version: i64) -> anyhow::Result> { + let current_ver = self.current_version().await?; + anyhow::ensure!( + version >= 0 && version <= current_ver, + "version {version} does not exist (current: {current_ver})" + ); + let rows: Vec<(String, Option)> = sqlx::query_as( + "SELECT sv.key, sv.value + FROM settings_version sv + INNER JOIN ( + SELECT key, MAX(version) AS max_ver + FROM settings_version + WHERE version <= ?1 + GROUP BY key + ) latest ON sv.key = latest.key AND sv.version = latest.max_ver + WHERE sv.value IS NOT NULL", + ) + .bind(version) + .fetch_all(&self.pool) + .await?; + + Ok(rows + .into_iter() + .filter_map(|(k, v)| { + let stripped = k.strip_prefix(PREFIX)?; + let plain: String = serde_json::from_str(&v?).unwrap_or_default(); + Some((stripped.to_owned(), plain)) + }) + .collect()) + } + + /// List version log entries (limited to the most recent `limit` versions). + pub async fn list_versions(&self, limit: i64) -> anyhow::Result> { + let rows: Vec = sqlx::query_as( + "SELECT sv.version, sv.key, sv.value, sv.changed_at + FROM settings_version sv + WHERE sv.version IN ( + SELECT DISTINCT version FROM settings_version + ORDER BY version DESC + LIMIT ?1 + ) + ORDER BY sv.version DESC, sv.key ASC", + ) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + // Strip the internal `settings.` prefix and decode JSON-encoded values + // so consumers see clean keys and plain strings. + Ok(rows + .into_iter() + .map(|mut e| { + if let Some(stripped) = e.key.strip_prefix(PREFIX) { + e.key = stripped.to_owned(); + } + if let Some(ref raw) = e.value { + if let Ok(decoded) = serde_json::from_str::(raw) { + e.value = Some(decoded); + } + } + e + }) + .collect()) + } + + /// Rollback to a specific version (forward operation — creates a new + /// version). + /// + /// Fails if `target_version` is outside the valid range `[0, current]`. 
+ pub async fn rollback_to(&self, target_version: i64) -> anyhow::Result { + let current_ver = self.current_version().await?; + anyhow::ensure!( + target_version >= 0 && target_version <= current_ver, + "target version {target_version} does not exist (current: {current_ver})" + ); + + let target_snap = self.snapshot(target_version).await?; + let current = self.list().await; + + let mut entries: Vec<(String, Option)> = Vec::new(); + for (k, v) in &target_snap { + if current.get(k) != Some(v) { + let prefixed = format!("{PREFIX}{k}"); + let json_val = serde_json::to_string(v).expect("String is always valid JSON"); + entries.push((prefixed, Some(json_val))); + } + } + for k in current.keys() { + if !target_snap.contains_key(k.as_str()) { + let prefixed = format!("{PREFIX}{k}"); + entries.push((prefixed, None)); + } + } + + if entries.is_empty() { + return Ok(current_ver); + } + + let new_ver = self.versioned_write(entries).await?; + self.notify(); + Ok(new_ver) + } +} + +/// A single entry in the version log. +#[derive(Debug, Clone, serde::Serialize, sqlx::FromRow)] +pub struct VersionEntry { + /// The version number this entry belongs to. + pub version: i64, + /// The settings key (prefix-stripped, e.g. `llm.provider`). + pub key: String, + /// The value, or `None` for tombstones. + pub value: Option, + /// When this entry was created. 
+ pub changed_at: String, } #[async_trait::async_trait] impl rara_domain_shared::settings::SettingsProvider for SettingsSvc { async fn get(&self, key: &str) -> Option { let prefixed = format!("{PREFIX}{key}"); - self.kv.get::(&prefixed).await.ok().flatten() + let row: Option<(Option,)> = sqlx::query_as( + "SELECT value FROM settings_version + WHERE key = ?1 + ORDER BY version DESC LIMIT 1", + ) + .bind(&prefixed) + .fetch_optional(&self.pool) + .await + .ok()?; + + row.and_then(|(v,)| { + let raw = v?; // None = tombstone — treat as absent + serde_json::from_str(&raw).ok() + }) } async fn set(&self, key: &str, value: &str) -> anyhow::Result<()> { let prefixed = format!("{PREFIX}{key}"); - self.kv - .set(&prefixed, &value.to_owned()) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + let json_val = serde_json::to_string(&value)?; + self.versioned_write(vec![(prefixed, Some(json_val))]) + .await?; self.notify(); Ok(()) } async fn delete(&self, key: &str) -> anyhow::Result<()> { let prefixed = format!("{PREFIX}{key}"); - self.kv - .remove(&prefixed) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + self.versioned_write(vec![(prefixed, None)]).await?; self.notify(); Ok(()) } async fn list(&self) -> HashMap { - // Query all rows with the settings prefix. - let rows: Vec<(String, String)> = - sqlx::query_as("SELECT key, value FROM kv_table WHERE key LIKE ?1") - .bind(format!("{PREFIX}%")) - .fetch_all(&self.pool) - .await - .unwrap_or_default(); - - rows.into_iter() - .filter_map(|(k, v)| { - let stripped = k.strip_prefix(PREFIX)?; - // Values are JSON-encoded strings in the KV store, so we - // need to deserialize the outer JSON quotes. 
- let plain: String = serde_json::from_str(&v).unwrap_or(v); - Some((stripped.to_owned(), plain)) - }) - .collect() + match self.current_version().await { + Ok(ver) => self.snapshot(ver).await.unwrap_or_default(), + Err(e) => { + warn!("failed to read current settings version: {e}"); + HashMap::new() + } + } } async fn batch_update(&self, patches: HashMap>) -> anyhow::Result<()> { - for (key, value) in &patches { - let prefixed = format!("{PREFIX}{key}"); - match value { - Some(v) => { - self.kv - .set(&prefixed, &v.to_owned()) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - } - None => { - self.kv - .remove(&prefixed) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - } - } + if patches.is_empty() { + return Ok(()); } + let entries: Vec<(String, Option)> = patches + .into_iter() + .map(|(key, value)| { + let prefixed = format!("{PREFIX}{key}"); + let json_val = + value.map(|v| serde_json::to_string(&v).expect("String is always valid JSON")); + (prefixed, json_val) + }) + .collect(); + self.versioned_write(entries).await?; self.notify(); Ok(()) } fn subscribe(&self) -> watch::Receiver<()> { self.tx.subscribe() } } + +#[cfg(test)] +mod tests { + use rara_domain_shared::settings::SettingsProvider; + + use super::*; + + async fn test_pool() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::raw_sql(include_str!( + "../../../../rara-model/migrations/20260304000000_init.up.sql" + )) + .execute(&pool) + .await + .unwrap(); + sqlx::raw_sql(include_str!( + "../../../../rara-model/migrations/20260412000000_settings_version.up.sql" + )) + .execute(&pool) + .await + .unwrap(); + pool + } + + #[tokio::test] + async fn set_bumps_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("llm.provider", "ollama").await.unwrap(); + assert_eq!(svc.current_version().await.unwrap(), 1); + + svc.set("llm.provider", "openrouter").await.unwrap(); + 
assert_eq!(svc.current_version().await.unwrap(), 2); + } + + #[tokio::test] + async fn get_returns_latest_value() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("key", "v1").await.unwrap(); + svc.set("key", "v2").await.unwrap(); + assert_eq!(svc.get("key").await.unwrap(), "v2"); + } + + #[tokio::test] + async fn snapshot_at_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("a", "1").await.unwrap(); // v1 + svc.set("b", "2").await.unwrap(); // v2 + svc.set("a", "3").await.unwrap(); // v3 + + let snap = svc.snapshot(2).await.unwrap(); + assert_eq!(snap.get("a").unwrap(), "1"); + assert_eq!(snap.get("b").unwrap(), "2"); + + let snap = svc.snapshot(3).await.unwrap(); + assert_eq!(snap.get("a").unwrap(), "3"); + } + + #[tokio::test] + async fn delete_creates_tombstone() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("x", "val").await.unwrap(); // v1 + svc.delete("x").await.unwrap(); // v2 tombstone + + assert!(svc.get("x").await.is_none()); + + let snap = svc.snapshot(1).await.unwrap(); + assert_eq!(snap.get("x").unwrap(), "val"); + + let snap = svc.snapshot(2).await.unwrap(); + assert!(!snap.contains_key("x")); + } + + #[tokio::test] + async fn batch_update_single_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + let mut patches = HashMap::new(); + patches.insert("a".to_owned(), Some("1".to_owned())); + patches.insert("b".to_owned(), Some("2".to_owned())); + svc.batch_update(patches).await.unwrap(); + + assert_eq!(svc.current_version().await.unwrap(), 1); + assert_eq!(svc.get("a").await.unwrap(), "1"); + assert_eq!(svc.get("b").await.unwrap(), "2"); + } + + #[tokio::test] + async fn list_returns_current_snapshot() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("a", "1").await.unwrap(); + svc.set("b", 
"2").await.unwrap(); + + let all = svc.list().await; + assert_eq!(all.get("a").unwrap(), "1"); + assert_eq!(all.get("b").unwrap(), "2"); + } + + #[tokio::test] + async fn rollback_creates_new_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("a", "original").await.unwrap(); // v1 + svc.set("a", "changed").await.unwrap(); // v2 + + let new_ver = svc.rollback_to(1).await.unwrap(); + assert_eq!(new_ver, 3); // rollback creates v3 + assert_eq!(svc.get("a").await.unwrap(), "original"); + } + + #[tokio::test] + async fn snapshot_rejects_future_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("a", "val").await.unwrap(); // v1 + + let err = svc.snapshot(999).await.unwrap_err(); + assert!( + err.to_string().contains("does not exist"), + "expected 'does not exist' error, got: {err}" + ); + } + + #[tokio::test] + async fn list_versions_decodes_json_values() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("llm.provider", "openrouter").await.unwrap(); + let entries = svc.list_versions(10).await.unwrap(); + let entry = entries.first().expect("should have one entry"); + // Value should be the plain string, not JSON-encoded. + assert_eq!(entry.value.as_deref(), Some("openrouter")); + } + + #[tokio::test] + async fn rollback_rejects_invalid_version() { + let pool = test_pool().await; + let svc = SettingsSvc::load(pool).await.unwrap(); + + svc.set("a", "val").await.unwrap(); // v1 + + let err = svc.rollback_to(999).await.unwrap_err(); + assert!( + err.to_string().contains("does not exist"), + "expected 'does not exist' error, got: {err}" + ); + // Negative version also rejected. 
+ assert!(svc.rollback_to(-1).await.is_err()); + } +} diff --git a/crates/rara-model/migrations/20260412000000_settings_version.down.sql b/crates/rara-model/migrations/20260412000000_settings_version.down.sql new file mode 100644 index 000000000..fbff8435a --- /dev/null +++ b/crates/rara-model/migrations/20260412000000_settings_version.down.sql @@ -0,0 +1,14 @@ +-- Restore settings rows back to kv_table from the latest snapshot. +INSERT OR REPLACE INTO kv_table (key, value) +SELECT sv.key, sv.value +FROM settings_version sv +INNER JOIN ( + SELECT key, MAX(version) AS max_ver + FROM settings_version + GROUP BY key +) latest ON sv.key = latest.key AND sv.version = latest.max_ver +WHERE sv.value IS NOT NULL; + +DROP INDEX IF EXISTS idx_settings_version_key; +DROP TABLE IF EXISTS settings_version; +DROP TABLE IF EXISTS settings_version_counter; diff --git a/crates/rara-model/migrations/20260412000000_settings_version.up.sql b/crates/rara-model/migrations/20260412000000_settings_version.up.sql new file mode 100644 index 000000000..1cce2d7cd --- /dev/null +++ b/crates/rara-model/migrations/20260412000000_settings_version.up.sql @@ -0,0 +1,31 @@ +-- settings_version: MVCC append-only log — the SOLE storage for settings. +-- Each mutation (set/delete) appends a row with a monotonically increasing +-- global version. value=NULL is a tombstone (key deleted at this version). +-- Current state = snapshot at max version. kv_table no longer stores settings. +-------------------------------------------------------------------------------- + +CREATE TABLE settings_version ( + version INTEGER NOT NULL, + key TEXT NOT NULL, + value TEXT, -- NULL = tombstone (deleted) + changed_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (version, key) +); + +-- Fast snapshot queries: "latest version per key where version <= N" +CREATE INDEX idx_settings_version_key ON settings_version (key, version); + +-- Global version counter. Single row, updated via UPDATE ... RETURNING. 
+CREATE TABLE settings_version_counter ( + id INTEGER PRIMARY KEY CHECK (id = 1), + current INTEGER NOT NULL DEFAULT 0 +); + +INSERT INTO settings_version_counter (id, current) VALUES (1, 0); + +-- Seed version 0: migrate all existing settings from kv_table as baseline. +INSERT INTO settings_version (version, key, value) +SELECT 0, key, value FROM kv_table WHERE key LIKE 'settings.%'; + +-- Remove settings rows from kv_table — settings_version is now authoritative. +DELETE FROM kv_table WHERE key LIKE 'settings.%'; diff --git a/crates/rara-vault/AGENT.md b/crates/rara-vault/AGENT.md deleted file mode 100644 index 62aa1b515..000000000 --- a/crates/rara-vault/AGENT.md +++ /dev/null @@ -1,98 +0,0 @@ -# rara-vault — Agent Guide - -## 这个 crate 是什么 - -Vault 配置中心客户端。rara 通过它连接部署在 k8s 集群上的 HashiCorp Vault,实现配置的集中管理、版本化、加密存储和远程变更。 - -## 为什么需要它 - -之前所有配置写在本地 YAML 文件里,有三个痛点: -- **改坏了没法回滚** — 配置迭代快,YAML 改错没有版本历史 -- **敏感信息明文** — API key、bot token 直接写在文件里 -- **远程管理不便** — 必须 SSH 到服务器手动编辑 - -Vault KV v2 引擎天然解决这三个问题:每次写入自动生成版本号,支持 rollback;数据加密存储;有 Web UI 远程管理。 - -## 关键设计决策 - -### 为什么用 AppRole 认证而不是 Token - -AppRole 是 Vault 推荐的机器对机器认证方式。role_id + secret_id 从文件读取,不硬编码。Token 有 TTL 会过期,client 自动在 TTL 过半时 renew。 - -### 为什么 config/ 和 secrets/ 分离 - -Vault policy 按路径控制权限。secrets/ 路径可以设更严格的 ACL(只有 rara 的 AppRole 能读),config/ 可以开放给运维人员直接在 UI 上改。 - -### 为什么 flatten 格式要兼容现有 KV store - -rara 已有一套 Settings KV store(SQLite)+ 双向 sync 机制(`crates/app/src/flatten.rs`)。Vault 拉下来的配置最终要灌入这个 KV store,所以 `pull_all()` 输出的 key 格式必须和 `flatten.rs` 的一致: - -``` -llm.default_provider → "openrouter" -llm.providers.openrouter.api_key → "sk-xxx" -telegram.bot_token → "xxx" -knowledge.embedding_model → "text-embedding-3-small" -``` - -这样下游的 kernel、server、channel 完全不知道配置来自 Vault 还是本地 YAML。 - -### 为什么用 poll 而不是 watch - -Vault KV v2 没有原生 watch/subscribe 机制。client 按 `watch_interval`(默认 30s)轮询各 path 的 metadata version,发现版本号变化才 pull 最新数据。 - -### 降级策略 - -Vault 是外部依赖,必须容忍不可达: -- 启动时连不上 + `fallback_to_local: true` → 用本地 YAML 
启动,日志 WARN -- 启动时连不上 + `fallback_to_local: false` → 报错退出 -- 运行中断连 → 保持当前配置不变,后台定时重连 - -## 数据流 - -``` -Vault (k8s) - │ - │ pull_all() / push_changes() - ▼ -rara-vault crate - │ - │ Vec<(String, String)> ← 与 flatten.rs 格式兼容 - ▼ -Settings KV Store (SQLite) ← 现有,不变 - │ - ▼ -kernel / server / channels ← 无感知 -``` - -## Vault 数据结构 - -``` -secret/rara/ -├── config/ # 非敏感配置 -│ ├── http # { bind_address, max_body_size, ... } -│ ├── grpc # { bind_address, server_address, ... } -│ ├── llm # { default_provider, providers: { ... } } -│ ├── mita # { heartbeat_interval } -│ ├── knowledge # { embedding_model, dimensions, ... } -│ └── symphony # { enabled, poll_interval, ... } -├── secrets/ # 敏感信息(严格 ACL) -│ ├── telegram # { bot_token } -│ ├── llm # { providers: { openrouter: { api_key } } } -│ ├── composio # { api_key, entity_id } -│ └── symphony # { linear_api_key } -└── users/ # 用户身份 - └── ryan # { role, platforms: [...] } -``` - -## 部署信息 - -- Vault 跑在 k8s 集群,standalone 模式,NodePort :30820 -- rara 服务器和 k8s 在同一局域网,HTTP 直连,不需要 TLS -- AppRole 凭证文件放在 rara 服务器的 `/etc/rara/vault-role-id` 和 `/etc/rara/vault-secret-id` - -## 相关文件 - -- `docs/plans/2026-03-13-vault-config-center-design.md` — 完整设计文档 -- `crates/app/src/flatten.rs` — 现有的 flatten/unflatten 逻辑(本 crate 的输出格式必须与之兼容) -- `crates/app/src/config_sync.rs` — 现有的双向 sync(#299 中会扩展为三向) -- `crates/app/src/lib.rs` — AppConfig 和启动流程(#299 中会插入 Vault 拉取步骤) diff --git a/crates/rara-vault/Cargo.toml b/crates/rara-vault/Cargo.toml deleted file mode 100644 index c88d39753..000000000 --- a/crates/rara-vault/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "rara-vault" -version.workspace = true -edition.workspace = true -license.workspace = true -authors.workspace = true -repository.workspace = true -publish = false - -[dependencies] -humantime-serde = "1" -reqwest = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -snafu = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = 
true } - -[dev-dependencies] -serde_yaml = { workspace = true } -tokio = { workspace = true, features = ["full", "macros"] } - -[lints] -workspace = true diff --git a/crates/rara-vault/src/client.rs b/crates/rara-vault/src/client.rs deleted file mode 100644 index c14b631de..000000000 --- a/crates/rara-vault/src/client.rs +++ /dev/null @@ -1,912 +0,0 @@ -// Copyright 2025 Rararulab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{collections::HashMap, sync::Arc}; - -use reqwest::Client; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use tokio::sync::RwLock; -use tracing::{debug, warn}; - -use crate::{ - config::VaultConfig, - error::{self, VaultError}, -}; - -// --------------------------------------------------------------------------- -// Vault API response types -// --------------------------------------------------------------------------- - -/// Wrapper for Vault KV v2 read responses. -#[derive(Debug, Deserialize)] -pub struct KvV2ReadResponse { - pub data: KvV2Data, -} - -/// Inner data envelope of a KV v2 read response. -#[derive(Debug, Deserialize)] -pub struct KvV2Data { - pub data: HashMap, - pub metadata: KvV2Metadata, -} - -/// Metadata attached to a KV v2 secret version. -#[derive(Debug, Deserialize)] -pub struct KvV2Metadata { - pub version: u64, - pub created_time: String, - #[serde(default)] - pub destroyed: bool, -} - -/// Response from `LIST` on the metadata endpoint. 
-#[derive(Debug, Deserialize)] -pub struct ListResponse { - pub data: ListKeys, -} - -/// Key list inside a `LIST` response. -#[derive(Debug, Deserialize)] -pub struct ListKeys { - pub keys: Vec, -} - -/// Vault auth/login response. -#[derive(Debug, Deserialize)] -struct AuthResponse { - auth: AuthData, -} - -#[derive(Debug, Deserialize)] -struct AuthData { - client_token: String, - lease_duration: u64, -} - -/// Generic Vault error response body. -#[derive(Debug, Deserialize)] -struct VaultErrorBody { - #[serde(default)] - errors: Vec, -} - -/// Body sent to the KV v2 `data/` endpoint for writes. -#[derive(Debug, Serialize)] -struct KvV2WriteBody { - data: HashMap, -} - -// --------------------------------------------------------------------------- -// VaultClient -// --------------------------------------------------------------------------- - -/// Internal token state. -struct TokenState { - token: String, - /// Seconds remaining when the token was acquired. - lease_duration: u64, - /// When the token was acquired (monotonic). - acquired_at: tokio::time::Instant, -} - -/// HTTP client for HashiCorp Vault KV v2 with AppRole authentication. -pub struct VaultClient { - config: VaultConfig, - http: Client, - token_state: Arc>>, -} - -impl VaultClient { - /// Create a new `VaultClient` from the given config. - /// - /// The client is *not* authenticated yet — call [`login()`](Self::login) - /// before issuing any secret operations. - pub fn new(config: VaultConfig) -> Result { - let http = Client::builder() - .timeout(config.timeout) - .build() - .context(error::ConnectionSnafu)?; - Ok(Self { - config, - http, - token_state: Arc::new(RwLock::new(None)), - }) - } - - // ------------------------------------------------------------------ - // Authentication - // ------------------------------------------------------------------ - - /// Authenticate with Vault using AppRole credentials read from disk. 
- pub async fn login(&self) -> Result<(), VaultError> { - let role_id = tokio::fs::read_to_string(&self.config.auth.role_id_file) - .await - .context(error::CredentialFileSnafu { - path: self.config.auth.role_id_file.display().to_string(), - })?; - let secret_id = tokio::fs::read_to_string(&self.config.auth.secret_id_file) - .await - .context(error::CredentialFileSnafu { - path: self.config.auth.secret_id_file.display().to_string(), - })?; - - let url = format!("{}/v1/auth/approle/login", self.config.address); - let body = serde_json::json!({ - "role_id": role_id.trim(), - "secret_id": secret_id.trim(), - }); - - let resp = self - .http - .post(&url) - .json(&body) - .send() - .await - .context(error::AuthSnafu)?; - - let status = resp.status(); - if !status.is_success() { - let msg = extract_error_message(resp).await; - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - - let auth_resp: AuthResponse = resp.json().await.context(error::AuthSnafu)?; - debug!( - lease_duration = auth_resp.auth.lease_duration, - "Vault AppRole login succeeded" - ); - - let mut state = self.token_state.write().await; - *state = Some(TokenState { - token: auth_resp.auth.client_token, - lease_duration: auth_resp.auth.lease_duration, - acquired_at: tokio::time::Instant::now(), - }); - Ok(()) - } - - /// Renew the current client token in-place. 
- pub async fn renew_token(&self) -> Result<(), VaultError> { - let token = self.get_token_raw().await?; - let url = format!("{}/v1/auth/token/renew-self", self.config.address); - - let resp = self - .http - .post(&url) - .header("X-Vault-Token", &token) - .send() - .await - .context(error::ConnectionSnafu)?; - - let status = resp.status(); - if !status.is_success() { - let msg = extract_error_message(resp).await; - warn!(status = status.as_u16(), msg, "Token renewal failed"); - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - - let auth_resp: AuthResponse = resp.json().await.context(error::ConnectionSnafu)?; - let mut state = self.token_state.write().await; - *state = Some(TokenState { - token: auth_resp.auth.client_token, - lease_duration: auth_resp.auth.lease_duration, - acquired_at: tokio::time::Instant::now(), - }); - debug!("Vault token renewed"); - Ok(()) - } - - /// Returns `true` when the token has passed half its lease duration. - pub async fn token_needs_renewal(&self) -> bool { - let state = self.token_state.read().await; - match state.as_ref() { - Some(ts) => { - let elapsed = ts.acquired_at.elapsed().as_secs(); - elapsed >= ts.lease_duration / 2 - } - None => true, - } - } - - // ------------------------------------------------------------------ - // Secret read operations - // ------------------------------------------------------------------ - - /// Read a single secret from the KV v2 store. 
- pub async fn read_secret(&self, path: &str) -> Result { - let token = self.get_token().await?; - let url = format!( - "{}/v1/{}/data/{}", - self.config.address, self.config.mount_path, path - ); - - let resp = self - .http - .get(&url) - .header("X-Vault-Token", &token) - .send() - .await - .context(error::ConnectionSnafu)?; - - let status = resp.status(); - if status.as_u16() == 404 { - return Err(VaultError::NotFound { - path: path.to_string(), - }); - } - if !status.is_success() { - let msg = extract_error_message(resp).await; - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - - let body = resp.text().await.context(error::ConnectionSnafu)?; - serde_json::from_str(&body).context(error::DeserializeSnafu) - } - - /// List child keys under a metadata path. - pub async fn list_secrets(&self, path: &str) -> Result, VaultError> { - let token = self.get_token().await?; - let url = format!( - "{}/v1/{}/metadata/{}", - self.config.address, self.config.mount_path, path - ); - - let resp = self - .http - .request( - reqwest::Method::from_bytes(b"LIST").expect("valid method"), - &url, - ) - .header("X-Vault-Token", &token) - .send() - .await - .context(error::ConnectionSnafu)?; - - let status = resp.status(); - if status.as_u16() == 404 { - return Ok(vec![]); - } - if !status.is_success() { - let msg = extract_error_message(resp).await; - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - - let body = resp.text().await.context(error::ConnectionSnafu)?; - let list: ListResponse = serde_json::from_str(&body).context(error::DeserializeSnafu)?; - Ok(list.data.keys) - } - - /// Pull all secrets under `config/` and `secrets/` and flatten them - /// into dot-separated key-value pairs compatible with the settings - /// store format used by `crates/app/src/flatten.rs`. - /// - /// Vault key names are mapped to settings key prefixes via - /// `vault_key_to_settings_prefix`. 
For example, the Vault path - /// `secrets/knowledge` maps to settings prefix `memory.knowledge`, - /// so `{"embedding_model": "..."}` becomes - /// `[("memory.knowledge.embedding_model", "...")]`. - pub async fn pull_all(&self) -> Result, VaultError> { - let mut pairs = Vec::new(); - - for prefix in &["config", "secrets"] { - let keys = self.list_secrets(prefix).await?; - for key in &keys { - // Skip directory markers (trailing slash) - if key.ends_with('/') { - continue; - } - let vault_path = format!("{prefix}/{key}"); - match self.read_secret(&vault_path).await { - Ok(resp) => { - let settings_prefix = vault_key_to_settings_prefix(key); - flatten_value( - &settings_prefix, - &serde_json::Value::Object(resp.data.data.into_iter().collect()), - &mut pairs, - ); - } - Err(VaultError::NotFound { .. }) => { - debug!( - path = vault_path, - "secret not found during pull_all, skipping" - ); - } - Err(e) => return Err(e), - } - } - } - - Ok(pairs) - } - - // ------------------------------------------------------------------ - // Secret write operations - // ------------------------------------------------------------------ - - /// Write a secret to the KV v2 store. - pub async fn write_secret( - &self, - path: &str, - data: HashMap, - ) -> Result<(), VaultError> { - let token = self.get_token().await?; - let url = format!( - "{}/v1/{}/data/{}", - self.config.address, self.config.mount_path, path - ); - - let body = KvV2WriteBody { data }; - let resp = self - .http - .post(&url) - .header("X-Vault-Token", &token) - .json(&body) - .send() - .await - .context(error::ConnectionSnafu)?; - - let status = resp.status(); - if !status.is_success() { - let msg = extract_error_message(resp).await; - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - Ok(()) - } - - /// Push flat key-value pairs back into Vault by unflattening them - /// into the appropriate path structure. 
- /// - /// Keys are expected in the format `"section.field"` or - /// `"section.nested.field"`. The first segment determines the Vault - /// path under `config/`. - pub async fn push_changes(&self, changes: Vec<(String, String)>) -> Result<(), VaultError> { - let grouped = unflatten_to_vault_paths(changes); - for (path, data) in grouped { - self.write_secret(&path, data).await?; - } - Ok(()) - } - - // ------------------------------------------------------------------ - // Metadata - // ------------------------------------------------------------------ - - /// Read metadata (version info) for a secret path. - pub async fn get_metadata(&self, path: &str) -> Result { - let token = self.get_token().await?; - let url = format!( - "{}/v1/{}/metadata/{}", - self.config.address, self.config.mount_path, path - ); - - let resp = self - .http - .get(&url) - .header("X-Vault-Token", &token) - .send() - .await - .context(error::ConnectionSnafu)?; - - let status = resp.status(); - if status.as_u16() == 404 { - return Err(VaultError::NotFound { - path: path.to_string(), - }); - } - if !status.is_success() { - let msg = extract_error_message(resp).await; - return Err(VaultError::Api { - status: status.as_u16(), - message: msg, - }); - } - - #[derive(Deserialize)] - struct MetadataResp { - data: MetadataInner, - } - #[derive(Deserialize)] - struct MetadataInner { - current_version: u64, - created_time: String, - } - - let body = resp.text().await.context(error::ConnectionSnafu)?; - let meta: MetadataResp = serde_json::from_str(&body).context(error::DeserializeSnafu)?; - Ok(KvV2Metadata { - version: meta.data.current_version, - created_time: meta.data.created_time, - destroyed: false, - }) - } - - // ------------------------------------------------------------------ - // Internal helpers - // ------------------------------------------------------------------ - - /// Return the cached token without triggering renewal. 
- /// Used internally by `renew_token()` and `login()` to avoid recursion. - async fn get_token_raw(&self) -> Result { - let state = self.token_state.read().await; - match state.as_ref() { - Some(ts) => Ok(ts.token.clone()), - None => Err(VaultError::TokenExpired), - } - } - - /// Return a valid token, renewing transparently if past half TTL. - async fn get_token(&self) -> Result { - { - let state = self.token_state.read().await; - match state.as_ref() { - Some(ts) => { - let elapsed = ts.acquired_at.elapsed().as_secs(); - if elapsed >= ts.lease_duration / 2 { - drop(state); - if let Err(e) = self.renew_token_or_relogin().await { - warn!(error = %e, "token renewal failed, returning cached token"); - } - } - } - None => return Err(VaultError::TokenExpired), - } - } - self.get_token_raw().await - } - - /// Try to renew the token; if renewal fails, fall back to a full re-login. - async fn renew_token_or_relogin(&self) -> Result<(), VaultError> { - debug!("token past half TTL, attempting renewal"); - match self.renew_token().await { - Ok(()) => Ok(()), - Err(e) => { - warn!(error = %e, "token renewal failed, attempting full re-login"); - self.login().await - } - } - } -} - -// --------------------------------------------------------------------------- -// Flatten / unflatten helpers -// --------------------------------------------------------------------------- - -/// Map a Vault key name to the settings key prefix used by the existing -/// Settings KV store. -/// -/// Most Vault keys map 1:1 (e.g. `"http"` → `"http"`), but some have -/// different prefixes in the settings store: -/// - `"knowledge"` → `"memory.knowledge"` (settings keys: `memory.knowledge.*`) -/// - `"composio"` → uses constants from `rara_domain_shared::settings::keys` -/// but those happen to match `"composio.*"` pattern -/// -/// The reverse mapping is in [`settings_prefix_to_vault_key`]. 
-fn vault_key_to_settings_prefix(vault_key: &str) -> String { - match vault_key { - "knowledge" => "memory.knowledge".into(), - other => other.into(), - } -} - -/// Map a settings key prefix back to the Vault key name. -/// -/// Inverse of [`vault_key_to_settings_prefix`]. -fn settings_prefix_to_vault_key(settings_key: &str) -> &str { - if settings_key.starts_with("memory.knowledge") { - "knowledge" - } else { - // For most keys, the first segment is the Vault key name. - settings_key.split('.').next().unwrap_or(settings_key) - } -} - -/// Recursively flatten a JSON value into dot-separated key-value pairs. -/// -/// The `prefix` is the top-level section name (e.g. `"http"` or `"llm"`). -pub(crate) fn flatten_value( - prefix: &str, - value: &serde_json::Value, - out: &mut Vec<(String, String)>, -) { - match value { - serde_json::Value::Object(map) => { - for (k, v) in map { - let key = format!("{prefix}.{k}"); - flatten_value(&key, v, out); - } - } - serde_json::Value::Array(arr) => { - let joined: Vec = arr - .iter() - .map(|v| match v { - serde_json::Value::String(s) => s.clone(), - other => other.to_string(), - }) - .collect(); - out.push((prefix.to_string(), joined.join(","))); - } - serde_json::Value::String(s) => { - out.push((prefix.to_string(), s.clone())); - } - serde_json::Value::Number(n) => { - out.push((prefix.to_string(), n.to_string())); - } - serde_json::Value::Bool(b) => { - out.push((prefix.to_string(), b.to_string())); - } - serde_json::Value::Null => {} - } -} - -/// Settings keys that must be routed to `secrets/` instead of `config/`. -/// -/// This list defines which flattened key prefixes contain sensitive data -/// (API keys, tokens, passwords) and must be written to the `secrets/` -/// path in Vault, where stricter ACL policies apply. 
-const SECRET_KEY_PREFIXES: &[&str] = &[ - "telegram.bot_token", - "composio.api_key", - "composio.entity_id", - "gmail.app_password", - "memory.memos.token", -]; - -/// Check if a flattened settings key contains a secret value that should -/// be stored under `secrets/` rather than `config/`. -fn is_secret_key(key: &str) -> bool { - // Exact match on known secret keys - if SECRET_KEY_PREFIXES.contains(&key) { - return true; - } - // LLM provider api_key fields: "llm.providers.{name}.api_key" - if key.starts_with("llm.providers.") && key.ends_with(".api_key") { - return true; - } - // Symphony tracker api_key - if key.starts_with("symphony.") && key.contains("api_key") { - return true; - } - false -} - -/// Group flat key-value pairs into Vault write paths. -/// -/// Sensitive keys (API keys, tokens) are routed to `secrets/{section}`, -/// everything else goes to `config/{section}`. This maintains the -/// permission separation between config/ and secrets/ in Vault. -pub(crate) fn unflatten_to_vault_paths( - pairs: Vec<(String, String)>, -) -> HashMap> { - let mut grouped: HashMap> = HashMap::new(); - - for (key, value) in pairs { - if let Some(dot_pos) = key.find('.') { - let vault_key = settings_prefix_to_vault_key(&key); - let vault_prefix = if is_secret_key(&key) { - "secrets" - } else { - "config" - }; - let path = format!("{vault_prefix}/{vault_key}"); - - // Strip the settings prefix to get the field path within - // the Vault secret. For "memory.knowledge.embedding_model", - // vault_key is "knowledge", so we need the part after - // "memory.knowledge." → "embedding_model". - let settings_prefix = vault_key_to_settings_prefix(vault_key); - let rest = key - .strip_prefix(&format!("{settings_prefix}.")) - .unwrap_or(&key[dot_pos + 1..]); - - let entry = grouped.entry(path).or_default(); - set_nested_value(entry, rest, serde_json::Value::String(value)); - } - } - - grouped -} - -/// Set a nested value in a `HashMap` using a dot-separated key. 
-/// -/// For `"providers.openrouter.api_key"` it creates nested objects: -/// `{"providers": {"openrouter": {"api_key": value}}}`. -fn set_nested_value( - map: &mut HashMap, - dotted_key: &str, - value: serde_json::Value, -) { - let parts: Vec<&str> = dotted_key.splitn(2, '.').collect(); - if parts.len() == 1 { - map.insert(parts[0].to_string(), value); - } else { - let entry = map - .entry(parts[0].to_string()) - .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new())); - if let serde_json::Value::Object(inner) = entry { - let mut inner_map: HashMap = - inner.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); - set_nested_value(&mut inner_map, parts[1], value); - *entry = serde_json::Value::Object(inner_map.into_iter().collect()); - } - } -} - -/// Extract a human-readable error message from a Vault error response. -async fn extract_error_message(resp: reqwest::Response) -> String { - resp.text().await.unwrap_or_else(|_| "unknown error".into()) -} - -// --------------------------------------------------------------------------- -// URL builder (for testing) -// --------------------------------------------------------------------------- - -impl VaultClient { - /// Build the URL for reading a secret (exposed for unit tests). - #[cfg(test)] - fn read_secret_url(&self, path: &str) -> String { - format!( - "{}/v1/{}/data/{}", - self.config.address, self.config.mount_path, path - ) - } - - /// Build the URL for listing secrets (exposed for unit tests). - #[cfg(test)] - fn list_secrets_url(&self, path: &str) -> String { - format!( - "{}/v1/{}/metadata/{}", - self.config.address, self.config.mount_path, path - ) - } - - /// Build the URL for the AppRole login endpoint (exposed for unit tests). 
- #[cfg(test)] - fn login_url(&self) -> String { format!("{}/v1/auth/approle/login", self.config.address) } -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - use crate::config::{VaultAuthConfig, VaultConfig}; - - fn test_config() -> VaultConfig { - VaultConfig { - address: "http://10.0.0.5:30820".into(), - mount_path: "secret/rara".into(), - auth: VaultAuthConfig { - method: "approle".into(), - role_id_file: "/etc/rara/vault-role-id".into(), - secret_id_file: "/etc/rara/vault-secret-id".into(), - }, - watch_interval: Duration::from_secs(30), - timeout: Duration::from_secs(5), - fallback_to_local: true, - } - } - - #[test] - fn url_construction() { - let client = VaultClient::new(test_config()).unwrap(); - - assert_eq!( - client.read_secret_url("config/http"), - "http://10.0.0.5:30820/v1/secret/rara/data/config/http" - ); - assert_eq!( - client.list_secrets_url("config"), - "http://10.0.0.5:30820/v1/secret/rara/metadata/config" - ); - assert_eq!( - client.login_url(), - "http://10.0.0.5:30820/v1/auth/approle/login" - ); - } - - #[test] - fn flatten_simple_object() { - let data = serde_json::json!({ - "bind_address": "127.0.0.1:25555", - "port": 8080 - }); - let mut pairs = Vec::new(); - flatten_value("http", &data, &mut pairs); - pairs.sort(); - - assert_eq!(pairs.len(), 2); - assert!(pairs.contains(&("http.bind_address".into(), "127.0.0.1:25555".into()))); - assert!(pairs.contains(&("http.port".into(), "8080".into()))); - } - - #[test] - fn flatten_nested_object() { - let data = serde_json::json!({ - "providers": { - "openrouter": { - "api_key": "sk-xxx", - "base_url": "https://openrouter.ai/api/v1" - } - } - }); - let mut pairs = Vec::new(); - flatten_value("llm", &data, &mut pairs); - pairs.sort(); - - assert_eq!(pairs.len(), 2); - 
assert!(pairs.contains(&("llm.providers.openrouter.api_key".into(), "sk-xxx".into()))); - assert!(pairs.contains(&( - "llm.providers.openrouter.base_url".into(), - "https://openrouter.ai/api/v1".into() - ))); - } - - #[test] - fn flatten_array_values() { - let data = serde_json::json!({ - "fallback_models": ["qwen3:14b", "llama3:8b"] - }); - let mut pairs = Vec::new(); - flatten_value("llm.providers.ollama", &data, &mut pairs); - - assert_eq!(pairs.len(), 1); - assert_eq!( - pairs[0], - ( - "llm.providers.ollama.fallback_models".into(), - "qwen3:14b,llama3:8b".into() - ) - ); - } - - #[test] - fn unflatten_routes_secrets_correctly() { - let input = vec![ - ("http.bind_address".into(), "127.0.0.1:25555".into()), - ("http.port".into(), "8080".into()), - // api_key should go to secrets/, not config/ - ("llm.providers.openrouter.api_key".into(), "sk-xxx".into()), - // base_url is not a secret, goes to config/ - ( - "llm.providers.openrouter.base_url".into(), - "https://openrouter.ai/api/v1".into(), - ), - // telegram bot_token should go to secrets/ - ("telegram.bot_token".into(), "123:ABC".into()), - // telegram chat_id is not a secret - ("telegram.chat_id".into(), "456".into()), - ]; - - let grouped = unflatten_to_vault_paths(input); - - // http.bind_address → config/http - let http_data = grouped.get("config/http").expect("config/http"); - assert_eq!( - http_data.get("bind_address"), - Some(&serde_json::Value::String("127.0.0.1:25555".into())) - ); - - // llm api_key → secrets/llm - let llm_secrets = grouped.get("secrets/llm").expect("secrets/llm"); - let providers = llm_secrets.get("providers").expect("providers"); - let openrouter = providers.get("openrouter").expect("openrouter"); - assert_eq!( - openrouter.get("api_key"), - Some(&serde_json::Value::String("sk-xxx".into())) - ); - - // llm base_url → config/llm (not secrets) - let llm_config = grouped.get("config/llm").expect("config/llm"); - let providers = llm_config.get("providers").expect("providers"); - let 
openrouter = providers.get("openrouter").expect("openrouter"); - assert_eq!( - openrouter.get("base_url"), - Some(&serde_json::Value::String( - "https://openrouter.ai/api/v1".into() - )) - ); - - // telegram.bot_token → secrets/telegram - let tg_secrets = grouped.get("secrets/telegram").expect("secrets/telegram"); - assert_eq!( - tg_secrets.get("bot_token"), - Some(&serde_json::Value::String("123:ABC".into())) - ); - - // telegram.chat_id → config/telegram - let tg_config = grouped.get("config/telegram").expect("config/telegram"); - assert_eq!( - tg_config.get("chat_id"), - Some(&serde_json::Value::String("456".into())) - ); - } - - #[test] - fn unflatten_knowledge_uses_vault_key() { - // Settings key is "memory.knowledge.embedding_model" - // but Vault path should be "config/knowledge" with field "embedding_model" - let input = vec![ - ( - "memory.knowledge.embedding_model".into(), - "text-embedding-3-small".into(), - ), - ( - "memory.knowledge.embedding_dimensions".into(), - "1536".into(), - ), - ]; - - let grouped = unflatten_to_vault_paths(input); - - let know_data = grouped.get("config/knowledge").expect("config/knowledge"); - assert_eq!( - know_data.get("embedding_model"), - Some(&serde_json::Value::String("text-embedding-3-small".into())) - ); - assert_eq!( - know_data.get("embedding_dimensions"), - Some(&serde_json::Value::String("1536".into())) - ); - } - - #[test] - fn vault_key_mapping_roundtrip() { - // knowledge ↔ memory.knowledge - assert_eq!( - vault_key_to_settings_prefix("knowledge"), - "memory.knowledge" - ); - assert_eq!( - settings_prefix_to_vault_key("memory.knowledge.embedding_model"), - "knowledge" - ); - - // Most keys are identity mappings - assert_eq!(vault_key_to_settings_prefix("http"), "http"); - assert_eq!(settings_prefix_to_vault_key("http.bind_address"), "http"); - assert_eq!(vault_key_to_settings_prefix("llm"), "llm"); - assert_eq!(settings_prefix_to_vault_key("llm.default_provider"), "llm"); - } - - #[test] - fn 
set_nested_creates_structure() { - let mut map = HashMap::new(); - set_nested_value(&mut map, "a.b.c", serde_json::Value::String("deep".into())); - - let a = map.get("a").expect("a"); - let b = a.get("b").expect("b"); - let c = b.get("c").expect("c"); - assert_eq!(c, &serde_json::Value::String("deep".into())); - } -} diff --git a/crates/rara-vault/src/config.rs b/crates/rara-vault/src/config.rs deleted file mode 100644 index b7ab29f05..000000000 --- a/crates/rara-vault/src/config.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2025 Rararulab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{path::PathBuf, time::Duration}; - -use serde::{Deserialize, Serialize}; - -/// Top-level Vault configuration. -/// -/// ```yaml -/// vault: -/// address: "http://10.0.0.5:30820" -/// mount_path: "secret/rara" -/// auth: -/// method: approle -/// role_id_file: /etc/rara/vault-role-id -/// secret_id_file: /etc/rara/vault-secret-id -/// watch_interval: 30s -/// timeout: 5s -/// fallback_to_local: true -/// ``` -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct VaultConfig { - /// Vault server address, e.g. `"http://10.0.0.5:30820"`. - pub address: String, - - /// KV v2 mount path, e.g. `"secret/rara"`. - #[serde(default = "default_mount_path")] - pub mount_path: String, - - /// Authentication configuration. - pub auth: VaultAuthConfig, - - /// How often to poll Vault for changes. 
- #[serde( - default = "default_watch_interval", - deserialize_with = "humantime_serde::deserialize", - serialize_with = "humantime_serde::serialize" - )] - pub watch_interval: Duration, - - /// HTTP request timeout. - #[serde( - default = "default_timeout", - deserialize_with = "humantime_serde::deserialize", - serialize_with = "humantime_serde::serialize" - )] - pub timeout: Duration, - - /// Whether to fall back to local config when Vault is unreachable. - #[serde(default = "default_fallback")] - pub fallback_to_local: bool, -} - -/// Authentication method configuration for Vault AppRole. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct VaultAuthConfig { - /// Auth method name (currently only `"approle"` is supported). - #[serde(default = "default_auth_method")] - pub method: String, - - /// Path to a file containing the AppRole `role_id`. - pub role_id_file: PathBuf, - - /// Path to a file containing the AppRole `secret_id`. - pub secret_id_file: PathBuf, -} - -fn default_mount_path() -> String { "secret/rara".into() } - -fn default_watch_interval() -> Duration { Duration::from_secs(30) } - -fn default_timeout() -> Duration { Duration::from_secs(5) } - -fn default_fallback() -> bool { true } - -fn default_auth_method() -> String { "approle".into() } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn serde_roundtrip() { - let config = VaultConfig { - address: "http://10.0.0.5:30820".into(), - mount_path: "secret/rara".into(), - auth: VaultAuthConfig { - method: "approle".into(), - role_id_file: "/etc/rara/vault-role-id".into(), - secret_id_file: "/etc/rara/vault-secret-id".into(), - }, - watch_interval: Duration::from_secs(30), - timeout: Duration::from_secs(5), - fallback_to_local: true, - }; - - let yaml = serde_yaml::to_string(&config).expect("serialize"); - let restored: VaultConfig = serde_yaml::from_str(&yaml).expect("deserialize"); - - assert_eq!(restored.address, config.address); - assert_eq!(restored.mount_path, 
config.mount_path); - assert_eq!(restored.auth.method, config.auth.method); - assert_eq!(restored.auth.role_id_file, config.auth.role_id_file); - assert_eq!(restored.auth.secret_id_file, config.auth.secret_id_file); - assert_eq!(restored.watch_interval, config.watch_interval); - assert_eq!(restored.timeout, config.timeout); - assert_eq!(restored.fallback_to_local, config.fallback_to_local); - } - - #[test] - fn deserialize_with_defaults() { - let yaml = r#" -address: "http://localhost:8200" -auth: - role_id_file: /tmp/role-id - secret_id_file: /tmp/secret-id -"#; - let config: VaultConfig = serde_yaml::from_str(yaml).expect("deserialize"); - assert_eq!(config.mount_path, "secret/rara"); - assert_eq!(config.watch_interval, Duration::from_secs(30)); - assert_eq!(config.timeout, Duration::from_secs(5)); - assert!(config.fallback_to_local); - assert_eq!(config.auth.method, "approle"); - } -} diff --git a/crates/rara-vault/src/error.rs b/crates/rara-vault/src/error.rs deleted file mode 100644 index fe5238e9a..000000000 --- a/crates/rara-vault/src/error.rs +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2025 Rararulab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use snafu::Snafu; - -/// Errors produced by Vault operations. 
-#[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] -pub enum VaultError { - #[snafu(display("Vault authentication failed: {source}"))] - Auth { source: reqwest::Error }, - - #[snafu(display("Vault connection error: {source}"))] - Connection { source: reqwest::Error }, - - #[snafu(display("Failed to read auth credential file {path}: {source}"))] - CredentialFile { - path: String, - source: std::io::Error, - }, - - #[snafu(display("Vault API error: {status} - {message}"))] - Api { status: u16, message: String }, - - #[snafu(display("Secret not found: {path}"))] - NotFound { path: String }, - - #[snafu(display("Failed to deserialize Vault response: {source}"))] - Deserialize { source: serde_json::Error }, - - #[snafu(display("Token expired and renewal failed"))] - TokenExpired, -} diff --git a/crates/rara-vault/src/lib.rs b/crates/rara-vault/src/lib.rs deleted file mode 100644 index 43d41b0a6..000000000 --- a/crates/rara-vault/src/lib.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2025 Rararulab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! `rara-vault` — HashiCorp Vault KV v2 client with AppRole authentication. -//! -//! This crate provides [`VaultClient`] for reading and writing secrets -//! stored in a Vault KV v2 backend, authenticating via the AppRole method. -//! -//! # Quick start -//! -//! ```rust,no_run -//! use rara_vault::{VaultClient, VaultConfig}; -//! -//! # async fn example() -> Result<(), rara_vault::VaultError> { -//! 
let config: VaultConfig = todo!("load from YAML"); -//! let client = VaultClient::new(config)?; -//! client.login().await?; -//! -//! // Pull all secrets as flat key-value pairs -//! let pairs = client.pull_all().await?; -//! for (key, value) in &pairs { -//! println!("{key} = {value}"); -//! } -//! # Ok(()) -//! # } -//! ``` - -pub mod client; -pub mod config; -pub mod error; - -pub use client::VaultClient; -pub use config::{VaultAuthConfig, VaultConfig}; -pub use error::VaultError; diff --git a/docs/plans/2026-03-13-vault-config-center-design.md b/docs/plans/2026-03-13-vault-config-center-design.md deleted file mode 100644 index 213bcc503..000000000 --- a/docs/plans/2026-03-13-vault-config-center-design.md +++ /dev/null @@ -1,250 +0,0 @@ -# Vault 配置中心设计 - -**Date**: 2026-03-13 -**Status**: Draft - -## 背景 - -rara 当前使用本地 YAML 文件 + SQLite KV store 双向同步管理配置。存在三个痛点: - -1. **运行时变更无审计/回滚** — 配置迭代快,改坏了无法回退到已知好的版本 -2. **敏感信息明文** — API key、bot token 直接写在 YAML 里 -3. **远程管理不便** — 需要 SSH 到服务器手动编辑文件 - -## 方案选择 - -选择 HashiCorp Vault(部署在 k8s 集群,rara 通过局域网 HTTP API 连接): - -| 对比项 | 自建版本化 | etcd | Vault | -|--------|-----------|------|-------| -| 版本历史/回滚 | 自己写 | 内置 | KV v2 内置 | -| 敏感信息加密 | 自己写 | 需额外处理 | 原生支持 | -| 管理 UI | 无 | 弱 | 内置 Web UI | -| 部署复杂度 | 无 | 中 | 中(Helm) | -| 外部依赖 | 无 | etcd 进程 | Vault 进程 | - -选 Vault 的理由:KV v2 自带版本历史 + rollback;敏感信息天然加密;Web UI 远程管理;k8s Helm 一键部署。 - -## 部署架构 - -``` -┌────────────────────────────┐ ┌──────────────────────────┐ -│ k8s 集群(局域网) │ │ rara 服务器(局域网) │ -│ │ │ │ -│ ┌──────────────────────┐ │ │ ┌────────────────────┐ │ -│ │ Vault (standalone) │ │ │ │ rara │ │ -│ │ NodePort :30820 │◄─┼─────┼──│ rara-vault crate │ │ -│ │ KV v2 engine │ │ │ └────────────────────┘ │ -│ │ AppRole auth │ │ │ │ -│ │ Web UI │ │ │ /etc/rara/ │ -│ └──────────────────────┘ │ │ vault-role-id │ -│ │ │ vault-secret-id │ -└────────────────────────────┘ └──────────────────────────┘ -``` - -- 局域网通信,不需要 TLS/Ingress -- Vault standalone 模式,单节点够用 -- rara 用 AppRole 认证,凭证文件存服务器本地 - -## 
Vault 数据结构 - -``` -secret/rara/ -├── config/ # 非敏感配置 -│ ├── http # { bind_address, max_body_size, enable_cors, request_timeout } -│ ├── grpc # { bind_address, server_address, max_recv/send_message_size } -│ ├── llm # { default_provider, providers: { name: { base_url, default_model } } } -│ ├── mita # { heartbeat_interval } -│ ├── knowledge # { embedding_model, dimensions, search_top_k, similarity_threshold } -│ ├── symphony # { enabled, poll_interval, max_concurrent_agents, ... } -│ └── gateway # { check_interval, health_timeout, max_restart_attempts, ... } -├── secrets/ # 敏感信息(严格 ACL) -│ ├── database # { url } -│ ├── telegram # { bot_token } -│ ├── llm # { providers: { openrouter: { api_key }, ollama: { api_key } } } -│ ├── composio # { api_key, entity_id } -│ └── symphony # { linear_api_key } -└── users/ # 用户身份映射 - └── ryan # { role, platforms: [...] } -``` - -**分离原则**:config/ 和 secrets/ 使用不同的 Vault policy,secrets/ 路径限制只有 rara AppRole 能读。 - -## Vault 部署步骤 - -### 1. Helm 安装 - -```bash -helm repo add hashicorp https://helm.releases.hashicorp.com -helm install vault hashicorp/vault \ - --namespace vault --create-namespace \ - --set server.standalone.enabled=true \ - --set server.dataStorage.size=1Gi \ - --set ui.enabled=true -``` - -### 2. 初始化 + 解封 - -```bash -kubectl exec -it vault-0 -n vault -- vault operator init -key-shares=1 -key-threshold=1 -# 记下 Unseal Key + Root Token - -kubectl exec -it vault-0 -n vault -- vault operator unseal -``` - -### 3. 配置引擎 + AppRole - -```bash -vault secrets enable -path=secret kv-v2 -vault auth enable approle - -vault policy write rara-policy - <<'EOF' -path "secret/data/rara/*" { - capabilities = ["create", "read", "update", "list"] -} -path "secret/metadata/rara/*" { - capabilities = ["read", "list"] -} -EOF - -vault write auth/approle/role/rara \ - token_policies="rara-policy" \ - token_ttl=1h \ - token_max_ttl=4h -``` - -### 4. 
暴露 NodePort - -```bash -kubectl patch svc vault -n vault \ - -p '{"spec":{"type":"NodePort","ports":[{"port":8200,"nodePort":30820}]}}' -``` - -### 5. Auto-unseal(可选) - -Vault 重启后需要 unseal。简单方案:K8s Secret 存 unseal key + init container 自动解封。 - -## rara 侧集成设计 - -### 新 crate:rara-vault - -``` -crates/rara-vault/ -├── Cargo.toml -└── src/ - ├── lib.rs # pub API: VaultConfigSource - ├── client.rs # VaultClient: HTTP 交互、AppRole auth、token 自动续期 - ├── config.rs # VaultConfig struct - ├── watcher.rs # 后台 poll 变更(对比 version 号) - └── error.rs # snafu 错误类型 -``` - -### VaultConfig(本地 YAML 新增) - -```yaml -vault: - address: http://:30820 - mount_path: secret/rara - auth: - method: approle - role_id_file: /etc/rara/vault-role-id - secret_id_file: /etc/rara/vault-secret-id - watch_interval: 30s # poll 间隔 - timeout: 5s # 单次请求超时 - fallback_to_local: true # Vault 不可达时降级到本地 YAML -``` - -### 核心 trait - -```rust -/// 配置源抽象,Vault 和本地 YAML 都实现此 trait -#[async_trait] -pub trait ConfigSource: Send + Sync { - /// 拉取全量配置,合并为 AppConfig 的动态部分 - async fn pull_all(&self) -> Result; - - /// 检查是否有变更(返回变更的 key 列表) - async fn check_changes(&self, since_version: u64) -> Result>; - - /// 写回配置变更 - async fn push_changes(&self, changes: Vec) -> Result<()>; -} -``` - -### 启动流程变化 - -``` -1. 读本地 YAML → 获取 vault 连接信息 + 作为 fallback 配置 -2. 尝试 AppRole 认证获取 Vault token - ├─ 成功 → 从 Vault pull_all() 拉全量配置 - │ 合并覆盖本地 YAML 中的对应 section - └─ 失败 → 日志告警,使用本地 YAML 继续启动 -3. 合并后的 AppConfig 灌入 Settings KV store(复用现有 flatten 逻辑) -4. 启动 Vault watcher 后台 task -5. 
正常启动 kernel、server...(下游完全无感知) -``` - -### 三向同步 - -``` - Vault (KV v2) - ▲ │ - push_changes() watch/poll - │ ▼ - Local YAML ◄─► config_sync ◄─► Settings KV Store (SQLite) - file watcher (现有) (现有,不变) -``` - -| 变更来源 | 流向 | -|---------|------| -| Vault UI/CLI 修改 | Vault → watcher poll → Settings KV → 写回 YAML | -| 本地 YAML 编辑 | file watcher → Settings KV → push 到 Vault | -| gRPC/HTTP API 修改 | Settings KV → 写回 Vault + 写回 YAML | - -**冲突策略**:Vault 版本号为准。本地改和 Vault 改冲突时,Vault 胜出(因为 Vault 有版本历史可回滚)。 - -### 降级策略 - -| 场景 | 行为 | -|------|------| -| 启动时 Vault 不可达 | 用本地 YAML 启动,日志 WARN,后台持续重连 | -| 运行中 Vault 断连 | 保持当前配置不变,停止 push,定时重连 | -| Vault 恢复连接 | 重新 pull 全量,diff 合并,恢复正常同步 | -| `fallback_to_local: false` | 启动时 Vault 不可达直接报错退出 | - -### 对现有代码的改动 - -| 文件 | 改动 | -|------|------| -| `crates/app/Cargo.toml` | 新增 `rara-vault` 依赖 | -| `crates/app/src/lib.rs` | `AppConfig` 加 `vault: Option` 字段;`start()` 插入 Vault 拉取步骤 | -| `crates/app/src/config_sync.rs` | 扩展为三向同步,加 Vault push/pull 通道 | -| 其他 crate | **不动** — Vault 的存在对 kernel、server、channel 完全透明 | - -## 实现计划 - -### Phase 1:基础集成(MVP) - -1. 创建 `rara-vault` crate — VaultClient + AppRole auth -2. 实现 `pull_all()` — 启动时从 Vault 拉配置 -3. `AppConfig` 加 vault section,启动流程插入 Vault 拉取 -4. 降级逻辑:Vault 不可达用本地 YAML - -**交付**:rara 能从 Vault 读配置启动,Vault 挂了不影响。 - -### Phase 2:Watch + 三向同步 - -5. 实现 watcher — 后台 poll Vault 变更 -6. 扩展 config_sync — Vault 变更 → Settings KV → YAML -7. 反向同步 — Settings KV 变更 → push 到 Vault -8. 冲突处理 — Vault 版本号优先 - -**交付**:运行时通过 Vault UI 改配置,rara 自动生效。 - -### Phase 3:迁移 + 运维 - -9. 初始化脚本 — 把现有 YAML 配置导入 Vault -10. 文档 — Vault 部署 runbook、配置回滚操作手册 -11. 监控 — Vault 连接状态暴露到 telemetry - -**交付**:完整的配置中心运维体系。 diff --git a/docs/src/harness-engineering.md b/docs/src/harness-engineering.md index 709a4c6fa..d4d163bc4 100644 --- a/docs/src/harness-engineering.md +++ b/docs/src/harness-engineering.md @@ -35,7 +35,7 @@ Validates crate dependency direction against a 7-layer architecture map. 
Lower-l ``` Layer 0 (foundation) → common/*, paths, rara-model, domain/*, rara-api -Layer 1 (domain) → soul, symphony, skills, vault, composio, ... +Layer 1 (domain) → soul, symphony, skills, composio, ... Layer 2 (core) → kernel Layer 3 (subsystems) → dock, sessions, agents, mcp, ... Layer 4 (integration) → channels, backend-admin diff --git a/scripts/internal/deps/commands.go b/scripts/internal/deps/commands.go index dcc78bb71..24eacecd5 100644 --- a/scripts/internal/deps/commands.go +++ b/scripts/internal/deps/commands.go @@ -56,8 +56,7 @@ var layerMap = map[string]int{ "rara-soul": 1, "rara-symphony": 1, "rara-skills": 1, - "rara-vault": 1, - "rara-composio": 1, +"rara-composio": 1, "rara-keyring-store": 1, "rara-git": 1,