From 84001dcdd8be03ccfec08c444dfc22ad6a36fb6e Mon Sep 17 00:00:00 2001 From: Daniel Drizhuk Date: Sat, 29 Nov 2025 15:54:07 +0100 Subject: [PATCH] Process updates before shutdown --- CHANGELOG.md | 4 +++ README.md | 1 + bot.go | 2 ++ get_updates.go | 16 +++++++---- options.go | 11 +++++++ wait_updates.go | 20 ++++++++++--- wait_updates_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 113 insertions(+), 9 deletions(-) create mode 100644 wait_updates_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ee5278e..4ad178b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- add option `WithProcessReceivedUpdatesBeforeShutdown` to process received updates before shutdown in long-polling mode + ## v1.17.0 (2025-08-18) - api 9.2 (#207) diff --git a/README.md b/README.md index afa701c..c27fa91 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,7 @@ b, err := bot.New("YOUR_BOT_TOKEN_FROM_BOTFATHER", opts...) - `WithAllowedUpdates(params AllowedUpdates)` - set [allowed_updates](https://core.telegram.org/bots/api#getupdates) for getUpdates method - `WithUpdatesChannelCap(cap int)` - set updates channel capacity, by default 1024 - `WithWebhookSecretToken(webhookSecretToken string)` - set X-Telegram-Bot-Api-Secret-Token header sent from telegram servers to confirm validity of update +- `WithProcessReceivedUpdatesBeforeShutdown()` - in long-polling mode, push all received updates to handlers even after shutdown context is canceled (handlers still receive the same context) - `WithWorkers` - set the number of workers that are processing the Updates channel, by default 1 - `UseTestEnvironment()` - use test environment - `WithNotAsyncHandlers()` - allows to run handlers in the main goroutine diff --git a/bot.go b/bot.go index c9ae9da..cffbc73 100644 --- a/bot.go +++ b/bot.go @@ -43,6 +43,8 @@ type Bot struct { workers int notAsyncHandlers bool + processReceivedUpdatesBeforeShutdown bool + defaultHandlerFunc HandlerFunc errorsHandler ErrorsHandler diff --git a/get_updates.go b/get_updates.go index feb7704..5d756e8 100644 --- a/get_updates.go +++ b/get_updates.go @@ -29,6 +29,8 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) { var timeoutAfterError time.Duration + defer close(b.updates) + for { select { case <-ctx.Done(): @@ -72,11 +74,15 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) { for _, upd := range updates { atomic.StoreInt64(&b.lastUpdateID, upd.ID) - select { - case <-ctx.Done(): - b.error("some updates lost, ctx done") - return - case b.updates <- upd: + if b.processReceivedUpdatesBeforeShutdown { + b.updates <- upd + } else { + select { + case <-ctx.Done(): + b.error("some updates lost, ctx done") + return + case b.updates <- upd: + } } } } diff --git a/options.go b/options.go index 0530ab1..b59e895 100644 --- a/options.go +++ b/options.go @@ -83,6 +83,17 @@ func WithHTTPClient(pollTimeout time.Duration, client HttpClient) Option { } } +// WithProcessReceivedUpdatesBeforeShutdown allows to process all received updates +// before shutting down the bot without losing any. The context used to process updates +// will be canceled when the bot is shutting down, but the update will be sent +// to the handlers anyway. It's up to the handlers to make a new context if needed. +// Works only in long polling mode. +func WithProcessReceivedUpdatesBeforeShutdown() Option { + return func(b *Bot) { + b.processReceivedUpdatesBeforeShutdown = true + } +} + // WithServerURL allows to set custom server url func WithServerURL(serverURL string) Option { return func(b *Bot) { diff --git a/wait_updates.go b/wait_updates.go index 958b452..e05fb76 100644 --- a/wait_updates.go +++ b/wait_updates.go @@ -10,11 +10,23 @@ func (b *Bot) waitUpdates(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() for { - select { - case <-ctx.Done(): - return - case upd := <-b.updates: + if b.processReceivedUpdatesBeforeShutdown { + upd, ok := <-b.updates + if !ok { + return + } b.ProcessUpdate(ctx, upd) + } else { + select { + case <-ctx.Done(): + return + case upd, ok := <-b.updates: + if !ok { + return + } + b.ProcessUpdate(ctx, upd) + } } } + } diff --git a/wait_updates_test.go b/wait_updates_test.go new file mode 100644 index 0000000..484eb88 --- /dev/null +++ b/wait_updates_test.go @@ -0,0 +1,68 @@ +package bot + +import ( + "context" + "sync" + "testing" + + "github.com/go-telegram/bot/models" +) + +func TestWaitUpdates_ProcessAllAfterContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var processed []int64 + b := &Bot{ + processReceivedUpdatesBeforeShutdown: true, + notAsyncHandlers: true, + updates: make(chan *models.Update, 2), + defaultHandlerFunc: func(ctx context.Context, b *Bot, update *models.Update) { + processed = append(processed, update.ID) + }, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go b.waitUpdates(ctx, &wg) + + b.updates <- &models.Update{ID: 1} + cancel() // simulate shutdown while updates are still coming in + b.updates <- &models.Update{ID: 2} + close(b.updates) + + wg.Wait() + + if len(processed) != 2 { + t.Fatalf("expected 2 updates processed, got %d", len(processed)) + } + if processed[0] != 1 || processed[1] != 2 { + t.Fatalf("unexpected updates order: %v", processed) + } +} + +func TestWaitUpdates_StopsWhenChannelClosed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var processed int + b := &Bot{ + processReceivedUpdatesBeforeShutdown: true, + notAsyncHandlers: true, + updates: make(chan *models.Update), + defaultHandlerFunc: func(ctx context.Context, b *Bot, update *models.Update) { + processed++ + }, + } + + wg := sync.WaitGroup{} + wg.Add(1) + go b.waitUpdates(ctx, &wg) + + close(b.updates) + wg.Wait() + + if processed != 0 { + t.Fatalf("expected 0 updates processed after channel close, got %d", processed) + } +}