Chat providers Message Integration
A unified system for fetching messages from Slack, Microsoft Teams, and Google Chat using the Factory and Strategy design patterns.
- System Architecture
- Supported Platforms
- Design Approach
- Common Interface
- Platform-Specific Strategies
- Factory Implementation
- Usage Example
- Notes & Best Practices
The diagram below shows the end-to-end alert ingestion and response pipeline, spanning External Systems, the Agent Extension Tool, Independent Infrastructure Services, and the L0 Agent.
graph LR
%% ── Far Left: Sources ───────────────────────────────────────
Monitor["Prometheus\nAlertManager"]
subgraph PlatIn[" Inbound Platforms "]
direction TB
GChat["Google Chat"]
Teams["MS Teams"]
Slack["Slack"]
end
%% ── Extension Tool ──────────────────────────────────────────
subgraph ET[" 01 Agent Extension Tool "]
direction TB
%% Updated Message Gateway
subgraph GW["Message Gateway"]
direction TB
GWCore["<div style='width:420px'><b>Platform Gateway Adapter</b><br/><br/>
__init__()<br/>
verify_signature()<br/>
validate_event()<br/>
publish_to_dapr()<br/>
send_message()<br/>
reply_to_event()<br/>
consume_outbound()</div>"]
end
subgraph VP["Validation Pipeline"]
direction TB
SVal["<b>Schema Validator</b><br/>JSON Schema / alert_type<br/>→ ValidatedPayload · 400"]
Dedup["<b>Dedup Filter</b><br/>SHA-256(source+id+fired_at)<br/>Dapr State · TTL 24 h"]
RL["<b>Rate Limiter</b><br/>Dapr State · INCR/EXPIRE<br/>per tenant · 60 s window"]
end
DaprET["<b>Dapr Sidecar</b><br/>Pub: alerts-inbound<br/>Sub: alerts-outbound<br/>State: fingerprints · counters"]
GWCore --> SVal --> Dedup --> RL --> DaprET
Dedup <-->|"[State] Dapr State API"| DaprET
RL <-->|"[State] Dapr State API"| DaprET
end
%% ── Broker ──────────────────────────────────────────────────
subgraph Broker["Dapr Pub/Sub · RabbitMQ (swappable: Kafka · SQS · GCP Pub/Sub)"]
direction TB
CH1["alerts-inbound"]
CH2["alerts-outbound"]
end
%% ── Dapr State Store ─────────────────────────────────────────
subgraph StateStore["Dapr State Store · Redis 7 "]
direction TB
Redis[("<b>Redis 7</b><br/>Fingerprints<br/>Rate Counters")]
end
%% ── L0 Agent ────────────────────────────────────────────────
subgraph L0[" L0 Agent "]
direction TB
DaprL0["<b>Dapr Sidecar</b><br/>Sub: alerts-inbound<br/>Pub: alerts-outbound"]
subgraph QM["Queue Manager"]
direction TB
AC["<b>Alert Consumer</b><br/>Dapr /subscribe callback → RawAlert"]
PP["<b>Payload Parser</b><br/>Normalise · enrich · score severity<br/>→ NormalisedAlert {routingHint: auto|hitl}"]
end
subgraph Core["Core Components"]
A2A["<b>A2A Server</b><br/>Central core · A2A JSON-RPC · HTTPS<br/>auto → dispatches to L1 Agents<br/>hitl → HITL operator approval<br/>→ AgentResponse · HumanResponse"]
end
subgraph RH["Response Handler"]
direction TB
FM["<b>Formatter</b><br/>Slack → Block Kit<br/>Teams → Adaptive Card<br/>GChat → Card JSON<br/>→ FormattedResponse"]
RT["<b>Router</b><br/>Publishes to Dapr<br/>→ RoutedResponse"]
end
DaprL0 --> AC --> PP --> A2A
A2A -.->|"L1 response"| FM -.->|"L1 response"| RT -.->|"L1 response"| DaprL0
end
%% ── Far Right ────────────────────────────────────────────────
L1["L1 Agents"]
subgraph PlatOut[" Outbound Platform APIs "]
direction TB
SlackOut["Slack Web API<br/>/chat.postMessage · Block Kit"]
TeamsOut["Teams Graph API<br/>/sendActivity · Adaptive Card"]
GChatOut["GChat REST API<br/>/spaces/messages · Card JSON"]
end
%% ── FLOWS ───────────────────────────────────────────────────
Monitor -->|"webhook · bot msg · HTTPS"| GChat
Monitor -->|"webhook · bot msg · HTTPS"| Teams
Monitor -->|"webhook · bot msg · HTTPS"| Slack
GChat -->|"Chat Event JSON"| GWCore
Teams -->|"Activity JSON"| GWCore
Slack -->|"Event JSON"| GWCore
DaprET -->|"[Pub/Sub] publish · alerts-inbound"| CH1
CH1 -->|"[Pub/Sub] deliver · alerts-inbound"| DaprL0
DaprL0 -.->|"[Pub/Sub] publish · alerts-outbound"| CH2
CH2 -.->|"[Pub/Sub] deliver · alerts-outbound"| DaprET
DaprET <-->|"[State] Redis RESP3"| Redis
A2A <-.->|"A2A JSON-RPC · HTTPS"| L1
DaprET -.->|"ResponsePayload JSON"| GWCore
GWCore -.->|"Platform API"| SlackOut
GWCore -.->|"Platform API"| TeamsOut
GWCore -.->|"Platform API"| GChatOut
%% ── Styles ───────────────────────────────────────────────────
classDef external fill:#e0e7ff,stroke:#4f46e5,stroke-width:2px,color:#1e1b4b
classDef monitor fill:#ede9fe,stroke:#7c3aed,stroke-width:2px,color:#3b0764
classDef gateway fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e3a8a
classDef validation fill:#e0f2fe,stroke:#0284c7,stroke-width:2px,color:#0c4a6e
classDef queuemgr fill:#dbeafe,stroke:#2563eb,stroke-width:2px,color:#1e3a8a
classDef a2a fill:#d1fae5,stroke:#059669,stroke-width:3px,color:#065f46
classDef response fill:#e0f2fe,stroke:#0284c7,stroke-width:2px,color:#0c4a6e
classDef dapr fill:#f3e8ff,stroke:#7c3aed,stroke-width:2px,color:#4c1d95
classDef broker fill:#fef3c7,stroke:#d97706,stroke-width:2px,color:#92400e
classDef infra fill:#fef9c3,stroke:#ca8a04,stroke-width:2px,color:#713f12
classDef platout fill:#f0fdf4,stroke:#16a34a,stroke-width:2px,color:#14532d
class GChat,Teams,Slack,L1 external
class Monitor monitor
class GWCore gateway
class SVal,Dedup,RL validation
class AC,PP queuemgr
class A2A a2a
class FM,RT response
class DaprET,DaprL0 dapr
class CH1,CH2 broker
class Redis infra
class SlackOut,TeamsOut,GChatOut platout
The system is organized into four distinct layers, each with independent deployment boundaries:
External Systems — Monitoring tools (Prometheus, Grafana, AlertManager) detect infrastructure issues and publish alert notifications to configured channels across Google Chat, Microsoft Teams, and Slack.
01 Agent Extension Tool — A standalone service that acts as the entry point for all platform traffic. It contains two components: the Message Gateway, which handles platform detection, signature verification, and the Factory pattern for routing; and the Validation Pipeline, which enforces schema correctness, deduplication, and rate limiting against Redis.
Independent Services — Infrastructure components that run outside any agent boundary. Redis Cache stores deduplication fingerprints and rate-limit counters. The Message Queue Service (RabbitMQ) is a shared bus that carries both inbound alerts from the Extension Tool to L0 and outbound responses from L0 back to the Extension Tool.
L0 Agent — Consumes alerts from the Message Queue via the Queue Manager, fans them out to Core Components (the A2A Gateway for L1 handoff and the Web UI for real-time dashboard display), and routes formatted responses or HITL actions back to the Message Queue through the Response Handler.
Step 1 — Alert Generation: Monitoring tools detect infrastructure or application issues and publish alert notifications to configured chat platform channels.
Step 2 — Bidirectional Platform Ingestion: The Message Gateway inside the Extension Tool communicates with each platform over its native API (Chat API, Slack API, Graph API). It performs platform detection and verifies the request signature before passing traffic downstream. The bidirectional connection also allows the system to post responses back to the originating platform channel.
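The signature check in Step 2 can be sketched for Slack, whose documented scheme signs the string `v0:<timestamp>:<raw body>` with HMAC-SHA256 and sends the result in the `X-Slack-Signature` header. The helper name `verifySlackSignature`, its parameter object, and the five-minute skew constant's name are assumptions made for illustration; the gateway's real `verify_signature()` may differ.

```javascript
const { createHmac, timingSafeEqual } = require('node:crypto');

const MAX_SKEW_SECONDS = 300; // five-minute replay window

// Hypothetical helper (not from the original design): checks a Slack-style
// request signature of the form "v0=<hex HMAC-SHA256>".
function verifySlackSignature({ signingSecret, timestamp, rawBody, signature, nowSeconds }) {
  const now = nowSeconds ?? Math.floor(Date.now() / 1000);
  const ts = Number(timestamp);

  // Reject missing or stale timestamps to limit replay attacks.
  if (!Number.isFinite(ts) || Math.abs(now - ts) > MAX_SKEW_SECONDS) return false;

  const expected = 'v0=' + createHmac('sha256', signingSecret)
    .update(`v0:${timestamp}:${rawBody}`)
    .digest('hex');

  const expectedBuf = Buffer.from(expected);
  const actualBuf = Buffer.from(String(signature ?? ''));

  // timingSafeEqual throws on unequal lengths, so compare lengths first.
  return expectedBuf.length === actualBuf.length && timingSafeEqual(expectedBuf, actualBuf);
}
```

The raw request body must be used as received; re-serializing parsed JSON can change the bytes and invalidate the signature. Teams and Google Chat use different verification mechanisms, so each would need its own check.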
Step 3 — Validation and Deduplication: The Validation Pipeline checks each alert against Redis for duplicate fingerprints and enforces rate limits. Valid alerts are enqueued onto the Message Queue as inbound alerts; outbound responses from L0 pass back through this same pipeline before being delivered to the platform.
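A minimal sketch of the deduplication and rate-limiting checks in Step 3, using an in-memory map as a stand-in for the Redis-backed state store. The diagram gives the fingerprint inputs (source, id, fired_at), the 24-hour TTL, and the 60-second window; the `TtlStore` class, the `|` separator, the alert field names, and the per-tenant `limit` argument are assumptions.

```javascript
const { createHash } = require('node:crypto');

const DAY_MS = 24 * 60 * 60 * 1000;
const WINDOW_MS = 60 * 1000;

// In-memory stand-in for the state store; entries expire after ttlMs.
class TtlStore {
  constructor(now = () => Date.now()) {
    this.now = now;
    this.entries = new Map(); // key -> { value, expiresAt }
  }
  get(key) {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }
  set(key, value, ttlMs) {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }
  // Mirrors INCR plus EXPIRE: the TTL is set only when the counter is created.
  incr(key, ttlMs) {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAt <= this.now()) {
      this.set(key, 1, ttlMs);
      return 1;
    }
    entry.value += 1;
    return entry.value;
  }
}

// SHA-256 over source, id, and fired-at time (separator is an assumption).
function fingerprint(alert) {
  return createHash('sha256')
    .update(`${alert.source}|${alert.id}|${alert.firedAt}`)
    .digest('hex');
}

// Returns true the first time an alert is seen within 24 hours.
function isFirstSeen(store, alert) {
  const key = `fp:${fingerprint(alert)}`;
  if (store.get(key) !== undefined) return false;
  store.set(key, true, DAY_MS);
  return true;
}

// Fixed-window limiter: allows up to `limit` alerts per tenant per 60 s.
function withinRateLimit(store, tenantId, limit) {
  return store.incr(`rl:${tenantId}`, WINDOW_MS) <= limit;
}
```

With a shared store, the check-then-set in `isFirstSeen` would need to be atomic (for example a set-if-absent operation) so that two gateway instances cannot both accept the same alert.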
Step 4 — Queuing (Independent Service): The Message Queue (RabbitMQ) acts as the integration point between the Extension Tool and L0. It maintains priority FIFO ordering for inbound alerts and carries outbound responses in the reverse direction, decoupling the two services entirely.
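The "priority FIFO" ordering in Step 4 can be illustrated with a small in-memory queue: higher-priority alerts leave first, and alerts of equal priority leave in arrival order. This only demonstrates the ordering semantics; the class name `PriorityFifo` and the numeric priority scale are assumptions, and the broker's own priority support would be used in practice.

```javascript
// Illustrative only: shows priority-then-arrival ordering, not broker behavior.
class PriorityFifo {
  constructor() {
    this.items = []; // kept in arrival order
  }

  enqueue(payload, priority = 0) {
    this.items.push({ payload, priority });
  }

  // Removes and returns the earliest-arrived item with the highest priority.
  dequeue() {
    if (this.items.length === 0) return undefined;
    let best = 0;
    for (let i = 1; i < this.items.length; i++) {
      // Strict ">" keeps the earlier arrival when priorities tie.
      if (this.items[i].priority > this.items[best].priority) best = i;
    }
    return this.items.splice(best, 1)[0].payload;
  }
}
```

Dequeue is linear in the queue length here, which is fine for a demonstration but not for a high-volume path.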
Step 5 — Processing in L0: The Queue Manager consumes inbound alerts from the queue and fans them out to Core Components — the Web UI (real-time dashboard via WebSocket) and the A2A Gateway (L1 handoff over the A2A protocol).
Step 6 — Response Routing: After Core Components process an alert or a human-in-the-loop (HITL) action is triggered, the Response Handler formats the result and publishes it back onto the Message Queue, completing the round-trip to the originating chat platform.
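The formatting half of Step 6 might look like the sketch below, which maps one platform-neutral result to Slack Block Kit, a Teams Adaptive Card attachment, and a Google Chat card. The function name `formatResponse`, the `{ title, text }` input shape, and the `cardId` value are assumptions; the actual Formatter's contract is not specified in this document.

```javascript
// Hypothetical formatter: maps a platform-neutral result to each platform's
// message payload. The { title, text } input shape is an assumption.
function formatResponse(platform, { title, text }) {
  switch (platform) {
    case 'slack':
      // Block Kit: a header block followed by a markdown section.
      return {
        blocks: [
          { type: 'header', text: { type: 'plain_text', text: title } },
          { type: 'section', text: { type: 'mrkdwn', text } },
        ],
      };
    case 'teams':
      // Adaptive Card wrapped in a message attachment.
      return {
        type: 'message',
        attachments: [{
          contentType: 'application/vnd.microsoft.card.adaptive',
          content: {
            $schema: 'http://adaptivecards.io/schemas/adaptive-card.json',
            type: 'AdaptiveCard',
            version: '1.4',
            body: [
              { type: 'TextBlock', text: title, weight: 'Bolder', size: 'Medium' },
              { type: 'TextBlock', text, wrap: true },
            ],
          },
        }],
      };
    case 'googlechat':
      // Google Chat card message (cardsV2).
      return {
        cardsV2: [{
          cardId: 'alert-response',
          card: {
            header: { title },
            sections: [{ widgets: [{ textParagraph: { text } }] }],
          },
        }],
      };
    default:
      throw new Error(`Unsupported platform: "${platform}"`);
  }
}
```

Keeping the formatter a pure function of platform and result makes it easy to unit-test each payload shape without touching the message queue.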
| Platform | API Reference | Key Endpoints / Methods | Auth Type | Notes |
|---|---|---|---|---|
| Slack | [Slack Web API](https://api.slack.com/web) | `conversations.list` (channels)<br>`conversations.history` (messages)<br>`conversations.replies` (thread replies) | OAuth 2.0 (Bot Token) | Bot must be a member of the channel to read messages; rate-limited (~1 req/sec) |
| Microsoft Teams | [Microsoft Graph API](https://learn.microsoft.com/en-us/graph/overview) | `/teams/{team-id}/channels`<br>`/teams/{team-id}/channels/{channel-id}/messages` | OAuth 2.0 (Azure AD App) | App must have permission for the target teams/channels; supports historical messages |
| Google Chat | [Google Chat API](https://developers.google.com/chat) | `spaces.list` (spaces/channels)<br>`spaces.messages.list` (messages) | OAuth 2.0 (Service Account/User) | Access may be limited based on workspace settings |
The Message Gateway inside the Extension Tool uses two classic design patterns to abstract platform-specific details while exposing a clean, unified interface.
- Strategy pattern
  - Each platform implements a strategy that defines how to fetch channels and messages.
  - Provides a uniform interface to the rest of the application.
- Factory pattern
  - Returns the correct strategy instance based on the platform name.
  - Allows adding new platforms without modifying existing strategies or calling code.
MessageFetcherFactory
│
├── 'slack' → SlackFetcher
├── 'teams' → TeamsFetcher
└── 'googlechat' → GoogleChatFetcher
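As a variant on the mapping above, the factory can be written as a registry so that adding a platform is a `register()` call instead of an edit to the lookup logic. This is an alternative sketch, not the factory this document implements; `FetcherRegistry`, `register`, and the stub builders are hypothetical names.

```javascript
// Registry-based variant of the factory: platforms are added by calling
// register(), so getFetcher() never needs editing.
class FetcherRegistry {
  constructor() {
    this.builders = new Map(); // platform name -> (config) => fetcher
  }

  register(platform, build) {
    this.builders.set(platform, build);
    return this; // allow chaining
  }

  getFetcher(platform, config) {
    const build = this.builders.get(platform);
    if (!build) throw new Error(`Unsupported platform: "${platform}"`);
    return build(config);
  }
}

// Stub builders stand in for the real strategy constructors.
const registry = new FetcherRegistry()
  .register('slack', (config) => ({ platform: 'slack', token: config.token }))
  .register('teams', (config) => ({ platform: 'teams', authProvider: config.authProvider }));
```

The trade-off is that the set of supported platforms is no longer visible in one place, which a plain lookup makes obvious at a glance.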
All platform fetchers implement the same base interface:
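// IMessageFetcher.js
// Base "interface": every platform strategy overrides these methods.
export default class IMessageFetcher {
  // Returns the list of channels (or the platform's equivalent).
  async fetchChannels() {
    throw new Error("fetchChannels() not implemented");
  }

  // Returns the messages in the given channel.
  async fetchMessages(channelId) {
    throw new Error("fetchMessages() not implemented");
  }
}

Uses the official [@slack/web-api](https://www.npmjs.com/package/@slack/web-api) SDK.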
// SlackFetcher.js
import IMessageFetcher from './IMessageFetcher';
import { WebClient } from '@slack/web-api';

export class SlackFetcher extends IMessageFetcher {
  constructor(token) {
    super();
    this.client = new WebClient(token);
  }

  // Lists conversations visible to the bot (first page of results only).
  async fetchChannels() {
    const res = await this.client.conversations.list();
    return res.channels;
  }

  // Fetches recent messages from one channel (first page of results only).
  async fetchMessages(channelId) {
    const res = await this.client.conversations.history({ channel: channelId });
    return res.messages;
  }
}

Uses the [@microsoft/microsoft-graph-client](https://www.npmjs.com/package/@microsoft/microsoft-graph-client) SDK.
// TeamsFetcher.js
import IMessageFetcher from './IMessageFetcher';
import { Client } from '@microsoft/microsoft-graph-client';

// Note: Teams channels are scoped to a team, so both methods take a teamId
// in addition to the arguments declared on the base interface.
export class TeamsFetcher extends IMessageFetcher {
  constructor(authProvider) {
    super();
    this.client = Client.initWithMiddleware({ authProvider });
  }

  // Lists the channels of one team.
  async fetchChannels(teamId) {
    const res = await this.client.api(`/teams/${teamId}/channels`).get();
    return res.value;
  }

  // Fetches messages from one channel of a team.
  async fetchMessages(teamId, channelId) {
    const res = await this.client
      .api(`/teams/${teamId}/channels/${channelId}/messages`)
      .get();
    return res.value;
  }
}

Uses the [googleapis](https://www.npmjs.com/package/googleapis) SDK.
// GoogleChatFetcher.js
import IMessageFetcher from './IMessageFetcher';
import { google } from 'googleapis';

export class GoogleChatFetcher extends IMessageFetcher {
  constructor(auth) {
    super();
    this.chat = google.chat({ version: 'v1', auth });
  }

  // Google Chat calls channels "spaces".
  async fetchSpaces() {
    const res = await this.chat.spaces.list();
    return res.data.spaces;
  }

  // Alias so the class also satisfies the base interface's fetchChannels().
  async fetchChannels() {
    return this.fetchSpaces();
  }

  // spaceName is the space resource name, e.g. the `name` field of a space.
  async fetchMessages(spaceName) {
    const res = await this.chat.spaces.messages.list({ parent: spaceName });
    return res.data.messages;
  }
}

// MessageFetcherFactory.js
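import { SlackFetcher } from './SlackFetcher';
import { TeamsFetcher } from './TeamsFetcher';
import { GoogleChatFetcher } from './GoogleChatFetcher';

export class MessageFetcherFactory {
  // Returns the strategy for the given platform name, built from `config`.
  static getFetcher(platform, config) {
    switch (platform) {
      case 'slack':
        return new SlackFetcher(config.token);
      case 'teams':
        return new TeamsFetcher(config.authProvider);
      case 'googlechat':
        return new GoogleChatFetcher(config.auth);
      default:
        throw new Error(`Unsupported platform: "${platform}"`);
    }
  }
}

// Usage example (separate module). SLACK_BOT_TOKEN, teamsAuthProvider,
// TEAM_ID, and googleAuthClient are assumed to be defined elsewhere.
import { MessageFetcherFactory } from './MessageFetcherFactory';

// --- Slack ---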
const slackFetcher = MessageFetcherFactory.getFetcher('slack', {
  token: SLACK_BOT_TOKEN,
});
const channels = await slackFetcher.fetchChannels();
const messages = await slackFetcher.fetchMessages(channels[0].id);

// --- Microsoft Teams ---
const teamsFetcher = MessageFetcherFactory.getFetcher('teams', {
  authProvider: teamsAuthProvider,
});
const teamsChannels = await teamsFetcher.fetchChannels(TEAM_ID);
const teamsMessages = await teamsFetcher.fetchMessages(TEAM_ID, teamsChannels[0].id);

// --- Google Chat ---
const googleFetcher = MessageFetcherFactory.getFetcher('googlechat', {
  auth: googleAuthClient,
});
const spaces = await googleFetcher.fetchSpaces();
const googleMessages = await googleFetcher.fetchMessages(spaces[0].name);

- Use OAuth 2.0 tokens or service accounts securely.
- Never hardcode secrets — use environment variables or a secrets manager.
Handle API rate limits gracefully:
| Platform | Rate Limit |
|---|---|
| Slack | ~1 request/sec per method |
| Microsoft Teams | Per [Graph API limits](https://learn.microsoft.com/en-us/graph/throttling) |
| Google Chat | Workspace-dependent limits |
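One way to handle these limits is a retry wrapper that waits for the server's suggested delay when it is available and otherwise backs off exponentially. This is a sketch under assumptions: it expects throttling errors to carry `status` or `statusCode` equal to 429 and, optionally, a `retryAfter` value in seconds. Real SDK error shapes vary, so the detection logic would need adapting per platform; `withRetry` and its options are hypothetical names.

```javascript
// Retries fn() on HTTP 429, honoring a retryAfter hint (seconds) if present.
// Error shape (status / statusCode / retryAfter) is an assumption.
async function withRetry(fn, options = {}) {
  const {
    retries = 3,
    baseDelayMs = 500,
    sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
  } = options;

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const status = error.statusCode ?? error.status;
      // Only throttling errors are retried; everything else propagates.
      if (status !== 429 || attempt >= retries) throw error;

      const retryAfterSec = Number(error.retryAfter);
      const delayMs = Number.isFinite(retryAfterSec) && retryAfterSec > 0
        ? retryAfterSec * 1000
        : baseDelayMs * 2 ** attempt; // exponential fallback
      await sleep(delayMs);
    }
  }
}
```

A call such as `withRetry(() => fetcher.fetchMessages(channelId))` would then wrap any of the fetchers. Some SDKs already retry throttled requests internally, in which case a wrapper like this may be redundant.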
Wrap all API calls in try/catch and log errors for monitoring:
try {
  const messages = await fetcher.fetchMessages(channelId);
} catch (error) {
  console.error(`Failed to fetch messages: ${error.message}`);
  // handle or rethrow
}

Redis Cache and the RabbitMQ Message Queue are deployed as independent services outside both the Extension Tool and the L0 Agent. This means:
- Either service can be scaled, restarted, or upgraded without affecting the other.
- The Message Queue acts as the sole integration contract between the Extension Tool and L0 — neither service calls the other directly.
- Redis is accessed only by the Validation Pipeline for fingerprint checks and rate counter reads/writes.
Unlike a pure ingestion model, the Message Gateway maintains bidirectional connections to each chat platform. This enables the Response Handler in L0 to post responses (or HITL prompts) back to the originating channel after an alert is processed.
To add a new chat platform:
- Create a new strategy class extending `IMessageFetcher`.
- Add a new `case` in `MessageFetcherFactory`.
- Register the new platform's API connection in the Message Gateway.
- No changes needed to the Validation Pipeline, Message Queue, or any L0 components.
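The first two steps can be sketched for an imaginary platform. Everything platform-specific here is hypothetical: "ExampleChat", the class `ExampleChatFetcher`, and the client methods `listRooms()` and `listMessages()` are invented for illustration and do not correspond to a real SDK. The base class is repeated so the sketch is self-contained.

```javascript
// Base interface, repeated here so the sketch is self-contained.
class IMessageFetcher {
  async fetchChannels() {
    throw new Error('fetchChannels() not implemented');
  }
  async fetchMessages(channelId) {
    throw new Error('fetchMessages() not implemented');
  }
}

// Hypothetical strategy for an imaginary "ExampleChat" platform.
// client.listRooms() and client.listMessages() are invented names.
class ExampleChatFetcher extends IMessageFetcher {
  constructor(client) {
    super();
    this.client = client;
  }

  async fetchChannels() {
    return this.client.listRooms();
  }

  async fetchMessages(channelId) {
    return this.client.listMessages(channelId);
  }
}

// The matching factory branch would look like:
//   case 'examplechat':
//     return new ExampleChatFetcher(config.client);
```

Injecting the client through the constructor keeps the strategy testable with a fake client, independent of network access.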
If your use case requires thread support:
- Slack: Use `conversations.replies({ channel, ts })` with the parent message timestamp.
- Microsoft Teams: Access the `replies` relationship on a message object via the Graph API (`/teams/{team-id}/channels/{channel-id}/messages/{message-id}/replies`).
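For Slack, long threads are paginated, so a thread fetch typically loops on the cursor returned in `response_metadata.next_cursor`. The sketch below takes the client as a parameter so that it works with a `WebClient` instance or a test double; the helper name `fetchThreadReplies` is an assumption and is not part of the fetcher classes above.

```javascript
// Collects all messages in a Slack thread by following pagination cursors.
// `client` is expected to expose conversations.replies(), as WebClient does.
async function fetchThreadReplies(client, channel, ts) {
  const replies = [];
  let cursor;

  do {
    const res = await client.conversations.replies({
      channel,
      ts, // timestamp of the parent message
      ...(cursor ? { cursor } : {}),
    });
    replies.push(...(res.messages ?? []));
    // An empty or missing next_cursor means there are no more pages.
    cursor = res.response_metadata?.next_cursor || undefined;
  } while (cursor);

  return replies;
}
```

Slack includes the parent message in the returned list, so callers that want replies only should filter out the entry whose `ts` equals the thread timestamp.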