From 4e8fadf7625bdb19063c08ba097dbe098091d7bd Mon Sep 17 00:00:00 2001 From: slkzgm Date: Thu, 15 Jan 2026 18:01:05 +0100 Subject: [PATCH 1/5] feat(daemon): add remote backend TCP JSON-RPC prototype --- REMOTE_BACKEND_POC.md | 69 ++ src-tauri/Cargo.toml | 2 +- src-tauri/src/bin/codex_monitor_daemon.rs | 777 ++++++++++++++++++++++ 3 files changed, 847 insertions(+), 1 deletion(-) create mode 100644 REMOTE_BACKEND_POC.md create mode 100644 src-tauri/src/bin/codex_monitor_daemon.rs diff --git a/REMOTE_BACKEND_POC.md b/REMOTE_BACKEND_POC.md new file mode 100644 index 000000000..65cd6c674 --- /dev/null +++ b/REMOTE_BACKEND_POC.md @@ -0,0 +1,69 @@ +# Remote Backend POC (daemon) + +This fork includes a **proof-of-concept** daemon that runs CodexMonitor's backend logic in a separate process (intended for WSL2/Linux), exposing a simple **line-delimited JSON-RPC** protocol over TCP. + +This is **not** wired into the desktop app yet (no UI toggle / remote proxy), but it is useful to validate the architecture and iterate on the protocol. + +## Run + +From the repo root: + +```bash +cd src-tauri + +# pick a strong token (or export CODEX_MONITOR_DAEMON_TOKEN) +TOKEN="change-me" + +cargo run --bin codex_monitor_daemon -- \ + --listen 127.0.0.1:4732 \ + --data-dir "$HOME/.local/share/codex-monitor-daemon" \ + --token "$TOKEN" +``` + +Notes: +- In WSL2, Windows access usually requires binding to `0.0.0.0` (depending on your port forwarding setup). +- `--insecure-no-auth` exists for local dev only. + +## Protocol + +- One JSON object per line. +- Requests: `{"id": , "method": "", "params": }` +- Responses: `{"id": , "result": }` or `{"id": , "error": {"message": ""}}` +- Events (server → client notifications): `{"method":"app-server-event","params":{...}}` + +### Auth handshake (required unless `--insecure-no-auth`) + +First request must be: + +```json +{"id": 1, "method": "auth", "params": {"token": "..." }} +``` + +## Quick test with netcat + +```bash +printf '{\"id\":1,\"method\":\"auth\",\"params\":{\"token\":\"change-me\"}}\\n' | nc -w 1 127.0.0.1 4732 +printf '{\"id\":2,\"method\":\"ping\"}\\n' | nc -w 1 127.0.0.1 4732 +printf '{\"id\":3,\"method\":\"list_workspaces\",\"params\":{}}\\n' | nc -w 1 127.0.0.1 4732 +``` + +## Implemented methods (initial) + +- `ping` +- `list_workspaces` +- `add_workspace` (`{ path, codex_bin? }`) +- `connect_workspace` (`{ id }`) +- `get_app_settings` +- `update_app_settings` (`{ settings }`) +- `start_thread` (`{ workspaceId }`) +- `resume_thread` (`{ workspaceId, threadId }`) +- `list_threads` (`{ workspaceId, cursor?, limit? }`) +- `archive_thread` (`{ workspaceId, threadId }`) +- `send_user_message` (`{ workspaceId, threadId, text, model?, effort?, accessMode?, images? }`) +- `turn_interrupt` (`{ workspaceId, threadId, turnId }`) +- `start_review` (`{ workspaceId, threadId, target, delivery? }`) +- `model_list` (`{ workspaceId }`) +- `account_rate_limits` (`{ workspaceId }`) +- `skills_list` (`{ workspaceId }`) +- `respond_to_server_request` (`{ workspaceId, requestId, result }`) + diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index cea35972c..f2117a789 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -23,7 +23,7 @@ tauri-plugin-opener = "2" tauri-plugin-process = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" -tokio = { version = "1", features = ["io-util", "process", "rt", "sync", "time"] } +tokio = { version = "1", features = ["io-util", "net", "process", "rt", "sync", "time"] } uuid = { version = "1", features = ["v4"] } tauri-plugin-dialog = "2" git2 = "0.20.3" diff --git a/src-tauri/src/bin/codex_monitor_daemon.rs b/src-tauri/src/bin/codex_monitor_daemon.rs new file mode 100644 index 000000000..eedb14b8a --- /dev/null +++ b/src-tauri/src/bin/codex_monitor_daemon.rs @@ -0,0 +1,777 @@ +#[path = "../backend/mod.rs"] +mod backend; +#[path = "../storage.rs"] +mod storage; +#[path = "../types.rs"] +mod types; + +use serde_json::{json, Map, Value}; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{broadcast, mpsc, Mutex}; +use uuid::Uuid; + +use backend::app_server::{spawn_workspace_session, WorkspaceSession}; +use backend::events::{AppServerEvent, EventSink, TerminalOutput}; +use storage::{read_settings, read_workspaces, write_settings, write_workspaces}; +use types::{AppSettings, WorkspaceEntry, WorkspaceInfo, WorkspaceKind, WorkspaceSettings}; + +const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:4732"; + +#[derive(Clone)] +struct DaemonEventSink { + tx: broadcast::Sender, +} + +#[derive(Clone)] +enum DaemonEvent { + AppServer(AppServerEvent), + TerminalOutput(TerminalOutput), +} + +impl EventSink for DaemonEventSink { + fn emit_app_server_event(&self, event: AppServerEvent) { + let _ = self.tx.send(DaemonEvent::AppServer(event)); + } + + fn emit_terminal_output(&self, event: TerminalOutput) { + let _ = self.tx.send(DaemonEvent::TerminalOutput(event)); + } +} + +struct DaemonConfig { + listen: SocketAddr, + token: Option, + data_dir: PathBuf, +} + +struct DaemonState { + workspaces: Mutex>, + sessions: Mutex>>, + storage_path: PathBuf, + settings_path: PathBuf, + app_settings: Mutex, + event_sink: DaemonEventSink, +} + +impl DaemonState { + fn load(config: &DaemonConfig, event_sink: DaemonEventSink) -> Self { + let storage_path = config.data_dir.join("workspaces.json"); + let settings_path = config.data_dir.join("settings.json"); + let workspaces = read_workspaces(&storage_path).unwrap_or_default(); + let app_settings = read_settings(&settings_path).unwrap_or_default(); + Self { + workspaces: Mutex::new(workspaces), + sessions: Mutex::new(HashMap::new()), + storage_path, + settings_path, + app_settings: Mutex::new(app_settings), + event_sink, + } + } + + async fn list_workspaces(&self) -> Vec { + let workspaces = self.workspaces.lock().await; + let sessions = self.sessions.lock().await; + let mut result = Vec::new(); + for entry in workspaces.values() { + result.push(WorkspaceInfo { + id: entry.id.clone(), + name: entry.name.clone(), + path: entry.path.clone(), + connected: sessions.contains_key(&entry.id), + codex_bin: entry.codex_bin.clone(), + kind: entry.kind.clone(), + parent_id: entry.parent_id.clone(), + worktree: entry.worktree.clone(), + settings: entry.settings.clone(), + }); + } + sort_workspaces(&mut result); + result + } + + async fn add_workspace( + &self, + path: String, + codex_bin: Option, + client_version: String, + ) -> Result { + let name = PathBuf::from(&path) + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or("Workspace") + .to_string(); + + let entry = WorkspaceEntry { + id: Uuid::new_v4().to_string(), + name: name.clone(), + path: path.clone(), + codex_bin, + kind: WorkspaceKind::Main, + parent_id: None, + worktree: None, + settings: WorkspaceSettings::default(), + }; + + let default_bin = { + let settings = self.app_settings.lock().await; + settings.codex_bin.clone() + }; + + let session = spawn_workspace_session( + entry.clone(), + default_bin, + client_version, + self.event_sink.clone(), + ) + .await?; + + let list = { + let mut workspaces = self.workspaces.lock().await; + workspaces.insert(entry.id.clone(), entry.clone()); + workspaces.values().cloned().collect::>() + }; + write_workspaces(&self.storage_path, &list)?; + + self.sessions.lock().await.insert(entry.id.clone(), session); + + Ok(WorkspaceInfo { + id: entry.id, + name: entry.name, + path: entry.path, + connected: true, + codex_bin: entry.codex_bin, + kind: entry.kind, + parent_id: entry.parent_id, + worktree: entry.worktree, + settings: entry.settings, + }) + } + + async fn connect_workspace(&self, id: String, client_version: String) -> Result<(), String> { + { + let sessions = self.sessions.lock().await; + if sessions.contains_key(&id) { + return Ok(()); + } + } + + let entry = { + let workspaces = self.workspaces.lock().await; + workspaces + .get(&id) + .cloned() + .ok_or("workspace not found")? + }; + + let default_bin = { + let settings = self.app_settings.lock().await; + settings.codex_bin.clone() + }; + + let session = spawn_workspace_session( + entry, + default_bin, + client_version, + self.event_sink.clone(), + ) + .await?; + + self.sessions.lock().await.insert(id, session); + Ok(()) + } + + async fn update_app_settings(&self, settings: AppSettings) -> Result { + write_settings(&self.settings_path, &settings)?; + let mut current = self.app_settings.lock().await; + *current = settings.clone(); + Ok(settings) + } + + async fn get_session(&self, workspace_id: &str) -> Result, String> { + let sessions = self.sessions.lock().await; + sessions + .get(workspace_id) + .cloned() + .ok_or("workspace not connected".to_string()) + } + + async fn start_thread(&self, workspace_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ + "cwd": session.entry.path, + "approvalPolicy": "on-request" + }); + session.send_request("thread/start", params).await + } + + async fn resume_thread(&self, workspace_id: String, thread_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ + "threadId": thread_id + }); + session.send_request("thread/resume", params).await + } + + async fn list_threads( + &self, + workspace_id: String, + cursor: Option, + limit: Option, + ) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ + "cursor": cursor, + "limit": limit + }); + session.send_request("thread/list", params).await + } + + async fn archive_thread(&self, workspace_id: String, thread_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ "threadId": thread_id }); + session.send_request("thread/archive", params).await + } + + async fn send_user_message( + &self, + workspace_id: String, + thread_id: String, + text: String, + model: Option, + effort: Option, + access_mode: Option, + images: Option>, + ) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ + "threadId": thread_id, + "message": text, + "model": model, + "effort": effort, + "approvalPolicy": access_mode, + "images": images + }); + session.send_request("thread/user_message", params).await + } + + async fn turn_interrupt( + &self, + workspace_id: String, + thread_id: String, + turn_id: String, + ) -> Result { + let session = self.get_session(&workspace_id).await?; + let params = json!({ + "threadId": thread_id, + "turnId": turn_id + }); + session.send_notification("turn/interrupt", Some(params)).await?; + Ok(json!({ "ok": true })) + } + + async fn start_review( + &self, + workspace_id: String, + thread_id: String, + target: Value, + delivery: Option, + ) -> Result { + let session = self.get_session(&workspace_id).await?; + let mut params = Map::new(); + params.insert("threadId".to_string(), json!(thread_id)); + params.insert("target".to_string(), target); + if let Some(delivery) = delivery { + params.insert("delivery".to_string(), json!(delivery)); + } + session + .send_request("review/start", Value::Object(params)) + .await + } + + async fn model_list(&self, workspace_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + session.send_request("model/list", json!({})).await + } + + async fn account_rate_limits(&self, workspace_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + session.send_request("account/rate_limits", json!({})).await + } + + async fn skills_list(&self, workspace_id: String) -> Result { + let session = self.get_session(&workspace_id).await?; + session.send_request("skills/list", json!({})).await + } + + async fn respond_to_server_request( + &self, + workspace_id: String, + request_id: u64, + result: Value, + ) -> Result { + let session = self.get_session(&workspace_id).await?; + session.send_response(request_id, result).await?; + Ok(json!({ "ok": true })) + } +} + +fn sort_workspaces(workspaces: &mut [WorkspaceInfo]) { + workspaces.sort_by(|a, b| { + let a_order = a.settings.sort_order.unwrap_or(u32::MAX); + let b_order = b.settings.sort_order.unwrap_or(u32::MAX); + if a_order != b_order { + return a_order.cmp(&b_order); + } + a.name.cmp(&b.name) + }); +} + +fn default_data_dir() -> PathBuf { + if let Ok(xdg) = env::var("XDG_DATA_HOME") { + let trimmed = xdg.trim(); + if !trimmed.is_empty() { + return PathBuf::from(trimmed).join("codex-monitor-daemon"); + } + } + let home = env::var("HOME").unwrap_or_else(|_| ".".to_string()); + PathBuf::from(home) + .join(".local") + .join("share") + .join("codex-monitor-daemon") +} + +fn usage() -> String { + format!( + "\ +USAGE:\n codex-monitor-daemon [--listen ] [--data-dir ] [--token | --insecure-no-auth]\n\n\ +OPTIONS:\n --listen Bind address (default: {DEFAULT_LISTEN_ADDR})\n --data-dir Data dir holding workspaces.json/settings.json\n --token Shared token required by clients\n --insecure-no-auth Disable auth (dev only)\n -h, --help Show this help\n" + ) +} + +fn parse_args() -> Result { + let mut listen = DEFAULT_LISTEN_ADDR + .parse::() + .map_err(|err| err.to_string())?; + let mut token = env::var("CODEX_MONITOR_DAEMON_TOKEN") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()); + let mut insecure_no_auth = false; + let mut data_dir: Option = None; + + let mut args = env::args().skip(1); + while let Some(arg) = args.next() { + match arg.as_str() { + "-h" | "--help" => { + print!("{}", usage()); + std::process::exit(0); + } + "--listen" => { + let value = args.next().ok_or("--listen requires a value")?; + listen = value.parse::().map_err(|err| err.to_string())?; + } + "--token" => { + let value = args.next().ok_or("--token requires a value")?; + let trimmed = value.trim(); + if trimmed.is_empty() { + return Err("--token requires a non-empty value".to_string()); + } + token = Some(trimmed.to_string()); + } + "--data-dir" => { + let value = args.next().ok_or("--data-dir requires a value")?; + let trimmed = value.trim(); + if trimmed.is_empty() { + return Err("--data-dir requires a non-empty value".to_string()); + } + data_dir = Some(PathBuf::from(trimmed)); + } + "--insecure-no-auth" => { + insecure_no_auth = true; + token = None; + } + _ => return Err(format!("Unknown argument: {arg}")), + } + } + + if token.is_none() && !insecure_no_auth { + return Err( + "Missing --token (or set CODEX_MONITOR_DAEMON_TOKEN). Use --insecure-no-auth for local dev only." + .to_string(), + ); + } + + Ok(DaemonConfig { + listen, + token, + data_dir: data_dir.unwrap_or_else(default_data_dir), + }) +} + +fn build_error_response(id: Option, message: &str) -> Option { + let id = id?; + Some( + serde_json::to_string(&json!({ + "id": id, + "error": { "message": message } + })) + .unwrap_or_else(|_| "{\"id\":0,\"error\":{\"message\":\"serialization failed\"}}".to_string()), + ) +} + +fn build_result_response(id: Option, result: Value) -> Option { + let id = id?; + Some(serde_json::to_string(&json!({ "id": id, "result": result })).unwrap_or_else(|_| { + "{\"id\":0,\"error\":{\"message\":\"serialization failed\"}}".to_string() + })) +} + +fn build_event_notification(event: DaemonEvent) -> Option { + let payload = match event { + DaemonEvent::AppServer(payload) => json!({ + "method": "app-server-event", + "params": payload, + }), + DaemonEvent::TerminalOutput(payload) => json!({ + "method": "terminal-output", + "params": payload, + }), + }; + serde_json::to_string(&payload).ok() +} + +fn parse_auth_token(params: &Value) -> Option { + match params { + Value::String(value) => Some(value.clone()), + Value::Object(map) => map + .get("token") + .and_then(|value| value.as_str()) + .map(|v| v.to_string()), + _ => None, + } +} + +fn parse_string(value: &Value, key: &str) -> Result { + match value { + Value::Object(map) => map + .get(key) + .and_then(|value| value.as_str()) + .map(|value| value.to_string()) + .ok_or_else(|| format!("missing or invalid `{key}`")), + _ => Err(format!("missing `{key}`")), + } +} + +fn parse_optional_string(value: &Value, key: &str) -> Option { + match value { + Value::Object(map) => map + .get(key) + .and_then(|value| value.as_str()) + .map(|v| v.to_string()), + _ => None, + } +} + +fn parse_optional_u32(value: &Value, key: &str) -> Option { + match value { + Value::Object(map) => map.get(key).and_then(|value| value.as_u64()).and_then(|v| { + if v > u32::MAX as u64 { + None + } else { + Some(v as u32) + } + }), + _ => None, + } +} + +fn parse_optional_bool(value: &Value, key: &str) -> Option { + match value { + Value::Object(map) => map.get(key).and_then(|value| value.as_bool()), + _ => None, + } +} + +fn parse_optional_string_array(value: &Value, key: &str) -> Option> { + match value { + Value::Object(map) => map.get(key).and_then(|value| value.as_array()).map(|items| { + items + .iter() + .filter_map(|item| item.as_str().map(|value| value.to_string())) + .collect::>() + }), + _ => None, + } +} + +async fn handle_rpc_request( + state: &DaemonState, + method: &str, + params: Value, + client_version: String, +) -> Result { + match method { + "ping" => Ok(json!({ "ok": true })), + "list_workspaces" => { + let workspaces = state.list_workspaces().await; + serde_json::to_value(workspaces).map_err(|err| err.to_string()) + } + "add_workspace" => { + let path = parse_string(¶ms, "path")?; + let codex_bin = parse_optional_string(¶ms, "codex_bin"); + let workspace = state.add_workspace(path, codex_bin, client_version).await?; + serde_json::to_value(workspace).map_err(|err| err.to_string()) + } + "connect_workspace" => { + let id = parse_string(¶ms, "id")?; + state.connect_workspace(id, client_version).await?; + Ok(json!({ "ok": true })) + } + "get_app_settings" => { + let settings = state.app_settings.lock().await; + serde_json::to_value(settings.clone()).map_err(|err| err.to_string()) + } + "update_app_settings" => { + let settings_value = match params { + Value::Object(map) => map.get("settings").cloned().unwrap_or(Value::Null), + _ => Value::Null, + }; + let settings: AppSettings = + serde_json::from_value(settings_value).map_err(|err| err.to_string())?; + let updated = state.update_app_settings(settings).await?; + serde_json::to_value(updated).map_err(|err| err.to_string()) + } + "start_thread" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + state.start_thread(workspace_id).await + } + "resume_thread" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let thread_id = parse_string(¶ms, "threadId")?; + state.resume_thread(workspace_id, thread_id).await + } + "list_threads" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let cursor = parse_optional_string(¶ms, "cursor"); + let limit = parse_optional_u32(¶ms, "limit"); + state.list_threads(workspace_id, cursor, limit).await + } + "archive_thread" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let thread_id = parse_string(¶ms, "threadId")?; + state.archive_thread(workspace_id, thread_id).await + } + "send_user_message" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let thread_id = parse_string(¶ms, "threadId")?; + let text = parse_string(¶ms, "text")?; + let model = parse_optional_string(¶ms, "model"); + let effort = parse_optional_string(¶ms, "effort"); + let access_mode = parse_optional_string(¶ms, "accessMode"); + let images = parse_optional_string_array(¶ms, "images"); + state + .send_user_message(workspace_id, thread_id, text, model, effort, access_mode, images) + .await + } + "turn_interrupt" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let thread_id = parse_string(¶ms, "threadId")?; + let turn_id = parse_string(¶ms, "turnId")?; + state.turn_interrupt(workspace_id, thread_id, turn_id).await + } + "start_review" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let thread_id = parse_string(¶ms, "threadId")?; + let target = params + .as_object() + .and_then(|map| map.get("target")) + .cloned() + .ok_or("missing `target`")?; + let delivery = parse_optional_string(¶ms, "delivery"); + state.start_review(workspace_id, thread_id, target, delivery).await + } + "model_list" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + state.model_list(workspace_id).await + } + "account_rate_limits" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + state.account_rate_limits(workspace_id).await + } + "skills_list" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + state.skills_list(workspace_id).await + } + "respond_to_server_request" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let map = params.as_object().ok_or("missing requestId")?; + let request_id = map + .get("requestId") + .and_then(|value| value.as_u64()) + .ok_or("missing requestId")?; + let result = map.get("result").cloned().ok_or("missing `result`")?; + state + .respond_to_server_request(workspace_id, request_id, result) + .await + } + _ => Err(format!("unknown method: {method}")), + } +} + +async fn handle_client( + socket: TcpStream, + config: Arc, + state: Arc, + events: broadcast::Sender, +) { + let (reader, mut writer) = socket.into_split(); + let mut lines = BufReader::new(reader).lines(); + + let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); + let write_task = tokio::spawn(async move { + while let Some(message) = out_rx.recv().await { + if writer.write_all(message.as_bytes()).await.is_err() { + break; + } + if writer.write_all(b"\n").await.is_err() { + break; + } + } + }); + + let mut authenticated = config.token.is_none(); + let mut events_task: Option> = None; + + while let Ok(Some(line)) = lines.next_line().await { + let line = line.trim(); + if line.is_empty() { + continue; + } + + let message: Value = match serde_json::from_str(line) { + Ok(value) => value, + Err(_) => continue, + }; + + let id = message.get("id").and_then(|value| value.as_u64()); + let method = message + .get("method") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(); + let params = message.get("params").cloned().unwrap_or(Value::Null); + + if !authenticated { + if method != "auth" { + if let Some(response) = build_error_response(id, "unauthorized") { + let _ = out_tx.send(response); + } + continue; + } + + let expected = config.token.clone().unwrap_or_default(); + let provided = parse_auth_token(¶ms).unwrap_or_default(); + if expected != provided { + if let Some(response) = build_error_response(id, "invalid token") { + let _ = out_tx.send(response); + } + continue; + } + + authenticated = true; + if let Some(response) = build_result_response(id, json!({ "ok": true })) { + let _ = out_tx.send(response); + } + + let mut rx = events.subscribe(); + let out_tx_events = out_tx.clone(); + events_task = Some(tokio::spawn(async move { + while let Ok(event) = rx.recv().await { + let Some(payload) = build_event_notification(event) else { + continue; + }; + if out_tx_events.send(payload).is_err() { + break; + } + } + })); + + continue; + } + + let client_version = format!("daemon-{}", env!("CARGO_PKG_VERSION")); + let result = handle_rpc_request(&state, &method, params, client_version).await; + let response = match result { + Ok(result) => build_result_response(id, result), + Err(message) => build_error_response(id, &message), + }; + if let Some(response) = response { + let _ = out_tx.send(response); + } + } + + drop(out_tx); + if let Some(task) = events_task { + task.abort(); + } + write_task.abort(); +} + +fn main() { + let config = match parse_args() { + Ok(config) => config, + Err(err) => { + eprintln!("{err}\n\n{}", usage()); + std::process::exit(2); + } + }; + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime"); + + runtime.block_on(async move { + let (events_tx, _events_rx) = broadcast::channel::(2048); + let event_sink = DaemonEventSink { + tx: events_tx.clone(), + }; + let state = Arc::new(DaemonState::load(&config, event_sink)); + let config = Arc::new(config); + + let listener = TcpListener::bind(config.listen) + .await + .unwrap_or_else(|err| panic!("failed to bind {}: {err}", config.listen)); + eprintln!( + "codex-monitor-daemon listening on {} (data dir: {})", + config.listen, + state + .storage_path + .parent() + .unwrap_or(&state.storage_path) + .display() + ); + + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + let config = Arc::clone(&config); + let state = Arc::clone(&state); + let events = events_tx.clone(); + tokio::spawn(async move { + handle_client(socket, config, state, events).await; + }); + } + Err(_) => continue, + } + } + }); +} From 2b62a69ee5738e747224ca72a3cf22c3be58fa03 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Thu, 15 Jan 2026 18:18:01 +0100 Subject: [PATCH 2/5] feat(daemon): align RPC with current app server API --- REMOTE_BACKEND_POC.md | 2 +- src-tauri/src/bin/codex_monitor_daemon.rs | 153 ++++++++++++++++++++-- 2 files changed, 146 insertions(+), 9 deletions(-) diff --git a/REMOTE_BACKEND_POC.md b/REMOTE_BACKEND_POC.md index 65cd6c674..8b2b264f5 100644 --- a/REMOTE_BACKEND_POC.md +++ b/REMOTE_BACKEND_POC.md @@ -53,6 +53,7 @@ printf '{\"id\":3,\"method\":\"list_workspaces\",\"params\":{}}\\n' | nc -w 1 12 - `list_workspaces` - `add_workspace` (`{ path, codex_bin? }`) - `connect_workspace` (`{ id }`) +- `list_workspace_files` (`{ workspaceId }`) - `get_app_settings` - `update_app_settings` (`{ settings }`) - `start_thread` (`{ workspaceId }`) @@ -66,4 +67,3 @@ printf '{\"id\":3,\"method\":\"list_workspaces\",\"params\":{}}\\n' | nc -w 1 12 - `account_rate_limits` (`{ workspaceId }`) - `skills_list` (`{ workspaceId }`) - `respond_to_server_request` (`{ workspaceId, requestId, result }`) - diff --git a/src-tauri/src/bin/codex_monitor_daemon.rs b/src-tauri/src/bin/codex_monitor_daemon.rs index eedb14b8a..edb52d3c6 100644 --- a/src-tauri/src/bin/codex_monitor_daemon.rs +++ b/src-tauri/src/bin/codex_monitor_daemon.rs @@ -12,6 +12,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use ignore::WalkBuilder; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Mutex}; @@ -203,6 +204,19 @@ impl DaemonState { .ok_or("workspace not connected".to_string()) } + async fn list_workspace_files(&self, workspace_id: String) -> Result, String> { + let entry = { + let workspaces = self.workspaces.lock().await; + workspaces + .get(&workspace_id) + .cloned() + .ok_or("workspace not found")? + }; + + let root = PathBuf::from(entry.path); + Ok(list_workspace_files_inner(&root, 20000)) + } + async fn start_thread(&self, workspace_id: String) -> Result { let session = self.get_session(&workspace_id).await?; let params = json!({ @@ -251,15 +265,62 @@ impl DaemonState { images: Option>, ) -> Result { let session = self.get_session(&workspace_id).await?; + let access_mode = access_mode.unwrap_or_else(|| "current".to_string()); + let sandbox_policy = match access_mode.as_str() { + "full-access" => json!({ + "type": "dangerFullAccess" + }), + "read-only" => json!({ + "type": "readOnly" + }), + _ => json!({ + "type": "workspaceWrite", + "writableRoots": [session.entry.path], + "networkAccess": true + }), + }; + + let approval_policy = if access_mode == "full-access" { + "never" + } else { + "on-request" + }; + + let trimmed_text = text.trim(); + let mut input: Vec = Vec::new(); + if !trimmed_text.is_empty() { + input.push(json!({ "type": "text", "text": trimmed_text })); + } + if let Some(paths) = images { + for path in paths { + let trimmed = path.trim(); + if trimmed.is_empty() { + continue; + } + if trimmed.starts_with("data:") + || trimmed.starts_with("http://") + || trimmed.starts_with("https://") + { + input.push(json!({ "type": "image", "url": trimmed })); + } else { + input.push(json!({ "type": "localImage", "path": trimmed })); + } + } + } + if input.is_empty() { + return Err("empty user message".to_string()); + } + let params = json!({ "threadId": thread_id, - "message": text, + "input": input, + "cwd": session.entry.path, + "approvalPolicy": approval_policy, + "sandboxPolicy": sandbox_policy, "model": model, "effort": effort, - "approvalPolicy": access_mode, - "images": images }); - session.send_request("thread/user_message", params).await + session.send_request("turn/start", params).await } async fn turn_interrupt( @@ -273,8 +334,7 @@ impl DaemonState { "threadId": thread_id, "turnId": turn_id }); - session.send_notification("turn/interrupt", Some(params)).await?; - Ok(json!({ "ok": true })) + session.send_request("turn/interrupt", params).await } async fn start_review( @@ -303,12 +363,17 @@ impl DaemonState { async fn account_rate_limits(&self, workspace_id: String) -> Result { let session = self.get_session(&workspace_id).await?; - session.send_request("account/rate_limits", json!({})).await + session + .send_request("account/rateLimits/read", Value::Null) + .await } async fn skills_list(&self, workspace_id: String) -> Result { let session = self.get_session(&workspace_id).await?; - session.send_request("skills/list", json!({})).await + let params = json!({ + "cwd": session.entry.path + }); + session.send_request("skills/list", params).await } async fn respond_to_server_request( @@ -334,6 +399,58 @@ fn sort_workspaces(workspaces: &mut [WorkspaceInfo]) { }); } +fn should_skip_dir(name: &str) -> bool { + matches!( + name, + ".git" | "node_modules" | "dist" | "target" | "release-artifacts" + ) +} + +fn normalize_git_path(path: &str) -> String { + path.replace('\\', "/") +} + +fn list_workspace_files_inner(root: &PathBuf, max_files: usize) -> Vec { + let mut results = Vec::new(); + let walker = WalkBuilder::new(root) + .hidden(false) + .follow_links(false) + .require_git(false) + .filter_entry(|entry| { + if entry.depth() == 0 { + return true; + } + if entry.file_type().is_some_and(|ft| ft.is_dir()) { + let name = entry.file_name().to_string_lossy(); + return !should_skip_dir(&name); + } + true + }) + .build(); + + for entry in walker { + let entry = match entry { + Ok(entry) => entry, + Err(_) => continue, + }; + if !entry.file_type().is_some_and(|ft| ft.is_file()) { + continue; + } + if let Ok(rel_path) = entry.path().strip_prefix(root) { + let normalized = normalize_git_path(&rel_path.to_string_lossy()); + if !normalized.is_empty() { + results.push(normalized); + } + } + if results.len() >= max_files { + break; + } + } + + results.sort(); + results +} + fn default_data_dir() -> PathBuf { if let Ok(xdg) = env::var("XDG_DATA_HOME") { let trimmed = xdg.trim(); @@ -535,6 +652,11 @@ async fn handle_rpc_request( state.connect_workspace(id, client_version).await?; Ok(json!({ "ok": true })) } + "list_workspace_files" => { + let workspace_id = parse_string(¶ms, "workspaceId")?; + let files = state.list_workspace_files(workspace_id).await?; + serde_json::to_value(files).map_err(|err| err.to_string()) + } "get_app_settings" => { let settings = state.app_settings.lock().await; serde_json::to_value(settings.clone()).map_err(|err| err.to_string()) @@ -650,6 +772,21 @@ async fn handle_client( let mut authenticated = config.token.is_none(); let mut events_task: Option> = None; + if authenticated { + let mut rx = events.subscribe(); + let out_tx_events = out_tx.clone(); + events_task = Some(tokio::spawn(async move { + while let Ok(event) = rx.recv().await { + let Some(payload) = build_event_notification(event) else { + continue; + }; + if out_tx_events.send(payload).is_err() { + break; + } + } + })); + } + while let Ok(Some(line)) = lines.next_line().await { let line = line.trim(); if line.is_empty() { From 52f06da542f2fe3f33bb883ec3ec4dedd1d30778 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Fri, 16 Jan 2026 12:41:38 +0100 Subject: [PATCH 3/5] fix(dev): set default cargo binary --- src-tauri/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index f2117a789..bbf5c8741 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" description = "A Tauri App" authors = ["you"] edition = "2021" +default-run = "codex-monitor" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From e57aa2c744e7fa2d750b207b072c1050bb0636e2 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Fri, 16 Jan 2026 12:43:20 +0100 Subject: [PATCH 4/5] fix(daemon): keep event stream alive on lag --- src-tauri/src/bin/codex_monitor_daemon.rs | 47 ++++++++++++----------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/src-tauri/src/bin/codex_monitor_daemon.rs b/src-tauri/src/bin/codex_monitor_daemon.rs index edb52d3c6..e5f2723f6 100644 --- a/src-tauri/src/bin/codex_monitor_daemon.rs +++ b/src-tauri/src/bin/codex_monitor_daemon.rs @@ -748,6 +748,27 @@ async fn handle_rpc_request( } } +async fn forward_events( + mut rx: broadcast::Receiver, + out_tx_events: mpsc::UnboundedSender, +) { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + }; + + let Some(payload) = build_event_notification(event) else { + continue; + }; + + if out_tx_events.send(payload).is_err() { + break; + } + } +} + async fn handle_client( socket: TcpStream, config: Arc, @@ -773,18 +794,9 @@ async fn handle_client( let mut events_task: Option> = None; if authenticated { - let mut rx = events.subscribe(); + let rx = events.subscribe(); let out_tx_events = out_tx.clone(); - events_task = Some(tokio::spawn(async move { - while let Ok(event) = rx.recv().await { - let Some(payload) = build_event_notification(event) else { - continue; - }; - if out_tx_events.send(payload).is_err() { - break; - } - } - })); + events_task = Some(tokio::spawn(forward_events(rx, out_tx_events))); } while let Ok(Some(line)) = lines.next_line().await { @@ -828,18 +840,9 @@ async fn handle_client( let _ = out_tx.send(response); } - let mut rx = events.subscribe(); + let rx = events.subscribe(); let out_tx_events = out_tx.clone(); - events_task = Some(tokio::spawn(async move { - while let Ok(event) = rx.recv().await { - let Some(payload) = build_event_notification(event) else { - continue; - }; - if out_tx_events.send(payload).is_err() { - break; - } - } - })); + events_task = Some(tokio::spawn(forward_events(rx, out_tx_events))); continue; } From 2818820b77a53c5fb3fa11d3f419cca436e8f92b Mon Sep 17 00:00:00 2001 From: slkzgm Date: Fri, 16 Jan 2026 13:42:49 +0100 Subject: [PATCH 5/5] fix(daemon): pass CODEX_HOME for worktrees --- src-tauri/src/bin/codex_monitor_daemon.rs | 30 +++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src-tauri/src/bin/codex_monitor_daemon.rs b/src-tauri/src/bin/codex_monitor_daemon.rs index e5f2723f6..2581f58dd 100644 --- a/src-tauri/src/bin/codex_monitor_daemon.rs +++ b/src-tauri/src/bin/codex_monitor_daemon.rs @@ -126,11 +126,13 @@ impl DaemonState { settings.codex_bin.clone() }; + let codex_home = resolve_codex_home(&entry, None); let session = spawn_workspace_session( entry.clone(), default_bin, client_version, self.event_sink.clone(), + codex_home, ) .await?; @@ -177,11 +179,23 @@ impl DaemonState { settings.codex_bin.clone() }; + let parent_path = if entry.kind.is_worktree() { + let workspaces = self.workspaces.lock().await; + entry + .parent_id + .as_deref() + .and_then(|parent_id| workspaces.get(parent_id)) + .map(|parent| parent.path.clone()) + } else { + None + }; + let codex_home = resolve_codex_home(&entry, parent_path.as_deref()); let session = spawn_workspace_session( entry, default_bin, client_version, self.event_sink.clone(), + codex_home, ) .await?; @@ -399,6 +413,22 @@ fn sort_workspaces(workspaces: &mut [WorkspaceInfo]) { }); } +fn resolve_codex_home(entry: &WorkspaceEntry, parent_path: Option<&str>) -> Option { + if entry.kind.is_worktree() { + if let Some(parent_path) = parent_path { + let legacy_home = PathBuf::from(parent_path).join(".codexmonitor"); + if legacy_home.is_dir() { + return Some(legacy_home); + } + } + } + let legacy_home = PathBuf::from(&entry.path).join(".codexmonitor"); + if legacy_home.is_dir() { + return Some(legacy_home); + } + None +} + fn should_skip_dir(name: &str) -> bool { matches!( name,