Skip to content
13 changes: 13 additions & 0 deletions asab/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .service import MCPService
from .decorators import mcp_tool, mcp_resource_template
from .datacls import MCPToolInfo, MCPResourceTemplateInfo, MCPToolResultTextContent, MCPToolResultResourceLink

__all__ = [
"MCPService",
"mcp_tool",
"mcp_resource_template",
"MCPToolInfo",
"MCPResourceTemplateInfo",
"MCPToolResultTextContent",
"MCPToolResultResourceLink",
]
46 changes: 46 additions & 0 deletions asab/mcp/datacls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import dataclasses


@dataclasses.dataclass
class MCPToolInfo:
name: str
title: str
description: str
inputSchema: dict
outputSchema: dict


@dataclasses.dataclass
class MCPToolResult:
pass


@dataclasses.dataclass
class MCPToolResultTextContent(MCPToolResult):
'''
https://modelcontextprotocol.io/specification/2025-06-18/server/tools#text-content
'''
text: str


@dataclasses.dataclass
class MCPToolResultResourceLink(MCPToolResult):
'''
https://modelcontextprotocol.io/specification/2025-06-18/server/tools#resource-links
'''
uri: str
name: str
description: str
mimeType: str
title: str = None # For resource listing
# TODO: Resource annotations


@dataclasses.dataclass
class MCPResourceTemplateInfo:
_uriPrefix: str
uriTemplate: str
name: str
title: str
description: str
mimeType: str
28 changes: 28 additions & 0 deletions asab/mcp/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from .datacls import MCPToolInfo, MCPResourceTemplateInfo


def mcp_tool(name, title, description, inputSchema=None, outputSchema=None):
def decorator(func):
func._mcp_tool_info = MCPToolInfo(
name=name.strip(),
title=title.strip(),
description=description.strip(),
inputSchema=inputSchema,
outputSchema=outputSchema,
)
return func
return decorator


def mcp_resource_template(uri_prefix: str, uri_template: str, name: str, title: str, description: str, mimeType: str):
def decorator(func):
func._mcp_resource_template_info = MCPResourceTemplateInfo(
uriTemplate=uri_template,
_uriPrefix=uri_prefix,
name=name.strip(),
title=title.strip(),
description=description.strip(),
mimeType=mimeType,
)
return func
return decorator
285 changes: 285 additions & 0 deletions asab/mcp/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
import logging
import dataclasses

import asab

import aiohttp_rpc

from .utils import rpc_ping, prune_nulls
from .datacls import MCPToolResultTextContent, MCPToolResultResourceLink


L = logging.getLogger(__name__)


class MCPService(asab.Service):

def __init__(self, app, web, service_name="asab.MCPService", name="asab-mcp", version="25.11.0"):
super().__init__(app, service_name)

self.Tools = {}
self.ResourceTemplates = {}
self.ResourceLists = {}
self.Instructions = {}

self.Name = name
self.Version = version

self.RPCServer = aiohttp_rpc.JsonRpcServer(middlewares=[logging_middleware])
web.add_post(r'/{tenant}/mcp', self._handle_http_request)

self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_mcp_initialize, name="initialize"))
self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_notifications_initialized, name="notifications/initialized"))
self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(rpc_ping, name="ping"))

self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rcp_tools_list, name="tools/list"))
self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_tools_call, name="tools/call"))

self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_resources_list, name="resources/list"))
self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_resources_read, name="resources/read"))

self.RPCServer.add_method(aiohttp_rpc.JsonRpcMethod(self._rpc_resource_templates_list, name="resources/templates/list"))


def add_tool(self, tool_function, mcp_tool_info=None):
if mcp_tool_info is None and hasattr(tool_function, '_mcp_tool_info'):
mcp_tool_info = tool_function._mcp_tool_info

if mcp_tool_info is None:
raise ValueError("MCP tool info is required")

self.Tools[mcp_tool_info.name] = (tool_function, mcp_tool_info)


def add_resource_template(self, resource_template_function, mcp_resource_template_info=None):
if mcp_resource_template_info is None and hasattr(resource_template_function, '_mcp_resource_template_info'):
mcp_resource_template_info = resource_template_function._mcp_resource_template_info

