Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rara-sessions.workspace = true
rara-skills.workspace = true
rara-soul.workspace = true
rara-symphony.workspace = true
rara-vault = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
Expand Down
290 changes: 276 additions & 14 deletions crates/app/src/config_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ use std::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use rara_domain_shared::settings::SettingsProvider;
use rara_vault::VaultClient;
use tokio::sync::{RwLock, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
Expand All @@ -43,10 +44,12 @@ const DEBOUNCE_MS: u64 = 1500;

/// Bidirectional sync between config.yaml and the settings KV store.
pub struct ConfigFileSync {
settings: Arc<dyn SettingsProvider>,
app_config: Arc<RwLock<AppConfig>>,
config_path: PathBuf,
last_written_hash: Arc<AtomicU64>,
settings: Arc<dyn SettingsProvider>,
app_config: Arc<RwLock<AppConfig>>,
config_path: PathBuf,
last_written_hash: Arc<AtomicU64>,
vault_client: Option<Arc<VaultClient>>,
last_vault_push_ms: Arc<AtomicU64>,
}

fn content_hash(content: &str) -> u64 {
Expand All @@ -61,14 +64,19 @@ impl ConfigFileSync {
settings: Arc<dyn SettingsProvider>,
app_config: AppConfig,
config_path: PathBuf,
vault_client: Option<Arc<VaultClient>>,
last_vault_push_ms: Arc<AtomicU64>,
) -> anyhow::Result<Self> {
let sync = Self {
settings,
app_config: Arc::new(RwLock::new(app_config)),
config_path,
last_written_hash: Arc::new(AtomicU64::new(0)),
vault_client,
last_vault_push_ms,
};
sync.sync_from_file().await?;
let initial_config = sync.app_config.read().await.clone();
sync.sync_from_app_config(&initial_config).await?;
Ok(sync)
}

Expand All @@ -77,17 +85,21 @@ impl ConfigFileSync {
async fn sync_from_file(&self) -> anyhow::Result<()> {
let content = tokio::fs::read_to_string(&self.config_path).await?;
let new_config: AppConfig = serde_yaml::from_str(&content)?;
self.sync_from_app_config(&new_config).await
}

async fn sync_from_app_config(&self, new_config: &AppConfig) -> anyhow::Result<()> {
let pairs = flatten::flatten_config_sections(&new_config);
if !pairs.is_empty() {
let patches = pairs.into_iter().map(|(k, v)| (k, Some(v))).collect();
self.settings.batch_update(patches).await?;
}
{
let mut cfg = self.app_config.write().await;
cfg.llm = new_config.llm;
cfg.telegram = new_config.telegram;
cfg.composio = new_config.composio;
cfg.knowledge = new_config.knowledge;
cfg.llm = new_config.llm.clone();
cfg.telegram = new_config.telegram.clone();
cfg.composio = new_config.composio.clone();
cfg.knowledge = new_config.knowledge.clone();
}
info!("config.yaml synced to settings store");
Ok(())
Expand All @@ -106,10 +118,27 @@ impl ConfigFileSync {
cfg.knowledge = knowledge;
serde_yaml::to_string(&*cfg)?
};
let vault_pairs = {
let cfg = self.app_config.read().await;
flatten::flatten_config_sections(&cfg)
};

let hash = content_hash(&yaml);
tokio::fs::write(&self.config_path, &yaml).await?;
self.last_written_hash.store(hash, Ordering::Relaxed);

if let Some(ref client) = self.vault_client {
if let Err(e) = client.push_changes(vault_pairs).await {
warn!(error = %e, "failed to push settings to vault");
} else {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
self.last_vault_push_ms.store(now, Ordering::Relaxed);
debug!("settings pushed to vault");
}
}
debug!("settings written back to config.yaml");
Ok(())
}
Expand Down Expand Up @@ -247,7 +276,10 @@ impl ConfigFileSync {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::{
Arc,
atomic::AtomicU64,
};

use super::ConfigFileSync;
use crate::AppConfig;
Expand Down Expand Up @@ -299,6 +331,8 @@ knowledge:

gateway:
repo_url: "https://github.com/example/repo"
bot_token: "telegram-gateway-token"
chat_id: 123456
"#;

#[tokio::test]
Expand Down Expand Up @@ -357,9 +391,15 @@ composio:
tokio::fs::write(&config_path, yaml).await.unwrap();

let config: AppConfig = serde_yaml::from_str(yaml).unwrap();
let _sync = ConfigFileSync::new(settings_provider.clone(), config, config_path)
.await
.unwrap();
let _sync = ConfigFileSync::new(
settings_provider.clone(),
config,
config_path,
None,
Arc::new(AtomicU64::new(0)),
)
.await
.unwrap();

// Verify KV store was populated
assert_eq!(
Expand Down Expand Up @@ -394,6 +434,157 @@ composio:
);
}

#[tokio::test]
async fn initial_sync_prefers_bootstrapped_config() {
use rara_domain_shared::settings::SettingsProvider;

let tmp_dir = tempfile::tempdir().unwrap();
let db_path = tmp_dir.path().join("test.db");
let db_url = format!("sqlite:{}?mode=rwc", db_path.display());

let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(1)
.connect(&db_url)
.await
.unwrap();
sqlx::migrate!("../rara-model/migrations")
.run(&pool)
.await
.unwrap();

let db_store = yunara_store::db::DBStore::new(pool.clone());
let kv = db_store.kv_store();
let settings_svc = rara_backend_admin::settings::SettingsSvc::load(kv, pool)
.await
.unwrap();
let settings_provider: Arc<dyn SettingsProvider> = Arc::new(settings_svc);

let config_path = tmp_dir.path().join("config.yaml");
let yaml = r#"
http:
bind_address: "127.0.0.1:25555"
grpc:
bind_address: "127.0.0.1:50051"
server_address: "127.0.0.1:50051"
users:
- name: test
role: root
platforms: []
mita:
heartbeat_interval: "30m"
llm:
default_provider: "local-provider"
providers:
local-provider:
base_url: "http://localhost:1234"
api_key: "local-key"
default_model: "local-model"
"#;
tokio::fs::write(&config_path, yaml).await.unwrap();

let mut config: AppConfig = serde_yaml::from_str(yaml).unwrap();
config.llm.as_mut().unwrap().default_provider = Some("vault-provider".into());
config.llm.as_mut().unwrap().providers.insert(
"vault-provider".into(),
crate::flatten::ProviderConfig {
base_url: Some("http://vault:1234".into()),
api_key: Some("vault-key".into()),
default_model: Some("vault-model".into()),
fallback_models: None,
},
);

let _sync = ConfigFileSync::new(
settings_provider.clone(),
config,
config_path,
None,
Arc::new(AtomicU64::new(0)),
)
.await
.unwrap();

assert_eq!(
settings_provider
.get("llm.default_provider")
.await
.as_deref(),
Some("vault-provider"),
);
assert_eq!(
settings_provider
.get("llm.providers.vault-provider.base_url")
.await
.as_deref(),
Some("http://vault:1234"),
);
}

#[tokio::test]
async fn writeback_without_vault_still_works() {
use rara_domain_shared::settings::SettingsProvider;

let tmp_dir = tempfile::tempdir().unwrap();
let db_path = tmp_dir.path().join("test.db");
let db_url = format!("sqlite:{}?mode=rwc", db_path.display());

let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(1)
.connect(&db_url)
.await
.unwrap();
sqlx::migrate!("../rara-model/migrations")
.run(&pool)
.await
.unwrap();

let db_store = yunara_store::db::DBStore::new(pool.clone());
let kv = db_store.kv_store();
let settings_svc = rara_backend_admin::settings::SettingsSvc::load(kv, pool)
.await
.unwrap();
let settings_provider: Arc<dyn SettingsProvider> = Arc::new(settings_svc);

let config_path = tmp_dir.path().join("config.yaml");
let yaml = r#"
http:
bind_address: "127.0.0.1:25555"
grpc:
bind_address: "127.0.0.1:50051"
server_address: "127.0.0.1:50051"
users:
- name: test
role: root
platforms: []
mita:
heartbeat_interval: "30m"
llm:
default_provider: "test-provider"
providers:
test-provider:
base_url: "http://localhost:1234"
api_key: "test-key"
default_model: "test-model"
"#;
tokio::fs::write(&config_path, yaml).await.unwrap();

let config: AppConfig = serde_yaml::from_str(yaml).unwrap();
let sync = ConfigFileSync::new(
settings_provider.clone(),
config,
config_path.clone(),
None,
Arc::new(AtomicU64::new(0)),
)
.await
.unwrap();

sync.writeback_to_file().await.unwrap();

let content = tokio::fs::read_to_string(&config_path).await.unwrap();
assert!(content.contains("test-provider"));
}

#[test]
fn appconfig_yaml_roundtrip() {
let config: AppConfig = serde_yaml::from_str(TEST_YAML).expect("TEST_YAML should parse");
Expand Down Expand Up @@ -477,4 +668,75 @@ composio:
.and_then(|t| t.allowed_group_chat_id.as_deref()),
);
}

#[test]
fn appconfig_with_vault_yaml_roundtrip() {
let yaml = r#"
users:
- name: "testuser"
role: root
platforms:
- type: telegram
user_id: "12345"
http:
bind_address: "127.0.0.1:25555"
grpc:
bind_address: "127.0.0.1:50051"
server_address: "127.0.0.1:50051"
mita:
heartbeat_interval: "30m"
vault:
address: "http://10.0.0.1:30820"
mount_path: "secret/rara"
auth:
method: approle
role_id_file: "/etc/rara/vault-role-id"
secret_id_file: "/etc/rara/vault-secret-id"
watch_interval: "30s"
timeout: "5s"
fallback_to_local: true
"#;
let config: AppConfig = serde_yaml::from_str(yaml).expect("should parse with vault");
assert_eq!(
config.vault.as_ref().map(|vault| vault.address.as_str()),
Some("http://10.0.0.1:30820"),
);
assert_eq!(
config.vault.as_ref().map(|vault| vault.fallback_to_local),
Some(true),
);
let serialized = serde_yaml::to_string(&config).unwrap();
let reparsed: serde_yaml::Value = serde_yaml::from_str(&serialized).unwrap();

assert_eq!(
reparsed["vault"]["address"].as_str(),
Some("http://10.0.0.1:30820"),
);
assert_eq!(reparsed["vault"]["fallback_to_local"].as_bool(), Some(true));
}

#[test]
fn appconfig_without_vault_parses() {
let yaml = r#"
users:
- name: "testuser"
role: root
platforms:
- type: telegram
user_id: "12345"
http:
bind_address: "127.0.0.1:25555"
grpc:
bind_address: "127.0.0.1:50051"
server_address: "127.0.0.1:50051"
mita:
heartbeat_interval: "30m"
"#;
let config: AppConfig = serde_yaml::from_str(yaml).expect("should parse without vault");
assert!(config.vault.is_none());
let serialized = serde_yaml::to_string(&config).unwrap();
let reparsed: serde_yaml::Value = serde_yaml::from_str(&serialized).unwrap();

assert!(reparsed.get("vault").is_none());
}
}
Loading
Loading