Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 184 additions & 71 deletions dissect/target/plugins/os/windows/ad/ntds.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,116 @@
if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.database.ese.ntds.objects import Computer, User
from dissect.database.ese.ntds.objects import Computer, DomainDNS, Object, OrganizationalUnit, SecurityObject, User

from dissect.target.target import Target


GENERIC_FIELDS = [
OBJECTS_FIELDS = [
("string", "cn"),
("string", "upn"),
("string", "sam_name"),
("string", "sam_type"),
("string", "description"),
("string", "sid"),
("string", "name"),
("string", "display_name"),
("string", "description"),
("string[]", "object_classes"),
("string", "distinguished_name"),
("string", "object_guid"),
("datetime", "creation_time"),
("datetime", "last_modified_time"),
("boolean", "is_deleted"),
("varint", "nt_security_descriptor"),
("string", "parent_guid"),
("string", "parent_type"),
]

SECURITY_PRINCIPAL_FIELDS = [
*OBJECTS_FIELDS,
("varint", "rid"),
("string", "sam_name"),
("string", "sam_type"),
("boolean", "admin_count"),
("string[]", "sid_history"),
]

ACCOUNT_FIELDS = [
*SECURITY_PRINCIPAL_FIELDS,
("string", "upn"),
("string", "user_account_control"),
("datetime", "password_last_set"),
("datetime", "logon_last_failed"),
("datetime", "logon_last_success"),
("datetime", "logon_last_success_observed"),
("datetime", "logon_last_success_reported"),
("datetime", "account_expires"),
("datetime", "creation_time"),
("datetime", "last_modified_time"),
("boolean", "admin_count"),
("boolean", "is_deleted"),
("uint32", "primary_group_id"),
("string[]", "member_of"),
("string[]", "allowed_to_delegate"),
("string", "lm"),
("string[]", "lm_history"),
("string", "nt"),
("string[]", "nt_history"),
("string", "supplemental_credentials"),
("string", "user_account_control"),
("string[]", "object_classes"),
("string", "distinguished_name"),
("string", "object_guid"),
("uint32", "primary_group_id"),
("string[]", "member_of"),
("string[]", "service_principal_name"),
("string", "info"),
("string", "comment"),
("string", "email"),
("string", "title"),
("string", "telephone_number"),
("string", "home_directory"),
("path", "logon_script"),
("string[]", "service_principal_names"),
]

CONTAINER_FIELDS = [
*OBJECTS_FIELDS,
("string", "gplink"),
]

# Record descriptor for NTDS user secrets
NtdsUserRecord = TargetRecordDescriptor(
"windows/ad/user",
[
*GENERIC_FIELDS,
("string", "info"),
("string", "comment"),
("string", "telephone_number"),
("string", "home_directory"),
*ACCOUNT_FIELDS,
],
)

NtdsComputerRecord = TargetRecordDescriptor(
"windows/ad/computer",
[
*GENERIC_FIELDS,
*ACCOUNT_FIELDS,
("string", "dns_hostname"),
("string", "operating_system"),
("string", "operating_system_version"),
("string[]", "service_principal_name"),
("varint", "allowed_to_act"),
],
)

NtdsGroupRecord = TargetRecordDescriptor(
"windows/ad/group",
[
*SECURITY_PRINCIPAL_FIELDS,
("string[]", "members"),
],
)

NtdsDomainRecord = TargetRecordDescriptor(
"windows/ad/domain",
[
*CONTAINER_FIELDS,
("uint32", "machine_account_quota"),
],
)

NtdsOURecord = TargetRecordDescriptor(
"windows/ad/ou",
[
*CONTAINER_FIELDS,
("boolean", "blocks_inheritance"),
],
)

NtdsGPORecord = TargetRecordDescriptor(
"windows/ad/gpo",
[
("string", "cn"),
("string", "distinguished_name"),
("string", "object_guid"),
("string", "name"),
("string", "display_name"),
("datetime", "creation_time"),
("datetime", "last_modified_time"),
*CONTAINER_FIELDS,
],
)

Expand Down Expand Up @@ -116,10 +160,7 @@ def __init__(self, target: Target):
self.path = self.target.fs.path(path)

def check_compatible(self) -> None:
if not self.target.has_function("lsa"):
raise UnsupportedPluginError("System Hive is not present or LSA function not available")

if not self.path.is_file():
if not self.path.is_file() or not self.path.exists():
raise UnsupportedPluginError("No NTDS.dit database found on target")

@cached_property
Expand All @@ -129,6 +170,8 @@ def ntds(self) -> NTDS:

if self.target.has_function("lsa"):
ntds.pek.unlock(self.target.lsa.syskey)
else:
self.target.log.warning("LSA plugin not available, cannot unlock PEK and decrypt sensitive data")

return ntds

Expand All @@ -138,10 +181,6 @@ def users(self) -> Iterator[NtdsUserRecord]:
for user in self.ntds.users():
yield NtdsUserRecord(
**extract_user_info(user, self.target),
info=user.get("info"),
comment=user.get("comment"),
telephone_number=user.get("telephoneNumber"),
home_directory=user.get("homeDirectory"),
_target=self.target,
)

Expand All @@ -154,30 +193,63 @@ def computers(self) -> Iterator[NtdsComputerRecord]:
dns_hostname=computer.get("dNSHostName"),
operating_system=computer.get("operatingSystem"),
operating_system_version=computer.get("operatingSystemVersion"),
service_principal_name=computer.get("servicePrincipalName"),
allowed_to_act=computer.get("msDS-AllowedToActOnBehalfOfOtherIdentity"),
_target=self.target,
)