if mcp_resource_template_info is None:
raise ValueError("MCP resource template info is required")

self.ResourceTemplates[mcp_resource_template_info.name] = (resource_template_function, mcp_resource_template_info)


def add_resource_list(self, resource_uri_prefix, resource_list_function):
self.ResourceLists[resource_uri_prefix] = resource_list_function


def add_instruction(self, who, instruction):
self.Instructions[who] = instruction


async def _handle_http_request(self, request):
# TODO: Handle tenant and authorization
return await self.RPCServer.handle_http_request(request)


async def _rpc_mcp_initialize(self, capabilities=None, clientInfo=None, *args, **kwargs):
capabilities = capabilities.copy() or {}
clientInfo = clientInfo or {}

L.log(asab.LOG_NOTICE, "MCP Client initializing", struct_data={
"name": clientInfo.get('name', 'unknown'),
"version": clientInfo.get('version', 'unknown'),

})

instructions = ""
for instruction in self.Instructions.values():
instructions += instruction + "\n"

if len(self.Tools) > 0:
capabilities['tools'] = {
'listChanged': True,
}

if len(self.ResourceTemplates) > 0 or len(self.ResourceLists) > 0:
capabilities['resources'] = {
'listChanged': True,
}

return {
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": self.Name,
"version": self.Version,
},
"instructions": instructions,
"capabilities": capabilities,
}
Comment on lines +77 to +109
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix capabilities.copy() when capabilities is None.

With the MCP initialize request, capabilities is optional; when omitted you’ll hit AttributeError on capabilities.copy(). Initializing via the or {} branch needs to happen before calling .copy().

You can do:

-	async def _rpc_mcp_initialize(self, capabilities=None, clientInfo=None, *args, **kwargs):
-		capabilities = capabilities.copy() or {}
+	async def _rpc_mcp_initialize(self, capabilities=None, clientInfo=None, *args, **kwargs):
+		if capabilities is None:
+			capabilities = {}
+		else:
+			capabilities = capabilities.copy()

The rest of the capability augmentation logic looks fine.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def _rpc_mcp_initialize(self, capabilities=None, clientInfo=None, *args, **kwargs):
capabilities = capabilities.copy() or {}
clientInfo = clientInfo or {}
L.log(asab.LOG_NOTICE, "MCP Client initializing", struct_data={
"name": clientInfo.get('name', 'unknown'),
"version": clientInfo.get('version', 'unknown'),
})
instructions = ""
for instruction in self.Instructions.values():
instructions += instruction + "\n"
if len(self.Tools) > 0:
capabilities['tools'] = {
'listChanged': True,
}
if len(self.ResourceTemplates) > 0 or len(self.ResourceLists) > 0:
capabilities['resources'] = {
'listChanged': True,
}
return {
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": self.Name,
"version": self.Version,
},
"instructions": instructions,
"capabilities": capabilities,
}
async def _rpc_mcp_initialize(self, capabilities=None, clientInfo=None, *args, **kwargs):
if capabilities is None:
capabilities = {}
else:
capabilities = capabilities.copy()
clientInfo = clientInfo or {}
L.log(asab.LOG_NOTICE, "MCP Client initializing", struct_data={
"name": clientInfo.get('name', 'unknown'),
"version": clientInfo.get('version', 'unknown'),
})
instructions = ""
for instruction in self.Instructions.values():
instructions += instruction + "\n"
if len(self.Tools) > 0:
capabilities['tools'] = {
'listChanged': True,
}
if len(self.ResourceTemplates) > 0 or len(self.ResourceLists) > 0:
capabilities['resources'] = {
'listChanged': True,
}
return {
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": self.Name,
"version": self.Version,
},
"instructions": instructions,
"capabilities": capabilities,
}
🧰 Tools
🪛 Ruff (0.14.5)

77-77: Unused method argument: args

(ARG002)


77-77: Unused method argument: kwargs

(ARG002)

