Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from typing import Dict

app = FastAPI()

# JWT Config
SECRET_KEY = "supersecretkey"
ALGORITHM = "HS256"

# Create a reusable security scheme
security = HTTPBearer()


# Token verification function
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload # contains user_id or username
except JWTError:
raise HTTPException(status_code=401, detail="Invalid or expired token")


@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Global middleware to secure all routes."""
if request.url.path in ["/login", "/signup", "/docs", "/openapi.json"]:
# Allow public routes
return await call_next(request)

# Extract Authorization header
auth_header = request.headers.get("authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

token = auth_header.split(" ")[1]
try:
jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status_code=401, detail="Invalid or expired token")

return await call_next(request)


# Example: Protected Route
@app.get("/user/chat/{chat_id}")
async def get_chat(chat_id: str, user: Dict = Depends(verify_token)):
user_id = user.get("sub") # typically the user_id
if not chat_belongs_to_user(chat_id, user_id):
raise HTTPException(status_code=403, detail="Access denied to this chat session")
return {"chat_id": chat_id, "message": "Chat retrieved successfully"}


def chat_belongs_to_user(chat_id: str, user_id: str) -> bool:
# Mock validation for example purposes
# You’d normally check your database here
return chat_id.startswith(user_id)


@app.post("/login")
async def login():
"""Mock login to generate token"""
user_data = {"sub": "user123"}
token = jwt.encode(user_data, SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": token}
Loading