Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/pctx_code_execution_runtime/src/callback_registry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use serde_json::json;
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{Arc, RwLock},
};
use tracing::instrument;

use crate::error::McpError;

Expand Down Expand Up @@ -103,6 +105,13 @@ impl CallbackRegistry {
///
/// This function will return an error if a callback by the provided id doesn't exist
/// or if the callback itself fails
#[instrument(
name = "invoke_callback_tool",
skip_all,
fields(id=id, args = json!(args).to_string()),
ret(Display),
err
)]
pub async fn invoke(
&self,
id: &str,
Expand Down
32 changes: 20 additions & 12 deletions crates/pctx_code_execution_runtime/src/mcp_registry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::error::McpError;
use pctx_config::server::ServerConfig;
use rmcp::model::{CallToolRequestParam, JsonObject, RawContent};
use serde_json::json;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::warn;
use tracing::{info, instrument, warn};

/// Singleton registry for MCP server configurations
#[derive(Clone)]
Expand Down Expand Up @@ -87,6 +88,13 @@ impl Default for MCPRegistry {
}

/// Call an MCP tool on a registered server
#[instrument(
name = "invoke_mcp_tool",
skip_all,
fields(id=format!("{server_name}.{tool_name}"), args = json!(args).to_string()),
ret(Display),
err
)]
pub(crate) async fn call_mcp_tool(
registry: &MCPRegistry,
server_name: &str,
Expand Down Expand Up @@ -132,22 +140,22 @@ pub(crate) async fn call_mcp_tool(
}

// Prefer structuredContent if available, otherwise use content array
if let Some(structured) = tool_result.structured_content {
return Ok(structured);
}

// Convert content to JSON value
// For simplicity, we'll extract text content and try to parse as JSON
if let Some(RawContent::Text(text_content)) = tool_result.content.first().map(|a| &**a) {
let has_structured = tool_result.structured_content.is_some();
let val = if let Some(structured) = tool_result.structured_content {
structured
} else if let Some(RawContent::Text(text_content)) = tool_result.content.first().map(|a| &**a) {
// Try to parse as JSON, fallback to string value
serde_json::from_str(&text_content.text)
.or_else(|_| Ok(serde_json::Value::String(text_content.text.clone())))
.map_err(|e: serde_json::Error| {
McpError::ToolCall(format!("Failed to parse content: {e}"))
})
})?
} else {
// Return the whole content array as JSON
serde_json::to_value(&tool_result.content)
.map_err(|e| McpError::ToolCall(format!("Failed to serialize content: {e}")))
}
json!(tool_result.content)
};

info!(structured_content = has_structured, result =? &val, "Tool result");

Ok(val)
}
7 changes: 4 additions & 3 deletions crates/pctx_code_mode/src/code_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pctx_code_execution_runtime::CallbackRegistry;
use pctx_config::server::ServerConfig;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, warn};
use tracing::{debug, instrument, warn};

use crate::{
Error, Result,
Expand All @@ -15,7 +15,7 @@ use crate::{
},
};

#[derive(Clone, Default, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct CodeMode {
// Codegen interfaces
pub tool_sets: Vec<codegen::ToolSet>,
Expand Down Expand Up @@ -105,6 +105,7 @@ impl CodeMode {
GetFunctionDetailsOutput { code, functions }
}

#[instrument(skip(self, callback_registry), ret(Display), err)]
pub async fn execute(
&self,
code: &str,
Expand Down Expand Up @@ -160,7 +161,7 @@ impl CodeMode {
namespaces = namespaces.join("\n\n"),
);

debug!("Executing code in sandbox");
debug!(to_execute = %to_execute, "Executing code in sandbox");

let options = pctx_executor::ExecuteOptions::new()
.with_allowed_hosts(self.allowed_hosts().into_iter().collect())
Expand Down
5 changes: 5 additions & 0 deletions crates/pctx_code_mode/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ impl ExecuteOutput {
)
}
}
impl Display for ExecuteOutput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", json!(&self))
}
}

