Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 67 additions & 51 deletions dissect/database/ese/ntds/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from dissect.database.ese.ese import ESE
from dissect.database.ese.exception import KeyNotFoundError
from dissect.database.ese.ntds.objects import DomainDNS, Object
from dissect.database.ese.ntds.objects import DomainDNS, Object, Server
from dissect.database.ese.ntds.pek import PEK
from dissect.database.ese.ntds.query import Query
from dissect.database.ese.ntds.schema import Schema
from dissect.database.ese.ntds.sd import SecurityDescriptor
from dissect.database.ese.ntds.util import DN, SearchFlags, encode_value
from dissect.database.ese.ntds.util import DN, DatabaseFlags, SearchFlags, encode_value

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -34,78 +34,91 @@ def __init__(self, fh: BinaryIO):
self.link = LinkTable(self)
self.sd = SecurityDescriptorTable(self)

self.data.schema.load(self)


class DataTable:
"""Represents the ``datatable`` in the NTDS database."""

def __init__(self, db: Database):
self.db = db
self.table = self.db.ese.table("datatable")
self.hiddentable = self.db.ese.table("hiddentable")
self.hiddentable = self.ese.table("hiddentable")
self.hiddeninfo = next(self.hiddentable.records(), None)

self.schema = Schema()

# Cache frequently used and "expensive" methods
self.get = lru_cache(4096)(self.get)
self._make_dn = lru_cache(4096)(self._make_dn)

def dsa(self) -> NTDSDSA:
"""Return the Directory System Agent (DSA) object."""
if not self.hiddeninfo:
raise ValueError("No hiddentable information available")
return self.get(self.hiddeninfo.get("dsa_col"))

def dmd(self) -> DMD:
"""Return the Directory Management Domain (DMD) object, a.k.a. the schema container."""
if not self.hiddeninfo:
raise ValueError("No hiddentable information available")
return self.get(self.dsa().get("dMDLocation", raw=True))

def root(self) -> Top:
"""Return the top-level object in the NTDS database."""
if (root := next(self.children_of(0), None)) is None:
raise ValueError("No root object found")
return root
self.data.schema.load(self)

def root_domain(self) -> DomainDNS | None:
"""Return the root domain object in the NTDS database. For AD LDS, this will return ``None``."""
stack = [self.root()]
while stack:
if (obj := stack.pop()).is_deleted:
continue
# Clear the cache of the data table to avoid caching results before the schema is loaded
self.data.get.cache_clear()
self.data._make_dn.cache_clear()

if isinstance(obj, DomainDNS) and obj.is_head_of_naming_context:
return obj
@cached_property
def flags(self) -> DatabaseFlags | None:
"""Return the database flags."""
if self.hiddeninfo is None:
return None

stack.extend(obj.children())
result = DatabaseFlags(0)
flags = self.hiddeninfo.get("flags_col")
for idx, member in enumerate(DatabaseFlags.__members__.values()):
if flags[idx] == ord(b"1"):
result = member if result is None else result | member

return None
return result

@cached_property
def pek(self) -> PEK | None:
"""Return the PEK."""
if (root_domain := self.root_domain()) is None:
if (domain := self.domain()) is None:
# Maybe this is an AD LDS database
if (root_pek := self.root().get("pekList")) is None:
if (root_pek := self.data.root().get("pekList")) is None:
# It's not
return None

# Lookup the schema pek and permutate the boot key
# https://www.synacktiv.com/publications/using-ntdissector-to-extract-secrets-from-adam-ntds-files
schema_pek = self.lookup(objectClass="dMD").get("pekList")
schema_pek = self.dmd().get("pekList")
boot_key = bytes(
[root_pek[i] for i in [2, 4, 25, 9, 7, 27, 5, 11]]
+ [schema_pek[i] for i in [37, 2, 17, 36, 20, 11, 22, 7]]
)

# Lookup the actual PEK and unlock it
pek = PEK(self.lookup(objectClass="configuration").get("pekList"))
pek = PEK(self.dmd().parent().get("pekList"))
pek.unlock(boot_key)
return pek
return root_domain.pek

return domain.pek

def dsa(self) -> NTDSDSA:
"""Return the Directory System Agent (DSA) object, a.k.a. the NTDS Settings object."""
if not self.hiddeninfo:
raise ValueError("No hiddentable information available")
return self.data.get(self.hiddeninfo.get("dsa_col"))

def dmd(self) -> DMD:
"""Return the Directory Management Domain (DMD) object, a.k.a. the schema container."""
if not self.hiddeninfo:
raise ValueError("No hiddentable information available")
return self.data.get(self.dsa().get("dMDLocation", raw=True))

def dc(self) -> Server:
"""Return the Domain Controller (DC) server object that corresponds to this NTDS database."""
return self.dsa().parent()

