Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ RAG_WARMUP_ON_LAUNCH=1
# YouTube API Configuration (Optional - for robust metadata)
# Get your key from: https://console.cloud.google.com/apis/credentials
YOUTUBE_API_KEY=

# Telegram Bot Configuration
# Get your token from @BotFather on Telegram
TELEGRAM_BOT_TOKEN=
9 changes: 9 additions & 0 deletions deploy/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
#!/usr/bin/env bash
set -euo pipefail

# Start Telegram bot in background (if token is configured)
if [ -n "${TELEGRAM_BOT_TOKEN:-}" ]; then
echo "[entrypoint] Starting Telegram bot (polling mode)..."
python run_telegram_bot.py &
TELEGRAM_PID=$!
echo "[entrypoint] Telegram bot started (PID: $TELEGRAM_PID)"
fi

# Start Streamlit frontend (foreground — container stays alive)
exec python deploy/start_frontend.py
6 changes: 4 additions & 2 deletions deploy/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ locals {

streamlit_container_secrets = concat(
local.streamlit_openai_value != "" && local.streamlit_openai_is_arn ? [{ name = "OPENAI_API_KEY", valueFrom = local.streamlit_openai_value }] : [],
var.youtube_api_key_secret_arn != "" ? [{ name = "YOUTUBE_API_KEY", valueFrom = var.youtube_api_key_secret_arn }] : []
var.youtube_api_key_secret_arn != "" ? [{ name = "YOUTUBE_API_KEY", valueFrom = var.youtube_api_key_secret_arn }] : [],
var.telegram_bot_token_secret_arn != "" ? [{ name = "TELEGRAM_BOT_TOKEN", valueFrom = var.telegram_bot_token_secret_arn }] : []
)

worker_container_secrets = concat(
Expand All @@ -37,7 +38,8 @@ locals {

exec_secret_arns = concat(
local.openai_secret_arns,
var.youtube_api_key_secret_arn != "" ? [var.youtube_api_key_secret_arn] : []
var.youtube_api_key_secret_arn != "" ? [var.youtube_api_key_secret_arn] : [],
var.telegram_bot_token_secret_arn != "" ? [var.telegram_bot_token_secret_arn] : []
)
}

Expand Down
3 changes: 3 additions & 0 deletions deploy/terraform.tfvars.example
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ streamlit_desired_count = 2
streamlit_min_capacity = 2
streamlit_max_capacity = 10

# Telegram Bot (optional — leave empty to disable)
telegram_bot_token_secret_arn = ""

# Enable aws ecs execute-command support
enable_ecs_exec = true

Expand Down
6 changes: 6 additions & 0 deletions deploy/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,9 @@ variable "ask_assistant_enabled" {
type = string
default = "0"
}

variable "telegram_bot_token_secret_arn" {
description = "Secrets Manager ARN for TELEGRAM_BOT_TOKEN. Leave empty to disable the Telegram bot."
type = string
default = ""
}
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ mypy>=1.10.0
# Testing
pytest>=8.0.0
pytest-cov>=5.0.0

# Telegram Bot
python-telegram-bot>=21.0

# Testing (async)
pytest-asyncio>=0.23.0
40 changes: 40 additions & 0 deletions run_telegram_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Entry point for the NEET PYQ Telegram Bot."""

import logging
import os
from importlib import import_module


def _load_dotenv() -> None:
try:
dotenv = import_module("dotenv")
except ModuleNotFoundError:
return

load_fn = getattr(dotenv, "load_dotenv", None)
if callable(load_fn):
load_fn()


_load_dotenv()

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)


def main() -> None:
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
raise ValueError("TELEGRAM_BOT_TOKEN environment variable is required")
logger.info("Initializing NEET PYQ Telegram Bot...")
from src.telegram_bot.bot import create_application, run_polling

app = create_application(token)
run_polling(app)


if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions src/telegram_bot/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Telegram bot interface for the NEET PYQ Assistant."""

from src.telegram_bot.bot import create_application

__all__ = ["create_application"]
225 changes: 225 additions & 0 deletions src/telegram_bot/bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import asyncio
import logging
from typing import Any, Mapping

from telegram import Update
from telegram.constants import ChatAction, ParseMode
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)

from src.telegram_bot.formatting import format_response
from src.telegram_bot.history import TelegramChatHistory

logger = logging.getLogger(__name__)

_ERR_GENERIC = "Sorry, I couldn't process your question right now. Please try again."
_ERR_IMAGE_DOWNLOAD = "I couldn't download your image. Please try sending it again."
_ERR_IMAGE_EXTRACT = "I couldn't read the question from your image. Try a clearer photo or type the question."


def _is_error_response(result: Mapping[str, object]) -> bool:
if "error" in result:
return True
answer = result.get("answer", "")
return isinstance(answer, str) and answer.startswith("Error generating answer:")


def create_application(token: str, rag: Any | None = None):
app = Application.builder().token(token).build()
if rag is None:
from src.utils.rag_singleton import get_rag_system

rag = get_rag_system()

app.bot_data["rag"] = rag
app.bot_data["history"] = TelegramChatHistory()
app.add_handler(CommandHandler("start", start_command))
app.add_handler(CommandHandler("help", help_command))
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
app.add_error_handler(error_handler)
return app


def run_polling(app) -> None:
app.run_polling(drop_pending_updates=True)


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
_ = context
message = update.message
if message is None:
return
welcome = (
"Welcome to the <b>NEET PYQ Assistant</b>!\n\n"
"Send a text question or a photo question and I will help with answers and sources.\n\n"
"Type /help for usage instructions."
)
_ = await message.reply_text(welcome, parse_mode=ParseMode.HTML)


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
_ = context
message = update.message
if message is None:
return
help_text = (
"<b>How to use:</b>\n"
"- Send a text question\n"
"- Or send a photo of a question with an optional caption"
)
_ = await message.reply_text(help_text, parse_mode=ParseMode.HTML)


async def _send_typing_periodically(
chat_id: int, context: ContextTypes.DEFAULT_TYPE
) -> None:
try:
while True:
await context.bot.send_chat_action(
chat_id=chat_id, action=ChatAction.TYPING
)
await asyncio.sleep(5)
except asyncio.CancelledError:
return


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
message = update.message
user = update.effective_user
chat = update.effective_chat
if message is None or user is None or chat is None or message.text is None:
return

question = message.text
user_id = user.id
chat_id = chat.id
rag = context.bot_data["rag"]
history = context.bot_data["history"]

await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
typing_task = asyncio.create_task(_send_typing_periodically(chat_id, context))
try:
chat_history = history.load_history(user_id)
result = rag.query_with_history(
question,
chat_history=chat_history,
session_id=str(user_id),
user_id=str(user_id),
)

if _is_error_response(result):
_ = await message.reply_text(_ERR_GENERIC)
return

raw_answer = str(result.get("answer", ""))
parts = format_response(
answer_text=raw_answer,
youtube_sources=result.get("sources", []),
question_sources=result.get("question_sources", []),
)
for part in parts:
_ = await message.reply_text(part, parse_mode=ParseMode.HTML)

history.save_turn(
user_id=user_id,
user_message=question,
assistant_message=raw_answer,
)
except Exception:
logger.exception("Failed to handle text message")
_ = await message.reply_text(_ERR_GENERIC)
finally:
typing_task.cancel()


async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
message = update.message
user = update.effective_user
chat = update.effective_chat
if message is None or user is None or chat is None:
return

user_id = user.id
chat_id = chat.id
caption = message.caption or ""
rag = context.bot_data["rag"]
history = context.bot_data["history"]

await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
typing_task = asyncio.create_task(_send_typing_periodically(chat_id, context))
try:
try:
photo = message.photo[-1]
file = await context.bot.get_file(photo.file_id)
image_bytes = bytes(await file.download_as_bytearray())
except Exception:
logger.exception("Failed downloading photo")
_ = await message.reply_text(_ERR_IMAGE_DOWNLOAD)
return

try:
extracted = rag.llm_manager.extract_image_context(
image_bytes=image_bytes,
filename="telegram_photo.jpg",
user_hint=caption,
session_id=str(user_id),
user_id=str(user_id),
)
except Exception:
logger.exception("Failed extracting photo context")
_ = await message.reply_text(_ERR_IMAGE_EXTRACT)
return

if caption:
question = f"{caption}\n\nImage context:\n{extracted}"
else:
question = str(extracted)

chat_history = history.load_history(user_id)
result = rag.query_with_history(
question,
chat_history=chat_history,
session_id=str(user_id),
user_id=str(user_id),
)

if _is_error_response(result):
_ = await message.reply_text(_ERR_GENERIC)
return

raw_answer = str(result.get("answer", ""))
parts = format_response(
answer_text=raw_answer,
youtube_sources=result.get("sources", []),
question_sources=result.get("question_sources", []),
)
for part in parts:
_ = await message.reply_text(part, parse_mode=ParseMode.HTML)

history.save_turn(
user_id=user_id,
user_message=question,
assistant_message=raw_answer,
)
except Exception:
logger.exception("Failed to handle photo message")
_ = await message.reply_text(_ERR_GENERIC)
finally:
typing_task.cancel()


async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.error("Unhandled exception", exc_info=context.error)
if isinstance(update, Update) and update.effective_chat:
try:
_ = await context.bot.send_message(
chat_id=update.effective_chat.id, text=_ERR_GENERIC
)
except Exception:
logger.exception("Failed to send error message")
Loading
Loading