🤖 Prompt for AI Agents
In asab/mcp/service.py around lines 77 to 109, capabilities.copy() is called
when capabilities may be None which raises AttributeError; fix by ensuring
capabilities is defaulted before copying — e.g. evaluate (capabilities or {})
first and then call .copy() (capabilities = (capabilities or {}).copy()),
leaving the rest of the augmentation logic unchanged.



async def _rcp_tools_list(self, *args, **kwargs):
'''
To discover available tools, clients send a tools/list request.

https://modelcontextprotocol.io/specification/2025-06-18/server/tools#listing-tools
'''
# TODO: Pagination ...
return {
"tools": [
prune_nulls(dataclasses.asdict(mcp_tool_info))
for _, mcp_tool_info in self.Tools.values()
],
}


async def _rpc_tools_call(self, name, arguments, *args, **kwargs):
'''
To invoke a tool, clients send a tools/call request.

https://modelcontextprotocol.io/specification/2025-06-18/server/tools#invoking-tools
'''
arguments = arguments or {}

x = self.Tools.get(name)
if x is None:
L.warning("Tool not found", struct_data={"name": name})
raise KeyError(f"Tool {name} not found")

tool_function, _ = x

try:
result = await tool_function(**arguments)
except Exception as e:
L.exception("Tool failed", struct_data={"name": name, "error": str(e)})
return {
"content": [{
"type": "text",
"text": "General error occurred."
}],
"isError": True,
}

if not isinstance(result, list):
result = [result]

transformed_result = []
for item in result:
if isinstance(item, MCPToolResultTextContent):
transformed_result.append({
"type": "text",
"text": item.text,
})
elif isinstance(item, str):
# A shortcut for Text content.
transformed_result.append({
"type": "text",
"text": item,
})
elif isinstance(item, MCPToolResultResourceLink):
transformed_result.append({
"type": "resource_link",
"uri": item.uri,
"name": item.name,
"description": item.description,
"mimeType": item.mimeType,
})
else:
raise ValueError(f"Unsupported result type: {type(item)}")

return {
"content": transformed_result,
"isError": False,
}


async def _rpc_resources_list(self, *args, **kwargs):
'''
To list resources, clients send a resources/list request.

https://modelcontextprotocol.io/specification/2025-06-18/server/resources#listing-resources
'''
resources = []

for _, resource_list_function in self.ResourceLists.items():
resources.extend(await resource_list_function())

transformed_resources = []
for resource in resources:
if isinstance(resource, MCPToolResultResourceLink):
transformed_resources.append(prune_nulls({
"uri": resource.uri,
"name": resource.name,
"title": resource.title,
"description": resource.description,
"mimeType": resource.mimeType,
}))
else:
raise ValueError(f"Unsupported resource type: {type(resource)}")

return {
"resources": transformed_resources,
}


async def _rpc_resources_read(self, uri, *args, **kwargs):
'''
To read a resource, clients send a resources/read request.

https://modelcontextprotocol.io/specification/2025-06-18/server/resources#reading-resources
'''
fnct = None

# TODO: Check the "direct"

# Find the resource template function that matches the URI
if fnct is None:
for resource_template_function, mcp_resource_template_info in self.ResourceTemplates.values():
if uri.startswith(mcp_resource_template_info._uriPrefix):
fnct = resource_template_function
break

if fnct is None:
# TODO: Find a more compliant way to handle this, but for now we'll just raise an error.
raise KeyError(f"Resource template {uri} not found")

result = await fnct(uri)
if result is None:
return {
"contents": [],
}

if not isinstance(result, list):
result = [result]

return {
"contents": result,
}
Comment on lines +216 to +248
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Ensure resources/read returns JSON-serializable content, not dataclass instances.

_rpc_resources_read returns:

result = await fnct(uri)
...
if not isinstance(result, list):
    result = [result]

return {
    "contents": result,
}

Unlike tools listing and resource templates listing (which use dataclasses.asdict + prune_nulls) and tools/call (which maps to plain dicts), this handler forwards result verbatim. If fnct returns dataclass instances or other non-JSON-serializable objects, the JSON-RPC layer will fail to encode the response.

