Skip to content
14 changes: 6 additions & 8 deletions dissect/target/plugins/apps/ssh/openssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,22 @@
from dissect.target.target import Target


def find_sshd_directory(target: Target) -> TargetPath:
def find_sshd_directory(target: Target) -> TargetPath | None:
"""finds which sshd directory exists on a given target, returns None if none exist"""
SSHD_DIRECTORIES = ["/sysvol/ProgramData/ssh", "/etc/ssh"]

for sshd in SSHD_DIRECTORIES:
if (target_path := target.fs.path(sshd)).exists():
return target_path

# A default, so there is no need to check for None
return target.fs.path("/etc/ssh/")
return None


class OpenSSHPlugin(SSHPlugin):
"""OpenSSH plugin."""

__namespace__ = "openssh"

SSHD_DIRECTORIES = ("/sysvol/ProgramData/ssh", "/etc/ssh")

def __init__(self, target: Target):
super().__init__(target)
self.sshd_directory = find_sshd_directory(target)
Expand All @@ -54,14 +52,14 @@ def check_compatible(self) -> None:
user_details.home_path.joinpath(".ssh").exists()
for user_details in self.target.user_details.all_with_home()
)
if not ssh_user_dirs and not self.sshd_directory.exists():
if not ssh_user_dirs and not self.sshd_directory:
raise UnsupportedPluginError("No OpenSSH directories found")

def ssh_directory_globs(self, glob_user: str, glob_sshd: str) -> Iterator[tuple[UserDetails | None, TargetPath]]:
for user_details in self.target.user_details.all_with_home():
yield from product([user_details], user_details.home_path.glob(f".ssh/{glob_user}"))

yield from product([None], self.sshd_directory.glob(glob_sshd))
if self.sshd_directory is not None:
yield from product([None], self.sshd_directory.glob(glob_sshd))

@export(record=AuthorizedKeysRecord)
def authorized_keys(self) -> Iterator[AuthorizedKeysRecord]:
Expand Down
4 changes: 2 additions & 2 deletions dissect/target/plugins/apps/ssh/opensshd.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ class SSHServerPlugin(SSHPlugin):
def __init__(self, target: Target):
super().__init__(target)
self.sshd_directory = find_sshd_directory(target)
self.sshd_config_path = self.sshd_directory.joinpath("sshd_config")
self.sshd_config_path = self.sshd_directory.joinpath("sshd_config") if self.sshd_directory else None

def check_compatible(self) -> None:
if not self.sshd_config_path.exists():
if not (self.sshd_config_path and self.sshd_config_path.exists()):
raise UnsupportedPluginError("No sshd config found")

@export(record=DynamicDescriptor(["datetime", "path"]))
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/plugins/apps/webserver/iis.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ def _read_config_log_paths(self, dirs: dict[str, set[str]]) -> None:
try:
xml_data = ElementTree.fromstring(self.config.read_bytes(), forbid_dtd=True)
for log_file_element in xml_data.findall("*/sites/*/logFile"):
log_format = log_file_element.get("logFormat") or "W3C"
if log_dir := log_file_element.get("directory"):
log_format = log_file_element.get("logFormat") or "W3C"
if log_format not in dirs:
self.target.log.warning("Unsupported log format %s, skipping %s", log_format, log_dir)
continue
Expand Down
6 changes: 3 additions & 3 deletions dissect/target/plugins/os/windows/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class WindowsFirewallPlugin(Plugin):

