diff --git a/dissect/database/ese/ntds/database.py b/dissect/database/ese/ntds/database.py index 7d080c6..8713f03 100644 --- a/dissect/database/ese/ntds/database.py +++ b/dissect/database/ese/ntds/database.py @@ -6,12 +6,12 @@ from dissect.database.ese.ese import ESE from dissect.database.ese.exception import KeyNotFoundError -from dissect.database.ese.ntds.objects import DomainDNS, Object +from dissect.database.ese.ntds.objects import DomainDNS, Object, Server from dissect.database.ese.ntds.pek import PEK from dissect.database.ese.ntds.query import Query from dissect.database.ese.ntds.schema import Schema from dissect.database.ese.ntds.sd import SecurityDescriptor -from dissect.database.ese.ntds.util import DN, SearchFlags, encode_value +from dissect.database.ese.ntds.util import DN, DatabaseFlags, SearchFlags, encode_value if TYPE_CHECKING: from collections.abc import Iterator @@ -34,78 +34,91 @@ def __init__(self, fh: BinaryIO): self.link = LinkTable(self) self.sd = SecurityDescriptorTable(self) - self.data.schema.load(self) - - -class DataTable: - """Represents the ``datatable`` in the NTDS database.""" - - def __init__(self, db: Database): - self.db = db - self.table = self.db.ese.table("datatable") - self.hiddentable = self.db.ese.table("hiddentable") + self.hiddentable = self.ese.table("hiddentable") self.hiddeninfo = next(self.hiddentable.records(), None) - self.schema = Schema() - - # Cache frequently used and "expensive" methods - self.get = lru_cache(4096)(self.get) - self._make_dn = lru_cache(4096)(self._make_dn) - - def dsa(self) -> NTDSDSA: - """Return the Directory System Agent (DSA) object.""" - if not self.hiddeninfo: - raise ValueError("No hiddentable information available") - return self.get(self.hiddeninfo.get("dsa_col")) - - def dmd(self) -> DMD: - """Return the Directory Management Domain (DMD) object, a.k.a. the schema container.""" - if not self.hiddeninfo: - raise ValueError("No hiddentable information available") - return self.get(self.dsa().get("dMDLocation", raw=True)) - - def root(self) -> Top: - """Return the top-level object in the NTDS database.""" - if (root := next(self.children_of(0), None)) is None: - raise ValueError("No root object found") - return root + self.data.schema.load(self) - def root_domain(self) -> DomainDNS | None: - """Return the root domain object in the NTDS database. For AD LDS, this will return ``None``.""" - stack = [self.root()] - while stack: - if (obj := stack.pop()).is_deleted: - continue + # Clear the cache of the data table to avoid caching results before the schema is loaded + self.data.get.cache_clear() + self.data._make_dn.cache_clear() - if isinstance(obj, DomainDNS) and obj.is_head_of_naming_context: - return obj + @cached_property + def flags(self) -> DatabaseFlags | None: + """Return the database flags.""" + if self.hiddeninfo is None: + return None - stack.extend(obj.children()) + result = DatabaseFlags(0) + flags = self.hiddeninfo.get("flags_col") + for idx, member in enumerate(DatabaseFlags.__members__.values()): + if flags[idx] == ord(b"1"): + result = member if result is None else result | member - return None + return result @cached_property def pek(self) -> PEK | None: """Return the PEK.""" - if (root_domain := self.root_domain()) is None: + if (domain := self.domain()) is None: # Maybe this is an AD LDS database - if (root_pek := self.root().get("pekList")) is None: + if (root_pek := self.data.root().get("pekList")) is None: # It's not return None # Lookup the schema pek and permutate the boot key # https://www.synacktiv.com/publications/using-ntdissector-to-extract-secrets-from-adam-ntds-files - schema_pek = self.lookup(objectClass="dMD").get("pekList") + schema_pek = self.dmd().get("pekList") boot_key = bytes( [root_pek[i] for i in [2, 4, 25, 9, 7, 27, 5, 11]] + [schema_pek[i] for i in [37, 2, 17, 36, 20, 11, 22, 7]] ) # Lookup the actual PEK and unlock it - pek = PEK(self.lookup(objectClass="configuration").get("pekList")) + pek = PEK(self.dmd().parent().get("pekList")) pek.unlock(boot_key) return pek - return root_domain.pek + + return domain.pek + + def dsa(self) -> NTDSDSA: + """Return the Directory System Agent (DSA) object, a.k.a. the NTDS Settings object.""" + if not self.hiddeninfo: + raise ValueError("No hiddentable information available") + return self.data.get(self.hiddeninfo.get("dsa_col")) + + def dmd(self) -> DMD: + """Return the Directory Management Domain (DMD) object, a.k.a. the schema container.""" + if not self.hiddeninfo: + raise ValueError("No hiddentable information available") + return self.data.get(self.dsa().get("dMDLocation", raw=True)) + + def dc(self) -> Server: + """Return the Domain Controller (DC) server object that corresponds to this NTDS database.""" + return self.dsa().parent() + + def domain(self) -> DomainDNS | None: + """Return the root domain object in the NTDS database. For AD LDS (ADAM), this will return ``None``.""" + return self.dsa().domain() + + +class DataTable: + """Represents the ``datatable`` in the NTDS database.""" + + def __init__(self, db: Database): + self.db = db + self.table = self.db.ese.table("datatable") + self.schema = Schema() + + # Cache frequently used and "expensive" methods + self.get = lru_cache(4096)(self.get) + self._make_dn = lru_cache(4096)(self._make_dn) + + def root(self) -> Top: + """Return the top-level object in the NTDS database.""" + if (root := next(self.children_of(0), None)) is None: + raise ValueError("No root object found") + return root def walk(self) -> Iterator[Object]: """Walk through all objects in the NTDS database.""" @@ -201,8 +214,11 @@ def children_of(self, dnt: int) -> Iterator[Object]: cursor.seek([dnt]) record = cursor.record() - while record is not None and record != end: + while record is not None: yield Object.from_record(self.db, record) + if record == end: + break + record = cursor.next() def _make_dn(self, dnt: int) -> DN: diff --git a/dissect/database/ese/ntds/ntds.py b/dissect/database/ese/ntds/ntds.py index 3b8fb5a..71f58f0 100644 --- a/dissect/database/ese/ntds/ntds.py +++ b/dissect/database/ese/ntds/ntds.py @@ -30,18 +30,18 @@ class NTDS: def __init__(self, fh: BinaryIO): self.db = Database(fh) + @property + def pek(self) -> PEK | None: + """Return the PEK associated with the root domain.""" + return self.db.pek + def root(self) -> Object: """Return the root object of the Active Directory.""" return self.db.data.root() - def root_domain(self) -> DomainDNS | None: + def domain(self) -> DomainDNS | None: """Return the root domain object of the Active Directory.""" - return self.db.data.root_domain() - - @property - def pek(self) -> PEK | None: - """Return the PEK associated with the root domain.""" - return self.db.data.pek + return self.db.domain() def walk(self) -> Iterator[Object]: """Walk through all objects in the NTDS database.""" diff --git a/dissect/database/ese/ntds/objects/ntdsdsa.py b/dissect/database/ese/ntds/objects/ntdsdsa.py index 65469e4..aeca2d8 100644 --- a/dissect/database/ese/ntds/objects/ntdsdsa.py +++ b/dissect/database/ese/ntds/objects/ntdsdsa.py @@ -7,7 +7,7 @@ if TYPE_CHECKING: from collections.abc import Iterator - from dissect.database.ese.ntds.objects import Object + from dissect.database.ese.ntds.objects import DomainDNS, MSDSOptionalFeature, Object class NTDSDSA(ApplicationSettings): @@ -19,6 +19,18 @@ class NTDSDSA(ApplicationSettings): __object_class__ = "nTDSDSA" + def domain(self) -> DomainDNS | None: + """Return the domain object associated with this NTDS DSA object, if any.""" + self._assert_local() + + return next(self.db.link.links(self.dnt, "msDS-HasDomainNCs"), None) + + def features(self) -> Iterator[MSDSOptionalFeature]: + """Return the optional features that are enabled on this NTDS DSA object.""" + self._assert_local() + + yield from self.db.link.links(self.dnt, "msDS-EnabledFeature") + def managed_by(self) -> Iterator[Object]: """Return the objects that manage this NTDS DSA object.""" self._assert_local() diff --git a/dissect/database/ese/ntds/objects/server.py b/dissect/database/ese/ntds/objects/server.py index f110a73..6ca4a56 100644 --- a/dissect/database/ese/ntds/objects/server.py +++ b/dissect/database/ese/ntds/objects/server.py @@ -7,7 +7,7 @@ if TYPE_CHECKING: from collections.abc import Iterator - from dissect.database.ese.ntds.objects import Object + from dissect.database.ese.ntds.objects import Computer, Object class Server(Top): @@ -19,6 +19,12 @@ class Server(Top): __object_class__ = "server" + def computer(self) -> Computer | None: + """Return the computer object associated with this server, if any.""" + self._assert_local() + + return next(self.db.link.links(self.dnt, "serverReference"), None) + def managed_by(self) -> Iterator[Object]: """Return the objects that manage this server.""" self._assert_local() diff --git a/dissect/database/ese/ntds/query.py b/dissect/database/ese/ntds/query.py index a95e18a..7cf5c1d 100644 --- a/dissect/database/ese/ntds/query.py +++ b/dissect/database/ese/ntds/query.py @@ -124,6 +124,7 @@ def _process_or_operation(self, filter: SearchFilter, records: Iterator[Record] Yields: Records matching any condition in the OR operation. """ + records = list(records) if records is not None else None for child in filter.children: yield from self._process_query(child, records=records) @@ -186,8 +187,10 @@ def _process_wildcard_tail(index: Index, filter_value: str) -> Iterator[Record]: # Yield all records in range record = cursor.record() - while record is not None and record != end: + while record is not None: yield record + if record == end: + break record = cursor.next() diff --git a/dissect/database/ese/ntds/schema.py b/dissect/database/ese/ntds/schema.py index 0d752de..2496423 100644 --- a/dissect/database/ese/ntds/schema.py +++ b/dissect/database/ese/ntds/schema.py @@ -203,7 +203,7 @@ def load(self, db: Database) -> None: # This _should_ have all the attribute and class schema entries # We used to perform an index search on objectClass (ATTc0, INDEX_00000000), but apparently # not all databases have this index - dmd = db.data.dmd() + dmd = db.dmd() # We bootstrapped these earlier attribute_schema = self.lookup_class(name="attributeSchema") @@ -233,7 +233,7 @@ def load(self, db: Database) -> None: ) # Load user-defined OID prefixes - if (prefix_map := db.data.dmd().get("prefixMap")) is not None: + if (prefix_map := dmd.get("prefixMap")) is not None: self._oid_idx_to_prefix_index.update(parse_prefix_map(prefix_map)) # Rebuild the reverse index self._oid_prefix_to_idx_index = {prefix: idx for idx, prefix in self._oid_idx_to_prefix_index.items()} diff --git a/dissect/database/ese/ntds/util.py b/dissect/database/ese/ntds/util.py index fba7631..46a4d76 100644 --- a/dissect/database/ese/ntds/util.py +++ b/dissect/database/ese/ntds/util.py @@ -1,7 +1,7 @@ from __future__ import annotations import struct -from enum import IntEnum, IntFlag +from enum import Flag, IntEnum, IntFlag, auto from typing import TYPE_CHECKING, Any from uuid import UUID @@ -18,6 +18,23 @@ from dissect.database.ese.ntds.schema import AttributeEntry +class DatabaseFlags(Flag): + """Database flags that are stored in the hiddentable. + + The flags are weirdly stored as ``1``, ``0`` or ``\x00`` in a byte array. + To make parsing a bit easier, we use the index of each flag in this class as the character offset in the byte array. + """ + + AUXCLASS = auto() + SD_CONVERSION_REQUIRED = auto() + ROOT_GUID_UPDATED = auto() + ADAM = auto() + ASCII_INDICES_REBUILT = auto() + SHOW_IN_AB_ARRAY_REBUILD = auto() + UPDATE_NC_TYPE_REQUIRED = auto() + LINK_QUOTA_USN = auto() + + # https://learn.microsoft.com/en-us/windows/win32/adschema/a-instancetype class InstanceType(IntFlag): HeadOfNamingContext = 0x00000001 @@ -102,10 +119,10 @@ def _pek_decrypt(db: Database, value: bytes) -> bytes: Returns: The decrypted data blob, or the original value if the PEK is locked. """ - if db.data.pek is None or not db.data.pek.unlocked: + if db.pek is None or not db.pek.unlocked: return value - return db.data.pek.decrypt(value) + return db.pek.decrypt(value) def _decode_supplemental_credentials(db: Database, value: bytes) -> dict[str, bytes] | bytes: @@ -118,10 +135,10 @@ def _decode_supplemental_credentials(db: Database, value: bytes) -> dict[str, by Returns: A dictionary mapping credential types to their data blobs, or the original value if the PEK is locked. """ - if db.data.pek is None or not db.data.pek.unlocked: + if db.pek is None or not db.pek.unlocked: return value - value = db.data.pek.decrypt(value) + value = db.pek.decrypt(value) header = c_ds.USER_PROPERTIES_HEADER(value) result = {} @@ -222,12 +239,12 @@ def _decode_pwd_history(db: Database, value: list[bytes]) -> list[bytes]: Returns: A list of decrypted password hashes, or the original value if the PEK is locked. """ - if db.data.pek is None or not db.data.pek.unlocked: + if db.pek is None or not db.pek.unlocked: return value result = [] for buf in value: - buf = db.data.pek.decrypt(buf) + buf = db.pek.decrypt(buf) # The history attributes can contain multiple hashes concatenated together, so we need to split them up # NT and LM hashes are both 16 bytes long result.extend(buf[i : i + 16] for i in range(0, len(buf), 16)) diff --git a/tests/ese/ntds/test_ntds.py b/tests/ese/ntds/test_ntds.py index e50c1e3..537e772 100644 --- a/tests/ese/ntds/test_ntds.py +++ b/tests/ese/ntds/test_ntds.py @@ -11,6 +11,23 @@ from dissect.database.ese.ntds import NTDS +def test_dsa(goad: NTDS) -> None: + """Test retrieval of the NTDS DSA object and its associated domain and features.""" + dsa = goad.db.dsa() + assert dsa is not None + assert dsa.domain() is not None + assert dsa.domain().name == "sevenkingdoms" + assert [f.name for f in dsa.features()] == ["Recycle Bin Feature"] + + +def test_dc(goad: NTDS) -> None: + """Test retrieval of the domain controller objects and their associated computer and managedBy links.""" + dc = goad.db.dc() + assert dc.name == "KINGSLANDING" + assert dc.computer() is not None + assert dc.computer().name == "KINGSLANDING" + + def test_groups(goad: NTDS) -> None: groups = sorted(goad.groups(), key=lambda x: x.distinguished_name) diff --git a/tests/ese/ntds/test_pek.py b/tests/ese/ntds/test_pek.py index 33249a4..4b87bff 100644 --- a/tests/ese/ntds/test_pek.py +++ b/tests/ese/ntds/test_pek.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +from dissect.database.ese.ntds.util import DatabaseFlags + if TYPE_CHECKING: from dissect.database.ese.ntds import NTDS @@ -44,6 +46,8 @@ def test_pek(goad: NTDS) -> None: def test_pek_adam(adam: NTDS) -> None: """Test PEK unlocking and decryption for AD LDS NTDS.dit.""" + assert DatabaseFlags.ADAM in adam.db.flags + # The PEK in AD LDS is derived within the database itself assert adam.pek.unlocked