From cef20daad530a077df2671ded1812a579979bada Mon Sep 17 00:00:00 2001 From: Lennart Haagsma <6630974+lhaagsma@users.noreply.github.com> Date: Fri, 2 May 2025 09:03:39 +0200 Subject: [PATCH] Extend child-support --- dissect/target/helpers/record.py | 1 + .../plugins/apps/container/container.py | 2 +- .../target/plugins/apps/container/docker.py | 2 +- .../target/plugins/apps/container/podman.py | 4 +- dissect/target/plugins/child/colima.py | 54 +++++++----- dissect/target/plugins/child/docker.py | 1 + dissect/target/plugins/child/esxi.py | 11 +++ dissect/target/plugins/child/hyperv.py | 35 +++++++- dissect/target/plugins/child/lima.py | 52 +++++++++++ dissect/target/plugins/child/parallels.py | 12 +++ dissect/target/plugins/child/podman.py | 1 + dissect/target/plugins/child/proxmox.py | 19 +++- dissect/target/plugins/child/qemu.py | 16 +++- dissect/target/plugins/child/virtualbox.py | 14 ++- dissect/target/plugins/child/virtuozzo.py | 22 ++++- .../plugins/child/vmware_workstation.py | 38 +++++--- dissect/target/plugins/child/wsl.py | 15 +++- dissect/target/plugins/os/unix/esxi/_os.py | 2 +- dissect/target/target.py | 59 +++++++++---- dissect/target/tools/dd.py | 6 +- dissect/target/tools/diff.py | 2 +- dissect/target/tools/dump/run.py | 2 +- dissect/target/tools/fs.py | 2 +- dissect/target/tools/info.py | 36 +++++--- dissect/target/tools/mount.py | 6 +- dissect/target/tools/qfind.py | 8 +- dissect/target/tools/query.py | 10 ++- dissect/target/tools/reg.py | 6 +- dissect/target/tools/shell.py | 11 ++- dissect/target/tools/utils.py | 87 +++++++++++++++++-- dissect/target/tools/yara.py | 7 +- .../_data/plugins/child/parallels/config.pvs | 3 + tests/_data/plugins/child/virtualbox/vm.vbox | 3 + .../plugins/child/virtuozzo/example.conf | 3 + tests/plugins/apps/container/test_docker.py | 4 +- tests/plugins/apps/container/test_podman.py | 4 +- tests/plugins/child/test_colima.py | 23 +++-- tests/plugins/child/test_docker.py | 3 +- tests/plugins/child/test_esxi.py | 43 +++++++++ tests/plugins/child/test_hyperv.py | 16 +++- tests/plugins/child/test_lima.py | 31 +++++++ tests/plugins/child/test_parallels.py | 40 ++++++--- tests/plugins/child/test_podman.py | 8 +- tests/plugins/child/test_proxmox.py | 35 ++++++++ tests/plugins/child/test_qemu.py | 7 +- tests/plugins/child/test_virtualbox.py | 12 ++- tests/plugins/child/test_virtuozzo.py | 9 +- .../plugins/child/test_vmware_workstation.py | 7 +- tests/plugins/child/test_wsl.py | 13 ++- tests/test_target.py | 77 ++++++++++++++++ tests/tools/test_mount.py | 8 +- tests/tools/test_utils.py | 4 +- 52 files changed, 736 insertions(+), 160 deletions(-) create mode 100644 dissect/target/plugins/child/lima.py create mode 100644 tests/_data/plugins/child/parallels/config.pvs create mode 100644 tests/_data/plugins/child/virtualbox/vm.vbox create mode 100644 tests/_data/plugins/child/virtuozzo/example.conf create mode 100644 tests/plugins/child/test_esxi.py create mode 100644 tests/plugins/child/test_lima.py create mode 100644 tests/plugins/child/test_proxmox.py diff --git a/dissect/target/helpers/record.py b/dissect/target/helpers/record.py index b1b478f129..2106730db4 100644 --- a/dissect/target/helpers/record.py +++ b/dissect/target/helpers/record.py @@ -116,6 +116,7 @@ def DynamicDescriptor(types: Sequence[str]) -> RecordDescriptor: "target/child", [ ("string", "type"), + ("string", "name"), ("path", "path"), ], ) diff --git a/dissect/target/plugins/apps/container/container.py b/dissect/target/plugins/apps/container/container.py index 647685dd6f..7b689a2375 100755 --- a/dissect/target/plugins/apps/container/container.py +++ b/dissect/target/plugins/apps/container/container.py @@ -22,7 +22,7 @@ ("datetime", "started"), ("datetime", "finished"), ("string[]", "ports"), - ("string", "names"), + ("string", "name"), ("string[]", "volumes"), ("string[]", "environment"), ("path", "mount_path"), diff --git a/dissect/target/plugins/apps/container/docker.py b/dissect/target/plugins/apps/container/docker.py index 3f27464e80..dcb68c9659 100644 --- a/dissect/target/plugins/apps/container/docker.py +++ b/dissect/target/plugins/apps/container/docker.py @@ -200,7 +200,7 @@ def containers(self) -> Iterator[DockerContainerRecord]: started=convert_timestamp(config.get("State", {}).get("StartedAt")), finished=convert_timestamp(config.get("State", {}).get("FinishedAt")), ports=list(convert_ports(ports)), - names=config.get("Name", "").replace("/", "", 1), + name=config.get("Name", "").replace("/", "", 1), volumes=volumes, environment=config.get("Config", {}).get("Env", []), mount_path=mount_path, diff --git a/dissect/target/plugins/apps/container/podman.py b/dissect/target/plugins/apps/container/podman.py index 1e29a6051b..7fb32b965a 100755 --- a/dissect/target/plugins/apps/container/podman.py +++ b/dissect/target/plugins/apps/container/podman.py @@ -221,7 +221,7 @@ def _find_containers_sqlite(self, path: Path) -> Iterator[PodmanContainerRecord] started=container.get("startedTime"), finished=container.get("finishedTime"), ports=list(convert_ports(container.get("newPortMappings", []))), # TODO: research "exposedPorts" - names=container.get("name"), + name=container.get("name"), volumes=volumes, environment=container.get("spec", {}).get("process", {}).get("env", []), mount_path=mount_path, @@ -271,7 +271,7 @@ def _find_containers_fs(self, path: Path) -> Iterator[PodmanContainerRecord]: image_id=other_config.get("image"), command=" ".join(config.get("process", {}).get("args", [])), created=other_config.get("created"), - names=other_config.get("names"), + name=other_config.get("names"), environment=config.get("process", {}).get("env", []), mount_path=path.joinpath(f"storage/overlay/{other_config.get('layer')}") if other_config else None, config_path=config_path, diff --git a/dissect/target/plugins/child/colima.py b/dissect/target/plugins/child/colima.py index de44e37027..309c7a9c20 100644 --- a/dissect/target/plugins/child/colima.py +++ b/dissect/target/plugins/child/colima.py @@ -13,42 +13,56 @@ from dissect.target import Target -def find_containers(paths: list[Path]) -> Iterator[Path]: - for path in paths: - for config_path in path.iterdir(): - if (config_file := config_path.joinpath("colima.yaml")).exists(): - name = f"-{config_file.parts[-2]}" if config_file.parts[-2] != "default" else "" - if (disk_path := path.joinpath("_lima", f"colima{name}", "diffdisk")).exists(): - yield disk_path +def find_vms(path: Path) -> Iterator[tuple[str, Path, Path]]: + """Find the Lima VMs from Colima and yield the name, Colima configuration path and Lima VM path. + + References: + - https://github.com/abiosoft/colima/blob/5ddf1e0dc67772f6e28f84c7c7b32f2343ad4bfb/config/profile.go#L19-L39 + """ + for config_file in path.glob("*/colima.yaml"): + profile = config_file.parent.name + if profile == "default": + lima_id = "colima" + else: + profile = profile.removeprefix("colima-") + lima_id = f"colima-{profile}" + + if (lima_path := path.joinpath("_lima", lima_id)).exists(): + yield profile, config_file, lima_path class ColimaChildTargetPlugin(ChildTargetPlugin): - """Child target plugin that yields Colima containers. + """Child target plugin that yields Colima VMs. Colima is a container runtime for macOS and Linux. References: - - https://github.com/abiosoft/colima/blob/5d2e91c4a491d4ae35d69fb2583f4f959401bc37 + - https://github.com/abiosoft/colima """ __type__ = "colima" def __init__(self, target: Target): super().__init__(target) - self.paths = [ - path - for user in self.target.user_details.all_with_home() - if (path := user.home_path.joinpath(".colima")).exists() - ] + self.paths = [] + for user in self.target.user_details.all_with_home(): + # check .colima folder in ~/ + if (path := user.home_path.joinpath(".colima")).exists(): + self.paths.append(path) + # check .colima folder in ~/.config/ + if (path := user.home_path.joinpath(".config", "colima")).exists(): + self.paths.append(path) def check_compatible(self) -> None: if not self.paths: raise UnsupportedPluginError("No Colima configurations found") def list_children(self) -> Iterator[ChildTargetRecord]: - for container in find_containers(self.paths): - yield ChildTargetRecord( - type=self.__type__, - path=container, - _target=self.target, - ) + for path in self.paths: + for name, _, lima_path in find_vms(path): + yield ChildTargetRecord( + type=self.__type__, + name=name, + path=lima_path.joinpath("diffdisk"), + _target=self.target, + ) diff --git a/dissect/target/plugins/child/docker.py b/dissect/target/plugins/child/docker.py index d591489f03..12938c9337 100644 --- a/dissect/target/plugins/child/docker.py +++ b/dissect/target/plugins/child/docker.py @@ -24,6 +24,7 @@ def list_children(self) -> Iterator[ChildTargetRecord]: if container.mount_path: yield ChildTargetRecord( type=self.__type__, + name=container.name, path=container.mount_path, _target=self.target, ) diff --git a/dissect/target/plugins/child/esxi.py b/dissect/target/plugins/child/esxi.py index 670693bc70..84231c99b1 100644 --- a/dissect/target/plugins/child/esxi.py +++ b/dissect/target/plugins/child/esxi.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +from dissect.hypervisor import vmx + from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.record import ChildTargetRecord from dissect.target.plugin import ChildTargetPlugin @@ -21,8 +23,17 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for vm in self.target.vm_inventory(): + try: + name = vmx.VMX.parse(self.target.fs.path(vm.path).read_text()).attr.get("displayname") + except Exception as e: + self.target.log.exception("Failed parsing displayname from VMX: %s", vm.path) + self.target.log.debug("", exc_info=e) + + name = None + yield ChildTargetRecord( type=self.__type__, + name=name, path=vm.path, _target=self.target, ) diff --git a/dissect/target/plugins/child/hyperv.py b/dissect/target/plugins/child/hyperv.py index 5331051f66..b110e54d84 100644 --- a/dissect/target/plugins/child/hyperv.py +++ b/dissect/target/plugins/child/hyperv.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING +from defusedxml import ElementTree from dissect.hypervisor import hyperv from dissect.target.exceptions import UnsupportedPluginError @@ -10,6 +11,7 @@ if TYPE_CHECKING: from collections.abc import Iterator + from pathlib import Path from dissect.target.target import Target @@ -45,16 +47,43 @@ def list_children(self) -> Iterator[ChildTargetRecord]: data = hyperv.HyperVFile(self.data_vmcx.open()).as_dict() if virtual_machines := data["Configurations"].get("VirtualMachines"): - for vm_path in virtual_machines.values(): + for path in virtual_machines.values(): + vm_path = self.target.fs.path(path) yield ChildTargetRecord( type=self.__type__, - path=self.target.fs.path(vm_path), + name=self._name_from_vmcx(vm_path), + path=vm_path, _target=self.target, ) for xml_path in self.vm_xml: + vm_path = xml_path.resolve() yield ChildTargetRecord( type=self.__type__, - path=xml_path.resolve(), + name=self._name_from_xml(vm_path), + path=vm_path, _target=self.target, ) + + def _name_from_vmcx(self, path: Path) -> str | None: + try: + with path.open("rb") as fh: + config = hyperv.HyperVFile(fh).as_dict() + return config.get("configuration", {}).get("properties", {}).get("name") + except Exception as e: + self.target.log.error("Failed parsing name from VMCX: %s", path) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + + return None + + def _name_from_xml(self, path: Path) -> str | None: + try: + xml = ElementTree.fromstring(path.read_bytes()) + + if (name := xml.find(".//properties/name")) is not None: + return name.text + except Exception as e: + self.target.log.error("Failed parsing name from XML: %s", path) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + + return None diff --git a/dissect/target/plugins/child/lima.py b/dissect/target/plugins/child/lima.py new file mode 100644 index 0000000000..2aa4d16260 --- /dev/null +++ b/dissect/target/plugins/child/lima.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers.record import ChildTargetRecord +from dissect.target.plugin import ChildTargetPlugin + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dissect.target import Target + + +class LimaChildTargetPlugin(ChildTargetPlugin): + """Child target plugin that yields Lima VMs. + + Lima (Linux Machines) is a Linux VM or container runtime for macOS and Linux. + + References: + - https://github.com/lima-vm/lima + """ + + __type__ = "lima" + + def __init__(self, target: Target): + super().__init__(target) + self.paths = [] + for user in self.target.user_details.all_with_home(): + # check .lima folder in ~/ + if (path := user.home_path.joinpath(".lima")).exists(): + self.paths.append(path) + # check .lima folder in ~/.config/ + if (path := user.home_path.joinpath(".config", "lima")).exists(): + self.paths.append(path) + + def check_compatible(self) -> None: + if not self.paths: + raise UnsupportedPluginError("No Lima configurations found") + + def list_children(self) -> Iterator[ChildTargetRecord]: + for path in self.paths: + for instance in path.iterdir(): + if instance.name.startswith((".", "_")) or not instance.is_dir(): + continue + + yield ChildTargetRecord( + type=self.__type__, + name=instance.name, + path=instance.joinpath("diffdisk"), + _target=self.target, + ) diff --git a/dissect/target/plugins/child/parallels.py b/dissect/target/plugins/child/parallels.py index 3231c99f8f..128fc423ff 100644 --- a/dissect/target/plugins/child/parallels.py +++ b/dissect/target/plugins/child/parallels.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +from defusedxml import ElementTree + from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.record import ChildTargetRecord from dissect.target.plugin import ChildTargetPlugin @@ -67,8 +69,18 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for pvm in self.pvms: + try: + config = ElementTree.fromstring(pvm.joinpath("config.pvs").read_bytes()) + name = config.find(".//VmName").text + except Exception as e: + self.target.log.error("Failed parsing VmName from config.pvs: %s", pvm) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + + name = None + yield ChildTargetRecord( type=self.__type__, + name=name, path=pvm, _target=self.target, ) diff --git a/dissect/target/plugins/child/podman.py b/dissect/target/plugins/child/podman.py index 3b5c33a06f..888ab417c0 100755 --- a/dissect/target/plugins/child/podman.py +++ b/dissect/target/plugins/child/podman.py @@ -24,6 +24,7 @@ def list_children(self) -> Iterator[ChildTargetRecord]: if container.mount_path: yield ChildTargetRecord( type=self.__type__, + name=container.name, path=container.mount_path, _target=self.target, ) diff --git a/dissect/target/plugins/child/proxmox.py b/dissect/target/plugins/child/proxmox.py index 9116c2e6dd..17d7f049bb 100644 --- a/dissect/target/plugins/child/proxmox.py +++ b/dissect/target/plugins/child/proxmox.py @@ -21,8 +21,25 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for vm in self.target.vmlist(): + vm_path = self.target.fs.path(vm.path) + + name = None + try: + with vm_path.open("rt") as fh: + for line in fh: + if not (line := line.strip()): + continue + + if (key_value := line.split(":", 1)) and key_value[0] == "name": + name = key_value[1].strip() + break + except Exception as e: + self.target.log.error("Failed parsing name from VM config: %s", vm_path) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + yield ChildTargetRecord( type=self.__type__, - path=vm.path, + name=name, + path=vm_path, _target=self.target, ) diff --git a/dissect/target/plugins/child/qemu.py b/dissect/target/plugins/child/qemu.py index c17c1ac4cc..396be08f47 100644 --- a/dissect/target/plugins/child/qemu.py +++ b/dissect/target/plugins/child/qemu.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +from defusedxml import ElementTree + from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.record import ChildTargetRecord from dissect.target.plugin import ChildTargetPlugin @@ -21,4 +23,16 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for domain in self.target.fs.path("/etc/libvirt/qemu").glob("*.xml"): - yield ChildTargetRecord(type=self.__type__, path=domain) + try: + name = ElementTree.fromstring(domain.read_bytes()).find("name").text + except Exception as e: + self.target.log.error("Failed to parse name from QEMU config: %s", domain) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + name = None + + yield ChildTargetRecord( + type=self.__type__, + name=name, + path=domain, + _target=self.target, + ) diff --git a/dissect/target/plugins/child/virtualbox.py b/dissect/target/plugins/child/virtualbox.py index 1896067e89..c0a8f2329d 100755 --- a/dissect/target/plugins/child/virtualbox.py +++ b/dissect/target/plugins/child/virtualbox.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING -from defusedxml import ElementTree as ET +from defusedxml import ElementTree from dissect.hypervisor.descriptor.vbox import VBox from dissect.target.exceptions import UnsupportedPluginError @@ -63,7 +63,7 @@ def find_vms(self) -> Iterator[Path]: continue try: - config = ET.fromstring(path.read_text()) + config = ElementTree.fromstring(path.read_text()) except Exception as e: self.target.log.warning("Unable to parse %s: %s", path, e) self.target.log.debug("", exc_info=e) @@ -95,8 +95,18 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for vbox in self.vboxes: + try: + config = ElementTree.fromstring(vbox.read_bytes()) + name = config.find(f".//{VBox.VBOX_XML_NAMESPACE}Machine").attrib["name"] + except Exception as e: + self.target.log.error("Failed to parse name from VirtualBox XML: %s", vbox) # noqa: TRY400 + self.target.log.debug("", exc_info=e) + + name = None + yield ChildTargetRecord( type=self.__type__, + name=name, path=vbox, _target=self.target, ) diff --git a/dissect/target/plugins/child/virtuozzo.py b/dissect/target/plugins/child/virtuozzo.py index 73fd2ac1d0..9ff6509144 100644 --- a/dissect/target/plugins/child/virtuozzo.py +++ b/dissect/target/plugins/child/virtuozzo.py @@ -13,7 +13,7 @@ class VirtuozzoChildTargetPlugin(ChildTargetPlugin): """Child target plugin that yields from Virtuozzo container's root. - Virtuozzo conatiners are by default registered in the folder ``vz/root/$VEID``, + Virtuozzo containers are by default registered in the folder ``vz/root/$VEID``, where VEID will be substituted with the actual container UUID. .. code-block:: @@ -28,11 +28,14 @@ class VirtuozzoChildTargetPlugin(ChildTargetPlugin): References: - https://docs.virtuozzo.com/virtuozzo_hybrid_server_7_command_line_reference/managing-system/configuration-files.html + - https://wiki.openvz.org/Man/ctid.conf.5 + - https://docs.virtuozzo.com/pdf/virtuozzo_hybrid_server_7_command_line_reference.pdf """ __type__ = "virtuozzo" PATH = "/vz/root" + CONFIG_PATH = "/etc/vz/conf" def check_compatible(self) -> None: if not self.target.fs.path(self.PATH).exists(): @@ -40,8 +43,25 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for container in self.target.fs.path(self.PATH).iterdir(): + name = None + try: + if (vm_config := self.target.fs.path(self.CONFIG_PATH).joinpath(f"{container.name}.conf")).exists(): + with vm_config.open("rt") as fh: + for line in fh: + if not (line := line.strip()): + continue + + key, _, value = line.partition("=") + if key == "NAME": + name = value.strip('"') + break + except Exception as e: + self.target.log.exception("Failed to parse NAME from Virtuozzo config: %s", container) + self.target.log.debug("", exc_info=e) + yield ChildTargetRecord( type=self.__type__, + name=name, path=container, _target=self.target, ) diff --git a/dissect/target/plugins/child/vmware_workstation.py b/dissect/target/plugins/child/vmware_workstation.py index a2e9d8d2e6..1545679e28 100644 --- a/dissect/target/plugins/child/vmware_workstation.py +++ b/dissect/target/plugins/child/vmware_workstation.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from collections.abc import Iterator + from pathlib import Path from dissect.target.helpers.fsutil import TargetPath from dissect.target.target import Target @@ -31,6 +32,26 @@ def find_vm_inventory(target: Target) -> Iterator[TargetPath]: yield inv_file +def parse_vm_inventory(path: Path) -> dict[str, dict[str, str]]: + config = {} + + with path.open("rt") as fh: + for line in fh: + if not (line := line.strip()) or line.startswith("."): + continue + + full_key, value = line.split("=", 1) + vm, key = full_key.strip().split(".", 1) + + # Only process vmlist entries, not index entries + if "vmlist" not in vm: + continue + + config.setdefault(vm, {})[key] = value.strip().strip('"') + + return config + + class VmwareWorkstationChildTargetPlugin(ChildTargetPlugin): """Child target plugin that yields from VMware Workstation VM inventory.""" @@ -46,21 +67,12 @@ def check_compatible(self) -> None: def list_children(self) -> Iterator[ChildTargetRecord]: for inv in self.inventories: - for line in inv.open("rt"): - line = line.strip() - if not line.startswith("vmlist"): - continue - - key, _, value = line.partition("=") - if not key.strip().endswith(".config"): - continue - - value = value.strip().strip('"') - if value.startswith("folder") or not value: - continue + inventory = parse_vm_inventory(inv) + for config in inventory.values(): yield ChildTargetRecord( type=self.__type__, - path=self.target.fs.path(value.strip('"')), + name=config.get("DisplayName"), + path=config.get("config"), _target=self.target, ) diff --git a/dissect/target/plugins/child/wsl.py b/dissect/target/plugins/child/wsl.py index c93f4988b0..004651eea6 100644 --- a/dissect/target/plugins/child/wsl.py +++ b/dissect/target/plugins/child/wsl.py @@ -13,7 +13,7 @@ from dissect.target.target import Target -def find_wsl_installs(target: Target) -> Iterator[Path]: +def find_wsl_installs(target: Target) -> Iterator[str, Path]: """Find all WSL disk files. Disk files for working (custom) Linux distributions can be located anywhere on the system. @@ -30,9 +30,15 @@ def find_wsl_installs(target: Target) -> Iterator[Path]: for distribution_key in lxss_key.subkeys(): if not distribution_key.name.startswith("{"): continue + + name = distribution_key.value("DistributionName").value base_path = target.resolve(distribution_key.value("BasePath").value) # WSL needs diskname to be ext4.vhdx, but they can be renamed when WSL is not active - yield from base_path.glob("*.vhdx") + if not (disk_path := next(base_path.glob("*.vhdx"), None)): + target.log.warning("No WSL disk file found in %s, check WSL install %r manually!", base_path, name) + continue + + yield name, disk_path except PluginError: pass @@ -59,9 +65,10 @@ def check_compatible(self) -> None: raise UnsupportedPluginError("No WSL installs found") def list_children(self) -> Iterator[ChildTargetRecord]: - for install_path in self.installs: + for name, disk_path in self.installs: yield ChildTargetRecord( type=self.__type__, - path=install_path, + name=name, + path=disk_path, _target=self.target, ) diff --git a/dissect/target/plugins/os/unix/esxi/_os.py b/dissect/target/plugins/os/unix/esxi/_os.py index cc15ce679a..25b3d20854 100644 --- a/dissect/target/plugins/os/unix/esxi/_os.py +++ b/dissect/target/plugins/os/unix/esxi/_os.py @@ -180,7 +180,7 @@ def vm_inventory(self) -> Iterator[VirtualMachineRecord]: if not inv_file.exists(): return [] - root = ElementTree.fromstring(inv_file.read_text("utf-8")) + root = ElementTree.fromstring(inv_file.read_text()) for entry in root.iter("ConfigEntry"): yield VirtualMachineRecord( path=self.target.fs.path(entry.findtext("vmxCfgPath")), diff --git a/dissect/target/target.py b/dissect/target/target.py index 8fb5ab4df8..bccbdb88b5 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -493,23 +493,39 @@ def _load_child_plugins(self) -> None: "An exception occurred while checking for child plugin compatibility: %s", plugin_desc.qualname ) - def open_child(self, child: str | Path) -> Target: + def open_child(self, child: int | str | Path) -> Target: """Open a child target. + Allows opening a nested child target by path, index or child pattern. + Paths will simply attempt to open the path as a child target. + Indexes are zero-based, so the first child is 0, the second is 1, etc. + If the child is a pattern, such as ``4.2``, it will open the 2nd child of the 4th child target of this target. + Args: - child: The location of a target within the current ``Target``. + child: The location of a target within the current ``Target``, or a child pattern. Returns: An opened ``Target`` object of the child target. """ - if isinstance(child, str) and child.isdecimal(): - child_num = int(child) - for child_record in self.list_children(): - if child_num == 0: - return Target.open(self.fs.path(child_record.path)) - child_num -= 1 - raise IndexError("Child target index out of range") + # Open child identified by its single digit index (from list_children), int or str + # OR + # Open sub-child (grandchild?) uing a sub-child pattern string such as "4.2": + # - open the 4th child of the current target + # - open the 2nd child of this new target + if isinstance(child, int) or (isinstance(child, str) and all(part.isdecimal() for part in child.split("."))): + current_target = self + for child_idx in map(int, str(child).split(".")): + for _, child in current_target.list_children(): + if child_idx == 0: + current_target = Target.open(current_target.fs.path(child.path)) + break + child_idx -= 1 + else: + raise IndexError("Child target index out of range") + return current_target + + # Open child by path return Target.open(self.fs.path(child)) def open_children(self, recursive: bool = False) -> Iterator[Target]: @@ -523,7 +539,7 @@ def open_children(self, recursive: bool = False) -> Iterator[Target]: Returns: An iterator of ``Targets``. """ - for child in self.list_children(): + for _, child in self.list_children(): try: target = self.open_child(child.path) except TargetError: @@ -535,13 +551,24 @@ def open_children(self, recursive: bool = False) -> Iterator[Target]: if recursive: yield from target.open_children(recursive=recursive) - def list_children(self) -> Iterator[ChildTargetRecord]: - """Lists all child targets that compatible :class:`~dissect.target.plugin.ChildTargetPlugin` classes - can discover. - """ + def list_children(self, recursive: bool = False) -> Iterator[tuple[str, ChildTargetRecord]]: + """Lists all discovered child targets.""" self._load_child_plugins() - for child_plugin in self._child_plugins.values(): - yield from child_plugin.list_children() + + idx = 0 + for child_plugin_type in sorted(self._child_plugins): + for child in self._child_plugins[child_plugin_type].list_children(): + yield str(idx), child + + if recursive: + try: + target = self.open_child(child.path) + for sub_idx, sub_child in target.list_children(recursive=recursive): + yield f"{idx}.{sub_idx}", sub_child + except TargetError: + self.log.warning("Failed to open child target %s", child.path) + + idx += 1 def reload(self) -> Target: """Reload the current target. diff --git a/dissect/target/tools/dd.py b/dissect/target/tools/dd.py index 9ffc4ca694..7fcf51dc88 100644 --- a/dissect/target/tools/dd.py +++ b/dissect/target/tools/dd.py @@ -9,10 +9,10 @@ from dissect.util.stream import RangeStream from dissect.target.exceptions import TargetError -from dissect.target.target import Target from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_target, process_generic_arguments, ) @@ -36,10 +36,10 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) try: - t = Target.open(args.target) + t = open_target(args) except TargetError as e: log.error(e) # noqa: TRY400 log.debug("", exc_info=e) diff --git a/dissect/target/tools/diff.py b/dissect/target/tools/diff.py index 9df6547027..545d0d13c7 100644 --- a/dissect/target/tools/diff.py +++ b/dissect/target/tools/diff.py @@ -966,7 +966,7 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if len(args.targets) < 2: parser.error("at least two targets are required for target-diff") diff --git a/dissect/target/tools/dump/run.py b/dissect/target/tools/dump/run.py index f87206d68a..e5064f67c3 100644 --- a/dissect/target/tools/dump/run.py +++ b/dissect/target/tools/dump/run.py @@ -300,7 +300,7 @@ def parse_arguments() -> tuple[argparse.Namespace, list[str]]: configure_generic_arguments(parser) args, rest = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if not args.function and ("-h" in rest or "--help" in rest): parser.print_help() diff --git a/dissect/target/tools/fs.py b/dissect/target/tools/fs.py index ce067253c2..cc6ab944b8 100644 --- a/dissect/target/tools/fs.py +++ b/dissect/target/tools/fs.py @@ -144,7 +144,7 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if args.subcommand is None: parser.error("No subcommand specified") diff --git a/dissect/target/tools/info.py b/dissect/target/tools/info.py index a025d7bb21..08df27d375 100644 --- a/dissect/target/tools/info.py +++ b/dissect/target/tools/info.py @@ -6,17 +6,22 @@ import logging from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING from dissect.target.exceptions import TargetError from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.target import Target from dissect.target.tools.query import record_output from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_targets, process_generic_arguments, ) +if TYPE_CHECKING: + from dissect.target.target import Target + + InfoRecord = TargetRecordDescriptor( "target/info", [ @@ -50,7 +55,6 @@ def main() -> int: formatter_class=help_formatter, ) parser.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to display info from") - parser.add_argument("--children", action="store_true", help="include children") parser.add_argument("--from-file", nargs="?", type=Path, help="file containing targets to load") parser.add_argument("-s", "--strings", action="store_true", help="print output as string") parser.add_argument("-r", "--record", action="store_true", help="print output as record") @@ -72,22 +76,23 @@ def main() -> int: targets = targets[:-1] args.targets = targets - process_generic_arguments(args) + process_generic_arguments(parser, args) try: - for i, target in enumerate(Target.open_all(args.targets, include_children=args.children)): + for i, target in enumerate(open_targets(args)): try: + target_info = get_target_info(target, args.recursive) if args.jsonlines: - print(json.dumps(get_target_info(target), default=str)) + print(json.dumps(target_info, default=str)) elif args.json: - print(json.dumps(get_target_info(target), indent=4, default=str)) + print(json.dumps(target_info, indent=4, default=str)) elif args.record: rs = record_output(args.strings) - rs.write(InfoRecord(**get_target_info(target), _target=target)) + rs.write(InfoRecord(**target_info, _target=target)) else: if i > 0: print("-" * 70) - print_target_info(target) + print_target_info(target, target_info) except Exception as e: # noqa: PERF203 target.log.error("Exception in retrieving information for target: `%s`, use `-vv` for details", target) # noqa: TRY400 target.log.debug("", exc_info=e) @@ -99,12 +104,12 @@ def main() -> int: return 0 -def get_target_info(target: Target) -> dict[str, str | list[str]]: +def get_target_info(target: Target, recursive: bool = False) -> dict[str, str | list[str]]: return { "disks": get_disks_info(target), "volumes": get_volumes_info(target), "mounts": get_mounts_info(target), - "children": get_children_info(target), + "children": get_children_info(target, recursive), "hostname": target.hostname, "domain": get_optional_func(target, "domain"), "ips": target.ips, @@ -124,10 +129,10 @@ def get_optional_func(target: Target, func: str) -> str | None: return None -def print_target_info(target: Target) -> None: +def print_target_info(target: Target, target_info: dict[str, str | list[str]]) -> None: print(target) - for name, value in get_target_info(target).items(): + for name, value in target_info.items(): if name in ["disks", "volumes", "mounts", "children"]: if not any(value): continue @@ -161,8 +166,11 @@ def get_mounts_info(target: Target) -> list[dict[str, str | None]]: return [{"fs": fs.__type__, "path": path} for path, fs in target.fs.mounts.items()] -def get_children_info(target: Target) -> list[dict[str, str]]: - return [{"type": c.type, "path": str(c.path)} for c in target.list_children()] +def get_children_info(target: Target, recursive: bool = False) -> list[dict[str, str]]: + return [ + {"id": child_id, "type": child.type, "name": child.name, "path": str(child.path)} + for child_id, child in target.list_children(recursive=recursive) + ] if __name__ == "__main__": diff --git a/dissect/target/tools/mount.py b/dissect/target/tools/mount.py index f95dd7fcf7..3481aca352 100644 --- a/dissect/target/tools/mount.py +++ b/dissect/target/tools/mount.py @@ -8,10 +8,10 @@ from dissect.target import filesystem from dissect.target.exceptions import TargetError from dissect.target.helpers.utils import parse_options_string -from dissect.target.target import Target from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_target, process_generic_arguments, ) @@ -59,14 +59,14 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if not HAS_FUSE: log.error("fusepy is not installed: pip install fusepy") return 1 try: - t = Target.open(args.target) + t = open_target(args) except TargetError as e: log.error(e) # noqa: TRY400 log.debug("", exc_info=e) diff --git a/dissect/target/tools/qfind.py b/dissect/target/tools/qfind.py index eccdd7b39e..a7732d2d0a 100644 --- a/dissect/target/tools/qfind.py +++ b/dissect/target/tools/qfind.py @@ -13,11 +13,11 @@ from dissect.target.exceptions import TargetError from dissect.target.helpers.scrape import recover_string from dissect.target.plugins.scrape.qfind import QFindMatchRecord, QFindPlugin -from dissect.target.target import Target from dissect.target.tools.query import record_output from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_targets, process_generic_arguments, ) @@ -25,6 +25,7 @@ from typing import Callable from dissect.target.container import Container + from dissect.target.target import Target from dissect.target.volume import Volume log = logging.getLogger(__name__) @@ -43,7 +44,6 @@ def main() -> int: ) parser.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to load") - parser.add_argument("--children", action="store_true", help="include children") parser.add_argument( "-R", "--raw", action="store_true", help="show raw hex dumps instead of post-processed string output" ) @@ -58,7 +58,7 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if not args.targets: log.error("No targets provided") @@ -69,7 +69,7 @@ def main() -> int: rs = record_output(args.strings, args.json) try: - for target in Target.open_all(args.targets, args.children): + for target in open_targets(args): hit: QFindMatchRecord for hit in target.qfind( args.needles, diff --git a/dissect/target/tools/query.py b/dissect/target/tools/query.py index c684d206c0..087f2dac93 100644 --- a/dissect/target/tools/query.py +++ b/dissect/target/tools/query.py @@ -67,8 +67,6 @@ def main() -> int: add_help=False, ) parser.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to load") - parser.add_argument("--child", help="load a specific child path or index") - parser.add_argument("--children", action="store_true", help="include children") parser.add_argument("--direct", action="store_true", help="treat TARGETS as paths to pass to plugins directly") configure_plugin_arguments(parser) @@ -108,7 +106,7 @@ def main() -> int: parser.print_help() return 0 - process_generic_arguments(args) + process_generic_arguments(parser, args) if args.no_cache: cache.IGNORE_CACHE = True @@ -125,10 +123,11 @@ def main() -> int: "The --rewrite-cache option will be ignored as --no-cache or --only-read-cache are specified", ) + # Process plugin arguments after host and child args are checked different_output_types = process_plugin_arguments(parser, args, rest) if not args.targets: - parser.error("too few arguments") + parser.error("too few arguments - missing targets") if args.report_dir and not args.report_dir.is_dir(): parser.error(f"--report-dir {args.report_dir} is not a valid directory") @@ -148,6 +147,9 @@ def main() -> int: basic_entries = [] yield_entries = [] + if args.dry_run: + print("Dry run on:", target) + first_seen_output_type = default_output_type for func_def in find_and_filter_plugins(args.function, target, args.excluded_functions): diff --git a/dissect/target/tools/reg.py b/dissect/target/tools/reg.py index 72b12fb563..9ff62bd993 100644 --- a/dissect/target/tools/reg.py +++ b/dissect/target/tools/reg.py @@ -11,10 +11,10 @@ RegistryKeyNotFoundError, TargetError, ) -from dissect.target.target import Target from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_targets, process_generic_arguments, ) @@ -42,10 +42,10 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) try: - for target in Target.open_all(args.targets): + for target in open_targets(args): if not target.has_function("registry"): target.log.error("Target has no Windows Registry") continue diff --git a/dissect/target/tools/shell.py b/dissect/target/tools/shell.py index 180615573d..7c9bb2dd68 100644 --- a/dissect/target/tools/shell.py +++ b/dissect/target/tools/shell.py @@ -50,6 +50,7 @@ execute_function_on_target, find_and_filter_plugins, generate_argparse_for_method, + open_targets, process_generic_arguments, ) @@ -1181,7 +1182,7 @@ def cmd_enter(self, args: argparse.Namespace, stdout: TextIO) -> bool: if args.python: # Quick path that doesn't require CLI caching - open_shell(paths, args.python, args.registry) + open_shell(list(open_targets(args)), args.python, args.registry) return False clikey = tuple(str(path) for path in paths) @@ -1477,10 +1478,8 @@ def build_pipe_stdout(pipe_parts: list[str]) -> Iterator[TextIO]: yield pipe_stdin -def open_shell(targets: list[str | pathlib.Path], python: bool, registry: bool, commands: list[str] | None) -> None: +def open_shell(targets: list[Target], python: bool, registry: bool, commands: list[str] | None) -> None: """Helper method for starting a regular, Python or registry shell for one or multiple targets.""" - targets = list(Target.open_all(targets)) - if python: python_shell(targets, commands=commands) else: @@ -1584,7 +1583,7 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) # For the shell tool we want -q to log slightly more then just CRITICAL messages. if args.quiet: @@ -1602,7 +1601,7 @@ def main() -> int: ) try: - open_shell(args.targets, args.python, args.registry, args.commands) + open_shell(list(open_targets(args)), args.python, args.registry, args.commands) except TargetError as e: log.error("Error opening shell: %s", e) # noqa: TRY400 log.debug("", exc_info=e) diff --git a/dissect/target/tools/utils.py b/dissect/target/tools/utils.py index c061352a16..f6292b2d14 100644 --- a/dissect/target/tools/utils.py +++ b/dissect/target/tools/utils.py @@ -41,10 +41,40 @@ USAGE_FORMAT_TMPL = "{prog} -f {name}{usage}" +class _OverrideRequiredAction(argparse.Action): + """A special argparse action that sets all required arguments to not required. + + This is useful for implementing flags such as ``--list-children``, that should not be "blocked" by the ommission of + an otherwise required argument. + """ + + def __init__(self, option_strings: list, dest: str, **kwargs): + super().__init__( + option_strings=option_strings, + dest=dest, + nargs=0, + const=True, + default=False, + **kwargs, + ) + + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, *args, **kwargs): + for action in parser._get_positional_actions(): + action.required = False + + setattr(namespace, self.dest, self.const) + + def configure_generic_arguments(parser: argparse.ArgumentParser) -> None: parser.add_argument("-K", "--keychain-file", type=Path, help="keychain file in CSV format") parser.add_argument("-Kv", "--keychain-value", help="passphrase, recovery key or key file path value") - parser.add_argument("-L", "--loader", action="store", help="select a specific loader (i.e. vmx, raw)") + parser.add_argument("-L", "--loader", help="select a specific loader (i.e. vmx, raw)") + parser.add_argument("--child", help="load child of target by path of index (see --list-children)") + parser.add_argument("--children", action="store_true", help="include children") + parser.add_argument( + "--list-children", action=_OverrideRequiredAction, help="list all children indices and paths, then exit" + ) + parser.add_argument("--recursive", action="store_true", help="make --list-children behave recursively") parser.add_argument("-v", "--verbose", action="count", default=0, help="increase output verbosity") parser.add_argument("--version", action="store_true", help="print version") parser.add_argument("-q", "--quiet", action="store_true", help="do not output logging information") @@ -57,7 +87,7 @@ def configure_generic_arguments(parser: argparse.ArgumentParser) -> None: ) -def process_generic_arguments(args: argparse.Namespace) -> None: +def process_generic_arguments(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None: configure_logging(args.verbose, args.quiet, as_plain_text=True) if args.version: @@ -65,7 +95,7 @@ def process_generic_arguments(args: argparse.Namespace) -> None: print("dissect.target version " + version("dissect.target")) except PackageNotFoundError: print("unable to determine version") - sys.exit(0) + parser.exit(0) targets = args.targets if hasattr(args, "targets") else [args.target] if hasattr(args, "target") else [] if targets and args.loader: @@ -76,6 +106,14 @@ def process_generic_arguments(args: argparse.Namespace) -> None: elif hasattr(args, "target"): args.target = targets[0] + if args.child and len(getattr(args, "targets", [])) > 1: + parser.error("--child can only be used on a single target, not multiple") + + if args.list_children: + # List found children on targets and exit + list_children(args) + parser.exit(0) + if args.keychain_file: keychain.register_keychain_file(args.keychain_file) @@ -173,10 +211,31 @@ def process_plugin_arguments(parser: argparse.ArgumentParser, args: argparse.Nam return {func.output for func in funcs if func.path not in args.excluded_functions} +def open_target(args: argparse.Namespace) -> Target: + direct: bool = getattr(args, "direct", False) + child: str | None = getattr(args, "child", None) + + target = Target.open_direct(args.target) if direct else Target.open(args.target) + + if child: + try: + target.log.warning("Switching to --child %s", child) + target = target.open_child(child) + except Exception as e: + target.log.exception("Exception while opening child %r: %s", child, e) # noqa: TRY401 + target.log.debug("", exc_info=e) + + # Do not continue processing, we requested a child but got none + raise + + return target + + def open_targets(args: argparse.Namespace) -> Iterator[Target]: direct: bool = getattr(args, "direct", False) children: bool = getattr(args, "children", False) child: str | None = getattr(args, "child", None) + targets: Iterable[Target] = ( [Target.open_direct(args.targets)] if direct else Target.open_all(args.targets, children) ) @@ -184,13 +243,14 @@ def open_targets(args: argparse.Namespace) -> Iterator[Target]: for target in targets: if child: try: - target: Target = target.open_child(child) + target.log.warning("Switching to --child %s", child) + target = target.open_child(child) except Exception as e: target.log.exception("Exception while opening child %r: %s", child, e) # noqa: TRY401 target.log.debug("", exc_info=e) - if getattr(args, "dry_run", False): - print(f"Dry run on: {target}") + # Do not continue processing, we requested a child but got none + continue yield target @@ -236,6 +296,21 @@ def list_plugins( print("}") +def list_children(args: argparse.Namespace) -> None: + """Pretty print children of targets (recursively).""" + found_one = False + for target in open_targets(args) if hasattr(args, "targets") else [open_target(args)]: + for child_id, child in target.list_children(recursive=args.recursive): + found_one = True + prefix = "-" * child_id.count(".") + print( + f" {prefix}{' ' if prefix else ''}[{child_id}]: type={child.type!r} name={child.name!r} path={str(child.path)!r}" # noqa: E501 + ) + + if not found_one: + print("No children found on target(s)") + + def generate_argparse_for_method( method: Callable, usage_tmpl: str | None = None, diff --git a/dissect/target/tools/yara.py b/dissect/target/tools/yara.py index 3ed1feab2d..055c86b85f 100755 --- a/dissect/target/tools/yara.py +++ b/dissect/target/tools/yara.py @@ -6,11 +6,11 @@ from dissect.target.exceptions import TargetError from dissect.target.plugins.filesystem.yara import HAS_YARA, YaraPlugin -from dissect.target.target import Target from dissect.target.tools.query import record_output from dissect.target.tools.utils import ( catch_sigpipe, configure_generic_arguments, + open_targets, process_generic_arguments, ) @@ -28,7 +28,6 @@ def main() -> int: parser.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to load") parser.add_argument("-s", "--strings", action="store_true", help="print output as string") - parser.add_argument("--children", action="store_true", help="include children") for args, kwargs in getattr(YaraPlugin.yara, "__args__", []): parser.add_argument(*args, **kwargs) @@ -36,7 +35,7 @@ def main() -> int: configure_generic_arguments(parser) args, _ = parser.parse_known_args() - process_generic_arguments(args) + process_generic_arguments(parser, args) if not HAS_YARA: log.error("yara-python is not installed: pip install yara-python") @@ -47,7 +46,7 @@ def main() -> int: return 1 try: - for target in Target.open_all(args.targets, args.children): + for target in open_targets(args): rs = record_output(args.strings, False) for record in target.yara(args.rules, args.path, args.max_size, args.check): rs.write(record) diff --git a/tests/_data/plugins/child/parallels/config.pvs b/tests/_data/plugins/child/parallels/config.pvs new file mode 100644 index 0000000000..2a49fc6d27 --- /dev/null +++ b/tests/_data/plugins/child/parallels/config.pvs @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:57676b297eaa4b094a488817f31561199919c177abf80d901fa1a0b841bdbfd1 +size 1470 diff --git a/tests/_data/plugins/child/virtualbox/vm.vbox b/tests/_data/plugins/child/virtualbox/vm.vbox new file mode 100644 index 0000000000..02e85e1bf7 --- /dev/null +++ b/tests/_data/plugins/child/virtualbox/vm.vbox @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:40fa532aeaeeb94af09f12803c3bbb7311b579c9cb49dd868d872fbfff02f38f +size 2744 diff --git a/tests/_data/plugins/child/virtuozzo/example.conf b/tests/_data/plugins/child/virtuozzo/example.conf new file mode 100644 index 0000000000..1d8551f391 --- /dev/null +++ b/tests/_data/plugins/child/virtuozzo/example.conf @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3723262396699180e269ab982e4e7748fb634d57e57fc0153c062b231ef9cde8 +size 332 diff --git a/tests/plugins/apps/container/test_docker.py b/tests/plugins/apps/container/test_docker.py index 011debe4db..36e7542f62 100644 --- a/tests/plugins/apps/container/test_docker.py +++ b/tests/plugins/apps/container/test_docker.py @@ -101,7 +101,7 @@ def test_docker_plugin_containers(target_unix_users: Target, fs_unix: VirtualFil "0.0.0.0:5678->5678/tcp", ":::5678->5678/tcp", ] - assert result.names == "example_container_name" + assert result.name == "example_container_name" assert result.volumes == ["/tmp/test:/test"] assert result.config_path == path.from_posix(f"/var/lib/docker/containers/{id}/config.v2.json") assert result.mount_path == path.from_posix(f"/var/lib/docker/image/overlay2/layerdb/mounts/{id}") @@ -218,7 +218,7 @@ def test_regression_running_container_parsing(target_unix: Target, fs_unix: Virt assert results[0].started == datetime.datetime(2024, 12, 31, 13, 37, 0, 123456, tzinfo=datetime.timezone.utc) assert results[0].finished is None assert results[0].ports == ["0.0.0.0:1337->1337/tcp", "0.0.0.0:1337->1337/udp"] - assert results[0].names == "foo" + assert results[0].name == "foo" assert results[0].volumes == ["/somewhere/on/host/file.txt:/dest/file.txt"] assert results[0].environment == ["FOO=bar", "HELLO=world"] assert results[0].mount_path == "/var/lib/docker/image/overlay2/layerdb/mounts/deadbeef" diff --git a/tests/plugins/apps/container/test_podman.py b/tests/plugins/apps/container/test_podman.py index 51b532eaa1..77e8de3345 100644 --- a/tests/plugins/apps/container/test_podman.py +++ b/tests/plugins/apps/container/test_podman.py @@ -76,7 +76,7 @@ def test_podman_containers(target_unix_podman: Target, fs_unix: VirtualFilesyste target_unix_podman.add_plugin(PodmanPlugin) records = list(target_unix_podman.container.containers()) - assert sorted([r.names for r in records]) == [ + assert sorted([r.name for r in records]) == [ "boring_mirzakhani", "fervent_proskuriakova", "hardcore_khayyam", @@ -93,7 +93,7 @@ def test_podman_containers(target_unix_podman: Target, fs_unix: VirtualFilesyste assert records[0].started == datetime(2025, 4, 9, 11, 37, 42, 68128, tzinfo=timezone.utc) assert records[0].finished == datetime(1, 1, 1, tzinfo=timezone.utc) assert records[0].ports == [] - assert records[0].names == "hardcore_khayyam" + assert records[0].name == "hardcore_khayyam" assert records[0].volumes == ["/tmp/host-folder:/data"] assert records[0].environment == [ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", diff --git a/tests/plugins/child/test_colima.py b/tests/plugins/child/test_colima.py index 957a9e102f..a23be16da1 100644 --- a/tests/plugins/child/test_colima.py +++ b/tests/plugins/child/test_colima.py @@ -15,15 +15,26 @@ def test_child_colima(target_unix_users: Target, fs_unix: VirtualFilesystem) -> fs_unix.map_file_fh("/root/.colima/_lima/colima/diffdisk", BytesIO()) fs_unix.map_file_fh("/root/.colima/test/colima.yaml", BytesIO()) fs_unix.map_file_fh("/root/.colima/_lima/colima-test/diffdisk", BytesIO()) - fs_unix.map_file_fh("/home/user/.colima/default/colima.yaml", BytesIO()) - fs_unix.map_file_fh("/home/user/.colima/_lima/colima/diffdisk", BytesIO()) + fs_unix.map_file_fh("/home/user/.config/colima/default/colima.yaml", BytesIO()) + fs_unix.map_file_fh("/home/user/.config/colima/_lima/colima/diffdisk", BytesIO()) target_unix_users.add_plugin(ColimaChildTargetPlugin) children = list(target_unix_users.list_children()) assert len(children) == 3 - assert children[0].type == "colima" - assert children[0].path == "/root/.colima/_lima/colima/diffdisk" - assert children[1].path == "/root/.colima/_lima/colima-test/diffdisk" - assert children[2].path == "/home/user/.colima/_lima/colima/diffdisk" + + child_id, child_record = children[0] + assert child_id == "0" + assert child_record.type == "colima" + assert child_record.name == "default" + assert child_record.path == "/root/.colima/_lima/colima/diffdisk" + + child_id, child_record = children[1] + assert child_id == "1" + assert child_record.name == "test" + assert child_record.path == "/root/.colima/_lima/colima-test/diffdisk" + + child_id, child_record = children[2] + assert child_id == "2" + assert child_record.path == "/home/user/.config/colima/_lima/colima/diffdisk" diff --git a/tests/plugins/child/test_docker.py b/tests/plugins/child/test_docker.py index 0d442c089a..48f6caddd1 100644 --- a/tests/plugins/child/test_docker.py +++ b/tests/plugins/child/test_docker.py @@ -10,10 +10,11 @@ def test_docker(target_linux_docker: Target) -> None: target_linux_docker.add_plugin(DockerChildTargetPlugin) - children = sorted(target_linux_docker.list_children(), key=lambda r: r.path) + children = sorted([child for _, child in target_linux_docker.list_children()], key=lambda r: r.path) assert len(children) == 3 assert children[0].type == "docker" + assert children[0].name == "epic_lichterman" assert [c.path for c in children] == [ "/var/lib/docker/image/overlay2/layerdb/mounts/01b646bc043eb4ad72f3a64b4ffd9be2cbeb399e0a07497d749d724460ccad3a", diff --git a/tests/plugins/child/test_esxi.py b/tests/plugins/child/test_esxi.py new file mode 100644 index 0000000000..b257fc572b --- /dev/null +++ b/tests/plugins/child/test_esxi.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from io import BytesIO + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.child.esxi import ESXiChildTargetPlugin +from dissect.target.plugins.os.unix.esxi._os import ESXiPlugin +from dissect.target.target import Target + + +def test_esxi_children() -> None: + """Test that the ESXi child target plugin lists children correctly.""" + vfs = VirtualFilesystem() + vfs.map_file_fh( + "/etc/vmware/hostd/vmInventory.xml", + BytesIO(b""" + + + 1 + + /vmfs/volumes/6800e48c-3dcffc58-6af7-bc2411ec8065/Alpine/Alpine.vmx + +"""), + ) + vfs.map_file_fh( + "/vmfs/volumes/6800e48c-3dcffc58-6af7-bc2411ec8065/Alpine/Alpine.vmx", BytesIO(b'displayName = "Alpine"') + ) + + target = Target() + target._os_plugin = ESXiPlugin(target) + target.filesystems.add(vfs) + target.fs.mount("/", vfs) + target.apply() + + target.add_plugin(ESXiChildTargetPlugin) + + children = [child for _, child in target.list_children()] + + assert len(children) == 1 + + assert children[0].type == "esxi" + assert children[0].name == "Alpine" + assert str(children[0].path) == "/vmfs/volumes/6800e48c-3dcffc58-6af7-bc2411ec8065/Alpine/Alpine.vmx" diff --git a/tests/plugins/child/test_hyperv.py b/tests/plugins/child/test_hyperv.py index 5167c0dc9f..3e0cd37830 100644 --- a/tests/plugins/child/test_hyperv.py +++ b/tests/plugins/child/test_hyperv.py @@ -15,6 +15,14 @@ def test_wsl(target_win: Target, fs_win: VirtualFilesystem) -> None: "ProgramData/Microsoft/Windows/Hyper-V/data.vmcx", absolute_path("_data/plugins/child/hyperv/data.vmcx"), ) + fs_win.map_file( + "Hyper-V/EasyToFind/Virtual Machines/EC04F346-DB96-4700-AF5B-77B3C56C38BD.vmcx", + absolute_path("_data/loaders/hyperv/EC04F346-DB96-4700-AF5B-77B3C56C38BD.vmcx"), + ) + fs_win.map_file( + "Hyper-V/EasyToFind/Virtual Machines/993F7B33-6057-4D1E-A1FE-A1A1D77BE974.vmcx", + absolute_path("_data/loaders/hyperv/993F7B33-6057-4D1E-A1FE-A1A1D77BE974.vmcx"), + ) fs_win.map_file( "ProgramData/Microsoft/Windows/Hyper-V/Virtual Machines/B90AC31B-C6F8-479F-9B91-07B894A6A3F6.xml", absolute_path("_data/loaders/hyperv/B90AC31B-C6F8-479F-9B91-07B894A6A3F6.xml"), @@ -22,15 +30,19 @@ def test_wsl(target_win: Target, fs_win: VirtualFilesystem) -> None: target_win.add_plugin(HyperVChildTargetPlugin) - children = list(target_win.list_children()) + children = [child for _, child in target_win.list_children()] assert len(children) == 5 + assert children[0].name == "Default Generation 1" assert ( str(children[0].path) == "C:\\Hyper-V\\EasyToFind\\Virtual Machines\\EC04F346-DB96-4700-AF5B-77B3C56C38BD.vmcx" ) + + assert children[1].name == "Default Generation 2" assert ( str(children[1].path) == "C:\\Hyper-V\\EasyToFind\\Virtual Machines\\993F7B33-6057-4D1E-A1FE-A1A1D77BE974.vmcx" ) + assert ( str(children[2].path) == "C:\\VM\\Other Generation 1\\Virtual Machines\\A5B56431-CA94-482A-B70A-F1F2B12373BE.vmcx" @@ -39,6 +51,8 @@ def test_wsl(target_win: Target, fs_win: VirtualFilesystem) -> None: str(children[3].path) == "C:\\VM\\Other Generation 2\\Virtual Machines\\4C57771A-3230-4B92-B029-D63F96518E70.vmcx" ) + + assert children[4].name == "Default Generation 1" assert ( str(children[4].path) == "\\sysvol\\ProgramData\\Microsoft\\Windows\\Hyper-V\\Virtual Machines\\B90AC31B-C6F8-479F-9B91-07B894A6A3F6.xml" # noqa E501 diff --git a/tests/plugins/child/test_lima.py b/tests/plugins/child/test_lima.py new file mode 100644 index 0000000000..8bc8723755 --- /dev/null +++ b/tests/plugins/child/test_lima.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +from dissect.target.plugins.child.lima import LimaChildTargetPlugin + +if TYPE_CHECKING: + from dissect.target.filesystem import VirtualFilesystem + from dissect.target.target import Target + + +def test_child_colima(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + fs_unix.map_file_fh("/root/.lima/default/diffdisk", BytesIO()) + fs_unix.map_file_fh("/root/.lima/docker/diffdisk", BytesIO()) + fs_unix.map_file_fh("/home/user/.config/lima/ligma/diffdisk", BytesIO()) + + target_unix_users.add_plugin(LimaChildTargetPlugin) + + children = [child for _, child in target_unix_users.list_children()] + + assert len(children) == 3 + + assert children[0].type == "lima" + assert children[0].name == "default" + assert children[0].path == "/root/.lima/default/diffdisk" + + assert children[1].name == "docker" + assert children[1].path == "/root/.lima/docker/diffdisk" + + assert children[2].path == "/home/user/.config/lima/ligma/diffdisk" diff --git a/tests/plugins/child/test_parallels.py b/tests/plugins/child/test_parallels.py index 4eeb4a17ab..20cf61f719 100644 --- a/tests/plugins/child/test_parallels.py +++ b/tests/plugins/child/test_parallels.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING from dissect.target.plugins.child.parallels import ParallelsChildTargetPlugin +from tests._utils import absolute_path if TYPE_CHECKING: from dissect.target.filesystem import VirtualFilesystem @@ -12,20 +13,35 @@ def test_parallels_child_detection(target_macos_users: Target, fs_macos: VirtualFilesystem) -> None: """Test if we correctly find Parallels child VMs on MacOS targets.""" - fs_macos.makedirs("Users/dissect/Parallels/Windows 11.pvm") - fs_macos.makedirs("Users/dissect/Documents/Parallels/Windows 10.pvm") - fs_macos.makedirs( - "Users/dissect/Library/Group Containers/someversionnumber.com.parallels.desktop.appstore/Shared/Parallels/Windows 8.pvm" # noqa: E501 + config = absolute_path("_data/plugins/child/parallels/config.pvs") + fs_macos.map_file("Users/dissect/Parallels/Windows 11.pvm/config.pvs", config) + fs_macos.map_file("Users/dissect/Documents/Parallels/Windows 10.pvm/config.pvs", config) + fs_macos.map_file("Users/Shared/Parallels/Windows 7.pvm/config.pvs", config) + fs_macos.map_file( + "Users/dissect/Library/Group Containers/svn.com.parallels.desktop.appstore/Shared/Parallels/My VM.pvm/config.pvs", # noqa: E501 + config, ) - fs_macos.makedirs("Users/Shared/Parallels/Windows 7.pvm") target_macos_users.add_plugin(ParallelsChildTargetPlugin) - children = list(target_macos_users.list_children()) + children = [child for _, child in target_macos_users.list_children()] assert len(children) == 4 - assert [c.path for c in children] == [ - "/Users/Shared/Parallels/Windows 7.pvm", - "/Users/dissect/Parallels/Windows 11.pvm", - "/Users/dissect/Documents/Parallels/Windows 10.pvm", - "/Users/dissect/Library/Group Containers/someversionnumber.com.parallels.desktop.appstore/Shared/Parallels/Windows 8.pvm", # noqa: E501 - ] + + assert children[0].type == "parallels" + assert children[0].name == "My VM" + assert children[0].path == "/Users/Shared/Parallels/Windows 7.pvm" + + assert children[1].type == "parallels" + assert children[1].name == "My VM" + assert children[1].path == "/Users/dissect/Parallels/Windows 11.pvm" + + assert children[2].type == "parallels" + assert children[2].name == "My VM" + assert children[2].path == "/Users/dissect/Documents/Parallels/Windows 10.pvm" + + assert children[3].type == "parallels" + assert children[3].name == "My VM" + assert ( + children[3].path + == "/Users/dissect/Library/Group Containers/svn.com.parallels.desktop.appstore/Shared/Parallels/My VM.pvm" + ) diff --git a/tests/plugins/child/test_podman.py b/tests/plugins/child/test_podman.py index 54da85780d..7e58cf098b 100644 --- a/tests/plugins/child/test_podman.py +++ b/tests/plugins/child/test_podman.py @@ -8,19 +8,17 @@ if TYPE_CHECKING: from dissect.target import Target - from dissect.target.filesystem import VirtualFilesystem -def test_plugins_child_podman(target_unix_podman: Target, fs_unix: VirtualFilesystem) -> None: # noqa: F811 +def test_plugins_child_podman(target_unix_podman: Target) -> None: # noqa: F811 """Test if we can find, parse and correctly yield child Podman targets.""" - target_unix_podman.add_plugin(PodmanPlugin) target_unix_podman.add_plugin(PodmanChildTargetPlugin) - - children = sorted(target_unix_podman.list_children(), key=lambda r: r.path) + children = sorted([child for _, child in target_unix_podman.list_children()], key=lambda r: r.path) assert len(children) == 3 assert children[0].type == "podman" + assert children[0].name == "zen_taussig" assert sorted([c.path for c in children]) == [ "/home/user/.local/share/containers/storage/overlay/04a40aded310ba9deffbd5b5b0120a0a4416e6083420e338e998250f1a2e2f2b", diff --git a/tests/plugins/child/test_proxmox.py b/tests/plugins/child/test_proxmox.py new file mode 100644 index 0000000000..266e974516 --- /dev/null +++ b/tests/plugins/child/test_proxmox.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from io import BytesIO + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.child.proxmox import ProxmoxChildTargetPlugin +from dissect.target.plugins.os.unix.linux.debian.proxmox._os import ProxmoxPlugin +from dissect.target.target import Target + + +def test_proxmox_children() -> None: + """Test that the Proxmox child target plugin lists children correctly.""" + vfs = VirtualFilesystem() + vfs.map_file_fh("/etc/pve/qemu-server/100.conf", BytesIO(b"name: VM-100")) + vfs.map_file_fh("/etc/pve/qemu-server/101.conf", BytesIO(b"name: VM-101")) + + target = Target() + target._os_plugin = ProxmoxPlugin(target) + target.filesystems.add(vfs) + target.fs.mount("/", vfs) + target.apply() + + target.add_plugin(ProxmoxChildTargetPlugin) + + children = [child for _, child in target.list_children()] + + assert len(children) == 2 + + assert children[0].type == "proxmox" + assert children[0].name == "VM-100" + assert str(children[0].path) == "/etc/pve/qemu-server/100.conf" + + assert children[1].type == "proxmox" + assert children[1].name == "VM-101" + assert str(children[1].path) == "/etc/pve/qemu-server/101.conf" diff --git a/tests/plugins/child/test_qemu.py b/tests/plugins/child/test_qemu.py index c1f2a07f06..3331090ef5 100644 --- a/tests/plugins/child/test_qemu.py +++ b/tests/plugins/child/test_qemu.py @@ -17,8 +17,10 @@ def test_compatible(target_linux: Target, fs_linux: VirtualFilesystem) -> None: with pytest.raises(UnsupportedPluginError): QemuChildTargetPlugin(target_linux).check_compatible() - qemu_xml = absolute_path("_data/loaders/libvirt/qemu.xml") - fs_linux.map_file("/etc/libvirt/qemu/qemu.xml", qemu_xml) + fs_linux.map_file( + "/etc/libvirt/qemu/qemu.xml", + absolute_path("_data/loaders/libvirt/qemu.xml"), + ) QemuChildTargetPlugin(target_linux).check_compatible() @@ -35,4 +37,5 @@ def test_list_children(target_linux: Target, fs_linux: VirtualFilesystem) -> Non child = children[0] assert child.type == "qemu" + assert child.name == "linux2022" assert child.path == "/etc/libvirt/qemu/linux2022.xml" diff --git a/tests/plugins/child/test_virtualbox.py b/tests/plugins/child/test_virtualbox.py index f3c12ef6be..9dbfca69c2 100755 --- a/tests/plugins/child/test_virtualbox.py +++ b/tests/plugins/child/test_virtualbox.py @@ -19,8 +19,10 @@ def test_child_virtualbox_linux(target_unix_users: Target, fs_unix: VirtualFiles absolute_path("_data/plugins/child/virtualbox/VirtualBox.xml"), ) + vbox = absolute_path("_data/plugins/child/virtualbox/vm.vbox") + # vbox to be found by traversing MachineRegistry values - fs_unix.map_file_fh("/example/vms/example-vm/example-vm.vbox", BytesIO()) + fs_unix.map_file("/example/vms/example-vm/example-vm.vbox", vbox) fs_unix.map_file_fh("/example/vms/second-vm/second-vm.vbox", BytesIO()) fs_unix.map_file_fh("/example/vms/third-vm/third-vm.vbox", BytesIO()) @@ -37,7 +39,13 @@ def test_child_virtualbox_linux(target_unix_users: Target, fs_unix: VirtualFiles ) target_unix_users.add_plugin(VirtualBoxChildTargetPlugin) - children = list(target_unix_users.list_children()) + children = [child for _, child in target_unix_users.list_children()] + + assert len(children) == 5 + + assert children[0].type == "virtualbox" + assert children[0].name == "test_vm" + assert children[0].path == "/example/vms/example-vm/example-vm.vbox" assert sorted(map(str, [child.path for child in children])) == [ "/example/vms/example-vm/example-vm.vbox", diff --git a/tests/plugins/child/test_virtuozzo.py b/tests/plugins/child/test_virtuozzo.py index 051d1ca8ef..27b040fbfc 100644 --- a/tests/plugins/child/test_virtuozzo.py +++ b/tests/plugins/child/test_virtuozzo.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING from dissect.target.plugins.child.virtuozzo import VirtuozzoChildTargetPlugin +from tests._utils import absolute_path if TYPE_CHECKING: from dissect.target.filesystem import VirtualFilesystem @@ -10,14 +11,20 @@ def test_virtuozzo(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + fs_unix.map_file("/etc/vz/conf/a.conf", absolute_path("_data/plugins/child/virtuozzo/example.conf")) + fs_unix.makedirs("vz/root/a") fs_unix.makedirs("vz/root/b") target_unix.add_plugin(VirtuozzoChildTargetPlugin) - children = list(target_unix.list_children()) + children = [child for _, child in target_unix.list_children()] + assert len(children) == 2 + assert children[0].type == "virtuozzo" + assert children[0].name == "a" assert str(children[0].path) == "/vz/root/a" + assert children[1].type == "virtuozzo" assert str(children[1].path) == "/vz/root/b" diff --git a/tests/plugins/child/test_vmware_workstation.py b/tests/plugins/child/test_vmware_workstation.py index a2a5d0db7f..e415db0c48 100644 --- a/tests/plugins/child/test_vmware_workstation.py +++ b/tests/plugins/child/test_vmware_workstation.py @@ -30,10 +30,15 @@ def test_child_vmware_workstation( fs = request.getfixturevalue(fs) fs.map_file(inventory_path, absolute_path("_data/plugins/child/vmware_workstation/inventory.vmls")) + target.add_plugin(VmwareWorkstationChildTargetPlugin) - children = list(target.list_children()) + children = [child for _, child in target.list_children()] assert len(children) == 3 + assert children[0].type == "vmware_workstation" + assert children[0].name == "First Virtual Machine" + assert children[0].path == "/path/to/first/vm/vm.vmx" + assert [c.path for c in children] == [ "/path/to/first/vm/vm.vmx", "/path/to/second/vm/vm.vmx", diff --git a/tests/plugins/child/test_wsl.py b/tests/plugins/child/test_wsl.py index 5b7c8c8f39..f91a23b1d9 100644 --- a/tests/plugins/child/test_wsl.py +++ b/tests/plugins/child/test_wsl.py @@ -29,15 +29,26 @@ def test_wsl(target_win_users: Target, hive_hku: VirtualHive, fs_win: VirtualFil "C:\\Users\\John\\AppData\\Local\\Packages\\CanonicalGroupLimited.Ubuntu22.04LTS_79rhkp1fndgsc\\LocalState", ), ) + wsl_key.add_value( + "DistributionName", + VirtualValue( + hive_hku, + "DistributionName", + "my_wsl_name", + ), + ) wsl_keys.add_subkey(wsl_key.name, wsl_key) hive_hku.map_key(wsl_keys_name, wsl_keys) target_win_users.add_plugin(WSLChildTargetPlugin) - children = list(target_win_users.list_children()) + children = [child for _, child in target_win_users.list_children()] + assert len(children) == 1 + assert children[0].type == "wsl" + assert children[0].name == "my_wsl_name" assert ( str(children[0].path) == "C:\\Users\\John\\AppData\\Local\\Packages\\CanonicalGroupLimited.Ubuntu22.04LTS_79rhkp1fndgsc\\LocalState\\ext4.vhdx" # noqa E501 diff --git a/tests/test_target.py b/tests/test_target.py index bc9b724597..e51bd8b4d7 100644 --- a/tests/test_target.py +++ b/tests/test_target.py @@ -17,6 +17,7 @@ from dissect.target.filesystem import VirtualFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem from dissect.target.helpers.fsutil import TargetPath +from dissect.target.helpers.record import ChildTargetRecord from dissect.target.loaders.dir import DirLoader from dissect.target.loaders.raw import RawLoader from dissect.target.loaders.vbox import VBoxLoader @@ -770,3 +771,79 @@ def test_exception_invalid_path() -> None: match=r"Failed to find any loader for targets: \['smb://invalid'\]", ): next(Target.open_all("smb://invalid")) + + +def test_list_children() -> None: + """Test that ``list_children`` returns child records.""" + + class MockChildTargetPlugin: + def list_children(self) -> Iterator[ChildTargetRecord]: + yield ChildTargetRecord(type="mock", name="child0", path="/mock/child0") + yield ChildTargetRecord(type="mock", name="child1", path="/mock/child1") + + target = Target() + target._child_plugins = {"mock": MockChildTargetPlugin()} + + children = list(target.list_children()) + assert len(children) == 2 + + child_id, child_record = children[0] + assert child_id == "0" + assert child_record.type == "mock" + assert child_record.name == "child0" + assert child_record.path == "/mock/child0" + + child_id, child_record = children[1] + assert child_id == "1" + assert child_record.type == "mock" + assert child_record.name == "child1" + assert child_record.path == "/mock/child1" + + +def test_list_children_recursive() -> None: + """Test that ``list_children(recursive=True)`` returns child records recursively.""" + + class MockChildTargetPlugin: + def list_children(self) -> Iterator[ChildTargetRecord]: + yield ChildTargetRecord(type="mock", name="child0", path="/mock/child0") + yield ChildTargetRecord(type="mock", name="child1", path="/mock/child1") + + class EmptyChildTargetPlugin: + def list_children(self) -> Iterator[ChildTargetRecord]: + return iter([]) + + target = Target() + target._child_plugins = {"mock": MockChildTargetPlugin()} + + child_0_target = Target() + child_0_target._child_plugins = {"mock": EmptyChildTargetPlugin()} + child_1_target = Target() + child_1_target._child_plugins = {"mock": MockChildTargetPlugin()} + + child_1_0_target = Target() + child_1_0_target._child_plugins = {"mock": MockChildTargetPlugin()} + child_1_1_target = Target() + child_1_1_target._child_plugins = {"mock": EmptyChildTargetPlugin()} + + child_target_1_0_0 = Target() + child_target_1_0_0._child_plugins = {"mock": EmptyChildTargetPlugin()} + child_target_1_0_1 = Target() + child_target_1_0_1._child_plugins = {"mock": MockChildTargetPlugin()} + + target.open_child = lambda path: child_0_target if path == "/mock/child0" else child_1_target + child_1_target.open_child = lambda path: child_1_0_target if path == "/mock/child0" else child_1_1_target + child_1_0_target.open_child = lambda path: child_target_1_0_0 if path == "/mock/child0" else child_target_1_0_1 + child_target_1_0_1.open_child = Mock(side_effect=TargetError("Mock error")) + + children = list(target.list_children(recursive=True)) + + assert [child_id for child_id, _ in children] == [ + "0", + "1", + "1.0", + "1.0.0", + "1.0.1", + "1.0.1.0", + "1.0.1.1", + "1.1", + ] diff --git a/tests/tools/test_mount.py b/tests/tools/test_mount.py index ad03b339b0..c0226ff562 100644 --- a/tests/tools/test_mount.py +++ b/tests/tools/test_mount.py @@ -20,12 +20,10 @@ def test_duplicate_volume_name(target_bare: Target, monkeypatch: pytest.MonkeyPa m.setattr("dissect.target.tools.mount.HAS_FUSE", True) with ( - patch("dissect.target.tools.mount.Target") as MockTarget, + patch("dissect.target.tools.mount.open_target", return_value=target_bare), patch("dissect.target.tools.mount.FUSE", create=True) as MockFUSE, patch("dissect.target.tools.mount.DissectMount", create=True) as MockDissectMount, ): - MockTarget.open.return_value = target_bare - target_bare.volumes.add(Volume(BytesIO(), 1, 0, 0, None, name="first")) target_bare.volumes.add(Volume(BytesIO(), 2, 0, 0, None, name="second")) target_bare.volumes.add(Volume(BytesIO(), 3, 0, 0, None, name="second_1")) @@ -57,12 +55,10 @@ def test_mounting_multi_volume_filesystem(target_bare: Target, monkeypatch: pyte m.setattr("dissect.target.tools.mount.HAS_FUSE", True) with ( - patch("dissect.target.tools.mount.Target") as MockTarget, + patch("dissect.target.tools.mount.open_target", return_value=target_bare), patch("dissect.target.tools.mount.FUSE", create=True) as MockFUSE, patch("dissect.target.tools.mount.DissectMount", create=True) as MockDissectMount, ): - MockTarget.open.return_value = target_bare - volumes = [ Volume(BytesIO(), 1, 0, 0, None, name="first"), Volume(BytesIO(), 2, 0, 0, None, name="second"), diff --git a/tests/tools/test_utils.py b/tests/tools/test_utils.py index 4f5d733736..5a7daf6f48 100644 --- a/tests/tools/test_utils.py +++ b/tests/tools/test_utils.py @@ -111,7 +111,7 @@ def test_process_generic_arguments(monkeypatch: pytest.MonkeyPatch) -> None: ) as mocked_get_external_module_paths, patch("dissect.target.tools.utils.load_modules_from_paths") as mocked_load_modules_from_paths, ): - process_generic_arguments(args) + process_generic_arguments(parser, args) mocked_configure_logging.assert_called_once_with(0, False, as_plain_text=True) mocked_version.assert_called_once_with("dissect.target") @@ -126,7 +126,7 @@ def test_process_generic_arguments(monkeypatch: pytest.MonkeyPatch) -> None: del args.targets args.target = "target1" - process_generic_arguments(args) + process_generic_arguments(parser, args) assert args.target == "loader_name://target1"