From c32d2965d9a77da420df9c84839c749ebae2bd9e Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 16:46:38 +0100 Subject: [PATCH 1/8] WIP --- gateway/gateway.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index 7ca84e4f4..0f85a4c51 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -449,7 +449,7 @@ func (gw *Gateway) SendMessage( dest *bridge.Bridge, channel *config.ChannelInfo, canonicalParentMsgID string, -) (string, error) { +) { msg := *rmsg // Only send the avatar download event to ourselves. if msg.Event == config.EventAvatarDownload { @@ -522,7 +522,7 @@ func (gw *Gateway) SendMessage( gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) }(time.Now()) - mID, err := dest.Send(msg) + mID, err := dest.Send(msg, MsgIdChannel) if err != nil { return mID, err } From db3a11cdaf22628790727026bae359f63b0f6f61 Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 17:51:57 +0100 Subject: [PATCH 2/8] WIP: setup a channel to receive message acks --- bridge/bridge.go | 17 ++++++++++++++- bridge/config/config.go | 45 ++++++++++++++++++++++++++++------------ gateway/gateway.go | 46 ++++++++++++++++++----------------------- gateway/handlers.go | 11 ++-------- gateway/router.go | 18 ++++++++++++++++ 5 files changed, 88 insertions(+), 49 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index 44642e733..bba95b67a 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -13,6 +13,7 @@ import ( "time" "github.com/matterbridge-org/matterbridge/bridge/config" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -23,6 +24,7 @@ type Bridger interface { Disconnect() error NewHttpRequest(method, uri string, body io.Reader) (*http.Request, error) NewHttpClient(proxy string) (*http.Client, error) + AckSentMessage(internal xid.ID, external string) } type Bridge struct { @@ -44,7 +46,8 @@ type Bridge struct { type Config struct { *Bridge - Remote chan config.Message + Remote chan config.Message + MessageSentAck chan config.MessageSent } // Factory is the factory function to create a bridge @@ -404,3 +407,15 @@ func (b *Bridge) addAttachmentProcess(msg *config.Message, filename string, id s return nil } + +func (b *Config) AckSentMessage(internal xid.ID, external string, channel string) { + go func() { + b.MessageSentAck <- MessageSent{ + InternalID: internal, + ExternalID: config.MessageSentID{ + Protocol: b.Protocol, + ID: external, + }, + } + }() +} diff --git a/bridge/config/config.go b/bridge/config/config.go index 994b83488..59d73465a 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -12,6 +12,7 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/rs/xid" "github.com/sirupsen/logrus" "github.com/spf13/viper" ) @@ -34,20 +35,38 @@ const ( const ParentIDNotFound = "msg-parent-not-found" +// MessageSent is an acknowledgement received from a remote +// network that a message has been successfully sent, along +// with a protocol-dependent unique ID. +type MessageSent struct { + InternalID xid.ID + ExternalID MessageSentID +} + +// TODO: to avoid circular import we define a new type +// but that's not really what we want to do ... +type MessageSentID struct { + Protocol string + // TODO: do we need the chan here to disambiguate? + // ChannelID string + ID string +} + type Message struct { - Text string `json:"text"` - Channel string `json:"channel"` - Username string `json:"username"` - UserID string `json:"userid"` // userid on the bridge - Avatar string `json:"avatar"` - Account string `json:"account"` - Event string `json:"event"` - Protocol string `json:"protocol"` - Gateway string `json:"gateway"` - ParentID string `json:"parent_id"` - Timestamp time.Time `json:"timestamp"` - ID string `json:"id"` - Extra map[string][]interface{} + Text string `json:"text"` + Channel string `json:"channel"` + Username string `json:"username"` + UserID string `json:"userid"` // userid on the bridge + Avatar string `json:"avatar"` + Account string `json:"account"` + Event string `json:"event"` + Protocol string `json:"protocol"` + Gateway string `json:"gateway"` + ParentID string `json:"parent_id"` + Timestamp time.Time `json:"timestamp"` + ID string `json:"id"` + InternalID xid.ID + Extra map[string][]interface{} } func (m Message) ParentNotFound() bool { diff --git a/gateway/gateway.go b/gateway/gateway.go index 0f85a4c51..5e7f77c13 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -26,6 +26,7 @@ type Gateway struct { Channels map[string]*config.ChannelInfo ChannelOptions map[string]config.ChannelOptions Message chan config.Message + MessageSentAck chan config.MessageSent Name string Messages *lru.Cache @@ -47,13 +48,14 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { cache, _ := lru.New(5000) gw := &Gateway{ - Channels: make(map[string]*config.ChannelInfo), - Message: r.Message, - Router: r, - Bridges: make(map[string]*bridge.Bridge), - Config: r.Config, - Messages: cache, - logger: logger, + Channels: make(map[string]*config.ChannelInfo), + Message: r.Message, + MessageSentAck: r.MessageSentAck, + Router: r, + Bridges: make(map[string]*bridge.Bridge), + Config: r.Config, + Messages: cache, + logger: logger, } if err := gw.AddConfig(cfg); err != nil { logger.Errorf("Failed to add configuration to gateway: %#v", err) @@ -454,18 +456,18 @@ func (gw *Gateway) SendMessage( // Only send the avatar download event to ourselves. if msg.Event == config.EventAvatarDownload { if channel.ID != getChannelID(rmsg) { - return "", nil + return } } else { // do not send to ourself for any other event if channel.ID == getChannelID(rmsg) { - return "", nil + return } } // Only send irc notices to irc if msg.Event == config.EventNoticeIRC && dest.Protocol != "irc" { - return "", nil + return } // Too noisy to log like other events @@ -506,7 +508,7 @@ func (gw *Gateway) SendMessage( if drop { gw.logger.Debugf("=> Tengo dropping %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) - return "", nil + return } if debugSendMessage != "" { @@ -518,22 +520,14 @@ func (gw *Gateway) SendMessage( gw.Router.MattermostPlugin <- msg } - defer func(t time.Time) { - gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) - }(time.Now()) - - mID, err := dest.Send(msg, MsgIdChannel) - if err != nil { - return mID, err - } + // Send the message in the background + go func() { + defer func(t time.Time) { + gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) + }(time.Now()) - // append the message ID (mID) from this bridge (dest) to our brMsgIDs slice - if mID != "" { - gw.logger.Debugf("mID %s: %s", dest.Account, mID) - return mID, nil - // brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + mID, channel.ID}) - } - return "", nil + dest.Send(msg) + }() } func (gw *Gateway) validGatewayDest(msg *config.Message) bool { diff --git a/gateway/handlers.go b/gateway/handlers.go index b934f5343..497d1a85c 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -185,15 +185,8 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B channels := gw.getDestChannel(rmsg, *dest) for idx := range channels { channel := &channels[idx] - msgID, err := gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) - if err != nil { - gw.logger.Errorf("SendMessage failed: %s", err) - continue - } - if msgID == "" { - continue - } - brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID}) + gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + // brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID}) } return brMsgIDs } diff --git a/gateway/router.go b/gateway/router.go index a8aad1aae..f9614771f 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -8,6 +8,7 @@ import ( "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/gateway/samechannel" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -18,6 +19,7 @@ type Router struct { BridgeMap map[string]bridge.Factory Gateways map[string]*Gateway Message chan config.Message + MessageSentAck chan config.MessageSent MattermostPlugin chan config.Message logger *logrus.Entry @@ -32,6 +34,7 @@ func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[strin Config: cfg, BridgeMap: bridgeMap, Message: make(chan config.Message), + MessageSentAck: make(chan config.MessageSent), MattermostPlugin: make(chan config.Message), Gateways: make(map[string]*Gateway), logger: logger, @@ -39,6 +42,15 @@ func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[strin sgw := samechannel.New(cfg) gwconfigs := append(sgw.GetConfig(), cfg.BridgeValues().Gateway...) + // Start listening for sent message acknowledgements + go func() { + for { + ack := <-r.MessageSentAck + // TODO: actually save it in the right place + logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) + } + }() + for idx := range gwconfigs { entry := &gwconfigs[idx] if !entry.Enable { @@ -136,6 +148,12 @@ func (r *Router) getBridge(account string) *bridge.Bridge { func (r *Router) handleReceive() { for msg := range r.Message { msg := msg // scopelint + + // We add an internal UUID which will allow destination protocols + // to send back their own ID(s) corresponding to the message go the + // gateway in an asynchronous manner. + msg.InternalID = xid.New() + r.handleEventGetChannelMembers(&msg) r.handleEventFailure(&msg) r.handleEventRejoinChannels(&msg) From 2ecd9484146a7a69f4b98b39e53ef1fa067662ce Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 19:34:21 +0100 Subject: [PATCH 3/8] WIP: initial async message acks prototype --- bridge/bridge.go | 19 +++++++-- bridge/config/config.go | 17 ++------ bridge/discord/discord.go | 8 ++-- bridge/rocketchat/rocketchat.go | 6 +-- bridge/slack/handlers.go | 8 ++-- bridge/slack/slack.go | 25 ++++++++---- bridge/xmpp/xmpp.go | 63 +++++++++++++++++++++-------- gateway/gateway.go | 70 ++++++++++++++++++++++++--------- gateway/router.go | 27 +------------ go.mod | 16 ++++---- go.sum | 28 ++++++------- 11 files changed, 167 insertions(+), 120 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index bba95b67a..2a15665fa 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -24,7 +24,7 @@ type Bridger interface { Disconnect() error NewHttpRequest(method, uri string, body io.Reader) (*http.Request, error) NewHttpClient(proxy string) (*http.Client, error) - AckSentMessage(internal xid.ID, external string) + AckSentMessage(internal xid.ID, external string, channel string) } type Bridge struct { @@ -47,7 +47,16 @@ type Config struct { *Bridge Remote chan config.Message - MessageSentAck chan config.MessageSent + MessageSentAck chan MessageSent +} + +// MessageSent is an acknowledgement received from a remote +// network that a message has been successfully sent, along +// with a protocol-dependent unique ID. +type MessageSent struct { + DestBridge *Bridge + InternalID xid.ID + ExternalID config.MessageSentID } // Factory is the factory function to create a bridge @@ -411,10 +420,12 @@ func (b *Bridge) addAttachmentProcess(msg *config.Message, filename string, id s func (b *Config) AckSentMessage(internal xid.ID, external string, channel string) { go func() { b.MessageSentAck <- MessageSent{ + DestBridge: b.Bridge, InternalID: internal, ExternalID: config.MessageSentID{ - Protocol: b.Protocol, - ID: external, + ChannelID: channel, + Protocol: b.Protocol, + ID: external, }, } }() diff --git a/bridge/config/config.go b/bridge/config/config.go index 59d73465a..8c2f56f01 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -35,21 +35,10 @@ const ( const ParentIDNotFound = "msg-parent-not-found" -// MessageSent is an acknowledgement received from a remote -// network that a message has been successfully sent, along -// with a protocol-dependent unique ID. -type MessageSent struct { - InternalID xid.ID - ExternalID MessageSentID -} - -// TODO: to avoid circular import we define a new type -// but that's not really what we want to do ... type MessageSentID struct { - Protocol string - // TODO: do we need the chan here to disambiguate? - // ChannelID string - ID string + Protocol string + ChannelID string + ID string } type Message struct { diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 5b6b7e413..d9a43a36d 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -7,7 +7,7 @@ import ( "sync" "github.com/bwmarrin/discordgo" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/discord/transmitter" @@ -39,11 +39,11 @@ type Bdiscord struct { // Webhook specific logic useAutoWebhooks bool transmitter *transmitter.Transmitter - cache *lru.Cache + cache *lru.Cache[string, string] } func New(cfg *bridge.Config) bridge.Bridger { - newCache, err := lru.New(5000) + newCache, err := lru.New[string, string](5000) if err != nil { cfg.Log.Fatalf("Could not create LRU cache: %v", err) } @@ -305,7 +305,7 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st } if fi, ok := b.cache.Get(cFileUpload + msg.ID); ok { - err := b.c.ChannelMessageDelete(channelID, fi.(string)) // nolint:forcetypeassert + err := b.c.ChannelMessageDelete(channelID, fi) b.cache.Remove(cFileUpload + msg.ID) return "", err } diff --git a/bridge/rocketchat/rocketchat.go b/bridge/rocketchat/rocketchat.go index 247f2b466..af8a766a9 100644 --- a/bridge/rocketchat/rocketchat.go +++ b/bridge/rocketchat/rocketchat.go @@ -5,7 +5,7 @@ import ( "strings" "sync" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/helper" @@ -22,7 +22,7 @@ type Brocketchat struct { rh *rockethook.Client c *realtime.Client r *rest.Client - cache *lru.Cache + cache *lru.Cache[string, bool] *bridge.Config messageChan chan models.Message channelMap map[string]string @@ -37,7 +37,7 @@ const ( ) func New(cfg *bridge.Config) bridge.Bridger { - newCache, err := lru.New(100) + newCache, err := lru.New[string, bool](100) if err != nil { cfg.Log.Fatalf("Could not create LRU cache for rocketchat bridge: %v", err) } diff --git a/bridge/slack/handlers.go b/bridge/slack/handlers.go index e6649a98d..7794ceb63 100644 --- a/bridge/slack/handlers.go +++ b/bridge/slack/handlers.go @@ -231,7 +231,7 @@ func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, er } func (b *Bslack) handleFileDeletedEvent(ev *slack.FileDeletedEvent) (*config.Message, error) { - if rawChannel, ok := b.cache.Get(cfileDownloadChannel + ev.FileID); ok { + if rawChannel, ok := b.cacheChan.Get(cfileDownloadChannel + ev.FileID); ok { channel, err := b.channels.getChannelByID(rawChannel.(string)) if err != nil { return nil, err @@ -320,7 +320,7 @@ func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message) // If we have files attached, download them (in memory) and put a pointer to it in msg.Extra. for i := range ev.Files { // keep reference in cache on which channel we added this file - b.cache.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel) + b.cacheChan.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel) if err := b.handleDownloadFile(rmsg, &ev.Files[i], false); err != nil { b.Log.Errorf("Could not download incoming file: %#v", err) } @@ -406,9 +406,9 @@ func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool { // (the assumption is that such name collisions will not occur within the given // timeframes). func (b *Bslack) fileCached(file *slack.File) bool { - if ts, ok := b.cache.Get("file" + file.ID); ok && time.Since(ts.(time.Time)) < time.Minute { + if ts, ok := b.cacheTs.Get("file" + file.ID); ok && time.Since(ts) < time.Minute { return true - } else if ts, ok = b.cache.Get("filename" + file.Name); ok && time.Since(ts.(time.Time)) < 10*time.Second { + } else if ts, ok = b.cacheTs.Get("filename" + file.Name); ok && time.Since(ts) < 10*time.Second { return true } return false diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index 1eb0b8ffd..3832dc915 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -8,7 +8,7 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/helper" @@ -26,7 +26,10 @@ type Bslack struct { rtm *slack.RTM si *slack.Info - cache *lru.Cache + // File timestamps + cacheTs *lru.Cache[string, time.Time] + // Channels + cacheChan *lru.Cache[string, any] uuid string useChannelID bool @@ -81,14 +84,20 @@ func New(cfg *bridge.Config) bridge.Bridger { } func newBridge(cfg *bridge.Config) *Bslack { - newCache, err := lru.New(5000) + newCache, err := lru.New[string, time.Time](5000) + if err != nil { + cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) + } + + newCache2, err := lru.New[string, any](5000) if err != nil { cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) } b := &Bslack{ - Config: cfg, - uuid: xid.New().String(), - cache: newCache, + Config: cfg, + uuid: xid.New().String(), + cacheTs: newCache, + cacheChan: newCache2, } return b } @@ -458,7 +467,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro // we can't match on the file ID yet, so we have to match on the filename too. ts := time.Now() b.Log.Debugf("Adding file %s to cache at %s with timestamp", fi.Name, ts.String()) - b.cache.Add("filename"+fi.Name, ts) + b.cacheTs.Add("filename"+fi.Name, ts) initialComment := fmt.Sprintf("File from %s", msg.Username) if fi.Comment != "" { initialComment += fmt.Sprintf(" with comment: %s", fi.Comment) @@ -484,7 +493,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro } b.Log.Debugf("Adding file ID %s to cache with timestamp %s", res.ID, ts.String()) - b.cache.Add("file"+res.ID, ts) + b.cacheTs.Add("file"+res.ID, ts) // search for message id by uploaded file in private/public channels, get thread timestamp from uploaded file if v, ok := sfi.Shares.Private[channelID]; ok && len(v) > 0 { diff --git a/bridge/xmpp/xmpp.go b/bridge/xmpp/xmpp.go index 8e17e682d..b8f63e038 100644 --- a/bridge/xmpp/xmpp.go +++ b/bridge/xmpp/xmpp.go @@ -26,6 +26,11 @@ type Bxmpp struct { avatarAvailability map[string]bool avatarMap map[string]string + + // Mapping of sent origin-id to internal matterbridge ID of source msg + // An internal ID may correspond to several XMPP origin-id because + // of split message (eg. multiple file upload) + messageMap map[string]xid.ID } func New(cfg *bridge.Config) bridge.Bridger { @@ -34,6 +39,7 @@ func New(cfg *bridge.Config) bridge.Bridger { xmppMap: make(map[string]string), avatarAvailability: make(map[string]bool), avatarMap: make(map[string]string), + messageMap: make(map[string]xid.ID), } } @@ -94,10 +100,15 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) { if msg.Extra != nil { for _, rmsg := range helper.HandleExtra(&msg, b.General) { b.Log.Debugf("=> Sending attachement message %#v", rmsg) + + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + _, err = b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: rmsg.Channel + "@" + b.GetString("Muc"), - Text: rmsg.Username + rmsg.Text, + Type: "groupchat", + Remote: rmsg.Channel + "@" + b.GetString("Muc"), + Text: rmsg.Username + rmsg.Text, + OriginID: originId, }) if err != nil { @@ -105,24 +116,27 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) { } } if len(msg.Extra["file"]) > 0 { + // TODO: log return "", b.handleUploadFile(&msg) } } + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + // Post normal message. b.Log.Debugf("=> Sending message %#v", msg) if _, err := b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Text: msg.Username + msg.Text, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + OriginID: originId, }); err != nil { + // TODO: log return "", err } - // Generate a dummy ID because to avoid collision with other internal messages - // However this does not provide proper Edits/Replies integration on XMPP side. - msgID := xid.New().String() - return msgID, nil + return "", nil } func (b *Bxmpp) createXMPP() error { @@ -338,20 +352,29 @@ func (b *Bxmpp) handleUploadFile(msg *config.Message) error { urlDesc = fileInfo.Comment } } + + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + if _, err := b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Text: msg.Username + msg.Text, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + OriginID: originId, }); err != nil { return err } + originId = xid.New().String() + + b.messageMap[originId] = msg.InternalID if fileInfo.URL != "" { if _, err := b.xc.SendOOB(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Ooburl: fileInfo.URL, - Oobdesc: urlDesc, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Ooburl: fileInfo.URL, + Oobdesc: urlDesc, + OriginID: originId, }); err != nil { b.Log.WithError(err).Warn("Failed to send share URL.") } @@ -383,6 +406,12 @@ func (b *Bxmpp) parseChannel(remote string) string { func (b *Bxmpp) skipMessage(message xmpp.Chat) bool { // skip messages from ourselves if b.parseNick(message.Remote) == b.GetString("Nick") { + // We received our own message, we will further ignore it, + // but first send back the ID it was assigned. + if message.OriginID != "" { + internal := b.messageMap[message.OriginID] + b.AckSentMessage(internal, message.OriginID, b.parseChannel(message.Remote)) + } return true } diff --git a/gateway/gateway.go b/gateway/gateway.go index 5e7f77c13..097acd7dd 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -9,7 +9,7 @@ import ( "github.com/d5/tengo/v2" "github.com/d5/tengo/v2/stdlib" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/kyokomi/emoji/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" @@ -26,9 +26,8 @@ type Gateway struct { Channels map[string]*config.ChannelInfo ChannelOptions map[string]config.ChannelOptions Message chan config.Message - MessageSentAck chan config.MessageSent Name string - Messages *lru.Cache + Messages *lru.Cache[string, []*BrMsgID] logger *logrus.Entry } @@ -46,16 +45,15 @@ const apiProtocol = "api" func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { logger := rootLogger.WithFields(logrus.Fields{"prefix": "gateway"}) - cache, _ := lru.New(5000) + cache, _ := lru.New[string, []*BrMsgID](5000) gw := &Gateway{ - Channels: make(map[string]*config.ChannelInfo), - Message: r.Message, - MessageSentAck: r.MessageSentAck, - Router: r, - Bridges: make(map[string]*bridge.Bridge), - Config: r.Config, - Messages: cache, - logger: logger, + Channels: make(map[string]*config.ChannelInfo), + Message: r.Message, + Router: r, + Bridges: make(map[string]*bridge.Bridge), + Config: r.Config, + Messages: cache, + logger: logger, } if err := gw.AddConfig(cfg); err != nil { logger.Errorf("Failed to add configuration to gateway: %#v", err) @@ -72,11 +70,10 @@ func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { // If not keyed, iterate through cache for downstream, and infer upstream. for _, mid := range gw.Messages.Keys() { - v, _ := gw.Messages.Peek(mid) - ids := v.([]*BrMsgID) + ids, _ := gw.Messages.Peek(mid) for _, downstreamMsgObj := range ids { if ID == downstreamMsgObj.ID { - return mid.(string) + return mid } } } @@ -104,9 +101,45 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { br.HttpClient = http_client + // The channel to receive message IDs is shared with the + // bridges, but is not kept in the gateway. + messageAck := make(chan bridge.MessageSent, 100) + // Start listening for sent message acknowledgements + go func() { + for ack := range messageAck { + gw.logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) + // TODO 2026: this was a comment in the previous ID handling. Not + // sure what to do about ID changing on edit???? + // + // Only add the message ID if it doesn't already exist + // + // For some bridges we always add/update the message ID. + // This is necessary as msgIDs will change if a bridge returns + // a different ID in response to edits. + values, exists := gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) + + var brMsgIDs []*BrMsgID + if exists { + // We want to append, not overwrite + brMsgIDs = values + } + + brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.ExternalID.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) + gw.Messages.Add(ack.ExternalID.Protocol+" "+ack.InternalID.String(), brMsgIDs) + + // Just for testing that we added everything correctly + // TODO: remove this useless debug log + values, _ = gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) + for _, id := range values { + gw.logger.Warnf(" | %s -> %s (%s)", ack.InternalID.String(), id.ID, id.ChannelID) + } + } + }() + brconfig := &bridge.Config{ - Remote: gw.Message, - Bridge: br, + Remote: gw.Message, + MessageSentAck: messageAck, + Bridge: br, } // add the actual bridger for this protocol to this bridge using the bridgeMap if _, ok := gw.Router.BridgeMap[br.Protocol]; !ok { @@ -275,8 +308,7 @@ func (gw *Gateway) getDestChannel(msg *config.Message, dest bridge.Bridge) []con } func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string { - if res, ok := gw.Messages.Get(msgID); ok { - IDs := res.([]*BrMsgID) + if IDs, ok := gw.Messages.Get(msgID); ok { for _, id := range IDs { // check protocol, bridge name and channelname // for people that reuse the same bridge multiple times. see #342 diff --git a/gateway/router.go b/gateway/router.go index f9614771f..8c3ed06ea 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -19,7 +19,6 @@ type Router struct { BridgeMap map[string]bridge.Factory Gateways map[string]*Gateway Message chan config.Message - MessageSentAck chan config.MessageSent MattermostPlugin chan config.Message logger *logrus.Entry @@ -34,7 +33,6 @@ func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[strin Config: cfg, BridgeMap: bridgeMap, Message: make(chan config.Message), - MessageSentAck: make(chan config.MessageSent), MattermostPlugin: make(chan config.Message), Gateways: make(map[string]*Gateway), logger: logger, @@ -42,15 +40,6 @@ func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[strin sgw := samechannel.New(cfg) gwconfigs := append(sgw.GetConfig(), cfg.BridgeValues().Gateway...) - // Start listening for sent message acknowledgements - go func() { - for { - ack := <-r.MessageSentAck - // TODO: actually save it in the right place - logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) - } - }() - for idx := range gwconfigs { entry := &gwconfigs[idx] if !entry.Enable { @@ -164,7 +153,6 @@ func (r *Router) handleReceive() { filesHandled := false for _, gw := range r.Gateways { // record all the message ID's of the different bridges - var msgIDs []*BrMsgID if gw.ignoreMessage(&msg) { continue } @@ -175,20 +163,7 @@ func (r *Router) handleReceive() { filesHandled = true } for _, br := range gw.Bridges { - msgIDs = append(msgIDs, gw.handleMessage(&msg, br)...) - } - - if msg.ID != "" { - _, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID) - - // Only add the message ID if it doesn't already exist - // - // For some bridges we always add/update the message ID. - // This is necessary as msgIDs will change if a bridge returns - // a different ID in response to edits. - if !exists { - gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs) - } + gw.handleMessage(&msg, br) } } } diff --git a/go.mod b/go.mod index 75301890a..79ce19a6f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/gomarkdown/markdown v0.0.0-20240419095408-642f0ee99ae2 github.com/google/gops v0.3.27 github.com/gorilla/schema v1.4.1 - github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jpillora/backoff v1.0.0 github.com/kyokomi/emoji/v2 v2.2.13 github.com/labstack/echo/v4 v4.12.0 @@ -44,7 +44,7 @@ require ( go.mau.fi/whatsmeow v0.0.0-20260123132415-83db04703aee golang.org/x/image v0.19.0 golang.org/x/oauth2 v0.22.0 - golang.org/x/text v0.33.0 + golang.org/x/text v0.34.0 gomod.garykim.dev/nc-talk v0.3.0 google.golang.org/protobuf v1.36.11 layeh.com/gumble v0.0.0-20221205141517-d1df60a3cc14 @@ -76,7 +76,7 @@ require ( github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-plugin v1.6.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/kettek/apng v0.0.0-20191108220231-414630eed80f // indirect @@ -133,11 +133,11 @@ require ( go.mau.fi/libsignal v0.2.1 // indirect go.mau.fi/util v0.9.5 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.47.0 // indirect + golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/term v0.39.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade // indirect google.golang.org/grpc v1.65.0 // indirect @@ -156,4 +156,6 @@ require ( //replace github.com/matrix-org/gomatrix => github.com/matterbridge/gomatrix v0.0.0-20220205235239-607eb9ee6419 +replace github.com/xmppo/go-xmpp => github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004 + go 1.24.0 diff --git a/go.sum b/go.sum index be7409297..b395c4b86 100644 --- a/go.sum +++ b/go.sum @@ -137,8 +137,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI= github.com/hashicorp/go-plugin v1.6.1/go.mod h1:XPHFku2tFo3o3QKFgSYo+cghcUhw1NA1hZyMK0PWAw0= -github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= -github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -308,6 +308,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= +github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004 h1:t50UGMPvT3PB/J0uGxcmE0q0NyN9CAjRqlipRyUjdDA= +github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004/go.mod h1:uUJCG8facQZa7XN9qS/WnNQu3+KkwypbYm4HlmlJ0ME= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= @@ -419,8 +421,6 @@ github.com/writeas/go-strip-markdown v2.0.1+incompatible h1:IIqxTM5Jr7RzhigcL6Fk github.com/writeas/go-strip-markdown v2.0.1+incompatible/go.mod h1:Rsyu10ZhbEK9pXdk8V6MVnZmTzRG0alMNLMwa0J01fE= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= -github.com/xmppo/go-xmpp v0.3.1 h1:GZUCGZozZdfvv25FVvlgwKoNdRjmfJCUcVG8349/E4A= -github.com/xmppo/go-xmpp v0.3.1/go.mod h1:EBLbzPt4Y9OJBEF58Lhc4IrTnO226aIkbfosQo6KWeA= github.com/yaegashi/msgraph.go v0.1.4 h1:leDXSczAbwBpYFSmmZrdByTiPoUw8dbTfNMetAjJvbw= github.com/yaegashi/msgraph.go v0.1.4/go.mod h1:vgeYhHa5skJt/3lTyjGXThTZhwbhRnGo6uUxzoJIGME= github.com/yaegashi/wtz.go v0.0.2/go.mod h1:nOLA5QXsmdkRxBkP5tljhua13ADHCKirLBrzPf4PEJc= @@ -447,8 +447,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= @@ -476,8 +476,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -517,20 +517,20 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= -golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= +golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= From a0c0fc1fb7f26e0987b559d004e032975c66f47a Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 20:14:33 +0100 Subject: [PATCH 4/8] make linter happy --- gateway/gateway.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index 097acd7dd..860190ebc 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -558,7 +558,8 @@ func (gw *Gateway) SendMessage( gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) }(time.Now()) - dest.Send(msg) + // TODO: remove this when the interface removes the return type + _, _ = dest.Send(msg) }() } From b4497fc87dac9ccfe679f6544b14151aa9b73393 Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 22:16:49 +0100 Subject: [PATCH 5/8] Add ID mapping on incoming messages --- gateway/gateway.go | 22 ++++++++++------------ gateway/router.go | 15 +++++++++++++-- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index 860190ebc..aae13a532 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -116,23 +116,21 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { // For some bridges we always add/update the message ID. // This is necessary as msgIDs will change if a bridge returns // a different ID in response to edits. - values, exists := gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) - var brMsgIDs []*BrMsgID - if exists { - // We want to append, not overwrite - brMsgIDs = values + // brMsgIDs is always initialized in Router.handleReceive(). However, + // we may still receive a message we don't know about. Maybe matterbridge + // was restarted, or another client is connected on the same account sending + // messages, or the remote server is melting down and dinosaurs are walking + // the Earth... + brMsgIDs, exists := gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) + + if !exists { + gw.logger.Warnf("Unknown message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) + continue } brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.ExternalID.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) gw.Messages.Add(ack.ExternalID.Protocol+" "+ack.InternalID.String(), brMsgIDs) - - // Just for testing that we added everything correctly - // TODO: remove this useless debug log - values, _ = gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) - for _, id := range values { - gw.logger.Warnf(" | %s -> %s (%s)", ack.InternalID.String(), id.ID, id.ChannelID) - } } }() diff --git a/gateway/router.go b/gateway/router.go index 8c3ed06ea..5be891bc2 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -140,15 +140,16 @@ func (r *Router) handleReceive() { // We add an internal UUID which will allow destination protocols // to send back their own ID(s) corresponding to the message go the - // gateway in an asynchronous manner. + // gateway in an asynchronous manner (for replies/reactions). msg.InternalID = xid.New() r.handleEventGetChannelMembers(&msg) r.handleEventFailure(&msg) r.handleEventRejoinChannels(&msg) + msgBridge := r.getBridge(msg.Account) // Set message protocol based on the account it came from - msg.Protocol = r.getBridge(msg.Account).Protocol + msg.Protocol = msgBridge.Protocol filesHandled := false for _, gw := range r.Gateways { @@ -162,6 +163,16 @@ func (r *Router) handleReceive() { gw.handleFiles(&msg) filesHandled = true } + + // If the origin bridge brought us a message ID, map it with our + // internal ID for replies/reactions. + BrMsgIDs := []*BrMsgID{} + if msg.ID != "" { + BrMsgIDs = append(BrMsgIDs, &BrMsgID{msgBridge, msg.ID, msg.Channel}) + } + // Even if it might be empty, already initialize the mapping + gw.Messages.Add(msg.Protocol+" "+msg.InternalID.String(), BrMsgIDs) + for _, br := range gw.Bridges { gw.handleMessage(&msg, br) } From 12260d21ea080aa8465b2f45cde2fd5732a79952 Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 23:10:10 +0100 Subject: [PATCH 6/8] Use typed ID as key in message mapping (unsure about method logic) --- gateway/gateway.go | 47 +++++++++++++++++++++------------------------ gateway/handlers.go | 3 ++- gateway/router.go | 2 +- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index aae13a532..654d3e36c 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -14,6 +14,7 @@ import ( "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/internal" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -27,7 +28,7 @@ type Gateway struct { ChannelOptions map[string]config.ChannelOptions Message chan config.Message Name string - Messages *lru.Cache[string, []*BrMsgID] + Messages *lru.Cache[xid.ID, []*BrMsgID] logger *logrus.Entry } @@ -45,7 +46,7 @@ const apiProtocol = "api" func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { logger := rootLogger.WithFields(logrus.Fields{"prefix": "gateway"}) - cache, _ := lru.New[string, []*BrMsgID](5000) + cache, _ := lru.New[xid.ID, []*BrMsgID](5000) gw := &Gateway{ Channels: make(map[string]*config.ChannelInfo), Message: r.Message, @@ -62,22 +63,19 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { } // FindCanonicalMsgID returns the ID under which a message was stored in the cache. -func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { - ID := protocol + " " + mID - if gw.Messages.Contains(ID) { - return ID - } - - // If not keyed, iterate through cache for downstream, and infer upstream. - for _, mid := range gw.Messages.Keys() { - ids, _ := gw.Messages.Peek(mid) - for _, downstreamMsgObj := range ids { - if ID == downstreamMsgObj.ID { - return mid +func (gw *Gateway) FindCanonicalMsgID(protocol string, externalID string) *xid.ID { + // Now that we have internal IDs, mID is never going to be the key + for _, internalID := range gw.Messages.Keys() { + // TODO: should we check if the mapping exists here? or is this method + // only used when we're 100% sure? + externalIDs, _ := gw.Messages.Peek(internalID) + for _, downstreamMsgObj := range externalIDs { + if externalID == downstreamMsgObj.ID { + return &internalID } } } - return "" + return nil } // AddBridge sets up a new bridge on startup. @@ -122,7 +120,7 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { // was restarted, or another client is connected on the same account sending // messages, or the remote server is melting down and dinosaurs are walking // the Earth... - brMsgIDs, exists := gw.Messages.Get(ack.ExternalID.Protocol + " " + ack.InternalID.String()) + brMsgIDs, exists := gw.Messages.Get(ack.InternalID) if !exists { gw.logger.Warnf("Unknown message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) @@ -130,7 +128,7 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { } brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.ExternalID.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) - gw.Messages.Add(ack.ExternalID.Protocol+" "+ack.InternalID.String(), brMsgIDs) + gw.Messages.Add(ack.InternalID, brMsgIDs) } }() @@ -305,13 +303,12 @@ func (gw *Gateway) getDestChannel(msg *config.Message, dest bridge.Bridge) []con return channels } -func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string { +func (gw *Gateway) getDestMsgID(msgID xid.ID, dest *bridge.Bridge, channel *config.ChannelInfo) string { if IDs, ok := gw.Messages.Get(msgID); ok { for _, id := range IDs { // check protocol, bridge name and channelname - // for people that reuse the same bridge multiple times. see #342 if dest.Protocol == id.br.Protocol && dest.Name == id.br.Name && channel.ID == id.ChannelID { - return strings.Replace(id.ID, dest.Protocol+" ", "", 1) + return id.ID } } } @@ -480,7 +477,7 @@ func (gw *Gateway) SendMessage( rmsg *config.Message, dest *bridge.Bridge, channel *config.ChannelInfo, - canonicalParentMsgID string, + canonicalParentMsgID *xid.ID, ) { msg := *rmsg // Only send the avatar download event to ourselves. @@ -512,7 +509,7 @@ func (gw *Gateway) SendMessage( // exclude file delete event as the msg ID here is the native file ID that needs to be deleted if msg.Event != config.EventFileDelete { - msg.ID = gw.getDestMsgID(rmsg.Protocol+" "+rmsg.ID, dest, channel) + msg.ID = gw.getDestMsgID(rmsg.InternalID, dest, channel) } // for api we need originchannel as channel @@ -520,9 +517,9 @@ func (gw *Gateway) SendMessage( msg.Channel = rmsg.Channel } - msg.ParentID = gw.getDestMsgID(canonicalParentMsgID, dest, channel) - if msg.ParentID == "" { - msg.ParentID = strings.Replace(canonicalParentMsgID, dest.Protocol+" ", "", 1) + // TODO: ParentID should be typed too + if canonicalParentMsgID != nil { + msg.ParentID = gw.getDestMsgID(*canonicalParentMsgID, dest, channel) } // if the parentID is still empty and we have a parentID set in the original message diff --git a/gateway/handlers.go b/gateway/handlers.go index 497d1a85c..8cdbae9ed 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -11,6 +11,7 @@ import ( "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/gateway/bridgemap" + "github.com/rs/xid" ) // handleEventFailure handles failures and reconnects bridges. @@ -177,7 +178,7 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B } // Get the ID of the parent message in thread - var canonicalParentMsgID string + var canonicalParentMsgID *xid.ID if rmsg.ParentID != "" && dest.GetBool("PreserveThreading") { canonicalParentMsgID = gw.FindCanonicalMsgID(rmsg.Protocol, rmsg.ParentID) } diff --git a/gateway/router.go b/gateway/router.go index 5be891bc2..414bd76d8 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -171,7 +171,7 @@ func (r *Router) handleReceive() { BrMsgIDs = append(BrMsgIDs, &BrMsgID{msgBridge, msg.ID, msg.Channel}) } // Even if it might be empty, already initialize the mapping - gw.Messages.Add(msg.Protocol+" "+msg.InternalID.String(), BrMsgIDs) + gw.Messages.Add(msg.InternalID, BrMsgIDs) for _, br := range gw.Bridges { gw.handleMessage(&msg, br) From 77f7a3fdc6d1009c9cd0286248b74d3dcc124cb1 Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 23:13:13 +0100 Subject: [PATCH 7/8] remove Protocol from MessageSentID --- bridge/bridge.go | 1 - bridge/config/config.go | 1 - gateway/gateway.go | 6 +++--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index 2a15665fa..15bbfa4f8 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -424,7 +424,6 @@ func (b *Config) AckSentMessage(internal xid.ID, external string, channel string InternalID: internal, ExternalID: config.MessageSentID{ ChannelID: channel, - Protocol: b.Protocol, ID: external, }, } diff --git a/bridge/config/config.go b/bridge/config/config.go index 8c2f56f01..16c954e48 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -36,7 +36,6 @@ const ( const ParentIDNotFound = "msg-parent-not-found" type MessageSentID struct { - Protocol string ChannelID string ID string } diff --git a/gateway/gateway.go b/gateway/gateway.go index 654d3e36c..2c4d5b236 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -105,7 +105,7 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { // Start listening for sent message acknowledgements go func() { for ack := range messageAck { - gw.logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) + gw.logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.DestBridge.Protocol, ack.ExternalID.ID) // TODO 2026: this was a comment in the previous ID handling. Not // sure what to do about ID changing on edit???? // @@ -123,11 +123,11 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { brMsgIDs, exists := gw.Messages.Get(ack.InternalID) if !exists { - gw.logger.Warnf("Unknown message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.ExternalID.Protocol, ack.ExternalID.ID) + gw.logger.Warnf("Unknown message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.DestBridge.Protocol, ack.ExternalID.ID) continue } - brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.ExternalID.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) + brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.DestBridge.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) gw.Messages.Add(ack.InternalID, brMsgIDs) } }() From cbc30a5cf0602e460e668e8d299cdce9173f39e6 Mon Sep 17 00:00:00 2001 From: selfhoster1312 Date: Fri, 27 Feb 2026 23:51:32 +0100 Subject: [PATCH 8/8] Public method goes before private method --- gateway/gateway.go | 167 ++++++++++++++++++++++----------------------- 1 file changed, 80 insertions(+), 87 deletions(-) diff --git a/gateway/gateway.go b/gateway/gateway.go index 2c4d5b236..2d2140410 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -148,6 +148,86 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { return nil } +// SendMessage sends a message (with specified parentID) to the channel on the selected +// destination bridge and returns a message ID or an error. +func (gw *Gateway) SendMessage( + rmsg *config.Message, + dest *bridge.Bridge, + channel *config.ChannelInfo, + canonicalParentMsgID *xid.ID, +) { + msg := *rmsg + if msg.Event == config.EventAvatarDownload && channel.ID != getChannelID(rmsg) { + // Only send the avatar download event to ourselves. + return + } else if channel.ID == getChannelID(rmsg) { + // do not send to ourself for any other event + return + } + + // Only send irc notices to irc + if msg.Event == config.EventNoticeIRC && dest.Protocol != "irc" { + return + } + + // for api we need originchannel as channel + if dest.Protocol != apiProtocol { + msg.Channel = channel.Name + } + + msg.Avatar = gw.modifyAvatar(rmsg, dest) + msg.Username = gw.modifyUsername(rmsg, dest) + + // exclude file delete event as the msg ID here is the native file ID that needs to be deleted + if msg.Event != config.EventFileDelete { + // TODO: should we do something special in case of config.EventFileDelete? Or just leave hte origin message ID? + // Here the message ID as received by the origin bridge is removed. Why? I don't know why. + // Don't ask me questions. Let's just go with the flow and figure the rest later. + msg.ID = "" + } + + // TODO: ParentID should be typed too + if canonicalParentMsgID != nil { + msg.ParentID = gw.getDestMsgID(*canonicalParentMsgID, dest, channel) + } + + // if the parentID is still empty and we have a parentID set in the original message + // this means that we didn't find it in the cache so set it to a "msg-parent-not-found" constant + if msg.ParentID == "" && rmsg.ParentID != "" { + msg.ParentID = config.ParentIDNotFound + } + + drop, err := gw.modifyOutMessageTengo(rmsg, &msg, dest) + if err != nil { + gw.logger.Errorf("modifySendMessageTengo: %s", err) + } + + if drop { + gw.logger.Debugf("=> Tengo dropping %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) + return + } + + // Too noisy to log like other events + if msg.Event != config.EventUserTyping { + debugSendMessage := fmt.Sprintf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) + gw.logger.Debug(debugSendMessage) + } + + // if we are using mattermost plugin account, send messages to MattermostPlugin channel + // that can be picked up by the mattermost matterbridge plugin + if dest.Account == "mattermost.plugin" { + gw.Router.MattermostPlugin <- msg + } + + // Send the message in the background + go func() { + t := time.Now() + // TODO: remove this when the interface removes the return type + _, _ = dest.Send(msg) + gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) + }() +} + // checkConfig checks a bridge config, on startup. // // This is not triggered when config is reloaded from disk. @@ -471,93 +551,6 @@ func (gw *Gateway) modifyMessage(msg *config.Message) { } } -// SendMessage sends a message (with specified parentID) to the channel on the selected -// destination bridge and returns a message ID or an error. -func (gw *Gateway) SendMessage( - rmsg *config.Message, - dest *bridge.Bridge, - channel *config.ChannelInfo, - canonicalParentMsgID *xid.ID, -) { - msg := *rmsg - // Only send the avatar download event to ourselves. - if msg.Event == config.EventAvatarDownload { - if channel.ID != getChannelID(rmsg) { - return - } - } else { - // do not send to ourself for any other event - if channel.ID == getChannelID(rmsg) { - return - } - } - - // Only send irc notices to irc - if msg.Event == config.EventNoticeIRC && dest.Protocol != "irc" { - return - } - - // Too noisy to log like other events - debugSendMessage := "" - if msg.Event != config.EventUserTyping { - debugSendMessage = fmt.Sprintf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) - } - - msg.Channel = channel.Name - msg.Avatar = gw.modifyAvatar(rmsg, dest) - msg.Username = gw.modifyUsername(rmsg, dest) - - // exclude file delete event as the msg ID here is the native file ID that needs to be deleted - if msg.Event != config.EventFileDelete { - msg.ID = gw.getDestMsgID(rmsg.InternalID, dest, channel) - } - - // for api we need originchannel as channel - if dest.Protocol == apiProtocol { - msg.Channel = rmsg.Channel - } - - // TODO: ParentID should be typed too - if canonicalParentMsgID != nil { - msg.ParentID = gw.getDestMsgID(*canonicalParentMsgID, dest, channel) - } - - // if the parentID is still empty and we have a parentID set in the original message - // this means that we didn't find it in the cache so set it to a "msg-parent-not-found" constant - if msg.ParentID == "" && rmsg.ParentID != "" { - msg.ParentID = config.ParentIDNotFound - } - - drop, err := gw.modifyOutMessageTengo(rmsg, &msg, dest) - if err != nil { - gw.logger.Errorf("modifySendMessageTengo: %s", err) - } - - if drop { - gw.logger.Debugf("=> Tengo dropping %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) - return - } - - if debugSendMessage != "" { - gw.logger.Debug(debugSendMessage) - } - // if we are using mattermost plugin account, send messages to MattermostPlugin channel - // that can be picked up by the mattermost matterbridge plugin - if dest.Account == "mattermost.plugin" { - gw.Router.MattermostPlugin <- msg - } - - // Send the message in the background - go func() { - defer func(t time.Time) { - gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) - }(time.Now()) - - // TODO: remove this when the interface removes the return type - _, _ = dest.Send(msg) - }() -} - func (gw *Gateway) validGatewayDest(msg *config.Message) bool { return msg.Gateway == gw.Name }