Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/matterbridge-org/matterbridge/bridge/config"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
)

Expand All @@ -23,6 +24,7 @@ type Bridger interface {
Disconnect() error
NewHttpRequest(method, uri string, body io.Reader) (*http.Request, error)
NewHttpClient(proxy string) (*http.Client, error)
AckSentMessage(internal xid.ID, external string, channel string)
}

type Bridge struct {
Expand All @@ -44,7 +46,17 @@ type Bridge struct {
type Config struct {
*Bridge

Remote chan config.Message
Remote chan config.Message
MessageSentAck chan MessageSent
}

// MessageSent is an acknowledgement received from a remote
// network that a message has been successfully sent, along
// with a protocol-dependent unique ID.
type MessageSent struct {
DestBridge *Bridge
InternalID xid.ID
ExternalID config.MessageSentID
}

// Factory is the factory function to create a bridge
Expand Down Expand Up @@ -404,3 +416,16 @@ func (b *Bridge) addAttachmentProcess(msg *config.Message, filename string, id s

return nil
}

func (b *Config) AckSentMessage(internal xid.ID, external string, channel string) {
go func() {
b.MessageSentAck <- MessageSent{
DestBridge: b.Bridge,
InternalID: internal,
ExternalID: config.MessageSentID{
ChannelID: channel,
ID: external,
},
}
}()
}
33 changes: 20 additions & 13 deletions bridge/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
Expand All @@ -34,20 +35,26 @@ const (

const ParentIDNotFound = "msg-parent-not-found"

type MessageSentID struct {
ChannelID string
ID string
}

type Message struct {
Text string `json:"text"`
Channel string `json:"channel"`
Username string `json:"username"`
UserID string `json:"userid"` // userid on the bridge
Avatar string `json:"avatar"`
Account string `json:"account"`
Event string `json:"event"`
Protocol string `json:"protocol"`
Gateway string `json:"gateway"`
ParentID string `json:"parent_id"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id"`
Extra map[string][]interface{}
Text string `json:"text"`
Channel string `json:"channel"`
Username string `json:"username"`
UserID string `json:"userid"` // userid on the bridge
Avatar string `json:"avatar"`
Account string `json:"account"`
Event string `json:"event"`
Protocol string `json:"protocol"`
Gateway string `json:"gateway"`
ParentID string `json:"parent_id"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id"`
InternalID xid.ID
Comment on lines +55 to +56
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have 'ID' here. Can we reuse it instead or is adding a new field fundamentally important?

This ID field currently holds the ID set by the receiving bridge, which leads to confusing ambiguities when tracing message flow because a config.Message will say one ID on incoming and then say another on the outgoing side. It would be a lot easier to understand what was going on without that.

The most difficult knot to untangle is how to associate the IDs tracked at the gateway with IDs tracked by each protocol. So I guess they do need to be associated; perhaps to make it clearer:

Suggested change
ID string `json:"id"`
InternalID xid.ID
ExternalID string `json:"id"`
InternalID xid.ID

The ExternalIDs should not be controllable by the gateway, it's up to each bridge to pick them (or have them picked for them by the remote server and report that), so there shouldn't be any reason to pass them gateway -> bridge, so maybe don't associate them in the struct at all:

Suggested change
ID string `json:"id"`
InternalID xid.ID

Instead Bridges can send { ExternalID string, rmsg config.Message } to the gateway's incoming chan in handleMessage which then immediately generates InternalID, and at the point it should initialize

gw.Messages[internalID] = [ BrMsgID{ExternalID} ]

and on Acks we append to gw.Messages[internalID] as you're doing below :)

Copy link
Copy Markdown
Collaborator Author

@selfhoster1312 selfhoster1312 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a special case with the ID for config.EventFileDelete but i did not investigate further. Also i'm not sure how that would affect other parts of the codebase to remove this, but i think it's good to think about it! For example, i believe it would actually break the HTTP JSON API for 3rd party bridges outside of our codebase (since the ID is unmarshalled from JSON in that case and could not be passed as an argument)

Extra map[string][]interface{}
}

func (m Message) ParentNotFound() bool {
Expand Down
8 changes: 4 additions & 4 deletions bridge/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"

"github.com/bwmarrin/discordgo"
lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/matterbridge-org/matterbridge/bridge"
"github.com/matterbridge-org/matterbridge/bridge/config"
"github.com/matterbridge-org/matterbridge/bridge/discord/transmitter"
Expand Down Expand Up @@ -39,11 +39,11 @@ type Bdiscord struct {
// Webhook specific logic
useAutoWebhooks bool
transmitter *transmitter.Transmitter
cache *lru.Cache
cache *lru.Cache[string, string]
}

func New(cfg *bridge.Config) bridge.Bridger {
newCache, err := lru.New(5000)
newCache, err := lru.New[string, string](5000)
if err != nil {
cfg.Log.Fatalf("Could not create LRU cache: %v", err)
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func (b *Bdiscord) handleEventBotUser(msg *config.Message, channelID string) (st
}

if fi, ok := b.cache.Get(cFileUpload + msg.ID); ok {
err := b.c.ChannelMessageDelete(channelID, fi.(string)) // nolint:forcetypeassert
err := b.c.ChannelMessageDelete(channelID, fi)
b.cache.Remove(cFileUpload + msg.ID)
return "", err
}
Expand Down
6 changes: 3 additions & 3 deletions bridge/rocketchat/rocketchat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"sync"

lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/matterbridge-org/matterbridge/bridge"
"github.com/matterbridge-org/matterbridge/bridge/config"
"github.com/matterbridge-org/matterbridge/bridge/helper"
Expand All @@ -22,7 +22,7 @@ type Brocketchat struct {
rh *rockethook.Client
c *realtime.Client
r *rest.Client
cache *lru.Cache
cache *lru.Cache[string, bool]
*bridge.Config
messageChan chan models.Message
channelMap map[string]string
Expand All @@ -37,7 +37,7 @@ const (
)

func New(cfg *bridge.Config) bridge.Bridger {
newCache, err := lru.New(100)
newCache, err := lru.New[string, bool](100)
if err != nil {
cfg.Log.Fatalf("Could not create LRU cache for rocketchat bridge: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions bridge/slack/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (b *Bslack) handleMessageEvent(ev *slack.MessageEvent) (*config.Message, er
}

func (b *Bslack) handleFileDeletedEvent(ev *slack.FileDeletedEvent) (*config.Message, error) {
if rawChannel, ok := b.cache.Get(cfileDownloadChannel + ev.FileID); ok {
if rawChannel, ok := b.cacheChan.Get(cfileDownloadChannel + ev.FileID); ok {
channel, err := b.channels.getChannelByID(rawChannel.(string))
if err != nil {
return nil, err
Expand Down Expand Up @@ -320,7 +320,7 @@ func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message)
// If we have files attached, download them (in memory) and put a pointer to it in msg.Extra.
for i := range ev.Files {
// keep reference in cache on which channel we added this file
b.cache.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel)
b.cacheChan.Add(cfileDownloadChannel+ev.Files[i].ID, ev.Channel)
if err := b.handleDownloadFile(rmsg, &ev.Files[i], false); err != nil {
b.Log.Errorf("Could not download incoming file: %#v", err)
}
Expand Down Expand Up @@ -406,9 +406,9 @@ func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {
// (the assumption is that such name collisions will not occur within the given
// timeframes).
func (b *Bslack) fileCached(file *slack.File) bool {
if ts, ok := b.cache.Get("file" + file.ID); ok && time.Since(ts.(time.Time)) < time.Minute {
if ts, ok := b.cacheTs.Get("file" + file.ID); ok && time.Since(ts) < time.Minute {
return true
} else if ts, ok = b.cache.Get("filename" + file.Name); ok && time.Since(ts.(time.Time)) < 10*time.Second {
} else if ts, ok = b.cacheTs.Get("filename" + file.Name); ok && time.Since(ts) < 10*time.Second {
return true
}
return false
Expand Down
25 changes: 17 additions & 8 deletions bridge/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/matterbridge-org/matterbridge/bridge"
"github.com/matterbridge-org/matterbridge/bridge/config"
"github.com/matterbridge-org/matterbridge/bridge/helper"
Expand All @@ -26,7 +26,10 @@ type Bslack struct {
rtm *slack.RTM
si *slack.Info

cache *lru.Cache
// File timestamps
cacheTs *lru.Cache[string, time.Time]
// Channels
cacheChan *lru.Cache[string, any]
uuid string
useChannelID bool

Expand Down Expand Up @@ -81,14 +84,20 @@ func New(cfg *bridge.Config) bridge.Bridger {
}

func newBridge(cfg *bridge.Config) *Bslack {
newCache, err := lru.New(5000)
newCache, err := lru.New[string, time.Time](5000)
if err != nil {
cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err)
}

newCache2, err := lru.New[string, any](5000)
if err != nil {
cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err)
}
b := &Bslack{
Config: cfg,
uuid: xid.New().String(),
cache: newCache,
Config: cfg,
uuid: xid.New().String(),
cacheTs: newCache,
cacheChan: newCache2,
}
return b
}
Expand Down Expand Up @@ -458,7 +467,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro
// we can't match on the file ID yet, so we have to match on the filename too.
ts := time.Now()
b.Log.Debugf("Adding file %s to cache at %s with timestamp", fi.Name, ts.String())
b.cache.Add("filename"+fi.Name, ts)
b.cacheTs.Add("filename"+fi.Name, ts)
initialComment := fmt.Sprintf("File from %s", msg.Username)
if fi.Comment != "" {
initialComment += fmt.Sprintf(" with comment: %s", fi.Comment)
Expand All @@ -484,7 +493,7 @@ func (b *Bslack) uploadFile(msg *config.Message, channelID string) (string, erro
}

b.Log.Debugf("Adding file ID %s to cache with timestamp %s", res.ID, ts.String())
b.cache.Add("file"+res.ID, ts)
b.cacheTs.Add("file"+res.ID, ts)

// search for message id by uploaded file in private/public channels, get thread timestamp from uploaded file
if v, ok := sfi.Shares.Private[channelID]; ok && len(v) > 0 {
Expand Down
63 changes: 46 additions & 17 deletions bridge/xmpp/xmpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type Bxmpp struct {

avatarAvailability map[string]bool
avatarMap map[string]string

// Mapping of sent origin-id to internal matterbridge ID of source msg
// An internal ID may correspond to several XMPP origin-id because
// of split message (eg. multiple file upload)
messageMap map[string]xid.ID
}

func New(cfg *bridge.Config) bridge.Bridger {
Expand All @@ -34,6 +39,7 @@ func New(cfg *bridge.Config) bridge.Bridger {
xmppMap: make(map[string]string),
avatarAvailability: make(map[string]bool),
avatarMap: make(map[string]string),
messageMap: make(map[string]xid.ID),
}
}

Expand Down Expand Up @@ -94,35 +100,43 @@ func (b *Bxmpp) Send(msg config.Message) (string, error) {
if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
b.Log.Debugf("=> Sending attachement message %#v", rmsg)

originId := xid.New().String()
b.messageMap[originId] = msg.InternalID

_, err = b.xc.Send(xmpp.Chat{
Type: "groupchat",
Remote: rmsg.Channel + "@" + b.GetString("Muc"),
Text: rmsg.Username + rmsg.Text,
Type: "groupchat",
Remote: rmsg.Channel + "@" + b.GetString("Muc"),
Text: rmsg.Username + rmsg.Text,
OriginID: originId,
})

if err != nil {
b.Log.WithError(err).Error("Unable to send message with share URL.")
}
}
if len(msg.Extra["file"]) > 0 {
// TODO: log
return "", b.handleUploadFile(&msg)
}
}

originId := xid.New().String()
b.messageMap[originId] = msg.InternalID

// Post normal message.
b.Log.Debugf("=> Sending message %#v", msg)
if _, err := b.xc.Send(xmpp.Chat{
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
OriginID: originId,
}); err != nil {
// TODO: log
return "", err
}

// Generate a dummy ID because to avoid collision with other internal messages
// However this does not provide proper Edits/Replies integration on XMPP side.
msgID := xid.New().String()
return msgID, nil
return "", nil
}

func (b *Bxmpp) createXMPP() error {
Expand Down Expand Up @@ -338,20 +352,29 @@ func (b *Bxmpp) handleUploadFile(msg *config.Message) error {
urlDesc = fileInfo.Comment
}
}

originId := xid.New().String()
b.messageMap[originId] = msg.InternalID

if _, err := b.xc.Send(xmpp.Chat{
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Text: msg.Username + msg.Text,
OriginID: originId,
}); err != nil {
return err
}

originId = xid.New().String()

b.messageMap[originId] = msg.InternalID
if fileInfo.URL != "" {
if _, err := b.xc.SendOOB(xmpp.Chat{
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Ooburl: fileInfo.URL,
Oobdesc: urlDesc,
Type: "groupchat",
Remote: msg.Channel + "@" + b.GetString("Muc"),
Ooburl: fileInfo.URL,
Oobdesc: urlDesc,
OriginID: originId,
}); err != nil {
b.Log.WithError(err).Warn("Failed to send share URL.")
}
Expand Down Expand Up @@ -383,6 +406,12 @@ func (b *Bxmpp) parseChannel(remote string) string {
func (b *Bxmpp) skipMessage(message xmpp.Chat) bool {
// skip messages from ourselves
if b.parseNick(message.Remote) == b.GetString("Nick") {
// We received our own message, we will further ignore it,
// but first send back the ID it was assigned.
if message.OriginID != "" {
internal := b.messageMap[message.OriginID]
b.AckSentMessage(internal, message.OriginID, b.parseChannel(message.Remote))
}
return true
}

Expand Down
Loading
Loading