Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions examples/mcp-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Payment-protected MCP tools using Server-Sent Events (SSE).

This example demonstrates:
- **Server**: SSE-based MCP server with free and paid tools
- **Client**: Connects to server and handles the payment flow
- **Client**: Connects to server and handles payment automatically via `McpClient`

The server and client run in separate terminals, communicating via SSE.

Expand Down Expand Up @@ -73,15 +73,9 @@ Available tools:
1. Calling free tool (echo)...
Result: Echo: Hello, world!

2. Calling paid tool without credential (premium_echo)...
Got error code: -32042
Challenge ID: abc123...

3. Creating payment credential...
Credential created for challenge: abc123...

4. Retrying with credential...
2. Calling paid tool (premium_echo)...
Result: ✨ Premium Echo ✨: Hello, premium! (paid by 0x..., tx: 0x...)
Receipt: success, ref=0x...
```

## Server Implementations
Expand Down
83 changes: 21 additions & 62 deletions examples/mcp-server/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
"""MCP client demonstrating the payment flow.
"""MCP client demonstrating automatic payment handling.

Connects to an already-running MCP server via SSE and demonstrates:
1. Calling a free tool (echo)
2. Calling a paid tool without credentials (gets -32042 error)
3. Parsing the challenge, creating a credential, and retrying
2. Calling a paid tool (premium_echo) with automatic payment

Uses McpClient to handle the payment flow automatically—no manual
challenge parsing or credential creation needed.

Usage:
# Terminal 1: Start the server
Expand All @@ -27,11 +29,7 @@
from mcp import ClientSession
from mcp.client.sse import sse_client

from mpp.extensions.mcp import (
CODE_PAYMENT_REQUIRED,
MCPChallenge,
MCPCredential,
)
from mpp.extensions.mcp import McpClient
from mpp.methods.tempo import ChargeIntent, TempoAccount, tempo

SERVER_URL = os.environ.get("MCP_SERVER_URL", "http://127.0.0.1:8000/sse")
Expand All @@ -57,69 +55,30 @@ async def run_client() -> None:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()

# Wrap the session with automatic payment handling
client = McpClient(session, methods=[method])

tools = await session.list_tools()
print("Available tools:")
for tool in tools.tools:
print(f" - {tool.name}: {tool.description}")
print()

# 1. Free tool — works without payment
print("1. Calling free tool (echo)...")
result = await session.call_tool("echo", {"message": "Hello, world!"})
print(f" Result: {result.content[0].text}")
result = await client.call_tool("echo", {"message": "Hello, world!"})
print(f" Result: {result.result.content[0].text}")
print()

print("2. Calling paid tool without credential (premium_echo)...")
try:
result = await session.call_tool(
"premium_echo", {"message": "Hello, premium!"}
)
print(f" Result: {result.content[0].text}")
except Exception as e:
error_data = getattr(e, "error", None) or {}
error_code = (
error_data.get("code")
if isinstance(error_data, dict)
else getattr(error_data, "code", None)
)

print(f" Got error code: {error_code}")

if error_code == CODE_PAYMENT_REQUIRED:
data = (
error_data.get("data", {})
if isinstance(error_data, dict)
else getattr(error_data, "data", {})
)
challenges = (
data.get("challenges", []) if isinstance(data, dict) else []
)

if challenges:
challenge_data = challenges[0]
print(f" Challenge ID: {challenge_data.get('id', 'unknown')}")
print()

print("3. Creating payment credential...")
challenge = MCPChallenge.from_dict(challenge_data)
core_credential = await method.create_credential(
challenge.to_core()
)

mcp_credential = MCPCredential.from_core(
core_credential, challenge
)
print(f" Credential created for challenge: {challenge.id}")
print()

print("4. Retrying with credential...")
result = await session.call_tool(
"premium_echo",
{"message": "Hello, premium!"},
meta=mcp_credential.to_meta(),
)
print(f" Result: {result.content[0].text}")
else:
print(f" Unexpected error: {e}")
# 2. Paid tool — McpClient handles payment automatically
print("2. Calling paid tool (premium_echo)...")
result = await client.call_tool(
"premium_echo", {"message": "Hello, premium!"}
)
print(f" Result: {result.result.content[0].text}")
if result.receipt:
print(f" Receipt: {result.receipt.status}, ref={result.receipt.reference}")
print()


def main() -> None:
Expand Down
1 change: 1 addition & 0 deletions src/mpp/extensions/mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def expensive_tool(query: str, *, credential, receipt) -> str:
"""

from mpp.extensions.mcp.capabilities import payment_capabilities
from mpp.extensions.mcp.client import McpClient, McpToolResult
from mpp.extensions.mcp.constants import (
CODE_MALFORMED_CREDENTIAL,
CODE_PAYMENT_REQUIRED,
Expand Down
210 changes: 210 additions & 0 deletions src/mpp/extensions/mcp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""Payment-aware MCP client wrapper.

Wraps an MCP SDK ``ClientSession`` with automatic payment handling.
When a tool call returns a ``-32042`` payment required error, the wrapper
creates a Credential and retries the call—mirroring the TypeScript
``McpClient.wrap`` API.

Example:
from mcp import ClientSession
from mcp.client.sse import sse_client
from mpp.extensions.mcp import McpClient
from mpp.methods.tempo import tempo, TempoAccount, ChargeIntent

account = TempoAccount.from_key("0x...")
method = tempo(account=account, intents={"charge": ChargeIntent()})

async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()

client = McpClient(session, methods=[method])
result = await client.call_tool("premium_tool", {"query": "hello"})
print(result.receipt)
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

from mpp.extensions.mcp.constants import CODE_PAYMENT_REQUIRED, META_RECEIPT
from mpp.extensions.mcp.types import MCPChallenge, MCPCredential, MCPReceipt

if TYPE_CHECKING:
from mpp import Challenge, Credential

logger = logging.getLogger(__name__)


@runtime_checkable
class Method(Protocol):
"""Payment method interface for MCP client credential creation."""

name: str

async def create_credential(self, challenge: Challenge) -> Credential:
"""Create a credential to satisfy the given challenge."""
...


def _is_payment_required_error(error: Exception) -> bool:
"""Check whether an MCP error is a -32042 payment required error.

Distinguishes payment errors from other uses of -32042 (such as
URL elicitation) by checking for a ``challenges`` array in ``error.data``.
"""
code = getattr(error, "code", None)
if code != CODE_PAYMENT_REQUIRED:
return False
data = getattr(error, "data", None)
if not isinstance(data, dict):
return False
challenges = data.get("challenges")
return isinstance(challenges, list) and len(challenges) > 0


def _extract_challenges(error: Exception) -> list[dict[str, Any]]:
"""Extract the challenges array from a payment required error."""
data = getattr(error, "data", {})
return data.get("challenges", []) if isinstance(data, dict) else []


@dataclass(frozen=True, slots=True)
class McpToolResult:
"""Result of a payment-aware tool call.

Wraps the raw MCP ``CallToolResult`` and surfaces the payment receipt.
"""

result: Any
receipt: MCPReceipt | None = None


class McpClient:
"""Payment-aware MCP client wrapper.

Wraps an MCP SDK ``ClientSession`` and overrides ``call_tool`` with
automatic payment handling. When a tool call returns ``-32042``, the
wrapper matches the challenge to an installed payment method, creates
a credential, and retries.

Args:
session: An initialized ``mcp.ClientSession``.
methods: Payment methods available for credential creation.

Example:
client = McpClient(session, methods=[tempo(...)])
result = await client.call_tool("premium_tool", {"query": "hello"})
print(result.receipt)
"""

def __init__(self, session: Any, methods: list[Method]) -> None:
self._session = session
self._methods = methods

async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
*,
timeout: float | None = None,
meta: dict[str, Any] | None = None,
) -> McpToolResult:
"""Call an MCP tool with automatic payment handling.

On a ``-32042`` error, matches the challenge to an installed method,
creates a credential, and retries the call with the credential in
``params._meta``.

Args:
name: Tool name.
arguments: Tool arguments.
timeout: Per-call timeout override (passed as ``read_timeout_seconds``).
meta: Additional ``_meta`` fields to include in the request.

Returns:
An ``McpToolResult`` with the tool result and an optional receipt.

Raises:
McpError: If the error is not payment-related or no method matches.
ValueError: If no installed method matches the server's challenge.
"""
from mcp.shared.exceptions import McpError

call_kwargs: dict[str, Any] = {}
if timeout is not None:
call_kwargs["read_timeout_seconds"] = timeout
if meta is not None:
call_kwargs["meta"] = meta

try:
result = await self._session.call_tool(name, arguments, **call_kwargs)
receipt = self._extract_receipt(result)
return McpToolResult(result=result, receipt=receipt)

except McpError as e:
if not _is_payment_required_error(e):
raise

challenges_data = _extract_challenges(e)
challenge, method = self._match_challenge(challenges_data)

core_credential = await method.create_credential(challenge.to_core())
mcp_credential = MCPCredential.from_core(core_credential, challenge)

retry_meta = dict(meta) if meta else {}
retry_meta.update(mcp_credential.to_meta())

retry_kwargs: dict[str, Any] = {"meta": retry_meta}
if timeout is not None:
retry_kwargs["read_timeout_seconds"] = timeout

retry_result = await self._session.call_tool(name, arguments, **retry_kwargs)
receipt = self._extract_receipt(retry_result)
return McpToolResult(result=retry_result, receipt=receipt)

def _match_challenge(
self, challenges_data: list[dict[str, Any]]
) -> tuple[MCPChallenge, Method]:
"""Match a challenge to an installed method.

Iterates installed methods in order (client preference) and returns
the first match by ``name`` and ``intent``.
"""
for method in self._methods:
for cd in challenges_data:
if cd.get("method") == method.name and cd.get("intent") in self._intent_names(
method
):
return MCPChallenge.from_dict(cd), method

available = [cd.get("method") for cd in challenges_data]
installed = [m.name for m in self._methods]
raise ValueError(
f"No compatible payment method. Server offered: {available}, client has: {installed}"
)

@staticmethod
def _intent_names(method: Method) -> set[str]:
"""Get intent names supported by a method."""
intents = getattr(method, "intents", None) or getattr(method, "_intents", None)
if isinstance(intents, dict):
return set(intents.keys())
return {"charge"}

@staticmethod
def _extract_receipt(result: Any) -> MCPReceipt | None:
"""Extract a payment receipt from a tool result's _meta."""
meta = getattr(result, "meta", None)
if not meta or not isinstance(meta, dict):
return None
receipt_data = meta.get(META_RECEIPT)
if receipt_data is None:
return None
try:
return MCPReceipt.from_dict(receipt_data)
except (KeyError, TypeError):
logger.warning("Failed to parse receipt from _meta")
return None
Loading
Loading