feat(ssh): backend SSH connector (safe defaults)#79
Conversation
Summary of ChangesHello @heidi-dang, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new SSH connector to the Heidi CLI backend, providing a secure and audited way to interact with remote SSH targets. The connector is designed with strong security defaults, such as being disabled by default, binding only to localhost, and requiring an explicit allowlist for target hosts. It also includes robust audit logging with secret redaction, ensuring all operations are recorded safely. This feature significantly enhances the CLI's capabilities for secure remote execution. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a new SSH connector, which is a significant feature. While it incorporates good security practices like being disabled by default, using an allowlist, and having audit logs, it contains critical security vulnerabilities. Specifically, there's a Remote Command Injection vulnerability due to the use of shell=True with user-controlled input in the exec_command function, even in its mock implementation, which could be enabled in production. Additionally, the target allowlist logic is too permissive, and there's a lack of proper session ownership checks, which could lead to unauthorized access in a multi-user environment. There are also a few bugs and areas for improvement related to request handling and configuration reporting. These critical issues must be addressed to ensure the connector's security and robustness before merging.
| result = subprocess.run( | ||
| command, | ||
| shell=True, |
There was a problem hiding this comment.
The exec_command function uses subprocess.run with shell=True and user-provided input (command), which is a critical Remote Command Injection vulnerability. Even though it's currently a mock implementation, this code is executable and can be enabled in production, allowing an attacker with API access to execute arbitrary commands on the host system. To fix this, set shell=False and safely parse the command string into a list using shlex.split().
| def is_target_allowed(target: str) -> bool: | ||
| """Check if target is in allowlist. | ||
|
|
||
| Args: | ||
| target: Target string like "host:port" or "host" | ||
|
|
||
| Returns: | ||
| True if target is allowed, False otherwise | ||
| """ | ||
| if not SSH_TARGET_ALLOWLIST: | ||
| return False # Deny by default if no allowlist | ||
|
|
||
| # Normalize target | ||
| if ":" not in target: | ||
| target = f"{target}:22" | ||
|
|
||
| # Check exact match | ||
| if target in SSH_TARGET_ALLOWLIST: | ||
| return True | ||
|
|
||
| # Check host-only match (port 22 default) | ||
| host = target.split(":")[0] | ||
| if host in SSH_TARGET_ALLOWLIST: | ||
| return True | ||
|
|
||
| return False |
There was a problem hiding this comment.
The current implementation of is_target_allowed has a security vulnerability. It allows connections to any port on a host if the host name alone is in the allowlist. For example, if HEIDI_SSH_TARGETS is set to "localhost", a user can connect to localhost:2222 or any other port, which is likely not the intended behavior. The logic should be stricter to only allow specified host:port combinations or hosts with the default port 22.
I suggest refactoring the function to be more explicit about what is allowed. Also, please add a test case to cover this scenario, for example, checking that is_target_allowed("localhost:2222") returns False when SSH_TARGET_ALLOWLIST is {"localhost"}.
def is_target_allowed(target: str) -> bool:
"""Check if target is in allowlist.
Args:
target: Target string like "host:port" or "host"
Returns:
True if target is allowed, False otherwise
"""
if not SSH_TARGET_ALLOWLIST:
return False # Deny by default if no allowlist
# Check for an exact match, which could be "host" or "host:port".
if target in SSH_TARGET_ALLOWLIST:
return True
# If the user provided a target without a port, check if the version
# with the default port 22 is in the allowlist.
if ":" not in target:
if f"{target}:22" in SSH_TARGET_ALLOWLIST:
return True
return False| target = body.target | ||
| if body.port and body.port != 22: | ||
| target = f"{body.target}:{body.port}" |
There was a problem hiding this comment.
The logic for constructing the target string is buggy. If a user provides a target with a port in body.target (e.g., "localhost:1234") and also provides a body.port (e.g., 5678), the resulting target string will be invalid ("localhost:1234:5678").
You should handle these cases more robustly, for example by raising an error if both are provided, or by giving one precedence over the other.
| target = body.target | |
| if body.port and body.port != 22: | |
| target = f"{body.target}:{body.port}" | |
| target = body.target | |
| if ":" in target and body.port is not None and body.port != 22: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Cannot specify a port in the target string and a different one in the 'port' field.", | |
| ) | |
| if ":" not in target and body.port is not None: | |
| target = f"{target}:{body.port}" |
| ) | ||
|
|
||
|
|
||
| @router.post("/sessions/{session_id}/exec", response_model=ExecResponse) |
There was a problem hiding this comment.
The exec_ssh_command endpoint does not verify that the authenticated user is the owner of the session they are accessing. The verify_auth dependency returns a generic "authenticated" string for any user with a valid API key, and it does not integrate with the request.state.user set by the AuthMiddleware for OAuth-authenticated users. This allows any authenticated user to interact with SSH sessions created by other users if they can obtain the session_id (which is a UUID, but still represents a broken access control).
|
|
||
|
|
||
| @router.delete("/sessions/{session_id}", status_code=204) | ||
| async def close_ssh_session( |
| def get_audit_logger(log_path: Path) -> SSHAuditLogger: | ||
| """Get or create the global audit logger instance. | ||
|
|
||
| Args: | ||
| log_path: Path to audit log file | ||
|
|
||
| Returns: | ||
| SSHAuditLogger instance | ||
| """ | ||
| global _audit_logger | ||
| if _audit_logger is None: | ||
| _audit_logger = SSHAuditLogger(log_path) | ||
| return _audit_logger |
There was a problem hiding this comment.
The get_audit_logger function is implemented as a singleton, but its signature get_audit_logger(log_path: Path) is misleading. If the function is called with different paths, only the path from the first call will be used to initialize the logger.
To make this less error-prone, I recommend refactoring it to not take any arguments and instead fetch the log path directly from the configuration. This will make its behavior as a singleton clearer.
You will then need to update the calls to this function in src/heidi_cli/connectors/ssh/routes.py to get_audit_logger().
| def get_audit_logger(log_path: Path) -> SSHAuditLogger: | |
| """Get or create the global audit logger instance. | |
| Args: | |
| log_path: Path to audit log file | |
| Returns: | |
| SSHAuditLogger instance | |
| """ | |
| global _audit_logger | |
| if _audit_logger is None: | |
| _audit_logger = SSHAuditLogger(log_path) | |
| return _audit_logger | |
| def get_audit_logger() -> SSHAuditLogger: | |
| """Get or create the global audit logger instance. | |
| Returns: | |
| SSHAuditLogger instance | |
| """ | |
| from .config import SSH_AUDIT_LOG_PATH | |
| global _audit_logger | |
| if _audit_logger is None: | |
| _audit_logger = SSHAuditLogger(SSH_AUDIT_LOG_PATH) | |
| return _audit_logger |
| return { | ||
| "enabled": is_ssh_enabled(), | ||
| "bind_address": os.getenv("HEIDI_SSH_BIND", "127.0.0.1"), | ||
| "target_allowlist_count": len(os.getenv("HEIDI_SSH_TARGETS", "").split(",")), |
There was a problem hiding this comment.
The calculation for target_allowlist_count is incorrect. os.getenv("HEIDI_SSH_TARGETS", "").split(",") will result in a list with one empty string (['']) if the environment variable is not set, leading to a count of 1 instead of 0.
You should use the already parsed SSH_TARGET_ALLOWLIST set from the config module to get the correct count. You will need to import SSH_TARGET_ALLOWLIST from .config.
| "target_allowlist_count": len(os.getenv("HEIDI_SSH_TARGETS", "").split(",")), | |
| "target_allowlist_count": len(SSH_TARGET_ALLOWLIST), |
- Remove shell=True - exec_command now returns NOT_IMPLEMENTED stub - Fix tests to use monkeypatch instead of global env at import time - Add PTY streaming endpoints (Phase 2 stubs) - Fix lint errors
031e35e to
7abb850
Compare
Summary
Key Changes
API
Security Defaults
How to Test