-
Notifications
You must be signed in to change notification settings - Fork 2
[Guardrails] Banlist API #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
59a0ae5
c1078ca
8c0b7de
8757fac
42be28f
fe5e1bb
840990b
d367855
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| """Added ban_list table | ||
|
|
||
| Revision ID: 004 | ||
| Revises: 003 | ||
| Create Date: 2026-02-05 09:42:54.128852 | ||
|
|
||
| """ | ||
| from typing import Sequence, Union | ||
|
|
||
| from alembic import op | ||
| from sqlalchemy.dialects import postgresql | ||
| import sqlalchemy as sa | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = '004' | ||
| down_revision = '003' | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| op.create_table('ban_list', | ||
| sa.Column('id', sa.Uuid(), nullable=False), | ||
| sa.Column('name', sa.String(), nullable=False), | ||
| sa.Column('description', sa.String(), nullable=False), | ||
| sa.Column('organization_id', sa.Integer(), nullable=False), | ||
| sa.Column('project_id', sa.Integer(), nullable=False), | ||
| sa.Column('domain', sa.String(), nullable=False), | ||
| sa.Column('is_public', sa.Boolean(), nullable=False, server_default=sa.false()), | ||
| sa.Column("banned_words", postgresql.ARRAY(sa.String()), nullable=False, server_default="{}"), | ||
| sa.Column('created_at', sa.DateTime(), nullable=False), | ||
| sa.Column('updated_at', sa.DateTime(), nullable=False), | ||
|
|
||
| sa.PrimaryKeyConstraint('id'), | ||
| sa.UniqueConstraint('name', 'organization_id', 'project_id', name='uq_banlist_name_org_project'), | ||
| ) | ||
|
|
||
| op.create_index("idx_banlist_organization", "ban_list", ["organization_id"]) | ||
| op.create_index("idx_banlist_project", "ban_list", ["project_id"]) | ||
| op.create_index("idx_banlist_domain", "ban_list", ["domain"]) | ||
| op.create_index("idx_banlist_is_public", "ban_list", ["is_public"]) | ||
|
|
||
| def downgrade() -> None: | ||
| op.drop_table('ban_list') | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| from fastapi import APIRouter | ||
|
|
||
| from app.api.routes import utils, guardrails, validator_configs | ||
| from app.api.routes import banlist_configs, guardrails, validator_configs, utils | ||
|
|
||
| api_router = APIRouter() | ||
| api_router.include_router(utils.router) | ||
| api_router.include_router(banlist_configs.router) | ||
| api_router.include_router(guardrails.router) | ||
| api_router.include_router(validator_configs.router) | ||
| api_router.include_router(utils.router) | ||
|
|
||
| # if settings.ENVIRONMENT == "local": | ||
| # api_router.include_router(private.router) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| from typing import Optional | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import APIRouter, HTTPException | ||
|
|
||
| from app.api.deps import AuthDep, SessionDep | ||
| from app.crud.banlist import banlist_crud | ||
| from app.schemas.banlist import ( | ||
| BanListCreate, | ||
| BanListUpdate, | ||
| BanListResponse | ||
| ) | ||
| from app.utils import APIResponse | ||
|
|
||
| router = APIRouter( | ||
| prefix="/guardrails/ban-lists", | ||
| tags=["Ban Lists"] | ||
| ) | ||
|
|
||
| @router.post( | ||
| "/", | ||
| response_model=APIResponse[BanListResponse] | ||
| ) | ||
| def create_banlist( | ||
| payload: BanListCreate, | ||
| session: SessionDep, | ||
| organization_id: int, | ||
| project_id: int, | ||
| _: AuthDep, | ||
| ): | ||
| try: | ||
| response_model = banlist_crud.create(session, payload, organization_id, project_id) | ||
| return APIResponse.success_response(data=response_model) | ||
| except HTTPException as exc: | ||
| return APIResponse.failure_response(error=str(exc.detail)) | ||
| except Exception as exc: | ||
| return APIResponse.failure_response(error=str(exc)) | ||
|
|
||
| @router.get( | ||
| "/", | ||
| response_model=APIResponse[list[BanListResponse]] | ||
| ) | ||
| def list_banlists( | ||
| organization_id: int, | ||
| project_id: int, | ||
| session: SessionDep, | ||
| _: AuthDep, | ||
| domain: Optional[str] = None, | ||
| ): | ||
| try: | ||
| response_model = banlist_crud.list(session, organization_id, project_id, domain) | ||
| return APIResponse.success_response(data=response_model) | ||
| except HTTPException as exc: | ||
| return APIResponse.failure_response(error=str(exc.detail)) | ||
| except Exception as exc: | ||
| return APIResponse.failure_response(error=str(exc)) | ||
|
|
||
|
|
||
| @router.get( | ||
| "/{id}", | ||
| response_model=APIResponse[BanListResponse] | ||
| ) | ||
| def get_banlist( | ||
| id: UUID, | ||
| organization_id: int, | ||
| project_id: int, | ||
| session: SessionDep, | ||
| _: AuthDep, | ||
| ): | ||
| try: | ||
| obj = banlist_crud.get(session, id, organization_id, project_id) | ||
| return APIResponse.success_response(data=obj) | ||
| except HTTPException as exc: | ||
| return APIResponse.failure_response(error=str(exc.detail)) | ||
| except Exception as exc: | ||
| return APIResponse.failure_response(error=str(exc)) | ||
|
|
||
|
|
||
| @router.patch( | ||
| "/{id}", | ||
| response_model=APIResponse[BanListResponse] | ||
| ) | ||
| def update_banlist( | ||
| id: UUID, | ||
| organization_id: int, | ||
| project_id: int, | ||
| payload: BanListUpdate, | ||
| session: SessionDep, | ||
| _: AuthDep, | ||
| ): | ||
| try: | ||
| response_model = banlist_crud.update( | ||
| session, | ||
| id=id, | ||
| organization_id=organization_id, | ||
| project_id=project_id, | ||
| data=payload, | ||
| ) | ||
| return APIResponse.success_response(data=response_model) | ||
| except HTTPException as exc: | ||
| return APIResponse.failure_response(error=str(exc.detail)) | ||
| except Exception as exc: | ||
| return APIResponse.failure_response(error=str(exc)) | ||
|
|
||
| @router.delete( | ||
| "/{id}", | ||
| response_model=APIResponse[dict] | ||
| ) | ||
| def delete_banlist( | ||
| id: UUID, | ||
| organization_id: int, | ||
| project_id: int, | ||
| session: SessionDep, | ||
| _: AuthDep, | ||
| ): | ||
| try: | ||
| obj = banlist_crud.get(session, id, organization_id, project_id) | ||
| banlist_crud.delete(session, obj) | ||
| return APIResponse.success_response(data={"message": "Banlist deleted successfully"}) | ||
| except HTTPException as exc: | ||
| return APIResponse.failure_response(error=str(exc.detail)) | ||
| except Exception as exc: | ||
| return APIResponse.failure_response(error=str(exc)) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| from typing import List, Optional | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import HTTPException | ||
| from sqlalchemy.exc import IntegrityError | ||
| from sqlmodel import Session, select | ||
|
|
||
| from app.models.config.banlist import BanList | ||
| from app.schemas.banlist import BanListCreate, BanListUpdate | ||
| from app.utils import now | ||
|
|
||
|
|
||
| class BanListCrud: | ||
| def create( | ||
| self, | ||
| session: Session, | ||
| data: BanListCreate, | ||
| organization_id: int, | ||
| project_id: int, | ||
| ) -> BanList: | ||
| obj = BanList( | ||
| **data.model_dump(), | ||
| organization_id=organization_id, | ||
| project_id=project_id, | ||
| ) | ||
| session.add(obj) | ||
|
|
||
| try: | ||
| session.commit() | ||
| except IntegrityError: | ||
| session.rollback() | ||
| raise HTTPException( | ||
| 400, | ||
| "Banlist already exists for the given configuration" | ||
| ) | ||
| except Exception: | ||
| session.rollback() | ||
| raise | ||
|
|
||
| session.refresh(obj) | ||
| return obj | ||
|
|
||
| def get( | ||
| self, | ||
| session: Session, | ||
| id: UUID, | ||
| organization_id: int, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can just pass the schema you made containing both org id and project id, and then use it later this way , schema.organization_id, schema.project_id
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, didn't get you |
||
| project_id: int | ||
| ) -> BanList: | ||
| obj = session.get(BanList, id) | ||
|
|
||
| if obj is None: | ||
| raise HTTPException(status_code=404, detail="Banlist not found") | ||
|
|
||
| if not obj.is_public: | ||
| self.check_owner(obj, organization_id, project_id) | ||
|
|
||
| return obj | ||
|
|
||
| def list( | ||
| self, | ||
| session: Session, | ||
| organization_id: int, | ||
| project_id: int, | ||
| domain: Optional[str] = None, | ||
| ) -> List[BanList]: | ||
| stmt = select(BanList).where( | ||
| ( | ||
| (BanList.organization_id == organization_id) & | ||
| (BanList.project_id == project_id) | ||
| ) | | ||
| (BanList.is_public == True) | ||
| ) | ||
|
|
||
| if domain: | ||
| stmt = stmt.where(BanList.domain == domain) | ||
|
|
||
| return list(session.exec(stmt)) | ||
|
|
||
| def update( | ||
| self, | ||
| session: Session, | ||
| id: UUID, | ||
| organization_id: int, | ||
| project_id: int, | ||
| data: BanListUpdate, | ||
| ) -> BanList: | ||
| obj = self.get(session, id, organization_id, project_id) | ||
| update_data = data.model_dump(exclude_unset=True) | ||
|
|
||
| for k, v in update_data.items(): | ||
| setattr(obj, k, v) | ||
|
|
||
| obj.updated_at = now() | ||
|
|
||
| session.add(obj) | ||
| try: | ||
| session.commit() | ||
| except IntegrityError: | ||
| session.rollback() | ||
| raise HTTPException( | ||
| 400, | ||
| "Banlist already exists for the given configuration" | ||
| ) | ||
| except Exception: | ||
| session.rollback() | ||
| raise | ||
|
|
||
| session.refresh(obj) | ||
| return obj | ||
|
|
||
| def delete(self, session: Session, obj: BanList): | ||
| session.delete(obj) | ||
| try: | ||
| session.commit() | ||
| except Exception: | ||
| session.rollback() | ||
| raise | ||
|
|
||
| def check_owner(self, obj: BanList, organization_id: int, project_id: int) -> None: | ||
| is_owner = ( | ||
| obj.organization_id == organization_id | ||
| and obj.project_id == project_id | ||
| ) | ||
|
|
||
| if not is_owner: | ||
| raise HTTPException( | ||
| status_code=403, | ||
| detail="You do not have permission to access this resource." | ||
| ) | ||
|
|
||
|
|
||
| banlist_crud = BanListCrud() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to this we might need to introduce them to other table as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which table?