Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions BusLane.Tests/Services/ServiceBus/ServiceBusOperationsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,97 @@ namespace BusLane.Tests.Services.ServiceBus;
using BusLane.Services.ServiceBus;
using FluentAssertions;
using NSubstitute;
using System.Reflection;
using Xunit;

public class ServiceBusOperationsTests
{
[Fact]
public async Task SelectAsync_WithMoreWorkThanBudget_DoesNotExceedConfiguredConcurrency()
{
// Arrange
var activeWorkers = 0;
var maxConcurrentWorkers = 0;
var items = Enumerable.Range(1, 18).ToArray();
const int maxConcurrency = 3;
var workerStartedSignals = items
.Select(_ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))
.ToArray();
var workerReleaseSignals = items
.Select(_ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))
.ToArray();

// Act
var resultsTask = InvokeBoundedAdminProjectorAsync(
items,
async (item, ct) =>
{
var index = item - 1;
var inFlight = Interlocked.Increment(ref activeWorkers);
UpdateMaxValue(ref maxConcurrentWorkers, inFlight);
workerStartedSignals[index].TrySetResult();

try
{
await workerReleaseSignals[index].Task.WaitAsync(ct);
return item * 2;
}
finally
{
Interlocked.Decrement(ref activeWorkers);
}
},
maxConcurrency: maxConcurrency);

await Task.WhenAll(workerStartedSignals.Take(maxConcurrency).Select(static signal => signal.Task))
.WaitAsync(TimeSpan.FromSeconds(1));

maxConcurrentWorkers.Should().Be(maxConcurrency);

for (var batchStart = 0; batchStart < items.Length; batchStart += maxConcurrency)
{
foreach (var signal in workerReleaseSignals.Skip(batchStart).Take(maxConcurrency))
{
signal.TrySetResult();
}

var nextBatchStart = batchStart + maxConcurrency;
if (nextBatchStart >= items.Length)
{
continue;
}

await Task.WhenAll(workerStartedSignals.Skip(nextBatchStart).Take(Math.Min(maxConcurrency, items.Length - nextBatchStart)).Select(static signal => signal.Task))
.WaitAsync(TimeSpan.FromSeconds(1));
}

var results = await resultsTask.WaitAsync(TimeSpan.FromSeconds(1));

// Assert
maxConcurrentWorkers.Should().BeLessThanOrEqualTo(3);
results.Should().Equal(items.Select(static item => item * 2));
}

[Fact]
public async Task SelectAsync_WhenWorkCompletesOutOfOrder_PreservesSourceOrder()
{
// Arrange
var items = new[] { 1, 2, 3, 4 };

// Act
var results = await InvokeBoundedAdminProjectorAsync(
items,
async (item, ct) =>
{
await Task.Delay(TimeSpan.FromMilliseconds((5 - item) * 20), ct);
return $"item-{item}";
},
maxConcurrency: 2);

// Assert
results.Should().Equal("item-1", "item-2", "item-3", "item-4");
}

[Fact(Skip = "Integration test - requires Service Bus client setup. Functionality verified through manual testing.")]
public async Task PeekMessagesAsync_WithSequenceNumber_ShouldCallPeekWithSequenceNumber()
{
Expand All @@ -30,4 +117,42 @@ private IServiceBusOperations CreateOperations(ServiceBusReceiver receiver)
// This is a placeholder - the real implementation would require complex mocking
throw new NotImplementedException("This test requires Service Bus client infrastructure setup");
}

