Skip to content

Commit 2aaadce

Browse files
authored
chore: refactor api endpoints (#31)
2 parents 88356e1 + e8039de commit 2aaadce

File tree

10 files changed

+666
-562
lines changed

10 files changed

+666
-562
lines changed

fob_api/auth/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,13 @@ def basic_auth_validator(username: str, password: str) -> User:
6666
if not user or not password_context.verify(password, user.password):
6767
return False
6868
return user
69+
70+
def is_admin(user: User):
71+
"""Check if the user is an admin"""
72+
if not user.is_admin:
73+
raise HTTPException(status_code=403, detail="Not enough permissions")
74+
75+
def is_admin_or_self(user: User, username: str):
76+
"""Check if the user is an admin or the user itself"""
77+
if not user.is_admin and user.username != username:
78+
raise HTTPException(status_code=403, detail="Not enough permissions")

fob_api/models/api/openstack.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class OpenStackProjectCreate(BaseModel):
66

77
class OpenStackProject(OpenStackProjectCreate):
88
id: int
9+
type: str
910

1011
class OpenStackUserPassword(BaseModel):
1112
username: str

fob_api/routes/devices.py

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616

1717
template = Jinja2Templates(directory="templates")
1818

19+
MAX_ALLOWED_DEVICES = 5
20+
21+
def count_devices_for_user(username: str) -> int:
22+
return len(headscale_driver.node.list(username=username))
23+
24+
def can_add_device(username: str) -> bool:
25+
return count_devices_for_user(username) <= MAX_ALLOWED_DEVICES
26+
1927
@router.get("/register/{mkey}", tags=["vpn"], response_class=HTMLResponse)
2028
def register_device_get(request: Request, mkey: str):
2129
"""
@@ -34,7 +42,6 @@ async def register_device_post(request: Request, mkey: str):
3442
"""
3543
Handle device registration for headscale require basic auth
3644
"""
37-
# todo add limit to max allowed devices
3845
form_data = await request.form()
3946
username = form_data.get("username")
4047
password = form_data.get("password")
@@ -63,6 +70,13 @@ async def register_device_post(request: Request, mkey: str):
6370

6471
headscale_tasks.get_or_create_user(username)
6572

73+
if not can_add_device(username):
74+
return template.TemplateResponse(
75+
request=request,
76+
name="register_device.html.j2",
77+
context={"mkey": mkey, "error": f"Max allowed devices reached ({MAX_ALLOWED_DEVICES})"}
78+
)
79+
6680
try:
6781
headscale_driver.node.register(username, mkey)
6882
except Exception as e:
@@ -79,22 +93,27 @@ async def register_device_post(request: Request, mkey: str):
7993
)
8094

8195
@router.get("/{username}", tags=["vpn"])
82-
def list_devices(user: Annotated[User, Depends(auth.get_current_user)], username: str):
96+
def list_devices(
97+
username: str,
98+
user: Annotated[User, Depends(auth.get_current_user)]
99+
):
83100
"""
84101
List all devices
85102
"""
86-
if user.username != username and not user.is_admin:
87-
raise HTTPException(status_code=403, detail="You are not an admin")
103+
auth.is_admin_or_self(user, username)
88104
nodes = headscale_driver.node.list(username=username)
89105
return [ApiDeviceResponse(**node.__dict__) for node in nodes]
90106

91107
@router.delete("/{username}/{name}", tags=["vpn"], response_model=DeviceDeleteResponse)
92-
def delete_device(user: Annotated[User, Depends(auth.get_current_user)], username: str, name: str):
108+
def delete_device(
109+
username: str,
110+
name: str,
111+
user: Annotated[User, Depends(auth.get_current_user)]
112+
):
93113
"""
94114
Delete a device
95115
"""
96-
if user.username != username and not user.is_admin:
97-
raise HTTPException(status_code=403, detail="You are not an admin")
116+
auth.is_admin_or_self(user, username)
98117
user_nodes: List[Node] = headscale_driver.node.list(username=username)
99118
for node in user_nodes:
100119
if node.givenName == name:
@@ -107,10 +126,11 @@ def generate_preauth_key(user: Annotated[User, Depends(auth.get_current_user)],
107126
"""
108127
Generate a preauth key active for 5 minutes
109128
"""
110-
# todo add limit to max allowed devices
111-
if user.username != username:
112-
raise HTTPException(status_code=403, detail="You are not allowed to generate preauth key for other users")
129+
auth.is_admin_or_self(user, username)
113130
headscale_tasks.get_or_create_user(username=username)
131+
132+
if not can_add_device(username):
133+
raise HTTPException(status_code=400, detail=f"Max allowed devices reached ({MAX_ALLOWED_DEVICES})")
134+
114135
pre_auth_key: PreAuthKey = headscale_driver.preauthkey.create(username=username, expiration=datetime.now() + timedelta(minutes=5))
115-
print(pre_auth_key.__dict__)
116136
return DevicePreAuthKeyResponse(**pre_auth_key.__dict__)

fob_api/routes/headscale.py

Lines changed: 84 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
from typing import Annotated, List
2-
from datetime import datetime, timedelta
32

43
from fastapi import APIRouter, Depends, HTTPException, Request
54
from sqlmodel import Session, select
65
from sqlalchemy.exc import IntegrityError
76

8-
from fob_api import auth, engine
7+
from fob_api import auth, get_session
98
from fob_api.models.database import User, HeadScalePolicyACL, HeadScalePolicyHost
109
from fob_api.models.api import HeadScalePolicyAcl as HeadScalePolicyAclAPI
1110
from fob_api.models.api import HeadScalePolicyAclCreate as HeadScalePolicyAclCreateAPI
@@ -16,14 +15,15 @@
1615
router = APIRouter(prefix="/headscale")
1716

1817
@router.get("/acls/", tags=["vpn"])
19-
def list(user: Annotated[User, Depends(auth.get_current_user)]) -> List[HeadScalePolicyAclAPI]:
18+
def list(
19+
user: Annotated[User, Depends(auth.get_current_user)],
20+
session: Session = Depends(get_session),
21+
) -> List[HeadScalePolicyAclAPI]:
2022
"""
2123
Return list of HeadScale ACLs in Policy
2224
"""
23-
if not user.is_admin:
24-
raise HTTPException(status_code=403, detail="Not an admin")
25-
with Session(engine) as session:
26-
acls = session.exec(select(HeadScalePolicyACL)).all()
25+
auth.is_admin(user)
26+
acls = session.exec(select(HeadScalePolicyACL)).all()
2727
return [HeadScalePolicyAclAPI(
2828
id=acl.id,
2929
action=acl.action,
@@ -34,114 +34,110 @@ def list(user: Annotated[User, Depends(auth.get_current_user)]) -> List[HeadScal
3434

3535
@router.post("/acls/", tags=["vpn"])
3636
def create(
37-
acl: HeadScalePolicyAclCreateAPI,
38-
user: Annotated[User, Depends(auth.get_current_user)],
39-
) -> HeadScalePolicyAclAPI | None:
37+
acl: HeadScalePolicyAclCreateAPI,
38+
user: Annotated[User, Depends(auth.get_current_user)],
39+
session: Session = Depends(get_session),
40+
) -> HeadScalePolicyAclAPI | None:
4041
"""
4142
Create a new HeadScale ACL in Policy
4243
"""
43-
if not user.is_admin:
44-
raise HTTPException(status_code=403, detail="Not an admin")
45-
with Session(engine) as session:
46-
new_acl = HeadScalePolicyACL(**acl.model_dump())
47-
session.add(new_acl)
48-
session.commit()
49-
session.refresh(new_acl)
44+
auth.is_admin(user)
45+
new_acl = HeadScalePolicyACL(**acl.model_dump())
46+
session.add(new_acl)
47+
session.commit()
48+
session.refresh(new_acl)
5049

51-
try:
52-
update_headscale_policy()
53-
except Exception as e:
54-
session.delete(new_acl)
55-
session.commit()
56-
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
57-
return HeadScalePolicyAclAPI(
58-
id=new_acl.id,
59-
action=new_acl.action,
60-
src=new_acl.src.split(","),
61-
dst=new_acl.dst.split(","),
62-
proto=new_acl.proto,
63-
)
50+
try:
51+
update_headscale_policy()
52+
except Exception as e:
53+
session.delete(new_acl)
54+
session.commit()
55+
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
56+
return HeadScalePolicyAclAPI(
57+
id=new_acl.id,
58+
action=new_acl.action,
59+
src=new_acl.src.split(","),
60+
dst=new_acl.dst.split(","),
61+
proto=new_acl.proto,
62+
)
6463

6564
@router.delete("/acls/{acl_id}/", tags=["vpn"])
6665
def delete(
67-
acl_id: int,
68-
user: Annotated[User, Depends(auth.get_current_user)],
69-
) -> None:
66+
acl_id: int,
67+
user: Annotated[User, Depends(auth.get_current_user)],
68+
session: Session = Depends(get_session),
69+
) -> None:
7070
"""
7171
Delete a HeadScale ACL from Policy
7272
"""
73-
if not user.is_admin:
74-
raise HTTPException(status_code=403, detail="Not an admin")
75-
with Session(engine) as session:
76-
acl = session.get(HeadScalePolicyACL, acl_id)
77-
if not acl:
78-
raise HTTPException(status_code=404, detail="ACL not found")
79-
session.delete(acl)
73+
auth.is_admin(user)
74+
acl = session.get(HeadScalePolicyACL, acl_id)
75+
if not acl:
76+
raise HTTPException(status_code=404, detail="ACL not found")
77+
session.delete(acl)
78+
session.commit()
79+
try:
80+
update_headscale_policy()
81+
except Exception as e:
82+
session.add(acl)
8083
session.commit()
81-
try:
82-
update_headscale_policy()
83-
except Exception as e:
84-
session.add(acl)
85-
session.commit()
86-
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
84+
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
8785

8886
@router.get("/host/", tags=["vpn"])
89-
def list_hosts(user: Annotated[User, Depends(auth.get_current_user)]) -> List[HeadScalePolicyHostAPI]:
87+
def list_hosts(
88+
user: Annotated[User, Depends(auth.get_current_user)],
89+
session: Session = Depends(get_session),
90+
) -> List[HeadScalePolicyHostAPI]:
9091
"""
9192
Return list of HeadScale hosts
9293
"""
93-
if not user.is_admin:
94-
raise HTTPException(status_code=403, detail="Not an admin")
95-
with Session(engine) as session:
96-
headscale_policy_host = session.exec(select(HeadScalePolicyHost)).all()
94+
auth.is_admin(user)
95+
headscale_policy_host = session.exec(select(HeadScalePolicyHost)).all()
9796
return headscale_policy_host
9897

9998
@router.post("/host/", tags=["vpn"])
10099
def create_host(
101-
host: HeadScalePolicyHostCreateAPI,
102-
user: Annotated[User, Depends(auth.get_current_user)],
103-
) -> HeadScalePolicyHostAPI | None:
100+
host: HeadScalePolicyHostCreateAPI,
101+
user: Annotated[User, Depends(auth.get_current_user)],
102+
session: Session = Depends(get_session),
103+
) -> HeadScalePolicyHostAPI | None:
104104
"""
105105
Create a new HeadScale host
106106
"""
107-
if not user.is_admin:
108-
raise HTTPException(status_code=403, detail="Not an admin")
109-
with Session(engine) as session:
110-
111-
new_host = HeadScalePolicyHost(**host.model_dump())
112-
session.add(new_host)
113-
try:
114-
session.commit()
115-
except IntegrityError:
116-
raise HTTPException(status_code=400, detail="This host binding already exists")
117-
session.refresh(new_host)
118-
try:
119-
update_headscale_policy()
120-
except Exception as e:
121-
session.delete(new_host)
122-
session.commit()
123-
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
124-
return new_host
107+
auth.is_admin(user)
108+
new_host = HeadScalePolicyHost(**host.model_dump())
109+
session.add(new_host)
110+
try:
111+
session.commit()
112+
except IntegrityError:
113+
raise HTTPException(status_code=400, detail="This host binding already exists")
114+
session.refresh(new_host)
115+
try:
116+
update_headscale_policy()
117+
except Exception as e:
118+
session.delete(new_host)
119+
session.commit()
120+
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
121+
return new_host
125122

126123
@router.delete("/host/{host_id}/", tags=["vpn"])
127124
def delete_host(
128-
host_id: int,
129-
user: Annotated[User, Depends(auth.get_current_user)],
130-
) -> None:
125+
host_id: int,
126+
user: Annotated[User, Depends(auth.get_current_user)],
127+
session: Session = Depends(get_session),
128+
) -> None:
131129
"""
132130
Delete a HeadScale host
133131
"""
134-
if not user.is_admin:
135-
raise HTTPException(status_code=403, detail="Not an admin")
136-
with Session(engine) as session:
137-
host = session.get(HeadScalePolicyHost, host_id)
138-
if not host:
139-
raise HTTPException(status_code=404, detail="Host not found")
140-
session.delete(host)
132+
auth.is_admin(user)
133+
host = session.get(HeadScalePolicyHost, host_id)
134+
if not host:
135+
raise HTTPException(status_code=404, detail="Host not found")
136+
session.delete(host)
137+
session.commit()
138+
try:
139+
update_headscale_policy()
140+
except Exception as e:
141+
session.add(host)
141142
session.commit()
142-
try:
143-
update_headscale_policy()
144-
except Exception as e:
145-
session.add(host)
146-
session.commit()
147-
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")
143+
raise HTTPException(status_code=400, detail=f"Failed to apply new policy: {e}")

0 commit comments

Comments
 (0)