diff --git a/dissect/target/plugins/os/windows/ad/ntds.py b/dissect/target/plugins/os/windows/ad/ntds.py index 05b29c9222..a41293bdf8 100644 --- a/dissect/target/plugins/os/windows/ad/ntds.py +++ b/dissect/target/plugins/os/windows/ad/ntds.py @@ -16,72 +16,116 @@ if TYPE_CHECKING: from collections.abc import Iterator - from dissect.database.ese.ntds.objects import Computer, User + from dissect.database.ese.ntds.objects import Computer, DomainDNS, Object, OrganizationalUnit, SecurityObject, User from dissect.target.target import Target -GENERIC_FIELDS = [ +OBJECTS_FIELDS = [ ("string", "cn"), - ("string", "upn"), - ("string", "sam_name"), - ("string", "sam_type"), - ("string", "description"), ("string", "sid"), + ("string", "name"), + ("string", "display_name"), + ("string", "description"), + ("string[]", "object_classes"), + ("string", "distinguished_name"), + ("string", "object_guid"), + ("datetime", "creation_time"), + ("datetime", "last_modified_time"), + ("boolean", "is_deleted"), + ("varint", "nt_security_descriptor"), + ("string", "parent_guid"), + ("string", "parent_type"), +] + +SECURITY_PRINCIPAL_FIELDS = [ + *OBJECTS_FIELDS, ("varint", "rid"), + ("string", "sam_name"), + ("string", "sam_type"), + ("boolean", "admin_count"), + ("string[]", "sid_history"), +] + +ACCOUNT_FIELDS = [ + *SECURITY_PRINCIPAL_FIELDS, + ("string", "upn"), + ("string", "user_account_control"), ("datetime", "password_last_set"), ("datetime", "logon_last_failed"), - ("datetime", "logon_last_success"), + ("datetime", "logon_last_success_observed"), + ("datetime", "logon_last_success_reported"), ("datetime", "account_expires"), - ("datetime", "creation_time"), - ("datetime", "last_modified_time"), - ("boolean", "admin_count"), - ("boolean", "is_deleted"), + ("uint32", "primary_group_id"), + ("string[]", "member_of"), + ("string[]", "allowed_to_delegate"), ("string", "lm"), ("string[]", "lm_history"), ("string", "nt"), ("string[]", "nt_history"), ("string", "supplemental_credentials"), - ("string", "user_account_control"), - ("string[]", "object_classes"), - ("string", "distinguished_name"), - ("string", "object_guid"), - ("uint32", "primary_group_id"), - ("string[]", "member_of"), - ("string[]", "service_principal_name"), + ("string", "info"), + ("string", "comment"), + ("string", "email"), + ("string", "title"), + ("string", "telephone_number"), + ("string", "home_directory"), + ("path", "logon_script"), + ("string[]", "service_principal_names"), +] + +CONTAINER_FIELDS = [ + *OBJECTS_FIELDS, + ("string", "gplink"), ] -# Record descriptor for NTDS user secrets NtdsUserRecord = TargetRecordDescriptor( "windows/ad/user", [ - *GENERIC_FIELDS, - ("string", "info"), - ("string", "comment"), - ("string", "telephone_number"), - ("string", "home_directory"), + *ACCOUNT_FIELDS, ], ) + NtdsComputerRecord = TargetRecordDescriptor( "windows/ad/computer", [ - *GENERIC_FIELDS, + *ACCOUNT_FIELDS, ("string", "dns_hostname"), ("string", "operating_system"), ("string", "operating_system_version"), + ("string[]", "service_principal_name"), + ("varint", "allowed_to_act"), + ], +) + +NtdsGroupRecord = TargetRecordDescriptor( + "windows/ad/group", + [ + *SECURITY_PRINCIPAL_FIELDS, + ("string[]", "members"), + ], +) + +NtdsDomainRecord = TargetRecordDescriptor( + "windows/ad/domain", + [ + *CONTAINER_FIELDS, + ("uint32", "machine_account_quota"), + ], +) + +NtdsOURecord = TargetRecordDescriptor( + "windows/ad/ou", + [ + *CONTAINER_FIELDS, + ("boolean", "blocks_inheritance"), ], ) NtdsGPORecord = TargetRecordDescriptor( "windows/ad/gpo", [ - ("string", "cn"), - ("string", "distinguished_name"), - ("string", "object_guid"), - ("string", "name"), - ("string", "display_name"), - ("datetime", "creation_time"), - ("datetime", "last_modified_time"), + *CONTAINER_FIELDS, ], ) @@ -116,10 +160,7 @@ def __init__(self, target: Target): self.path = self.target.fs.path(path) def check_compatible(self) -> None: - if not self.target.has_function("lsa"): - raise UnsupportedPluginError("System Hive is not present or LSA function not available") - - if not self.path.is_file(): + if not self.path.is_file() or not self.path.exists(): raise UnsupportedPluginError("No NTDS.dit database found on target") @cached_property @@ -129,6 +170,8 @@ def ntds(self) -> NTDS: if self.target.has_function("lsa"): ntds.pek.unlock(self.target.lsa.syskey) + else: + self.target.log.warning("LSA plugin not available, cannot unlock PEK and decrypt sensitive data") return ntds @@ -138,10 +181,6 @@ def users(self) -> Iterator[NtdsUserRecord]: for user in self.ntds.users(): yield NtdsUserRecord( **extract_user_info(user, self.target), - info=user.get("info"), - comment=user.get("comment"), - telephone_number=user.get("telephoneNumber"), - home_directory=user.get("homeDirectory"), _target=self.target, ) @@ -154,22 +193,55 @@ def computers(self) -> Iterator[NtdsComputerRecord]: dns_hostname=computer.get("dNSHostName"), operating_system=computer.get("operatingSystem"), operating_system_version=computer.get("operatingSystemVersion"), + service_principal_name=computer.get("servicePrincipalName"), + allowed_to_act=computer.get("msDS-AllowedToActOnBehalfOfOtherIdentity"), + _target=self.target, + ) + + @export(record=NtdsGroupRecord) + def groups(self) -> Iterator[NtdsGroupRecord]: + """Extract all groups from the NTDS.dit database.""" + for group in self.ntds.groups(): + try: + members = [member.sid for member in group.members()] + except Exception as e: + members = [] + self.target.log.warning("Failed to extract group members for group %s: %s", group, e) + self.target.log.debug("", exc_info=e) + + yield NtdsGroupRecord( + **extract_security_info(group), + members=members, + _target=self.target, + ) + + @export(record=NtdsDomainRecord) + def domains(self) -> Iterator[NtdsDomainRecord]: + """Extract all domains from the NTDS.dit database.""" + for domain in self.ntds.search(objectClass="domainDNS"): + yield NtdsDomainRecord( + **extract_container_info(domain), + machine_account_quota=domain.get("ms-DS-MachineAccountQuota"), + _target=self.target, + ) + + @export(record=NtdsOURecord) + def ous(self) -> Iterator[NtdsOURecord]: + """Extract all ou's from the NTDS.dit database.""" + for ou in self.ntds.search(objectClass="organizationalUnit"): + gp_options = ou.get("gPOptions") + yield NtdsOURecord( + **extract_container_info(ou), + blocks_inheritance=gp_options == 1 if gp_options is not None else None, _target=self.target, ) @export(record=NtdsGPORecord) - def group_policies(self) -> Iterator[NtdsGPORecord]: + def gpos(self) -> Iterator[NtdsGPORecord]: """Extract all group policy objects (GPO) NTDS.dit database.""" - for gpo in self.ntds.group_policies(): yield NtdsGPORecord( - cn=gpo.cn, - distinguished_name=gpo.distinguished_name, - object_guid=gpo.guid, - name=gpo.name, - display_name=gpo.display_name, - creation_time=gpo.when_created, - last_modified_time=gpo.when_changed, + **extract_container_info(gpo), _target=self.target, ) @@ -177,7 +249,7 @@ def group_policies(self) -> Iterator[NtdsGPORecord]: def secretsdump(self) -> Iterator[str]: """Extract credentials in secretsdump format. Because it's a popular format.""" - # Keep impacket defined constants in the method so we don't polute our own + # Keep impacket defined constants in the method so we don't pollute our own kerberos_key_type = { 1: "dec-cbc-crc", 3: "des-cbc-md5", @@ -226,15 +298,60 @@ def secretsdump(self) -> Iterator[str]: yield f"{username}:CLEARTEXT:{supplemental['Primary:CLEARTEXT']}" +def extract_object_info(obj: Object) -> dict[str, Any]: + """Extract generic information from an Object.""" + parent = obj.parent() + return { + "cn": obj.cn, + "sid": obj.sid, + "name": obj.name, + "display_name": obj.display_name, + "description": obj.get("description"), + "object_classes": obj.object_class, + "distinguished_name": obj.distinguished_name, + "object_guid": obj.guid, + "creation_time": obj.when_created, + "last_modified_time": obj.when_changed, + "is_deleted": obj.is_deleted, + "nt_security_descriptor": obj.get("nTSecurityDescriptor"), + "parent_guid": str(parent.guid).upper(), + "parent_type": parent.object_category.capitalize(), + } + + +def extract_security_info(security_obj: SecurityObject) -> dict[str, Any]: + """Extract generic information from a Security Object.""" + return { + **extract_object_info(security_obj), + "rid": security_obj.rid, + "sam_name": security_obj.sam_account_name, + "sam_type": security_obj.get("sAMAccountType"), + "admin_count": bool(security_obj.get("adminCount")), + "sid_history": security_obj.get("sIDHistory"), + } + + +def extract_container_info(container_object: OrganizationalUnit | DomainDNS) -> dict[str, Any]: + """Extract generic information from a Container Object.""" + return { + **extract_object_info(container_object), + "gplink": container_object.get("gPLink"), + } + + def extract_user_info(user: User | Computer, target: Target) -> dict[str, Any]: """Extract generic information from a User or Computer account.""" - lm_hash = des_decrypt(lm_pwd, user.rid).hex() if (lm_pwd := user.get("dBCSPwd")) else DEFAULT_LM_HASH - nt_hash = des_decrypt(nt_pwd, user.rid).hex() if (nt_pwd := user.get("unicodePwd")) else DEFAULT_NT_HASH + if target.ad.ntds.pek.unlocked: + decrypt_func = lambda encrypted_hash, rid: des_decrypt(encrypted_hash, rid).hex() # noqa: E731 + else: + decrypt_func = lambda *args, **kwargs: None # noqa: E731 + + lm_hash = decrypt_func(lm_pwd, user.rid) if (lm_pwd := user.get("dBCSPwd")) else DEFAULT_LM_HASH + nt_hash = decrypt_func(nt_pwd, user.rid) if (nt_pwd := user.get("unicodePwd")) else DEFAULT_NT_HASH - # Decrypt password history - lm_history = [des_decrypt(lm, user.rid).hex() for lm in user.get("lmPwdHistory")] - nt_history = [des_decrypt(nt, user.rid).hex() for nt in user.get("ntPwdHistory")] + lm_history = [decrypt_func(lm, user.rid) for lm in user.get("lmPwdHistory")] + nt_history = [decrypt_func(nt, user.rid) for nt in user.get("ntPwdHistory")] try: member_of = [group.distinguished_name for group in user.groups()] @@ -243,33 +360,29 @@ def extract_user_info(user: User | Computer, target: Target) -> dict[str, Any]: target.log.warning("Failed to extract group membership for user %s: %s", user, e) target.log.debug("", exc_info=e) - # Extract supplemental credentials and yield records return { - "cn": user.cn, + **extract_security_info(user), "upn": user.get("userPrincipalName"), - "sam_name": user.sam_account_name, - "sam_type": user.sam_account_type.name, - "description": user.get("description"), - "sid": user.sid, - "rid": user.rid, "password_last_set": user.get("pwdLastSet"), "logon_last_failed": user.get("badPasswordTime"), - "logon_last_success": user.get("lastLogon"), + "logon_last_success_observed": user.get("lastLogon"), + "logon_last_success_reported": user.get("lastLogonTimestamp"), "account_expires": user.get("accountExpires") if isinstance(user.get("accountExpires"), datetime) else None, - "creation_time": user.when_created, - "last_modified_time": user.when_changed, - "admin_count": user.get("adminCount"), - "is_deleted": user.is_deleted, "lm": lm_hash, "lm_history": lm_history, "nt": nt_hash, "nt_history": nt_history, "supplemental_credentials": user.get("supplementalCredentials"), "user_account_control": user.user_account_control.name, - "object_classes": user.object_class, - "distinguished_name": user.distinguished_name, - "object_guid": user.guid, "primary_group_id": user.primary_group_id, "member_of": member_of, - "service_principal_name": user.get("servicePrincipalName"), + "allowed_to_delegate": user.get("msDS-AllowedToDelegateTo"), + "info": user.get("info"), + "comment": user.get("comment"), + "email": user.get("mail"), + "title": user.get("title"), + "telephone_number": user.get("telephoneNumber"), + "home_directory": user.get("homeDirectory"), + "logon_script": user.get("scriptPath"), + "service_principal_names": user.get("servicePrincipalName"), } diff --git a/tests/plugins/os/windows/ad/test_ntds.py b/tests/plugins/os/windows/ad/test_ntds.py index 4d6da2c3c8..64ce05b64a 100644 --- a/tests/plugins/os/windows/ad/test_ntds.py +++ b/tests/plugins/os/windows/ad/test_ntds.py @@ -91,6 +91,24 @@ def test_computers(target_win_ntds: Target) -> None: assert cn_to_ntlm_hash_mapping[result.cn] == result.nt +def test_groups(target_win_ntds: Target) -> None: + results = list(target_win_ntds.ad.groups()) + + assert len(results) == 102 + + +def test_domains(target_win_ntds: Target) -> None: + results = list(target_win_ntds.ad.domains()) + + assert len(results) == 5 + + +def test_ous(target_win_ntds: Target) -> None: + results = list(target_win_ntds.ad.ous()) + + assert len(results) == 10 + + def test_group_policies(target_win_ntds: Target) -> None: results = list(target_win_ntds.ad.group_policies())