Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions bridge/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bdiscord
import (
"bytes"
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -14,6 +15,33 @@ import (
lru "github.com/hashicorp/golang-lru"
)

// isValidSnowflake reports whether id is usable as a Discord message ID, i.e.
// whether it parses as a base-10 unsigned 64-bit integer.
//
// A ParentID that originates from another bridge (for example a Slack thread
// ID such as "slack 1772220754.010239") can reach the Discord bridge when the
// gateway message cache has no Discord-native mapping for it. Discord rejects
// such a value in message_reference with a 400 "NUMBER_TYPE_COERCE" error, so
// the ID is validated before it is placed in a message reference.
func isValidSnowflake(id string) bool {
	// ParseUint already rejects the empty string, signs, whitespace, dots and
	// values that overflow 64 bits, so a single call covers every case.
	if _, err := strconv.ParseUint(id, 10, 64); err != nil {
		return false
	}
	return true
}

// extractNumericID returns id with every character other than the ASCII
// digits 0-9 removed; "1772220754.010239" becomes "1772220754010239".
//
// It is a best-effort fallback for a leaked Slack timestamp. The result is not
// expected to match a real Discord message: according to the original
// rationale it only makes the value well-formed, so that Discord answers with
// an "Unknown Message" error instead of a type coercion error, and the sender
// can then retry without a thread reference.
func extractNumericID(id string) string {
	keepDigit := func(r rune) rune {
		if r < '0' || r > '9' {
			return -1 // a negative rune makes strings.Map drop the character
		}
		return r
	}
	return strings.Map(keepDigit, id)
}

const (
MessageLength = 1950
cFileUpload = "file_upload"
Expand Down Expand Up @@ -357,16 +385,41 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st
AllowedMentions: b.getAllowedMentions(),
}

// REASON: The gateway may pass a non-snowflake ParentID (e.g., a Slack
// timestamp like "slack 1772220754.010239") when the message cache misses.
// We validate upfront, try a numeric-only extraction as fallback, and if
// the send still fails with a snowflake/form error, retry without threading
// entirely. Without this, thread replies from Slack silently get dropped.
if msg.ParentValid() {
m.Reference = &discordgo.MessageReference{
MessageID: msg.ParentID,
ChannelID: channelID,
GuildID: b.guildID,
parentID := msg.ParentID
if isValidSnowflake(parentID) {
m.Reference = &discordgo.MessageReference{
MessageID: parentID,
ChannelID: channelID,
GuildID: b.guildID,
}
} else {
numericID := extractNumericID(parentID)
if isValidSnowflake(numericID) {
b.Log.Debugf("ParentID %q is not a snowflake, trying numeric extraction: %s", parentID, numericID)
m.Reference = &discordgo.MessageReference{
MessageID: numericID,
ChannelID: channelID,
GuildID: b.guildID,
}
} else {
b.Log.Warnf("Dropping thread reference: ParentID %q is not a valid Discord snowflake", parentID)
}
}
}

// Post normal message
// Post normal message, retry without thread reference if Discord rejects it
res, err := b.c.ChannelMessageSendComplex(channelID, &m)
if err != nil && m.Reference != nil && (strings.Contains(err.Error(), "snowflake") || strings.Contains(err.Error(), "NUMBER_TYPE_COERCE") || strings.Contains(err.Error(), "Unknown message")) {
b.Log.Warnf("Thread reference rejected by Discord (%s), retrying without threading", err)
m.Reference = nil
res, err = b.c.ChannelMessageSendComplex(channelID, &m)
}
if err != nil {
return "", err
}
Expand Down
33 changes: 28 additions & 5 deletions bridge/slack/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config

var err error
var bot *slack.Bot
var rateLimitHits int
for {
bot, err = b.rtm.GetBotInfo(slack.GetBotInfoParameters{
Bot: ev.BotID,
Expand All @@ -108,7 +109,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config
break
}

if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
b.Log.Errorf("Could not retrieve bot information: %#v", err)
return err
}
Expand Down Expand Up @@ -223,18 +224,20 @@ func (b *Bslack) replaceCodeFence(text string) string {
// getUsersInConversation returns an array of userIDs that are members of channelID
func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
channelMembers := []string{}
var rateLimitHits int
for {
queryParams := &slack.GetUsersInConversationParameters{
ChannelID: channelID,
}

members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
if err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
}
continue
}
rateLimitHits = 0

channelMembers = append(channelMembers, members...)

Expand All @@ -246,12 +249,32 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
return channelMembers, nil
}

func handleRateLimit(log *logrus.Entry, err error) error {
// handleRateLimit decides how a retry loop should react to err, the error
// returned by a Slack API call.
//
// REASON: Slack rate limits can cascade during reconnection bursts when
// matterbridge re-fetches channel/user info for many channels at once. Sleeping
// for just RetryAfter causes tight retry loops that keep hitting the limit, so
// the wait grows exponentially with the number of consecutive hits and the
// number of consecutive hits is capped to prevent an endless loop.
//
// Parameters:
//   - log: logger used to report each backoff.
//   - err: the error from the failed call.
//   - consecutiveHits: caller-owned counter of back-to-back rate limits; must
//     not be nil. It is incremented after every backoff and reset to 0 when
//     err is not a rate-limit error.
//
// Returns:
//   - nil after sleeping, meaning the caller should retry the call;
//   - err unchanged when it is not a *slack.RateLimitedError;
//   - an error wrapping err once 10 consecutive rate limits were already
//     recorded. In that case the function returns immediately without sleeping.
func handleRateLimit(log *logrus.Entry, err error, consecutiveHits *int) error {
	const (
		maxConsecutiveRateLimits = 10
		// Upper bound for a single wait, to avoid absurdly long sleeps.
		maxBackoff = 5 * time.Minute
	)

	rateLimit, ok := err.(*slack.RateLimitedError)
	if !ok {
		*consecutiveHits = 0
		return err
	}

	// Check the cap before sleeping: there is no point in waiting when the
	// caller is about to be told to stop retrying.
	if *consecutiveHits >= maxConsecutiveRateLimits {
		return fmt.Errorf("giving up after %d consecutive rate limits: %w", *consecutiveHits, err)
	}

	// Exponential backoff: Slack's RetryAfter doubled per consecutive hit. This
	// is the only sleep; sleeping for RetryAfter first and then for the backoff
	// would make every hit wait twice.
	backoff := rateLimit.RetryAfter * (time.Duration(1) << uint(*consecutiveHits))
	if backoff > maxBackoff {
		backoff = maxBackoff
	}
	log.Infof("Rate-limited by Slack (attempt %d). Sleeping for %v", *consecutiveHits+1, backoff)
	time.Sleep(backoff)
	*consecutiveHits++
	return nil
}
12 changes: 8 additions & 4 deletions bridge/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,13 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch
b.Log.Errorf("Unhandled type received from extractTopicOrPurpose: %s", incomingChangeType)
return nil
}
var rateLimitHits int
for {
_, err := updateFunc(channelInfo.ID, text)
if err == nil {
return nil
}
if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
return err
}
}
Expand Down Expand Up @@ -392,13 +393,14 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel)
return true, nil
}

