diff --git a/.gitignore b/.gitignore index 4906374..904b9f8 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,8 @@ share/python-wheels/ *.egg-info/ .installed.cfg *.egg +dec_env/ +dec_Env/ MANIFEST # PyInstaller diff --git a/SSH/config.ini.TEMPLATE b/SSH/config.ini.TEMPLATE index a1b666b..47b6471 100644 --- a/SSH/config.ini.TEMPLATE +++ b/SSH/config.ini.TEMPLATE @@ -85,10 +85,21 @@ system_prompt = Interpret all inputs as though they were SSH commands and provid # a password by leaving that field blank (e.g., "guest =" on a line by # itself). You can set an account to accept ANY password, including an empty # password, by setting the password to "*" +# If "*" is used, login success is determined by the "wildcard_success_rate" +# setting under the [authentication] section. +# Regular accounts with defined passwords (e.g., "admin = admin123") will +# always require the exact password. + [user_accounts] guest = user1 = secretpw user2 = password123 root = * +admin = admin + +[authentication] +# Chance-based success rate for wildcard passwords (*). +# A value of 0.3 means a 30% chance of success per attempt. +wildcard_success_rate = 0.3 diff --git a/SSH/ssh_server.py b/SSH/ssh_server.py index 342fdcb..367963d 100755 --- a/SSH/ssh_server.py +++ b/SSH/ssh_server.py @@ -13,6 +13,7 @@ import logging import datetime import uuid +import random from base64 import b64encode from operator import itemgetter from langchain_openai import ChatOpenAI @@ -109,16 +110,33 @@ def public_key_auth_supported(self) -> bool: return False def kbdinit_auth_supported(self) -> bool: return False - + def validate_password(self, username: str, password: str) -> bool: - pw = accounts.get(username, '*') - - if pw == '*' or (pw != '*' and password == pw): - logger.info("Authentication success", extra={"username": username, "password": password}) - return True - else: - logger.info("Authentication failed", extra={"username": username, "password": password}) - return False + """Authenticate user, with a chance-based success for wildcard passwords.""" + pw = accounts.get(username, '*') # Get stored password or wildcard (*) + + # Fetch wildcard success rate safely from config, with a default of 30% + try: + wildcard_success_rate = float(config.get('authentication', 'wildcard_success_rate', fallback="0.3")) + except ValueError: + wildcard_success_rate = 0.3 # Default to 30% if there's an issue + + # Wildcard password handling with probability-based success + if pw == '*': + if random.random() < wildcard_success_rate: # Chance-based success + logger.info("Wildcard authentication success", extra={"username": username, "password": password}) + return True + else: + logger.info("Wildcard authentication failed (random chance)", extra={"username": username, "password": password}) + return False + + # Regular password validation + if password == pw: + logger.info("Authentication success", extra={"username": username, "password": password}) + return True + else: + logger.info("Authentication failed", extra={"username": username, "password": password}) + return False async def session_summary(process: asyncssh.SSHServerProcess, llm_config: dict, session: RunnableWithMessageHistory, server: MySSHServer): # Check if the summary has already been generated @@ -278,7 +296,7 @@ def filter(self, record): if task: task_name = task.get_name() else: - task_name = "-" + task_name = thread_local.__dict__.get('session_id', '-') record.src_ip = thread_local.__dict__.get('src_ip', '-') record.src_port = thread_local.__dict__.get('src_port', '-') @@ -305,10 +323,10 @@ def get_user_accounts() -> dict: return accounts -def choose_llm(): - llm_provider_name = config['llm'].get("llm_provider", "openai") +def choose_llm(llm_provider: Optional[str] = None, model_name: Optional[str] = None): + llm_provider_name = llm_provider or config['llm'].get("llm_provider", "openai") llm_provider_name = llm_provider_name.lower() - model_name = config['llm'].get("model_name", "gpt-3.5-turbo") + model_name = model_name or config['llm'].get("model_name", "gpt-3.5-turbo") if llm_provider_name == 'openai': llm_model = ChatOpenAI( @@ -360,26 +378,77 @@ def get_prompts(prompt: Optional[str], prompt_file: Optional[str]) -> dict: try: # Parse command line arguments parser = argparse.ArgumentParser(description='Start the SSH honeypot server.') - parser.add_argument('-c', '--config', type=str, default='config.ini', help='Path to the configuration file') + parser.add_argument('-c', '--config', type=str, default=None, help='Path to the configuration file') parser.add_argument('-p', '--prompt', type=str, help='The entire text of the prompt') parser.add_argument('-f', '--prompt-file', type=str, default='prompt.txt', help='Path to the prompt file') + parser.add_argument('-l', '--llm-provider', type=str, help='The LLM provider to use') + parser.add_argument('-m', '--model-name', type=str, help='The model name to use') + parser.add_argument('-t', '--trimmer-max-tokens', type=int, help='The maximum number of tokens to send to the LLM backend in a single request') + parser.add_argument('-s', '--system-prompt', type=str, help='System prompt for the LLM') + parser.add_argument('-P', '--port', type=int, help='The port the SSH honeypot will listen on') + parser.add_argument('-k', '--host-priv-key', type=str, help='The host key to use for the SSH server') + parser.add_argument('-v', '--server-version-string', type=str, help='The server version string to send to clients') + parser.add_argument('-L', '--log-file', type=str, help='The name of the file you wish to write the honeypot log to') + parser.add_argument('-S', '--sensor-name', type=str, help='The name of the sensor, used to identify this honeypot in the logs') + parser.add_argument('-u', '--user-account', action='append', help='User account in the form username=password. Can be repeated.') args = parser.parse_args() - # Check if the config file exists - if not os.path.exists(args.config): - print(f"Error: The specified config file '{args.config}' does not exist.", file=sys.stderr) - sys.exit(1) + # Determine which config file to load + config = ConfigParser() + if args.config is not None: + # User explicitly set a config file; error if it doesn't exist. + if not os.path.exists(args.config): + print(f"Error: The specified config file '{args.config}' does not exist.", file=sys.stderr) + sys.exit(1) + config.read(args.config) + else: + default_config = "config.ini" + if os.path.exists(default_config): + config.read(default_config) + else: + # Use defaults when no config file found. + config['honeypot'] = {'log_file': 'ssh_log.log', 'sensor_name': socket.gethostname()} + config['ssh'] = {'port': '8022', 'host_priv_key': 'ssh_host_key', 'server_version_string': 'SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.3'} + config['llm'] = {'llm_provider': 'openai', 'model_name': 'gpt-3.5-turbo', 'trimmer_max_tokens': '64000', 'system_prompt': ''} + config['user_accounts'] = {} + + # Override config values with command line arguments if provided + if args.llm_provider: + config['llm']['llm_provider'] = args.llm_provider + if args.model_name: + config['llm']['model_name'] = args.model_name + if args.trimmer_max_tokens: + config['llm']['trimmer_max_tokens'] = str(args.trimmer_max_tokens) + if args.system_prompt: + config['llm']['system_prompt'] = args.system_prompt + if args.port: + config['ssh']['port'] = str(args.port) + if args.host_priv_key: + config['ssh']['host_priv_key'] = args.host_priv_key + if args.server_version_string: + config['ssh']['server_version_string'] = args.server_version_string + if args.log_file: + config['honeypot']['log_file'] = args.log_file + if args.sensor_name: + config['honeypot']['sensor_name'] = args.sensor_name + + # Merge command-line user accounts into the config + if args.user_account: + if 'user_accounts' not in config: + config['user_accounts'] = {} + for account in args.user_account: + if '=' in account: + key, value = account.split('=', 1) + config['user_accounts'][key.strip()] = value.strip() + else: + config['user_accounts'][account.strip()] = '' + + # Read the user accounts from the configuration + accounts = get_user_accounts() # Always use UTC for logging logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).isoformat(sep="T",timespec="milliseconds")) - # Read our configuration file - config = ConfigParser() - config.read(args.config) - - # Read the user accounts from the configuration file - accounts = get_user_accounts() - # Get the sensor name from the config or use the system's hostname sensor_name = config['honeypot'].get('sensor_name', socket.gethostname()) @@ -401,7 +470,7 @@ def get_prompts(prompt: Optional[str], prompt_file: Optional[str]) -> dict: llm_system_prompt = prompts["system_prompt"] llm_user_prompt = prompts["user_prompt"] - llm = choose_llm() + llm = choose_llm(config['llm'].get("llm_provider"), config['llm'].get("model_name")) llm_sessions = dict()