forked from remixer-dec/botality-ii
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmiddleware.py
More file actions
79 lines (69 loc) · 2.58 KB
/
middleware.py
File metadata and controls
79 lines (69 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from aiogram.dispatcher.flags import get_flag
from aiogram.utils.chat_action import ChatActionSender
from aiogram import BaseMiddleware
from config_reader import config
from custom_queue import CallCooldown
from collections import defaultdict
import asyncio
import logging
logger = logging.getLogger(__name__)
class ChatActionMiddleware(BaseMiddleware):
async def __call__(self, handler, event, data):
long_operation_type = get_flag(data, "long_operation")
if not long_operation_type:
return await handler(event, data)
async with ChatActionSender(action=long_operation_type, chat_id=event.chat.id, bot=data["bot"]):
return await handler(event, data)
class AccessMiddleware(BaseMiddleware):
async def __call__(self, handler, event, data):
uid = event.from_user.id
cid = event.chat.id
logger.info(f'message in chat {cid} ({event.chat.title or "private"}) from {uid} (@{event.from_user.username or event.from_user.first_name})')
if config.ignore_mode == 'whitelist' or config.ignore_mode == 'both':
if cid not in config.whitelist:
return
if config.ignore_mode == 'blacklist' or config.ignore_mode == 'both':
if uid in config.blacklist or cid in config.blacklist:
return
if get_flag(data, "admins_only"):
if uid not in config.adminlist:
return
return await handler(event, data)
class CooldownMiddleware(BaseMiddleware):
async def __call__(self, handler, event, data):
cooldown_seconds = get_flag(data, "cooldown")
if cooldown_seconds:
function_name = data['handler'].callback.__name__
if CallCooldown.check_call(event.from_user.id, function_name, cooldown_seconds):
return await handler(event, data)
else:
return
else:
return await handler(event, data)
class MediaGroupMiddleware(BaseMiddleware):
albums = defaultdict(lambda: [])
def __init__(self, delay = 1):
self.delay = delay
async def __call__(self, handler, event, data):
if not event.media_group_id:
return await handler(event, data)
try:
self.albums[event.media_group_id].append(event)
await asyncio.sleep(self.delay)
data["album"] = self.albums.pop(event.media_group_id)
except Exception as e:
logger.error(e)
return await handler(event, data)
class CounterMiddleware(BaseMiddleware):
def __init__(self, dp):
self.dp = dp
self.counter = 0
async def __call__(
self,
make_request,
bot,
method
):
self.counter += 1
self.dp.counters['msg'] = self.counter
return await make_request(bot, method)