From 3138d457bc881e6f17ad3eb471630b0b2edc04c1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 05:37:15 +0000 Subject: [PATCH 1/7] Initial plan From c7b974a13f9be08141b2dd15e72ba3f1b96a28fe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 05:57:50 +0000 Subject: [PATCH 2/7] Add exception handling in DistributedApplicationEventing and tests Co-authored-by: davidfowl <95136+davidfowl@users.noreply.github.com> --- .../DistributedApplicationEventing.cs | 58 +++++++- .../Eventing/PublishEventException.cs | 40 ++++++ .../Orchestrator/ApplicationOrchestrator.cs | 29 ++++ ...tributedApplicationBuilderEventingTests.cs | 127 ++++++++++++++++++ 4 files changed, 250 insertions(+), 4 deletions(-) create mode 100644 src/Aspire.Hosting/Eventing/PublishEventException.cs diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index 5333c276cc9..50ce3ecdc94 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -25,6 +25,9 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi { if (_eventSubscriptionListLookup.TryGetValue(typeof(T), out var subscriptions)) { + // Determine the resource associated with the event if it's a resource-specific event + var resource = @event is IDistributedApplicationResourceEvent resourceEvent ? resourceEvent.Resource : null; + if (dispatchBehavior == EventDispatchBehavior.BlockingConcurrent || dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent) { var pendingSubscriptionCallbacks = new List(subscriptions.Count); @@ -39,13 +42,27 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi // Non-blocking concurrent. _ = Task.Run(async () => { - await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); + try + { + await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); + } + catch (Exception ex) + { + await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + } }, default); } else { // Blocking concurrent. - await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); + try + { + await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); + } + catch (Exception ex) + { + await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + } } } else @@ -57,7 +74,14 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi { foreach (var subscription in subscriptions.ToArray()) { - await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + try + { + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + } } }, default); } @@ -66,13 +90,39 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi // Blocking sequential. foreach (var subscription in subscriptions.ToArray()) { - await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + try + { + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + } } } } } } + private async Task PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource) + { + // Avoid infinite loop if PublishEventException handler throws + if (eventType == typeof(PublishEventException)) + { + return; + } + + try + { + var exceptionEvent = new PublishEventException(exception, eventType, resource); + await PublishAsync(exceptionEvent, EventDispatchBehavior.BlockingSequential).ConfigureAwait(false); + } + catch + { + // If we can't publish the exception event, there's nothing we can do + } + } + /// public DistributedApplicationEventSubscription Subscribe(Func callback) where T : IDistributedApplicationEvent { diff --git a/src/Aspire.Hosting/Eventing/PublishEventException.cs b/src/Aspire.Hosting/Eventing/PublishEventException.cs new file mode 100644 index 00000000000..68339fd85a7 --- /dev/null +++ b/src/Aspire.Hosting/Eventing/PublishEventException.cs @@ -0,0 +1,40 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Aspire.Hosting.ApplicationModel; + +namespace Aspire.Hosting.Eventing; + +/// +/// This internal event is raised when an exception occurs during event publishing. +/// +internal sealed class PublishEventException : IDistributedApplicationEvent +{ + /// + /// Initializes a new instance of the class. + /// + /// The exception that was thrown. + /// The type of the event that was being published. + /// The resource associated with the event, if it's a resource-specific event. + public PublishEventException(Exception exception, Type eventType, IResource? resource) + { + Exception = exception; + EventType = eventType; + Resource = resource; + } + + /// + /// The exception that was thrown. + /// + public Exception Exception { get; } + + /// + /// The type of the event that was being published. + /// + public Type EventType { get; } + + /// + /// The resource associated with the event, if it's a resource-specific event. + /// + public IResource? Resource { get; } +} diff --git a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs index a8246d5c35b..0c7b0eba50d 100644 --- a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs +++ b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs @@ -11,6 +11,8 @@ using Aspire.Hosting.Dcp; using Aspire.Hosting.Eventing; using Aspire.Hosting.Lifecycle; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Aspire.Hosting.Orchestrator; @@ -64,6 +66,7 @@ public ApplicationOrchestrator(DistributedApplicationModel model, _eventing.Subscribe(PublishConnectionStringValue); // Implement WaitFor functionality using BeforeResourceStartedEvent. _eventing.Subscribe(WaitForInBeforeResourceStartedEvent); + _eventing.Subscribe(OnPublishEventException); } private async Task PublishConnectionStringValue(ConnectionStringAvailableEvent @event, CancellationToken token) @@ -308,6 +311,32 @@ private async Task OnResourceEndpointsAllocated(ResourceEndpointsAllocatedEvent await PublishResourceEndpointUrls(@event.Resource, cancellationToken).ConfigureAwait(false); } + private Task OnPublishEventException(PublishEventException @event, CancellationToken cancellationToken) + { + // Log the exception to both the resource-specific logger (if available) and a general logger + if (@event.Resource is not null) + { + var resourceLogger = _loggerService.GetLogger(@event.Resource); + resourceLogger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); + } + + // Also log to a general logger using IServiceProvider + var logger = _serviceProvider.GetService>(); + if (logger is not null) + { + if (@event.Resource is not null) + { + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); + } + else + { + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType}.", @event.EventType.Name); + } + } + + return Task.CompletedTask; + } + private async Task OnResourceChanged(OnResourceChangedContext context) { // Get the previous state before updating to detect transitions to stopped states diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index a71307295b8..df4fb35ec86 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -335,6 +335,133 @@ public async Task ResourceStoppedEventFiresWhenResourceStops() await resourceStoppedTcs.Task.DefaultTimeout(); } + [Fact] + public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var exceptionEventFired = new TaskCompletionSource(); + var exceptionMessage = "Test exception in event handler"; + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException(exceptionMessage); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + exceptionEventFired.TrySetResult(@event); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); + + var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); + Assert.NotNull(exceptionEvent); + Assert.IsType(exceptionEvent.Exception); + Assert.Equal(exceptionMessage, exceptionEvent.Exception.Message); + Assert.Equal(typeof(DummyEvent), exceptionEvent.EventType); + Assert.Null(exceptionEvent.Resource); + } + + [Fact] + public async Task ExceptionInResourceEventHandlerIncludesResource() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var testResource = builder.AddResource(new TestResource("test-resource")); + var exceptionEventFired = new TaskCompletionSource(); + + builder.Eventing.Subscribe(testResource.Resource, (ResourceReadyEvent @event, CancellationToken ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + exceptionEventFired.TrySetResult(@event); + return Task.CompletedTask; + }); + + using var app = builder.Build(); + await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential); + + var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); + Assert.NotNull(exceptionEvent); + Assert.Equal(testResource.Resource, exceptionEvent.Resource); + Assert.Equal(typeof(ResourceReadyEvent), exceptionEvent.EventType); + } + + [Fact] + public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var exceptionEventFired = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + exceptionEventFired.TrySetResult(@event); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingSequential); + + var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); + Assert.NotNull(exceptionEvent); + Assert.IsType(exceptionEvent.Exception); + } + + [Fact] + public async Task ExceptionInBlockingConcurrentHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var exceptionEventFired = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + exceptionEventFired.TrySetResult(@event); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); + + var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); + Assert.NotNull(exceptionEvent); + Assert.IsType(exceptionEvent.Exception); + } + + [Fact] + public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var exceptionEventFired = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + exceptionEventFired.TrySetResult(@event); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingConcurrent); + + var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); + Assert.NotNull(exceptionEvent); + Assert.IsType(exceptionEvent.Exception); + } + public class DummyEvent : IDistributedApplicationEvent { } From 8a64d35a79c30897f32f03e363665e42dfe96b1e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 06:08:47 +0000 Subject: [PATCH 3/7] Fix concurrent exception handling and update tests Co-authored-by: davidfowl <95136+davidfowl@users.noreply.github.com> --- .../DistributedApplicationEventing.cs | 35 ++++---- ...tributedApplicationBuilderEventingTests.cs | 79 ++++++++++++------- 2 files changed, 65 insertions(+), 49 deletions(-) diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index 50ce3ecdc94..e6d287581bb 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -33,36 +33,30 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi var pendingSubscriptionCallbacks = new List(subscriptions.Count); foreach (var subscription in subscriptions.ToArray()) { - var pendingSubscriptionCallback = subscription.Callback(@event, cancellationToken); - pendingSubscriptionCallbacks.Add(pendingSubscriptionCallback); - } - - if (dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent) - { - // Non-blocking concurrent. - _ = Task.Run(async () => + // Wrap each callback to catch exceptions individually + var wrappedCallback = Task.Run(async () => { try { - await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); } - }, default); + }, cancellationToken); + pendingSubscriptionCallbacks.Add(wrappedCallback); + } + + if (dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent) + { + // Non-blocking concurrent - fire and forget + _ = Task.WhenAll(pendingSubscriptionCallbacks); } else { - // Blocking concurrent. - try - { - await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); - } - catch (Exception ex) - { - await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); - } + // Blocking concurrent - wait for all to complete + await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); } } else @@ -115,7 +109,8 @@ private async Task PublishExceptionEventAsync(Exception exception, Type eventTyp try { var exceptionEvent = new PublishEventException(exception, eventType, resource); - await PublishAsync(exceptionEvent, EventDispatchBehavior.BlockingSequential).ConfigureAwait(false); + // Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler + await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false); } catch { diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index df4fb35ec86..b9f526b5682 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -339,8 +339,9 @@ public async Task ResourceStoppedEventFiresWhenResourceStops() public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() { using var builder = TestDistributedApplicationBuilder.Create(); - var exceptionEventFired = new TaskCompletionSource(); + PublishEventException? capturedExceptionEvent = null; var exceptionMessage = "Test exception in event handler"; + var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => { @@ -349,18 +350,22 @@ public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() builder.Eventing.Subscribe((@event, ct) => { - exceptionEventFired.TrySetResult(@event); + capturedExceptionEvent = @event; + tcs.TrySetResult(); return Task.CompletedTask; }); await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); - var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); - Assert.NotNull(exceptionEvent); - Assert.IsType(exceptionEvent.Exception); - Assert.Equal(exceptionMessage, exceptionEvent.Exception.Message); - Assert.Equal(typeof(DummyEvent), exceptionEvent.EventType); - Assert.Null(exceptionEvent.Resource); + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + // The exception should have been caught and published + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); + Assert.Equal(exceptionMessage, capturedExceptionEvent.Exception.Message); + Assert.Equal(typeof(DummyEvent), capturedExceptionEvent.EventType); + Assert.Null(capturedExceptionEvent.Resource); } [Fact] @@ -368,7 +373,8 @@ public async Task ExceptionInResourceEventHandlerIncludesResource() { using var builder = TestDistributedApplicationBuilder.Create(); var testResource = builder.AddResource(new TestResource("test-resource")); - var exceptionEventFired = new TaskCompletionSource(); + PublishEventException? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe(testResource.Resource, (ResourceReadyEvent @event, CancellationToken ct) => { @@ -377,24 +383,28 @@ public async Task ExceptionInResourceEventHandlerIncludesResource() builder.Eventing.Subscribe((@event, ct) => { - exceptionEventFired.TrySetResult(@event); + capturedExceptionEvent = @event; + tcs.TrySetResult(); return Task.CompletedTask; }); using var app = builder.Build(); await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential); - var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); - Assert.NotNull(exceptionEvent); - Assert.Equal(testResource.Resource, exceptionEvent.Resource); - Assert.Equal(typeof(ResourceReadyEvent), exceptionEvent.EventType); + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.Equal(testResource.Resource, capturedExceptionEvent.Resource); + Assert.Equal(typeof(ResourceReadyEvent), capturedExceptionEvent.EventType); } [Fact] public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - var exceptionEventFired = new TaskCompletionSource(); + PublishEventException? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => { @@ -403,22 +413,26 @@ public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() builder.Eventing.Subscribe((@event, ct) => { - exceptionEventFired.TrySetResult(@event); + capturedExceptionEvent = @event; + tcs.TrySetResult(); return Task.CompletedTask; }); await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingSequential); - var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); - Assert.NotNull(exceptionEvent); - Assert.IsType(exceptionEvent.Exception); + // Wait for the async handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); } [Fact] public async Task ExceptionInBlockingConcurrentHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - var exceptionEventFired = new TaskCompletionSource(); + PublishEventException? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => { @@ -427,22 +441,26 @@ public async Task ExceptionInBlockingConcurrentHandlerIsPublished() builder.Eventing.Subscribe((@event, ct) => { - exceptionEventFired.TrySetResult(@event); + capturedExceptionEvent = @event; + tcs.TrySetResult(); return Task.CompletedTask; }); await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); - var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); - Assert.NotNull(exceptionEvent); - Assert.IsType(exceptionEvent.Exception); + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); } [Fact] public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - var exceptionEventFired = new TaskCompletionSource(); + PublishEventException? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => { @@ -451,15 +469,18 @@ public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() builder.Eventing.Subscribe((@event, ct) => { - exceptionEventFired.TrySetResult(@event); + capturedExceptionEvent = @event; + tcs.TrySetResult(); return Task.CompletedTask; }); await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingConcurrent); - var exceptionEvent = await exceptionEventFired.Task.DefaultTimeout(); - Assert.NotNull(exceptionEvent); - Assert.IsType(exceptionEvent.Exception); + // Wait for the async handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); } public class DummyEvent : IDistributedApplicationEvent From fa494afa058609b52f65763f2455783c0779f48d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 06:30:27 +0000 Subject: [PATCH 4/7] Use GetRequiredService for logger in OnPublishEventException Co-authored-by: davidfowl <95136+davidfowl@users.noreply.github.com> --- .../Orchestrator/ApplicationOrchestrator.cs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs index 0c7b0eba50d..025a407e634 100644 --- a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs +++ b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs @@ -321,17 +321,14 @@ private Task OnPublishEventException(PublishEventException @event, CancellationT } // Also log to a general logger using IServiceProvider - var logger = _serviceProvider.GetService>(); - if (logger is not null) + var logger = _serviceProvider.GetRequiredService>(); + if (@event.Resource is not null) { - if (@event.Resource is not null) - { - logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); - } - else - { - logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType}.", @event.EventType.Name); - } + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); + } + else + { + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType}.", @event.EventType.Name); } return Task.CompletedTask; From 7e098e38084d9af75b43bb4da364aeb2bf6c0887 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 07:16:34 +0000 Subject: [PATCH 5/7] Rethrow exceptions after publishing PublishEventException Co-authored-by: davidfowl <95136+davidfowl@users.noreply.github.com> --- .../DistributedApplicationEventing.cs | 3 +++ ...tributedApplicationBuilderEventingTests.cs | 20 ++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index e6d287581bb..9cb17822f55 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -43,6 +43,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi catch (Exception ex) { await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + throw; } }, cancellationToken); pendingSubscriptionCallbacks.Add(wrappedCallback); @@ -75,6 +76,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi catch (Exception ex) { await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + throw; } } }, default); @@ -91,6 +93,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi catch (Exception ex) { await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + throw; } } } diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index b9f526b5682..a258fc38a3f 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -355,7 +355,11 @@ public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() return Task.CompletedTask; }); - await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); + // The exception should be rethrown after being published + var exception = await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); + }); // Wait for the async exception handler to complete await tcs.Task.DefaultTimeout(); @@ -366,6 +370,7 @@ public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() Assert.Equal(exceptionMessage, capturedExceptionEvent.Exception.Message); Assert.Equal(typeof(DummyEvent), capturedExceptionEvent.EventType); Assert.Null(capturedExceptionEvent.Resource); + Assert.Equal(exceptionMessage, exception.Message); } [Fact] @@ -389,7 +394,12 @@ public async Task ExceptionInResourceEventHandlerIncludesResource() }); using var app = builder.Build(); - await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential); + + // The exception should be rethrown after being published + await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential); + }); // Wait for the async exception handler to complete await tcs.Task.DefaultTimeout(); @@ -446,7 +456,11 @@ public async Task ExceptionInBlockingConcurrentHandlerIsPublished() return Task.CompletedTask; }); - await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); + // The exception should be rethrown after being published + await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); + }); // Wait for the async exception handler to complete await tcs.Task.DefaultTimeout(); From aa2c0d3d03812d0baa0d40bd295ccbb2ab804a9f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Oct 2025 19:46:58 +0000 Subject: [PATCH 6/7] Use Channel for async exception publishing to avoid delays Co-authored-by: mitchdenny <513398+mitchdenny@users.noreply.github.com> --- .../DistributedApplicationEventing.cs | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index 9cb17822f55..739c7cc2d69 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Concurrent; +using System.Threading.Channels; using Aspire.Hosting.ApplicationModel; namespace Aspire.Hosting.Eventing; @@ -11,6 +12,33 @@ public class DistributedApplicationEventing : IDistributedApplicationEventing { private readonly ConcurrentDictionary> _eventSubscriptionListLookup = new(); private readonly ConcurrentDictionary _subscriptionEventTypeLookup = new(); + private readonly Channel<(Exception Exception, Type EventType, IResource? Resource)> _exceptionChannel = Channel.CreateUnbounded<(Exception, Type, IResource?)>(); + + /// + /// Initializes a new instance of the class. + /// + public DistributedApplicationEventing() + { + // Start a background task to process exceptions from the channel + _ = Task.Run(ProcessExceptionChannelAsync); + } + + private async Task ProcessExceptionChannelAsync() + { + await foreach (var (exception, eventType, resource) in _exceptionChannel.Reader.ReadAllAsync().ConfigureAwait(false)) + { + try + { + var exceptionEvent = new PublishEventException(exception, eventType, resource); + // Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler + await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false); + } + catch + { + // If we can't publish the exception event, there's nothing we can do + } + } + } /// [System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Cancellation token")] @@ -42,7 +70,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi } catch (Exception ex) { - await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + PublishExceptionEventAsync(ex, typeof(T), resource); throw; } }, cancellationToken); @@ -75,7 +103,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi } catch (Exception ex) { - await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + PublishExceptionEventAsync(ex, typeof(T), resource); throw; } } @@ -92,7 +120,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi } catch (Exception ex) { - await PublishExceptionEventAsync(ex, typeof(T), resource).ConfigureAwait(false); + PublishExceptionEventAsync(ex, typeof(T), resource); throw; } } @@ -101,7 +129,7 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi } } - private async Task PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource) + private void PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource) { // Avoid infinite loop if PublishEventException handler throws if (eventType == typeof(PublishEventException)) @@ -109,16 +137,8 @@ private async Task PublishExceptionEventAsync(Exception exception, Type eventTyp return; } - try - { - var exceptionEvent = new PublishEventException(exception, eventType, resource); - // Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler - await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false); - } - catch - { - // If we can't publish the exception event, there's nothing we can do - } + // Write to the channel for async processing + _exceptionChannel.Writer.TryWrite((exception, eventType, resource)); } /// From 3a3e8849c749f73fa2f3500539ad201066fd5baf Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Oct 2025 03:29:32 +0000 Subject: [PATCH 7/7] Rename PublishEventException to EventPublishExceptionEvent and make it public Co-authored-by: mitchdenny <513398+mitchdenny@users.noreply.github.com> --- .../DistributedApplicationEventing.cs | 6 ++--- ...ption.cs => EventPublishExceptionEvent.cs} | 8 +++---- .../Orchestrator/ApplicationOrchestrator.cs | 4 ++-- ...tributedApplicationBuilderEventingTests.cs | 22 +++++++++---------- 4 files changed, 20 insertions(+), 20 deletions(-) rename src/Aspire.Hosting/Eventing/{PublishEventException.cs => EventPublishExceptionEvent.cs} (75%) diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index 739c7cc2d69..c19a3af67ff 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -29,7 +29,7 @@ private async Task ProcessExceptionChannelAsync() { try { - var exceptionEvent = new PublishEventException(exception, eventType, resource); + var exceptionEvent = new EventPublishExceptionEvent(exception, eventType, resource); // Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false); } @@ -131,8 +131,8 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi private void PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource) { - // Avoid infinite loop if PublishEventException handler throws - if (eventType == typeof(PublishEventException)) + // Avoid infinite loop if EventPublishExceptionEvent handler throws + if (eventType == typeof(EventPublishExceptionEvent)) { return; } diff --git a/src/Aspire.Hosting/Eventing/PublishEventException.cs b/src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs similarity index 75% rename from src/Aspire.Hosting/Eventing/PublishEventException.cs rename to src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs index 68339fd85a7..9f5c032a187 100644 --- a/src/Aspire.Hosting/Eventing/PublishEventException.cs +++ b/src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs @@ -6,17 +6,17 @@ namespace Aspire.Hosting.Eventing; /// -/// This internal event is raised when an exception occurs during event publishing. +/// This event is raised when an exception occurs during event publishing. /// -internal sealed class PublishEventException : IDistributedApplicationEvent +public sealed class EventPublishExceptionEvent : IDistributedApplicationEvent { /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// The exception that was thrown. /// The type of the event that was being published. /// The resource associated with the event, if it's a resource-specific event. - public PublishEventException(Exception exception, Type eventType, IResource? resource) + public EventPublishExceptionEvent(Exception exception, Type eventType, IResource? resource) { Exception = exception; EventType = eventType; diff --git a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs index 025a407e634..2de8180af1e 100644 --- a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs +++ b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs @@ -66,7 +66,7 @@ public ApplicationOrchestrator(DistributedApplicationModel model, _eventing.Subscribe(PublishConnectionStringValue); // Implement WaitFor functionality using BeforeResourceStartedEvent. _eventing.Subscribe(WaitForInBeforeResourceStartedEvent); - _eventing.Subscribe(OnPublishEventException); + _eventing.Subscribe(OnPublishEventException); } private async Task PublishConnectionStringValue(ConnectionStringAvailableEvent @event, CancellationToken token) @@ -311,7 +311,7 @@ private async Task OnResourceEndpointsAllocated(ResourceEndpointsAllocatedEvent await PublishResourceEndpointUrls(@event.Resource, cancellationToken).ConfigureAwait(false); } - private Task OnPublishEventException(PublishEventException @event, CancellationToken cancellationToken) + private Task OnPublishEventException(EventPublishExceptionEvent @event, CancellationToken cancellationToken) { // Log the exception to both the resource-specific logger (if available) and a general logger if (@event.Resource is not null) diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index a258fc38a3f..f1b4a6dbfbd 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -336,10 +336,10 @@ public async Task ResourceStoppedEventFiresWhenResourceStops() } [Fact] - public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() + public async Task ExceptionInEventHandlerIsPublishedAsEventPublishExceptionEvent() { using var builder = TestDistributedApplicationBuilder.Create(); - PublishEventException? capturedExceptionEvent = null; + EventPublishExceptionEvent? capturedExceptionEvent = null; var exceptionMessage = "Test exception in event handler"; var tcs = new TaskCompletionSource(); @@ -348,7 +348,7 @@ public async Task ExceptionInEventHandlerIsPublishedAsPublishEventException() throw new InvalidOperationException(exceptionMessage); }); - builder.Eventing.Subscribe((@event, ct) => + builder.Eventing.Subscribe((@event, ct) => { capturedExceptionEvent = @event; tcs.TrySetResult(); @@ -378,7 +378,7 @@ public async Task ExceptionInResourceEventHandlerIncludesResource() { using var builder = TestDistributedApplicationBuilder.Create(); var testResource = builder.AddResource(new TestResource("test-resource")); - PublishEventException? capturedExceptionEvent = null; + EventPublishExceptionEvent? capturedExceptionEvent = null; var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe(testResource.Resource, (ResourceReadyEvent @event, CancellationToken ct) => @@ -386,7 +386,7 @@ public async Task ExceptionInResourceEventHandlerIncludesResource() throw new InvalidOperationException("Test exception"); }); - builder.Eventing.Subscribe((@event, ct) => + builder.Eventing.Subscribe((@event, ct) => { capturedExceptionEvent = @event; tcs.TrySetResult(); @@ -413,7 +413,7 @@ await Assert.ThrowsAsync(async () => public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - PublishEventException? capturedExceptionEvent = null; + EventPublishExceptionEvent? capturedExceptionEvent = null; var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => @@ -421,7 +421,7 @@ public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() throw new InvalidOperationException("Test exception"); }); - builder.Eventing.Subscribe((@event, ct) => + builder.Eventing.Subscribe((@event, ct) => { capturedExceptionEvent = @event; tcs.TrySetResult(); @@ -441,7 +441,7 @@ public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() public async Task ExceptionInBlockingConcurrentHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - PublishEventException? capturedExceptionEvent = null; + EventPublishExceptionEvent? capturedExceptionEvent = null; var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => @@ -449,7 +449,7 @@ public async Task ExceptionInBlockingConcurrentHandlerIsPublished() throw new InvalidOperationException("Test exception"); }); - builder.Eventing.Subscribe((@event, ct) => + builder.Eventing.Subscribe((@event, ct) => { capturedExceptionEvent = @event; tcs.TrySetResult(); @@ -473,7 +473,7 @@ await Assert.ThrowsAsync(async () => public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() { using var builder = TestDistributedApplicationBuilder.Create(); - PublishEventException? capturedExceptionEvent = null; + EventPublishExceptionEvent? capturedExceptionEvent = null; var tcs = new TaskCompletionSource(); builder.Eventing.Subscribe((@event, ct) => @@ -481,7 +481,7 @@ public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() throw new InvalidOperationException("Test exception"); }); - builder.Eventing.Subscribe((@event, ct) => + builder.Eventing.Subscribe((@event, ct) => { capturedExceptionEvent = @event; tcs.TrySetResult();