Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions spp_api_v2/routers/oauth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""OAuth 2.0 endpoints for API V2"""

import base64
import logging
import os
from datetime import datetime, timedelta
Expand Down Expand Up @@ -39,41 +40,99 @@
scope: str


async def _parse_token_request(http_request: Request) -> TokenRequest:
"""Parse token request from JSON, form-encoded body, or HTTP Basic Auth.

Supports three client authentication methods per RFC 6749:
1. HTTP Basic Auth header (Section 2.3.1) - used by QGIS native OAuth2
2. Form-encoded body with client_id/client_secret (Section 2.3.1)
3. JSON body (non-standard, for backwards compatibility)
"""
# Extract client credentials from Authorization: Basic header if present
# (RFC 6749 Section 2.3.1 - preferred method for confidential clients)
basic_client_id = ""
basic_client_secret = ""
auth_header = http_request.headers.get("authorization", "")
if auth_header.startswith("Basic "):
try:
decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
if ":" in decoded:
basic_client_id, basic_client_secret = decoded.split(":", 1)
except (ValueError, UnicodeDecodeError) as e:
_logger.debug("Failed to decode Basic Auth header: %s", e)

content_type = http_request.headers.get("content-type", "")
if "application/x-www-form-urlencoded" in content_type:
form = await http_request.form()
return TokenRequest(
grant_type=form.get("grant_type", ""),
client_id=form.get("client_id", "") or basic_client_id,
client_secret=form.get("client_secret", "") or basic_client_secret,
)

# Try JSON body
try:
body = await http_request.json()
return TokenRequest(**body)
except Exception as e:
_logger.debug("Could not parse token request from JSON body, falling back: %s", e)

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.logging.logger-credential-leak.python-logger-credential-disclosure Warning

Detected a python logger call with a potential hardcoded secret "Could not parse token request from JSON body, falling back: %s" being logged. This may lead to secret credentials being exposed. Make sure that the logger is not logging sensitive information.

# Fall back to Basic Auth only (grant_type defaults to client_credentials)
if basic_client_id:
return TokenRequest(
grant_type="client_credentials",
client_id=basic_client_id,
client_secret=basic_client_secret,
)

raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unable to parse token request. Send credentials via HTTP Basic Auth header, "
"form-encoded body, or JSON body.",
)


@oauth_router.post("/oauth/token", response_model=TokenResponse)
async def get_token(
http_request: Request,
request: TokenRequest,
token_request: Annotated[TokenRequest, Depends(_parse_token_request)],
env: Annotated[Environment, Depends(odoo_env)],
_rate_limit: Annotated[None, Depends(check_auth_rate_limit)],
):
"""
OAuth 2.0 Client Credentials flow.

Authenticates API client and returns JWT access token.
Accepts both JSON and form-encoded request bodies (RFC 6749).

SECURITY: Rate limited to 5 requests/minute per IP to prevent brute force.
"""
# Validate grant type
if request.grant_type != "client_credentials":
if token_request.grant_type != "client_credentials":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unsupported grant_type. Only 'client_credentials' is supported.",
)

# Authenticate client
# nosemgrep: odoo-sudo-without-context
api_client = env["spp.api.client"].sudo().authenticate(request.client_id, request.client_secret)
api_client = env["spp.api.client"].sudo().authenticate(token_request.client_id, token_request.client_secret)

if not api_client:
_logger.warning("Failed authentication attempt for client_id: %s", request.client_id)
_logger.warning("Failed authentication attempt for client_id: %s", token_request.client_id)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid client credentials",
)

# Read configurable token lifetime (default: 24 hours for long-lived sessions)
# nosemgrep: odoo-sudo-without-context
token_lifetime_hours = int(env["ir.config_parameter"].sudo().get_param("spp_api_v2.token_lifetime_hours", "24"))
expires_in = token_lifetime_hours * 3600

# Generate JWT token
try:
token = _generate_jwt_token(env, api_client)
token = _generate_jwt_token(env, api_client, token_lifetime_hours)
except Exception as e:
_logger.exception("Error generating JWT token")
raise HTTPException(
Expand All @@ -87,7 +146,7 @@
return TokenResponse(
access_token=token,
token_type="Bearer",
expires_in=3600, # 1 hour
expires_in=expires_in,
scope=scope_str,
)

Expand Down Expand Up @@ -129,14 +188,14 @@
return secret


def _generate_jwt_token(env: Environment, api_client) -> str:
def _generate_jwt_token(env: Environment, api_client, token_lifetime_hours: int) -> str:
"""
Generate JWT access token for API client.

