Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src-tauri/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn join(
events.extend(mod_events)
}

if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), events).await {
if let Err(err) = eventsub.subscribe_all(login_clone.as_str(), &events).await {
tracing::error!(%err, "Failed to batch subscribe to EventSub events");
}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ pub async fn rejoin(state: State<'_, Mutex<AppState>>, channel: String) -> Resul
let subscriptions = eventsub.unsubscribe_all(&channel).await?;
let subs_ref: Vec<_> = subscriptions.iter().map(|(e, c)| (*e, c)).collect();

eventsub.subscribe_all(&channel, subs_ref).await?;
eventsub.subscribe_all(&channel, &subs_ref).await?;
}

if let Some(irc) = irc {
Expand Down
165 changes: 73 additions & 92 deletions src-tauri/src/eventsub/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use tracing::Instrument;
use twitch_api::HelixClient;
use twitch_api::eventsub::{EventSubSubscription, EventType};
use twitch_api::twitch_oauth2::{TwitchToken, UserToken};
Expand Down Expand Up @@ -163,45 +162,34 @@ impl EventSubClient {
}

#[tracing::instrument(name = "eventsub_connect", skip_all)]
pub async fn connect(self: Arc<Self>) -> Result<(), Error> {
tokio::spawn(
async move {
let ws_uri = TWITCH_EVENTSUB_WS_URI.to_string();
tracing::info!("Connecting to EventSub");

let stream = match connect_async(&ws_uri).await {
Ok((stream, _)) => stream,
Err(err) => {
tracing::error!(%err, "Failed to connect to EventSub");
return Err(Error::WebSocket(err));
}
};
pub async fn connect(&self) -> Result<(), Error> {
tracing::info!("Connecting to EventSub");

tracing::info!("Connected to EventSub");
self.set_connected(true);
let (stream, _) = connect_async(TWITCH_EVENTSUB_WS_URI).await.map_err(|err| {
tracing::error!(%err, "Failed to connect to EventSub");
Error::WebSocket(err)
})?;

let _ = Arc::clone(&self).process_stream(stream).await;
tracing::info!("Connected to EventSub");
self.set_connected(true);

self.set_connected(false);
*self.session_id.lock().await = None;
let result = self.process_stream(stream).await;

Ok(())
}
.in_current_span(),
);
self.set_connected(false);
*self.session_id.lock().await = None;

Ok(())
result
}

async fn process_stream(self: Arc<Self>, mut stream: Stream) -> Result<(), Error> {
async fn process_stream(&self, mut stream: Stream) -> Result<(), Error> {
loop {
match stream.next().await {
Some(Ok(message)) => match message {
Message::Ping(data) => {
stream.send(Message::Pong(data)).await?;
}
Message::Text(data) => {
if let Some(new_stream) = Arc::clone(&self).handle_text(&data).await? {
if let Some(new_stream) = self.handle_text(&data).await? {
let frame = CloseFrame {
code: CloseCode::Normal,
reason: "Reconnecting".into(),
Expand All @@ -228,7 +216,7 @@ impl EventSubClient {
Some(Err(err)) => {
tracing::error!(%err, "EventSub connection error");

match Arc::clone(&self).reconnect(TWITCH_EVENTSUB_WS_URI).await {
match self.reconnect(TWITCH_EVENTSUB_WS_URI).await {
Ok(new_stream) => {
stream = new_stream;
}
Expand All @@ -250,55 +238,53 @@ impl EventSubClient {
Ok(())
}

async fn handle_text(self: Arc<Self>, data: &str) -> Result<Option<Stream>, Error> {
if let Ok(msg) = serde_json::from_str(data)
&& let Some(url) = Arc::clone(&self).handle_message(msg).await?
{
tracing::info!("Reconnecting to EventSub at {url}");
return Ok(Some(self.reconnect(&url).await?));
}
async fn handle_text(&self, data: &str) -> Result<Option<Stream>, Error> {
let msg: WebSocketMessage = match serde_json::from_str(data) {
Ok(msg) => msg,
Err(err) => {
tracing::warn!(%err, "Failed to deserialize EventSub message");
return Ok(None);
}
};

Ok(None)
let Some(url) = self.handle_message(msg).await? else {
return Ok(None);
};

tracing::info!("Reconnecting to EventSub at {url}");
Ok(Some(self.reconnect(&url).await?))
}

#[tracing::instrument(skip_all)]
async fn handle_message(
self: Arc<Self>,
msg: WebSocketMessage,
) -> Result<Option<String>, Error> {
async fn handle_message(&self, msg: WebSocketMessage) -> Result<Option<String>, Error> {
use WebSocketMessage as Ws;

match msg {
Ws::Welcome(payload) => {
tracing::debug!("Set EventSub session id to {}", payload.session.id);
*self.session_id.lock().await = Some(payload.session.id);

if self
.reconnecting
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
tracing::info!("Initial connection to EventSub established");
let was_reconnecting = self.reconnecting.swap(false, Ordering::Relaxed);

let mut to_restore = Vec::new();
if was_reconnecting {
tracing::info!("Reconnected to EventSub");
} else {
tracing::info!("Initial connection to EventSub established");

{
let to_restore: Vec<_> = {
let mut map = self.subscriptions.lock().await;

if !map.is_empty() {
tracing::info!("Restoring {} subscriptions", map.len());

for (key, sub) in map.drain() {
if let Some((username, _)) = key.split_once(':') {
to_restore.push((
username.to_string(),
sub.kind,
sub.condition,
));
}
}
}
}

map.drain()
.filter_map(|(key, sub)| {
let (username, _) = key.split_once(':')?;
Some((username.to_string(), sub.kind, sub.condition))
})
.collect()
};

self.subscribe(
self.token.login.as_str(),
Expand All @@ -316,8 +302,6 @@ impl EventSubClient {
tracing::error!(%err, "Failed to restore {kind} subscription");
}
}
} else {
tracing::info!("Reconnected to EventSub");
}
}
Ws::Notification(payload) => {
Expand All @@ -327,15 +311,18 @@ impl EventSubClient {
payload.event
);

self.sender.send(payload).unwrap();
if self.sender.send(payload).is_err() {
tracing::warn!("EventSub notification receiver dropped");
}
}
Ws::Reconnect(payload) => {
tracing::warn!("Reconnect requested for {}", payload.session.id);

let url = payload
.session
.reconnect_url
.expect("missing reconnect_url in reconnect payload");
let Some(url) = payload.session.reconnect_url else {
return Err(Error::Generic(anyhow!(
"missing reconnect_url in reconnect payload"
)));
};

self.reconnecting.store(true, Ordering::Relaxed);

Expand All @@ -353,13 +340,13 @@ impl EventSubClient {
.await
.remove(&payload.subscription.kind.to_string());
}
_ => (),
Ws::Keepalive => (),
}

Ok(None)
}

async fn reconnect(self: Arc<Self>, url: &str) -> Result<Stream, Error> {
async fn reconnect(&self, url: &str) -> Result<Stream, Error> {
let (mut stream, _) = connect_async(url).await.map_err(Error::WebSocket)?;

loop {
Expand Down Expand Up @@ -388,7 +375,7 @@ impl EventSubClient {
self.connected.load(Ordering::Relaxed)
}

pub fn set_connected(&self, value: bool) {
fn set_connected(&self, value: bool) {
self.connected.store(value, Ordering::Relaxed);
}

Expand Down Expand Up @@ -445,29 +432,28 @@ impl EventSubClient {
pub async fn subscribe_all(
&self,
channel: &str,
subscriptions: Vec<(EventType, &serde_json::Value)>,
subscriptions: &[(EventType, &serde_json::Value)],
) -> Result<(), Error> {
let futures = subscriptions
.iter()
.map(|&(event, condition)| self.subscribe(channel, event, condition.clone()));

let success = join_all(futures).await.iter().filter(|r| r.is_ok()).count();
let results = join_all(futures).await;
let total = results.len();
let succeeded = results.iter().filter(|r| r.is_ok()).count();

tracing::info!("{} subscriptions created", success);
tracing::info!("{succeeded}/{total} subscriptions created");

Ok(())
}

pub async fn unsubscribe(
&self,
channel: &str,
event: String,
event: &str,
) -> Result<Option<Subscription>, Error> {
let subscription = self
.subscriptions
.lock()
.await
.remove(&format!("{channel}:{event}"));
let key = format!("{channel}:{event}");
let subscription = self.subscriptions.lock().await.remove(&key);

if let Some(ref sub) = subscription {
self.helix
Expand All @@ -484,28 +470,23 @@ impl EventSubClient {
) -> Result<Vec<(EventType, serde_json::Value)>, Error> {
let prefix = format!("{channel}:");

let events = {
let events: Vec<String> = {
let subscriptions = self.subscriptions.lock().await;

subscriptions
.keys()
.filter(|k| k.starts_with(&prefix))
.map(|k| k.strip_prefix(&prefix).unwrap().to_string())
.collect::<Vec<_>>()
.filter_map(|k| k.strip_prefix(&prefix).map(String::from))
.collect()
};

let futures = events
.iter()
.map(|event| self.unsubscribe(channel, event.clone()));

let results = join_all(futures).await;
let mut unsubscribed = Vec::new();
let futures = events.iter().map(|event| self.unsubscribe(channel, event));

for result in results {
if let Ok(Some(sub)) = result {
unsubscribed.push((sub.kind, sub.condition));
}
}
let unsubscribed = join_all(futures)
.await
.into_iter()
.filter_map(|r| r.ok().flatten())
.map(|sub| (sub.kind, sub.condition))
.collect();

Ok(unsubscribed)
}
Expand Down
11 changes: 7 additions & 4 deletions src-tauri/src/eventsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ pub async fn connect_eventsub(
drop(guard);

async_runtime::spawn(async move {
if Arc::clone(&client).connect().await.is_err() {
if let Err(err) = client.connect().await {
tracing::error!(%err, "EventSub connection failed");

let state = app_handle.state::<Mutex<AppState>>();
let mut state = state.lock().await;

state.eventsub = None;
}

Ok::<_, Error>(())
});

async_runtime::spawn(async move {
while let Some(message) = incoming.recv().await {
channel.send(message).unwrap();
if channel.send(message).is_err() {
tracing::warn!("EventSub frontend channel closed");
break;
}
}
});

Expand Down
24 changes: 13 additions & 11 deletions src-tauri/src/irc/client/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ClientLoopWorker {
#[must_use]
fn make_new_connection(&mut self) -> PoolConnection {
let connection_id = self.next_connection_id;
self.next_connection_id = self.next_connection_id.overflowing_add(1).0;
self.next_connection_id = self.next_connection_id.wrapping_add(1);

let (connection_incoming_messages_rx, connection) =
Connection::new(Arc::clone(&self.config));
Expand Down Expand Up @@ -120,16 +120,18 @@ impl ClientLoopWorker {
break;
}
incoming_message = connection_incoming_messages_rx.recv() => {
if let Some(incoming_message) = incoming_message {
if let Some(client_loop_tx) = client_loop_tx.upgrade() {
client_loop_tx.send(ClientLoopCommand::IncomingMessage {
source_connection_id: connection_id,
message: Box::new(incoming_message)
}).unwrap();
} else {
break;
}
} else {
let Some(incoming_message) = incoming_message else {
break;
};

let Some(client_loop_tx) = client_loop_tx.upgrade() else {
break;
};

if client_loop_tx.send(ClientLoopCommand::IncomingMessage {
source_connection_id: connection_id,
message: Box::new(incoming_message)
}).is_err() {
break;
}
}
Expand Down
Loading
Loading