From 97efc6aa85ebc50973e88fd0b0f0a4cb7bcc2da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Narwojsz?= Date: Fri, 28 Nov 2025 13:50:32 +0100 Subject: [PATCH] Implement automatic reconnection for Google PubSub subscription In case Google PubSub subscription encounters an unrecoverable error, perform normal connection dropped handling: call the OnDropped callback and try to reconnect. --- .../Subscriptions/GooglePubSubSubscription.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs index 4af67201..5686a606 100644 --- a/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs +++ b/src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs @@ -79,6 +79,7 @@ public GooglePubSubSubscription( } Task _subscriberTask = null!; + Task _monitorTask = null!; protected override async ValueTask Subscribe(CancellationToken cancellationToken) { var builder = new SubscriberClientBuilder { Logger = Log.Logger }; @@ -92,6 +93,7 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken _client = await builder.BuildAsync(cancellationToken).NoContext(); _subscriberTask = _client.StartAsync(Handle); + _monitorTask = MonitorSubscriberTask(_subscriberTask, cancellationToken); return; @@ -129,9 +131,26 @@ async Task Handle(PubsubMessage msg, CancellationToken ct) { Metadata AsMeta(MapField attributes) => new(attributes.ToDictionary(x => x.Key, object (x) => x.Value)!); } + async Task MonitorSubscriberTask(Task subscriberTask, CancellationToken cancellationToken) { + try { + await subscriberTask.NoContext(); + + // If the task completes without cancellation, the subscription was dropped + if (!cancellationToken.IsCancellationRequested) { + Dropped(DropReason.Stopped, null); + } + } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { + // Expected when shutting down + } catch (Exception ex) { + // Subscriber task failed with an unrecoverable error + Dropped(DropReason.ServerError, ex); + } + } + protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { if (_client != null) await _client.StopAsync(cancellationToken).NoContext(); await _subscriberTask.NoContext(); + await _monitorTask.NoContext(); } public async Task CreateSubscription(