rmcp is the official Rust implementation of the Model Context Protocol (MCP), a protocol designed for AI assistants to communicate with other services. This library can be used to build both servers that expose capabilities to AI assistants and clients that interact with such servers.
Creating a server with tools is simple using the #[tool] macro:
use rmcp::{
ServerHandler, ServiceExt,
handler::server::tool::ToolRouter,
model::*,
tool, tool_handler, tool_router,
transport::stdio,
ErrorData as McpError,
};
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct Counter {
counter: Arc<Mutex<i32>>,
tool_router: ToolRouter<Self>,
}
#[tool_router]
impl Counter {
fn new() -> Self {
Self {
counter: Arc::new(Mutex::new(0)),
tool_router: Self::tool_router(),
}
}
#[tool(description = "Increment the counter by 1")]
async fn increment(&self) -> Result<CallToolResult, McpError> {
let mut counter = self.counter.lock().await;
*counter += 1;
Ok(CallToolResult::success(vec![Content::text(
counter.to_string(),
)]))
}
#[tool(description = "Get the current counter value")]
async fn get(&self) -> Result<CallToolResult, McpError> {
let counter = self.counter.lock().await;
Ok(CallToolResult::success(vec![Content::text(
counter.to_string(),
)]))
}
}
// Implement the server handler
#[tool_handler]
impl ServerHandler for Counter {
fn get_info(&self) -> ServerInfo {
ServerInfo {
instructions: Some("A simple counter that tallies the number of times the increment tool has been used".into()),
capabilities: ServerCapabilities::builder().enable_tools().build(),
..Default::default()
}
}
}
// Run the server
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create and run the server with STDIO transport
let service = Counter::new().serve(stdio()).await.inspect_err(|e| {
// Log to stderr: on the stdio transport, stdout carries the protocol messages
eprintln!("Error starting server: {e}");
})?;
service.waiting().await?;
Ok(())
}
Tools can return structured JSON data with schemas. Use the Json wrapper:
# use rmcp::{tool, tool_router, handler::server::{tool::ToolRouter, wrapper::Parameters}, Json};
# use schemars::JsonSchema;
# use serde::{Serialize, Deserialize};
#
#[derive(Serialize, Deserialize, JsonSchema)]
struct CalculationRequest {
a: i32,
b: i32,
operation: String,
}
#[derive(Serialize, Deserialize, JsonSchema)]
struct CalculationResult {
result: i32,
operation: String,
}
# #[derive(Clone)]
# struct Calculator {
# tool_router: ToolRouter<Self>,
# }
#
# #[tool_router]
# impl Calculator {
#[tool(name = "calculate", description = "Perform a calculation")]
async fn calculate(&self, params: Parameters<CalculationRequest>) -> Result<Json<CalculationResult>, String> {
let result = match params.0.operation.as_str() {
"add" => params.0.a + params.0.b,
"multiply" => params.0.a * params.0.b,
_ => return Err("Unknown operation".to_string()),
};
Ok(Json(CalculationResult { result, operation: params.0.operation }))
}
# }
The #[tool] macro automatically generates an output schema from the CalculationResult type.
RMCP implements the task lifecycle from SEP-1686 so long-running or asynchronous tool calls can be queued and polled safely.
- Create: set the task field on CallToolRequestParam to ask the server to enqueue the tool call. The response is a CreateTaskResult that includes the generated task.task_id.
- Inspect: use tasks/get (GetTaskInfoRequest) to retrieve metadata such as status, timestamps, TTL, and poll interval.
- Await results: call tasks/result (GetTaskResultRequest) to block until the task completes and receive either the final CallToolResult payload or a protocol error.
- Cancel: call tasks/cancel (CancelTaskRequest) to request termination of a running task.
To expose task support, enable the tasks capability when building ServerCapabilities. The #[task_handler] macro and OperationProcessor utility provide reference implementations for enqueuing, tracking, and collecting task results.
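The lifecycle above can be pictured as a small state machine. The following is a minimal, std-only sketch of that idea and not rmcp's implementation: TaskStore, TaskStatus, and every method name are hypothetical, the set of states is an assumption rather than the list defined by SEP-1686, and the blocking behaviour of tasks/result is not modelled.

```rust
use std::collections::HashMap;

/// Hypothetical task states for this sketch; these are not rmcp types.
#[derive(Debug, Clone, PartialEq)]
enum TaskStatus {
    Working,
    Completed(String),
    Cancelled,
}

/// A toy in-memory task store (illustrative only).
#[derive(Default)]
struct TaskStore {
    next_id: u64,
    tasks: HashMap<u64, TaskStatus>,
}

impl TaskStore {
    /// "Create": enqueue a call and hand back a generated task id.
    fn create(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.tasks.insert(id, TaskStatus::Working);
        id
    }

    /// "Inspect": look up the current status without blocking.
    fn get(&self, id: u64) -> Option<&TaskStatus> {
        self.tasks.get(&id)
    }

    /// Record the final payload of a task that is still working.
    fn complete(&mut self, id: u64, payload: &str) -> bool {
        match self.tasks.get_mut(&id) {
            Some(status) if *status == TaskStatus::Working => {
                *status = TaskStatus::Completed(payload.to_string());
                true
            }
            _ => false,
        }
    }

    /// "Cancel": only a task that is still working can be cancelled.
    fn cancel(&mut self, id: u64) -> bool {
        match self.tasks.get_mut(&id) {
            Some(status) if *status == TaskStatus::Working => {
                *status = TaskStatus::Cancelled;
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let mut store = TaskStore::default();
    let id = store.create();
    println!("created task {id}: {:?}", store.get(id));
    store.complete(id, "42");
    println!("after completion: {:?}", store.get(id));
    println!("cancel after completion accepted: {}", store.cancel(id));
}
```

The point of the sketch is that a task is created in a non-terminal state, can be inspected at any time, and accepts completion or cancellation only while it is still running.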
Creating a client to interact with a server:
use rmcp::{
ServiceExt,
model::CallToolRequestParams,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a server running as a child process
let service = ()
.serve(TokioChildProcess::new(Command::new("uvx").configure(
|cmd| {
cmd.arg("mcp-server-git");
},
))?)
.await?;
// Get server information
let server_info = service.peer_info();
println!("Connected to server: {server_info:#?}");
// List available tools
let tools = service.list_tools(Default::default()).await?;
println!("Available tools: {tools:#?}");
// Call a tool
let result = service
.call_tool(CallToolRequestParams {
meta: None,
name: "git_status".into(),
arguments: serde_json::json!({ "repo_path": "." }).as_object().cloned(),
task: None,
})
.await?;
println!("Result: {result:#?}");
// Gracefully close the connection
service.cancel().await?;
Ok(())
}
For more examples, see the examples directory in the repository.
RMCP supports multiple transport mechanisms, each suited for different use cases:
Low-level interface for asynchronous read/write operations. This is the foundation for many other transports.
For working directly with I/O streams (tokio::io::AsyncRead and tokio::io::AsyncWrite).
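To give a feel for what a stream-based transport does with a raw reader and writer, here is a synchronous, std-only sketch of newline-delimited message framing. It assumes one JSON-RPC message per line, as the MCP stdio transport specifies; write_message and read_messages are hypothetical helpers, not rmcp functions, and rmcp's own implementation is asynchronous.

```rust
use std::io::{self, BufRead, Cursor, Write};

/// Write one message followed by a newline delimiter.
fn write_message<W: Write>(writer: &mut W, message: &str) -> io::Result<()> {
    writer.write_all(message.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()
}

/// Read all newline-delimited messages, skipping blank lines.
fn read_messages<R: BufRead>(reader: R) -> io::Result<Vec<String>> {
    let mut messages = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if !line.trim().is_empty() {
            messages.push(line);
        }
    }
    Ok(messages)
}

fn main() -> io::Result<()> {
    // An in-memory buffer stands in for a real stream such as a socket or pipe.
    let mut buffer = Vec::new();
    write_message(&mut buffer, r#"{"jsonrpc":"2.0","id":1,"method":"ping"}"#)?;
    write_message(&mut buffer, r#"{"jsonrpc":"2.0","id":1,"result":{}}"#)?;

    for message in read_messages(Cursor::new(buffer))? {
        println!("received: {message}");
    }
    Ok(())
}
```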
Run MCP servers as child processes and communicate via standard I/O.
Example:
use rmcp::transport::TokioChildProcess;
use tokio::process::Command;
let transport = TokioChildProcess::new(Command::new("mcp-server"))?;
let service = client.serve(transport).await?;
You can get the Peer struct from NotificationContext and RequestContext.
# use rmcp::{
# ServerHandler,
# model::{LoggingLevel, LoggingMessageNotificationParam, ProgressNotificationParam},
# service::{NotificationContext, RoleServer},
# };
# pub struct Handler;
impl ServerHandler for Handler {
async fn on_progress(
&self,
notification: ProgressNotificationParam,
context: NotificationContext<RoleServer>,
) {
let peer = context.peer;
let _ = peer
.notify_logging_message(LoggingMessageNotificationParam {
level: LoggingLevel::Info,
logger: None,
data: serde_json::json!({
"message": format!("Progress: {}", notification.progress),
}),
})
.await;
}
}
In many cases you need to manage several services in one collection. Call into_dyn to convert them into the same type.
let service = service.into_dyn();
RMCP uses feature flags to control which components are included:
- client: Enable client functionality
- server: Enable server functionality and the tool system
- macros: Enable the #[tool] macro (enabled by default)
- Transport-specific features:
  - transport-async-rw: Async read/write support
  - transport-io: I/O stream support
  - transport-child-process: Child process support
  - transport-streamable-http-client / transport-streamable-http-server: HTTP streaming (client agnostic, see StreamableHttpClientTransport for details)
  - transport-streamable-http-client-reqwest: a default reqwest implementation of the streamable http client
- auth: OAuth2 authentication support
- schemars: JSON Schema generation (for tool definitions)
- TLS backend options (for HTTP transports):
  - reqwest: Uses rustls (pure Rust TLS, recommended default)
  - reqwest-native-tls: Uses platform native TLS (OpenSSL on Linux, Secure Transport on macOS, SChannel on Windows)
  - reqwest-tls-no-provider: Uses rustls without a default crypto provider (bring your own)
- transport-io: Server stdio transport
- transport-child-process: Client stdio transport
- transport-streamable-http-server: Streamable HTTP server transport
- transport-streamable-http-client: Streamable HTTP client transport
Transport
The transport type must implement the Transport trait, which allows it to send messages concurrently and receive messages sequentially.
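The "send concurrently, receive sequentially" shape can be illustrated with a plain standard-library channel. This is only an analogy under stated assumptions: fan_in is a hypothetical helper, threads stand in for async tasks, and nothing here uses or reproduces the Transport trait itself.

```rust
use std::sync::mpsc;
use std::thread;

/// Send `count` messages from separate threads, then drain them on one receiver.
fn fan_in(count: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Each sender clone can be used from its own thread at the same time.
    let handles: Vec<_> = (0..count)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(format!("message {i}")).expect("receiver alive");
            })
        })
        .collect();

    // Drop the original sender so the receive loop ends once all clones are gone.
    drop(tx);
    for handle in handles {
        handle.join().expect("sender thread panicked");
    }

    // Single consumer: messages are taken off the channel one at a time.
    let mut received: Vec<String> = rx.iter().collect();
    // Arrival order depends on thread scheduling, so sort for a stable result.
    received.sort();
    received
}

fn main() {
    for message in fan_in(3) {
        println!("{message}");
    }
}
```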
There are 2 pairs of standard transport types:
| transport | client | server |
|---|---|---|
| std IO | TokioChildProcess | stdio |
| streamable http | StreamableHttpClientTransport | StreamableHttpService |
IntoTransport is a helper trait that implicitly converts a type into a transport type.
These types automatically implement IntoTransport:
- A type that implements both futures::Sink and futures::Stream, or a tuple (Tx, Rx) where Tx is futures::Sink and Rx is futures::Stream.
- A type that implements both tokio::io::AsyncRead and tokio::io::AsyncWrite, or a tuple (R, W) where R is tokio::io::AsyncRead and W is tokio::io::AsyncWrite.
- A type that implements the Worker trait.
- A type that implements the Transport trait.
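The conversion pattern behind a helper trait like this can be sketched with the standard library alone. In the sketch below, Pipe, IntoPipe, and echo_once are hypothetical names chosen for illustration; the real IntoTransport trait has a different definition, and std channels replace the async sink and stream halves.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for a transport: one sending half, one receiving half.
struct Pipe {
    tx: Sender<String>,
    rx: Receiver<String>,
}

/// Hypothetical conversion trait, loosely mirroring the idea of IntoTransport.
trait IntoPipe {
    fn into_pipe(self) -> Pipe;
}

/// A pipe is trivially convertible into itself.
impl IntoPipe for Pipe {
    fn into_pipe(self) -> Pipe {
        self
    }
}

/// A (sender, receiver) tuple is converted by pairing the two halves.
impl IntoPipe for (Sender<String>, Receiver<String>) {
    fn into_pipe(self) -> Pipe {
        Pipe { tx: self.0, rx: self.1 }
    }
}

/// Accepts anything convertible, so callers never build a Pipe by hand.
fn echo_once(source: impl IntoPipe, message: &str) -> String {
    let pipe = source.into_pipe();
    pipe.tx.send(message.to_string()).expect("receiver alive");
    pipe.rx.recv().expect("sender alive")
}

fn main() {
    // The tuple returned by mpsc::channel is accepted directly.
    println!("{}", echo_once(mpsc::channel::<String>(), "hello"));
}
```

Because the accepting function is generic over the conversion trait, callers can pass either the finished type or its raw halves, which is the convenience the list above describes.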
This project is licensed under the terms specified in the repository's LICENSE file.