diff --git a/README.md b/README.md
index 0175e02..4d467a8 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,6 @@
- 🎵 Поиск и прослушивание музыки
- 🎧 Высокое качество звука
-- 🌅 Аудио с обложкой
- 📱 Интуитивный интерфейс
- 🌍 Поддержка нескольких языков (🇬🇧 English, 🇷🇺 Русский)
- 🌍 Автоопределение языка пользователя
diff --git a/bot/__init__.py b/bot/__init__.py
index e69de29..7c3397f 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -0,0 +1 @@
+"""Bot package."""
diff --git a/bot/filters/__init__.py b/bot/filters/__init__.py
index 9a7b82c..6128a91 100644
--- a/bot/filters/__init__.py
+++ b/bot/filters/__init__.py
@@ -1,6 +1,7 @@
"""Filters for the bot."""
+
+from .is_private_chat import IsPrivateChatFilter
from .language import LanguageFilter
from .not_subbed import NotSubbedFilter
-
-__all__ = ['LanguageFilter', 'NotSubbedFilter']
+__all__ = ["IsPrivateChatFilter", "LanguageFilter", "NotSubbedFilter"]
diff --git a/bot/filters/is_private_chat.py b/bot/filters/is_private_chat.py
new file mode 100644
index 0000000..3b2c406
--- /dev/null
+++ b/bot/filters/is_private_chat.py
@@ -0,0 +1,25 @@
+"""Filter for checking if the chat is private."""
+
+from __future__ import annotations
+
+from aiogram.enums import ChatType
+from aiogram.filters import Filter
+from aiogram.types import CallbackQuery, Message
+
+
+class IsPrivateChatFilter(Filter):
+ """Filter for checking if the chat is private."""
+
+ async def __call__(
+ self,
+ event: Message | CallbackQuery,
+ ) -> bool:
+ """Check if the chat is private."""
+ if isinstance(event, CallbackQuery):
+ message = event.message
+ if not isinstance(message, Message):
+ return False
+ else:
+ message = event
+
+ return message.chat.type == ChatType.PRIVATE
diff --git a/bot/filters/language.py b/bot/filters/language.py
index 373a20a..042aa0d 100644
--- a/bot/filters/language.py
+++ b/bot/filters/language.py
@@ -1,20 +1,30 @@
"""Check language filter."""
-from aiogram import types
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
from aiogram.filters import Filter
-from database.models import User
+
from locales import support_languages
+if TYPE_CHECKING:
+ from aiogram import types
+
+ from database.models import User
+
class LanguageFilter(Filter):
- """
- A filter that checks if a user's language matches the specified language.
- """
+ """checks if user's language matches the specified language."""
async def __call__(
- self, update: types.Message | types.CallbackQuery, user: User
+ self,
+ update: types.Message | types.CallbackQuery, # noqa: ARG002
+ user: User,
) -> bool:
- """
- Check if the user's language matches the specified language.
- """
- return not support_languages.is_supported(user.language_code)
+ """Check if the user's language matches the specified language."""
+ language_code = user.language_code
+ if language_code is None:
+ return True
+
+ return not support_languages.is_supported(language_code)
diff --git a/bot/filters/not_subbed.py b/bot/filters/not_subbed.py
index bd9d70d..d67a516 100644
--- a/bot/filters/not_subbed.py
+++ b/bot/filters/not_subbed.py
@@ -1,35 +1,41 @@
"""Check if user is subbed to the channel."""
+
+from __future__ import annotations
+
import logging
-from aiogram import types
-from aiogram.filters import Filter
-from aiogram import Bot
+from typing import TYPE_CHECKING, ClassVar
+
from aiogram.enums import ChatMemberStatus
-from database.models import RequiredSubscriptions, User
+from aiogram.filters import Filter
+
from database.crud import CRUD
+from database.models import RequiredSubscriptions, User
+
+if TYPE_CHECKING:
+ from aiogram import Bot, types
logger = logging.getLogger(__name__)
class NotSubbedFilter(Filter):
- """
- A filter that checks if a user is subbed to the channel.
- """
+ """A filter that checks if a user is subbed to the channel."""
- ALLOWED_STATUSES = {
+ ALLOWED_STATUSES: ClassVar[set[ChatMemberStatus]] = {
ChatMemberStatus.MEMBER,
ChatMemberStatus.ADMINISTRATOR,
ChatMemberStatus.CREATOR,
}
async def __call__(
- self, update: types.Message | types.CallbackQuery, user: User, bot: Bot
+ self,
+ update: types.Message | types.CallbackQuery, # noqa: ARG002
+ user: User,
+ bot: Bot,
) -> bool:
- """
- Check if the user is not subscribed to any required channels.
- Returns True if user is NOT subscribed to ANY required channel.
- Returns False if user is subscribed to ALL required channels.
- """
- chats = await CRUD(RequiredSubscriptions).get_all()
+ """Check if the user is not subscribed to any required channels."""
+ chats: list[RequiredSubscriptions] = await CRUD(
+ RequiredSubscriptions,
+ ).get_all()
if not chats:
return False
@@ -40,21 +46,21 @@ async def __call__(
return False
async def _not_subscribe(
- self, sub: RequiredSubscriptions, user: User, bot: Bot
+ self,
+ sub: RequiredSubscriptions,
+ user: User,
+ bot: Bot,
) -> bool:
- """
- Check if the user is subscribed to the channel.
- Returns True if user is NOT subscribed.
- Returns False if user is subscribed or if channel is not accessible.
- """
+ """Check if the user is subscribed to the channel."""
try:
chat = await bot.get_chat(sub.chat_id)
- except Exception as e:
- logger.error(f"Failed to get chat {sub.chat_id}: {e}")
+ except Exception:
+ logger.exception("Failed to get chat %s", sub.chat_id)
return False
try:
member = await chat.get_member(user.id)
- return member.status not in self.ALLOWED_STATUSES
- except Exception:
+ except Exception: # noqa: BLE001
return True
+
+ return member.status not in self.ALLOWED_STATUSES
diff --git a/bot/handlers/__init__.py b/bot/handlers/__init__.py
index 82d0393..f875220 100644
--- a/bot/handlers/__init__.py
+++ b/bot/handlers/__init__.py
@@ -1,27 +1,27 @@
-"""Setup router for the bot."""
+"""Set up handlers for the bot."""
+
from aiogram import Dispatcher, Router
-from . import (
- get_track,
- language,
- menu,
- faq,
- search,
- pages,
- subscribe
-)
+
+from bot.filters import IsPrivateChatFilter
+
+from . import faq, get_track, language, menu, pages, search, subscribe
def setup(dp: Dispatcher) -> None:
- """Setup handlers for the bot."""
+ """Set up handlers for the bot."""
router = Router()
+ # Add filters
+ router.message.filter(IsPrivateChatFilter())
+ router.callback_query.filter(IsPrivateChatFilter())
+
# Register routers
language.register(router)
menu.register(router)
faq.register(router)
- subscribe.register(router)
search.register(router)
pages.register(router)
+ subscribe.register(router)
get_track.register(router)
dp.include_router(router)
diff --git a/bot/handlers/faq.py b/bot/handlers/faq.py
index 3baa00e..069e3b6 100644
--- a/bot/handlers/faq.py
+++ b/bot/handlers/faq.py
@@ -1,7 +1,10 @@
"""FAQ handler for the bot."""
+
import logging
-from aiogram import types, Router, F
+
+from aiogram import F, Router, types
from aiogram.utils.i18n import gettext
+
from bot.keyboards import inline
logger = logging.getLogger(__name__)
@@ -10,14 +13,21 @@
async def faq_handler(callback: types.CallbackQuery) -> None:
"""FAQ handler."""
try:
+ if callback.message is None or isinstance(
+ callback.message,
+ types.InaccessibleMessage,
+ ):
+ await callback.answer(gettext("cannot_edit_message"))
+ return
+
await callback.message.edit_text(
gettext("faq"),
- reply_markup=inline.get_back_keyboard(gettext, "menu")
+ reply_markup=inline.get_back_keyboard(gettext, "menu"),
)
- except Exception as e:
- logger.error("Failed to send message: %s", e)
+ except Exception:
+ logger.exception("Failed to send message")
def register(router: Router) -> None:
- """Registers FAQ handler with the router."""
+ """Register FAQ handler with the router."""
router.callback_query.register(faq_handler, F.data == "faq")
diff --git a/bot/handlers/get_track.py b/bot/handlers/get_track.py
index b70b7cd..1743147 100644
--- a/bot/handlers/get_track.py
+++ b/bot/handlers/get_track.py
@@ -1,11 +1,13 @@
"""Get track handler for the bot."""
+
import logging
-from aiogram import types, Router, F, Bot
+
+from aiogram import Bot, F, Router, types
from aiogram.types import BufferedInputFile
from aiogram.utils.i18n import gettext
-from service import Music, Track
-from bot.utils import load_tracks_from_db
+from bot.utils import load_tracks_from_db
+from service import Music, Track
logger = logging.getLogger(__name__)
@@ -13,67 +15,88 @@
async def send_track(
callback: types.CallbackQuery,
bot: Bot,
- track: Track
+ track: Track,
) -> None:
"""Send track."""
try:
+ if callback.message is None or callback.message.chat is None:
+ await callback.answer(gettext("cannot_access_chat"))
+ return
+
await bot.send_chat_action(callback.message.chat.id, "upload_document")
async with Music() as service:
audio_bytes = await service.get_audio_bytes(track)
- thumbnail_bytes = await service.get_thumbnail_bytes(track)
audio_file = BufferedInputFile(audio_bytes, filename=track.name)
- thumbnail_file = BufferedInputFile(
- thumbnail_bytes, filename=track.name
- )
+
+ me = await bot.get_me()
+
+ if callback.message is None:
+ await callback.answer("Cannot send message")
+ return
await callback.message.answer_audio(
audio_file,
title=track.title,
performer=track.performer,
- caption=gettext("promo_caption").format(username=bot._me.username),
- thumbnail=thumbnail_file,
+ caption=gettext("promo_caption").format(username=me.username),
)
- except Exception as e:
- await callback.message.answer(gettext("send_track_error"))
- logger.error("Failed to send track: %s", e)
+ except Exception:
+ if callback.message is not None:
+ await callback.message.answer(gettext("send_track_error"))
+ else:
+ await callback.answer(gettext("send_track_error"))
+ logger.exception("Failed to send track")
async def get_track_handler(callback: types.CallbackQuery, bot: Bot) -> None:
"""Get track handler."""
try:
- _, _, search_id, index = callback.data.split(":")
+ if callback.data is None:
+ await callback.answer(gettext("invalid_data"))
+ return
+
+ _, _, search_id_str, index = callback.data.split(":")
+ search_id = int(search_id_str)
tracks: list[Track] = await load_tracks_from_db(search_id)
track: Track = tracks[int(index)]
await callback.answer(gettext("track_sending"))
await send_track(callback, bot, track)
- except Exception as e:
- logger.error("Failed get track handler: %s", e)
+ except Exception:
+ logger.exception("Failed get track handler")
async def get_all_from_page_handler(
- callback: types.CallbackQuery, bot: Bot
+ callback: types.CallbackQuery,
+ bot: Bot,
) -> None:
"""Get all tracks from page handler."""
try:
- _, _, search_id, start_indx, end_indx = callback.data.split(":")
+ if callback.data is None:
+ await callback.answer(gettext("invalid_data"))
+ return
+
+ _, _, search_id_str, start_indx, end_indx = callback.data.split(":")
+ search_id = int(search_id_str)
all_tracks: list[Track] = await load_tracks_from_db(search_id)
- page_tracks = all_tracks[int(start_indx):int(end_indx)]
+ page_tracks = all_tracks[int(start_indx) : int(end_indx)]
await callback.answer(gettext("track_sending"))
for track in page_tracks:
await send_track(callback, bot, track)
- except Exception as e:
- logger.error("Failed get all from page handler: %s", e)
+ except Exception:
+ logger.exception("Failed get all from page handler")
def register(router: Router) -> None:
- """Registers get track handler with the router."""
+ """Register get track handler with the router."""
router.callback_query.register(
- get_track_handler, F.data.startswith("track:get:")
+ get_track_handler,
+ F.data.startswith("track:get:"),
)
router.callback_query.register(
- get_all_from_page_handler, F.data.startswith("track:all:")
+ get_all_from_page_handler,
+ F.data.startswith("track:all:"),
)
diff --git a/bot/handlers/language.py b/bot/handlers/language.py
index fe9522e..00d4076 100644
--- a/bot/handlers/language.py
+++ b/bot/handlers/language.py
@@ -1,66 +1,82 @@
"""Language handler for the bot."""
+
+from __future__ import annotations
+
import logging
-from typing import Union
-from aiogram import types, Router, Bot, F
+
+from aiogram import Bot, F, Router, types
+from aiogram.filters import Command
from aiogram.utils import i18n
from aiogram.utils.i18n import gettext
-from aiogram.filters import Command
+
from bot.filters import LanguageFilter
-from bot.keyboards import inline, command
+from bot.keyboards import command, inline
from database.crud import CRUD
from database.models import User
from locales import support_languages
-from .menu import menu_handler
+from .menu import menu_handler
logger = logging.getLogger(__name__)
async def language_handler(
- event: Union[types.Message, types.CallbackQuery],
+ event: types.Message | types.CallbackQuery,
user: User,
) -> None:
"""Language handler."""
try:
+ language_code = user.language_code
text = (
gettext("language_choose")
- if support_languages.is_supported(user.language_code)
+ if language_code is not None
+ and support_languages.is_supported(language_code)
else "🌎 Choose language:"
)
keyboard = inline.language_keyboard
if isinstance(event, types.CallbackQuery):
+ if event.message is None or isinstance(
+ event.message,
+ types.InaccessibleMessage,
+ ):
+ await event.answer(gettext("cannot_edit_message"))
+ return
+
await event.message.edit_text(text, reply_markup=keyboard)
else:
await event.answer(text, reply_markup=keyboard)
- except Exception as e:
- logger.error("Failed to send message: %s", e)
+ except Exception:
+ logger.exception("Failed to send message")
async def language_set_handler(
callback: types.CallbackQuery,
user: User,
- bot: Bot
+ bot: Bot,
) -> None:
"""Language set handler."""
try:
- language_code = callback.data.split(":")[-1]
+ if callback.data:
+ language_code = callback.data.split(":")[-1]
+
+ user_crud = CRUD(User)
+ await user_crud.update(user, language_code=language_code)
+ i18n.get_i18n().ctx_locale.set(language_code)
- user_crud = CRUD(User)
- await user_crud.update(user, language_code=language_code)
- i18n.get_i18n().ctx_locale.set(language_code)
+ await bot.set_my_commands(command.get_commands(gettext))
+ await menu_handler(callback)
- await bot.set_my_commands(command.get_commands(gettext))
- await menu_handler(callback)
- except Exception as e:
- logger.error("Failed to set language: %s", e)
+ except Exception:
+ logger.exception("Failed to set language")
def register(router: Router) -> None:
- """Registers language handler with the router."""
+ """Register language handler with the router."""
router.message.register(language_handler, LanguageFilter())
router.message.register(language_handler, Command("language"))
router.callback_query.register(language_handler, F.data == "language")
router.callback_query.register(
- language_set_handler, F.data.startswith("language:set:")
+ language_set_handler,
+ F.data.startswith("language:set:"),
)
diff --git a/bot/handlers/menu.py b/bot/handlers/menu.py
index 0992752..2e83469 100644
--- a/bot/handlers/menu.py
+++ b/bot/handlers/menu.py
@@ -1,17 +1,20 @@
"""Menu handler for the bot."""
+
+from __future__ import annotations
+
import logging
-from typing import Union
-from aiogram import types, Router, F
-from aiogram.filters import CommandStart, Command
+
+from aiogram import F, Router, types
+from aiogram.filters import Command, CommandStart
from aiogram.utils.i18n import gettext
-from bot.keyboards import inline
+from bot.keyboards import inline
logger = logging.getLogger(__name__)
async def menu_handler(
- event: Union[types.Message, types.CallbackQuery]
+ event: types.Message | types.CallbackQuery,
) -> None:
"""Menu handler."""
try:
@@ -19,15 +22,22 @@ async def menu_handler(
keyboard = inline.get_menu_keyboard(gettext)
if isinstance(event, types.CallbackQuery):
+ if event.message is None or isinstance(
+ event.message,
+ types.InaccessibleMessage,
+ ):
+ await event.answer(gettext("cannot_edit_message"))
+ return
+
await event.message.edit_text(text, reply_markup=keyboard)
else:
await event.answer(text, reply_markup=keyboard)
- except Exception as e:
- logger.error("Failed to handle menu event: %s", e)
+ except Exception:
+ logger.exception("Failed to handle menu event")
def register(router: Router) -> None:
- """Registers start handler with the router."""
+ """Register start handler with the router."""
router.message.register(menu_handler, CommandStart())
router.message.register(menu_handler, Command("menu"))
router.callback_query.register(menu_handler, F.data == "menu")
diff --git a/bot/handlers/pages.py b/bot/handlers/pages.py
index 31d6c4c..997d35a 100644
--- a/bot/handlers/pages.py
+++ b/bot/handlers/pages.py
@@ -1,31 +1,60 @@
"""Pages handler for the bot."""
+
import logging
-from aiogram import types, Router, F
-from service.data import Track
+from typing import TYPE_CHECKING
+
+from aiogram import F, Router, types
+from aiogram.exceptions import TelegramBadRequest
+from aiogram.utils.i18n import gettext
+
from bot.keyboards import inline
from bot.utils import load_tracks_from_db
+if TYPE_CHECKING:
+ from service.data import Track
logger = logging.getLogger(__name__)
async def pages_handler(callback: types.CallbackQuery) -> None:
- """Handles the pages navigation."""
+ """Handle the pages navigation."""
try:
- _, _, search_id, page = callback.data.split(":")
- tracks: list[Track] = await load_tracks_from_db(search_id)
+ if not callback.data:
+ await callback.answer(gettext("invalid_data"))
+ return
+
+ data_parts = callback.data.split(":")
+ if len(data_parts) < 4: # noqa: PLR2004
+ await callback.answer(gettext("invalid_data"))
+ return
+
+ _, _, search_id, page = data_parts
+ tracks: list[Track] = await load_tracks_from_db(int(search_id))
+
+ if callback.message is None or isinstance(
+ callback.message,
+ types.InaccessibleMessage,
+ ):
+ await callback.answer(gettext("cannot_edit_message"))
+ return
await callback.message.edit_reply_markup(
reply_markup=inline.get_keyboard_of_tracks(
- tracks, search_id, int(page)
- )
+ tracks,
+ int(search_id),
+ int(page),
+ ),
)
- except Exception as e:
- logger.error("Failed to send message: %s", e)
+ except TelegramBadRequest:
+ await callback.answer(gettext("cannot_edit_message"))
+ except Exception:
+ logger.exception("Failed to handle pages navigation")
+ await callback.answer(gettext("error_occurred"))
def register(router: Router) -> None:
- """Registers pages handler with the router."""
+ """Register pages handler with the router."""
router.callback_query.register(
- pages_handler, F.data.startswith("track:page")
+ pages_handler,
+ F.data.startswith("track:page"),
)
diff --git a/bot/handlers/search.py b/bot/handlers/search.py
index e6fa900..88c7161 100644
--- a/bot/handlers/search.py
+++ b/bot/handlers/search.py
@@ -1,42 +1,52 @@
"""Search handler for the bot."""
+
import logging
-from aiogram import types, Router, F
+
+from aiogram import F, Router, types
from aiogram.utils.i18n import gettext
-from service import Music, Track
+
+from bot.keyboards import inline
from database.crud import CRUD
from database.models import SearchHistory, User
-from bot.keyboards import inline
+from service import Music, Track
+# Constants
+MAX_KEYWORD_LENGTH = 100
logger = logging.getLogger(__name__)
async def update_search(
- user: User, keyword: str, tracks: list[Track]
+ user: User,
+ keyword: str,
+ tracks: list[Track],
) -> SearchHistory:
- """Updates the user in the database."""
+ """Update the user in the database."""
user_crud = CRUD(User)
await user_crud.update(user, search_queries=user.search_queries + 1)
search_history_crud = CRUD(SearchHistory)
- search = await search_history_crud.create(
+ return await search_history_crud.create(
user_id=user.id,
keyword=keyword,
- tracks=[track.__dict__ for track in tracks]
+ tracks=[track.__dict__ for track in tracks],
)
- return search
async def search_handler(message: types.Message, user: User) -> None:
- """Handles the search."""
+ """Handle the search."""
try:
+ if message.text is None:
+ await message.answer(gettext("search_query_error"))
+ return
+
keyword = message.text.strip()
- if not keyword or len(keyword) > 100:
+ if not keyword or len(keyword) > MAX_KEYWORD_LENGTH:
await message.answer(gettext("search_query_error"))
return
-
+
search_message = await message.answer(
- gettext("searching").format(keyword=keyword)
+ gettext("searching").format(keyword=keyword),
)
async with Music() as service:
tracks = await service.search(keyword)
@@ -44,41 +54,51 @@ async def search_handler(message: types.Message, user: User) -> None:
search = await update_search(user, keyword, tracks)
await search_message.edit_text(
gettext("search_result").format(keyword=keyword),
- reply_markup=inline.get_keyboard_of_tracks(tracks, search.id)
+ reply_markup=inline.get_keyboard_of_tracks(tracks, search.id),
)
- except Exception as e:
- logger.error("Failed to send message: %s", e)
+ except Exception:
+ logger.exception("Failed to send message")
async def get_track_list(list_type: str) -> list[Track]:
- """Gets the track list."""
+ """Get the track list."""
async with Music() as service:
map_list_type = {
"top_hits": service.get_top_hits,
- "new_hits": service.get_new_hits
}
return await map_list_type[list_type]()
async def track_lists_handler(
callback: types.CallbackQuery,
- user: User
+ user: User,
) -> None:
- """Handles the track lists."""
+ """Handle the track lists."""
+ if callback.data is None:
+ await callback.answer(gettext("invalid_data"))
+ return
+
_, _, list_type = callback.data.split(":")
tracks = await get_track_list(list_type)
search = await update_search(user, list_type, tracks)
+
+ if callback.message is None or isinstance(
+ callback.message,
+ types.InaccessibleMessage,
+ ):
+ await callback.answer(gettext("cannot_send_message"))
+ return
+
await callback.message.answer(
gettext(list_type),
- reply_markup=inline.get_keyboard_of_tracks(
- tracks, search.id
- )
+ reply_markup=inline.get_keyboard_of_tracks(tracks, search.id),
)
def register(router: Router) -> None:
- """Registers search handler with the router."""
+ """Register search handler with the router."""
router.message.register(search_handler)
router.callback_query.register(
- track_lists_handler, F.data.startswith("track:list:")
+ track_lists_handler,
+ F.data.startswith("track:list:"),
)
diff --git a/bot/handlers/subscribe.py b/bot/handlers/subscribe.py
index 1e00c78..243fc43 100644
--- a/bot/handlers/subscribe.py
+++ b/bot/handlers/subscribe.py
@@ -1,11 +1,16 @@
"""Subscription required handler for the bot."""
+
+from __future__ import annotations
+
import logging
-from aiogram import types, Router, F, Bot
+
+from aiogram import Bot, F, Router, types
from aiogram.utils.i18n import gettext
-from bot.keyboards import inline
+
from bot.filters import NotSubbedFilter
-from database.models import RequiredSubscriptions, User
+from bot.keyboards import inline
from database.crud import CRUD
+from database.models import RequiredSubscriptions, User
logger = logging.getLogger(__name__)
@@ -15,23 +20,40 @@ async def sub_required_handler(
) -> None:
"""Subscription required handler."""
try:
- required_chats = await CRUD(RequiredSubscriptions).get_all()
+ required_chats: list[RequiredSubscriptions] = await CRUD(
+ RequiredSubscriptions,
+ ).get_all()
text = gettext("not_subscribed")
keyboard = inline.get_subscribe_keyboard(gettext, required_chats)
if isinstance(event, types.Message):
await event.answer(text, reply_markup=keyboard)
else:
+ if event.message is None or isinstance(
+ event.message,
+ types.InaccessibleMessage,
+ ):
+ await event.answer(gettext("cannot_send_message"))
+ return
+
await event.message.answer(text, reply_markup=keyboard)
- except Exception as e:
- logger.error("Failed to send message: %s", e)
+ except Exception:
+ logger.exception("Failed to send message")
async def sub_check_handler(
- callback: types.CallbackQuery, user: User, bot: Bot,
+ callback: types.CallbackQuery,
+ user: User,
+ bot: Bot,
) -> None:
"""Subscription check handler."""
sub_check = NotSubbedFilter()
+ if callback.message is None or isinstance(
+ callback.message,
+ types.InaccessibleMessage,
+ ):
+ await callback.answer(gettext("cannot_send_message"))
+ return
if await sub_check(callback, user, bot):
await callback.message.answer(gettext("not_subscribed"))
@@ -40,7 +62,7 @@ async def sub_check_handler(
def register(router: Router) -> None:
- """Registers FAQ handler with the router."""
+ """Register FAQ handler with the router."""
router.callback_query.register(sub_check_handler, F.data == "sub_check")
router.callback_query.register(sub_required_handler, NotSubbedFilter())
router.message.register(sub_required_handler, NotSubbedFilter())
diff --git a/bot/keyboards/__init__.py b/bot/keyboards/__init__.py
index e69de29..a5cbae6 100644
--- a/bot/keyboards/__init__.py
+++ b/bot/keyboards/__init__.py
@@ -0,0 +1 @@
+"""Keyboards package."""
diff --git a/bot/keyboards/command.py b/bot/keyboards/command.py
index 19b22ab..12a4c20 100644
--- a/bot/keyboards/command.py
+++ b/bot/keyboards/command.py
@@ -1,5 +1,7 @@
"""Command keyboard for the bot."""
+
from typing import Callable
+
from aiogram.types import BotCommand
@@ -7,5 +9,8 @@ def get_commands(gettext: Callable[[str], str]) -> list[BotCommand]:
"""Get commands for the bot."""
return [
BotCommand(command="menu", description=gettext("menu_command")),
- BotCommand(command="language", description=gettext("language_command"))
+ BotCommand(
+ command="language",
+ description=gettext("language_command"),
+ ),
]
diff --git a/bot/keyboards/inline.py b/bot/keyboards/inline.py
index fbc05e7..1919e8c 100644
--- a/bot/keyboards/inline.py
+++ b/bot/keyboards/inline.py
@@ -1,67 +1,94 @@
"""Inline keyboard templates."""
+
+from enum import Enum, auto
from typing import Callable
-from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
-from service.data import Track
-from locales import support_languages
+from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
+
from database.models import RequiredSubscriptions
+from locales import support_languages
+from service.data import Track
+
+
+class NavigationDirection(Enum):
+ """Navigation direction for pagination."""
+
+ PREVIOUS = auto()
+ NEXT = auto()
def get_keyboard_of_tracks(
tracks: list[Track],
search_id: int,
- page: int = 0
+ page: int = 0,
) -> InlineKeyboardMarkup:
"""Create paginated inline keyboard for track selection."""
- TRACKS_PER_PAGE = 10
+ tracks_per_page = 10
- total_pages = max((len(tracks) - 1) // TRACKS_PER_PAGE, 0)
+ total_pages = max((len(tracks) - 1) // tracks_per_page, 0)
page = min(max(0, page), total_pages)
- start_indx = page * TRACKS_PER_PAGE
- end_indx = (page + 1) * TRACKS_PER_PAGE
+ start_indx = page * tracks_per_page
+ end_indx = (page + 1) * tracks_per_page
current_page = tracks[start_indx:end_indx]
keyboard = [
[
InlineKeyboardButton(
text=f"{track.performer} - {track.title}",
- callback_data=f"track:get:{search_id}:{track.index}"
- )
- ] for track in current_page
+ callback_data=f"track:get:{search_id}:{track.index}",
+ ),
+ ]
+ for track in current_page
]
if total_pages > 0:
- def create_navigation_button(is_next: bool) -> InlineKeyboardButton:
+
+ def create_navigation_button(
+ direction: NavigationDirection,
+ ) -> InlineKeyboardButton:
"""Create navigation button (prev/next) based on current page."""
+ is_next = direction == NavigationDirection.NEXT
is_available = page < total_pages if is_next else page > 0
return InlineKeyboardButton(
text="▶️"
- if is_next and is_available else "⏺️"
- if is_next and not is_available else "◀️"
- if not is_next and is_available else "⏺️",
+ if is_next and is_available
+ else "⏺️"
+ if is_next and not is_available
+ else "◀️"
+ if not is_next and is_available
+ else "⏺️",
callback_data=(
f"track:page:{search_id}:"
f"{page + 1 if is_next else page - 1}"
- if is_available else "track:noop"
- )
+ if is_available
+ else "track:noop"
+ ),
)
- keyboard.append([
- create_navigation_button(is_next=False),
- InlineKeyboardButton(
- text=f"{page + 1}/{total_pages + 1}",
- callback_data="track:noop"
- ),
- create_navigation_button(is_next=True),
- ])
- keyboard.append([
- InlineKeyboardButton(
- text="🔽",
- callback_data=f"track:all:{search_id}:{start_indx}:{end_indx}"
- )
- ])
+ keyboard.append(
+ [
+ create_navigation_button(
+ direction=NavigationDirection.PREVIOUS,
+ ),
+ InlineKeyboardButton(
+ text=f"{page + 1}/{total_pages + 1}",
+ callback_data="track:noop",
+ ),
+ create_navigation_button(direction=NavigationDirection.NEXT),
+ ],
+ )
+ keyboard.append(
+ [
+ InlineKeyboardButton(
+ text="🔽",
+ callback_data=(
+ f"track:all:{search_id}:{start_indx}:{end_indx}"
+ ),
+ ),
+ ],
+ )
return InlineKeyboardMarkup(inline_keyboard=keyboard)
@@ -70,10 +97,11 @@ def create_navigation_button(is_next: bool) -> InlineKeyboardButton:
[
InlineKeyboardButton(
text=language.name,
- callback_data=f"language:set:{language.code}"
- ) for language in support_languages.languages
- ]
- ]
+ callback_data=f"language:set:{language.code}",
+ )
+ for language in support_languages.languages
+ ],
+ ],
)
@@ -84,22 +112,20 @@ def get_menu_keyboard(gettext: Callable[[str], str]) -> InlineKeyboardMarkup:
[
InlineKeyboardButton(
text=gettext("top_hits_button"),
- callback_data="track:list:top_hits"
+ callback_data="track:list:top_hits",
),
- InlineKeyboardButton(
- text=gettext("new_hits_button"),
- callback_data="track:list:new_hits"
- )
],
[
InlineKeyboardButton(
- text=gettext("faq_button"), callback_data="faq"
+ text=gettext("faq_button"),
+ callback_data="faq",
),
InlineKeyboardButton(
- text=gettext("language_button"), callback_data="language"
- )
+ text=gettext("language_button"),
+ callback_data="language",
+ ),
],
- ]
+ ],
)
@@ -109,33 +135,37 @@ def get_subscribe_keyboard(
) -> InlineKeyboardMarkup:
"""Get subscribe keyboard."""
chats = [
- [
- InlineKeyboardButton(
- text=f"➕ {sub.chat_title}", url=sub.chat_link
- )
- ]
+ [
+ InlineKeyboardButton(
+ text=f"➕ {sub.chat_title}", # noqa: RUF001
+ url=sub.chat_link,
+ ),
+ ]
for sub in sub_required
]
- chats.append([
- InlineKeyboardButton(
- text=gettext("sub_check_button"), callback_data="sub_check"
- )
- ])
- return InlineKeyboardMarkup(
- inline_keyboard=chats
+ chats.append(
+ [
+ InlineKeyboardButton(
+ text=gettext("sub_check_button"),
+ callback_data="sub_check",
+ ),
+ ],
)
+ return InlineKeyboardMarkup(inline_keyboard=chats)
def get_back_keyboard(
- gettext: Callable[[str], str], callback_data: str
+ gettext: Callable[[str], str],
+ callback_data: str,
) -> InlineKeyboardMarkup:
"""Get back keyboard."""
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
- text=gettext("back_button"), callback_data=callback_data
- )
- ]
- ]
+ text=gettext("back_button"),
+ callback_data=callback_data,
+ ),
+ ],
+ ],
)
diff --git a/bot/middlewares/__init__.py b/bot/middlewares/__init__.py
index bc2ee51..e0c007f 100644
--- a/bot/middlewares/__init__.py
+++ b/bot/middlewares/__init__.py
@@ -1,11 +1,13 @@
"""Middlewares for the bot."""
+
from aiogram import Dispatcher
from aiogram.utils.i18n import I18n
+
from .auth_middleware import AuthMiddleware
from .i18n_middleware import I18nMiddleware
def setup(dp: Dispatcher, i18n: I18n) -> None:
- """Setup middleware"""
+ """Set up middleware."""
dp.update.outer_middleware(AuthMiddleware())
dp.update.outer_middleware(I18nMiddleware(i18n))
diff --git a/bot/middlewares/auth_middleware.py b/bot/middlewares/auth_middleware.py
index d271cfb..303320d 100644
--- a/bot/middlewares/auth_middleware.py
+++ b/bot/middlewares/auth_middleware.py
@@ -1,15 +1,16 @@
"""Auth middleware module for the bot."""
+
import logging
-from typing import Any, Awaitable, Callable, Dict
+from collections.abc import Awaitable
+from datetime import datetime
+from typing import Any, Callable
from aiogram import BaseMiddleware
-from aiogram.types import Update
-from sqlalchemy import func
+from aiogram.types import TelegramObject
from database.crud import CRUD
from database.models import User
-
logger = logging.getLogger(__name__)
@@ -18,58 +19,56 @@ class AuthMiddleware(BaseMiddleware):
async def __call__(
self,
- handler: Callable[[Update, Dict[str, Any]], Awaitable[Any]],
- event: Update,
- data: Dict[str, Any]
- ) -> Any:
- """Intercepts incoming updates, processes them and calls the next."""
- data['user'] = await self.ensure_user_in_db(data['event_from_user'])
+ handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
+ event: TelegramObject,
+ data: dict[str, Any],
+ ) -> Awaitable[Any]:
+ """Intercept incoming updates, process them and call the next."""
+ data["user"] = await self.ensure_user_in_db(data["event_from_user"])
return await handler(event, data)
async def ensure_user_in_db(self, user: User) -> User:
- """Ensures that the user is registered in the database."""
+ """Ensure that the user is registered in the database."""
user_crud = self._get_user_crud()
user_data = self._prepare_user_data(user)
try:
- return await self._get_or_create_user(
- user, user_crud, user_data
- )
+ return await self._get_or_create_user(user, user_crud, user_data)
- except Exception as e:
- logger.error("Failed to process user %s: %s", user.id, str(e))
+ except Exception:
+ logger.exception("Failed to process user %s", user.id)
raise
def _get_user_crud(self) -> CRUD:
- """Creates CRUD instance for User model."""
+ """Create CRUD instance for User model."""
return CRUD(User)
@staticmethod
- def _prepare_user_data(user: User) -> Dict[str, Any]:
- """Prepares user data for database operations."""
+ def _prepare_user_data(user: User) -> dict[str, Any]:
+ """Prepare user data for database operations."""
return {
- 'id': user.id,
- 'username': user.username,
- 'first_name': user.first_name,
- 'last_name': user.last_name,
+ "id": user.id,
+ "username": user.username,
+ "first_name": user.first_name,
+ "last_name": user.last_name,
}
@staticmethod
async def _get_or_create_user(
user: User,
user_crud: CRUD,
- user_data: Dict[str, Any]
+ user_data: dict[str, Any],
) -> User:
- """Gets existing user or creates new one."""
- db_user = await user_crud.get(id=user.id)
+ """Get existing user or create new one."""
+ db_user: User | None = await user_crud.get(id=user.id)
if not db_user:
- user_data['language_code'] = user.language_code
+ user_data["language_code"] = user.language_code
db_user = await user_crud.create(**user_data)
logger.info("User %s registered in the database.", user.id)
else:
- user_data['updated_at'] = func.now()
+ user_data["updated_at"] = datetime.now() # noqa: DTZ005
db_user = await user_crud.update(db_user, **user_data)
logger.info("User %s updated in the database.", user.id)
diff --git a/bot/middlewares/i18n_middleware.py b/bot/middlewares/i18n_middleware.py
index 41e6b8a..0459112 100644
--- a/bot/middlewares/i18n_middleware.py
+++ b/bot/middlewares/i18n_middleware.py
@@ -1,13 +1,22 @@
"""Custom i18n middleware (language selection)."""
-from aiogram.types import Message
+
+from typing import TYPE_CHECKING
+
+from aiogram.types import TelegramObject
from aiogram.utils.i18n.middleware import I18nMiddleware as BaseI18nMiddleware
-from database.models import User
+
+if TYPE_CHECKING:
+ from database.models import User
class I18nMiddleware(BaseI18nMiddleware):
"""Custom i18n middleware for the bot."""
- async def get_locale(self, event: Message, data: dict) -> str:
+ async def get_locale(
+ self,
+ event: TelegramObject, # noqa: ARG002
+ data: dict,
+ ) -> str:
"""Get user locale."""
- user: User = data['user']
- return user.language_code
+ user: User = data["user"]
+ return user.language_code or "en"
diff --git a/bot/utils.py b/bot/utils.py
index e636bb2..a938477 100644
--- a/bot/utils.py
+++ b/bot/utils.py
@@ -1,4 +1,5 @@
"""Utils for the bot."""
+
from database.crud import CRUD
from database.models import SearchHistory
from service.data import Track
@@ -7,5 +8,11 @@
async def load_tracks_from_db(search_id: int) -> list[Track]:
"""Get tracks from the database."""
search_history_crud = CRUD(SearchHistory)
- search: SearchHistory = await search_history_crud.get(id=int(search_id))
- return [Track(**track) for track in search.tracks]
+ search: SearchHistory | None = await search_history_crud.get(
+ id=int(search_id),
+ )
+
+ if not search:
+ return []
+
+ return [Track.from_dict(track) for track in search.tracks]
diff --git a/configs.py b/configs.py
index 78ae17c..f85c0fe 100644
--- a/configs.py
+++ b/configs.py
@@ -1,26 +1,43 @@
"""Configurations for the app."""
+
+from __future__ import annotations
+
import os
import time
from dataclasses import dataclass
-os.environ['TZ'] = os.getenv('TIMEZONE', 'UTC')
+os.environ["TZ"] = os.getenv("TIMEZONE", "UTC")
time.tzset()
@dataclass
class BotConfig:
"""Configuration class for the bot."""
- token: str = os.getenv('BOT_TOKEN')
+
+ token: str | None = os.getenv("BOT_TOKEN")
+
+ def __post_init__(self) -> None:
+ """Post-init method for the bot configuration."""
+ if not self.token:
+ msg = "Bot token is not set"
+ raise ValueError(msg)
@dataclass
class DBConfig:
"""Configuration class for the database."""
- host: str = os.getenv('POSTGRES_HOST', 'db')
- port: str = os.getenv('POSTGRES_PORT', '5432')
- user: str = os.getenv('POSTGRES_USER')
- password: str = os.getenv('POSTGRES_PASSWORD')
- db: str = os.getenv('POSTGRES_DB')
+
+ host: str = os.getenv("POSTGRES_HOST", "db")
+ port: str = os.getenv("POSTGRES_PORT", "5432")
+ user: str | None = os.getenv("POSTGRES_USER")
+ password: str | None = os.getenv("POSTGRES_PASSWORD")
+ db: str | None = os.getenv("POSTGRES_DB")
+
+ def __post_init__(self) -> None:
+ """Post-init method for the database configuration."""
+ if not self.user or not self.password or not self.db:
+ msg = "Database configuration is incomplete"
+ raise ValueError(msg)
@property
def url(self) -> str:
diff --git a/database/__init__.py b/database/__init__.py
index e69de29..cdce083 100644
--- a/database/__init__.py
+++ b/database/__init__.py
@@ -0,0 +1 @@
+"""Database package."""
diff --git a/database/crud.py b/database/crud.py
index e01eb4b..c93b449 100644
--- a/database/crud.py
+++ b/database/crud.py
@@ -1,27 +1,36 @@
-"""CRUD operations."""
+"""Database CRUD operations."""
+
+from __future__ import annotations
+
import logging
-from typing import AsyncGenerator, Type, TypeVar
from contextlib import asynccontextmanager
+from typing import TYPE_CHECKING, Any, Generic, TypeVar
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
+from sqlalchemy.exc import SQLAlchemyError
+
from .engine import async_session_factory
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator
+
+ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
-T = TypeVar('T')
+
+T = TypeVar("T")
logger = logging.getLogger(__name__)
-class CRUD:
+class CRUD(Generic[T]):
"""Generic class to handle CRUD operations for any model."""
def __init__(
self,
- model: Type[T],
- session_factory: sessionmaker = async_session_factory
+ model: type[T],
+ session_factory: async_sessionmaker[
+ AsyncSession
+ ] = async_session_factory,
) -> None:
"""Initialize the CRUD class."""
self.model = model
@@ -33,72 +42,74 @@ async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
session = self.session_factory()
try:
yield session
- except Exception as e:
+ except Exception:
await session.rollback()
- logger.error("Failed to get session: %s", e)
+ logger.exception("Failed to get session")
raise
finally:
await session.close()
- async def create(self, **kwargs) -> T:
+ async def create(self, **kwargs: Any) -> T: # noqa: ANN401
"""Create a new record in the database."""
async with self.get_session() as session:
- instance = self.model(**kwargs)
- session.add(instance)
try:
+ instance = self.model(**kwargs)
+ session.add(instance)
await session.commit()
await session.refresh(instance)
- return instance
- except SQLAlchemyError as e:
+ except SQLAlchemyError:
await session.rollback()
- logger.error(
- f"Failed to create {self.model.__name__}: {e}"
+ logger.exception(
+ "Failed to create %s",
+ self.model.__name__,
)
raise
+ return instance
- async def get(self, **kwargs) -> T:
+ async def get(self, **kwargs: Any) -> T | None: # noqa: ANN401
"""Retrieve a record by any field."""
async with self.get_session() as session:
query = await session.execute(
- select(self.model).filter_by(**kwargs)
+ select(self.model).filter_by(**kwargs),
)
- instance = query.scalar_one_or_none()
- return instance
+ return query.scalar_one_or_none()
async def get_all(self) -> list[T]:
"""Get all records."""
async with self.get_session() as session:
query = await session.execute(select(self.model))
- return query.scalars().all()
+ return list(query.scalars().all())
- async def update(self, instance: T, **kwargs) -> T:
+ async def update(self, instance: T, **kwargs: Any) -> T: # noqa: ANN401
"""Update a record's information."""
async with self.get_session() as session:
- instance = await session.merge(instance)
- for key, value in kwargs.items():
- setattr(instance, key, value)
try:
+ for key, value in kwargs.items():
+ setattr(instance, key, value)
+ session.add(instance)
await session.commit()
await session.refresh(instance)
- return instance
- except SQLAlchemyError as e:
+ except SQLAlchemyError:
await session.rollback()
- logger.error(
- f"Failed to update {self.model.__name__}: {e}"
+ logger.exception(
+ "Failed to update %s",
+ self.model.__name__,
)
raise
+ return instance
+
async def delete(self, instance: T) -> bool:
- """Delete a record."""
+ """Delete a record from the database."""
async with self.get_session() as session:
- instance = await session.merge(instance)
try:
await session.delete(instance)
await session.commit()
- return True
- except SQLAlchemyError as e:
+ except SQLAlchemyError:
await session.rollback()
- logger.error(
- f"Failed to delete {self.model.__name__}: {e}"
+ logger.exception(
+ "Failed to delete %s",
+ self.model.__name__,
)
raise
+ return True
diff --git a/database/engine.py b/database/engine.py
index ef63bcd..ccc4c93 100644
--- a/database/engine.py
+++ b/database/engine.py
@@ -1,18 +1,23 @@
"""Database engine."""
+
import logging
+
+from sqlalchemy.ext.asyncio import (
+ AsyncSession,
+ async_sessionmaker,
+ create_async_engine,
+)
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
-from sqlalchemy.orm import sessionmaker
-from configs import db_config
+from configs import db_config
logger = logging.getLogger(__name__)
engine = create_async_engine(db_config.url, future=True)
Base = declarative_base()
-async_session_factory = sessionmaker(
- bind=engine,
+async_session_factory = async_sessionmaker(
+ engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
diff --git a/database/models/__init__.py b/database/models/__init__.py
index 6e9562f..c8a6984 100644
--- a/database/models/__init__.py
+++ b/database/models/__init__.py
@@ -1,6 +1,7 @@
"""Database models."""
-from .user import User
-from .search_history import SearchHistory
+
from .required_subs import RequiredSubscriptions
+from .search_history import SearchHistory
+from .user import User
-__all__ = ['User', 'SearchHistory', 'RequiredSubscriptions']
+__all__ = ["RequiredSubscriptions", "SearchHistory", "User"]
diff --git a/database/models/required_subs.py b/database/models/required_subs.py
index 483b371..e8a6a78 100644
--- a/database/models/required_subs.py
+++ b/database/models/required_subs.py
@@ -1,6 +1,11 @@
"""Required subscriptions database model."""
-from sqlalchemy import Column, String, DateTime, Integer, BigInteger, func
-from ..engine import Base
+
+from datetime import datetime
+
+from sqlalchemy import BigInteger, DateTime, Integer, String
+from sqlalchemy.orm import Mapped, mapped_column
+
+from database.engine import Base
class RequiredSubscriptions(Base):
@@ -8,8 +13,17 @@ class RequiredSubscriptions(Base):
__tablename__ = "required_subscriptions"
- id = Column(Integer, primary_key=True, autoincrement=True)
- chat_id = Column(BigInteger)
- chat_title = Column(String)
- chat_link = Column(String)
- created_at = Column(DateTime, default=func.now())
+ id: Mapped[int] = mapped_column(
+ Integer,
+ primary_key=True,
+ autoincrement=True,
+ )
+
+ chat_id: Mapped[int] = mapped_column(BigInteger)
+ chat_title: Mapped[str] = mapped_column(String)
+ chat_link: Mapped[str] = mapped_column(String)
+
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime,
+ default=datetime.now,
+ )
diff --git a/database/models/search_history.py b/database/models/search_history.py
index f96b2fa..66575b5 100644
--- a/database/models/search_history.py
+++ b/database/models/search_history.py
@@ -1,9 +1,20 @@
"""Search history database model."""
-from sqlalchemy.dialects.postgresql import JSONB
+
+from __future__ import annotations
+
+from datetime import datetime
+
from sqlalchemy import (
- Column, String, DateTime, Integer, BigInteger, ForeignKey, func
+ BigInteger,
+ DateTime,
+ ForeignKey,
+ Integer,
+ String,
)
-from ..engine import Base
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.orm import Mapped, mapped_column
+
+from database.engine import Base
class SearchHistory(Base):
@@ -11,8 +22,17 @@ class SearchHistory(Base):
__tablename__ = "search_history"
- id = Column(Integer, primary_key=True, autoincrement=True)
- user_id = Column(BigInteger, ForeignKey('users.id'))
- keyword = Column(String, default=None)
- tracks = Column(JSONB, default=[])
- created_at = Column(DateTime, default=func.now())
+ id: Mapped[int] = mapped_column(
+ Integer,
+ primary_key=True,
+ autoincrement=True,
+ )
+
+ user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("users.id"))
+ keyword: Mapped[str | None] = mapped_column(String, default=None)
+ tracks: Mapped[list[dict[str, str]]] = mapped_column(JSONB, default=[])
+
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime,
+ default=datetime.now,
+ )
diff --git a/database/models/user.py b/database/models/user.py
index dd860c7..bfb0899 100644
--- a/database/models/user.py
+++ b/database/models/user.py
@@ -1,18 +1,38 @@
"""User database model."""
-from sqlalchemy import Column, String, BigInteger, DateTime, Integer, func
-from ..engine import Base
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from sqlalchemy import BigInteger, DateTime, Integer, String
+from sqlalchemy.orm import Mapped, mapped_column
+
+from database.engine import Base
class User(Base):
"""User model."""
- __tablename__ = 'users'
-
- id = Column(BigInteger, primary_key=True, autoincrement=True)
- username = Column(String, default=None)
- first_name = Column(String, default=None)
- last_name = Column(String, default=None)
- language_code = Column(String, default=None)
- state = Column(String, default=None)
- search_queries = Column(Integer, default=0)
- created_at = Column(DateTime, default=func.now())
- updated_at = Column(DateTime, default=func.now())
+
+ __tablename__ = "users"
+
+ id: Mapped[int] = mapped_column(
+ BigInteger,
+ primary_key=True,
+ autoincrement=True,
+ )
+ username: Mapped[str | None] = mapped_column(String, default=None)
+ first_name: Mapped[str | None] = mapped_column(String, default=None)
+ last_name: Mapped[str | None] = mapped_column(String, default=None)
+ language_code: Mapped[str | None] = mapped_column(String, default=None)
+ state: Mapped[str | None] = mapped_column(String, default=None)
+
+ search_queries: Mapped[int] = mapped_column(Integer, default=0)
+
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime,
+ default=datetime.now,
+ )
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime,
+ default=datetime.now,
+ )
diff --git a/locales/__init__.py b/locales/__init__.py
index 0d7a1dd..71face3 100644
--- a/locales/__init__.py
+++ b/locales/__init__.py
@@ -1,5 +1,5 @@
"""Locales module."""
-from ._support_languages import support_languages
+from ._support_languages import support_languages
-__all__ = ['support_languages']
+__all__ = ["support_languages"]
diff --git a/locales/_support_languages.py b/locales/_support_languages.py
index 2f32bcc..a0b5b2a 100644
--- a/locales/_support_languages.py
+++ b/locales/_support_languages.py
@@ -1,10 +1,14 @@
"""Supported languages."""
+
+from __future__ import annotations
+
from dataclasses import dataclass
@dataclass
class Language:
"""Supported language."""
+
code: str
name: str
@@ -12,16 +16,19 @@ class Language:
@dataclass
class LanguageList:
"""Supported languages."""
+
languages: list[Language]
- def is_supported(self, code: str) -> bool:
+ def is_supported(self, code: str | None) -> bool:
"""Check if the language is supported."""
+ if code is None:
+ return False
return any(language.code == code for language in self.languages)
support_languages: LanguageList = LanguageList(
languages=[
- Language('en', '🇬🇧 English'),
- Language('ru', '🇷🇺 Русский')
- ]
+ Language("en", "🇬🇧 English"),
+ Language("ru", "🇷🇺 Русский"),
+ ],
)
diff --git a/locales/en/LC_MESSAGES/messages.po b/locales/en/LC_MESSAGES/messages.po
index 1b22ecf..9b713b6 100644
--- a/locales/en/LC_MESSAGES/messages.po
+++ b/locales/en/LC_MESSAGES/messages.po
@@ -56,12 +56,6 @@ msgstr "🔥 Top"
msgid "top_hits"
msgstr "🔥 Popular music"
-msgid "new_hits_button"
-msgstr "🆕 New tracks"
-
-msgid "new_hits"
-msgstr "🆕 New tracks"
-
msgid "searching"
msgstr "🔍 Searching: {keyword}"
@@ -88,3 +82,18 @@ msgstr "❗️ To use the bot, you need to subscribe to the channels."
msgid "sub_check_button"
msgstr "✅ I'm subscribed"
+
+msgid "cannot_edit_message"
+msgstr "❌ Failed to edit message."
+
+msgid "cannot_send_message"
+msgstr "❌ Failed to send message."
+
+msgid "invalid_data"
+msgstr "❌ Invalid data."
+
+msgid "cannot_access_chat"
+msgstr "❌ Failed to access chat."
+
+msgid "error_occurred"
+msgstr "❌ Error occurred."
\ No newline at end of file
diff --git a/locales/ru/LC_MESSAGES/messages.po b/locales/ru/LC_MESSAGES/messages.po
index 8aa3ce2..5bfebe0 100644
--- a/locales/ru/LC_MESSAGES/messages.po
+++ b/locales/ru/LC_MESSAGES/messages.po
@@ -57,12 +57,6 @@ msgstr "🔥 Топ песен"
msgid "top_hits"
msgstr "🔥 Популярная музыка"
-msgid "new_hits_button"
-msgstr "🆕 Новинки"
-
-msgid "new_hits"
-msgstr "🆕 Новинки музыки"
-
msgid "searching"
msgstr "🔍 Поиск: {keyword}"
@@ -89,3 +83,18 @@ msgstr "❗️ Для использования бота необходимо
msgid "sub_check_button"
msgstr "✅ Я подписан"
+
+msgid "cannot_edit_message"
+msgstr "❌ Не удалось отредактировать сообщение."
+
+msgid "cannot_send_message"
+msgstr "❌ Не удалось отправить сообщение."
+
+msgid "invalid_data"
+msgstr "❌ Некорректные данные."
+
+msgid "cannot_access_chat"
+msgstr "❌ Не удалось получить доступ к чату."
+
+msgid "error_occurred"
+msgstr "❌ Произошла ошибка."
\ No newline at end of file
diff --git a/main.py b/main.py
index f71e921..5d13a0c 100644
--- a/main.py
+++ b/main.py
@@ -1,60 +1,61 @@
"""Entry point of the bot application."""
-import logging
+
import asyncio
+import logging
from aiogram import Bot, Dispatcher
from aiogram.client.bot import DefaultBotProperties
-from aiogram.utils.token import TokenValidationError
-from aiogram.utils.i18n import I18n
+from aiogram.enums import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
+from aiogram.utils.i18n import I18n
+from aiogram.utils.token import TokenValidationError
from bot import handlers, middlewares
-from database.engine import init_db
from configs import bot_config
+from database.engine import init_db
logging.basicConfig(
level=logging.INFO,
- format='%(asctime)s - %(levelname)s:%(name)s - %(message)s'
+ format="%(asctime)s - %(levelname)s:%(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
async def create_bot() -> Bot:
- """
- Create and return a Bot instance."""
+ """Create and return a Bot instance."""
try:
bot = Bot(
- token=bot_config.token,
- default=DefaultBotProperties(parse_mode='HTML')
+ token=bot_config.token or "",
+ default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
- logger.info('Successfully created bot instance.')
- return bot
- except TokenValidationError as e:
- logger.error('Invalid token provided: %s', bot_config.token)
- raise e
- except Exception as e:
- logger.error('Failed to create bot instance: %s', str(e))
- raise e
+ logger.info("Successfully created bot instance.")
+
+ except TokenValidationError:
+ logger.exception("Invalid token provided: %s", bot_config.token)
+ raise
+
+ return bot
async def main() -> None:
- """The entry point of the bot application."""
+ """Start the bot application."""
bot = await create_bot()
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
- i18n = I18n(path='locales', domain='messages')
- logger.info('Successfully created dispatcher, i18n and storage instance.')
+ i18n = I18n(path="locales", domain="messages")
+ logger.info("Successfully created dispatcher, i18n and storage instance.")
middlewares.setup(dp, i18n)
- logger.info('Successfully set up middleware.')
+ logger.info("Successfully set up middleware.")
handlers.setup(dp)
- logger.info('Successfully set up handlers.')
+ logger.info("Successfully set up handlers.")
await init_db()
await dp.start_polling(bot)
-if __name__ == '__main__':
- logger.info('Starting application...')
+
+if __name__ == "__main__":
+ logger.info("Starting application...")
asyncio.run(main())
diff --git a/service/__init__.py b/service/__init__.py
index f629b2a..d45606f 100644
--- a/service/__init__.py
+++ b/service/__init__.py
@@ -1,5 +1,6 @@
"""Service module."""
+
from .core import Music
from .data import Track
-__all__ = ['Music', 'Track']
+__all__ = ["Music", "Track"]
diff --git a/service/core.py b/service/core.py
index ece45ae..26cf15a 100644
--- a/service/core.py
+++ b/service/core.py
@@ -1,42 +1,52 @@
"""Music service core module for downloading and searching music."""
+
+from __future__ import annotations
+
import logging
-import urllib.parse
-from typing import Optional
+from typing import TYPE_CHECKING
import aiohttp
-from bs4 import BeautifulSoup
+from aiohttp import ClientTimeout
+from bs4 import BeautifulSoup, Tag
+from typing_extensions import Self
-from .data import Track, ServiceConfig
+from .data import ServiceConfig, Track
from .exceptions import MusicServiceError
+if TYPE_CHECKING:
+ from types import TracebackType
+
logger = logging.getLogger(__name__)
class Music:
"""Service for searching and downloading music."""
- BASE_URL = "https://mp3wr.com"
- TRACK_DOWNLOAD_URL = "https://cdn.mp3wr.com"
- def __init__(self, config: Optional[ServiceConfig] = None) -> None:
+ BASE_URL = "vuxo7.com"
+
+ def __init__(self, config: ServiceConfig | None = None) -> None:
"""Initialize music service with optional configuration."""
self._config = config or ServiceConfig()
- self._session: Optional[aiohttp.ClientSession] = None
+ self._session: aiohttp.ClientSession | None = None
- async def __aenter__(self) -> 'Music':
+ async def __aenter__(self) -> Self:
"""Context manager entry point."""
await self.connect()
return self
- async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
"""Context manager exit point."""
await self.disconnect()
async def connect(self) -> None:
"""Initialize HTTP session."""
if self._session is None:
- self._session = aiohttp.ClientSession(
- headers=self._config.headers
- )
+ self._session = aiohttp.ClientSession(headers=self._config.headers)
async def disconnect(self) -> None:
"""Close HTTP session."""
@@ -47,87 +57,99 @@ async def disconnect(self) -> None:
async def search(self, keyword: str) -> list[Track]:
"""Search for music by keyword."""
if not self._session:
- await self.connect()
+ msg = "Failed to initialize session"
+ raise MusicServiceError(msg)
- url = urllib.parse.urljoin(self.BASE_URL, f"search/{keyword}")
+ url = self.build_search_query(keyword)
logger.info("Searching music with keyword: %s", keyword)
- return await self._parse_tracks(url, is_search=True)
+ return await self._parse_tracks(url)
async def get_top_hits(self) -> list[Track]:
"""Get top tracks."""
- url = urllib.parse.urljoin(self.BASE_URL, "besthit")
- return await self._parse_tracks(url)
-
- async def get_new_hits(self) -> list[Track]:
- """Get new hits."""
- url = urllib.parse.urljoin(self.BASE_URL, "newhit")
- return await self._parse_tracks(url)
+ return await self._parse_tracks(f"https://{self.BASE_URL}")
- async def _parse_tracks(
- self, url: str, is_search: bool = False
- ) -> list[Track]:
+ async def _parse_tracks(self, url: str) -> list[Track]:
"""Parse tracks from the given URL."""
try:
+ if not self._session:
+ msg = "Failed to initialize session"
+ raise MusicServiceError(msg)
+
async with self._session.get(
- url, timeout=self._config.timeout
+ url,
+ timeout=ClientTimeout(total=self._config.timeout),
) as response:
response.raise_for_status()
soup = BeautifulSoup(await response.text(), "html.parser")
+ playlist = soup.find("ul", class_="playlist")
+
+ if not isinstance(playlist, Tag):
+ msg = "Could not find playlist element"
+ raise TypeError(msg)
+
tracks = [
- Track.from_element(track_data, index, is_search)
+ Track.from_element(track_data, index)
for index, track_data in enumerate(
- soup.find_all("item") if is_search
- else soup.find_all("li", class_="sarki-liste")
+ playlist.find_all("li"),
)
]
logger.info("Found %d tracks", len(tracks))
- return tracks
except (aiohttp.ClientError, TimeoutError) as e:
- raise MusicServiceError(f"Failed to search music: {str(e)}") from e
+ msg = f"Failed to search music: {e!s}"
+ raise MusicServiceError(msg) from e
+
+ return tracks
+
+ def _raise_file_too_large_error(self, content_length: int) -> None:
+ """Raise an error for files that are too large."""
+ msg = f"File too large: {content_length} bytes"
+ raise MusicServiceError(msg)
async def _download_data(
- self, url: str, resource_type: str, track_name: str
+ self,
+ url: str,
+ resource_type: str,
+ track_name: str,
) -> bytes:
- """Generic method for downloading data."""
- MAX_SIZE = 50 * 1024 * 1024 # 50MB
+ """Download data."""
+ max_size = 50 * 1024 * 1024 # 50MB
if not self._session:
- await self.connect()
+ msg = "Failed to initialize session"
+ raise MusicServiceError(msg)
logger.info("Downloading %s for track: %s", resource_type, track_name)
try:
async with self._session.get(
url,
- timeout=self._config.timeout
+ timeout=ClientTimeout(total=self._config.timeout),
) as response:
response.raise_for_status()
content_length = response.content_length
- if content_length and content_length > MAX_SIZE:
- raise MusicServiceError(
- f"File too large: {content_length} bytes"
- )
+ if content_length and content_length > max_size:
+ self._raise_file_too_large_error(content_length)
return await response.read()
- except Exception as e:
- raise MusicServiceError(
- f"Failed to download {resource_type}: {str(e)}"
- ) from e
+
+ except (aiohttp.ClientError, TimeoutError) as e:
+ msg = f"Failed to download {resource_type}"
+ raise MusicServiceError(msg) from e
async def get_audio_bytes(self, track: Track) -> bytes:
"""Download music file."""
- url = track.audio_url
- if url.startswith("/"):
- url = urllib.parse.urljoin(self.BASE_URL, track.audio_url)
- return await self._download_data(url, "audio", track.name)
-
- async def get_thumbnail_bytes(self, track: Track) -> bytes:
- """Download thumbnail image."""
- url = track.thumbnail_url
- if url.startswith("/"):
- url = urllib.parse.urljoin(self.BASE_URL, track.thumbnail_url)
- return await self._download_data(url, "thumbnail", track.name)
+ return await self._download_data(track.audio_url, "audio", track.name)
+
+ def build_search_query(self, keyword: str) -> str:
+ """Build search query."""
+ query = keyword.strip().lower().replace(" ", "-")
+ try:
+ subdomain = query.encode("idna").decode("ascii")
+ except UnicodeError:
+ subdomain = query
+
+ return f"https://{subdomain}.{self.BASE_URL}"
diff --git a/service/data.py b/service/data.py
index 63a603c..7f2d4bc 100644
--- a/service/data.py
+++ b/service/data.py
@@ -1,52 +1,78 @@
-"""Data classes"""
+"""Data classes."""
+
+from __future__ import annotations
+
import json
-import os
from dataclasses import dataclass, field
-from bs4 import BeautifulSoup
+from pathlib import Path
+from typing import Any
-headers_path = os.path.join(os.path.dirname(__file__), "headers.json")
+from bs4 import BeautifulSoup, Tag
+
+headers_path = Path(__file__).parent / "headers.json"
@dataclass
class ServiceConfig:
"""Configuration for music service."""
+
timeout: int = 30
headers: dict = field(
- default_factory=lambda: json.load(open(headers_path))
+ default_factory=lambda: json.load(
+ Path.open(headers_path, encoding="utf-8"),
+ ),
)
@dataclass
class Track:
- """Track data class"""
+ """Track data class."""
index: int
name: str
title: str
performer: str
audio_url: str
- thumbnail_url: str
+
+ @classmethod
+ def from_dict(cls, data: dict[str, Any]) -> Track:
+ """Create Track from a dictionary."""
+ return cls(
+ index=int(data["index"]),
+ name=data["name"],
+ title=data["title"],
+ performer=data["performer"],
+ audio_url=data["audio_url"],
+ )
@classmethod
def from_element(
cls,
- element: BeautifulSoup,
+ element: BeautifulSoup | Tag,
index: int,
- is_search: bool = False,
- ) -> "Track":
- """Create Track from BeautifulSoup element"""
- full_name = element.find(class_="artist_name").text.strip()
- performer, title = full_name.split(" - ", 1)
- audio_url = element.find(class_="right").get("data-id")
- thumbnail_element = element.find(
- class_="little_thumb" if is_search else "resim_thumb"
- )
- thumbnail_url = thumbnail_element.find("img").get("data-src")
+ ) -> Track:
+ """Create Track from BeautifulSoup element."""
+ artist_name_element = element.find(class_="playlist-name-artist")
+ track_name_element = element.find(class_="playlist-name-title")
+ if artist_name_element is None or track_name_element is None:
+ msg = "Could not find artist name element"
+ raise TypeError(msg)
+
+ performer = artist_name_element.text.strip()
+ title = track_name_element.text.strip()
+
+ full_name = f"{performer} - {title}"
+
+ audio_url = element.find(class_="playlist-play")
+ if not isinstance(audio_url, Tag):
+ msg = "Could not find audio URL element"
+ raise TypeError(msg)
+ audio_url = audio_url.get("data-url", "")
+
return cls(
index=index,
name=full_name,
title=title,
performer=performer,
- audio_url=audio_url,
- thumbnail_url=thumbnail_url,
+ audio_url=str(audio_url),
)
diff --git a/service/exceptions.py b/service/exceptions.py
index 6b1414d..f693db9 100644
--- a/service/exceptions.py
+++ b/service/exceptions.py
@@ -1,6 +1,5 @@
-"""Exceptions for music service"""
+"""Exceptions for music service."""
class MusicServiceError(Exception):
"""Base exception for music service errors."""
- pass
diff --git a/service/headers.json b/service/headers.json
index 3271380..fe0cfe2 100644
--- a/service/headers.json
+++ b/service/headers.json
@@ -1,12 +1,11 @@
{
- "user_agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
- "referer": "https://mp3wr.com",
- "sec-ch-ua": "Google Chrome;v=131, Chromium;v=131, Not_A Brand;v=24",
+ "user_agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
+ "sec-ch-ua": "'Not)A;Brand';v='8', 'Chromium';v='138', 'Google Chrome';v='138'",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "Linux",
- "sec-fetch-dest": "document",
- "sec-fetch-mode": "navigate",
- "sec-fetch-site": "same-site",
- "sec-fetch-user": "?1",
+ "sec-fetch-dest": "audio",
+ "sec-fetch-mode": "no-cors",
+ "sec-fetch-site": "cross-site",
+ "sec-fetch-storage-access": "active",
"upgrade-insecure-requests": "1"
}