diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 3f99da0cee..f1aa45a43b 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -3,6 +3,7 @@ package bdiscord import ( "bytes" "fmt" + "strconv" "strings" "sync" @@ -14,6 +15,33 @@ import ( lru "github.com/hashicorp/golang-lru" ) +// REASON: Slack thread IDs (e.g., "slack 1772220754.010239") can leak through as +// ParentID when the gateway message cache doesn't have a Discord-native mapping. +// Discord requires a purely numeric snowflake for message_reference. If we pass a +// non-snowflake string, Discord returns 400 "NUMBER_TYPE_COERCE". +func isValidSnowflake(id string) bool { + if id == "" { + return false + } + _, err := strconv.ParseUint(id, 10, 64) + return err == nil +} + +// REASON: When a Slack timestamp like "1772220754.010239" leaks through, we strip +// non-digit characters to produce a numeric string (e.g., "1772220754010239"). +// This won't match a real Discord message, but it satisfies format validation so +// Discord returns a cleaner "Unknown Message" error instead of a type coercion error, +// which we then catch and retry without threading. +func extractNumericID(id string) string { + var result strings.Builder + for _, c := range id { + if c >= '0' && c <= '9' { + result.WriteRune(c) + } + } + return result.String() +} + const ( MessageLength = 1950 cFileUpload = "file_upload" @@ -357,16 +385,41 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st AllowedMentions: b.getAllowedMentions(), } + // REASON: The gateway may pass a non-snowflake ParentID (e.g., a Slack + // timestamp like "slack 1772220754.010239") when the message cache misses. + // We validate upfront, try a numeric-only extraction as fallback, and if + // the send still fails with a snowflake/form error, retry without threading + // entirely. Without this, thread replies from Slack silently get dropped. if msg.ParentValid() { - m.Reference = &discordgo.MessageReference{ - MessageID: msg.ParentID, - ChannelID: channelID, - GuildID: b.guildID, + parentID := msg.ParentID + if isValidSnowflake(parentID) { + m.Reference = &discordgo.MessageReference{ + MessageID: parentID, + ChannelID: channelID, + GuildID: b.guildID, + } + } else { + numericID := extractNumericID(parentID) + if isValidSnowflake(numericID) { + b.Log.Debugf("ParentID %q is not a snowflake, trying numeric extraction: %s", parentID, numericID) + m.Reference = &discordgo.MessageReference{ + MessageID: numericID, + ChannelID: channelID, + GuildID: b.guildID, + } + } else { + b.Log.Warnf("Dropping thread reference: ParentID %q is not a valid Discord snowflake", parentID) + } } } - // Post normal message + // Post normal message, retry without thread reference if Discord rejects it res, err := b.c.ChannelMessageSendComplex(channelID, &m) + if err != nil && m.Reference != nil && (strings.Contains(err.Error(), "snowflake") || strings.Contains(err.Error(), "NUMBER_TYPE_COERCE") || strings.Contains(err.Error(), "Unknown message")) { + b.Log.Warnf("Thread reference rejected by Discord (%s), retrying without threading", err) + m.Reference = nil + res, err = b.c.ChannelMessageSendComplex(channelID, &m) + } if err != nil { return "", err } diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index 309b3af84e..765151f235 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -100,6 +100,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config var err error var bot *slack.Bot + var rateLimitHits int for { bot, err = b.rtm.GetBotInfo(slack.GetBotInfoParameters{ Bot: ev.BotID, @@ -108,7 +109,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config break } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Could not retrieve bot information: %#v", err) return err } @@ -223,6 +224,7 @@ func (b *Bslack) replaceCodeFence(text string) string { // getUsersInConversation returns an array of userIDs that are members of channelID func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { channelMembers := []string{} + var rateLimitHits int for { queryParams := &slack.GetUsersInConversationParameters{ ChannelID: channelID, @@ -230,11 +232,12 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { members, nextCursor, err := b.sc.GetUsersInConversation(queryParams) if err != nil { - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err) } continue } + rateLimitHits = 0 channelMembers = append(channelMembers, members...) @@ -246,12 +249,32 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { return channelMembers, nil } -func handleRateLimit(log *logrus.Entry, err error) error { +// REASON: Slack rate limits can cascade during reconnection bursts when matterbridge +// re-fetches channel/user info for many channels at once. Using just RetryAfter +// causes tight retry loops that keep hitting the limit. Exponential backoff +// (RetryAfter * 2^consecutiveHits) progressively backs off, and a max retry cap +// prevents infinite loops. The consecutiveHits counter resets on success (non-rate-limit). +func handleRateLimit(log *logrus.Entry, err error, consecutiveHits *int) error { rateLimit, ok := err.(*slack.RateLimitedError) if !ok { + *consecutiveHits = 0 return err } - log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter) - time.Sleep(rateLimit.RetryAfter) + + const maxConsecutiveRateLimits = 10 + if *consecutiveHits >= maxConsecutiveRateLimits { + return fmt.Errorf("giving up after %d consecutive rate limits: %w", *consecutiveHits, err) + } + + // Exponential backoff: base wait from Slack's RetryAfter, doubled per consecutive hit + multiplier := 1 << uint(*consecutiveHits) + backoff := rateLimit.RetryAfter * time.Duration(multiplier) + // Cap at 5 minutes to avoid absurdly long waits + if backoff > 5*time.Minute { + backoff = 5 * time.Minute + } + log.Infof("Rate-limited by Slack (attempt %d). Sleeping for %v", *consecutiveHits+1, backoff) + time.Sleep(backoff) + *consecutiveHits++ return nil } diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index c39c60826d..5282c237e3 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -352,12 +352,13 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch b.Log.Errorf("Unhandled type received from extractTopicOrPurpose: %s", incomingChangeType) return nil } + var rateLimitHits int for { _, err := updateFunc(channelInfo.ID, text) if err == nil { return nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { return err } } @@ -392,13 +393,14 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) return true, nil } + var rateLimitHits int for { _, _, err := b.rtm.DeleteMessage(channelInfo.ID, msg.ID) if err == nil { return true, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to delete user message from Slack: %#v", err) return true, err } @@ -410,13 +412,14 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b return false, nil } messageOptions := b.prepareMessageOptions(msg) + var rateLimitHits int for { _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, msg.ID, messageOptions...) if err == nil { return true, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to edit user message on Slack: %#v", err) return true, err } @@ -429,13 +432,14 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s return "", nil } messageOptions := b.prepareMessageOptions(msg) + var rateLimitHits int for { _, id, err := b.rtm.PostMessage(channelInfo.ID, messageOptions...) if err == nil { return id, nil } - if err = handleRateLimit(b.Log, err); err != nil { + if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil { b.Log.Errorf("Failed to sent user message to Slack: %#v", err) return "", err } diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go index 85b944bd50..47acab2c9d 100644 --- a/bridge/slack/users_channels.go +++ b/bridge/slack/users_channels.go @@ -138,6 +138,7 @@ func (b *users) populateUsers(wait bool) { newUsers := map[string]*slack.User{} pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) count := 0 + var rateLimitHits int for { var err error pagination, err = pagination.Next(context.Background()) @@ -147,12 +148,13 @@ func (b *users) populateUsers(wait bool) { break } - if err = handleRateLimit(b.log, err); err != nil { + if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil { b.log.Errorf("Could not retrieve users: %#v", err) return } continue } + rateLimitHits = 0 for i := range pagination.Users { newUsers[pagination.Users[i].ID] = &pagination.Users[i] @@ -293,15 +295,17 @@ func (b *channels) populateChannels(wait bool) { Types: []string{"public_channel,private_channel"}, Limit: 1000, } + var rateLimitHits int for { channels, nextCursor, err := b.sc.GetConversations(queryParams) if err != nil { - if err = handleRateLimit(b.log, err); err != nil { + if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil { b.log.Errorf("Could not retrieve channels: %#v", err) return } continue } + rateLimitHits = 0 for i := range channels { newChannelsByID[channels[i].ID] = &channels[i] diff --git a/gateway/handlers.go b/gateway/handlers.go index 44cefe4506..ab87072746 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -223,7 +223,26 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B channels := gw.getDestChannel(rmsg, *dest) for idx := range channels { channel := &channels[idx] - msgID, err := gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + + // REASON: Discord (and other APIs) occasionally return 503 Service Unavailable + // during transient outages. Without retry, the message is silently dropped. + // We retry up to 3 times with exponential backoff (1s, 2s, 4s) for 503 errors only. + var msgID string + var err error + const maxRetries = 3 + for attempt := 0; attempt < maxRetries; attempt++ { + msgID, err = gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + if err == nil { + break + } + if strings.Contains(err.Error(), "503") || strings.Contains(err.Error(), "Service Unavailable") { + backoff := time.Duration(1<