From 8e48c16fc8d4f3b34bca3c32b2ead2ad21198f66 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 26 Feb 2026 13:03:48 +0100 Subject: [PATCH] Add middleware loader --- dissect/target/loader.py | 36 +++++++++++++++++++++++++++++++++++ dissect/target/loaders/vbk.py | 36 +++++++++++++++-------------------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/dissect/target/loader.py b/dissect/target/loader.py index b37ee5a205..7a82a19cbd 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -152,6 +152,42 @@ def map(self, target: Target) -> None: raise NotImplementedError +class MiddlewareLoader(Loader): + """A base class for preparing arbitrary data to be used by other :class:`Loader`s. + + Instead of mapping data directly to a :class:`Target `, loaders of this type + prepare data in some way and make it available for other :class:`Loader`s to use. + + Subclasses should implement the :method:`detect` method like any other loader, and return a path to the prepared + data in the :method:`prepare` method . The loading mechanism will then use that path to find other loaders to map + the prepared data into the target. + + Feels like forever since I've heard the term "middleware", I'm bringing it back baby! + """ + + def __init__(self, path: Path, *, fallbacks: list[type[Loader]] | None = None, **kwargs): + super().__init__(path, **kwargs) + # This will be the loader that successfully mapped the prepared path + self.loader = None + + @staticmethod + def detect(path: Path) -> bool: + raise NotImplementedError + + def prepare(self, target: Target) -> Path: + raise NotImplementedError + + def map(self, target: Target) -> None: + path = self.prepare(target) + + if (loader := find_loader(path, fallbacks=[DirLoader, RawLoader])) is not None: + ldr = loader(path) + ldr.map(target) + + # Store a reference to the loader if we successfully mapped + self.loader = ldr + + def register(module_name: str, class_name: str, internal: bool = True) -> None: """Registers a ``Loader`` class inside ``LOADERS``. diff --git a/dissect/target/loaders/vbk.py b/dissect/target/loaders/vbk.py index 13d6f6c203..0e911bf537 100644 --- a/dissect/target/loaders/vbk.py +++ b/dissect/target/loaders/vbk.py @@ -7,8 +7,7 @@ from dissect.target.exceptions import LoaderError from dissect.target.filesystem import VirtualFilesystem from dissect.target.filesystems.vbk import VbkFilesystem -from dissect.target.loader import Loader, find_loader -from dissect.target.loaders.raw import RawLoader +from dissect.target.loader import MiddlewareLoader if TYPE_CHECKING: from pathlib import Path @@ -19,7 +18,7 @@ RE_RAW_DISK = re.compile(r"(?:[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})|(?:DEV__.+)") -class VbkLoader(Loader): +class VbkLoader(MiddlewareLoader): """Load Veaam Backup (VBK) files. References: @@ -35,7 +34,7 @@ def __init__(self, path: Path, **kwargs): def detect(path: Path) -> bool: return path.suffix.lower() == ".vbk" - def map(self, target: Target) -> None: + def prepare(self, target: Target) -> Path: # We haven't really researched any of the VBK metadata yet, so just try some common formats root = self.vbkfs.path("/") if (base := next(root.glob("*"), None)) is None: @@ -51,24 +50,19 @@ def map(self, target: Target) -> None: candidates.append(root.joinpath("+".join(map(str, disks)))) - # Try to find a loader - for candidate in candidates: - if candidate.suffix.lower() == ".vmcx": - # For VMCX files we need to massage the file layout a bit - vfs = VirtualFilesystem() - vfs.map_file_entry(candidate.name, candidate) + # We should only have one candidate at this point + if len(candidates) > 1: + raise LoaderError("Unsupported VBK structure, use `-L raw` to manually inspect the VBK") - for entry in chain(base.glob("Ide*/*"), base.glob("Scsi*/*")): - vfs.map_file_entry(entry.name, entry) + candidate = candidates[0] + if candidate.suffix.lower() == ".vmcx": + # For VMCX files we need to massage the file layout a bit + vfs = VirtualFilesystem() + vfs.map_file_entry(candidate.name, candidate) - candidate = vfs.path(candidate.name) + for entry in chain(base.glob("Ide*/*"), base.glob("Scsi*/*")): + vfs.map_file_entry(entry.name, entry) - if (loader := find_loader(candidate, fallbacks=[RawLoader])) is not None: - ldr = loader(candidate) - ldr.map(target) + candidate = vfs.path(candidate.name) - # Store a reference to the loader if we successfully mapped - self.loader = ldr - break - else: - raise LoaderError("Unsupported VBK structure, use `-L raw` to manually inspect the VBK") + return candidate