Two options:

  • Require fnct implementations to return fully JSON-serializable content structures ({"type": "text", "text": ...}, etc.), and document that contract clearly; or
  • Mirror the tools handling by transforming known MCP content dataclasses into dicts (or dataclasses.asdict(...) plus prune_nulls) before returning.

I’d recommend picking one contract and enforcing it here so resources/read can’t break at serialization time.


I'll verify the review comment by examining the codebase to confirm the serialization concerns and compare with other handlers.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find the other handlers mentioned in the review comment
rg -n "def _rpc_tools" asab/mcp/service.py -A 30

Length of output: 908


🏁 Script executed:

#!/bin/bash
# Search for dataclasses.asdict usage in the codebase
rg -n "asdict" asab/ -B 2 -A 2

Length of output: 757


🏁 Script executed:

#!/bin/bash
# Look for prune_nulls usage
rg -n "prune_nulls" asab/ -B 2 -A 2

Length of output: 1739


🏁 Script executed:

#!/bin/bash
# Check for ResourceTemplates definition and usage
rg -n "ResourceTemplates" asab/mcp/service.py -B 2 -A 2

Length of output: 963


🏁 Script executed:

#!/bin/bash
# Search for JSON encoding/serialization mechanisms
rg -n "json\.dumps\|JSONEncoder\|__dict__" asab/mcp/service.py -B 1 -A 1

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for the complete _rpc_resources_read method and surrounding context
sed -n '210,260p' asab/mcp/service.py

Length of output: 1362


🏁 Script executed:

#!/bin/bash
# Search for other _rpc_ methods to compare patterns
rg -n "async def _rpc_" asab/mcp/service.py

Length of output: 426


I need to see the complete _rpc_tools_call method and understand how responses are handled. Let me gather more information.


---

🏁 Script executed:

