From 9d072f3a387c80e7f5191a41e56cc81e54690a71 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Wed, 4 Feb 2026 16:42:53 -0500 Subject: [PATCH 1/3] cetralized mutex --- crates/pctx_executor/src/lib.rs | 25 +++- .../src/tests/concurrent_v8_stress.rs | 40 +++++ crates/pctx_executor/src/tests/mod.rs | 1 + crates/pctx_type_check_runtime/src/lib.rs | 46 +++--- pctx-py/tests/scripts/manual_code_mode.py | 141 ++++++++++-------- 5 files changed, 171 insertions(+), 82 deletions(-) create mode 100644 crates/pctx_executor/src/tests/concurrent_v8_stress.rs diff --git a/crates/pctx_executor/src/lib.rs b/crates/pctx_executor/src/lib.rs index a470b5f..d6b2138 100644 --- a/crates/pctx_executor/src/lib.rs +++ b/crates/pctx_executor/src/lib.rs @@ -3,13 +3,27 @@ use deno_core::ModuleCodeString; use deno_core::RuntimeOptions; use deno_core::anyhow; use deno_core::error::CoreError; +use futures::lock::Mutex; use pctx_code_execution_runtime::CallbackRegistry; -pub use pctx_type_check_runtime::{CheckResult, Diagnostic, is_relevant_error, type_check}; +pub use pctx_type_check_runtime::{CheckResult, Diagnostic, is_relevant_error}; +use pctx_type_check_runtime::{init_v8_platform, type_check}; use serde::{Deserialize, Serialize}; use std::rc::Rc; use thiserror::Error; use tracing::{debug, warn}; +/// Process-wide mutex to serialize all V8 isolate creation and usage. +/// +/// V8 isolates share platform-level state (code pages, thread pool, etc.) that is not +/// safe to access concurrently from multiple OS threads. All code that creates or uses +/// a `JsRuntime` must hold this lock for the runtime's entire lifetime. +/// +/// This mutex is acquired by `execute()` and held for both type checking and code execution. +static V8_MUTEX: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + init_v8_platform(); + Mutex::new(()) +}); + pub type Result = std::result::Result; #[derive(Clone, Default)] @@ -126,7 +140,14 @@ pub async fn execute(code: &str, options: ExecuteOptions) -> Result anyhow::Result { debug!("Starting code execution"); + // Note: V8_MUTEX is held by the caller (execute()) for the entire operation + // Transpile TypeScript to JavaScript let js_code = match pctx_deno_transpiler::transpile(code, None) { Ok(js) => { diff --git a/crates/pctx_executor/src/tests/concurrent_v8_stress.rs b/crates/pctx_executor/src/tests/concurrent_v8_stress.rs new file mode 100644 index 0000000..e1ba098 --- /dev/null +++ b/crates/pctx_executor/src/tests/concurrent_v8_stress.rs @@ -0,0 +1,40 @@ +//! Stress test for concurrent V8 isolate usage. +//! +//! Deliberately does NOT use #[serial] — the goal is to reproduce the +//! V8 vector-out-of-bounds crash that occurs when type_check and +//! execute_code create JsRuntimes on different OS threads simultaneously. + +use crate::{ExecuteOptions, execute}; + +#[test] +fn test_concurrent_execute_stress() { + // Mirror the production pattern: multiple OS threads each with their own + // single-threaded tokio runtime, calling execute() (type_check + execute_code) + // concurrently. This creates overlapping V8 isolates without serialization. + let handles: Vec<_> = (0..4) + .map(|i| { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + for j in 0..3 { + let code = + format!("const x{i}_{j}: number = {i} + {j}; export default x{i}_{j};"); + let result = execute(&code, ExecuteOptions::new()).await.unwrap(); + assert!( + result.success, + "iteration {i}_{j} failed: {:?}", + result.diagnostics + ); + } + }) + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } +} diff --git a/crates/pctx_executor/src/tests/mod.rs b/crates/pctx_executor/src/tests/mod.rs index 03a155b..9c7e34c 100644 --- a/crates/pctx_executor/src/tests/mod.rs +++ b/crates/pctx_executor/src/tests/mod.rs @@ -15,6 +15,7 @@ pub(crate) fn init_rustls_crypto() { } mod callback_usage; +mod concurrent_v8_stress; mod default_export_capture; mod diagnostic_filtering; mod mcp_client_usage; diff --git a/crates/pctx_type_check_runtime/src/lib.rs b/crates/pctx_type_check_runtime/src/lib.rs index 245dcbd..50d4a4d 100644 --- a/crates/pctx_type_check_runtime/src/lib.rs +++ b/crates/pctx_type_check_runtime/src/lib.rs @@ -71,7 +71,6 @@ pub mod ignored_codes; use deno_core::JsRuntime; use deno_core::RuntimeOptions; -use futures::lock::Mutex; use serde::{Deserialize, Serialize}; use std::rc::Rc; use thiserror::Error; @@ -133,12 +132,16 @@ deno_core::extension!( esm = [ dir "src", "type_check_runtime_generated.js" ], ); -// Global mutex to serialize type checking operations and prevent V8 race conditions -static TYPE_CHECK_MUTEX: std::sync::LazyLock> = std::sync::LazyLock::new(|| { - // Initialize V8 platform once - deno_core::JsRuntime::init_platform(None); - Mutex::new(()) -}); +/// Initialize the V8 platform. Must be called before any JsRuntime is created. +/// Safe to call multiple times - only the first call has effect. +static V8_INIT: std::sync::Once = std::sync::Once::new(); + +/// Ensure V8 platform is initialized. Called automatically by type_check. +pub fn init_v8_platform() { + V8_INIT.call_once(|| { + deno_core::JsRuntime::init_platform(None); + }); +} /// Type check TypeScript code using an isolated Deno runtime with TypeScript compiler /// @@ -175,6 +178,7 @@ static TYPE_CHECK_MUTEX: std::sync::LazyLock> = std::sync::LazyLock::n /// # } /// ``` pub async fn type_check(code: &str) -> Result { + println!("[type_check] starting syntax check"); // First do a quick syntax check with deno_ast let parse_result = deno_ast::parse_module(deno_ast::ParseParams { specifier: deno_ast::ModuleSpecifier::parse("file:///check.ts") @@ -200,17 +204,17 @@ pub async fn type_check(code: &str) -> Result { }); } - // Create an isolated runtime with the type check snapshot - // Serialize runtime creation to prevent V8 race conditions - let mut js_runtime = { - let _guard = TYPE_CHECK_MUTEX.lock().await; - JsRuntime::new(RuntimeOptions { - module_loader: Some(Rc::new(deno_core::FsModuleLoader)), - startup_snapshot: Some(TYPE_CHECK_SNAPSHOT), - extensions: vec![pctx_type_check_snapshot::init()], - ..Default::default() - }) - }; + println!("[type_check] syntax check passed, creating JS runtime"); + // Ensure V8 platform is initialized (safe to call multiple times) + init_v8_platform(); + + let mut js_runtime = JsRuntime::new(RuntimeOptions { + module_loader: Some(Rc::new(deno_core::FsModuleLoader)), + startup_snapshot: Some(TYPE_CHECK_SNAPSHOT), + extensions: vec![pctx_type_check_snapshot::init()], + ..Default::default() + }); + println!("[type_check] creating JS runtime passed"); // Call the type checking function from the runtime let code_json = @@ -225,10 +229,14 @@ pub async fn type_check(code: &str) -> Result { " ); + println!("[type_check] starting typecheck"); + let result = js_runtime .execute_script("", check_script) .map_err(|e| TypeCheckError::InternalError(e.to_string()))?; + println!("[type_check] finished typecheck"); + // Extract the result using v8 scope let check_result = { deno_core::scope!(scope, &mut js_runtime); @@ -237,6 +245,8 @@ pub async fn type_check(code: &str) -> Result { .map_err(|e| TypeCheckError::InternalError(e.to_string()))? }; + println!("[type_check] finished check_result"); + Ok(check_result) } diff --git a/pctx-py/tests/scripts/manual_code_mode.py b/pctx-py/tests/scripts/manual_code_mode.py index ed67142..edba6f7 100755 --- a/pctx-py/tests/scripts/manual_code_mode.py +++ b/pctx-py/tests/scripts/manual_code_mode.py @@ -1,102 +1,117 @@ import asyncio import pprint -from datetime import datetime -from os import getenv - -from pydantic import BaseModel +from time import sleep from pctx_client import Pctx, tool +# @tool +# def now_timestamp() -> float: +# """Returns current timestamp""" +# return datetime.now().timestamp() -@tool -def now_timestamp() -> float: - """Returns current timestamp""" - return datetime.now().timestamp() +# @tool +# def search_logs(query: str = "", level: str = "info", limit: int = 100) -> list[dict]: +# """Search application logs with optional filters""" +# return [ +# {"message": f"match for '{query}'", "level": level, "index": i} +# for i in range(min(limit, 3)) +# ] -@tool -def search_logs(query: str = "", level: str = "info", limit: int = 100) -> list[dict]: - """Search application logs with optional filters""" - return [ - {"message": f"match for '{query}'", "level": level, "index": i} - for i in range(min(limit, 3)) - ] +# @tool("add", namespace="my_math") +# def add(a: float, b: float) -> float: +# """adds two numbers""" +# return a + b -@tool("add", namespace="my_math") -def add(a: float, b: float) -> float: - """adds two numbers""" - return a + b +# @tool("subtract", namespace="my_math") +# def subtract(a: float, b: float) -> float: +# """subtracts b from a""" +# return a - b -@tool("subtract", namespace="my_math") -def subtract(a: float, b: float) -> float: - """subtracts b from a""" - return a - b +# class MultiplyOutput(BaseModel): +# message: str +# result: float -class MultiplyOutput(BaseModel): - message: str - result: float +# @tool("multiply", namespace="my_math") +# def multiply(a: float, b: float) -> MultiplyOutput: +# """multiplies a and b""" +# return MultiplyOutput(message=f"Show your work! {a} * {b} = {a * b}", result=a * b) -@tool("multiply", namespace="my_math") -def multiply(a: float, b: float) -> MultiplyOutput: - """multiplies a and b""" - return MultiplyOutput(message=f"Show your work! {a} * {b} = {a * b}", result=a * b) +BLAH = 0 + + +@tool +def get_weather(city: str) -> str: + """Get the current weather for a city.""" + return f"72°F and sunny in {city}" + + +@tool +def get_time(timezone: str) -> str: + """Get the current time in a given timezone.""" + global BLAH + BLAH += 1 + print(f"SLEEPING {BLAH - 1}!") + sleep(15) + return f"3:00 PM in {timezone}" async def main(): async with Pctx( # url="https://....", # api_key="pctx_xxxx", - tools=[add, subtract, multiply, now_timestamp, search_logs], - servers=[ - { - "name": "stripe", - "url": "https://mcp.stripe.com", - "auth": { - "type": "bearer", - "token": getenv("STRIPE_MCP_KEY"), - }, - } - ], + # tools=[add, subtract, multiply, now_timestamp, search_logs], + tools=[get_time, get_weather], + # servers=[ + # { + # "name": "stripe", + # "url": "https://mcp.stripe.com", + # "auth": { + # "type": "bearer", + # "token": getenv("STRIPE_MCP_KEY"), + # }, + # } + # ], ) as p: - print("+++++++++++ LIST +++++++++++\n") - print((await p.list_functions()).code) + # print("+++++++++++ LIST +++++++++++\n") + # print((await p.list_functions()).code) - print("\n\n+++++++++++ DETAILS +++++++++++\n") - print((await p.get_function_details(["MyMath.add", "Tools.nowTimestamp"])).code) + # print("\n\n+++++++++++ DETAILS +++++++++++\n") + # print((await p.get_function_details(["MyMath.add", "Tools.nowTimestamp"])).code) code = """ async function run() { - let addval = await MyMath.add({a: 40, b: 2}); - let subval = await MyMath.subtract({a: addval, b: 2}); - let multval = await MyMath.multiply({a: subval, b: 2}); - let now = await Tools.nowTimestamp(); - let customers = await Stripe.listCustomers({}); - let logs = await Tools.searchLogs(); - let logs2 = await Tools.searchLogs({}); - let logs3 = await Tools.searchLogs({ query: "custom query" }); + // Get weather for London + const weather = await Tools.getWeather({ city: "London" }); + // Get current time in London timezone + const time = await Tools.getTime({ timezone: "Europe/London" }); - return { multval, now }; + return { + weather, + time + }; } """ output = await p.execute(code) pprint.pprint(output) - invalid_code = """ -async function run() { - let addval = await MyMath.add({a: "40", b: 2}); // invalid because `a` must be a number - return addval; -} - """ - invalid_output = await p.execute(invalid_code) - pprint.pprint(invalid_output) +# invalid_code = """ +# async function run() { +# let addval = await MyMath.add({a: "40", b: 2}); // invalid because `a` must be a number + +# return addval; +# } +# """ +# invalid_output = await p.execute(invalid_code) +# pprint.pprint(invalid_output) - print(p._session_id) +# print(p._session_id) if __name__ == "__main__": From 9edb5d886522e14523782f8305bd10d9757cf98c Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Wed, 4 Feb 2026 17:08:27 -0500 Subject: [PATCH 2/3] fix cargo fmt --- crates/pctx_executor/src/lib.rs | 2 - crates/pctx_type_check_runtime/src/lib.rs | 9 -- pctx-py/tests/scripts/manual_code_mode.py | 141 ++++++++++------------ 3 files changed, 63 insertions(+), 89 deletions(-) diff --git a/crates/pctx_executor/src/lib.rs b/crates/pctx_executor/src/lib.rs index d6b2138..e46b53e 100644 --- a/crates/pctx_executor/src/lib.rs +++ b/crates/pctx_executor/src/lib.rs @@ -145,9 +145,7 @@ pub async fn execute(code: &str, options: ExecuteOptions) -> Result Result { - println!("[type_check] starting syntax check"); // First do a quick syntax check with deno_ast let parse_result = deno_ast::parse_module(deno_ast::ParseParams { specifier: deno_ast::ModuleSpecifier::parse("file:///check.ts") @@ -204,7 +203,6 @@ pub async fn type_check(code: &str) -> Result { }); } - println!("[type_check] syntax check passed, creating JS runtime"); // Ensure V8 platform is initialized (safe to call multiple times) init_v8_platform(); @@ -214,7 +212,6 @@ pub async fn type_check(code: &str) -> Result { extensions: vec![pctx_type_check_snapshot::init()], ..Default::default() }); - println!("[type_check] creating JS runtime passed"); // Call the type checking function from the runtime let code_json = @@ -229,14 +226,10 @@ pub async fn type_check(code: &str) -> Result { " ); - println!("[type_check] starting typecheck"); - let result = js_runtime .execute_script("", check_script) .map_err(|e| TypeCheckError::InternalError(e.to_string()))?; - println!("[type_check] finished typecheck"); - // Extract the result using v8 scope let check_result = { deno_core::scope!(scope, &mut js_runtime); @@ -245,8 +238,6 @@ pub async fn type_check(code: &str) -> Result { .map_err(|e| TypeCheckError::InternalError(e.to_string()))? }; - println!("[type_check] finished check_result"); - Ok(check_result) } diff --git a/pctx-py/tests/scripts/manual_code_mode.py b/pctx-py/tests/scripts/manual_code_mode.py index edba6f7..ed67142 100755 --- a/pctx-py/tests/scripts/manual_code_mode.py +++ b/pctx-py/tests/scripts/manual_code_mode.py @@ -1,117 +1,102 @@ import asyncio import pprint -from time import sleep +from datetime import datetime +from os import getenv -from pctx_client import Pctx, tool - -# @tool -# def now_timestamp() -> float: -# """Returns current timestamp""" -# return datetime.now().timestamp() +from pydantic import BaseModel +from pctx_client import Pctx, tool -# @tool -# def search_logs(query: str = "", level: str = "info", limit: int = 100) -> list[dict]: -# """Search application logs with optional filters""" -# return [ -# {"message": f"match for '{query}'", "level": level, "index": i} -# for i in range(min(limit, 3)) -# ] +@tool +def now_timestamp() -> float: + """Returns current timestamp""" + return datetime.now().timestamp() -# @tool("add", namespace="my_math") -# def add(a: float, b: float) -> float: -# """adds two numbers""" -# return a + b +@tool +def search_logs(query: str = "", level: str = "info", limit: int = 100) -> list[dict]: + """Search application logs with optional filters""" + return [ + {"message": f"match for '{query}'", "level": level, "index": i} + for i in range(min(limit, 3)) + ] -# @tool("subtract", namespace="my_math") -# def subtract(a: float, b: float) -> float: -# """subtracts b from a""" -# return a - b +@tool("add", namespace="my_math") +def add(a: float, b: float) -> float: + """adds two numbers""" + return a + b -# class MultiplyOutput(BaseModel): -# message: str -# result: float +@tool("subtract", namespace="my_math") +def subtract(a: float, b: float) -> float: + """subtracts b from a""" + return a - b -# @tool("multiply", namespace="my_math") -# def multiply(a: float, b: float) -> MultiplyOutput: -# """multiplies a and b""" -# return MultiplyOutput(message=f"Show your work! {a} * {b} = {a * b}", result=a * b) -BLAH = 0 +class MultiplyOutput(BaseModel): + message: str + result: float -@tool -def get_weather(city: str) -> str: - """Get the current weather for a city.""" - return f"72°F and sunny in {city}" - - -@tool -def get_time(timezone: str) -> str: - """Get the current time in a given timezone.""" - global BLAH - BLAH += 1 - print(f"SLEEPING {BLAH - 1}!") - sleep(15) - return f"3:00 PM in {timezone}" +@tool("multiply", namespace="my_math") +def multiply(a: float, b: float) -> MultiplyOutput: + """multiplies a and b""" + return MultiplyOutput(message=f"Show your work! {a} * {b} = {a * b}", result=a * b) async def main(): async with Pctx( # url="https://....", # api_key="pctx_xxxx", - # tools=[add, subtract, multiply, now_timestamp, search_logs], - tools=[get_time, get_weather], - # servers=[ - # { - # "name": "stripe", - # "url": "https://mcp.stripe.com", - # "auth": { - # "type": "bearer", - # "token": getenv("STRIPE_MCP_KEY"), - # }, - # } - # ], + tools=[add, subtract, multiply, now_timestamp, search_logs], + servers=[ + { + "name": "stripe", + "url": "https://mcp.stripe.com", + "auth": { + "type": "bearer", + "token": getenv("STRIPE_MCP_KEY"), + }, + } + ], ) as p: - # print("+++++++++++ LIST +++++++++++\n") - # print((await p.list_functions()).code) + print("+++++++++++ LIST +++++++++++\n") + print((await p.list_functions()).code) - # print("\n\n+++++++++++ DETAILS +++++++++++\n") - # print((await p.get_function_details(["MyMath.add", "Tools.nowTimestamp"])).code) + print("\n\n+++++++++++ DETAILS +++++++++++\n") + print((await p.get_function_details(["MyMath.add", "Tools.nowTimestamp"])).code) code = """ async function run() { - // Get weather for London - const weather = await Tools.getWeather({ city: "London" }); + let addval = await MyMath.add({a: 40, b: 2}); + let subval = await MyMath.subtract({a: addval, b: 2}); + let multval = await MyMath.multiply({a: subval, b: 2}); + let now = await Tools.nowTimestamp(); + let customers = await Stripe.listCustomers({}); + let logs = await Tools.searchLogs(); + let logs2 = await Tools.searchLogs({}); + let logs3 = await Tools.searchLogs({ query: "custom query" }); - // Get current time in London timezone - const time = await Tools.getTime({ timezone: "Europe/London" }); - return { - weather, - time - }; + return { multval, now }; } """ output = await p.execute(code) pprint.pprint(output) + invalid_code = """ +async function run() { + let addval = await MyMath.add({a: "40", b: 2}); // invalid because `a` must be a number -# invalid_code = """ -# async function run() { -# let addval = await MyMath.add({a: "40", b: 2}); // invalid because `a` must be a number - -# return addval; -# } -# """ -# invalid_output = await p.execute(invalid_code) -# pprint.pprint(invalid_output) + return addval; +} + """ + invalid_output = await p.execute(invalid_code) + pprint.pprint(invalid_output) -# print(p._session_id) + print(p._session_id) if __name__ == "__main__": From 8144580d5384a92317c6a7dd76836fb65974d788 Mon Sep 17 00:00:00 2001 From: Elias Posen Date: Wed, 4 Feb 2026 17:13:19 -0500 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73d9c58..e976267 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- JS runtime race condition by moving the V8 mutex to be held for the entire typecheck/execute process. This previously caused a panic: `../../../../third_party/libc++/src/include/__vector/vector.h:416: libc++ Hardening assertion __n < size() failed: vector[] index out of bounds` + ## [v0.4.3] - 2026-01-27 ### Added