diff --git a/bridge/bridge.go b/bridge/bridge.go index 44642e7339..72c475c82a 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -45,6 +45,26 @@ type Config struct { *Bridge Remote chan config.Message + + // IsMessageBridged checks whether a message has already been bridged + // (exists in the persistent cache). Used by replay to avoid duplicates. + IsMessageBridged func(protocol, msgID string) bool + + // GetLastSeen returns the timestamp of the last processed message for a channel. + // Used by replay to determine the cutoff point. + GetLastSeen func(channelKey string) (time.Time, bool) + + // MarkMessageBridged marks a message as handled in the persistent cache. + // Used by bridges when they handle a message locally (e.g. posting an error + // notification) without routing it through the gateway, so replay won't + // re-process it on the next restart. + MarkMessageBridged func(protocol, msgID string) + + // SetDeltaToken stores a Graph API delta token for a channel. + SetDeltaToken func(channelKey, token string) + + // GetDeltaToken returns the stored Graph API delta token for a channel. 
+ GetDeltaToken func(channelKey string) (string, bool) } // Factory is the factory function to create a bridge diff --git a/bridge/config/config.go b/bridge/config/config.go index 994b83488c..7d7f7a99db 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -30,6 +30,8 @@ const ( EventUserTyping = "user_typing" EventGetChannelMembers = "get_channel_members" EventNoticeIRC = "notice_irc" + EventHistoricalMapping = "historical_mapping" + EventReplayMessage = "replay_message" ) const ParentIDNotFound = "msg-parent-not-found" @@ -155,6 +157,8 @@ type Protocol struct { MediaServerDownload string MediaConvertTgs string // telegram MediaConvertWebPToPNG bool // telegram + MessageCacheDuration string // general, msteams, mattermost: max age of cache entries (default "168h" = 7 days) + MessageCacheFile string // general, msteams, mattermost: persistent message ID cache file MessageDelay int // IRC, time in millisecond to wait between messages MessageFormat string // telegram MessageLength int // IRC, max length of a message allowed diff --git a/bridge/helper/helper.go b/bridge/helper/helper.go index dd9a6d4fa6..116da8a441 100644 --- a/bridge/helper/helper.go +++ b/bridge/helper/helper.go @@ -23,10 +23,74 @@ import ( var errHttpGetNotOk = errors.New("HTTP server responded non-OK code") +// ErrFileTooLarge is returned when a file exceeds the configured MediaDownloadSize. +type ErrFileTooLarge struct { + Size int64 + MaxSize int +} + +func (e *ErrFileTooLarge) Error() string { + return fmt.Sprintf("file too large (%d bytes, limit %d bytes)", e.Size, e.MaxSize) +} + func HttpGetNotOkError(url string, code int) error { return fmt.Errorf("%w: %s returned code %d", errHttpGetNotOk, url, code) } +// DownloadFileWithSizeCheck downloads a file, aborting if it exceeds maxSize bytes. +// First tries an HTTP HEAD request to check Content-Length before downloading. 
+// If HEAD is unsupported, fails, answers with a non-200 status or reports no
+// Content-Length, it falls back to a size-limited GET that reads at most maxSize+1
+// bytes. Returns *ErrFileTooLarge for oversized files and an HttpGetNotOkError for a non-200 GET.
+func DownloadFileWithSizeCheck(url string, maxSize int) (*[]byte, error) {
+	client := &http.Client{Timeout: time.Second * 30}
+
+	// Best-effort HEAD probe: reject early on the advertised size, ignore any failure.
+	if headReq, err := http.NewRequest(http.MethodHead, url, nil); err == nil {
+		if headResp, headErr := client.Do(headReq); headErr == nil {
+			headResp.Body.Close()
+			// Only a 200 describes the file; an error status reports the error body's size.
+			if cl := headResp.ContentLength; headResp.StatusCode == http.StatusOK && cl > 0 && cl > int64(maxSize) {
+				return nil, &ErrFileTooLarge{Size: cl, MaxSize: maxSize}
+			}
+		}
+	}
+
+	// Download with size limit.
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, HttpGetNotOkError(url, resp.StatusCode)
+	}
+
+	// Check Content-Length from GET response too.
+	if cl := resp.ContentLength; cl > 0 && cl > int64(maxSize) {
+		return nil, &ErrFileTooLarge{Size: cl, MaxSize: maxSize}
+	}
+
+	// Read up to maxSize+1 bytes. If we get more, the file is too large; Size then
+	// holds the bytes read so far (maxSize+1), as the real length is unknown.
+	limited := io.LimitReader(resp.Body, int64(maxSize)+1)
+	var buf bytes.Buffer
+	n, err := io.Copy(&buf, limited)
+	if err != nil {
+		return nil, err
+	}
+	if n > int64(maxSize) {
+		return nil, &ErrFileTooLarge{Size: n, MaxSize: maxSize}
+	}
+	data := buf.Bytes()
+	return &data, nil
+}
+
 // DownloadFile downloads the given non-authenticated URL.