def domain(self) -> DomainDNS | None:
"""Return the root domain object in the NTDS database. For AD LDS (ADAM), this will return ``None``."""
return self.dsa().domain()


class DataTable:
"""Represents the ``datatable`` in the NTDS database."""

def __init__(self, db: Database):
self.db = db
self.table = self.db.ese.table("datatable")
self.schema = Schema()

# Cache frequently used and "expensive" methods
self.get = lru_cache(4096)(self.get)
self._make_dn = lru_cache(4096)(self._make_dn)

def root(self) -> Top:
"""Return the top-level object in the NTDS database."""
if (root := next(self.children_of(0), None)) is None:
raise ValueError("No root object found")
return root

def walk(self) -> Iterator[Object]:
"""Walk through all objects in the NTDS database."""
Expand Down Expand Up @@ -201,8 +214,11 @@ def children_of(self, dnt: int) -> Iterator[Object]:
cursor.seek([dnt])

record = cursor.record()
while record is not None and record != end:
while record is not None:
yield Object.from_record(self.db, record)
if record == end:
break

record = cursor.next()

def _make_dn(self, dnt: int) -> DN:
Expand Down
14 changes: 7 additions & 7 deletions dissect/database/ese/ntds/ntds.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ class NTDS:
def __init__(self, fh: BinaryIO):
self.db = Database(fh)

@property
def pek(self) -> PEK | None:
"""Return the PEK associated with the root domain."""
return self.db.pek

def root(self) -> Object:
"""Return the root object of the Active Directory."""
return self.db.data.root()

def root_domain(self) -> DomainDNS | None:
def domain(self) -> DomainDNS | None:
"""Return the root domain object of the Active Directory."""
return self.db.data.root_domain()

@property
def pek(self) -> PEK | None:
"""Return the PEK associated with the root domain."""
return self.db.data.pek
return self.db.domain()

def walk(self) -> Iterator[Object]:
"""Walk through all objects in the NTDS database."""
Expand Down
14 changes: 13 additions & 1 deletion dissect/database/ese/ntds/objects/ntdsdsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.database.ese.ntds.objects import Object
from dissect.database.ese.ntds.objects import DomainDNS, MSDSOptionalFeature, Object


class NTDSDSA(ApplicationSettings):
Expand All @@ -19,6 +19,18 @@ class NTDSDSA(ApplicationSettings):

__object_class__ = "nTDSDSA"

def domain(self) -> DomainDNS | None:
"""Return the domain object associated with this NTDS DSA object, if any."""
self._assert_local()

return next(self.db.link.links(self.dnt, "msDS-HasDomainNCs"), None)

def features(self) -> Iterator[MSDSOptionalFeature]:
"""Return the optional features that are enabled on this NTDS DSA object."""
self._assert_local()

yield from self.db.link.links(self.dnt, "msDS-EnabledFeature")

def managed_by(self) -> Iterator[Object]:
"""Return the objects that manage this NTDS DSA object."""
self._assert_local()
Expand Down
8 changes: 7 additions & 1 deletion dissect/database/ese/ntds/objects/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.database.ese.ntds.objects import Object
from dissect.database.ese.ntds.objects import Computer, Object


class Server(Top):
Expand All @@ -19,6 +19,12 @@ class Server(Top):

__object_class__ = "server"

def computer(self) -> Computer | None:
"""Return the computer object associated with this server, if any."""
self._assert_local()

return next(self.db.link.links(self.dnt, "serverReference"), None)

