Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ dist/
*.pyc
__pycache__/
.pytest_cache/
tests/docs/api
tests/docs/build
tests/_docs/api
tests/_docs/build
.tox/
20 changes: 20 additions & 0 deletions dissect/hypervisor/tools/vmtar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import tarfile

from dissect.hypervisor.util import vmtar


def main() -> None:
# We just want to run the main function of the tarfile module, but with our VisorTarFile and is_tarfile functions
type(tarfile.main)(
tarfile.main.__code__,
tarfile.main.__globals__
| {
"TarFile": vmtar.VisorTarFile,
"is_tarfile": vmtar.is_tarfile,
"open": vmtar.open,
},
)()


if __name__ == "__main__":
main()

Check warning on line 20 in dissect/hypervisor/tools/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/tools/vmtar.py#L20

Added line #L20 was not covered by tests
75 changes: 70 additions & 5 deletions dissect/hypervisor/util/vmtar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import struct
import tarfile
from io import BytesIO
from typing import BinaryIO, Final


class VisorTarInfo(tarfile.TarInfo):
Expand Down Expand Up @@ -49,9 +51,72 @@
return super()._proc_member(tarfile)


def VisorTarFile(*args, **kwargs) -> tarfile.TarFile:
return tarfile.TarFile(*args, **kwargs, tarinfo=VisorTarInfo)
class VisorTarFile(tarfile.TarFile):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs, tarinfo=VisorTarInfo)


def open(*args, **kwargs) -> tarfile.TarFile:
return tarfile.open(*args, **kwargs, tarinfo=VisorTarInfo)
@classmethod
def visoropen(cls, name: str, mode: str = "r", fileobj: BinaryIO | None = None, **kwargs) -> VisorTarFile:
"""Open a visor tar file for reading. Supports gzip and lzma compression."""
if mode not in ("r",):
raise tarfile.TarError("visor currently only supports read mode")

Check warning on line 62 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L62

Added line #L62 was not covered by tests

try:
from gzip import GzipFile
except ImportError:
raise tarfile.CompressionError("gzip module is not available") from None

Check warning on line 67 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L66-L67

Added lines #L66 - L67 were not covered by tests

try:
from lzma import LZMAError, LZMAFile
except ImportError:
raise tarfile.CompressionError("lzma module is not available") from None

Check warning on line 72 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L71-L72

Added lines #L71 - L72 were not covered by tests

compressed = False

try:
t = cls.taropen(name, mode, fileobj, **kwargs)
except Exception:
try:
fileobj = GzipFile(name, mode + "b", fileobj=fileobj)
except OSError as e:
if fileobj is not None and mode == "r":
raise tarfile.ReadError("not a visor file") from e
raise

Check warning on line 84 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L78-L84

Added lines #L78 - L84 were not covered by tests

try:
t = cls.taropen(name, mode, fileobj, **kwargs)
except Exception:
fileobj.seek(0)
fileobj = LZMAFile(fileobj or name, mode) # noqa: SIM115

Check warning on line 90 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L86-L90

Added lines #L86 - L90 were not covered by tests

try:
t = cls.taropen(name, mode, fileobj, **kwargs)
except (LZMAError, EOFError, OSError) as e:
fileobj.close()
if mode == "r":
raise tarfile.ReadError("not a visor file") from e
raise
except:
fileobj.close()
raise

Check warning on line 101 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L92-L101

Added lines #L92 - L101 were not covered by tests

compressed = True

Check warning on line 103 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L103

Added line #L103 was not covered by tests

# If we get here, we have a valid visor tar file
if fileobj is not None and compressed:
# Just read the entire file into memory, it's probably small
fileobj.seek(0)
fileobj = BytesIO(fileobj.read())

Check warning on line 109 in dissect/hypervisor/util/vmtar.py

View check run for this annotation

Codecov / codecov/patch

dissect/hypervisor/util/vmtar.py#L108-L109

Added lines #L108 - L109 were not covered by tests

t = cls.taropen(name, mode, fileobj, **kwargs)

t._extfileobj = False
return t

