From 8644ff406fca61a6496c88a0623d2e6b086a8a3e Mon Sep 17 00:00:00 2001 From: Jerome Swannack Date: Mon, 19 Jan 2026 16:52:20 +0000 Subject: [PATCH 1/4] feat: add message middleware support for Protocol Add a middleware pattern that allows transforming JSON-RPC messages before sending and after receiving. This provides a clean way to extend protocol messages (e.g., adding custom capabilities to initialize requests) without needing to subclass or override protocol methods. Middleware functions receive a JSON-RPC message and return a (possibly transformed) message. Both sync and async middleware are supported. Usage example: const client = new Client({ name: 'my-client', version: '1.0.0' }, { sendMiddleware: [ (message) => { // Transform outgoing message if (isJSONRPCRequest(message) && message.method === 'initialize') { // Add custom capabilities } return message; } ], receiveMiddleware: [ (message) => { // Transform incoming message return message; } ] }); Changes: - Add MessageMiddleware type and JSONRPCMessageLike type alias - Add sendMiddleware and receiveMiddleware to ProtocolOptions - Apply middleware in connect() for incoming messages - Apply middleware in request() and notification() for outgoing messages --- packages/core/src/shared/protocol.ts | 122 +++++++++++++++++++++------ 1 file changed, 96 insertions(+), 26 deletions(-) diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index 90c6116e0..44344a96d 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -60,6 +60,17 @@ import type { Transport, TransportSendOptions } from './transport.js'; */ export type ProgressCallback = (progress: Progress) => void; +/** + * A JSON-RPC message that can be transformed by middleware. + */ +export type JSONRPCMessageLike = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCResultResponse | JSONRPCErrorResponse; + +/** + * Middleware function for transforming JSON-RPC messages. + * Can be sync (returns message) or async (returns Promise). + */ +export type MessageMiddleware = (message: JSONRPCMessageLike) => JSONRPCMessageLike | Promise; + /** * Additional initialization options. */ @@ -102,6 +113,16 @@ export type ProtocolOptions = { * appropriately (e.g., by failing the task, dropping messages, etc.). */ maxTaskQueueSize?: number; + /** + * Middleware functions to apply to outgoing messages before sending. + * Middleware is applied in order, with each function receiving the output of the previous. + */ + sendMiddleware?: MessageMiddleware[]; + /** + * Middleware functions to apply to incoming messages after receiving. + * Middleware is applied in order, with each function receiving the output of the previous. + */ + receiveMiddleware?: MessageMiddleware[]; }; /** @@ -603,6 +624,25 @@ export abstract class Protocol( + message: T, + middlewareList: MessageMiddleware[] | undefined + ): Promise { + if (!middlewareList || middlewareList.length === 0) { + return message; + } + + let result: JSONRPCMessageLike = message; + for (const middleware of middlewareList) { + result = await middleware(result); + } + return result as T; + } + /** * Attaches to the given transport, starts it, and starts listening for messages. * @@ -625,15 +665,21 @@ export abstract class Protocol { _onmessage?.(message, extra); - if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { - this._onresponse(message); - } else if (isJSONRPCRequest(message)) { - this._onrequest(message, extra); - } else if (isJSONRPCNotification(message)) { - this._onnotification(message); - } else { - this._onerror(new Error(`Unknown message type: ${JSON.stringify(message)}`)); - } + + // Apply receive middleware and then route the message + this._applyMiddleware(message, this._options?.receiveMiddleware) + .then(transformedMessage => { + if (isJSONRPCResultResponse(transformedMessage) || isJSONRPCErrorResponse(transformedMessage)) { + this._onresponse(transformedMessage); + } else if (isJSONRPCRequest(transformedMessage)) { + this._onrequest(transformedMessage, extra); + } else if (isJSONRPCNotification(transformedMessage)) { + this._onnotification(transformedMessage); + } else { + this._onerror(new Error(`Unknown message type: ${JSON.stringify(transformedMessage)}`)); + } + }) + .catch(error => this._onerror(new Error(`Receive middleware error: ${error}`))); }; await this._transport.start(); @@ -1211,23 +1257,38 @@ export abstract class Protocol { - this._cleanupTimeout(messageId); - reject(error); - }); + // Apply send middleware before queuing + this._applyMiddleware(jsonrpcRequest, this._options?.sendMiddleware) + .then(transformedRequest => { + this._enqueueTaskMessage(relatedTaskId, { + type: 'request', + message: transformedRequest as JSONRPCRequest, + timestamp: Date.now() + }).catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); + }) + .catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); // Don't send through transport - queued messages are delivered via tasks/result only // This prevents duplicate delivery for bidirectional transports } else { - // No related task - send through transport normally - this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => { - this._cleanupTimeout(messageId); - reject(error); - }); + // No related task - apply send middleware and send through transport normally + this._applyMiddleware(jsonrpcRequest, this._options?.sendMiddleware) + .then(transformedRequest => { + this._transport!.send(transformedRequest as JSONRPCRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); + }) + .catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); } }); } @@ -1302,9 +1363,12 @@ export abstract class Protocol this._onerror(error)); + this._applyMiddleware(jsonrpcNotification, this._options?.sendMiddleware) + .then(transformedNotification => { + this._transport?.send(transformedNotification as JSONRPCNotification, options).catch(error => this._onerror(error)); + }) + .catch(error => this._onerror(error)); }); // Return immediately. @@ -1386,7 +1454,9 @@ export abstract class Protocol Date: Mon, 19 Jan 2026 16:56:26 +0000 Subject: [PATCH 2/4] fix: apply middleware before Promise to maintain sync behavior for tests --- packages/core/src/shared/protocol.ts | 121 ++++++++++++--------------- 1 file changed, 55 insertions(+), 66 deletions(-) diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index 44344a96d..5b35fb858 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -1120,9 +1120,50 @@ export abstract class Protocol(request: SendRequestT, resultSchema: T, options?: RequestOptions): Promise> { + async request(request: SendRequestT, resultSchema: T, options?: RequestOptions): Promise> { const { relatedRequestId, resumptionToken, onresumptiontoken, task, relatedTask } = options ?? {}; + // Build the JSON-RPC request + const messageId = this._requestMessageId++; + let jsonrpcRequest: JSONRPCRequest = { + ...request, + jsonrpc: '2.0', + id: messageId + }; + + if (options?.onprogress) { + this._progressHandlers.set(messageId, options.onprogress); + jsonrpcRequest.params = { + ...request.params, + _meta: { + ...(request.params?._meta || {}), + progressToken: messageId + } + }; + } + + // Augment with task creation parameters if provided + if (task) { + jsonrpcRequest.params = { + ...jsonrpcRequest.params, + task: task + }; + } + + // Augment with related task metadata if relatedTask is provided + if (relatedTask) { + jsonrpcRequest.params = { + ...jsonrpcRequest.params, + _meta: { + ...(jsonrpcRequest.params?._meta || {}), + [RELATED_TASK_META_KEY]: relatedTask + } + }; + } + + // Apply send middleware before sending + jsonrpcRequest = await this._applyMiddleware(jsonrpcRequest, this._options?.sendMiddleware) as JSONRPCRequest; + // Send the request return new Promise>((resolve, reject) => { const earlyReject = (error: unknown) => { @@ -1150,43 +1191,6 @@ export abstract class Protocol { this._responseHandlers.delete(messageId); this._progressHandlers.delete(messageId); @@ -1257,38 +1261,23 @@ export abstract class Protocol { - this._enqueueTaskMessage(relatedTaskId, { - type: 'request', - message: transformedRequest as JSONRPCRequest, - timestamp: Date.now() - }).catch(error => { - this._cleanupTimeout(messageId); - reject(error); - }); - }) - .catch(error => { - this._cleanupTimeout(messageId); - reject(error); - }); + this._enqueueTaskMessage(relatedTaskId, { + type: 'request', + message: jsonrpcRequest, + timestamp: Date.now() + }).catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); // Don't send through transport - queued messages are delivered via tasks/result only // This prevents duplicate delivery for bidirectional transports } else { - // No related task - apply send middleware and send through transport normally - this._applyMiddleware(jsonrpcRequest, this._options?.sendMiddleware) - .then(transformedRequest => { - this._transport!.send(transformedRequest as JSONRPCRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => { - this._cleanupTimeout(messageId); - reject(error); - }); - }) - .catch(error => { - this._cleanupTimeout(messageId); - reject(error); - }); + // No related task - send through transport normally + this._transport!.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => { + this._cleanupTimeout(messageId); + reject(error); + }); } }); } From 74fa3560939bb802be82e7c1b5c2c0212808a352 Mon Sep 17 00:00:00 2001 From: Jerome Swannack Date: Mon, 19 Jan 2026 16:58:38 +0000 Subject: [PATCH 3/4] style: format with prettier --- packages/core/src/shared/protocol.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index 5b35fb858..28e8876fe 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -628,10 +628,7 @@ export abstract class Protocol( - message: T, - middlewareList: MessageMiddleware[] | undefined - ): Promise { + private async _applyMiddleware(message: T, middlewareList: MessageMiddleware[] | undefined): Promise { if (!middlewareList || middlewareList.length === 0) { return message; } @@ -1162,7 +1159,7 @@ export abstract class Protocol>((resolve, reject) => { From d4f8dfa4fd8e8757c74608122897c3d174d43f2e Mon Sep 17 00:00:00 2001 From: Jerome Swannack Date: Mon, 19 Jan 2026 17:00:01 +0000 Subject: [PATCH 4/4] fix: avoid async boundary when no middleware is configured Only await middleware application when there's actually middleware configured. This preserves synchronous behavior for existing tests that depend on immediate message queuing. --- packages/core/src/shared/protocol.ts | 81 ++++++++++++++++++---------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index 28e8876fe..26f1868a6 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -663,20 +663,26 @@ export abstract class Protocol { _onmessage?.(message, extra); - // Apply receive middleware and then route the message - this._applyMiddleware(message, this._options?.receiveMiddleware) - .then(transformedMessage => { - if (isJSONRPCResultResponse(transformedMessage) || isJSONRPCErrorResponse(transformedMessage)) { - this._onresponse(transformedMessage); - } else if (isJSONRPCRequest(transformedMessage)) { - this._onrequest(transformedMessage, extra); - } else if (isJSONRPCNotification(transformedMessage)) { - this._onnotification(transformedMessage); - } else { - this._onerror(new Error(`Unknown message type: ${JSON.stringify(transformedMessage)}`)); - } - }) - .catch(error => this._onerror(new Error(`Receive middleware error: ${error}`))); + // Route the message, applying receive middleware if configured + const routeMessage = (msg: JSONRPCMessageLike) => { + if (isJSONRPCResultResponse(msg) || isJSONRPCErrorResponse(msg)) { + this._onresponse(msg); + } else if (isJSONRPCRequest(msg)) { + this._onrequest(msg, extra); + } else if (isJSONRPCNotification(msg)) { + this._onnotification(msg); + } else { + this._onerror(new Error(`Unknown message type: ${JSON.stringify(msg)}`)); + } + }; + + if (this._options?.receiveMiddleware && this._options.receiveMiddleware.length > 0) { + this._applyMiddleware(message, this._options.receiveMiddleware) + .then(routeMessage) + .catch(error => this._onerror(new Error(`Receive middleware error: ${error}`))); + } else { + routeMessage(message); + } }; await this._transport.start(); @@ -1158,8 +1164,10 @@ export abstract class Protocol 0) { + jsonrpcRequest = (await this._applyMiddleware(jsonrpcRequest, this._options.sendMiddleware)) as JSONRPCRequest; + } // Send the request return new Promise>((resolve, reject) => { @@ -1349,12 +1357,18 @@ export abstract class Protocol 0) { + notificationToQueue = (await this._applyMiddleware( + jsonrpcNotification, + this._options.sendMiddleware + )) as JSONRPCNotification; + } await this._enqueueTaskMessage(relatedTaskId, { type: 'notification', - message: transformedNotification as JSONRPCNotification, + message: notificationToQueue, timestamp: Date.now() }); @@ -1410,11 +1424,17 @@ export abstract class Protocol { - this._transport?.send(transformedNotification as JSONRPCNotification, options).catch(error => this._onerror(error)); - }) - .catch(error => this._onerror(error)); + if (this._options?.sendMiddleware && this._options.sendMiddleware.length > 0) { + this._applyMiddleware(jsonrpcNotification, this._options.sendMiddleware) + .then(transformedNotification => { + this._transport + ?.send(transformedNotification as JSONRPCNotification, options) + .catch(error => this._onerror(error)); + }) + .catch(error => this._onerror(error)); + } else { + this._transport?.send(jsonrpcNotification, options).catch(error => this._onerror(error)); + } }); // Return immediately. @@ -1440,9 +1460,16 @@ export abstract class Protocol 0) { + const transformedNotification = (await this._applyMiddleware( + jsonrpcNotification, + this._options.sendMiddleware + )) as JSONRPCNotification; + await this._transport.send(transformedNotification, options); + } else { + await this._transport.send(jsonrpcNotification, options); + } } /**