Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dfetch/manifest/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import re
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import IO, Any

import yaml
Expand Down Expand Up @@ -395,6 +395,15 @@ def validate_project_name(self, name: str) -> None:
@staticmethod
def validate_destination(dst: str) -> None:
"""Raise ValueError if *dst* is not a safe manifest destination path."""
if (
PurePosixPath(dst).is_absolute()
or PureWindowsPath(dst).is_absolute()
or bool(PureWindowsPath(dst).anchor)
):
raise ValueError(
f"Destination '{dst}' is an absolute path. "
"Paths must be relative to the manifest directory."
)
if any(part == ".." for part in Path(dst).parts):
raise ValueError(
f"Destination '{dst}' contains '..'. "
Expand Down
5 changes: 2 additions & 3 deletions dfetch/project/svnsubproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def _remove_ignored_files(self) -> None:
def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]:
"""Get the revision of the remote and place it at the local path."""
branch, branch_path, revision = self._determine_what_to_fetch(version)
rev_arg = f"--revision {revision}" if revision else ""

complete_path = "/".join(
filter(None, [self.remote, branch_path.strip(), self.source])
Expand All @@ -129,7 +128,7 @@ def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]:

complete_path, file_pattern = self._parse_file_pattern(complete_path)

SvnRepo.export(complete_path, rev_arg, self.local_path)
SvnRepo.export(complete_path, revision, self.local_path)

if file_pattern:
for file in find_non_matching_files(self.local_path, (file_pattern,)):
Expand All @@ -148,7 +147,7 @@ def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]:
if os.path.isdir(self.local_path)
else os.path.dirname(self.local_path)
)
SvnRepo.export(f"{root_branch_path}/{license_files[0]}", rev_arg, dest)
SvnRepo.export(f"{root_branch_path}/{license_files[0]}", revision, dest)

if self.ignore:
self._remove_ignored_files()
Expand Down
54 changes: 50 additions & 4 deletions dfetch/vcs/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import pathlib
import shutil
import stat
import sys
import tarfile
import tempfile
Expand Down Expand Up @@ -353,16 +354,63 @@ def check_zip_members(zf: zipfile.ZipFile) -> list[zipfile.ZipInfo]:

Raises:
RuntimeError: When any member contains an absolute path, a ``..``
component, or when the archive exceeds the size/count limits.
component, a symlink with an unsafe target, or when the archive
exceeds the size/count limits.
"""
members = zf.infolist()
ArchiveLocalRepo._check_archive_limits(
len(members), sum(info.file_size for info in members)
)
for info in members:
ArchiveLocalRepo._check_archive_member_path(info.filename)
ArchiveLocalRepo._check_zip_member_type(info, zf)
return members

@staticmethod
def _is_unsafe_symlink_target(target: str) -> bool:
r"""Return *True* when *target* is an unsafe symlink destination.

A target is unsafe if it is absolute or contains ``..`` components
under either POSIX or Windows path semantics, so that backslash-based
traversals like ``..\\..\\evil`` are caught on any host OS.
"""
posix = pathlib.PurePosixPath(target)
win = pathlib.PureWindowsPath(target)
return (
posix.is_absolute()
or bool(win.anchor)
or any(part == ".." for part in posix.parts)
or any(part == ".." for part in win.parts)
)

@staticmethod
def _check_zip_member_type(info: zipfile.ZipInfo, zf: zipfile.ZipFile) -> None:
"""Reject dangerous ZIP member types (mirrors :meth:`_check_tar_member_type`).

Detects Unix symlinks encoded in the ``external_attr`` high word and
validates their targets with the same rules applied to TAR symlinks:
absolute targets and targets containing ``..`` are rejected.

Raises:
RuntimeError: When *info* is a symlink with an unsafe target.
"""
max_symlink_target = 4096
unix_mode = info.external_attr >> 16
if stat.S_ISLNK(unix_mode):
with zf.open(info) as member_file:
raw = member_file.read(max_symlink_target + 1)
if len(raw) > max_symlink_target:
raise RuntimeError(
f"Archive contains a symlink with an oversized target: "
f"{info.filename!r}"
)
target = raw.decode(errors="replace")
if ArchiveLocalRepo._is_unsafe_symlink_target(target):
raise RuntimeError(
f"Archive contains a symlink with an unsafe target: "
f"{info.filename!r} -> {target!r}"
)

@staticmethod
def _check_tar_member_type(member: tarfile.TarInfo) -> None:
"""Reject dangerous TAR member types that could harm the host system.
Expand All @@ -389,9 +437,7 @@ def _check_tar_member_type(member: tarfile.TarInfo) -> None:
"""
if member.issym():
target = member.linkname
if os.path.isabs(target) or any(
part == ".." for part in pathlib.PurePosixPath(target).parts
):
if ArchiveLocalRepo._is_unsafe_symlink_target(target):
raise RuntimeError(
f"Archive contains a symlink with an unsafe target: "
f"{member.name!r} -> {target!r}"
Expand Down
16 changes: 14 additions & 2 deletions dfetch/vcs/svn.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,23 @@ def untracked_files(path: str, ignore: Sequence[str]) -> list[str]:

@staticmethod
def export(url: str, rev: str = "", dst: str = ".") -> None:
"""Export the given revision from url to destination."""
"""Export the given revision from url to destination.

Args:
url: Repository URL to export from.
rev: Bare revision number (digits only) or empty string for HEAD.
Must not include flag names such as ``--revision``.
dst: Local destination path.

