diff --git a/dissect/target/plugins/os/windows/ad/__init__.py b/dissect/target/plugins/os/windows/ad/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dissect/target/plugins/os/windows/ad/ntds.py b/dissect/target/plugins/os/windows/ad/ntds.py new file mode 100644 index 0000000000..f0baf4e4b1 --- /dev/null +++ b/dissect/target/plugins/os/windows/ad/ntds.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +from datetime import datetime +from functools import cached_property +from typing import TYPE_CHECKING, Any + +from dissect.database.ese.ntds import NTDS + +from dissect.target.helpers.record import TargetRecordDescriptor +from dissect.target.plugin import Plugin, UnsupportedPluginError, export, internal +from dissect.target.plugins.os.windows.credential.sam import des_decrypt + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dissect.database.ese.ntds.objects import Computer, User + + from dissect.target.target import Target + + +GENERIC_FIELDS = [ + ("string", "cn"), + ("string", "upn"), + ("string", "sam_name"), + ("string", "sam_type"), + ("string", "description"), + ("string", "sid"), + ("varint", "rid"), + ("datetime", "password_last_set"), + ("datetime", "logon_last_failed"), + ("datetime", "logon_last_success"), + ("datetime", "account_expires"), + ("datetime", "creation_time"), + ("datetime", "last_modified_time"), + ("boolean", "admin_count"), + ("boolean", "is_deleted"), + ("string", "lm"), + ("string[]", "lm_history"), + ("string", "nt"), + ("string[]", "nt_history"), + ("string", "supplemental_credentials"), + ("string", "user_account_control"), + ("string[]", "object_classes"), + ("string", "distinguished_name"), + ("string", "object_guid"), + ("uint32", "primary_group_id"), + ("string[]", "member_of"), + ("string[]", "service_principal_name"), +] + +# Record descriptor for NTDS user secrets +NtdsUserRecord = TargetRecordDescriptor( + "windows/ad/user", + [ + *GENERIC_FIELDS, + ("string", "info"), + ("string", "comment"), + ("string", "telephone_number"), + ("string", "home_directory"), + ], +) +NtdsComputerRecord = TargetRecordDescriptor( + "windows/ad/computer", + [ + *GENERIC_FIELDS, + ("string", "dns_hostname"), + ("string", "operating_system"), + ("string", "operating_system_version"), + ], +) + + +# NTDS Registry consts +NTDS_PARAMETERS_REGISTRY_PATH = "HKLM\\SYSTEM\\CurrentControlSet\\Services\\NTDS\\Parameters" +NTDS_PARAMETERS_DB_VALUE = "DSA Database file" + +# Default values +DEFAULT_LM_HASH = "aad3b435b51404eeaad3b435b51404ee" +DEFAULT_NT_HASH = "31d6cfe0d16ae931b73c59d7e0c089c0" + + +class NtdsPlugin(Plugin): + """Plugin to parse NTDS.dit Active Directory database and extract user credentials. + + This plugin extracts user password hashes, password history, Kerberos keys, and other authentication data + from the NTDS.dit database found on Windows Domain Controllers. + """ + + __namespace__ = "ad" + + def __init__(self, target: Target): + super().__init__(target) + self.path = None + + if self.target.has_function("registry"): + key = self.target.registry.value(NTDS_PARAMETERS_REGISTRY_PATH, NTDS_PARAMETERS_DB_VALUE) + self.path = self.target.fs.path(key.value) + + def check_compatible(self) -> None: + if not self.target.has_function("lsa"): + raise UnsupportedPluginError("System Hive is not present or LSA function not available") + + if self.path is None or not self.path.is_file(): + raise UnsupportedPluginError("No NTDS.dit database found on target") + + @cached_property + @internal + def ntds(self) -> NTDS: + ntds = NTDS(self.path.open("rb")) + + if self.target.has_function("lsa"): + ntds.pek.unlock(self.target.lsa.syskey) + + return ntds + + @export(record=NtdsUserRecord) + def users(self) -> Iterator[NtdsUserRecord]: + """Extract all user accounts from the NTDS.dit database.""" + for user in self.ntds.users(): + yield NtdsUserRecord( + **extract_user_info(user, self.target), + info=user.get("info"), + comment=user.get("comment"), + telephone_number=user.get("telephoneNumber"), + home_directory=user.get("homeDirectory"), + _target=self.target, + ) + + @export(record=NtdsComputerRecord) + def computers(self) -> Iterator[NtdsComputerRecord]: + """Extract all computer accounts from the NTDS.dit database.""" + for computer in self.ntds.computers(): + yield NtdsComputerRecord( + **extract_user_info(computer, self.target), + dns_hostname=computer.get("dNSHostName"), + operating_system=computer.get("operatingSystem"), + operating_system_version=computer.get("operatingSystemVersion"), + _target=self.target, + ) + + +def extract_user_info(user: User | Computer, target: Target) -> dict[str, Any]: + """Extract generic information from a User or Computer account.""" + + lm_hash = des_decrypt(lm_pwd, user.rid).hex() if (lm_pwd := user.get("dBCSPwd")) else DEFAULT_LM_HASH + nt_hash = des_decrypt(nt_pwd, user.rid).hex() if (nt_pwd := user.get("unicodePwd")) else DEFAULT_NT_HASH + + # Decrypt password history + lm_history = [des_decrypt(lm, user.rid).hex() for lm in user.get("lmPwdHistory")] + nt_history = [des_decrypt(nt, user.rid).hex() for nt in user.get("ntPwdHistory")] + + try: + member_of = [group.distinguished_name for group in user.groups()] + except Exception as e: + member_of = [] + target.log.warning("Failed to extract group membership for user %s: %s", user, e) + target.log.debug("", exc_info=e) + + # Extract supplemental credentials and yield records + return { + "cn": user.get("cn"), + "upn": user.get("userPrincipalName"), + "sam_name": user.sam_account_name, + "sam_type": user.sam_account_type.name, + "description": user.get("description"), + "sid": user.sid, + "rid": user.rid, + "password_last_set": user.get("pwdLastSet"), + "logon_last_failed": user.get("badPasswordTime"), + "logon_last_success": user.get("lastLogon"), + "account_expires": user.get("accountExpires") if isinstance(user.get("accountExpires"), datetime) else None, + "creation_time": user.when_created, + "last_modified_time": user.when_changed, + "admin_count": user.get("adminCount"), + "is_deleted": user.is_deleted, + "lm": lm_hash, + "lm_history": lm_history, + "nt": nt_hash, + "nt_history": nt_history, + "supplemental_credentials": user.get("supplementalCredentials"), + "user_account_control": user.user_account_control.name, + "object_classes": user.object_class, + "distinguished_name": user.distinguished_name, + "object_guid": user.guid, + "primary_group_id": user.primary_group_id, + "member_of": member_of, + "service_principal_name": user.get("servicePrincipalName"), + } diff --git a/dissect/target/plugins/os/windows/credential/lsa.py b/dissect/target/plugins/os/windows/credential/lsa.py index 51af560d26..d4984d80a1 100644 --- a/dissect/target/plugins/os/windows/credential/lsa.py +++ b/dissect/target/plugins/os/windows/credential/lsa.py @@ -6,7 +6,7 @@ from dissect.target.exceptions import RegistryKeyNotFoundError, UnsupportedPluginError from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.plugin import Plugin, export +from dissect.target.plugin import Plugin, export, internal if TYPE_CHECKING: from collections.abc import Iterator @@ -52,6 +52,7 @@ def check_compatible(self) -> None: raise UnsupportedPluginError("Registry key not found: %s", self.SYSTEM_KEY) @cached_property + @internal def syskey(self) -> bytes: """Return byte value of Windows system SYSKEY, also called BootKey.""" lsa = self.target.registry.key(self.SYSTEM_KEY) @@ -63,6 +64,7 @@ def syskey(self) -> bytes: return bytes(r[i] for i in alterator) @cached_property + @internal def lsakey(self) -> bytes: """Decrypt and return the LSA key of the Windows system.""" security_pol = self.target.registry.key(self.SECURITY_POLICY_KEY) diff --git a/dissect/target/plugins/os/windows/credential/sam.py b/dissect/target/plugins/os/windows/credential/sam.py index 942d67b91b..ca2c2291ad 100644 --- a/dissect/target/plugins/os/windows/credential/sam.py +++ b/dissect/target/plugins/os/windows/credential/sam.py @@ -263,6 +263,29 @@ def rid_to_key(rid: int) -> tuple[bytes, bytes]: return k1, k2 +def des_decrypt(data: bytes, rid: int) -> bytes: + """Decrypt a DES-encrypted hash using the RID-derived keys. + + Args: + data: Encrypted data (16 bytes). + rid: Relative ID of the user account. + + Raises: + ValueError: If data is not 16 bytes. + """ + if len(data) != 16: + raise ValueError("data must be 16 bytes long") + + key1, key2 = rid_to_key(rid) + des1 = DES.new(key1, DES.MODE_ECB) + des2 = DES.new(key2, DES.MODE_ECB) + + block1 = des1.decrypt(data[:8]) + block2 = des2.decrypt(data[8:]) + + return block1 + block2 + + def decrypt_single_hash(rid: int, samkey: bytes, enc_hash: bytes, apwd: bytes) -> bytes: if not enc_hash: return b"" @@ -272,8 +295,6 @@ def decrypt_single_hash(rid: int, samkey: bytes, enc_hash: bytes, apwd: bytes) - if sh.revision not in [0x01, 0x02]: raise ValueError(f"Unsupported LM/NT hash revision encountered: {sh.revision}") - d1, d2 = (DES.new(k, DES.MODE_ECB) for k in rid_to_key(rid)) - if sh.revision == 0x01: # LM/NT revision 0x01 involving RC4 sh_hash = enc_hash[len(c_sam.SAM_HASH) :] if not sh_hash: # Empty hash @@ -290,7 +311,7 @@ def decrypt_single_hash(rid: int, samkey: bytes, enc_hash: bytes, apwd: bytes) - sh_hash = enc_hash[len(c_sam.SAM_HASH_AES) :] obfkey = AES.new(samkey, AES.MODE_CBC, sh.salt).decrypt(sh_hash)[:16] - return d1.decrypt(obfkey[:8]) + d2.decrypt(obfkey[8:]) + return des_decrypt(obfkey, rid) class SamPlugin(Plugin): diff --git a/pyproject.toml b/pyproject.toml index cef977dd5a..b6a790b066 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ dependencies = [ "defusedxml", "dissect.cstruct>=4,<5", - "dissect.database>=1.1.dev3,<2", # TODO: update on release! + "dissect.database>=1.1.dev4,<2", # TODO: update on release! "dissect.eventlog>=3,<4", "dissect.evidence>=3.13.dev3,<4", # TODO: update on release! "dissect.hypervisor>=3.21.dev3,<4", # TODO: update on release! @@ -84,7 +84,7 @@ dev = [ "dissect.clfs[dev]>=1.0.dev,<2.0.dev", "dissect.cramfs[dev]>=1.0.dev,<2.0.dev", "dissect.cstruct>=4.0.dev,<5.0.dev", - "dissect.database[dev]>=1.1.dev3,<2.0.dev", # TODO: update on release! + "dissect.database[dev]>=1.1.dev8,<2.0.dev", # TODO: update on release! "dissect.etl[dev]>=3.0.dev,<4.0.dev", "dissect.eventlog[dev]>=3.0.dev,<4.0.dev", "dissect.evidence[dev]>=3.13.dev2,<4.0.dev", diff --git a/tests/_data/plugins/os/windows/ad/ntds/goad/ntds.dit.gz b/tests/_data/plugins/os/windows/ad/ntds/goad/ntds.dit.gz new file mode 100644 index 0000000000..442fdbb81f --- /dev/null +++ b/tests/_data/plugins/os/windows/ad/ntds/goad/ntds.dit.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f38cbda2b136e160f8c7e7ca2e7b4f1389975c4b40098dba1b6f944ba2c8950c +size 2159695 diff --git a/tests/plugins/os/windows/ad/__init__.py b/tests/plugins/os/windows/ad/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/plugins/os/windows/ad/test_ntds.py b/tests/plugins/os/windows/ad/test_ntds.py new file mode 100644 index 0000000000..1dca870c1c --- /dev/null +++ b/tests/plugins/os/windows/ad/test_ntds.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from dissect.target.helpers.regutil import VirtualHive, VirtualKey, VirtualValue +from dissect.target.plugins.os.windows.ad.ntds import DEFAULT_NT_HASH +from tests._utils import absolute_path +from tests.plugins.os.windows.credential.test_credhist import md4 +from tests.plugins.os.windows.credential.test_lsa import map_lsa_system_keys + +if TYPE_CHECKING: + from dissect.target.target import Target + + +@pytest.fixture +def target_win_ntds(target_win: Target, hive_hklm: VirtualHive) -> Target: + registry_path = "SYSTEM\\ControlSet001\\Services\\NTDS\\Parameters" + hive_hklm.map_key(registry_path, VirtualKey(hive_hklm, registry_path)) + hive_hklm.map_value( + registry_path, + "DSA Database file", + VirtualValue(hive_hklm, "DSA Database file", "c:/windows/ntds/ntds.dit"), + ) + + map_lsa_system_keys( + hive_hklm, + { + "JD": "ebaa656d", + "Skew1": "959f28b0", + "GBG": "0766a85b", + "Data": "1af1b31e", + }, + ) + + target_win.fs.map_file( + "c:/windows/ntds/ntds.dit", + absolute_path("_data/plugins/os/windows/ad/ntds/goad/ntds.dit.gz"), + compression="gzip", + ) + + return target_win + + +def test_users(target_win_ntds: Target) -> None: + """Tests if ``ad.users`` outputs the correct amount of records and their content""" + cn_to_ntlm_hash_mapping = { + "krbtgt": "988160b622eb37838dbff2523015e44c", # Unknown Password + "NORTH$": "8048b2621bb71945d6ca6e9a14084af1", # Unknown Password + "ESSOS$": "f1580437d0120689ad3909b9fe9b74fe", # Unknown Password + "Administrator": "c66d72021a2d4744409969a581a1705e", # Unknown Password + "renly.baratheon": "f667bd83b30c87801cef53856618d534", # Unknown Password + "vagrant": md4("vagrant").hex(), + "lord.varys": md4("_W1sper_$").hex(), + "jaime.lannister": md4("cersei").hex(), + "tyron.lannister": md4("Alc00L&S3x").hex(), + "cersei.lannister": md4("il0vejaime").hex(), + "joffrey.baratheon": md4("1killerlion").hex(), + "stannis.baratheon": md4("Drag0nst0ne").hex(), + "petyer.baelish": md4("@littlefinger@").hex(), + "tywin.lannister": md4("powerkingftw135").hex(), + "maester.pycelle": md4("MaesterOfMaesters").hex(), + } + + results = list(target_win_ntds.ad.users()) + + assert len(results) == 33 + + for result in results: + if result.cn not in cn_to_ntlm_hash_mapping or result.nt == DEFAULT_NT_HASH: + continue + + assert cn_to_ntlm_hash_mapping[result.cn] == result.nt + + +def test_computers(target_win_ntds: Target) -> None: + """Tests if ``ad.computers`` outputs the correct amount of records and their content""" + cn_to_ntlm_hash_mapping = { + "KINGSLANDING": "00e3201a59af7ecc72e939a8c9794c64", # Unknown Password + } + + results = list(target_win_ntds.ad.computers()) + + assert len(results) == 3 + + for result in results: + if result.cn not in cn_to_ntlm_hash_mapping or result.nt == DEFAULT_NT_HASH: + continue + + assert cn_to_ntlm_hash_mapping[result.cn] == result.nt diff --git a/tests/plugins/os/windows/credential/test_credhist.py b/tests/plugins/os/windows/credential/test_credhist.py index e2c8a57a78..445f2903d9 100644 --- a/tests/plugins/os/windows/credential/test_credhist.py +++ b/tests/plugins/os/windows/credential/test_credhist.py @@ -69,9 +69,9 @@ def test_credhist_partial(target_win_users: Target, fs_win: VirtualFilesystem) - assert [result.nt for result in results] == [md4("user").hex(), md4("password").hex(), None] -def md4(plaintext: str) -> str: +def md4(plaintext: str) -> bytes: return MD4.new(plaintext.encode("utf-16-le")).digest() -def sha1(plaintext: str) -> str: +def sha1(plaintext: str) -> bytes: return hashlib.sha1(plaintext.encode("utf-16-le")).digest()