From e503d78257eae1e01675929c1713cafc650a995e Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Mon, 23 Feb 2026 14:41:16 +0100 Subject: [PATCH 1/2] Add retrieval of DPAPI backup keys --- dissect/database/ese/ntds/ntds.py | 38 +++++++++++++++++++-- dissect/database/ese/ntds/objects/secret.py | 34 ++++++++++++++++++ tests/ese/ntds/test_ntds.py | 17 +++++++++ 3 files changed, 87 insertions(+), 2 deletions(-) diff --git a/dissect/database/ese/ntds/ntds.py b/dissect/database/ese/ntds/ntds.py index 3b8fb5a..30cd3ca 100644 --- a/dissect/database/ese/ntds/ntds.py +++ b/dissect/database/ese/ntds/ntds.py @@ -1,14 +1,24 @@ from __future__ import annotations from typing import TYPE_CHECKING, BinaryIO +from uuid import UUID from dissect.database.ese.ntds.database import Database if TYPE_CHECKING: from collections.abc import Iterator - from dissect.database.ese.ntds.objects import Computer, DomainDNS, Group, GroupPolicyContainer, Object, Server, User - from dissect.database.ese.ntds.objects.trusteddomain import TrustedDomain + from dissect.database.ese.ntds.objects import ( + Computer, + DomainDNS, + Group, + GroupPolicyContainer, + Object, + Secret, + Server, + TrustedDomain, + User, + ) from dissect.database.ese.ntds.pek import PEK @@ -93,3 +103,27 @@ def trusts(self) -> Iterator[TrustedDomain]: def group_policies(self) -> Iterator[GroupPolicyContainer]: """Get all group policy objects (GPO) objects from the database.""" yield from self.search(objectClass="groupPolicyContainer") + + def secrets(self) -> Iterator[Secret]: + """Get all secret objects from the database.""" + yield from self.search(objectClass="secret") + + def backup_keys(self) -> Iterator[tuple[UUID, bytes]]: + """Get all DPAPI backup keys from the database as a tuple of the GUID and the key value. + + All key values start with a ``DWORD`` version number. + The current version (``2``) is followed by two more ``DWORD`` which are the length of the private key bytes and + the length of the public key bytes, followed by the private key and public key bytes respectively. + """ + if not self.pek.unlocked: + raise ValueError("PEK must be unlocked to retrieve backup keys") + + for secret in self.secrets(): + if secret.is_phantom: + continue + + # Just return all backup keys regardless if they're preferred or not + if not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"): + continue + + yield (UUID(secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret")), secret.current_value) diff --git a/dissect/database/ese/ntds/objects/secret.py b/dissect/database/ese/ntds/objects/secret.py index 99c1d75..07c6f84 100644 --- a/dissect/database/ese/ntds/objects/secret.py +++ b/dissect/database/ese/ntds/objects/secret.py @@ -1,7 +1,14 @@ from __future__ import annotations +from typing import TYPE_CHECKING + +from dissect.util.ts import wintimestamp + from dissect.database.ese.ntds.objects.leaf import Leaf +if TYPE_CHECKING: + from datetime import datetime + class Secret(Leaf): """Represents a secret object in the Active Directory. @@ -11,3 +18,30 @@ class Secret(Leaf): """ __object_class__ = "secret" + + def __repr_body__(self) -> str: + return f"name={self.name!r} last_set_time={self.last_set_time} prior_set_time={self.prior_set_time}" + + @property + def current_value(self) -> bytes: + """Return the current value of the secret.""" + return self.get("currentValue") + + @property + def last_set_time(self) -> datetime | None: + """Return the last set time of the secret.""" + if (ts := self.get("lastSetTime")) is not None: + return wintimestamp(ts) + return None + + @property + def prior_value(self) -> bytes: + """Return the prior value of the secret.""" + return self.get("priorValue") + + @property + def prior_set_time(self) -> datetime | None: + """Return the prior set time of the secret.""" + if (ts := self.get("priorSetTime")) is not None: + return wintimestamp(ts) + return None diff --git a/tests/ese/ntds/test_ntds.py b/tests/ese/ntds/test_ntds.py index e50c1e3..1108935 100644 --- a/tests/ese/ntds/test_ntds.py +++ b/tests/ese/ntds/test_ntds.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING +from uuid import UUID import pytest @@ -265,6 +266,7 @@ def test_all_memberships(large: NTDS) -> None: def test_group_policies(goad: NTDS) -> None: + """Test retrieval of group policies.""" gpos: list[GroupPolicyContainer] = sorted(goad.group_policies(), key=lambda x: x.distinguished_name) assert len(gpos) == 5 assert isinstance(gpos[0], GroupPolicyContainer) @@ -275,3 +277,18 @@ def test_group_policies(goad: NTDS) -> None: "CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=NORTH,DC=SEVENKINGDOMS,DC=LOCAL", "CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=SEVENKINGDOMS,DC=LOCAL", ] + + +def test_backup_keys(goad: NTDS) -> None: + """Test retrieval of DPAPI backup keys.""" + with pytest.raises(ValueError, match="PEK must be unlocked to retrieve backup keys"): + list(goad.backup_keys()) + + goad.pek.unlock(bytes.fromhex("079f95655b66f16deb28aa1ab3a81eb0")) + + keys = list(goad.backup_keys()) + assert len(keys) == 2 + assert keys[0][0] == UUID("dbea00d0-005f-4233-b140-41a9961da100") + assert keys[0][1][:4] == b"\x01\x00\x00\x00" # Legacy key version + assert keys[1][0] == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87") + assert keys[1][1][:4] == b"\x02\x00\x00\x00" # Current key version From 0aee55718c4f1d5e178a4588efc4b884ca365154 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 25 Feb 2026 22:03:09 +0100 Subject: [PATCH 2/2] Process review --- dissect/database/ese/ntds/ntds.py | 33 ++++++++----- dissect/database/ese/ntds/objects/secret.py | 52 +++++++++++++++++++++ tests/ese/ntds/test_ntds.py | 19 ++++++-- 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/dissect/database/ese/ntds/ntds.py b/dissect/database/ese/ntds/ntds.py index 30cd3ca..e62e742 100644 --- a/dissect/database/ese/ntds/ntds.py +++ b/dissect/database/ese/ntds/ntds.py @@ -4,6 +4,7 @@ from uuid import UUID from dissect.database.ese.ntds.database import Database +from dissect.database.ese.ntds.objects.secret import BackupKey if TYPE_CHECKING: from collections.abc import Iterator @@ -108,22 +109,32 @@ def secrets(self) -> Iterator[Secret]: """Get all secret objects from the database.""" yield from self.search(objectClass="secret") - def backup_keys(self) -> Iterator[tuple[UUID, bytes]]: - """Get all DPAPI backup keys from the database as a tuple of the GUID and the key value. - - All key values start with a ``DWORD`` version number. - The current version (``2``) is followed by two more ``DWORD`` which are the length of the private key bytes and - the length of the public key bytes, followed by the private key and public key bytes respectively. - """ + def backup_keys(self) -> Iterator[BackupKey]: + """Get all DPAPI backup keys from the database.""" if not self.pek.unlocked: raise ValueError("PEK must be unlocked to retrieve backup keys") for secret in self.secrets(): - if secret.is_phantom: + if secret.is_phantom or not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"): continue - # Just return all backup keys regardless if they're preferred or not - if not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"): + yield BackupKey(secret) + + def preferred_backup_keys(self) -> Iterator[BackupKey]: + """Get preferred DPAPI backup keys from the database.""" + if not self.pek.unlocked: + raise ValueError("PEK must be unlocked to retrieve backup keys") + + # We could do this the proper way (lookup the BCKUPKEY_P* secrets and then directly lookup the + # corresponding BCKUPKEY_* secrets), but in practice there are only a few backup keys, so just + # filter after the fact + preferred_guids = [] + for secret in self.secrets(): + if secret.is_phantom or not secret.name.startswith("BCKUPKEY_P"): continue - yield (UUID(secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret")), secret.current_value) + preferred_guids.append(UUID(bytes_le=secret.current_value)) + + for key in self.backup_keys(): + if key.guid in preferred_guids: + yield key diff --git a/dissect/database/ese/ntds/objects/secret.py b/dissect/database/ese/ntds/objects/secret.py index 07c6f84..98d6510 100644 --- a/dissect/database/ese/ntds/objects/secret.py +++ b/dissect/database/ese/ntds/objects/secret.py @@ -1,9 +1,12 @@ from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING +from uuid import UUID from dissect.util.ts import wintimestamp +from dissect.database.ese.ntds.c_ds import c_ds from dissect.database.ese.ntds.objects.leaf import Leaf if TYPE_CHECKING: @@ -45,3 +48,52 @@ def prior_set_time(self) -> datetime | None: if (ts := self.get("priorSetTime")) is not None: return wintimestamp(ts) return None + + +class BackupKey: + """Represents a DPAPI backup key object in the Active Directory.""" + + def __init__(self, secret: Secret): + self.secret = secret + + def __repr__(self) -> str: + return f"" + + @cached_property + def guid(self) -> UUID: + """The GUID of the backup key.""" + return UUID(self.secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret")) + + @cached_property + def version(self) -> int: + """The version of the backup key.""" + return c_ds.DWORD(self.secret.current_value) + + @cached_property + def is_legacy(self) -> bool: + """Whether the backup key is a legacy key (version 1).""" + return self.version == 1 + + @cached_property + def key(self) -> bytes: + """The key bytes of the backup key, for legacy keys (version 1).""" + if self.version == 1: + return self.secret.current_value[4:] + raise TypeError(f"Backup key version {self.version} does not have a single key value") + + @cached_property + def private_key(self) -> bytes: + """The private key bytes of the backup key, for version 2 keys.""" + if self.version == 2: + private_length = c_ds.DWORD(self.secret.current_value[4:8]) + return self.secret.current_value[12 : 12 + private_length] + raise TypeError(f"Backup key version {self.version} does not have a private key value") + + @cached_property + def public_key(self) -> bytes: + """The public key bytes of the backup key, for version 2 keys.""" + if self.version == 2: + private_length = c_ds.DWORD(self.secret.current_value[4:8]) + public_length = c_ds.DWORD(self.secret.current_value[8:12]) + return self.secret.current_value[12 + private_length : 12 + private_length + public_length] + raise TypeError(f"Backup key version {self.version} does not have a public key value") diff --git a/tests/ese/ntds/test_ntds.py b/tests/ese/ntds/test_ntds.py index 1108935..5279980 100644 --- a/tests/ese/ntds/test_ntds.py +++ b/tests/ese/ntds/test_ntds.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib from typing import TYPE_CHECKING from uuid import UUID @@ -288,7 +289,17 @@ def test_backup_keys(goad: NTDS) -> None: keys = list(goad.backup_keys()) assert len(keys) == 2 - assert keys[0][0] == UUID("dbea00d0-005f-4233-b140-41a9961da100") - assert keys[0][1][:4] == b"\x01\x00\x00\x00" # Legacy key version - assert keys[1][0] == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87") - assert keys[1][1][:4] == b"\x02\x00\x00\x00" # Current key version + assert keys[0].guid == UUID("dbea00d0-005f-4233-b140-41a9961da100") + assert keys[0].version == 1 + assert hashlib.sha256(keys[0].key).hexdigest() == "bae7b058f277922b75d63d9803b85fca40a95a3cc9d47c0ef0a644a203009562" + + assert keys[1].guid == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87") + assert keys[1].version == 2 # Current key version + assert ( + hashlib.sha256(keys[1].private_key).hexdigest() + == "e7317dfe5f962121afead04e0dbb4249aa395ef281e2332f6179f940b54f202f" + ) + assert ( + hashlib.sha256(keys[1].public_key).hexdigest() + == "398fef9281677096b18785d0ad000251d41f76b82e28687718d6a9812ddaca8a" + )