Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public GooglePubSubSubscription(
}

Task _subscriberTask = null!;
Task _monitorTask = null!;

protected override async ValueTask Subscribe(CancellationToken cancellationToken) {
var builder = new SubscriberClientBuilder { Logger = Log.Logger };
Expand All @@ -92,6 +93,7 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken
_client = await builder.BuildAsync(cancellationToken).NoContext();

_subscriberTask = _client.StartAsync(Handle);
_monitorTask = MonitorSubscriberTask(_subscriberTask, cancellationToken);

return;

Expand Down Expand Up @@ -129,9 +131,26 @@ async Task<Reply> Handle(PubsubMessage msg, CancellationToken ct) {
Metadata AsMeta(MapField<string, string> attributes) => new(attributes.ToDictionary(x => x.Key, object (x) => x.Value)!);
}

async Task MonitorSubscriberTask(Task subscriberTask, CancellationToken cancellationToken) {
try {
await subscriberTask.NoContext();

// If the task completes without cancellation, the subscription was dropped
if (!cancellationToken.IsCancellationRequested) {
Dropped(DropReason.Stopped, null);
}
} catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
// Expected when shutting down
} catch (Exception ex) {
// Subscriber task failed with an unrecoverable error
Dropped(DropReason.ServerError, ex);
}
}

protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
if (_client != null) await _client.StopAsync(cancellationToken).NoContext();
await _subscriberTask.NoContext();
await _monitorTask.NoContext();
}

public async Task CreateSubscription(
Expand Down
Loading