var rateLimitHits int
for {
_, _, err := b.rtm.DeleteMessage(channelInfo.ID, msg.ID)
if err == nil {
return true, nil
}

if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
b.Log.Errorf("Failed to delete user message from Slack: %#v", err)
return true, err
}
Expand All @@ -410,13 +412,14 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b
return false, nil
}
messageOptions := b.prepareMessageOptions(msg)
var rateLimitHits int
for {
_, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, msg.ID, messageOptions...)
if err == nil {
return true, nil
}

if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
b.Log.Errorf("Failed to edit user message on Slack: %#v", err)
return true, err
}
Expand All @@ -429,13 +432,14 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s
return "", nil
}
messageOptions := b.prepareMessageOptions(msg)
var rateLimitHits int
for {
_, id, err := b.rtm.PostMessage(channelInfo.ID, messageOptions...)
if err == nil {
return id, nil
}

if err = handleRateLimit(b.Log, err); err != nil {
if err = handleRateLimit(b.Log, err, &rateLimitHits); err != nil {
b.Log.Errorf("Failed to sent user message to Slack: %#v", err)
return "", err
}
Expand Down
8 changes: 6 additions & 2 deletions bridge/slack/users_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (b *users) populateUsers(wait bool) {
newUsers := map[string]*slack.User{}
pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
count := 0
var rateLimitHits int
for {
var err error
pagination, err = pagination.Next(context.Background())
Expand All @@ -147,12 +148,13 @@ func (b *users) populateUsers(wait bool) {
break
}

if err = handleRateLimit(b.log, err); err != nil {
if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil {
b.log.Errorf("Could not retrieve users: %#v", err)
return
}
continue
}
rateLimitHits = 0

for i := range pagination.Users {
newUsers[pagination.Users[i].ID] = &pagination.Users[i]
Expand Down Expand Up @@ -293,15 +295,17 @@ func (b *channels) populateChannels(wait bool) {
Types: []string{"public_channel,private_channel"},
Limit: 1000,
}
var rateLimitHits int
for {
channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil {
if err = handleRateLimit(b.log, err); err != nil {
if err = handleRateLimit(b.log, err, &rateLimitHits); err != nil {
b.log.Errorf("Could not retrieve channels: %#v", err)
return
}
continue
}
rateLimitHits = 0

for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
Expand Down
21 changes: 20 additions & 1 deletion gateway/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,26 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B
channels := gw.getDestChannel(rmsg, *dest)
for idx := range channels {
channel := &channels[idx]
msgID, err := gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID)

// REASON: Discord (and other APIs) occasionally return 503 Service Unavailable
// during transient outages. Without retry, the message is silently dropped.
// We make up to 3 attempts in total, for 503 errors only, sleeping with
// exponential backoff (1s, 2s, 4s) after each failed attempt, including the last.
var msgID string
var err error
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
msgID, err = gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID)
if err == nil {
break
}
if strings.Contains(err.Error(), "503") || strings.Contains(err.Error(), "Service Unavailable") {
backoff := time.Duration(1<<uint(attempt)) * time.Second
gw.logger.Warnf("SendMessage got 503, retrying in %v (attempt %d/%d): %s", backoff, attempt+1, maxRetries, err)
time.Sleep(backoff)
continue
}
break // non-retryable error, stop immediately
}
if err != nil {
gw.logger.Errorf("SendMessage failed: %s", err)
continue
Expand Down