Skip to content

Commit c50a58f

Browse files
committed
feat: add log queue
1 parent b9d3724 commit c50a58f

File tree

4 files changed

+67
-71
lines changed

4 files changed

+67
-71
lines changed

userbot/__main__.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import gc
33
import logging
44
import sys
5-
from typing import Dict, Any
5+
from typing import Dict, Any, List
66

77
from apscheduler.schedulers.asyncio import AsyncIOScheduler
88
from rich.console import Console
@@ -15,13 +15,40 @@
1515
ACTIVE_CLIENTS, FAKE, TelegramClient,
1616
db_setup, manage_clients, GLOBAL_HELP_INFO
1717
)
18-
from userbot.src.config import GC_INTERVAL_SECONDS
18+
from userbot.src.config import GC_INTERVAL_SECONDS, LOG_QUEUE_INTERVAL_SECONDS, LOG_QUEUE_BATCH_SIZE
1919
from userbot.src.db.session import get_db
2020
import userbot.src.db_manager as db_manager
21+
from userbot.src.log_handler import log_queue
2122

2223
console: Console = Console()
2324
logger: logging.Logger = logging.getLogger(__name__)
2425

26+
# --- Log Processing Worker ---
27+
async def process_log_queue():
28+
"""
29+
Periodically processes logs from the queue and writes them to the DB.
30+
"""
31+
logs_to_process: List[Dict[str, Any]] = []
32+
33+
# Drain the queue up to the batch size
34+
while len(logs_to_process) < LOG_QUEUE_BATCH_SIZE and not log_queue.empty():
35+
try:
36+
log_item = log_queue.get_nowait()
37+
logs_to_process.append(log_item)
38+
log_queue.task_done()
39+
except asyncio.QueueEmpty:
40+
break
41+
42+
if logs_to_process:
43+
try:
44+
async with get_db() as db_session:
45+
await db_manager.add_logs_bulk(db_session, logs_to_process)
46+
except Exception as e:
47+
# If the DB fails, the logs are lost, but we print an error.
48+
# The original handler already has a fallback for this scenario.
49+
logger.error(f"Failed to process log queue batch: {e}", exc_info=True)
50+
51+
2552
# --- Helper Functions ---
2653
async def get_account_id_from_client(client) -> int | None:
2754
return next((acc_id for acc_id, c in ACTIVE_CLIENTS.items() if c == client), None)
@@ -184,11 +211,13 @@ async def main():
184211

185212
scheduler = AsyncIOScheduler(timezone="UTC")
186213
scheduler.add_job(gc.collect, 'interval', seconds=GC_INTERVAL_SECONDS, id='gc_job')
214+
scheduler.add_job(process_log_queue, 'interval', seconds=LOG_QUEUE_INTERVAL_SECONDS, id='log_queue_job')
187215
scheduler.start()
216+
188217
console.print(f"-> [system] - GC scheduled every {GC_INTERVAL_SECONDS} seconds.", style="blue")
218+
console.print(f"-> [system] - Log queue processing scheduled every {LOG_QUEUE_INTERVAL_SECONDS} seconds.", style="blue")
189219

190220
logger.info("Userbot is running. Press Ctrl+C to stop.")
191-
# Keep the main coroutine alive indefinitely
192221
await asyncio.Event().wait()
193222

194223

userbot/src/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616
DB_USER: str = os.getenv("DB_USER", "userbot")
1717
DB_PASS: str = os.getenv("DB_PASS", "userbot_password")
1818
DB_NAME: str = os.getenv("DB_NAME", "userbot_db")
19-
# New: Connection retry settings
2019
DB_CONN_RETRIES: int = int(os.getenv("DB_CONN_RETRIES", 5))
2120
DB_CONN_RETRY_DELAY: int = int(os.getenv("DB_CONN_RETRY_DELAY", 5))
2221

2322

2423
# --- Application Settings ---
2524
MODULE_FOLDER: str = "userbot.modules"
2625
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO").upper()
26+
# New: Logging queue settings
27+
LOG_QUEUE_INTERVAL_SECONDS: int = int(os.getenv("LOG_QUEUE_INTERVAL_SECONDS", 5))
28+
LOG_QUEUE_BATCH_SIZE: int = int(os.getenv("LOG_QUEUE_BATCH_SIZE", 50))
29+
2730

2831
# --- Scheduler Settings ---
2932
GC_INTERVAL_SECONDS: int = int(os.getenv("GC_INTERVAL_SECONDS", 60))

