Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions crates/bashkit-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ impl ExecResult {
#[allow(dead_code)]
pub struct PyBash {
inner: Arc<Mutex<Bash>>,
/// Shared tokio runtime — reused across all sync calls to avoid
/// per-call OS thread/fd exhaustion (issue #414).
rt: tokio::runtime::Runtime,
username: Option<String>,
hostname: Option<String>,
max_commands: Option<u64>,
Expand Down Expand Up @@ -201,8 +204,14 @@ impl PyBash {

let bash = builder.build();

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {e}")))?;

Ok(Self {
inner: Arc::new(Mutex::new(bash)),
rt,
username,
hostname,
max_commands,
Expand Down Expand Up @@ -236,11 +245,9 @@ impl PyBash {
/// Releases GIL before blocking on tokio to prevent deadlock with callbacks.
fn execute_sync(&self, py: Python<'_>, commands: String) -> PyResult<ExecResult> {
let inner = self.inner.clone();
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {}", e)))?;

py.detach(|| {
rt.block_on(async move {
self.rt.block_on(async move {
let mut bash = inner.lock().await;
match bash.exec(&commands).await {
Ok(result) => Ok(ExecResult {
Expand All @@ -264,11 +271,9 @@ impl PyBash {
/// Releases GIL before blocking on tokio to prevent deadlock.
fn reset(&self, py: Python<'_>) -> PyResult<()> {
let inner = self.inner.clone();
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {}", e)))?;

py.detach(|| {
rt.block_on(async move {
self.rt.block_on(async move {
let mut bash = inner.lock().await;
let builder = Bash::builder();
*bash = builder.build();
Expand Down Expand Up @@ -318,6 +323,9 @@ impl PyBash {
#[allow(dead_code)]
pub struct BashTool {
inner: Arc<Mutex<Bash>>,
/// Shared tokio runtime — reused across all sync calls to avoid
/// per-call OS thread/fd exhaustion (issue #414).
rt: tokio::runtime::Runtime,
username: Option<String>,
hostname: Option<String>,
max_commands: Option<u64>,
Expand Down Expand Up @@ -354,8 +362,14 @@ impl BashTool {

let bash = builder.build();

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {e}")))?;

Ok(Self {
inner: Arc::new(Mutex::new(bash)),
rt,
username,
hostname,
max_commands,
Expand Down Expand Up @@ -387,11 +401,9 @@ impl BashTool {
/// Releases GIL before blocking on tokio to prevent deadlock with callbacks.
fn execute_sync(&self, py: Python<'_>, commands: String) -> PyResult<ExecResult> {
let inner = self.inner.clone();
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {}", e)))?;

py.detach(|| {
rt.block_on(async move {
self.rt.block_on(async move {
let mut bash = inner.lock().await;
match bash.exec(&commands).await {
Ok(result) => Ok(ExecResult {
Expand All @@ -414,11 +426,9 @@ impl BashTool {
/// Releases GIL before blocking on tokio to prevent deadlock.
fn reset(&self, py: Python<'_>) -> PyResult<()> {
let inner = self.inner.clone();
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {}", e)))?;

py.detach(|| {
rt.block_on(async move {
self.rt.block_on(async move {
let mut bash = inner.lock().await;
let builder = Bash::builder();
*bash = builder.build();
Expand Down Expand Up @@ -521,6 +531,9 @@ pub struct ScriptedTool {
short_desc: Option<String>,
tools: Vec<PyToolEntry>,
env_vars: Vec<(String, String)>,
/// Shared tokio runtime — reused across all sync calls to avoid
/// per-call OS thread/fd exhaustion (issue #414).
rt: tokio::runtime::Runtime,
max_commands: Option<u64>,
max_loop_iterations: Option<u64>,
}
Expand Down Expand Up @@ -594,15 +607,21 @@ impl ScriptedTool {
short_description: Option<String>,
max_commands: Option<u64>,
max_loop_iterations: Option<u64>,
) -> Self {
Self {
) -> PyResult<Self> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {e}")))?;

Ok(Self {
name,
short_desc: short_description,
tools: Vec::new(),
env_vars: Vec::new(),
rt,
max_commands,
max_loop_iterations,
}
})
}

/// Register a tool command.
Expand Down Expand Up @@ -667,11 +686,9 @@ impl ScriptedTool {
/// Releases GIL before blocking on tokio to prevent deadlock with callbacks.
fn execute_sync(&self, py: Python<'_>, commands: String) -> PyResult<ExecResult> {
let mut tool = self.build_rust_tool();
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create runtime: {}", e)))?;

let resp = py.detach(|| {
rt.block_on(async move {
self.rt.block_on(async move {
tool.execute(ToolRequest {
commands,
timeout_ms: None,
Expand Down
44 changes: 44 additions & 0 deletions crates/bashkit-python/tests/test_bashkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,47 @@ def run_in_thread(idx):
assert not t1.is_alive(), "Thread 1 deadlocked (GIL not released)"
assert errors[0] is None, f"Thread 0 error: {errors[0]}"
assert errors[1] is None, f"Thread 1 error: {errors[1]}"


# ===========================================================================
# Runtime reuse (issue #414)
# ===========================================================================


def test_bash_rapid_sync_calls_no_resource_exhaustion():
"""Rapid execute_sync calls reuse a single runtime (no thread/fd leak)."""
bash = Bash()
for i in range(200):
r = bash.execute_sync(f"echo {i}")
assert r.exit_code == 0
assert r.stdout.strip() == str(i)


def test_bashtool_rapid_sync_calls_no_resource_exhaustion():
"""Rapid execute_sync calls reuse a single runtime (no thread/fd leak)."""
tool = BashTool()
for i in range(200):
r = tool.execute_sync(f"echo {i}")
assert r.exit_code == 0
assert r.stdout.strip() == str(i)


def test_bashtool_rapid_reset_no_resource_exhaustion():
"""Rapid reset calls reuse a single runtime (no thread/fd leak)."""
tool = BashTool()
for _ in range(200):
tool.reset()
# After many resets, tool still works
r = tool.execute_sync("echo ok")
assert r.exit_code == 0
assert r.stdout.strip() == "ok"


def test_scripted_tool_rapid_sync_calls_no_resource_exhaustion():
"""Rapid execute_sync calls on ScriptedTool reuse a single runtime."""
tool = ScriptedTool("api")
tool.add_tool("ping", "Ping", callback=lambda p, s=None: "pong\n")
for i in range(200):
r = tool.execute_sync("ping")
assert r.exit_code == 0
assert r.stdout.strip() == "pong"
Loading