func DownloadFile(url string) (*[]byte, error) { return DownloadFileAuth(url, "") diff --git a/bridge/mattermost/handlers.go b/bridge/mattermost/handlers.go index 88e44757c0..8bf07878e3 100644 --- a/bridge/mattermost/handlers.go +++ b/bridge/mattermost/handlers.go @@ -2,6 +2,8 @@ package bmattermost import ( "context" + "fmt" + "strings" "github.com/matterbridge-org/matterbridge/bridge/config" "github.com/matterbridge-org/matterbridge/bridge/helper" @@ -96,6 +98,21 @@ func (b *Bmattermost) handleMatter() { //nolint:cyclop func (b *Bmattermost) handleMatterClient(messages chan *config.Message) { for message := range b.mc.MessageChan { + // Allowlist: only process events relevant to bridging. + // This avoids logging noise from status_change, hello, preferences_changed, etc. + et := message.Raw.EventType() + switch et { + case "posted", model.WebsocketEventPostEdited, model.WebsocketEventPostDeleted: + // Post events: always process (join/leave come as "posted" with system message type). + case "typing": + // Typing events: only process if ShowUserTyping is enabled. + if !b.GetBool("ShowUserTyping") { + continue + } + default: + continue + } + b.Log.Debugf("%#v %#v", message.Raw.GetData(), message.Raw.EventType()) if b.skipMessage(message) { @@ -128,6 +145,15 @@ func (b *Bmattermost) handleMatterClient(messages chan *config.Message) { // handle mattermost post properties (override username and attachments) b.handleProps(rmsg, message) + // Extract message priority from Post.Metadata if present. 
+ if message.Post.Metadata != nil && message.Post.Metadata.Priority != nil && + message.Post.Metadata.Priority.Priority != nil { + prio := *message.Post.Metadata.Priority.Priority + if prio != "" && prio != "standard" { + rmsg.Extra["priority"] = []interface{}{prio} + } + } + // create a text for bridges that don't support native editing if message.Raw.EventType() == model.WebsocketEventPostEdited && !b.GetBool("EditDisable") { rmsg.Text = message.Text + b.GetString("EditSuffix") @@ -144,6 +170,13 @@ func (b *Bmattermost) handleMatterClient(messages chan *config.Message) { } } + // Intercept test command before relaying. + if b.isTestCommand(rmsg.Text) { + b.Log.Info("Test command received, starting test sequence") + go b.runTestSequence(channelName) + continue + } + // Use nickname instead of username if defined if !b.GetBool("useusername") { if nick := b.mc.GetNickName(rmsg.UserID); nick != "" { @@ -151,6 +184,11 @@ func (b *Bmattermost) handleMatterClient(messages chan *config.Message) { } } + // Populate full display name (FirstName + LastName) for bridges that support it. + if dn := b.getDisplayName(rmsg.UserID); dn != "" { + rmsg.Extra["displayname"] = []interface{}{dn} + } + messages <- rmsg } } @@ -170,22 +208,58 @@ func (b *Bmattermost) handleMatterHook(messages chan *config.Message) { } func (b *Bmattermost) handleUploadFile(msg *config.Message) (string, error) { - var err error - var res, id string channelID := b.getChannelID(msg.Channel) - for _, f := range msg.Extra["file"] { + + // Upload all files first, then create a single post with all file IDs. 
+ var fileIDs []string + var firstComment string + for i, f := range msg.Extra["file"] { fi := f.(config.FileInfo) - id, err = b.mc.UploadFile(*fi.Data, channelID, fi.Name) + fileID, err := b.mc.UploadFile(*fi.Data, channelID, fi.Name) if err != nil { - return "", err + b.Log.Errorf("upload file %s failed: %s", fi.Name, err) + continue } - msg.Text = fi.Comment - if b.GetBool("PrefixMessagesWithNick") { - msg.Text = msg.Username + msg.Text + fileIDs = append(fileIDs, fileID) + if i == 0 { + firstComment = fi.Comment } - res, err = b.mc.PostMessageWithFiles(channelID, msg.Text, msg.ParentID, []string{id}) } - return res, err + + if len(fileIDs) == 0 { + return "", fmt.Errorf("no files uploaded successfully") + } + + text := firstComment + + // Build a single post with all files so they appear as one message + // with the bridged user's name and avatar via override_username. + post := &model.Post{ + ChannelId: channelID, + Message: text, + RootId: msg.ParentID, + FileIds: fileIDs, + Props: model.StringInterface{ + "from_webhook": "true", + "override_username": strings.TrimSpace(msg.Username), + "matterbridge_" + b.uuid: true, + }, + } + if msg.Avatar != "" { + post.Props["override_icon_url"] = msg.Avatar + } + if msg.Extra != nil { + if srcIDs, ok := msg.Extra["source_msgid"]; ok && len(srcIDs) > 0 { + if srcID, ok := srcIDs[0].(string); ok { + post.Props["matterbridge_srcid"] = srcID + } + } + } + created, _, err := b.mc.Client.CreatePost(context.TODO(), post) + if err != nil { + return "", err + } + return created.Id, nil } //nolint:forcetypeassert diff --git a/bridge/mattermost/helpers.go b/bridge/mattermost/helpers.go index 63e143dda9..3dfa56907a 100644 --- a/bridge/mattermost/helpers.go +++ b/bridge/mattermost/helpers.go @@ -117,7 +117,11 @@ func (b *Bmattermost) sendWebhook(msg config.Message) (string, error) { return "", nil } - if b.GetBool("PrefixMessagesWithNick") { + // When the webhook sets override_username (msg.Username is non-empty), + // do NOT prefix 
the message with the nick — it would show up twice + // (once as the sender name, once in the message body). + // Only prefix when there's no username to use as override. + if b.GetBool("PrefixMessagesWithNick") && msg.Username == "" { msg.Text = msg.Username + msg.Text } @@ -227,8 +231,11 @@ func (b *Bmattermost) skipMessage(message *matterclient.Message) bool { } } - // Ignore messages sent from a user logged in as the bot - if b.mc.User.Username == message.Username { + // Allow test messages from ourselves to be relayed (bypass echo prevention). + isTestMessage := message.Post.Props != nil && message.Post.Props["matterbridge_test"] != nil + + // Ignore messages sent from a user logged in as the bot (unless it's a test message). + if !isTestMessage && b.mc.User.Username == message.Username { b.Log.Debug("message from same user as bot, ignoring") return true } diff --git a/bridge/mattermost/mattermost.go b/bridge/mattermost/mattermost.go index 2a9cbf7670..e0e8db215b 100644 --- a/bridge/mattermost/mattermost.go +++ b/bridge/mattermost/mattermost.go @@ -1,194 +1,560 @@ package bmattermost import ( - "context" - "errors" - "fmt" - "strings" - "sync" - - "github.com/matterbridge-org/matterbridge/bridge" - "github.com/matterbridge-org/matterbridge/bridge/config" - "github.com/matterbridge-org/matterbridge/bridge/helper" - "github.com/matterbridge-org/matterbridge/matterhook" - "github.com/matterbridge/matterclient" - "github.com/rs/xid" + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/matterbridge-org/matterbridge/bridge" + "github.com/matterbridge-org/matterbridge/bridge/config" + "github.com/matterbridge-org/matterbridge/bridge/helper" + "github.com/matterbridge-org/matterbridge/matterhook" + "github.com/matterbridge/matterclient" + "github.com/mattermost/mattermost/server/public/model" + "github.com/rs/xid" ) type Bmattermost struct { - mh *matterhook.Client - mc *matterclient.Client - v6 bool - uuid string - TeamID string - 
*bridge.Config - avatarMap map[string]string - channelsMutex sync.RWMutex - channelInfoMap map[string]*config.ChannelInfo + mh *matterhook.Client + mc *matterclient.Client + v6 bool + uuid string + TeamID string + *bridge.Config + avatarMap map[string]string + displayNameCache map[string]string + channelsMutex sync.RWMutex + channelInfoMap map[string]*config.ChannelInfo } const mattermostPlugin = "mattermost.plugin" func New(cfg *bridge.Config) bridge.Bridger { - b := &Bmattermost{ - Config: cfg, - avatarMap: make(map[string]string), - channelInfoMap: make(map[string]*config.ChannelInfo), - } + b := &Bmattermost{ + Config: cfg, + avatarMap: make(map[string]string), + displayNameCache: make(map[string]string), + channelInfoMap: make(map[string]*config.ChannelInfo), + } + + b.v6 = b.GetBool("v6") + b.uuid = xid.New().String() - b.v6 = b.GetBool("v6") - b.uuid = xid.New().String() + return b +} - return b +// getDisplayName returns the full display name (FirstName + LastName) for a +// Mattermost user, using a cache to avoid redundant API calls. Returns "" if +// the user has no first/last name set. 
+func (b *Bmattermost) getDisplayName(userID string) string { + if dn, ok := b.displayNameCache[userID]; ok { + return dn + } + if b.mc == nil { + return "" + } + user, _, err := b.mc.Client.GetUser(context.TODO(), userID, "") + if err != nil { + b.Log.Debugf("getDisplayName: GetUser failed for %s: %s", userID, err) + b.displayNameCache[userID] = "" + return "" + } + if user == nil { + b.displayNameCache[userID] = "" + return "" + } + b.Log.Debugf("getDisplayName: user %s FirstName=%q LastName=%q Nickname=%q", + userID, user.FirstName, user.LastName, user.Nickname) + dn := strings.TrimSpace(user.FirstName + " " + user.LastName) + b.displayNameCache[userID] = dn + return dn } func (b *Bmattermost) Command(cmd string) string { - return "" + return "" } func (b *Bmattermost) Connect() error { - if b.Account == mattermostPlugin { - return nil - } - - if strings.HasPrefix(b.getVersion(), "6.") || strings.HasPrefix(b.getVersion(), "7.") { - if !b.v6 { - b.v6 = true - } - } - - if b.GetString("WebhookBindAddress") != "" { - if err := b.doConnectWebhookBind(); err != nil { - return err - } - go b.handleMatter() - return nil - } - switch { - case b.GetString("WebhookURL") != "": - if err := b.doConnectWebhookURL(); err != nil { - return err - } - go b.handleMatter() - return nil - case b.GetString("Token") != "": - b.Log.Info("Connecting using token (sending and receiving)") - err := b.apiLogin() - if err != nil { - return err - } - go b.handleMatter() - case b.GetString("Login") != "": - b.Log.Info("Connecting using login/password (sending and receiving)") - b.Log.Infof("Using mattermost v6 methods: %t", b.v6) - err := b.apiLogin() - if err != nil { - return err - } - go b.handleMatter() - } - if b.GetString("WebhookBindAddress") == "" && b.GetString("WebhookURL") == "" && - b.GetString("Login") == "" && b.GetString("Token") == "" { - return errors.New("no connection method found. 
See that you have WebhookBindAddress, WebhookURL or Token/Login/Password/Server/Team configured") - } - return nil + if b.Account == mattermostPlugin { + return nil + } + + if strings.HasPrefix(b.getVersion(), "6.") || strings.HasPrefix(b.getVersion(), "7.") { + if !b.v6 { + b.v6 = true + } + } + + if b.GetString("WebhookBindAddress") != "" { + if err := b.doConnectWebhookBind(); err != nil { + return err + } + go b.handleMatter() + return nil + } + + switch { + case b.GetString("WebhookURL") != "": + if err := b.doConnectWebhookURL(); err != nil { + return err + } + // doConnectWebhookURL() already calls apiLogin() if Token or Login + // is configured, so b.mc is available for hybrid mode. + if b.mc != nil { + b.Log.Info("Hybrid mode: webhook for new messages, API for thread replies/edits/deletes") + } + go b.handleMatter() + return nil + case b.GetString("Token") != "": + b.Log.Info("Connecting using token (sending and receiving)") + err := b.apiLogin() + if err != nil { + return err + } + go b.handleMatter() + case b.GetString("Login") != "": + b.Log.Info("Connecting using login/password (sending and receiving)") + b.Log.Infof("Using mattermost v6 methods: %t", b.v6) + err := b.apiLogin() + if err != nil { + return err + } + go b.handleMatter() + } + + if b.GetString("WebhookBindAddress") == "" && b.GetString("WebhookURL") == "" && + b.GetString("Login") == "" && b.GetString("Token") == "" { + return errors.New("no connection method found. 
See that you have WebhookBindAddress, WebhookURL or Token/Login/Password/Server/Team configured") + } + + return nil } func (b *Bmattermost) Disconnect() error { - return nil + return nil } func (b *Bmattermost) JoinChannel(channel config.ChannelInfo) error { - if b.Account == mattermostPlugin { - return nil - } + if b.Account == mattermostPlugin { + return nil + } + + b.channelsMutex.Lock() + b.channelInfoMap[channel.ID] = &channel + b.channelsMutex.Unlock() + + // we can only join channels using the API + if b.GetString("WebhookURL") == "" && b.GetString("WebhookBindAddress") == "" { + id := b.getChannelID(channel.Name) + if id == "" { + return fmt.Errorf("Could not find channel ID for channel %s", channel.Name) + } + if err := b.mc.JoinChannel(id); err != nil { + return err + } + } + + // Scan recent messages for historical source-ID markers, then replay missed messages. + go func() { + b.scanHistoricalMappings(channel) + b.replayMissedMessages(channel) + }() + + return nil +} + +// scanHistoricalMappings scans recent channel messages for matterbridge_srcid +// props and sends EventHistoricalMapping events to the gateway for persistent +// cache population. 
+func (b *Bmattermost) scanHistoricalMappings(channel config.ChannelInfo) {
+	if b.mc == nil {
+		return
+	}
+	channelID := b.getChannelID(channel.Name)
+	if channelID == "" {
+		return
+	}
+
+	postList, _, err := b.mc.Client.GetPostsForChannel(context.TODO(), channelID, 0, 200, "", false, false)
+	if err != nil {
+		b.Log.Debugf("scanHistoricalMappings: GetPostsForChannel %s: %s", channel.Name, err)
+		return
+	}
+
+	count := 0
+	for _, id := range postList.Order {
+		post := postList.Posts[id]
+		srcID, ok := post.Props["matterbridge_srcid"].(string)
+		if !ok || srcID == "" {
+			continue
+		}
+		b.Remote <- config.Message{
+			Event:   config.EventHistoricalMapping,
+			Account: b.Account,
+			Channel: channel.Name,
+			ID:      post.Id,
+			Extra:   map[string][]interface{}{"source_msgid": {srcID}},
+		}
+		count++
+	}
+	if count > 0 {
+		b.Log.Infof("scanHistoricalMappings: found %d mappings in %s", count, channel.Name)
+	}
+}
+
+// replayMissedMessages fetches the channel's posts since the last-seen timestamp and
+// sends every one not yet bridged to b.Remote as an EventReplayMessage, oldest first.
+// It pauses 500ms after each message, so it is meant to run in its own goroutine.
+func (b *Bmattermost) replayMissedMessages(channel config.ChannelInfo) {
+	if b.mc == nil || b.IsMessageBridged == nil || b.GetLastSeen == nil {
+		return
+	}
+
+	channelID := b.getChannelID(channel.Name)
+	if channelID == "" {
+		return
+	}
+
+	channelKey := channel.Name + b.Account
+	lastSeen, ok := b.GetLastSeen(channelKey)
+	if !ok {
+		// First start: no replay, let the cache initialize through normal operation.
+		b.Log.Debugf("replayMissedMessages: no lastSeen for %s, skipping (first start)", channelKey)
+		return
+	}
+
+	sinceMillis := lastSeen.UnixMilli()
+	postList, _, err := b.mc.Client.GetPostsSince(context.TODO(), channelID, sinceMillis, false)
+	if err != nil {
+		b.Log.Errorf("replayMissedMessages: GetPostsSince failed: %s", err)
+		return
+	}
+
+	// Collect live posts created since the cutoff; older (edited-only) and deleted ones are dropped.
+	type postEntry struct {
+		id   string
+		post *model.Post
+	}
+	var posts []postEntry
+	for _, id := range postList.Order {
+		post := postList.Posts[id]
+		if post == nil || post.DeleteAt != 0 || post.CreateAt < sinceMillis {
+			continue
+		}
+		posts = append(posts, postEntry{id, post})
+	}
+	// Sort oldest first.
+	for i := 0; i < len(posts); i++ {
+		for j := i + 1; j < len(posts); j++ {
+			if posts[j].post.CreateAt < posts[i].post.CreateAt {
+				posts[i], posts[j] = posts[j], posts[i]
+			}
+		}
+	}
+
+	count := 0
+	propKey := "matterbridge_" + b.uuid
+	for _, pe := range posts {
+		post := pe.post
+
+		// Skip messages sent by matterbridge itself.
+		if post.Props != nil {
+			if _, ok := post.Props[propKey].(bool); ok {
+				continue
+			}
+			// Also skip test messages.
+			if _, ok := post.Props["matterbridge_test"]; ok {
+				continue
+			}
+			// Skip messages bridged from another platform (any bridge instance).
+			// The matterbridge_srcid prop is set on all bridged messages and
+			// survives across bridge restarts (unlike the UUID-based prop).
+			if _, ok := post.Props["matterbridge_srcid"]; ok {
+				continue
+			}
+		}
+
+		// Skip system messages.
+		if post.Type != "" && strings.HasPrefix(post.Type, "system_") {
+			continue
+		}
 
-	b.channelsMutex.Lock()
-	b.channelInfoMap[channel.ID] = &channel
-	b.channelsMutex.Unlock()
+		// Skip if already bridged.
+		if b.IsMessageBridged("mattermost", post.Id) {
+			continue
+		}
 
-	// we can only join channels using the API
-	if b.GetString("WebhookURL") == "" && b.GetString("WebhookBindAddress") == "" {
-		id := b.getChannelID(channel.Name)
-		if id == "" {
-			return fmt.Errorf("Could not find channel ID for channel %s", channel.Name)
-		}
+		// Resolve username for the post author.
+		username := ""
+		if post.Props != nil {
+			if override, ok := post.Props["override_username"].(string); ok && override != "" {
+				username = override
+			}
+		}
+		var displayName string
+		if username == "" {
+			user, _, userErr := b.mc.Client.GetUser(context.TODO(), post.UserId, "")
+			if userErr == nil && user != nil {
+				if !b.GetBool("useusername") && user.Nickname != "" {
+					username = user.Nickname
+				} else {
+					username = user.Username
+				}
+				// Deliberately not stored in b.displayNameCache: that map has no lock and
+				// this function runs in a background goroutine next to getDisplayName.
+				displayName = strings.TrimSpace(user.FirstName + " " + user.LastName)
+			} else {
+				username = "unknown"
+			}
+		}
 
-		return b.mc.JoinChannel(id)
-	}
+		// Format replay prefix with original timestamp.
+		createTime := time.UnixMilli(post.CreateAt)
+		replayPrefix := fmt.Sprintf("[Replay %s]\n", createTime.Format("2006-01-02 15:04 MST"))
 
-	return nil
+		rmsg := config.Message{
+			Event:    config.EventReplayMessage,
+			Account:  b.Account,
+			Channel:  channel.Name,
+			Username: username,
+			UserID:   post.UserId,
+			Text:     replayPrefix + post.Message,
+			ID:       post.Id,
+			ParentID: post.RootId,
+			Extra:    make(map[string][]interface{}),
+		}
+		if displayName != "" {
+			rmsg.Extra["displayname"] = []interface{}{displayName}
+		}
+
+		// Handle file attachments.
+		for _, fileID := range post.FileIds {
+			if dlErr := b.handleDownloadFile(&rmsg, fileID); dlErr != nil {
+				b.Log.Errorf("replay: download failed for %s: %s", fileID, dlErr)
+			}
+		}
+
+		b.Remote <- rmsg
+		count++
+		time.Sleep(500 * time.Millisecond)
+	}
+
+	if count > 0 {
+		b.Log.Infof("replayMissedMessages: replayed %d messages from %s", count, channel.Name)
+	}
+}
+
+// lookupWebhookPostID searches recent channel posts to find the ID of a message
+// just sent via webhook. Webhooks don't return post IDs, but the gateway needs
+// them to map replies across bridges. We look for a recent post from the bot
+// user that has our matterbridge uuid prop set and matches the expected text.
+func (b *Bmattermost) lookupWebhookPostID(channelName, text string) string {
+	if b.mc == nil {
+		return ""
+	}
+
+	channelID := b.getChannelID(channelName)
+	if channelID == "" {
+		return ""
+	}
+
+	postList, _, err := b.mc.Client.GetPostsForChannel(context.TODO(), channelID, 0, 10, "", false, false)
+	if err != nil {
+		b.Log.Debugf("lookupWebhookPostID: GetPostsForChannel failed: %s", err)
+		return ""
+	}
+
+	now := time.Now().UnixMilli()
+	propKey := "matterbridge_" + b.uuid
+
+	for _, id := range postList.Order {
+		post := postList.Posts[id]
+		// Only top-level posts from the last 5 seconds can be the webhook message.
+		if post == nil || post.RootId != "" || now-post.CreateAt > 5000 {
+			continue
+		}
+		// It must carry this bridge instance's marker prop.
+		if _, ok := post.Props[propKey]; !ok {
+			continue
+		}
+		// A substring match is enough; it also covers an exact match.
+		if strings.Contains(post.Message, text) {
+			b.Log.Debugf("lookupWebhookPostID: found post %s for webhook message", post.Id)
+			return post.Id
+		}
+	}
+
+	b.Log.Debugf("lookupWebhookPostID: no matching post found")
+	return ""
 }
 
 func (b *Bmattermost) Send(msg config.Message) (string, error) {
-	if b.Account == mattermostPlugin {
-		return "", nil
-	}
-	b.Log.Debugf("=> Receiving %#v", msg)
-
-	// Make a action /me of the message
-	if msg.Event == config.EventUserAction {
-		msg.Text = "*" + msg.Text + "*"
-	}
-
-	// map the file SHA to our user (caches the avatar)
-	if msg.Event == config.EventAvatarDownload {
-		return b.cacheAvatar(&msg)
-	}
-
-	// Use webhook to send the message
-	if b.GetString("WebhookURL") != "" {
-		return b.sendWebhook(msg)
-	}
-
-	// Delete message
-	if msg.Event == config.EventMsgDelete {
-		if msg.ID == "" {
-			return "", nil
-		}
-
-		return msg.ID, b.mc.DeleteMessage(msg.ID)
-	}
-
-	// Handle prefix hint for unthreaded messages.
- if msg.ParentNotFound() { - msg.ParentID = "" - msg.Text = fmt.Sprintf("[thread]: %s", msg.Text) - } - - // we only can reply to the root of the thread, not to a specific ID (like discord for example does) - if msg.ParentID != "" { - post, _, err := b.mc.Client.GetPost(context.TODO(), msg.ParentID, "") - if err != nil { - b.Log.Errorf("getting post %s failed: %s", msg.ParentID, err) - } - if post != nil && post.RootId != "" { - msg.ParentID = post.RootId - } - } - - // Upload a file if it exists - if msg.Extra != nil { - for _, rmsg := range helper.HandleExtra(&msg, b.General) { - if _, err := b.mc.PostMessage(b.getChannelID(rmsg.Channel), rmsg.Username+rmsg.Text, msg.ParentID); err != nil { - b.Log.Errorf("PostMessage failed: %s", err) - } - } - if len(msg.Extra["file"]) > 0 { - return b.handleUploadFile(&msg) - } - } - - // Prepend nick if configured - if b.GetBool("PrefixMessagesWithNick") { - msg.Text = msg.Username + msg.Text - } - - // Edit message if we have an ID - if msg.ID != "" { - return b.mc.EditMessage(msg.ID, msg.Text) - } - - // Post normal message - return b.mc.PostMessage(b.getChannelID(msg.Channel), msg.Text, msg.ParentID) + if b.Account == mattermostPlugin { + return "", nil + } + + b.Log.Debugf("=> Receiving %#v", msg) + + // Make a action /me of the message + if msg.Event == config.EventUserAction { + msg.Text = "*" + msg.Text + "*" + } + + // map the file SHA to our user (caches the avatar) + if msg.Event == config.EventAvatarDownload { + return b.cacheAvatar(&msg) + } + + // --- Hybrid mode: webhook for top-level, API for replies/edits/deletes --- + if b.GetString("WebhookURL") != "" { + isReply := msg.ParentValid() + isEdit := msg.ID != "" && msg.Event == "msg_update" + isDelete := msg.Event == config.EventMsgDelete + + if !isReply && !isEdit && !isDelete { + // Top-level new message with files → upload files via API first, + // then send any remaining text via webhook. 
Webhooks can't upload + // binary files, so we need the API path for actual file uploads. + if msg.Extra != nil && len(msg.Extra["file"]) > 0 && b.mc != nil { + postID, err := b.handleUploadFile(&msg) + if err != nil { + b.Log.Errorf("handleUploadFile failed: %s", err) + } + // If there's no remaining text, return the upload post ID + // so the gateway can cache it for thread-reply mapping. + if strings.TrimSpace(msg.Text) == "" { + return postID, nil + } + // Clear the files so sendWebhook doesn't append URLs again. + delete(msg.Extra, "file") + } + + // Top-level new message → use webhook (username/avatar override). + // Remember the original text for post-ID lookup. + originalText := msg.Text + + _, err := b.sendWebhook(msg) + if err != nil { + return "", err + } + + // Look up the post ID so the gateway can cache it for reply mapping. + postID := b.lookupWebhookPostID(msg.Channel, originalText) + if postID != "" { + return postID, nil + } + return "", nil + } + + // Reply, edit, or delete → need API client. + if b.mc == nil { + b.Log.Warnf("Cannot send thread reply/edit/delete via API: no API connection.") + if isReply { + return b.sendWebhook(msg) + } + return "", fmt.Errorf("API client not available for edit/delete") + } + // Fall through to the API path below. + } + + // Delete message + if msg.Event == config.EventMsgDelete { + if msg.ID == "" { + return "", nil + } + return msg.ID, b.mc.DeleteMessage(msg.ID) + } + + // Handle prefix hint for unthreaded messages. 
+ if msg.ParentNotFound() { + msg.ParentID = "" + msg.Text = fmt.Sprintf("[thread]: %s", msg.Text) + } + + // we only can reply to the root of the thread, not to a specific ID + if msg.ParentID != "" { + post, _, err := b.mc.Client.GetPost(context.TODO(), msg.ParentID, "") + if err != nil { + b.Log.Errorf("getting post %s failed: %s", msg.ParentID, err) + } + if post != nil && post.RootId != "" { + msg.ParentID = post.RootId + } + } + + // Upload a file if it exists + if msg.Extra != nil { + for _, rmsg := range helper.HandleExtra(&msg, b.General) { + extraPost := &model.Post{ + ChannelId: b.getChannelID(rmsg.Channel), + Message: rmsg.Text, + RootId: msg.ParentID, + Props: model.StringInterface{ + "from_webhook": "true", + "override_username": strings.TrimSpace(rmsg.Username), + "matterbridge_" + b.uuid: true, + }, + } + if rmsg.Avatar != "" { + extraPost.Props["override_icon_url"] = rmsg.Avatar + } + if _, _, err := b.mc.Client.CreatePost(context.TODO(), extraPost); err != nil { + b.Log.Errorf("PostMessage failed: %s", err) + } + } + if len(msg.Extra["file"]) > 0 { + return b.handleUploadFile(&msg) + } + } + + // Edit message if we have an ID — use PatchPost to preserve override props. + if msg.ID != "" { + props := model.StringInterface{ + "from_webhook": "true", + "override_username": strings.TrimSpace(msg.Username), + "matterbridge_" + b.uuid: true, + } + if msg.Avatar != "" { + props["override_icon_url"] = msg.Avatar + } + _, _, err := b.mc.Client.PatchPost(context.TODO(), msg.ID, &model.PostPatch{ + Message: &msg.Text, + Props: &props, + }) + if err != nil { + return "", err + } + return msg.ID, nil + } + + // Post normal message with override_username/icon so it appears as the + // bridged user (same as handleUploadFile does for file posts). 
+ post := &model.Post{ + ChannelId: b.getChannelID(msg.Channel), + Message: msg.Text, + RootId: msg.ParentID, + Props: model.StringInterface{ + "from_webhook": "true", + "override_username": strings.TrimSpace(msg.Username), + "matterbridge_" + b.uuid: true, + }, + } + if msg.Avatar != "" { + post.Props["override_icon_url"] = msg.Avatar + } + if msg.Extra != nil { + if srcIDs, ok := msg.Extra["source_msgid"]; ok && len(srcIDs) > 0 { + if srcID, ok := srcIDs[0].(string); ok { + post.Props["matterbridge_srcid"] = srcID + } + } + } + created, _, err := b.mc.Client.CreatePost(context.TODO(), post) + if err != nil { + return "", err + } + return created.Id, nil } diff --git a/bridge/mattermost/test.go b/bridge/mattermost/test.go new file mode 100644 index 0000000000..f6e38c38ae --- /dev/null +++ b/bridge/mattermost/test.go @@ -0,0 +1,215 @@ +package bmattermost + +import ( + "context" + "strings" + "time" + + "github.com/matterbridge-org/matterbridge/testdata" + "github.com/mattermost/mattermost/server/public/model" +) + +// isTestCommand returns true if the message text is exactly "@matterbridge test". +func (b *Bmattermost) isTestCommand(text string) bool { + return strings.TrimSpace(strings.ToLower(text)) == "@matterbridge test" +} + +// runTestSequence posts a series of test messages to the given channel. +// The messages are posted via the API with a special "matterbridge_test" prop +// so that skipMessage() allows them through for relay to the other bridge side. 
+func (b *Bmattermost) runTestSequence(channelName string) { + channelID := b.getChannelID(channelName) + if channelID == "" { + b.Log.Errorf("test: could not resolve channel ID for %s", channelName) + return + } + + b.Log.Infof("test: starting test sequence in channel %s", channelName) + + testProps := model.StringInterface{ + "matterbridge_test": true, + "from_webhook": "true", + } + if iconURL := b.GetString("IconURL"); iconURL != "" { + testProps["override_icon_url"] = iconURL + } + + // Helper to post a message and return the post ID. + post := func(message, rootID string) string { + p := &model.Post{ + ChannelId: channelID, + Message: message, + RootId: rootID, + Props: testProps, + } + created, _, err := b.mc.Client.CreatePost(context.TODO(), p) + if err != nil { + b.Log.Errorf("test: CreatePost failed: %s", err) + return "" + } + return created.Id + } + + // Step 1: Root message + rootID := post("🧪 **Matterbridge Test Sequence**\nThis is a root message to test the bridge relay.", "") + if rootID == "" { + return + } + time.Sleep(time.Second) + + // Step 2: Thread reply + post("This is a thread reply to test threading support.", rootID) + time.Sleep(time.Second) + + // Step 3: Typo message (will be edited later) + typoID := post("this message contains a tipo", rootID) + time.Sleep(time.Second) + + // Step 4: Code block + post("```python\ndef hello():\n for i in range(3):\n print(f\"Hello from Matterbridge! ({i+1})\")\n\nhello()\n```", rootID) + time.Sleep(time.Second) + + // Step 5: Message to be deleted + deleteID := post("this message will be deleted", rootID) + time.Sleep(time.Second) + + // Step 6: Quote block + post("> This is a quoted line.\n> Matterbridge supports quote blocks.\n> Third line of the quote.", rootID) + time.Sleep(time.Second) + + // Step 7: Emojis + post(":thumbsup: :tada: :rocket: :heart: :eyes: :flag-at:", rootID) + time.Sleep(time.Second) + + // Step 8: Edit the typo message — include Props to preserve override_username/icon. 
+ if typoID != "" { + newText := "this message contained a typo" + editProps := model.StringInterface{ + "from_webhook": "true", + "override_username": "matterbridge", + } + if b.GetString("IconURL") != "" { + editProps["override_icon_url"] = b.GetString("IconURL") + } + _, _, err := b.mc.Client.PatchPost(context.TODO(), typoID, &model.PostPatch{ + Message: &newText, + Props: &editProps, + }) + if err != nil { + b.Log.Errorf("test: PatchPost failed: %s", err) + } + } + time.Sleep(time.Second) + + // Step 9: Text formatting demo + post("**This text is bold**\n*This text is italic*\n~~This text is strikethrough~~\n### This is a heading\n[This is a link](https://github.com/matterbridge-org/matterbridge)", rootID) + time.Sleep(time.Second) + + // Step 10: Unordered list + post("- Item one\n- Item two\n- Item three", rootID) + time.Sleep(time.Second) + + // Step 11: Ordered list + post("1. First point\n2. Second point\n3. Third point", rootID) + time.Sleep(time.Second) + + // Step 12: Single PNG image + if pngID, err := b.mc.UploadFile(testdata.DemoPNG, channelID, "demo.png"); err != nil { + b.Log.Errorf("test: upload demo.png failed: %s", err) + } else { + p := &model.Post{ChannelId: channelID, Message: "Image test: PNG", RootId: rootID, FileIds: model.StringArray{pngID}, Props: testProps} + if _, _, err := b.mc.Client.CreatePost(context.TODO(), p); err != nil { + b.Log.Errorf("test: CreatePost with PNG failed: %s", err) + } + } + time.Sleep(time.Second) + + // Step 13: Single GIF image + if gifID, err := b.mc.UploadFile(testdata.DemoGIF, channelID, "demo.gif"); err != nil { + b.Log.Errorf("test: upload demo.gif failed: %s", err) + } else { + p := &model.Post{ChannelId: channelID, Message: "Image test: GIF", RootId: rootID, FileIds: model.StringArray{gifID}, Props: testProps} + if _, _, err := b.mc.Client.CreatePost(context.TODO(), p); err != nil { + b.Log.Errorf("test: CreatePost with GIF failed: %s", err) + } + } + time.Sleep(time.Second) + + // Step 14: Multi-image 
(2x PNG in one message) + { + var fileIDs model.StringArray + for _, name := range []string{"demo1.png", "demo2.png"} { + id, err := b.mc.UploadFile(testdata.DemoPNG, channelID, name) + if err != nil { + b.Log.Errorf("test: upload %s failed: %s", name, err) + continue + } + fileIDs = append(fileIDs, id) + } + if len(fileIDs) > 0 { + p := &model.Post{ChannelId: channelID, Message: "Image test: multi-image (2x PNG)", RootId: rootID, FileIds: fileIDs, Props: testProps} + if _, _, err := b.mc.Client.CreatePost(context.TODO(), p); err != nil { + b.Log.Errorf("test: CreatePost with multi-image failed: %s", err) + } + } + } + time.Sleep(time.Second) + + // Step 15: Delete the marked message + if deleteID != "" { + _, err := b.mc.Client.DeletePost(context.TODO(), deleteID) + if err != nil { + b.Log.Errorf("test: DeletePost failed: %s", err) + } + } + time.Sleep(time.Second) + + // Step 16: Important priority message as ROOT POST (priority only allowed on root posts) + { + prio := "important" + p := &model.Post{ + ChannelId: channelID, + Message: "Priority test: important message", + RootId: "", + Props: testProps, + Metadata: &model.PostMetadata{ + Priority: &model.PostPriority{ + Priority: &prio, + }, + }, + } + if created, _, err := b.mc.Client.CreatePost(context.TODO(), p); err != nil { + b.Log.Errorf("test: CreatePost with important priority failed: %s", err) + } else if created != nil { + b.Log.Infof("test: posted important priority message %s", created.Id) + } + } + time.Sleep(time.Second) + + // Step 17: Urgent priority message as ROOT POST (priority only allowed on root posts) + { + prio := "urgent" + p := &model.Post{ + ChannelId: channelID, + Message: "Priority test: urgent message", + RootId: "", + Props: testProps, + Metadata: &model.PostMetadata{ + Priority: &model.PostPriority{ + Priority: &prio, + }, + }, + } + if created, _, err := b.mc.Client.CreatePost(context.TODO(), p); err != nil { + b.Log.Errorf("test: CreatePost with urgent priority failed: %s", err) 
+ } else if created != nil { + b.Log.Infof("test: posted urgent priority message %s", created.Id) + } + } + time.Sleep(time.Second) + + // Step 18: Test finished + post("✅ Test finished", rootID) + + b.Log.Info("test: test sequence completed") +} diff --git a/bridge/msteams/emoji.go b/bridge/msteams/emoji.go new file mode 100644 index 0000000000..230c539221 --- /dev/null +++ b/bridge/msteams/emoji.go @@ -0,0 +1,34 @@ +package bmsteams + +import ( + "regexp" + + "github.com/kyokomi/emoji/v2" +) + +// emojiMapping defines a regex-based emoji name replacement rule. +// This allows mapping platform-specific emoji shortcodes to a canonical form +// that the emoji library can resolve to unicode. +type emojiMapping struct { + pattern *regexp.Regexp + replace string +} + +// emojiMappings contains all emoji name conversions. +// Add new entries here to handle additional platform differences. +var emojiMappings = []emojiMapping{ + // Mattermost flag emojis use hyphens (:flag-at:), standard format uses underscores (:flag_at:). + {regexp.MustCompile(`:flag-([a-z]{2}):`), ":flag_$1:"}, +} + +// mapEmojis applies all emoji name mappings and then converts any resulting +// shortcodes to unicode. This catches platform-specific shortcodes that +// the gateway's initial emoji.Sprint() pass could not resolve. +func mapEmojis(text string) string { + for _, m := range emojiMappings { + text = m.pattern.ReplaceAllString(text, m.replace) + } + // Re-run emoji sprint for any newly mapped shortcodes. 
+ emoji.ReplacePadding = "" + return emoji.Sprint(text) +} diff --git a/bridge/msteams/handler.go b/bridge/msteams/handler.go index 992e28a9e1..080a8132a6 100644 --- a/bridge/msteams/handler.go +++ b/bridge/msteams/handler.go @@ -2,8 +2,10 @@ package bmsteams import ( "encoding/json" + "errors" "fmt" "io" + "regexp" "strings" "github.com/matterbridge-org/matterbridge/bridge/config" @@ -12,13 +14,14 @@ import ( msgraph "github.com/yaegashi/msgraph.go/beta" ) +var hostedContentImgRE = regexp.MustCompile(`(?i)]*src="[^"]*hostedContents/([^/]+)/\$value"[^>]*(?:alt="([^"]*)")?[^>]*/?>`) + + func (b *Bmsteams) findFile(weburl string) (string, error) { itemRB, err := b.gc.GetDriveItemByURL(b.ctx, weburl) if err != nil { return "", err } - itemRB.Workbook().Worksheets() - b.gc.Workbooks() item, err := itemRB.Request().Get(b.ctx) if err != nil { return "", err @@ -29,15 +32,21 @@ func (b *Bmsteams) findFile(weburl string) (string, error) { return "", nil } -// handleDownloadFile handles file download +// handleDownloadFile handles file download with size validation. func (b *Bmsteams) handleDownloadFile(rmsg *config.Message, filename, weburl string) error { realURL, err := b.findFile(weburl) if err != nil { return err } - // Actually download the file. - data, err := helper.DownloadFile(realURL) + // Download the file with size limit enforcement. 
+ data, err := helper.DownloadFileWithSizeCheck(realURL, b.General.MediaDownloadSize) if err != nil { + var tooLarge *helper.ErrFileTooLarge + if errors.As(err, &tooLarge) { + b.Log.Warnf("file %s too large (%d bytes, limit %d bytes)", filename, tooLarge.Size, tooLarge.MaxSize) + b.notifyFileTooLarge(rmsg, filename, tooLarge.Size, tooLarge.MaxSize) + return err + } return fmt.Errorf("download %s failed %#v", weburl, err) } @@ -50,21 +59,77 @@ func (b *Bmsteams) handleDownloadFile(rmsg *config.Message, filename, weburl str return nil } +// notifyFileTooLarge posts a warning reply directly into the Teams channel +// (via Graph API) so the sender sees that their file was not transferred. +// This must NOT use b.Remote because the handler runs on the source side — +// b.Remote would route the warning to the destination bridge instead. +func (b *Bmsteams) notifyFileTooLarge(rmsg *config.Message, filename string, actualSize int64, maxSize int) { + teamID := b.GetString("TeamID") + channelID := decodeChannelID(rmsg.Channel) + parentID := rmsg.ID + + text := fmt.Sprintf("⚠️ File %s could not be transferred — file too large (%d MB, limit: %d MB).", + htmlEscape(filename), actualSize/(1024*1024), maxSize/(1024*1024)) + htmlType := msgraph.BodyTypeVHTML + content := &msgraph.ItemBody{Content: &text, ContentType: &htmlType} + chatMsg := &msgraph.ChatMessage{Body: content} + + var res *msgraph.ChatMessage + var err error + if parentID != "" { + ct := b.gc.Teams().ID(teamID).Channels().ID(channelID).Messages().ID(parentID).Replies().Request() + res, err = ct.Add(b.ctx, chatMsg) + } else { + ct := b.gc.Teams().ID(teamID).Channels().ID(channelID).Messages().Request() + res, err = ct.Add(b.ctx, chatMsg) + } + if err != nil { + b.Log.Errorf("notifyFileTooLarge: failed to post warning: %s", err) + return + } + if res != nil && res.ID != nil { + b.sentIDs[*res.ID] = struct{}{} + + // Persist the warning reply and the original message in the cache so + // replay won't re-process them 
after a restart (sentIDs is in-memory only). + if b.MarkMessageBridged != nil { + // Mark the original message as handled. + if parentID != "" { + b.MarkMessageBridged("msteams", parentID) + // Mark the reply (warning) itself using the composite key. + b.MarkMessageBridged("msteams", parentID+"/"+*res.ID) + } else { + b.MarkMessageBridged("msteams", *res.ID) + } + } + } +} + func (b *Bmsteams) handleAttachments(rmsg *config.Message, msg msgraph.ChatMessage) { for _, a := range msg.Attachments { //remove the attachment tags from the text rmsg.Text = attachRE.ReplaceAllString(rmsg.Text, "") + // Skip attachments without required fields (e.g. messageReference from + // "reply with quote" has no Name/ContentURL). + if a.ContentType == nil { + continue + } + //handle a code snippet (code block) if *a.ContentType == "application/vnd.microsoft.card.codesnippet" { b.handleCodeSnippet(rmsg, a) continue } + if a.Name == nil || a.ContentURL == nil { + continue + } + //handle the download err := b.handleDownloadFile(rmsg, *a.Name, *a.ContentURL) if err != nil { - b.Log.Errorf("download of %s failed: %s", *a.Name, err) + b.Log.Warnf("download of %s failed: %s", *a.Name, err) } } } @@ -86,6 +151,10 @@ func (b *Bmsteams) handleCodeSnippet(rmsg *config.Message, attach msgraph.ChatMe b.Log.Errorf("codesnippetUrl has unexpected size: %s", content.CodeSnippetURL) return } + if !strings.HasPrefix(content.CodeSnippetURL, "https://graph.microsoft.com/") { + b.Log.Errorf("codesnippetUrl has unexpected host: %s", content.CodeSnippetURL) + return + } resp, err := b.gc.Teams().Request().Client().Get(content.CodeSnippetURL) if err != nil { b.Log.Errorf("retrieving snippet content failed:%s", err) @@ -100,3 +169,70 @@ func (b *Bmsteams) handleCodeSnippet(rmsg *config.Message, attach msgraph.ChatMe } rmsg.Text = rmsg.Text + "\n```" + content.Language + "\n" + string(res) + "\n```\n" } + +// handleHostedContents downloads inline images embedded via hostedContents +// in the Teams message HTML 
body and adds them to rmsg.Extra["file"]. +// parentMsgID should be empty for top-level messages, or the parent message ID for replies. +func (b *Bmsteams) handleHostedContents(rmsg *config.Message, msg msgraph.ChatMessage, parentMsgID string) { + if msg.Body == nil || msg.Body.Content == nil { + return + } + + matches := hostedContentImgRE.FindAllStringSubmatch(*msg.Body.Content, -1) + if len(matches) == 0 { + return + } + + teamID := b.GetString("TeamID") + channelID := decodeChannelID(rmsg.Channel) + msgID := *msg.ID + + for _, m := range matches { + hcID := m[1] + filename := m[2] // from alt attribute + if filename == "" { + filename = fmt.Sprintf("image_%s.png", hcID) + } + + // Build the Graph API URL for the hostedContent binary. + var apiURL string + if parentMsgID == "" { + apiURL = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/hostedContents/%s/$value", + teamID, channelID, msgID, hcID) + } else { + apiURL = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies/%s/hostedContents/%s/$value", + teamID, channelID, parentMsgID, msgID, hcID) + } + + resp, err := b.gc.Teams().Request().Client().Get(apiURL) + if err != nil { + b.Log.Errorf("handleHostedContents: GET %s failed: %s", apiURL, err) + continue + } + + if resp.StatusCode >= 400 { + resp.Body.Close() + b.Log.Errorf("handleHostedContents: GET %s returned %d", apiURL, resp.StatusCode) + continue + } + + maxSize := b.General.MediaDownloadSize + data, err := io.ReadAll(io.LimitReader(resp.Body, int64(maxSize)+1)) + resp.Body.Close() + if err != nil { + b.Log.Errorf("handleHostedContents: reading body for %s failed: %s", filename, err) + continue + } + + if len(data) > maxSize { + b.Log.Warnf("handleHostedContents: %s too large (>%d bytes, limit %d bytes)", filename, maxSize, maxSize) + b.notifyFileTooLarge(rmsg, filename, int64(len(data)), maxSize) + continue + } + + b.Log.Debugf("handleHostedContents: downloaded %s (%d bytes)", filename, 
len(data)) + comment := rmsg.Text + rmsg.Text = "" + helper.HandleDownloadData(b.Log, rmsg, filename, comment, "", &data, b.General) + } +} diff --git a/bridge/msteams/msteams.go b/bridge/msteams/msteams.go index f79e718884..f685686b87 100644 --- a/bridge/msteams/msteams.go +++ b/bridge/msteams/msteams.go @@ -1,229 +1,1325 @@ package bmsteams import ( - "context" - "fmt" - "os" - "regexp" - "strings" - "time" - - "github.com/davecgh/go-spew/spew" - "github.com/matterbridge-org/matterbridge/bridge" - "github.com/matterbridge-org/matterbridge/bridge/config" - - "github.com/mattn/godown" - msgraph "github.com/yaegashi/msgraph.go/beta" - "github.com/yaegashi/msgraph.go/msauth" - - "golang.org/x/oauth2" + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/matterbridge-org/matterbridge/bridge" + "github.com/matterbridge-org/matterbridge/bridge/config" + "github.com/gomarkdown/markdown" + mdhtml "github.com/gomarkdown/markdown/html" + "github.com/gomarkdown/markdown/parser" + "github.com/mattn/godown" + msgraph "github.com/yaegashi/msgraph.go/beta" + "github.com/yaegashi/msgraph.go/msauth" + "golang.org/x/oauth2" ) var ( - defaultScopes = []string{"openid", "profile", "offline_access", "Group.Read.All", "Group.ReadWrite.All"} - attachRE = regexp.MustCompile(``) + defaultScopes = []string{"openid", "profile", "offline_access", "Group.Read.All", "Group.ReadWrite.All"} + attachRE = regexp.MustCompile(``) ) type Bmsteams struct { - gc *msgraph.GraphServiceRequestBuilder - ctx context.Context - botID string - *bridge.Config + gc *msgraph.GraphServiceRequestBuilder + ctx context.Context + botID string + ts oauth2.TokenSource // token source for fresh access tokens + sentIDs map[string]struct{} // IDs of messages/replies we posted (echo prevention) + updatedIDs map[string]time.Time // IDs of messages we PATCHed, with expiry time + 
*bridge.Config } func New(cfg *bridge.Config) bridge.Bridger { - return &Bmsteams{Config: cfg} + return &Bmsteams{ + Config: cfg, + sentIDs: make(map[string]struct{}), + updatedIDs: make(map[string]time.Time), + } } func (b *Bmsteams) Connect() error { - tokenCachePath := b.GetString("sessionFile") - if tokenCachePath == "" { - tokenCachePath = "msteams_session.json" - } - ctx := context.Background() - m := msauth.NewManager() - m.LoadFile(tokenCachePath) //nolint:errcheck - ts, err := m.DeviceAuthorizationGrant(ctx, b.GetString("TenantID"), b.GetString("ClientID"), defaultScopes, nil) - if err != nil { - return err - } - err = m.SaveFile(tokenCachePath) - if err != nil { - b.Log.Errorf("Couldn't save sessionfile in %s: %s", tokenCachePath, err) - } - // make file readable only for matterbridge user - err = os.Chmod(tokenCachePath, 0o600) - if err != nil { - b.Log.Errorf("Couldn't change permissions for %s: %s", tokenCachePath, err) - } - httpClient := oauth2.NewClient(ctx, ts) - graphClient := msgraph.NewClient(httpClient) - b.gc = graphClient - b.ctx = ctx - - err = b.setBotID() - if err != nil { - return err - } - b.Log.Info("Connection succeeded") - return nil + tokenCachePath := b.GetString("sessionFile") + if tokenCachePath == "" { + tokenCachePath = "msteams_session.json" + } + + ctx := context.Background() + m := msauth.NewManager() + m.LoadFile(tokenCachePath) //nolint:errcheck + + ts, err := m.DeviceAuthorizationGrant(ctx, b.GetString("TenantID"), b.GetString("ClientID"), defaultScopes, nil) + if err != nil { + return err + } + + err = m.SaveFile(tokenCachePath) + if err != nil { + b.Log.Errorf("Couldn't save sessionfile in %s: %s", tokenCachePath, err) + } + + err = os.Chmod(tokenCachePath, 0o600) + if err != nil { + b.Log.Errorf("Couldn't change permissions for %s: %s", tokenCachePath, err) + } + + httpClient := oauth2.NewClient(ctx, ts) + graphClient := msgraph.NewClient(httpClient) + b.gc = graphClient + b.ctx = ctx + + // Store the token source so we 
can get fresh tokens for direct HTTP calls. + b.ts = ts + + err = b.setBotID() + if err != nil { + return err + } + + b.Log.Info("Connection succeeded") + return nil } func (b *Bmsteams) Disconnect() error { - return nil + return nil } func (b *Bmsteams) JoinChannel(channel config.ChannelInfo) error { - go func(name string) { - for { - err := b.poll(name) - if err != nil { - b.Log.Errorf("polling failed for %s: %s. retrying in 5 seconds", name, err) - } - time.Sleep(time.Second * 5) - } - }(channel.Name) - return nil + go func(name string) { + for { + err := b.poll(name) + if err != nil { + b.Log.Errorf("polling failed for %s: %s. retrying in 5 seconds", name, err) + } + time.Sleep(5 * time.Second) + } + }(channel.Name) + return nil +} + +// errDeltaTokenExpired is returned by fetchDelta when the server responds with +// HTTP 410 Gone, indicating the delta token has expired. +var errDeltaTokenExpired = fmt.Errorf("delta token expired") + +// deltaResponse is the JSON structure returned by the Graph API delta endpoint. +type deltaResponse struct { + Value []json.RawMessage `json:"value"` + NextLink string `json:"@odata.nextLink"` + DeltaLink string `json:"@odata.deltaLink"` +} + +// deltaMessageMeta extracts the replyToId field that msgraph.ChatMessage lacks. +type deltaMessageMeta struct { + ReplyToID *string `json:"replyToId"` +} + +// fetchDelta calls the Graph API delta endpoint and paginates through all pages. +// Returns the list of messages, a map of messageID→parentID for replies, and the +// new deltaLink URL for the next incremental sync. 
+func (b *Bmsteams) fetchDelta(deltaURL string) ( + messages []msgraph.ChatMessage, + replyToIDs map[string]string, + nextDeltaLink string, + err error, +) { + replyToIDs = make(map[string]string) + + token, err := b.getAccessToken() + if err != nil { + return nil, nil, "", fmt.Errorf("getAccessToken: %w", err) + } + + currentURL := deltaURL + const maxPages = 10 + + for page := 0; page < maxPages && currentURL != ""; page++ { + req, reqErr := http.NewRequestWithContext(b.ctx, http.MethodGet, currentURL, nil) + if reqErr != nil { + return nil, nil, "", fmt.Errorf("NewRequest: %w", reqErr) + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, doErr := http.DefaultClient.Do(req) + if doErr != nil { + return nil, nil, "", fmt.Errorf("HTTP request: %w", doErr) + } + + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + + if resp.StatusCode == http.StatusGone { + return nil, nil, "", errDeltaTokenExpired + } + if resp.StatusCode != http.StatusOK { + return nil, nil, "", fmt.Errorf("API returned %d: %s", resp.StatusCode, string(body)) + } + + var result deltaResponse + if jsonErr := json.Unmarshal(body, &result); jsonErr != nil { + return nil, nil, "", fmt.Errorf("JSON parse: %w", jsonErr) + } + + for _, raw := range result.Value { + var msg msgraph.ChatMessage + if err := json.Unmarshal(raw, &msg); err != nil { + b.Log.Debugf("fetchDelta: skipping unparseable message: %s", err) + continue + } + if msg.ID == nil { + continue + } + + var meta deltaMessageMeta + _ = json.Unmarshal(raw, &meta) + + if meta.ReplyToID != nil && *meta.ReplyToID != "" { + replyToIDs[*msg.ID] = *meta.ReplyToID + } + + messages = append(messages, msg) + } + + if len(result.Value) > 0 { + b.Log.Debugf("fetchDelta page %d: %d items, nextLink=%v, deltaLink=%v", + page, len(result.Value), result.NextLink != "", result.DeltaLink != "") + } + + if result.DeltaLink != "" { + nextDeltaLink = result.DeltaLink + } + currentURL = result.NextLink + } + + // If we exhausted maxPages without 
getting a deltaLink, use the last nextLink + // as a workaround (will continue pagination on next call). + if nextDeltaLink == "" && currentURL != "" { + nextDeltaLink = currentURL + } + + return messages, replyToIDs, nextDeltaLink, nil +} + +// deltaMessageKey returns the cache key and parentID for a delta message. +func deltaMessageKey(msg msgraph.ChatMessage, replyToIDs map[string]string) (key, parentID string) { + if parent, isReply := replyToIDs[*msg.ID]; isReply { + return parent + "/" + *msg.ID, parent + } + return *msg.ID, "" +} + +// seedMsgmap populates the msgmap with timestamps from messages without relaying them. +// If rootMsgCreated is non-nil, root message IDs are also tracked for reply polling. +func (b *Bmsteams) seedMsgmap(messages []msgraph.ChatMessage, replyToIDs map[string]string, msgmap map[string]time.Time, mbSrcRE *regexp.Regexp, channelName string, rootMsgCreated map[string]time.Time) { + for _, msg := range messages { + if msg.ID == nil || msg.CreatedDateTime == nil { + continue + } + key, _ := deltaMessageKey(msg, replyToIDs) + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + // Track root messages for reply polling. + if rootMsgCreated != nil { + if _, isReply := replyToIDs[*msg.ID]; !isReply { + rootMsgCreated[*msg.ID] = *msg.CreatedDateTime + } + } + + // Extract source ID marker from message body for persistent cache population. + if msg.Body != nil && msg.Body.Content != nil { + if matches := mbSrcRE.FindStringSubmatch(*msg.Body.Content); len(matches) == 2 { + b.Remote <- config.Message{ + Event: config.EventHistoricalMapping, + Account: b.Account, + Channel: channelName, + ID: *msg.ID, + Extra: map[string][]interface{}{"source_msgid": {matches[1]}}, + } + } + } + } +} + +// processReplay relays missed messages (from a delta sync after restart) to the gateway. 
+func (b *Bmsteams) processReplay(messages []msgraph.ChatMessage, replyToIDs map[string]string, channelName string) int { + count := 0 + for _, msg := range messages { + if msg.ID == nil || msg.CreatedDateTime == nil { + continue + } + if msg.From == nil || msg.From.User == nil || msg.Body == nil { + continue + } + if msg.DeletedDateTime != nil { + continue + } + + key, parentID := deltaMessageKey(msg, replyToIDs) + + // Skip messages posted by the bridge itself. All messages sent + // by the bridge contain a hidden data-mb-src marker in the HTML + // body (added by formatMessageHTML and test sequences). Manually + // typed messages in Teams don't have this marker. + if msg.Body != nil && msg.Body.Content != nil && + strings.Contains(*msg.Body.Content, "data-mb-src=") { + continue + } + + // Skip messages we sent (in-memory, current run only). + if _, wasSentByUs := b.sentIDs[*msg.ID]; wasSentByUs { + continue + } + // Skip if already bridged. + if b.IsMessageBridged != nil && b.IsMessageBridged("msteams", key) { + continue + } + + text := b.convertToMD(*msg.Body.Content) + if msg.Subject != nil && *msg.Subject != "" { + text = "**" + *msg.Subject + "**\n" + text + } + + createTime := *msg.CreatedDateTime + replayPrefix := fmt.Sprintf("[Replay %s]\n", createTime.Format("2006-01-02 15:04 MST")) + + rmsg := config.Message{ + Event: config.EventReplayMessage, + Username: *msg.From.User.DisplayName, + Text: replayPrefix + text, + Channel: channelName, + Account: b.Account, + UserID: *msg.From.User.ID, + ID: key, + ParentID: parentID, + Avatar: b.GetString("IconURL"), + Extra: make(map[string][]interface{}), + } + + b.handleAttachments(&rmsg, msg) + b.handleHostedContents(&rmsg, msg, parentID) + + // Skip empty messages (e.g. failed file download with no text content). 
+ hasFiles := len(rmsg.Extra["file"]) > 0 + textAfterPrefix := strings.TrimSpace(strings.TrimPrefix(rmsg.Text, replayPrefix)) + if textAfterPrefix == "" && !hasFiles { + continue + } + + b.Remote <- rmsg + count++ + time.Sleep(500 * time.Millisecond) + } + return count +} + +// processDelta handles messages from a normal delta poll cycle (not replay). +func (b *Bmsteams) processDelta(messages []msgraph.ChatMessage, replyToIDs map[string]string, channelName string, msgmap map[string]time.Time, mbSrcRE *regexp.Regexp, startTime time.Time) { + for _, msg := range messages { + if msg.ID == nil || msg.CreatedDateTime == nil { + continue + } + + key, parentID := deltaMessageKey(msg, replyToIDs) + + // Check if this message is new or changed. + isNewOrChanged := true + if mtime, ok := msgmap[key]; ok { + if mtime == *msg.CreatedDateTime && msg.LastModifiedDateTime == nil { + isNewOrChanged = false + } else if msg.LastModifiedDateTime != nil && mtime == *msg.LastModifiedDateTime { + isNewOrChanged = false + } + } + + if !isNewOrChanged { + continue + } + + // Guard against first-start flooding: messages created before poll + // started that aren't in our seed (e.g. delta returning old messages + // on first start with $deltatoken=latest) are silently seeded. + if _, inMap := msgmap[key]; !inMap && msg.CreatedDateTime.Before(startTime) { + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + continue + } + + if b.GetBool("debug") { + b.Log.Debugf("Msg dump: %+v", msg) + } + + if msg.From == nil || msg.From.User == nil { + // System message or bot — update msgmap silently. + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + continue + } + + // Echo prevention: check if we PATCHed this message. 
+ if expiry, wasUpdatedByUs := b.updatedIDs[*msg.ID]; wasUpdatedByUs && time.Now().Before(expiry) { + b.Log.Debugf("skipping echo of our own edit for %s", key) + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + continue + } + + // Echo prevention: check if we posted this message. + if _, wasSentByUs := b.sentIDs[*msg.ID]; wasSentByUs { + b.Log.Debug("skipping own message") + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + delete(b.sentIDs, *msg.ID) + continue + } + + // Determine event type: delete, edit, or new. + isDelete := msg.DeletedDateTime != nil + isEdit := false + if !isDelete { + if _, alreadySeen := msgmap[key]; alreadySeen { + isEdit = true + } + } + + // Update msgmap. + if msg.LastModifiedDateTime != nil { + msgmap[key] = *msg.LastModifiedDateTime + } else { + msgmap[key] = *msg.CreatedDateTime + } + + // Extract source ID marker. + if msg.Body != nil && msg.Body.Content != nil { + if matches := mbSrcRE.FindStringSubmatch(*msg.Body.Content); len(matches) == 2 { + b.Remote <- config.Message{ + Event: config.EventHistoricalMapping, + Account: b.Account, + Channel: channelName, + ID: *msg.ID, + Extra: map[string][]interface{}{"source_msgid": {matches[1]}}, + } + } + } + + text := "" + if msg.Body != nil && msg.Body.Content != nil { + text = b.convertToMD(*msg.Body.Content) + } + + // Intercept test command (only for new root messages). + if !isDelete && !isEdit && parentID == "" && b.isTestCommand(text) { + b.Log.Info("Test command received, starting test sequence") + go b.runTestSequence(channelName) + continue + } + + // Prepend subject if present. 
+ if msg.Subject != nil && *msg.Subject != "" { + text = "**" + *msg.Subject + "**\n" + text + } + + event := "" + if isDelete { + event = config.EventMsgDelete + text = config.EventMsgDelete + } else if isEdit { + event = "msg_update" + } + + b.Log.Debugf("<= Sending message from %s on %s to gateway", *msg.From.User.DisplayName, b.Account) + + rmsg := config.Message{ + Username: *msg.From.User.DisplayName, + Text: text, + Channel: channelName, + Account: b.Account, + UserID: *msg.From.User.ID, + ID: key, + ParentID: parentID, + Event: event, + Avatar: b.GetString("IconURL"), + Extra: make(map[string][]interface{}), + } + if !isEdit && !isDelete { + b.handleAttachments(&rmsg, msg) + b.handleHostedContents(&rmsg, msg, parentID) + } + b.Log.Debugf("<= Message is %#v", rmsg) + b.Remote <- rmsg + } } func (b *Bmsteams) Send(msg config.Message) (string, error) { - b.Log.Debugf("=> Receiving %#v", msg) - if msg.ParentValid() { - return b.sendReply(msg) - } - - // Handle prefix hint for unthreaded messages. - if msg.ParentNotFound() { - msg.ParentID = "" - msg.Text = fmt.Sprintf("[thread]: %s", msg.Text) - } - - ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(msg.Channel).Messages().Request() - text := msg.Username + msg.Text - content := &msgraph.ItemBody{Content: &text} - rmsg := &msgraph.ChatMessage{Body: content} - res, err := ct.Add(b.ctx, rmsg) - if err != nil { - return "", err - } - return *res.ID, nil + b.Log.Debugf("=> Receiving %#v", msg) + + // Handle deletes from Mattermost → Teams. + if msg.Event == config.EventMsgDelete && msg.ID != "" { + b.Log.Debugf("delete: soft-deleting Teams message ID %s", msg.ID) + return b.deleteMessage(msg) + } + + // Handle edits from Mattermost → Teams. + // The gateway sets msg.ID="" on first send, but on edits it maps the Mattermost + // post-ID to the Teams message-ID (returned by our Send()) and passes it here. + // So msg.ID != "" (and not a delete) means this is an edit. 
+ if msg.ID != "" { + b.Log.Debugf("edit: updating Teams message ID %s", msg.ID) + return b.updateMessage(msg) + } + + // Prepend priority indicator emoji for Mattermost important/urgent messages. + if msg.Extra != nil { + if priorities, ok := msg.Extra["priority"]; ok && len(priorities) > 0 { + if prio, ok := priorities[0].(string); ok { + switch prio { + case "important": + msg.Text = "❗ " + msg.Text + case "urgent": + msg.Text = "🚨 " + msg.Text + } + } + } + } + + // Handle file/image attachments. + if msg.Extra != nil && len(msg.Extra["file"]) > 0 { + // Build caption from msg.Text for the first message. + captionHTML := "" + if msg.Text != "" { + captionText := mapEmojis(msg.Text) + captionHTML = mdToTeamsHTML(captionText) + } + + // Classify files: supported images (hostedContents) vs others. + var supportedImages []config.FileInfo + var otherFiles []config.FileInfo + for _, files := range msg.Extra["file"] { + fi, ok := files.(config.FileInfo) + if !ok { + continue + } + if isImageFile(fi.Name) && fi.Data != nil && isSupportedHostedContentType(fi.Name) { + supportedImages = append(supportedImages, fi) + } else { + otherFiles = append(otherFiles, fi) + } + } + + var firstID string + + // Send all supported images in a single Teams message. + if len(supportedImages) > 0 { + id, err := b.sendImageHostedContent(msg, supportedImages, captionHTML) + if err != nil { + b.Log.Warnf("sendImageHostedContent failed: %s", err) + } else { + firstID = id + captionHTML = "" // caption was included, don't duplicate + } + } + + // Handle remaining files individually (URL-based or notification). + for _, fi := range otherFiles { + id, err := b.sendFileAsMessage(msg, fi, captionHTML) + if err != nil { + b.Log.Warnf("sending file %s: %s", fi.Name, err) + } + if firstID == "" && id != "" { + firstID = id + } + captionHTML = "" // only include caption once + } + + // Return the first message ID for gateway thread-reply mapping. 
+ return firstID, nil + } + + if msg.ParentValid() { + return b.sendReply(msg) + } + + if msg.ParentNotFound() { + msg.ParentID = "" + // Don't add a [thread] prefix — the message is posted to the correct + // context already and the prefix just clutters the content. + } + + ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(decodeChannelID(msg.Channel)).Messages().Request() + + // Apply emoji mapping for any platform-specific shortcodes. + msg.Text = mapEmojis(msg.Text) + + // Convert markdown to Teams HTML and prepend formatted username. + htmlText := b.formatMessageHTML(msg, mdToTeamsHTML(msg.Text)) + htmlType := msgraph.BodyTypeVHTML + content := &msgraph.ItemBody{Content: &htmlText, ContentType: &htmlType} + rmsg := &msgraph.ChatMessage{Body: content} + + res, err := ct.Add(b.ctx, rmsg) + if err != nil { + return "", err + } + b.sentIDs[*res.ID] = struct{}{} + b.updatedIDs[*res.ID] = time.Now().Add(30 * time.Second) + return *res.ID, nil +} + +// mdToTeamsHTML converts markdown text to Teams-compatible HTML. +// Handles bold, italic, strikethrough, headings, links, blockquotes, +// code fences, and line breaks using the gomarkdown library. +// Code fences are post-processed to use Teams-native tags. +func mdToTeamsHTML(text string) string { + extensions := parser.HardLineBreak | parser.NoIntraEmphasis | parser.FencedCode | parser.Strikethrough | parser.Autolink + p := parser.NewWithExtensions(extensions) + renderer := mdhtml.NewRenderer(mdhtml.RendererOptions{Flags: 0}) + result := string(markdown.ToHTML([]byte(text), p, renderer)) + + // Post-process: convert gomarkdown's
 to Teams .
+        preCodeLangRE := regexp.MustCompile(`
`)
+        result = preCodeLangRE.ReplaceAllString(result, ``)
+        result = strings.ReplaceAll(result, "
", "
") + result = strings.ReplaceAll(result, "
", "")
+
+        // Post-process: convert gomarkdown's  to  for Teams strikethrough support.
+        // Teams renders  but not .
+        result = strings.ReplaceAll(result, "", "")
+        result = strings.ReplaceAll(result, "", "")
+
+        return strings.TrimSpace(result)
+}
+
// htmlEscape escapes the HTML-significant characters &, <, > and " in s and
// returns the result, which can then be placed inside element content or a
// double-quoted attribute value without being interpreted as markup.
//
// The ampersand is replaced first on purpose: escaping it after the other
// characters would re-escape the ampersands introduced by their entities
// (turning "&lt;" into "&amp;lt;").
func htmlEscape(s string) string {
	s = strings.ReplaceAll(s, "&", "&amp;")
	s = strings.ReplaceAll(s, "<", "&lt;")
	s = strings.ReplaceAll(s, ">", "&gt;")
	s = strings.ReplaceAll(s, "\"", "&quot;")
	return s
}
+
// extractBridgeName strips the leading "protocol." prefix from an account
// identifier, e.g. "mattermost.mybot" yields "mybot". Only the first dot acts
// as the separator, so later dots remain part of the returned name. An account
// that contains no dot is returned unchanged.
func extractBridgeName(account string) string {
	dot := strings.Index(account, ".")
	if dot < 0 {
		return account
	}
	return account[dot+1:]
}
+
+// formatMessageHTML builds an HTML username prefix from the RemoteNickFormat template.
+// It replaces {NICK} with nick, \n with 
, and expands other placeholders. +func (b *Bmsteams) formatMessageHTML(msg config.Message, bodyHTML string) string { + template := b.GetString("RemoteNickFormat") + if template == "" { + return bodyHTML + } + + // Extract original nick from Extra (set by gateway). + originalNick := "" + if nicks, ok := msg.Extra["nick"]; ok && len(nicks) > 0 { + if n, ok := nicks[0].(string); ok { + originalNick = n + } + } + if originalNick == "" { + originalNick = strings.TrimSpace(msg.Username) + } + + // Extract full display name from Extra (set by Mattermost bridge). + displayName := "" + if dns, ok := msg.Extra["displayname"]; ok && len(dns) > 0 { + if dn, ok := dns[0].(string); ok { + displayName = dn + } + } + if displayName == "" { + displayName = originalNick + } + + // HTML-aware expansion. + result := template + result = strings.ReplaceAll(result, "{DISPLAYNAME}", ""+htmlEscape(displayName)+"") + result = strings.ReplaceAll(result, "{NICK}", ""+htmlEscape(originalNick)+"") + result = strings.ReplaceAll(result, "{NOPINGNICK}", ""+htmlEscape(originalNick)+"") + result = strings.ReplaceAll(result, "{PROTOCOL}", htmlEscape(msg.Protocol)) + result = strings.ReplaceAll(result, "{BRIDGE}", htmlEscape(extractBridgeName(msg.Account))) + result = strings.ReplaceAll(result, "{GATEWAY}", htmlEscape(msg.Gateway)) + result = strings.ReplaceAll(result, "{USERID}", htmlEscape(msg.UserID)) + result = strings.ReplaceAll(result, "{CHANNEL}", htmlEscape(msg.Channel)) + result = strings.ReplaceAll(result, "\n", "
") + + html := result + bodyHTML + + // Embed source message ID as hidden span for historical cache population. + if srcIDs, ok := msg.Extra["source_msgid"]; ok && len(srcIDs) > 0 { + if srcID, ok := srcIDs[0].(string); ok { + html += `` + } + } + + return html +} + +// getAccessToken returns a fresh access token from the token source. +func (b *Bmsteams) getAccessToken() (string, error) { + t, err := b.ts.Token() + if err != nil { + return "", fmt.Errorf("failed to get access token: %w", err) + } + return t.AccessToken, nil +} + +// updateMessage patches an existing Teams message with new content. +// The Teams Graph API only allows the original sender to update via delegated perms, +// so this may fail if matterbridge is not authenticated as the message author. +func (b *Bmsteams) updateMessage(msg config.Message) (string, error) { + // Apply emoji mapping and convert markdown to Teams HTML. + msg.Text = mapEmojis(msg.Text) + htmlText := b.formatMessageHTML(msg, mdToTeamsHTML(msg.Text)) + + type patchBody struct { + Body struct { + ContentType string `json:"contentType"` + Content string `json:"content"` + } `json:"body"` + } + + var patch patchBody + patch.Body.ContentType = "html" + patch.Body.Content = htmlText + + jsonData, err := json.Marshal(patch) + if err != nil { + return "", err + } + + teamID := b.GetString("TeamID") + channelID := decodeChannelID(msg.Channel) + messageID := msg.ID + + url := fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s", + teamID, channelID, messageID) + + token, err := b.getAccessToken() + if err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(b.ctx, http.MethodPatch, url, bytes.NewReader(jsonData)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != 
http.StatusNoContent && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("update message failed: %d %s", resp.StatusCode, string(body)) + } + + // Suppress echo: ignore this message in the poll loop for the next 30 seconds. + // Teams may update LastModifiedDateTime multiple times after a PATCH. + b.updatedIDs[msg.ID] = time.Now().Add(30 * time.Second) + return msg.ID, nil +} + +// deleteMessage soft-deletes a Teams channel message or reply via the Graph API. +// For replies, msg.ParentID must be set to the top-level message ID. +func (b *Bmsteams) deleteMessage(msg config.Message) (string, error) { + teamID := b.GetString("TeamID") + channelID := decodeChannelID(msg.Channel) + messageID := msg.ID + + var url string + if msg.ParentID != "" { + // This is a reply — use the reply softDelete endpoint. + url = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies/%s/softDelete", + teamID, channelID, msg.ParentID, messageID) + } else { + url = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/softDelete", + teamID, channelID, messageID) + } + + req, err := http.NewRequestWithContext(b.ctx, http.MethodPost, url, nil) + if err != nil { + return "", err + } + + token, err := b.getAccessToken() + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("delete message failed: %d %s", resp.StatusCode, string(body)) + } + + // Suppress echo for the deletion event. + b.updatedIDs[messageID] = time.Now().Add(30 * time.Second) + return messageID, nil +} + +// uploadToMediaServer uploads file bytes to the configured MediaServerUpload endpoint. 
+func (b *Bmsteams) uploadToMediaServer(fi config.FileInfo) (string, error) { + serverURL := b.GetString("MediaServerUpload") + if serverURL == "" { + return "", fmt.Errorf("no MediaServerUpload configured") + } + + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + + part, err := writer.CreateFormFile("file", fi.Name) + if err != nil { + return "", err + } + if _, err = io.Copy(part, bytes.NewReader(*fi.Data)); err != nil { + return "", err + } + writer.Close() + + resp, err := http.Post(serverURL+"/"+url.PathEscape(fi.Name), writer.FormDataContentType(), &buf) //nolint:gosec + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("media server returned %d", resp.StatusCode) + } + + urlBytes, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + return strings.TrimSpace(string(urlBytes)), nil +} + +// mimeTypeForFile returns a MIME type for image files, or empty string otherwise. +func mimeTypeForFile(name string) string { + switch strings.ToLower(filepath.Ext(name)) { + case ".jpg", ".jpeg": + return "image/jpeg" + case ".png": + return "image/png" + case ".gif": + return "image/gif" + case ".webp": + return "image/webp" + case ".svg": + return "image/svg+xml" + case ".bmp": + return "image/bmp" + default: + return "" + } +} + +func isImageFile(name string) bool { + return mimeTypeForFile(name) != "" +} + +// isSupportedHostedContentType returns true if the file type can be embedded +// via the Graph API hostedContents endpoint. Only JPG and PNG are supported. +func isSupportedHostedContentType(name string) bool { + mime := mimeTypeForFile(name) + return mime == "image/jpeg" || mime == "image/png" +} + +// sendImageHostedContent sends one or more images as a single Teams message using +// the hostedContents API. Image data is base64-encoded and embedded directly in the +// message, so no external server or public URL is required. Only works for JPG/PNG. 
+// The captionHTML parameter allows including additional text in the same message. +func (b *Bmsteams) sendImageHostedContent(msg config.Message, files []config.FileInfo, captionHTML string) (string, error) { + if len(files) == 0 { + return "", fmt.Errorf("sendImageHostedContent requires at least one file") + } + + type hostedContent struct { + TempID string `json:"@microsoft.graph.temporaryId"` + ContentBytes string `json:"contentBytes"` + ContentType string `json:"contentType"` + } + type msgBody struct { + ContentType string `json:"contentType"` + Content string `json:"content"` + } + type graphMessage struct { + Body msgBody `json:"body"` + HostedContents []hostedContent `json:"hostedContents"` + } + + usernameHTML := b.formatMessageHTML(msg, "") + bodyHTML := usernameHTML + if captionHTML != "" { + bodyHTML += captionHTML + "
" + } + + var hosted []hostedContent + for i, fi := range files { + if fi.Data == nil { + continue + } + id := fmt.Sprintf("%d", i+1) + bodyHTML += fmt.Sprintf( + `%s`, + id, htmlEscape(fi.Name), + ) + if i < len(files)-1 { + bodyHTML += "
" + } + hosted = append(hosted, hostedContent{ + TempID: id, + ContentBytes: base64.StdEncoding.EncodeToString(*fi.Data), + ContentType: mimeTypeForFile(fi.Name), + }) + } + + if len(hosted) == 0 { + return "", fmt.Errorf("no valid image data to send") + } + + payload := graphMessage{ + Body: msgBody{ + ContentType: "html", + Content: bodyHTML, + }, + HostedContents: hosted, + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return "", err + } + + teamID := b.GetString("TeamID") + channelID := decodeChannelID(msg.Channel) + + var apiURL string + if msg.ParentValid() { + apiURL = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies", + teamID, channelID, msg.ParentID) + } else { + apiURL = fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages", + teamID, channelID) + } + + token, err := b.getAccessToken() + if err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(b.ctx, http.MethodPost, apiURL, bytes.NewReader(jsonData)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("sendImageHostedContent failed: %d %s", resp.StatusCode, string(respBody)) + } + + // Parse the response to extract the message ID for echo prevention. + var result struct { + ID string `json:"id"` + } + if err := json.Unmarshal(respBody, &result); err == nil && result.ID != "" { + b.sentIDs[result.ID] = struct{}{} + b.updatedIDs[result.ID] = time.Now().Add(30 * time.Second) + return result.ID, nil + } + return "", nil +} + +// sendFileAsMessage sends a file as a Teams message using a URL (either from the +// source bridge or uploaded to a MediaServer). 
For hostedContents-supported images, +// use sendImageHostedContent instead (called from Send()). +// The captionHTML parameter allows including additional text in the same message. +func (b *Bmsteams) sendFileAsMessage(msg config.Message, fi config.FileInfo, captionHTML string) (string, error) { + isImage := isImageFile(fi.Name) + + contentType := msgraph.BodyTypeVHTML + var bodyText string + + fileURL := fi.URL + if fileURL == "" && fi.Data != nil { + uploadedURL, err := b.uploadToMediaServer(fi) + if err != nil { + b.Log.Debugf("media server upload failed for %s: %s", fi.Name, err) + } else { + fileURL = uploadedURL + } + } + + usernameHTML := b.formatMessageHTML(msg, "") + captionPart := "" + if captionHTML != "" { + captionPart = captionHTML + "
" + } + + switch { + case fileURL != "" && isImage: + bodyText = fmt.Sprintf( + `%s%s%s`, + usernameHTML, captionPart, htmlEscape(fileURL), htmlEscape(fi.Name), + ) + case fileURL != "": + bodyText = fmt.Sprintf( + `%s%s📎 %s`, + usernameHTML, captionPart, htmlEscape(fileURL), htmlEscape(fi.Name), + ) + default: + // File can't be sent: no hostedContents support and no MediaServer URL. + // Send a notification back to the source side via b.Remote so users + // know the file didn't arrive (instead of posting to Teams). + b.Log.Warnf("cannot send file %s (%s) to Teams: type not supported by hostedContents and no MediaServerUpload configured", + fi.Name, mimeTypeForFile(fi.Name)) + // Return a fake ID so the gateway caches it as a BrMsgID for this + // message. The notification references it as ParentID — the gateway + // then resolves it back to the original source post ID via the + // downstream search in FindCanonicalMsgID + the protocol-strip fallback + // in getDestMsgID. + fakeID := fmt.Sprintf("unsupported-%d", time.Now().UnixNano()) + go func() { + b.Remote <- config.Message{ + Text: fmt.Sprintf("⚠️ File **%s** (%s) could not be transferred to Teams"+ + " — format not supported, no MediaServer configured.", + fi.Name, mimeTypeForFile(fi.Name)), + Channel: msg.Channel, + Account: b.Account, + Username: "matterbridge", + ParentID: fakeID, + Extra: make(map[string][]interface{}), + } + }() + return fakeID, nil + } + + content := &msgraph.ItemBody{ + Content: &bodyText, + ContentType: &contentType, + } + chatMsg := &msgraph.ChatMessage{Body: content} + + var res *msgraph.ChatMessage + var err error + if msg.ParentValid() { + ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(decodeChannelID(msg.Channel)).Messages().ID(msg.ParentID).Replies().Request() + res, err = ct.Add(b.ctx, chatMsg) + } else { + ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(decodeChannelID(msg.Channel)).Messages().Request() + res, err = ct.Add(b.ctx, chatMsg) + } + if err 
!= nil { + return "", err + } + if res != nil && res.ID != nil { + b.sentIDs[*res.ID] = struct{}{} + b.updatedIDs[*res.ID] = time.Now().Add(30 * time.Second) + return *res.ID, nil + } + return "", nil } func (b *Bmsteams) sendReply(msg config.Message) (string, error) { - ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(msg.Channel).Messages().ID(msg.ParentID).Replies().Request() - // Handle prefix hint for unthreaded messages. + channelID := decodeChannelID(msg.Channel) + b.Log.Debugf("sendReply: ParentID=%s Channel=%s", msg.ParentID, channelID) + ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(channelID).Messages().ID(msg.ParentID).Replies().Request() + + // Apply emoji mapping for any platform-specific shortcodes. + msg.Text = mapEmojis(msg.Text) + + // Convert markdown to Teams HTML and prepend formatted username. + htmlText := b.formatMessageHTML(msg, mdToTeamsHTML(msg.Text)) + htmlType := msgraph.BodyTypeVHTML + content := &msgraph.ItemBody{Content: &htmlText, ContentType: &htmlType} + rmsg := &msgraph.ChatMessage{Body: content} - text := msg.Username + msg.Text - content := &msgraph.ItemBody{Content: &text} - rmsg := &msgraph.ChatMessage{Body: content} - res, err := ct.Add(b.ctx, rmsg) - if err != nil { - return "", err - } - return *res.ID, nil + res, err := ct.Add(b.ctx, rmsg) + if err != nil { + b.Log.Errorf("sendReply failed: ParentID=%s err=%s", msg.ParentID, err) + return "", err + } + b.sentIDs[*res.ID] = struct{}{} + b.updatedIDs[*res.ID] = time.Now().Add(30 * time.Second) + return *res.ID, nil } -func (b *Bmsteams) getMessages(channel string) ([]msgraph.ChatMessage, error) { - ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(channel).Messages().Request() - rct, err := ct.Get(b.ctx) - if err != nil { - return nil, err - } - b.Log.Debugf("got %#v messages", len(rct)) - return rct, nil +// decodeChannelID URL-decodes a channel ID if needed. +// The gateway stores channel IDs URL-encoded (e.g. 
19%3A...%40thread.tacv2) +// but the Teams Graph API requires the decoded form (19:...@thread.tacv2). +func decodeChannelID(id string) string { + decoded, err := url.PathUnescape(id) + if err != nil { + return id + } + return decoded } +func (b *Bmsteams) getReplies(channel, messageID string) ([]msgraph.ChatMessage, error) { + ct := b.gc.Teams().ID(b.GetString("TeamID")).Channels().ID(decodeChannelID(channel)).Messages().ID(messageID).Replies().Request() + return ct.Get(b.ctx) +} + +// poll uses Graph API delta queries to detect new/changed/deleted root messages +// and getReplies() to poll thread replies for recent threads. +// On first start (no stored delta token), it initializes with $deltatoken=latest. +// On restart (stored delta token), it replays missed messages before entering the poll loop. +// //nolint:gocognit func (b *Bmsteams) poll(channelName string) error { - msgmap := make(map[string]time.Time) - b.Log.Debug("getting initial messages") - res, err := b.getMessages(channelName) - if err != nil { - return err - } - for _, msg := range res { - msgmap[*msg.ID] = *msg.CreatedDateTime - if msg.LastModifiedDateTime != nil { - msgmap[*msg.ID] = *msg.LastModifiedDateTime - } - } - time.Sleep(time.Second * 5) - b.Log.Debug("polling for messages") - for { - res, err := b.getMessages(channelName) - if err != nil { - return err - } - for i := len(res) - 1; i >= 0; i-- { - msg := res[i] - if mtime, ok := msgmap[*msg.ID]; ok { - if mtime == *msg.CreatedDateTime && msg.LastModifiedDateTime == nil { - continue - } - if msg.LastModifiedDateTime != nil && mtime == *msg.LastModifiedDateTime { - continue - } - } - - if b.GetBool("debug") { - b.Log.Debug("Msg dump: ", spew.Sdump(msg)) - } - - // skip non-user message for now. 
- if msg.From == nil || msg.From.User == nil { - continue - } - - if *msg.From.User.ID == b.botID { - b.Log.Debug("skipping own message") - msgmap[*msg.ID] = *msg.CreatedDateTime - continue - } - - msgmap[*msg.ID] = *msg.CreatedDateTime - if msg.LastModifiedDateTime != nil { - msgmap[*msg.ID] = *msg.LastModifiedDateTime - } - b.Log.Debugf("<= Sending message from %s on %s to gateway", *msg.From.User.DisplayName, b.Account) - text := b.convertToMD(*msg.Body.Content) - rmsg := config.Message{ - Username: *msg.From.User.DisplayName, - Text: text, - Channel: channelName, - Account: b.Account, - Avatar: "", - UserID: *msg.From.User.ID, - ID: *msg.ID, - Extra: make(map[string][]interface{}), - } - - b.handleAttachments(&rmsg, msg) - b.Log.Debugf("<= Message is %#v", rmsg) - b.Remote <- rmsg - } - time.Sleep(time.Second * 5) - } + channelKey := channelName + b.Account + teamID := b.GetString("TeamID") + channelID := decodeChannelID(channelName) + mbSrcRE := regexp.MustCompile(`data-mb-src="([^"]+)"`) + startTime := time.Now() + + // 1. Determine initial delta URL: stored token (replay) or $deltatoken=latest (first start). + isReplay := false + var deltaURL string + if b.GetDeltaToken != nil { + if token, ok := b.GetDeltaToken(channelKey); ok && token != "" { + deltaURL = token + isReplay = true + } + } + if deltaURL == "" { + deltaURL = fmt.Sprintf( + "https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/delta?$deltatoken=latest", + teamID, channelID) + b.Log.Debugf("poll: first start for %s, using $deltatoken=latest", channelName) + } + + // 2. Initial fetch. 
+ messages, replyToIDs, deltaLink, err := b.fetchDelta(deltaURL) + if err == errDeltaTokenExpired { + b.Log.Warn("poll: delta token expired, re-initializing") + deltaURL = fmt.Sprintf( + "https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/delta?$deltatoken=latest", + teamID, channelID) + messages, replyToIDs, deltaLink, err = b.fetchDelta(deltaURL) + isReplay = false + } + if err != nil { + return fmt.Errorf("initial fetchDelta: %w", err) + } + + msgmap := make(map[string]time.Time) + rootMsgCreated := make(map[string]time.Time) // rootID → createdDateTime (for reply polling) + + if isReplay { + count := b.processReplay(messages, replyToIDs, channelName) + if count > 0 { + b.Log.Infof("poll: replayed %d missed messages from %s", count, channelName) + } + } + // Seed msgmap with all messages from the initial fetch (including replayed ones). + b.seedMsgmap(messages, replyToIDs, msgmap, mbSrcRE, channelName, rootMsgCreated) + + // Seed replies for known root messages to avoid false-positive relaying + // on the first poll cycle. + for rootID := range rootMsgCreated { + replies, err := b.getReplies(channelName, rootID) + if err != nil { + b.Log.Errorf("seeding replies for %s: %s", rootID, err) + continue + } + rids := make(map[string]string) + for _, r := range replies { + if r.ID != nil { + rids[*r.ID] = rootID + } + } + if isReplay { + count := b.processReplay(replies, rids, channelName) + if count > 0 { + b.Log.Infof("poll: replayed %d missed replies for thread %s", count, rootID) + } + } + b.seedMsgmap(replies, rids, msgmap, mbSrcRE, channelName, nil) + } + + if b.SetDeltaToken != nil && deltaLink != "" { + b.SetDeltaToken(channelKey, deltaLink) + } + + b.Log.Debugf("poll: entering delta poll loop for %s (%d root messages tracked)", channelName, len(rootMsgCreated)) + + // 3. Poll loop. 
+ for { + time.Sleep(2 * time.Second) + + messages, replyToIDs, newDeltaLink, err := b.fetchDelta(deltaLink) + if err == errDeltaTokenExpired { + return fmt.Errorf("delta token expired mid-poll: %w", err) + } + if err != nil { + return fmt.Errorf("fetchDelta: %w", err) + } + + b.processDelta(messages, replyToIDs, channelName, msgmap, mbSrcRE, startTime) + + // Delta-guided reply polling: when a root message appears in delta + // (e.g. lastModifiedDateTime updated because a reply was posted), + // fetch and process its replies. + for _, msg := range messages { + if msg.ID == nil { + continue + } + // Skip if this is somehow a reply (shouldn't happen with delta, but be safe). + if _, isReply := replyToIDs[*msg.ID]; isReply { + continue + } + replies, err := b.getReplies(channelName, *msg.ID) + if err != nil { + b.Log.Errorf("getReplies for %s: %s", *msg.ID, err) + continue + } + rids := make(map[string]string) + for _, r := range replies { + if r.ID != nil { + rids[*r.ID] = *msg.ID + } + } + b.processDelta(replies, rids, channelName, msgmap, mbSrcRE, startTime) + } + + if newDeltaLink != "" { + deltaLink = newDeltaLink + if b.SetDeltaToken != nil { + b.SetDeltaToken(channelKey, deltaLink) + } + } + } } func (b *Bmsteams) setBotID() error { - req := b.gc.Me().Request() - r, err := req.Get(b.ctx) - if err != nil { - return err - } - b.botID = *r.ID - return nil + req := b.gc.Me().Request() + r, err := req.Get(b.ctx) + if err != nil { + return err + } + b.botID = *r.ID + return nil } func (b *Bmsteams) convertToMD(text string) string { - if !strings.Contains(text, "
") { - return text - } - var sb strings.Builder - err := godown.Convert(&sb, strings.NewReader(text), nil) - if err != nil { - b.Log.Errorf("Couldn't convert message to markdown %s", text) - return text - } - return sb.String() + // Pre-process Teams-specific tags that godown doesn't understand. + + // Convert to just the alt text (the actual emoji character). + emojiRE := regexp.MustCompile(`]*\salt="([^"]*)"[^>]*>.*?`) + text = emojiRE.ReplaceAllString(text, "$1") + + // Convert ... to markdown fenced code blocks. + codeblockRE := regexp.MustCompile(`(?is)]*class="([^"]*)"[^>]*>]*>(.*?)`) + if codeblockRE.MatchString(text) { + parts := codeblockRE.FindStringSubmatch(text) + lang := strings.ToLower(parts[1]) + code := parts[2] + + // Replace
with newlines first (before stripping other tags). + code = regexp.MustCompile(`(?i)`).ReplaceAllString(code, "\n") + + // Replace block-level closing/opening tags with newlines. + code = regexp.MustCompile(`(?i)]*)?>`).ReplaceAllString(code, "\n") + + // Strip remaining HTML tags (syntax highlighting spans etc.) + code = regexp.MustCompile(`<[^>]+>`).ReplaceAllString(code, "") + + // Decode HTML entities. + code = strings.ReplaceAll(code, "<", "<") + code = strings.ReplaceAll(code, ">", ">") + code = strings.ReplaceAll(code, "&", "&") + code = strings.ReplaceAll(code, " ", " ") + code = strings.ReplaceAll(code, " ", " ") + + // Replace non-breaking space (U+00A0) used by Teams as line separator. + code = strings.ReplaceAll(code, "\u00a0", "\n") + + // Collapse excessive newlines. + code = regexp.MustCompile(`\n{3,}`).ReplaceAllString(code, "\n\n") + code = strings.TrimSpace(code) + + replacement := "\n```" + lang + "\n" + code + "\n```\n" + text = codeblockRE.ReplaceAllLiteralString(text, replacement) + } + + // Strip inline tags that reference hostedContents URLs — these are + // Teams-internal image URLs that require authentication and would produce + // broken markdown like ![image](https://graph.microsoft.com/.../hostedContents/.../$value). + // The actual image data is handled separately via handleAttachments(). + hostedImgRE := regexp.MustCompile(`(?i)]*src="[^"]*hostedContents[^"]*"[^>]*/?>`) + text = hostedImgRE.ReplaceAllString(text, "") + + // Convert strikethrough HTML tags to markdown before godown (godown may not handle these). + strikeRE := regexp.MustCompile(`(?is)<(s|del|strike)>(.*?)`) + text = strikeRE.ReplaceAllString(text, "~~$2~~") + + // Strip empty paragraphs that Teams inserts around code blocks. + emptyParaRE := regexp.MustCompile(`(?i)]*>\s*( |\s)*

`) + text = emptyParaRE.ReplaceAllString(text, "") + + // If no HTML tags remain, return as-is (preserves codeblock newlines). + if !strings.ContainsAny(text, "<>") { + return strings.TrimSpace(text) + } + + // Convert remaining HTML to Markdown using godown. + var sb strings.Builder + err := godown.Convert(&sb, strings.NewReader(text), nil) + if err != nil { + b.Log.Errorf("Couldn't convert message to markdown: %s", err) + return text + } + + return strings.TrimSpace(sb.String()) } diff --git a/bridge/msteams/test.go b/bridge/msteams/test.go new file mode 100644 index 0000000000..690f9b9bcd --- /dev/null +++ b/bridge/msteams/test.go @@ -0,0 +1,319 @@ +package bmsteams + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/matterbridge-org/matterbridge/testdata" + msgraph "github.com/yaegashi/msgraph.go/beta" +) + +// isTestCommand returns true if the message text is exactly "@matterbridge test". +func (b *Bmsteams) isTestCommand(text string) bool { + return strings.TrimSpace(strings.ToLower(text)) == "@matterbridge test" +} + +// runTestSequence posts a series of test messages to the given channel. +// Messages are posted via the Graph API but NOT added to sentIDs/updatedIDs, +// so the poll loop picks them up and relays them to the other bridge side. +func (b *Bmsteams) runTestSequence(channelName string) { + teamID := b.GetString("TeamID") + channelID := decodeChannelID(channelName) + + b.Log.Infof("test: starting test sequence in channel %s", channelName) + + htmlType := msgraph.BodyTypeVHTML + + // Helper to post a top-level message and return its ID. + postRoot := func(text string, contentType *msgraph.BodyType) string { + // Add bridge marker so processReplay() skips test messages on restart. 
+ text += `` + if contentType == nil { + contentType = &htmlType + } + ct := b.gc.Teams().ID(teamID).Channels().ID(channelID).Messages().Request() + content := &msgraph.ItemBody{Content: &text} + if contentType != nil { + content.ContentType = contentType + } + res, err := ct.Add(b.ctx, &msgraph.ChatMessage{Body: content}) + if err != nil { + b.Log.Errorf("test: post root failed: %s", err) + return "" + } + // Do NOT add to sentIDs — let poll() pick it up for relay. + return *res.ID + } + + // Helper to post a reply and return its ID. + postReply := func(rootID, text string, contentType *msgraph.BodyType) string { + // Add bridge marker so processReplay() skips test messages on restart. + text += `` + if contentType == nil { + contentType = &htmlType + } + ct := b.gc.Teams().ID(teamID).Channels().ID(channelID).Messages().ID(rootID).Replies().Request() + content := &msgraph.ItemBody{Content: &text} + if contentType != nil { + content.ContentType = contentType + } + res, err := ct.Add(b.ctx, &msgraph.ChatMessage{Body: content}) + if err != nil { + b.Log.Errorf("test: post reply failed: %s", err) + return "" + } + // Do NOT add to sentIDs — let poll() pick it up for relay. + return *res.ID + } + + // Helper to edit a reply without adding to updatedIDs. 
+ editReply := func(rootID, replyID, newText string) { + type patchBody struct { + Body struct { + ContentType string `json:"contentType"` + Content string `json:"content"` + } `json:"body"` + } + var patch patchBody + patch.Body.ContentType = "text" + patch.Body.Content = newText + + jsonData, err := json.Marshal(patch) + if err != nil { + b.Log.Errorf("test: marshal failed: %s", err) + return + } + + url := fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies/%s", + teamID, channelID, rootID, replyID) + + token, err := b.getAccessToken() + if err != nil { + b.Log.Errorf("test: getAccessToken failed: %s", err) + return + } + + req, err := http.NewRequestWithContext(b.ctx, http.MethodPatch, url, bytes.NewReader(jsonData)) + if err != nil { + b.Log.Errorf("test: NewRequest failed: %s", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + b.Log.Errorf("test: PATCH failed: %s", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + b.Log.Errorf("test: edit reply failed: %d %s", resp.StatusCode, string(body)) + } + // Do NOT add to updatedIDs — let poll() pick up the edit for relay. + } + + // Helper to soft-delete a reply without adding to updatedIDs. 
+ deleteReply := func(rootID, replyID string) { + url := fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies/%s/softDelete", + teamID, channelID, rootID, replyID) + + token, err := b.getAccessToken() + if err != nil { + b.Log.Errorf("test: getAccessToken failed: %s", err) + return + } + + req, err := http.NewRequestWithContext(b.ctx, http.MethodPost, url, nil) + if err != nil { + b.Log.Errorf("test: NewRequest failed: %s", err) + return + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + b.Log.Errorf("test: softDelete failed: %s", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + b.Log.Errorf("test: delete reply failed: %d %s", resp.StatusCode, string(body)) + } + // Do NOT add to updatedIDs — let poll() pick up the delete for relay. + } + + // Step 1: Root message + rootID := postRoot("🧪 Matterbridge Test Sequence
This is a root message to test the bridge relay.", &htmlType) + if rootID == "" { + return + } + time.Sleep(time.Second) + + // Step 2: Thread reply + postReply(rootID, "This is a thread reply to test threading support.", nil) + time.Sleep(time.Second) + + // Step 3: Typo message (will be edited later) + typoID := postReply(rootID, "this message contains a tipo", nil) + time.Sleep(time.Second) + + // Step 4: Code block + codeHTML := `def hello():
for i in range(3):
print(f"Hello from Matterbridge! ({i+1})")

hello()
` + postReply(rootID, codeHTML, &htmlType) + time.Sleep(time.Second) + + // Step 5: Message to be deleted + deleteID := postReply(rootID, "this message will be deleted", nil) + time.Sleep(time.Second) + + // Step 6: Quote block + postReply(rootID, "
This is a quoted line.
Matterbridge supports quote blocks.
Third line of the quote.
", &htmlType) + time.Sleep(time.Second) + + // Step 7: Emojis + postReply(rootID, "👍 🎉 🚀 ❤️ 👀 🇦🇹", nil) + time.Sleep(time.Second) + + // Step 8: Edit the typo message + if typoID != "" { + editReply(rootID, typoID, "this message contained a typo") + } + time.Sleep(time.Second) + + // Step 9: Text formatting demo + formattingHTML := `This text is bold
` + + `This text is italic
` + + `This text is strikethrough
` + + `

This is a heading

` + + `This is a link` + postReply(rootID, formattingHTML, &htmlType) + time.Sleep(time.Second) + + // Step 10: Unordered list + postReply(rootID, "
  • Item one
  • Item two
  • Item three
", &htmlType) + time.Sleep(time.Second) + + // Step 11: Ordered list + postReply(rootID, "
  1. First point
  2. Second point
  3. Third point
", &htmlType) + time.Sleep(time.Second) + + type testImage struct { + name string + contentType string + data []byte + } + + // Helper to post a reply with hostedContents images. + postReplyWithImages := func(rootID, caption string, images []testImage) { + type hostedContent struct { + TempID string `json:"@microsoft.graph.temporaryId"` + ContentBytes string `json:"contentBytes"` + ContentType string `json:"contentType"` + } + type msgBody struct { + ContentType string `json:"contentType"` + Content string `json:"content"` + } + type graphMessage struct { + Body msgBody `json:"body"` + HostedContents []hostedContent `json:"hostedContents"` + } + + bodyHTML := caption + if bodyHTML != "" { + bodyHTML += "
" + } + var hosted []hostedContent + for i, img := range images { + id := fmt.Sprintf("%d", i+1) + bodyHTML += fmt.Sprintf(`%s`, id, img.name) + if i < len(images)-1 { + bodyHTML += "
" + } + hosted = append(hosted, hostedContent{ + TempID: id, + ContentBytes: base64.StdEncoding.EncodeToString(img.data), + ContentType: img.contentType, + }) + } + + payload := graphMessage{ + Body: msgBody{ContentType: "html", Content: bodyHTML}, + HostedContents: hosted, + } + jsonData, err := json.Marshal(payload) + if err != nil { + b.Log.Errorf("test: marshal image payload failed: %s", err) + return + } + + apiURL := fmt.Sprintf("https://graph.microsoft.com/beta/teams/%s/channels/%s/messages/%s/replies", + teamID, channelID, rootID) + token, err := b.getAccessToken() + if err != nil { + b.Log.Errorf("test: getAccessToken failed: %s", err) + return + } + req, err := http.NewRequestWithContext(b.ctx, http.MethodPost, apiURL, bytes.NewReader(jsonData)) + if err != nil { + b.Log.Errorf("test: NewRequest failed: %s", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + b.Log.Errorf("test: image post failed: %s", err) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + b.Log.Errorf("test: image reply failed: %d %s", resp.StatusCode, string(body)) + } + // Do NOT add to sentIDs — let poll() pick it up for relay. + } + + // Step 12: Single PNG image + postReplyWithImages(rootID, "Image test: PNG", []testImage{ + {name: "demo.png", contentType: "image/png", data: testdata.DemoPNG}, + }) + time.Sleep(3 * time.Second) + + // Step 13: GIF — hostedContents only supports JPG/PNG; Teams client uses SharePoint for GIFs. + postReply(rootID, "⚠️ Please manually check GIF file transmission from Teams to Mattermost — this test cannot upload files to your SharePoint.", nil) + time.Sleep(time.Second) + + // Step 14: File upload — SharePoint needed, cannot be automated. + postReply(rootID, "⚠️ Please manually check file (PDF, etc.) 
transmission from Teams to Mattermost — this test cannot upload files to your SharePoint.", nil) + time.Sleep(time.Second) + + // Step 15: Multi-image (2x PNG in one message) + postReplyWithImages(rootID, "Image test: multi-image (2x PNG)", []testImage{ + {name: "demo1.png", contentType: "image/png", data: testdata.DemoPNG}, + {name: "demo2.png", contentType: "image/png", data: testdata.DemoPNG}, + }) + time.Sleep(3 * time.Second) + + // Step 16: Delete the marked message + if deleteID != "" { + deleteReply(rootID, deleteID) + } + + // Step 17: Test finished + postReply(rootID, "✅ Test finished", nil) + + b.Log.Info("test: test sequence completed") +} diff --git a/gateway/gateway.go b/gateway/gateway.go index 7ca84e4f49..033bcbab56 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -20,14 +20,15 @@ import ( type Gateway struct { config.Config - Router *Router - MyConfig *config.Gateway - Bridges map[string]*bridge.Bridge - Channels map[string]*config.ChannelInfo - ChannelOptions map[string]config.ChannelOptions - Message chan config.Message - Name string - Messages *lru.Cache + Router *Router + MyConfig *config.Gateway + Bridges map[string]*bridge.Bridge + Channels map[string]*config.ChannelInfo + ChannelOptions map[string]config.ChannelOptions + Message chan config.Message + Name string + Messages *lru.Cache + BridgeCaches map[string]*PersistentMsgCache // per-bridge persistent caches, keyed by Account logger *logrus.Entry } @@ -58,6 +59,28 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway { if err := gw.AddConfig(cfg); err != nil { logger.Errorf("Failed to add configuration to gateway: %#v", err) } + + // Initialize per-bridge persistent message ID caches. + // Each bridge with a MessageCacheFile setting gets its own cache. + // br.GetString() checks per-bridge config first, then falls back to [general]. + // Bridges resolving to the same file path share one cache instance. 
+ gw.BridgeCaches = make(map[string]*PersistentMsgCache) + pathToCache := make(map[string]*PersistentMsgCache) + for _, br := range gw.Bridges { + p := br.GetString("MessageCacheFile") + if p == "" { + continue + } + if existing, ok := pathToCache[p]; ok { + gw.BridgeCaches[br.Account] = existing + } else { + maxAge := parseCacheDuration(br.GetString("MessageCacheDuration")) + cache := NewPersistentMsgCache(p, maxAge, logger) + pathToCache[p] = cache + gw.BridgeCaches[br.Account] = cache + } + } + return gw } @@ -78,9 +101,125 @@ func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { } } } + + // Fallback to persistent cache if LRU missed. + if gw.hasPersistentCache() { + // Check if ID is a direct key in persistent cache. + if entries, ok := gw.persistentCacheGet(ID); ok { + gw.restoreToCacheFindCanonical(ID, entries) + return ID + } + // Check if ID is a downstream value. + if canonical := gw.persistentCacheFindDownstream(ID); canonical != "" { + if entries, ok := gw.persistentCacheGet(canonical); ok { + gw.restoreToCacheFindCanonical(canonical, entries) + } + return canonical + } + } + + return "" +} + +// restoreToCacheFindCanonical restores persistent cache entries into the LRU cache. +func (gw *Gateway) restoreToCacheFindCanonical(key string, entries []PersistentMsgEntry) { + var brMsgIDs []*BrMsgID + for _, entry := range entries { + br := gw.findBridge(entry.Protocol, entry.BridgeName) + if br == nil { + continue + } + brMsgIDs = append(brMsgIDs, &BrMsgID{ + br: br, + ID: entry.ID, + ChannelID: entry.ChannelID, + }) + } + if len(brMsgIDs) > 0 { + gw.Messages.Add(key, brMsgIDs) + } +} + +// findBridge looks up a bridge by protocol and name across the gateway. +func (gw *Gateway) findBridge(protocol, name string) *bridge.Bridge { + for _, br := range gw.Bridges { + if br.Protocol == protocol && br.Name == name { + return br + } + } + return nil +} + +// hasPersistentCache returns true if any bridge has a persistent cache configured. 
+func (gw *Gateway) hasPersistentCache() bool { + return len(gw.BridgeCaches) > 0 +} + +// persistentCacheAdd writes an entry to the persistent cache of the given +// source bridge account. Lookups (Get/FindDownstream) still search all caches. +func (gw *Gateway) persistentCacheAdd(key string, entries []PersistentMsgEntry, sourceAccount string) { + if cache, ok := gw.BridgeCaches[sourceAccount]; ok && cache != nil { + cache.Add(key, entries) + gw.logger.Debugf("persistentCacheAdd: %s → %d entries (cache: %s)", key, len(entries), sourceAccount) + } else { + gw.logger.Debugf("persistentCacheAdd: %s SKIPPED (no cache for %s, have: %v)", key, sourceAccount, func() []string { + keys := make([]string, 0, len(gw.BridgeCaches)) + for k := range gw.BridgeCaches { + keys = append(keys, k) + } + return keys + }()) + } +} + +// persistentCacheGet looks up an entry across all persistent caches. +func (gw *Gateway) persistentCacheGet(key string) ([]PersistentMsgEntry, bool) { + for _, cache := range gw.BridgeCaches { + if cache != nil { + if entries, ok := cache.Get(key); ok { + return entries, true + } + } + } + return nil, false +} + +// persistentCacheFindDownstream searches for a downstream ID across all persistent caches. +func (gw *Gateway) persistentCacheFindDownstream(id string) string { + for _, cache := range gw.BridgeCaches { + if cache != nil { + if key := cache.FindDownstream(id); key != "" { + return key + } + } + } return "" } +// stopPersistentCaches stops all unique persistent cache instances. +func (gw *Gateway) stopPersistentCaches() { + seen := make(map[*PersistentMsgCache]bool) + for _, cache := range gw.BridgeCaches { + if cache != nil && !seen[cache] { + cache.Stop() + seen[cache] = true + } + } +} + +// parseCacheDuration parses a MessageCacheDuration string (e.g. "168h", "24h"). +// Returns 0 for empty/invalid values (caller should apply default). 
+func parseCacheDuration(s string) time.Duration { + if s == "" { + return 0 + } + d, err := time.ParseDuration(s) + if err != nil || d <= 0 { + return 0 + } + return d +} + // AddBridge sets up a new bridge on startup. // // It's added in the gateway object with the specified configuration, and is @@ -105,6 +244,56 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error { brconfig := &bridge.Config{ Remote: gw.Message, Bridge: br, + IsMessageBridged: func(protocol, msgID string) bool { + key := protocol + " " + msgID + if entries, exists := gw.persistentCacheGet(key); exists { + gw.logger.Debugf("IsMessageBridged: %s found (direct, %d entries)", key, len(entries)) + return true + } + if downstream := gw.persistentCacheFindDownstream(key); downstream != "" { + gw.logger.Debugf("IsMessageBridged: %s found (downstream of %s)", key, downstream) + return true + } + gw.logger.Debugf("IsMessageBridged: %s NOT found (caches: %d)", key, len(gw.BridgeCaches)) + return false + }, + GetLastSeen: func(channelKey string) (time.Time, bool) { + for _, cache := range gw.BridgeCaches { + if cache != nil { + if t, ok := cache.GetLastSeen(channelKey); ok { + return t, true + } + } + } + return time.Time{}, false + }, + MarkMessageBridged: func(protocol, msgID string) { + key := protocol + " " + msgID + gw.logger.Debugf("MarkMessageBridged: %s", key) + for _, cache := range gw.BridgeCaches { + if cache != nil { + // Store a sentinel entry (not empty) so prune() doesn't delete it. 
+ cache.Add(key, []PersistentMsgEntry{{Protocol: protocol}}) + } + } + }, + SetDeltaToken: func(channelKey, token string) { + for _, cache := range gw.BridgeCaches { + if cache != nil { + cache.SetDeltaToken(channelKey, token) + } + } + }, + GetDeltaToken: func(channelKey string) (string, bool) { + for _, cache := range gw.BridgeCaches { + if cache != nil { + if token, ok := cache.GetDeltaToken(channelKey); ok { + return token, true + } + } + } + return "", false + }, } // add the actual bridger for this protocol to this bridge using the bridgeMap if _, ok := gw.Router.BridgeMap[br.Protocol]; !ok { @@ -283,6 +472,23 @@ func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *conf } } } + + // Fallback to persistent cache if LRU missed. + if gw.hasPersistentCache() { + if entries, ok := gw.persistentCacheGet(msgID); ok { + // Restore to LRU and retry. + gw.restoreToCacheFindCanonical(msgID, entries) + if res, ok := gw.Messages.Get(msgID); ok { + IDs := res.([]*BrMsgID) + for _, id := range IDs { + if dest.Protocol == id.br.Protocol && dest.Name == id.br.Name && channel.ID == id.ChannelID { + return strings.Replace(id.ID, dest.Protocol+" ", "", 1) + } + } + } + } + } + return "" } @@ -378,6 +584,13 @@ func (gw *Gateway) modifyUsername(msg *config.Message, dest *bridge.Bridge) stri nick = strings.ReplaceAll(nick, "{NICK}", msg.Username) nick = strings.ReplaceAll(nick, "{USERID}", msg.UserID) nick = strings.ReplaceAll(nick, "{CHANNEL}", msg.Channel) + displayName := msg.Username + if dns, ok := msg.Extra["displayname"]; ok && len(dns) > 0 { + if dn, ok := dns[0].(string); ok && dn != "" { + displayName = dn + } + } + nick = strings.ReplaceAll(nick, "{DISPLAYNAME}", displayName) tengoNick, err := gw.modifyUsernameTengo(msg, br) if err != nil { gw.logger.Errorf("modifyUsernameTengo error: %s", err) @@ -476,6 +689,19 @@ func (gw *Gateway) SendMessage( msg.Channel = channel.Name msg.Avatar = gw.modifyAvatar(rmsg, dest) + + // Store original nick before 
RemoteNickFormat expansion, so bridges + // that need HTML-aware formatting can access it. + if msg.Extra == nil { + msg.Extra = make(map[string][]interface{}) + } + msg.Extra["nick"] = []interface{}{rmsg.Username} + + // Pass source message ID so bridges can embed it for historical cache population. + if rmsg.ID != "" { + msg.Extra["source_msgid"] = []interface{}{rmsg.Protocol + ":" + rmsg.ID} + } + msg.Username = gw.modifyUsername(rmsg, dest) // exclude file delete event as the msg ID here is the native file ID that needs to be deleted diff --git a/gateway/msgcache.go b/gateway/msgcache.go new file mode 100644 index 0000000000..059cfa2cc4 --- /dev/null +++ b/gateway/msgcache.go @@ -0,0 +1,264 @@ +package gateway + +import ( + "encoding/json" + "os" + "strings" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// PersistentMsgEntry represents a single downstream message ID mapping. +type PersistentMsgEntry struct { + Protocol string `json:"protocol"` + BridgeName string `json:"bridge_name"` + ID string `json:"id"` + ChannelID string `json:"channel_id"` + CreatedAt time.Time `json:"created_at,omitempty"` +} + +// PersistentMsgCache is a file-backed message ID cache that persists +// cross-bridge message ID mappings across restarts. +type PersistentMsgCache struct { + mu sync.Mutex + path string + data map[string][]PersistentMsgEntry + dirty bool + ticker *time.Ticker + stopCh chan struct{} + doneCh chan struct{} + logger *logrus.Entry + maxAge time.Duration + lastPrune time.Time +} + +const defaultMaxAge = 168 * time.Hour // 7 days +const pruneInterval = 1 * time.Hour + +// NewPersistentMsgCache creates a new persistent cache backed by the given file path. +// Returns nil if path is empty. Loads existing data on creation and starts a +// background flush loop that writes changes to disk every 30 seconds. +// maxAge controls how long message ID entries are kept; zero uses the default (7 days). 
+func NewPersistentMsgCache(path string, maxAge time.Duration, logger *logrus.Entry) *PersistentMsgCache { + if path == "" { + return nil + } + if maxAge <= 0 { + maxAge = defaultMaxAge + } + c := &PersistentMsgCache{ + path: path, + data: make(map[string][]PersistentMsgEntry), + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + logger: logger, + maxAge: maxAge, + } + c.load() + c.prune() // clean up stale entries on startup + c.ticker = time.NewTicker(30 * time.Second) + go c.flushLoop() + return c +} + +func (c *PersistentMsgCache) load() { + f, err := os.ReadFile(c.path) + if err != nil { + if !os.IsNotExist(err) { + c.logger.Warnf("failed to read message cache %s: %s", c.path, err) + } + return + } + if err := json.Unmarshal(f, &c.data); err != nil { + c.logger.Warnf("failed to parse message cache %s: %s", c.path, err) + } else { + // Count non-metadata entries and show a sample. + msgEntries := 0 + sample := make([]string, 0, 5) + for key := range c.data { + if !strings.HasPrefix(key, lastSeenPrefix) && !strings.HasPrefix(key, deltaTokenPrefix) { + msgEntries++ + if len(sample) < 5 { + sample = append(sample, key) + } + } + } + c.logger.Infof("loaded %d entries from message cache %s (%d msg, %d metadata, sample: %v)", + len(c.data), c.path, msgEntries, len(c.data)-msgEntries, sample) + } +} + +func (c *PersistentMsgCache) flushLoop() { + defer close(c.doneCh) + for { + select { + case <-c.ticker.C: + if time.Since(c.lastPrune) >= pruneInterval { + c.prune() + } + c.Flush() + case <-c.stopCh: + c.ticker.Stop() + c.Flush() + return + } + } +} + +// prune removes message ID entries older than maxAge. +// Metadata keys (__last_seen__, __delta_token__) are never pruned. 
+func (c *PersistentMsgCache) prune() { + c.mu.Lock() + defer c.mu.Unlock() + cutoff := time.Now().Add(-c.maxAge) + pruned := 0 + for key, entries := range c.data { + if strings.HasPrefix(key, lastSeenPrefix) || strings.HasPrefix(key, deltaTokenPrefix) { + continue + } + if len(entries) == 0 { + delete(c.data, key) + pruned++ + continue + } + // Use CreatedAt of first entry as the age of this mapping. + // Zero time (old entries without CreatedAt) are pruned immediately. + t := entries[0].CreatedAt + if t.IsZero() || t.Before(cutoff) { + delete(c.data, key) + pruned++ + } + } + if pruned > 0 { + c.dirty = true + c.logger.Infof("pruned %d stale entries from message cache (older than %s)", pruned, c.maxAge) + } + c.lastPrune = time.Now() +} + +// Add stores a message ID mapping. Sets CreatedAt on all entries. +func (c *PersistentMsgCache) Add(key string, entries []PersistentMsgEntry) { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + for i := range entries { + entries[i].CreatedAt = now + } + c.data[key] = entries + c.dirty = true +} + +// Get returns downstream IDs for a key, or nil if not found. +func (c *PersistentMsgCache) Get(key string) ([]PersistentMsgEntry, bool) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.data[key] + return v, ok +} + +// FindDownstream searches all entries for a downstream match (by ID field) +// and returns the canonical (upstream) key. This mirrors the linear scan +// in Gateway.FindCanonicalMsgID but over the persistent store. +func (c *PersistentMsgCache) FindDownstream(id string) string { + c.mu.Lock() + defer c.mu.Unlock() + for key, entries := range c.data { + for _, entry := range entries { + if entry.ID == id { + return key + } + } + } + return "" +} + +// Flush writes the cache to disk if it has been modified since the last flush. +func (c *PersistentMsgCache) Flush() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.dirty { + return + } + // Count non-metadata entries for logging. 
+ msgEntries := 0 + for key := range c.data { + if !strings.HasPrefix(key, lastSeenPrefix) && !strings.HasPrefix(key, deltaTokenPrefix) { + msgEntries++ + } + } + data, err := json.MarshalIndent(c.data, "", " ") + if err != nil { + c.logger.Errorf("failed to marshal message cache: %s", err) + return + } + if err := os.WriteFile(c.path, data, 0600); err != nil { + c.logger.Errorf("failed to write message cache %s: %s", c.path, err) + return + } + c.dirty = false + c.logger.Infof("flushed message cache %s (%d msg entries, %d total keys)", c.path, msgEntries, len(c.data)) +} + +// SetLastSeen stores the timestamp of the last processed message for a channel. +// The channelKey should uniquely identify a channel+account combination. +func (c *PersistentMsgCache) SetLastSeen(channelKey string, t time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + key := lastSeenPrefix + channelKey + formatted := t.Format(time.RFC3339Nano) + if entries, ok := c.data[key]; ok && len(entries) > 0 && entries[0].ID == formatted { + return + } + c.data[key] = []PersistentMsgEntry{{ID: formatted}} + c.dirty = true +} + +// GetLastSeen returns the timestamp of the last processed message for a channel. +func (c *PersistentMsgCache) GetLastSeen(channelKey string) (time.Time, bool) { + c.mu.Lock() + defer c.mu.Unlock() + entries, ok := c.data[lastSeenPrefix+channelKey] + if !ok || len(entries) == 0 { + return time.Time{}, false + } + t, err := time.Parse(time.RFC3339Nano, entries[0].ID) + if err != nil { + return time.Time{}, false + } + return t, true +} + +const lastSeenPrefix = "__last_seen__:" +const deltaTokenPrefix = "__delta_token__:" + +// SetDeltaToken stores a Graph API delta token for a channel. +// The channelKey should uniquely identify a channel+account combination. 
+func (c *PersistentMsgCache) SetDeltaToken(channelKey, token string) { + c.mu.Lock() + defer c.mu.Unlock() + key := deltaTokenPrefix + channelKey + if entries, ok := c.data[key]; ok && len(entries) > 0 && entries[0].ID == token { + return + } + c.data[key] = []PersistentMsgEntry{{ID: token}} + c.dirty = true +} + +// GetDeltaToken returns the stored Graph API delta token for a channel. +func (c *PersistentMsgCache) GetDeltaToken(channelKey string) (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + entries, ok := c.data[deltaTokenPrefix+channelKey] + if !ok || len(entries) == 0 { + return "", false + } + return entries[0].ID, true +} + +// Stop stops the background flush loop and waits for the final flush to complete. +func (c *PersistentMsgCache) Stop() { + close(c.stopCh) + <-c.doneCh // block until flushLoop completes its final Flush() +} diff --git a/gateway/router.go b/gateway/router.go index a8aad1aaef..7a73fbccab 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -2,6 +2,7 @@ package gateway import ( "fmt" + "strings" "sync" "time" @@ -110,6 +111,13 @@ func (r *Router) Start() error { return nil } +// Stop performs a graceful shutdown: flushes and stops all persistent caches. +func (r *Router) Stop() { + for _, gw := range r.Gateways { + gw.stopPersistentCaches() + } +} + // disableBridge returns true and empties a bridge if we have IgnoreFailureOnStart configured // otherwise returns false func (r *Router) disableBridge(br *bridge.Bridge, err error) bool { @@ -143,6 +151,42 @@ func (r *Router) handleReceive() { // Set message protocol based on the account it came from msg.Protocol = r.getBridge(msg.Account).Protocol + // Handle historical cache population events — don't relay, just cache. + if msg.Event == config.EventHistoricalMapping { + r.handleHistoricalMapping(&msg) + continue + } + + // Handle replay messages — check persistent cache for dedup, then treat as normal. 
+ isReplay := msg.Event == config.EventReplayMessage + if isReplay { + if msg.ID != "" { + cacheKey := msg.Protocol + " " + msg.ID + r.logger.Debugf("replay: dedup check for %s (account=%s)", cacheKey, msg.Account) + alreadyBridged := false + for _, gw := range r.Gateways { + if !gw.hasPersistentCache() { + r.logger.Debugf("replay: gateway %s has no persistent cache", gw.Name) + continue + } + if _, exists := gw.persistentCacheGet(cacheKey); exists { + alreadyBridged = true + break + } + if downstream := gw.persistentCacheFindDownstream(cacheKey); downstream != "" { + alreadyBridged = true + break + } + } + if alreadyBridged { + r.logger.Debugf("replay: skipping already-bridged message %s", cacheKey) + continue + } + r.logger.Debugf("replay: message %s NOT found in cache, will bridge", cacheKey) + } + msg.Event = "" // clear so downstream pipeline treats it as a normal message + } + filesHandled := false for _, gw := range r.Gateways { // record all the message ID's of the different bridges @@ -161,7 +205,8 @@ func (r *Router) handleReceive() { } if msg.ID != "" { - _, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID) + cacheKey := msg.Protocol + " " + msg.ID + _, exists := gw.Messages.Get(cacheKey) // Only add the message ID if it doesn't already exist // @@ -169,13 +214,130 @@ func (r *Router) handleReceive() { // This is necessary as msgIDs will change if a bridge returns // a different ID in response to edits. if !exists { - gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs) + gw.Messages.Add(cacheKey, msgIDs) + } + + // Write-through to persistent cache. 
+ if gw.hasPersistentCache() && len(msgIDs) > 0 { + var entries []PersistentMsgEntry + for _, mid := range msgIDs { + if mid.br != nil && mid.ID != "" { + entries = append(entries, PersistentMsgEntry{ + Protocol: mid.br.Protocol, + BridgeName: mid.br.Name, + ID: mid.ID, + ChannelID: mid.ChannelID, + }) + } + } + if len(entries) > 0 { + gw.persistentCacheAdd(cacheKey, entries, msg.Account) + } else if isReplay { + r.logger.Debugf("replay: no cacheable entries for %s (msgIDs=%d)", cacheKey, len(msgIDs)) + } + // Update last-seen timestamp for the source channel. + channelKey := msg.Channel + msg.Account + if cache, ok := gw.BridgeCaches[msg.Account]; ok && cache != nil { + cache.SetLastSeen(channelKey, msg.Timestamp) + } } } } } } +// handleHistoricalMapping processes historical ID mapping events from bridges. +// It extracts the source-ID marker and stores a bidirectional mapping in the +// persistent cache of every gateway that has both the reporting bridge and +// the source bridge configured. +func (r *Router) handleHistoricalMapping(msg *config.Message) { + if msg.ID == "" || msg.Extra == nil { + return + } + srcIDs, ok := msg.Extra["source_msgid"] + if !ok || len(srcIDs) == 0 { + return + } + sourceIDStr, ok := srcIDs[0].(string) + if !ok || sourceIDStr == "" { + return + } + + // Parse "protocol:messageID" from the source marker. + parts := strings.SplitN(sourceIDStr, ":", 2) + if len(parts) != 2 { + return + } + sourceProtocol := parts[0] + sourceMessageID := parts[1] + + localKey := msg.Protocol + " " + msg.ID + sourceKey := sourceProtocol + " " + sourceMessageID + + for _, gw := range r.Gateways { + if !gw.hasPersistentCache() { + continue + } + + // Find the local bridge (the one that reported this mapping). + localBridge := gw.findBridge(msg.Protocol, extractBridgeName(msg.Account)) + if localBridge == nil { + continue + } + + // Find a bridge matching the source protocol in this gateway. 
+ var sourceBridge *bridge.Bridge + for _, br := range gw.Bridges { + if br.Protocol == sourceProtocol { + sourceBridge = br + break + } + } + if sourceBridge == nil { + continue + } + + // Find channel IDs for both sides. + localChannelID := msg.Channel + msg.Account + var sourceChannelID string + for chID, ch := range gw.Channels { + if ch.Account == sourceBridge.Account { + sourceChannelID = chID + break + } + } + + // Store: sourceKey → points to local bridge (e.g., "mattermost POST123" → msteams entry) + if _, exists := gw.persistentCacheGet(sourceKey); !exists { + gw.persistentCacheAdd(sourceKey, []PersistentMsgEntry{{ + Protocol: localBridge.Protocol, + BridgeName: localBridge.Name, + ID: localKey, + ChannelID: localChannelID, + }}, sourceBridge.Account) + } + + // Store: localKey → points to source bridge (e.g., "msteams TEAMS456" → mattermost entry) + if _, exists := gw.persistentCacheGet(localKey); !exists && sourceChannelID != "" { + gw.persistentCacheAdd(localKey, []PersistentMsgEntry{{ + Protocol: sourceBridge.Protocol, + BridgeName: sourceBridge.Name, + ID: sourceKey, + ChannelID: sourceChannelID, + }}, msg.Account) + } + } +} + +// extractBridgeName returns the part after the dot in an account string like "msteams.windoof". +func extractBridgeName(account string) string { + parts := strings.SplitN(account, ".", 2) + if len(parts) == 2 { + return parts[1] + } + return account +} + // updateChannelMembers sends every minute an GetChannelMembers event to all bridges. 
func (r *Router) updateChannelMembers() { // TODO sleep a minute because slack can take a while diff --git a/matterbridge.go b/matterbridge.go index f5a18b65cf..269798ebc5 100644 --- a/matterbridge.go +++ b/matterbridge.go @@ -4,8 +4,10 @@ import ( "flag" "fmt" "os" + "os/signal" "runtime" "strings" + "syscall" "github.com/google/gops/agent" "github.com/matterbridge-org/matterbridge/bridge/config" @@ -67,7 +69,11 @@ func main() { logger.Fatalf("Starting gateway failed: %s", err) } logger.Printf("Gateway(s) started successfully. Now relaying messages") - select {} + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + logger.Printf("Received signal, shutting down...") + r.Stop() } func setupLogger() *logrus.Logger { diff --git a/testdata/demo.gif b/testdata/demo.gif new file mode 100644 index 0000000000..9dd5d0c524 Binary files /dev/null and b/testdata/demo.gif differ diff --git a/testdata/demo.png b/testdata/demo.png new file mode 100644 index 0000000000..89cac59824 Binary files /dev/null and b/testdata/demo.png differ diff --git a/testdata/embed.go b/testdata/embed.go new file mode 100644 index 0000000000..5f7a5de5f1 --- /dev/null +++ b/testdata/embed.go @@ -0,0 +1,11 @@ +// Package testdata provides embedded demo images for the @matterbridge test sequence. +// Place demo.png and demo.gif in this directory before building. +package testdata + +import _ "embed" + +//go:embed demo.png +var DemoPNG []byte + +//go:embed demo.gif +var DemoGIF []byte