def managed_by(self) -> Iterator[Object]:
"""Return the objects that manage this server."""
self._assert_local()
Expand Down
5 changes: 4 additions & 1 deletion dissect/database/ese/ntds/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def _process_or_operation(self, filter: SearchFilter, records: Iterator[Record]
Yields:
Records matching any condition in the OR operation.
"""
records = list(records) if records is not None else None
for child in filter.children:
yield from self._process_query(child, records=records)

Expand Down Expand Up @@ -186,8 +187,10 @@ def _process_wildcard_tail(index: Index, filter_value: str) -> Iterator[Record]:

# Yield all records in range
record = cursor.record()
while record is not None and record != end:
while record is not None:
yield record
if record == end:
break
record = cursor.next()


Expand Down
4 changes: 2 additions & 2 deletions dissect/database/ese/ntds/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def load(self, db: Database) -> None:
# This _should_ have all the attribute and class schema entries
# We used to perform an index search on objectClass (ATTc0, INDEX_00000000), but apparently
# not all databases have this index
dmd = db.data.dmd()
dmd = db.dmd()

# We bootstrapped these earlier
attribute_schema = self.lookup_class(name="attributeSchema")
Expand Down Expand Up @@ -233,7 +233,7 @@ def load(self, db: Database) -> None:
)

# Load user-defined OID prefixes
if (prefix_map := db.data.dmd().get("prefixMap")) is not None:
if (prefix_map := dmd.get("prefixMap")) is not None:
self._oid_idx_to_prefix_index.update(parse_prefix_map(prefix_map))
# Rebuild the reverse index
self._oid_prefix_to_idx_index = {prefix: idx for idx, prefix in self._oid_idx_to_prefix_index.items()}
Expand Down
31 changes: 24 additions & 7 deletions dissect/database/ese/ntds/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import struct
from enum import IntEnum, IntFlag
from enum import Flag, IntEnum, IntFlag, auto
from typing import TYPE_CHECKING, Any
from uuid import UUID

Expand All @@ -18,6 +18,23 @@
from dissect.database.ese.ntds.schema import AttributeEntry


class DatabaseFlags(Flag):
"""Database flags that are stored in the hiddentable.

The flags are weirdly stored as ``1``, ``0`` or ``\x00`` in a byte array.
To make parsing a bit easier, we use the index of each flag in this class as the character offset in the byte array.
"""

AUXCLASS = auto()
SD_CONVERSION_REQUIRED = auto()
ROOT_GUID_UPDATED = auto()
ADAM = auto()
ASCII_INDICES_REBUILT = auto()
SHOW_IN_AB_ARRAY_REBUILD = auto()
UPDATE_NC_TYPE_REQUIRED = auto()
LINK_QUOTA_USN = auto()


# https://learn.microsoft.com/en-us/windows/win32/adschema/a-instancetype
class InstanceType(IntFlag):
HeadOfNamingContext = 0x00000001
Expand Down Expand Up @@ -102,10 +119,10 @@ def _pek_decrypt(db: Database, value: bytes) -> bytes:
Returns:
The decrypted data blob, or the original value if the PEK is locked.
"""
if db.data.pek is None or not db.data.pek.unlocked:
if db.pek is None or not db.pek.unlocked:
return value

return db.data.pek.decrypt(value)
return db.pek.decrypt(value)


def _decode_supplemental_credentials(db: Database, value: bytes) -> dict[str, bytes] | bytes:
Expand All @@ -118,10 +135,10 @@ def _decode_supplemental_credentials(db: Database, value: bytes) -> dict[str, by
Returns:
A dictionary mapping credential types to their data blobs, or the original value if the PEK is locked.
"""
if db.data.pek is None or not db.data.pek.unlocked:
if db.pek is None or not db.pek.unlocked:
return value

value = db.data.pek.decrypt(value)
value = db.pek.decrypt(value)
header = c_ds.USER_PROPERTIES_HEADER(value)

result = {}
Expand Down Expand Up @@ -222,12 +239,12 @@ def _decode_pwd_history(db: Database, value: list[bytes]) -> list[bytes]:
Returns:
A list of decrypted password hashes, or the original value if the PEK is locked.
"""
if db.data.pek is None or not db.data.pek.unlocked:
if db.pek is None or not db.pek.unlocked:
return value

result = []
for buf in value:
buf = db.data.pek.decrypt(buf)
buf = db.pek.decrypt(buf)
# The history attributes can contain multiple hashes concatenated together, so we need to split them up
# NT and LM hashes are both 16 bytes long
result.extend(buf[i : i + 16] for i in range(0, len(buf), 16))
Expand Down
17 changes: 17 additions & 0 deletions tests/ese/ntds/test_ntds.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@
from dissect.database.ese.ntds import NTDS


def test_dsa(goad: NTDS) -> None:
"""Test retrieval of the NTDS DSA object and its associated domain and features."""
dsa = goad.db.dsa()
assert dsa is not None
assert dsa.domain() is not None
assert dsa.domain().name == "sevenkingdoms"
assert [f.name for f in dsa.features()] == ["Recycle Bin Feature"]


def test_dc(goad: NTDS) -> None:
"""Test retrieval of the domain controller objects and their associated computer and managedBy links."""
dc = goad.db.dc()
assert dc.name == "KINGSLANDING"
assert dc.computer() is not None
assert dc.computer().name == "KINGSLANDING"


def test_groups(goad: NTDS) -> None:
groups = sorted(goad.groups(), key=lambda x: x.distinguished_name)

Expand Down
4 changes: 4 additions & 0 deletions tests/ese/ntds/test_pek.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import TYPE_CHECKING

from dissect.database.ese.ntds.util import DatabaseFlags

if TYPE_CHECKING:
from dissect.database.ese.ntds import NTDS

Expand Down Expand Up @@ -44,6 +46,8 @@ def test_pek(goad: NTDS) -> None:

def test_pek_adam(adam: NTDS) -> None:
"""Test PEK unlocking and decryption for AD LDS NTDS.dit."""
assert DatabaseFlags.ADAM in adam.db.flags

# The PEK in AD LDS is derived within the database itself
assert adam.pek.unlocked

Expand Down
Loading