From 71a02f919a73e01aca14c1bfe907ad4e1daf5942 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Thu, 18 Dec 2025 16:38:27 +0100 Subject: [PATCH 01/12] api-key in client and ws client --- pctx-py/src/pctx_client/_client.py | 14 ++++-- pctx-py/src/pctx_client/_websocket_client.py | 13 ++++- pctx-py/tests/scripts/manual_code_mode.py | 50 +++++++++----------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pctx-py/src/pctx_client/_client.py b/pctx-py/src/pctx_client/_client.py index e72d5b60..308bc821 100644 --- a/pctx-py/src/pctx_client/_client.py +++ b/pctx-py/src/pctx_client/_client.py @@ -45,6 +45,7 @@ def __init__( tools: list[Tool | AsyncTool] | None = None, servers: list[ServerConfig] | None = None, url: str = "http://localhost:8080", + api_key: str | None = None, execute_timeout: float = 30.0, ): """ @@ -70,9 +71,15 @@ def __init__( ws_scheme = "wss" if http_scheme == "https" else "ws" - self._ws_client = WebSocketClient(url=f"{ws_scheme}://{host}/ws", tools=tools) - self._client = AsyncClient(base_url=f"{http_scheme}://{host}") + self._ws_client = WebSocketClient( + url=f"{ws_scheme}://{host}{parsed.path}/ws", api_key=api_key, tools=tools + ) + self._client = AsyncClient( + base_url=f"{http_scheme}://{host}{parsed.path}", + headers={"x-pctx-api-key": api_key or ""}, + ) self._session_id: str | None = None + self._api_key = api_key self._tools = tools or [] self._servers = servers or [] @@ -118,8 +125,7 @@ async def connect(self): f"Received invalid response from PCTX server at {self._client.base_url}. " "The server may be running but not responding correctly." ) from e - - self._client.headers = {"x-code-mode-session": self._session_id or ""} + self._client.headers.update({"x-code-mode-session": self._session_id or ""}) # Register all local tools & MCP servers configs: list[ToolConfig] = [ diff --git a/pctx-py/src/pctx_client/_websocket_client.py b/pctx-py/src/pctx_client/_websocket_client.py index c335b58a..c4f0fdc3 100644 --- a/pctx-py/src/pctx_client/_websocket_client.py +++ b/pctx-py/src/pctx_client/_websocket_client.py @@ -46,7 +46,12 @@ class WebSocketClient: receive and handle tool execution requests from the server """ - def __init__(self, url: str, tools: list[Tool | AsyncTool] | None = None): + def __init__( + self, + url: str, + api_key: str | None = None, + tools: list[Tool | AsyncTool] | None = None, + ): """ Initialize the WebSocket client. @@ -56,6 +61,7 @@ def __init__(self, url: str, tools: list[Tool | AsyncTool] | None = None): self.url = url self.ws: ClientConnection | None = None self.tools = tools or [] + self._api_key = api_key self._pending_executions: dict[str | int, asyncio.Future] = {} self._request_counter = 0 @@ -67,7 +73,10 @@ async def _connect(self, code_mode_session: str): ConnectionError: If connection fails """ try: - headers = {"x-code-mode-session": code_mode_session} + headers = { + "x-code-mode-session": code_mode_session, + "x-pctx-api-key": self._api_key, + } self.ws = await websockets.connect(self.url, additional_headers=headers) except Exception as e: raise ConnectionError(f"Failed to connect to {self.url}: {e}") from e diff --git a/pctx-py/tests/scripts/manual_code_mode.py b/pctx-py/tests/scripts/manual_code_mode.py index 9bf953ec..d8b24ede 100755 --- a/pctx-py/tests/scripts/manual_code_mode.py +++ b/pctx-py/tests/scripts/manual_code_mode.py @@ -28,7 +28,9 @@ def multiply(a: float, b: float) -> MultiplyOutput: async def main(): - p = Pctx( + async with Pctx( + url="http://localhost:8080/some-org/some-server", + api_key="asdlkfjasldf", tools=[add, subtract, multiply], # servers=[ # { @@ -40,32 +42,26 @@ async def main(): # }, # } # ], - ) - print("connecting....") - await p.connect() - - print("+++++++++++ LIST +++++++++++\n") - print((await p.list_functions()).code) - - print("\n\n+++++++++++ DETAILS +++++++++++\n") - print((await p.get_function_details(["MyMath.add"])).code) - - code = """ -async function run() { - let addval = await MyMath.add({a: 40, b: 2}); - let subval = await MyMath.subtract({a: addval, b: 2}); - let multval = await MyMath.multiply({a: subval, b: 2}); - - - return multval; -} -""" - print(code) - output = await p.execute(code) - pprint.pprint(output) - - print("disconnecting....") - await p.disconnect() + ) as p: + print("+++++++++++ LIST +++++++++++\n") + print((await p.list_functions()).code) + + print("\n\n+++++++++++ DETAILS +++++++++++\n") + print((await p.get_function_details(["MyMath.add"])).code) + + code = """ + async function run() { + let addval = await MyMath.add({a: 40, b: 2}); + let subval = await MyMath.subtract({a: addval, b: 2}); + let multval = await MyMath.multiply({a: subval, b: 2}); + + + return multval; + } + """ + print(code) + output = await p.execute(code) + pprint.pprint(output) if __name__ == "__main__": From ade41b7ad881bb7983dc19dd3f74e04497b2d62c Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Mon, 22 Dec 2025 13:55:56 -0500 Subject: [PATCH 02/12] wip --- Cargo.lock | 1 + crates/pctx_code_execution_runtime/Cargo.toml | 1 + .../pctx_code_execution_runtime/src/callback_registry.rs | 3 +++ crates/pctx_code_execution_runtime/src/mcp_registry.rs | 4 ++++ crates/pctx_session_server/src/websocket/handler.rs | 9 ++++++--- 5 files changed, 15 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef887b02..57c2163a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6947,6 +6947,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tracing", "url", ] diff --git a/crates/pctx_code_execution_runtime/Cargo.toml b/crates/pctx_code_execution_runtime/Cargo.toml index 70e9b243..4855f295 100644 --- a/crates/pctx_code_execution_runtime/Cargo.toml +++ b/crates/pctx_code_execution_runtime/Cargo.toml @@ -20,6 +20,7 @@ anyhow = "1.0" deno_error = "0.7" url = "2.5" rmcp = { workspace = true } +tracing = { workspace = true } [build-dependencies] pctx_config = { path = "../pctx_config" } diff --git a/crates/pctx_code_execution_runtime/src/callback_registry.rs b/crates/pctx_code_execution_runtime/src/callback_registry.rs index 8bef6e74..d91beabb 100644 --- a/crates/pctx_code_execution_runtime/src/callback_registry.rs +++ b/crates/pctx_code_execution_runtime/src/callback_registry.rs @@ -4,6 +4,7 @@ use std::{ pin::Pin, sync::{Arc, RwLock}, }; +use tracing::info; use crate::error::McpError; @@ -112,6 +113,8 @@ impl CallbackRegistry { McpError::ToolCall(format!("Callback with id \"{id}\" does not exist")) })?; + info!(callback_id =? id, "Handling callback"); + callback(args).await.map_err(|e| { McpError::ExecutionError(format!("Failed calling callback with id \"{id}\": {e}",)) }) diff --git a/crates/pctx_code_execution_runtime/src/mcp_registry.rs b/crates/pctx_code_execution_runtime/src/mcp_registry.rs index 664a4070..1882163e 100644 --- a/crates/pctx_code_execution_runtime/src/mcp_registry.rs +++ b/crates/pctx_code_execution_runtime/src/mcp_registry.rs @@ -3,6 +3,7 @@ use pctx_config::server::ServerConfig; use rmcp::model::{CallToolRequestParam, JsonObject, RawContent}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use tracing::info; /// Singleton registry for MCP server configurations #[derive(Clone)] @@ -99,6 +100,9 @@ pub(crate) async fn call_mcp_tool( )) })?; + info!( + server_name=?server_name,tool_name=?tool_name, "Handling MCP call"); + let client = mcp_cfg.connect().await?; let tool_result = client .call_tool(CallToolRequestParam { diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index 4a0ae903..e01e82b8 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -244,10 +244,14 @@ async fn handle_execute_code_request( } } + let execution_span = tracing::info_span!( + "execute_code_in_session", + code_mode_session_id = %code_mode_session_id + ); + tokio::spawn(async move { - let current_span = tracing::Span::current(); let output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { - let _guard = current_span.enter(); + let _guard = execution_span.enter(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -349,7 +353,6 @@ async fn handle_message( } Message::Close(_) => { info!("Received close message for session {ws_session}"); - println!("CLOSING...."); Ok(()) } Message::Ping(_) | Message::Pong(_) => Ok(()), From 4399893695cfae081cfd60a6c7b80b0fe2cdf70d Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Sun, 28 Dec 2025 10:33:03 -0700 Subject: [PATCH 03/12] more logging on session endpoints --- crates/pctx_session_server/src/routes.rs | 74 ++++++++++++++++++------ 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/crates/pctx_session_server/src/routes.rs b/crates/pctx_session_server/src/routes.rs index b9fccb78..3cd41c49 100644 --- a/crates/pctx_session_server/src/routes.rs +++ b/crates/pctx_session_server/src/routes.rs @@ -3,7 +3,9 @@ use axum::{Json, extract::State, http::StatusCode}; use pctx_code_mode::{ CodeMode, - model::{GetFunctionDetailsInput, GetFunctionDetailsOutput, ListFunctionsOutput}, + model::{ + CallbackConfig, GetFunctionDetailsInput, GetFunctionDetailsOutput, ListFunctionsOutput, + }, }; use tracing::{debug, error, info}; use uuid::Uuid; @@ -46,7 +48,10 @@ pub(crate) async fn create_session( State(state): State>, ) -> ApiResult> { let session_id = Uuid::new_v4(); - info!("Creating new CodeMode session: {session_id}"); + info!( + session_id =? session_id, + "Creating new CodeMode session" + ); let code_mode = CodeMode::default(); state @@ -55,7 +60,10 @@ pub(crate) async fn create_session( .await .context("Failed inserting code mode session into backend")?; - info!("Created CodeMode session: {session_id}"); + info!( + session_id =? session_id, + "Created CodeMode session" + ); Ok(Json(CreateSessionResponse { session_id })) } @@ -78,7 +86,7 @@ pub(crate) async fn close_session( State(state): State>, CodeModeSession(session_id): CodeModeSession, ) -> ApiResult> { - info!("Closing CodeMode session: {session_id}"); + info!(session_id =? session_id, "Closing CodeMode session"); let existed = state .backend @@ -97,7 +105,7 @@ pub(crate) async fn close_session( )); } - info!("Closed CodeMode session: {session_id}"); + info!(session_id =? session_id, "Closed CodeMode session"); Ok(Json(CloseSessionResponse { success: true })) } @@ -119,7 +127,7 @@ pub(crate) async fn list_functions( State(state): State>, CodeModeSession(session_id): CodeModeSession, ) -> ApiResult> { - info!(session_id =? session_id, "Listing tools"); + info!(session_id =? session_id, "Listing functions"); let code_mode = state .backend @@ -168,7 +176,8 @@ pub(crate) async fn get_function_details( .join(", "); info!( session_id =? session_id, - "Getting function details for {requested_functions}" + functions =? requested_functions, + "Getting function details", ); let code_mode = state.backend.get(session_id).await?.ok_or(ApiError::new( @@ -205,9 +214,15 @@ pub(crate) async fn register_tools( CodeModeSession(session_id): CodeModeSession, Json(request): Json, ) -> ApiResult> { + let tool_ids = request + .tools + .iter() + .map(CallbackConfig::id) + .collect::>(); info!( - "Registering {} tools for session {session_id}", - request.tools.len(), + session_id =? session_id, + tools =? &tool_ids, + "Registering tools...", ); let mut code_mode = state @@ -224,19 +239,25 @@ pub(crate) async fn register_tools( }, ))?; - let mut registered = 0; for tool in &request.tools { + debug!(tool =? tool.id(), "Adding callback tool {}", tool.id()); code_mode .add_callback(tool) .context("Failed adding callback")?; - - registered += 1; } // Update the backend with the modified CodeMode state.backend.update(session_id, code_mode).await?; - Ok(Json(RegisterToolsResponse { registered })) + info!( + session_id =? session_id, + tools =? &tool_ids, + "Registered tools", + ); + + Ok(Json(RegisterToolsResponse { + registered: request.tools.len(), + })) } /// Register MCP servers dynamically at runtime @@ -258,9 +279,15 @@ pub(crate) async fn register_servers( CodeModeSession(session_id): CodeModeSession, Json(request): Json, ) -> ApiResult> { + let server_ids = request + .servers + .iter() + .map(|s| format!("{} ({})", &s.name, &s.url)) + .collect::>(); info!( - "Registering {} MCP servers in session {session_id}", - request.servers.len() + session_id =? session_id, + servers =? &server_ids, + "Registering MCP servers...", ); let mut code_mode = state @@ -277,13 +304,13 @@ pub(crate) async fn register_servers( }, ))?; - let mut registered = 0; + let mut registered = Vec::new(); let mut failed = Vec::new(); for server in &request.servers { match register_mcp_server(&mut code_mode, server).await { Ok(()) => { - registered += 1; + registered.push(server); debug!("Successfully registered MCP server: {}", server.name); } Err(e) => { @@ -300,7 +327,16 @@ pub(crate) async fn register_servers( .await .context("Failed updating code mode session in backend")?; - Ok(Json(RegisterMcpServersResponse { registered, failed })) + info!( + session_id =? session_id, + servers =? &server_ids, + "Registered MCP servers", + ); + + Ok(Json(RegisterMcpServersResponse { + registered: registered.len(), + failed, + })) } async fn register_mcp_server( @@ -324,7 +360,7 @@ async fn register_mcp_server( .await .map_err(|e| format!("Failed to add MCP server: {e}"))?; - info!( + debug!( "Successfully registered MCP server '{}' with {} tools", server.name, code_mode From 8e21478b1b7d60c997d61214fd37492c11eed509 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Sun, 28 Dec 2025 13:02:48 -0700 Subject: [PATCH 04/12] instrument callback & mcp calls --- .../src/callback_registry.rs | 12 ++++++-- .../src/mcp_registry.rs | 30 +++++++++---------- .../src/websocket/handler.rs | 20 ++++++++----- pctx-py/tests/scripts/manual_code_mode.py | 13 +++++++- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/crates/pctx_code_execution_runtime/src/callback_registry.rs b/crates/pctx_code_execution_runtime/src/callback_registry.rs index d91beabb..fbeecd4e 100644 --- a/crates/pctx_code_execution_runtime/src/callback_registry.rs +++ b/crates/pctx_code_execution_runtime/src/callback_registry.rs @@ -1,10 +1,11 @@ +use serde_json::json; use std::{ collections::HashMap, future::Future, pin::Pin, sync::{Arc, RwLock}, }; -use tracing::info; +use tracing::instrument; use crate::error::McpError; @@ -104,6 +105,13 @@ impl CallbackRegistry { /// /// This function will return an error if a callback by the provided id doesn't exist /// or if the callback itself fails + #[instrument( + name = "invoke_callback_tool", + skip_all, + fields(id=id, args = json!(args).to_string()), + ret(Display), + err + )] pub async fn invoke( &self, id: &str, @@ -113,8 +121,6 @@ impl CallbackRegistry { McpError::ToolCall(format!("Callback with id \"{id}\" does not exist")) })?; - info!(callback_id =? id, "Handling callback"); - callback(args).await.map_err(|e| { McpError::ExecutionError(format!("Failed calling callback with id \"{id}\": {e}",)) }) diff --git a/crates/pctx_code_execution_runtime/src/mcp_registry.rs b/crates/pctx_code_execution_runtime/src/mcp_registry.rs index 1882163e..97547cc8 100644 --- a/crates/pctx_code_execution_runtime/src/mcp_registry.rs +++ b/crates/pctx_code_execution_runtime/src/mcp_registry.rs @@ -1,9 +1,10 @@ use crate::error::McpError; use pctx_config::server::ServerConfig; use rmcp::model::{CallToolRequestParam, JsonObject, RawContent}; +use serde_json::json; use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use tracing::info; +use tracing::{info, instrument}; /// Singleton registry for MCP server configurations #[derive(Clone)] @@ -87,6 +88,7 @@ impl Default for MCPRegistry { } /// Call an MCP tool on a registered server +#[instrument(name = "invoke_mcp_tool", skip(registry))] pub(crate) async fn call_mcp_tool( registry: &MCPRegistry, server_name: &str, @@ -100,9 +102,6 @@ pub(crate) async fn call_mcp_tool( )) })?; - info!( - server_name=?server_name,tool_name=?tool_name, "Handling MCP call"); - let client = mcp_cfg.connect().await?; let tool_result = client .call_tool(CallToolRequestParam { @@ -121,22 +120,23 @@ pub(crate) async fn call_mcp_tool( } // Prefer structuredContent if available, otherwise use content array - if let Some(structured) = tool_result.structured_content { - return Ok(structured); - } - - // Convert content to JSON value - // For simplicity, we'll extract text content and try to parse as JSON - if let Some(RawContent::Text(text_content)) = tool_result.content.first().map(|a| &**a) { + let has_structured = tool_result.structured_content.is_some(); + let val = if let Some(structured) = tool_result.structured_content { + // info!(structured_content = true, result =? &structured, "Tool result"); + structured + } else if let Some(RawContent::Text(text_content)) = tool_result.content.first().map(|a| &**a) { // Try to parse as JSON, fallback to string value serde_json::from_str(&text_content.text) .or_else(|_| Ok(serde_json::Value::String(text_content.text.clone()))) .map_err(|e: serde_json::Error| { McpError::ToolCall(format!("Failed to parse content: {e}")) - }) + })? } else { // Return the whole content array as JSON - serde_json::to_value(&tool_result.content) - .map_err(|e| McpError::ToolCall(format!("Failed to serialize content: {e}"))) - } + json!(tool_result.content) + }; + + info!(structured_content = has_structured, result =? &val, "Tool result"); + + Ok(val) } diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index e01e82b8..d7fbd212 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -82,9 +82,7 @@ async fn handle_socket( state: AppState, code_mode_session: Uuid, ) { - info!("New WebSocket connection with code_mode_session: {code_mode_session}"); - - info!("Verified code mode session {code_mode_session} exists, proceeding with WebSocket setup"); + info!(session_id =? code_mode_session, "New WebSocket connection"); // Split socket into sender and receiver let (sender, receiver) = socket.split(); @@ -96,7 +94,9 @@ async fn handle_socket( let session = WsSession::new(tx.clone(), code_mode_session); let ws_session = session.id; - info!( + debug!( + session_id =? code_mode_session, + ws_session =? ws_session, "Created session {ws_session} connected to code mode session {}", session.code_mode_session_id ); @@ -246,7 +246,8 @@ async fn handle_execute_code_request( let execution_span = tracing::info_span!( "execute_code_in_session", - code_mode_session_id = %code_mode_session_id + session_id =? code_mode_session_id, + code =? params.code, ); tokio::spawn(async move { @@ -259,12 +260,17 @@ async fn handle_execute_code_request( // create callback registry to execute callback requests over the same ws which // initiated the request - rt.block_on(async { + let res = rt.block_on(async { code_mode .execute(¶ms.code, Some(callback_registry)) .await .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }) + }); + if let Ok(output) = &res { + info!(result = json!(output).to_string(), "Execution completed"); + } + + res }) .await; diff --git a/pctx-py/tests/scripts/manual_code_mode.py b/pctx-py/tests/scripts/manual_code_mode.py index d8b24ede..ac1c5326 100755 --- a/pctx-py/tests/scripts/manual_code_mode.py +++ b/pctx-py/tests/scripts/manual_code_mode.py @@ -59,10 +59,21 @@ async def main(): return multval; } """ - print(code) output = await p.execute(code) pprint.pprint(output) + invalid_code = """ + async function run() { + let addval = await MyMath.add({a: "40", b: 2}); // invalid because `a` must be a number + + return addval; + } + """ + invalid_output = await p.execute(invalid_code) + pprint.pprint(invalid_output) + + print(p._session_id) + if __name__ == "__main__": asyncio.run(main()) From b3b172c2391a15ed0544f7bde63c5bf35d3da119 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Sun, 28 Dec 2025 13:47:42 -0700 Subject: [PATCH 05/12] instrument execute --- crates/pctx_code_mode/src/code_mode.rs | 5 +++-- crates/pctx_code_mode/src/model.rs | 5 +++++ crates/pctx_session_server/src/websocket/handler.rs | 12 +++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/pctx_code_mode/src/code_mode.rs b/crates/pctx_code_mode/src/code_mode.rs index 677044d5..61ccc74b 100644 --- a/crates/pctx_code_mode/src/code_mode.rs +++ b/crates/pctx_code_mode/src/code_mode.rs @@ -5,7 +5,7 @@ use pctx_code_execution_runtime::CallbackRegistry; use pctx_config::server::ServerConfig; use serde::{Deserialize, Serialize}; use serde_json::json; -use tracing::{debug, warn}; +use tracing::{debug, instrument, warn}; use crate::{ Error, Result, @@ -105,6 +105,7 @@ impl CodeMode { GetFunctionDetailsOutput { code, functions } } + #[instrument(skip(self, callback_registry), ret(Display), err)] pub async fn execute( &self, code: &str, @@ -156,7 +157,7 @@ impl CodeMode { namespaces = namespaces.join("\n\n"), )); - debug!("Executing code in sandbox"); + debug!(to_execute = %to_execute, "Executing code in sandbox"); let options = pctx_executor::ExecuteOptions::new() .with_allowed_hosts(self.allowed_hosts().into_iter().collect()) diff --git a/crates/pctx_code_mode/src/model.rs b/crates/pctx_code_mode/src/model.rs index 50e16e3c..852d6090 100644 --- a/crates/pctx_code_mode/src/model.rs +++ b/crates/pctx_code_mode/src/model.rs @@ -163,6 +163,11 @@ impl ExecuteOutput { ) } } +impl Display for ExecuteOutput { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", json!(&self)) + } +} // -------------- Callbacks -------------- diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index d7fbd212..7e05d75c 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -246,8 +246,7 @@ async fn handle_execute_code_request( let execution_span = tracing::info_span!( "execute_code_in_session", - session_id =? code_mode_session_id, - code =? params.code, + session_id =% code_mode_session_id, ); tokio::spawn(async move { @@ -260,17 +259,12 @@ async fn handle_execute_code_request( // create callback registry to execute callback requests over the same ws which // initiated the request - let res = rt.block_on(async { + rt.block_on(async { code_mode .execute(¶ms.code, Some(callback_registry)) .await .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }); - if let Ok(output) = &res { - info!(result = json!(output).to_string(), "Execution completed"); - } - - res + }) }) .await; From 84e2da211b132640546df2e0266be87a816e0ff6 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Tue, 30 Dec 2025 10:43:18 -0700 Subject: [PATCH 06/12] post execution hook --- crates/pctx_code_mode/src/code_mode.rs | 2 +- .../pctx_session_server/src/state/backend.rs | 13 +++- .../src/websocket/handler.rs | 68 ++++++++++++------- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/crates/pctx_code_mode/src/code_mode.rs b/crates/pctx_code_mode/src/code_mode.rs index 6ea193ee..5abf28d4 100644 --- a/crates/pctx_code_mode/src/code_mode.rs +++ b/crates/pctx_code_mode/src/code_mode.rs @@ -113,7 +113,7 @@ impl CodeMode { ) -> Result { let registry = callback_registry.unwrap_or_default(); // Format for logging only - let formatted_code = codegen::format::format_ts(&code); + let formatted_code = codegen::format::format_ts(code); debug!( code_from_llm = %code, diff --git a/crates/pctx_session_server/src/state/backend.rs b/crates/pctx_session_server/src/state/backend.rs index 0631c4af..89db3ddb 100644 --- a/crates/pctx_session_server/src/state/backend.rs +++ b/crates/pctx_session_server/src/state/backend.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{Context, Result}; use async_trait::async_trait; -use pctx_code_mode::CodeMode; +use pctx_code_mode::{CodeMode, model::ExecuteOutput}; use tokio::sync::RwLock; use uuid::Uuid; @@ -30,6 +30,17 @@ pub trait PctxSessionBackend: Clone + Send + Sync + 'static { /// Returns a full list of active `CodeMode` sessions in the backend. async fn list_sessions(&self) -> Result>; + + /// Hook called after every code mode execution websocket event + async fn post_execution( + &self, + _session_id: Uuid, + _execution_id: Uuid, + _code_mode: CodeMode, + _execution_res: Result, + ) -> Result<()> { + Ok(()) + } } /// Manages `CodeMode` sessions locally using thread-safe diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index 7e05d75c..c5a901a3 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -9,6 +9,7 @@ use crate::{ }, state::ws_manager::WsSession, }; +use anyhow::anyhow; use axum::{ extract::{ State, @@ -168,7 +169,7 @@ async fn handle_execute_code_request( req_id: RequestId, params: ExecuteCodeParams, ws_session: Uuid, - state: &AppState, + state: AppState, ) -> Result<(), String> { // Save the WebSocket session for later response let ws_session_lock = state @@ -201,8 +202,9 @@ async fn handle_execute_code_request( debug!("Found CodeMode session with ID: {code_mode_session_id}"); - let callback_registry = CallbackRegistry::default(); + let execution_id = Uuid::new_v4(); + let callback_registry = CallbackRegistry::default(); for callback_cfg in &code_mode.callbacks { let ws_session_lock_clone = ws_session_lock.clone(); let cfg = callback_cfg.clone(); @@ -246,10 +248,13 @@ async fn handle_execute_code_request( let execution_span = tracing::info_span!( "execute_code_in_session", - session_id =% code_mode_session_id, + session_id = %code_mode_session_id, + execution_id = %execution_id, ); tokio::spawn(async move { + let code_mode_clone = code_mode.clone(); + let output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { let _guard = execution_span.enter(); let rt = tokio::runtime::Builder::new_current_thread() @@ -260,7 +265,7 @@ async fn handle_execute_code_request( // create callback registry to execute callback requests over the same ws which // initiated the request rt.block_on(async { - code_mode + code_mode_clone .execute(¶ms.code, Some(callback_registry)) .await .map_err(|e| anyhow::anyhow!("Execution error: {e}")) @@ -268,28 +273,45 @@ async fn handle_execute_code_request( }) .await; - let msg = match output { - Ok(Ok(exec_output)) => { - WsJsonRpcMessage::response(PctxJsonRpcResponse::ExecuteCode(exec_output), req_id) - } - Ok(Err(e)) => WsJsonRpcMessage::error( - ErrorData { - code: ErrorCode::INTERNAL_ERROR, - message: format!("Execution failed: {e}").into(), - data: None, - }, - req_id, + let (msg, execution_res) = match output { + Ok(Ok(exec_output)) => ( + WsJsonRpcMessage::response( + PctxJsonRpcResponse::ExecuteCode(exec_output.clone()), + req_id, + ), + Ok(exec_output), ), - Err(e) => WsJsonRpcMessage::error( - ErrorData { - code: ErrorCode::INTERNAL_ERROR, - message: format!("Task join failed: {e}").into(), - data: None, - }, - req_id, + Ok(Err(e)) => ( + WsJsonRpcMessage::error( + ErrorData { + code: ErrorCode::INTERNAL_ERROR, + message: format!("Execution failed: {e}").into(), + data: None, + }, + req_id, + ), + Err(anyhow!(e)), + ), + Err(e) => ( + WsJsonRpcMessage::error( + ErrorData { + code: ErrorCode::INTERNAL_ERROR, + message: format!("Task join failed: {e}").into(), + data: None, + }, + req_id, + ), + Err(anyhow!(e)), ), }; + if let Err(e) = state + .backend + .post_execution(code_mode_session_id, execution_id, code_mode, execution_res) + .await + { + error!("Failed to post_execution hook: {e}"); + } if let Err(e) = sender.send(msg) { error!("Failed to send execute_code response: {e}"); } @@ -316,7 +338,7 @@ async fn handle_message( JsonRpcMessage::Request(req) => match req.request { PctxJsonRpcRequest::ExecuteCode { params } => { debug!("Executing code..."); - handle_execute_code_request(req.id, params, ws_session, state).await + handle_execute_code_request(req.id, params, ws_session, state.clone()).await } PctxJsonRpcRequest::ExecuteTool { .. } => { // the server is only responsible for servicing execute_code requests, execute_tool From 9e0cfab8e8f5b1ecd71ec801bb5c066afe3d3271 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Tue, 30 Dec 2025 11:04:00 -0700 Subject: [PATCH 07/12] serde --- crates/pctx_session_server/src/model.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/pctx_session_server/src/model.rs b/crates/pctx_session_server/src/model.rs index b2ffcd27..1e2d0791 100644 --- a/crates/pctx_session_server/src/model.rs +++ b/crates/pctx_session_server/src/model.rs @@ -74,7 +74,7 @@ pub enum ErrorCode { } /// Request to register tools -#[derive(Debug, Deserialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct RegisterToolsRequest { pub tools: Vec, } @@ -92,7 +92,7 @@ pub struct RegisterMcpServersRequest { } // TODO: de-dup with pctx_config -#[derive(Debug, Deserialize, Clone, ToSchema)] +#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] pub struct McpServerConfig { pub name: String, pub url: String, From 8acb95710ecbfe5d3cb636e1b1147b8f9b32b38b Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Tue, 30 Dec 2025 11:10:38 -0700 Subject: [PATCH 08/12] more derives --- crates/pctx_session_server/src/model.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/pctx_session_server/src/model.rs b/crates/pctx_session_server/src/model.rs index 1e2d0791..6b67de23 100644 --- a/crates/pctx_session_server/src/model.rs +++ b/crates/pctx_session_server/src/model.rs @@ -53,7 +53,7 @@ impl IntoResponse for ApiError { } /// Health check response -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct HealthResponse { pub status: String, pub version: String, @@ -74,25 +74,25 @@ pub enum ErrorCode { } /// Request to register tools -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct RegisterToolsRequest { pub tools: Vec, } /// Response to registering tools -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct RegisterToolsResponse { pub registered: usize, } /// Request to register MCP servers -#[derive(Debug, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct RegisterMcpServersRequest { pub servers: Vec, } // TODO: de-dup with pctx_config -#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct McpServerConfig { pub name: String, pub url: String, @@ -102,20 +102,20 @@ pub struct McpServerConfig { } /// Response after registering MCP servers -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct RegisterMcpServersResponse { pub registered: usize, pub failed: Vec, } /// Response after creating a new `CodeMode` session -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct CreateSessionResponse { #[schema(value_type = String)] pub session_id: Uuid, } /// Response after closing a `CodeMode` session -#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct CloseSessionResponse { pub success: bool, } From e70adfa36e9839d2388cd411f3231100bf21a1c0 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Tue, 6 Jan 2026 08:45:27 -0500 Subject: [PATCH 09/12] update post_execution hook --- crates/pctx_code_mode/src/code_mode.rs | 2 +- crates/pctx_session_server/src/state/backend.rs | 6 +++++- crates/pctx_session_server/src/websocket/handler.rs | 12 ++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/crates/pctx_code_mode/src/code_mode.rs b/crates/pctx_code_mode/src/code_mode.rs index 5abf28d4..5f6726b8 100644 --- a/crates/pctx_code_mode/src/code_mode.rs +++ b/crates/pctx_code_mode/src/code_mode.rs @@ -15,7 +15,7 @@ use crate::{ }, }; -#[derive(Clone, Default, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct CodeMode { // Codegen interfaces pub tool_sets: Vec, diff --git a/crates/pctx_session_server/src/state/backend.rs b/crates/pctx_session_server/src/state/backend.rs index 89db3ddb..e84de88c 100644 --- a/crates/pctx_session_server/src/state/backend.rs +++ b/crates/pctx_session_server/src/state/backend.rs @@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{Context, Result}; use async_trait::async_trait; -use pctx_code_mode::{CodeMode, model::ExecuteOutput}; +use pctx_code_mode::{ + CodeMode, + model::{ExecuteInput, ExecuteOutput}, +}; use tokio::sync::RwLock; use uuid::Uuid; @@ -37,6 +40,7 @@ pub trait PctxSessionBackend: Clone + Send + Sync + 'static { _session_id: Uuid, _execution_id: Uuid, _code_mode: CodeMode, + _execution_req: ExecuteInput, _execution_res: Result, ) -> Result<()> { Ok(()) diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index c5a901a3..11567cb7 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -23,6 +23,7 @@ use futures::{ stream::{SplitSink, SplitStream}, }; use pctx_code_execution_runtime::{CallbackFn, CallbackRegistry}; +use pctx_code_mode::model::ExecuteInput; use rmcp::{ ErrorData, model::{ErrorCode, JsonRpcMessage, RequestId}, @@ -254,6 +255,7 @@ async fn handle_execute_code_request( tokio::spawn(async move { let code_mode_clone = code_mode.clone(); + let code_clone = params.code.clone(); let output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { let _guard = execution_span.enter(); @@ -266,7 +268,7 @@ async fn handle_execute_code_request( // initiated the request rt.block_on(async { code_mode_clone - .execute(¶ms.code, Some(callback_registry)) + .execute(&code_clone, Some(callback_registry)) .await .map_err(|e| anyhow::anyhow!("Execution error: {e}")) }) @@ -307,7 +309,13 @@ async fn handle_execute_code_request( if let Err(e) = state .backend - .post_execution(code_mode_session_id, execution_id, code_mode, execution_res) + .post_execution( + code_mode_session_id, + execution_id, + code_mode, + ExecuteInput { code: params.code }, + execution_res, + ) .await { error!("Failed to post_execution hook: {e}"); From e7c82f3cfd2e6dbf746267c09ec223662abb0e56 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Wed, 7 Jan 2026 17:23:31 -0500 Subject: [PATCH 10/12] check len before register --- crates/pctx_executor/src/lib.rs | 7 ++++--- pctx-py/src/pctx_client/_client.py | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/pctx_executor/src/lib.rs b/crates/pctx_executor/src/lib.rs index de6dc95f..aa8b7c6e 100644 --- a/crates/pctx_executor/src/lib.rs +++ b/crates/pctx_executor/src/lib.rs @@ -131,15 +131,16 @@ pub async fn execute(code: &str, options: ExecuteOptions) -> Result 0: + await self._register_tools(configs) + if len(self._servers) > 0: + await self._register_servers(self._servers) async def disconnect(self): """Disconnect closes current code-mode session.""" From 87f70f7884b2a948e6ca677f7d9bd92b3722b282 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Wed, 7 Jan 2026 18:11:34 -0500 Subject: [PATCH 11/12] consistent instrument --- crates/pctx_code_execution_runtime/src/mcp_registry.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/pctx_code_execution_runtime/src/mcp_registry.rs b/crates/pctx_code_execution_runtime/src/mcp_registry.rs index 97547cc8..607d5480 100644 --- a/crates/pctx_code_execution_runtime/src/mcp_registry.rs +++ b/crates/pctx_code_execution_runtime/src/mcp_registry.rs @@ -88,7 +88,13 @@ impl Default for MCPRegistry { } /// Call an MCP tool on a registered server -#[instrument(name = "invoke_mcp_tool", skip(registry))] +#[instrument( + name = "invoke_mcp_tool", + skip_all, + fields(id=format!("{server_name}.{tool_name}"), args = json!(args).to_string()), + ret(Display), + err +)] pub(crate) async fn call_mcp_tool( registry: &MCPRegistry, server_name: &str, From bdcd266033b52541c6018d10bb26704008cda4c2 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Thu, 8 Jan 2026 09:43:56 -0500 Subject: [PATCH 12/12] unused --- crates/pctx_code_execution_runtime/src/mcp_registry.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/pctx_code_execution_runtime/src/mcp_registry.rs b/crates/pctx_code_execution_runtime/src/mcp_registry.rs index 607d5480..f51b637b 100644 --- a/crates/pctx_code_execution_runtime/src/mcp_registry.rs +++ b/crates/pctx_code_execution_runtime/src/mcp_registry.rs @@ -128,7 +128,6 @@ pub(crate) async fn call_mcp_tool( // Prefer structuredContent if available, otherwise use content array let has_structured = tool_result.structured_content.is_some(); let val = if let Some(structured) = tool_result.structured_content { - // info!(structured_content = true, result =? &structured, "Tool result"); structured } else if let Some(RawContent::Text(text_content)) = tool_result.content.first().map(|a| &**a) { // Try to parse as JSON, fallback to string value