Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion backend/.env-example
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
DATABASE_URL="sqlite:///./db.sqlite3" # Conexão para o banco de dados
JWT_SECRET_KEY="SUA_SECRET_KEY" # Recomendo gerar uma chave usando: python -c "import secrets; print(secrets.token_hex(32))"
JWT_EXPIRATION_TIME=30 # Tempo de expiração do token em minutos
JWT_EXPIRATION_TIME=30 # Tempo de expiração do token em minutos
WEBHOOK_LOG_INFO="" # Preencha com o seu link de webhook
WEBHOOK_LOG_ERROR=""
37 changes: 36 additions & 1 deletion backend/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ dependencies = [
"passlib[argon2] (>=1.7.4,<2.0.0)",
"pyjwt (>=2.10.1,<3.0.0)",
"slowapi (>=0.1.9,<0.2.0)",
"apscheduler (>=3.11.0,<4.0.0)"
"apscheduler (>=3.11.0,<4.0.0)",
"loguru (>=0.7.3,<0.8.0)"
]

[tool.poetry]
Expand Down
16 changes: 14 additions & 2 deletions backend/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from fastapi.responses import JSONResponse
from slowapi.middleware import SlowAPIMiddleware
from slowapi.errors import RateLimitExceeded
from src.utils import limiter
from src.utils import limiter, get_user_ip
import uvicorn
from src.logger import setup_discord_logging, logger

setup_discord_logging()

app = FastAPI(docs_url=None, redoc_url=None)
app.state.limiter = limiter
Expand All @@ -22,6 +25,14 @@
allow_headers=['*'],
)

@app.middleware("http")
async def capture_exceptions(request: Request, call_next):
try:
return await call_next(request)
except Exception as exc:
logger.critical(f"`{request.method} {request.url.path}` Erro não tratado!\n- IP: {get_user_ip(request)} - Session ID: {request.cookies.get('session_id')}\n```{exc}```")
return JSONResponse(status_code=500, content={"detail": "Internal Server Error"})

@app.exception_handler(RateLimitExceeded)
async def rate_limit_exception_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
Expand All @@ -34,4 +45,5 @@ async def rate_limit_exception_handler(request: Request, exc: RateLimitExceeded)
scheduler.start()

if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=80, reload=False)
uvicorn.run(app, host='0.0.0.0', port=80, reload=False)
logger.info("API iniciada!")
30 changes: 30 additions & 0 deletions backend/src/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
from datetime import datetime
import threading
import httpx
from loguru import logger
from src.settings import WEBHOOK_INFO, WEBHOOK_ERROR

def send_webhook(message: str, webhook_url: str):
Comment on lines +7 to +8
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The webhook URLs are imported without validation. If these environment variables are None or empty, the webhook calls will fail. Add validation to ensure webhooks are properly configured before attempting to send messages.

Suggested change
def send_webhook(message: str, webhook_url: str):
def validate_webhook_url(webhook_url: str) -> bool:
if not webhook_url or not isinstance(webhook_url, str):
logger.warning(f"Invalid webhook URL: {webhook_url}")
return False
return True
def send_webhook(message: str, webhook_url: str):
if not validate_webhook_url(webhook_url):
return

Copilot uses AI. Check for mistakes.
def runner():
async def task():
try:
async with httpx.AsyncClient(timeout=5) as client:
await client.post(webhook_url, json={"content": message})
except Exception as e:
logger.error(f"[Log Error]: {e}")
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a circular dependency risk - if the webhook fails, it tries to log the error using the same logger that might call the webhook again. Consider using a different logging mechanism or print statement for webhook failures.

Suggested change
logger.error(f"[Log Error]: {e}")
print(f"[Log Error]: {e}")

Copilot uses AI. Check for mistakes.

asyncio.run(task())

threading.Thread(target=runner, daemon=True).start()

def info_sink(message):
send_webhook(f"**[{message.record['level'].name}]** {message.record['message']}\n-# **{datetime.now().strftime('%d/%m/%Y - %H:%M:%S')}**", WEBHOOK_INFO)

def error_sink(message):
send_webhook(f"**[{message.record['level'].name}]** {message.record['message']}\n-# **{datetime.now().strftime('%d/%m/%Y - %H:%M:%S')}**", WEBHOOK_ERROR)

def setup_discord_logging():
logger.remove()
logger.add(info_sink, level="INFO", filter=lambda r: r["level"].name in ("INFO", "WARNING"))
logger.add(error_sink, level="ERROR", filter=lambda r: r["level"].name in ("ERROR", "CRITICAL"))
17 changes: 16 additions & 1 deletion backend/src/routes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from src.db.models import User, RefreshToken
from src.security import get_user, verify_password, generate_password_hash, generate_jwt_token, clear_auth_cookie, generate_session_id, invalidate_all_sessions
from src.schemas import LoginRequestSchema, RegisterRequestSchema, LoginResponseSchema, UsernameUpdateSchema, EmailUpdateSchema, PasswordUpdateSchema
from src.utils import limiter
from src.utils import limiter, get_user_ip
from sqlalchemy.orm import Session
from src.logger import logger

router = APIRouter()

