From fcdb8960022771f9906a04c7f2ca72c21d3ad165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20H=C3=A4fele?= Date: Sat, 17 Jan 2026 13:30:16 +0100 Subject: [PATCH 1/4] Move test fixtures/mocks to new RemoteViewer.TestFixtures Extract shared test fixtures and mocks from RemoteViewer.IntegrationTests into a new RemoteViewer.TestFixtures project. Update namespaces and using directives accordingly. Add RemoteViewer.TestFixtures.csproj with necessary dependencies and project references. Update test projects to reference the new shared fixture project, improving reusability and maintainability. --- RemoteViewer.slnx | 1 + .../ConnectionHubClientTests.cs | 2 +- .../RemoteViewer.IntegrationTests.csproj | 3 +-- .../Hubs/SignalRHubTests.cs | 2 +- .../RemoteViewer.Server.Tests.csproj | 2 +- .../Fixtures/ClientFixture.cs | 7 +++--- .../Fixtures/ServerFixture.cs | 4 ++-- .../Fixtures/TestHelpers.cs | 2 +- .../Mocks/TestDispatcher.cs | 2 +- .../RemoteViewer.TestFixtures.csproj | 24 +++++++++++++++++++ 10 files changed, 36 insertions(+), 13 deletions(-) rename tests/{RemoteViewer.IntegrationTests => RemoteViewer.TestFixtures}/Fixtures/ClientFixture.cs (98%) rename tests/{RemoteViewer.IntegrationTests => RemoteViewer.TestFixtures}/Fixtures/ServerFixture.cs (94%) rename tests/{RemoteViewer.IntegrationTests => RemoteViewer.TestFixtures}/Fixtures/TestHelpers.cs (96%) rename tests/{RemoteViewer.IntegrationTests => RemoteViewer.TestFixtures}/Mocks/TestDispatcher.cs (82%) create mode 100644 tests/RemoteViewer.TestFixtures/RemoteViewer.TestFixtures.csproj diff --git a/RemoteViewer.slnx b/RemoteViewer.slnx index 028f70a..b9551d8 100644 --- a/RemoteViewer.slnx +++ b/RemoteViewer.slnx @@ -27,5 +27,6 @@ + diff --git a/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs b/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs index 73ad36c..d6d7146 100644 --- a/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs +++ b/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs @@ -2,7 +2,7 @@ using RemoteViewer.Client.Controls.Dialogs; using RemoteViewer.Client.Services.FileTransfer; using RemoteViewer.Client.Services.HubClient; -using RemoteViewer.IntegrationTests.Fixtures; +using RemoteViewer.TestFixtures.Fixtures; using RemoteViewer.Shared; using RemoteViewer.Shared.Protocol; diff --git a/tests/RemoteViewer.IntegrationTests/RemoteViewer.IntegrationTests.csproj b/tests/RemoteViewer.IntegrationTests/RemoteViewer.IntegrationTests.csproj index 1526d78..680c7a0 100644 --- a/tests/RemoteViewer.IntegrationTests/RemoteViewer.IntegrationTests.csproj +++ b/tests/RemoteViewer.IntegrationTests/RemoteViewer.IntegrationTests.csproj @@ -17,8 +17,7 @@ - - + diff --git a/tests/RemoteViewer.Server.Tests/Hubs/SignalRHubTests.cs b/tests/RemoteViewer.Server.Tests/Hubs/SignalRHubTests.cs index 1a6825d..9ce4f4c 100644 --- a/tests/RemoteViewer.Server.Tests/Hubs/SignalRHubTests.cs +++ b/tests/RemoteViewer.Server.Tests/Hubs/SignalRHubTests.cs @@ -1,7 +1,7 @@ using Microsoft.AspNetCore.Http.Connections; using Microsoft.AspNetCore.SignalR.Client; using Nerdbank.MessagePack.SignalR; -using RemoteViewer.IntegrationTests.Fixtures; +using RemoteViewer.TestFixtures.Fixtures; using RemoteViewer.Shared; namespace RemoteViewer.Server.Tests.Hubs; diff --git a/tests/RemoteViewer.Server.Tests/RemoteViewer.Server.Tests.csproj b/tests/RemoteViewer.Server.Tests/RemoteViewer.Server.Tests.csproj index de402fc..79947d0 100644 --- a/tests/RemoteViewer.Server.Tests/RemoteViewer.Server.Tests.csproj +++ b/tests/RemoteViewer.Server.Tests/RemoteViewer.Server.Tests.csproj @@ -16,7 +16,7 @@ - + diff --git a/tests/RemoteViewer.IntegrationTests/Fixtures/ClientFixture.cs b/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs similarity index 98% rename from tests/RemoteViewer.IntegrationTests/Fixtures/ClientFixture.cs rename to tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs index 6015344..9b2f6e1 100644 --- a/tests/RemoteViewer.IntegrationTests/Fixtures/ClientFixture.cs +++ b/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs @@ -1,4 +1,4 @@ -using System.Collections.Immutable; +using System.Collections.Immutable; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; @@ -18,12 +18,11 @@ using RemoteViewer.Client.Services.SessionRecorderIpc; using RemoteViewer.Client.Services.WinServiceIpc; using RemoteViewer.Client.Views.Presenter; -using RemoteViewer.IntegrationTests.Mocks; +using RemoteViewer.TestFixtures.Mocks; using RemoteViewer.Shared; using RemoteViewer.Shared.Protocol; -using TUnit.Core; -namespace RemoteViewer.IntegrationTests.Fixtures; +namespace RemoteViewer.TestFixtures.Fixtures; public class ClientFixture : IAsyncDisposable { diff --git a/tests/RemoteViewer.IntegrationTests/Fixtures/ServerFixture.cs b/tests/RemoteViewer.TestFixtures/Fixtures/ServerFixture.cs similarity index 94% rename from tests/RemoteViewer.IntegrationTests/Fixtures/ServerFixture.cs rename to tests/RemoteViewer.TestFixtures/Fixtures/ServerFixture.cs index 146555c..8854884 100644 --- a/tests/RemoteViewer.IntegrationTests/Fixtures/ServerFixture.cs +++ b/tests/RemoteViewer.TestFixtures/Fixtures/ServerFixture.cs @@ -1,10 +1,10 @@ -using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Mvc.Testing; using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.Logging; using TUnit.Core.Interfaces; -namespace RemoteViewer.IntegrationTests.Fixtures; +namespace RemoteViewer.TestFixtures.Fixtures; public class ServerFixture : WebApplicationFactory, IAsyncInitializer { diff --git a/tests/RemoteViewer.IntegrationTests/Fixtures/TestHelpers.cs b/tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs similarity index 96% rename from tests/RemoteViewer.IntegrationTests/Fixtures/TestHelpers.cs rename to tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs index d320961..a5efc24 100644 --- a/tests/RemoteViewer.IntegrationTests/Fixtures/TestHelpers.cs +++ b/tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs @@ -1,4 +1,4 @@ -namespace RemoteViewer.IntegrationTests.Fixtures; +namespace RemoteViewer.TestFixtures.Fixtures; public static class TestHelpers { diff --git a/tests/RemoteViewer.IntegrationTests/Mocks/TestDispatcher.cs b/tests/RemoteViewer.TestFixtures/Mocks/TestDispatcher.cs similarity index 82% rename from tests/RemoteViewer.IntegrationTests/Mocks/TestDispatcher.cs rename to tests/RemoteViewer.TestFixtures/Mocks/TestDispatcher.cs index 13f6971..ad86475 100644 --- a/tests/RemoteViewer.IntegrationTests/Mocks/TestDispatcher.cs +++ b/tests/RemoteViewer.TestFixtures/Mocks/TestDispatcher.cs @@ -1,6 +1,6 @@ using RemoteViewer.Client.Services.Dispatching; -namespace RemoteViewer.IntegrationTests.Mocks; +namespace RemoteViewer.TestFixtures.Mocks; public class TestDispatcher : IDispatcher { diff --git a/tests/RemoteViewer.TestFixtures/RemoteViewer.TestFixtures.csproj b/tests/RemoteViewer.TestFixtures/RemoteViewer.TestFixtures.csproj new file mode 100644 index 0000000..dc74d97 --- /dev/null +++ b/tests/RemoteViewer.TestFixtures/RemoteViewer.TestFixtures.csproj @@ -0,0 +1,24 @@ + + + + net10.0-windows7.0 + false + + + + + + + + + + + + + + + + + + + From 63ad51b69421b230a0c5c89ea168eb682b8f7fad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20H=C3=A4fele?= Date: Sat, 17 Jan 2026 13:52:09 +0100 Subject: [PATCH 2/4] Efficient frame coalescing and ack for viewer delivery Introduce ClientSendGrain to queue and coalesce screen frames per viewer, ensuring only the latest frame is delivered after explicit client ack. Update client/server protocol to support frame acknowledgments. Add integration tests for coalescing logic. Improve logging and refactor message delivery for reliability and backpressure. --- .../RemoteViewer.Client.csproj | 2 +- .../Services/HubClient/Connection.cs | 4 + .../Services/HubClient/ConnectionHubClient.cs | 49 +++-- .../HubClient/ConnectionHubClientOptions.cs | 5 +- src/RemoteViewer.Server/Hubs/ConnectionHub.cs | 5 + .../Orleans/Grains/ClientGrain.cs | 10 +- .../Orleans/Grains/ClientSendGrain.cs | 179 ++++++++++++++++++ .../Orleans/Grains/ClientSendGrainLogs.cs | 8 + .../Orleans/Grains/ConnectionGrain.cs | 64 +++++-- .../Services/ConnectionsOrleansService.cs | 6 + .../Services/IConnectionsService.cs | 7 + .../ViewerSendGrainTests.cs | 131 +++++++++++++ 12 files changed, 431 insertions(+), 39 deletions(-) create mode 100644 src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs create mode 100644 src/RemoteViewer.Server/Orleans/Grains/ClientSendGrainLogs.cs create mode 100644 tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs diff --git a/src/RemoteViewer.Client/RemoteViewer.Client.csproj b/src/RemoteViewer.Client/RemoteViewer.Client.csproj index 0655023..ddf6fd3 100644 --- a/src/RemoteViewer.Client/RemoteViewer.Client.csproj +++ b/src/RemoteViewer.Client/RemoteViewer.Client.csproj @@ -15,7 +15,6 @@ - @@ -54,4 +53,5 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/src/RemoteViewer.Client/Services/HubClient/Connection.cs b/src/RemoteViewer.Client/Services/HubClient/Connection.cs index b67ff02..baefc2b 100644 --- a/src/RemoteViewer.Client/Services/HubClient/Connection.cs +++ b/src/RemoteViewer.Client/Services/HubClient/Connection.cs @@ -463,6 +463,10 @@ async void IConnectionImpl.OnMessageReceived(string senderClientId, string messa { var message = ProtocolSerializer.Deserialize(data); ((IViewerServiceImpl)this.ViewerService!).HandleFrame(message.DisplayId, message.FrameNumber, message.Codec, message.Regions); + + if (this.Owner.Options.SuppressAutoFrameAck is false) + await this.Owner.SendAckFrameAsync(); + break; } diff --git a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs index 6f6dc34..975251b 100644 --- a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs +++ b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs @@ -23,13 +23,15 @@ public ConnectionHubClient( { this._logger = logger; this._serviceProvider = serviceProvider; + this.Options = options.Value; this._connection = new HubConnectionBuilder() - .WithUrl($"{options.Value.BaseUrl}/connection", httpOptions => + .WithUrl($"{this.Options.BaseUrl}/connection", httpOptions => { httpOptions.Headers.Add("X-Client-Version", ThisAssembly.AssemblyInformationalVersion); httpOptions.Headers.Add("X-Display-Name", this.DisplayName); }) + .WithAutomaticReconnect() .AddMessagePackProtocol(Witness.GeneratedTypeShapeProvider) .Build(); @@ -174,6 +176,8 @@ private void CloseAllConnections() this._connections.Clear(); } + public ConnectionHubClientOptions Options { get; } + public string? ClientId { get; private set; } public string? Username { get; private set; } public string? Password { get; private set; } @@ -260,9 +264,9 @@ public async Task ConnectToHub() return error; } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to connect to device - hub disconnected"); + this._logger.LogWarning(ex, "Failed to connect to device"); return null; } } @@ -278,9 +282,9 @@ public async Task GenerateNewPassword() await this._connection.InvokeAsync("GenerateNewPassword"); this._logger.LogInformation("New password generated"); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to generate new password - hub disconnected"); + this._logger.LogWarning(ex, "Failed to generate new password"); } } @@ -297,9 +301,9 @@ public async Task SetDisplayName(string displayName) await this._connection.InvokeAsync("SetDisplayName", displayName); this._logger.LogInformation("Display name set successfully"); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to set display name - hub disconnected"); + this._logger.LogWarning(ex, "Failed to set display name"); } } @@ -321,9 +325,9 @@ internal async Task SendMessageAsync(string connectionId, string messageType, Re await this._connection.SendAsync("SendMessage", connectionId, messageType, data, destination, targetClientIds); this._logger.LogDebug("Message sent successfully"); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to send message - hub disconnected"); + this._logger.LogWarning(ex, "Failed to send message"); } } @@ -338,9 +342,9 @@ internal async Task DisconnectAsync(string connectionId) await this._connection.InvokeAsync("Disconnect", connectionId); this._logger.LogInformation("Disconnected from connection: {ConnectionId}", connectionId); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to disconnect - hub disconnected"); + this._logger.LogWarning(ex, "Failed to disconnect"); } } @@ -354,9 +358,9 @@ internal async Task SetConnectionPropertiesAsync(string connectionId, Connection this._logger.LogDebug("Setting connection properties - ConnectionId: {ConnectionId}", connectionId); await this._connection.InvokeAsync("SetConnectionProperties", connectionId, properties); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to set connection properties - hub disconnected"); + this._logger.LogWarning(ex, "Failed to set connection properties"); } } @@ -370,12 +374,27 @@ internal async Task SetConnectionPropertiesAsync(string connectionId, Connection this._logger.LogDebug("Generating IPC auth token for connection: {ConnectionId}", connectionId); return await this._connection.InvokeAsync("GenerateIpcAuthToken", connectionId); } - catch (Exception ex) when (!this.IsConnected) + catch (Exception ex) { - this._logger.LogWarning(ex, "Failed to generate IPC auth token - hub disconnected"); + this._logger.LogWarning(ex, "Failed to generate IPC auth token"); return null; } } + + internal async Task SendAckFrameAsync() + { + if (!this.IsConnected || this.IsReconnecting) + return; + + try + { + await this._connection.SendAsync("AckFrame"); + } + catch (Exception ex) + { + this._logger.LogWarning(ex, "Failed to send frame ack"); + } + } } #region EventArgs Classes diff --git a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClientOptions.cs b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClientOptions.cs index 2eef82c..bb50be5 100644 --- a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClientOptions.cs +++ b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClientOptions.cs @@ -1,6 +1,5 @@ -using Microsoft.AspNetCore.Http.Connections; +namespace RemoteViewer.Client.Services.HubClient; -namespace RemoteViewer.Client.Services.HubClient; public class ConnectionHubClientOptions { @@ -9,4 +8,6 @@ public class ConnectionHubClientOptions #else public string BaseUrl { get; set; } = "https://rdp.xemio.net"; #endif + + public bool SuppressAutoFrameAck { get; set; } } diff --git a/src/RemoteViewer.Server/Hubs/ConnectionHub.cs b/src/RemoteViewer.Server/Hubs/ConnectionHub.cs index 19c0e15..66ef324 100644 --- a/src/RemoteViewer.Server/Hubs/ConnectionHub.cs +++ b/src/RemoteViewer.Server/Hubs/ConnectionHub.cs @@ -79,6 +79,11 @@ public async Task SendMessage(string connectionId, string messageType, byte[] da await clientsService.SendMessage(this.Context.ConnectionId, connectionId, messageType, data, destination, targetClientIds); } + public Task AckFrame() + { + return clientsService.AckFrame(this.Context.ConnectionId); + } + public async Task SetConnectionProperties(string connectionId, ConnectionProperties properties) { await clientsService.SetConnectionProperties(this.Context.ConnectionId, connectionId, properties); diff --git a/src/RemoteViewer.Server/Orleans/Grains/ClientGrain.cs b/src/RemoteViewer.Server/Orleans/Grains/ClientGrain.cs index cea8276..05a54a0 100644 --- a/src/RemoteViewer.Server/Orleans/Grains/ClientGrain.cs +++ b/src/RemoteViewer.Server/Orleans/Grains/ClientGrain.cs @@ -35,6 +35,8 @@ public sealed partial class ClientGrain(ILogger logger, IHubContext private string _displayName = string.Empty; + private IClientSendGrain? _sendGrain; + private IConnectionGrain? _presenterConnectionGrain; private readonly List _viewerConnectionGrains = []; @@ -67,6 +69,8 @@ public async Task Initialize(string? displayName) this.LogUsernameCollision(attempts); } + this._sendGrain = this.GrainFactory.GetGrain(this.GetPrimaryKeyString()); + this.LogClientInitialized(this._clientId, this._usernameGrain.GetPrimaryKeyString()); await hubContext.Clients @@ -93,6 +97,7 @@ public async Task Deactivate() await connection.Internal_RemoveClient(this.AsReference()); } + await this._sendGrain.Disconnect(); await this._usernameGrain.ReleaseAsync(this.GetPrimaryKeyString()); this.DeactivateOnIdle(); @@ -208,14 +213,15 @@ private static string FormatUsername(string username) } return sb.ToString(); } - [MemberNotNull(nameof(_clientId), nameof(_usernameGrain), nameof(_password))] + [MemberNotNull(nameof(_clientId), nameof(_usernameGrain), nameof(_sendGrain), nameof(_password))] private void EnsureInitialized() { - if (this._clientId is null || this._usernameGrain is null || this._password is null) + if (this._clientId is null || this._usernameGrain is null || this._sendGrain is null || this._password is null) { throw new InvalidOperationException( $"ClientGrain not initialized: clientId={(this._clientId is null ? "null" : "set")}, " + $"usernameGrain={(this._usernameGrain is null ? "null" : "set")}, " + + $"sendGrain={(this._sendGrain is null ? "null" : "set")}, " + $"password={(this._password is null ? "null" : "set")}"); } } diff --git a/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs new file mode 100644 index 0000000..643360f --- /dev/null +++ b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs @@ -0,0 +1,179 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.AspNetCore.SignalR; +using Orleans.Concurrency; +using RemoteViewer.Server.Hubs; +using RemoteViewer.Shared.Protocol; +using System.Threading.Channels; + +namespace RemoteViewer.Server.Orleans.Grains; + +public interface IClientSendGrain : IGrainWithStringKey +{ + Task Enqueue(string connectionId, string senderClientId, string messageType, byte[] data); + Task AckFrame(); + Task Disconnect(); +} + +[Reentrant] +[SuppressMessage("IDisposableAnalyzers", "CA1001", Justification = "Orleans grains don't implement IDisposable; cleanup is in OnDeactivateAsync")] +public sealed partial class ClientSendGrain(ILogger logger, IHubContext hubContext) + : Grain, IClientSendGrain +{ + private readonly Lock _sync = new(); + private readonly Channel _nonFrameChannel = Channel.CreateUnbounded(new() { SingleReader = true, SingleWriter = false }); + private readonly CancellationTokenSource _shutdownCts = new(); + + private QueuedMessage? _pendingFrame; + private bool _frameInFlight; + private Task? _processingTask; + + public override Task OnActivateAsync(CancellationToken cancellationToken) + { + this._processingTask = Task.Run(() => this.ProcessNonFrameMessagesAsync(this._shutdownCts.Token), cancellationToken); + + return Task.CompletedTask; + } + + public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken) + { + this._shutdownCts.Cancel(); + this._nonFrameChannel.Writer.TryComplete(); + + if (this._processingTask is not null) + { + try + { + await this._processingTask; + } + catch (OperationCanceledException) + { + } + } + + this._shutdownCts.Dispose(); + } + + public Task Enqueue(string connectionId, string senderClientId, string messageType, byte[] data) + { + var message = new QueuedMessage(connectionId, senderClientId, messageType, data); + + if (messageType == MessageTypes.Screen.Frame) + { + return this.EnqueueFrame(message); + } + else + { + this._nonFrameChannel.Writer.TryWrite(message); + return Task.CompletedTask; + } + } + + private Task EnqueueFrame(QueuedMessage message) + { + using (this._sync.EnterScope()) + { + if (this._frameInFlight is false) + { + this._frameInFlight = true; + return this.SendFrameAsync(message); + } + else + { + var dropped = this._pendingFrame; + this._pendingFrame = message; + + if (dropped is not null) + { + this.LogFrameDropped(dropped.Value.MessageType); + } + + return Task.CompletedTask; + } + } + } + + public Task AckFrame() + { + QueuedMessage? toSend = null; + + using (this._sync.EnterScope()) + { + if (this._pendingFrame is not null) + { + toSend = this._pendingFrame; + this._pendingFrame = null; + } + else + { + this._frameInFlight = false; + } + } + + return toSend is not null + ? this.SendFrameAsync(toSend.Value) + : Task.CompletedTask; + } + + public async Task Disconnect() + { + this._shutdownCts.Cancel(); + this._nonFrameChannel.Writer.TryComplete(); + + if (this._processingTask is not null) + { + try + { + await this._processingTask; + } + catch (OperationCanceledException) + { + } + } + + this.DeactivateOnIdle(); + } + + private async Task SendFrameAsync(QueuedMessage frame) + { + try + { + await this.SendAsync(frame); + } + catch + { + // If delivery fails, clear in-flight state so the next frame can be sent + using (this._sync.EnterScope()) + { + this._frameInFlight = false; + this._pendingFrame = null; + } + } + } + + private async Task ProcessNonFrameMessagesAsync(CancellationToken ct) + { + try + { + await foreach (var message in this._nonFrameChannel.Reader.ReadAllAsync(ct)) + { + await this.SendAsync(message); + } + } + catch (OperationCanceledException) + { + } + } + + private Task SendAsync(QueuedMessage message) + { + return hubContext.Clients + .Client(this.GetPrimaryKeyString()) + .MessageReceived(message.ConnectionId, message.SenderClientId, message.MessageType, message.Data); + } + + private readonly record struct QueuedMessage( + string ConnectionId, + string SenderClientId, + string MessageType, + byte[] Data); +} diff --git a/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrainLogs.cs b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrainLogs.cs new file mode 100644 index 0000000..4be64eb --- /dev/null +++ b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrainLogs.cs @@ -0,0 +1,8 @@ +using Microsoft.Extensions.Logging; +namespace RemoteViewer.Server.Orleans.Grains; + +public sealed partial class ClientSendGrain +{ + [LoggerMessage(Level = LogLevel.Debug, Message = "Dropped frame {MessageType}")] + partial void LogFrameDropped(string messageType); +} diff --git a/src/RemoteViewer.Server/Orleans/Grains/ConnectionGrain.cs b/src/RemoteViewer.Server/Orleans/Grains/ConnectionGrain.cs index 452f637..ac42d8b 100644 --- a/src/RemoteViewer.Server/Orleans/Grains/ConnectionGrain.cs +++ b/src/RemoteViewer.Server/Orleans/Grains/ConnectionGrain.cs @@ -1,7 +1,12 @@ -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; using Microsoft.AspNetCore.SignalR; using RemoteViewer.Server.Hubs; using RemoteViewer.Shared; +using RemoteViewer.Shared.Protocol; +using Orleans; +using System.Threading; +using System.Threading.Tasks; +using System.Linq; using ConnectionInfo = RemoteViewer.Shared.ConnectionInfo; @@ -66,42 +71,36 @@ public async Task SendMessage(string senderSignalrConnectionId, string messageTy switch (destination) { case MessageDestination.PresenterOnly: - if (!isSenderPresenter && this._presenter is not null) + if (!isSenderPresenter) { - await hubContext.Clients.Client(this._presenter.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); + await this.SendMessageToPresenterAsync(senderClientId, messageType, data); } break; case MessageDestination.AllViewers: - foreach (var viewer in this._viewers) - { - await hubContext.Clients.Client(viewer.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); - } + await Task.WhenAll(this._viewers.Select(viewer => this.SendMessageToViewerAsync(viewer, senderClientId, messageType, data))); break; case MessageDestination.All: - if (this._presenter is not null) - { - await hubContext.Clients.Client(this._presenter.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); - } - foreach (var viewer in this._viewers) - { - await hubContext.Clients.Client(viewer.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); - } + await Task.WhenAll( + this.SendMessageToPresenterAsync(senderClientId, messageType, data), + Task.WhenAll(this._viewers.Select(viewer => this.SendMessageToViewerAsync(viewer, senderClientId, messageType, data)))); break; case MessageDestination.AllExceptSender: + var broadcastTasks = new List(); if (this._presenter is not null && this._presenter.GetPrimaryKeyString() != senderSignalrConnectionId) { - await hubContext.Clients.Client(this._presenter.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); + broadcastTasks.Add(this.SendMessageToPresenterAsync(senderClientId, messageType, data)); } foreach (var viewer in this._viewers) { if (viewer.GetPrimaryKeyString() != senderSignalrConnectionId) { - await hubContext.Clients.Client(viewer.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); + broadcastTasks.Add(this.SendMessageToViewerAsync(viewer, senderClientId, messageType, data)); } } + await Task.WhenAll(broadcastTasks); break; case MessageDestination.SpecificClients: @@ -109,10 +108,11 @@ public async Task SendMessage(string senderSignalrConnectionId, string messageTy break; var targetClientIdSet = targetClientIds.ToHashSet(StringComparer.Ordinal); + var specificTasks = new List(); if (this._presenter is not null && targetClientIdSet.Contains(await this._presenter.Internal_GetClientId())) { - await hubContext.Clients.Client(this._presenter.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); + specificTasks.Add(this.SendMessageToPresenterAsync(senderClientId, messageType, data)); } foreach (var viewer in this._viewers) @@ -120,9 +120,11 @@ public async Task SendMessage(string senderSignalrConnectionId, string messageTy var viewerClientId = await viewer.Internal_GetClientId(); if (targetClientIdSet.Contains(viewerClientId)) { - await hubContext.Clients.Client(viewer.GetPrimaryKeyString()).MessageReceived(this.GetPrimaryKeyString(), senderClientId, messageType, data); + specificTasks.Add(this.SendMessageToViewerAsync(viewer, senderClientId, messageType, data)); } } + + await Task.WhenAll(specificTasks); break; } @@ -231,6 +233,24 @@ private async Task NotifyConnectionChangedAsync() } } + private Task SendMessageToPresenterAsync(string senderClientId, string messageType, byte[] data) + { + if (this._presenter is null) + { + return Task.CompletedTask; + } + + var presenterSenderGrain = this.GrainFactory.GetGrain(this._presenter.GetPrimaryKeyString()); + return presenterSenderGrain.Enqueue(this.GetPrimaryKeyString(), senderClientId, messageType, data); + } + + private Task SendMessageToViewerAsync(IClientGrain viewer, string senderClientId, string messageType, byte[] data) + { + var viewerSignalrId = viewer.GetPrimaryKeyString(); + var senderGrain = this.GrainFactory.GetGrain(viewerSignalrId); + return senderGrain.Enqueue(this.GetPrimaryKeyString(), senderClientId, messageType, data); + } + [MemberNotNull(nameof(_presenter))] private void EnsureInitialized() { @@ -264,4 +284,10 @@ private void EnsureInitialized() [LoggerMessage(Level = LogLevel.Information, Message = "Viewer disconnected: ConnectionId={ConnectionId}, ViewerCount={ViewerCount}")] private partial void LogViewerDisconnected(string connectionId, int viewerCount); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Queued frame for viewer {ViewerSignalrId} (bytes={PayloadBytes})")] + private partial void LogFrameQueued(string viewerSignalrId, int payloadBytes); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Coalesced frame for viewer {ViewerSignalrId}: {PreviousBytes} bytes -> {NewBytes} bytes")] + private partial void LogFrameCoalesced(string viewerSignalrId, int previousBytes, int newBytes); } diff --git a/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs b/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs index 84352b4..0a74845 100644 --- a/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs +++ b/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs @@ -78,6 +78,12 @@ public async Task SendMessage(string signalrConnectionId, string connectionId, s await connectionGrain.SendMessage(signalrConnectionId, messageType, data, destination, targetClientIds); } + public Task AckFrame(string signalrConnectionId) + { + var grain = grainFactory.GetGrain(signalrConnectionId); + return grain.AckFrame(); + } + public async Task IsPresenterOfConnection(string signalrConnectionId, string connectionId) { var connectionGrain = grainFactory.GetGrain(connectionId); diff --git a/src/RemoteViewer.Server/Services/IConnectionsService.cs b/src/RemoteViewer.Server/Services/IConnectionsService.cs index d91c601..f4b277e 100644 --- a/src/RemoteViewer.Server/Services/IConnectionsService.cs +++ b/src/RemoteViewer.Server/Services/IConnectionsService.cs @@ -21,6 +21,7 @@ public interface IConnectionsService Task DisconnectFromConnection(string signalrConnectionId, string connectionId); Task SetConnectionProperties(string signalrConnectionId, string connectionId, ConnectionProperties properties); Task SendMessage(string signalrConnectionId, string connectionId, string messageType, byte[] data, MessageDestination destination, IReadOnlyList? targetClientIds = null); + Task AckFrame(string signalrConnectionId); Task IsPresenterOfConnection(string signalrConnectionId, string connectionId); } @@ -324,6 +325,12 @@ public async Task SendMessage(string signalrConnectionId, string connectionId, s this._logger.MessageSendCompleted(senderId, connectionId, messageType); } + public Task AckFrame(string signalrConnectionId) + { + // No-op - we don't have frame-backbuffer mechanisms in this implementation + return Task.CompletedTask; + } + public Task IsPresenterOfConnection(string signalrConnectionId, string connectionId) { using (this._lock.ReadLock()) diff --git a/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs new file mode 100644 index 0000000..6a44680 --- /dev/null +++ b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs @@ -0,0 +1,131 @@ +using RemoteViewer.Client.Services.HubClient; +using RemoteViewer.TestFixtures.Fixtures; +using RemoteViewer.Shared.Protocol; +using System.Reflection; + +namespace RemoteViewer.IntegrationTests; + +[NotInParallel] +public class ViewerSendGrainTests() +{ + private static readonly ulong[] s_expectedFrames_1_3 = [1UL, 3UL]; + + [ClassDataSource(Shared = SharedType.PerTestSession)] + public required ServerFixture Server { get; init; } + + [Test] + public async Task FramesCoalesceLatestWinsPerViewer() + { + await using var presenter = await this.Server.CreateClientAsync("Presenter"); + await using var viewer = await this.Server.CreateClientAsync("Viewer"); + await this.Server.CreateConnectionAsync(presenter, viewer); + + var presenterConn = presenter.CurrentConnection!; + var viewerConn = viewer.CurrentConnection!; + await InvokePresenterSelectDisplayAsync(presenterConn, viewer.HubClient.ClientId!, "DISPLAY1"); + + // Suppress auto-acks so we can control timing manually + viewer.HubClient.Options.SuppressAutoFrameAck = true; + + var receivedFrames = new List(); + var firstFrameReceived = new TaskCompletionSource(); + + viewerConn.RequiredViewerService.FrameReady += (_, args) => + { + receivedFrames.Add(args.FrameNumber); + firstFrameReceived.TrySetResult(); + }; + + // Send 20 frames rapidly - frame 1 goes immediately, rest coalesce + for (var i = 1; i <= 20; i++) + { + await InvokeSendFrameAsync(presenterConn, (ulong)i); + } + + // Wait for frame 1 to arrive + await firstFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await Assert.That(receivedFrames).Contains(1UL); + + // Give time for frames 2-20 to coalesce on server + await Task.Delay(100); + + // Ack frame 1 - should get frame 20 (latest) + var secondFrameReceived = new TaskCompletionSource(); + viewerConn.RequiredViewerService.FrameReady += (_, args) => + { + if (args.FrameNumber != 1) + secondFrameReceived.TrySetResult(); + }; + + await SendAckFrameAsync(viewer.HubClient); + await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + await Assert.That(receivedFrames[^1]).IsEqualTo(20UL); + } + + [Test] + public async Task ViewerSendQueueDropOldestWhenBusy() + { + await using var presenter = await this.Server.CreateClientAsync("Presenter"); + await using var viewer = await this.Server.CreateClientAsync("Viewer"); + await this.Server.CreateConnectionAsync(presenter, viewer); + + var presenterConn = presenter.CurrentConnection!; + var viewerConn = viewer.CurrentConnection!; + var viewerService = viewerConn.RequiredViewerService; + await InvokePresenterSelectDisplayAsync(presenterConn, viewer.HubClient.ClientId!, "DISPLAY1"); + + // Suppress auto-acks so we can control timing manually + viewer.HubClient.Options.SuppressAutoFrameAck = true; + + var receivedFrames = new List(); + var firstFrameReceived = new TaskCompletionSource(); + var secondFrameReceived = new TaskCompletionSource(); + + viewerService.FrameReady += (_, args) => + { + receivedFrames.Add(args.FrameNumber); + if (receivedFrames.Count == 1) + firstFrameReceived.TrySetResult(); + else if (receivedFrames.Count >= 2) + secondFrameReceived.TrySetResult(); + }; + + // Send frame 1 - should be delivered immediately + await InvokeSendFrameAsync(presenterConn, 1); + + // Wait for frame 1 to actually arrive before sending more + await firstFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // DON'T ack frame 1 yet - send frames 2 and 3 + await InvokeSendFrameAsync(presenterConn, 2); + await InvokeSendFrameAsync(presenterConn, 3); + await Task.Delay(100); // Let frames 2 and 3 coalesce on server + + // Now ack - server should send the latest buffered frame (3, not 2) + await SendAckFrameAsync(viewer.HubClient); + + await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + await Assert.That(receivedFrames).IsEquivalentTo(s_expectedFrames_1_3); + } + + private static Task SendAckFrameAsync(ConnectionHubClient client) + { + var method = client.GetType().GetMethod("SendAckFrameAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; + return (Task)method.Invoke(client, Array.Empty())!; + } + + private static Task InvokeSendFrameAsync(Connection connection, ulong frameNumber) + { + var method = connection.GetType().GetMethod("RemoteViewer.Client.Services.HubClient.IConnectionImpl.SendFrameAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; + return (Task)method.Invoke(connection, ["DISPLAY1", frameNumber, FrameCodec.Jpeg90, Array.Empty()])!; + } + + private static Task InvokePresenterSelectDisplayAsync(Connection connection, string viewerClientId, string displayId) + { + var service = connection.PresenterService ?? throw new InvalidOperationException("Presenter service not available."); + var method = service.GetType().GetMethod("RemoteViewer.Client.Services.HubClient.IPresenterServiceImpl.SelectViewerDisplayAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; + return (Task)method.Invoke(service, [viewerClientId, displayId, CancellationToken.None])!; + } +} From db8c6b23c28cf0a8de412b2c74ae1a8cf5005d09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20H=C3=A4fele?= Date: Sat, 17 Jan 2026 21:59:02 +0100 Subject: [PATCH 3/4] Add per-connection frame ack and fix FrameSendState synchronization Add connectionId parameter to AckFrame to support per-connection frame tracking, allowing multiple viewer connections to have independent frame buffers. Fix synchronization issue in FrameSendState where Interlocked.Exchange was used on nullable structs (QueuedMessage?) which doesn't work correctly in .NET. Replaced with proper lock-based approach that atomically manages both _inFlight and _pendingFrame state. Co-Authored-By: Claude Opus 4.5 --- .../Services/HubClient/Connection.cs | 4 +- .../Services/HubClient/ConnectionHubClient.cs | 7 +- src/RemoteViewer.Server/Hubs/ConnectionHub.cs | 5 +- .../Orleans/Grains/ClientSendGrain.cs | 145 +++++++++++++----- .../Services/ConnectionsOrleansService.cs | 7 +- .../Services/IConnectionsService.cs | 8 +- .../ViewerSendGrainTests.cs | 11 +- 7 files changed, 130 insertions(+), 57 deletions(-) diff --git a/src/RemoteViewer.Client/Services/HubClient/Connection.cs b/src/RemoteViewer.Client/Services/HubClient/Connection.cs index baefc2b..9f2f445 100644 --- a/src/RemoteViewer.Client/Services/HubClient/Connection.cs +++ b/src/RemoteViewer.Client/Services/HubClient/Connection.cs @@ -1,4 +1,4 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using RemoteViewer.Client.Common; using RemoteViewer.Client.Services.FileTransfer; @@ -465,7 +465,7 @@ async void IConnectionImpl.OnMessageReceived(string senderClientId, string messa ((IViewerServiceImpl)this.ViewerService!).HandleFrame(message.DisplayId, message.FrameNumber, message.Codec, message.Regions); if (this.Owner.Options.SuppressAutoFrameAck is false) - await this.Owner.SendAckFrameAsync(); + await this.Owner.SendAckFrameAsync(this.ConnectionId); break; } diff --git a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs index 975251b..ce1027c 100644 --- a/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs +++ b/src/RemoteViewer.Client/Services/HubClient/ConnectionHubClient.cs @@ -1,4 +1,4 @@ -using System.Collections.Concurrent; +using System.Collections.Concurrent; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -381,20 +381,21 @@ internal async Task SetConnectionPropertiesAsync(string connectionId, Connection } } - internal async Task SendAckFrameAsync() + internal async Task SendAckFrameAsync(string connectionId) { if (!this.IsConnected || this.IsReconnecting) return; try { - await this._connection.SendAsync("AckFrame"); + await this._connection.SendAsync("AckFrame", connectionId); } catch (Exception ex) { this._logger.LogWarning(ex, "Failed to send frame ack"); } } + } #region EventArgs Classes diff --git a/src/RemoteViewer.Server/Hubs/ConnectionHub.cs b/src/RemoteViewer.Server/Hubs/ConnectionHub.cs index 66ef324..772e0e5 100644 --- a/src/RemoteViewer.Server/Hubs/ConnectionHub.cs +++ b/src/RemoteViewer.Server/Hubs/ConnectionHub.cs @@ -79,11 +79,12 @@ public async Task SendMessage(string connectionId, string messageType, byte[] da await clientsService.SendMessage(this.Context.ConnectionId, connectionId, messageType, data, destination, targetClientIds); } - public Task AckFrame() + public Task AckFrame(string connectionId) { - return clientsService.AckFrame(this.Context.ConnectionId); + return clientsService.AckFrame(this.Context.ConnectionId, connectionId); } + public async Task SetConnectionProperties(string connectionId, ConnectionProperties properties) { await clientsService.SetConnectionProperties(this.Context.ConnectionId, connectionId, properties); diff --git a/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs index 643360f..6f4f25d 100644 --- a/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs +++ b/src/RemoteViewer.Server/Orleans/Grains/ClientSendGrain.cs @@ -1,16 +1,20 @@ +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using System.Threading; using Microsoft.AspNetCore.SignalR; using Orleans.Concurrency; using RemoteViewer.Server.Hubs; using RemoteViewer.Shared.Protocol; using System.Threading.Channels; + namespace RemoteViewer.Server.Orleans.Grains; public interface IClientSendGrain : IGrainWithStringKey { Task Enqueue(string connectionId, string senderClientId, string messageType, byte[] data); - Task AckFrame(); + Task AckFrame(string connectionId); Task Disconnect(); } @@ -19,14 +23,14 @@ public interface IClientSendGrain : IGrainWithStringKey public sealed partial class ClientSendGrain(ILogger logger, IHubContext hubContext) : Grain, IClientSendGrain { - private readonly Lock _sync = new(); private readonly Channel _nonFrameChannel = Channel.CreateUnbounded(new() { SingleReader = true, SingleWriter = false }); private readonly CancellationTokenSource _shutdownCts = new(); - private QueuedMessage? _pendingFrame; - private bool _frameInFlight; + private readonly ConcurrentDictionary _frameStates = new(StringComparer.Ordinal); private Task? _processingTask; + + public override Task OnActivateAsync(CancellationToken cancellationToken) { this._processingTask = Task.Run(() => this.ProcessNonFrameMessagesAsync(this._shutdownCts.Token), cancellationToken); @@ -70,52 +74,52 @@ public Task Enqueue(string connectionId, string senderClientId, string messageTy private Task EnqueueFrame(QueuedMessage message) { - using (this._sync.EnterScope()) - { - if (this._frameInFlight is false) - { - this._frameInFlight = true; - return this.SendFrameAsync(message); - } - else - { - var dropped = this._pendingFrame; - this._pendingFrame = message; + var state = this._frameStates.GetOrAdd(message.ConnectionId, _ => new FrameSendState()); - if (dropped is not null) - { - this.LogFrameDropped(dropped.Value.MessageType); - } + var (wasIdle, dropped) = state.TryEnqueueOrSend(message); - return Task.CompletedTask; - } + if (wasIdle) + { + return this.SendFrameAsync(message); + } + + if (dropped is { } droppedMessage) + { + this.LogFrameDropped(droppedMessage.MessageType); } + + return Task.CompletedTask; } - public Task AckFrame() + public Task AckFrame(string connectionId) { - QueuedMessage? toSend = null; + if (!this._frameStates.TryGetValue(connectionId, out var state)) + { + return Task.CompletedTask; + } + + var toSend = state.TryGetPendingAndClearInFlight(); - using (this._sync.EnterScope()) + if (toSend is { } message) { - if (this._pendingFrame is not null) - { - toSend = this._pendingFrame; - this._pendingFrame = null; - } - else - { - this._frameInFlight = false; - } + return this.SendFrameAsync(message); } - return toSend is not null - ? this.SendFrameAsync(toSend.Value) - : Task.CompletedTask; + this.TryRemoveState(connectionId, state); + return Task.CompletedTask; + } + + private void TryRemoveState(string connectionId, FrameSendState state) + { + if (state.CanRemove()) + { + this._frameStates.TryRemove(new KeyValuePair(connectionId, state)); + } } public async Task Disconnect() { + this._shutdownCts.Cancel(); this._nonFrameChannel.Writer.TryComplete(); @@ -142,14 +146,16 @@ private async Task SendFrameAsync(QueuedMessage frame) catch { // If delivery fails, clear in-flight state so the next frame can be sent - using (this._sync.EnterScope()) + if (this._frameStates.TryGetValue(frame.ConnectionId, out var state)) { - this._frameInFlight = false; - this._pendingFrame = null; + state.ClearOnError(); + this.TryRemoveState(frame.ConnectionId, state); } } } + + private async Task ProcessNonFrameMessagesAsync(CancellationToken ct) { try @@ -176,4 +182,65 @@ private readonly record struct QueuedMessage( string SenderClientId, string MessageType, byte[] Data); + + private sealed class FrameSendState + { + private readonly Lock _lock = new(); + private bool _inFlight; + private QueuedMessage? _pendingFrame; + + public (bool wasIdle, QueuedMessage? dropped) TryEnqueueOrSend(QueuedMessage message) + { + using (this._lock.EnterScope()) + { + if (this._inFlight is false) + { + this._inFlight = true; + return (wasIdle: true, dropped: null); + } + else + { + var dropped = this._pendingFrame; + this._pendingFrame = message; + return (wasIdle: false, dropped: dropped); + } + } + } + + public QueuedMessage? TryGetPendingAndClearInFlight() + { + using (this._lock.EnterScope()) + { + if (this._pendingFrame is { } pending) + { + this._pendingFrame = null; + // Keep _inFlight = true since we're about to send pending + return pending; + } + else + { + this._inFlight = false; + return null; + } + } + } + + public void ClearOnError() + { + using (this._lock.EnterScope()) + { + this._inFlight = false; + this._pendingFrame = null; + } + } + + public bool CanRemove() + { + using (this._lock.EnterScope()) + { + return this._inFlight is false && this._pendingFrame is null; + } + } + } } + diff --git a/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs b/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs index 0a74845..c248a83 100644 --- a/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs +++ b/src/RemoteViewer.Server/Services/ConnectionsOrleansService.cs @@ -1,4 +1,4 @@ -using RemoteViewer.Server.Orleans.Grains; +using RemoteViewer.Server.Orleans.Grains; using RemoteViewer.Shared; namespace RemoteViewer.Server.Services; @@ -78,12 +78,13 @@ public async Task SendMessage(string signalrConnectionId, string connectionId, s await connectionGrain.SendMessage(signalrConnectionId, messageType, data, destination, targetClientIds); } - public Task AckFrame(string signalrConnectionId) + public Task AckFrame(string signalrConnectionId, string connectionId) { var grain = grainFactory.GetGrain(signalrConnectionId); - return grain.AckFrame(); + return grain.AckFrame(connectionId); } + public async Task IsPresenterOfConnection(string signalrConnectionId, string connectionId) { var connectionGrain = grainFactory.GetGrain(connectionId); diff --git a/src/RemoteViewer.Server/Services/IConnectionsService.cs b/src/RemoteViewer.Server/Services/IConnectionsService.cs index f4b277e..dd242f7 100644 --- a/src/RemoteViewer.Server/Services/IConnectionsService.cs +++ b/src/RemoteViewer.Server/Services/IConnectionsService.cs @@ -1,4 +1,4 @@ -using Microsoft.AspNetCore.SignalR; +using Microsoft.AspNetCore.SignalR; using RemoteViewer.Server.Common; using RemoteViewer.Server.Hubs; using RemoteViewer.Shared; @@ -21,7 +21,8 @@ public interface IConnectionsService Task DisconnectFromConnection(string signalrConnectionId, string connectionId); Task SetConnectionProperties(string signalrConnectionId, string connectionId, ConnectionProperties properties); Task SendMessage(string signalrConnectionId, string connectionId, string messageType, byte[] data, MessageDestination destination, IReadOnlyList? targetClientIds = null); - Task AckFrame(string signalrConnectionId); + Task AckFrame(string signalrConnectionId, string connectionId); + Task IsPresenterOfConnection(string signalrConnectionId, string connectionId); } @@ -325,12 +326,13 @@ public async Task SendMessage(string signalrConnectionId, string connectionId, s this._logger.MessageSendCompleted(senderId, connectionId, messageType); } - public Task AckFrame(string signalrConnectionId) + public Task AckFrame(string signalrConnectionId, string connectionId) { // No-op - we don't have frame-backbuffer mechanisms in this implementation return Task.CompletedTask; } + public Task IsPresenterOfConnection(string signalrConnectionId, string connectionId) { using (this._lock.ReadLock()) diff --git a/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs index 6a44680..2dcd14b 100644 --- a/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs +++ b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs @@ -1,4 +1,4 @@ -using RemoteViewer.Client.Services.HubClient; +using RemoteViewer.Client.Services.HubClient; using RemoteViewer.TestFixtures.Fixtures; using RemoteViewer.Shared.Protocol; using System.Reflection; @@ -56,8 +56,8 @@ public async Task FramesCoalesceLatestWinsPerViewer() if (args.FrameNumber != 1) secondFrameReceived.TrySetResult(); }; + await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); - await SendAckFrameAsync(viewer.HubClient); await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); await Assert.That(receivedFrames[^1]).IsEqualTo(20UL); @@ -103,19 +103,20 @@ public async Task ViewerSendQueueDropOldestWhenBusy() await Task.Delay(100); // Let frames 2 and 3 coalesce on server // Now ack - server should send the latest buffered frame (3, not 2) - await SendAckFrameAsync(viewer.HubClient); + await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); await Assert.That(receivedFrames).IsEquivalentTo(s_expectedFrames_1_3); } - private static Task SendAckFrameAsync(ConnectionHubClient client) + private static Task SendAckFrameAsync(ConnectionHubClient client, string connectionId) { var method = client.GetType().GetMethod("SendAckFrameAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; - return (Task)method.Invoke(client, Array.Empty())!; + return (Task)method.Invoke(client, [connectionId])!; } + private static Task InvokeSendFrameAsync(Connection connection, ulong frameNumber) { var method = connection.GetType().GetMethod("RemoteViewer.Client.Services.HubClient.IConnectionImpl.SendFrameAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; From 18733c37368a11fc2228a4c7a40120b1318bde7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20H=C3=A4fele?= Date: Sun, 18 Jan 2026 11:27:11 +0100 Subject: [PATCH 4/4] Fix ViewerSendGrainTests to be more reliable --- .../ConnectionHubClientTests.cs | 80 +++++++++---------- .../ViewerSendGrainTests.cs | 69 ++++++++-------- .../Fixtures/ClientFixture.cs | 13 ++- .../Fixtures/TestHelpers.cs | 52 ------------ .../RemoteViewer.TestFixtures/TestHelpers.cs | 62 ++++++++++++++ 5 files changed, 146 insertions(+), 130 deletions(-) delete mode 100644 tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs create mode 100644 tests/RemoteViewer.TestFixtures/TestHelpers.cs diff --git a/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs b/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs index d6d7146..4619970 100644 --- a/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs +++ b/tests/RemoteViewer.IntegrationTests/ConnectionHubClientTests.cs @@ -2,6 +2,7 @@ using RemoteViewer.Client.Controls.Dialogs; using RemoteViewer.Client.Services.FileTransfer; using RemoteViewer.Client.Services.HubClient; +using RemoteViewer.TestFixtures; using RemoteViewer.TestFixtures.Fixtures; using RemoteViewer.Shared; using RemoteViewer.Shared.Protocol; @@ -45,7 +46,7 @@ public async Task ChatMessagesAreSentFromViewerToPresenter() var presenterConn = presenter.CurrentConnection!; var viewerConn = viewer.CurrentConnection!; - var receiveTask = TestHelpers.WaitForEventAsync( + var receiveTask = TestHelpers.WaitForEvent( onResult => presenterConn.Chat.MessageReceived += (s, msg) => onResult(msg)); await viewerConn.Chat.SendMessageAsync("Hello from viewer!"); @@ -65,7 +66,7 @@ public async Task ChatMessagesAreSentFromPresenterToViewer() var presenterConn = presenter.CurrentConnection!; var viewerConn = viewer.CurrentConnection!; - var receiveTask = TestHelpers.WaitForEventAsync( + var receiveTask = TestHelpers.WaitForEvent( onResult => viewerConn.Chat.MessageReceived += (s, msg) => onResult(msg)); await presenterConn.Chat.SendMessageAsync("Hello from presenter!"); @@ -89,7 +90,7 @@ public async Task InputBlockingUpdatesConnectionProperties() // Wait for presenter to receive server confirmation // (presenter is now server-authoritative - no local optimistic update) - var presenterPropertyTask = TestHelpers.WaitForEventAsync( + var presenterPropertyTask = TestHelpers.WaitForEvent( onComplete => presenterConn.ConnectionPropertiesChanged += (s, e) => { if (presenterConn.ConnectionProperties.InputBlockedViewerIds.Contains(viewerClientId)) @@ -118,7 +119,7 @@ public async Task ViewerMouseMoveIsSentToPresenter() await viewerConn.RequiredViewerService.SendMouseMoveAsync(0.5f, 0.5f); // Wait for message to be received and processed - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Any(c => c.GetMethodInfo().Name == "InjectMouseMove")); @@ -143,7 +144,7 @@ public async Task ViewerKeyPressIsSentToPresenter() await viewerConn.RequiredViewerService.SendKeyUpAsync(0x41, KeyModifiers.None); // Wait for both key events to be received - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Count(c => c.GetMethodInfo().Name == "InjectKey") >= 2); @@ -164,7 +165,7 @@ public async Task PresenterDisconnectClosesViewerConnection() var presenterConn = presenter.CurrentConnection!; var viewerConn = viewer.CurrentConnection!; - var closedTask = TestHelpers.WaitForEventAsync( + var closedTask = TestHelpers.WaitForEvent( onComplete => viewerConn.Closed += (s, e) => onComplete()); await presenterConn.DisconnectAsync(); @@ -206,9 +207,9 @@ public async Task MultipleViewersCanConnectToSamePresenter() var presenterConn = presenter.CurrentConnection!; // Wait for all viewers to be registered (eventual consistency) - await TestHelpers.WaitForConditionAsync( + await TestHelpers.WaitUntil( () => presenterConn.Viewers.Count == 3, - timeoutMessage: $"Expected 3 viewers but got {presenterConn.Viewers.Count}"); + message: $"Expected 3 viewers but got {presenterConn.Viewers.Count}"); await Assert.That(presenterConn.Viewers.Count).IsEqualTo(3); } @@ -226,7 +227,7 @@ public async Task ViewerDisconnectDoesNotAffectOtherViewers() var viewer2Conn = viewer2.CurrentConnection!; // Subscribe BEFORE disconnect to wait for it to be fully processed - var viewer1DisconnectedTask = TestHelpers.WaitForEventAsync( + var viewer1DisconnectedTask = TestHelpers.WaitForEvent( onComplete => presenterConn.ViewersChanged += (s, e) => { if (presenterConn.Viewers.Count == 1) @@ -240,7 +241,7 @@ public async Task ViewerDisconnectDoesNotAffectOtherViewers() await viewer1DisconnectedTask; // Viewer2 should still be connected and able to communicate - var receiveTask = TestHelpers.WaitForEventAsync( + var receiveTask = TestHelpers.WaitForEvent( onResult => presenterConn.Chat.MessageReceived += (s, msg) => onResult(msg)); await viewer2Conn.Chat.SendMessageAsync("I'm still connected!"); @@ -265,7 +266,7 @@ public async Task ViewersChangedEventFiresOnViewerConnect() var presenterConn = await presenterConnTask; // Subscribe BEFORE waiting for viewer connect to avoid race condition - var viewersChangedTask = TestHelpers.WaitForEventAsync( + var viewersChangedTask = TestHelpers.WaitForEvent( onComplete => presenterConn.ViewersChanged += (s, e) => { if (presenterConn.Viewers.Count > 0) @@ -293,7 +294,7 @@ public async Task ConnectionPropertiesChangedEventFires() var viewerClientId = viewer.HubClient.ClientId!; // Subscribe BEFORE triggering the property change to avoid race condition - var propertyChangedTask = TestHelpers.WaitForEventAsync( + var propertyChangedTask = TestHelpers.WaitForEvent( onComplete => viewerConn.ConnectionPropertiesChanged += (s, e) => { if (viewerConn.ConnectionProperties.InputBlockedViewerIds.Contains(viewerClientId)) @@ -322,7 +323,7 @@ public async Task IsClosedReflectsConnectionState() await Assert.That(viewerConn.IsClosed).IsFalse(); // Subscribe first, then disconnect - var closedTask = TestHelpers.WaitForEventAsync( + var closedTask = TestHelpers.WaitForEvent( onComplete => viewerConn.Closed += (s, e) => onComplete()); await viewerConn.DisconnectAsync(); @@ -349,7 +350,7 @@ public async Task ViewerMouseClickIsSentToPresenter() await viewerConn.RequiredViewerService.SendMouseUpAsync(MouseButton.Left, 0.5f, 0.5f); // Wait for both mouse events to be received - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Count(c => c.GetMethodInfo().Name == "InjectMouseButton") >= 2); @@ -375,7 +376,7 @@ public async Task ViewerMouseWheelIsSentToPresenter() await viewerConn.RequiredViewerService.SendMouseWheelAsync(0f, 120f, 0.5f, 0.5f); // Wait for wheel event to be received - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Any(c => c.GetMethodInfo().Name == "InjectMouseWheel")); @@ -403,7 +404,7 @@ public async Task InputBlockingPreventsInputInjection() // Wait for PRESENTER to receive server confirmation // (input blocking check happens on presenter, so we must wait for presenter's state) - var presenterPropertyTask = TestHelpers.WaitForEventAsync( + var presenterPropertyTask = TestHelpers.WaitForEvent( onComplete => presenterConn.ConnectionPropertiesChanged += (s, e) => { if (presenterConn.ConnectionProperties.InputBlockedViewerIds.Contains(viewerClientId)) @@ -448,7 +449,7 @@ public async Task MultipleKeyModifiersAreSent() await viewerConn.RequiredViewerService.SendKeyDownAsync(0x41, KeyModifiers.Control | KeyModifiers.Shift); // Wait for key event to be received - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Any(c => c.GetMethodInfo().Name == "InjectKey")); @@ -473,7 +474,7 @@ public async Task GetMessagesReturnsHistory() var viewerConn = viewer.CurrentConnection!; // Wait for the last message to arrive - var lastMessageTask = TestHelpers.WaitForEventAsync( + var lastMessageTask = TestHelpers.WaitForEvent( onResult => presenterConn.Chat.MessageReceived += (s, msg) => { if (msg.Text == "Message 3") @@ -505,9 +506,9 @@ public async Task MultipleViewersReceiveSameChatMessage() var viewer1Conn = viewer1.CurrentConnection!; var viewer2Conn = viewer2.CurrentConnection!; - var msg1Task = TestHelpers.WaitForEventAsync( + var msg1Task = TestHelpers.WaitForEvent( onResult => viewer1Conn.Chat.MessageReceived += (s, msg) => onResult(msg)); - var msg2Task = TestHelpers.WaitForEventAsync( + var msg2Task = TestHelpers.WaitForEvent( onResult => viewer2Conn.Chat.MessageReceived += (s, msg) => onResult(msg)); await presenterConn.Chat.SendMessageAsync("Broadcast to all!"); @@ -529,7 +530,7 @@ public async Task ChatMessagesContainCorrectSenderInfo() var viewerConn = viewer.CurrentConnection!; // Test 1: Viewer sends to presenter - verify IsFromPresenter is false - var fromViewerTask = TestHelpers.WaitForEventAsync( + var fromViewerTask = TestHelpers.WaitForEvent( onResult => presenterConn.Chat.MessageReceived += (s, msg) => { if (msg.Text == "From viewer") @@ -542,7 +543,7 @@ public async Task ChatMessagesContainCorrectSenderInfo() await Assert.That(fromViewer.IsFromPresenter).IsFalse(); // Test 2: Presenter sends to viewer - verify IsFromPresenter is true - var fromPresenterTask = TestHelpers.WaitForEventAsync( + var fromPresenterTask = TestHelpers.WaitForEvent( onResult => viewerConn.Chat.MessageReceived += (s, msg) => { if (msg.Text == "From presenter") @@ -574,7 +575,7 @@ public async Task ViewerCanBeBlockedWhileOthersAreNot() var viewer1ClientId = viewer1.HubClient.ClientId!; // Wait for property to propagate to viewer2 - var propertyChangedTask = TestHelpers.WaitForEventAsync( + var propertyChangedTask = TestHelpers.WaitForEvent( onComplete => viewer2Conn.ConnectionPropertiesChanged += (s, e) => { if (viewer2Conn.ConnectionProperties.InputBlockedViewerIds.Contains(viewer1ClientId)) @@ -593,7 +594,7 @@ await presenterConn.UpdateConnectionPropertiesAndSend(props => await viewer2Conn.RequiredViewerService.SendMouseMoveAsync(0.5f, 0.5f); // Wait for mouse event to be received - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.InputInjectionService.ReceivedCalls() .Any(c => c.GetMethodInfo().Name == "InjectMouseMove")); @@ -617,9 +618,9 @@ public async Task BroadcastMessagesReachAllViewers() var viewer1Conn = viewer1.CurrentConnection!; var viewer2Conn = viewer2.CurrentConnection!; - var msg1Task = TestHelpers.WaitForEventAsync( + var msg1Task = TestHelpers.WaitForEvent( onResult => viewer1Conn.Chat.MessageReceived += (s, msg) => onResult(msg)); - var msg2Task = TestHelpers.WaitForEventAsync( + var msg2Task = TestHelpers.WaitForEvent( onResult => viewer2Conn.Chat.MessageReceived += (s, msg) => onResult(msg)); await presenterConn.Chat.SendMessageAsync("Broadcast message"); @@ -642,7 +643,7 @@ public async Task AvailableDisplaysChangedEventFires() var viewerConn = viewer.CurrentConnection!; // Subscribe first - var displaysChangedTask = TestHelpers.WaitForEventAsync( + var displaysChangedTask = TestHelpers.WaitForEvent( onComplete => viewerConn.RequiredViewerService.AvailableDisplaysChanged += (s, e) => { if (viewerConn.RequiredViewerService.AvailableDisplays.Count >= 2) @@ -691,7 +692,7 @@ public async Task FileTransferRequestIsSentToPresenter() _ = viewerConn.FileTransfers.SendFileAsync(tempFile); // Wait for the dialog to be called - await TestHelpers.WaitForReceivedCallAsync(() => + await TestHelpers.WaitForReceivedCall(() => presenter.DialogService.ReceivedCalls() .Any(c => c.GetMethodInfo().Name == "ShowFileTransferConfirmationAsync")); @@ -726,7 +727,7 @@ public async Task RejectedFileTransferDoesNotComplete() await File.WriteAllTextAsync(tempFile, "Test file content"); // Subscribe first, then start transfer - var failedTask = TestHelpers.WaitForEventAsync( + var failedTask = TestHelpers.WaitForEvent( onResult => viewerConn.FileTransfers.TransferFailed += (s, e) => onResult(true)); // Start the transfer @@ -867,7 +868,7 @@ public async Task GenerateNewPasswordChangesCredentials() var oldId = presenter.HubClient.ClientId; // Wait for new credentials event - var newCredentialsTask = TestHelpers.WaitForEventAsync( + var newCredentialsTask = TestHelpers.WaitForEvent( onResult => presenter.HubClient.CredentialsAssigned += (s, e) => { // Only trigger on a different password @@ -908,7 +909,7 @@ public async Task SetDisplayNameUpdatesDisplayNameOnServer() var viewerConn = viewer.CurrentConnection!; // Wait for the ViewersChanged event which carries the display name - await TestHelpers.WaitForEventAsync( + await TestHelpers.WaitForEvent( onComplete => viewerConn.ViewersChanged += (s, e) => { if (viewerConn.Presenter?.DisplayName == "CustomPresenterName") @@ -930,7 +931,7 @@ public async Task SetDisplayNameCanBeChangedAfterConnection() var viewerConn = viewer.CurrentConnection!; // Wait for ViewersChanged event with the updated name - var viewersChangedTask = TestHelpers.WaitForEventAsync( + var viewersChangedTask = TestHelpers.WaitForEvent( onComplete => viewerConn.ViewersChanged += (s, e) => { if (viewerConn.Presenter?.DisplayName == "UpdatedName") @@ -996,9 +997,8 @@ public async Task FileTransferSuccessfulTransferCompletesAndFiresEvent() await File.WriteAllTextAsync(tempFile, "Test file content for successful transfer"); // Subscribe to transfer completed event on viewer (sender) side - var completedTask = TestHelpers.WaitForEventAsync( - onResult => viewerConn.FileTransfers.TransferCompleted += (s, e) => onResult(e), - timeout: TimeSpan.FromSeconds(10)); + var completedTask = TestHelpers.WaitForEvent( + onResult => viewerConn.FileTransfers.TransferCompleted += (s, e) => onResult(e)); // Start the transfer _ = viewerConn.FileTransfers.SendFileAsync(tempFile); @@ -1037,9 +1037,8 @@ public async Task FileTransferPresenterCanSendFileToViewer() await File.WriteAllTextAsync(tempFile, "Test file from presenter to viewer"); // Subscribe to transfer completed event on presenter (sender) side - var completedTask = TestHelpers.WaitForEventAsync( - onResult => presenterConn.FileTransfers.TransferCompleted += (s, e) => onResult(e), - timeout: TimeSpan.FromSeconds(10)); + var completedTask = TestHelpers.WaitForEvent( + onResult => presenterConn.FileTransfers.TransferCompleted += (s, e) => onResult(e)); // Start the transfer to specific viewer _ = presenterConn.FileTransfers.SendFileToViewerAsync(tempFile, viewerClientId); @@ -1077,9 +1076,8 @@ public async Task FileTransferCancellationFiresTransferFailedEvent() await File.WriteAllTextAsync(tempFile, "Test file content for cancellation test"); // Subscribe to transfer failed event - var failedTask = TestHelpers.WaitForEventAsync( - onResult => viewerConn.FileTransfers.TransferFailed += (s, e) => onResult(e), - timeout: TimeSpan.FromSeconds(10)); + var failedTask = TestHelpers.WaitForEvent( + onResult => viewerConn.FileTransfers.TransferFailed += (s, e) => onResult(e)); // Start the transfer var operation = await viewerConn.FileTransfers.SendFileAsync(tempFile); diff --git a/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs index 2dcd14b..0cfac70 100644 --- a/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs +++ b/tests/RemoteViewer.IntegrationTests/ViewerSendGrainTests.cs @@ -1,7 +1,8 @@ -using RemoteViewer.Client.Services.HubClient; +using RemoteViewer.Client.Services.HubClient; using RemoteViewer.TestFixtures.Fixtures; using RemoteViewer.Shared.Protocol; using System.Reflection; +using static RemoteViewer.TestFixtures.TestHelpers; namespace RemoteViewer.IntegrationTests; @@ -28,12 +29,10 @@ public async Task FramesCoalesceLatestWinsPerViewer() viewer.HubClient.Options.SuppressAutoFrameAck = true; var receivedFrames = new List(); - var firstFrameReceived = new TaskCompletionSource(); viewerConn.RequiredViewerService.FrameReady += (_, args) => { receivedFrames.Add(args.FrameNumber); - firstFrameReceived.TrySetResult(); }; // Send 20 frames rapidly - frame 1 goes immediately, rest coalesce @@ -43,23 +42,23 @@ public async Task FramesCoalesceLatestWinsPerViewer() } // Wait for frame 1 to arrive - await firstFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await Assert.That(receivedFrames).Contains(1UL); - - // Give time for frames 2-20 to coalesce on server - await Task.Delay(100); - - // Ack frame 1 - should get frame 20 (latest) - var secondFrameReceived = new TaskCompletionSource(); - viewerConn.RequiredViewerService.FrameReady += (_, args) => - { - if (args.FrameNumber != 1) - secondFrameReceived.TrySetResult(); - }; - await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); - - await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); - + await WaitUntil( + () => receivedFrames.Contains(1UL), + message: "Frame 1 was not received"); + + // Ack and wait for frame 20 (frames should coalesce to latest) + // Retry acking if frame 20 hasn't arrived yet (frames might still be in transit) + await WaitUntil( + async () => + { + await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); + await Task.Delay(100); // Give time for frame to arrive + return receivedFrames.Contains(20UL); + }, + message: "Frame 20 was not received after acking"); + + // Verify the last received frame is 20 + await Assert.That(receivedFrames.Count).IsEqualTo(2); await Assert.That(receivedFrames[^1]).IsEqualTo(20UL); } @@ -79,34 +78,36 @@ public async Task ViewerSendQueueDropOldestWhenBusy() viewer.HubClient.Options.SuppressAutoFrameAck = true; var receivedFrames = new List(); - var firstFrameReceived = new TaskCompletionSource(); - var secondFrameReceived = new TaskCompletionSource(); viewerService.FrameReady += (_, args) => { receivedFrames.Add(args.FrameNumber); - if (receivedFrames.Count == 1) - firstFrameReceived.TrySetResult(); - else if (receivedFrames.Count >= 2) - secondFrameReceived.TrySetResult(); }; // Send frame 1 - should be delivered immediately await InvokeSendFrameAsync(presenterConn, 1); - // Wait for frame 1 to actually arrive before sending more - await firstFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + // Wait for frame 1 to arrive + await WaitUntil( + () => receivedFrames.Contains(1UL), + message: "Frame 1 was not received"); // DON'T ack frame 1 yet - send frames 2 and 3 await InvokeSendFrameAsync(presenterConn, 2); await InvokeSendFrameAsync(presenterConn, 3); - await Task.Delay(100); // Let frames 2 and 3 coalesce on server - - // Now ack - server should send the latest buffered frame (3, not 2) - await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); - - await secondFrameReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + // Ack and wait for frame 3 (frames should coalesce, dropping frame 2) + // Retry acking if frame 3 hasn't arrived yet (frames might still be in transit) + await WaitUntil( + async () => + { + await SendAckFrameAsync(viewer.HubClient, viewerConn.ConnectionId); + await Task.Delay(100); // Give time for frame to arrive + return receivedFrames.Contains(3UL); + }, + message: "Frame 3 was not received after acking"); + + // Verify we got exactly frames 1 and 3 (frame 2 was dropped) await Assert.That(receivedFrames).IsEquivalentTo(s_expectedFrames_1_3); } diff --git a/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs b/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs index 9b2f6e1..f842d2d 100644 --- a/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs +++ b/tests/RemoteViewer.TestFixtures/Fixtures/ClientFixture.cs @@ -1,4 +1,4 @@ -using System.Collections.Immutable; +using System.Collections.Immutable; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; @@ -176,12 +176,19 @@ private static ILocalInputMonitorService CreateLocalInputMonitorServiceMock() public async Task WaitForConnectionAsync(TimeSpan? timeout = null) { + var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(30); var tcs = new TaskCompletionSource(); this.HubClient.ConnectionStarted += (s, e) => tcs.TrySetResult(e.Connection); - using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(30)); + using var cts = new CancellationTokenSource(effectiveTimeout); cts.Token.Register(() => tcs.TrySetCanceled()); - return await tcs.Task; + var connection = await tcs.Task; + + await TestHelpers.WaitUntil( + () => connection.Presenter is not null && connection.Viewers.Count > 0, + message: "Connection was started but Presenter and Viewer info didn't arrive."); + + return connection; } public async ValueTask DisposeAsync() diff --git a/tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs b/tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs deleted file mode 100644 index a5efc24..0000000 --- a/tests/RemoteViewer.TestFixtures/Fixtures/TestHelpers.cs +++ /dev/null @@ -1,52 +0,0 @@ -namespace RemoteViewer.TestFixtures.Fixtures; - -public static class TestHelpers -{ - public static async Task WaitForEventAsync( - Action> subscribe, - TimeSpan? timeout = null) - { - var tcs = new TaskCompletionSource(); - subscribe(value => tcs.TrySetResult(value)); - - using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(5)); - cts.Token.Register(() => tcs.TrySetCanceled()); - return await tcs.Task; - } - - public static async Task WaitForEventAsync( - Action subscribe, - TimeSpan? timeout = null) - { - var tcs = new TaskCompletionSource(); - subscribe(() => tcs.TrySetResult()); - - using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(5)); - cts.Token.Register(() => tcs.TrySetCanceled()); - await tcs.Task; - } - - public static async Task WaitForReceivedCallAsync( - Func checkReceived, - TimeSpan? timeout = null) - { - await WaitForConditionAsync(checkReceived, timeout, "WaitForReceivedCallAsync timed out"); - } - - public static async Task WaitForConditionAsync( - Func condition, - TimeSpan? timeout = null, - string? timeoutMessage = null) - { - var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(5)); - - while (DateTime.UtcNow < deadline) - { - if (condition()) - return; - await Task.Delay(50); - } - - throw new TimeoutException(timeoutMessage ?? "WaitForConditionAsync timed out"); - } -} diff --git a/tests/RemoteViewer.TestFixtures/TestHelpers.cs b/tests/RemoteViewer.TestFixtures/TestHelpers.cs new file mode 100644 index 0000000..6deb3dd --- /dev/null +++ b/tests/RemoteViewer.TestFixtures/TestHelpers.cs @@ -0,0 +1,62 @@ +namespace RemoteViewer.TestFixtures; + +public static class TestHelpers +{ + private static readonly TimeSpan s_defaultTimeout = TimeSpan.FromSeconds(30); + private static readonly TimeSpan s_defaultPollInterval = TimeSpan.FromMilliseconds(50); + + public static async Task WaitForEvent(Action> subscribe) + { + var tcs = new TaskCompletionSource(); + subscribe(value => tcs.TrySetResult(value)); + + using var cts = new CancellationTokenSource(s_defaultTimeout); + cts.Token.Register(() => tcs.TrySetCanceled()); + return await tcs.Task; + } + + public static async Task WaitForEvent(Action subscribe) + { + var tcs = new TaskCompletionSource(); + subscribe(() => tcs.TrySetResult()); + + using var cts = new CancellationTokenSource(s_defaultTimeout); + cts.Token.Register(() => tcs.TrySetCanceled()); + await tcs.Task; + } + + public static async Task WaitForReceivedCall(Func checkReceived) + { + await WaitUntil(checkReceived, message: "WaitForReceivedCall timed out"); + } + + public static async Task WaitUntil(Func condition, string? message = null) + { + var deadline = DateTime.UtcNow + s_defaultTimeout; + + while (DateTime.UtcNow < deadline) + { + if (condition()) + return; + + await Task.Delay(s_defaultPollInterval); + } + + throw new TimeoutException(message ?? $"Condition was not met within {s_defaultTimeout.TotalSeconds} seconds."); + } + + public static async Task WaitUntil(Func> condition, string? message = null) + { + var deadline = DateTime.UtcNow + s_defaultTimeout; + + while (DateTime.UtcNow < deadline) + { + if (await condition()) + return; + + await Task.Delay(s_defaultPollInterval); + } + + throw new TimeoutException(message ?? $"Condition was not met within {s_defaultTimeout.TotalSeconds} seconds."); + } +}