diff --git a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs index 5333c276cc9..c19a3af67ff 100644 --- a/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs +++ b/src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Concurrent; +using System.Threading.Channels; using Aspire.Hosting.ApplicationModel; namespace Aspire.Hosting.Eventing; @@ -11,6 +12,33 @@ public class DistributedApplicationEventing : IDistributedApplicationEventing { private readonly ConcurrentDictionary> _eventSubscriptionListLookup = new(); private readonly ConcurrentDictionary _subscriptionEventTypeLookup = new(); + private readonly Channel<(Exception Exception, Type EventType, IResource? Resource)> _exceptionChannel = Channel.CreateUnbounded<(Exception, Type, IResource?)>(); + + /// + /// Initializes a new instance of the class. + /// + public DistributedApplicationEventing() + { + // Start a background task to process exceptions from the channel + _ = Task.Run(ProcessExceptionChannelAsync); + } + + private async Task ProcessExceptionChannelAsync() + { + await foreach (var (exception, eventType, resource) in _exceptionChannel.Reader.ReadAllAsync().ConfigureAwait(false)) + { + try + { + var exceptionEvent = new EventPublishExceptionEvent(exception, eventType, resource); + // Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler + await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false); + } + catch + { + // If we can't publish the exception event, there's nothing we can do + } + } + } /// [System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Cancellation token")] @@ -25,26 +53,38 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi { if (_eventSubscriptionListLookup.TryGetValue(typeof(T), out var subscriptions)) { + // Determine the resource associated with the event if it's a resource-specific event + var resource = @event is IDistributedApplicationResourceEvent resourceEvent ? resourceEvent.Resource : null; + if (dispatchBehavior == EventDispatchBehavior.BlockingConcurrent || dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent) { var pendingSubscriptionCallbacks = new List(subscriptions.Count); foreach (var subscription in subscriptions.ToArray()) { - var pendingSubscriptionCallback = subscription.Callback(@event, cancellationToken); - pendingSubscriptionCallbacks.Add(pendingSubscriptionCallback); + // Wrap each callback to catch exceptions individually + var wrappedCallback = Task.Run(async () => + { + try + { + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + PublishExceptionEventAsync(ex, typeof(T), resource); + throw; + } + }, cancellationToken); + pendingSubscriptionCallbacks.Add(wrappedCallback); } if (dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent) { - // Non-blocking concurrent. - _ = Task.Run(async () => - { - await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); - }, default); + // Non-blocking concurrent - fire and forget + _ = Task.WhenAll(pendingSubscriptionCallbacks); } else { - // Blocking concurrent. + // Blocking concurrent - wait for all to complete await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false); } } @@ -57,7 +97,15 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi { foreach (var subscription in subscriptions.ToArray()) { - await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + try + { + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + PublishExceptionEventAsync(ex, typeof(T), resource); + throw; + } } }, default); } @@ -66,13 +114,33 @@ public async Task PublishAsync(T @event, EventDispatchBehavior dispatchBehavi // Blocking sequential. foreach (var subscription in subscriptions.ToArray()) { - await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + try + { + await subscription.Callback(@event, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + PublishExceptionEventAsync(ex, typeof(T), resource); + throw; + } } } } } } + private void PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource) + { + // Avoid infinite loop if EventPublishExceptionEvent handler throws + if (eventType == typeof(EventPublishExceptionEvent)) + { + return; + } + + // Write to the channel for async processing + _exceptionChannel.Writer.TryWrite((exception, eventType, resource)); + } + /// public DistributedApplicationEventSubscription Subscribe(Func callback) where T : IDistributedApplicationEvent { diff --git a/src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs b/src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs new file mode 100644 index 00000000000..9f5c032a187 --- /dev/null +++ b/src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs @@ -0,0 +1,40 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Aspire.Hosting.ApplicationModel; + +namespace Aspire.Hosting.Eventing; + +/// +/// This event is raised when an exception occurs during event publishing. +/// +public sealed class EventPublishExceptionEvent : IDistributedApplicationEvent +{ + /// + /// Initializes a new instance of the class. + /// + /// The exception that was thrown. + /// The type of the event that was being published. + /// The resource associated with the event, if it's a resource-specific event. + public EventPublishExceptionEvent(Exception exception, Type eventType, IResource? resource) + { + Exception = exception; + EventType = eventType; + Resource = resource; + } + + /// + /// The exception that was thrown. + /// + public Exception Exception { get; } + + /// + /// The type of the event that was being published. + /// + public Type EventType { get; } + + /// + /// The resource associated with the event, if it's a resource-specific event. + /// + public IResource? Resource { get; } +} diff --git a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs index a8246d5c35b..2de8180af1e 100644 --- a/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs +++ b/src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs @@ -11,6 +11,8 @@ using Aspire.Hosting.Dcp; using Aspire.Hosting.Eventing; using Aspire.Hosting.Lifecycle; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Aspire.Hosting.Orchestrator; @@ -64,6 +66,7 @@ public ApplicationOrchestrator(DistributedApplicationModel model, _eventing.Subscribe(PublishConnectionStringValue); // Implement WaitFor functionality using BeforeResourceStartedEvent. _eventing.Subscribe(WaitForInBeforeResourceStartedEvent); + _eventing.Subscribe(OnPublishEventException); } private async Task PublishConnectionStringValue(ConnectionStringAvailableEvent @event, CancellationToken token) @@ -308,6 +311,29 @@ private async Task OnResourceEndpointsAllocated(ResourceEndpointsAllocatedEvent await PublishResourceEndpointUrls(@event.Resource, cancellationToken).ConfigureAwait(false); } + private Task OnPublishEventException(EventPublishExceptionEvent @event, CancellationToken cancellationToken) + { + // Log the exception to both the resource-specific logger (if available) and a general logger + if (@event.Resource is not null) + { + var resourceLogger = _loggerService.GetLogger(@event.Resource); + resourceLogger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); + } + + // Also log to a general logger using IServiceProvider + var logger = _serviceProvider.GetRequiredService>(); + if (@event.Resource is not null) + { + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name); + } + else + { + logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType}.", @event.EventType.Name); + } + + return Task.CompletedTask; + } + private async Task OnResourceChanged(OnResourceChangedContext context) { // Get the previous state before updating to detect transitions to stopped states diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index a71307295b8..f1b4a6dbfbd 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -335,6 +335,168 @@ public async Task ResourceStoppedEventFiresWhenResourceStops() await resourceStoppedTcs.Task.DefaultTimeout(); } + [Fact] + public async Task ExceptionInEventHandlerIsPublishedAsEventPublishExceptionEvent() + { + using var builder = TestDistributedApplicationBuilder.Create(); + EventPublishExceptionEvent? capturedExceptionEvent = null; + var exceptionMessage = "Test exception in event handler"; + var tcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException(exceptionMessage); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + capturedExceptionEvent = @event; + tcs.TrySetResult(); + return Task.CompletedTask; + }); + + // The exception should be rethrown after being published + var exception = await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); + }); + + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + // The exception should have been caught and published + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); + Assert.Equal(exceptionMessage, capturedExceptionEvent.Exception.Message); + Assert.Equal(typeof(DummyEvent), capturedExceptionEvent.EventType); + Assert.Null(capturedExceptionEvent.Resource); + Assert.Equal(exceptionMessage, exception.Message); + } + + [Fact] + public async Task ExceptionInResourceEventHandlerIncludesResource() + { + using var builder = TestDistributedApplicationBuilder.Create(); + var testResource = builder.AddResource(new TestResource("test-resource")); + EventPublishExceptionEvent? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe(testResource.Resource, (ResourceReadyEvent @event, CancellationToken ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + capturedExceptionEvent = @event; + tcs.TrySetResult(); + return Task.CompletedTask; + }); + + using var app = builder.Build(); + + // The exception should be rethrown after being published + await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential); + }); + + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.Equal(testResource.Resource, capturedExceptionEvent.Resource); + Assert.Equal(typeof(ResourceReadyEvent), capturedExceptionEvent.EventType); + } + + [Fact] + public async Task ExceptionInNonBlockingSequentialHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + EventPublishExceptionEvent? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + capturedExceptionEvent = @event; + tcs.TrySetResult(); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingSequential); + + // Wait for the async handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); + } + + [Fact] + public async Task ExceptionInBlockingConcurrentHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + EventPublishExceptionEvent? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + capturedExceptionEvent = @event; + tcs.TrySetResult(); + return Task.CompletedTask; + }); + + // The exception should be rethrown after being published + await Assert.ThrowsAsync(async () => + { + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); + }); + + // Wait for the async exception handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); + } + + [Fact] + public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished() + { + using var builder = TestDistributedApplicationBuilder.Create(); + EventPublishExceptionEvent? capturedExceptionEvent = null; + var tcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe((@event, ct) => + { + throw new InvalidOperationException("Test exception"); + }); + + builder.Eventing.Subscribe((@event, ct) => + { + capturedExceptionEvent = @event; + tcs.TrySetResult(); + return Task.CompletedTask; + }); + + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingConcurrent); + + // Wait for the async handler to complete + await tcs.Task.DefaultTimeout(); + + Assert.NotNull(capturedExceptionEvent); + Assert.IsType(capturedExceptionEvent.Exception); + } + public class DummyEvent : IDistributedApplicationEvent { }