Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion core/src/room_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ struct RoomServiceInner {
// TODO: See if we can use a sync::Mutex instead of tokio::sync::Mutex
room: Mutex<Option<Room>>,
buffer_source: Arc<std::sync::Mutex<Option<NativeVideoSource>>>,
/// Current remote control enabled state. Updated when sharer toggles remote control.
/// Used for rebroadcasting to late joiners.
remote_control_enabled: std::sync::Mutex<bool>,
}

/// RoomService is a wrapper around the LiveKit room, on creation it
Expand Down Expand Up @@ -129,6 +132,7 @@ impl RoomService {
let inner = Arc::new(RoomServiceInner {
room: Mutex::new(None),
buffer_source: Arc::new(std::sync::Mutex::new(None)),
remote_control_enabled: std::sync::Mutex::new(true),
});
let (service_command_tx, service_command_rx) = mpsc::unbounded_channel();
let (service_command_res_tx, service_command_res_rx) = std::sync::mpsc::channel();
Expand Down Expand Up @@ -431,7 +435,12 @@ async fn room_service_commands(
let user_sid = room.local_participant().sid().as_str().to_string();
// TODO: Check if this will need cleanup
/* Spawn thread for handling livekit data events. */
tokio::spawn(handle_room_events(rx, event_loop_proxy, user_sid));
tokio::spawn(handle_room_events(
rx,
event_loop_proxy,
user_sid,
inner.clone(),
));

let mut inner_room = inner.room.lock().await;
*inner_room = Some(room);
Expand Down Expand Up @@ -506,6 +515,9 @@ async fn room_service_commands(
let mut inner_buffer_source = inner.buffer_source.lock().unwrap();
*inner_buffer_source = Some(buffer_source);

let mut rc_state = inner.remote_control_enabled.lock().unwrap();
*rc_state = true;

let res = tx.send(RoomServiceCommandResult::Success);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
Expand Down Expand Up @@ -561,6 +573,12 @@ async fn room_service_commands(
);
}
RoomServiceCommand::PublishControllerCursorEnabled(enabled) => {
// Update internal state for late joiner rebroadcast
{
let mut rc_state = inner.remote_control_enabled.lock().unwrap();
*rc_state = enabled;
}

let inner_room = inner.room.lock().await;
if inner_room.is_none() {
log::warn!("room_service_commands: Room doesn't exist");
Expand Down Expand Up @@ -867,6 +885,7 @@ async fn handle_room_events(
mut receiver: mpsc::UnboundedReceiver<RoomEvent>,
event_loop_proxy: EventLoopProxy<UserEvent>,
user_sid: String,
inner: Arc<RoomServiceInner>,
) {
while let Some(msg) = receiver.recv().await {
match msg {
Expand Down Expand Up @@ -997,6 +1016,40 @@ async fn handle_room_events(
continue;
}

let rc_enabled = {
let rc_state = inner.remote_control_enabled.lock().unwrap();
*rc_state
};

// Get the room and publish the current state
let room_guard = inner.room.lock().await;
if let Some(room) = room_guard.as_ref() {
let local_participant = room.local_participant();
let res = local_participant
.publish_data(DataPacket {
payload: serde_json::to_vec(&ClientEvent::RemoteControlEnabled(
RemoteControlEnabled {
enabled: rc_enabled,
},
))
.unwrap(),
reliable: true,
topic: Some(TOPIC_REMOTE_CONTROL_ENABLED.to_string()),
..Default::default()
})
.await;
if let Err(e) = res {
log::warn!(
"handle_room_events: Failed to rebroadcast remote control state: {e:?}"
);
} else {
log::info!(
"handle_room_events: Rebroadcast remote control state ({}) to late joiner",
rc_enabled
);
}
}

if let Err(e) =
event_loop_proxy.send_event(UserEvent::ParticipantConnected(ParticipantData {
name,
Expand Down
18 changes: 16 additions & 2 deletions tauri/src/components/SharingScreen/SharingScreen.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import Draggable from "react-draggable";
import { throttle } from "lodash";
import { RiDraggable } from "react-icons/ri";
import { HiPencil } from "react-icons/hi2";
import { LiveKitRoom, useDataChannel, useLocalParticipant, useRoomContext, useTracks, VideoTrack } from "@livekit/components-react";
import {
LiveKitRoom,
useDataChannel,
useLocalParticipant,
useRoomContext,
useTracks,
VideoTrack,
} from "@livekit/components-react";
import { ConnectionState, DataPublishOptions, LocalParticipant, Track } from "livekit-client";
import React, { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { resizeWindow } from "./utils";
Expand Down Expand Up @@ -123,7 +130,14 @@ const ConsumerComponent = React.memo(() => {
useDataChannel("remote_control_enabled", (msg) => {
const decoder = new TextDecoder();
const payload: TPRemoteControlEnabled = JSON.parse(decoder.decode(msg.payload));
if (payload.payload.enabled == false) {
const newValue = payload.payload.enabled;

// Skip if value matches current state
if (newValue === isRemoteControlEnabled) {
return;
}

if (newValue === false) {
updateCallTokens({
isRemoteControlEnabled: false,
});
Expand Down
Loading