diff --git a/src-tauri/src/api.rs b/src-tauri/src/api.rs index 2ee3ec52..7a3888b1 100644 --- a/src-tauri/src/api.rs +++ b/src-tauri/src/api.rs @@ -126,7 +126,7 @@ pub async fn join( events.extend(mod_events) } - if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), events).await { + if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), &events).await { tracing::error!(%err, "Failed to batch subscribe to EventSub events"); } } @@ -202,7 +202,7 @@ pub async fn rejoin(state: State<'_, Mutex>, channel: String) -> Resul let subscriptions = eventsub.unsubscribe_all(&channel).await?; let subs_ref: Vec<_> = subscriptions.iter().map(|(e, c)| (*e, c)).collect(); - eventsub.subscribe_all(&channel, subs_ref).await?; + eventsub.subscribe_all(&channel, &subs_ref).await?; } if let Some(irc) = irc { diff --git a/src-tauri/src/eventsub/client.rs b/src-tauri/src/eventsub/client.rs index 0e13b7dc..ac3c01c7 100644 --- a/src-tauri/src/eventsub/client.rs +++ b/src-tauri/src/eventsub/client.rs @@ -14,7 +14,6 @@ use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async}; -use tracing::Instrument; use twitch_api::HelixClient; use twitch_api::eventsub::{EventSubSubscription, EventType}; use twitch_api::twitch_oauth2::{TwitchToken, UserToken}; @@ -163,37 +162,26 @@ impl EventSubClient { } #[tracing::instrument(name = "eventsub_connect", skip_all)] - pub async fn connect(self: Arc) -> Result<(), Error> { - tokio::spawn( - async move { - let ws_uri = TWITCH_EVENTSUB_WS_URI.to_string(); - tracing::info!("Connecting to EventSub"); - - let stream = match connect_async(&ws_uri).await { - Ok((stream, _)) => stream, - Err(err) => { - tracing::error!(%err, "Failed to connect to EventSub"); - return Err(Error::WebSocket(err)); - } - }; + pub async fn connect(&self) -> Result<(), Error> { + tracing::info!("Connecting to EventSub"); - tracing::info!("Connected to EventSub"); - self.set_connected(true); + let (stream, _) = connect_async(TWITCH_EVENTSUB_WS_URI).await.map_err(|err| { + tracing::error!(%err, "Failed to connect to EventSub"); + Error::WebSocket(err) + })?; - let _ = Arc::clone(&self).process_stream(stream).await; + tracing::info!("Connected to EventSub"); + self.set_connected(true); - self.set_connected(false); - *self.session_id.lock().await = None; + let result = self.process_stream(stream).await; - Ok(()) - } - .in_current_span(), - ); + self.set_connected(false); + *self.session_id.lock().await = None; - Ok(()) + result } - async fn process_stream(self: Arc, mut stream: Stream) -> Result<(), Error> { + async fn process_stream(&self, mut stream: Stream) -> Result<(), Error> { loop { match stream.next().await { Some(Ok(message)) => match message { @@ -201,7 +189,7 @@ impl EventSubClient { stream.send(Message::Pong(data)).await?; } Message::Text(data) => { - if let Some(new_stream) = Arc::clone(&self).handle_text(&data).await? { + if let Some(new_stream) = self.handle_text(&data).await? { let frame = CloseFrame { code: CloseCode::Normal, reason: "Reconnecting".into(), @@ -228,7 +216,7 @@ impl EventSubClient { Some(Err(err)) => { tracing::error!(%err, "EventSub connection error"); - match Arc::clone(&self).reconnect(TWITCH_EVENTSUB_WS_URI).await { + match self.reconnect(TWITCH_EVENTSUB_WS_URI).await { Ok(new_stream) => { stream = new_stream; } @@ -250,22 +238,25 @@ impl EventSubClient { Ok(()) } - async fn handle_text(self: Arc, data: &str) -> Result, Error> { - if let Ok(msg) = serde_json::from_str(data) - && let Some(url) = Arc::clone(&self).handle_message(msg).await? - { - tracing::info!("Reconnecting to EventSub at {url}"); - return Ok(Some(self.reconnect(&url).await?)); - } + async fn handle_text(&self, data: &str) -> Result, Error> { + let msg: WebSocketMessage = match serde_json::from_str(data) { + Ok(msg) => msg, + Err(err) => { + tracing::warn!(%err, "Failed to deserialize EventSub message"); + return Ok(None); + } + }; - Ok(None) + let Some(url) = self.handle_message(msg).await? else { + return Ok(None); + }; + + tracing::info!("Reconnecting to EventSub at {url}"); + Ok(Some(self.reconnect(&url).await?)) } #[tracing::instrument(skip_all)] - async fn handle_message( - self: Arc, - msg: WebSocketMessage, - ) -> Result, Error> { + async fn handle_message(&self, msg: WebSocketMessage) -> Result, Error> { use WebSocketMessage as Ws; match msg { @@ -273,32 +264,27 @@ impl EventSubClient { tracing::debug!("Set EventSub session id to {}", payload.session.id); *self.session_id.lock().await = Some(payload.session.id); - if self - .reconnecting - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_err() - { - tracing::info!("Initial connection to EventSub established"); + let was_reconnecting = self.reconnecting.swap(false, Ordering::Relaxed); - let mut to_restore = Vec::new(); + if was_reconnecting { + tracing::info!("Reconnected to EventSub"); + } else { + tracing::info!("Initial connection to EventSub established"); - { + let to_restore: Vec<_> = { let mut map = self.subscriptions.lock().await; if !map.is_empty() { tracing::info!("Restoring {} subscriptions", map.len()); - - for (key, sub) in map.drain() { - if let Some((username, _)) = key.split_once(':') { - to_restore.push(( - username.to_string(), - sub.kind, - sub.condition, - )); - } - } } - } + + map.drain() + .filter_map(|(key, sub)| { + let (username, _) = key.split_once(':')?; + Some((username.to_string(), sub.kind, sub.condition)) + }) + .collect() + }; self.subscribe( self.token.login.as_str(), @@ -316,8 +302,6 @@ impl EventSubClient { tracing::error!(%err, "Failed to restore {kind} subscription"); } } - } else { - tracing::info!("Reconnected to EventSub"); } } Ws::Notification(payload) => { @@ -327,15 +311,18 @@ impl EventSubClient { payload.event ); - self.sender.send(payload).unwrap(); + if self.sender.send(payload).is_err() { + tracing::warn!("EventSub notification receiver dropped"); + } } Ws::Reconnect(payload) => { tracing::warn!("Reconnect requested for {}", payload.session.id); - let url = payload - .session - .reconnect_url - .expect("missing reconnect_url in reconnect payload"); + let Some(url) = payload.session.reconnect_url else { + return Err(Error::Generic(anyhow!( + "missing reconnect_url in reconnect payload" + ))); + }; self.reconnecting.store(true, Ordering::Relaxed); @@ -353,13 +340,13 @@ impl EventSubClient { .await .remove(&payload.subscription.kind.to_string()); } - _ => (), + Ws::Keepalive => (), } Ok(None) } - async fn reconnect(self: Arc, url: &str) -> Result { + async fn reconnect(&self, url: &str) -> Result { let (mut stream, _) = connect_async(url).await.map_err(Error::WebSocket)?; loop { @@ -388,7 +375,7 @@ impl EventSubClient { self.connected.load(Ordering::Relaxed) } - pub fn set_connected(&self, value: bool) { + fn set_connected(&self, value: bool) { self.connected.store(value, Ordering::Relaxed); } @@ -445,15 +432,17 @@ impl EventSubClient { pub async fn subscribe_all( &self, channel: &str, - subscriptions: Vec<(EventType, &serde_json::Value)>, + subscriptions: &[(EventType, &serde_json::Value)], ) -> Result<(), Error> { let futures = subscriptions .iter() .map(|&(event, condition)| self.subscribe(channel, event, condition.clone())); - let success = join_all(futures).await.iter().filter(|r| r.is_ok()).count(); + let results = join_all(futures).await; + let total = results.len(); + let succeeded = results.iter().filter(|r| r.is_ok()).count(); - tracing::info!("{} subscriptions created", success); + tracing::info!("{succeeded}/{total} subscriptions created"); Ok(()) } @@ -461,13 +450,10 @@ impl EventSubClient { pub async fn unsubscribe( &self, channel: &str, - event: String, + event: &str, ) -> Result, Error> { - let subscription = self - .subscriptions - .lock() - .await - .remove(&format!("{channel}:{event}")); + let key = format!("{channel}:{event}"); + let subscription = self.subscriptions.lock().await.remove(&key); if let Some(ref sub) = subscription { self.helix @@ -484,28 +470,23 @@ impl EventSubClient { ) -> Result, Error> { let prefix = format!("{channel}:"); - let events = { + let events: Vec = { let subscriptions = self.subscriptions.lock().await; subscriptions .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect::>() + .filter_map(|k| k.strip_prefix(&prefix).map(String::from)) + .collect() }; - let futures = events - .iter() - .map(|event| self.unsubscribe(channel, event.clone())); - - let results = join_all(futures).await; - let mut unsubscribed = Vec::new(); + let futures = events.iter().map(|event| self.unsubscribe(channel, event)); - for result in results { - if let Ok(Some(sub)) = result { - unsubscribed.push((sub.kind, sub.condition)); - } - } + let unsubscribed = join_all(futures) + .await + .into_iter() + .filter_map(|r| r.ok().flatten()) + .map(|sub| (sub.kind, sub.condition)) + .collect(); Ok(unsubscribed) } diff --git a/src-tauri/src/eventsub/mod.rs b/src-tauri/src/eventsub/mod.rs index c7f2ab6a..3c361c15 100644 --- a/src-tauri/src/eventsub/mod.rs +++ b/src-tauri/src/eventsub/mod.rs @@ -35,19 +35,22 @@ pub async fn connect_eventsub( drop(guard); async_runtime::spawn(async move { - if Arc::clone(&client).connect().await.is_err() { + if let Err(err) = client.connect().await { + tracing::error!(%err, "EventSub connection failed"); + let state = app_handle.state::>(); let mut state = state.lock().await; state.eventsub = None; } - - Ok::<_, Error>(()) }); async_runtime::spawn(async move { while let Some(message) = incoming.recv().await { - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("EventSub frontend channel closed"); + break; + } } }); diff --git a/src-tauri/src/irc/client/event_loop.rs b/src-tauri/src/irc/client/event_loop.rs index 5ca56cf4..c9152b35 100644 --- a/src-tauri/src/irc/client/event_loop.rs +++ b/src-tauri/src/irc/client/event_loop.rs @@ -85,7 +85,7 @@ impl ClientLoopWorker { #[must_use] fn make_new_connection(&mut self) -> PoolConnection { let connection_id = self.next_connection_id; - self.next_connection_id = self.next_connection_id.overflowing_add(1).0; + self.next_connection_id = self.next_connection_id.wrapping_add(1); let (connection_incoming_messages_rx, connection) = Connection::new(Arc::clone(&self.config)); @@ -120,16 +120,18 @@ impl ClientLoopWorker { break; } incoming_message = connection_incoming_messages_rx.recv() => { - if let Some(incoming_message) = incoming_message { - if let Some(client_loop_tx) = client_loop_tx.upgrade() { - client_loop_tx.send(ClientLoopCommand::IncomingMessage { - source_connection_id: connection_id, - message: Box::new(incoming_message) - }).unwrap(); - } else { - break; - } - } else { + let Some(incoming_message) = incoming_message else { + break; + }; + + let Some(client_loop_tx) = client_loop_tx.upgrade() else { + break; + }; + + if client_loop_tx.send(ClientLoopCommand::IncomingMessage { + source_connection_id: connection_id, + message: Box::new(incoming_message) + }).is_err() { break; } } diff --git a/src-tauri/src/irc/client/mod.rs b/src-tauri/src/irc/client/mod.rs index 8b45d191..fc94b07d 100644 --- a/src-tauri/src/irc/client/mod.rs +++ b/src-tauri/src/irc/client/mod.rs @@ -37,24 +37,39 @@ impl IrcClient { pub async fn connect(&self) { let (return_tx, return_rx) = oneshot::channel(); - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Connect { return_sender: return_tx, }) - .unwrap(); + .is_err() + { + tracing::error!("IRC client loop has stopped"); + return; + } - return_rx.await.unwrap() + if return_rx.await.is_err() { + tracing::error!("IRC connect acknowledgment lost"); + } } pub fn join(&self, channel_login: String) { - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Join { channel_login }) - .unwrap(); + .is_err() + { + tracing::warn!("IRC client loop has stopped, cannot join channel"); + } } pub fn part(&self, channel_login: String) { - self.client_loop_tx + if self + .client_loop_tx .send(ClientLoopCommand::Part { channel_login }) - .unwrap(); + .is_err() + { + tracing::warn!("IRC client loop has stopped, cannot part channel"); + } } } diff --git a/src-tauri/src/irc/mod.rs b/src-tauri/src/irc/mod.rs index 05ef208c..13c2833b 100644 --- a/src-tauri/src/irc/mod.rs +++ b/src-tauri/src/irc/mod.rs @@ -43,7 +43,10 @@ pub async fn connect_irc( tracing::trace!(?tags, "Received {command} message"); - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("IRC frontend channel closed"); + break; + } } }); diff --git a/src-tauri/src/seventv/client.rs b/src-tauri/src/seventv/client.rs index cacc605d..7f0c722c 100644 --- a/src-tauri/src/seventv/client.rs +++ b/src-tauri/src/seventv/client.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use anyhow::anyhow; @@ -10,7 +9,6 @@ use serde_json::json; use tokio::sync::{Mutex, mpsc}; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; -use tracing::Instrument; use crate::error::Error; @@ -23,12 +21,12 @@ struct WebSocketMessage { } pub struct SeventTvClient { - session_id: Arc>>, - subscriptions: Arc>>, + session_id: Mutex>, + subscriptions: Mutex>, sender: mpsc::UnboundedSender, connected: AtomicBool, message_tx: mpsc::UnboundedSender, - message_rx: Arc>>>, + message_rx: Mutex>>, } impl SeventTvClient { @@ -37,119 +35,75 @@ impl SeventTvClient { let (message_tx, message_rx) = mpsc::unbounded_channel(); let client = Self { - subscriptions: Arc::new(Mutex::new(HashMap::new())), - session_id: Arc::new(Mutex::new(None)), + subscriptions: Mutex::new(HashMap::new()), + session_id: Mutex::new(None), sender, connected: AtomicBool::default(), message_tx, - message_rx: Arc::new(Mutex::new(Some(message_rx))), + message_rx: Mutex::new(Some(message_rx)), }; (receiver, client) } #[tracing::instrument(name = "7tv_connect", skip_all)] - pub async fn connect(self: Arc) -> Result<(), Error> { - let this = Arc::clone(&self); - - let mut message_rx = { - let mut guard = this.message_rx.lock().await; - - guard - .take() - .ok_or_else(|| Error::Generic(anyhow!("Message receiver already taken")))? - }; - - tokio::spawn(async move { - loop { - tracing::info!("Connecting to 7TV Event API"); - - let mut stream = match connect_async(SEVENTV_WS_URI).await { - Ok((stream, _)) => stream, - Err(err) => { - tracing::error!(%err, "Failed to connect to 7TV Event API"); - return Err::<(), _>(Error::WebSocket(err)) - }, - }; - - tracing::info!("Connected to 7TV Event API"); + pub async fn connect(&self) -> Result<(), Error> { + let mut message_rx = self + .message_rx + .lock() + .await + .take() + .ok_or_else(|| Error::Generic(anyhow!("Message receiver already taken")))?; + + loop { + tracing::info!("Connecting to 7TV Event API"); + + let mut stream = match connect_async(SEVENTV_WS_URI).await { + Ok((stream, _)) => stream, + Err(err) => { + tracing::error!(%err, "Failed to connect to 7TV Event API"); + return Err(Error::WebSocket(err)); + } + }; - { - let session_id = this.session_id.lock().await; + tracing::info!("Connected to 7TV Event API"); - if let Some(id) = &*session_id { - tracing::info!(%id, "Resuming 7TV session"); + { + let session_id = self.session_id.lock().await; - let payload = json!({ - "op": 34, - "d": { - "session_id": id - } - }); + if let Some(id) = &*session_id { + tracing::info!(%id, "Resuming 7TV session"); - if let Err(err) = stream.send(Message::Text(payload.to_string().into())).await { - tracing::error!(%err, "Error sending resume message"); + let payload = json!({ + "op": 34, + "d": { + "session_id": id } + }); + + if let Err(err) = stream.send(Message::Text(payload.to_string().into())).await { + tracing::error!(%err, "Error sending resume message"); } } + } - self.connected.store(true, Ordering::Relaxed); - - 'recv: loop { - let this = Arc::clone(&this); + self.connected.store(true, Ordering::Relaxed); - tokio::select! { - Some(data) = message_rx.recv() => { - if let Err(err) = stream.send(data).await { - tracing::error!(%err, "Error sending message"); - break 'recv; - } + loop { + tokio::select! { + Some(data) = message_rx.recv() => { + if let Err(err) = stream.send(data).await { + tracing::error!(%err, "Error sending message"); + break; } - Some(Ok(message)) = stream.next() => { - match message { + } + result = stream.next() => { + match result { + Some(Ok(message)) => match message { Message::Text(text) => { - if let Ok(msg) = serde_json::from_str::(&text) { - match msg.op { - 0 => { - if let Err(err) = this.sender.send(msg.d) { - tracing::error!(%err, "Error sending payload"); - } - } - 1 => { - if let Some(id) = msg.d["session_id"].as_str() { - let mut session = this.session_id.lock().await; - *session = Some(id.to_string()); - - tracing::info!(%id, "Hello received, session established"); - } - } - 5 => { - tracing::debug!(payload = ?msg.d.to_string(), "Opcode acknowledged"); - - if let Some(success) = msg.d["data"]["success"].as_bool() && !success { - let to_restore: Vec<_> = { - let mut subscriptions = this.subscriptions.lock().await; - - tracing::warn!( - "Resume unsuccessful, restoring {} events", - subscriptions.len() - ); - - subscriptions.drain().collect() - }; - - for (key, condition) in to_restore { - let (channel, event) = key.split_once(':').unwrap(); - - self.subscribe(channel, event, &condition).await; - } - } - } - 7 => { - tracing::info!(payload = ?msg.d.to_string(), "End of stream reached"); - } - _ => {} - } + match serde_json::from_str::(&text) { + Ok(msg) => self.handle_ws_message(msg).await, + Err(err) => tracing::warn!(%err, "Failed to deserialize 7TV message"), } } Message::Close(cf) => { @@ -157,18 +111,70 @@ impl SeventTvClient { tracing::warn!(%frame, "Event API connection closed"); } - this.connected.store(false, Ordering::Relaxed); - break 'recv; + break; } _ => (), + }, + Some(Err(err)) => { + tracing::error!(%err, "7TV WebSocket error"); + break; + } + None => { + tracing::warn!("7TV WebSocket stream ended unexpectedly"); + break; } } } } } - }.in_current_span()); - Ok(()) + self.connected.store(false, Ordering::Relaxed); + } + } + + async fn handle_ws_message(&self, msg: WebSocketMessage) { + match msg.op { + 0 => { + if self.sender.send(msg.d).is_err() { + tracing::warn!("7TV event receiver dropped"); + } + } + 1 => { + if let Some(id) = msg.d["session_id"].as_str() { + *self.session_id.lock().await = Some(id.to_string()); + tracing::info!(%id, "Hello received, session established"); + } + } + 5 => { + tracing::debug!(payload = ?msg.d.to_string(), "Opcode acknowledged"); + + if let Some(false) = msg.d["data"]["success"].as_bool() { + let to_restore: Vec<_> = { + let mut subscriptions = self.subscriptions.lock().await; + + tracing::warn!( + "Resume unsuccessful, restoring {} events", + subscriptions.len() + ); + + subscriptions.drain().collect() + }; + + for (key, condition) in to_restore { + let Some((channel, event)) = key.split_once(':') else { + tracing::warn!("Malformed subscription key: {key}"); + continue; + }; + + self.subscribe(channel, event, &condition).await; + } + } + } + 7 => { + tracing::info!(payload = ?msg.d.to_string(), "End of stream reached"); + } + _ => {} + } } pub fn connected(&self) -> bool { @@ -190,8 +196,10 @@ impl SeventTvClient { .send(Message::Text(payload.to_string().into())) { Ok(_) => { - let mut subscriptions = self.subscriptions.lock().await; - subscriptions.insert(format!("{channel}:{event}"), condition.clone()); + self.subscriptions + .lock() + .await + .insert(format!("{channel}:{event}"), condition.clone()); tracing::trace!("Subscription created"); } @@ -222,14 +230,13 @@ impl SeventTvClient { pub async fn unsubscribe_all(&self, channel: &str) { let prefix = format!("{channel}:"); - let events = { + let events: Vec = { let subscriptions = self.subscriptions.lock().await; subscriptions .keys() - .filter(|k| k.starts_with(&prefix)) - .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) - .collect::>() + .filter_map(|k| k.strip_prefix(&prefix).map(String::from)) + .collect() }; let futures = events.iter().map(|event| self.unsubscribe(channel, event)); diff --git a/src-tauri/src/seventv/mod.rs b/src-tauri/src/seventv/mod.rs index 1570e4a4..203297dc 100644 --- a/src-tauri/src/seventv/mod.rs +++ b/src-tauri/src/seventv/mod.rs @@ -32,17 +32,22 @@ pub async fn connect_seventv( drop(state); async_runtime::spawn(async move { - if Arc::clone(&client).connect().await.is_err() { + if let Err(err) = client.connect().await { + tracing::error!(%err, "7TV connection failed"); + let state = app_handle.state::>(); let mut state = state.lock().await; state.seventv = None; - }; + } }); async_runtime::spawn(async move { while let Some(message) = incoming.recv().await { - channel.send(message).unwrap(); + if channel.send(message).is_err() { + tracing::warn!("7TV frontend channel closed"); + break; + } } });