diff --git a/.gitignore b/.gitignore index 2e944b3e83..56bb9673b1 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ __pycache__/ tests/_docs/api tests/_docs/build .tox/ +uv.lock \ No newline at end of file diff --git a/dissect/target/tools/fs.py b/dissect/target/tools/fs.py index 625d486233..7bdc2028a5 100644 --- a/dissect/target/tools/fs.py +++ b/dissect/target/tools/fs.py @@ -64,24 +64,47 @@ def walk(t: Target, path: TargetPath, args: argparse.Namespace) -> None: def cp(t: Target, path: TargetPath, args: argparse.Namespace) -> None: output = pathlib.Path(args.output).expanduser().resolve() + preserve_links = _preserve_links(args) - if path.is_file(): + if path.is_symlink() and preserve_links: + _extract_path(path, output.joinpath(path.name), preserve_links=True) + elif path.is_file(): _extract_path(path, output.joinpath(path.name)) elif path.is_dir(): for extract_path in path.rglob("*"): out_path = output.joinpath(str(extract_path.relative_to(path))) - _extract_path(extract_path, out_path) + _extract_path(extract_path, out_path, preserve_links=preserve_links) else: print(f"[!] Failed, unsuported file type: {path}") +def _preserve_links(args: argparse.Namespace) -> bool: + """Return True if symlinks should be preserved based on the given arguments. + + Symlink preservation is enabled by any of: + - ``-P`` / ``--no-dereference`` + - ``--preserve=links`` or ``--preserve=all`` + - ``-d`` (equivalent to ``--no-dereference --preserve=links``) + """ + if getattr(args, "no_dereference", False) or getattr(args, "d", False): + return True + preserve = getattr(args, "preserve", None) + if preserve is not None: + attrs = {a.strip() for a in preserve.split(",")} + unsupported = attrs - {"links", "all"} + if unsupported: + log.warning("Unsupported --preserve attributes (will be ignored): %s", ", ".join(sorted(unsupported))) + return "links" in attrs or "all" in attrs + return False + + def stat(t: Target, path: TargetPath, args: argparse.Namespace) -> None: if not path or not path.exists(): return print_stat(path, sys.stdout, args.dereference) -def _extract_path(path: TargetPath, output_path: pathlib.Path) -> None: +def _extract_path(path: TargetPath, output_path: pathlib.Path, preserve_links: bool = False) -> None: print(f"{path} -> {output_path}") out_dir = output_path if path.is_dir() else output_path.parent @@ -90,7 +113,10 @@ def _extract_path(path: TargetPath, output_path: pathlib.Path) -> None: if not out_dir.exists(): out_dir.mkdir(parents=True) - if path.is_file(): + if preserve_links and path.is_symlink(): + if sys.platform != "win32": + output_path.symlink_to(path.readlink()) + elif path.is_file(): with output_path.open("wb") as fh: shutil.copyfileobj(path.open(), fh) @@ -142,6 +168,25 @@ def main() -> int: parents=[baseparser], ) parser_cp.add_argument("-o", "--output", default=".", help="output directory") + parser_cp.add_argument( + "-P", + "--no-dereference", + action="store_true", + dest="no_dereference", + help="never follow symbolic links in SOURCE, preserve them as symlinks in the output (UNIX only)", + ) + parser_cp.add_argument( + "--preserve", + metavar="ATTR_LIST", + dest="preserve", + help="preserve the specified attributes (supported: links); --preserve=links preserves symlinks (UNIX only)", + ) + parser_cp.add_argument( + "-d", + action="store_true", + dest="d", + help="same as --no-dereference --preserve=links", + ) parser_cp.set_defaults(handler=cp) configure_generic_arguments(parser) diff --git a/tests/tools/test_fs.py b/tests/tools/test_fs.py index 248a844cf7..82649e6ce3 100644 --- a/tests/tools/test_fs.py +++ b/tests/tools/test_fs.py @@ -1,13 +1,14 @@ from __future__ import annotations import io +import sys as _sys from typing import TYPE_CHECKING from unittest.mock import Mock import pytest from dissect.target.filesystem import VirtualFile, VirtualFilesystem -from dissect.target.tools.fs import _extract_path, cp +from dissect.target.tools.fs import _extract_path, _preserve_links, cp from dissect.target.tools.fs import main as target_fs if TYPE_CHECKING: @@ -37,7 +38,8 @@ def test_target_fs( path: str, expected_files: int, tmp_path: Path, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch ) -> None: with monkeypatch.context() as m: - m.setattr("sys.argv", ["target-fs", "tests/_data/tools/info/image.tar", "cp", path, "-o", str(tmp_path)]) + m.setattr("sys.argv", [ + "target-fs", "tests/_data/tools/info/image.tar", "cp", path, "-o", str(tmp_path)]) target_fs() stdout, _ = capsys.readouterr() @@ -118,7 +120,8 @@ def test_cp_non_existing_file(vfs: VirtualFilesystem, tmp_path: Path) -> None: @pytest.mark.parametrize( "files", - [["dir/", "dir/test", "dir/subdirectory_1/", "dir/subdirectory_2/", "dir/subdirectory_3/subdirectory_4/"]], + [["dir/", "dir/test", "dir/subdirectory_1/", + "dir/subdirectory_2/", "dir/subdirectory_3/subdirectory_4/"]], ) def test_cp_subdirectories(vfs: VirtualFilesystem, files: list[str], tmp_path: Path) -> None: output_path = tmp_path / "out" @@ -132,3 +135,157 @@ def test_cp_subdirectories(vfs: VirtualFilesystem, files: list[str], tmp_path: P for directories in filesystem_files: assert output_path.joinpath(directories).exists() + + +def _symlink_args(**kwargs) -> Mock: + """Return a Mock args object with symlink-related flags set to their defaults.""" + args = Mock() + args.no_dereference = False + args.preserve = None + args.d = False + for k, v in kwargs.items(): + setattr(args, k, v) + return args + + +@pytest.mark.parametrize("files", [[]]) +@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows") +def test_cp_symlink_no_dereference(vfs: VirtualFilesystem, tmp_path: Path) -> None: + """Test that -P/--no-dereference preserves symlinks instead of copying target content.""" + vfs.map_file_entry("target_file", VirtualFile( + vfs, "target_file", io.BytesIO(b"content"))) + vfs.symlink("/target_file", "link_to_file") + + output_path = tmp_path / "out" + args = _symlink_args(output=str(output_path), no_dereference=True) + + cp(None, vfs.path("link_to_file"), args) + + result = output_path / "link_to_file" + assert result.is_symlink() + assert str(result.readlink()) == "/target_file" + + +@pytest.mark.parametrize("files", [[]]) +@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows") +def test_cp_symlink_preserve_links(vfs: VirtualFilesystem, tmp_path: Path) -> None: + """Test that --preserve=links preserves symlinks.""" + vfs.map_file_entry("target_file", VirtualFile( + vfs, "target_file", io.BytesIO(b"content"))) + vfs.symlink("/target_file", "link_to_file") + + output_path = tmp_path / "out" + args = _symlink_args(output=str(output_path), preserve="links") + + cp(None, vfs.path("link_to_file"), args) + + result = output_path / "link_to_file" + assert result.is_symlink() + assert str(result.readlink()) == "/target_file" + + +@pytest.mark.parametrize("files", [[]]) +@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows") +def test_cp_symlink_d_flag(vfs: VirtualFilesystem, tmp_path: Path) -> None: + """Test that -d (--no-dereference --preserve=links) preserves symlinks.""" + vfs.map_file_entry("target_file", VirtualFile( + vfs, "target_file", io.BytesIO(b"content"))) + vfs.symlink("/target_file", "link_to_file") + + output_path = tmp_path / "out" + args = _symlink_args(output=str(output_path), d=True) + + cp(None, vfs.path("link_to_file"), args) + + result = output_path / "link_to_file" + assert result.is_symlink() + assert str(result.readlink()) == "/target_file" + + +@pytest.mark.parametrize("files", [[]]) +@pytest.mark.skipif(_sys.platform == "win32", reason="symlink preservation not supported on Windows") +def test_cp_symlink_in_directory_no_dereference(vfs: VirtualFilesystem, tmp_path: Path) -> None: + """Test that -P preserves symlinks encountered during directory traversal.""" + vfs.makedirs("dir") + vfs.map_file_entry("dir/real_file", VirtualFile(vfs, + "dir/real_file", io.BytesIO(b"content"))) + vfs.symlink("/dir/real_file", "dir/link_to_file") + + output_path = tmp_path / "out" + args = _symlink_args(output=str(output_path), no_dereference=True) + + cp(None, vfs.path("dir"), args) + + assert (output_path / "link_to_file").is_symlink() + assert str((output_path / "link_to_file").readlink()) == "/dir/real_file" + + +@pytest.mark.parametrize("files", [[]]) +def test_cp_symlink_dereference_by_default(vfs: VirtualFilesystem, tmp_path: Path) -> None: + """Test that without any flags, symlinks are followed and target content is copied.""" + vfs.map_file_entry("target_file", VirtualFile( + vfs, "target_file", io.BytesIO(b"content"))) + vfs.symlink("/target_file", "link_to_file") + + output_path = tmp_path / "out" + args = _symlink_args(output=str(output_path)) + + cp(None, vfs.path("link_to_file"), args) + + result = output_path / "link_to_file" + assert result.exists() + assert not result.is_symlink() + + +@pytest.mark.parametrize( + ("no_dereference", "preserve", "d", "expected"), + [ + (True, None, False, True), + (False, "links", False, True), + (False, "all", False, True), + (False, "links,mode", False, True), + (False, None, True, True), + (False, None, False, False), + (False, "mode,ownership", False, False), + ], +) +def test_preserve_links( + no_dereference: bool, preserve: str | None, d: bool, expected: bool +) -> None: + args = Mock() + args.no_dereference = no_dereference + args.preserve = preserve + args.d = d + assert _preserve_links(args) == expected + + +@pytest.mark.parametrize( + ("preserve", "expected_warnings"), + [ + ("mode", {"mode"}), + ("ownership", {"ownership"}), + ("mode,ownership", {"mode", "ownership"}), + ("links,mode", {"mode"}), + ("links", set()), + ("all", set()), + ], +) +def test_preserve_links_warns_unsupported( + preserve: str, expected_warnings: set[str], caplog: pytest.LogCaptureFixture +) -> None: + args = Mock() + args.no_dereference = False + args.preserve = preserve + args.d = False + + import logging + + with caplog.at_level(logging.WARNING, logger="dissect.target.tools.fs"): + _preserve_links(args) + + if expected_warnings: + assert caplog.records, "Expected a warning but none was emitted" + warned_attrs = {attr for record in caplog.records for attr in record.message.split(": ", 1)[-1].split(", ")} + assert warned_attrs == expected_warnings + else: + assert not caplog.records, f"Expected no warnings but got: {[r.message for r in caplog.records]}"