diff --git a/pyproject.toml b/pyproject.toml index a1d8437..1067531 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,9 @@ dev = [ "build>=1.0", ] +[project.scripts] +create-x402-wallet = "x402_openai._cli:main" + [project.urls] Homepage = "https://github.com/qntx/x402-openai-python" Repository = "https://github.com/qntx/x402-openai-python" diff --git a/src/x402_openai/__init__.py b/src/x402_openai/__init__.py index 20247b2..6f89a92 100644 --- a/src/x402_openai/__init__.py +++ b/src/x402_openai/__init__.py @@ -20,9 +20,22 @@ policies=[prefer_network("eip155:8453")], ) + # Generic HTTP (any endpoint) + session = create_x402_session(wallet=EvmWallet(private_key="0x…")) + resp = session.post("https://apibase.pro/api/v1/tools/weather/call", + json={"city": "Tokyo"}) + + # MCP JSON-RPC + with X402MCPClient(wallet=EvmWallet(private_key="0x…"), + base_url="https://apibase.pro") as mcp: + result = mcp.call("/mcp/v1", "tools/call", + params={"name": "weather", "arguments": {"city": "Tokyo"}}) + Public API: -- :class:`X402OpenAI` / :class:`AsyncX402OpenAI` — recommended client classes. +- :class:`X402OpenAI` / :class:`AsyncX402OpenAI` — OpenAI drop-in clients. +- :func:`create_x402_session` / :func:`create_async_x402_session` — generic httpx clients. +- :class:`X402MCPClient` / :class:`AsyncX402MCPClient` — MCP JSON-RPC clients. - :class:`X402Transport` / :class:`AsyncX402Transport` — low-level transports. - :func:`prefer_network` / :func:`prefer_scheme` / :func:`max_amount` — payment policies. - :mod:`x402_openai.wallets` — chain-specific wallet adapters. @@ -31,17 +44,23 @@ from __future__ import annotations from x402_openai._client import AsyncX402OpenAI, X402OpenAI +from x402_openai._http import create_async_x402_session, create_x402_session +from x402_openai._mcp import AsyncX402MCPClient, X402MCPClient from x402_openai._transport import AsyncX402Transport, X402Transport from x402_openai.wallets import EvmWallet, SvmWallet, Wallet __all__ = [ + "AsyncX402MCPClient", "AsyncX402OpenAI", "AsyncX402Transport", "EvmWallet", "SvmWallet", "Wallet", + "X402MCPClient", "X402OpenAI", "X402Transport", + "create_async_x402_session", + "create_x402_session", # Lazily re-exported from x402 SDK. "max_amount", "prefer_network", diff --git a/src/x402_openai/_cli.py b/src/x402_openai/_cli.py new file mode 100644 index 0000000..17cfccf --- /dev/null +++ b/src/x402_openai/_cli.py @@ -0,0 +1,60 @@ +"""CLI: generate an x402 EVM wallet keypair. + +Creates a new keypair, saves it to ``~/.x402/wallet.json``, and prints the +funding address. Permissions are set to ``0o600`` so only the owner can +read the private key. + +Usage:: + + create-x402-wallet # via installed entry-point + python -m x402_openai # directly from the package +""" + +from __future__ import annotations + +import json +import pathlib +import sys + + +def main() -> None: + """Entry-point for the ``create-x402-wallet`` command.""" + try: + from eth_account import Account + except ImportError: + print( + "eth-account is required. Install with:\n pip install x402-openai[evm]", + file=sys.stderr, + ) + sys.exit(1) + + wallet_dir = pathlib.Path.home() / ".x402" + wallet_dir.mkdir(parents=True, exist_ok=True) + wallet_file = wallet_dir / "wallet.json" + + if wallet_file.exists(): + print( + f"Wallet already exists at {wallet_file}\n" + "Delete it manually before generating a new one.", + file=sys.stderr, + ) + sys.exit(1) + + account = Account.create() + data = { + "address": account.address, + "private_key": account.key.hex(), + "chain": "eip155", + } + wallet_file.write_text(json.dumps(data, indent=2)) + wallet_file.chmod(0o600) + + print(f"Wallet created : {wallet_file}") + print(f"Address : {account.address}") + print() + print("Fund this address with USDC on Base (chain ID 8453) to use x402.") + print("Minimum recommended balance: 1 USDC") + + +if __name__ == "__main__": + main() diff --git a/src/x402_openai/_http.py b/src/x402_openai/_http.py new file mode 100644 index 0000000..001b73b --- /dev/null +++ b/src/x402_openai/_http.py @@ -0,0 +1,109 @@ +"""Generic x402-aware httpx client factories. + +Drop-in replacements for ``httpx.Client`` / ``httpx.AsyncClient`` that +intercept HTTP 402 responses and automatically handle payment — framework +agnostic, works with any HTTP endpoint. + +Examples +-------- +Synchronous:: + + from x402_openai import create_x402_session + from x402_openai.wallets import EvmWallet + + client = create_x402_session(wallet=EvmWallet(private_key="0x…")) + resp = client.post("https://apibase.pro/api/v1/tools/weather/call", + json={"city": "Tokyo"}) + +Asynchronous:: + + from x402_openai import create_async_x402_session + from x402_openai.wallets import EvmWallet + + async with create_async_x402_session(wallet=EvmWallet(private_key="0x…")) as client: + resp = await client.post("https://apibase.pro/api/v1/tools/weather/call", + json={"city": "Tokyo"}) +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import httpx + +from x402_openai._transport import AsyncX402Transport, X402Transport +from x402_openai._wallet import create_x402_http_client + +if TYPE_CHECKING: + from x402_openai.wallets._base import Wallet + + +def create_x402_session( + *, + wallet: Wallet | None = None, + wallets: list[Wallet] | None = None, + x402_client: Any = None, + policies: list[Any] | None = None, + **httpx_kwargs: Any, +) -> httpx.Client: + """Return a synchronous ``httpx.Client`` with automatic x402 payment handling. + + Any request that receives a ``402 Payment Required`` response is + automatically paid and retried — the caller sees only the final 200. + + Parameters + ---------- + wallet: + Single chain wallet adapter. + wallets: + List of adapters for multi-chain support. + x402_client: + Pre-configured ``x402HTTPClientSync`` (takes precedence). + policies: + Optional x402 policy list (e.g. ``prefer_network``, ``max_amount``). + **httpx_kwargs: + Forwarded verbatim to ``httpx.Client()``. + """ + x402 = create_x402_http_client( + wallet=wallet, + wallets=wallets, + x402_client=x402_client, + policies=policies, + sync=True, + ) + return httpx.Client(transport=X402Transport(x402), **httpx_kwargs) + + +def create_async_x402_session( + *, + wallet: Wallet | None = None, + wallets: list[Wallet] | None = None, + x402_client: Any = None, + policies: list[Any] | None = None, + **httpx_kwargs: Any, +) -> httpx.AsyncClient: + """Return an asynchronous ``httpx.AsyncClient`` with automatic x402 payment handling. + + Same as :func:`create_x402_session` but async. + + Parameters + ---------- + wallet: + Single chain wallet adapter. + wallets: + List of adapters for multi-chain support. + x402_client: + Pre-configured ``x402HTTPClient`` (takes precedence). + policies: + Optional x402 policy list. + **httpx_kwargs: + Forwarded verbatim to ``httpx.AsyncClient()``. + """ + x402 = create_x402_http_client( + wallet=wallet, + wallets=wallets, + x402_client=x402_client, + policies=policies, + sync=False, + ) + return httpx.AsyncClient(transport=AsyncX402Transport(x402), **httpx_kwargs) diff --git a/src/x402_openai/_mcp.py b/src/x402_openai/_mcp.py new file mode 100644 index 0000000..9633b01 --- /dev/null +++ b/src/x402_openai/_mcp.py @@ -0,0 +1,331 @@ +"""x402-aware MCP JSON-RPC client. + +MCP servers signal payment requirements via JSON-RPC error code ``-32042``. +This client intercepts that error, signs the payment using the configured +wallet, and retries the request with credentials placed in ``params._meta``. + +HTTP-level ``402`` responses are also handled transparently by the underlying +:class:`~x402_openai._transport.X402Transport`. + +Examples +-------- +Synchronous:: + + from x402_openai import X402MCPClient + from x402_openai.wallets import EvmWallet + + with X402MCPClient( + wallet=EvmWallet(private_key="0x…"), + base_url="https://apibase.pro", + ) as client: + result = client.call( + "/mcp/v1", + "tools/call", + params={"name": "weather", "arguments": {"city": "Tokyo"}}, + ) + +Asynchronous:: + + from x402_openai import AsyncX402MCPClient + from x402_openai.wallets import EvmWallet + + async with AsyncX402MCPClient( + wallet=EvmWallet(private_key="0x…"), + base_url="https://apibase.pro", + ) as client: + result = await client.call( + "/mcp/v1", + "tools/call", + params={"name": "weather", "arguments": {"city": "Tokyo"}}, + ) +""" + +from __future__ import annotations + +import json +import logging +from typing import TYPE_CHECKING, Any + +import httpx + +from x402_openai._transport import AsyncX402Transport, X402Transport +from x402_openai._wallet import create_x402_http_client + +if TYPE_CHECKING: + from x402_openai.wallets._base import Wallet + +logger = logging.getLogger(__name__) + +# MCP x402 payment required error code (mirrors HTTP 402). +_MCP_PAYMENT_REQUIRED = -32042 + + +def _build_rpc_payload( + method: str, + params: dict[str, Any] | None, + rpc_id: int | str, +) -> bytes: + """Encode a JSON-RPC 2.0 request body.""" + payload: dict[str, Any] = { + "jsonrpc": "2.0", + "id": rpc_id, + "method": method, + } + if params is not None: + payload["params"] = params + return json.dumps(payload).encode() + + +def _inject_payment( + params: dict[str, Any] | None, + payment_headers: dict[str, str], +) -> dict[str, Any]: + """Merge *payment_headers* into ``params._meta`` for the retry request. + + Header keys are lower-cased so ``X-Payment`` becomes ``x-payment`` + inside ``_meta``, matching common MCP server expectations. + """ + base: dict[str, Any] = dict(params) if params else {} + meta: dict[str, Any] = dict(base.get("_meta") or {}) + meta.update({k.lower(): v for k, v in payment_headers.items()}) + base["_meta"] = meta + return base + + +class X402MCPClient: + """Synchronous MCP JSON-RPC client with transparent x402 payment. + + Parameters + ---------- + wallet / wallets / x402_client: + Provide exactly one credential source (same semantics as + :class:`~x402_openai.X402OpenAI`). + policies: + Optional x402 policy list. + base_url: + Default URL prefix; prepended to relative *url* arguments in + :meth:`call`. + """ + + __slots__ = ("_base_url", "_http", "_x402") + + def __init__( + self, + *, + wallet: Wallet | None = None, + wallets: list[Wallet] | None = None, + x402_client: Any = None, + policies: list[Any] | None = None, + base_url: str | None = None, + _inner: httpx.BaseTransport | None = None, + ) -> None: + self._x402 = create_x402_http_client( + wallet=wallet, + wallets=wallets, + x402_client=x402_client, + policies=policies, + sync=True, + ) + self._http = httpx.Client(transport=X402Transport(self._x402, inner=_inner)) + self._base_url = base_url or "" + + def call( + self, + url: str, + method: str, + params: dict[str, Any] | None = None, + *, + rpc_id: int | str = 1, + ) -> dict[str, Any]: + """Send a JSON-RPC request; on ``-32042`` pay and retry transparently. + + Parameters + ---------- + url: + Full URL or path appended to *base_url*. + method: + JSON-RPC method name (e.g. ``"tools/call"``). + params: + JSON-RPC params dict. + rpc_id: + JSON-RPC request id. + + Returns + ------- + The ``result`` field from the JSON-RPC response dict. + + Raises + ------ + ValueError + When a non-payment JSON-RPC error is returned, or when payment + signing fails. + httpx.HTTPStatusError + On non-2xx HTTP responses. + """ + full_url = url if url.startswith(("http://", "https://")) else self._base_url + url + body = _build_rpc_payload(method, params, rpc_id) + + response = self._http.post( + full_url, + content=body, + headers={"content-type": "application/json"}, + ) + response.raise_for_status() + data: dict[str, Any] = response.json() + + error = data.get("error") + if error and error.get("code") == _MCP_PAYMENT_REQUIRED: + logger.debug("x402-mcp: received -32042 — signing payment") + # Reconstruct headers expected by the x402 SDK: merge the + # JSON-RPC error data as if it were the X-Payment-Required header. + error_data = error.get("data") or {} + fake_headers: dict[str, str] = { + "x-payment-required": json.dumps(error_data), + **{k.lower(): v for k, v in response.headers.items()}, + } + try: + payment_headers, _ = self._x402.handle_402_response( + fake_headers, + response.content, + ) + except Exception as exc: + logger.exception("x402-mcp: payment signing failed") + raise ValueError(f"x402 payment failed: {error}") from exc + + retry_params = _inject_payment(params, payment_headers) + retry_body = _build_rpc_payload(method, retry_params, rpc_id) + retry_resp = self._http.post( + full_url, + content=retry_body, + headers={"content-type": "application/json"}, + ) + retry_resp.raise_for_status() + data = retry_resp.json() + + if "error" in data: + raise ValueError(f"MCP error: {data['error']}") + + result = data.get("result") + if result is None: + raise ValueError(f"MCP response missing 'result' field: {data}") + if not isinstance(result, dict): + raise ValueError(f"MCP 'result' is not a dict: {result!r}") + return result + + def close(self) -> None: + """Close the underlying HTTP client.""" + self._http.close() + + def __enter__(self) -> X402MCPClient: + return self + + def __exit__(self, *_: Any) -> None: + self.close() + + +class AsyncX402MCPClient: + """Asynchronous MCP JSON-RPC client with transparent x402 payment. + + Same as :class:`X402MCPClient` but all methods are ``async``. + + Parameters + ---------- + wallet / wallets / x402_client: + Provide exactly one credential source. + policies: + Optional x402 policy list. + base_url: + Default URL prefix. + """ + + __slots__ = ("_base_url", "_http", "_x402") + + def __init__( + self, + *, + wallet: Wallet | None = None, + wallets: list[Wallet] | None = None, + x402_client: Any = None, + policies: list[Any] | None = None, + base_url: str | None = None, + _inner: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._x402 = create_x402_http_client( + wallet=wallet, + wallets=wallets, + x402_client=x402_client, + policies=policies, + sync=False, + ) + self._http = httpx.AsyncClient(transport=AsyncX402Transport(self._x402, inner=_inner)) + self._base_url = base_url or "" + + async def call( + self, + url: str, + method: str, + params: dict[str, Any] | None = None, + *, + rpc_id: int | str = 1, + ) -> dict[str, Any]: + """Send a JSON-RPC request; on ``-32042`` pay and retry transparently. + + Same contract as :meth:`X402MCPClient.call`. + """ + full_url = url if url.startswith(("http://", "https://")) else self._base_url + url + body = _build_rpc_payload(method, params, rpc_id) + + response = await self._http.post( + full_url, + content=body, + headers={"content-type": "application/json"}, + ) + response.raise_for_status() + data: dict[str, Any] = response.json() + + error = data.get("error") + if error and error.get("code") == _MCP_PAYMENT_REQUIRED: + logger.debug("x402-mcp: received -32042 — signing payment") + error_data = error.get("data") or {} + fake_headers: dict[str, str] = { + "x-payment-required": json.dumps(error_data), + **{k.lower(): v for k, v in response.headers.items()}, + } + try: + payment_headers, _ = await self._x402.handle_402_response( + fake_headers, + response.content, + ) + except Exception as exc: + logger.exception("x402-mcp: payment signing failed") + raise ValueError(f"x402 payment failed: {error}") from exc + + retry_params = _inject_payment(params, payment_headers) + retry_body = _build_rpc_payload(method, retry_params, rpc_id) + retry_resp = await self._http.post( + full_url, + content=retry_body, + headers={"content-type": "application/json"}, + ) + retry_resp.raise_for_status() + data = retry_resp.json() + + if "error" in data: + raise ValueError(f"MCP error: {data['error']}") + + result = data.get("result") + if result is None: + raise ValueError(f"MCP response missing 'result' field: {data}") + if not isinstance(result, dict): + raise ValueError(f"MCP 'result' is not a dict: {result!r}") + return result + + async def aclose(self) -> None: + """Close the underlying HTTP client.""" + await self._http.aclose() + + async def __aenter__(self) -> AsyncX402MCPClient: + return self + + async def __aexit__(self, *_: Any) -> None: + await self.aclose() diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..f0e21ca --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,164 @@ +"""Tests for the generic x402 httpx session factories.""" + +from __future__ import annotations + +import httpx +import pytest + +from x402_openai._http import create_async_x402_session, create_x402_session + +# --------------------------------------------------------------------------- +# Fake x402 clients — no x402 SDK required. +# --------------------------------------------------------------------------- + + +class _FakeX402Sync: + def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + return {"x-payment": "signed"}, {} + + +class _FakeX402Async: + async def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + return {"x-payment": "signed"}, {} + + +# --------------------------------------------------------------------------- +# Fake inner transport helpers. +# --------------------------------------------------------------------------- + + +class _Pay402Transport(httpx.BaseTransport): + """Returns 402 on first call, 200 on second (after payment header added).""" + + def __init__(self) -> None: + self.calls = 0 + self.last_headers: dict[str, str] = {} + + def handle_request(self, request: httpx.Request) -> httpx.Response: + self.calls += 1 + if self.calls == 1: + return httpx.Response(402, headers={"x-402": "required"}, content=b"pay") + self.last_headers = dict(request.headers) + return httpx.Response(200, content=b"ok") + + +class _OkTransport(httpx.BaseTransport): + def __init__(self) -> None: + self.calls = 0 + + def handle_request(self, request: httpx.Request) -> httpx.Response: + self.calls += 1 + return httpx.Response(200, content=b"ok") + + +class _Pay402AsyncTransport(httpx.AsyncBaseTransport): + def __init__(self) -> None: + self.calls = 0 + self.last_headers: dict[str, str] = {} + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + self.calls += 1 + if self.calls == 1: + return httpx.Response(402, headers={"x-402": "required"}, content=b"pay") + self.last_headers = dict(request.headers) + return httpx.Response(200, content=b"ok") + + +class _OkAsyncTransport(httpx.AsyncBaseTransport): + def __init__(self) -> None: + self.calls = 0 + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + self.calls += 1 + return httpx.Response(200, content=b"ok") + + +# --------------------------------------------------------------------------- +# Sync session tests. +# --------------------------------------------------------------------------- + + +def test_create_x402_session_requires_credential() -> None: + with pytest.raises(ValueError, match="exactly one"): + create_x402_session() + + +def test_create_x402_session_returns_httpx_client() -> None: + client = create_x402_session(x402_client=_FakeX402Sync()) + assert isinstance(client, httpx.Client) + client.close() + + +def test_create_x402_session_retries_on_402() -> None: + inner = _Pay402Transport() + from x402_openai._transport import X402Transport + + client = httpx.Client(transport=X402Transport(_FakeX402Sync(), inner=inner)) + resp = client.get("https://example.com/resource") + + assert resp.status_code == 200 + assert inner.calls == 2 + assert inner.last_headers["x-payment"] == "signed" + client.close() + + +def test_create_x402_session_passthrough_non_402() -> None: + inner = _OkTransport() + from x402_openai._transport import X402Transport + + client = httpx.Client(transport=X402Transport(_FakeX402Sync(), inner=inner)) + resp = client.get("https://example.com/resource") + + assert resp.status_code == 200 + assert inner.calls == 1 + client.close() + + +# --------------------------------------------------------------------------- +# Async session tests. +# --------------------------------------------------------------------------- + + +def test_create_async_x402_session_requires_credential() -> None: + with pytest.raises(ValueError, match="exactly one"): + create_async_x402_session() + + +def test_create_async_x402_session_returns_httpx_async_client() -> None: + client = create_async_x402_session(x402_client=_FakeX402Async()) + assert isinstance(client, httpx.AsyncClient) + + +@pytest.mark.asyncio +async def test_create_async_x402_session_retries_on_402() -> None: + inner = _Pay402AsyncTransport() + from x402_openai._transport import AsyncX402Transport + + client = httpx.AsyncClient(transport=AsyncX402Transport(_FakeX402Async(), inner=inner)) + resp = await client.get("https://example.com/resource") + + assert resp.status_code == 200 + assert inner.calls == 2 + assert inner.last_headers["x-payment"] == "signed" + await client.aclose() + + +@pytest.mark.asyncio +async def test_create_async_x402_session_passthrough_non_402() -> None: + inner = _OkAsyncTransport() + from x402_openai._transport import AsyncX402Transport + + client = httpx.AsyncClient(transport=AsyncX402Transport(_FakeX402Async(), inner=inner)) + resp = await client.get("https://example.com/resource") + + assert resp.status_code == 200 + assert inner.calls == 1 + await client.aclose() diff --git a/tests/test_mcp.py b/tests/test_mcp.py new file mode 100644 index 0000000..521564e --- /dev/null +++ b/tests/test_mcp.py @@ -0,0 +1,353 @@ +"""Tests for X402MCPClient and AsyncX402MCPClient.""" + +from __future__ import annotations + +import json +from typing import Any + +import httpx +import pytest + +from x402_openai._mcp import ( + AsyncX402MCPClient, + X402MCPClient, + _build_rpc_payload, + _inject_payment, +) + +# --------------------------------------------------------------------------- +# Helpers — no x402 SDK required. +# --------------------------------------------------------------------------- + + +class _FakeX402Sync: + def __init__(self, headers: dict[str, str] | None = None) -> None: + self._headers = headers or {"x-payment": "signed-payload"} + self.calls = 0 + self.last_headers: dict[str, str] = {} + self.last_body = b"" + + def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + self.calls += 1 + self.last_headers = headers + self.last_body = body + return self._headers, {} + + +class _FailingX402Sync: + def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + raise RuntimeError("signing failed") + + +class _FakeX402Async: + def __init__(self, headers: dict[str, str] | None = None) -> None: + self._headers = headers or {"x-payment": "signed-payload"} + self.calls = 0 + + async def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + self.calls += 1 + return self._headers, {} + + +class _FailingX402Async: + async def handle_402_response( + self, + headers: dict[str, str], + body: bytes, + ) -> tuple[dict[str, str], dict[str, str]]: + raise RuntimeError("signing failed") + + +# --------------------------------------------------------------------------- +# Inner transports that return canned JSON-RPC responses. +# --------------------------------------------------------------------------- + + +def _rpc_ok(result: Any = {"status": "ok"}) -> bytes: # noqa: B006 + return json.dumps({"jsonrpc": "2.0", "id": 1, "result": result}).encode() + + +def _rpc_payment_required(data: dict[str, Any] | None = None) -> bytes: + return json.dumps( + { + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32042, + "message": "Payment Required", + "data": data or {"x402Version": 1, "accepts": []}, + }, + } + ).encode() + + +def _rpc_error(code: int = -32600, message: str = "bad request") -> bytes: + return json.dumps( + {"jsonrpc": "2.0", "id": 1, "error": {"code": code, "message": message}} + ).encode() + + +class _SequentialTransport(httpx.BaseTransport): + """Returns responses in the order they were provided.""" + + def __init__(self, responses: list[httpx.Response]) -> None: + self._responses = list(responses) + self.requests: list[httpx.Request] = [] + + def handle_request(self, request: httpx.Request) -> httpx.Response: + self.requests.append(request) + return self._responses.pop(0) + + +class _SequentialAsyncTransport(httpx.AsyncBaseTransport): + def __init__(self, responses: list[httpx.Response]) -> None: + self._responses = list(responses) + self.requests: list[httpx.Request] = [] + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + self.requests.append(request) + return self._responses.pop(0) + + +# --------------------------------------------------------------------------- +# Unit tests for pure helper functions. +# --------------------------------------------------------------------------- + + +def test_build_rpc_payload_includes_method_and_id() -> None: + raw = _build_rpc_payload("tools/call", {"name": "weather"}, 42) + payload = json.loads(raw) + assert payload["jsonrpc"] == "2.0" + assert payload["method"] == "tools/call" + assert payload["id"] == 42 + assert payload["params"] == {"name": "weather"} + + +def test_build_rpc_payload_omits_params_when_none() -> None: + raw = _build_rpc_payload("ping", None, 1) + payload = json.loads(raw) + assert "params" not in payload + + +def test_inject_payment_merges_into_meta() -> None: + result = _inject_payment( + {"name": "weather"}, + {"X-Payment": "signed", "X-Other": "val"}, + ) + assert result["name"] == "weather" + assert result["_meta"]["x-payment"] == "signed" + assert result["_meta"]["x-other"] == "val" + + +def test_inject_payment_preserves_existing_meta() -> None: + result = _inject_payment( + {"_meta": {"existing": "value"}}, + {"X-Payment": "signed"}, + ) + assert result["_meta"]["existing"] == "value" + assert result["_meta"]["x-payment"] == "signed" + + +def test_inject_payment_handles_none_params() -> None: + result = _inject_payment(None, {"X-Payment": "signed"}) + assert result["_meta"]["x-payment"] == "signed" + + +# --------------------------------------------------------------------------- +# X402MCPClient sync tests. +# --------------------------------------------------------------------------- + + +def _make_sync_client( + x402: Any, + responses: list[httpx.Response], + base_url: str = "https://api.example.com", +) -> tuple[X402MCPClient, _SequentialTransport]: + inner = _SequentialTransport(responses) + client = X402MCPClient(x402_client=x402, base_url=base_url, _inner=inner) + return client, inner + + +def test_mcp_sync_passthrough_success() -> None: + client, inner = _make_sync_client( + _FakeX402Sync(), + [httpx.Response(200, content=_rpc_ok({"data": 42}))], + ) + result = client.call("/mcp", "tools/call", params={"name": "weather"}) + assert result == {"data": 42} + assert len(inner.requests) == 1 + client.close() + + +def test_mcp_sync_retries_on_32042() -> None: + x402 = _FakeX402Sync() + client, inner = _make_sync_client( + x402, + [ + httpx.Response(200, content=_rpc_payment_required()), + httpx.Response(200, content=_rpc_ok()), + ], + ) + result = client.call("/mcp", "tools/call", params={"name": "weather"}) + assert result == {"status": "ok"} + assert len(inner.requests) == 2 + assert x402.calls == 1 + client.close() + + +def test_mcp_sync_payment_injected_into_meta() -> None: + x402 = _FakeX402Sync(headers={"x-payment": "signed-abc"}) + client, inner = _make_sync_client( + x402, + [ + httpx.Response(200, content=_rpc_payment_required()), + httpx.Response(200, content=_rpc_ok()), + ], + ) + client.call("/mcp", "tools/call", params={"name": "weather"}) + + retry_body = json.loads(inner.requests[1].content) + assert retry_body["params"]["_meta"]["x-payment"] == "signed-abc" + client.close() + + +def test_mcp_sync_raises_on_signing_failure() -> None: + client, _ = _make_sync_client( + _FailingX402Sync(), + [httpx.Response(200, content=_rpc_payment_required())], + ) + with pytest.raises(ValueError, match="x402 payment failed"): + client.call("/mcp", "tools/call") + client.close() + + +def test_mcp_sync_raises_on_non_payment_rpc_error() -> None: + client, _ = _make_sync_client( + _FakeX402Sync(), + [httpx.Response(200, content=_rpc_error(-32600, "invalid request"))], + ) + with pytest.raises(ValueError, match="MCP error"): + client.call("/mcp", "tools/call") + client.close() + + +def test_mcp_sync_context_manager() -> None: + inner = _SequentialTransport([httpx.Response(200, content=_rpc_ok())]) + with X402MCPClient(x402_client=_FakeX402Sync(), _inner=inner) as client: + result = client.call("https://example.com/mcp", "ping") + assert result == {"status": "ok"} + + +def test_mcp_sync_absolute_url_not_prefixed() -> None: + inner = _SequentialTransport([httpx.Response(200, content=_rpc_ok())]) + client = X402MCPClient( + x402_client=_FakeX402Sync(), + base_url="https://should-not-appear.example.com", + _inner=inner, + ) + client.call("https://actual.example.com/mcp", "ping") + assert str(inner.requests[0].url) == "https://actual.example.com/mcp" + client.close() + + +# --------------------------------------------------------------------------- +# AsyncX402MCPClient tests. +# --------------------------------------------------------------------------- + + +def _make_async_client( + x402: Any, + responses: list[httpx.Response], + base_url: str = "https://api.example.com", +) -> tuple[AsyncX402MCPClient, _SequentialAsyncTransport]: + inner = _SequentialAsyncTransport(responses) + client = AsyncX402MCPClient(x402_client=x402, base_url=base_url, _inner=inner) + return client, inner + + +@pytest.mark.asyncio +async def test_mcp_async_passthrough_success() -> None: + client, inner = _make_async_client( + _FakeX402Async(), + [httpx.Response(200, content=_rpc_ok({"answer": 7}))], + ) + result = await client.call("/mcp", "tools/call", params={"name": "weather"}) + assert result == {"answer": 7} + assert len(inner.requests) == 1 + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_async_retries_on_32042() -> None: + x402 = _FakeX402Async() + client, inner = _make_async_client( + x402, + [ + httpx.Response(200, content=_rpc_payment_required()), + httpx.Response(200, content=_rpc_ok()), + ], + ) + result = await client.call("/mcp", "tools/call", params={"name": "weather"}) + assert result == {"status": "ok"} + assert len(inner.requests) == 2 + assert x402.calls == 1 + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_async_payment_injected_into_meta() -> None: + x402 = _FakeX402Async(headers={"x-payment": "async-signed"}) + client, inner = _make_async_client( + x402, + [ + httpx.Response(200, content=_rpc_payment_required()), + httpx.Response(200, content=_rpc_ok()), + ], + ) + await client.call("/mcp", "tools/call", params={"name": "weather"}) + + retry_body = json.loads(inner.requests[1].content) + assert retry_body["params"]["_meta"]["x-payment"] == "async-signed" + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_async_raises_on_signing_failure() -> None: + client, _ = _make_async_client( + _FailingX402Async(), + [httpx.Response(200, content=_rpc_payment_required())], + ) + with pytest.raises(ValueError, match="x402 payment failed"): + await client.call("/mcp", "tools/call") + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_async_raises_on_non_payment_rpc_error() -> None: + client, _ = _make_async_client( + _FakeX402Async(), + [httpx.Response(200, content=_rpc_error())], + ) + with pytest.raises(ValueError, match="MCP error"): + await client.call("/mcp", "tools/call") + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_async_context_manager() -> None: + inner = _SequentialAsyncTransport([httpx.Response(200, content=_rpc_ok())]) + async with AsyncX402MCPClient(x402_client=_FakeX402Async(), _inner=inner) as client: + result = await client.call("https://example.com/mcp", "ping") + assert result == {"status": "ok"} diff --git a/tests/test_transport.py b/tests/test_transport.py index 44e5cb0..562115b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -289,7 +289,9 @@ async def test_async_transport_passes_through_non_402_response() -> None: inner = _PassthroughAsyncTransport() transport = AsyncX402Transport(_FakeX402ClientAsync(), inner=inner) - response = await transport.handle_async_request(httpx.Request("GET", "https://example.com/v1/models")) + response = await transport.handle_async_request( + httpx.Request("GET", "https://example.com/v1/models") + ) assert response.status_code == 200 assert inner.calls == 1