Expand All @@ -14,13 +15,16 @@
def login(login: LoginRequestSchema, request: Request, response: Response, db: Session = Depends(get_db)):
user = db.query(User).filter(User.email == login.email).first()
if not user:
logger.warning(f"`/auth/login`: Tentativa de login com email inexistente\n```Email: {login.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=401, detail='E-mail ou senha estão inválidos. Por favor, tente novamente.')

if not verify_password(login.password, user.password):
logger.warning(f"`/auth/login`: Tentativa de login com senha inválida\n```Email: {login.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=401, detail='E-mail ou senha estão inválidos. Por favor, tente novamente.')

session_id = generate_session_id(response)
token = generate_jwt_token(user.id, user.username, session_id, request, response, login.remember, db)
logger.info(f"`/auth/login`: Login realizado com sucesso\n```Email: {login.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
return {
'access_token': token['access_token'],
'refresh_token': token['refresh_token'],
Expand All @@ -35,12 +39,15 @@ def login(login: LoginRequestSchema, request: Request, response: Response, db: S
@limiter.limit("3/minute;15/day")
def register(register: RegisterRequestSchema, request: Request, response: Response, db: Session = Depends(get_db)):
if register.password != register.confirm_password:
logger.warning(f"`/auth/register`: Tentativa de registro com senhas diferentes\n```Username: {register.username} - Email: {register.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=400, detail='As senhas não coincidem. Verifique se as senhas estão iguais.')

if db.query(User).filter(User.username == register.username).first():
logger.warning(f"`/auth/register`: Tentativa de registro com username já existente\n```Username: {register.username} - Email: {register.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=409, detail='Nome de usuário já registrado. Por favor, tente outro nome de usuário.')

if db.query(User).filter(User.email == register.email).first():
logger.warning(f"`/auth/register`: Tentativa de registro com e-mail já existente\n```Username: {register.username} - Email: {register.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=409, detail='E-mail já registrado. Por favor, tente outro e-mail.')

user = User(username=register.username, email=register.email, password=generate_password_hash(register.password))
Expand All @@ -50,6 +57,7 @@ def register(register: RegisterRequestSchema, request: Request, response: Respon

session_id = generate_session_id(response)
token = generate_jwt_token(user.id, user.username, session_id, request, response, True, db)
logger.info(f"`/auth/register`: Registro realizado com sucesso\n```Username: {register.username} - Email: {register.email}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
return {
'access_token': token['access_token'],
'refresh_token': token['refresh_token'],
Expand All @@ -66,6 +74,7 @@ def logout(response: Response, request: Request, db: Session = Depends(get_db)):
if session_id:
db.query(RefreshToken).filter(RefreshToken.session_id == session_id).update({'is_active': False})
db.commit()
logger.info(f"`/auth/logout`: Logout realizado com sucesso\n```Session ID: {session_id}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")

clear_auth_cookie(response)
return {'message': 'Successfully logged out'}
Expand All @@ -82,6 +91,7 @@ def get_current_user(request: Request, response: Response, db: Session = Depends
def update_username(new: UsernameUpdateSchema, request: Request, response: Response, db: Session = Depends(get_db)):
user = get_user(request, response, db)
if not user:
logger.warning(f"`/auth/me/username`: Tentativa com token inválido\n```IP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=401, detail='Invalid token or user not found')

if db.query(User).filter(User.username == new.username).first():
Expand All @@ -90,13 +100,15 @@ def update_username(new: UsernameUpdateSchema, request: Request, response: Respo
user_db = db.query(User).filter(User.id == user['id']).first()
user_db.username = new.username
db.commit()
logger.info(f"`/auth/me/username`: Username atualizado com sucesso\n```Username: {user['username']} -> {new.username}\nSession ID: {request.cookies.get('session_id')}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
return {'message': 'Username updated successfully'}

@router.patch('/me/email')
@limiter.limit("3/day")
def update_email(new: EmailUpdateSchema, request: Request, response: Response, db: Session = Depends(get_db)):
user = get_user(request, response, db)
if not user:
logger.warning(f"`/auth/me/email`: Tentativa com token inválido\n```IP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=401, detail='Invalid token or user not found')

if db.query(User).filter(User.email == new.email).first():
Expand All @@ -106,13 +118,15 @@ def update_email(new: EmailUpdateSchema, request: Request, response: Response, d
user_db.email = new.email
db.commit()
invalidate_all_sessions(user['id'], request.cookies.get('session_id'), True, db)
logger.info(f"`/auth/me/email`: E-mail atualizado com sucesso\n```Email: {user['email']} -> {new.email}\nSession ID: {request.cookies.get('session_id')}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
return {'message': 'E-mail updated successfully'}

@router.patch('/me/password')
@limiter.limit("3/day")
def update_password(new: PasswordUpdateSchema, request: Request, response: Response, db: Session = Depends(get_db)):
user = get_user(request, response, db)
if not user:
logger.warning(f"`/auth/me/password`: Tentativa com token inválido\n```IP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
raise HTTPException(status_code=401, detail='Invalid token or user not found')

user_db = db.query(User).filter(User.id == user['id']).first()
Expand All @@ -122,4 +136,5 @@ def update_password(new: PasswordUpdateSchema, request: Request, response: Respo
user_db.password = generate_password_hash(new.password)
db.commit()
invalidate_all_sessions(user['id'], request.cookies.get('session_id'), True, db)
logger.info(f"`/auth/me/password`: Senha atualizada com sucesso\n```Email: {user['email']} - Session ID: {request.cookies.get('session_id')}\nIP: {get_user_ip(request)} | User-Agent: {request.headers.get('User-Agent')}```")
return {'message': 'Password updated successfully'}
Loading