Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions crates/bashkit-python/bashkit/deepagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from __future__ import annotations

import shlex
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -184,7 +185,7 @@ async def aexecute(self, command: str) -> ExecuteResponse:
# === File Operations ===

def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str:
result = self._bash.execute_sync(f"cat {file_path}")
result = self._bash.execute_sync(f"cat {shlex.quote(file_path)}")
if result.exit_code != 0:
return f"Error: {result.stderr or 'File not found'}"
lines = result.stdout.splitlines()
Expand All @@ -195,15 +196,15 @@ async def aread(self, file_path: str, offset: int = 0, limit: int = 2000) -> str
return self.read(file_path, offset, limit)

def write(self, file_path: str, content: str) -> WriteResult:
cmd = f"cat > {file_path} << 'BASHKIT_EOF'\n{content}\nBASHKIT_EOF"
cmd = f"cat > {shlex.quote(file_path)} << 'BASHKIT_EOF'\n{content}\nBASHKIT_EOF"
result = self._bash.execute_sync(cmd)
return WriteResult(error=result.stderr if result.exit_code != 0 else None, path=file_path)

async def awrite(self, file_path: str, content: str) -> WriteResult:
return self.write(file_path, content)

def edit(self, file_path: str, old_string: str, new_string: str, replace_all: bool = False) -> EditResult:
result = self._bash.execute_sync(f"cat {file_path}")
result = self._bash.execute_sync(f"cat {shlex.quote(file_path)}")
if result.exit_code != 0:
return EditResult(error=f"File not found: {file_path}")
content = result.stdout
Expand All @@ -227,7 +228,7 @@ async def aedit(
# === File Discovery ===

def ls_info(self, path: str) -> list[FileInfo]:
result = self._bash.execute_sync(f"ls -la {path}")
result = self._bash.execute_sync(f"ls -la {shlex.quote(path)}")
if result.exit_code != 0:
return []
files = []
Expand Down Expand Up @@ -255,7 +256,7 @@ async def als_info(self, path: str) -> list[FileInfo]:

def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:
name_pattern = pattern.replace("**/", "").replace("**", "*") if "**" in pattern else pattern
result = self._bash.execute_sync(f"find {path} -name '{name_pattern}' -type f")
result = self._bash.execute_sync(f"find {shlex.quote(path)} -name {shlex.quote(name_pattern)} -type f")
if result.exit_code != 0:
return []
return [
Expand All @@ -275,7 +276,9 @@ async def aglob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:
return self.glob_info(pattern, path)

def grep_raw(self, pattern: str, path: str | None = None, glob: str | None = None) -> list[GrepMatch] | str:
cmd = f"grep -rn '{pattern}' {path}" if path else f"grep -rn '{pattern}' /home"
quoted_pattern = shlex.quote(pattern)
search_path = shlex.quote(path) if path else "/home"
cmd = f"grep -rn {quoted_pattern} {search_path}"
result = self._bash.execute_sync(cmd)
matches = []
for line in result.stdout.splitlines():
Expand All @@ -299,7 +302,7 @@ async def agrep_raw(
def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
responses = []
for p in paths:
result = self._bash.execute_sync(f"cat {p}")
result = self._bash.execute_sync(f"cat {shlex.quote(p)}")
if result.exit_code == 0:
responses.append(FileDownloadResponse(path=p, content=result.stdout.encode(), error=None))
else:
Expand Down
104 changes: 104 additions & 0 deletions crates/bashkit-python/tests/test_shell_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Tests for shell injection prevention in BashkitBackend (deepagents.py).

Verifies that user-supplied paths and patterns are properly quoted
with shlex.quote() to prevent command injection via f-string interpolation.

Ref: GitHub issue #411
"""

import shlex
from pathlib import Path

# Read deepagents.py source directly (BashkitBackend only exists when
# deepagents is installed, so we inspect source text instead).
_DEEPAGENTS_SRC = (Path(__file__).resolve().parent.parent / "bashkit" / "deepagents.py").read_text()


# -- Module-level checks -----------------------------------------------------


def test_shlex_imported_in_deepagents():
"""deepagents.py must import shlex for shell argument quoting."""
assert "import shlex" in _DEEPAGENTS_SRC, "deepagents.py must import shlex"


def test_no_unquoted_cat_interpolation():
"""No raw f'cat {var}' patterns without shlex.quote."""
# After fix, all cat uses should go through shlex.quote
for line in _DEEPAGENTS_SRC.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
continue
if 'f"cat {' in stripped or "f'cat {" in stripped:
if "shlex.quote" not in stripped:
assert False, f"Unquoted cat interpolation found: {stripped}"


def test_no_unquoted_ls_interpolation():
"""No raw f'ls -la {var}' patterns without shlex.quote."""
for line in _DEEPAGENTS_SRC.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
continue
if 'f"ls ' in stripped and "{" in stripped and "shlex.quote" not in stripped:
assert False, f"Unquoted ls interpolation found: {stripped}"


def test_no_unquoted_find_interpolation():
"""No raw f'find {var}' patterns without shlex.quote."""
for line in _DEEPAGENTS_SRC.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
continue
if 'f"find {' in stripped or "f'find {" in stripped:
if "shlex.quote" not in stripped:
assert False, f"Unquoted find interpolation found: {stripped}"


def test_no_unquoted_grep_interpolation():
"""grep_raw must use shlex.quote for pattern and path."""
# Extract grep_raw method body and verify shlex.quote is used
in_grep_raw = False
grep_raw_lines = []
for line in _DEEPAGENTS_SRC.splitlines():
if "def grep_raw(" in line:
in_grep_raw = True
elif in_grep_raw and (line.strip().startswith("def ") or line.strip().startswith("async def ")):
break
if in_grep_raw:
grep_raw_lines.append(line)
grep_raw_body = "\n".join(grep_raw_lines)
assert "shlex.quote" in grep_raw_body, "grep_raw must use shlex.quote for pattern/path"


def test_shlex_quote_used_for_file_paths():
"""shlex.quote must appear in methods that interpolate file paths."""
assert _DEEPAGENTS_SRC.count("shlex.quote") >= 7, (
"Expected at least 7 uses of shlex.quote (read, write, edit, ls_info, glob_info, grep_raw, download_files)"
)


# -- shlex.quote behavior validation -----------------------------------------


def test_shlex_quote_prevents_semicolon_injection():
"""shlex.quote must neutralize semicolon-based injection."""
malicious = "/dev/null; echo pwned"
quoted = shlex.quote(malicious)
# Quoted string wraps in single quotes, preventing shell interpretation
assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input"
assert ";" in quoted # semicolon is inside quotes, not a command separator


def test_shlex_quote_prevents_backtick_injection():
"""shlex.quote must neutralize backtick-based injection."""
malicious = "/tmp/`rm -rf /`/file"
quoted = shlex.quote(malicious)
assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input"


def test_shlex_quote_prevents_dollar_expansion():
"""shlex.quote must neutralize $() command substitution."""
malicious = "/tmp/$(cat /etc/passwd)/file"
quoted = shlex.quote(malicious)
assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input"
Loading