diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs index 4af67201..5686a606 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs @@ -79,6 +79,7 @@ public GooglePubSubSubscription( } Task _subscriberTask = null!; + Task _monitorTask = null!; protected override async ValueTask Subscribe(CancellationToken cancellationToken) { var builder = new SubscriberClientBuilder { Logger = Log.Logger }; @@ -92,6 +93,7 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken _client = await builder.BuildAsync(cancellationToken).NoContext(); _subscriberTask = _client.StartAsync(Handle); + _monitorTask = MonitorSubscriberTask(_subscriberTask, cancellationToken); return; @@ -129,9 +131,26 @@ async Task Handle(PubsubMessage msg, CancellationToken ct) { Metadata AsMeta(MapField attributes) => new(attributes.ToDictionary(x => x.Key, object (x) => x.Value)!); } + async Task MonitorSubscriberTask(Task subscriberTask, CancellationToken cancellationToken) { + try { + await subscriberTask.NoContext(); + + // If the task completes without cancellation, the subscription was dropped + if (!cancellationToken.IsCancellationRequested) { + Dropped(DropReason.Stopped, null); + } + } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { + // Expected when shutting down + } catch (Exception ex) { + // Subscriber task failed with an unrecoverable error + Dropped(DropReason.ServerError, ex); + } + } + protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { if (_client != null) await _client.StopAsync(cancellationToken).NoContext(); await _subscriberTask.NoContext(); + await _monitorTask.NoContext(); } public async Task CreateSubscription(