From a328b11b05cde532298ad4ccc3cd4886bb80d22c Mon Sep 17 00:00:00 2001 From: Divide-By-0 Date: Fri, 27 Mar 2026 16:12:30 -0700 Subject: [PATCH] Fix Discord snowflake validation, add 503 retry, and Slack rate limit backoff When bridging Slack threads to Discord, the gateway can pass a Slack message timestamp (e.g., "slack 1772220754.010239") as the ParentID when the message cache misses. Discord rejects this as "not snowflake", silently dropping the message. This adds upfront validation of the ParentID, attempts numeric extraction as a fallback, and retries without threading if Discord still rejects the reference. Also adds retry with exponential backoff (1s, 2s, 4s) at the gateway level for transient 503 Service Unavailable errors from Discord. Finally, upgrades the Slack rate limit handler to use exponential backoff (RetryAfter * 2^consecutiveHits), capped at 5 minutes and 10 consecutive hits, to prevent tight retry loops during reconnection bursts. Co-Authored-By: Claude Opus 4.6 (1M context) --- bridge/discord/discord.go | 63 +++++++++++++++++++++++++++++++--- bridge/slack/helpers.go | 33 +++++++++++++++--- bridge/slack/slack.go | 12 ++++--- bridge/slack/users_channels.go | 8 +++-- gateway/handlers.go | 21 +++++++++++- 5 files changed, 120 insertions(+), 17 deletions(-) diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 3f99da0cee..f1aa45a43b 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -3,6 +3,7 @@ package bdiscord import ( "bytes" "fmt" + "strconv" "strings" "sync" @@ -14,6 +15,33 @@ import ( lru "github.com/hashicorp/golang-lru" ) +// REASON: Slack thread IDs (e.g., "slack 1772220754.010239") can leak through as +// ParentID when the gateway message cache doesn't have a Discord-native mapping. +// Discord requires a purely numeric snowflake for message_reference. If we pass a +// non-snowflake string, Discord returns 400 "NUMBER_TYPE_COERCE". +func isValidSnowflake(id string) bool { + if id == "" { + return false + } + _, err := strconv.ParseUint(id, 10, 64) + return err == nil +} + +// REASON: When a Slack timestamp like "1772220754.010239" leaks through, we strip +// non-digit characters to produce a numeric string (e.g., "1772220754010239"). +// This won't match a real Discord message, but it satisfies format validation so +// Discord returns a cleaner "Unknown Message" error instead of a type coercion error, +// which we then catch and retry without threading. +func extractNumericID(id string) string { + var result strings.Builder + for _, c := range id { + if c >= '0' && c <= '9' { + result.WriteRune(c) + } + } + return result.String() +} + const ( MessageLength = 1950 cFileUpload = "file_upload" @@ -357,16 +385,41 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st AllowedMentions: b.getAllowedMentions(), } + // REASON: The gateway may pass a non-snowflake ParentID (e.g., a Slack + // timestamp like "slack 1772220754.010239") when the message cache misses. + // We validate upfront, try a numeric-only extraction as fallback, and if + // the send still fails with a snowflake/form error, retry without threading + // entirely. Without this, thread replies from Slack silently get dropped. if msg.ParentValid() { - m.Reference = &discordgo.MessageReference{ - MessageID: msg.ParentID, - ChannelID: channelID, - GuildID: b.guildID, + parentID := msg.ParentID + if isValidSnowflake(parentID) { + m.Reference = &discordgo.MessageReference{ + MessageID: parentID, + ChannelID: channelID, + GuildID: b.guildID, + } + } else { + numericID := extractNumericID(parentID) + if isValidSnowflake(numericID) { + b.Log.Debugf("ParentID %q is not a snowflake, trying numeric extraction: %s", parentID, numericID) + m.Reference = &discordgo.MessageReference{ + MessageID: numericID, + ChannelID: channelID, + GuildID: b.guildID, + } + } else { + b.Log.Warnf("Dropping thread reference: ParentID %q is not a valid Discord snowflake", parentID) + } } } - // Post normal message + // Post normal message, retry without thread reference if Discord rejects it res, err := b.c.ChannelMessageSendComplex(channelID, &m) + if err != nil && m.Reference != nil && (strings.Contains(err.Error(), "snowflake") || strings.Contains(err.Error(), "NUMBER_TYPE_COERCE") || strings.Contains(err.Error(), "Unknown message")) { + b.Log.Warnf("Thread reference rejected by Discord (%s), retrying without threading", err) + m.Reference = nil + res, err = b.c.ChannelMessageSendComplex(channelID, &m) + } if err != nil { return "", err } diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index 309b3af84e..765151f235 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -100,6 +100,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config var err error var bot *slack.Bot + var rateLimitHits int for { bot, err = b.rtm.GetBotInfo(slack.GetBotInfoParameters{ Bot: ev.BotID, @@ -108,7 +109,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config break } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Could not retrieve bot information: %#v", err) return err } @@ -223,6 +224,7 @@ func (b *Bslack) replaceCodeFence(text string) string { // getUsersInConversation returns an array of userIDs that are members of channelID func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { channelMembers := []string{} + var rateLimitHits int for { queryParams := &slack.GetUsersInConversationParameters{ ChannelID: channelID, @@ -230,11 +232,12 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { members, nextCursor, err := b.sc.GetUsersInConversation(queryParams) if err != nil { - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err) } continue } + rateLimitHits = 0 channelMembers = append(channelMembers, members...) @@ -246,12 +249,32 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { return channelMembers, nil } -func handleRateLimit(log *logrus.Entry, err error) error { +// REASON: Slack rate limits can cascade during reconnection bursts when matterbridge +// re-fetches channel/user info for many channels at once. Using just RetryAfter +// causes tight retry loops that keep hitting the limit. Exponential backoff +// (RetryAfter * 2^consecutiveHits) progressively backs off, and a max retry cap +// prevents infinite loops. The consecutiveHits counter resets on success (non-rate-limit). +func handleRateLimit(log *logrus.Entry, err error, consecutiveHits *int) error { rateLimit, ok := err.(*slack.RateLimitedError) if !ok { + *consecutiveHits = 0 return err } - log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) - time.Sleep(rateLimit.RetryAfter) + + const maxConsecutiveRateLimits = 10 + if *consecutiveHits >= maxConsecutiveRateLimits { + return fmt.Errorf("giving up after %d consecutive rate limits: %w", *consecutiveHits, err) + } + + // Exponential backoff: base wait from Slack's RetryAfter, doubled per consecutive hit + multiplier := 1 << uint(*consecutiveHits) + backoff := rateLimit.RetryAfter * time.Duration(multiplier) + // Cap at 5 minutes to avoid absurdly long waits + if backoff > 5*time.Minute { + backoff = 5 * time.Minute + } + log.Infof("Rate-limited by Slack (attempt %d). Sleeping for %v", *consecutiveHits+1, backoff) + time.Sleep(backoff) + *consecutiveHits++ return nil } diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index c39c60826d..5282c237e3 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -352,12 +352,13 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch b.Log.Errorf("Unhandled type received from extractTopicOrPurpose: %s", incomingChangeType) return nil } + var rateLimitHits int for { _, err := updateFunc(channelInfo.ID, text) if err == nil { return nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { return err } } @@ -392,13 +393,14 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) return true, nil } + var rateLimitHits int for { _, _, err := b.rtm.DeleteMessage(channelInfo.ID, msg.ID) if err == nil { return true, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to delete user message from Slack: %#v", err) return true, err } @@ -410,13 +412,14 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b return false, nil } messageOptions := b.prepareMessageOptions(msg) + var rateLimitHits int for { _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, msg.ID, messageOptions...) if err == nil { return true, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to edit user message on Slack: %#v", err) return true, err } @@ -429,13 +432,14 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s return "", nil } messageOptions := b.prepareMessageOptions(msg) + var rateLimitHits int for { _, id, err := b.rtm.PostMessage(channelInfo.ID, messageOptions...) if err == nil { return id, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to sent user message to Slack: %#v", err) return "", err } diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go index 85b944bd50..47acab2c9d 100644 --- a/bridge/slack/users_channels.go +++ b/bridge/slack/users_channels.go @@ -138,6 +138,7 @@ func (b *users) populateUsers(wait bool) { newUsers := map[string]*slack.User{} pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) count := 0 + var rateLimitHits int for { var err error pagination, err = pagination.Next(context.Background()) @@ -147,12 +148,13 @@ func (b *users) populateUsers(wait bool) { break } - if err = handleRateLimit(b.log, err); err != nil { + if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil { b.log.Errorf("Could not retrieve users: %#v", err) return } continue } + rateLimitHits = 0 for i := range pagination.Users { newUsers[pagination.Users[i].ID] = &pagination.Users[i] @@ -293,15 +295,17 @@ func (b *channels) populateChannels(wait bool) { Types: []string{"public_channel,private_channel"}, Limit: 1000, } + var rateLimitHits int for { channels, nextCursor, err := b.sc.GetConversations(queryParams) if err != nil { - if err = handleRateLimit(b.log, err); err != nil { + if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil { b.log.Errorf("Could not retrieve channels: %#v", err) return } continue } + rateLimitHits = 0 for i := range channels { newChannelsByID[channels[i].ID] = &channels[i] diff --git a/gateway/handlers.go b/gateway/handlers.go index 44cefe4506..ab87072746 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -223,7 +223,26 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B channels := gw.getDestChannel(rmsg, *dest) for idx := range channels { channel := &channels[idx] - msgID, err := gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + + // REASON: Discord (and other APIs) occasionally return 503 Service Unavailable + // during transient outages. Without retry, the message is silently dropped. + // We retry up to 3 times with exponential backoff (1s, 2s, 4s) for 503 errors only. + var msgID string + var err error + const maxRetries = 3 + for attempt := 0; attempt < maxRetries; attempt++ { + msgID, err = gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + if err == nil { + break + } + if strings.Contains(err.Error(), "503") || strings.Contains(err.Error(), "Service Unavailable") { + backoff := time.Duration(1<