Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- add option `WithProcessReceivedUpdatesBeforeShutdown` to process received updates before shutdown in long-polling mode

## v1.17.0 (2025-08-18)

- api 9.2 (#207)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ b, err := bot.New("YOUR_BOT_TOKEN_FROM_BOTFATHER", opts...)
- `WithAllowedUpdates(params AllowedUpdates)` - set [allowed_updates](https://core.telegram.org/bots/api#getupdates) for getUpdates method
- `WithUpdatesChannelCap(cap int)` - set updates channel capacity, by default 1024
- `WithWebhookSecretToken(webhookSecretToken string)` - set X-Telegram-Bot-Api-Secret-Token header sent from telegram servers to confirm validity of update
- `WithProcessReceivedUpdatesBeforeShutdown()` - in long-polling mode, push all received updates to handlers even after shutdown context is canceled (handlers still receive the same context)
- `WithWorkers` - set the number of workers that are processing the Updates channel, by default 1
- `UseTestEnvironment()` - use test environment
- `WithNotAsyncHandlers()` - allows to run handlers in the main goroutine
Expand Down
2 changes: 2 additions & 0 deletions bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Bot struct {
workers int
notAsyncHandlers bool

processReceivedUpdatesBeforeShutdown bool

defaultHandlerFunc HandlerFunc

errorsHandler ErrorsHandler
Expand Down
16 changes: 11 additions & 5 deletions get_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) {

var timeoutAfterError time.Duration

defer close(b.updates)

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -72,11 +74,15 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) {

for _, upd := range updates {
atomic.StoreInt64(&b.lastUpdateID, upd.ID)
select {
case <-ctx.Done():
b.error("some updates lost, ctx done")
return
case b.updates <- upd:
if b.processReceivedUpdatesBeforeShutdown {
b.updates <- upd
} else {
select {
case <-ctx.Done():
b.error("some updates lost, ctx done")
return
case b.updates <- upd:
}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ func WithHTTPClient(pollTimeout time.Duration, client HttpClient) Option {
}
}

// WithProcessReceivedUpdatesBeforeShutdown allows to process all received updates
// before shutting down the bot without losing any. The context used to process updates
// will be canceled when the bot is shutting down, but the update will be sent
// to the handlers anyway. It's up to the handlers to make a new context if needed.
// Works only in long polling mode.
func WithProcessReceivedUpdatesBeforeShutdown() Option {
return func(b *Bot) {
b.processReceivedUpdatesBeforeShutdown = true
}
}

// WithServerURL allows to set custom server url
func WithServerURL(serverURL string) Option {
return func(b *Bot) {
Expand Down
20 changes: 16 additions & 4 deletions wait_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@ func (b *Bot) waitUpdates(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case upd := <-b.updates:
if b.processReceivedUpdatesBeforeShutdown {
upd, ok := <-b.updates
if !ok {
return
}
b.ProcessUpdate(ctx, upd)
} else {
select {
case <-ctx.Done():
return
case upd, ok := <-b.updates:
if !ok {
return
}
b.ProcessUpdate(ctx, upd)
}
}
}

}
68 changes: 68 additions & 0 deletions wait_updates_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bot

import (
"context"
"sync"
"testing"

"github.com/go-telegram/bot/models"
)

func TestWaitUpdates_ProcessAllAfterContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var processed []int64
b := &Bot{
processReceivedUpdatesBeforeShutdown: true,
notAsyncHandlers: true,
updates: make(chan *models.Update, 2),
defaultHandlerFunc: func(ctx context.Context, b *Bot, update *models.Update) {
processed = append(processed, update.ID)
},
}

wg := sync.WaitGroup{}
wg.Add(1)
go b.waitUpdates(ctx, &wg)

b.updates <- &models.Update{ID: 1}
cancel() // simulate shutdown while updates are still coming in
b.updates <- &models.Update{ID: 2}
close(b.updates)

wg.Wait()

if len(processed) != 2 {
t.Fatalf("expected 2 updates processed, got %d", len(processed))
}
if processed[0] != 1 || processed[1] != 2 {
t.Fatalf("unexpected updates order: %v", processed)
}
}

func TestWaitUpdates_StopsWhenChannelClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var processed int
b := &Bot{
processReceivedUpdatesBeforeShutdown: true,
notAsyncHandlers: true,
updates: make(chan *models.Update),
defaultHandlerFunc: func(ctx context.Context, b *Bot, update *models.Update) {
processed++
},
}

wg := sync.WaitGroup{}
wg.Add(1)
go b.waitUpdates(ctx, &wg)

close(b.updates)
wg.Wait()

if processed != 0 {
t.Fatalf("expected 0 updates processed after channel close, got %d", processed)
}
}