```shell
#!/bin/bash
# Get the full _rpc_tools_call method
sed -n '127,180p' asab/mcp/service.py

Length of output: 1396


🏁 Script executed:

#!/bin/bash
# Search for how aiohttp_rpc handles responses and JSON encoding
rg -n "aiohttp_rpc" asab/mcp/service.py -B 2 -A 5

Length of output: 1407


🏁 Script executed:

#!/bin/bash
# Look for any test files or examples showing resource template function returns
find asab -name "*test*" -o -name "*example*" | head -20

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check the custom JSON encoder in web/rest/json.py
cat asab/web/rest/json.py

Length of output: 7978


🏁 Script executed:

#!/bin/bash
# Look for where _rpc_ methods are registered/called to understand response flow
rg -n "setattr\|getattr\|RpcHandler" asab/mcp/service.py -B 1 -A 1

Length of output: 0


I need to understand how the HTTP responses are serialized for RPC calls.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find _handle_http_request implementation
rg -n "_handle_http_request" asab/mcp/service.py -A 20

Length of output: 1928


🏁 Script executed:

#!/bin/bash
# Search for imports and see if JSONDumper or json_response is used
rg -n "from\|import" asab/mcp/service.py | head -30

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look at the file structure to find if there are more details
wc -l asab/mcp/service.py

Length of output: 23


🏁 Script executed:

#!/bin/bash
# Check if there's any serialization setup in the service init or elsewhere
rg -n "json\|dumps\|JSONDumper" asab/mcp/service.py

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Get info about aiohttp_rpc to see if it has built-in JSON encoder handling
rg -n "JsonRpcServer" asab/mcp/service.py -B 2 -A 10

Length of output: 806


Let me search for information about aiohttp_rpc's JSON handling and any examples of resource template functions.





🏁 Script executed:

#!/bin/bash
# Look for any examples or tests of resource template usage
find asab -type f -name "*.py" | xargs grep -l "add_resource_template\|resource_template_function" 2>/dev/null | head -5

Length of output: 19


🏁 Script executed:

#!/bin/bash
# Search entire codebase for any resource template function definitions
rg -n "add_resource_template" asab/ -B 2 -A 10

Length of output: 893


🏁 Script executed:

#!/bin/bash
# Check MCPToolResultTextContent and related dataclasses to understand structure
rg -n "class MCPToolResult\|class MCPResourceContent" asab/mcp/

Length of output: 0


🌐 Web query:

aiohttp_rpc JsonRpcServer JSON encoder dataclass handling

💡 Result:

Use the JSONRPCServer's json_serialize hook and pass a json.dumps that knows how to convert dataclasses (and nested non‑JSON types). Example:

  • install/see docs: aiohttp-rpc exposes JSONRPCServer with a json_serialize argument. [1][2]

Minimal example:

from aiohttp import web
import aiohttp_rpc
import json
from dataclasses import dataclass, asdict, is_dataclass
from functools import partial
import uuid

@DataClass
class User:
id: uuid.UUID
name: str

async def get_user(user_id) -> User:
return User(id=uuid.UUID(user_id), name="alice")

def default_json(o):
# dataclass -> dict, UUID -> str, fallback to repr
if is_dataclass(o):
return asdict(o)
if isinstance(o, uuid.UUID):
return str(o)
return repr(o)

rpc_server = aiohttp_rpc.JSONRPCServer(
json_serialize=partial(json.dumps, default=default_json),
)
rpc_server.add_method(get_user)

app = web.Application()

mount rpc_server.handle_http_request(...) as your handler

Notes:

  • asdict handles nested dataclasses.
  • If you need type/field control or validation, consider a dedicated dataclass serializer (e.g., serpyco, dataclasses-serialization) and call that from default_json. [1][3][4]

Sources:
[1] aiohttp-rpc (integration / json_serialize example). (pypi.org)
[2] aiohttp-json-rpc / related projects (implementation patterns). (pypi.org)
[3] serpyco-rs (dataclass serializer alternative). (libraries.io)
[4] dataclasses-serialization (JSON serializer for dataclasses). (pypi.org)

Citations:


Verify the contract: resource template functions may return dataclass instances, and _rpc_resources_read lacks serialization.

The review comment is correct and well-founded. Here's what verification confirmed:

  1. Inconsistent handling is real: _rpc_tools_call explicitly transforms results to plain dicts (lines 157–176), while _rpc_resources_read returns result verbatim (line 245).

  2. aiohttp_rpc has no custom JSON serializer configured: The JsonRpcServer is created without a json_serialize parameter (line 28), meaning it uses standard json.dumps(), which cannot serialize dataclass instances.

  3. Established pattern elsewhere: Other listing handlers use prune_nulls(dataclasses.asdict(...)) (lines 121, 260), suggesting the codebase expects dataclass handling.

  4. Actual runtime risk: If resource template functions return dataclass instances (a reasonable expectation given the pattern), the JSON-RPC layer will fail at serialization time with TypeError: Object of type X is not JSON serializable.

The two approaches from the review comment remain valid:

  • Explicitly transform known MCP content dataclasses (mirror _rpc_tools_call), or
  • Enforce a documentation contract requiring fully JSON-serializable structures.
🧰 Tools
🪛 Ruff (0.14.5)

216-216: Unused method argument: args

(ARG002)


216-216: Unused method argument: kwargs

(ARG002)


235-235: Avoid specifying long messages outside the exception class

(TRY003)



async def _rpc_resource_templates_list(self, *args, **kwargs):
'''
To discover available resource templates, clients send a resources/templates/list request.

https://modelcontextprotocol.io/specification/2025-06-18/server/resources#resource-templates
'''
# TODO: Pagination ...
return {
"resourceTemplates": [
prune_nulls(dataclasses.asdict(mcp_resource_template_info))
for _, mcp_resource_template_info in self.ResourceTemplates.values()
],
}


async def _rpc_notifications_initialized(self, *args, **kwargs):
"""
This notification is sent from the client to the server after initialization has finished.

https://modelcontextprotocol.io/specification/2025-06-18/schema#notifications%2Finitialized
"""
L.log(asab.LOG_NOTICE, "MCP Client initialized")
return {}


async def logging_middleware(request, handler):
response = await handler(request)
if response.error is None:
L.log(asab.LOG_NOTICE, "JSON-RPC request completed", struct_data={"method": request.method_name})
else:
L.warning("JSON-RPC request failed", struct_data={
"method": request.method_name,
"error": response.error.message,
})
return response
Loading