Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ __pycache__/
tests/_docs/api
tests/_docs/build
.tox/
uv.lock
53 changes: 49 additions & 4 deletions dissect/target/tools/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,47 @@ def walk(t: Target, path: TargetPath, args: argparse.Namespace) -> None:

def cp(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
output = pathlib.Path(args.output).expanduser().resolve()
preserve_links = _preserve_links(args)

if path.is_file():
if path.is_symlink() and preserve_links:
_extract_path(path, output.joinpath(path.name), preserve_links=True)
elif path.is_file():
_extract_path(path, output.joinpath(path.name))
elif path.is_dir():
for extract_path in path.rglob("*"):
out_path = output.joinpath(str(extract_path.relative_to(path)))
_extract_path(extract_path, out_path)
_extract_path(extract_path, out_path, preserve_links=preserve_links)
else:
print(f"[!] Failed, unsuported file type: {path}")


def _preserve_links(args: argparse.Namespace) -> bool:
"""Return True if symlinks should be preserved based on the given arguments.

Symlink preservation is enabled by any of:
- ``-P`` / ``--no-dereference``
- ``--preserve=links`` or ``--preserve=all``
- ``-d`` (equivalent to ``--no-dereference --preserve=links``)
"""
if getattr(args, "no_dereference", False) or getattr(args, "d", False):
return True
preserve = getattr(args, "preserve", None)
if preserve is not None:
attrs = {a.strip() for a in preserve.split(",")}
unsupported = attrs - {"links", "all"}
if unsupported:
log.warning("Unsupported --preserve attributes (will be ignored): %s", ", ".join(sorted(unsupported)))
return "links" in attrs or "all" in attrs
return False


def stat(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
if not path or not path.exists():
return
print_stat(path, sys.stdout, args.dereference)


def _extract_path(path: TargetPath, output_path: pathlib.Path) -> None:
def _extract_path(path: TargetPath, output_path: pathlib.Path, preserve_links: bool = False) -> None:
print(f"{path} -> {output_path}")

out_dir = output_path if path.is_dir() else output_path.parent
Expand All @@ -90,7 +113,10 @@ def _extract_path(path: TargetPath, output_path: pathlib.Path) -> None:
if not out_dir.exists():
out_dir.mkdir(parents=True)

if path.is_file():
if preserve_links and path.is_symlink():
if sys.platform != "win32":
output_path.symlink_to(path.readlink())
elif path.is_file():
with output_path.open("wb") as fh:
shutil.copyfileobj(path.open(), fh)

Expand Down Expand Up @@ -142,6 +168,25 @@ def main() -> int:
parents=[baseparser],
)
parser_cp.add_argument("-o", "--output", default=".", help="output directory")
parser_cp.add_argument(
"-P",
"--no-dereference",
action="store_true",
dest="no_dereference",
help="never follow symbolic links in SOURCE, preserve them as symlinks in the output (UNIX only)",
)
parser_cp.add_argument(
"--preserve",
metavar="ATTR_LIST",
dest="preserve",
help="preserve the specified attributes (supported: links); --preserve=links preserves symlinks (UNIX only)",
)
parser_cp.add_argument(
"-d",
action="store_true",
dest="d",
help="same as --no-dereference --preserve=links",
)
parser_cp.set_defaults(handler=cp)
configure_generic_arguments(parser)

Expand Down
163 changes: 160 additions & 3 deletions tests/tools/test_fs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import annotations

import io
import sys as _sys
from typing import TYPE_CHECKING
from unittest.mock import Mock

import pytest

from dissect.target.filesystem import VirtualFile, VirtualFilesystem
from dissect.target.tools.fs import _extract_path, cp
from dissect.target.tools.fs import _extract_path, _preserve_links, cp
from dissect.target.tools.fs import main as target_fs

if TYPE_CHECKING:
Expand Down Expand Up @@ -37,7 +38,8 @@ def test_target_fs(
path: str, expected_files: int, tmp_path: Path, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
with monkeypatch.context() as m:
m.setattr("sys.argv", ["target-fs", "tests/_data/tools/info/image.tar", "cp", path, "-o", str(tmp_path)])
m.setattr("sys.argv", [
"target-fs", "tests/_data/tools/info/image.tar", "cp", path, "-o", str(tmp_path)])

target_fs()
stdout, _ = capsys.readouterr()
Expand Down Expand Up @@ -118,7 +120,8 @@ def test_cp_non_existing_file(vfs: VirtualFilesystem, tmp_path: Path) -> None:

@pytest.mark.parametrize(
"files",
[["dir/", "dir/test", "dir/subdirectory_1/", "dir/subdirectory_2/", "dir/subdirectory_3/subdirectory_4/"]],
[["dir/", "dir/test", "dir/subdirectory_1/",
"dir/subdirectory_2/", "dir/subdirectory_3/subdirectory_4/"]],
)
def test_cp_subdirectories(vfs: VirtualFilesystem, files: list[str], tmp_path: Path) -> None:
output_path = tmp_path / "out"
Expand All @@ -132,3 +135,157 @@ def test_cp_subdirectories(vfs: VirtualFilesystem, files: list[str], tmp_path: P

for directories in filesystem_files:
assert output_path.joinpath(directories).exists()


def _symlink_args(**kwargs) -> Mock:
"""Return a Mock args object with symlink-related flags set to their defaults."""
args = Mock()
args.no_dereference = False
args.preserve = None
args.d = False
for k, v in kwargs.items():
setattr(args, k, v)
return args


@pytest.mark.parametrize("files", [[]])
@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows")
def test_cp_symlink_no_dereference(vfs: VirtualFilesystem, tmp_path: Path) -> None:
"""Test that -P/--no-dereference preserves symlinks instead of copying target content."""
vfs.map_file_entry("target_file", VirtualFile(
vfs, "target_file", io.BytesIO(b"content")))
vfs.symlink("/target_file", "link_to_file")

output_path = tmp_path / "out"
args = _symlink_args(output=str(output_path), no_dereference=True)

cp(None, vfs.path("link_to_file"), args)

result = output_path / "link_to_file"
assert result.is_symlink()
assert str(result.readlink()) == "/target_file"


@pytest.mark.parametrize("files", [[]])
@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows")
def test_cp_symlink_preserve_links(vfs: VirtualFilesystem, tmp_path: Path) -> None:
"""Test that --preserve=links preserves symlinks."""
vfs.map_file_entry("target_file", VirtualFile(
vfs, "target_file", io.BytesIO(b"content")))
vfs.symlink("/target_file", "link_to_file")

output_path = tmp_path / "out"
args = _symlink_args(output=str(output_path), preserve="links")

cp(None, vfs.path("link_to_file"), args)

result = output_path / "link_to_file"
assert result.is_symlink()
assert str(result.readlink()) == "/target_file"


@pytest.mark.parametrize("files", [[]])
@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows")
def test_cp_symlink_d_flag(vfs: VirtualFilesystem, tmp_path: Path) -> None:
"""Test that -d (--no-dereference --preserve=links) preserves symlinks."""
vfs.map_file_entry("target_file", VirtualFile(
vfs, "target_file", io.BytesIO(b"content")))
vfs.symlink("/target_file", "link_to_file")

output_path = tmp_path / "out"
args = _symlink_args(output=str(output_path), d=True)

cp(None, vfs.path("link_to_file"), args)

result = output_path / "link_to_file"
assert result.is_symlink()
assert str(result.readlink()) == "/target_file"


@pytest.mark.parametrize("files", [[]])
@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows")
def test_cp_symlink_in_directory_no_dereference(vfs: VirtualFilesystem, tmp_path: Path) -> None:
"""Test that -P preserves symlinks encountered during directory traversal."""
vfs.makedirs("dir")
vfs.map_file_entry("dir/real_file", VirtualFile(vfs,
"dir/real_file", io.BytesIO(b"content")))
vfs.symlink("/dir/real_file", "dir/link_to_file")

output_path = tmp_path / "out"
args = _symlink_args(output=str(output_path), no_dereference=True)

cp(None, vfs.path("dir"), args)

assert (output_path / "link_to_file").is_symlink()
assert str((output_path / "link_to_file").readlink()) == "/dir/real_file"


@pytest.mark.parametrize("files", [[]])
def test_cp_symlink_dereference_by_default(vfs: VirtualFilesystem, tmp_path: Path) -> None:
"""Test that without any flags, symlinks are followed and target content is copied."""
vfs.map_file_entry("target_file", VirtualFile(
vfs, "target_file", io.BytesIO(b"content")))
vfs.symlink("/target_file", "link_to_file")

output_path = tmp_path / "out"
args = _symlink_args(output=str(output_path))

cp(None, vfs.path("link_to_file"), args)

result = output_path / "link_to_file"
assert result.exists()
assert not result.is_symlink()


@pytest.mark.parametrize(
("no_dereference", "preserve", "d", "expected"),
[
(True, None, False, True),
(False, "links", False, True),
(False, "all", False, True),
(False, "links,mode", False, True),
(False, None, True, True),
(False, None, False, False),
(False, "mode,ownership", False, False),
],
)
def test_preserve_links(
no_dereference: bool, preserve: str | None, d: bool, expected: bool
) -> None:
args = Mock()
args.no_dereference = no_dereference
args.preserve = preserve
args.d = d
assert _preserve_links(args) == expected


@pytest.mark.parametrize(
("preserve", "expected_warnings"),
[
("mode", {"mode"}),
("ownership", {"ownership"}),
("mode,ownership", {"mode", "ownership"}),
("links,mode", {"mode"}),
("links", set()),
("all", set()),
],
)
def test_preserve_links_warns_unsupported(
preserve: str, expected_warnings: set[str], caplog: pytest.LogCaptureFixture
) -> None:
args = Mock()
args.no_dereference = False
args.preserve = preserve
args.d = False

import logging

with caplog.at_level(logging.WARNING, logger="dissect.target.tools.fs"):
_preserve_links(args)

if expected_warnings:
assert caplog.records, "Expected a warning but none was emitted"
warned_attrs = {attr for record in caplog.records for attr in record.message.split(": ", 1)[-1].split(", ")}
assert warned_attrs == expected_warnings
else:
assert not caplog.records, f"Expected no warnings but got: {[r.message for r in caplog.records]}"