From 88f51c77b4ec6496b09b61bd795a5a131e65dcdc Mon Sep 17 00:00:00 2001 From: Oleg Volchkov Date: Mon, 12 Jan 2026 11:57:59 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=B2=D0=B5=D0=BB=D0=B8=D1=87=D0=B8?= =?UTF-8?q?=D0=BB=20=D1=82=D0=B0=D0=B9=D0=BC=D0=B0=D1=83=D1=82=20=D0=B8=20?= =?UTF-8?q?=D1=87=D0=B8=D1=81=D0=BB=D0=BE=20=D1=80=D0=B5=D1=82=D1=80=D0=B0?= =?UTF-8?q?=D0=B5=D0=B2=20=D0=B2=20=D0=90=D0=9F=D0=98=20=D0=A2=D0=B5=D0=BB?= =?UTF-8?q?=D0=B5=D0=B3=D1=80=D0=B0=D0=BC=D0=B0.=20=D0=94=D0=BE=D0=B1?= =?UTF-8?q?=D0=B0=D0=B2=D0=B8=D0=BB=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE?= =?UTF-8?q?=D1=82=D0=BA=D1=83=20=D0=BE=D1=88=D0=B8=D0=B1=D0=BE=D0=BA=20asy?= =?UTF-8?q?ncio=20=D0=B8=20=D0=B4=D0=BE=D0=BF=D0=BE=D0=BB=D0=BD=D0=B8?= =?UTF-8?q?=D0=BB=20=D0=BE=D1=88=D0=B8=D0=B1=D0=BA=D0=B8=20aiohttp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .flake8 | 2 +- tests/__init__.py | 8 +-- tests/test_app.py | 4 +- tgproxy/__init__.py | 4 +- tgproxy/app.py | 59 ++++++++++------------- tgproxy/channel.py | 91 ++++++++++++++--------------------- tgproxy/providers/errors.py | 6 ++- tgproxy/providers/telegram.py | 71 +++++++++++++++------------ tgproxy/queue.py | 6 +-- 9 files changed, 117 insertions(+), 134 deletions(-) diff --git a/.flake8 b/.flake8 index 128c63a..0729c5f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] #ignore=E121,E123,E126,E226,E24,E704,W503,W504,E501 -max-line-length=160 +max-line-length=240 diff --git a/tests/__init__.py b/tests/__init__.py index c53db8f..0004ecd 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,12 +1,12 @@ import time -class AnyValue(): +class AnyValue: def __eq__(self, value): return True -class NowTimeDeltaValue(): +class NowTimeDeltaValue: def __init__(self, delta_sec=2.5): self._delta_sec = delta_sec self._last_time = None @@ -14,7 +14,7 @@ def __init__(self, delta_sec=2.5): def __eq__(self, value): value = round(value, 1) self._last_time = round(time.time(), 1) - return (value - self._delta_sec < self._last_time < value + self._delta_sec) + return value - self._delta_sec < self._last_time < value + self._delta_sec def __repr__(self): - return f'{self.__class__.__name__}<{self._last_time}±{self._delta_sec}>' + return f"{self.__class__.__name__}<{self._last_time}±{self._delta_sec}>" diff --git a/tests/test_app.py b/tests/test_app.py index 36fd974..aa0034f 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -205,7 +205,7 @@ async def test_no_reties_on_fatal_error(sut): assert sut.server.app["api"].channels["main"].qsize() == 0 assert sut.server.app["api"].channels["main"].stat() == { "errors": 1, - "last_error": "Status: 401. Body: {}", + "last_error": "telegram fatal error: Status: 401. Body: {}", "last_error_at": NowTimeDeltaValue(), "queued": 1, "sended": 0, @@ -255,7 +255,7 @@ async def test_channel_statistics(sut): await asyncio.sleep(1) assert await resp.json() == { "errors": 1, - "last_error": "Status: 401. Body: bad request", + "last_error": "telegram fatal error: Status: 401. Body: bad request", "last_error_at": NowTimeDeltaValue(), "queued": 3, "sended": 2, diff --git a/tgproxy/__init__.py b/tgproxy/__init__.py index d4429bf..f32b6c1 100644 --- a/tgproxy/__init__.py +++ b/tgproxy/__init__.py @@ -2,6 +2,6 @@ from tgproxy.channel import build_channel __all__ = [ - 'HttpAPI', - 'build_channel', + "HttpAPI", + "build_channel", ] diff --git a/tgproxy/app.py b/tgproxy/app.py index deec02e..5063282 100644 --- a/tgproxy/app.py +++ b/tgproxy/app.py @@ -5,38 +5,33 @@ import tgproxy.errors as errors -DEFAULT_LOGGER_NAME = 'tgproxy.app' +DEFAULT_LOGGER_NAME = "tgproxy.app" class BaseApp: def __init__(self, logger_name=DEFAULT_LOGGER_NAME): self.app = web.Application( middlewares=[ - self._error_middleware, + self._error_middleware, ], ) self._log = logging.getLogger(logger_name) - self.app.add_routes([ - web.get('/ping.html', self._on_ping), - ]) + self.app.add_routes( + [ + web.get("/ping.html", self._on_ping), + ] + ) def _success_response(self, status=200, **kwargs): return web.json_response( - data=dict( - status='success', - **kwargs - ), + data=dict(status="success", **kwargs), status=status, ) def _error_response(self, message, status=500, **kwargs): return web.json_response( - dict( - status='error', - message=message or 'Unknown error', - **kwargs - ), + dict(status="error", message=message or "Unknown error", **kwargs), status=status, ) @@ -54,7 +49,7 @@ async def _error_middleware(self, request, handler): async def _on_ping(self, request): return web.Response( - text='OK', + text="OK", ) @@ -63,18 +58,20 @@ def __init__(self, channels): super().__init__() self.channels = dict(channels) - self.app.add_routes([ - web.get('/', self._on_index), - web.get('/{channel_name}', self._on_channel_stat), - web.post('/{channel_name}', self._on_channel_send), - ]) + self.app.add_routes( + [ + web.get("/", self._on_index), + web.get("/{channel_name}", self._on_channel_stat), + web.post("/{channel_name}", self._on_channel_send), + ] + ) self.app.on_startup.append(self.start_background_channels_tasks) self.app.on_shutdown.append(self.stop_background_channels_tasks) self.background_tasks = list() async def start_background_channels_tasks(self, app): - self._log.info('Start background tasks') + self._log.info("Start background tasks") for ch in self.channels.values(): self.background_tasks.append( asyncio.create_task( @@ -84,31 +81,29 @@ async def start_background_channels_tasks(self, app): ) async def stop_background_channels_tasks(self, app): - self._log.info('Stop background tasks') + self._log.info("Stop background tasks") for task in self.background_tasks: task.cancel() await task def _get_task_state(self, task): if task.cancelled(): - return 'cancelled' + return "cancelled" if task.done(): - return 'done' - return 'active' + return "done" + return "active" def _has_failed_workers(self): return any(map(lambda x: x.cancelled() or x.done(), self.background_tasks)) def _workers(self): - return { - task.get_name(): self._get_task_state(task) for task in self.background_tasks - } + return {task.get_name(): self._get_task_state(task) for task in self.background_tasks} async def _on_ping(self, request): if self._has_failed_workers(): return self._error_response( status=502, - message='Background workers canceled', + message="Background workers canceled", workers=self._workers(), ) @@ -118,13 +113,11 @@ async def _on_ping(self, request): async def _on_index(self, request): return self._success_response( - channels={ - name: str(ch) for name, ch in self.channels.items() - }, + channels={name: str(ch) for name, ch in self.channels.items()}, ) def _get_channel(self, request): - channel_name = request.match_info['channel_name'] + channel_name = request.match_info["channel_name"] channel = self.channels.get(channel_name) if not channel: raise errors.ChannelNotFound(f'Channel "{channel_name}" not found') diff --git a/tgproxy/channel.py b/tgproxy/channel.py index 86c6481..3e9e28d 100644 --- a/tgproxy/channel.py +++ b/tgproxy/channel.py @@ -11,7 +11,7 @@ import tgproxy.utils as utils from tgproxy.queue import MemoryQueue -DEFAULT_LOGGER_NAME = 'tgproxy.channel' +DEFAULT_LOGGER_NAME = "tgproxy.channel" CHANNELS_TYPES = dict() @@ -35,19 +35,13 @@ def build_channel(url, **kwargs): class Message: # dict: name: {default: value} request_fields = { - 'text': {'default': ''}, - 'request_id': {}, + "text": {"default": ""}, + "request_id": {}, } @classmethod def from_request(cls, request): - message = cls( - **{ - f: request.get(f, v.get('default')) - for f, v in cls.request_fields.items() - if request.get(f, v.get('default')) is not None - } - ) + message = cls(**{f: request.get(f, v.get("default")) for f, v in cls.request_fields.items() if request.get(f, v.get("default")) is not None}) return message def __init__(self, text, request_id=None, **options): @@ -61,7 +55,7 @@ def __repr__(self): class BaseChannel: - schema = '-' + schema = "-" message_class = Message @classmethod @@ -74,7 +68,7 @@ def __init__(self, name, queue, provider, send_banner_on_startup=False, logger_n self.send_banner_on_startup = send_banner_on_startup self._queue = queue or MemoryQueue() - self._log = logging.getLogger(f'{logger_name}.{name}') + self._log = logging.getLogger(f"{logger_name}.{name}") self._stat = dict( queued=0, sended=0, @@ -85,7 +79,7 @@ def __init__(self, name, queue, provider, send_banner_on_startup=False, logger_n ) self._retryMessageDelayInSeconds = 5 - self._log.info(f'self.send_banner_on_startup == {self.send_banner_on_startup}') + self._log.info(f"self.send_banner_on_startup == {self.send_banner_on_startup}") def qsize(self): return self._queue.qsize() @@ -102,7 +96,7 @@ async def put(self, message): await self._enqueue(message) async def process(self): - self._log.info(f'Start queue processor for {self}') + self._log.info(f"Start queue processor for {self}") if self.send_banner_on_startup: await self.put(self._get_banner()) @@ -111,7 +105,7 @@ async def process(self): while True: message = await self._dequeue() - self._log.info(f'Send message: {message}') + self._log.info(f"Send message: {message}") while True: error = await self._send_message(provider, message) if error is None or isinstance(error, providers.errors.ProviderFatalError): @@ -120,39 +114,39 @@ async def process(self): await self._queue.task_done() except asyncio.CancelledError: - self._log.info(f'Finish queue processor. Queue size: {self._queue.qsize()}') + self._log.info(f"Finish queue processor. Queue size: {self._queue.qsize()}") except Exception as e: self._log.error(str(e), exc_info=sys.exc_info()) - self._log.info(f'Failed queue processor. Queue size: {self._queue.qsize()}') + self._log.info(f"Failed queue processor. Queue size: {self._queue.qsize()}") raise async def _enqueue(self, message): - self._log.info(f'Enque message: {message}') + self._log.info(f"Enque message: {message}") await self._queue.enqueue(message) - self._stat['queued'] += 1 + self._stat["queued"] += 1 async def _dequeue(self): message = await self._queue.dequeue() - self._log.info(f'Deque message: {message}') + self._log.info(f"Deque message: {message}") return message def _get_banner(self): return self.message_class.from_request( - dict(text=f'Start tgproxy on {socket.gethostname()}'), + dict(text=f"Start tgproxy on {socket.gethostname()}"), ) async def _send_message(self, provider, message): # return Exception if failed try: await provider.send_message(message) - self._log.info(f'Message sended: {message}') - self._stat['sended'] += 1 - self._stat['last_sended_at'] = round(time.time(), 3) + self._log.info(f"Message sended: {message}") + self._stat["sended"] += 1 + self._stat["last_sended_at"] = round(time.time(), 3) except providers.errors.ProviderError as e: - self._stat['errors'] += 1 - self._stat['last_error'] = str(e) - self._stat['last_error_at'] = round(time.time(), 3) - self._log.error(f'Error: {str(e)} Message: {message}', exc_info=sys.exc_info()) + self._stat["errors"] += 1 + self._stat["last_error"] = str(e) + self._stat["last_error_at"] = round(time.time(), 3) + self._log.error(f"Error: {str(e)} Message: {message}", exc_info=sys.exc_info()) return e return None @@ -160,52 +154,37 @@ async def _send_message(self, provider, message): class TelegramMessage(Message): request_fields = { - 'text': {'default': ''}, - 'request_id': {}, - 'parse_mode': {}, - 'disable_web_page_preview': {'default': 0}, - 'disable_notifications': {'default': 0}, - 'reply_to_message_id': {}, + "text": {"default": ""}, + "request_id": {}, + "parse_mode": {}, + "disable_web_page_preview": {"default": 0}, + "disable_notifications": {"default": 0}, + "reply_to_message_id": {}, } class TelegramChannel(BaseChannel): - schema = 'telegram' + schema = "telegram" message_class = TelegramMessage @classmethod def from_url(cls, url, queue=None, **kwargs): parsed_url, options = utils.parse_url(url) - return cls( - name=parsed_url.path.strip('/'), - queue=queue, - provider=providers.TelegramChat( - chat_id=parsed_url.hostname, - bot_token=f'{parsed_url.username}:{parsed_url.password}', - **(options | kwargs) - ), - **(options | kwargs) - ) + return cls(name=parsed_url.path.strip("/"), queue=queue, provider=providers.TelegramChat(chat_id=parsed_url.hostname, bot_token=f"{parsed_url.username}:{parsed_url.password}", **(options | kwargs)), **(options | kwargs)) def __init__(self, name, queue, provider, send_banner_on_startup=True, **kwargs): - super().__init__( - name, - queue, - provider, - send_banner_on_startup=bool(int(send_banner_on_startup)), - **kwargs - ) + super().__init__(name, queue, provider, send_banner_on_startup=bool(int(send_banner_on_startup)), **kwargs) self._bot_token = provider.bot_token - self._bot_name = self._bot_token[:self._bot_token.find(":")] + self._bot_name = self._bot_token[: self._bot_token.find(":")] self._chat_id = provider.chat_id self._channel_options = dict(kwargs) def __str__(self): - co = [f'{k}={v}' for k, v in self._channel_options.items()] + co = [f"{k}={v}" for k, v in self._channel_options.items()] co = f'{"&".join(co)}' if co: - co = f'?{co}' - return f'{self.schema}://{self._bot_name}:***@{self._chat_id}/{self.name}{co}' + co = f"?{co}" + return f"{self.schema}://{self._bot_name}:***@{self._chat_id}/{self.name}{co}" register_channel_type(TelegramChannel) diff --git a/tgproxy/providers/errors.py b/tgproxy/providers/errors.py index 0000e86..6bbbe04 100644 --- a/tgproxy/providers/errors.py +++ b/tgproxy/providers/errors.py @@ -3,8 +3,10 @@ class ProviderError(Exception): class ProviderFatalError(ProviderError): - pass + def __init__(self, source=None, *args, **kwargs): + super().__init__(f"telegram fatal error: {source}" if source is not None else None, *args, **kwargs) class ProviderTemporaryError(ProviderError): - pass + def __init__(self, source=None, *args, **kwargs): + super().__init__(f"telegram temporary error: {source}" if source is not None else None, *args, **kwargs) diff --git a/tgproxy/providers/telegram.py b/tgproxy/providers/telegram.py index d56c4ef..84447cb 100644 --- a/tgproxy/providers/telegram.py +++ b/tgproxy/providers/telegram.py @@ -1,16 +1,18 @@ +import asyncio import contextlib import logging import aiohttp import tenacity -from .errors import ProviderFatalError, ProviderTemporaryError +from .errors import ProviderError, ProviderFatalError, ProviderTemporaryError -TELEGRAM_API_URL = 'https://api.telegram.org' -DEFAULT_LOGGER_NAME = 'tgproxy.providers.telegram' +TELEGRAM_API_URL = "https://api.telegram.org" +DEFAULT_LOGGER_NAME = "tgproxy.providers.telegram" +DEFAULT_TELEGRAM_TIMEOUT = 25 DEFAULT_RETRIES_OPTIONS = dict( - stop=tenacity.stop_after_attempt(10), + stop=tenacity.stop_after_attempt(15), wait=tenacity.wait_random_exponential( multiplier=1, min=2, @@ -18,18 +20,27 @@ ), ) +TELEGRAM_TEMPORARY_ERRORS = ( + aiohttp.ClientConnectionError, + aiohttp.ClientResponseError, + aiohttp.NonHttpUrlClientError, + aiohttp.ClientPayloadError, + asyncio.TimeoutError, + asyncio.CancelledError, +) + class TelegramChat: - def __init__(self, chat_id, bot_token, api_url=TELEGRAM_API_URL, timeout=5, logger_name=DEFAULT_LOGGER_NAME, **kwargs): + def __init__(self, chat_id, bot_token, api_url=TELEGRAM_API_URL, timeout=DEFAULT_TELEGRAM_TIMEOUT, logger_name=DEFAULT_LOGGER_NAME, **kwargs): self.chat_id = chat_id self.bot_token = bot_token - self.bot_name = self.bot_token[:self.bot_token.find(":")] + self.bot_name = self.bot_token[: self.bot_token.find(":")] self.api_url = api_url self.bot_url = f'{self.api_url.rstrip("/")}/bot{self.bot_token}' self.timeout = int(timeout) - self._log = logging.getLogger(f'{logger_name}.bot{self.bot_name}.{self.chat_id}') + self._log = logging.getLogger(f"{logger_name}.bot{self.bot_name}.{self.chat_id}") self.http_timeout = aiohttp.ClientTimeout(total=self.timeout) self._http_client = None @@ -41,13 +52,10 @@ def __init__(self, chat_id, bot_token, api_url=TELEGRAM_API_URL, timeout=5, logg ) async def send_message(self, message): - self._log.info(f'Send message {message}') + self._log.info(f"Send message {message}") await self._request( - 'sendMessage', - request_data=dict( - text=message.text, - **message.options - ), + "sendMessage", + request_data=dict(text=message.text, **message.options), ) @contextlib.asynccontextmanager @@ -59,30 +67,24 @@ async def session(self): async def _request(self, method, request_data): if not self._http_client: - raise RuntimeError('Call requests with in session context manager') + raise RuntimeError("Call requests with in session context manager") - @tenacity.retry( - reraise=True, - **self._retries_options - ) + @tenacity.retry(reraise=True, **self._retries_options) async def _call_request_with_retries(): try: resp = await self._http_client.post( - f'{self.bot_url}/{method}', - data=dict( - chat_id=self.chat_id, - **request_data - ), + f"{self.bot_url}/{method}", + data=dict(chat_id=self.chat_id, **request_data), timeout=self.http_timeout, allow_redirects=False, ) return await self._process_response(resp) - except (aiohttp.ClientConnectionError, aiohttp.ClientResponseError) as e: - raise ProviderTemporaryError(str(e)) - except ProviderTemporaryError: + except TELEGRAM_TEMPORARY_ERRORS as e: + raise ProviderTemporaryError({str(e)}) from e + except ProviderError: raise except Exception as e: - raise ProviderFatalError(str(e)) + raise ProviderFatalError(str(e)) from e return await _call_request_with_retries() @@ -90,15 +92,22 @@ async def _process_response(self, response): if response.ok: return (response.status, await response.json()) - resp_text = '' + resp_text = "" try: resp_text = await response.text() except aiohttp.ClientConnectionError: pass - if response.status >= 400 and response.status <= 499 and response.status not in [400, ]: + if ( + response.status >= 400 + and response.status <= 499 + and response.status + not in [ + 400, + ] + ): # На 400 не ретраимся, потому что иногда бывает временно: # {"ok":false,"error_code":400,"description":"Bad Request: not enough rights to send text messages to the chat"} - raise ProviderFatalError(f'Status: {response.status}. Body: {resp_text}') + raise ProviderFatalError(f"Status: {response.status}. Body: {resp_text}") - raise ProviderTemporaryError(f'Status: {response.status}. Body: {resp_text}') + raise ProviderTemporaryError(f"Status: {response.status}. Body: {resp_text}") diff --git a/tgproxy/queue.py b/tgproxy/queue.py index e39aa60..421cc6c 100644 --- a/tgproxy/queue.py +++ b/tgproxy/queue.py @@ -3,7 +3,7 @@ import tgproxy.errors as errors -DEFAULT_LOGGER_NAME = 'tgproxy.queue.memory' +DEFAULT_LOGGER_NAME = "tgproxy.queue.memory" DEFAULT_QUEUE_MAXSIZE = 10000 @@ -32,10 +32,10 @@ def qsize(self): async def enqueue(self, message): try: # Кладем очередь без блокировок на ожидании особождения места в очереди - self._log.info(f'Enque message {message}') + self._log.info(f"Enque message {message}") self._queue.put_nowait(message) except asyncio.QueueFull: - raise errors.QueueFull(f'Queue is full. Max size is {self._queue.maxsize}') + raise errors.QueueFull(f"Queue is full. Max size is {self._queue.maxsize}") async def dequeue(self): return await self._queue.get()