@export(record=NtdsGroupRecord)
def groups(self) -> Iterator[NtdsGroupRecord]:
"""Extract all groups from the NTDS.dit database."""
for group in self.ntds.groups():
try:
members = [member.sid for member in group.members()]
except Exception as e:
members = []
self.target.log.warning("Failed to extract group members for group %s: %s", group, e)
self.target.log.debug("", exc_info=e)

yield NtdsGroupRecord(
**extract_security_info(group),
members=members,
_target=self.target,
)

@export(record=NtdsDomainRecord)
def domains(self) -> Iterator[NtdsDomainRecord]:
"""Extract all domains from the NTDS.dit database."""
for domain in self.ntds.search(objectClass="domainDNS"):
yield NtdsDomainRecord(
**extract_container_info(domain),
machine_account_quota=domain.get("ms-DS-MachineAccountQuota"),
_target=self.target,
)

@export(record=NtdsOURecord)
def ous(self) -> Iterator[NtdsOURecord]:
"""Extract all ou's from the NTDS.dit database."""
for ou in self.ntds.search(objectClass="organizationalUnit"):
gp_options = ou.get("gPOptions")
yield NtdsOURecord(
**extract_container_info(ou),
blocks_inheritance=gp_options == 1 if gp_options is not None else None,
_target=self.target,
)

@export(record=NtdsGPORecord)
def group_policies(self) -> Iterator[NtdsGPORecord]:
def gpos(self) -> Iterator[NtdsGPORecord]:
"""Extract all group policy objects (GPO) NTDS.dit database."""

for gpo in self.ntds.group_policies():
yield NtdsGPORecord(
cn=gpo.cn,
distinguished_name=gpo.distinguished_name,
object_guid=gpo.guid,
name=gpo.name,
display_name=gpo.display_name,
creation_time=gpo.when_created,
last_modified_time=gpo.when_changed,
**extract_container_info(gpo),
_target=self.target,
)

@export(output="yield")
def secretsdump(self) -> Iterator[str]:
"""Extract credentials in secretsdump format. Because it's a popular format."""