private static async Task<IReadOnlyList<TResult>> InvokeBoundedAdminProjectorAsync<TSource, TResult>(
IReadOnlyList<TSource> source,
Func<TSource, CancellationToken, Task<TResult>> projector,
int maxConcurrency,
CancellationToken ct = default)
{
var helperType = typeof(ConnectionStringOperations).Assembly.GetType("BusLane.Services.ServiceBus.BoundedAdminProjector");
helperType.Should().NotBeNull();

var selectAsync = helperType!.GetMethod("SelectAsync", BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
selectAsync.Should().NotBeNull();

var genericMethod = selectAsync!.MakeGenericMethod(typeof(TSource), typeof(TResult));
var task = (Task)genericMethod.Invoke(null, [source, projector, maxConcurrency, ct])!;
await task;

var resultProperty = task.GetType().GetProperty("Result");
resultProperty.Should().NotBeNull();
return (IReadOnlyList<TResult>)resultProperty!.GetValue(task)!;
}

private static void UpdateMaxValue(ref int target, int candidate)
{
while (true)
{
var current = target;
if (candidate <= current)
{
return;
}

if (Interlocked.CompareExchange(ref target, candidate, current) == current)
{
return;
}
}
}
}
168 changes: 166 additions & 2 deletions BusLane.Tests/ViewModels/MainWindowViewModelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace BusLane.Tests.ViewModels;
using BusLane.ViewModels.Dashboard;
using FluentAssertions;
using NSubstitute;
using System.Reflection;
using static BusLane.Services.Infrastructure.SafeJsonSerializer;

public class MainWindowViewModelTests
Expand Down Expand Up @@ -215,6 +216,126 @@ public async Task ConnectionStringTab_DoesNotStartDashboardRefreshUntilChartsAre
sut.FeaturePanels.ShowCharts.Should().BeFalse();
}

[Fact]
public async Task ToggleDeadLetterViewAsync_WithActiveQueue_LoadsMessagesOnce()
{
// Arrange
var preferences = new TestPreferencesService();
var operationsFactory = Substitute.For<IServiceBusOperationsFactory>();
var operations = Substitute.For<IConnectionStringOperations>();
var messageLoadStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
using var sut = CreateSut(preferences, operationsFactory: operationsFactory);

operationsFactory.CreateFromConnectionString(Arg.Any<string>()).Returns(operations);
operations.GetQueueInfoAsync("orders", Arg.Any<CancellationToken>())
.Returns(new QueueInfo(
"orders",
12,
10,
2,
0,
1024,
DateTimeOffset.UtcNow,
false,
TimeSpan.FromDays(14),
TimeSpan.FromMinutes(1)));
operations.PeekMessagesAsync(
"orders",
null,
Arg.Any<int>(),
null,
true,
false,
null,
Arg.Any<CancellationToken>())
.Returns(_ =>
{
messageLoadStarted.TrySetResult();
return Task.FromResult<IEnumerable<MessageInfo>>(Array.Empty<MessageInfo>());
});

var tab = CreateConnectedQueueTab("tab-1", preferences, operationsFactory, connectionName: "Orders", entityName: "orders");
sut.ConnectionTabs.Add(tab);
sut.ActiveTab = tab;

// Act
await sut.ToggleDeadLetterViewCommand.ExecuteAsync(null);
await messageLoadStarted.Task.WaitAsync(TimeSpan.FromSeconds(1));

// Assert
await operations.Received(1).PeekMessagesAsync(
"orders",
null,
preferences.MessagesPerPage,
null,
true,
false,
null,
Arg.Any<CancellationToken>());
}

