diff --git a/src-tauri/src/codex.rs b/src-tauri/src/codex.rs index ee6b1462d..df0f6b79f 100644 --- a/src-tauri/src/codex.rs +++ b/src-tauri/src/codex.rs @@ -14,6 +14,7 @@ use crate::backend::app_server::{ spawn_workspace_session as spawn_workspace_session_inner, }; use crate::event_sink::TauriEventSink; +use crate::remote_backend; use crate::state::AppState; use crate::types::WorkspaceEntry; @@ -131,7 +132,18 @@ pub(crate) async fn codex_doctor( pub(crate) async fn start_thread( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "start_thread", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -148,7 +160,18 @@ pub(crate) async fn resume_thread( workspace_id: String, thread_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "resume_thread", + json!({ "workspaceId": workspace_id, "threadId": thread_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -165,7 +188,18 @@ pub(crate) async fn list_threads( cursor: Option, limit: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "list_threads", + json!({ "workspaceId": workspace_id, "cursor": cursor, "limit": limit }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -182,7 +216,18 @@ pub(crate) async fn archive_thread( workspace_id: String, thread_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "archive_thread", + json!({ "workspaceId": workspace_id, "threadId": thread_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -204,7 +249,33 @@ pub(crate) async fn send_user_message( images: Option>, collaboration_mode: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let images = images.map(|paths| { + paths + .into_iter() + .map(remote_backend::normalize_path_for_remote) + .collect::>() + }); + return remote_backend::call_remote( + &*state, + app, + "send_user_message", + json!({ + "workspaceId": workspace_id, + "threadId": thread_id, + "text": text, + "model": model, + "effort": effort, + "accessMode": access_mode, + "images": images, + "collaborationMode": collaboration_mode, + }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -272,7 +343,18 @@ pub(crate) async fn send_user_message( pub(crate) async fn collaboration_mode_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "collaboration_mode_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -288,7 +370,18 @@ pub(crate) async fn turn_interrupt( thread_id: String, turn_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "turn_interrupt", + json!({ "workspaceId": workspace_id, "threadId": thread_id, "turnId": turn_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -307,7 +400,23 @@ pub(crate) async fn start_review( target: Value, delivery: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "start_review", + json!({ + "workspaceId": workspace_id, + "threadId": thread_id, + "target": target, + "delivery": delivery, + }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -327,7 +436,18 @@ pub(crate) async fn start_review( pub(crate) async fn model_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "model_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -340,7 +460,18 @@ pub(crate) async fn model_list( pub(crate) async fn account_rate_limits( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "account_rate_limits", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -354,7 +485,18 @@ pub(crate) async fn account_rate_limits( pub(crate) async fn skills_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "skills_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -371,7 +513,19 @@ pub(crate) async fn respond_to_server_request( request_id: u64, result: Value, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote( + &*state, + app, + "respond_to_server_request", + json!({ "workspaceId": workspace_id, "requestId": request_id, "result": result }), + ) + .await?; + return Ok(()); + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 7495007e4..fa1ce7b6b 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -10,6 +10,7 @@ mod git; mod git_utils; mod local_usage; mod prompts; +mod remote_backend; mod settings; mod state; mod terminal; diff --git a/src-tauri/src/remote_backend.rs b/src-tauri/src/remote_backend.rs new file mode 100644 index 000000000..a844f0fff --- /dev/null +++ b/src-tauri/src/remote_backend.rs @@ -0,0 +1,263 @@ +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; + +use tauri::{AppHandle, Emitter}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, Mutex}; + +use crate::state::AppState; +use crate::types::BackendMode; + +const DEFAULT_REMOTE_HOST: &str = "127.0.0.1:4732"; +const DISCONNECTED_MESSAGE: &str = "remote backend disconnected"; + +type PendingMap = HashMap>>; + +pub(crate) fn normalize_path_for_remote(path: String) -> String { + let trimmed = path.trim(); + if trimmed.is_empty() { + return path; + } + + if let Some(normalized) = normalize_wsl_unc_path(trimmed) { + return normalized; + } + + path +} + +fn normalize_wsl_unc_path(path: &str) -> Option { + let lower = path.to_ascii_lowercase(); + let (prefix_len, raw) = if lower.starts_with("\\\\wsl$\\") { + (7, path) + } else if lower.starts_with("\\\\wsl.localhost\\") { + (16, path) + } else { + return None; + }; + + let remainder = raw.get(prefix_len..)?; + let mut segments = remainder.split('\\').filter(|segment| !segment.is_empty()); + segments.next()?; + let joined = segments.collect::>().join("/"); + Some(if joined.is_empty() { + "/".to_string() + } else { + format!("/{joined}") + }) +} + +#[derive(Clone)] +pub(crate) struct RemoteBackend { + inner: Arc, +} + +struct RemoteBackendInner { + out_tx: mpsc::UnboundedSender, + pending: Arc>, + next_id: AtomicU64, + connected: Arc, +} + +impl RemoteBackend { + pub(crate) async fn call(&self, method: &str, params: Value) -> Result { + if !self.inner.connected.load(Ordering::SeqCst) { + return Err(DISCONNECTED_MESSAGE.to_string()); + } + + let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); + let (tx, rx) = oneshot::channel(); + self.inner.pending.lock().await.insert(id, tx); + + let request = json!({ + "id": id, + "method": method, + "params": params, + }); + let message = serde_json::to_string(&request).map_err(|err| err.to_string())?; + if self.inner.out_tx.send(message).is_err() { + self.inner.pending.lock().await.remove(&id); + return Err(DISCONNECTED_MESSAGE.to_string()); + } + + rx.await + .map_err(|_| DISCONNECTED_MESSAGE.to_string())? + } +} + +pub(crate) async fn is_remote_mode(state: &AppState) -> bool { + let settings = state.app_settings.lock().await; + matches!(settings.backend_mode, BackendMode::Remote) +} + +pub(crate) async fn call_remote( + state: &AppState, + app: AppHandle, + method: &str, + params: Value, +) -> Result { + let client = ensure_remote_backend(state, app).await?; + match client.call(method, params).await { + Ok(value) => Ok(value), + Err(err) => { + *state.remote_backend.lock().await = None; + Err(err) + } + } +} + +async fn ensure_remote_backend(state: &AppState, app: AppHandle) -> Result { + { + let guard = state.remote_backend.lock().await; + if let Some(client) = guard.as_ref() { + return Ok(client.clone()); + } + } + + let (host, token) = { + let settings = state.app_settings.lock().await; + ( + settings.remote_backend_host.clone(), + settings.remote_backend_token.clone(), + ) + }; + + let resolved_host = if host.trim().is_empty() { + DEFAULT_REMOTE_HOST.to_string() + } else { + host + }; + + let stream = TcpStream::connect(resolved_host.clone()) + .await + .map_err(|err| format!("Failed to connect to remote backend at {resolved_host}: {err}"))?; + let (reader, mut writer) = stream.into_split(); + + let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); + let pending = Arc::new(Mutex::new(PendingMap::new())); + let pending_for_writer = Arc::clone(&pending); + let pending_for_reader = Arc::clone(&pending); + + let connected = Arc::new(AtomicBool::new(true)); + let connected_for_writer = Arc::clone(&connected); + let connected_for_reader = Arc::clone(&connected); + + let write_task = tokio::spawn(async move { + while let Some(message) = out_rx.recv().await { + if writer.write_all(message.as_bytes()).await.is_err() + || writer.write_all(b"\n").await.is_err() + { + connected_for_writer.store(false, Ordering::SeqCst); + let mut pending = pending_for_writer.lock().await; + for (_, sender) in pending.drain() { + let _ = sender.send(Err(DISCONNECTED_MESSAGE.to_string())); + } + break; + } + } + }); + + let app_for_reader = app.clone(); + let read_task = tokio::spawn(async move { + read_loop( + app_for_reader, + reader, + pending_for_reader, + connected_for_reader, + ) + .await; + }); + + let client = RemoteBackend { + inner: Arc::new(RemoteBackendInner { + out_tx, + pending, + next_id: AtomicU64::new(1), + connected, + }), + }; + + if let Some(token) = token { + client + .call("auth", json!({ "token": token })) + .await + .map(|_| ())?; + } + + { + let mut guard = state.remote_backend.lock().await; + *guard = Some(client.clone()); + } + + drop((write_task, read_task)); + + Ok(client) +} + +async fn read_loop( + app: AppHandle, + reader: tokio::net::tcp::OwnedReadHalf, + pending: Arc>, + connected: Arc, +) { + let mut lines = BufReader::new(reader).lines(); + + while let Ok(Some(line)) = lines.next_line().await { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let message: Value = match serde_json::from_str(trimmed) { + Ok(value) => value, + Err(_) => continue, + }; + + if let Some(id) = message.get("id").and_then(|value| value.as_u64()) { + let sender = pending.lock().await.remove(&id); + let Some(sender) = sender else { + continue; + }; + + if let Some(error) = message.get("error") { + let err_message = error + .get("message") + .and_then(|value| value.as_str()) + .unwrap_or("remote error"); + let _ = sender.send(Err(err_message.to_string())); + continue; + } + + let result = message.get("result").cloned().unwrap_or(Value::Null); + let _ = sender.send(Ok(result)); + continue; + } + + let method = message + .get("method") + .and_then(|value| value.as_str()) + .unwrap_or(""); + if method.is_empty() { + continue; + } + let params = message.get("params").cloned().unwrap_or(Value::Null); + match method { + "app-server-event" => { + let _ = app.emit("app-server-event", params); + } + "terminal-output" => { + let _ = app.emit("terminal-output", params); + } + _ => {} + } + } + + connected.store(false, Ordering::SeqCst); + let mut pending = pending.lock().await; + for (_, sender) in pending.drain() { + let _ = sender.send(Err(DISCONNECTED_MESSAGE.to_string())); + } +} diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index 5465e697f..eb82133d9 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -14,6 +14,7 @@ pub(crate) struct AppState { pub(crate) sessions: Mutex>>, pub(crate) terminal_sessions: Mutex>>, + pub(crate) remote_backend: Mutex>, pub(crate) storage_path: PathBuf, pub(crate) settings_path: PathBuf, pub(crate) app_settings: Mutex, @@ -34,6 +35,7 @@ impl AppState { workspaces: Mutex::new(workspaces), sessions: Mutex::new(HashMap::new()), terminal_sessions: Mutex::new(HashMap::new()), + remote_backend: Mutex::new(None), storage_path, settings_path, app_settings: Mutex::new(app_settings), diff --git a/src-tauri/src/workspaces.rs b/src-tauri/src/workspaces.rs index 1b8d1f5ec..1af61087c 100644 --- a/src-tauri/src/workspaces.rs +++ b/src-tauri/src/workspaces.rs @@ -3,12 +3,14 @@ use std::path::PathBuf; use std::process::Stdio; use ignore::WalkBuilder; +use serde_json::json; use tauri::{AppHandle, Manager, State}; use tokio::io::AsyncWriteExt; use tokio::process::Command; use uuid::Uuid; use crate::codex::spawn_workspace_session; +use crate::remote_backend; use crate::state::AppState; use crate::git_utils::resolve_git_root; use crate::storage::write_workspaces; @@ -229,7 +231,13 @@ fn unique_worktree_path(base_dir: &PathBuf, name: &str) -> PathBuf { #[tauri::command] pub(crate) async fn list_workspaces( state: State<'_, AppState>, + app: AppHandle, ) -> Result, String> { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote(&*state, app, "list_workspaces", json!({})).await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let workspaces = state.workspaces.lock().await; let sessions = state.sessions.lock().await; let mut result = Vec::new(); @@ -257,6 +265,18 @@ pub(crate) async fn add_workspace( state: State<'_, AppState>, app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let path = remote_backend::normalize_path_for_remote(path); + let response = remote_backend::call_remote( + &*state, + app, + "add_workspace", + json!({ "path": path, "codex_bin": codex_bin }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let name = PathBuf::from(&path) .file_name() .and_then(|s| s.to_str()) @@ -311,6 +331,17 @@ pub(crate) async fn add_worktree( state: State<'_, AppState>, app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "add_worktree", + json!({ "parentId": parent_id, "branch": branch }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let branch = branch.trim(); if branch.is_empty() { return Err("Branch name is required.".to_string()); @@ -404,7 +435,13 @@ pub(crate) async fn add_worktree( pub(crate) async fn remove_workspace( id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "remove_workspace", json!({ "id": id })).await?; + return Ok(()); + } + let (entry, child_worktrees) = { let workspaces = state.workspaces.lock().await; let entry = workspaces @@ -461,7 +498,13 @@ pub(crate) async fn remove_workspace( pub(crate) async fn remove_worktree( id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "remove_worktree", json!({ "id": id })).await?; + return Ok(()); + } + let (entry, parent) = { let workspaces = state.workspaces.lock().await; let entry = workspaces @@ -640,7 +683,19 @@ pub(crate) async fn update_workspace_settings( id: String, settings: WorkspaceSettings, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "update_workspace_settings", + json!({ "id": id, "settings": settings }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let (entry_snapshot, list) = { let mut workspaces = state.workspaces.lock().await; let entry_snapshot = apply_workspace_settings_update(&mut workspaces, &id, settings)?; @@ -668,7 +723,19 @@ pub(crate) async fn update_workspace_codex_bin( id: String, codex_bin: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "update_workspace_codex_bin", + json!({ "id": id, "codex_bin": codex_bin }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let (entry_snapshot, list) = { let mut workspaces = state.workspaces.lock().await; let entry_snapshot = match workspaces.get_mut(&id) { @@ -703,6 +770,12 @@ pub(crate) async fn connect_workspace( state: State<'_, AppState>, app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "connect_workspace", json!({ "id": id })) + .await?; + return Ok(()); + } + let (entry, parent_path) = { let workspaces = state.workspaces.lock().await; workspaces @@ -733,7 +806,19 @@ pub(crate) async fn connect_workspace( pub(crate) async fn list_workspace_files( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result, String> { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "list_workspace_files", + json!({ "workspaceId": workspace_id }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let workspaces = state.workspaces.lock().await; let entry = workspaces .get(&workspace_id)