Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
dec_env/
dec_Env/
MANIFEST

# PyInstaller
Expand Down
11 changes: 11 additions & 0 deletions SSH/config.ini.TEMPLATE
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,21 @@ system_prompt = Interpret all inputs as though they were SSH commands and provid
# a password by leaving that field blank (e.g., "guest =" on a line by
# itself). You can set an account to accept ANY password, including an empty
# password, by setting the password to "*"
# If "*" is used, login success is determined by the "wildcard_success_rate"
# setting under the [authentication] section.
# Regular accounts with defined passwords (e.g., "admin = admin123") will
# always require the exact password.

[user_accounts]
guest =
user1 = secretpw
user2 = password123
root = *
admin = admin


[authentication]
# Chance-based success rate for wildcard passwords (*).
# A value of 0.3 means a 30% chance of success per attempt.
wildcard_success_rate = 0.3

121 changes: 95 additions & 26 deletions SSH/ssh_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import logging
import datetime
import uuid
import random
from base64 import b64encode
from operator import itemgetter
from langchain_openai import ChatOpenAI
Expand Down Expand Up @@ -109,16 +110,33 @@ def public_key_auth_supported(self) -> bool:
return False
def kbdinit_auth_supported(self) -> bool:
return False

def validate_password(self, username: str, password: str) -> bool:
pw = accounts.get(username, '*')

if pw == '*' or (pw != '*' and password == pw):
logger.info("Authentication success", extra={"username": username, "password": password})
return True
else:
logger.info("Authentication failed", extra={"username": username, "password": password})
return False
"""Authenticate user, with a chance-based success for wildcard passwords."""
pw = accounts.get(username, '*') # Get stored password or wildcard (*)

# Fetch wildcard success rate safely from config, with a default of 30%
try:
wildcard_success_rate = float(config.get('authentication', 'wildcard_success_rate', fallback="0.3"))
except ValueError:
wildcard_success_rate = 0.3 # Default to 30% if there's an issue

# Wildcard password handling with probability-based success
if pw == '*':
if random.random() < wildcard_success_rate: # Chance-based success
logger.info("Wildcard authentication success", extra={"username": username, "password": password})
return True
else:
logger.info("Wildcard authentication failed (random chance)", extra={"username": username, "password": password})
return False

# Regular password validation
if password == pw:
logger.info("Authentication success", extra={"username": username, "password": password})
return True
else:
logger.info("Authentication failed", extra={"username": username, "password": password})
return False

async def session_summary(process: asyncssh.SSHServerProcess, llm_config: dict, session: RunnableWithMessageHistory, server: MySSHServer):
# Check if the summary has already been generated
Expand Down Expand Up @@ -278,7 +296,7 @@ def filter(self, record):
if task:
task_name = task.get_name()
else:
task_name = "-"
task_name = thread_local.__dict__.get('session_id', '-')

record.src_ip = thread_local.__dict__.get('src_ip', '-')
record.src_port = thread_local.__dict__.get('src_port', '-')
Expand All @@ -305,10 +323,10 @@ def get_user_accounts() -> dict:

return accounts

def choose_llm():
llm_provider_name = config['llm'].get("llm_provider", "openai")
def choose_llm(llm_provider: Optional[str] = None, model_name: Optional[str] = None):
llm_provider_name = llm_provider or config['llm'].get("llm_provider", "openai")
llm_provider_name = llm_provider_name.lower()
model_name = config['llm'].get("model_name", "gpt-3.5-turbo")
model_name = model_name or config['llm'].get("model_name", "gpt-3.5-turbo")

