From d56cbee8ea42b400f1693b3515c3b7fde3d6f8b5 Mon Sep 17 00:00:00 2001 From: Florin Iucha Date: Fri, 16 Jan 2026 22:29:16 +0000 Subject: [PATCH] feat: implement mock AP2 signature verification Why: - To verify that the server correctly rejects invalid AP2 mandates with the appropriate error code. - Required for conformance testing of the AP2 security flow. What: - Added 'Ap2VerificationError' to exceptions. - Implemented '_verify_ap2_mandate' in 'CheckoutService' to reject mandates containing the substring 'invalid_signature'. - Updated 'complete_checkout' to invoke this verification. --- rest/python/server/exceptions.py | 8 ++++++++ .../python/server/services/checkout_service.py | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/rest/python/server/exceptions.py b/rest/python/server/exceptions.py index 6f2c32e..8586ce5 100644 --- a/rest/python/server/exceptions.py +++ b/rest/python/server/exceptions.py @@ -76,3 +76,11 @@ class InvalidRequestError(UcpError): def __init__(self, message: str): """Initialize InvalidRequestError.""" super().__init__(message, code="INVALID_REQUEST", status_code=400) + + +class Ap2VerificationError(UcpError): + """Raised when AP2 mandate verification fails.""" + + def __init__(self, message: str, code: str = "mandate_invalid_signature"): + """Initialize Ap2VerificationError.""" + super().__init__(message, code=code, status_code=400) diff --git a/rest/python/server/services/checkout_service.py b/rest/python/server/services/checkout_service.py index 3769aef..96e42fd 100644 --- a/rest/python/server/services/checkout_service.py +++ b/rest/python/server/services/checkout_service.py @@ -40,6 +40,7 @@ import config import db from enums import CheckoutStatus +from exceptions import Ap2VerificationError from exceptions import CheckoutNotModifiableError from exceptions import IdempotencyConflictError from exceptions import InvalidRequestError @@ -650,6 +651,10 @@ async def complete_checkout( checkout = await self._get_and_validate_checkout(checkout_id) self._ensure_modifiable(checkout, "complete") + # Verify AP2 Mandate if present + if ap2: + self._verify_ap2_mandate(ap2) + # Process Payment await self._process_payment(payment) @@ -1160,6 +1165,19 @@ async def _recalculate_totals( checkout.totals.append(Total(type="total", amount=grand_total)) + def _verify_ap2_mandate(self, ap2: Ap2CompleteRequest) -> None: + """Verify the AP2 mandate. + + In this sample implementation, we simulate verification failure if the + mandate contains a specific trigger string. + """ + mandate_str = ap2.checkout_mandate.root + if "invalid_signature" in mandate_str: + raise Ap2VerificationError( + "Invalid AP2 mandate signature (mock)", + code="mandate_invalid_signature", + ) + async def _process_payment(self, payment: PaymentCreateRequest) -> None: """Validate and process payment instruments.""" instruments = payment.instruments