Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions dissect/database/ese/ntds/ntds.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
from __future__ import annotations

from typing import TYPE_CHECKING, BinaryIO
from uuid import UUID

from dissect.database.ese.ntds.database import Database
from dissect.database.ese.ntds.objects.secret import BackupKey

if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.database.ese.ntds.objects import Computer, DomainDNS, Group, GroupPolicyContainer, Object, Server, User
from dissect.database.ese.ntds.objects.trusteddomain import TrustedDomain
from dissect.database.ese.ntds.objects import (
Computer,
DomainDNS,
Group,
GroupPolicyContainer,
Object,
Secret,
Server,
TrustedDomain,
User,
)
from dissect.database.ese.ntds.pek import PEK


Expand Down Expand Up @@ -93,3 +104,37 @@ def trusts(self) -> Iterator[TrustedDomain]:
def group_policies(self) -> Iterator[GroupPolicyContainer]:
"""Get all group policy objects (GPO) objects from the database."""
yield from self.search(objectClass="groupPolicyContainer")

def secrets(self) -> Iterator[Secret]:
"""Get all secret objects from the database."""
yield from self.search(objectClass="secret")

def backup_keys(self) -> Iterator[BackupKey]:
"""Get all DPAPI backup keys from the database."""
if not self.pek.unlocked:
raise ValueError("PEK must be unlocked to retrieve backup keys")

for secret in self.secrets():
if secret.is_phantom or not secret.name.startswith("BCKUPKEY_") or secret.name.startswith("BCKUPKEY_P"):
continue

yield BackupKey(secret)

def preferred_backup_keys(self) -> Iterator[BackupKey]:
"""Get preferred DPAPI backup keys from the database."""
if not self.pek.unlocked:
raise ValueError("PEK must be unlocked to retrieve backup keys")

# We could do this the proper way (lookup the BCKUPKEY_P* secrets and then directly lookup the
# corresponding BCKUPKEY_* secrets), but in practice there are only a few backup keys, so just
# filter after the fact
preferred_guids = []
for secret in self.secrets():
if secret.is_phantom or not secret.name.startswith("BCKUPKEY_P"):
continue

preferred_guids.append(UUID(bytes_le=secret.current_value))

for key in self.backup_keys():
if key.guid in preferred_guids:
yield key
86 changes: 86 additions & 0 deletions dissect/database/ese/ntds/objects/secret.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING
from uuid import UUID

from dissect.util.ts import wintimestamp

from dissect.database.ese.ntds.c_ds import c_ds
from dissect.database.ese.ntds.objects.leaf import Leaf

if TYPE_CHECKING:
from datetime import datetime


class Secret(Leaf):
"""Represents a secret object in the Active Directory.
Expand All @@ -11,3 +21,79 @@ class Secret(Leaf):
"""

__object_class__ = "secret"

def __repr_body__(self) -> str:
return f"name={self.name!r} last_set_time={self.last_set_time} prior_set_time={self.prior_set_time}"

@property
def current_value(self) -> bytes:
"""Return the current value of the secret."""
return self.get("currentValue")

@property
def last_set_time(self) -> datetime | None:
"""Return the last set time of the secret."""
if (ts := self.get("lastSetTime")) is not None:
return wintimestamp(ts)
return None

@property
def prior_value(self) -> bytes:
"""Return the prior value of the secret."""
return self.get("priorValue")

@property
def prior_set_time(self) -> datetime | None:
"""Return the prior set time of the secret."""
if (ts := self.get("priorSetTime")) is not None:
return wintimestamp(ts)
return None


class BackupKey:
"""Represents a DPAPI backup key object in the Active Directory."""

def __init__(self, secret: Secret):
self.secret = secret

def __repr__(self) -> str:
return f"<BackupKey guid={self.guid} version={self.version}>"

@cached_property
def guid(self) -> UUID:
"""The GUID of the backup key."""
return UUID(self.secret.name.removeprefix("BCKUPKEY_").removesuffix(" Secret"))

@cached_property
def version(self) -> int:
"""The version of the backup key."""
return c_ds.DWORD(self.secret.current_value)

@cached_property
def is_legacy(self) -> bool:
"""Whether the backup key is a legacy key (version 1)."""
return self.version == 1

@cached_property
def key(self) -> bytes:
"""The key bytes of the backup key, for legacy keys (version 1)."""
if self.version == 1:
return self.secret.current_value[4:]
raise TypeError(f"Backup key version {self.version} does not have a single key value")

@cached_property
def private_key(self) -> bytes:
"""The private key bytes of the backup key, for version 2 keys."""
if self.version == 2:
private_length = c_ds.DWORD(self.secret.current_value[4:8])
return self.secret.current_value[12 : 12 + private_length]
raise TypeError(f"Backup key version {self.version} does not have a private key value")

@cached_property
def public_key(self) -> bytes:
"""The public key bytes of the backup key, for version 2 keys."""
if self.version == 2:
private_length = c_ds.DWORD(self.secret.current_value[4:8])
public_length = c_ds.DWORD(self.secret.current_value[8:12])
return self.secret.current_value[12 + private_length : 12 + private_length + public_length]
raise TypeError(f"Backup key version {self.version} does not have a public key value")
28 changes: 28 additions & 0 deletions tests/ese/ntds/test_ntds.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import hashlib
from typing import TYPE_CHECKING
from uuid import UUID

import pytest

Expand Down Expand Up @@ -265,6 +267,7 @@ def test_all_memberships(large: NTDS) -> None:


def test_group_policies(goad: NTDS) -> None:
"""Test retrieval of group policies."""
gpos: list[GroupPolicyContainer] = sorted(goad.group_policies(), key=lambda x: x.distinguished_name)
assert len(gpos) == 5
assert isinstance(gpos[0], GroupPolicyContainer)
Expand All @@ -275,3 +278,28 @@ def test_group_policies(goad: NTDS) -> None:
"CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=NORTH,DC=SEVENKINGDOMS,DC=LOCAL",
"CN={6AC1786C-016F-11D2-945F-00C04FB984F9},CN=POLICIES,CN=SYSTEM,DC=SEVENKINGDOMS,DC=LOCAL",
]


def test_backup_keys(goad: NTDS) -> None:
"""Test retrieval of DPAPI backup keys."""
with pytest.raises(ValueError, match="PEK must be unlocked to retrieve backup keys"):
list(goad.backup_keys())

goad.pek.unlock(bytes.fromhex("079f95655b66f16deb28aa1ab3a81eb0"))

keys = list(goad.backup_keys())
assert len(keys) == 2
assert keys[0].guid == UUID("dbea00d0-005f-4233-b140-41a9961da100")
assert keys[0].version == 1
assert hashlib.sha256(keys[0].key).hexdigest() == "bae7b058f277922b75d63d9803b85fca40a95a3cc9d47c0ef0a644a203009562"

assert keys[1].guid == UUID("b7d3c47b-2efe-4cad-b37a-bb2f8b18bd87")
assert keys[1].version == 2 # Current key version
assert (
hashlib.sha256(keys[1].private_key).hexdigest()
== "e7317dfe5f962121afead04e0dbb4249aa395ef281e2332f6179f940b54f202f"
)
assert (
hashlib.sha256(keys[1].public_key).hexdigest()
== "398fef9281677096b18785d0ad000251d41f76b82e28687718d6a9812ddaca8a"
)
Loading