diff --git a/backend/app/services/auth_service.py b/backend/app/services/auth_service.py index fa8571c..4d0f68c 100644 --- a/backend/app/services/auth_service.py +++ b/backend/app/services/auth_service.py @@ -4,7 +4,7 @@ from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, update +from sqlalchemy import select, update, Null from app.models.user import User, UserStatus from app.schemas.auth import LoginRequest, RegisterRequest, AuthUser, TokenResponse @@ -318,13 +318,61 @@ async def _handle_failed_login( :param user: User object if it exists, otherwise None. :return: None """ + # TODO: Define where place redis key cleanup + # https://github.com/Anvoria/smithy/issues/3 if not user: # Don't reveal if user exists return - # TODO: Implement logic to track failed login attempts - # https://github.com/Anvoria/smithy/issues/3 - pass + login_key = f"login_attempts:{email}" + + login_data = await redis_client.get(key=login_key) or { + "attempts": 0, + "is_locked": False, + } + login_data["attempts"] += 1 + + if login_data.get("attempts") >= 5: + login_data["is_locked"] = True + + await self._lock_user_account(email=email) + + await redis_client.set( + key=login_key, value=login_data, expire=timedelta(minutes=15) + ) + + async def _lock_user_account(self, email: str) -> None: + """ + Update user's locked field if passed max login attempts. + :param email: User email to update. + :return: None. + """ + locked_until = datetime.now(UTC) + timedelta(minutes=15) + + stmt = ( + update(User) + .where(User.email == email) + .values(is_locked=True, locked_until=locked_until) + ) + + await self.db.execute(stmt) + await self.db.commit() + + async def _unlock_user_account(self, email: str) -> None: + """ + Update user's locked field. + :param email: User email to update. + :return: None. + """ + stmt = ( + update(User) + .where(User.email == email) + .values(is_locked=False, locked_until=Null) + ) + + await redis_client.delete(key=f"login_attempts:{email}") + await self.db.execute(stmt) + await self.db.commit() async def _update_login_info(self, user: User) -> None: """