if llm_provider_name == 'openai':
llm_model = ChatOpenAI(
Expand Down Expand Up @@ -360,26 +378,77 @@ def get_prompts(prompt: Optional[str], prompt_file: Optional[str]) -> dict:
try:
# Parse command line arguments
parser = argparse.ArgumentParser(description='Start the SSH honeypot server.')
parser.add_argument('-c', '--config', type=str, default='config.ini', help='Path to the configuration file')
parser.add_argument('-c', '--config', type=str, default=None, help='Path to the configuration file')
parser.add_argument('-p', '--prompt', type=str, help='The entire text of the prompt')
parser.add_argument('-f', '--prompt-file', type=str, default='prompt.txt', help='Path to the prompt file')
parser.add_argument('-l', '--llm-provider', type=str, help='The LLM provider to use')
parser.add_argument('-m', '--model-name', type=str, help='The model name to use')
parser.add_argument('-t', '--trimmer-max-tokens', type=int, help='The maximum number of tokens to send to the LLM backend in a single request')
parser.add_argument('-s', '--system-prompt', type=str, help='System prompt for the LLM')
parser.add_argument('-P', '--port', type=int, help='The port the SSH honeypot will listen on')
parser.add_argument('-k', '--host-priv-key', type=str, help='The host key to use for the SSH server')
parser.add_argument('-v', '--server-version-string', type=str, help='The server version string to send to clients')
parser.add_argument('-L', '--log-file', type=str, help='The name of the file you wish to write the honeypot log to')
parser.add_argument('-S', '--sensor-name', type=str, help='The name of the sensor, used to identify this honeypot in the logs')
parser.add_argument('-u', '--user-account', action='append', help='User account in the form username=password. Can be repeated.')
args = parser.parse_args()

# Check if the config file exists
if not os.path.exists(args.config):
print(f"Error: The specified config file '{args.config}' does not exist.", file=sys.stderr)
sys.exit(1)
# Determine which config file to load
config = ConfigParser()
if args.config is not None:
# User explicitly set a config file; error if it doesn't exist.
if not os.path.exists(args.config):
print(f"Error: The specified config file '{args.config}' does not exist.", file=sys.stderr)
sys.exit(1)
config.read(args.config)
else:
default_config = "config.ini"
if os.path.exists(default_config):
config.read(default_config)
else:
# Use defaults when no config file found.
config['honeypot'] = {'log_file': 'ssh_log.log', 'sensor_name': socket.gethostname()}
config['ssh'] = {'port': '8022', 'host_priv_key': 'ssh_host_key', 'server_version_string': 'SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.3'}
config['llm'] = {'llm_provider': 'openai', 'model_name': 'gpt-3.5-turbo', 'trimmer_max_tokens': '64000', 'system_prompt': ''}
config['user_accounts'] = {}

# Override config values with command line arguments if provided
if args.llm_provider:
config['llm']['llm_provider'] = args.llm_provider
if args.model_name:
config['llm']['model_name'] = args.model_name
if args.trimmer_max_tokens:
config['llm']['trimmer_max_tokens'] = str(args.trimmer_max_tokens)
if args.system_prompt:
config['llm']['system_prompt'] = args.system_prompt
if args.port:
config['ssh']['port'] = str(args.port)
if args.host_priv_key:
config['ssh']['host_priv_key'] = args.host_priv_key
if args.server_version_string:
config['ssh']['server_version_string'] = args.server_version_string
if args.log_file:
config['honeypot']['log_file'] = args.log_file
if args.sensor_name:
config['honeypot']['sensor_name'] = args.sensor_name

# Merge command-line user accounts into the config
if args.user_account:
if 'user_accounts' not in config:
config['user_accounts'] = {}
for account in args.user_account:
if '=' in account:
key, value = account.split('=', 1)
config['user_accounts'][key.strip()] = value.strip()
else:
config['user_accounts'][account.strip()] = ''

# Read the user accounts from the configuration
accounts = get_user_accounts()

# Always use UTC for logging
logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).isoformat(sep="T",timespec="milliseconds"))

# Read our configuration file
config = ConfigParser()
config.read(args.config)

# Read the user accounts from the configuration file
accounts = get_user_accounts()

# Get the sensor name from the config or use the system's hostname
sensor_name = config['honeypot'].get('sensor_name', socket.gethostname())

Expand All @@ -401,7 +470,7 @@ def get_prompts(prompt: Optional[str], prompt_file: Optional[str]) -> dict:
llm_system_prompt = prompts["system_prompt"]
llm_user_prompt = prompts["user_prompt"]

llm = choose_llm()
llm = choose_llm(config['llm'].get("llm_provider"), config['llm'].get("model_name"))

llm_sessions = dict()

Expand Down