Token contains:
- client_id (external identifier, NOT database ID)
- scopes
- expiration (1 hour)
- expiration time determined by token_lifetime_hours

SECURITY: Never include database IDs in JWT.
The auth middleware loads the full api_client record from DB using client_id.
Expand All @@ -160,7 +219,7 @@
"iss": "openspp-api-v2", # Issuer
"sub": api_client.client_id, # Subject (client_id)
"aud": "openspp", # Audience
"exp": now + timedelta(hours=1), # Expiration
"exp": now + timedelta(hours=token_lifetime_hours), # Expiration
"iat": now, # Issued at
"client_id": api_client.client_id,
"scopes": [f"{s.resource}:{s.action}" for s in api_client.scope_ids],
Expand Down
53 changes: 52 additions & 1 deletion spp_api_v2/tests/test_oauth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Tests for OAuth endpoints"""

import base64
import json
from urllib.parse import urlencode

from ..middleware.rate_limit import get_rate_limiter
from .common import ApiV2HttpTestCase
Expand Down Expand Up @@ -45,7 +47,7 @@ def test_token_generation_success(self):
self.assertIn("token_type", data)
self.assertEqual(data["token_type"], "Bearer")
self.assertIn("expires_in", data)
self.assertEqual(data["expires_in"], 3600) # 1 hour
self.assertEqual(data["expires_in"], 86400) # 24 hours (default)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

While this test is correctly updated for the new default token lifetime, the test suite is missing coverage for the new authentication methods introduced in this PR (HTTP Basic Auth and application/x-www-form-urlencoded body). Adding tests for these new code paths is crucial to ensure they work as expected and to prevent future regressions.

self.assertIn("scope", data)
self.assertIn("individual:read", data["scope"])
self.assertIn("group:search", data["scope"])
Expand Down Expand Up @@ -218,6 +220,55 @@ def test_client_last_used_date_updated(self):
self.client.invalidate_recordset()
self.assertTrue(self.client.last_used_date)

def test_token_generation_basic_auth(self):
"""HTTP Basic Auth header returns access token"""
credentials = base64.b64encode(f"{self.client.client_id}:{self.client.client_secret}".encode()).decode("utf-8")

body = urlencode({"grant_type": "client_credentials"})

response = self.url_open(
self.url,
data=body,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {credentials}",
},
)

self.assertEqual(response.status_code, 200)

data = json.loads(response.content)
self.assertIn("access_token", data)
self.assertEqual(data["token_type"], "Bearer")
self.assertIn("expires_in", data)
self.assertIn("scope", data)

def test_token_generation_form_encoded(self):
"""Form-encoded body (application/x-www-form-urlencoded) returns access token"""
body = urlencode(
{
"grant_type": "client_credentials",
"client_id": self.client.client_id,
"client_secret": self.client.client_secret,
}
)

response = self.url_open(
self.url,
data=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

self.assertEqual(response.status_code, 200)

data = json.loads(response.content)
self.assertIn("access_token", data)
self.assertEqual(data["token_type"], "Bearer")
self.assertIn("expires_in", data)
self.assertIn("scope", data)
self.assertIn("individual:read", data["scope"])
self.assertIn("group:search", data["scope"])

def test_token_no_scopes(self):
"""Client with no scopes still gets token but empty scope string"""
# Create client without scopes
Expand Down
7 changes: 6 additions & 1 deletion spp_dci_server/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
from unittest.mock import MagicMock, patch

from psycopg2 import IntegrityError

from odoo.exceptions import ValidationError
from odoo.tests import tagged

Expand Down Expand Up @@ -72,7 +74,9 @@ def test_transaction_uniqueness_per_sender(self):
)

# Same transaction_id, same sender - should fail
with self.assertRaises(ValidationError):
# SQL UNIQUE constraint raises IntegrityError, use cr.savepoint()
# to avoid breaking the test transaction
with self.assertRaises(IntegrityError), self.cr.savepoint():
self.Transaction.create(
{
"transaction_id": "unique-txn-001",
Expand All @@ -81,6 +85,7 @@ def test_transaction_uniqueness_per_sender(self):
"sender_uri": self.test_sender_id,
}
)
self.cr.flush()

def test_transaction_same_id_different_sender_allowed(self):
"""Test that same transaction_id is allowed for different senders."""
Expand Down
23 changes: 18 additions & 5 deletions spp_mis_demo_v2/tests/test_demo_programs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TestDemoPrograms(TransactionCase):

The demo programs use activated registry variables for CEL expressions:
- Universal Child Grant: child_count variable
- Conditional Child Grant: members.exists() for first 1,000 days
- Elderly Social Pension: age + retirement_age variables
- Emergency Relief Fund: dependency_ratio, is_female_headed, elderly_count
- Cash Transfer Program: hh_total_income, poverty_line, hh_size
Expand All @@ -16,16 +17,17 @@ class TestDemoPrograms(TransactionCase):
"""

