From b382f4c030a80d3c758332fa7e5ecc179f71d6bc Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:38:10 +0100 Subject: [PATCH 1/6] Add certificate parsing to webserver plugins --- dissect/target/helpers/certificate.py | 105 ++++++++++++++++++ .../target/plugins/apps/webserver/apache.py | 54 +++++++-- .../target/plugins/apps/webserver/caddy.py | 1 + dissect/target/plugins/apps/webserver/iis.py | 1 + .../target/plugins/apps/webserver/nginx.py | 78 +++++++++---- .../plugins/apps/webserver/webserver.py | 23 +++- .../_data/plugins/apps/webserver/example.crt | 3 + .../_data/plugins/apps/webserver/example.key | 3 + tests/plugins/apps/webserver/test_apache.py | 42 +++++++ tests/plugins/apps/webserver/test_nginx.py | 34 ++++++ 10 files changed, 312 insertions(+), 32 deletions(-) create mode 100755 dissect/target/helpers/certificate.py create mode 100644 tests/_data/plugins/apps/webserver/example.crt create mode 100644 tests/_data/plugins/apps/webserver/example.key diff --git a/dissect/target/helpers/certificate.py b/dissect/target/helpers/certificate.py new file mode 100755 index 0000000000..dec92fee9c --- /dev/null +++ b/dissect/target/helpers/certificate.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import base64 +import binascii +import hashlib +from pathlib import Path + +from asn1crypto import pem, x509 +from flow.record import RecordDescriptor + +COMMON_CERTIFICATE_FIELDS = [ + ("digest", "fingerprint"), + ("varint", "serial_number"), + ("datetime", "not_valid_before"), + ("datetime", "not_valid_after"), + ("string", "issuer_dn"), + ("string", "subject_dn"), + ("bytes", "pem"), +] + +CertificateRecord = RecordDescriptor( + "certificate", + [ + *COMMON_CERTIFICATE_FIELDS, + ], +) + +# Translation layer for asn1crypto names to RFC4514 names. +# References: https://github.com/wbond/asn1crypto/blob/master/asn1crypto/x509.py @ NameType +# References: https://github.com/pyca/cryptography/blob/main/src/cryptography/x509/name.py +NAMEOID_TO_NAME = { + "common_name": "CN", # 2.5.4.3 + "country_name": "C", # 2.5.4.6 + "locality_name": "L", # 2.5.4.7 + "state_or_province_name": "ST", # 2.5.4.8 + "street_address": "STREET", # 2.5.4.9 + "organization_name": "O", # 2.5.4.10 + "organizational_unit_name": "OU", # 2.5.4.11 + "domain_component": "DC", # 0.9.2342.192.00300.100.1.25 + "user_id": "UID", # 0.9.2342.192.00300.100.1.1 +} + + +def compute_pem_fingerprints(pem: str | bytes) -> tuple[str, str, str]: + """Compute the MD5, SHA-1 and SHA-256 fingerprint hash of a x509 certificate PEM.""" + + if pem is None: + raise ValueError("No PEM provided") + + if isinstance(pem, bytes): + pem = pem.decode() + + elif not isinstance(pem, str): + raise TypeError("Provided PEM is not str or bytes") + + stripped_pem = pem.strip().removeprefix("-----BEGIN CERTIFICATE-----").removesuffix("-----END CERTIFICATE-----") + + try: + der = base64.b64decode(stripped_pem) + except binascii.Error as e: + raise ValueError(f"Unable to parse PEM: {e!s}") from e + + md5 = hashlib.md5(der).hexdigest() + sha1 = hashlib.sha1(der).hexdigest() + sha256 = hashlib.sha256(der).hexdigest() + + return md5, sha1, sha256 + + +def parse_x509(file: str | bytes | Path) -> CertificateRecord: + """Parses a PEM file. Returns a CertificateREcord. Does not parse a public key embedded in a x509 certificate.""" + + if isinstance(file, str): + content = file.encode() + + elif isinstance(file, bytes): + content = file + + elif isinstance(file, Path) or hasattr(file, "read_bytes"): + content = file.read_bytes() + + else: + raise TypeError("Parameter file is not of type str, bytes or Path") + + md5, _, _ = compute_pem_fingerprints(content.decode()) + _, _, der = pem.unarmor(content) + crt = x509.Certificate.load(der) + + issuer = [] + for key, value in crt.issuer.native.items(): + issuer.append(f"{NAMEOID_TO_NAME.get(key, key)}={value}") + + subject = [] + for key, value in crt.subject.native.items(): + subject.append(f"{NAMEOID_TO_NAME.get(key, key)}={value}") + + return CertificateRecord( + not_valid_before=crt.not_valid_before, + not_valid_after=crt.not_valid_after, + issuer_dn=",".join(issuer), + subject_dn=",".join(subject), + fingerprint=(md5, crt.sha1.hex(), crt.sha256.hex()), + serial_number=crt.serial_number, + pem=crt.dump(), + ) diff --git a/dissect/target/plugins/apps/webserver/apache.py b/dissect/target/plugins/apps/webserver/apache.py index 9d61b10116..3e0ce6b8ca 100644 --- a/dissect/target/plugins/apps/webserver/apache.py +++ b/dissect/target/plugins/apps/webserver/apache.py @@ -7,10 +7,12 @@ from typing import TYPE_CHECKING, NamedTuple from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError +from dissect.target.helpers.certificate import parse_x509 from dissect.target.helpers.fsutil import open_decompress from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.apps.webserver.webserver import ( WebserverAccessLogRecord, + WebserverCertificateRecord, WebserverErrorLogRecord, WebserverHostRecord, WebserverPlugin, @@ -409,6 +411,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]: yield WebserverAccessLogRecord( ts=datetime.strptime(log["ts"], "%d/%b/%Y:%H:%M:%S %z"), + webserver=self.__namespace__, remote_user=clean_value(log["remote_user"]), remote_ip=log["remote_ip"], local_ip=clean_value(log.get("local_ip")), @@ -458,6 +461,7 @@ def error(self) -> Iterator[WebserverErrorLogRecord]: yield WebserverErrorLogRecord( ts=ts, + webserver=self.__namespace__, pid=log.get("pid"), remote_ip=remote_ip, module=log["module"], @@ -483,32 +487,64 @@ def hosts(self) -> Iterator[WebserverHostRecord]: for path in self.virtual_hosts: # A configuration file can contain multiple VirtualHost directives. - current_vhost = {} + vhost = {} for line in path.open("rt"): line_lower = line.lower() if " Iterator[WebserverCertificateRecord]: + """Return host certificates for found Apache ``VirtualHost`` directives.""" + certs = set() + + for host in self.hosts(): + if host.tls_certificate and (cert_path := self.target.fs.path(host.tls_certificate)).is_file(): + certs.add(cert_path) + + if self.server_root: + for cert_path in itertools.chain(self.server_root.glob("**/*.crt"), self.server_root.glob("**/*.pem")): + if cert_path not in certs: + certs.add(cert_path) + + for cert_path in certs: + try: + cert = parse_x509(cert_path) + yield WebserverCertificateRecord( + ts=cert_path.lstat().st_mtime, + webserver=self.__namespace__, + **cert._asdict(), + host=host.server_name, + source=cert_path, + _target=self.target, + ) + except Exception as e: # noqa: PERF203 + self.target.log.warning("Unable to parse certificate %s :%s", cert_path, e) + self.target.log.debug("", exc_info=e) def _iterate_log_lines(self, paths: list[Path]) -> Iterator[tuple[str, Path]]: """Iterate through a list of paths and yield tuples of loglines and the path of the file where they're from.""" diff --git a/dissect/target/plugins/apps/webserver/caddy.py b/dissect/target/plugins/apps/webserver/caddy.py index acbaa13a55..cc636bb6df 100644 --- a/dissect/target/plugins/apps/webserver/caddy.py +++ b/dissect/target/plugins/apps/webserver/caddy.py @@ -142,6 +142,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]: log = match.groupdict() yield WebserverAccessLogRecord( ts=datetime.strptime(log["ts"], "%d/%b/%Y:%H:%M:%S %z"), + webserver="caddy", remote_ip=log["remote_ip"], method=log["method"], uri=log["uri"], diff --git a/dissect/target/plugins/apps/webserver/iis.py b/dissect/target/plugins/apps/webserver/iis.py index afd70f78c1..640bb1ecec 100644 --- a/dissect/target/plugins/apps/webserver/iis.py +++ b/dissect/target/plugins/apps/webserver/iis.py @@ -162,6 +162,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]: for iis_record in self.logs(): yield WebserverAccessLogRecord( ts=iis_record.ts, + webserver="iis", remote_user=iis_record.username, remote_ip=iis_record.client_ip, method=iis_record.request_method, diff --git a/dissect/target/plugins/apps/webserver/nginx.py b/dissect/target/plugins/apps/webserver/nginx.py index a9bf62764e..2879e2ce21 100644 --- a/dissect/target/plugins/apps/webserver/nginx.py +++ b/dissect/target/plugins/apps/webserver/nginx.py @@ -3,13 +3,16 @@ import json import re from datetime import datetime +from itertools import chain from typing import TYPE_CHECKING from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers.certificate import parse_x509 from dissect.target.helpers.fsutil import open_decompress from dissect.target.plugin import export from dissect.target.plugins.apps.webserver.webserver import ( WebserverAccessLogRecord, + WebserverCertificateRecord, WebserverErrorLogRecord, WebserverHostRecord, WebserverPlugin, @@ -236,6 +239,7 @@ def access(self) -> Iterator[WebserverAccessLogRecord]: yield WebserverAccessLogRecord( ts=ts, + webserver="nginx", bytes_sent=bytes_sent, **log, source=path, @@ -278,6 +282,7 @@ def error(self) -> Iterator[WebserverErrorLogRecord]: yield WebserverErrorLogRecord( ts=ts, + webserver="nginx", **log, source=path, _target=self.target, @@ -291,34 +296,67 @@ def hosts(self) -> Iterator[WebserverHostRecord]: - https://nginx.org/en/docs/http/ngx_http_core_module.html#server """ - def yield_record(current_server: dict) -> Iterator[WebserverHostRecord]: - yield WebserverHostRecord( - ts=host_path.lstat().st_mtime, - server_name=current_server.get("server_name") or current_server.get("listen"), - server_port=current_server.get("listen"), - root_path=current_server.get("root"), - access_log_config=current_server.get("access_log"), - error_log_config=current_server.get("error_log"), - source=host_path, - _target=self.target, - ) - for host_path in self.host_paths: - current_server = {} + server = {} seen_server_directive = False for line in host_path.open("rt"): if "server {" in line: - if current_server: - yield from yield_record(current_server) - current_server = {} + if server: + yield from self.yield_hosts_record(host_path, server) + server = {} seen_server_directive = True elif seen_server_directive: key, _, value = line.strip().partition(" ") - current_server[key] = value.rstrip(";") - - if current_server: - yield from yield_record(current_server) + server[key] = value.rstrip(";").strip() + + if server: + yield from self.yield_hosts_record(host_path, server) + + def yield_hosts_record(self, host_path: Path, server: dict) -> Iterator[WebserverHostRecord]: + yield WebserverHostRecord( + ts=host_path.lstat().st_mtime, + webserver="nginx", + server_name=server.get("server_name") or server.get("listen"), + server_port=server.get("listen", "").replace(" ssl", "") or None, + root_path=server.get("root"), + access_log_config=server.get("access_log"), + error_log_config=server.get("error_log"), + tls_certificate=server.get("ssl_certificate"), + tls_key=server.get("ssl_certificate_key"), + source=host_path, + _target=self.target, + ) + + @export(record=WebserverCertificateRecord) + def certificates(self) -> Iterator[WebserverCertificateRecord]: + """Return found server certificates in the NGINX configuration.""" + certs = set() + + for host in self.hosts(): + # Parse x509 certificate + if host.tls_certificate and (cert_path := self.target.fs.path(host.tls_certificate)).is_file(): + certs.add(cert_path) + + root = self.target.fs.path("/etc/nginx") + for cert_path in chain(root.glob("**/*.crt"), root.glob("**/*.pem")): + if cert_path not in certs: + certs.add(cert_path) + + for cert_path in certs: + try: + cert = parse_x509(cert_path) + yield WebserverCertificateRecord( + ts=cert_path.lstat().st_mtime, + webserver="nginx", + **cert._asdict(), + host=host.server_name, + source=cert_path, + _target=self.target, + ) + except Exception as e: # noqa: PERF203 + self.target.log.warning("Unable to parse certificate %s :%s", cert_path, e) + self.target.log.debug("", exc_info=e) def parse_json_line(line: str) -> dict[str, str] | None: diff --git a/dissect/target/plugins/apps/webserver/webserver.py b/dissect/target/plugins/apps/webserver/webserver.py index b8a5b55559..a549a24f98 100644 --- a/dissect/target/plugins/apps/webserver/webserver.py +++ b/dissect/target/plugins/apps/webserver/webserver.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING +from dissect.target.helpers.certificate import COMMON_CERTIFICATE_FIELDS from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import NamespacePlugin, export @@ -9,9 +10,10 @@ from collections.abc import Iterator WebserverAccessLogRecord = TargetRecordDescriptor( - "application/log/webserver/access", + "application/webserver/log/access", [ ("datetime", "ts"), + ("string", "webserver"), ("string", "remote_user"), ("net.ipaddress", "remote_ip"), ("net.ipaddress", "local_ip"), @@ -30,9 +32,10 @@ ) WebserverErrorLogRecord = TargetRecordDescriptor( - "application/log/webserver/error", + "application/webserver/log/error", [ ("datetime", "ts"), + ("string", "webserver"), ("net.ipaddress", "remote_ip"), ("varint", "pid"), ("string", "module"), @@ -45,14 +48,28 @@ ) WebserverHostRecord = TargetRecordDescriptor( - "application/log/webserver/host", + "application/webserver/host", [ ("datetime", "ts"), + ("string", "webserver"), ("string", "server_name"), ("varint", "server_port"), ("path", "root_path"), ("path", "access_log_config"), ("path", "error_log_config"), + ("path", "tls_certificate"), + ("path", "tls_key"), + ("path", "source"), + ], +) + +WebserverCertificateRecord = TargetRecordDescriptor( + "application/webserver/host/certificate", + [ + ("datetime", "ts"), + ("string", "webserver"), + *COMMON_CERTIFICATE_FIELDS, + ("string", "host"), ("path", "source"), ], ) diff --git a/tests/_data/plugins/apps/webserver/example.crt b/tests/_data/plugins/apps/webserver/example.crt new file mode 100644 index 0000000000..61f4425201 --- /dev/null +++ b/tests/_data/plugins/apps/webserver/example.crt @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:508e91a1cb9cdfdcbe02b9bfc3d8c4ef219f49ea0a0688abc839b7b199c83c6e +size 1306 diff --git a/tests/_data/plugins/apps/webserver/example.key b/tests/_data/plugins/apps/webserver/example.key new file mode 100644 index 0000000000..fb8c5c7280 --- /dev/null +++ b/tests/_data/plugins/apps/webserver/example.key @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e14c9bf31eb514faef34a451033147d95b8181a4a020efddeb1cd6fe62761f54 +size 1704 diff --git a/tests/plugins/apps/webserver/test_apache.py b/tests/plugins/apps/webserver/test_apache.py index f11b4bc2da..5edb6ad507 100644 --- a/tests/plugins/apps/webserver/test_apache.py +++ b/tests/plugins/apps/webserver/test_apache.py @@ -464,3 +464,45 @@ def test_apache_access_format_malformed_regression(target_unix: Target, fs_unix: assert results[1].useragent is None assert results[1].response_time_ms is None assert results[1].source == "/var/log/apache2/access.log" + + +def test_apache_hosts_certificates(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """Test if we can parse Apache ``VirtualHost`` certificates.""" + + fs_unix.map_file_fh("/etc/apache2/apache2.conf", BytesIO(b'ServerRoot "/etc/apache2"\n')) + + site = r""" + + ServerName example.com + ServerAlias www.example.com + DocumentRoot /var/www/html + ErrorLog ${APACHE_LOG_DIR}/error.log + CustomLog ${APACHE_LOG_DIR}/access.log combined + SSLEngine on + SSLCertificateFile /path/to/cert.crt + SSLCertificateKeyFile /path/to/cert.key + + """ + fs_unix.map_file_fh("/etc/apache2/sites-available/example.conf", BytesIO(textwrap.dedent(site).encode())) + fs_unix.map_file("/path/to/cert.crt", absolute_path("_data/plugins/apps/webserver/example.crt")) + fs_unix.map_file("/path/to/cert.key", absolute_path("_data/plugins/apps/webserver/example.key")) + + # Map a default location too + fs_unix.map_file("/etc/apache2/ssl/example/cert.crt", absolute_path("_data/plugins/apps/webserver/example.crt")) + fs_unix.map_file("/etc/apache2/ssl/example/cert.key", absolute_path("_data/plugins/apps/webserver/example.key")) + + target_unix.add_plugin(ApachePlugin) + + records = list(target_unix.apache.certificates()) + assert len(records) == 2 + + assert records[0].webserver == "apache" + assert records[0].fingerprint.md5 == "a218ac9b6dbdaa8b23658c4d18c1cfc1" + assert records[0].fingerprint.sha1 == "6566d8ebea1feb4eb3d12d9486cddb69e4e9e827" + assert records[0].fingerprint.sha256 == "7221d881743505f13b7bfe854bdf800d7f0cd22d34307ed7157808a295299471" + assert records[0].serial_number == 21067204948278457910649605551283467908287726794 + assert records[0].not_valid_before == datetime(2025, 11, 27, 15, 31, 20, tzinfo=timezone.utc) + assert records[0].not_valid_after == datetime(2026, 11, 27, 15, 31, 20, tzinfo=timezone.utc) + assert records[0].issuer_dn == "C=AU,ST=Some-State,O=Internet Widgits Pty Ltd,CN=example.com" + assert records[0].host == "example.com" + assert records[0].source == "/etc/apache2/ssl/example/cert.crt" diff --git a/tests/plugins/apps/webserver/test_nginx.py b/tests/plugins/apps/webserver/test_nginx.py index 8f8b5b380a..de5a77b37a 100644 --- a/tests/plugins/apps/webserver/test_nginx.py +++ b/tests/plugins/apps/webserver/test_nginx.py @@ -241,3 +241,37 @@ def test_nginx_parse_config(target_unix: Target, fs_unix: VirtualFilesystem) -> assert records[1].access_log_config == "/eighty/access.log" assert not records[1].error_log_config assert records[1].source == "/more/confs/one.conf" + + +def test_nginx_host_certificate(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """Test if we can parse NGINX host TLS certificates.""" + + bar_conf = """ + server { + listen 443 ssl; + ssl_certificate /path/to/cert.pem; + ssl_certificate_key /path/to/key.pem; + server_name example.com; + root /path/to/www; + } + """ + fs_unix.map_file_fh("/etc/nginx/sites-enabled/example.com.conf", BytesIO(textwrap.dedent(bar_conf).encode())) + fs_unix.map_file("/path/to/cert.pem", absolute_path("_data/plugins/apps/webserver/example.crt")) + fs_unix.map_file("/path/to/key.pem", absolute_path("_data/plugins/apps/webserver/example.key")) + target_unix.add_plugin(NginxPlugin) + + records = list(target_unix.nginx.hosts()) + assert len(records) == 1 + assert records[0].server_name == "example.com" + assert records[0].server_port == 443 + assert records[0].tls_certificate == "/path/to/cert.pem" + + records = list(target_unix.nginx.certificates()) + assert len(records) == 1 + + # x509 + assert records[0].fingerprint.md5 == "a218ac9b6dbdaa8b23658c4d18c1cfc1" + assert records[0].fingerprint.sha1 == "6566d8ebea1feb4eb3d12d9486cddb69e4e9e827" + assert records[0].fingerprint.sha256 == "7221d881743505f13b7bfe854bdf800d7f0cd22d34307ed7157808a295299471" + assert records[0].serial_number == 21067204948278457910649605551283467908287726794 + assert records[0].issuer_dn == "C=AU,ST=Some-State,O=Internet Widgits Pty Ltd,CN=example.com" From 6e7d693cf33acdb85061c19bddfb3fe90d1e0b8d Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 4 Dec 2025 13:16:32 +0100 Subject: [PATCH 2/6] implement review feedback --- .../target/plugins/apps/webserver/nginx.py | 35 ++++++++++--------- tests/plugins/apps/webserver/test_apache.py | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/dissect/target/plugins/apps/webserver/nginx.py b/dissect/target/plugins/apps/webserver/nginx.py index 70358329b8..585333b24e 100644 --- a/dissect/target/plugins/apps/webserver/nginx.py +++ b/dissect/target/plugins/apps/webserver/nginx.py @@ -311,7 +311,7 @@ def hosts(self) -> Iterator[WebserverHostRecord]: for line in host_path.open("rt"): if "server {" in line: if server: - yield from self.yield_hosts_record(host_path, server) + yield construct_hosts_record(self.target, host_path, server) server = {} seen_server_directive = True @@ -320,22 +320,7 @@ def hosts(self) -> Iterator[WebserverHostRecord]: server[key] = value.rstrip(";").strip() if server: - yield from self.yield_hosts_record(host_path, server) - - def yield_hosts_record(self, host_path: Path, server: dict) -> Iterator[WebserverHostRecord]: - yield WebserverHostRecord( - ts=host_path.lstat().st_mtime, - webserver="nginx", - server_name=server.get("server_name") or server.get("listen"), - server_port=server.get("listen", "").replace(" ssl", "") or None, - root_path=server.get("root"), - access_log_config=server.get("access_log"), - error_log_config=server.get("error_log"), - tls_certificate=server.get("ssl_certificate"), - tls_key=server.get("ssl_certificate_key"), - source=host_path, - _target=self.target, - ) + yield construct_hosts_record(self.target, host_path, server) @export(record=WebserverCertificateRecord) def certificates(self) -> Iterator[WebserverCertificateRecord]: @@ -368,6 +353,22 @@ def certificates(self) -> Iterator[WebserverCertificateRecord]: self.target.log.debug("", exc_info=e) +def construct_hosts_record(target: Target, host_path: Path, server: dict) -> WebserverHostRecord: + return WebserverHostRecord( + ts=host_path.lstat().st_mtime, + webserver="nginx", + server_name=server.get("server_name") or server.get("listen"), + server_port=server.get("listen", "").replace(" ssl", "") or None, + root_path=server.get("root"), + access_log_config=server.get("access_log"), + error_log_config=server.get("error_log"), + tls_certificate=server.get("ssl_certificate"), + tls_key=server.get("ssl_certificate_key"), + source=host_path, + _target=target, + ) + + def parse_json_line(line: str) -> dict[str, str] | None: """Attempt to parse a default NGINX JSON log line. diff --git a/tests/plugins/apps/webserver/test_apache.py b/tests/plugins/apps/webserver/test_apache.py index 5edb6ad507..0e2b918bba 100644 --- a/tests/plugins/apps/webserver/test_apache.py +++ b/tests/plugins/apps/webserver/test_apache.py @@ -493,7 +493,7 @@ def test_apache_hosts_certificates(target_unix: Target, fs_unix: VirtualFilesyst target_unix.add_plugin(ApachePlugin) - records = list(target_unix.apache.certificates()) + records = sorted(target_unix.apache.certificates(), key=lambda r: r.source) assert len(records) == 2 assert records[0].webserver == "apache" From edf254899561d686be7af65eb71cfd63491fe33e Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:26:36 +0100 Subject: [PATCH 3/6] rename (untested) ual field to prevent field type conflict --- dissect/target/plugins/os/windows/ual.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dissect/target/plugins/os/windows/ual.py b/dissect/target/plugins/os/windows/ual.py index 651c1a53c6..6abc16b291 100644 --- a/dissect/target/plugins/os/windows/ual.py +++ b/dissect/target/plugins/os/windows/ual.py @@ -53,7 +53,7 @@ ("datetime", "last_seen_active_date"), ("string", "vm_guid"), ("string", "bios_guid"), - ("string", "serial_number"), + ("string", "serial"), ("string", "path"), ], ) @@ -129,7 +129,7 @@ "ProductName": "product_name", "RoleGuid": "role_guid", "RoleName": "role_name", - "SerialNumber": "serial_number", + "SerialNumber": "serial", "ServicePackMajor": "service_pack_major_version", "ServicePackMinor": "service_pack_minor_version", "SystemDNSHostName": "system_dns_hostname", From 97ca4ae05296431042646ef15adecfa255b6ab93 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 11 Dec 2025 14:16:52 +0100 Subject: [PATCH 4/6] guard asn1 import --- dissect/target/helpers/certificate.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dissect/target/helpers/certificate.py b/dissect/target/helpers/certificate.py index dec92fee9c..a9b799a468 100755 --- a/dissect/target/helpers/certificate.py +++ b/dissect/target/helpers/certificate.py @@ -5,9 +5,17 @@ import hashlib from pathlib import Path -from asn1crypto import pem, x509 from flow.record import RecordDescriptor +try: + from asn1crypto import pem, x509 + + HAS_ASN1 = True + +except ImportError: + HAS_ASN1 = False + + COMMON_CERTIFICATE_FIELDS = [ ("digest", "fingerprint"), ("varint", "serial_number"), @@ -82,6 +90,9 @@ def parse_x509(file: str | bytes | Path) -> CertificateRecord: else: raise TypeError("Parameter file is not of type str, bytes or Path") + if not HAS_ASN1: + raise ValueError("Missing asn1crypto dependency") + md5, _, _ = compute_pem_fingerprints(content.decode()) _, _, der = pem.unarmor(content) crt = x509.Certificate.load(der) From 1f9b66a53c477d75c9754b4aacd7c62c37ef22ec Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 11 Dec 2025 15:49:32 +0100 Subject: [PATCH 5/6] workaround for serial_number varint field --- dissect/target/plugins/os/windows/certlog.py | 4 ++-- tests/plugins/os/windows/test_certlog.py | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dissect/target/plugins/os/windows/certlog.py b/dissect/target/plugins/os/windows/certlog.py index 9f9746c84d..e871e870c5 100644 --- a/dissect/target/plugins/os/windows/certlog.py +++ b/dissect/target/plugins/os/windows/certlog.py @@ -57,7 +57,7 @@ ("string", "organization"), ("string", "organizational_unit"), ("string", "public_key_algorithm"), - ("string", "serial_number"), + ("string", "serial_number_hex"), ("string", "state_or_province"), ("string", "street_address"), ("string", "subject_key_identifier"), @@ -193,7 +193,7 @@ "$PublicKeyAlgorithm": "public_key_algorithm", "$RequestAttributes": "request_attributes", "$RequesterName": "requester_name", - "$SerialNumber": "serial_number", + "$SerialNumber": "serial_number_hex", "$SignerApplicationPolicies": "signer_application_policies", "$SignerPolicies": "signer_policies", "$StateOrProvince": "state_or_province", diff --git a/tests/plugins/os/windows/test_certlog.py b/tests/plugins/os/windows/test_certlog.py index 803694fc7e..1ebacfbbc2 100644 --- a/tests/plugins/os/windows/test_certlog.py +++ b/tests/plugins/os/windows/test_certlog.py @@ -20,9 +20,14 @@ def test_certlog_plugin(target_win: Target, fs_win: VirtualFilesystem) -> None: assert len(list(target_win.certlog.requests())) == 11 assert len(list(target_win.certlog.request_attributes())) == 26 assert len(list(target_win.certlog.crls())) == 2 - assert len(list(target_win.certlog.certificates())) == 11 + certificates = list(target_win.certlog.certificates()) + assert len(certificates) == 11 assert len(list(target_win.certlog.certificate_extensions())) == 92 + assert certificates[0].serial_number_hex == "1169f0517d9b598e4ba7af46e4674066" + assert int(certificates[0].serial_number_hex, 16) == 23146941333149199441888068127529844838 + # TODO: Further certificate normalization, see https://github.com/fox-it/dissect.target/issues/1452 + def test_certlog_plugin_direct() -> None: ca_edb = absolute_path("_data/plugins/os/windows/certlog/SEVENKINGDOMS-CA.edb") From 82f9193220dfd4bd7cd89a4ff81a391f1b6ac4e2 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:58:36 +0100 Subject: [PATCH 6/6] Fix paths on Windows --- dissect/target/plugins/apps/webserver/apache.py | 15 +++++++++------ dissect/target/plugins/apps/webserver/nginx.py | 15 +++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/dissect/target/plugins/apps/webserver/apache.py b/dissect/target/plugins/apps/webserver/apache.py index fc58c0c636..664dfc94d9 100644 --- a/dissect/target/plugins/apps/webserver/apache.py +++ b/dissect/target/plugins/apps/webserver/apache.py @@ -9,7 +9,7 @@ from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError from dissect.target.helpers.certificate import parse_x509 -from dissect.target.helpers.fsutil import open_decompress +from dissect.target.helpers.fsutil import TargetPath, open_decompress from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.apps.webserver.webserver import ( WebserverAccessLogRecord, @@ -499,6 +499,9 @@ def hosts(self) -> Iterator[WebserverHostRecord]: - https://httpd.apache.org/docs/2.4/mod/core.html#virtualhost """ + def _map_path(path: str | None) -> TargetPath: + return self.target.fs.path(path) if path else None + for path in self.virtual_hosts: # A configuration file can contain multiple VirtualHost directives. vhost = {} @@ -518,11 +521,11 @@ def hosts(self) -> Iterator[WebserverHostRecord]: webserver=self.__namespace__, server_name=vhost.get("servername") or vhost.get("addr"), server_port=vhost.get("port"), - root_path=vhost.get("documentroot"), - access_log_config=vhost.get("customlog", "").rpartition(" ")[0], - error_log_config=vhost.get("errorlog"), - tls_certificate=vhost.get("sslcertificatefile"), - tls_key=vhost.get("sslcertificatekeyfile"), + root_path=_map_path(vhost.get("documentroot")), + access_log_config=_map_path(vhost.get("customlog", "").rpartition(" ")[0]), + error_log_config=_map_path(vhost.get("errorlog")), + tls_certificate=_map_path(vhost.get("sslcertificatefile")), + tls_key=_map_path(vhost.get("sslcertificatekeyfile")), source=path, _target=self.target, ) diff --git a/dissect/target/plugins/apps/webserver/nginx.py b/dissect/target/plugins/apps/webserver/nginx.py index 585333b24e..0d53f705dd 100644 --- a/dissect/target/plugins/apps/webserver/nginx.py +++ b/dissect/target/plugins/apps/webserver/nginx.py @@ -8,7 +8,7 @@ from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.certificate import parse_x509 -from dissect.target.helpers.fsutil import open_decompress +from dissect.target.helpers.fsutil import TargetPath, open_decompress from dissect.target.plugin import export from dissect.target.plugins.apps.webserver.webserver import ( WebserverAccessLogRecord, @@ -354,16 +354,19 @@ def certificates(self) -> Iterator[WebserverCertificateRecord]: def construct_hosts_record(target: Target, host_path: Path, server: dict) -> WebserverHostRecord: + def _map_path(path: str | None) -> TargetPath: + return target.fs.path(path) if path else None + return WebserverHostRecord( ts=host_path.lstat().st_mtime, webserver="nginx", server_name=server.get("server_name") or server.get("listen"), server_port=server.get("listen", "").replace(" ssl", "") or None, - root_path=server.get("root"), - access_log_config=server.get("access_log"), - error_log_config=server.get("error_log"), - tls_certificate=server.get("ssl_certificate"), - tls_key=server.get("ssl_certificate_key"), + root_path=_map_path(server.get("root")), + access_log_config=_map_path(server.get("access_log")), + error_log_config=_map_path(server.get("error_log")), + tls_certificate=_map_path(server.get("ssl_certificate")), + tls_key=_map_path(server.get("ssl_certificate_key")), source=host_path, _target=target, )