userbot/src/db_manager.py

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ async def add_account(
2525
app_version: str,
2626
user_telegram_id: Optional[int] = None
2727
) -> Optional[Account]:
28-
"""Adds a new account to the database with encrypted credentials."""
2928
try:
3029
new_account = Account(
3130
account_name=account_name,
@@ -43,7 +42,7 @@ async def add_account(
4342
logger.info(f"Added account '{account_name}' with ID: {new_account.account_id}")
4443
return new_account
4544
except IntegrityError:
46-
logger.warning(f"Account with name '{account_name}' already exists.")
45+
logger.warning(f"Account with name '{account_name}' or user_id '{user_telegram_id}' already exists.")
4746
await db.rollback()
4847
return None
4948
except Exception as e:
@@ -52,27 +51,22 @@ async def add_account(
5251
return None
5352

5453
async def get_account(db: AsyncSession, account_name: str) -> Optional[Account]:
55-
"""Retrieves an account by its name."""
5654
result = await db.execute(select(Account).where(Account.account_name == account_name))
5755
return result.scalars().first()
5856

5957
async def get_account_by_user_id(db: AsyncSession, user_id: int) -> Optional[Account]:
60-
"""Retrieves an account by its Telegram User ID."""
6158
result = await db.execute(select(Account).where(Account.user_telegram_id == user_id))
6259
return result.scalars().first()
6360

6461
async def get_all_accounts(db: AsyncSession) -> List[Account]:
65-
"""Retrieves all accounts from the database, with their sessions for status display."""
6662
result = await db.execute(select(Account).options(selectinload(Account.session)).order_by(Account.account_id))
6763
return result.scalars().all()
6864

6965
async def get_all_active_accounts(db: AsyncSession) -> List[Account]:
70-
"""Retrieves all enabled accounts from the database."""
7166
result = await db.execute(select(Account).where(Account.is_enabled == True))
7267
return result.scalars().all()
7368

7469
async def delete_account(db: AsyncSession, account_name: str) -> bool:
75-
"""Deletes an account by its name."""
7670
account = await get_account(db, account_name)
7771
if not account:
7872
return False
@@ -81,7 +75,6 @@ async def delete_account(db: AsyncSession, account_name: str) -> bool:
8175
return True
8276

8377
async def toggle_account_status(db: AsyncSession, account_name: str) -> Optional[bool]:
84-
"""Toggles the is_enabled status of an account."""
8578
account = await get_account(db, account_name)
8679
if not account:
8780
return None
@@ -90,14 +83,12 @@ async def toggle_account_status(db: AsyncSession, account_name: str) -> Optional
9083
return account.is_enabled
9184

9285
async def update_account_lang(db: AsyncSession, account_id: int, lang_code: str) -> bool:
93-
"""Updates the language for a specific account."""
9486
stmt = update(Account).where(Account.account_id == account_id).values(lang_code=lang_code)
9587
result = await db.execute(stmt)
9688
return result.rowcount > 0
9789

9890
# --- Session CRUD ---
9991
async def get_session(db: AsyncSession, account_id: int) -> Optional[Session]:
100-
"""Retrieves a session for a given account and decrypts its auth key."""
10192
result = await db.execute(select(Session).where(Session.account_id == account_id))
10293
session = result.scalars().first()
10394
if session and session.auth_key_data:
@@ -109,7 +100,6 @@ async def get_session(db: AsyncSession, account_id: int) -> Optional[Session]:
109100
return session
110101

111102
async def add_or_update_session(db: AsyncSession, **kwargs) -> Optional[Session]:
112-
"""Adds or updates a session in the database, encrypting the auth key."""
113103
account_id = kwargs.get("account_id")
114104
if not account_id: return None
115105

@@ -129,37 +119,17 @@ async def add_or_update_session(db: AsyncSession, **kwargs) -> Optional[Session]
129119
await db.flush()
130120
return session
131121

132-
async def delete_session(db: AsyncSession, account_id: int) -> bool:
133-
"""Deletes a session from the database."""
134-
stmt = delete(Session).where(Session.account_id == account_id)
135-
result = await db.execute(stmt)
136-
await db.flush()
137-
return result.rowcount > 0
138-
139122
# --- Log CRUD ---
140-
async def add_log(db: AsyncSession, level: str, message: str, account_id: Optional[int] = None, module_name: Optional[str] = None) -> Optional[Log]:
141-
"""Adds a log entry to the database."""
123+
async def add_logs_bulk(db: AsyncSession, logs: List[Dict[str, Any]]) -> None:
124+
"""Adds a batch of log entries to the database."""
125+
if not logs:
126+
return
142127
try:
143-
new_log = Log(
144-
level=level.upper(),
145-
message=message,
146-
account_id=account_id,
147-
module_name=module_name
148-
)
149-
db.add(new_log)
128+
# SQLAlchemy 2.0 style bulk insert
129+
db.add_all([Log(**log_data) for log_data in logs])
150130
await db.flush()
151-
return new_log
152131
except Exception as e:
153-
print(f"CRITICAL: Error adding log to database: {e}")
154-
return None
155-
156-
async def get_logs(db: AsyncSession, account_id: Optional[int] = None, limit: int = 100, level: Optional[str] = None) -> List[Log]:
157-
"""Retrieves logs from the database with optional filters."""
158-
stmt = select(Log).order_by(Log.timestamp.desc()).limit(limit)
159-
if account_id is not None:
160-
stmt = stmt.where(Log.account_id == account_id)
161-
if level is not None:
162-
stmt = stmt.where(Log.level == level.upper())
163-
164-
result = await db.execute(stmt)
165-
return result.scalars().all()
132+
# Fallback to printing if bulk DB logging fails
133+
print(f"CRITICAL: Error during bulk log insert: {e}")
134+
for log in logs:
135+
print(f"Fallback Log: [{log.get('level')}] {log.get('message')}")

userbot/src/log_handler.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import logging
22
import asyncio
3-
from typing import Optional
3+
from typing import Optional, Dict, Any
44

5-
from userbot.src.db_manager import add_log
5+
# A simple, thread-safe in-memory queue for log records.
6+
# asyncio.Queue is used because the consumer (writer) is async.
7+
log_queue: asyncio.Queue = asyncio.Queue()
68

79
class DBLogHandler(logging.Handler):
810
"""
9-
A custom logging handler that writes log records to the database.
11+
A custom logging handler that puts log records into an async queue.
1012
"""
1113
def __init__(self, level: int = logging.NOTSET) -> None:
1214
"""
@@ -19,34 +21,26 @@ def __init__(self, level: int = logging.NOTSET) -> None:
1921

2022
def emit(self, record: logging.LogRecord) -> None:
2123
"""
22-
Writes a log record to the database.
24+
Puts a log record into the in-memory queue.
2325
24-
This method is called by the logging framework for each log message.
25-
It schedules the database write operation on the running event loop.
26+
This operation is non-blocking and safe to call from synchronous code.
2627
2728
Args:
2829
record (logging.LogRecord): The log record to be processed.
2930
"""
30-
# We need to get the running event loop to schedule our async db call
3131
try:
32-
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
33-
# Default values for account_id and module_name
34-
account_id: Optional[int] = getattr(record, 'account_id', None)
35-
module_name: Optional[str] = getattr(record, 'module_name', None)
36-
37-
# Fire and forget: schedule the coroutine to run on the loop
38-
asyncio.ensure_future(add_log(
39-
level=record.levelname,
40-
message=self.format(record),
41-
account_id=account_id,
42-
module_name=module_name
43-
), loop=loop)
44-
except RuntimeError:
45-
# This can happen if logging occurs when no event loop is running.
46-
# In this case, we can't log to the DB.
47-
# We can print to stderr as a fallback for critical logs.
48-
if record.levelno >= logging.ERROR:
49-
print(f"CRITICAL [DBLogHandler-Fallback]: Could not log to DB (no running loop): {self.format(record)}")
32+
log_entry: Dict[str, Any] = {
33+
'level': record.levelname,
34+
'message': self.format(record),
35+
'account_id': getattr(record, 'account_id', None),
36+
'module_name': getattr(record, 'module_name', None)
37+
}
38+
# put_nowait is a non-blocking call, safe for sync contexts.
39+
log_queue.put_nowait(log_entry)
40+
except asyncio.QueueFull:
41+
# This should rarely happen with a default-sized queue.
42+
# A fallback print is better than losing the log.
43+
print(f"CRITICAL [DBLogHandler-Fallback]: Log queue is full. Dropping log: {self.format(record)}")
5044
except Exception:
51-
# Fallback for any other error during logging to prevent crashes.
45+
# Broad exception to prevent any logging-related crash.
5246
print(f"CRITICAL [DBLogHandler-Fallback]: An unexpected error occurred in emit: {self.format(record)}")

0 commit comments

Comments
 (0)