Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lightapi/_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from starlette.requests import Request
from starlette.responses import JSONResponse

from .rate_limiter import RateLimiter

# JWTAuthentication imported locally where needed to avoid circular import

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,7 +69,10 @@ async def _read_body(request: Request) -> dict[str, Any]:
"""Read JSON body; return {} on empty or invalid."""
try:
body = await request.body()
return json.loads(body) if body else {}
if body:
result: dict[str, Any] = json.loads(body)
return result
return {}
except (json.JSONDecodeError, TypeError):
return {}

Expand All @@ -80,7 +85,7 @@ async def login_handler(
jwt_expiration: int | None = None,
jwt_extra_claims: list[str] | None = None,
jwt_algorithm: str | None = None,
rate_limiter: Optional[Any] = None,
rate_limiter: Optional[RateLimiter] = None,
) -> JSONResponse:
"""
Handle POST /auth/login and POST /auth/token.
Expand All @@ -91,7 +96,7 @@ async def login_handler(
# Apply rate limiting if a rate limiter is provided
if rate_limiter is not None:
is_limited, window = rate_limiter.is_rate_limited(request, endpoint="auth")
if is_limited:
if is_limited and window is not None:
return rate_limiter.get_rate_limit_response(request, window)

if request.method != "POST":
Expand Down
163 changes: 109 additions & 54 deletions lightapi/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import Any, Optional

import jwt
from starlette.requests import Request
Expand Down Expand Up @@ -29,90 +30,130 @@ def authenticate(self, request: Request) -> bool:
"""
return True

def get_auth_error_response(self, request: Request) -> JSONResponse:
"""
Get the response to return when authentication fails.

def get_auth_error_response(self, request: Request) -> JSONResponse:
"""
Get the response to return when authentication fails.

Args:
request: The HTTP request object.
Args:
request: The HTTP request object.

Returns:
Response object for authentication error.
"""
return JSONResponse(
{"error": "authentication failed"},
status_code=401,
headers={"WWW-Authenticate": 'Basic realm="Restricted Area"'},
)
Returns:
Response object for authentication error.
"""
return JSONResponse({"error": "authentication failed"}, status_code=401)
Comment on lines +33 to +43
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep the frozen auth classes untouched.

This PR adds behavior directly to BaseAuthentication and rewrites JWTAuthentication, but both class bodies are explicitly frozen in this repo. Please move the new auth flow into helpers/new backends instead of editing these classes in place.

As per coding guidelines, "lightapi/auth.py: Do not modify lightapi/auth.py: JWTAuthentication and BaseAuthentication class bodies are FROZEN`."

Also applies to: 46-131

🧰 Tools
🪛 Ruff (0.15.5)