# Keep impacket defined constants in the method so we don't polute our own
# Keep impacket defined constants in the method so we don't pollute our own
kerberos_key_type = {
1: "dec-cbc-crc",
3: "des-cbc-md5",
Expand Down Expand Up @@ -226,15 +298,60 @@ def secretsdump(self) -> Iterator[str]:
yield f"{username}:CLEARTEXT:{supplemental['Primary:CLEARTEXT']}"


def extract_object_info(obj: Object) -> dict[str, Any]:
"""Extract generic information from an Object."""
parent = obj.parent()
return {
"cn": obj.cn,
"sid": obj.sid,
"name": obj.name,
"display_name": obj.display_name,
"description": obj.get("description"),
"object_classes": obj.object_class,
"distinguished_name": obj.distinguished_name,
"object_guid": obj.guid,
"creation_time": obj.when_created,
"last_modified_time": obj.when_changed,
"is_deleted": obj.is_deleted,
"nt_security_descriptor": obj.get("nTSecurityDescriptor"),
"parent_guid": str(parent.guid).upper(),
"parent_type": parent.object_category.capitalize(),
}


def extract_security_info(security_obj: SecurityObject) -> dict[str, Any]:
"""Extract generic information from a Security Object."""
return {
**extract_object_info(security_obj),
"rid": security_obj.rid,
"sam_name": security_obj.sam_account_name,
"sam_type": security_obj.get("sAMAccountType"),
"admin_count": bool(security_obj.get("adminCount")),
"sid_history": security_obj.get("sIDHistory"),
}


def extract_container_info(container_object: OrganizationalUnit | DomainDNS) -> dict[str, Any]:
"""Extract generic information from a Container Object."""
return {
**extract_object_info(container_object),
"gplink": container_object.get("gPLink"),
}


def extract_user_info(user: User | Computer, target: Target) -> dict[str, Any]:
"""Extract generic information from a User or Computer account."""

lm_hash = des_decrypt(lm_pwd, user.rid).hex() if (lm_pwd := user.get("dBCSPwd")) else DEFAULT_LM_HASH
nt_hash = des_decrypt(nt_pwd, user.rid).hex() if (nt_pwd := user.get("unicodePwd")) else DEFAULT_NT_HASH
if target.ad.ntds.pek.unlocked:
decrypt_func = lambda encrypted_hash, rid: des_decrypt(encrypted_hash, rid).hex() # noqa: E731
else:
decrypt_func = lambda *args, **kwargs: None # noqa: E731

lm_hash = decrypt_func(lm_pwd, user.rid) if (lm_pwd := user.get("dBCSPwd")) else DEFAULT_LM_HASH
nt_hash = decrypt_func(nt_pwd, user.rid) if (nt_pwd := user.get("unicodePwd")) else DEFAULT_NT_HASH

# Decrypt password history
lm_history = [des_decrypt(lm, user.rid).hex() for lm in user.get("lmPwdHistory")]
nt_history = [des_decrypt(nt, user.rid).hex() for nt in user.get("ntPwdHistory")]
lm_history = [decrypt_func(lm, user.rid) for lm in user.get("lmPwdHistory")]
nt_history = [decrypt_func(nt, user.rid) for nt in user.get("ntPwdHistory")]

try:
member_of = [group.distinguished_name for group in user.groups()]
Expand All @@ -243,33 +360,29 @@ def extract_user_info(user: User | Computer, target: Target) -> dict[str, Any]:
target.log.warning("Failed to extract group membership for user %s: %s", user, e)
target.log.debug("", exc_info=e)

# Extract supplemental credentials and yield records
return {
"cn": user.cn,
**extract_security_info(user),
"upn": user.get("userPrincipalName"),
"sam_name": user.sam_account_name,
"sam_type": user.sam_account_type.name,
"description": user.get("description"),
"sid": user.sid,
"rid": user.rid,
"password_last_set": user.get("pwdLastSet"),
"logon_last_failed": user.get("badPasswordTime"),
"logon_last_success": user.get("lastLogon"),
"logon_last_success_observed": user.get("lastLogon"),
"logon_last_success_reported": user.get("lastLogonTimestamp"),
"account_expires": user.get("accountExpires") if isinstance(user.get("accountExpires"), datetime) else None,
"creation_time": user.when_created,
"last_modified_time": user.when_changed,
"admin_count": user.get("adminCount"),
"is_deleted": user.is_deleted,
"lm": lm_hash,
"lm_history": lm_history,
"nt": nt_hash,
"nt_history": nt_history,
"supplemental_credentials": user.get("supplementalCredentials"),
"user_account_control": user.user_account_control.name,
"object_classes": user.object_class,
"distinguished_name": user.distinguished_name,
"object_guid": user.guid,
"primary_group_id": user.primary_group_id,
"member_of": member_of,
"service_principal_name": user.get("servicePrincipalName"),
"allowed_to_delegate": user.get("msDS-AllowedToDelegateTo"),
"info": user.get("info"),
"comment": user.get("comment"),
"email": user.get("mail"),
"title": user.get("title"),
"telephone_number": user.get("telephoneNumber"),
"home_directory": user.get("homeDirectory"),
"logon_script": user.get("scriptPath"),
"service_principal_names": user.get("servicePrincipalName"),
}
18 changes: 18 additions & 0 deletions tests/plugins/os/windows/ad/test_ntds.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ def test_computers(target_win_ntds: Target) -> None:
assert cn_to_ntlm_hash_mapping[result.cn] == result.nt


def test_groups(target_win_ntds: Target) -> None:
results = list(target_win_ntds.ad.groups())

assert len(results) == 102


def test_domains(target_win_ntds: Target) -> None:
results = list(target_win_ntds.ad.domains())

assert len(results) == 5


def test_ous(target_win_ntds: Target) -> None:
results = list(target_win_ntds.ad.ous())

assert len(results) == 10


def test_group_policies(target_win_ntds: Target) -> None:
results = list(target_win_ntds.ad.group_policies())

Expand Down
Loading