From 4582e5fed659d7a4d82077437e66d15801e66630 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Mar 2026 08:37:33 +0000 Subject: [PATCH] fix(python): prevent shell injection in BashkitBackend Use shlex.quote() for all user-supplied values interpolated into shell commands in deepagents.py: read(), write(), edit(), ls_info(), glob_info(), grep_raw(), and download_files(). Add test_shell_injection.py with source-level checks ensuring no unquoted interpolation patterns exist, plus shlex.quote behavior validation tests. Closes #411 https://claude.ai/code/session_01WZjYqxm5xMPAEe7FSHJkDy --- crates/bashkit-python/bashkit/deepagents.py | 17 +-- .../tests/test_shell_injection.py | 104 ++++++++++++++++++ 2 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 crates/bashkit-python/tests/test_shell_injection.py diff --git a/crates/bashkit-python/bashkit/deepagents.py b/crates/bashkit-python/bashkit/deepagents.py index 327029fc..211da4cb 100644 --- a/crates/bashkit-python/bashkit/deepagents.py +++ b/crates/bashkit-python/bashkit/deepagents.py @@ -13,6 +13,7 @@ from __future__ import annotations +import shlex import uuid from datetime import datetime, timezone from typing import TYPE_CHECKING @@ -184,7 +185,7 @@ async def aexecute(self, command: str) -> ExecuteResponse: # === File Operations === def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str: - result = self._bash.execute_sync(f"cat {file_path}") + result = self._bash.execute_sync(f"cat {shlex.quote(file_path)}") if result.exit_code != 0: return f"Error: {result.stderr or 'File not found'}" lines = result.stdout.splitlines() @@ -195,7 +196,7 @@ async def aread(self, file_path: str, offset: int = 0, limit: int = 2000) -> str return self.read(file_path, offset, limit) def write(self, file_path: str, content: str) -> WriteResult: - cmd = f"cat > {file_path} << 'BASHKIT_EOF'\n{content}\nBASHKIT_EOF" + cmd = f"cat > {shlex.quote(file_path)} << 'BASHKIT_EOF'\n{content}\nBASHKIT_EOF" result = self._bash.execute_sync(cmd) return WriteResult(error=result.stderr if result.exit_code != 0 else None, path=file_path) @@ -203,7 +204,7 @@ async def awrite(self, file_path: str, content: str) -> WriteResult: return self.write(file_path, content) def edit(self, file_path: str, old_string: str, new_string: str, replace_all: bool = False) -> EditResult: - result = self._bash.execute_sync(f"cat {file_path}") + result = self._bash.execute_sync(f"cat {shlex.quote(file_path)}") if result.exit_code != 0: return EditResult(error=f"File not found: {file_path}") content = result.stdout @@ -227,7 +228,7 @@ async def aedit( # === File Discovery === def ls_info(self, path: str) -> list[FileInfo]: - result = self._bash.execute_sync(f"ls -la {path}") + result = self._bash.execute_sync(f"ls -la {shlex.quote(path)}") if result.exit_code != 0: return [] files = [] @@ -255,7 +256,7 @@ async def als_info(self, path: str) -> list[FileInfo]: def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]: name_pattern = pattern.replace("**/", "").replace("**", "*") if "**" in pattern else pattern - result = self._bash.execute_sync(f"find {path} -name '{name_pattern}' -type f") + result = self._bash.execute_sync(f"find {shlex.quote(path)} -name {shlex.quote(name_pattern)} -type f") if result.exit_code != 0: return [] return [ @@ -275,7 +276,9 @@ async def aglob_info(self, pattern: str, path: str = "/") -> list[FileInfo]: return self.glob_info(pattern, path) def grep_raw(self, pattern: str, path: str | None = None, glob: str | None = None) -> list[GrepMatch] | str: - cmd = f"grep -rn '{pattern}' {path}" if path else f"grep -rn '{pattern}' /home" + quoted_pattern = shlex.quote(pattern) + search_path = shlex.quote(path) if path else "/home" + cmd = f"grep -rn {quoted_pattern} {search_path}" result = self._bash.execute_sync(cmd) matches = [] for line in result.stdout.splitlines(): @@ -299,7 +302,7 @@ async def agrep_raw( def download_files(self, paths: list[str]) -> list[FileDownloadResponse]: responses = [] for p in paths: - result = self._bash.execute_sync(f"cat {p}") + result = self._bash.execute_sync(f"cat {shlex.quote(p)}") if result.exit_code == 0: responses.append(FileDownloadResponse(path=p, content=result.stdout.encode(), error=None)) else: diff --git a/crates/bashkit-python/tests/test_shell_injection.py b/crates/bashkit-python/tests/test_shell_injection.py new file mode 100644 index 00000000..98590529 --- /dev/null +++ b/crates/bashkit-python/tests/test_shell_injection.py @@ -0,0 +1,104 @@ +"""Tests for shell injection prevention in BashkitBackend (deepagents.py). + +Verifies that user-supplied paths and patterns are properly quoted +with shlex.quote() to prevent command injection via f-string interpolation. + +Ref: GitHub issue #411 +""" + +import shlex +from pathlib import Path + +# Read deepagents.py source directly (BashkitBackend only exists when +# deepagents is installed, so we inspect source text instead). +_DEEPAGENTS_SRC = (Path(__file__).resolve().parent.parent / "bashkit" / "deepagents.py").read_text() + + +# -- Module-level checks ----------------------------------------------------- + + +def test_shlex_imported_in_deepagents(): + """deepagents.py must import shlex for shell argument quoting.""" + assert "import shlex" in _DEEPAGENTS_SRC, "deepagents.py must import shlex" + + +def test_no_unquoted_cat_interpolation(): + """No raw f'cat {var}' patterns without shlex.quote.""" + # After fix, all cat uses should go through shlex.quote + for line in _DEEPAGENTS_SRC.splitlines(): + stripped = line.strip() + if stripped.startswith("#"): + continue + if 'f"cat {' in stripped or "f'cat {" in stripped: + if "shlex.quote" not in stripped: + assert False, f"Unquoted cat interpolation found: {stripped}" + + +def test_no_unquoted_ls_interpolation(): + """No raw f'ls -la {var}' patterns without shlex.quote.""" + for line in _DEEPAGENTS_SRC.splitlines(): + stripped = line.strip() + if stripped.startswith("#"): + continue + if 'f"ls ' in stripped and "{" in stripped and "shlex.quote" not in stripped: + assert False, f"Unquoted ls interpolation found: {stripped}" + + +def test_no_unquoted_find_interpolation(): + """No raw f'find {var}' patterns without shlex.quote.""" + for line in _DEEPAGENTS_SRC.splitlines(): + stripped = line.strip() + if stripped.startswith("#"): + continue + if 'f"find {' in stripped or "f'find {" in stripped: + if "shlex.quote" not in stripped: + assert False, f"Unquoted find interpolation found: {stripped}" + + +def test_no_unquoted_grep_interpolation(): + """grep_raw must use shlex.quote for pattern and path.""" + # Extract grep_raw method body and verify shlex.quote is used + in_grep_raw = False + grep_raw_lines = [] + for line in _DEEPAGENTS_SRC.splitlines(): + if "def grep_raw(" in line: + in_grep_raw = True + elif in_grep_raw and (line.strip().startswith("def ") or line.strip().startswith("async def ")): + break + if in_grep_raw: + grep_raw_lines.append(line) + grep_raw_body = "\n".join(grep_raw_lines) + assert "shlex.quote" in grep_raw_body, "grep_raw must use shlex.quote for pattern/path" + + +def test_shlex_quote_used_for_file_paths(): + """shlex.quote must appear in methods that interpolate file paths.""" + assert _DEEPAGENTS_SRC.count("shlex.quote") >= 7, ( + "Expected at least 7 uses of shlex.quote (read, write, edit, ls_info, glob_info, grep_raw, download_files)" + ) + + +# -- shlex.quote behavior validation ----------------------------------------- + + +def test_shlex_quote_prevents_semicolon_injection(): + """shlex.quote must neutralize semicolon-based injection.""" + malicious = "/dev/null; echo pwned" + quoted = shlex.quote(malicious) + # Quoted string wraps in single quotes, preventing shell interpretation + assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input" + assert ";" in quoted # semicolon is inside quotes, not a command separator + + +def test_shlex_quote_prevents_backtick_injection(): + """shlex.quote must neutralize backtick-based injection.""" + malicious = "/tmp/`rm -rf /`/file" + quoted = shlex.quote(malicious) + assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input" + + +def test_shlex_quote_prevents_dollar_expansion(): + """shlex.quote must neutralize $() command substitution.""" + malicious = "/tmp/$(cat /etc/passwd)/file" + quoted = shlex.quote(malicious) + assert quoted.startswith("'"), "shlex.quote must single-quote dangerous input"