From 191681a65a261b313c0b7e295af1d664263821c5 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Mon, 19 Jan 2026 15:49:13 +0100 Subject: [PATCH 1/4] feat(remote): proxy core commands via daemon --- src-tauri/src/codex.rs | 136 +++++++++++++++++++++ src-tauri/src/lib.rs | 1 + src-tauri/src/remote_backend.rs | 201 ++++++++++++++++++++++++++++++++ src-tauri/src/state.rs | 2 + src-tauri/src/workspaces.rs | 38 ++++++ 5 files changed, 378 insertions(+) create mode 100644 src-tauri/src/remote_backend.rs diff --git a/src-tauri/src/codex.rs b/src-tauri/src/codex.rs index ee6b1462d..91bd7561b 100644 --- a/src-tauri/src/codex.rs +++ b/src-tauri/src/codex.rs @@ -14,6 +14,7 @@ use crate::backend::app_server::{ spawn_workspace_session as spawn_workspace_session_inner, }; use crate::event_sink::TauriEventSink; +use crate::remote_backend; use crate::state::AppState; use crate::types::WorkspaceEntry; @@ -131,7 +132,18 @@ pub(crate) async fn codex_doctor( pub(crate) async fn start_thread( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "start_thread", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -148,7 +160,18 @@ pub(crate) async fn resume_thread( workspace_id: String, thread_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "resume_thread", + json!({ "workspaceId": workspace_id, "threadId": thread_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -165,7 +188,18 @@ pub(crate) async fn list_threads( cursor: Option, limit: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "list_threads", + json!({ "workspaceId": workspace_id, "cursor": cursor, "limit": limit }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -182,7 +216,18 @@ pub(crate) async fn archive_thread( workspace_id: String, thread_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "archive_thread", + json!({ "workspaceId": workspace_id, "threadId": thread_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -204,7 +249,26 @@ pub(crate) async fn send_user_message( images: Option>, collaboration_mode: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "send_user_message", + json!({ + "workspaceId": workspace_id, + "threadId": thread_id, + "text": text, + "model": model, + "effort": effort, + "accessMode": access_mode, + "images": images, + }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -288,7 +352,18 @@ pub(crate) async fn turn_interrupt( thread_id: String, turn_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "turn_interrupt", + json!({ "workspaceId": workspace_id, "threadId": thread_id, "turnId": turn_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -307,7 +382,23 @@ pub(crate) async fn start_review( target: Value, delivery: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "start_review", + json!({ + "workspaceId": workspace_id, + "threadId": thread_id, + "target": target, + "delivery": delivery, + }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -327,7 +418,18 @@ pub(crate) async fn start_review( pub(crate) async fn model_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "model_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -340,7 +442,18 @@ pub(crate) async fn model_list( pub(crate) async fn account_rate_limits( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "account_rate_limits", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -354,7 +467,18 @@ pub(crate) async fn account_rate_limits( pub(crate) async fn skills_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "skills_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) @@ -371,7 +495,19 @@ pub(crate) async fn respond_to_server_request( request_id: u64, result: Value, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote( + &*state, + app, + "respond_to_server_request", + json!({ "workspaceId": workspace_id, "requestId": request_id, "result": result }), + ) + .await?; + return Ok(()); + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 7495007e4..fa1ce7b6b 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -10,6 +10,7 @@ mod git; mod git_utils; mod local_usage; mod prompts; +mod remote_backend; mod settings; mod state; mod terminal; diff --git a/src-tauri/src/remote_backend.rs b/src-tauri/src/remote_backend.rs new file mode 100644 index 000000000..5e1c6487f --- /dev/null +++ b/src-tauri/src/remote_backend.rs @@ -0,0 +1,201 @@ +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use tauri::{AppHandle, Emitter}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, Mutex}; + +use crate::state::AppState; +use crate::types::BackendMode; + +const DEFAULT_REMOTE_HOST: &str = "127.0.0.1:4732"; +const DISCONNECTED_MESSAGE: &str = "remote backend disconnected"; + +type PendingMap = HashMap>>; + +#[derive(Clone)] +pub(crate) struct RemoteBackend { + inner: Arc, +} + +struct RemoteBackendInner { + out_tx: mpsc::UnboundedSender, + pending: Arc>, + next_id: AtomicU64, +} + +impl RemoteBackend { + pub(crate) async fn call(&self, method: &str, params: Value) -> Result { + let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); + let (tx, rx) = oneshot::channel(); + self.inner.pending.lock().await.insert(id, tx); + + let request = json!({ + "id": id, + "method": method, + "params": params, + }); + let message = serde_json::to_string(&request).map_err(|err| err.to_string())?; + self.inner + .out_tx + .send(message) + .map_err(|_| DISCONNECTED_MESSAGE.to_string())?; + + rx.await + .map_err(|_| DISCONNECTED_MESSAGE.to_string())? + } +} + +pub(crate) async fn is_remote_mode(state: &AppState) -> bool { + let settings = state.app_settings.lock().await; + matches!(settings.backend_mode, BackendMode::Remote) +} + +pub(crate) async fn call_remote( + state: &AppState, + app: AppHandle, + method: &str, + params: Value, +) -> Result { + let client = ensure_remote_backend(state, app).await?; + match client.call(method, params).await { + Ok(value) => Ok(value), + Err(err) => { + *state.remote_backend.lock().await = None; + Err(err) + } + } +} + +async fn ensure_remote_backend(state: &AppState, app: AppHandle) -> Result { + { + let guard = state.remote_backend.lock().await; + if let Some(client) = guard.as_ref() { + return Ok(client.clone()); + } + } + + let (host, token) = { + let settings = state.app_settings.lock().await; + ( + settings.remote_backend_host.clone(), + settings.remote_backend_token.clone(), + ) + }; + + let resolved_host = if host.trim().is_empty() { + DEFAULT_REMOTE_HOST.to_string() + } else { + host + }; + + let stream = TcpStream::connect(resolved_host.clone()) + .await + .map_err(|err| format!("Failed to connect to remote backend at {resolved_host}: {err}"))?; + let (reader, mut writer) = stream.into_split(); + + let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); + let write_task = tokio::spawn(async move { + while let Some(message) = out_rx.recv().await { + if writer.write_all(message.as_bytes()).await.is_err() { + break; + } + if writer.write_all(b"\n").await.is_err() { + break; + } + } + }); + + let pending = Arc::new(Mutex::new(PendingMap::new())); + let pending_for_reader = Arc::clone(&pending); + let app_for_reader = app.clone(); + let read_task = tokio::spawn(async move { + read_loop(app_for_reader, reader, pending_for_reader).await; + }); + + let client = RemoteBackend { + inner: Arc::new(RemoteBackendInner { + out_tx, + pending, + next_id: AtomicU64::new(1), + }), + }; + + if let Some(token) = token { + client + .call("auth", json!({ "token": token })) + .await + .map(|_| ())?; + } + + { + let mut guard = state.remote_backend.lock().await; + *guard = Some(client.clone()); + } + + drop((write_task, read_task)); + + Ok(client) +} + +async fn read_loop(app: AppHandle, reader: tokio::net::tcp::OwnedReadHalf, pending: Arc>) { + let mut lines = BufReader::new(reader).lines(); + + while let Ok(Some(line)) = lines.next_line().await { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let message: Value = match serde_json::from_str(trimmed) { + Ok(value) => value, + Err(_) => continue, + }; + + if let Some(id) = message.get("id").and_then(|value| value.as_u64()) { + let sender = pending.lock().await.remove(&id); + let Some(sender) = sender else { + continue; + }; + + if let Some(error) = message.get("error") { + let err_message = error + .get("message") + .and_then(|value| value.as_str()) + .unwrap_or("remote error"); + let _ = sender.send(Err(err_message.to_string())); + continue; + } + + let result = message.get("result").cloned().unwrap_or(Value::Null); + let _ = sender.send(Ok(result)); + continue; + } + + let method = message + .get("method") + .and_then(|value| value.as_str()) + .unwrap_or(""); + if method.is_empty() { + continue; + } + let params = message.get("params").cloned().unwrap_or(Value::Null); + match method { + "app-server-event" => { + let _ = app.emit("app-server-event", params); + } + "terminal-output" => { + let _ = app.emit("terminal-output", params); + } + _ => {} + } + } + + let mut pending = pending.lock().await; + for (_, sender) in pending.drain() { + let _ = sender.send(Err(DISCONNECTED_MESSAGE.to_string())); + } +} diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index 5465e697f..eb82133d9 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -14,6 +14,7 @@ pub(crate) struct AppState { pub(crate) sessions: Mutex>>, pub(crate) terminal_sessions: Mutex>>, + pub(crate) remote_backend: Mutex>, pub(crate) storage_path: PathBuf, pub(crate) settings_path: PathBuf, pub(crate) app_settings: Mutex, @@ -34,6 +35,7 @@ impl AppState { workspaces: Mutex::new(workspaces), sessions: Mutex::new(HashMap::new()), terminal_sessions: Mutex::new(HashMap::new()), + remote_backend: Mutex::new(None), storage_path, settings_path, app_settings: Mutex::new(app_settings), diff --git a/src-tauri/src/workspaces.rs b/src-tauri/src/workspaces.rs index 1b8d1f5ec..e72ac4beb 100644 --- a/src-tauri/src/workspaces.rs +++ b/src-tauri/src/workspaces.rs @@ -3,12 +3,14 @@ use std::path::PathBuf; use std::process::Stdio; use ignore::WalkBuilder; +use serde_json::json; use tauri::{AppHandle, Manager, State}; use tokio::io::AsyncWriteExt; use tokio::process::Command; use uuid::Uuid; use crate::codex::spawn_workspace_session; +use crate::remote_backend; use crate::state::AppState; use crate::git_utils::resolve_git_root; use crate::storage::write_workspaces; @@ -229,7 +231,13 @@ fn unique_worktree_path(base_dir: &PathBuf, name: &str) -> PathBuf { #[tauri::command] pub(crate) async fn list_workspaces( state: State<'_, AppState>, + app: AppHandle, ) -> Result, String> { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote(&*state, app, "list_workspaces", json!({})).await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let workspaces = state.workspaces.lock().await; let sessions = state.sessions.lock().await; let mut result = Vec::new(); @@ -257,6 +265,17 @@ pub(crate) async fn add_workspace( state: State<'_, AppState>, app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "add_workspace", + json!({ "path": path, "codex_bin": codex_bin }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let name = PathBuf::from(&path) .file_name() .and_then(|s| s.to_str()) @@ -703,6 +722,13 @@ pub(crate) async fn connect_workspace( state: State<'_, AppState>, app: AppHandle, ) -> Result<(), String> { + let (entry, parent_path) = { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "connect_workspace", json!({ "id": id })) + .await?; + return Ok(()); + } + let (entry, parent_path) = { let workspaces = state.workspaces.lock().await; workspaces @@ -733,7 +759,19 @@ pub(crate) async fn connect_workspace( pub(crate) async fn list_workspace_files( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result, String> { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "list_workspace_files", + json!({ "workspaceId": workspace_id }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let workspaces = state.workspaces.lock().await; let entry = workspaces .get(&workspace_id) From ade002203a26bbc821ca3a6ce3bd4a4ab77d35a2 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Mon, 19 Jan 2026 15:49:13 +0100 Subject: [PATCH 2/4] fix(remote): repair connect_workspace remote mode --- src-tauri/src/workspaces.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src-tauri/src/workspaces.rs b/src-tauri/src/workspaces.rs index e72ac4beb..954c19f36 100644 --- a/src-tauri/src/workspaces.rs +++ b/src-tauri/src/workspaces.rs @@ -722,7 +722,6 @@ pub(crate) async fn connect_workspace( state: State<'_, AppState>, app: AppHandle, ) -> Result<(), String> { - let (entry, parent_path) = { if remote_backend::is_remote_mode(&*state).await { remote_backend::call_remote(&*state, app, "connect_workspace", json!({ "id": id })) .await?; From 561dfa48fd512c53bb8705ccf49d0e2eba5f1be6 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Mon, 19 Jan 2026 15:50:43 +0100 Subject: [PATCH 3/4] feat(remote): proxy workspace management and normalize WSL paths --- src-tauri/src/codex.rs | 6 +++++ src-tauri/src/remote_backend.rs | 34 +++++++++++++++++++++++ src-tauri/src/workspaces.rs | 48 +++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) diff --git a/src-tauri/src/codex.rs b/src-tauri/src/codex.rs index 91bd7561b..4ed244779 100644 --- a/src-tauri/src/codex.rs +++ b/src-tauri/src/codex.rs @@ -252,6 +252,12 @@ pub(crate) async fn send_user_message( app: AppHandle, ) -> Result { if remote_backend::is_remote_mode(&*state).await { + let images = images.map(|paths| { + paths + .into_iter() + .map(remote_backend::normalize_path_for_remote) + .collect::>() + }); return remote_backend::call_remote( &*state, app, diff --git a/src-tauri/src/remote_backend.rs b/src-tauri/src/remote_backend.rs index 5e1c6487f..42f472c3d 100644 --- a/src-tauri/src/remote_backend.rs +++ b/src-tauri/src/remote_backend.rs @@ -16,6 +16,40 @@ const DISCONNECTED_MESSAGE: &str = "remote backend disconnected"; type PendingMap = HashMap>>; +pub(crate) fn normalize_path_for_remote(path: String) -> String { + let trimmed = path.trim(); + if trimmed.is_empty() { + return path; + } + + if let Some(normalized) = normalize_wsl_unc_path(trimmed) { + return normalized; + } + + path +} + +fn normalize_wsl_unc_path(path: &str) -> Option { + let lower = path.to_ascii_lowercase(); + let (prefix_len, raw) = if lower.starts_with("\\\\wsl$\\") { + (7, path) + } else if lower.starts_with("\\\\wsl.localhost\\") { + (16, path) + } else { + return None; + }; + + let remainder = raw.get(prefix_len..)?; + let mut segments = remainder.split('\\').filter(|segment| !segment.is_empty()); + segments.next()?; + let joined = segments.collect::>().join("/"); + Some(if joined.is_empty() { + "/".to_string() + } else { + format!("/{joined}") + }) +} + #[derive(Clone)] pub(crate) struct RemoteBackend { inner: Arc, diff --git a/src-tauri/src/workspaces.rs b/src-tauri/src/workspaces.rs index 954c19f36..1af61087c 100644 --- a/src-tauri/src/workspaces.rs +++ b/src-tauri/src/workspaces.rs @@ -266,6 +266,7 @@ pub(crate) async fn add_workspace( app: AppHandle, ) -> Result { if remote_backend::is_remote_mode(&*state).await { + let path = remote_backend::normalize_path_for_remote(path); let response = remote_backend::call_remote( &*state, app, @@ -330,6 +331,17 @@ pub(crate) async fn add_worktree( state: State<'_, AppState>, app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "add_worktree", + json!({ "parentId": parent_id, "branch": branch }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let branch = branch.trim(); if branch.is_empty() { return Err("Branch name is required.".to_string()); @@ -423,7 +435,13 @@ pub(crate) async fn add_worktree( pub(crate) async fn remove_workspace( id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "remove_workspace", json!({ "id": id })).await?; + return Ok(()); + } + let (entry, child_worktrees) = { let workspaces = state.workspaces.lock().await; let entry = workspaces @@ -480,7 +498,13 @@ pub(crate) async fn remove_workspace( pub(crate) async fn remove_worktree( id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + remote_backend::call_remote(&*state, app, "remove_worktree", json!({ "id": id })).await?; + return Ok(()); + } + let (entry, parent) = { let workspaces = state.workspaces.lock().await; let entry = workspaces @@ -659,7 +683,19 @@ pub(crate) async fn update_workspace_settings( id: String, settings: WorkspaceSettings, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "update_workspace_settings", + json!({ "id": id, "settings": settings }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let (entry_snapshot, list) = { let mut workspaces = state.workspaces.lock().await; let entry_snapshot = apply_workspace_settings_update(&mut workspaces, &id, settings)?; @@ -687,7 +723,19 @@ pub(crate) async fn update_workspace_codex_bin( id: String, codex_bin: Option, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let response = remote_backend::call_remote( + &*state, + app, + "update_workspace_codex_bin", + json!({ "id": id, "codex_bin": codex_bin }), + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + let (entry_snapshot, list) = { let mut workspaces = state.workspaces.lock().await; let entry_snapshot = match workspaces.get_mut(&id) { From dd41d93de16d2639a6de77f48418ee4c81832dbf Mon Sep 17 00:00:00 2001 From: slkzgm Date: Tue, 20 Jan 2026 10:30:38 +0100 Subject: [PATCH 4/4] fix(remote): forward collaboration mode and handle disconnect --- src-tauri/src/codex.rs | 12 ++++++++ src-tauri/src/remote_backend.rs | 54 +++++++++++++++++++++++++-------- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src-tauri/src/codex.rs b/src-tauri/src/codex.rs index 4ed244779..df0f6b79f 100644 --- a/src-tauri/src/codex.rs +++ b/src-tauri/src/codex.rs @@ -270,6 +270,7 @@ pub(crate) async fn send_user_message( "effort": effort, "accessMode": access_mode, "images": images, + "collaborationMode": collaboration_mode, }), ) .await; @@ -342,7 +343,18 @@ pub(crate) async fn send_user_message( pub(crate) async fn collaboration_mode_list( workspace_id: String, state: State<'_, AppState>, + app: AppHandle, ) -> Result { + if remote_backend::is_remote_mode(&*state).await { + return remote_backend::call_remote( + &*state, + app, + "collaboration_mode_list", + json!({ "workspaceId": workspace_id }), + ) + .await; + } + let sessions = state.sessions.lock().await; let session = sessions .get(&workspace_id) diff --git a/src-tauri/src/remote_backend.rs b/src-tauri/src/remote_backend.rs index 42f472c3d..a844f0fff 100644 --- a/src-tauri/src/remote_backend.rs +++ b/src-tauri/src/remote_backend.rs @@ -1,6 +1,6 @@ use serde_json::{json, Value}; use std::collections::HashMap; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use tauri::{AppHandle, Emitter}; @@ -59,10 +59,15 @@ struct RemoteBackendInner { out_tx: mpsc::UnboundedSender, pending: Arc>, next_id: AtomicU64, + connected: Arc, } impl RemoteBackend { pub(crate) async fn call(&self, method: &str, params: Value) -> Result { + if !self.inner.connected.load(Ordering::SeqCst) { + return Err(DISCONNECTED_MESSAGE.to_string()); + } + let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); let (tx, rx) = oneshot::channel(); self.inner.pending.lock().await.insert(id, tx); @@ -73,10 +78,10 @@ impl RemoteBackend { "params": params, }); let message = serde_json::to_string(&request).map_err(|err| err.to_string())?; - self.inner - .out_tx - .send(message) - .map_err(|_| DISCONNECTED_MESSAGE.to_string())?; + if self.inner.out_tx.send(message).is_err() { + self.inner.pending.lock().await.remove(&id); + return Err(DISCONNECTED_MESSAGE.to_string()); + } rx.await .map_err(|_| DISCONNECTED_MESSAGE.to_string())? @@ -132,22 +137,38 @@ async fn ensure_remote_backend(state: &AppState, app: AppHandle) -> Result(); + let pending = Arc::new(Mutex::new(PendingMap::new())); + let pending_for_writer = Arc::clone(&pending); + let pending_for_reader = Arc::clone(&pending); + + let connected = Arc::new(AtomicBool::new(true)); + let connected_for_writer = Arc::clone(&connected); + let connected_for_reader = Arc::clone(&connected); + let write_task = tokio::spawn(async move { while let Some(message) = out_rx.recv().await { - if writer.write_all(message.as_bytes()).await.is_err() { - break; - } - if writer.write_all(b"\n").await.is_err() { + if writer.write_all(message.as_bytes()).await.is_err() + || writer.write_all(b"\n").await.is_err() + { + connected_for_writer.store(false, Ordering::SeqCst); + let mut pending = pending_for_writer.lock().await; + for (_, sender) in pending.drain() { + let _ = sender.send(Err(DISCONNECTED_MESSAGE.to_string())); + } break; } } }); - let pending = Arc::new(Mutex::new(PendingMap::new())); - let pending_for_reader = Arc::clone(&pending); let app_for_reader = app.clone(); let read_task = tokio::spawn(async move { - read_loop(app_for_reader, reader, pending_for_reader).await; + read_loop( + app_for_reader, + reader, + pending_for_reader, + connected_for_reader, + ) + .await; }); let client = RemoteBackend { @@ -155,6 +176,7 @@ async fn ensure_remote_backend(state: &AppState, app: AppHandle) -> Result Result>) { +async fn read_loop( + app: AppHandle, + reader: tokio::net::tcp::OwnedReadHalf, + pending: Arc>, + connected: Arc, +) { let mut lines = BufReader::new(reader).lines(); while let Ok(Some(line)) = lines.next_line().await { @@ -228,6 +255,7 @@ async fn read_loop(app: AppHandle, reader: tokio::net::tcp::OwnedReadHalf, pendi } } + connected.store(false, Ordering::SeqCst); let mut pending = pending.lock().await; for (_, sender) in pending.drain() { let _ = sender.send(Err(DISCONNECTED_MESSAGE.to_string()));