diff --git a/bridge/bridge.go b/bridge/bridge.go index 44642e733..15bbfa4f8 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -13,6 +13,7 @@ import ( "time" "github.com/matterbridge-org/matterbridge/bridge/config" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -23,6 +24,7 @@ type Bridger interface { Disconnect() error NewHttpRequest(method, uri string, body io.Reader) (*http.Request, error) NewHttpClient(proxy string) (*http.Client, error) + AckSentMessage(internal xid.ID, external string, channel string) } type Bridge struct { @@ -44,7 +46,17 @@ type Bridge struct { type Config struct { *Bridge - Remote chan config.Message + Remote chan config.Message + MessageSentAck chan MessageSent +} + +// MessageSent is an acknowledgement received from a remote +// network that a message has been successfully sent, along +// with a protocol-dependent unique ID. +type MessageSent struct { + DestBridge *Bridge + InternalID xid.ID + ExternalID config.MessageSentID } // Factory is the factory function to create a bridge @@ -404,3 +416,16 @@ func (b *Bridge) addAttachmentProcess(msg *config.Message, filename string, id s return nil } + +func (b *Config) AckSentMessage(internal xid.ID, external string, channel string) { + go func() { + b.MessageSentAck <- MessageSent{ + DestBridge: b.Bridge, + InternalID: internal, + ExternalID: config.MessageSentID{ + ChannelID: channel, + ID: external, + }, + } + }() +} diff --git a/bridge/config/config.go b/bridge/config/config.go index 994b83488..16c954e48 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -12,6 +12,7 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/rs/xid" "github.com/sirupsen/logrus" "github.com/spf13/viper" ) @@ -34,20 +35,26 @@ const ( const ParentIDNotFound = "msg-parent-not-found" +type MessageSentID struct { + ChannelID string + ID string +} + type Message struct { - Text string `json:"text"` - Channel string `json:"channel"` - Username string `json:"username"` - UserID string `json:"userid"` // userid on the bridge - Avatar string `json:"avatar"` - Account string `json:"account"` - Event string `json:"event"` - Protocol string `json:"protocol"` - Gateway string `json:"gateway"` - ParentID string `json:"parent_id"` - Timestamp time.Time `json:"timestamp"` - ID string `json:"id"` - Extra map[string][]interface{} + Text string `json:"text"` + Channel string `json:"channel"` + Username string `json:"username"` + UserID string `json:"userid"` // userid on the bridge + Avatar string `json:"avatar"` + Account string `json:"account"` + Event string `json:"event"` + Protocol string `json:"protocol"` + Gateway string `json:"gateway"` + ParentID string `json:"parent_id"` + Timestamp time.Time `json:"timestamp"` + ID string `json:"id"` + InternalID xid.ID + Extra map[string][]interface{} } func (m Message) ParentNotFound() bool { diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 5b6b7e413..d9a43a36d 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -7,7 +7,7 @@ import ( "sync" "github.com/bwmarrin/discordgo" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/discord/transmitter" @@ -39,11 +39,11 @@ type Bdiscord struct { // Webhook specific logic useAutoWebhooks bool transmitter *transmitter.Transmitter - cache *lru.Cache + cache *lru.Cache[string, string] } func New(cfg *bridge.Config) bridge.Bridger { - newCache, err := lru.New(5000) + newCache, err := lru.New[string, string](5000) if err != nil { cfg.Log.Fatalf("Could not create LRU cache: %v", err) } @@ -305,7 +305,7 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st } if fi, ok := b.cache.Get(cFileUpload + msg.ID); ok { - err := b.c.ChannelMessageDelete(channelID, fi.(string)) // nolint:forcetypeassert + err := b.c.ChannelMessageDelete(channelID, fi) b.cache.Remove(cFileUpload + msg.ID) return "", err } diff --git a/bridge/rocketchat/rocketchat.go b/bridge/rocketchat/rocketchat.go index 247f2b466..af8a766a9 100644 --- a/bridge/rocketchat/rocketchat.go +++ b/bridge/rocketchat/rocketchat.go @@ -5,7 +5,7 @@ import ( "strings" "sync" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/helper" @@ -22,7 +22,7 @@ type Brocketchat struct { rh *rockethook.Client c *realtime.Client r *rest.Client - cache *lru.Cache + cache *lru.Cache[string, bool] *bridge.Config messageChan chan models.Message channelMap map[string]string @@ -37,7 +37,7 @@ const ( ) func New(cfg *bridge.Config) bridge.Bridger { - newCache, err := lru.New(100) + newCache, err := lru.New[string, bool](100) if err != nil { cfg.Log.Fatalf("Could not create LRU cache for rocketchat bridge: %v", err) } diff --git a/bridge/slack/handlers.go b/bridge/slack/handlers.go index e6649a98d..7794ceb63 100644 --- a/bridge/slack/handlers.go +++ b/bridge/slack/handlers.go @@ -231,7 +231,7 @@ func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, er } func (b *Bslack) handleFileDeletedEvent(ev *slack.FileDeletedEvent) (*config.Message, error) { - if rawChannel, ok := b.cache.Get(cfileDownloadChannel + ev.FileID); ok { + if rawChannel, ok := b.cacheChan.Get(cfileDownloadChannel + ev.FileID); ok { channel, err := b.channels.getChannelByID(rawChannel.(string)) if err != nil { return nil, err @@ -320,7 +320,7 @@ func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message) // If we have files attached, download them (in memory) and put a pointer to it in msg.Extra. for i := range ev.Files { // keep reference in cache on which channel we added this file - b.cache.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel) + b.cacheChan.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel) if err := b.handleDownloadFile(rmsg, &ev.Files[i], false); err != nil { b.Log.Errorf("Could not download incoming file: %#v", err) } @@ -406,9 +406,9 @@ func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool { // (the assumption is that such name collisions will not occur within the given // timeframes). func (b *Bslack) fileCached(file *slack.File) bool { - if ts, ok := b.cache.Get("file" + file.ID); ok && time.Since(ts.(time.Time)) < time.Minute { + if ts, ok := b.cacheTs.Get("file" + file.ID); ok && time.Since(ts) < time.Minute { return true - } else if ts, ok = b.cache.Get("filename" + file.Name); ok && time.Since(ts.(time.Time)) < 10*time.Second { + } else if ts, ok = b.cacheTs.Get("filename" + file.Name); ok && time.Since(ts) < 10*time.Second { return true } return false diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index 1eb0b8ffd..3832dc915 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -8,7 +8,7 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/helper" @@ -26,7 +26,10 @@ type Bslack struct { rtm *slack.RTM si *slack.Info - cache *lru.Cache + // File timestamps + cacheTs *lru.Cache[string, time.Time] + // Channels + cacheChan *lru.Cache[string, any] uuid string useChannelID bool @@ -81,14 +84,20 @@ func New(cfg *bridge.Config) bridge.Bridger { } func newBridge(cfg *bridge.Config) *Bslack { - newCache, err := lru.New(5000) + newCache, err := lru.New[string, time.Time](5000) + if err != nil { + cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) + } + + newCache2, err := lru.New[string, any](5000) if err != nil { cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) } b := &Bslack{ - Config: cfg, - uuid: xid.New().String(), - cache: newCache, + Config: cfg, + uuid: xid.New().String(), + cacheTs: newCache, + cacheChan: newCache2, } return b } @@ -458,7 +467,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro // we can't match on the file ID yet, so we have to match on the filename too. ts := time.Now() b.Log.Debugf("Adding file %s to cache at %s with timestamp", fi.Name, ts.String()) - b.cache.Add("filename"+fi.Name, ts) + b.cacheTs.Add("filename"+fi.Name, ts) initialComment := fmt.Sprintf("File from %s", msg.Username) if fi.Comment != "" { initialComment += fmt.Sprintf(" with comment: %s", fi.Comment) @@ -484,7 +493,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro } b.Log.Debugf("Adding file ID %s to cache with timestamp %s", res.ID, ts.String()) - b.cache.Add("file"+res.ID, ts) + b.cacheTs.Add("file"+res.ID, ts) // search for message id by uploaded file in private/public channels, get thread timestamp from uploaded file if v, ok := sfi.Shares.Private[channelID]; ok && len(v) > 0 { diff --git a/bridge/xmpp/xmpp.go b/bridge/xmpp/xmpp.go index 8e17e682d..b8f63e038 100644 --- a/bridge/xmpp/xmpp.go +++ b/bridge/xmpp/xmpp.go @@ -26,6 +26,11 @@ type Bxmpp struct { avatarAvailability map[string]bool avatarMap map[string]string + + // Mapping of sent origin-id to internal matterbridge ID of source msg + // An internal ID may correspond to several XMPP origin-id because + // of split message (eg. multiple file upload) + messageMap map[string]xid.ID } func New(cfg *bridge.Config) bridge.Bridger { @@ -34,6 +39,7 @@ func New(cfg *bridge.Config) bridge.Bridger { xmppMap: make(map[string]string), avatarAvailability: make(map[string]bool), avatarMap: make(map[string]string), + messageMap: make(map[string]xid.ID), } } @@ -94,10 +100,15 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) { if msg.Extra != nil { for _, rmsg := range helper.HandleExtra(&msg, b.General) { b.Log.Debugf("=> Sending attachement message %#v", rmsg) + + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + _, err = b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: rmsg.Channel + "@" + b.GetString("Muc"), - Text: rmsg.Username + rmsg.Text, + Type: "groupchat", + Remote: rmsg.Channel + "@" + b.GetString("Muc"), + Text: rmsg.Username + rmsg.Text, + OriginID: originId, }) if err != nil { @@ -105,24 +116,27 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) { } } if len(msg.Extra["file"]) > 0 { + // TODO: log return "", b.handleUploadFile(&msg) } } + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + // Post normal message. b.Log.Debugf("=> Sending message %#v", msg) if _, err := b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Text: msg.Username + msg.Text, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + OriginID: originId, }); err != nil { + // TODO: log return "", err } - // Generate a dummy ID because to avoid collision with other internal messages - // However this does not provide proper Edits/Replies integration on XMPP side. - msgID := xid.New().String() - return msgID, nil + return "", nil } func (b *Bxmpp) createXMPP() error { @@ -338,20 +352,29 @@ func (b *Bxmpp) handleUploadFile(msg *config.Message) error { urlDesc = fileInfo.Comment } } + + originId := xid.New().String() + b.messageMap[originId] = msg.InternalID + if _, err := b.xc.Send(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Text: msg.Username + msg.Text, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Text: msg.Username + msg.Text, + OriginID: originId, }); err != nil { return err } + originId = xid.New().String() + + b.messageMap[originId] = msg.InternalID if fileInfo.URL != "" { if _, err := b.xc.SendOOB(xmpp.Chat{ - Type: "groupchat", - Remote: msg.Channel + "@" + b.GetString("Muc"), - Ooburl: fileInfo.URL, - Oobdesc: urlDesc, + Type: "groupchat", + Remote: msg.Channel + "@" + b.GetString("Muc"), + Ooburl: fileInfo.URL, + Oobdesc: urlDesc, + OriginID: originId, }); err != nil { b.Log.WithError(err).Warn("Failed to send share URL.") } @@ -383,6 +406,12 @@ func (b *Bxmpp) parseChannel(remote string) string { func (b *Bxmpp) skipMessage(message xmpp.Chat) bool { // skip messages from ourselves if b.parseNick(message.Remote) == b.GetString("Nick") { + // We received our own message, we will further ignore it, + // but first send back the ID it was assigned. + if message.OriginID != "" { + internal := b.messageMap[message.OriginID] + b.AckSentMessage(internal, message.OriginID, b.parseChannel(message.Remote)) + } return true } diff --git a/gateway/gateway.go b/gateway/gateway.go index 7ca84e4f4..2d2140410 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -9,11 +9,12 @@ import ( "github.com/d5/tengo/v2" "github.com/d5/tengo/v2/stdlib" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/kyokomi/emoji/v2" "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/internal" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -27,7 +28,7 @@ type Gateway struct { ChannelOptions map[string]config.ChannelOptions Message chan config.Message Name string - Messages *lru.Cache + Messages *lru.Cache[xid.ID, []*BrMsgID] logger *logrus.Entry } @@ -45,7 +46,7 @@ const apiProtocol = "api" func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { logger := rootLogger.WithFields(logrus.Fields{"prefix": "gateway"}) - cache, _ := lru.New(5000) + cache, _ := lru.New[xid.ID, []*BrMsgID](5000) gw := &Gateway{ Channels: make(map[string]*config.ChannelInfo), Message: r.Message, @@ -62,23 +63,19 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { } // FindCanonicalMsgID returns the ID under which a message was stored in the cache. -func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { - ID := protocol + " " + mID - if gw.Messages.Contains(ID) { - return ID - } - - // If not keyed, iterate through cache for downstream, and infer upstream. - for _, mid := range gw.Messages.Keys() { - v, _ := gw.Messages.Peek(mid) - ids := v.([]*BrMsgID) - for _, downstreamMsgObj := range ids { - if ID == downstreamMsgObj.ID { - return mid.(string) +func (gw *Gateway) FindCanonicalMsgID(protocol string, externalID string) *xid.ID { + // Now that we have internal IDs, mID is never going to be the key + for _, internalID := range gw.Messages.Keys() { + // TODO: should we check if the mapping exists here? or is this method + // only used when we're 100% sure? + externalIDs, _ := gw.Messages.Peek(internalID) + for _, downstreamMsgObj := range externalIDs { + if externalID == downstreamMsgObj.ID { + return &internalID } } } - return "" + return nil } // AddBridge sets up a new bridge on startup. @@ -102,9 +99,43 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { br.HttpClient = http_client + // The channel to receive message IDs is shared with the + // bridges, but is not kept in the gateway. + messageAck := make(chan bridge.MessageSent, 100) + // Start listening for sent message acknowledgements + go func() { + for ack := range messageAck { + gw.logger.Warnf("Message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.DestBridge.Protocol, ack.ExternalID.ID) + // TODO 2026: this was a comment in the previous ID handling. Not + // sure what to do about ID changing on edit???? + // + // Only add the message ID if it doesn't already exist + // + // For some bridges we always add/update the message ID. + // This is necessary as msgIDs will change if a bridge returns + // a different ID in response to edits. + + // brMsgIDs is always initialized in Router.handleReceive(). However, + // we may still receive a message we don't know about. Maybe matterbridge + // was restarted, or another client is connected on the same account sending + // messages, or the remote server is melting down and dinosaurs are walking + // the Earth... + brMsgIDs, exists := gw.Messages.Get(ack.InternalID) + + if !exists { + gw.logger.Warnf("Unknown message %s has been acked by %s as ID: %s", ack.InternalID.String(), ack.DestBridge.Protocol, ack.ExternalID.ID) + continue + } + + brMsgIDs = append(brMsgIDs, &BrMsgID{ack.DestBridge, ack.DestBridge.Protocol + " " + ack.ExternalID.ID, ack.ExternalID.ChannelID}) + gw.Messages.Add(ack.InternalID, brMsgIDs) + } + }() + brconfig := &bridge.Config{ - Remote: gw.Message, - Bridge: br, + Remote: gw.Message, + MessageSentAck: messageAck, + Bridge: br, } // add the actual bridger for this protocol to this bridge using the bridgeMap if _, ok := gw.Router.BridgeMap[br.Protocol]; !ok { @@ -117,6 +148,86 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { return nil } +// SendMessage sends a message (with specified parentID) to the channel on the selected +// destination bridge and returns a message ID or an error. +func (gw *Gateway) SendMessage( + rmsg *config.Message, + dest *bridge.Bridge, + channel *config.ChannelInfo, + canonicalParentMsgID *xid.ID, +) { + msg := *rmsg + if msg.Event == config.EventAvatarDownload && channel.ID != getChannelID(rmsg) { + // Only send the avatar download event to ourselves. + return + } else if channel.ID == getChannelID(rmsg) { + // do not send to ourself for any other event + return + } + + // Only send irc notices to irc + if msg.Event == config.EventNoticeIRC && dest.Protocol != "irc" { + return + } + + // for api we need originchannel as channel + if dest.Protocol != apiProtocol { + msg.Channel = channel.Name + } + + msg.Avatar = gw.modifyAvatar(rmsg, dest) + msg.Username = gw.modifyUsername(rmsg, dest) + + // exclude file delete event as the msg ID here is the native file ID that needs to be deleted + if msg.Event != config.EventFileDelete { + // TODO: should we do something special in case of config.EventFileDelete? Or just leave hte origin message ID? + // Here the message ID as received by the origin bridge is removed. Why? I don't know why. + // Don't ask me questions. Let's just go with the flow and figure the rest later. + msg.ID = "" + } + + // TODO: ParentID should be typed too + if canonicalParentMsgID != nil { + msg.ParentID = gw.getDestMsgID(*canonicalParentMsgID, dest, channel) + } + + // if the parentID is still empty and we have a parentID set in the original message + // this means that we didn't find it in the cache so set it to a "msg-parent-not-found" constant + if msg.ParentID == "" && rmsg.ParentID != "" { + msg.ParentID = config.ParentIDNotFound + } + + drop, err := gw.modifyOutMessageTengo(rmsg, &msg, dest) + if err != nil { + gw.logger.Errorf("modifySendMessageTengo: %s", err) + } + + if drop { + gw.logger.Debugf("=> Tengo dropping %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) + return + } + + // Too noisy to log like other events + if msg.Event != config.EventUserTyping { + debugSendMessage := fmt.Sprintf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) + gw.logger.Debug(debugSendMessage) + } + + // if we are using mattermost plugin account, send messages to MattermostPlugin channel + // that can be picked up by the mattermost matterbridge plugin + if dest.Account == "mattermost.plugin" { + gw.Router.MattermostPlugin <- msg + } + + // Send the message in the background + go func() { + t := time.Now() + // TODO: remove this when the interface removes the return type + _, _ = dest.Send(msg) + gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) + }() +} + // checkConfig checks a bridge config, on startup. // // This is not triggered when config is reloaded from disk. @@ -272,14 +383,12 @@ func (gw *Gateway) getDestChannel(msg *config.Message, dest bridge.Bridge) []con return channels } -func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string { - if res, ok := gw.Messages.Get(msgID); ok { - IDs := res.([]*BrMsgID) +func (gw *Gateway) getDestMsgID(msgID xid.ID, dest *bridge.Bridge, channel *config.ChannelInfo) string { + if IDs, ok := gw.Messages.Get(msgID); ok { for _, id := range IDs { // check protocol, bridge name and channelname - // for people that reuse the same bridge multiple times. see #342 if dest.Protocol == id.br.Protocol && dest.Name == id.br.Name && channel.ID == id.ChannelID { - return strings.Replace(id.ID, dest.Protocol+" ", "", 1) + return id.ID } } } @@ -442,100 +551,6 @@ func (gw *Gateway) modifyMessage(msg *config.Message) { } } -// SendMessage sends a message (with specified parentID) to the channel on the selected -// destination bridge and returns a message ID or an error. -func (gw *Gateway) SendMessage( - rmsg *config.Message, - dest *bridge.Bridge, - channel *config.ChannelInfo, - canonicalParentMsgID string, -) (string, error) { - msg := *rmsg - // Only send the avatar download event to ourselves. - if msg.Event == config.EventAvatarDownload { - if channel.ID != getChannelID(rmsg) { - return "", nil - } - } else { - // do not send to ourself for any other event - if channel.ID == getChannelID(rmsg) { - return "", nil - } - } - - // Only send irc notices to irc - if msg.Event == config.EventNoticeIRC && dest.Protocol != "irc" { - return "", nil - } - - // Too noisy to log like other events - debugSendMessage := "" - if msg.Event != config.EventUserTyping { - debugSendMessage = fmt.Sprintf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) - } - - msg.Channel = channel.Name - msg.Avatar = gw.modifyAvatar(rmsg, dest) - msg.Username = gw.modifyUsername(rmsg, dest) - - // exclude file delete event as the msg ID here is the native file ID that needs to be deleted - if msg.Event != config.EventFileDelete { - msg.ID = gw.getDestMsgID(rmsg.Protocol+" "+rmsg.ID, dest, channel) - } - - // for api we need originchannel as channel - if dest.Protocol == apiProtocol { - msg.Channel = rmsg.Channel - } - - msg.ParentID = gw.getDestMsgID(canonicalParentMsgID, dest, channel) - if msg.ParentID == "" { - msg.ParentID = strings.Replace(canonicalParentMsgID, dest.Protocol+" ", "", 1) - } - - // if the parentID is still empty and we have a parentID set in the original message - // this means that we didn't find it in the cache so set it to a "msg-parent-not-found" constant - if msg.ParentID == "" && rmsg.ParentID != "" { - msg.ParentID = config.ParentIDNotFound - } - - drop, err := gw.modifyOutMessageTengo(rmsg, &msg, dest) - if err != nil { - gw.logger.Errorf("modifySendMessageTengo: %s", err) - } - - if drop { - gw.logger.Debugf("=> Tengo dropping %#v from %s (%s) to %s (%s)", msg, msg.Account, rmsg.Channel, dest.Account, channel.Name) - return "", nil - } - - if debugSendMessage != "" { - gw.logger.Debug(debugSendMessage) - } - // if we are using mattermost plugin account, send messages to MattermostPlugin channel - // that can be picked up by the mattermost matterbridge plugin - if dest.Account == "mattermost.plugin" { - gw.Router.MattermostPlugin <- msg - } - - defer func(t time.Time) { - gw.logger.Debugf("=> Send from %s (%s) to %s (%s) took %s", msg.Account, rmsg.Channel, dest.Account, channel.Name, time.Since(t)) - }(time.Now()) - - mID, err := dest.Send(msg) - if err != nil { - return mID, err - } - - // append the message ID (mID) from this bridge (dest) to our brMsgIDs slice - if mID != "" { - gw.logger.Debugf("mID %s: %s", dest.Account, mID) - return mID, nil - // brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + mID, channel.ID}) - } - return "", nil -} - func (gw *Gateway) validGatewayDest(msg *config.Message) bool { return msg.Gateway == gw.Name } diff --git a/gateway/handlers.go b/gateway/handlers.go index b934f5343..8cdbae9ed 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -11,6 +11,7 @@ import ( "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/gateway/bridgemap" + "github.com/rs/xid" ) // handleEventFailure handles failures and reconnects bridges. @@ -177,7 +178,7 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B } // Get the ID of the parent message in thread - var canonicalParentMsgID string + var canonicalParentMsgID *xid.ID if rmsg.ParentID != "" && dest.GetBool("PreserveThreading") { canonicalParentMsgID = gw.FindCanonicalMsgID(rmsg.Protocol, rmsg.ParentID) } @@ -185,15 +186,8 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B channels := gw.getDestChannel(rmsg, *dest) for idx := range channels { channel := &channels[idx] - msgID, err := gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) - if err != nil { - gw.logger.Errorf("SendMessage failed: %s", err) - continue - } - if msgID == "" { - continue - } - brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID}) + gw.SendMessage(rmsg, dest, channel, canonicalParentMsgID) + // brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID}) } return brMsgIDs } diff --git a/gateway/router.go b/gateway/router.go index a8aad1aae..414bd76d8 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -8,6 +8,7 @@ import ( "github.com/matterbridge-org/matterbridge/bridge" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/gateway/samechannel" + "github.com/rs/xid" "github.com/sirupsen/logrus" ) @@ -136,17 +137,23 @@ func (r *Router) getBridge(account string) *bridge.Bridge { func (r *Router) handleReceive() { for msg := range r.Message { msg := msg // scopelint + + // We add an internal UUID which will allow destination protocols + // to send back their own ID(s) corresponding to the message go the + // gateway in an asynchronous manner (for replies/reactions). + msg.InternalID = xid.New() + r.handleEventGetChannelMembers(&msg) r.handleEventFailure(&msg) r.handleEventRejoinChannels(&msg) + msgBridge := r.getBridge(msg.Account) // Set message protocol based on the account it came from - msg.Protocol = r.getBridge(msg.Account).Protocol + msg.Protocol = msgBridge.Protocol filesHandled := false for _, gw := range r.Gateways { // record all the message ID's of the different bridges - var msgIDs []*BrMsgID if gw.ignoreMessage(&msg) { continue } @@ -156,21 +163,18 @@ func (r *Router) handleReceive() { gw.handleFiles(&msg) filesHandled = true } - for _, br := range gw.Bridges { - msgIDs = append(msgIDs, gw.handleMessage(&msg, br)...) - } + // If the origin bridge brought us a message ID, map it with our + // internal ID for replies/reactions. + BrMsgIDs := []*BrMsgID{} if msg.ID != "" { - _, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID) - - // Only add the message ID if it doesn't already exist - // - // For some bridges we always add/update the message ID. - // This is necessary as msgIDs will change if a bridge returns - // a different ID in response to edits. - if !exists { - gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs) - } + BrMsgIDs = append(BrMsgIDs, &BrMsgID{msgBridge, msg.ID, msg.Channel}) + } + // Even if it might be empty, already initialize the mapping + gw.Messages.Add(msg.InternalID, BrMsgIDs) + + for _, br := range gw.Bridges { + gw.handleMessage(&msg, br) } } } diff --git a/go.mod b/go.mod index 75301890a..79ce19a6f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/gomarkdown/markdown v0.0.0-20240419095408-642f0ee99ae2 github.com/google/gops v0.3.27 github.com/gorilla/schema v1.4.1 - github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jpillora/backoff v1.0.0 github.com/kyokomi/emoji/v2 v2.2.13 github.com/labstack/echo/v4 v4.12.0 @@ -44,7 +44,7 @@ require ( go.mau.fi/whatsmeow v0.0.0-20260123132415-83db04703aee golang.org/x/image v0.19.0 golang.org/x/oauth2 v0.22.0 - golang.org/x/text v0.33.0 + golang.org/x/text v0.34.0 gomod.garykim.dev/nc-talk v0.3.0 google.golang.org/protobuf v1.36.11 layeh.com/gumble v0.0.0-20221205141517-d1df60a3cc14 @@ -76,7 +76,7 @@ require ( github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-plugin v1.6.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/kettek/apng v0.0.0-20191108220231-414630eed80f // indirect @@ -133,11 +133,11 @@ require ( go.mau.fi/libsignal v0.2.1 // indirect go.mau.fi/util v0.9.5 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.47.0 // indirect + golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/term v0.39.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade // indirect google.golang.org/grpc v1.65.0 // indirect @@ -156,4 +156,6 @@ require ( //replace github.com/matrix-org/gomatrix => github.com/matterbridge/gomatrix v0.0.0-20220205235239-607eb9ee6419 +replace github.com/xmppo/go-xmpp => github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004 + go 1.24.0 diff --git a/go.sum b/go.sum index be7409297..b395c4b86 100644 --- a/go.sum +++ b/go.sum @@ -137,8 +137,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI= github.com/hashicorp/go-plugin v1.6.1/go.mod h1:XPHFku2tFo3o3QKFgSYo+cghcUhw1NA1hZyMK0PWAw0= -github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= -github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -308,6 +308,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA= github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= +github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004 h1:t50UGMPvT3PB/J0uGxcmE0q0NyN9CAjRqlipRyUjdDA= +github.com/selfhoster1312/go-xmpp v0.0.0-20260227182655-43afd5dbe004/go.mod h1:uUJCG8facQZa7XN9qS/WnNQu3+KkwypbYm4HlmlJ0ME= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= @@ -419,8 +421,6 @@ github.com/writeas/go-strip-markdown v2.0.1+incompatible h1:IIqxTM5Jr7RzhigcL6Fk github.com/writeas/go-strip-markdown v2.0.1+incompatible/go.mod h1:Rsyu10ZhbEK9pXdk8V6MVnZmTzRG0alMNLMwa0J01fE= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= -github.com/xmppo/go-xmpp v0.3.1 h1:GZUCGZozZdfvv25FVvlgwKoNdRjmfJCUcVG8349/E4A= -github.com/xmppo/go-xmpp v0.3.1/go.mod h1:EBLbzPt4Y9OJBEF58Lhc4IrTnO226aIkbfosQo6KWeA= github.com/yaegashi/msgraph.go v0.1.4 h1:leDXSczAbwBpYFSmmZrdByTiPoUw8dbTfNMetAjJvbw= github.com/yaegashi/msgraph.go v0.1.4/go.mod h1:vgeYhHa5skJt/3lTyjGXThTZhwbhRnGo6uUxzoJIGME= github.com/yaegashi/wtz.go v0.0.2/go.mod h1:nOLA5QXsmdkRxBkP5tljhua13ADHCKirLBrzPf4PEJc= @@ -447,8 +447,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= @@ -476,8 +476,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -517,20 +517,20 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= -golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= +golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=