Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.6.4"
__version__ = "0.6.6"
21 changes: 19 additions & 2 deletions src/bot/middlewares/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
from aiogram.types import ErrorEvent, TelegramObject
from aiogram.types import User as AiogramUser
from aiogram.utils.formatting import Text
from aiogram_dialog.api.exceptions import (
InvalidStackIdError,
OutdatedIntent,
UnknownIntent,
UnknownState,
)
from dishka import AsyncContainer

from src.bot.keyboards import get_user_keyboard
Expand All @@ -29,8 +35,19 @@ async def middleware_logic(
data: dict[str, Any],
) -> Any:
aiogram_user: Optional[AiogramUser] = self._get_aiogram_user(event)

error_event = cast(ErrorEvent, event)

if isinstance(
error_event.exception,
(
InvalidStackIdError,
OutdatedIntent,
UnknownIntent,
UnknownState,
),
):
return await handler(event, data)

error = error_event.exception
traceback_str = traceback.format_exc()
error_type_name = type(error).__name__
Expand Down Expand Up @@ -59,7 +76,7 @@ async def middleware_logic(
"user": True if user else False,
"user_id": str(user.telegram_id) if user else False,
"user_name": user.name if user else False,
"username": user.username if user else False,
"username": user.username if user and user.username else False,
"error": f"{error_type_name}: {error_message.as_html()}",
},
reply_markup=reply_markup,
Expand Down
6 changes: 3 additions & 3 deletions src/core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def setup_logger() -> None:
sink=LOG_DIR / LOG_FILENAME,
level=LOG_LEVEL,
format=LOG_FORMAT,
rotation=LOG_ROTATION,
retention=LOG_RETENTION,
compression=compress_log_file,
rotation="1GB",
retention="3 days",
compression="zip",
encoding=LOG_ENCODING,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "0017"
down_revision: Union[str, None] = "0016"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.add_column("plans", sa.Column("external_squad_new", sa.UUID(), nullable=True))

op.execute("""
UPDATE plans
SET external_squad_new = external_squad[1]
WHERE external_squad IS NOT NULL;
""")

op.drop_column("plans", "external_squad")

op.alter_column("plans", "external_squad_new", new_column_name="external_squad")


def downgrade() -> None:
op.add_column("plans", sa.Column("external_squad_new", sa.ARRAY(sa.UUID()), nullable=True))

op.execute("""
UPDATE plans
SET external_squad_new = ARRAY[external_squad]
WHERE external_squad IS NOT NULL;
""")

op.drop_column("plans", "external_squad")

op.alter_column("plans", "external_squad_new", new_column_name="external_squad")
2 changes: 1 addition & 1 deletion src/infrastructure/database/models/sql/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Plan(BaseSql, TimestampMixin):
)
allowed_user_ids: Mapped[list[int]] = mapped_column(ARRAY(BigInteger), nullable=True)
internal_squads: Mapped[list[UUID]] = mapped_column(ARRAY(PG_UUID), nullable=False)
external_squad: Mapped[Optional[UUID]] = mapped_column(ARRAY(PG_UUID), nullable=True)
external_squad: Mapped[Optional[UUID]] = mapped_column(PG_UUID, nullable=True)