[Fact]
public async Task AutoRefresh_WhenPreviousTickIsStillRunning_SkipsOverlappingAlertEvaluation()
{
// Arrange
var preferences = new TestPreferencesService
{
AutoRefreshMessages = false
};
var alertService = Substitute.For<IAlertService>();
var activeAlertEvaluations = 0;
var maxConcurrentAlertEvaluations = 0;
var firstEvaluationStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var allowEvaluationToComplete = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
using var sut = CreateSut(
preferences,
alertService: alertService);

sut.Navigation.Queues.Add(new QueueInfo(
"orders",
12,
10,
2,
0,
1024,
DateTimeOffset.UtcNow,
false,
TimeSpan.FromDays(14),
TimeSpan.FromMinutes(1)));
alertService.EvaluateAlertsAsync(Arg.Any<IEnumerable<QueueInfo>>(), Arg.Any<IEnumerable<SubscriptionInfo>>())
.Returns(async _ =>
{
var inFlight = Interlocked.Increment(ref activeAlertEvaluations);
UpdateMaxValue(ref maxConcurrentAlertEvaluations, inFlight);
firstEvaluationStarted.TrySetResult();

try
{
await allowEvaluationToComplete.Task;
return Enumerable.Empty<AlertEvent>();
}
finally
{
Interlocked.Decrement(ref activeAlertEvaluations);
}
});

// Act
var firstTick = InvokeHandleAutoRefreshTickAsync(sut);
await firstEvaluationStarted.Task.WaitAsync(TimeSpan.FromSeconds(1));

var secondTick = InvokeHandleAutoRefreshTickAsync(sut);
await secondTick.WaitAsync(TimeSpan.FromSeconds(1));
allowEvaluationToComplete.TrySetResult();
await firstTick;

// Assert
maxConcurrentAlertEvaluations.Should().Be(1);
await alertService.Received(1).EvaluateAlertsAsync(
Arg.Any<IEnumerable<QueueInfo>>(),
Arg.Any<IEnumerable<SubscriptionInfo>>());
}

[Fact]
public void ShowNamespaceSelectionPrompt_IsTrueOnlyWhenAzureIsReadyWithoutActiveConnection()
{
Expand Down Expand Up @@ -506,7 +627,8 @@ private static MainWindowViewModel CreateSut(
IAppLockService? appLockService = null,
IBiometricAuthService? biometricAuthService = null,
IServiceBusOperationsFactory? operationsFactory = null,
IDashboardRefreshService? dashboardRefreshService = null)
IDashboardRefreshService? dashboardRefreshService = null,
IAlertService? alertService = null)
{
auth ??= Substitute.For<IAzureAuthService>();
var azureResources = Substitute.For<IAzureResourceService>();
Expand All @@ -515,7 +637,7 @@ private static MainWindowViewModel CreateSut(
var connectionBackupService = Substitute.For<IConnectionBackupService>();
var versionService = Substitute.For<IVersionService>();
var liveStreamService = Substitute.For<ILiveStreamService>();
var alertService = Substitute.For<IAlertService>();
alertService ??= Substitute.For<IAlertService>();
var notificationService = Substitute.For<INotificationService>();
updateService ??= Substitute.For<IUpdateService>();
var diagnosticBundleService = Substitute.For<IDiagnosticBundleService>();
Expand Down Expand Up @@ -595,6 +717,41 @@ private static ConnectionTabViewModel CreateTab(
};
}

private static ConnectionTabViewModel CreateConnectedQueueTab(
string tabId,
TestPreferencesService preferences,
IServiceBusOperationsFactory operationsFactory,
string connectionName,
string entityName)
{
var tab = CreateTab(tabId, preferences);
var connection = SavedConnection.Create(
connectionName,
"Endpoint=sb://orders.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test",
ConnectionType.Queue,
entityName: entityName);

tab.ConnectWithConnectionStringAsync(connection, operationsFactory).GetAwaiter().GetResult();
return tab;
}

private static void UpdateMaxValue(ref int target, int candidate)
{
while (true)
{
var current = target;
if (candidate <= current)
{
return;
}

if (Interlocked.CompareExchange(ref target, candidate, current) == current)
{
return;
}
}
}

private static ILogSink CreateLogSink()
{
var logSink = Substitute.For<ILogSink>();
Expand All @@ -608,6 +765,13 @@ private static bool GetIsActive(ConnectionTabViewModel tab)
return property?.GetValue(tab) as bool? ?? false;
}

private static Task InvokeHandleAutoRefreshTickAsync(MainWindowViewModel sut)
{
var method = typeof(MainWindowViewModel).GetMethod("HandleAutoRefreshTickAsync", BindingFlags.Instance | BindingFlags.NonPublic);
method.Should().NotBeNull();
return (Task)method!.Invoke(sut, [])!;
}

private sealed class TestPreferencesService : IPreferencesService
{
public bool ConfirmBeforeDelete { get; set; } = true;
Expand Down
Loading
Loading