From 3e366b44d014fb9626f6c16b985d5dc1889127bf Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 6 Nov 2025 20:44:25 +0100 Subject: [PATCH] Add new DirEntry class --- dissect/target/filesystem.py | 212 ++++++++++++------ dissect/target/filesystems/ad1.py | 24 +- dissect/target/filesystems/btrfs.py | 44 ++-- dissect/target/filesystems/cb.py | 35 +-- dissect/target/filesystems/config.py | 26 ++- dissect/target/filesystems/cramfs.py | 31 +-- dissect/target/filesystems/dir.py | 26 ++- dissect/target/filesystems/exfat.py | 33 +-- dissect/target/filesystems/extfs.py | 46 ++-- dissect/target/filesystems/fat.py | 34 +-- dissect/target/filesystems/ffs.py | 40 ++-- dissect/target/filesystems/itunes.py | 11 +- dissect/target/filesystems/jffs.py | 31 +-- dissect/target/filesystems/nfs.py | 26 ++- dissect/target/filesystems/ntfs.py | 123 +++++++--- dissect/target/filesystems/qnxfs.py | 30 +-- dissect/target/filesystems/smb.py | 41 ++-- dissect/target/filesystems/squashfs.py | 39 ++-- dissect/target/filesystems/tar.py | 8 +- dissect/target/filesystems/vbk.py | 36 +-- dissect/target/filesystems/vmfs.py | 61 ++--- dissect/target/filesystems/xfs.py | 47 ++-- dissect/target/filesystems/zip.py | 21 +- dissect/target/helpers/compat/path_310.py | 13 +- dissect/target/helpers/compat/path_311.py | 13 +- dissect/target/helpers/compat/path_312.py | 13 +- dissect/target/helpers/compat/path_313.py | 13 +- dissect/target/helpers/compat/path_common.py | 2 +- dissect/target/helpers/fsutil.py | 26 +-- .../plugins/filesystem/unix/capability.py | 2 +- dissect/target/plugins/filesystem/walkfs.py | 1 - dissect/target/tools/diff.py | 9 +- tests/filesystems/test_cb.py | 4 +- tests/filesystems/test_exfat.py | 23 -- tests/filesystems/test_smb.py | 2 +- tests/helpers/test_fsutil.py | 34 ++- tests/plugins/apps/browser/test_chrome.py | 19 +- tests/plugins/apps/browser/test_edge.py | 16 +- 38 files changed, 700 insertions(+), 515 deletions(-) diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index bd041237b3..d4e24b45a8 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -6,6 +6,7 @@ import pathlib import stat from collections import defaultdict +from functools import cache from typing import TYPE_CHECKING, Any, BinaryIO, Final from dissect.target.exceptions import ( @@ -24,6 +25,8 @@ if TYPE_CHECKING: from collections.abc import Callable, Iterator + from typing_extensions import Self + from dissect.target.target import Target FILESYSTEMS: list[type[Filesystem]] = [] @@ -191,7 +194,7 @@ def iterdir(self, path: str) -> Iterator[str]: """ return self.get(path).iterdir() - def scandir(self, path: str) -> Iterator[FilesystemEntry]: + def scandir(self, path: str) -> Iterator[DirEntry]: """Iterate over the contents of a directory, return them as FilesystemEntry's. Args: @@ -222,7 +225,7 @@ def listdir_ext(self, path: str) -> list[FilesystemEntry]: Returns: A list of FilesystemEntry's. """ - return list(self.scandir(path)) + return [e.get() for e in self.scandir(path)] def walk( self, @@ -251,7 +254,7 @@ def walk_ext( topdown: bool = True, onerror: Callable[[Exception], None] | None = None, followlinks: bool = False, - ) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]: + ) -> Iterator[tuple[list[FilesystemEntry], list[DirEntry], list[DirEntry]]]: """Recursively walk a directory pointed to by ``path``, returning :class:`FilesystemEntry` of files and directories. @@ -266,7 +269,7 @@ def walk_ext( """ return self.get(path).walk_ext(topdown, onerror, followlinks) - def recurse(self, path: str) -> Iterator[FilesystemEntry]: + def recurse(self, path: str) -> Iterator[DirEntry]: """Recursively walk a directory and yield contents as :class:`FilesystemEntry`. Does not follow symbolic links. @@ -519,7 +522,7 @@ def __repr__(self) -> str: def __str__(self) -> str: return str(self.path) - def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry: + def _resolve(self, follow_symlinks: bool = True) -> Self: """Helper method to resolve symbolic links. If ``follow_symlinks`` is ``False``, this function is effectively a no-op. @@ -535,7 +538,7 @@ def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry: return self.readlink_ext() return self - def get(self, path: str) -> FilesystemEntry: + def get(self, path: str) -> Self: """Retrieve a :class:`FilesystemEntry` relative to this entry. Args: @@ -560,9 +563,10 @@ def iterdir(self) -> Iterator[str]: Returns: An iterator of directory entries as path strings. """ - raise NotImplementedError + for entry in self.scandir(): + yield entry.name - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[DirEntry]: """Iterate over the contents of a directory, yields :class:`FilesystemEntry`. Returns: @@ -578,13 +582,13 @@ def listdir(self) -> list[str]: """ return list(self.iterdir()) - def listdir_ext(self) -> list[FilesystemEntry]: + def listdir_ext(self) -> list[Self]: """List the contents of a directory as a list of :class:`FilesystemEntry`. Returns: A list of :class:`FilesystemEntry`. """ - return list(self.scandir()) + return [e.get() for e in self.scandir()] def walk( self, @@ -615,7 +619,7 @@ def walk_ext( topdown: bool = True, onerror: Callable[[Exception], None] | None = None, followlinks: bool = False, - ) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]: + ) -> Iterator[tuple[list[Self], list[Self], list[Self]]]: """Recursively walk a directory and yield its contents as :class:`FilesystemEntry` split in a tuple of lists of files, directories and symlinks. @@ -629,13 +633,13 @@ def walk_ext( """ yield from fsutil.walk_ext(self, topdown, onerror, followlinks) - def recurse(self) -> Iterator[FilesystemEntry]: - """Recursively walk a directory and yield its contents as :class:`FilesystemEntry`. + def recurse(self) -> Iterator[DirEntry]: + """Recursively walk a directory and yield its contents as :class:`DirEntry`. Does not follow symbolic links. Returns: - An iterator of :class:`FilesystemEntry`. + An iterator of :class:`DirEntry`. """ yield from fsutil.recurse(self) @@ -651,7 +655,7 @@ def glob(self, pattern: str) -> Iterator[str]: for entry in self.glob_ext(pattern): yield entry.path - def glob_ext(self, pattern: str) -> Iterator[FilesystemEntry]: + def glob_ext(self, pattern: str) -> Iterator[Self]: """Iterate over the directory part of ``pattern``, returning entries matching ``pattern`` as :class:`FilesysmteEntry`. @@ -746,7 +750,7 @@ def readlink(self) -> str: The path the link points to.""" raise NotImplementedError - def readlink_ext(self) -> FilesystemEntry: + def readlink_ext(self) -> Self: """Read the link where this entry points to, return the resulting path as :class:`FilesystemEntry`. If it is a symlink and returns the string that corresponds to that path. @@ -841,14 +845,109 @@ def hash(self, algos: list[str] | list[Callable] | None = None) -> tuple[str]: return hashutil.common(self.open()) +class DirEntry: + """Directory entry base class. Closely models ``os.DirEntry``. + + Filesystem implementations are encouraged to subclass this class to provide efficient + implementations of the various methods. + + Args: + fs: The filesystem the entry belongs to. + path: The path of the parent directory. + name: The name of the entry. + entry: The raw entry backing this directory entry. + """ + + def __init__(self, fs: Filesystem, path: str, name: str, entry: Any): + self.fs = fs + """The filesystem the entry belongs to.""" + self.path = fsutil.join(path, name, alt_separator=self.fs.alt_separator) + """The full path of the entry.""" + self.name = name + """The name of the entry.""" + self.entry = entry + """The raw entry backing this directory entry.""" + + self.stat = cache(self.stat) + + def __fspath__(self) -> str: + return self.path + + def __repr__(self) -> str: + return f"" + + def get(self) -> FilesystemEntry: + """Retrieve the :class:`FilesystemEntry` this directory entry points to. + + Subclasses should override this method to provide an efficient implementation. + """ + return self.fs.get(self.path) + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """Return whether this entry is a directory or a symbolic link pointing to a directory. + + Subclasses should override this method to provide an efficient implementation. + """ + try: + return stat.S_ISDIR(self.stat(follow_symlinks=follow_symlinks).st_mode) + except FileNotFoundError: + return False + + def is_file(self, *, follow_symlinks: bool = True) -> bool: + """Return whether this entry is a file or a symbolic link pointing to a file. + + Subclasses should override this method to provide an efficient implementation. + """ + try: + return stat.S_ISREG(self.stat(follow_symlinks=follow_symlinks).st_mode) + except FileNotFoundError: + return False + + def is_symlink(self) -> bool: + """Return whether this entry is a symbolic link. + + Subclasses should override this method to provide an efficient implementation. + """ + return stat.S_ISLNK(self.stat(follow_symlinks=False).st_mode) + + def is_junction(self) -> bool: + """Return whether this entry is a junction (only valid for NTFS).""" + return False + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + """Return the stat information of this entry. + + Subclasses should override this method to provide an efficient implementation. + + Note that this may return slightly different information than a "full" stat on the full filesystem entry, + as in most cases this will generate a stat based on the information available in the directory entry only. + """ + return self.fs.stat(self.path, follow_symlinks=follow_symlinks) + + def inode(self) -> int: + """Return the inode number of this entry.""" + return self.stat(follow_symlinks=False).st_ino + + +class VirtualDirEntry(DirEntry): + fs: VirtualFilesystem + entry: FilesystemEntry + + def get(self) -> FilesystemEntry: + return self.entry + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.entry.stat(follow_symlinks=follow_symlinks) + + class VirtualDirectory(FilesystemEntry): """Virtual directory implementation. Backed by a dict.""" def __init__(self, fs: VirtualFilesystem, path: str): super().__init__(fs, path, None) self.up = None - self.top = None - self.entries = {} + self.top: FilesystemEntry | None = None + self.entries: dict[str, FilesystemEntry] = {} def __getitem__(self, item: str) -> FilesystemEntry: if not self.fs.case_sensitive: @@ -879,25 +978,11 @@ def add(self, name: str, entry: FilesystemEntry) -> None: def get(self, path: str) -> FilesystemEntry: return self.fs.get(path, relentry=self) - def iterdir(self) -> Iterator[str]: + def scandir(self) -> Iterator[DirEntry]: yielded = set() - for entry in self.entries: - yield entry - yielded.add(entry) - - # self.top used to be a reference to a filesystem. This is now a reference to - # any filesystem entry, usually the root of a filesystem. - if self.top: - for entry in self.top.iterdir(): - if entry in yielded or (not self.fs.case_sensitive and entry.lower() in yielded): - continue - yield entry - - def scandir(self) -> Iterator[FilesystemEntry]: - yielded = set() - for entry in self.entries.values(): - yield entry - yielded.add(entry.name) + for name, entry in self.entries.items(): + yield VirtualDirEntry(self.fs, self.path, entry.name, entry) + yielded.add(name) # self.top used to be a reference to a filesystem. This is now a reference to # any filesystem entry, usually the root of a filesystem. @@ -970,10 +1055,7 @@ def get(self, path: str) -> FilesystemEntry: return self raise NotADirectoryError(f"'{self.path}' is not a directory") - def iterdir(self) -> Iterator[str]: - raise NotADirectoryError(f"'{self.path}' is not a directory") - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[DirEntry]: raise NotADirectoryError(f"'{self.path}' is not a directory") def open(self) -> BinaryIO: @@ -1056,10 +1138,7 @@ def lattr(self) -> Any: def get(self, path: str) -> FilesystemEntry: return self.fs.get(path, self) - def iterdir(self) -> Iterator[str]: - yield from self.readlink_ext().iterdir() - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[DirEntry]: yield from self.readlink_ext().scandir() def open(self) -> BinaryIO: @@ -1517,6 +1596,14 @@ def __getattr__(self, attr: str) -> Any: return object.__getattribute__(self, attr) +class LayerDirEntry(DirEntry): + fs: LayerFilesystem + entry: list[DirEntry] + + def get(self) -> LayerFilesystemEntry: + return LayerFilesystemEntry(self.fs, self.path, [e.get() for e in self.entry]) + + class LayerFilesystemEntry(FilesystemEntry): def __init__(self, fs: Filesystem, path: str, entry: FilesystemEntry): super().__init__(fs, path, EntryList(entry)) @@ -1542,24 +1629,12 @@ def get(self, path: str) -> FilesystemEntry: def open(self) -> BinaryIO: return self._resolve()._exec("open") - def iterdir(self) -> Iterator[str]: - yielded = {".", ".."} - selfentry = self._resolve() - for fsentry in selfentry.entries: - for entry_name in fsentry.iterdir(): - name = entry_name if selfentry.fs.case_sensitive else entry_name.lower() - if name in yielded: - continue - - yield entry_name - yielded.add(name) - - def scandir(self) -> Iterator[LayerFilesystemEntry]: + def scandir(self) -> Iterator[LayerDirEntry]: # Every entry is actually a list of entries from the different # overlaying FSes, of which each may implement a different function # like .stat() or .open() items = defaultdict(list) - selfentry = self._resolve() + selfentry: LayerFilesystemEntry = self._resolve() for fsentry in selfentry.entries: for entry in fsentry.scandir(): name = entry.name if selfentry.fs.case_sensitive else entry.name.lower() @@ -1572,9 +1647,7 @@ def scandir(self) -> Iterator[LayerFilesystemEntry]: # The filename for the first entry is taken. Note that in case of # non case-sensitive FSes, the different entries from the # overlaying FSes may have different casing of the name. - entry_name = entries[0].name - path = fsutil.join(selfentry.path, entry_name, alt_separator=selfentry.fs.alt_separator) - yield LayerFilesystemEntry(selfentry.fs, path, entries) + yield DirEntry(selfentry.fs, selfentry.path, entries[0].name, entries) def is_file(self, follow_symlinks: bool = True) -> bool: try: @@ -1627,6 +1700,15 @@ def get(self, path: str, relentry: LayerFilesystemEntry | None = None) -> RootFi return entry +class RootDirEntry(LayerDirEntry): + fs: RootFilesystem + + def get(self) -> RootFilesystemEntry: + entry = super().get() + entry.__class__ = RootFilesystemEntry + return entry + + class RootFilesystemEntry(LayerFilesystemEntry): fs: RootFilesystem @@ -1640,14 +1722,10 @@ def open(self) -> BinaryIO: self.fs.target.log.trace("%r::open()", self) return super().open() - def iterdir(self) -> Iterator[str]: - self.fs.target.log.trace("%r::iterdir()", self) - yield from super().iterdir() - - def scandir(self) -> Iterator[RootFilesystemEntry]: + def scandir(self) -> Iterator[DirEntry]: self.fs.target.log.trace("%r::scandir()", self) for entry in super().scandir(): - entry.__class__ = RootFilesystemEntry + entry.__class__ = RootDirEntry yield entry def is_file(self, follow_symlinks: bool = True) -> bool: diff --git a/dissect/target/filesystems/ad1.py b/dissect/target/filesystems/ad1.py index 25a5189a17..c0d7dbb405 100644 --- a/dissect/target/filesystems/ad1.py +++ b/dissect/target/filesystems/ad1.py @@ -11,7 +11,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -39,6 +39,17 @@ def _get_entry(self, path: str) -> ad1.LogicalImage | ad1.FileEntry: raise FileNotFoundError(path) +class AD1DirEntry(DirEntry): + fs: AD1Filesystem + entry: ad1.FileEntry + + def get(self) -> AD1FilesystemEntry: + return AD1FilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class AD1FilesystemEntry(FilesystemEntry): def get(self, path: str) -> AD1FilesystemEntry: path = fsutil.join(self.path, path, alt_separator=self.alt_separator) @@ -49,19 +60,12 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self.entry.open() - def iterdir(self) -> Iterator[str]: - if not self.is_dir(): - raise NotADirectoryError(self.path) - - yield from self.entry.listdir().keys() - def scandir(self) -> Iterator[AD1FilesystemEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - for fname, file_ in self.entry.listdir().items(): - path = fsutil.join(self.path, fname, alt_separator=self.alt_separator) - yield AD1FilesystemEntry(self.fs, path, file_) + for name, entry in self.entry.listdir().items(): + yield AD1DirEntry(self.fs, self.path, name, entry) def is_file(self, follow_symlinks: bool = True) -> bool: return self.entry.is_file() diff --git a/dissect/target/filesystems/btrfs.py b/dissect/target/filesystems/btrfs.py index 5434367401..de6317e0ef 100644 --- a/dissect/target/filesystems/btrfs.py +++ b/dissect/target/filesystems/btrfs.py @@ -13,7 +13,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -91,42 +91,40 @@ def _get_node(self, path: str, node: btrfs.INode | None = None) -> btrfs.INode: raise FileNotFoundError(path) from e +class BtrfsDirEntry(DirEntry): + fs: BtrfsSubvolumeFilesystem + entry: btrfs.INode + + def get(self) -> BtrfsFilesystemEntry: + return BtrfsFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class BtrfsFilesystemEntry(FilesystemEntry): - fs: BtrfsFilesystem + fs: BtrfsSubvolumeFilesystem entry: btrfs.INode def get(self, path: str) -> FilesystemEntry: - entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) - entry = self.fs._get_node(path, self.entry) - return BtrfsFilesystemEntry(self.fs, entry_path, entry) + path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) + return BtrfsFilesystemEntry(self.fs, path, self.fs._get_node(path, self.entry)) def open(self) -> BinaryIO: if self.is_dir(): raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[btrfs.INode]: + def scandir(self) -> Iterator[BtrfsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for entry in self.readlink_ext().iterdir(): - yield entry - else: - for name, entry in self.entry.iterdir(): - if name in (".", ".."): - continue - - yield name, entry - - def iterdir(self) -> Iterator[str]: - for name, _ in self._iterdir(): - yield name + for name, entry in self._resolve().entry.iterdir(): + if name in (".", ".."): + continue - def scandir(self) -> Iterator[FilesystemEntry]: - for name, entry in self._iterdir(): - entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator) - yield BtrfsFilesystemEntry(self.fs, entry_path, entry) + # TODO: Separate INodes and DirEntry in dissect.btrfs + yield BtrfsDirEntry(self.fs, self.path, name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/cb.py b/dissect/target/filesystems/cb.py index 2fd4bc551f..6e438fc839 100644 --- a/dissect/target/filesystems/cb.py +++ b/dissect/target/filesystems/cb.py @@ -8,7 +8,7 @@ from dissect.util import ts from dissect.target.exceptions import FileNotFoundError, NotADirectoryError -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -86,6 +86,18 @@ def get(self, path: str) -> CbFilesystemEntry: raise FileNotFoundError(path) +class CbDirEntry(DirEntry): + fs: CbFilesystem + entry: tuple[dict[str, Any], str] + + def get(self) -> CbFilesystemEntry: + entry, cbpath = self.entry + return CbFilesystemEntry(self.fs, self.path, entry, cbpath) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class CbFilesystemEntry(FilesystemEntry): def __init__(self, fs: Filesystem, path: str, entry: Any, cbpath: str): super().__init__(fs, path, entry) @@ -100,25 +112,22 @@ def open(self) -> BinaryIO: """Returns file handle (file-like object).""" return self.fs.session.get_raw_file(self.cbpath) - def iterdir(self) -> Iterator[str]: - """List the directory contents of a directory. Returns a generator of strings.""" - for entry in self.scandir(): - yield entry.name - - def scandir(self) -> Iterator[CbFilesystemEntry]: + def scandir(self) -> Iterator[CbDirEntry]: """List the directory contents of this directory. Returns a generator of filesystem entries.""" if not self.is_dir(): - raise NotADirectoryError(f"'{self.path}' is not a directory") + raise NotADirectoryError(self.path) separator = self.fs.alt_separator or "/" for entry in self.fs.session.list_directory(self.cbpath + separator): - filename = entry["filename"] - if filename in (".", ".."): + if entry["filename"] in (".", ".."): continue - path = fsutil.join(self.path, filename, alt_separator=self.fs.alt_separator) - cbpath = separator.join([self.cbpath, filename]) - yield CbFilesystemEntry(self.fs, path, entry, cbpath) + yield CbDirEntry( + self.fs, + self.path, + entry["filename"], + (entry, separator.join([self.cbpath, entry["filename"]])), + ) def is_dir(self, follow_symlinks: bool = True) -> bool: """Return whether this entry is a directory.""" diff --git a/dissect/target/filesystems/config.py b/dissect/target/filesystems/config.py index 5123ddc3a4..b7b74b5cab 100644 --- a/dissect/target/filesystems/config.py +++ b/dissect/target/filesystems/config.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any, BinaryIO from dissect.target.exceptions import ConfigurationParsingError, FileNotFoundError -from dissect.target.filesystem import FilesystemEntry, VirtualFilesystem +from dissect.target.filesystem import DirEntry, FilesystemEntry, VirtualFilesystem from dissect.target.helpers import fsutil from dissect.target.helpers.configutil import ConfigurationParser, parse from dissect.target.helpers.logging import get_logger @@ -131,6 +131,17 @@ def _convert_entry(self, file_entry: FilesystemEntry, *args, **kwargs) -> Config return ConfigurationEntry(self, entry.path, entry, config_parser) +class ConfigurationDirEntry(DirEntry): + fs: ConfigurationFilesystem + entry: tuple[FilesystemEntry, dict | ConfigurationParser | str | list | None] + + def get(self) -> ConfigurationEntry: + return ConfigurationEntry(self.fs, self.path, self.entry[0], self.entry[1]) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class ConfigurationEntry(FilesystemEntry): """A Special filesystem entry. @@ -262,22 +273,17 @@ def open(self) -> BinaryIO: output_data = self._write_value_mapping(self.parser_items) return io.BytesIO(bytes(output_data, "utf-8")) - def iterdir(self) -> Iterator[str]: - for entry in self.scandir(): - yield entry.name - - def scandir(self) -> Iterator[ConfigurationEntry]: - """Return the items inside :attr:`parser_items` as ``ConfigurationEntries``.""" + def scandir(self) -> Iterator[ConfigurationDirEntry]: if self.is_file(): - raise NotADirectoryError + raise NotADirectoryError(self.path) if self.parser_items is None and self.entry.is_dir(): for entry in self.entry.scandir(): - yield ConfigurationEntry(self.fs, entry.name, entry, None) + yield ConfigurationDirEntry(self.fs, self.path, entry.name, (entry.get(), None)) return for key, values in self.parser_items.items(): - yield ConfigurationEntry(self.fs, key, self.entry, values) + yield ConfigurationDirEntry(self.fs, self.path, key, (self.entry, values)) def is_file(self, follow_symlinks: bool = True) -> bool: return not self.is_dir(follow_symlinks) diff --git a/dissect/target/filesystems/cramfs.py b/dissect/target/filesystems/cramfs.py index 917f812030..4d90524031 100644 --- a/dissect/target/filesystems/cramfs.py +++ b/dissect/target/filesystems/cramfs.py @@ -12,7 +12,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -50,6 +50,17 @@ def _get_node(self, path: str, node: INode | None = None) -> INode: raise FileNotFoundError(path) from e +class CramfsDirEntry(DirEntry): + fs: CramfsFilesystem + entry: INode + + def get(self) -> CramfsFilesystemEntry: + return CramfsFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class CramfsFilesystemEntry(FilesystemEntry): fs: CramfsFilesystem entry: INode @@ -64,22 +75,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[INode]: + def scandir(self) -> Iterator[CramfsDirEntry]: + """List the directory contents of this directory. Returns a generator of filesystem entries.""" if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - yield from self.readlink_ext().iterdir() - else: - yield from self.entry.iterdir() + for entry in self._resolve().entry.iterdir(): + if entry.name in (".", ".."): + continue - def iterdir(self) -> Iterator[str]: - """List the directory contents of a directory. Returns a generator of strings.""" - yield from (inode.name for inode in self._iterdir()) - - def scandir(self) -> Iterator[FilesystemEntry]: - """List the directory contents of this directory. Returns a generator of filesystem entries.""" - yield from (self.get(entry.name) for entry in self._iterdir()) + yield CramfsDirEntry(self.fs, self.path, entry.name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: """Return whether this entry is a directory.""" diff --git a/dissect/target/filesystems/dir.py b/dissect/target/filesystems/dir.py index ee82f899ca..4f0111fdbc 100644 --- a/dissect/target/filesystems/dir.py +++ b/dissect/target/filesystems/dir.py @@ -10,7 +10,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -63,6 +63,20 @@ def get(self, path: str) -> FilesystemEntry: raise FileNotFoundError(path) +class DirectoryDirEntry(DirEntry): + fs: DirectoryFilesystem + entry: Path + + def get(self) -> FilesystemEntry: + return DirectoryFilesystemEntry(self.fs, self.path, self.entry) + + def is_junction(self) -> bool: + return hasattr(self.entry, "is_junction") and self.entry.is_junction() + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.entry.stat(follow_symlinks=follow_symlinks) + + class DirectoryFilesystemEntry(FilesystemEntry): entry: Path @@ -88,16 +102,12 @@ def iterdir(self) -> Iterator[str]: for item in self.entry.iterdir(): yield item.name - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[DirectoryDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - yield from self.readlink_ext().scandir() - else: - for item in self.entry.iterdir(): - path = fsutil.join(self.path, item.name, alt_separator=self.fs.alt_separator) - yield DirectoryFilesystemEntry(self.fs, path, item) + for item in self._resolve().entry.iterdir(): + yield DirectoryDirEntry(self.fs, self.path, item.name, item) def exists(self) -> bool: try: diff --git a/dissect/target/filesystems/exfat.py b/dissect/target/filesystems/exfat.py index e55e0e545f..cd64189a62 100644 --- a/dissect/target/filesystems/exfat.py +++ b/dissect/target/filesystems/exfat.py @@ -9,7 +9,7 @@ from dissect.util.ts import dostimestamp from dissect.target.exceptions import FileNotFoundError, NotADirectoryError -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -59,6 +59,17 @@ def _get_entry(self, path: str, root: ExfatFileTree | None = None) -> ExfatFileT return dirent +class ExfatDirEntry(DirEntry): + fs: ExfatFilesystem + entry: ExfatFileTree + + def get(self) -> ExfatFilesystemEntry: + return ExfatFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class ExfatFilesystemEntry(FilesystemEntry): def __init__( self, @@ -82,25 +93,15 @@ def open(self) -> BinaryIO: runlist = self.fs.exfat.runlist(self.cluster, False) return RunlistStream(self.fs.exfat.filesystem, runlist, self.size, self.fs.cluster_size) - def _iterdir(self) -> Iterator[tuple[str, ExfatFileTree]]: + def scandir(self) -> Iterator[ExfatDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - for entry_name, entry_file_tree in self.entry[1].items(): - if entry_name in (".", ".."): + for name, file_tree in self.entry[1].items(): + if name in (".", ".."): continue - yield (entry_name, entry_file_tree) - - def iterdir(self) -> Iterator[str]: - """List the directory contents of a directory. Returns a generator of strings.""" - for entry_name, _ in self._iterdir(): - yield entry_name - - def scandir(self) -> Iterator[ExfatFilesystemEntry]: - """List the directory contents of this directory. Returns a generator of filesystem entries.""" - for entry_name, entry_file_tree in self._iterdir(): - path = fsutil.join(self.path, entry_name, alt_separator=self.fs.alt_separator) - yield ExfatFilesystemEntry(self.fs, path, entry_file_tree) + + yield ExfatDirEntry(self.fs, self.path, name, file_tree) def is_symlink(self) -> bool: """Return whether this entry is a link.""" diff --git a/dissect/target/filesystems/extfs.py b/dissect/target/filesystems/extfs.py index 47a733cc81..c65774dba6 100644 --- a/dissect/target/filesystems/extfs.py +++ b/dissect/target/filesystems/extfs.py @@ -12,7 +12,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -47,7 +47,21 @@ def _get_node(self, path: str, node: extfs.INode | None = None) -> extfs.INode: raise FileNotFoundError(path) from e +class ExtDirEntry(DirEntry): + fs: ExtFilesystem + entry: extfs.INode + + def get(self) -> ExtFilesystemEntry: + return ExtFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class ExtFilesystemEntry(FilesystemEntry): + fs: ExtFilesystem + entry: extfs.INode + def get(self, path: str) -> FilesystemEntry: full_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) return ExtFilesystemEntry(self.fs, full_path, self.fs._get_node(path, self.entry)) @@ -57,34 +71,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def iterdir(self) -> Iterator[str]: - if not self.is_dir(): - raise NotADirectoryError(self.path) - - if self.is_symlink(): - for f in self.readlink_ext().iterdir(): - yield f - else: - for f in self.entry.listdir(): - if f in (".", ".."): - continue - - yield f - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[ExtDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for f in self.readlink_ext().scandir(): - yield f - else: - for fname, f in self.entry.listdir().items(): - if fname in (".", ".."): - continue + for fname, f in self._resolve().entry.listdir().items(): + if fname in (".", ".."): + continue - path = fsutil.join(self.path, fname, alt_separator=self.fs.alt_separator) - yield ExtFilesystemEntry(self.fs, path, f) + # TODO: Separate INodes and DirEntry in dissect.extfs + yield ExtDirEntry(self.fs, self.path, fname, f) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/fat.py b/dissect/target/filesystems/fat.py index 268dca14c6..965473126f 100644 --- a/dissect/target/filesystems/fat.py +++ b/dissect/target/filesystems/fat.py @@ -9,7 +9,7 @@ from dissect.fat import fat from dissect.target.exceptions import FileNotFoundError, NotADirectoryError -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -54,7 +54,21 @@ def _get_entry( raise FileNotFoundError(path) from e +class FatDirEntry(DirEntry): + fs: FatFilesystem + entry: fat.RootDirectory | fat.DirectoryEntry + + def get(self) -> FatFilesystemEntry: + return FatFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class FatFilesystemEntry(FilesystemEntry): + fs: FatFilesystem + entry: fat.RootDirectory | fat.DirectoryEntry + def get(self, path: str) -> FilesystemEntry: """Get a filesystem entry relative from the current one.""" full_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) @@ -66,26 +80,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self.entry.open() - def iterdir(self) -> Iterator[str]: - """List the directory contents of a directory. Returns a generator of strings.""" - if not self.is_dir(): - raise NotADirectoryError(self.path) - - for f in self.entry.iterdir(): - if f.name in (".", ".."): - continue - yield f.name - def scandir(self) -> Iterator[FilesystemEntry]: """List the directory contents of this directory. Returns a generator of filesystem entries.""" if not self.is_dir(): raise NotADirectoryError(self.path) - for f in self.entry.iterdir(): - if f.name in (".", ".."): + for entry in self.entry.iterdir(): + if entry.name in (".", ".."): continue - path = fsutil.join(self.path, f.name, alt_separator=self.fs.alt_separator) - yield FatFilesystemEntry(self.fs, path, f) + + yield FatDirEntry(self.fs, self.path, entry.name, entry) def is_symlink(self) -> bool: """Return whether this entry is a link.""" diff --git a/dissect/target/filesystems/ffs.py b/dissect/target/filesystems/ffs.py index 78f9914c4a..b4c59e12e3 100644 --- a/dissect/target/filesystems/ffs.py +++ b/dissect/target/filesystems/ffs.py @@ -11,7 +11,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -53,7 +53,21 @@ def _get_node(self, path: str, node: ffs.INode | None = None) -> ffs.INode: raise FileNotFoundError(path) from e +class FfsDirEntry(DirEntry): + fs: FfsFilesystem + entry: ffs.INode + + def get(self) -> FfsFilesystemEntry: + return FfsFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class FfsFilesystemEntry(FilesystemEntry): + fs: FfsFilesystem + entry: ffs.INode + def get(self, path: str) -> FilesystemEntry: entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) entry = self.fs._get_node(path, self.entry) @@ -64,28 +78,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[ffs.INode]: + def scandir(self) -> Iterator[FfsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for entry in self.readlink_ext().iterdir(): - yield entry - else: - for entry in self.entry.iterdir(): - if entry.name in (".", ".."): - continue - - yield entry - - def iterdir(self) -> Iterator[str]: - for entry in self._iterdir(): - yield entry.name + for entry in self._resolve().entry.iterdir(): + if entry.name in (".", ".."): + continue - def scandir(self) -> Iterator[FilesystemEntry]: - for entry in self._iterdir(): - entry_path = fsutil.join(self.path, entry.name, alt_separator=self.fs.alt_separator) - yield FfsFilesystemEntry(self.fs, entry_path, entry) + # TODO: Separate INode and DirEntry in dissect.ffs + yield FfsDirEntry(self.fs, self.path, entry.name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/itunes.py b/dissect/target/filesystems/itunes.py index a277747206..e0b6596291 100644 --- a/dissect/target/filesystems/itunes.py +++ b/dissect/target/filesystems/itunes.py @@ -7,6 +7,7 @@ from dissect.target.exceptions import FilesystemError, NotASymlinkError from dissect.target.filesystem import ( + DirEntry, Filesystem, FilesystemEntry, VirtualDirectory, @@ -50,6 +51,9 @@ def get(self, path: str, relentry: FilesystemEntry | None = None) -> FilesystemE class ITunesFilesystemEntry(VirtualFile): + fs: ITunesFilesystem + entry: FileInfo + def open(self) -> BinaryIO: """Returns file handle (file-like object).""" if self.is_dir(): @@ -59,12 +63,7 @@ def open(self) -> BinaryIO: return EncryptedFileStream(self.entry) return self.entry.get().open("rb") - def iterdir(self) -> Iterator[str]: - if self.is_dir(): - return self._resolve().iterdir() - return super().iterdir() - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[DirEntry]: if self.is_dir(): return self._resolve().scandir() return super().scandir() diff --git a/dissect/target/filesystems/jffs.py b/dissect/target/filesystems/jffs.py index 2f6bc21bc6..78dd5b922b 100644 --- a/dissect/target/filesystems/jffs.py +++ b/dissect/target/filesystems/jffs.py @@ -12,7 +12,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -49,6 +49,17 @@ def _get_node(self, path: str, node: jffs2.INode | None = None) -> jffs2.INode: raise FileNotFoundError(path) from e +class JffsDirEntry(DirEntry): + fs: JffsFilesystem + entry: jffs2.INode + + def get(self) -> JffsFilesystemEntry: + return JffsFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class JffsFilesystemEntry(FilesystemEntry): fs: JffsFilesystem entry: jffs2.INode @@ -63,23 +74,13 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[tuple[str, jffs2.INode]]: + def scandir(self) -> Iterator[JffsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - yield from self.readlink_ext().iterdir() - else: - yield from self.entry.iterdir() - - def iterdir(self) -> Iterator[str]: - for name, _ in self._iterdir(): - yield name - - def scandir(self) -> Iterator[FilesystemEntry]: - for name, entry in self._iterdir(): - entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator) - yield JffsFilesystemEntry(self.fs, entry_path, entry) + for name, entry in self._resolve().entry.iterdir(): + # TODO: Separate INode and DirEntry in dissect.jffs + yield JffsDirEntry(self.fs, self.path, name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/nfs.py b/dissect/target/filesystems/nfs.py index 947f116b29..b39c952c45 100644 --- a/dissect/target/filesystems/nfs.py +++ b/dissect/target/filesystems/nfs.py @@ -8,7 +8,7 @@ from dissect.util.stream import AlignedStream from dissect.target.exceptions import NotADirectoryError -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil from dissect.target.helpers.nfs.client.mount import Client as MountClient from dissect.target.helpers.nfs.client.nfs import Client as NfsClient @@ -134,6 +134,17 @@ def get(self, path: str, relentry: NfsFilesystemEntry | None = None) -> NfsFiles return NfsFilesystemEntry(self, path, result.object, result.obj_attributes) +class NfsDirEntry(DirEntry): + fs: NfsFilesystem + entry: EntryPlus + + def get(self) -> NfsFilesystemEntry: + return NfsFilesystemEntry(self.fs, self.path, self.entry.handle, self.entry.attributes) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class NfsFilesystemEntry(FilesystemEntry): fs: NfsFilesystem entry: FileHandle @@ -179,19 +190,12 @@ def readlink_ext(self) -> NfsFilesystemEntry: target = self.fs._client.readlink(self.entry) return self.fs.get(target) # The target is an absolute path - def _iterdir(self) -> Iterator[EntryPlus]: + def scandir(self) -> Iterator[NfsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - yield from self.fs._client.readdir(self.entry).entries - - def iterdir(self) -> Iterator[str]: - for entry in self._iterdir(): - yield entry.name - - def scandir(self) -> Iterator[FilesystemEntry]: - for entry in self._iterdir(): - yield NfsFilesystemEntry(self.fs, entry.name, entry.handle, entry.attributes) + for entry in self.fs._client.readdir(self.entry).entries: + yield NfsDirEntry(self.fs, self.path, entry.name, entry) def open(self) -> NfsStream: # Pass size if available but don't sweat it diff --git a/dissect/target/filesystems/ntfs.py b/dissect/target/filesystems/ntfs.py index 0e2e76fdf5..83ff586694 100644 --- a/dissect/target/filesystems/ntfs.py +++ b/dissect/target/filesystems/ntfs.py @@ -8,6 +8,7 @@ from dissect.ntfs.exceptions import Error as NtfsError from dissect.ntfs.exceptions import FileNotFoundError as NtfsFileNotFoundError from dissect.ntfs.exceptions import NotADirectoryError as NtfsNotADirectoryError +from dissect.ntfs.util import segment_reference from dissect.target.exceptions import ( FileNotFoundError, @@ -15,12 +16,13 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: from collections.abc import Iterator + from dissect.ntfs.attr import FileName, StandardInformation from dissect.ntfs.util import AttributeMap @@ -60,27 +62,92 @@ def _get_record(self, path: str, root: MftRecord | None = None) -> MftRecord: raise FileNotFoundError(path) from e +class NtfsDirEntry(DirEntry): + entry: IndexEntry + + def get(self) -> FilesystemEntry: + return NtfsFilesystemEntry(self.fs, self.path, self.entry.dereference()) + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + if follow_symlinks and self.is_symlink(): + return super().is_dir(follow_symlinks=follow_symlinks) + + return self.entry.attribute.is_dir() + + def is_file(self, *, follow_symlinks: bool = True) -> bool: + if follow_symlinks and self.is_symlink(): + return super().is_file(follow_symlinks=follow_symlinks) + + return self.entry.attribute.is_file() + + def is_symlink(self) -> bool: + return self.entry.attribute.is_symlink() or self.entry.attribute.is_mount_point() + + def is_junction(self) -> bool: + return self.entry.attribute.is_mount_point() + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + if follow_symlinks and self.is_symlink(): + return self.fs.stat(self.path) + + attr: FileName = self.entry.attribute + if attr.is_symlink() or attr.is_mount_point(): + # Technically NTFS mount points/junctions are not symlinks, but it's easier if we pretend they are + mode = stat.S_IFLNK + elif attr.is_dir(): + mode = stat.S_IFDIR + else: + mode = stat.S_IFREG + + # mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime + st_info = fsutil.stat_result( + [ + mode | 0o777, + segment_reference(self.entry.header.FileReference), + id(self.fs), + 0, + 0, + 0, + attr.file_size, + attr.last_access_time.timestamp(), + attr.last_modification_time.timestamp(), + # ctime gets set to creation time for python <3.12 purposes + attr.creation_time.timestamp(), + ] + ) + + # Set the nanosecond resolution separately + st_info.st_atime_ns = attr.last_access_time_ns + st_info.st_mtime_ns = attr.last_modification_time_ns + + st_info.st_ctime_ns = attr.creation_time_ns + + st_info.st_birthtime = attr.creation_time.timestamp() + st_info.st_birthtime_ns = attr.creation_time_ns + + # real_size is none if the size is resident + st_info.st_blksize = self.entry.index.record.ntfs.cluster_size + st_info.st_blocks = math.ceil(attr.allocated_size / 512) + + return st_info + + class NtfsFilesystemEntry(FilesystemEntry): - def __init__( - self, fs: NtfsFilesystem, path: str, entry: MftRecord | None = None, index_entry: IndexEntry | None = None - ): + fs: NtfsFilesystem + entry: MftRecord + + def __init__(self, fs: NtfsFilesystem, path: str, entry: MftRecord): super().__init__(fs, path, entry) - self.index_entry = index_entry self.ads = "" if ":" in self.path: self.path, self.ads = path.rsplit(":", maxsplit=1) - def dereference(self) -> MftRecord: - if not self.entry: - self.entry = self.index_entry.dereference() - return self.entry - def get(self, path: str) -> NtfsFilesystemEntry: return NtfsFilesystemEntry( self.fs, fsutil.join(self.path, path, alt_separator=self.fs.alt_separator), - self.fs._get_record(path, self.dereference()), + self.fs._get_record(path, self.entry), ) def open(self, name: str = "") -> BinaryIO: @@ -91,36 +158,23 @@ def open(self, name: str = "") -> BinaryIO: return self.readlink_ext().open(name) stream = name or self.ads - return self.dereference().open(stream) + return self.entry.open(stream) - def _iterdir(self, ignore_dos: bool = True) -> Iterator[IndexEntry]: + def scandir(self, ignore_dos: bool = True) -> Iterator[NtfsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - yield from self.readlink_ext()._iterdir(ignore_dos=ignore_dos) - return - - for entry in self.dereference().iterdir(ignore_dos=ignore_dos): + for entry in self._resolve().entry.iterdir(ignore_dos=ignore_dos): if entry.attribute.file_name == ".": continue - yield entry - - def iterdir(self) -> Iterator[str]: - for index_entry in self._iterdir(): - yield index_entry.attribute.file_name - - def scandir(self) -> Iterator[NtfsFilesystemEntry]: - for index_entry in self._iterdir(): - path = fsutil.join(self.path, index_entry.attribute.file_name, alt_separator=self.fs.alt_separator) - yield NtfsFilesystemEntry(self.fs, path, index_entry=index_entry) + yield NtfsDirEntry(self.fs, self.path, entry.attribute.file_name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: if not follow_symlinks and self.is_symlink(): return False - return self.dereference().is_dir() + return self.entry.is_dir() def is_file(self, follow_symlinks: bool = True) -> bool: if not follow_symlinks and self.is_symlink(): @@ -129,8 +183,7 @@ def is_file(self, follow_symlinks: bool = True) -> bool: return not self.is_dir(follow_symlinks=follow_symlinks) def is_symlink(self) -> bool: - entry = self.dereference() - return entry.is_symlink() or entry.is_mount_point() + return self.entry.is_symlink() or self.entry.is_mount_point() def readlink(self) -> str: # Note: we only need to check and resolve symlinks when actually interacting with the target, such as @@ -139,7 +192,7 @@ def readlink(self) -> str: if not self.is_symlink(): raise NotASymlinkError - reparse_point = self.dereference().attributes.REPARSE_POINT + reparse_point = self.entry.attributes.REPARSE_POINT print_name = reparse_point.print_name if reparse_point.absolute: # Prefix with \\ to make the path play ball with all the filesystem utilities @@ -153,7 +206,7 @@ def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: return self._resolve(follow_symlinks=follow_symlinks).lstat() def lstat(self) -> fsutil.stat_result: - record = self.dereference() + record = self.entry size = 0 real_size = 0 @@ -170,7 +223,7 @@ def lstat(self) -> fsutil.stat_result: else: mode = stat.S_IFDIR - stdinfo = record.attributes.STANDARD_INFORMATION + stdinfo: StandardInformation = record.attributes.STANDARD_INFORMATION # mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime st_info = fsutil.stat_result( @@ -214,4 +267,4 @@ def attr(self) -> AttributeMap: return self.lattr() def lattr(self) -> AttributeMap: - return self.dereference().attributes + return self.entry.attributes diff --git a/dissect/target/filesystems/qnxfs.py b/dissect/target/filesystems/qnxfs.py index fe7bd9772c..eabe696240 100644 --- a/dissect/target/filesystems/qnxfs.py +++ b/dissect/target/filesystems/qnxfs.py @@ -13,7 +13,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -49,6 +49,17 @@ def _get_node(self, path: str, node: INode | None = None) -> INode: raise FileNotFoundError(path, cause=e) +class QnxDirEntry(DirEntry): + fs: QnxFilesystem + entry: INode + + def get(self) -> QnxFilesystemEntry: + return QnxFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class QnxFilesystemEntry(FilesystemEntry): fs: QnxFilesystem entry: INode @@ -62,22 +73,13 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[tuple[str, INode]]: + def scandir(self) -> Iterator[QnxDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - yield from self.readlink_ext().iterdir() - else: - yield from self.entry.iterdir() - - def iterdir(self) -> Iterator[str]: - yield from (name for name, _ in self._iterdir()) - - def scandir(self) -> Iterator[FilesystemEntry]: - for name, entry in self._iterdir(): - entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator) - yield QnxFilesystemEntry(self.fs, entry_path, entry) + for name, entry in self._resolve().entry.iterdir(): + # TODO: Separate INode and DirEntry in dissect.qnxfs + yield QnxDirEntry(self.fs, self.path, name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/smb.py b/dissect/target/filesystems/smb.py index 601d863a65..84a28ed44c 100644 --- a/dissect/target/filesystems/smb.py +++ b/dissect/target/filesystems/smb.py @@ -11,7 +11,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil from dissect.target.helpers.logging import get_logger @@ -84,6 +84,29 @@ def _get_entry(self, path: str) -> SharedFile: return result[0] +class SmbDirEntry(DirEntry): + fs: SmbFilesystem + entry: SharedFile + + def get(self) -> SmbFilesystemEntry: + return SmbFilesystemEntry(self.fs, self.path, self.entry) + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + if follow_symlinks and self.is_symlink(): + return super().is_dir(follow_symlinks=follow_symlinks) + + return bool(self.entry.is_directory()) + + def is_file(self, *, follow_symlinks: bool = True) -> bool: + return not self.is_dir(follow_symlinks=follow_symlinks) + + def is_symlink(self) -> bool: + return False + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class SmbFilesystemEntry(FilesystemEntry): fs: SmbFilesystem entry: SharedFile @@ -91,31 +114,21 @@ class SmbFilesystemEntry(FilesystemEntry): def get(self, path: str) -> FilesystemEntry: return self.fs.get(fsutil.join(self.path, path, alt_separator=self.fs.alt_separator)) - def _iterdir(self) -> Iterator[SharedFile]: + def scandir(self) -> Iterator[SmbDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) path = fsutil.join(self.path, "*", alt_separator=self.fs.alt_separator) try: entry: SharedFile - for entry in self.fs.conn.listPath(self.fs.share_name, path): - if entry.get_longname() in (".", ".."): + if (name := entry.get_longname()) in (".", ".."): continue - yield entry + yield SmbDirEntry(self.fs, self.path, name, entry) except SessionError as e: log.error("Failed to list directory '%s' share '%s', error: %s", path, self.fs.share_name, e) # noqa: TRY400 - def iterdir(self) -> Iterator[str]: - for entry in self._iterdir(): - yield entry.get_longname() - - def scandir(self) -> Iterator[FilesystemEntry]: - for entry in self._iterdir(): - entry_path = fsutil.join(self.path, entry.get_longname(), alt_separator=self.fs.alt_separator) - yield SmbFilesystemEntry(self.fs, entry_path, entry) - def open(self) -> SmbStream: log.debug("Attempting to open file: %s", self.path) try: diff --git a/dissect/target/filesystems/squashfs.py b/dissect/target/filesystems/squashfs.py index 52128cd65d..2d4c0cb301 100644 --- a/dissect/target/filesystems/squashfs.py +++ b/dissect/target/filesystems/squashfs.py @@ -12,7 +12,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: @@ -46,7 +46,21 @@ def _get_node(self, path: str, node: INode | None = None) -> INode: raise FileNotFoundError(path) from e +class SquashFSDirEntry(DirEntry): + fs: SquashFSFilesystem + entry: INode + + def get(self) -> SquashFSFilesystemEntry: + return SquashFSFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class SquashFSFilesystemEntry(FilesystemEntry): + fs: SquashFSFilesystem + entry: INode + def get(self, path: str) -> FilesystemEntry: entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) entry = self.fs._get_node(path, self.entry) @@ -57,28 +71,15 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[INode]: + def scandir(self) -> Iterator[SquashFSDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for entry in self.readlink_ext().iterdir(): - yield entry - else: - for entry in self.entry.iterdir(): - if entry.name in (".", ".."): - continue - - yield entry - - def iterdir(self) -> Iterator[str]: - for entry in self._iterdir(): - yield entry.name + for entry in self._resolve().entry.iterdir(): + if entry.name in (".", ".."): + continue - def scandir(self) -> Iterator[FilesystemEntry]: - for entry in self._iterdir(): - entry_path = fsutil.join(self.path, entry.name, alt_separator=self.fs.alt_separator) - yield SquashFSFilesystemEntry(self.fs, entry_path, entry) + yield SquashFSDirEntry(self.fs, self.path, entry.name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/tar.py b/dissect/target/filesystems/tar.py index 316b0c528f..294f374d3c 100644 --- a/dissect/target/filesystems/tar.py +++ b/dissect/target/filesystems/tar.py @@ -16,6 +16,7 @@ Filesystem, FilesystemEntry, VirtualDirectory, + VirtualDirEntry, VirtualFile, VirtualFilesystem, ) @@ -93,12 +94,7 @@ def open(self) -> BinaryIO: except Exception: raise FileNotFoundError - def iterdir(self) -> Iterator[str]: - if self.is_dir(): - return self._resolve().iterdir() - return super().iterdir() - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[VirtualDirEntry]: if self.is_dir(): return self._resolve().scandir() return super().scandir() diff --git a/dissect/target/filesystems/vbk.py b/dissect/target/filesystems/vbk.py index 0800716ecc..ae347a5136 100644 --- a/dissect/target/filesystems/vbk.py +++ b/dissect/target/filesystems/vbk.py @@ -13,6 +13,7 @@ NotASymlinkError, ) from dissect.target.filesystem import ( + DirEntry, Filesystem, FilesystemEntry, ) @@ -56,6 +57,26 @@ def _get_node(self, path: str, node: vbk.DirItem | None = None) -> FilesystemEnt raise FilesystemError(path) from e +class VbkDirEntry(DirEntry): + fs: VbkFilesystem + entry: vbk.DirItem + + def get(self) -> VbkFilesystemEntry: + return VbkFilesystemEntry(self.fs, self.path, self.entry) + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + return self.entry.is_dir() + + def is_file(self, *, follow_symlinks: bool = True) -> bool: + return self.entry.is_file() + + def is_symlink(self) -> bool: + return False + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class VbkFilesystemEntry(FilesystemEntry): fs: VbkFilesystem entry: vbk.DirItem @@ -72,23 +93,12 @@ def open(self) -> None: raise IsADirectoryError(self.path) return self.entry.open() - def iterdir(self) -> Iterator[str]: - if not self.is_dir(): - raise NotADirectoryError(self.path) - - for entry in self.entry.iterdir(): - yield entry.name - - def scandir(self) -> Iterator[FilesystemEntry]: + def scandir(self) -> Iterator[VbkDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) for entry in self.entry.iterdir(): - yield VbkFilesystemEntry( - self.fs, - fsutil.join(self.path, entry.name, alt_separator=self.fs.alt_separator), - entry, - ) + yield VbkDirEntry(self.fs, self.path, entry.name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: return self.entry.is_dir() diff --git a/dissect/target/filesystems/vmfs.py b/dissect/target/filesystems/vmfs.py index b0cdf3355f..aa5c619fa5 100644 --- a/dissect/target/filesystems/vmfs.py +++ b/dissect/target/filesystems/vmfs.py @@ -4,7 +4,6 @@ import dissect.vmfs as vmfs from dissect.vmfs.c_vmfs import c_vmfs -from dissect.vmfs.descriptor import FileDescriptor from dissect.target.exceptions import ( FileNotFoundError, @@ -13,14 +12,12 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil if TYPE_CHECKING: from collections.abc import Iterator - from dissect.vmfs.vmfs import FileDescriptor - class VmfsFilesystem(Filesystem): __type__ = "vmfs" @@ -42,7 +39,7 @@ def _detect(fh: BinaryIO) -> bool: def get(self, path: str) -> FilesystemEntry: return VmfsFilesystemEntry(self, path, self._get_node(path)) - def _get_node(self, path: str, node: FileDescriptor | None = None) -> FileDescriptor: + def _get_node(self, path: str, node: vmfs.FileDescriptor | None = None) -> vmfs.FileDescriptor: """Returns an internal VMFS entry for a given path and optional relative entry.""" try: return self.vmfs.get(path, node) @@ -56,9 +53,35 @@ def _get_node(self, path: str, node: FileDescriptor | None = None) -> FileDescri raise FileNotFoundError(path) from e +class VmfsDirEntry(DirEntry): + fs: VmfsFilesystem + entry: vmfs.DirEntry + + def get(self) -> VmfsFilesystemEntry: + return VmfsFilesystemEntry(self.fs, self.path, self.entry.file_descriptor) + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + if follow_symlinks and self.is_symlink(): + return super().is_dir(follow_symlinks=follow_symlinks) + + return self.entry.type == c_vmfs.FS3_DescriptorType.DIRECTORY + + def is_file(self, *, follow_symlinks: bool = True) -> bool: + if follow_symlinks and self.is_symlink(): + return super().is_file(follow_symlinks=follow_symlinks) + + return self.entry.type == c_vmfs.FS3_DescriptorType.REGFILE + + def is_symlink(self) -> bool: + return self.entry.type == c_vmfs.FS3_DescriptorType.SYMLINK + + def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class VmfsFilesystemEntry(FilesystemEntry): fs: VmfsFilesystem - entry: FileDescriptor + entry: vmfs.FileDescriptor def get(self, path: str) -> FilesystemEntry: """Get a filesystem entry relative from the current one.""" @@ -71,30 +94,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def _iterdir(self) -> Iterator[FileDescriptor]: + def scandir(self) -> Iterator[VmfsDirEntry]: + """List the directory contents of this directory. Returns a generator of filesystem entries.""" if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for f in self.readlink_ext().iterdir(): - yield f - else: - for f in self.entry.iterdir(): - if f.name in (".", ".."): - continue - - yield f + for entry in self._resolve().entry.iterdir(): + if entry.name in (".", ".."): + continue - def iterdir(self) -> Iterator[str]: - """List the directory contents of a directory. Returns a generator of strings.""" - for f in self._iterdir(): - yield f.name - - def scandir(self) -> Iterator[FilesystemEntry]: - """List the directory contents of this directory. Returns a generator of filesystem entries.""" - for f in self._iterdir(): - path = fsutil.join(self.path, f.name, alt_separator=self.fs.alt_separator) - yield VmfsFilesystemEntry(self.fs, path, f.file_descriptor) + yield VmfsDirEntry(self.fs, self.path, entry.name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: """Return whether this entry is a directory.""" diff --git a/dissect/target/filesystems/xfs.py b/dissect/target/filesystems/xfs.py index f27292f290..aa9aef4148 100644 --- a/dissect/target/filesystems/xfs.py +++ b/dissect/target/filesystems/xfs.py @@ -12,7 +12,7 @@ NotADirectoryError, NotASymlinkError, ) -from dissect.target.filesystem import Filesystem, FilesystemEntry +from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry from dissect.target.helpers import fsutil from dissect.target.helpers.logging import get_logger @@ -51,7 +51,21 @@ def _get_node(self, path: str, node: xfs.INode | None = None) -> xfs.INode: raise FileNotFoundError(path) from e +class XfsDirEntry(DirEntry): + fs: XfsFilesystem + entry: xfs.INode + + def get(self) -> XfsFilesystemEntry: + return XfsFilesystemEntry(self.fs, self.path, self.entry) + + def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: + return self.get().stat(follow_symlinks=follow_symlinks) + + class XfsFilesystemEntry(FilesystemEntry): + fs: XfsFilesystem + entry: xfs.INode + def get(self, path: str) -> FilesystemEntry: full_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator) return XfsFilesystemEntry(self.fs, full_path, self.fs._get_node(path, self.entry)) @@ -61,35 +75,16 @@ def open(self) -> BinaryIO: raise IsADirectoryError(self.path) return self._resolve().entry.open() - def iterdir(self) -> Iterator[str]: + def scandir(self) -> Iterator[XfsDirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - if self.is_symlink(): - for f in self.readlink_ext().iterdir(): - yield f - else: - for f in self.entry.listdir(): - if f in (".", ".."): - continue - yield f - - def scandir(self) -> Iterator[FilesystemEntry]: - if not self.is_dir(): - raise NotADirectoryError(self.path) + for name, entry in self._resolve().entry.listdir().items(): + if name in (None, ".", ".."): + continue - if self.is_symlink(): - for f in self.readlink_ext().scandir(): - yield f - else: - for filename, f in self.entry.listdir().items(): - if filename in (".", ".."): - continue - - if filename is None: - continue - path = fsutil.join(self.path, filename, alt_separator=self.fs.alt_separator) - yield XfsFilesystemEntry(self.fs, path, f) + # TODO: Separate INode and DirEntry in dissect.xfs + yield XfsDirEntry(self.fs, self.path, name, entry) def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/zip.py b/dissect/target/filesystems/zip.py index 7dd8f1e257..3b3eebdc76 100644 --- a/dissect/target/filesystems/zip.py +++ b/dissect/target/filesystems/zip.py @@ -15,6 +15,7 @@ NotASymlinkError, ) from dissect.target.filesystem import ( + DirEntry, Filesystem, FilesystemEntry, VirtualDirectory, @@ -98,25 +99,13 @@ def open(self) -> BinaryIO: except Exception: raise FileNotFoundError(self.path) - def iterdir(self) -> Iterator[str]: + def scandir(self) -> Iterator[DirEntry]: if not self.is_dir(): raise NotADirectoryError(self.path) - entry = self._resolve() - if isinstance(entry, ZipFilesystemEntry): - yield from super(ZipFilesystemEntry, entry).iterdir() - else: - yield from entry.iterdir() - - def scandir(self) -> Iterator[FilesystemEntry]: - if not self.is_dir(): - raise NotADirectoryError(self.path) - - entry = self._resolve() - if isinstance(entry, ZipFilesystemEntry): - yield from super(ZipFilesystemEntry, entry).scandir() - else: - yield from entry.scandir() + if isinstance(entry := self._resolve(), ZipFilesystemEntry): + return super(ZipFilesystemEntry, entry).scandir() + return entry.scandir() def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/helpers/compat/path_310.py b/dissect/target/helpers/compat/path_310.py index ebc2631afe..0fd8a7918a 100644 --- a/dissect/target/helpers/compat/path_310.py +++ b/dissect/target/helpers/compat/path_310.py @@ -248,7 +248,7 @@ def parents(self) -> path_common._DissectPathParents: class TargetPath(Path, PureDissectPath): _accessor = _dissect_accessor - __slots__ = ("_entry",) + __slots__ = ("_direntry", "_entry") def _make_child_relpath(self, part: str) -> Self: child = super()._make_child_relpath(part) @@ -257,11 +257,10 @@ def _make_child_relpath(self, part: str) -> Self: return child def get(self) -> FilesystemEntry: - try: - return self._entry - except AttributeError: - self._entry = self._fs.get(str(self)) - return self._entry + """Return the :class:`FilesystemEntry` for this path.""" + if not hasattr(self, "_entry"): + self._entry = self._direntry.get() if hasattr(self, "_direntry") else self._fs.get(str(self)) + return self._entry @classmethod def cwd(cls) -> Self: @@ -286,7 +285,7 @@ def iterdir(self) -> Iterator[Self]: # Yielding a path object for these makes little sense continue child_path = self._make_child_relpath(entry.name) - child_path._entry = entry + child_path._direntry = entry yield child_path # NOTE: Forward compatibility with CPython >= 3.12 diff --git a/dissect/target/helpers/compat/path_311.py b/dissect/target/helpers/compat/path_311.py index 1330ed0607..9059c49ade 100644 --- a/dissect/target/helpers/compat/path_311.py +++ b/dissect/target/helpers/compat/path_311.py @@ -151,7 +151,7 @@ def parents(self) -> path_common._DissectPathParents: class TargetPath(Path, PureDissectPath): - __slots__ = ("_entry",) + __slots__ = ("_direntry", "_entry") def _make_child_relpath(self, part: str) -> Self: child = super()._make_child_relpath(part) @@ -160,11 +160,10 @@ def _make_child_relpath(self, part: str) -> Self: return child def get(self) -> FilesystemEntry: - try: - return self._entry - except AttributeError: - self._entry = self._fs.get(str(self)) - return self._entry + """Return the :class:`FilesystemEntry` for this path.""" + if not hasattr(self, "_entry"): + self._entry = self._direntry.get() if hasattr(self, "_direntry") else self._fs.get(str(self)) + return self._entry @classmethod def cwd(cls) -> Self: @@ -189,7 +188,7 @@ def iterdir(self) -> Iterator[Self]: # Yielding a path object for these makes little sense continue child_path = self._make_child_relpath(entry.name) - child_path._entry = entry + child_path._direntry = entry yield child_path def _scandir(self) -> path_common._DissectScandirIterator: diff --git a/dissect/target/helpers/compat/path_312.py b/dissect/target/helpers/compat/path_312.py index d87586810c..d364078703 100644 --- a/dissect/target/helpers/compat/path_312.py +++ b/dissect/target/helpers/compat/path_312.py @@ -155,14 +155,13 @@ def is_reserved(self) -> bool: class TargetPath(Path, PureDissectPath): - __slots__ = ("_entry",) + __slots__ = ("_direntry", "_entry") def get(self) -> FilesystemEntry: - try: - return self._entry - except AttributeError: - self._entry = self._fs.get(str(self)) - return self._entry + """Return the :class:`FilesystemEntry` for this path.""" + if not hasattr(self, "_entry"): + self._entry = self._direntry.get() if hasattr(self, "_direntry") else self._fs.get(str(self)) + return self._entry def stat(self, *, follow_symlinks: bool = True) -> stat_result: """ @@ -213,7 +212,7 @@ def iterdir(self) -> Iterator[Self]: # Yielding a path object for these makes little sense continue child_path = self._make_child_relpath(entry.name) - child_path._entry = entry + child_path._direntry = entry yield child_path def _scandir(self) -> path_common._DissectScandirIterator: diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index ad3f9e7e98..eb58d366bc 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -142,18 +142,17 @@ def _parse_path(self, path: str) -> tuple[str, str, list[str]]: class TargetPath(Path, PureDissectPath): - __slots__ = ("_entry",) + __slots__ = ("_direntry", "_entry") @classmethod def _unsupported_msg(cls, attribute: str) -> str: return f"{cls.__name__}.{attribute} is unsupported" def get(self) -> FilesystemEntry: - try: - return self._entry - except AttributeError: - self._entry = self._fs.get(str(self)) - return self._entry + """Return the :class:`FilesystemEntry` for this path.""" + if not hasattr(self, "_entry"): + self._entry = self._direntry.get() if hasattr(self, "_direntry") else self._fs.get(str(self)) + return self._entry def stat(self, *, follow_symlinks: bool = True) -> stat_result: """ @@ -228,7 +227,7 @@ def iterdir(self) -> Iterator[Self]: for entry in scandir_it: name = entry.name child_path = self.joinpath(name) - child_path._entry = entry + child_path._direntry = entry yield child_path def glob( diff --git a/dissect/target/helpers/compat/path_common.py b/dissect/target/helpers/compat/path_common.py index cc13db422f..79faf4857b 100644 --- a/dissect/target/helpers/compat/path_common.py +++ b/dissect/target/helpers/compat/path_common.py @@ -97,7 +97,7 @@ def isjunction(path: TargetPath) -> bool: entry = path.get() # Python's ntpath isjunction() only checks for mount point reparse tags - return isinstance(entry, NtfsFilesystemEntry) and entry.dereference().is_mount_point() + return isinstance(entry, NtfsFilesystemEntry) and entry.is_mount_point() # Join two paths, normalizing and eliminating any symbolic links diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index 4570072d4b..b76087b0e9 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -289,15 +289,15 @@ def walk_ext( ) -> Iterator[ tuple[list[filesystem.FilesystemEntry], list[filesystem.FilesystemEntry], list[filesystem.FilesystemEntry]] ]: - dirs = [] - files = [] + dirs: list[filesystem.FilesystemEntry] = [] + files: list[filesystem.FilesystemEntry] = [] try: for entry in path_entry.scandir(): if entry.is_dir(): - dirs.append(entry) + dirs.append(entry.get()) else: - files.append(entry) + files.append(entry.get()) except Exception as e: if onerror is not None and callable(onerror): e.entry = path_entry @@ -316,18 +316,18 @@ def walk_ext( yield [path_entry], dirs, files -def recurse(path_entry: filesystem.FilesystemEntry) -> Iterator[filesystem.FilesystemEntry]: - """Recursively walk the given :class:`FilesystemEntry`, yields :class:`FilesystemEntry` instances.""" - yield path_entry +def recurse(entry: filesystem.FilesystemEntry) -> Iterator[filesystem.FilesystemEntry]: + """Recursively walk the given :class:`FilesystemEntry`, yields :class:`DirEntry` instances.""" + yield entry - if not path_entry.is_dir(): + if not entry.is_dir(): return - for child_entry in path_entry.scandir(): - if child_entry.is_dir() and not child_entry.is_symlink(): - yield from recurse(child_entry) + for direntry in entry.scandir(): + if direntry.is_dir(follow_symlinks=False): + yield from recurse(direntry.get()) else: - yield child_entry + yield direntry.get() def glob_split(pattern: str, alt_separator: str = "") -> tuple[str, str]: @@ -424,7 +424,7 @@ def glob_ext1(direntry: filesystem.FilesystemEntry, pattern: str) -> Iterator[fi name = entry.name if case_sensitive else entry.name.lower() pattern = pattern if case_sensitive else pattern.lower() if fnmatch.fnmatch(name, pattern): - yield entry + yield entry.get() def glob_ext0(direntry: filesystem.FilesystemEntry, path: str) -> Iterator[filesystem.FilesystemEntry]: diff --git a/dissect/target/plugins/filesystem/unix/capability.py b/dissect/target/plugins/filesystem/unix/capability.py index 2214094fc3..1f686af6a0 100644 --- a/dissect/target/plugins/filesystem/unix/capability.py +++ b/dissect/target/plugins/filesystem/unix/capability.py @@ -105,7 +105,7 @@ def capability_binaries(self) -> Iterator[CapabilityRecord]: """ for entry in self.target.fs.recurse("/"): - if not entry.is_file() or entry.is_symlink(): + if not entry.is_file(follow_symlinks=False): continue yield from parse_entry(entry, self.target) diff --git a/dissect/target/plugins/filesystem/walkfs.py b/dissect/target/plugins/filesystem/walkfs.py index 089cce2374..d2cb236ec1 100644 --- a/dissect/target/plugins/filesystem/walkfs.py +++ b/dissect/target/plugins/filesystem/walkfs.py @@ -92,7 +92,6 @@ def walkfs(self, walkfs_path: str = "/", capability: bool = False) -> Iterator[F for entry in self.target.fs.recurse(walkfs_path): try: yield from generate_record(self.target, entry, capability) - except FileNotFoundError as e: # noqa: PERF203 self.target.log.warning("File not found: %s", entry) self.target.log.debug("", exc_info=e) diff --git a/dissect/target/tools/diff.py b/dissect/target/tools/diff.py index 1d58b08ae1..2c0c7545ef 100644 --- a/dissect/target/tools/diff.py +++ b/dissect/target/tools/diff.py @@ -158,21 +158,22 @@ def scandir(self, path: str) -> DirectoryDifferential: exists_as_directory_src = self.src_target.fs.exists(path) and self.src_target.fs.get(path).is_dir() exists_as_directory_dst = self.dst_target.fs.exists(path) and self.dst_target.fs.get(path).is_dir() + # TODO: Adjust the following code to deal more efficiently with DirEntry instead of FilesystemEntry if not (exists_as_directory_src and exists_as_directory_dst): if exists_as_directory_src: # Path only exists on src target, hence all entries can be considered 'deleted' - entries = list(self.src_target.fs.scandir(path)) + entries = [entry.get() for entry in self.src_target.fs.scandir(path)] return DirectoryDifferential(path, deleted=entries) if exists_as_directory_dst: # Path only exists on dst target, hence all entries can be considered 'created' - entries = list(self.dst_target.fs.scandir(path)) + entries = [entry.get() for entry in self.dst_target.fs.scandir(path)] return DirectoryDifferential(path, created=entries) raise ValueError(f"{path} is not a directory on either the source or destination target!") - src_target_entries = list(self.src_target.fs.scandir(path)) + src_target_entries = [entry.get() for entry in self.src_target.fs.scandir(path)] src_target_children_paths = {entry.path for entry in src_target_entries} - dst_target_entries = list(self.dst_target.fs.scandir(path)) + dst_target_entries = [entry.get() for entry in self.dst_target.fs.scandir(path)] dst_target_children_paths = {entry.path for entry in dst_target_entries} paths_only_on_src_target = src_target_children_paths - dst_target_children_paths diff --git a/tests/filesystems/test_cb.py b/tests/filesystems/test_cb.py index 121add812e..e9616a9d5b 100644 --- a/tests/filesystems/test_cb.py +++ b/tests/filesystems/test_cb.py @@ -73,9 +73,9 @@ def test_cb_filesystem_windows(monkeypatch: pytest.MonkeyPatch) -> None: assert not entries[0].is_symlink() with pytest.raises(NotADirectoryError): - entries[0].listdir() + entries[0].get().listdir() - entries[0].open() + entries[0].get().open() mock_session.get_raw_file.assert_called_with("c:\\windows\\system32\\myfile.txt") stat_result = entries[0].stat() diff --git a/tests/filesystems/test_exfat.py b/tests/filesystems/test_exfat.py index adbf6d412f..df1599855d 100644 --- a/tests/filesystems/test_exfat.py +++ b/tests/filesystems/test_exfat.py @@ -113,29 +113,6 @@ def test_filesystem_entry_get(exfat_fs: ExfatFilesystem, some_path: ExfatFilesys assert some_file.entry[0]._mock_name == "Mock_some_path_some_file" -def test_filesystem_entry__iterdir(exfat_fs: ExfatFilesystem, some_path: ExfatFilesystemEntry) -> None: - file_names = [] - file_entries = [] - - for entry_name, entry_file_tree in some_path._iterdir(): - file_names.append(entry_name) - file_entries.append(entry_file_tree[0]._mock_name) - - assert len(file_names) == 2 - assert "some_file" in file_names - assert "other_file" in file_names - assert "Mock_some_path_some_file" in file_entries - assert "Mock_some_path_other_file" in file_entries - - -def test_filesystem_entry__iterdir_raises( - exfat_fs: ExfatFilesystem, - other_file: ExfatFilesystemEntry, -) -> None: - with pytest.raises(NotADirectoryError): - list(other_file._iterdir()) - - def test_filesystem_entry_iterdir( exfat_fs: ExfatFilesystem, some_path: ExfatFilesystemEntry, diff --git a/tests/filesystems/test_smb.py b/tests/filesystems/test_smb.py index ba64e8cefe..1e60948da1 100644 --- a/tests/filesystems/test_smb.py +++ b/tests/filesystems/test_smb.py @@ -58,7 +58,7 @@ def test_smb_filesystem_windows(monkeypatch: pytest.MonkeyPatch) -> None: mock_conn.listPath.return_value = [mock_file] - entries = list(entry.scandir()) + entries = list(entry.listdir_ext()) mock_conn.listPath.assert_called_with("C$", "testdir/*") assert len(entries) == 1 diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index a77546d818..106ef1fb3b 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -413,6 +413,13 @@ def test_target_path_rglob(path_fs: VirtualFilesystem) -> None: ] assert list(path_fs.path("/some").rglob("*.TXT")) == [] assert list(path_fs.path("/some").rglob("*.csv")) == [] + assert list(map(str, path_fs.path("/").rglob("*.*"))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] with patch.object(path_fs, "case_sensitive", False): assert list(map(str, path_fs.path("/some").rglob("*.TXT"))) == [ @@ -476,8 +483,9 @@ def test_target_path_is_symlink(path_fs: VirtualFilesystem) -> None: def test_target_path_is_junction(path_fs: VirtualFilesystem) -> None: assert not path_fs.path("/some").is_junction() - mock_entry = Mock(spec=NtfsFilesystemEntry) - mock_entry.dereference.return_value.is_mount_point.return_value = True + mock_entry = Mock() + mock_entry.__class__ = NtfsFilesystemEntry + mock_entry.entry.is_mount_point.return_value = True path_fs.map_file_entry("/junction", mock_entry) assert path_fs.path("/junction").is_junction() @@ -584,6 +592,28 @@ def test_target_path_errors(path_fs: VirtualFilesystem) -> None: path_fs.path("some/file.txt/dir").stat() +def test_target_path_get(path_fs: VirtualFilesystem) -> None: + # Ensure that TargetPath.get() works as expected + p = path_fs.path("/some/file.txt") + assert not hasattr(p, "_entry") + + entry = p.get() + assert entry is p._entry + + p = next(path_fs.path("/some").iterdir()) + assert not hasattr(p, "_entry") + assert hasattr(p, "_direntry") + + entry = p.get() + assert entry is p._entry + + with ( + patch.object(path_fs, "get", side_effect=Exception("Test exception")), + pytest.raises(Exception, match="Test exception"), + ): + path_fs.path("/some/file.txt").get() + + def test_target_path_not_implemented(path_fs: VirtualFilesystem) -> None: # TargetPath can't do some things, such as write actions or stuff related to a "current" user or path # Ensure all those methods properly error diff --git a/tests/plugins/apps/browser/test_chrome.py b/tests/plugins/apps/browser/test_chrome.py index 44ec6541c7..89a6e2515f 100644 --- a/tests/plugins/apps/browser/test_chrome.py +++ b/tests/plugins/apps/browser/test_chrome.py @@ -382,16 +382,16 @@ def test_windows_chrome_cookies_dpapi(target_win_users_dpapi: Target, fs_win: Vi def test_chrome_windows_snapshots(target_win_users: Target, fs_win: VirtualFilesystem) -> None: - base_dirs = [ + base_dirs = ( "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Default", "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Profile 1", - ] - snapshot_dirs = [ + ) + snapshot_dirs = ( "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Snapshots\\116.0.5038.150\\Default", "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Snapshots\\119.0.7845.119\\Default", "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Snapshots\\116.0.5038.150\\Profile 1", "Users\\John\\AppData\\Local\\Google\\Chrome\\User Data\\Snapshots\\119.0.7845.119\\Profile 1", - ] + ) profile_dirs = base_dirs + snapshot_dirs for dir in profile_dirs: @@ -412,15 +412,14 @@ def test_chrome_windows_snapshots(target_win_users: Target, fs_win: VirtualFiles for records in records_list: assert {"chrome"} == {record.browser for record in records} - for base_dir in base_dirs: - base_path_records = [r for r in records if str(r.source.parent).endswith(base_dir)] + base_path_records = [r for r in records if str(r.source.parent).endswith(base_dirs)] - for snapshot_dir in snapshot_dirs: - # Retrieve records that are in the snapshot's directory. - snapshot_records = [r for r in records if str(r.source.parent).endswith(snapshot_dir)] + # Retrieve records that are in the snapshot's directory. + snapshot_records = [r for r in records if str(r.source.parent).endswith(snapshot_dirs)] # We map the same files in each of the snapshot directories. - assert len(base_path_records) == len(snapshot_records) + # We have two base directories and four snapshot directories, so we expect twice the amount of records. + assert len(base_path_records) == len(snapshot_records) // 2 def test_chrome_windows_11_decryption(target_win_11_users_dpapi: Target, fs_win: VirtualFilesystem) -> None: diff --git a/tests/plugins/apps/browser/test_edge.py b/tests/plugins/apps/browser/test_edge.py index 0c43d2a8ed..58978ab40f 100644 --- a/tests/plugins/apps/browser/test_edge.py +++ b/tests/plugins/apps/browser/test_edge.py @@ -166,14 +166,14 @@ def test_unix_edge_passwords_gnome_plugin(target_edge_unix: Target, fs_unix: Vir def test_edge_windows_snapshots(target_win_users: Target, fs_win: VirtualFilesystem) -> None: - base_dirs = [ + base_dirs = ( "Users\\John\\AppData\\Local\\Microsoft\\Edge\\User Data\\Default", "Users\\John\\AppData\\Local\\Microsoft\\Edge\\User Data\\Profile 1", - ] - snapshot_dirs = [ + ) + snapshot_dirs = ( "Users\\John\\AppData\\Local\\Microsoft\\Edge\\User Data\\Snapshots\\116.0.5038.150\\Default", "Users\\John\\AppData\\Local\\Microsoft\\Edge\\User Data\\Snapshots\\119.0.7845.119\\Default", - ] + ) profile_dirs = base_dirs + snapshot_dirs for dir in profile_dirs: @@ -194,12 +194,10 @@ def test_edge_windows_snapshots(target_win_users: Target, fs_win: VirtualFilesys for records in records_list: assert {"edge"} == {record.browser for record in records} - for base_dir in base_dirs: - base_path_records = [r for r in records if str(r.source.parent).endswith(base_dir)] + base_path_records = [r for r in records if str(r.source.parent).endswith(base_dirs)] - for snapshot_dir in snapshot_dirs: - # Retrieve records that are in the snapshot's directory. - snapshot_records = [r for r in records if str(r.source.parent).endswith(snapshot_dir)] + # Retrieve records that are in the snapshot's directory. + snapshot_records = [r for r in records if str(r.source.parent).endswith(snapshot_dirs)] # We map the same files in each of the snapshot directories. assert len(base_path_records) == len(snapshot_records)