# Only allow opening visor tar files
OPEN_METH: Final[dict[str, str]] = {"visor": "visoropen"}


open = VisorTarFile.open

is_tarfile = type(tarfile.is_tarfile)(tarfile.is_tarfile.__code__, tarfile.is_tarfile.__globals__ | {"open": open})
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ dev = [

[project.scripts]
envelope-decrypt = "dissect.hypervisor.tools.envelope:main"
vmtar = "dissect.hypervisor.tools.vmtar:main"

[tool.ruff]
line-length = 120
required-version = ">=0.9.0"
required-version = ">=0.11.0"

[tool.ruff.format]
docstring-code-format = true
Expand Down Expand Up @@ -94,7 +95,7 @@ select = [
ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003"]

[tool.ruff.lint.per-file-ignores]
"tests/docs/**" = ["INP001"]
"tests/_docs/**" = ["INP001"]

[tool.ruff.lint.isort]
known-first-party = ["dissect.hypervisor"]
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions tests/docs/conf.py → tests/_docs/conf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
project = "dissect.hypervisor"

extensions = [
"autoapi.extension",
"sphinx.ext.autodoc",
Expand Down
File renamed without changes.
30 changes: 15 additions & 15 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,74 +26,74 @@ def open_file_gz(name: str, mode: str = "rb") -> Iterator[BinaryIO]:

@pytest.fixture
def encrypted_vmx() -> Iterator[BinaryIO]:
yield from open_file("data/encrypted.vmx")
yield from open_file("_data/descriptor/vmx/encrypted.vmx")


@pytest.fixture
def vmcx() -> Iterator[BinaryIO]:
yield from open_file("data/test.vmcx")
yield from open_file("_data/descriptor/hyperv/test.vmcx")


@pytest.fixture
def vmrs() -> Iterator[BinaryIO]:
yield from open_file("data/test.VMRS")
yield from open_file("_data/descriptor/hyperv/test.VMRS")


@pytest.fixture
def fixed_vhd() -> Iterator[BinaryIO]:
yield from open_file_gz("data/fixed.vhd.gz")
yield from open_file_gz("_data/disk/vhd/fixed.vhd.gz")


@pytest.fixture
def dynamic_vhd() -> Iterator[BinaryIO]:
yield from open_file_gz("data/dynamic.vhd.gz")
yield from open_file_gz("_data/disk/vhd/dynamic.vhd.gz")


@pytest.fixture
def fixed_vhdx() -> Iterator[BinaryIO]:
yield from open_file_gz("data/fixed.vhdx.gz")
yield from open_file_gz("_data/disk/vhdx/fixed.vhdx.gz")


@pytest.fixture
def dynamic_vhdx() -> Iterator[BinaryIO]:
yield from open_file_gz("data/dynamic.vhdx.gz")
yield from open_file_gz("_data/disk/vhdx/dynamic.vhdx.gz")


@pytest.fixture
def differencing_vhdx() -> Iterator[BinaryIO]:
yield from open_file_gz("data/differencing.avhdx.gz")
yield from open_file_gz("_data/disk/vhdx/differencing.avhdx.gz")


@pytest.fixture
def sesparse_vmdk() -> Iterator[BinaryIO]:
yield from open_file_gz("data/sesparse.vmdk.gz")
yield from open_file_gz("_data/disk/vmdk/sesparse.vmdk.gz")


@pytest.fixture
def plain_hdd() -> Iterator[str]:
return absolute_path("data/plain.hdd")
return absolute_path("_data/disk/hdd/plain.hdd")


@pytest.fixture
def expanding_hdd() -> Iterator[str]:
return absolute_path("data/expanding.hdd")
return absolute_path("_data/disk/hdd/expanding.hdd")


@pytest.fixture
def split_hdd() -> Iterator[str]:
return absolute_path("data/split.hdd")
return absolute_path("_data/disk/hdd/split.hdd")


@pytest.fixture
def envelope() -> Iterator[BinaryIO]:
yield from open_file("data/local.tgz.ve")
yield from open_file("_data/util/envelope/local.tgz.ve")


@pytest.fixture
def keystore() -> Iterator[TextIO]:
yield from open_file("data/encryption.info", "r")
yield from open_file("_data/util/envelope/encryption.info", "r")


@pytest.fixture
def vgz() -> Iterator[BinaryIO]:
yield from open_file("data/test.vgz")
yield from open_file("_data/util/vmtar/test.vgz")
Empty file added tests/descriptor/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file added tests/disk/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
27 changes: 0 additions & 27 deletions tests/test_vmtar.py

This file was deleted.

Empty file added tests/util/__init__.py
Empty file.
File renamed without changes.
80 changes: 80 additions & 0 deletions tests/util/test_vmtar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

from typing import TYPE_CHECKING, BinaryIO

from dissect.hypervisor.tools.vmtar import main as vmtar_main
from dissect.hypervisor.util import vmtar
from tests.conftest import absolute_path

if TYPE_CHECKING:
from pathlib import Path

import pytest


def test_vmtar(vgz: BinaryIO) -> None:
tar = vmtar.open(fileobj=vgz)

members = {member.name: member for member in tar.getmembers()}

# The test file has no textPgs/fixUpPgs
assert all(member.is_visor for member in members.values())
assert set(members.keys()) == {
"test/file1",
"test/file2",
"test/file3",
"test/subdir",
"test/subdir/file4",
}

assert tar.extractfile(members["test/file1"]).read() == (b"a" * 512) + b"\n"
assert tar.extractfile(members["test/file2"]).read() == (b"b" * 1024) + b"\n"
assert tar.extractfile(members["test/file3"]).read() == (b"c" * 2048) + b"\n"
assert tar.extractfile(members["test/subdir/file4"]).read() == (b"f" * 2048) + b"\n"


def test_vmtar_tool(tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None:
vgz_path = absolute_path("_data/util/vmtar/test.vgz")

with monkeypatch.context() as m:
m.setattr("sys.argv", ["vmtar", "-l", str(vgz_path)])

vmtar_main()

out, _ = capsys.readouterr()
assert out.splitlines() == [
"test/ ",
"test/file3 ",
"test/file2 ",
"test/subdir/ ",
"test/subdir/file4 ",
"test/file1 ",
]

with monkeypatch.context() as m:
m.setattr("sys.argv", ["vmtar", "-t", str(vgz_path)])

vmtar_main()

_, err = capsys.readouterr()
assert err.startswith("[<VisorTarInfo 'test'")

with monkeypatch.context() as m:
m.setattr("sys.argv", ["vmtar", "-e", str(vgz_path), str(tmp_path)])

vmtar_main()

for path in (
"test",
"test/file1",
"test/file2",
"test/file3",
"test/subdir",
"test/subdir/file4",
):
assert tmp_path.joinpath(path).exists()

assert tmp_path.joinpath("test/file1").read_text() == (b"a" * 512).decode() + "\n"
assert tmp_path.joinpath("test/file2").read_text() == (b"b" * 1024).decode() + "\n"
assert tmp_path.joinpath("test/file3").read_text() == (b"c" * 2048).decode() + "\n"
assert tmp_path.joinpath("test/subdir/file4").read_text() == (b"f" * 2048).decode() + "\n"
14 changes: 8 additions & 6 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ commands =
[testenv:fix]
package = skip
deps =
ruff==0.9.2
ruff==0.11.10
commands =
ruff format dissect tests
ruff check --fix dissect tests

[testenv:lint]
package = skip
deps =
ruff==0.9.2
ruff==0.11.10
vermin
commands =
ruff format --check dissect tests
ruff check dissect tests
vermin -t=3.9- --no-tips --lint dissect tests

Expand All @@ -55,12 +57,12 @@ deps =
sphinx-design
furo
commands =
make -C tests/docs clean
make -C tests/docs html
make -C tests/_docs clean
make -C tests/_docs html

[testenv:docs-linkcheck]
allowlist_externals = make
deps = {[testenv:docs-build]deps}
commands =
make -C tests/docs clean
make -C tests/docs linkcheck
make -C tests/_docs clean
make -C tests/_docs linkcheck