diff --git a/dissect/database/ese/ntds/ntds.py b/dissect/database/ese/ntds/ntds.py index 3b8fb5a..e62e742 100644 --- a/dissect/database/ese/ntds/ntds.py +++ b/dissect/database/ese/ntds/ntds.py @@ -1,14 +1,25 @@ from __future__ import annotations from typing import TYPE_CHECKING, BinaryIO +from uuid import UUID from dissect.database.ese.ntds.database import Database +from dissect.database.ese.ntds.objects.secret import BackupKey if TYPE_CHECKING: from collections.abc import Iterator - from dissect.database.ese.ntds.objects import Computer, DomainDNS, Group, GroupPolicyContainer, Object, Server, User - from dissect.database.ese.ntds.objects.trusteddomain import TrustedDomain + from dissect.database.ese.ntds.objects import ( + Computer, + DomainDNS, + Group, + GroupPolicyContainer, + Object, + Secret, + Server, + TrustedDomain, + User, + ) from dissect.database.ese.ntds.pek import PEK @@ -93,3 +104,37 @@ def trusts(self) -> Iterator[TrustedDomain]: def group_policies(self) -> Iterator[GroupPolicyContainer]: """Get all group policy objects (GPO) objects from the database.""" yield from self.search(objectClass="groupPolicyContainer") + + def secrets(self) -> Iterator[Secret]: + """Get all secret objects from the database.""" + yield from self.search(objectClass="secret") + + def backup_keys(self) -> Iterator[BackupKey]: + """Get all DPAPI backup keys from the database.""" + if not self.pek.unlocked: + raise ValueError("PEK must be unlocked to retrieve backup keys") + + for secret in self.secrets(): + if secret.is_phantom or not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"): + continue + + yield BackupKey(secret) + + def preferred_backup_keys(self) -> Iterator[BackupKey]: + """Get preferred DPAPI backup keys from the database.""" + if not self.pek.unlocked: + raise ValueError("PEK must be unlocked to retrieve backup keys") + + # We could do this the proper way (lookup the BCKUPKEY_P* secrets and then directly lookup the + # corresponding BCKUPKEY_* secrets), but in practice there are only a few backup keys, so just + # filter after the fact + preferred_guids = [] + for secret in self.secrets(): + if secret.is_phantom or not secret.name.startswith("BCKUPKEY_P"): + continue + + preferred_guids.append(UUID(bytes_le=secret.current_value)) + + for key in self.backup_keys(): + if key.guid in preferred_guids: + yield key diff --git a/dissect/database/ese/ntds/objects/secret.py b/dissect/database/ese/ntds/objects/secret.py index 99c1d75..98d6510 100644 --- a/dissect/database/ese/ntds/objects/secret.py +++ b/dissect/database/ese/ntds/objects/secret.py @@ -1,7 +1,17 @@ from __future__ import annotations +from functools import cached_property +from typing import TYPE_CHECKING +from uuid import UUID + +from dissect.util.ts import wintimestamp + +from dissect.database.ese.ntds.c_ds import c_ds from dissect.database.ese.ntds.objects.leaf import Leaf +if TYPE_CHECKING: + from datetime import datetime + class Secret(Leaf): """Represents a secret object in the Active Directory. @@ -11,3 +21,79 @@ class Secret(Leaf): """ __object_class__ = "secret" + + def __repr_body__(self) -> str: + return f"name={self.name!r} last_set_time={self.last_set_time} prior_set_time={self.prior_set_time}" + + @property + def current_value(self) -> bytes: + """Return the current value of the secret.""" + return self.get("currentValue") + + @property + def last_set_time(self) -> datetime | None: + """Return the last set time of the secret.""" + if (ts := self.get("lastSetTime")) is not None: + return wintimestamp(ts) + return None + + @property + def prior_value(self) -> bytes: + """Return the prior value of the secret.""" + return self.get("priorValue") + + @property + def prior_set_time(self) -> datetime | None: + """Return the prior set time of the secret.""" + if (ts := self.get("priorSetTime")) is not None: + return wintimestamp(ts) + return None + + +class BackupKey: + """Represents a DPAPI backup key object in the Active Directory.""" + + def __init__(self, secret: Secret): + self.secret = secret + + def __repr__(self) -> str: + return f"" + + @cached_property + def guid(self) -> UUID: + """The GUID of the backup key.""" + return UUID(self.secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret")) + + @cached_property + def version(self) -> int: + """The version of the backup key.""" + return c_ds.DWORD(self.secret.current_value) + + @cached_property + def is_legacy(self) -> bool: + """Whether the backup key is a legacy key (version 1).""" + return self.version == 1 + + @cached_property + def key(self) -> bytes: + """The key bytes of the backup key, for legacy keys (version 1).""" + if self.version == 1: + return self.secret.current_value[4:] + raise TypeError(f"Backup key version {self.version} does not have a single key value") + + @cached_property + def private_key(self) -> bytes: + """The private key bytes of the backup key, for version 2 keys.""" + if self.version == 2: + private_length = c_ds.DWORD(self.secret.current_value[4:8]) + return self.secret.current_value[12 : 12 + private_length] + raise TypeError(f"Backup key version {self.version} does not have a private key value") + + @cached_property + def public_key(self) -> bytes: + """The public key bytes of the backup key, for version 2 keys.""" + if self.version == 2: + private_length = c_ds.DWORD(self.secret.current_value[4:8]) + public_length = c_ds.DWORD(self.secret.current_value[8:12]) + return self.secret.current_value[12 + private_length : 12 + private_length + public_length] + raise TypeError(f"Backup key version {self.version} does not have a public key value") diff --git a/tests/ese/ntds/test_ntds.py b/tests/ese/ntds/test_ntds.py index e50c1e3..5279980 100644 --- a/tests/ese/ntds/test_ntds.py +++ b/tests/ese/ntds/test_ntds.py @@ -1,6 +1,8 @@ from __future__ import annotations +import hashlib from typing import TYPE_CHECKING +from uuid import UUID import pytest @@ -265,6 +267,7 @@ def test_all_memberships(large: NTDS) -> None: def test_group_policies(goad: NTDS) -> None: + """Test retrieval of group policies.""" gpos: list[GroupPolicyContainer] = sorted(goad.group_policies(), key=lambda x: x.distinguished_name) assert len(gpos) == 5 assert isinstance(gpos[0], GroupPolicyContainer) @@ -275,3 +278,28 @@ def test_group_policies(goad: NTDS) -> None: "CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=NORTH,DC=SEVENKINGDOMS,DC=LOCAL", "CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=SEVENKINGDOMS,DC=LOCAL", ] + + +def test_backup_keys(goad: NTDS) -> None: + """Test retrieval of DPAPI backup keys.""" + with pytest.raises(ValueError, match="PEK must be unlocked to retrieve backup keys"): + list(goad.backup_keys()) + + goad.pek.unlock(bytes.fromhex("079f95655b66f16deb28aa1ab3a81eb0")) + + keys = list(goad.backup_keys()) + assert len(keys) == 2 + assert keys[0].guid == UUID("dbea00d0-005f-4233-b140-41a9961da100") + assert keys[0].version == 1 + assert hashlib.sha256(keys[0].key).hexdigest() == "bae7b058f277922b75d63d9803b85fca40a95a3cc9d47c0ef0a644a203009562" + + assert keys[1].guid == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87") + assert keys[1].version == 2 # Current key version + assert ( + hashlib.sha256(keys[1].private_key).hexdigest() + == "e7317dfe5f962121afead04e0dbb4249aa395ef281e2332f6179f940b54f202f" + ) + assert ( + hashlib.sha256(keys[1].public_key).hexdigest() + == "398fef9281677096b18785d0ad000251d41f76b82e28687718d6a9812ddaca8a" + )