From af83202377e5b96c579d5741cefb1bb9de32a867 Mon Sep 17 00:00:00 2001 From: Oliver Rose Date: Sun, 15 Mar 2026 22:57:55 -0400 Subject: [PATCH 1/5] Refactor EventSub client to use references Optimizes the client structure by removing unnecessary Arc wrappers and changing method signatures to accept references instead of ownership. Enhances robustness by gracefully handling channel closures and deserialization errors during reconnection, and cleans up subscription management logic. --- src-tauri/src/api.rs | 4 +- src-tauri/src/eventsub/client.rs | 161 ++++++++++++++----------------- src-tauri/src/eventsub/mod.rs | 11 ++- 3 files changed, 81 insertions(+), 95 deletions(-) diff --git a/src-tauri/src/api.rs b/src-tauri/src/api.rs index 2ee3ec52..7a3888b1 100644 --- a/src-tauri/src/api.rs +++ b/src-tauri/src/api.rs @@ -126,7 +126,7 @@ pub async fn join( events.extend(mod_events) } - if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), events).await { + if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), &events).await { tracing::error!(%err, "Failed to batch subscribe to EventSub events"); } } @@ -202,7 +202,7 @@ pub async fn rejoin(state: State<'_, Mutex>, channel: String) -> Resul let subscriptions = eventsub.unsubscribe_all(&channel).await?; let subs_ref: Vec<_> = subscriptions.iter().map(|(e, c)| (*e, c)).collect(); - eventsub.subscribe_all(&channel, subs_ref).await?; + eventsub.subscribe_all(&channel, &subs_ref).await?; } if let Some(irc) = irc { diff --git a/src-tauri/src/eventsub/client.rs b/src-tauri/src/eventsub/client.rs index 0e13b7dc..f373cf96 100644 --- a/src-tauri/src/eventsub/client.rs +++ b/src-tauri/src/eventsub/client.rs @@ -14,7 +14,6 @@ use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async}; -use tracing::Instrument; use twitch_api::HelixClient; use twitch_api::eventsub::{EventSubSubscription, EventType}; use twitch_api::twitch_oauth2::{TwitchToken, UserToken}; @@ -163,37 +162,26 @@ impl EventSubClient { } #[tracing::instrument(name = "eventsub_connect", skip_all)] - pub async fn connect(self: Arc) -> Result<(), Error> { - tokio::spawn( - async move { - let ws_uri = TWITCH_EVENTSUB_WS_URI.to_string(); - tracing::info!("Connecting to EventSub"); - - let stream = match connect_async(&ws_uri).await { - Ok((stream, _)) => stream, - Err(err) => { - tracing::error!(%err, "Failed to connect to EventSub"); - return Err(Error::WebSocket(err)); - } - }; + pub async fn connect(&self) -> Result<(), Error> { + tracing::info!("Connecting to EventSub"); - tracing::info!("Connected to EventSub"); - self.set_connected(true); + let (stream, _) = connect_async(TWITCH_EVENTSUB_WS_URI).await.map_err(|err| { + tracing::error!(%err, "Failed to connect to EventSub"); + Error::WebSocket(err) + })?; - let _ = Arc::clone(&self).process_stream(stream).await; + tracing::info!("Connected to EventSub"); + self.set_connected(true); - self.set_connected(false); - *self.session_id.lock().await = None; + let _ = self.process_stream(stream).await; - Ok(()) - } - .in_current_span(), - ); + self.set_connected(false); + *self.session_id.lock().await = None; Ok(()) } - async fn process_stream(self: Arc, mut stream: Stream) -> Result<(), Error> { + async fn process_stream(&self, mut stream: Stream) -> Result<(), Error> { loop { match stream.next().await { Some(Ok(message)) => match message { @@ -201,7 +189,7 @@ impl EventSubClient { stream.send(Message::Pong(data)).await?; } Message::Text(data) => { - if let Some(new_stream) = Arc::clone(&self).handle_text(&data).await? { + if let Some(new_stream) = self.handle_text(&data).await? { let frame = CloseFrame { code: CloseCode::Normal, reason: "Reconnecting".into(), @@ -228,7 +216,7 @@ impl EventSubClient { Some(Err(err)) => { tracing::error!(%err, "EventSub connection error"); - match Arc::clone(&self).reconnect(TWITCH_EVENTSUB_WS_URI).await { + match self.reconnect(TWITCH_EVENTSUB_WS_URI).await { Ok(new_stream) => { stream = new_stream; } @@ -250,22 +238,25 @@ impl EventSubClient { Ok(()) } - async fn handle_text(self: Arc, data: &str) -> Result, Error> { - if let Ok(msg) = serde_json::from_str(data) - && let Some(url) = Arc::clone(&self).handle_message(msg).await? - { - tracing::info!("Reconnecting to EventSub at {url}"); - return Ok(Some(self.reconnect(&url).await?)); - } + async fn handle_text(&self, data: &str) -> Result, Error> { + let msg: WebSocketMessage = match serde_json::from_str(data) { + Ok(msg) => msg, + Err(err) => { + tracing::warn!(%err, "Failed to deserialize EventSub message"); + return Ok(None); + } + }; - Ok(None) + let Some(url) = self.handle_message(msg).await? else { + return Ok(None); + }; + + tracing::info!("Reconnecting to EventSub at {url}"); + Ok(Some(self.reconnect(&url).await?)) } #[tracing::instrument(skip_all)] - async fn handle_message( - self: Arc, - msg: WebSocketMessage, - ) -> Result, Error> { + async fn handle_message(&self, msg: WebSocketMessage) -> Result, Error> { use WebSocketMessage as Ws; match msg { @@ -273,32 +264,27 @@ impl EventSubClient { tracing::debug!("Set EventSub session id to {}", payload.session.id); *self.session_id.lock().await = Some(payload.session.id); - if self - .reconnecting - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_err() - { - tracing::info!("Initial connection to EventSub established"); + let was_reconnecting = self.reconnecting.swap(false, Ordering::Relaxed); - let mut to_restore = Vec::new(); + if was_reconnecting { + tracing::info!("Reconnected to EventSub"); + } else { + tracing::info!("Initial connection to EventSub established"); - { + let to_restore: Vec<_> = { let mut map = self.subscriptions.lock().await; if !map.is_empty() { tracing::info!("Restoring {} subscriptions", map.len()); - - for (key, sub) in map.drain() { - if let Some((username, _)) = key.split_once(':') { - to_restore.push(( - username.to_string(), - sub.kind, - sub.condition, - )); - } - } } - } + + map.drain() + .filter_map(|(key, sub)| { + let (username, _) = key.split_once(':')?; + Some((username.to_string(), sub.kind, sub.condition)) + }) + .collect() + }; self.subscribe( self.token.login.as_str(), @@ -316,8 +302,6 @@ impl EventSubClient { tracing::error!(%err, "Failed to restore {kind} subscription"); } } - } else { - tracing::info!("Reconnected to EventSub"); } } Ws::Notification(payload) => { @@ -327,15 +311,18 @@ impl EventSubClient { payload.event ); - self.sender.send(payload).unwrap(); + if self.sender.send(payload).is_err() { + tracing::warn!("EventSub notification receiver dropped"); + } } Ws::Reconnect(payload) => { tracing::warn!("Reconnect requested for {}", payload.session.id); - let url = payload - .session - .reconnect_url - .expect("missing reconnect_url in reconnect payload"); + let Some(url) = payload.session.reconnect_url else { + return Err(Error::Generic(anyhow!( + "missing reconnect_url in reconnect payload" + ))); + }; self.reconnecting.store(true, Ordering::Relaxed); @@ -353,13 +340,13 @@ impl EventSubClient { .await .remove(&payload.subscription.kind.to_string()); } - _ => (), + Ws::Keepalive => (), } Ok(None) } - async fn reconnect(self: Arc, url: &str) -> Result { + async fn reconnect(&self, url: &str) -> Result { let (mut stream, _) = connect_async(url).await.map_err(Error::WebSocket)?; loop { @@ -388,7 +375,7 @@ impl EventSubClient { self.connected.load(Ordering::Relaxed) } - pub fn set_connected(&self, value: bool) { + fn set_connected(&self, value: bool) { self.connected.store(value, Ordering::Relaxed); } @@ -445,15 +432,17 @@ impl EventSubClient { pub async fn subscribe_all( &self, channel: &str, - subscriptions: Vec<(EventType, &serde_json::Value)>, + subscriptions: &[(EventType, &serde_json::Value)], ) -> Result<(), Error> { let futures = subscriptions .iter() .map(|&(event, condition)| self.subscribe(channel, event, condition.clone())); - let success = join_all(futures).await.iter().filter(|r| r.is_ok()).count(); + let results = join_all(futures).await; + let total = results.len(); + let succeeded = results.iter().filter(|r| r.is_ok()).count(); - tracing::info!("{} subscriptions created", success); + tracing::info!("{succeeded}/{total} subscriptions created"); Ok(()) } @@ -461,13 +450,10 @@ impl EventSubClient { pub async fn unsubscribe( &self, channel: &str, - event: String, + event: &str, ) -> Result, Error> { - let subscription = self - .subscriptions - .lock() - .await - .remove(&format!("{channel}:{event}")); + let key = format!("{channel}:{event}"); + let subscription = self.subscriptions.lock().await.remove(&key); if let Some(ref sub) = subscription { self.helix @@ -484,28 +470,25 @@ impl EventSubClient { ) -> Result, Error> { let prefix = format!("{channel}:"); - let events = { + let events: Vec = { let subscriptions = self.subscriptions.lock().await; subscriptions .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect::>() + .filter_map(|k| k.strip_prefix(&prefix).map(String::from)) + .collect() }; let futures = events .iter() - .map(|event| self.unsubscribe(channel, event.clone())); - - let results = join_all(futures).await; - let mut unsubscribed = Vec::new(); + .map(|event| self.unsubscribe(channel, event)); - for result in results { - if let Ok(Some(sub)) = result { - unsubscribed.push((sub.kind, sub.condition)); - } - } + let unsubscribed = join_all(futures) + .await + .into_iter() + .filter_map(|r| r.ok().flatten()) + .map(|sub| (sub.kind, sub.condition)) + .collect(); Ok(unsubscribed) } diff --git a/src-tauri/src/eventsub/mod.rs b/src-tauri/src/eventsub/mod.rs index c7f2ab6a..3c361c15 100644 --- a/src-tauri/src/eventsub/mod.rs +++ b/src-tauri/src/eventsub/mod.rs @@ -35,19 +35,22 @@ pub async fn connect_eventsub( drop(guard); async_runtime::spawn(async move { - if Arc::clone(&client).connect().await.is_err() { + if let Err(err) = client.connect().await { + tracing::error!(%err, "EventSub connection failed"); + let state = app_handle.state::>(); let mut state = state.lock().await; state.eventsub = None; } - - Ok::<_, Error>(()) }); async_runtime::spawn(async move { while let Some(message) = incoming.recv().await { - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("EventSub frontend channel closed"); + break; + } } }); From 33a2b8a88d035a25b87c2715a79c5bc6f282260e Mon Sep 17 00:00:00 2001 From: Oliver Rose Date: Sun, 15 Mar 2026 22:57:55 -0400 Subject: [PATCH 2/5] Refactor 7TV client to remove internal Arc wrappers Simplifies the internal state management by replacing Arc with Mutex for fields, as the client itself is shared via Arc. Rewrites the connection loop to better handle session resumption and adds error checks for frontend channel communication. --- src-tauri/src/seventv/client.rs | 228 ++++++++++++++++---------------- src-tauri/src/seventv/mod.rs | 11 +- 2 files changed, 121 insertions(+), 118 deletions(-) diff --git a/src-tauri/src/seventv/client.rs b/src-tauri/src/seventv/client.rs index cacc605d..a9aeadff 100644 --- a/src-tauri/src/seventv/client.rs +++ b/src-tauri/src/seventv/client.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use anyhow::anyhow; @@ -10,7 +9,6 @@ use serde_json::json; use tokio::sync::{Mutex, mpsc}; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; -use tracing::Instrument; use crate::error::Error; @@ -23,12 +21,12 @@ struct WebSocketMessage { } pub struct SeventTvClient { - session_id: Arc>>, - subscriptions: Arc>>, + session_id: Mutex>, + subscriptions: Mutex>, sender: mpsc::UnboundedSender, connected: AtomicBool, message_tx: mpsc::UnboundedSender, - message_rx: Arc>>>, + message_rx: Mutex>>, } impl SeventTvClient { @@ -37,138 +35,137 @@ impl SeventTvClient { let (message_tx, message_rx) = mpsc::unbounded_channel(); let client = Self { - subscriptions: Arc::new(Mutex::new(HashMap::new())), - session_id: Arc::new(Mutex::new(None)), + subscriptions: Mutex::new(HashMap::new()), + session_id: Mutex::new(None), sender, connected: AtomicBool::default(), message_tx, - message_rx: Arc::new(Mutex::new(Some(message_rx))), + message_rx: Mutex::new(Some(message_rx)), }; (receiver, client) } #[tracing::instrument(name = "7tv_connect", skip_all)] - pub async fn connect(self: Arc) -> Result<(), Error> { - let this = Arc::clone(&self); - - let mut message_rx = { - let mut guard = this.message_rx.lock().await; + pub async fn connect(&self) -> Result<(), Error> { + let mut message_rx = self + .message_rx + .lock() + .await + .take() + .ok_or_else(|| Error::Generic(anyhow!("Message receiver already taken")))?; + + loop { + tracing::info!("Connecting to 7TV Event API"); + + let mut stream = match connect_async(SEVENTV_WS_URI).await { + Ok((stream, _)) => stream, + Err(err) => { + tracing::error!(%err, "Failed to connect to 7TV Event API"); + return Err(Error::WebSocket(err)); + } + }; - guard - .take() - .ok_or_else(|| Error::Generic(anyhow!("Message receiver already taken")))? - }; + tracing::info!("Connected to 7TV Event API"); - tokio::spawn(async move { - loop { - tracing::info!("Connecting to 7TV Event API"); + { + let session_id = self.session_id.lock().await; - let mut stream = match connect_async(SEVENTV_WS_URI).await { - Ok((stream, _)) => stream, - Err(err) => { - tracing::error!(%err, "Failed to connect to 7TV Event API"); - return Err::<(), _>(Error::WebSocket(err)) - }, - }; + if let Some(id) = &*session_id { + tracing::info!(%id, "Resuming 7TV session"); - tracing::info!("Connected to 7TV Event API"); + let payload = json!({ + "op": 34, + "d": { + "session_id": id + } + }); - { - let session_id = this.session_id.lock().await; + if let Err(err) = + stream.send(Message::Text(payload.to_string().into())).await + { + tracing::error!(%err, "Error sending resume message"); + } + } + } - if let Some(id) = &*session_id { - tracing::info!(%id, "Resuming 7TV session"); + self.connected.store(true, Ordering::Relaxed); - let payload = json!({ - "op": 34, - "d": { - "session_id": id + loop { + tokio::select! { + Some(data) = message_rx.recv() => { + if let Err(err) = stream.send(data).await { + tracing::error!(%err, "Error sending message"); + break; + } + } + Some(Ok(message)) = stream.next() => { + match message { + Message::Text(text) => { + match serde_json::from_str::(&text) { + Ok(msg) => self.handle_ws_message(msg).await, + Err(err) => tracing::warn!(%err, "Failed to deserialize 7TV message"), + } } - }); + Message::Close(cf) => { + if let Some(frame) = cf { + tracing::warn!(%frame, "Event API connection closed"); + } - if let Err(err) = stream.send(Message::Text(payload.to_string().into())).await { - tracing::error!(%err, "Error sending resume message"); + self.connected.store(false, Ordering::Relaxed); + break; + } + _ => (), } } } + } + } + } + + async fn handle_ws_message(&self, msg: WebSocketMessage) { + match msg.op { + 0 => { + if self.sender.send(msg.d).is_err() { + tracing::warn!("7TV event receiver dropped"); + } + } + 1 => { + if let Some(id) = msg.d["session_id"].as_str() { + *self.session_id.lock().await = Some(id.to_string()); + tracing::info!(%id, "Hello received, session established"); + } + } + 5 => { + tracing::debug!(payload = ?msg.d.to_string(), "Opcode acknowledged"); - self.connected.store(true, Ordering::Relaxed); + if let Some(false) = msg.d["data"]["success"].as_bool() { + let to_restore: Vec<_> = { + let mut subscriptions = self.subscriptions.lock().await; - 'recv: loop { - let this = Arc::clone(&this); + tracing::warn!( + "Resume unsuccessful, restoring {} events", + subscriptions.len() + ); - tokio::select! { - Some(data) = message_rx.recv() => { - if let Err(err) = stream.send(data).await { - tracing::error!(%err, "Error sending message"); - break 'recv; - } - } - Some(Ok(message)) = stream.next() => { - match message { - Message::Text(text) => { - if let Ok(msg) = serde_json::from_str::(&text) { - match msg.op { - 0 => { - if let Err(err) = this.sender.send(msg.d) { - tracing::error!(%err, "Error sending payload"); - } - } - 1 => { - if let Some(id) = msg.d["session_id"].as_str() { - let mut session = this.session_id.lock().await; - *session = Some(id.to_string()); - - tracing::info!(%id, "Hello received, session established"); - } - } - 5 => { - tracing::debug!(payload = ?msg.d.to_string(), "Opcode acknowledged"); - - if let Some(success) = msg.d["data"]["success"].as_bool() && !success { - let to_restore: Vec<_> = { - let mut subscriptions = this.subscriptions.lock().await; - - tracing::warn!( - "Resume unsuccessful, restoring {} events", - subscriptions.len() - ); - - subscriptions.drain().collect() - }; - - for (key, condition) in to_restore { - let (channel, event) = key.split_once(':').unwrap(); - - self.subscribe(channel, event, &condition).await; - } - } - } - 7 => { - tracing::info!(payload = ?msg.d.to_string(), "End of stream reached"); - } - _ => {} - } - } - } - Message::Close(cf) => { - if let Some(frame) = cf { - tracing::warn!(%frame, "Event API connection closed"); - } + subscriptions.drain().collect() + }; - this.connected.store(false, Ordering::Relaxed); - break 'recv; - } - _ => (), - } - } + for (key, condition) in to_restore { + let Some((channel, event)) = key.split_once(':') else { + tracing::warn!("Malformed subscription key: {key}"); + continue; + }; + + self.subscribe(channel, event, &condition).await; } } } - }.in_current_span()); - - Ok(()) + 7 => { + tracing::info!(payload = ?msg.d.to_string(), "End of stream reached"); + } + _ => {} + } } pub fn connected(&self) -> bool { @@ -190,8 +187,10 @@ impl SeventTvClient { .send(Message::Text(payload.to_string().into())) { Ok(_) => { - let mut subscriptions = self.subscriptions.lock().await; - subscriptions.insert(format!("{channel}:{event}"), condition.clone()); + self.subscriptions + .lock() + .await + .insert(format!("{channel}:{event}"), condition.clone()); tracing::trace!("Subscription created"); } @@ -222,14 +221,13 @@ impl SeventTvClient { pub async fn unsubscribe_all(&self, channel: &str) { let prefix = format!("{channel}:"); - let events = { + let events: Vec = { let subscriptions = self.subscriptions.lock().await; subscriptions .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect::>() + .filter_map(|k| k.strip_prefix(&prefix).map(String::from)) + .collect() }; let futures = events.iter().map(|event| self.unsubscribe(channel, event)); diff --git a/src-tauri/src/seventv/mod.rs b/src-tauri/src/seventv/mod.rs index 1570e4a4..203297dc 100644 --- a/src-tauri/src/seventv/mod.rs +++ b/src-tauri/src/seventv/mod.rs @@ -32,17 +32,22 @@ pub async fn connect_seventv( drop(state); async_runtime::spawn(async move { - if Arc::clone(&client).connect().await.is_err() { + if let Err(err) = client.connect().await { + tracing::error!(%err, "7TV connection failed"); + let state = app_handle.state::>(); let mut state = state.lock().await; state.seventv = None; - }; + } }); async_runtime::spawn(async move { while let Some(message) = incoming.recv().await { - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("7TV frontend channel closed"); + break; + } } }); From 0106f6e8cb7f08eff1b4668f62d5903c0dba3e4e Mon Sep 17 00:00:00 2001 From: Oliver Rose Date: Sun, 15 Mar 2026 22:57:55 -0400 Subject: [PATCH 3/5] Harden IRC client against channel closures and overflows Prevents panics by checking for closed channels before sending messages in the event loop and client methods. Switches to wrapping addition for connection IDs to avoid potential overflow errors. --- src-tauri/src/irc/client/event_loop.rs | 24 +++++++++++---------- src-tauri/src/irc/client/mod.rs | 29 +++++++++++++++++++------- src-tauri/src/irc/mod.rs | 5 ++++- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src-tauri/src/irc/client/event_loop.rs b/src-tauri/src/irc/client/event_loop.rs index 5ca56cf4..c9152b35 100644 --- a/src-tauri/src/irc/client/event_loop.rs +++ b/src-tauri/src/irc/client/event_loop.rs @@ -85,7 +85,7 @@ impl ClientLoopWorker { #[must_use] fn make_new_connection(&mut self) -> PoolConnection { let connection_id = self.next_connection_id; - self.next_connection_id = self.next_connection_id.overflowing_add(1).0; + self.next_connection_id = self.next_connection_id.wrapping_add(1); let (connection_incoming_messages_rx, connection) = Connection::new(Arc::clone(&self.config)); @@ -120,16 +120,18 @@ impl ClientLoopWorker { break; } incoming_message = connection_incoming_messages_rx.recv() => { - if let Some(incoming_message) = incoming_message { - if let Some(client_loop_tx) = client_loop_tx.upgrade() { - client_loop_tx.send(ClientLoopCommand::IncomingMessage { - source_connection_id: connection_id, - message: Box::new(incoming_message) - }).unwrap(); - } else { - break; - } - } else { + let Some(incoming_message) = incoming_message else { + break; + }; + + let Some(client_loop_tx) = client_loop_tx.upgrade() else { + break; + }; + + if client_loop_tx.send(ClientLoopCommand::IncomingMessage { + source_connection_id: connection_id, + message: Box::new(incoming_message) + }).is_err() { break; } } diff --git a/src-tauri/src/irc/client/mod.rs b/src-tauri/src/irc/client/mod.rs index 8b45d191..fc94b07d 100644 --- a/src-tauri/src/irc/client/mod.rs +++ b/src-tauri/src/irc/client/mod.rs @@ -37,24 +37,39 @@ impl IrcClient { pub async fn connect(&self) { let (return_tx, return_rx) = oneshot::channel(); - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Connect { return_sender: return_tx, }) - .unwrap(); + .is_err() + { + tracing::error!("IRC client loop has stopped"); + return; + } - return_rx.await.unwrap() + if return_rx.await.is_err() { + tracing::error!("IRC connect acknowledgment lost"); + } } pub fn join(&self, channel_login: String) { - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Join { channel_login }) - .unwrap(); + .is_err() + { + tracing::warn!("IRC client loop has stopped, cannot join channel"); + } } pub fn part(&self, channel_login: String) { - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Part { channel_login }) - .unwrap(); + .is_err() + { + tracing::warn!("IRC client loop has stopped, cannot part channel"); + } } } diff --git a/src-tauri/src/irc/mod.rs b/src-tauri/src/irc/mod.rs index 05ef208c..13c2833b 100644 --- a/src-tauri/src/irc/mod.rs +++ b/src-tauri/src/irc/mod.rs @@ -43,7 +43,10 @@ pub async fn connect_irc( tracing::trace!(?tags, "Received {command} message"); - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("IRC frontend channel closed"); + break; + } } }); From 15fb54ca3db4ab47fab8f2045ee2af642a65a21f Mon Sep 17 00:00:00 2001 From: Oliver Rose Date: Wed, 18 Mar 2026 19:03:20 -0400 Subject: [PATCH 4/5] apply suggestions --- src-tauri/src/seventv/client.rs | 41 ++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src-tauri/src/seventv/client.rs b/src-tauri/src/seventv/client.rs index a9aeadff..7f0c722c 100644 --- a/src-tauri/src/seventv/client.rs +++ b/src-tauri/src/seventv/client.rs @@ -81,9 +81,7 @@ impl SeventTvClient { } }); - if let Err(err) = - stream.send(Message::Text(payload.to_string().into())).await - { + if let Err(err) = stream.send(Message::Text(payload.to_string().into())).await { tracing::error!(%err, "Error sending resume message"); } } @@ -99,27 +97,38 @@ impl SeventTvClient { break; } } - Some(Ok(message)) = stream.next() => { - match message { - Message::Text(text) => { - match serde_json::from_str::(&text) { - Ok(msg) => self.handle_ws_message(msg).await, - Err(err) => tracing::warn!(%err, "Failed to deserialize 7TV message"), - } - } - Message::Close(cf) => { - if let Some(frame) = cf { - tracing::warn!(%frame, "Event API connection closed"); + result = stream.next() => { + match result { + Some(Ok(message)) => match message { + Message::Text(text) => { + match serde_json::from_str::(&text) { + Ok(msg) => self.handle_ws_message(msg).await, + Err(err) => tracing::warn!(%err, "Failed to deserialize 7TV message"), + } } + Message::Close(cf) => { + if let Some(frame) = cf { + tracing::warn!(%frame, "Event API connection closed"); + } - self.connected.store(false, Ordering::Relaxed); + break; + } + _ => (), + }, + Some(Err(err)) => { + tracing::error!(%err, "7TV WebSocket error"); + break; + } + None => { + tracing::warn!("7TV WebSocket stream ended unexpectedly"); break; } - _ => (), } } } } + + self.connected.store(false, Ordering::Relaxed); } } From 2fb666e1f3e7a6659f12aac14ff0311beb416dac Mon Sep 17 00:00:00 2001 From: Oliver Rose Date: Wed, 18 Mar 2026 19:06:13 -0400 Subject: [PATCH 5/5] propagate eventsub connection result --- src-tauri/src/eventsub/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src-tauri/src/eventsub/client.rs b/src-tauri/src/eventsub/client.rs index f373cf96..ac3c01c7 100644 --- a/src-tauri/src/eventsub/client.rs +++ b/src-tauri/src/eventsub/client.rs @@ -173,12 +173,12 @@ impl EventSubClient { tracing::info!("Connected to EventSub"); self.set_connected(true); - let _ = self.process_stream(stream).await; + let result = self.process_stream(stream).await; self.set_connected(false); *self.session_id.lock().await = None; - Ok(()) + result } async fn process_stream(&self, mut stream: Stream) -> Result<(), Error> { @@ -479,9 +479,7 @@ impl EventSubClient { .collect() }; - let futures = events - .iter() - .map(|event| self.unsubscribe(channel, event)); + let futures = events.iter().map(|event| self.unsubscribe(channel, event)); let unsubscribed = join_all(futures) .await