diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index 9c4ffcad51..7005059eff 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -111,6 +111,7 @@ def __init__( collapse_inverse: bool = False, separator: tuple[str] = ("=",), comment_prefixes: tuple[str] = (";", "#"), + namespace: str | None = None, ) -> None: self.collapse_all = collapse is True self.collapse = set(collapse) if isinstance(collapse, Iterable) else set() @@ -118,6 +119,7 @@ def __init__( self.separator = separator self.comment_prefixes = comment_prefixes + self.namespace = namespace self.parsed_data = {} def __getitem__(self, item: Any) -> dict | str: @@ -358,35 +360,43 @@ class Xml(ConfigurationParser): """Parses an XML file. Ignores any constructor parameters passed from ``ConfigurationParser``.""" def _tree(self, tree: ElementTree, root: bool = False) -> dict: - """Very simple but robust xml -> dict implementation, see comments.""" + """XML to dictionary parser.""" nodes = {} result = {} - counter = {} - # each node is a folder (so the structure is always the same! [1]) for node in tree.findall("*"): - # if a node contains multiple nodes with the same name, number them - if node.tag in counter: - counter[node.tag] += 1 - nodes[f"{node.tag}-{counter[node.tag]}"] = self._tree(node) + # Remove the XML namespace prefix from the node tag name + if self.namespace and node.tag.startswith(self.namespace): + node.tag = node.tag.removeprefix(self.namespace) + + # If a node contains multiple nodes with the same name, we turn that node into a list of trees + if node.tag in nodes: + if not isinstance(nodes[node.tag], list): + nodes[node.tag] = [nodes[node.tag]] + nodes[node.tag].append(self._tree(node)) else: - counter[node.tag] = 1 nodes[node.tag] = self._tree(node) - # all attribs go in the attribute folder - # (i.e. stable, does not change depending on xml structure! [2] - # Also, this way we "know" they have been attributes, i.e. we don't lose information! [3] + # All tree attributes go in the __attributes__ dunder dictionary of the result if tree.attrib: - result["attributes"] = tree.attrib + result["__attributes__"] = tree.attrib - # all subnodes go in the nodes folder + # All subnodes go directly in the dictionary with their node name if nodes: - result["nodes"] = nodes + result.update(nodes) - # content goes into the text folder - # we don't use special prefixes ($) because XML docs may use them anyway (even though they are forbidden) + # Any additional tree content goes into the __text__ dunder attribute if tree.text and (text := tree.text.strip(" \n\r")): - result["text"] = text + # If this node does not contain attributes or other nodes, set the value directly + # instead of setting the __text__ dunder value. + if not nodes and not tree.attrib: + result = text + else: + result["__text__"] = text + + # Remove the XML namespace from the tree tag too + if self.namespace and tree.tag.startswith(self.namespace): + tree.tag = tree.tag.removeprefix(self.namespace) # if you need to store meta-data, you can extend add more entries here... CDATA, Comments, errors return {tree.tag: result} if root else result @@ -420,7 +430,7 @@ def parse_file(self, fh: TextIO) -> None: if not tree: # Error limit reached. Thus we consider the document not parseable. - raise ConfigurationParsingError(f"Could not parse XML file: {fh.name} after {errors} attempts.") + raise ConfigurationParsingError(f"Could not parse XML file: {getattr(fh, 'name', fh)}") self.parsed_data = tree @@ -915,6 +925,7 @@ class ParserOptions: collapse_inverse: bool | None = None separator: tuple[str] | None = None comment_prefixes: tuple[str] | None = None + namespace: str | None = None @dataclass(frozen=True) @@ -924,12 +935,13 @@ class ParserConfig: collapse_inverse: bool | None = None separator: tuple[str] | None = None comment_prefixes: tuple[str] | None = None + namespace: str | None = None fields: tuple[str, ...] | None = None def create_parser(self, options: ParserOptions | None = None) -> ConfigurationParser: kwargs = {} - for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "fields"]: + for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "namespace", "fields"]: value = getattr(options, field_name, None) or getattr(self, field_name) if value: kwargs.update({field_name: value}) diff --git a/dissect/target/helpers/record.py b/dissect/target/helpers/record.py index d2821a2b1d..38a0df8d24 100644 --- a/dissect/target/helpers/record.py +++ b/dissect/target/helpers/record.py @@ -143,7 +143,7 @@ def DynamicDescriptor(types: Sequence[str]) -> RecordDescriptor: ("string", "gecos"), ("path", "home"), ("string", "shell"), - ("string", "source"), + ("path", "source"), ] UnixUserRecord = TargetRecordDescriptor( diff --git a/dissect/target/plugins/os/unix/linux/redhat/epmm/__init__.py b/dissect/target/plugins/os/unix/linux/redhat/epmm/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dissect/target/plugins/os/unix/linux/redhat/epmm/_os.py b/dissect/target/plugins/os/unix/linux/redhat/epmm/_os.py new file mode 100644 index 0000000000..201561cdcf --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/redhat/epmm/_os.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dissect.target.exceptions import ConfigurationParsingError +from dissect.target.helpers import configutil +from dissect.target.helpers.record import TargetRecordDescriptor, UnixUserRecord +from dissect.target.plugin import export +from dissect.target.plugins.os.unix.linux.redhat._os import RedHatPlugin + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dissect.target.filesystem import Filesystem + from dissect.target.target import Target + + +EPMMUserRecord = TargetRecordDescriptor( + "epmm/user", + [ + ("string", "name"), + ("string", "password"), + ("string", "gecos"), + ("string[]", "groups"), + ("string[]", "roles"), + ("path", "source"), + ], +) + + +class IvantiEpmmPlugin(RedHatPlugin): + """Ivanti Endpoint Protect Mobile Manager (EPMM) (previously Mobile Iron Core) OS plugin.""" + + DETECT_PATHS = ( + "/mi/release", + "/mi/config-system", + "/var/log/mi.log", + ) + + def __init__(self, target: Target): + super().__init__(target) + self.config = self._parse_config() + + def _parse_config(self) -> dict: + """Mobile Iron stores configuration data in XML and XSL files in the ``/mi/config-system`` directory. + + Currently we do not parse /mi/tomcat-properties/configurationService.properties -> configuration.directory. + """ + config = {} + for file in ("system", "identity", "antivirus", "debug", "filesystem", "ipsec"): + if (path := self.target.fs.path(f"/mi/config-system/startup_config/{file}config.xml")).is_file(): + try: + config[file] = configutil.parse( + path, hint="xml", namespace=rf"{{http://xsdobjects.mi.com/{file}conf}}" + ) + except ConfigurationParsingError as e: + self.target.log.warning("Unable to parse file %s: %s", path, e) + return config + + @classmethod + def detect(cls, target: Target) -> Filesystem | None: + for fs in target.filesystems: + for path in cls.DETECT_PATHS: + if fs.exists(path): + return fs + return None + + @export(property=True) + def hostname(self) -> str: + """Return the configured hostname.""" + try: + return self.config["system"]["configuration"]["system"]["hostname"]["hname"] + except KeyError: + return super().hostname + + @export(property=True) + def domain(self) -> str: + """Return the configured (search) domain.""" + try: + return self.config["system"]["configuration"]["system"]["dnsname"]["domainname"] + except KeyError: + return super().domain + + @export(property=True) + def ips(self) -> list[str]: + """Return the configured IP address(es).""" + try: + # Parse all configured interfaces + interfaces = self.config["system"]["configuration"]["system"]["interface"] + ips = [] + for ip_type in ("ipaddress", "ip6address"): + ips.extend([ip for iface in interfaces if (ip := iface.get(ip_type)) not in ("0.0.0.0", None)]) + except KeyError: + pass + + # Fall back to generic RedHat/Linux parsing if no ips were found + return ips or super().ips + + @export(property=True) + def dns(self) -> list[str] | None: + """Return the configured DNS servers.""" + try: + return [item.get("ipaddress") for item in self.config["system"]["configuration"]["system"]["dns"]] + except KeyError: + pass + + @export(property=True) + def gateway(self) -> list[str] | None: + """Return list of configured gateway ip addresses.""" + routes = self.config["system"]["configuration"]["system"]["route"] + if not isinstance(routes, list): + routes = [routes] + try: + return [r.get("gateway") for r in routes] + except KeyError: + pass + + @export(property=True) + def version(self) -> str: + """Return the Ivanti EPMM build version string.""" + mi_version = ( + rel.read_text().strip() if (rel := self.target.fs.path("/mi/release")).is_file() else "unknown build" + ) + sys_version = super().version or "unknown Linux version" + return f"Ivanti EPMM {mi_version} ({sys_version})" + + @export(record=UnixUserRecord) + def users(self) -> Iterator[EPMMUserRecord | UnixUserRecord]: + """Yield Ivanti EPMM user records from identityconfig.xml and unix user records from /etc/passwd.""" + # Yield unix-like users from /etc/passwd. + yield from super().users() + + # Yield EPMM configured identities. + try: + identity = self.config["identity"]["configuration"]["identity"] + users = identity["user"] + roles = identity["roles"] + except KeyError: + pass + + if not isinstance(users, list): + users = [users] + + if not isinstance(roles, list): + roles = [roles] + + for user in users: + full_name = f"{user.get('firstname')} {user.get('lastname')}".strip() + yield EPMMUserRecord( + name=user.get("principal"), + password=user.get("passwordHashSHA512"), + gecos=f"{full_name},,,,{user.get('email')}", + groups=[user.get("group")], + roles=next(r["role"] for r in roles if r["principal"] == user["principal"]), + source="/mi/config-system/startup_config/identityconfig.xml", + _target=self.target, + ) diff --git a/dissect/target/tools/shell.py b/dissect/target/tools/shell.py index 6a4eca480b..5c87897ccb 100644 --- a/dissect/target/tools/shell.py +++ b/dissect/target/tools/shell.py @@ -1489,7 +1489,7 @@ def extend_args(args: argparse.Namespace, func: Callable) -> argparse.Namespace: def _target_name(target: Target) -> str: """Return a printable FQDN target name for cmd.Cmd base prompts.""" - if target.has_function("domain") and target.domain: + if target.has_function("domain") and target.domain is not None and not target.name.endswith(target.domain): return escape_str(f"{target.name}.{target.domain}") return escape_str(target.name) diff --git a/tests/_data/plugins/os/unix/linux/redhat/epmm/identityconfig.xml b/tests/_data/plugins/os/unix/linux/redhat/epmm/identityconfig.xml new file mode 100644 index 0000000000..c8754c4e2d --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/redhat/epmm/identityconfig.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5ff1068c7d97878f514cad7708c5128abc481ac38dbf6d42d70b6eeeff08bace +size 1183 diff --git a/tests/_data/plugins/os/unix/linux/redhat/epmm/systemconfig.xml b/tests/_data/plugins/os/unix/linux/redhat/epmm/systemconfig.xml new file mode 100644 index 0000000000..8e4a19363d --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/redhat/epmm/systemconfig.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:db17cf3ad2af3a9fd130cf3889dde7654ef75f25d153e434d6f6b4bfb0749828 +size 1210 diff --git a/tests/helpers/test_configutil.py b/tests/helpers/test_configutil.py index 06ebb7dfee..dbd5fce777 100644 --- a/tests/helpers/test_configutil.py +++ b/tests/helpers/test_configutil.py @@ -18,6 +18,7 @@ Leases, ScopeManager, SystemD, + Xml, parse, ) from tests._utils import absolute_path @@ -403,3 +404,44 @@ def test_leases_parser(string_data: str, expected_output: dict) -> None: parser.parse_file(StringIO(string_data)) assert parser.parsed_data == expected_output + + +@pytest.mark.parametrize( + ("input", "namespace", "expected_output"), + [ + pytest.param( + """\ + + + + value + + + bar-1 + + + bar-2 + + + """, + "{http://example.com/example}", + { + "example": { + "bar": [ + {"attr": "bar-1"}, + {"attr": "bar-2"}, + ], + "foo": { + "attr": "value", + }, + } + }, + id="xml-without-attributes", + ) + ], +) +def test_xml_parser(input: str, namespace: str, expected_output: dict) -> None: + """Test the XML config parser.""" + parser = Xml(namespace=namespace) + parser.parse_file(StringIO(textwrap.dedent(input))) + assert parser.parsed_data == expected_output diff --git a/tests/plugins/os/unix/linux/redhat/epmm/__init__.py b/tests/plugins/os/unix/linux/redhat/epmm/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/plugins/os/unix/linux/redhat/epmm/test__os.py b/tests/plugins/os/unix/linux/redhat/epmm/test__os.py new file mode 100644 index 0000000000..70f25c43e7 --- /dev/null +++ b/tests/plugins/os/unix/linux/redhat/epmm/test__os.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +import pytest + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.unix.linux.redhat.epmm._os import IvantiEpmmPlugin +from tests._utils import absolute_path +from tests.conftest import make_os_target + +if TYPE_CHECKING: + from pathlib import Path + + from dissect.target.target import Target + + +@pytest.fixture +def fs_epmm() -> VirtualFilesystem: + fs = VirtualFilesystem() + fs.map_file_fh( + "etc/os-release", + BytesIO(b"Oracle Linux Server 13.37"), + ) + fs.map_file_fh( + "mi/release", + BytesIO(b"Example Standalone 1.2.3 Build 4 (Branch example-1.2.3-example-release)"), + ) + fs.map_file( + "mi/config-system/startup_config/systemconfig.xml", + absolute_path("_data/plugins/os/unix/linux/redhat/epmm/systemconfig.xml"), + ) + fs.map_file( + "mi/config-system/startup_config/identityconfig.xml", + absolute_path("_data/plugins/os/unix/linux/redhat/epmm/identityconfig.xml"), + ) + return fs + + +@pytest.fixture +def target_epmm(tmp_path: Path, fs_epmm: VirtualFilesystem) -> Target: + return make_os_target(tmp_path, IvantiEpmmPlugin, root_fs=fs_epmm) + + +def test_ivanti_epmm(target_epmm: Target, fs_epmm: VirtualFilesystem) -> None: + """Test if we can detect and parse system configuration of Ivanti EPMM (Mobile Iron Core).""" + assert target_epmm.os == "linux" + assert ( + target_epmm.version + == "Ivanti EPMM Example Standalone 1.2.3 Build 4 (Branch example-1.2.3-example-release) (Oracle Linux Server 13.37)" # noqa: E501 + ) + + assert target_epmm.hostname == "epmm.example.com" + assert target_epmm.domain == "example.com" + assert target_epmm.ips == ["1.2.3.4"] + + user = next(target_epmm.users()) + assert user.name == "username" + assert user.password == "$6$...$..." + assert user.gecos == "First Last,,,,username@example.com" + assert user.groups == ["DEFAULT"] + assert user.roles == ["ROLE_EXAMPLE_1", "ROLE_EXAMPLE_2"] + assert user.source == "/mi/config-system/startup_config/identityconfig.xml"