durations: Mapped[list["PlanDuration"]] = relationship(
"PlanDuration",
Expand Down
24 changes: 9 additions & 15 deletions src/infrastructure/database/repositories/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional, Type, TypeVar, Union, cast
from typing import Any, Optional, Type, TypeVar, Union

from sqlalchemy import ColumnExpressionArgument, delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -42,7 +42,8 @@ async def delete_instance(self, instance: T) -> None:
await self.session.delete(instance)

async def _get_one(self, model: ModelType[T], *conditions: ConditionType) -> Optional[T]:
result = await self.session.execute(select(model).where(*conditions))
stmt = select(model).where(*conditions)
result = await self.session.execute(stmt)
return result.unique().scalar_one_or_none()

async def _get_many(
Expand Down Expand Up @@ -78,23 +79,16 @@ async def _update(
**kwargs: Any,
) -> Optional[T]:
if not kwargs:
if not load_result:
return None
return cast(Optional[T], await self._get_one(model, *conditions))
return await self._get_one(model, *conditions) if load_result else None

query = update(model).where(*conditions).values(**kwargs)
stmt = update(model).where(*conditions).values(**kwargs)

if load_result:
query = query.returning(model.id) # type: ignore [attr-defined]

result = await self.session.execute(query)
obj_id: Optional[int] = result.scalar_one_or_none()

if obj_id is not None and load_result:
db_obj = await self.session.get(model, obj_id)
await self.session.refresh(db_obj)
return db_obj
stmt = stmt.returning(model)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()

await self.session.execute(stmt)
return None

async def _delete(self, model: ModelType[T], *conditions: ConditionType) -> int:
Expand Down
37 changes: 20 additions & 17 deletions src/infrastructure/database/uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@


class UnitOfWork:
session_pool: async_sessionmaker[AsyncSession]
session: Optional[AsyncSession] = None
def __init__(self, session_maker: async_sessionmaker[AsyncSession]) -> None:
self.session_maker = session_maker
self.session: Optional[AsyncSession] = None
self._repository: Optional[RepositoriesFacade] = None

repository: RepositoriesFacade

def __init__(self, session_pool: async_sessionmaker[AsyncSession]) -> None:
self.session_pool = session_pool
@property
def repository(self) -> RepositoriesFacade:
if self._repository is None:
raise RuntimeError("SQL session not started. Use 'async with uow:'")
return self._repository

async def __aenter__(self) -> Self:
self.session = self.session_pool()
self.repository = RepositoriesFacade(session=self.session)
logger.debug(f"Opened session '{id(self.session)}'")
self.session = self.session_maker()
self._repository = RepositoriesFacade(session=self.session)

logger.debug(f"SQL session started. Session ID: '{id(self.session)}'")
return self

async def __aexit__(
Expand All @@ -31,19 +35,18 @@ async def __aexit__(
if self.session is None:
return

session_id = id(self.session)
try:
if exc_type is None:
await self.commit()
if exc_type:
await self.session.rollback()
logger.warning(f"SQL transaction rolled back due to error: '{exc_val}'")
else:
logger.warning(
f"Exception detected '{exc_val}', rolling back session '{session_id}'"
)
await self.rollback()
await self.session.commit()
logger.debug("SQL transaction committed successfully")
finally:
await self.session.close()
logger.debug(f"Closed session '{session_id}'")
self.session = None
self._repository = None
logger.debug("SQL session closed")

async def commit(self) -> None:
if self.session:
Expand Down
4 changes: 2 additions & 2 deletions src/infrastructure/di/providers/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ async def get_uow(
self,
session_maker: async_sessionmaker[AsyncSession],
) -> AsyncIterable[UnitOfWork]:
async with UnitOfWork(session_maker) as uow:
yield uow
uow = UnitOfWork(session_maker)
yield uow
13 changes: 11 additions & 2 deletions src/infrastructure/taskiq/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@


def create_broker(config: AppConfig) -> RedisStreamBroker:
result_backend: AsyncResultBackend[Any] = RedisAsyncResultBackend(redis_url=config.redis.dsn)
broker = RedisStreamBroker(url=config.redis.dsn).with_result_backend(result_backend)
result_backend: AsyncResultBackend[Any] = RedisAsyncResultBackend(
redis_url=config.redis.dsn,
keep_results=False,
result_ex_time=3600,
)

broker = RedisStreamBroker(
url=config.redis.dsn,
maxlen=1000,
).with_result_backend(result_backend)

return broker


Expand Down
13 changes: 5 additions & 8 deletions src/infrastructure/taskiq/tasks/notifications.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import asyncio
from typing import Any, Union, cast

from aiogram.types import BufferedInputFile
from dishka.integrations.taskiq import FromDishka, inject

from src.bot.keyboards import get_buy_keyboard, get_renew_keyboard
from src.core.constants import BATCH_DELAY, BATCH_SIZE
from src.core.enums import MediaType, UserNotificationType
from src.core.enums import UserNotificationType
from src.core.utils.iterables import chunked
from src.core.utils.message_payload import MessagePayload
from src.core.utils.types import RemnaUserDto
Expand All @@ -23,13 +22,11 @@ async def send_error_notification_task(
payload: MessagePayload,
notification_service: FromDishka[NotificationService],
) -> None:
file_data = BufferedInputFile(
file=traceback_str.encode(),
filename=f"error_{error_id}.txt",
await notification_service.error_notify(
traceback_str=traceback_str,
payload=payload,
error_id=error_id,
)
payload.media = file_data
payload.media_type = MediaType.DOCUMENT
await notification_service.notify_super_dev(payload=payload)


@broker.task
Expand Down
Loading