// -------------- Callbacks --------------

Expand Down
7 changes: 4 additions & 3 deletions crates/pctx_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,16 @@ pub async fn execute(code: &str, options: ExecuteOptions) -> Result<ExecuteResul

// Check if we have diagnostics
if !check_result.diagnostics.is_empty() {
// Format diagnostics as rich stderr output
let stderr = format_diagnostics(&check_result.diagnostics);

warn!(
runtime = "type_check",
diagnostic = %stderr,
diagnostic_count = check_result.diagnostics.len(),
"Type check failed with diagnostics"
);

// Format diagnostics as rich stderr output
let stderr = format_diagnostics(&check_result.diagnostics);

return Ok(ExecuteResult {
success: false,
diagnostics: check_result.diagnostics,
Expand Down
16 changes: 8 additions & 8 deletions crates/pctx_session_server/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl IntoResponse for ApiError {
}

/// Health check response
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct HealthResponse {
pub status: String,
pub version: String,
Expand All @@ -74,25 +74,25 @@ pub enum ErrorCode {
}

/// Request to register tools
#[derive(Debug, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RegisterToolsRequest {
pub tools: Vec<pctx_code_mode::model::CallbackConfig>,
}

/// Response to registering tools
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RegisterToolsResponse {
pub registered: usize,
}

/// Request to register MCP servers
#[derive(Debug, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RegisterMcpServersRequest {
pub servers: Vec<McpServerConfig>,
}

// TODO: de-dup with pctx_config
#[derive(Debug, Deserialize, Clone, ToSchema)]
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
#[serde(untagged)]
pub enum McpServerConfig {
Http {
Expand All @@ -114,20 +114,20 @@ pub enum McpServerConfig {
}

/// Response after registering MCP servers
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RegisterMcpServersResponse {
pub registered: usize,
pub failed: Vec<String>,
}

/// Response after creating a new `CodeMode` session
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct CreateSessionResponse {
#[schema(value_type = String)]
pub session_id: Uuid,
}
/// Response after closing a `CodeMode` session
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct CloseSessionResponse {
pub success: bool,
}
Expand Down
56 changes: 42 additions & 14 deletions crates/pctx_session_server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use axum::{Json, extract::State, http::StatusCode};

use pctx_code_mode::{
CodeMode,
model::{GetFunctionDetailsInput, GetFunctionDetailsOutput, ListFunctionsOutput},
model::{
CallbackConfig, GetFunctionDetailsInput, GetFunctionDetailsOutput, ListFunctionsOutput,
},
};
use tracing::info;
use tracing::{debug, info};
use uuid::Uuid;

use crate::extractors::CodeModeSession;
Expand Down Expand Up @@ -46,7 +48,10 @@ pub(crate) async fn create_session<B: PctxSessionBackend>(
State(state): State<AppState<B>>,
) -> ApiResult<Json<CreateSessionResponse>> {
let session_id = Uuid::new_v4();
info!("Creating new CodeMode session: {session_id}");
info!(
session_id =? session_id,
"Creating new CodeMode session"
);

let code_mode = CodeMode::default();
state
Expand All @@ -55,7 +60,10 @@ pub(crate) async fn create_session<B: PctxSessionBackend>(
.await
.context("Failed inserting code mode session into backend")?;

info!("Created CodeMode session: {session_id}");
info!(
session_id =? session_id,
"Created CodeMode session"
);

Ok(Json(CreateSessionResponse { session_id }))
}
Expand All @@ -78,7 +86,7 @@ pub(crate) async fn close_session<B: PctxSessionBackend>(
State(state): State<AppState<B>>,
CodeModeSession(session_id): CodeModeSession,
) -> ApiResult<Json<CloseSessionResponse>> {
info!("Closing CodeMode session: {session_id}");
info!(session_id =? session_id, "Closing CodeMode session");

let existed = state
.backend
Expand All @@ -97,7 +105,7 @@ pub(crate) async fn close_session<B: PctxSessionBackend>(
));
}

info!("Closed CodeMode session: {session_id}");
info!(session_id =? session_id, "Closed CodeMode session");

Ok(Json(CloseSessionResponse { success: true }))
}
Expand All @@ -119,7 +127,7 @@ pub(crate) async fn list_functions<B: PctxSessionBackend>(
State(state): State<AppState<B>>,
CodeModeSession(session_id): CodeModeSession,
) -> ApiResult<Json<ListFunctionsOutput>> {
info!(session_id =? session_id, "Listing tools");
info!(session_id =? session_id, "Listing functions");

let code_mode = state
.backend
Expand Down Expand Up @@ -168,7 +176,8 @@ pub(crate) async fn get_function_details<B: PctxSessionBackend>(
.join(", ");
info!(
session_id =? session_id,
"Getting function details for {requested_functions}"
functions =? requested_functions,
"Getting function details",
);

let code_mode = state.backend.get(session_id).await?.ok_or(ApiError::new(
Expand Down Expand Up @@ -205,9 +214,15 @@ pub(crate) async fn register_tools<B: PctxSessionBackend>(
CodeModeSession(session_id): CodeModeSession,
Json(request): Json<RegisterToolsRequest>,
) -> ApiResult<Json<RegisterToolsResponse>> {
let tool_ids = request
.tools
.iter()
.map(CallbackConfig::id)
.collect::<Vec<_>>();
info!(
"Registering {} tools for session {session_id}",
request.tools.len(),
session_id =? session_id,
tools =? &tool_ids,
"Registering tools...",
);

let mut code_mode = state
Expand All @@ -224,19 +239,25 @@ pub(crate) async fn register_tools<B: PctxSessionBackend>(
},
))?;

let mut registered = 0;
for tool in &request.tools {
debug!(tool =? tool.id(), "Adding callback tool {}", tool.id());
code_mode
.add_callback(tool)
.context("Failed adding callback")?;

registered += 1;
}

// Update the backend with the modified CodeMode
state.backend.update(session_id, code_mode).await?;

Ok(Json(RegisterToolsResponse { registered }))
info!(
session_id =? session_id,
tools =? &tool_ids,
"Registered tools",
);

Ok(Json(RegisterToolsResponse {
registered: request.tools.len(),
}))
}

/// Register MCP servers dynamically at runtime
Expand Down Expand Up @@ -303,6 +324,13 @@ pub(crate) async fn register_servers<B: PctxSessionBackend>(
.await
.context("Failed updating code mode session in backend")?;

info!(
session_id =% session_id,
registered =% registered,
failed =? failed,
"Registered MCP servers",
);

Ok(Json(RegisterMcpServersResponse { registered, failed }))
}

Expand Down
17 changes: 16 additions & 1 deletion crates/pctx_session_server/src/state/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc};

use anyhow::{Context, Result};
use async_trait::async_trait;
use pctx_code_mode::CodeMode;
use pctx_code_mode::{
CodeMode,
model::{ExecuteInput, ExecuteOutput},
};
use tokio::sync::RwLock;
use uuid::Uuid;

Expand Down Expand Up @@ -30,6 +33,18 @@ pub trait PctxSessionBackend: Clone + Send + Sync + 'static {

/// Returns a full list of active `CodeMode` sessions in the backend.
async fn list_sessions(&self) -> Result<Vec<Uuid>>;

/// Hook called after every code mode execution websocket event
async fn post_execution(
&self,
_session_id: Uuid,
_execution_id: Uuid,
_code_mode: CodeMode,
_execution_req: ExecuteInput,
_execution_res: Result<ExecuteOutput>,
) -> Result<()> {
Ok(())
}
}

/// Manages `CodeMode` sessions locally using thread-safe
Expand Down
Loading