Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions dissect/target/helpers/configutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ def __init__(
collapse_inverse: bool = False,
separator: tuple[str] = ("=",),
comment_prefixes: tuple[str] = (";", "#"),
namespace: str | None = None,
) -> None:
self.collapse_all = collapse is True
self.collapse = set(collapse) if isinstance(collapse, Iterable) else set()
self._collapse_check = self._key_not_in_collapse if collapse_inverse else self._key_in_collapse

self.separator = separator
self.comment_prefixes = comment_prefixes
self.namespace = namespace
self.parsed_data = {}

def __getitem__(self, item: Any) -> dict | str:
Expand Down Expand Up @@ -358,35 +360,43 @@ class Xml(ConfigurationParser):
"""Parses an XML file. Ignores any constructor parameters passed from ``ConfigurationParser``."""

def _tree(self, tree: ElementTree, root: bool = False) -> dict:
"""Very simple but robust xml -> dict implementation, see comments."""
"""XML to dictionary parser."""
nodes = {}
result = {}
counter = {}

# each node is a folder (so the structure is always the same! [1])
for node in tree.findall("*"):
# if a node contains multiple nodes with the same name, number them
if node.tag in counter:
counter[node.tag] += 1
nodes[f"{node.tag}-{counter[node.tag]}"] = self._tree(node)
# Remove the XML namespace prefix from the node tag name
if self.namespace and node.tag.startswith(self.namespace):
node.tag = node.tag.removeprefix(self.namespace)

# If a node contains multiple nodes with the same name, we turn that node into a list of trees
if node.tag in nodes:
if not isinstance(nodes[node.tag], list):
nodes[node.tag] = [nodes[node.tag]]
nodes[node.tag].append(self._tree(node))
else:
counter[node.tag] = 1
nodes[node.tag] = self._tree(node)

# all attribs go in the attribute folder
# (i.e. stable, does not change depending on xml structure! [2]
# Also, this way we "know" they have been attributes, i.e. we don't lose information! [3]
# All tree attributes go in the __attributes__ dunder dictionary of the result
if tree.attrib:
result["attributes"] = tree.attrib
result["__attributes__"] = tree.attrib

# all subnodes go in the nodes folder
# All subnodes go directly in the dictionary with their node name
if nodes:
result["nodes"] = nodes
result.update(nodes)

# content goes into the text folder
# we don't use special prefixes ($) because XML docs may use them anyway (even though they are forbidden)
# Any additional tree content goes into the __text__ dunder attribute
if tree.text and (text := tree.text.strip(" \n\r")):
result["text"] = text
# If this node does not contain attributes or other nodes, set the value directly
# instead of setting the __text__ dunder value.
if not nodes and not tree.attrib:
result = text
else:
result["__text__"] = text

# Remove the XML namespace from the tree tag too
if self.namespace and tree.tag.startswith(self.namespace):
tree.tag = tree.tag.removeprefix(self.namespace)

# if you need to store meta-data, you can extend add more entries here... CDATA, Comments, errors
return {tree.tag: result} if root else result
Expand Down Expand Up @@ -420,7 +430,7 @@ def parse_file(self, fh: TextIO) -> None:

if not tree:
# Error limit reached. Thus we consider the document not parseable.
raise ConfigurationParsingError(f"Could not parse XML file: {fh.name} after {errors} attempts.")
raise ConfigurationParsingError(f"Could not parse XML file: {getattr(fh, 'name', fh)}")

self.parsed_data = tree

Expand Down Expand Up @@ -915,6 +925,7 @@ class ParserOptions:
collapse_inverse: bool | None = None
separator: tuple[str] | None = None
comment_prefixes: tuple[str] | None = None
namespace: str | None = None


@dataclass(frozen=True)
Expand All @@ -924,12 +935,13 @@ class ParserConfig:
collapse_inverse: bool | None = None
separator: tuple[str] | None = None
comment_prefixes: tuple[str] | None = None
namespace: str | None = None
fields: tuple[str, ...] | None = None

def create_parser(self, options: ParserOptions | None = None) -> ConfigurationParser:
kwargs = {}

for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "fields"]:
for field_name in ["collapse", "collapse_inverse", "separator", "comment_prefixes", "namespace", "fields"]:
value = getattr(options, field_name, None) or getattr(self, field_name)
if value:
kwargs.update({field_name: value})
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/helpers/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def DynamicDescriptor(types: Sequence[str]) -> RecordDescriptor:
("string", "gecos"),
("path", "home"),
("string", "shell"),
("string", "source"),
("path", "source"),
]

