|
2 | 2 |
|
3 | 3 | from fastapi import APIRouter, Depends, HTTPException |
4 | 4 | from fastapi.security import OAuth2PasswordRequestForm |
| 5 | +from sqlmodel import Session, select |
5 | 6 |
|
6 | | -from fob_api import auth |
| 7 | +from fob_api import auth, get_session |
7 | 8 | from fob_api.models.database import User |
| 9 | +from fob_api.models.database import Token as TokenDB |
8 | 10 | from fob_api.models.api import Token, TokenValidate |
9 | 11 |
|
10 | 12 | router = APIRouter() |
11 | 13 |
|
12 | 14 | @router.post("/token", response_model=Token, tags=["token"]) |
13 | | -def get_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> str: |
| 15 | +def get_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], session: Session = Depends(get_session)) -> Token: |
14 | 16 | user = auth.basic_auth_validator(form_data.username, form_data.password) |
15 | 17 | if not user: |
16 | 18 | raise HTTPException(status_code=401, detail="Invalid credentials") |
17 | | - token = auth.encode_token(user.username) |
| 19 | + token_data = auth.make_token_data(user.username) |
| 20 | + token_db: TokenDB = TokenDB( |
| 21 | + expires_at=token_data["exp"], |
| 22 | + created_at=token_data["iat"], |
| 23 | + token_id=token_data["jti"], |
| 24 | + user_id=user.id, |
| 25 | + ) |
| 26 | + session.add(token_db) |
| 27 | + session.commit() |
| 28 | + token = auth.encode_token(token_data) |
18 | 29 | return Token(access_token=token, token_type="bearer") |
19 | 30 |
|
20 | 31 |
|
21 | 32 | @router.get("/token/refreshtoken", response_model=Token, tags=["token"]) |
22 | | -def refresh_token(user: Annotated[User, Depends(auth.get_current_user)]) -> str: |
23 | | - token = auth.encode_token(user.username) |
| 33 | +def refresh_token(user: Annotated[User, Depends(auth.get_current_user)], session: Session = Depends(get_session)) -> Token: |
| 34 | + token_data = auth.make_token_data(user.username) |
| 35 | + token_db: TokenDB = TokenDB( |
| 36 | + expires_at=token_data["exp"], |
| 37 | + created_at=token_data["iat"], |
| 38 | + token_id=token_data["jti"], |
| 39 | + user_id=user.id, |
| 40 | + ) |
| 41 | + session.add(token_db) |
| 42 | + session.commit() |
| 43 | + token = auth.encode_token(token_data) |
24 | 44 | return Token(access_token=token, token_type="bearer") |
25 | 45 |
|
| 46 | +@router.delete("/token/{jti}", tags=["token"]) |
| 47 | +def revoke_token(jti: str, user: Annotated[User, Depends(auth.get_current_user)], session: Session = Depends(get_session)) -> None: |
| 48 | + token = session.exec(select(TokenDB).where(TokenDB.token_id == jti)).first() |
| 49 | + if not token or token.user_id != user.id: |
| 50 | + raise HTTPException(status_code=404, detail="Cant revoke token") |
| 51 | + session.delete(token) |
| 52 | + session.commit() |
| 53 | + |
26 | 54 | @router.get("/token/verify", response_model=TokenValidate, tags=["token"]) |
27 | | -def verify_token(user: Annotated[User, Depends(auth.get_current_user)]) -> str: |
| 55 | +def verify_token(user: Annotated[User, Depends(auth.get_current_user)]) -> TokenValidate: |
28 | 56 | return TokenValidate(valid=True) |
0 commit comments