Raises:
ValueError: If *rev* is non-empty and contains non-digit characters.
"""
if rev and not rev.isdigit():
raise ValueError(f"SVN revision must be digits only, got: {rev!r}")
run_on_cmdline(
logger,
["svn", "export", "--non-interactive", "--force"]
+ (rev.split(" ") if rev else [])
+ (["--revision", rev] if rev else [])
+ [url, dst],
)

Expand Down
47 changes: 47 additions & 0 deletions tests/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import os
import pathlib
import stat as _stat
import tarfile
import tempfile
import zipfile
Expand Down Expand Up @@ -136,6 +137,44 @@ def test_check_zip_members_absolute():
_check_zip_members(zf)


def _make_zip_with_symlink(link_name: str, link_target: str) -> zipfile.ZipFile:
"""Return an in-memory ZIP whose sole member is a Unix symlink."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
info = zipfile.ZipInfo(link_name)
info.external_attr = (_stat.S_IFLNK | 0o777) << 16
zf.writestr(info, link_target) # content == symlink target
buf.seek(0)
return zipfile.ZipFile(buf)


def test_check_zip_members_symlink_absolute_target():
"""ZIP symlink pointing to an absolute path must be rejected."""
zf = _make_zip_with_symlink("link", "/etc/passwd")
with pytest.raises(RuntimeError, match="symlink"):
_check_zip_members(zf)


def test_check_zip_members_symlink_dotdot_target():
"""ZIP symlink pointing outside extraction dir via .. must be rejected."""
zf = _make_zip_with_symlink("link", "../../etc/shadow")
with pytest.raises(RuntimeError, match="symlink"):
_check_zip_members(zf)


def test_check_zip_members_symlink_windows_dotdot_target():
"""ZIP symlink with Windows-style backslash traversal must be rejected."""
zf = _make_zip_with_symlink("link", "..\\..\\evil")
with pytest.raises(RuntimeError, match="symlink"):
_check_zip_members(zf)


def test_check_zip_members_symlink_safe_relative():
"""ZIP symlink with a safe relative target must be accepted (mirrors TAR behaviour)."""
zf = _make_zip_with_symlink("link", "relative/safe/target")
_check_zip_members(zf) # must NOT raise


# ---------------------------------------------------------------------------
# _check_tar_members
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -236,6 +275,14 @@ def test_check_tar_member_type_dotdot_symlink():
_check_tar_member_type(member)


def test_check_tar_member_type_windows_dotdot_symlink():
"""TAR symlink with Windows-style backslash traversal must be rejected."""
tf = _make_tar_with_member(lambda t: _add_symlink(t, "link", "..\\..\\evil"))
member = tf.getmembers()[0]
with pytest.raises(RuntimeError, match="unsafe target"):
_check_tar_member_type(member)


# ---------------------------------------------------------------------------
# _check_tar_member_type — hardlink validation
# ---------------------------------------------------------------------------
Expand Down
29 changes: 29 additions & 0 deletions tests/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,32 @@ def test_get_manifest_location(name, manifest, project_name, result) -> None:
manifest.find_name_in_manifest(project_name)
else:
assert manifest.find_name_in_manifest(project_name) == result


# ---------------------------------------------------------------------------
# validate_destination – security: absolute paths must be rejected
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
"dst",
[
"/etc/passwd",
"/tmp/evil",
"C:/Windows/System32",
"C:\\Windows\\System32",
"\\temp\\evil",
],
)
def test_validate_destination_rejects_absolute_paths(dst) -> None:
with pytest.raises(ValueError, match="absolute"):
Manifest.validate_destination(dst)


def test_validate_destination_rejects_dotdot() -> None:
with pytest.raises(ValueError):
Manifest.validate_destination("sub/../../../etc/passwd")


def test_validate_destination_accepts_relative() -> None:
Manifest.validate_destination("external/mylib") # must NOT raise
53 changes: 53 additions & 0 deletions tests/test_svn_vcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Unit tests for SvnRepo.export() argument safety."""

from unittest.mock import patch

import pytest

from dfetch.vcs.svn import SvnRepo


def test_export_with_revision_passes_correct_args():
"""export() must produce the exact expected SVN command."""
with patch("dfetch.vcs.svn.run_on_cmdline") as mock_run:
SvnRepo.export("svn://example.com/repo", rev="12345", dst="/tmp/out")
mock_run.assert_called_once()
assert mock_run.call_args[0][1] == [
"svn",
"export",
"--non-interactive",
"--force",
"--revision",
"12345",
"svn://example.com/repo",
"/tmp/out",
]


def test_export_without_revision_omits_revision_args():
"""export() with no revision must produce the exact expected SVN command."""
with patch("dfetch.vcs.svn.run_on_cmdline") as mock_run:
SvnRepo.export("svn://example.com/repo", dst="/tmp/out")
mock_run.assert_called_once()
assert mock_run.call_args[0][1] == [
"svn",
"export",
"--non-interactive",
"--force",
"svn://example.com/repo",
"/tmp/out",
]


def test_export_rejects_non_digit_revision():
"""Space-containing or flag-like revision strings must raise ValueError.

Before the fix, passing rev='--revision 12345' would be split on spaces and
inject '--revision' and '12345' as separate SVN arguments, allowing option
injection by any caller that bypasses the digit-only validation in
svnsubproject.py. The subprocess must never be invoked for an invalid rev.
"""
with patch("dfetch.vcs.svn.run_on_cmdline") as mock_run:
with pytest.raises(ValueError):
SvnRepo.export("svn://example.com/repo", rev="--revision 12345")
mock_run.assert_not_called()
Loading