def test_get_all_demo_programs(self):
"""Test that all 6 demo programs are returned."""
"""Test that all 7 demo programs are returned."""
from odoo.addons.spp_mis_demo_v2.models import demo_programs

programs = demo_programs.get_all_demo_programs()
self.assertIsInstance(programs, list)
self.assertEqual(len(programs), 6, "Expected exactly 6 demo programs")
self.assertEqual(len(programs), 7, "Expected exactly 7 demo programs")

# Check expected programs exist (V3 names)
program_names = [p["name"] for p in programs]
self.assertIn("Universal Child Grant", program_names)
self.assertIn("Conditional Child Grant", program_names)
self.assertIn("Elderly Social Pension", program_names)
self.assertIn("Emergency Relief Fund", program_names)
self.assertIn("Cash Transfer Program", program_names)
Expand Down Expand Up @@ -348,8 +350,10 @@ def test_get_programs_by_pack(self):
from odoo.addons.spp_mis_demo_v2.models import demo_programs

programs = demo_programs.get_programs_by_pack("child_benefit")
self.assertEqual(len(programs), 1)
self.assertEqual(programs[0]["name"], "Universal Child Grant")
self.assertEqual(len(programs), 2)
program_names = [p["name"] for p in programs]
self.assertIn("Universal Child Grant", program_names)
self.assertIn("Conditional Child Grant", program_names)

# Non-existent pack should return empty
programs = demo_programs.get_programs_by_pack("nonexistent")
Expand Down Expand Up @@ -460,11 +464,20 @@ def test_story_program_alignment_complete(self):
)

def test_all_programs_have_enrolled_stories(self):
"""Test that every demo program has at least one enrolled story."""
"""Test that demo programs have at least one enrolled story.

Conditional Child Grant is excluded: it demonstrates compliance
features and members.exists() CEL patterns without persona stories.
"""
from odoo.addons.spp_mis_demo_v2.models import demo_programs

# Programs that intentionally have no story personas
programs_without_stories = {"Conditional Child Grant"}

for program in demo_programs.get_all_demo_programs():
program_name = program["name"]
if program_name in programs_without_stories:
continue
stories = program.get("stories", [])
self.assertGreater(
len(stories),
Expand Down
Loading