Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 220 additions & 19 deletions src/channels/telegram-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ interface PendingMessage {
chatId: number;
messageId: number;
timestamp: number;
streamPreviewMessageId?: number;
streamPreviewDisabled?: boolean;
}

interface QueueData {
Expand All @@ -69,6 +71,12 @@ interface ResponseData {
files?: string[];
}

const TELEGRAM_TEXT_LIMIT = 4096;
const TELEGRAM_MESSAGE_NOT_MODIFIED_RE = /message is not modified/i;
const DEFAULT_PENDING_MAX_AGE_MS = 10 * 60 * 1000;
const MAX_PENDING_MAX_AGE_MS = 2 * 60 * 60 * 1000;
const PENDING_FILE = path.join(TINYCLAW_HOME, 'queue', 'pending-telegram.json');

function sanitizeFileName(fileName: string): string {
const baseName = path.basename(fileName).replace(/[<>:"/\\|?*\x00-\x1f]/g, '_').trim();
return baseName.length > 0 ? baseName : 'file.bin';
Expand Down Expand Up @@ -98,6 +106,38 @@ function buildUniqueFilePath(dir: string, preferredName: string): string {
const pendingMessages = new Map<string, PendingMessage>();
let processingOutgoingQueue = false;

function loadPendingMessages(): void {
try {
if (!fs.existsSync(PENDING_FILE)) {
return;
}
const data: Record<string, PendingMessage> = JSON.parse(fs.readFileSync(PENDING_FILE, 'utf8'));
const tenMinutesAgo = Date.now() - DEFAULT_PENDING_MAX_AGE_MS;
for (const [id, msg] of Object.entries(data)) {
if (msg.timestamp >= tenMinutesAgo) {
pendingMessages.set(id, msg);
}
}
if (pendingMessages.size > 0) {
log('INFO', `Restored ${pendingMessages.size} pending message(s) from disk`);
}
} catch (error) {
log('WARN', `Failed to load pending messages: ${(error as Error).message}`);
}
}

function savePendingMessages(): void {
try {
const obj: Record<string, PendingMessage> = {};
for (const [id, msg] of pendingMessages.entries()) {
obj[id] = msg;
}
fs.writeFileSync(PENDING_FILE, JSON.stringify(obj, null, 2));
} catch (error) {
log('WARN', `Failed to save pending messages: ${(error as Error).message}`);
}
}

// Logger
function log(level: string, message: string): void {
const timestamp = new Date().toISOString();
Expand All @@ -106,6 +146,94 @@ function log(level: string, message: string): void {
fs.appendFileSync(LOG_FILE, logMessage);
}

function resolvePendingMaxAgeMs(): number {
try {
if (!fs.existsSync(SETTINGS_FILE)) {
return DEFAULT_PENDING_MAX_AGE_MS;
}
const settings = JSON.parse(fs.readFileSync(SETTINGS_FILE, 'utf8'));
const maxResponseTimeSec = Number(settings?.monitoring?.max_response_time);
if (!Number.isFinite(maxResponseTimeSec) || maxResponseTimeSec <= 0) {
return DEFAULT_PENDING_MAX_AGE_MS;
}
const ms = Math.trunc(maxResponseTimeSec * 1000);
return Math.max(DEFAULT_PENDING_MAX_AGE_MS, Math.min(ms, MAX_PENDING_MAX_AGE_MS));
} catch {
return DEFAULT_PENDING_MAX_AGE_MS;
}
}

const PENDING_MAX_AGE_MS = resolvePendingMaxAgeMs();

function isTelegramMessageNotModified(err: any): boolean {
const description = err?.response?.body?.description;
return typeof description === 'string' && TELEGRAM_MESSAGE_NOT_MODIFIED_RE.test(description);
}

async function sendFormattedMessage(chatId: number, text: string, opts?: Record<string, any>): Promise<TelegramBot.Message> {
return await bot.sendMessage(chatId, text, opts || {});
}

async function editFormattedMessage(chatId: number, messageId: number, text: string): Promise<boolean> {
try {
await bot.editMessageText(text, {
chat_id: chatId,
message_id: messageId,
});
return true;
} catch (err: any) {
if (isTelegramMessageNotModified(err)) {
return true;
}
throw err;
}
}

async function clearPreviewMessage(pending: PendingMessage): Promise<void> {
const previewMessageId = pending.streamPreviewMessageId;
pending.streamPreviewMessageId = undefined;
pending.streamPreviewDisabled = false;
if (typeof previewMessageId !== 'number') {
return;
}
try {
await bot.deleteMessage(pending.chatId, previewMessageId);
} catch (err: any) {
const description = err?.response?.body?.description;
if (typeof description === 'string' && description.toLowerCase().includes('message to delete not found')) {
return;
}
log('WARN', `Failed to clear stream preview ${previewMessageId}: ${(err as Error).message}`);
}
}

async function cleanupPendingMessages(options?: { maxAgeMs?: number; chatId?: number; reason?: string }): Promise<number> {
const now = Date.now();
const maxAgeMs = options?.maxAgeMs ?? PENDING_MAX_AGE_MS;
const targetChatId = options?.chatId;
let removed = 0;

for (const [id, data] of pendingMessages.entries()) {
if (typeof targetChatId === 'number' && data.chatId !== targetChatId) {
continue;
}
const isExpired = now - data.timestamp > maxAgeMs;
const shouldRemove = typeof targetChatId === 'number' ? true : isExpired;
if (!shouldRemove) {
continue;
}
pendingMessages.delete(id);
await clearPreviewMessage(data);
removed++;
log('INFO', `Cleared pending message ${id}${options?.reason ? ` (${options.reason})` : ''}`);
}

if (removed > 0) {
savePendingMessages();
}
return removed;
}

// Load teams from settings for /team command
function getTeamListText(): string {
try {
Expand Down Expand Up @@ -153,7 +281,7 @@ function getAgentListText(): string {
}

// Split long messages for Telegram's 4096 char limit
function splitMessage(text: string, maxLength = 4096): string[] {
function splitMessage(text: string, maxLength = TELEGRAM_TEXT_LIMIT): string[] {
if (text.length <= maxLength) {
return [text];
}
Expand Down Expand Up @@ -259,6 +387,9 @@ function pairingMessage(code: string): string {
// Initialize Telegram bot (polling mode)
const bot = new TelegramBot(TELEGRAM_BOT_TOKEN, { polling: true });

// Restore pending messages so restarts do not orphan in-flight requests.
loadPendingMessages();

// Bot ready
bot.getMe().then(async (me: TelegramBot.User) => {
log('INFO', `Telegram bot connected as @${me.username}`);
Expand Down Expand Up @@ -390,7 +521,14 @@ bot.on('message', async (msg: TelegramBot.Message) => {
// Check for reset command: /reset @agent_id [@agent_id2 ...]
const resetMatch = messageText.trim().match(/^[!/]reset\s+(.+)$/i);
if (messageText.trim().match(/^[!/]reset$/i)) {
await bot.sendMessage(msg.chat.id, 'Usage: /reset @agent_id [@agent_id2 ...]\nSpecify which agent(s) to reset.', {
log('INFO', 'Reset command received');
const resetFlagPath = path.join(TINYCLAW_HOME, 'reset_flag');
fs.writeFileSync(resetFlagPath, 'reset');
await cleanupPendingMessages({
chatId: msg.chat.id,
reason: 'reset',
});
await bot.sendMessage(msg.chat.id, 'Conversation reset! Next message will start a fresh conversation.', {
reply_to_message_id: msg.message_id,
});
return;
Expand Down Expand Up @@ -422,6 +560,10 @@ bot.on('message', async (msg: TelegramBot.Message) => {
reply_to_message_id: msg.message_id,
});
}
await cleanupPendingMessages({
chatId: msg.chat.id,
reason: 'reset',
});
return;
}

Expand Down Expand Up @@ -457,14 +599,11 @@ bot.on('message', async (msg: TelegramBot.Message) => {
messageId: msg.message_id,
timestamp: Date.now(),
});

// Clean up old pending messages (older than 10 minutes)
const tenMinutesAgo = Date.now() - (10 * 60 * 1000);
for (const [id, data] of pendingMessages.entries()) {
if (data.timestamp < tenMinutesAgo) {
pendingMessages.delete(id);
}
}
await cleanupPendingMessages({
maxAgeMs: PENDING_MAX_AGE_MS,
reason: 'expired',
});
savePendingMessages();

} catch (error) {
log('ERROR', `Message handling error: ${(error as Error).message}`);
Expand All @@ -490,13 +629,58 @@ async function checkOutgoingQueue(): Promise<void> {
const responseData: ResponseData = JSON.parse(fs.readFileSync(filePath, 'utf8'));
const { messageId, message: responseText, sender, senderId } = responseData;

// Handle partial streaming responses.
if (messageId.startsWith('partial_')) {
const realMessageId = messageId.replace(/^partial_/, '').replace(/_r\d+$/, '');
const pending = pendingMessages.get(realMessageId);
if (pending && responseData.message) {
const previewText = responseData.message.trimEnd();
if (!pending.streamPreviewDisabled && previewText) {
if (previewText.length > TELEGRAM_TEXT_LIMIT) {
pending.streamPreviewDisabled = true;
log('WARN', `Stopped stream preview for ${realMessageId}: text exceeded ${TELEGRAM_TEXT_LIMIT} chars`);
} else if (typeof pending.streamPreviewMessageId === 'number') {
await editFormattedMessage(pending.chatId, pending.streamPreviewMessageId, previewText);
} else {
const sent = await sendFormattedMessage(pending.chatId, previewText);
pending.streamPreviewMessageId = sent.message_id;
}
}
pending.timestamp = Date.now();
savePendingMessages();
log('INFO', `Updated stream preview (${responseData.message.length} chars) for ${realMessageId}`);
}
fs.unlinkSync(filePath);
continue;
}

// Find pending message, or fall back to senderId for proactive messages
const pending = pendingMessages.get(messageId);
const targetChatId = pending?.chatId ?? (senderId ? Number(senderId) : null);

if (targetChatId && !Number.isNaN(targetChatId)) {
let finalizedViaPreview = false;
if (pending && typeof pending.streamPreviewMessageId === 'number' && responseText && (!responseData.files || responseData.files.length === 0)) {
const finalText = responseText.trimEnd();
if (finalText.length > 0 && finalText.length <= TELEGRAM_TEXT_LIMIT) {
try {
await editFormattedMessage(targetChatId, pending.streamPreviewMessageId, finalText);
finalizedViaPreview = true;
pending.streamPreviewMessageId = undefined;
pending.streamPreviewDisabled = false;
log('INFO', `Finalized stream preview in place for ${messageId}`);
} catch (err: any) {
log('WARN', `Preview finalization failed for ${messageId}, falling back to normal send: ${err.message}`);
}
}
}

if (!finalizedViaPreview && pending && typeof pending.streamPreviewMessageId === 'number') {
await clearPreviewMessage(pending);
}

// Send any attached files first
if (responseData.files && responseData.files.length > 0) {
if (!finalizedViaPreview && responseData.files && responseData.files.length > 0) {
for (const file of responseData.files) {
try {
if (!fs.existsSync(file)) continue;
Expand All @@ -518,23 +702,31 @@ async function checkOutgoingQueue(): Promise<void> {
}

// Split message if needed (Telegram 4096 char limit)
if (responseText) {
if (!finalizedViaPreview && responseText) {
const chunks = splitMessage(responseText);

if (chunks.length > 0) {
await bot.sendMessage(targetChatId, chunks[0]!, pending
await sendFormattedMessage(targetChatId, chunks[0]!, pending
? { reply_to_message_id: pending.messageId }
: {},
);
}
for (let i = 1; i < chunks.length; i++) {
await bot.sendMessage(targetChatId, chunks[i]!);
await sendFormattedMessage(targetChatId, chunks[i]!);
}
} else if (!finalizedViaPreview && pending) {
log('WARN', `Empty response for message ${messageId}, sending fallback`);
await bot.sendMessage(targetChatId, 'Response was empty. The agent may still be processing. Try again.', {
reply_to_message_id: pending.messageId,
});
}

log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${responseData.files ? `, ${responseData.files.length} file(s)` : ''})`);

if (pending) pendingMessages.delete(messageId);
if (pending) {
pendingMessages.delete(messageId);
savePendingMessages();
}
fs.unlinkSync(filePath);
} else {
log('WARN', `No pending message for ${messageId} and no valid senderId, cleaning up`);
Expand All @@ -557,11 +749,18 @@ setInterval(checkOutgoingQueue, 1000);

// Refresh typing indicator every 4 seconds for pending messages
setInterval(() => {
for (const [, data] of pendingMessages.entries()) {
bot.sendChatAction(data.chatId, 'typing').catch(() => {
// Ignore typing errors silently
void (async () => {
await cleanupPendingMessages({
maxAgeMs: PENDING_MAX_AGE_MS,
reason: 'expired',
});
}

for (const [, data] of pendingMessages.entries()) {
bot.sendChatAction(data.chatId, 'typing').catch(() => {
// Ignore typing errors silently
});
}
})();
}, 4000);

// Handle polling errors
Expand All @@ -572,12 +771,14 @@ bot.on('polling_error', (error: Error) => {
// Graceful shutdown
process.on('SIGINT', () => {
log('INFO', 'Shutting down Telegram client...');
savePendingMessages();
bot.stopPolling();
process.exit(0);
});

process.on('SIGTERM', () => {
log('INFO', 'Shutting down Telegram client...');
savePendingMessages();
bot.stopPolling();
process.exit(0);
});
Expand Down
Loading