[warning] 33-33: Unused method argument: request

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lightapi/auth.py` around lines 33 - 43, The change incorrectly modifies
frozen class bodies in BaseAuthentication and JWTAuthentication (including
get_auth_error_response); revert any edits to those classes and instead
implement the new auth flow in a new helper/backend module (e.g.,
helpers/new_auth or helpers/new_backends) that provides the new
functions/classes and an adapter to call from existing entry points; move all
new logic (error response creation, token parsing, validation flow) out of
lightapi/auth.py into the new module and update callers to use the adapter or
helper functions without changing BaseAuthentication or JWTAuthentication
bodies.



class BasicAuthentication(BaseAuthentication):
class JWTAuthentication(BaseAuthentication):
"""
Basic (Base64) authentication.
JWT (JSON Web Token) based authentication.

Authenticates requests using Authorization: Basic <base64(username:password)>.
Delegates credential validation to the app-level login_validator from the registry.
Authenticates requests using JWT tokens from the Authorization header.
Validates token signatures and expiration times.
Automatically skips authentication for OPTIONS requests (CORS preflight).

Attributes:
secret_key: Secret key for signing tokens.
algorithm: JWT algorithm to use.
expiration: Token expiration time in seconds.
"""

def __init__(
self,
login_validator: Optional[LoginValidator] = None,
) -> None:
self.login_validator = login_validator
secret_key: str | None = None,
algorithm: str | None = None,
expiration: int | None = None,
):
self.secret_key = secret_key or config.jwt_secret
if not self.secret_key:
raise ValueError(
"JWT secret key not configured. Set LIGHTAPI_JWT_SECRET environment variable."
)

self.algorithm = algorithm or config.jwt_algorithm
self.expiration = expiration or 3600 # 1 hour default

def authenticate(self, request: Request) -> bool:
"""
Authenticate a request using JWT token.
Automatically allows OPTIONS requests for CORS preflight.

Args:
request: The HTTP request object.

Returns:
bool: True if authentication succeeds, False otherwise.
"""
# Skip authentication for OPTIONS requests (CORS preflight)
if request.method == "OPTIONS":
return True

auth_header = request.headers.get("Authorization")
if not auth_header:
return False

# Use the shared Basic auth parsing function
from lightapi._login import _parse_basic_header

credentials = _parse_basic_header(auth_header)
if credentials is None:
return False

username, password = credentials
from lightapi._registry import get_login_validator

validator = self.login_validator or get_login_validator()
if validator is None:
if not auth_header or not auth_header.lower().startswith("bearer "):
return False

try:
payload = validator(username, password)
except Exception:
return False

if payload is None:
token = auth_header.split(" ", 1)[1]
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
except (jwt.InvalidTokenError, ValueError, IndexError):
return False

request.state.user = payload
return True

def get_auth_error_response(self, request: Request) -> JSONResponse:
def generate_token(
self, payload: dict[str, Any], expiration: int | None = None
) -> str:
"""
Get the response to return when authentication fails.
Generate a JWT token with the given payload.

Args:
request: The HTTP request object.
payload: Dictionary of claims to include in the token.
expiration: Optional expiration time in seconds (overrides default).

Returns:
Response object for authentication error.
str: The encoded JWT token.

Raises:
ValueError: If payload contains 'exp' claim which will be overwritten.
"""
return JSONResponse({"error": "authentication failed"}, status_code=401)
# Check for 'exp' in payload since we overwrite it
if "exp" in payload:
raise ValueError(
"Payload contains 'exp' claim which will be overwritten. "
"Use the 'expiration' parameter instead."
)

exp_seconds = expiration or self.expiration
token_data = {
**payload,
"exp": datetime.utcnow() + timedelta(seconds=exp_seconds),
}
return jwt.encode(token_data, self.secret_key, algorithm=self.algorithm)


class BasicAuthentication(BaseAuthentication):
"""
Basic (Base64) authentication.

Authenticates requests using Authorization: Basic <base64(username:password)>.
Delegates credential validation to the provided login_validator.
"""

def __init__(
self,
login_validator: Optional[LoginValidator] = None,
) -> None:
self.login_validator = login_validator

def authenticate(self, request: Request) -> bool:
if request.method == "OPTIONS":
return True

auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.lower().startswith("basic "):
return False

try:
import base64

token = auth_header.split(" ", 1)[1]
decoded = base64.b64decode(token).decode("utf-8")
except (ValueError, IndexError, UnicodeDecodeError):
Expand All @@ -123,15 +164,13 @@ def get_auth_error_response(self, request: Request) -> JSONResponse:
return False

username, password = parts[0], parts[1]
from lightapi._registry import get_login_validator

validator = get_login_validator()
validator = self.login_validator or get_login_validator()
if validator is None:
return False

try:
payload = validator(username, password)
except Exception:
except (ValueError, TypeError, RuntimeError):
return False

if payload is None:
Expand All @@ -140,11 +179,27 @@ def get_auth_error_response(self, request: Request) -> JSONResponse:
request.state.user = payload
return True

def get_auth_error_response(self, request: Request) -> JSONResponse:
"""
Get the response to return when authentication fails.

Args:
request: The HTTP request object.

Returns:
Response object for authentication error.
"""
return JSONResponse(
{"error": "authentication failed"},
status_code=401,
headers={"WWW-Authenticate": 'Basic realm="Restricted Area"'},
)


class AllowAny:
"""Permits all requests regardless of authentication state."""

def has_permission(self, request: Request) -> bool:
def has_permission(self, _request: Request) -> bool:
return True


Expand Down
9 changes: 6 additions & 3 deletions lightapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def jwt_algorithm(self) -> str:
class Authentication:
"""Authentication configuration for a RestEndpoint."""

# Standard JWT reserved claims that cannot be used as extra claims
RESERVED_CLAIMS = {"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
Comment on lines +59 to +60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Make RESERVED_CLAIMS immutable.

Ruff is correctly flagging this mutable class attribute (RUF012), and mutating it later would silently change JWT-claim validation for every Authentication instance. ClassVar[frozenset[str]] matches the current read-only usage.

Suggested fix
-from typing import Any
+from typing import Any, ClassVar
@@
-    RESERVED_CLAIMS = {"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
+    RESERVED_CLAIMS: ClassVar[frozenset[str]] = frozenset(
+        {"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
+    )
As per coding guidelines, "`**/*.py`: Pass all ruff check linting before merge`."
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Standard JWT reserved claims that cannot be used as extra claims
RESERVED_CLAIMS = {"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
# Standard JWT reserved claims that cannot be used as extra claims
RESERVED_CLAIMS: ClassVar[frozenset[str]] = frozenset(
{"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
)
🧰 Tools
🪛 Ruff (0.15.5)

[warning] 60-60: Mutable default value for class attribute

(RUF012)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lightapi/config.py` around lines 59 - 60, Change the mutable set
RESERVED_CLAIMS to an immutable class-level frozenset with an explicit
ClassVar[frozenset[str]] annotation so it cannot be mutated at runtime; update
the declaration of RESERVED_CLAIMS (the class attribute used during JWT
validation) to use ClassVar[frozenset[str]] and construct it with
frozenset({"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}), adding any needed
typing import (ClassVar) so linters (RUF012) pass and Authentication-related
validation cannot be altered by accidental mutation.


def __init__(
self,
backend: type | None = None,
Expand All @@ -74,16 +77,16 @@ def __init__(

# Validate jwt_extra_claims - reject reserved claims
if jwt_extra_claims:
RESERVED_CLAIMS = {"exp", "iat", "nbf", "iss", "sub", "aud", "jti"}
reserved_found = []
for claim in jwt_extra_claims:
if claim in RESERVED_CLAIMS:
if claim in self.RESERVED_CLAIMS:
reserved_found.append(claim)

if reserved_found:
raise ConfigurationError(
f"JWT extra claims cannot include reserved claims: "
f"{reserved_found}. Reserved claims are: {sorted(RESERVED_CLAIMS)}"
f"{reserved_found}. Reserved claims are: "
f"{sorted(self.RESERVED_CLAIMS)}"
)

self.jwt_extra_claims = jwt_extra_claims
Expand Down
8 changes: 6 additions & 2 deletions lightapi/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

class RateLimiter:
"""
Simple in-memory rate limiter.
Simple in-memory rate limiter.

Tracks requests by IP address and endpoint.
Tracks requests by IP address and endpoint.
NOTE: This implementation uses process-local counters. In a multi-process
deployment (e.g., with multiple workers), rate limiting will not be shared
across processes. For production use with multiple workers, consider using
a shared storage backend like Redis.
"""

def __init__(
Expand Down
13 changes: 10 additions & 3 deletions lightapi/yaml_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,16 @@ def _resolve_callable(dotted_path: str) -> Any:

try:
sig = inspect.signature(fn)
except ValueError:
# Some callables (e.g., builtins) don't have inspectable signatures
return fn
except (ValueError, TypeError) as exc:
# Only allow specific cases where signature inspection legitimately fails
if hasattr(fn, "__name__") and fn.__name__ in ("<lambda>",):
# Lambdas can't be properly inspected in some Python versions
return fn
# For other cases, raise a clear error about the validation function
raise ValueError(
f"Login validation function {fn!r} cannot be inspected: {exc}. "
f"Ensure it's a regular Python function with inspectable signature."
) from exc
Comment on lines +115 to +124
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject uninspectable login validators instead of letting them through.

lightapi/_registry.py:1-8 types this callable as (str, str) -> ..., and both lightapi/_login.py:8-45 and lightapi/auth.py:32-48 invoke it with exactly (username, password). Returning fn here skips that contract check, and the non-lambda path now leaks ValueError instead of the loader’s normal ConfigurationError. Keep this branch fail-closed so bad config is rejected during load_config().

Suggested fix
-    except (ValueError, TypeError) as exc:
-        # Only allow specific cases where signature inspection legitimately fails
-        if hasattr(fn, "__name__") and fn.__name__ in ("<lambda>",):
-            # Lambdas can't be properly inspected in some Python versions
-            return fn
-        # For other cases, raise a clear error about the validation function
-        raise ValueError(
+    except (ValueError, TypeError) as exc:
+        raise ConfigurationError(
             f"Login validation function {fn!r} cannot be inspected: {exc}. "
             f"Ensure it's a regular Python function with inspectable signature."
         ) from exc
🧰 Tools
🪛 Ruff (0.15.5)

[warning] 121-124: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lightapi/yaml_loader.py` around lines 115 - 124, The except block currently
lets uninspectable validators (including lambdas) pass by returning fn; instead
reject them during loading by raising the loader's ConfigurationError instead of
returning fn (and avoid leaking a raw ValueError). Change the lambda branch so
it does not return fn but raises ConfigurationError with a clear message
referencing the invalid validator (fn) and the original exception (exc), so
load_config() fails closed; keep references to fn, load_config(), and
ConfigurationError in the message to make debugging straightforward.


# Count required positional parameters
required_params = 0
Expand Down
Empty file added tests/__init__.py
Empty file.
18 changes: 9 additions & 9 deletions tests/test_cache_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def client():
class TestCacheGetList:
def test_cache_get_list_returns_cached_on_second_request(self, client):
"""First GET hits DB, second GET returns cached (when get_cached returns data)."""
with patch("lightapi.cache.get_cached") as mock_get:
with patch("lightapi.cache.set_cached") as mock_set:
with patch("lightapi.lightapi.get_cached") as mock_get:
with patch("lightapi.lightapi.set_cached") as mock_set:
mock_get.return_value = None # First call: cache miss
resp1 = client.get("/cached")
assert resp1.status_code == 200
Expand All @@ -60,13 +60,13 @@ def test_cache_get_list_returns_cached_on_second_request(self, client):
class TestCacheInvalidation:
def test_cache_post_invalidates(self, client):
"""POST triggers cache invalidation."""
with patch("lightapi.cache.invalidate_cache_prefix") as mock_inv:
with patch("lightapi.lightapi.invalidate_cache_prefix") as mock_inv:
client.post("/cached", json={"name": "new"})
mock_inv.assert_called()

def test_cache_put_invalidates(self, client):
"""PUT triggers cache invalidation."""
with patch("lightapi.cache.invalidate_cache_prefix") as mock_inv:
with patch("lightapi.lightapi.invalidate_cache_prefix") as mock_inv:
post_resp = client.post("/cached", json={"name": "item"})
item_id = post_resp.json()["id"]
version = post_resp.json()["version"]
Expand All @@ -78,7 +78,7 @@ def test_cache_put_invalidates(self, client):

def test_cache_delete_invalidates(self, client):
"""DELETE triggers cache invalidation."""
with patch("lightapi.cache.invalidate_cache_prefix") as mock_inv:
with patch("lightapi.lightapi.invalidate_cache_prefix") as mock_inv:
post_resp = client.post("/cached", json={"name": "to_delete"})
item_id = post_resp.json()["id"]
client.delete(f"/cached/{item_id}")
Expand All @@ -101,8 +101,8 @@ def test_cache_redis_unreachable_startup_warning(self):

def test_cache_redis_unreachable_mid_request_serves_db(self, client):
"""When get_cached raises/fails, GET still returns 200 from DB."""
with patch("lightapi.cache.get_cached", side_effect=Exception("Redis down")):
with patch("lightapi.cache.set_cached"):
with patch("lightapi.lightapi.get_cached", side_effect=Exception("Redis down")):
with patch("lightapi.lightapi.set_cached"):
resp = client.get("/cached")
assert resp.status_code == 200
assert "results" in resp.json()
Expand All @@ -121,8 +121,8 @@ def test_cache_vary_on_query_params_uses_different_keys(self):
c = TestClient(app.build_app())
c.post("/cached_vary", json={"label": "x"})

with patch("lightapi.cache.get_cached") as mock_get:
with patch("lightapi.cache.set_cached") as mock_set:
with patch("lightapi.lightapi.get_cached") as mock_get:
with patch("lightapi.lightapi.set_cached") as mock_set:
mock_get.return_value = None
c.get("/cached_vary?page=1")
c.get("/cached_vary?page=2")
Expand Down
Loading
Loading