Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 145 additions & 67 deletions dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pathlib
import stat
from collections import defaultdict
from functools import cache
from typing import TYPE_CHECKING, Any, BinaryIO, Final

from dissect.target.exceptions import (
Expand All @@ -24,6 +25,8 @@
if TYPE_CHECKING:
from collections.abc import Callable, Iterator

from typing_extensions import Self

from dissect.target.target import Target

FILESYSTEMS: list[type[Filesystem]] = []
Expand Down Expand Up @@ -191,7 +194,7 @@ def iterdir(self, path: str) -> Iterator[str]:
"""
return self.get(path).iterdir()

def scandir(self, path: str) -> Iterator[FilesystemEntry]:
def scandir(self, path: str) -> Iterator[DirEntry]:
"""Iterate over the contents of a directory, return them as FilesystemEntry's.

Args:
Expand Down Expand Up @@ -222,7 +225,7 @@ def listdir_ext(self, path: str) -> list[FilesystemEntry]:
Returns:
A list of FilesystemEntry's.
"""
return list(self.scandir(path))
return [e.get() for e in self.scandir(path)]

def walk(
self,
Expand Down Expand Up @@ -251,7 +254,7 @@ def walk_ext(
topdown: bool = True,
onerror: Callable[[Exception], None] | None = None,
followlinks: bool = False,
) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]:
) -> Iterator[tuple[list[FilesystemEntry], list[DirEntry], list[DirEntry]]]:
"""Recursively walk a directory pointed to by ``path``, returning :class:`FilesystemEntry` of files
and directories.

Expand All @@ -266,7 +269,7 @@ def walk_ext(
"""
return self.get(path).walk_ext(topdown, onerror, followlinks)

def recurse(self, path: str) -> Iterator[FilesystemEntry]:
def recurse(self, path: str) -> Iterator[DirEntry]:
"""Recursively walk a directory and yield contents as :class:`FilesystemEntry`.

Does not follow symbolic links.
Expand Down Expand Up @@ -519,7 +522,7 @@ def __repr__(self) -> str:
def __str__(self) -> str:
return str(self.path)

def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry:
def _resolve(self, follow_symlinks: bool = True) -> Self:
"""Helper method to resolve symbolic links.

If ``follow_symlinks`` is ``False``, this function is effectively a no-op.
Expand All @@ -535,7 +538,7 @@ def _resolve(self, follow_symlinks: bool = True) -> FilesystemEntry:
return self.readlink_ext()
return self

def get(self, path: str) -> FilesystemEntry:
def get(self, path: str) -> Self:
"""Retrieve a :class:`FilesystemEntry` relative to this entry.

Args:
Expand All @@ -560,9 +563,10 @@ def iterdir(self) -> Iterator[str]:
Returns:
An iterator of directory entries as path strings.
"""
raise NotImplementedError
for entry in self.scandir():
yield entry.name

def scandir(self) -> Iterator[FilesystemEntry]:
def scandir(self) -> Iterator[DirEntry]:
"""Iterate over the contents of a directory, yields :class:`FilesystemEntry`.

Returns:
Expand All @@ -578,13 +582,13 @@ def listdir(self) -> list[str]:
"""
return list(self.iterdir())

def listdir_ext(self) -> list[FilesystemEntry]:
def listdir_ext(self) -> list[Self]:
"""List the contents of a directory as a list of :class:`FilesystemEntry`.

Returns:
A list of :class:`FilesystemEntry`.
"""
return list(self.scandir())
return [e.get() for e in self.scandir()]

def walk(
self,
Expand Down Expand Up @@ -615,7 +619,7 @@ def walk_ext(
topdown: bool = True,
onerror: Callable[[Exception], None] | None = None,
followlinks: bool = False,
) -> Iterator[tuple[list[FilesystemEntry], list[FilesystemEntry], list[FilesystemEntry]]]:
) -> Iterator[tuple[list[Self], list[Self], list[Self]]]:
"""Recursively walk a directory and yield its contents as :class:`FilesystemEntry` split in a tuple of
lists of files, directories and symlinks.

Expand All @@ -629,13 +633,13 @@ def walk_ext(
"""
yield from fsutil.walk_ext(self, topdown, onerror, followlinks)

def recurse(self) -> Iterator[FilesystemEntry]:
"""Recursively walk a directory and yield its contents as :class:`FilesystemEntry`.
def recurse(self) -> Iterator[DirEntry]:
"""Recursively walk a directory and yield its contents as :class:`DirEntry`.

Does not follow symbolic links.

Returns:
An iterator of :class:`FilesystemEntry`.
An iterator of :class:`DirEntry`.
"""
yield from fsutil.recurse(self)

Expand All @@ -651,7 +655,7 @@ def glob(self, pattern: str) -> Iterator[str]:
for entry in self.glob_ext(pattern):
yield entry.path

def glob_ext(self, pattern: str) -> Iterator[FilesystemEntry]:
def glob_ext(self, pattern: str) -> Iterator[Self]:
"""Iterate over the directory part of ``pattern``, returning entries matching
``pattern`` as :class:`FilesysmteEntry`.

Expand Down Expand Up @@ -746,7 +750,7 @@ def readlink(self) -> str:
The path the link points to."""
raise NotImplementedError

def readlink_ext(self) -> FilesystemEntry:
def readlink_ext(self) -> Self:
"""Read the link where this entry points to, return the resulting path as :class:`FilesystemEntry`.

If it is a symlink and returns the string that corresponds to that path.
Expand Down Expand Up @@ -841,14 +845,109 @@ def hash(self, algos: list[str] | list[Callable] | None = None) -> tuple[str]:
return hashutil.common(self.open())


class DirEntry:
"""Directory entry base class. Closely models ``os.DirEntry``.

Filesystem implementations are encouraged to subclass this class to provide efficient
implementations of the various methods.

Args:
fs: The filesystem the entry belongs to.
path: The path of the parent directory.
name: The name of the entry.
entry: The raw entry backing this directory entry.
"""

def __init__(self, fs: Filesystem, path: str, name: str, entry: Any):
self.fs = fs
"""The filesystem the entry belongs to."""
self.path = fsutil.join(path, name, alt_separator=self.fs.alt_separator)
"""The full path of the entry."""
self.name = name
"""The name of the entry."""
self.entry = entry
"""The raw entry backing this directory entry."""

self.stat = cache(self.stat)

def __fspath__(self) -> str:
return self.path

def __repr__(self) -> str:
return f"<DirEntry {self.name!r}>"

def get(self) -> FilesystemEntry:
"""Retrieve the :class:`FilesystemEntry` this directory entry points to.

Subclasses should override this method to provide an efficient implementation.
"""
return self.fs.get(self.path)

def is_dir(self, *, follow_symlinks: bool = True) -> bool:
"""Return whether this entry is a directory or a symbolic link pointing to a directory.

Subclasses should override this method to provide an efficient implementation.
"""
try:
return stat.S_ISDIR(self.stat(follow_symlinks=follow_symlinks).st_mode)
except FileNotFoundError:
return False

def is_file(self, *, follow_symlinks: bool = True) -> bool:
"""Return whether this entry is a file or a symbolic link pointing to a file.

Subclasses should override this method to provide an efficient implementation.
"""
try:
return stat.S_ISREG(self.stat(follow_symlinks=follow_symlinks).st_mode)
except FileNotFoundError:
return False

def is_symlink(self) -> bool:
"""Return whether this entry is a symbolic link.

Subclasses should override this method to provide an efficient implementation.
"""
return stat.S_ISLNK(self.stat(follow_symlinks=False).st_mode)

def is_junction(self) -> bool:
"""Return whether this entry is a junction (only valid for NTFS)."""
return False

def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry.

Subclasses should override this method to provide an efficient implementation.

Note that this may return slightly different information than a "full" stat on the full filesystem entry,
as in most cases this will generate a stat based on the information available in the directory entry only.
"""
return self.fs.stat(self.path, follow_symlinks=follow_symlinks)

def inode(self) -> int:
"""Return the inode number of this entry."""
return self.stat(follow_symlinks=False).st_ino


class VirtualDirEntry(DirEntry):
fs: VirtualFilesystem
entry: FilesystemEntry

def get(self) -> FilesystemEntry:
return self.entry

def stat(self, *, follow_symlinks: bool = True) -> fsutil.stat_result:
return self.entry.stat(follow_symlinks=follow_symlinks)


class VirtualDirectory(FilesystemEntry):
"""Virtual directory implementation. Backed by a dict."""

def __init__(self, fs: VirtualFilesystem, path: str):
super().__init__(fs, path, None)
self.up = None
self.top = None
self.entries = {}
self.top: FilesystemEntry | None = None
self.entries: dict[str, FilesystemEntry] = {}

def __getitem__(self, item: str) -> FilesystemEntry:
if not self.fs.case_sensitive:
Expand Down Expand Up @@ -879,25 +978,11 @@ def add(self, name: str, entry: FilesystemEntry) -> None:
def get(self, path: str) -> FilesystemEntry:
return self.fs.get(path, relentry=self)

def iterdir(self) -> Iterator[str]:
def scandir(self) -> Iterator[DirEntry]:
yielded = set()
for entry in self.entries:
yield entry
yielded.add(entry)

# self.top used to be a reference to a filesystem. This is now a reference to
# any filesystem entry, usually the root of a filesystem.
if self.top:
for entry in self.top.iterdir():
if entry in yielded or (not self.fs.case_sensitive and entry.lower() in yielded):
continue
yield entry

def scandir(self) -> Iterator[FilesystemEntry]:
yielded = set()
for entry in self.entries.values():
yield entry
yielded.add(entry.name)
for name, entry in self.entries.items():
yield VirtualDirEntry(self.fs, self.path, entry.name, entry)
yielded.add(name)

# self.top used to be a reference to a filesystem. This is now a reference to
# any filesystem entry, usually the root of a filesystem.
Expand Down Expand Up @@ -970,10 +1055,7 @@ def get(self, path: str) -> FilesystemEntry:
return self
raise NotADirectoryError(f"'{self.path}' is not a directory")

def iterdir(self) -> Iterator[str]:
raise NotADirectoryError(f"'{self.path}' is not a directory")

def scandir(self) -> Iterator[FilesystemEntry]:
def scandir(self) -> Iterator[DirEntry]:
raise NotADirectoryError(f"'{self.path}' is not a directory")

def open(self) -> BinaryIO:
Expand Down Expand Up @@ -1056,10 +1138,7 @@ def lattr(self) -> Any:
def get(self, path: str) -> FilesystemEntry:
return self.fs.get(path, self)

def iterdir(self) -> Iterator[str]:
yield from self.readlink_ext().iterdir()

def scandir(self) -> Iterator[FilesystemEntry]:
def scandir(self) -> Iterator[DirEntry]:
yield from self.readlink_ext().scandir()

def open(self) -> BinaryIO:
Expand Down Expand Up @@ -1517,6 +1596,14 @@ def __getattr__(self, attr: str) -> Any:
return object.__getattribute__(self, attr)


class LayerDirEntry(DirEntry):
fs: LayerFilesystem
entry: list[DirEntry]

def get(self) -> LayerFilesystemEntry:
return LayerFilesystemEntry(self.fs, self.path, [e.get() for e in self.entry])


class LayerFilesystemEntry(FilesystemEntry):
def __init__(self, fs: Filesystem, path: str, entry: FilesystemEntry):
super().__init__(fs, path, EntryList(entry))
Expand All @@ -1542,24 +1629,12 @@ def get(self, path: str) -> FilesystemEntry:
def open(self) -> BinaryIO:
return self._resolve()._exec("open")

def iterdir(self) -> Iterator[str]:
yielded = {".", ".."}
selfentry = self._resolve()
for fsentry in selfentry.entries:
for entry_name in fsentry.iterdir():
name = entry_name if selfentry.fs.case_sensitive else entry_name.lower()
if name in yielded:
continue

yield entry_name
yielded.add(name)

def scandir(self) -> Iterator[LayerFilesystemEntry]:
def scandir(self) -> Iterator[LayerDirEntry]:
# Every entry is actually a list of entries from the different
# overlaying FSes, of which each may implement a different function
# like .stat() or .open()
items = defaultdict(list)
selfentry = self._resolve()
selfentry: LayerFilesystemEntry = self._resolve()
for fsentry in selfentry.entries:
for entry in fsentry.scandir():
name = entry.name if selfentry.fs.case_sensitive else entry.name.lower()
Expand All @@ -1572,9 +1647,7 @@ def scandir(self) -> Iterator[LayerFilesystemEntry]:
# The filename for the first entry is taken. Note that in case of
# non case-sensitive FSes, the different entries from the
# overlaying FSes may have different casing of the name.
entry_name = entries[0].name
path = fsutil.join(selfentry.path, entry_name, alt_separator=selfentry.fs.alt_separator)
yield LayerFilesystemEntry(selfentry.fs, path, entries)
yield DirEntry(selfentry.fs, selfentry.path, entries[0].name, entries)

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
Expand Down Expand Up @@ -1627,6 +1700,15 @@ def get(self, path: str, relentry: LayerFilesystemEntry | None = None) -> RootFi
return entry


class RootDirEntry(LayerDirEntry):
fs: RootFilesystem

def get(self) -> RootFilesystemEntry:
entry = super().get()
entry.__class__ = RootFilesystemEntry
return entry


class RootFilesystemEntry(LayerFilesystemEntry):
fs: RootFilesystem

Expand All @@ -1640,14 +1722,10 @@ def open(self) -> BinaryIO:
self.fs.target.log.trace("%r::open()", self)
return super().open()

def iterdir(self) -> Iterator[str]:
self.fs.target.log.trace("%r::iterdir()", self)
yield from super().iterdir()

def scandir(self) -> Iterator[RootFilesystemEntry]:
def scandir(self) -> Iterator[DirEntry]:
self.fs.target.log.trace("%r::scandir()", self)
for entry in super().scandir():
entry.__class__ = RootFilesystemEntry
entry.__class__ = RootDirEntry
yield entry

def is_file(self, follow_symlinks: bool = True) -> bool:
Expand Down
Loading
Loading