UnixUserRecord = TargetRecordDescriptor(
Expand Down
Empty file.
157 changes: 157 additions & 0 deletions dissect/target/plugins/os/unix/linux/redhat/epmm/_os.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dissect.target.exceptions import ConfigurationParsingError
from dissect.target.helpers import configutil
from dissect.target.helpers.record import TargetRecordDescriptor, UnixUserRecord
from dissect.target.plugin import export
from dissect.target.plugins.os.unix.linux.redhat._os import RedHatPlugin

if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.target.filesystem import Filesystem
from dissect.target.target import Target


EPMMUserRecord = TargetRecordDescriptor(
"epmm/user",
[
("string", "name"),
("string", "password"),
("string", "gecos"),
("string[]", "groups"),
("string[]", "roles"),
("path", "source"),
],
)


class IvantiEpmmPlugin(RedHatPlugin):
"""Ivanti Endpoint Protect Mobile Manager (EPMM) (previously Mobile Iron Core) OS plugin."""

DETECT_PATHS = (
"/mi/release",
"/mi/config-system",
"/var/log/mi.log",
)

def __init__(self, target: Target):
super().__init__(target)
self.config = self._parse_config()

def _parse_config(self) -> dict:
"""Mobile Iron stores configuration data in XML and XSL files in the ``/mi/config-system`` directory.

Currently we do not parse /mi/tomcat-properties/configurationService.properties -> configuration.directory.
"""
config = {}
for file in ("system", "identity", "antivirus", "debug", "filesystem", "ipsec"):
if (path := self.target.fs.path(f"/mi/config-system/startup_config/{file}config.xml")).is_file():
try:
config[file] = configutil.parse(
path, hint="xml", namespace=rf"{{http://xsdobjects.mi.com/{file}conf}}"
)
except ConfigurationParsingError as e:
self.target.log.warning("Unable to parse file %s: %s", path, e)
return config

@classmethod
def detect(cls, target: Target) -> Filesystem | None:
for fs in target.filesystems:
for path in cls.DETECT_PATHS:
if fs.exists(path):
return fs
return None

@export(property=True)
def hostname(self) -> str:
"""Return the configured hostname."""
try:
return self.config["system"]["configuration"]["system"]["hostname"]["hname"]
except KeyError:
return super().hostname

@export(property=True)
def domain(self) -> str:
"""Return the configured (search) domain."""
try:
return self.config["system"]["configuration"]["system"]["dnsname"]["domainname"]
except KeyError:
return super().domain

@export(property=True)
def ips(self) -> list[str]:
"""Return the configured IP address(es)."""
try:
# Parse all configured interfaces
interfaces = self.config["system"]["configuration"]["system"]["interface"]
ips = []
for ip_type in ("ipaddress", "ip6address"):
ips.extend([ip for iface in interfaces if (ip := iface.get(ip_type)) not in ("0.0.0.0", None)])
except KeyError:
pass

# Fall back to generic RedHat/Linux parsing if no ips were found
return ips or super().ips

@export(property=True)
def dns(self) -> list[str] | None:
"""Return the configured DNS servers."""
try:
return [item.get("ipaddress") for item in self.config["system"]["configuration"]["system"]["dns"]]
except KeyError:
pass

@export(property=True)
def gateway(self) -> list[str] | None:
"""Return list of configured gateway ip addresses."""
routes = self.config["system"]["configuration"]["system"]["route"]
if not isinstance(routes, list):
routes = [routes]
try:
return [r.get("gateway") for r in routes]
except KeyError:
pass

@export(property=True)
def version(self) -> str:
"""Return the Ivanti EPMM build version string."""
mi_version = (
rel.read_text().strip() if (rel := self.target.fs.path("/mi/release")).is_file() else "unknown build"
)
sys_version = super().version or "unknown Linux version"
return f"Ivanti EPMM {mi_version} ({sys_version})"

@export(record=UnixUserRecord)
def users(self) -> Iterator[EPMMUserRecord | UnixUserRecord]:
"""Yield Ivanti EPMM user records from identityconfig.xml and unix user records from /etc/passwd."""
# Yield unix-like users from /etc/passwd.
yield from super().users()

# Yield EPMM configured identities.
try:
identity = self.config["identity"]["configuration"]["identity"]
users = identity["user"]
roles = identity["roles"]
except KeyError:
pass

if not isinstance(users, list):
users = [users]

if not isinstance(roles, list):
roles = [roles]

for user in users:
full_name = f"{user.get('firstname')} {user.get('lastname')}".strip()
yield EPMMUserRecord(
name=user.get("principal"),
password=user.get("passwordHashSHA512"),
gecos=f"{full_name},,,,{user.get('email')}",
groups=[user.get("group")],
roles=next(r["role"] for r in roles if r["principal"] == user["principal"]),
source="/mi/config-system/startup_config/identityconfig.xml",
_target=self.target,
)
2 changes: 1 addition & 1 deletion dissect/target/tools/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ def extend_args(args: argparse.Namespace, func: Callable) -> argparse.Namespace:

def _target_name(target: Target) -> str:
"""Return a printable FQDN target name for cmd.Cmd base prompts."""
if target.has_function("domain") and target.domain:
if target.has_function("domain") and target.domain is not None and not target.name.endswith(target.domain):
return escape_str(f"{target.name}.{target.domain}")

return escape_str(target.name)
Expand Down
Git LFS file not shown
Git LFS file not shown
42 changes: 42 additions & 0 deletions tests/helpers/test_configutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Leases,
ScopeManager,
SystemD,
Xml,
parse,
)
from tests._utils import absolute_path
Expand Down Expand Up @@ -403,3 +404,44 @@ def test_leases_parser(string_data: str, expected_output: dict) -> None:
parser.parse_file(StringIO(string_data))

assert parser.parsed_data == expected_output


@pytest.mark.parametrize(
("input", "namespace", "expected_output"),
[
pytest.param(
"""\
<?xml version="1.0" encoding="UTF-8"?>
<example xmlns="http://example.com/example" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<foo>
<attr>value</attr>
</foo>
<bar>
<attr>bar-1</attr>
</bar>
<bar>
<attr>bar-2</attr>
</bar>
</example>
""",
"{http://example.com/example}",
{
"example": {
"bar": [
{"attr": "bar-1"},
{"attr": "bar-2"},
],
"foo": {
"attr": "value",
},
}
},
id="xml-without-attributes",
)
],
)
def test_xml_parser(input: str, namespace: str, expected_output: dict) -> None:
"""Test the XML config parser."""
parser = Xml(namespace=namespace)
parser.parse_file(StringIO(textwrap.dedent(input)))
assert parser.parsed_data == expected_output
Empty file.
Loading
Loading