LOGGING_KEYS = (
# Defaults
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\PublicProfile\\Logging"
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\StandardProfile\\Logging"
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\DomainProfile\\Logging"
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\PublicProfile\\Logging",
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\StandardProfile\\Logging",
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Defaults\\FirewallPolicy\\DomainProfile\\Logging",
# Parameters
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Parameters\\FirewallPolicy\\PublicProfile\\Logging",
"HKLM\\SYSTEM\\CurrentControlSet\\Services\\SharedAccess\\Parameters\\FirewallPolicy\\StandardProfile\\Logging",
Expand Down
85 changes: 37 additions & 48 deletions dissect/target/plugins/os/windows/regf/clsid.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Final
from typing import TYPE_CHECKING

from dissect.target.exceptions import RegistryError, UnsupportedPluginError
from dissect.target.helpers.descriptor_extensions import (
Expand Down Expand Up @@ -47,72 +47,61 @@ class CLSIDPlugin(Plugin):
"""

__namespace__ = "clsid"

KEYS: Final[dict[str, str]] = {
"user": "HKEY_CURRENT_USER\\Software\\Classes\\CLSID",
"machine": "HKEY_LOCAL_MACHINE\\SOFTWARE\\Classes\\CLSID",
}
USER_KEY = "HKEY_CURRENT_USER\\Software\\Classes\\CLSID"
MACHINE_KEY = "HKEY_LOCAL_MACHINE\\SOFTWARE\\Classes\\CLSID"

def __init__(self, target: Target):
super().__init__(target)

def check_compatible(self) -> None:
if not len(list(self.target.registry.keys(list(self.KEYS.values())))) > 0:
if not len(list(self.target.registry.keys((self.USER_KEY, self.MACHINE_KEY)))) > 0:
raise UnsupportedPluginError("No CLSID key found")

def create_records(self, keys: list[RegistryKey]) -> Iterator[CLSIDRecord]:
"""Iterate all CLSID keys from HKEY_CURRENT_USER\\Software\\Classes\\CLSID and
HKEY_LOCAL_MACHINE\\SOFTWARE\\Classes\\CLSID.

Yields CLSIDRecords with fields:

.. code-block:: text

hostname (string): The target hostname.
domain (string): The target domain.
ts (datetime): Last modified timestamp of the registry key.
clsid (string): The CLSID key name.
path (uri): The CLSID path value.
def create_records(self, key_path: RegistryKey) -> Iterator[CLSIDRecord]:
"""Iterates all CLSID keys from any CLSID registry
Args:
key: a ``str`` representing the path to the key
Yields:
``CLSIDRecords`` for each entry
"""

names = [
"InprocServer32",
"InprocServer",
"LocalServer",
"LocalServer32",
]

for reg in self.target.registry.keys(keys):
user = self.target.registry.get_user(reg)

for subkey in reg.subkeys():
try:
name = subkey.value("(default)").value
except RegistryError:
name = None

for entry in subkey.subkeys():
if entry.name in names:
try:
subkey_value = entry.value("(default)")
except RegistryError:
continue

yield CLSIDRecord(
ts=entry.ts,
clsid=subkey.name,
name=name,
value=subkey_value.value,
_target=self.target,
_user=user,
_key=entry,
)
key = self.target.registry.key(key_path)
user = self.target.registry.get_user(key)
for subkey in key.subkeys():
try:
name = subkey.value("(default)").value
except RegistryError:
name = None

for entry in subkey.subkeys():
if entry.name in names:
try:
subkey_value = entry.value("(default)")
except RegistryError:
continue

yield CLSIDRecord(
ts=entry.ts,
clsid=subkey.name,
name=name,
value=subkey_value.value,
_target=self.target,
_user=user,
_key=entry,
)

@export(record=CLSIDRecord)
def user(self) -> Iterator[CLSIDRecord]:
"""Return only the user CLSID registry keys."""
yield from self.create_records(self.KEYS["user"])
yield from self.create_records(self.USER_KEY)

@export(record=CLSIDRecord)
def machine(self) -> Iterator[CLSIDRecord]:
"""Return only the machine CLSID registry keys."""
yield from self.create_records(self.KEYS["machine"])
yield from self.create_records(self.MACHINE_KEY)
22 changes: 9 additions & 13 deletions dissect/target/plugins/os/windows/ual.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any

from dissect.database.ese.tools import ual
Expand All @@ -11,7 +12,6 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

from dissect.target.target import Target

Expand Down Expand Up @@ -157,15 +157,13 @@ class UalPlugin(Plugin):
"""

__namespace__ = "ual"

LOG_DB_GLOB = "%windir%/System32/LogFiles/Sum/*.mdb"

IDENTITY_DB_FILENAME = "SystemIdentity.mdb"
IDENTITY_DB_PATH = f"%windir%/System32/LogFiles/Sum/{IDENTITY_DB_FILENAME}"
LOG_DIR = "%windir%/System32/LogFiles/Sum/"
LOG_DB_GLOB = "*.mdb"
IDENTITY_FILE_NAME = "SystemIdentity.mdb"

def __init__(self, target: Target):
super().__init__(target)

self.identity_db_path = f"{self.LOG_DIR}{self.IDENTITY_FILE_NAME}"
self.mdb_paths = self.find_mdb_files()

self.role_guid_map = {}
Expand All @@ -177,16 +175,14 @@ def check_compatible(self) -> None:
raise UnsupportedPluginError("No MDB files found")

def find_mdb_files(self) -> list[Path]:
base, _, glob = self.LOG_DB_GLOB.partition("*")

return [
path
for path in self.target.resolve(base).glob(f"*{glob}")
if path.exists() and path.name != self.IDENTITY_DB_FILENAME
for path in self.target.resolve(self.LOG_DIR).glob(self.LOG_DB_GLOB)
if path.name != Path(self.identity_db_path).name
]

def populate_role_guid_map(self) -> None:
identity_db = self.target.resolve(self.IDENTITY_DB_PATH)
identity_db = self.target.resolve(str(self.identity_db_path))
if not identity_db.exists():
return

Expand Down Expand Up @@ -271,7 +267,7 @@ def system_identities(self) -> Iterator[SystemIdentityRecord]:
for record in self.identity_db_parser.get_table_records("SYSTEM_IDENTITY"):
values = {FIELD_NAME_MAP.get(key, key): value for key, value in record.items()}
yield SystemIdentityRecord(
path=self.target.fs.path(self.IDENTITY_DB_PATH),
path=self.target.fs.path(self.identity_db_path),
_target=self.target,
**values,
)
9 changes: 6 additions & 3 deletions tests/plugins/apps/ssh/test_opensshd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@


def test_sshd_config_plugin(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None:

config_file = absolute_path("_data/plugins/apps/ssh/opensshd/sshd_config")
fs_unix.map_file("/etc/ssh/sshd_config", config_file)
plugin = SSHServerPlugin(target_unix_users)
fs_unix.map_file(str(plugin.sshd_config_path), config_file)


target_unix_users.add_plugin(SSHServerPlugin)
results = list(target_unix_users.opensshd.config())
Expand All @@ -41,11 +43,12 @@ def test_sshd_config_plugin_multiple_definitions(target_unix_users: Target, fs_u
ListenAddress 9.8.7.6
"""

plugin = SSHServerPlugin(target_unix_users)

fs_unix.map_file_fh(
str(plugin.sshd_config_path),
"/etc/ssh/sshd_config",
BytesIO(textwrap.dedent(config).encode()),
)
plugin = SSHServerPlugin(target_unix_users)

target_unix_users.add_plugin(SSHServerPlugin)
results = list(target_unix_users.opensshd.config())
Expand Down
Loading