From 64f40847b3edf4c643b91de702abd5e6e4f8593b Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 22 Jan 2025 11:38:17 +0100 Subject: [PATCH] Change linter to Ruff --- dissect/hypervisor/descriptor/c_hyperv.py | 2 + dissect/hypervisor/descriptor/hyperv.py | 28 ++-- dissect/hypervisor/descriptor/ovf.py | 14 +- dissect/hypervisor/descriptor/pvs.py | 11 +- dissect/hypervisor/descriptor/vbox.py | 11 +- dissect/hypervisor/descriptor/vmx.py | 104 ++++++++------- dissect/hypervisor/disk/c_hdd.py | 2 +- dissect/hypervisor/disk/c_qcow2.py | 5 +- dissect/hypervisor/disk/c_vdi.py | 2 + dissect/hypervisor/disk/c_vhd.py | 2 + dissect/hypervisor/disk/c_vhdx.py | 2 + dissect/hypervisor/disk/c_vmdk.py | 2 + dissect/hypervisor/disk/hdd.py | 26 ++-- dissect/hypervisor/disk/qcow2.py | 154 ++++++++++++---------- dissect/hypervisor/disk/vdi.py | 7 +- dissect/hypervisor/disk/vhd.py | 29 ++-- dissect/hypervisor/disk/vhdx.py | 40 +++--- dissect/hypervisor/disk/vmdk.py | 60 ++++----- dissect/hypervisor/tools/envelope.py | 6 +- dissect/hypervisor/util/envelope.py | 20 +-- dissect/hypervisor/util/vmtar.py | 23 ++-- pyproject.toml | 53 +++++++- tests/conftest.py | 27 ++-- tests/test_envelope.py | 9 +- tests/test_hdd.py | 4 +- tests/test_hyperv.py | 8 +- tests/test_ovf.py | 4 +- tests/test_pvs.py | 4 +- tests/test_vbox.py | 2 + tests/test_vhd.py | 12 +- tests/test_vhdx.py | 33 ++--- tests/test_vmdk.py | 12 +- tests/test_vmtar.py | 6 +- tests/test_vmx.py | 8 +- tox.ini | 21 +-- 35 files changed, 447 insertions(+), 306 deletions(-) diff --git a/dissect/hypervisor/descriptor/c_hyperv.py b/dissect/hypervisor/descriptor/c_hyperv.py index 02165cd..8cfa3d7 100644 --- a/dissect/hypervisor/descriptor/c_hyperv.py +++ b/dissect/hypervisor/descriptor/c_hyperv.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dissect.cstruct import cstruct hyperv_def = """ diff --git a/dissect/hypervisor/descriptor/hyperv.py b/dissect/hypervisor/descriptor/hyperv.py index b031b87..1411359 100644 --- a/dissect/hypervisor/descriptor/hyperv.py +++ b/dissect/hypervisor/descriptor/hyperv.py @@ -3,8 +3,7 @@ from __future__ import annotations import struct -from collections.abc import ItemsView, KeysView, ValuesView -from typing import BinaryIO, Optional, Union +from typing import TYPE_CHECKING, BinaryIO from dissect.util.stream import RangeStream @@ -16,6 +15,9 @@ ) from dissect.hypervisor.exceptions import InvalidSignature +if TYPE_CHECKING: + from collections.abc import ItemsView, KeysView, ValuesView + class HyperVFile: """HyperVFile implementation. @@ -278,7 +280,7 @@ def __repr__(self) -> str: return f"" @property - def parent(self) -> Optional[HyperVStorageKeyTableEntry]: + def parent(self) -> HyperVStorageKeyTableEntry | None: """Return the entry parent, if there is any. Requires that all key tables are loaded. @@ -333,8 +335,8 @@ def data(self) -> memoryview: file_object = self.get_file_object() # This memoryview has no purpose, only do it so the return value type is consistent return memoryview(file_object.read(size)) - else: - return self.raw[self.header.data_offset :] + + return self.raw[self.header.data_offset :] @property def key(self) -> str: @@ -343,7 +345,7 @@ def key(self) -> str: return self.raw.tobytes()[: self.header.data_offset - 1].decode("utf-8") @property - def value(self) -> Union[int, bytes, str]: + def value(self) -> int | bytes | str: """Return a Python native value for this entry.""" data = self.data @@ -369,6 +371,8 @@ def value(self) -> Union[int, bytes, str]: if self.type == KeyDataType.Bool: return struct.unpack(" int: """Return the total amount of data bytes, including the key name. @@ -427,10 +431,7 @@ def as_dict(self) -> dict: obj = {} for key, child in self.children.items(): - if child.type == KeyDataType.Node: - value = child.as_dict() - else: - value = child.value + value = child.as_dict() if child.type == KeyDataType.Node else child.value obj[key] = value @@ -466,13 +467,10 @@ def read(self, n: int = -1) -> bytes: if n is not None and n < -1: raise ValueError("invalid number of bytes to read") - if n == -1: - read_length = self.size - else: - read_length = min(n, self.size) + read_length = self.size if n == -1 else min(n, self.size) self.file.fh.seek(self.offset) return self.file.fh.read(read_length) - def open(self, size: Optional[int] = None) -> BinaryIO: + def open(self, size: int | None = None) -> BinaryIO: return RangeStream(self.file.fh, self.offset, size or self.size) diff --git a/dissect/hypervisor/descriptor/ovf.py b/dissect/hypervisor/descriptor/ovf.py index 0040918..9f3e30b 100644 --- a/dissect/hypervisor/descriptor/ovf.py +++ b/dissect/hypervisor/descriptor/ovf.py @@ -1,11 +1,16 @@ -from typing import Iterator, TextIO -from xml.etree.ElementTree import Element +from __future__ import annotations + +from typing import TYPE_CHECKING, Final, TextIO from defusedxml import ElementTree +if TYPE_CHECKING: + from collections.abc import Iterator + from xml.etree.ElementTree import Element + class OVF: - NS = { + NS: Final[dict[str, str]] = { "ovf": "http://schemas.dmtf.org/ovf/envelope/1", "rasd": "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/CIM_ResourceAllocationSettingData", } @@ -34,8 +39,7 @@ def disks(self) -> Iterator[str]: for disk in self.xml.findall(self.DISK_DRIVE_XPATH, self.NS): resource = disk.find("{{{rasd}}}HostResource".format(**self.NS)) xpath = resource.text - if xpath.startswith("ovf:"): - xpath = xpath[4:] + xpath = xpath.removeprefix("ovf:") if xpath.startswith("/disk/"): disk_ref = xpath.split("/")[-1] diff --git a/dissect/hypervisor/descriptor/pvs.py b/dissect/hypervisor/descriptor/pvs.py index 75008c0..c2db088 100644 --- a/dissect/hypervisor/descriptor/pvs.py +++ b/dissect/hypervisor/descriptor/pvs.py @@ -1,8 +1,13 @@ -from typing import IO, Iterator -from xml.etree.ElementTree import Element +from __future__ import annotations + +from typing import TYPE_CHECKING, TextIO from defusedxml import ElementTree +if TYPE_CHECKING: + from collections.abc import Iterator + from xml.etree.ElementTree import Element + class PVS: """Parallels VM settings file. @@ -11,7 +16,7 @@ class PVS: fh: The file-like object to a PVS file. """ - def __init__(self, fh: IO): + def __init__(self, fh: TextIO): self._xml: Element = ElementTree.fromstring(fh.read()) def disks(self) -> Iterator[str]: diff --git a/dissect/hypervisor/descriptor/vbox.py b/dissect/hypervisor/descriptor/vbox.py index 0711ed4..f79b208 100644 --- a/dissect/hypervisor/descriptor/vbox.py +++ b/dissect/hypervisor/descriptor/vbox.py @@ -1,13 +1,18 @@ -from typing import IO, Iterator -from xml.etree.ElementTree import Element +from __future__ import annotations + +from typing import TYPE_CHECKING, TextIO from defusedxml import ElementTree +if TYPE_CHECKING: + from collections.abc import Iterator + from xml.etree.ElementTree import Element + class VBox: VBOX_XML_NAMESPACE = "{http://www.virtualbox.org/}" - def __init__(self, fh: IO): + def __init__(self, fh: TextIO): self._xml: Element = ElementTree.fromstring(fh.read()) def disks(self) -> Iterator[str]: diff --git a/dissect/hypervisor/descriptor/vmx.py b/dissect/hypervisor/descriptor/vmx.py index 987a9b0..c51bae3 100644 --- a/dissect/hypervisor/descriptor/vmx.py +++ b/dissect/hypervisor/descriptor/vmx.py @@ -7,11 +7,10 @@ import hashlib import hmac import re -from typing import Dict, List from urllib.parse import unquote try: - import _pystandalone + import _pystandalone # type: ignore HAS_PYSTANDALONE = True except ImportError: @@ -44,8 +43,8 @@ class VMX: - def __init__(self, vm_settings: Dict[str, str]): - self.attr = vm_settings + def __init__(self, attr: dict[str, str]): + self.attr = attr @classmethod def parse(cls, string: str) -> VMX: @@ -56,19 +55,28 @@ def parse(cls, string: str) -> VMX: def encrypted(self) -> bool: """Return whether this VMX is encrypted. - Encrypted VMXs will have both a `encryption.keySafe` and `encryption.data` value. - The `encryption.keySafe` is a string encoded `KeySafe`, which is made up of key locators. + Encrypted VMXs will have both a ``encryption.keySafe`` and ``encryption.data`` value. + The ``encryption.keySafe`` is a string encoded ``KeySafe``, which is made up of key locators. For example: + + .. code-block:: none + vmware:key/list/(pair/(phrase/phrase_id/phrase_content,hmac,data),pair/(.../...,...,...)) - A KeySafe must be a list of Pairs. Each Pair has a wrapped key, an HMAC type and some encrypted data. + A ``KeySafe`` must be a list of ``Pairs``. Each ``Pair`` has a wrapped key, an HMAC type and encrypted data. It's implementation specific how to unwrap a key. E.g. a phrase is just PBKDF2. The unwrapped key - can be used to unlock the encrypted Pair data. This will contain the final encryption key to decrypt - the data in `encryption.data`. + can be used to unlock the encrypted ``Pair`` data. This will contain the final encryption key to decrypt + the data in ``encryption.data``. + + So, in summary, to unseal a ``KeySafe``: - So, in summary, to unseal a KeySafe: - Parse KeySafe -> iterate pairs -> unlock Pair -> unwrap key (e.g. Phrase) -> decrypt Pair data -> parse dict + - Parse ``KeySafe`` + - Iterate pairs + - Unlock ``Pair`` + - Unwrap key (e.g. ``Phrase``) + - Decrypt ``Pair`` data + - Parse dictionary The terms for unwrapping, unlocking and unsealing are taken from VMware. """ @@ -77,7 +85,7 @@ def encrypted(self) -> bool: def unlock_with_phrase(self, passphrase: str) -> None: """Unlock this VMX in-place with a passphrase if it's encrypted. - This will load the KeySafe from the current dictionary and attempt to recover the encryption key + This will load the ``KeySafe`` from the current dictionary and attempt to recover the encryption key from it using the given passphrase. This key is used to decrypt the encrypted VMX data. The dictionary is updated in-place with the encrypted VMX data. @@ -92,7 +100,7 @@ def unlock_with_phrase(self, passphrase: str) -> None: decrypted = _decrypt_hmac(key, encrypted, mac) self.attr.update(**_parse_dictionary(decrypted.decode())) - def disks(self) -> List[str]: + def disks(self) -> list[str]: """Return a list of paths to disk files""" dev_classes = ("scsi", "sata", "ide", "nvme") devices = {} @@ -129,7 +137,7 @@ def disks(self) -> List[str]: return sorted(disk_files) -def _parse_dictionary(string: str) -> Dict[str, str]: +def _parse_dictionary(string: str) -> dict[str, str]: """Parse a VMX dictionary.""" dictionary = {} @@ -148,11 +156,11 @@ def _parse_dictionary(string: str) -> Dict[str, str]: class KeySafe: - def __init__(self, locators: List[Pair]): + def __init__(self, locators: list[Pair]): self.locators = locators def unseal_with_phrase(self, passphrase: str) -> bytes: - """Unseal this KeySafe with a passphrase and return the decrypted key.""" + """Unseal this ``KeySafe`` with a passphrase and return the decrypted key.""" for locator in self.locators: if not locator.has_phrase(): continue @@ -170,7 +178,7 @@ def unseal_with_phrase(self, passphrase: str) -> bytes: @classmethod def from_text(cls, text: str) -> KeySafe: - """Parse a KeySafe from a string.""" + """Parse a ``KeySafe`` from a string.""" # Key safes are a list of key locators. It's a key locator string with a specific prefix identifier, _, remainder = text.partition("/") @@ -179,41 +187,41 @@ def from_text(cls, text: str) -> KeySafe: # First part must be a list of pairs locators = _parse_key_locator(remainder) - if not isinstance(locators, list) and not all([isinstance(member, Pair) for member in locators]): + if not isinstance(locators, list) and not all(isinstance(member, Pair) for member in locators): raise ValueError("Invalid KeySafe string, not a list of pairs") return KeySafe(locators) class Pair: - def __init__(self, wrapped_key, mac: str, data: bytes): + def __init__(self, wrapped_key: Phrase, mac: str, data: bytes): self.wrapped_key = wrapped_key self.mac = mac self.data = data - def __repr__(self): + def __repr__(self) -> str: return f"" def has_phrase(self) -> bool: - """Return whether this Pair is a Phrase pair.""" + """Return whether this ``Pair`` is a ``Phrase`` pair.""" return isinstance(self.wrapped_key, Phrase) def _unlock(self, key: bytes) -> bytes: - """Decrypt the data in this Pair.""" + """Decrypt the data in this ``Pair``.""" return _decrypt_hmac(key, self.data, self.mac) def unlock(self, *args, **kwargs) -> bytes: - """Helper method to unlock this Pair for various wrapped keys. + """Helper method to unlock this ``Pair`` for various wrapped keys. Currently only supports `Phrase`. """ if self.has_phrase(): return self.unlock_with_phrase(*args, **kwargs) - else: - raise TypeError(f"Unable to unlock {self.key}") + + raise TypeError(f"Unable to unlock {self.wrapped_key}") def unlock_with_phrase(self, passphrase: str) -> bytes: - """Unlock this Pair with a passphrase and return the decrypted data.""" + """Unlock this ``Pair`` with a passphrase and return the decrypted data.""" if not self.has_phrase(): raise TypeError("Pair doesn't have a phrase protected key") @@ -229,13 +237,13 @@ def __init__(self, id: str, pass2key: str, cipher: str, rounds: int, salt: bytes self.rounds = rounds self.salt = salt - def __repr__(self): + def __repr__(self) -> str: return f"" def unwrap(self, passphrase: str) -> bytes: """Unwrap/generate the encryption key for a given passphrase. - VMware calls this unwrapping, but really it's a KDF with the properties of this Phrase. + VMware calls this unwrapping, but really it's a KDF with the properties of this ``Phrase``. """ return hashlib.pbkdf2_hmac( PASS2KEY_MAP[self.pass2key], @@ -246,13 +254,13 @@ def unwrap(self, passphrase: str) -> bytes: ) -def _parse_key_locator(locator_string: str): +def _parse_key_locator(locator_string: str) -> Pair | Phrase | list[Pair | Phrase]: """Parse a key locator from a string. - Key locators are string formatted data structures with a forward slash (/) separator. Each component is - prefixed with a type, followed by that types' specific data. Values between separators are url encoded. + Key locators are string formatted data structures with a forward slash (``/``) separator. Each component is + prefixed with a type, followed by that types' specific data. Values between separators are URL encoded. - Interally called `KeyLocator`. + Interally called ``KeyLocator``. """ identifier, _, remainder = locator_string.partition("/") @@ -261,7 +269,8 @@ def _parse_key_locator(locator_string: str): # Comma separated list in between braces # list/(member,member) return [_parse_key_locator(member) for member in _split_list(remainder)] - elif identifier == "pair": + + if identifier == "pair": # Comma separated tuple with 3 members # pair/(key data,mac type,encrypted data) members = _split_list(remainder) @@ -270,7 +279,8 @@ def _parse_key_locator(locator_string: str): unquote(members[1]), base64.b64decode(unquote(members[2])), ) - elif identifier == "phrase": + + if identifier == "phrase": # Serialized crypto dict, prefixed with an identifier # phrase/encoded id/encoded dict phrase_id, _, phrase_data = remainder.partition("/") @@ -282,20 +292,19 @@ def _parse_key_locator(locator_string: str): int(crypto_dict["rounds"]), base64.b64decode(crypto_dict["salt"]), ) - else: - # rawkey, ldap, script, role, fqid - raise NotImplementedError(f"Not implemented keysafe identifier: {identifier}") + + # rawkey, ldap, script, role, fqid + raise NotImplementedError(f"Not implemented keysafe identifier: {identifier}") -def _split_list(list_string: str) -> List[str]: +def _split_list(value: str) -> list[str]: """Parse a key locator list from a string. Lists are wrapped by braces and separated by comma. They can contain nested lists/pairs, so we need to separate at the correct nest level. """ - match = re.match(r"\((.+)\)", list_string) - if not match: + if not (match := re.match(r"\((.+)\)", value)): raise ValueError("Invalid list string") contents = match.group(1) @@ -321,12 +330,12 @@ def _split_list(list_string: str) -> List[str]: return members -def _parse_crypto_dict(dict_string: str) -> Dict[str, str]: +def _parse_crypto_dict(dict_string: str) -> dict[str, str]: """Parse a crypto dict from a string. - Crypto dicts are encoded as `key=encoded_value:key=encoded_value`. + Crypto dicts are encoded as ``key=encoded_value:key=encoded_value``. - Internally called `CryptoDict`. + Internally called ``CryptoDict``. """ crypto_dict = {} @@ -360,7 +369,7 @@ def _decrypt_hmac(key: bytes, data: bytes, digest: str) -> bytes: return decrypted -def _create_cipher(key: bytes, iv: bytes): +def _create_cipher(key: bytes, iv: bytes) -> AES.CbcMode: """Create a cipher object. Dynamic based on the available crypto module. @@ -377,7 +386,8 @@ def _create_cipher(key: bytes, iv: bytes): raise ValueError(f"Invalid key size: {len(key)}") return _pystandalone.cipher(cipher, key, iv) - elif HAS_PYCRYPTODOME: + + if HAS_PYCRYPTODOME: return AES.new(key, AES.MODE_CBC, iv=iv) - else: - raise RuntimeError("No crypto module available") + + raise RuntimeError("No crypto module available") diff --git a/dissect/hypervisor/disk/c_hdd.py b/dissect/hypervisor/disk/c_hdd.py index afe7044..d3c60cd 100644 --- a/dissect/hypervisor/disk/c_hdd.py +++ b/dissect/hypervisor/disk/c_hdd.py @@ -1,7 +1,7 @@ # References: # - https://src.openvz.org/projects/OVZ/repos/ploop/browse/include/ploop1_image.h # - https://github.com/qemu/qemu/blob/master/docs/interop/parallels.txt - +from __future__ import annotations from dissect.cstruct import cstruct diff --git a/dissect/hypervisor/disk/c_qcow2.py b/dissect/hypervisor/disk/c_qcow2.py index dda10b0..a98664c 100644 --- a/dissect/hypervisor/disk/c_qcow2.py +++ b/dissect/hypervisor/disk/c_qcow2.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dissect.cstruct import cstruct qcow2_def = """ @@ -169,8 +171,9 @@ ) -def ctz(value, size=32): +def ctz(value: int, size: int = 32) -> int: """Count the number of zero bits in an integer of a given size.""" for i in range(size): if value & (1 << i): return i + return 0 diff --git a/dissect/hypervisor/disk/c_vdi.py b/dissect/hypervisor/disk/c_vdi.py index 2b3dace..e57c0d6 100644 --- a/dissect/hypervisor/disk/c_vdi.py +++ b/dissect/hypervisor/disk/c_vdi.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dissect.cstruct import cstruct # https://www.virtualbox.org/browser/vbox/trunk/src/VBox/Storage/VDICore.h diff --git a/dissect/hypervisor/disk/c_vhd.py b/dissect/hypervisor/disk/c_vhd.py index 2a1b434..0cf04e6 100644 --- a/dissect/hypervisor/disk/c_vhd.py +++ b/dissect/hypervisor/disk/c_vhd.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dissect.cstruct import cstruct vhd_def = """ diff --git a/dissect/hypervisor/disk/c_vhdx.py b/dissect/hypervisor/disk/c_vhdx.py index e44b604..8788037 100644 --- a/dissect/hypervisor/disk/c_vhdx.py +++ b/dissect/hypervisor/disk/c_vhdx.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from uuid import UUID from dissect.cstruct import cstruct diff --git a/dissect/hypervisor/disk/c_vmdk.py b/dissect/hypervisor/disk/c_vmdk.py index 7694bb7..07f3f77 100644 --- a/dissect/hypervisor/disk/c_vmdk.py +++ b/dissect/hypervisor/disk/c_vmdk.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import struct from dissect.cstruct import cstruct diff --git a/dissect/hypervisor/disk/hdd.py b/dissect/hypervisor/disk/hdd.py index a451d4a..212c137 100644 --- a/dissect/hypervisor/disk/hdd.py +++ b/dissect/hypervisor/disk/hdd.py @@ -4,9 +4,8 @@ from dataclasses import dataclass from functools import cached_property from pathlib import Path -from typing import BinaryIO, Iterator, Optional, Tuple, Union +from typing import TYPE_CHECKING, BinaryIO from uuid import UUID -from xml.etree.ElementTree import Element from defusedxml import ElementTree from dissect.util.stream import AlignedStream @@ -14,6 +13,10 @@ from dissect.hypervisor.disk.c_hdd import SECTOR_SIZE, c_hdd from dissect.hypervisor.exceptions import InvalidHeaderError +if TYPE_CHECKING: + from collections.abc import Iterator + from xml.etree.ElementTree import Element + DEFAULT_TOP_GUID = UUID("{5fbaabe3-6958-40ff-92a7-860e329aab41}") NULL_GUID = UUID("00000000-0000-0000-0000-000000000000") @@ -76,7 +79,7 @@ def _open_image(self, path: Path) -> BinaryIO: # If the path is relative, it's always relative to the HDD root return (root / path).open("rb") - def open(self, guid: Optional[Union[str, UUID]] = None) -> BinaryIO: + def open(self, guid: str | UUID | None = None) -> BinaryIO: """Open a stream for this HDD, optionally for a specific snapshot. If no snapshot GUID is provided, the "top" snapshot will be used. @@ -154,7 +157,7 @@ def from_xml(cls, element: Element) -> XMLEntry: @classmethod def _from_xml(cls, element: Element) -> XMLEntry: - raise NotImplementedError() + raise NotImplementedError @dataclass @@ -213,7 +216,7 @@ def _from_xml(cls, element: Element) -> Image: @dataclass class Snapshots(XMLEntry): - top_guid: Optional[UUID] + top_guid: UUID | None shots: list[Shot] @classmethod @@ -307,7 +310,7 @@ class HDS(AlignedStream): parent: Optional file-like object for the parent HDS file. """ - def __init__(self, fh: BinaryIO, parent: Optional[BinaryIO] = None): + def __init__(self, fh: BinaryIO, parent: BinaryIO | None = None): self.fh = fh self.parent = parent @@ -357,7 +360,7 @@ def _read(self, offset: int, length: int) -> bytes: return b"".join(result) - def _iter_runs(self, offset: int, length: int) -> Iterator[Tuple[int, int]]: + def _iter_runs(self, offset: int, length: int) -> Iterator[tuple[int, int]]: """Iterate optimized read runs for a given offset and read length. Args: @@ -374,12 +377,9 @@ def _iter_runs(self, offset: int, length: int) -> Iterator[Tuple[int, int]]: read_size = min(self.cluster_size - offset_in_cluster, length) bat_entry = bat[cluster_idx] - if bat_entry == 0: - # BAT entry of 0 means either a sparse or a parent read - # Use 0 to denote a sparse run for now to make calculations easier - read_offset = 0 - else: - read_offset = (bat_entry * self._bat_multiplier * SECTOR_SIZE) + offset_in_cluster + # BAT entry of 0 means either a sparse or a parent read + # Use 0 to denote a sparse run for now to make calculations easier + read_offset = 0 if bat_entry == 0 else bat_entry * self._bat_multiplier * SECTOR_SIZE + offset_in_cluster if run_offset is None: # First iteration diff --git a/dissect/hypervisor/disk/qcow2.py b/dissect/hypervisor/disk/qcow2.py index a66c58c..ce1ef91 100644 --- a/dissect/hypervisor/disk/qcow2.py +++ b/dissect/hypervisor/disk/qcow2.py @@ -1,11 +1,13 @@ # References: # - https://github.com/qemu/qemu/blob/master/block/qcow2.c # - https://github.com/qemu/qemu/blob/master/docs/interop/qcow2.txt +from __future__ import annotations import copy import zlib from functools import cached_property, lru_cache from io import BytesIO +from typing import TYPE_CHECKING, BinaryIO from dissect.util.stream import AlignedStream @@ -21,6 +23,9 @@ ) from dissect.hypervisor.exceptions import Error, InvalidHeaderError +if TYPE_CHECKING: + from collections.abc import Iterator + try: import zstandard as zstd @@ -45,7 +50,7 @@ class QCow2(AlignedStream): in all null bytes being read. """ - def __init__(self, fh, data_file=None, backing_file=None): + def __init__(self, fh: BinaryIO, data_file: BinaryIO | None = None, backing_file: BinaryIO | int | None = None): self.fh = fh self.header = c_qcow2.QCowHeader(fh) @@ -120,7 +125,7 @@ def __init__(self, fh, data_file=None, backing_file=None): super().__init__(self.header.size) - def _read_extensions(self): + def _read_extensions(self) -> None: start_offset = self.header.header_length end_offset = self.header.backing_file_offset or 1 << self.cluster_bits @@ -135,7 +140,8 @@ def _read_extensions(self): if ext.magic == c_qcow2.QCOW2_EXT_MAGIC_END: break - elif ext.magic == c_qcow2.QCOW2_EXT_MAGIC_BACKING_FORMAT: + + if ext.magic == c_qcow2.QCOW2_EXT_MAGIC_BACKING_FORMAT: self.backing_format = self.fh.read(ext.len).decode().upper() self.image_backing_format = self.backing_format.upper() elif ext.magic == c_qcow2.QCOW2_EXT_MAGIC_FEATURE_TABLE: @@ -153,7 +159,7 @@ def _read_extensions(self): offset += (ext.len + 7) & 0xFFFFFFF8 @cached_property - def snapshots(self): + def snapshots(self) -> list[QCow2Snapshot]: snapshots = [] offset = self.header.snapshots_offset @@ -164,46 +170,46 @@ def snapshots(self): return snapshots @cached_property - def l1_table(self): + def l1_table(self) -> list[int]: # L1 table is usually relatively small, it can be at most 32MB on PB or EB size disks self.fh.seek(self.header.l1_table_offset) return c_qcow2.uint64[self.header.l1_size](self.fh) - def l2_table(self, l2_offset): + def l2_table(self, l2_offset: int) -> L2Table: return L2Table(self, l2_offset) @property - def has_backing_file(self): + def has_backing_file(self) -> bool: return self.backing_file is not None @property - def has_data_file(self): + def has_data_file(self) -> bool: return self.data_file != self.fh @property - def has_subclusters(self): + def has_subclusters(self) -> bool: return bool(self.header.incompatible_features & c_qcow2.QCOW2_INCOMPAT_EXTL2) - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: result = [] - for sc_type, offset, run_offset, run_length in self._yield_runs(offset, length): + for sc_type, read_offset, run_offset, run_length in self._yield_runs(offset, length): unalloc_zeroed = sc_type in UNALLOCATED_SUBCLUSTER_TYPES and not self.has_backing_file if sc_type in ZERO_SUBCLUSTER_TYPES or unalloc_zeroed: result.append(b"\x00" * run_length) elif sc_type in UNALLOCATED_SUBCLUSTER_TYPES and self.has_backing_file: - self.backing_file.seek(offset) + self.backing_file.seek(read_offset) result.append(self.backing_file.read(run_length)) elif sc_type == QCow2SubclusterType.QCOW2_SUBCLUSTER_COMPRESSED: - result.append(self._read_compressed(run_offset, offset, run_length)) + result.append(self._read_compressed(run_offset, read_offset, run_length)) elif sc_type == QCow2SubclusterType.QCOW2_SUBCLUSTER_NORMAL: self.data_file.seek(run_offset) result.append(self.data_file.read(run_length)) return b"".join(result) - def _read_compressed(self, cluster_descriptor, offset, length): + def _read_compressed(self, cluster_descriptor: int, offset: int, length: int) -> bytes: offset_in_cluster = offset_into_cluster(self, offset) coffset = cluster_descriptor & self.cluster_offset_mask nb_csectors = ((cluster_descriptor >> self.csize_shift) & self.csize_mask) + 1 @@ -217,11 +223,12 @@ def _read_compressed(self, cluster_descriptor, offset, length): return decompressed[offset_in_cluster : offset_in_cluster + length] - def _decompress(self, buf): + def _decompress(self, buf: bytes) -> bytes: if self.compression_type == c_qcow2.QCOW2_COMPRESSION_TYPE_ZLIB: dctx = zlib.decompressobj(-12) return dctx.decompress(buf, self.cluster_size) - elif self.compression_type == c_qcow2.QCOW2_COMPRESSION_TYPE_ZSTD: + + if self.compression_type == c_qcow2.QCOW2_COMPRESSION_TYPE_ZSTD: result = [] dctx = zstd.ZstdDecompressor() @@ -232,10 +239,10 @@ def _decompress(self, buf): break result.append(chunk) return b"".join(result) - else: - raise Error(f"Invalid compression type: {self.compression_type}") - def _yield_runs(self, offset, length): + raise Error(f"Invalid compression type: {self.compression_type}") + + def _yield_runs(self, offset: int, length: int) -> Iterator[tuple[QCow2SubclusterType, int, int, int]]: # reference: qcow2_get_host_offset while length > 0: sc_type = None @@ -303,7 +310,7 @@ def _yield_runs(self, offset, length): class L2Table: """Convenience class for accessing the L2 table.""" - def __init__(self, qcow2, offset): + def __init__(self, qcow2: QCow2, offset: int): self.qcow2 = qcow2 self.offset = offset @@ -311,10 +318,10 @@ def __init__(self, qcow2, offset): self.qcow2.fh.seek(offset) self._table = c_qcow2.uint64[l2_table_size](self.qcow2.fh) - def entry(self, idx): + def entry(self, idx: int) -> int: return self._table[idx * self.qcow2._l2_entry_size // 8] - def bitmap(self, idx): + def bitmap(self, idx: int) -> int: if self.qcow2.has_subclusters: return self._table[(idx * self.qcow2._l2_entry_size // 8) + 1] return 0 @@ -323,7 +330,7 @@ def bitmap(self, idx): class QCow2Snapshot: """Wrapper class for snapshot table entries.""" - def __init__(self, qcow2, offset): + def __init__(self, qcow2: QCow2, offset: int): self.qcow2 = qcow2 self.offset = offset @@ -343,64 +350,65 @@ def __init__(self, qcow2, offset): self.entry_size = self.qcow2.fh.tell() - offset - def open(self): + def open(self) -> QCow2: disk = copy.copy(self.qcow2) disk.l1_table = self.l1_table disk.seek(0) return disk @cached_property - def l1_table(self): + def l1_table(self) -> list[int]: # L1 table is usually relatively small, it can be at most 32MB on PB or EB size disks self.qcow2.fh.seek(self.header.l1_table_offset) return c_qcow2.uint64[self.header.l1_size](self.qcow2.fh) -def offset_into_cluster(qcow2, offset): +def offset_into_cluster(qcow2: QCow2, offset: int) -> int: return offset & (qcow2.cluster_size - 1) -def offset_into_subcluster(qcow2, offset): +def offset_into_subcluster(qcow2: QCow2, offset: int) -> int: return offset & (qcow2.subcluster_size - 1) -def size_to_clusters(qcow2, size): +def size_to_clusters(qcow2: QCow2, size: int) -> int: return (size + (qcow2.cluster_size - 1)) >> qcow2.cluster_bits -def size_to_subclusters(qcow2, size): +def size_to_subclusters(qcow2: QCow2, size: int) -> int: return (size + (qcow2.subcluster_size - 1)) >> qcow2.subcluster_bits -def offset_to_l1_index(qcow2, offset): +def offset_to_l1_index(qcow2: QCow2, offset: int) -> int: return offset >> (qcow2.l2_bits + qcow2.cluster_bits) -def offset_to_l2_index(qcow2, offset): +def offset_to_l2_index(qcow2: QCow2, offset: int) -> int: return (offset >> qcow2.cluster_bits) & (qcow2.l2_size - 1) -def offset_to_sc_index(qcow2, offset): +def offset_to_sc_index(qcow2: QCow2, offset: int) -> int: return (offset >> qcow2.subcluster_bits) & (qcow2.subclusters_per_cluster - 1) -def get_cluster_type(qcow2, l2_entry): +def get_cluster_type(qcow2: QCow2, l2_entry: int) -> QCow2ClusterType: if l2_entry & c_qcow2.QCOW_OFLAG_COMPRESSED: return QCow2ClusterType.QCOW2_CLUSTER_COMPRESSED - elif (l2_entry & c_qcow2.QCOW_OFLAG_ZERO) and not qcow2.has_subclusters: + + if (l2_entry & c_qcow2.QCOW_OFLAG_ZERO) and not qcow2.has_subclusters: if l2_entry & c_qcow2.L2E_OFFSET_MASK: return QCow2ClusterType.QCOW2_CLUSTER_ZERO_ALLOC return QCow2ClusterType.QCOW2_CLUSTER_ZERO_PLAIN - elif not l2_entry & c_qcow2.L2E_OFFSET_MASK: + + if not l2_entry & c_qcow2.L2E_OFFSET_MASK: if qcow2.has_data_file and l2_entry & c_qcow2.QCOW_OFLAG_COPIED: return QCow2ClusterType.QCOW2_CLUSTER_NORMAL - else: - return QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED - else: - return QCow2ClusterType.QCOW2_CLUSTER_NORMAL + return QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED + return QCow2ClusterType.QCOW2_CLUSTER_NORMAL -def get_subcluster_type(qcow2, l2_entry, l2_bitmap, sc_index): + +def get_subcluster_type(qcow2: QCow2, l2_entry: int, l2_bitmap: int, sc_index: int) -> QCow2SubclusterType: c_type = get_cluster_type(qcow2, l2_entry) sc_alloc_mask = 1 << sc_index @@ -409,40 +417,40 @@ def get_subcluster_type(qcow2, l2_entry, l2_bitmap, sc_index): if qcow2.has_subclusters: if c_type == QCow2ClusterType.QCOW2_CLUSTER_COMPRESSED: return QCow2SubclusterType.QCOW2_SUBCLUSTER_COMPRESSED - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_NORMAL: + if c_type == QCow2ClusterType.QCOW2_CLUSTER_NORMAL: if (l2_bitmap >> 32) & l2_bitmap: return QCow2SubclusterType.QCOW2_SUBCLUSTER_INVALID - elif l2_bitmap & sc_zero_mask: # QCOW_OFLAG_SUB_ZERO(sc_index) + if l2_bitmap & sc_zero_mask: # QCOW_OFLAG_SUB_ZERO(sc_index) return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_ALLOC - elif l2_bitmap & sc_alloc_mask: # QCOW_OFLAG_SUB_ALLOC(sc_index) + if l2_bitmap & sc_alloc_mask: # QCOW_OFLAG_SUB_ALLOC(sc_index) return QCow2SubclusterType.QCOW2_SUBCLUSTER_NORMAL - else: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_UNALLOCATED_ALLOC - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_UNALLOCATED_ALLOC + if c_type == QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED: if l2_bitmap & ((1 << 32) - 1): return QCow2SubclusterType.QCOW2_SUBCLUSTER_INVALID - elif l2_bitmap & sc_zero_mask: # QCOW_OFLAG_SUB_ZERO(sc_index) + if l2_bitmap & sc_zero_mask: # QCOW_OFLAG_SUB_ZERO(sc_index) return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_PLAIN - else: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_UNALLOCATED_PLAIN - else: - raise Error(f"Invalid cluster type: {c_type}") - else: - if c_type == QCow2ClusterType.QCOW2_CLUSTER_COMPRESSED: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_COMPRESSED - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_ZERO_PLAIN: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_PLAIN - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_ZERO_ALLOC: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_ALLOC - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_NORMAL: - return QCow2SubclusterType.QCOW2_SUBCLUSTER_NORMAL - elif c_type == QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED: return QCow2SubclusterType.QCOW2_SUBCLUSTER_UNALLOCATED_PLAIN - else: - raise Error(f"Invalid cluster type: {c_type}") + raise Error(f"Invalid cluster type: {c_type}") -def get_subcluster_range_type(qcow2, l2_entry, l2_bitmap, sc_from): + if c_type == QCow2ClusterType.QCOW2_CLUSTER_COMPRESSED: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_COMPRESSED + if c_type == QCow2ClusterType.QCOW2_CLUSTER_ZERO_PLAIN: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_PLAIN + if c_type == QCow2ClusterType.QCOW2_CLUSTER_ZERO_ALLOC: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_ZERO_ALLOC + if c_type == QCow2ClusterType.QCOW2_CLUSTER_NORMAL: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_NORMAL + if c_type == QCow2ClusterType.QCOW2_CLUSTER_UNALLOCATED: + return QCow2SubclusterType.QCOW2_SUBCLUSTER_UNALLOCATED_PLAIN + + raise Error(f"Invalid cluster type: {c_type}") + + +def get_subcluster_range_type( + qcow2: QCow2, l2_entry: int, l2_bitmap: int, sc_from: int +) -> tuple[QCow2SubclusterType, int]: sc_type = get_subcluster_type(qcow2, l2_entry, l2_bitmap, sc_from) # No subclusters, so count the entire cluster @@ -452,21 +460,23 @@ def get_subcluster_range_type(qcow2, l2_entry, l2_bitmap, sc_from): sc_mask = (1 << sc_from) - 1 if sc_type == QCow2SubclusterType.QCOW2_SUBCLUSTER_NORMAL: val = l2_bitmap | sc_mask # QCOW_OFLAG_SUB_ALLOC_RANGE(0, sc_from) - return ctz(val, 32) - sc_from - elif sc_type in ZERO_SUBCLUSTER_TYPES: + return sc_type, ctz(val, 32) - sc_from + if sc_type in ZERO_SUBCLUSTER_TYPES: val = (l2_bitmap | sc_mask) >> 32 # QCOW_OFLAG_SUB_ZERO_RANGE(0, sc_from) - return ctz(val, 32) - sc_from - elif sc_type in UNALLOCATED_SUBCLUSTER_TYPES: + return sc_type, ctz(val, 32) - sc_from + if sc_type in UNALLOCATED_SUBCLUSTER_TYPES: # We need to mask it with a 64bit mask because Python flips the sign bit inv_mask = ~sc_mask & ((1 << 64) - 1) # ~QCOW_OFLAG_SUB_ALLOC_RANGE(0, sc_from) val = ((l2_bitmap >> 32) | l2_bitmap) & inv_mask - return ctz(val, 32) - sc_from - else: - raise Error(f"Invalid subcluster type: {sc_type}") + return sc_type, ctz(val, 32) - sc_from + + raise Error(f"Invalid subcluster type: {sc_type}") -def count_contiguous_subclusters(qcow2, nb_clusters, sc_index, l2_table, l2_index): +def count_contiguous_subclusters( + qcow2: QCow2, nb_clusters: int, sc_index: int, l2_table: L2Table, l2_index: int +) -> int: count = 0 expected_type = None expected_offset = None diff --git a/dissect/hypervisor/disk/vdi.py b/dissect/hypervisor/disk/vdi.py index 0a99301..7da92c3 100644 --- a/dissect/hypervisor/disk/vdi.py +++ b/dissect/hypervisor/disk/vdi.py @@ -1,4 +1,7 @@ +from __future__ import annotations + import array +from typing import BinaryIO from dissect.util.stream import AlignedStream @@ -7,7 +10,7 @@ class VDI(AlignedStream): - def __init__(self, fh, parent=None): + def __init__(self, fh: BinaryIO, parent: VDI | None = None): self.fh = fh self.parent = parent self.header = c_vdi.HeaderDescriptor(fh) @@ -32,7 +35,7 @@ def __init__(self, fh, parent=None): self.sector_size = self.header.SectorSize super().__init__(size=self.header.DiskSize) - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: block_idx, block_offset = divmod(offset, self.block_size) bytes_read = [] diff --git a/dissect/hypervisor/disk/vhd.py b/dissect/hypervisor/disk/vhd.py index b660892..d58e98b 100644 --- a/dissect/hypervisor/disk/vhd.py +++ b/dissect/hypervisor/disk/vhd.py @@ -1,13 +1,16 @@ +from __future__ import annotations + import io import struct from functools import lru_cache +from typing import BinaryIO from dissect.util.stream import AlignedStream from dissect.hypervisor.disk.c_vhd import SECTOR_SIZE, c_vhd -def read_footer(fh): +def read_footer(fh: BinaryIO) -> c_vhd.footer: fh.seek(-512, io.SEEK_END) footer = c_vhd.footer(fh) if not footer.features & 0x00000002: @@ -19,7 +22,7 @@ def read_footer(fh): class VHD(AlignedStream): # Note: split VHD files are currently unsupported. - def __init__(self, fh): + def __init__(self, fh: BinaryIO): self.fh = fh footer = read_footer(fh) @@ -31,7 +34,7 @@ def __init__(self, fh): super().__init__(self.disk.size) - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: sector = offset // SECTOR_SIZE count = (length + SECTOR_SIZE - 1) // SECTOR_SIZE @@ -39,23 +42,23 @@ def _read(self, offset, length): class Disk: - def __init__(self, fh, footer=None): + def __init__(self, fh: BinaryIO, footer: c_vhd.footer | None = None): self.fh = fh self.footer = footer if footer else read_footer(fh) self.size = self.footer.current_size - def read_sectors(self, sector, count): - raise NotImplementedError() + def read_sectors(self, sector: int, count: int) -> bytes: + raise NotImplementedError class FixedDisk(Disk): - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: self.fh.seek(sector * SECTOR_SIZE) return self.fh.read(count * SECTOR_SIZE) class DynamicDisk(Disk): - def __init__(self, fh, footer=None): + def __init__(self, fh: BinaryIO, footer: c_vhd.footer | None = None): super().__init__(fh, footer) fh.seek(self.footer.data_offset) self.header = c_vhd.dynamic_header(fh) @@ -66,7 +69,7 @@ def __init__(self, fh, footer=None): # Save bitmap size in sectors self._sector_bitmap_size = ((self._sectors_per_block // 8) + SECTOR_SIZE - 1) // SECTOR_SIZE - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: result = [] while count > 0: block, offset = divmod(sector, self._sectors_per_block) @@ -96,17 +99,17 @@ class BlockAllocationTable: ENTRY = struct.Struct(">I") - def __init__(self, fh, offset, max_entries): + def __init__(self, fh: BinaryIO, offset: int, max_entries: int): self.fh = fh self.offset = offset self.max_entries = max_entries self.get = lru_cache(4096)(self.get) - def get(self, block): + def get(self, block: int) -> int | None: # This could be improved by caching the entire BAT (or chunks if too large) if block + 1 > self.max_entries: - raise ValueError("Invalid block {} (max block is {})".format(block, self.max_entries - 1)) + raise ValueError(f"Invalid block {block} (max block is {self.max_entries - 1})") self.fh.seek(self.offset + block * 4) sector_offset = self.ENTRY.unpack(self.fh.read(4))[0] @@ -114,5 +117,5 @@ def get(self, block): sector_offset = None return sector_offset - def __getitem__(self, block): + def __getitem__(self, block: int) -> int | None: return self.get(block) diff --git a/dissect/hypervisor/disk/vhdx.py b/dissect/hypervisor/disk/vhdx.py index 4da6775..592bb47 100644 --- a/dissect/hypervisor/disk/vhdx.py +++ b/dissect/hypervisor/disk/vhdx.py @@ -1,10 +1,13 @@ # References: # - [MS-VHDX] https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-vhdx/83e061f8-f6e2-4de1-91bd-5d518a43d477 +from __future__ import annotations + import logging import os from functools import lru_cache from pathlib import Path +from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Final from uuid import UUID from dissect.util.stream import AlignedStream @@ -25,6 +28,9 @@ ) from dissect.hypervisor.exceptions import InvalidSignature, InvalidVirtualDisk +if TYPE_CHECKING: + from collections.abc import Iterator + log = logging.getLogger(__name__) log.setLevel(os.getenv("DISSECT_LOG_VHDX", "CRITICAL")) @@ -38,7 +44,7 @@ class VHDX(AlignedStream): the parent VHDX in the same directory, or the registered absolute directory. """ - def __init__(self, fh): + def __init__(self, fh: BinaryIO | Path | str): if hasattr(fh, "read"): name = getattr(fh, "name", None) path = Path(name) if name else None @@ -85,7 +91,7 @@ def __init__(self, fh): self._chunk_ratio = ((2**23) * self.sector_size) // self.block_size self.parent = None - self.parent_locator = None + self.parent_locator: ParentLocator = None if self.has_parent: self.parent_locator = self.metadata.get(PARENT_LOCATOR_GUID) if self.parent_locator.type != VHDX_PARENT_LOCATOR_GUID: @@ -97,7 +103,7 @@ def __init__(self, fh): super().__init__(self.size) - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: log.debug("VHDX::read_sectors(0x%x, 0x%x)", sector, count) sectors_read = [] @@ -165,7 +171,7 @@ def read_sectors(self, sector, count): return b"".join(sectors_read) - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: sector = offset // self.sector_size count = (length + self.sector_size - 1) // self.sector_size @@ -173,7 +179,7 @@ def _read(self, offset, length): class RegionTable: - def __init__(self, fh, offset): + def __init__(self, fh: BinaryIO, offset: int): self.fh = fh self.offset = offset @@ -185,7 +191,7 @@ def __init__(self, fh, offset): self.entries = c_vhdx.region_table_entry[self.header.entry_count](fh) self.lookup = {UUID(bytes_le=e.guid): e for e in self.entries} - def get(self, guid, required=True): + def get(self, guid: UUID, required: bool = True) -> c_vhdx.region_table_entry | None: data = self.lookup.get(guid) if not data and required: raise InvalidVirtualDisk(f"Missing required region: {guid}") @@ -193,7 +199,7 @@ def get(self, guid, required=True): class BlockAllocationTable: - def __init__(self, vhdx, offset): + def __init__(self, vhdx: VHDX, offset: int): self.vhdx = vhdx self.offset = offset self.chunk_ratio = vhdx._chunk_ratio @@ -208,7 +214,7 @@ def __init__(self, vhdx, offset): self.get = lru_cache(4096)(self.get) - def get(self, entry): + def get(self, entry: int) -> c_vhdx.bat_entry: """Get a BAT entry.""" if entry + 1 > self.entry_count: raise ValueError(f"Invalid entry for BAT lookup: {entry} (max entry is {self.entry_count - 1})") @@ -216,13 +222,13 @@ def get(self, entry): self.vhdx.fh.seek(self.offset + entry * 8) return c_vhdx.bat_entry(self.vhdx.fh) - def pb(self, block): + def pb(self, block: int) -> c_vhdx.bat_entry: """Get a payload block entry for a given block.""" # Calculate how many interleaved sector bitmap entries there must be for this block sb_entries = block // self.chunk_ratio return self.get(block + sb_entries) - def sb(self, block): + def sb(self, block: int) -> c_vhdx.bat_entry: """Get a sector bitmap entry for a given block.""" # Calculate how many interleaved sector bitmap entries there must be for this block num_sb = block // self.chunk_ratio @@ -230,14 +236,14 @@ def sb(self, block): class ParentLocator: - def __init__(self, fh): + def __init__(self, fh: BinaryIO): self.fh = fh self.offset = fh.tell() self.header = c_vhdx.parent_locator_header(fh) self.type = UUID(bytes_le=self.header.locator_type) self._entries = c_vhdx.parent_locator_entry[self.header.key_value_count](fh) - self.entries = {} + self.entries: dict[str, str] = {} for entry in self._entries: fh.seek(self.offset + entry.key_offset) key = fh.read(entry.key_length).decode("utf-16-le") @@ -248,7 +254,7 @@ def __init__(self, fh): class MetadataTable: - METADATA_MAP = { + METADATA_MAP: Final[dict[UUID, Callable[[BinaryIO], Any]]] = { FILE_PARAMETERS_GUID: c_vhdx.file_parameters, VIRTUAL_DISK_SIZE_GUID: c_vhdx.virtual_disk_size, VIRTUAL_DISK_ID_GUID: c_vhdx.virtual_disk_id, @@ -257,7 +263,7 @@ class MetadataTable: PARENT_LOCATOR_GUID: ParentLocator, } - def __init__(self, fh, offset, length): + def __init__(self, fh: BinaryIO, offset: int, length: int): self.fh = fh self.offset = offset self.length = length @@ -277,14 +283,14 @@ def __init__(self, fh, offset, length): value = self.METADATA_MAP[item_id](fh) self.lookup[item_id] = value - def get(self, guid, required=True): + def get(self, guid: UUID, required: bool = True) -> Any | None: data = self.lookup.get(guid) if not data and required: raise InvalidVirtualDisk(f"Missing required region: {guid}") return data -def _iter_partial_runs(bitmap, start_idx, length): +def _iter_partial_runs(bitmap: bytes, start_idx: int, length: int) -> Iterator[tuple[int, int]]: current_type = (bitmap[0] & (1 << start_idx)) >> start_idx current_count = 0 @@ -311,7 +317,7 @@ def _iter_partial_runs(bitmap, start_idx, length): yield (current_type, current_count) -def open_parent(path, locator): +def open_parent(path: Path, locator: dict[str, str]) -> VHDX: try: filepath = path.joinpath(locator["relative_path"].replace("\\", "/")) if not filepath.exists(): diff --git a/dissect/hypervisor/disk/vmdk.py b/dissect/hypervisor/disk/vmdk.py index 2d5e8dd..0ca9662 100644 --- a/dissect/hypervisor/disk/vmdk.py +++ b/dissect/hypervisor/disk/vmdk.py @@ -11,6 +11,7 @@ from dataclasses import dataclass from functools import lru_cache from pathlib import Path +from typing import Any, BinaryIO from dissect.util.stream import AlignedStream @@ -27,14 +28,11 @@ class VMDK(AlignedStream): - def __init__(self, fh): + def __init__(self, fh: BinaryIO | Path | str | list[BinaryIO | Path | str]): """ Input can be a file handle to a Disk Descriptor file or a list of file handles to multiple VMDK files. """ - if not isinstance(fh, list): - fhs = [fh] - else: - fhs = fh + fhs = [fh] if not isinstance(fh, list) else fh self.disks = [] self.parent = None @@ -92,7 +90,7 @@ def __init__(self, fh): super().__init__(size) - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: log.debug("VMDK::read_sectors(0x%x, 0x%x)", sector, count) sectors_read = [] @@ -113,7 +111,7 @@ def read_sectors(self, sector, count): return b"".join(sectors_read) - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: log.debug("VMDK::_read(0x%x, 0x%x)", offset, length) sector = offset // SECTOR_SIZE @@ -123,7 +121,7 @@ def _read(self, offset, length): class RawDisk: - def __init__(self, fh, size=None, offset=0, sector_offset=0): + def __init__(self, fh: BinaryIO, size: int | None = None, offset: int = 0, sector_offset: int = 0): self.fh = fh self.offset = offset self.sector_offset = sector_offset @@ -141,7 +139,7 @@ def __init__(self, fh, size=None, offset=0, sector_offset=0): self.seek = fh.seek self.tell = fh.tell - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: log.debug("RawDisk::read_sectors(0x%x)", sector) self.fh.seek((sector - self.sector_offset) * SECTOR_SIZE) @@ -149,7 +147,9 @@ def read_sectors(self, sector, count): class SparseDisk: - def __init__(self, fh, parent=None, offset=0, sector_offset=0): + def __init__( + self, fh: BinaryIO, parent: VMDK | RawDisk | SparseDisk | None = None, offset: int = 0, sector_offset: int = 0 + ): self.fh = fh self.parent = parent self.offset = offset @@ -205,7 +205,7 @@ def __init__(self, fh, parent=None, offset=0, sector_offset=0): self._lookup_grain_table = lru_cache(128)(self._lookup_grain_table) - def _lookup_grain_table(self, directory): + def _lookup_grain_table(self, directory: int) -> list[int]: gtbl_offset = self._grain_directory[directory] if self.is_sesparse: @@ -233,7 +233,7 @@ def _lookup_grain_table(self, directory): return table - def _lookup_grain(self, grain): + def _lookup_grain(self, grain: int) -> int: gdir_entry, gtbl_entry = divmod(grain, self._grain_table_size) table = self._lookup_grain_table(gdir_entry) @@ -247,21 +247,23 @@ def _lookup_grain(self, grain): if grain_type in (c_vmdk.SESPARSE_GRAIN_TYPE_UNALLOCATED, c_vmdk.SESPARSE_GRAIN_TYPE_FALLTHROUGH): # Unallocated or scsi unmapped, fallthrough return 0 - elif grain_type == c_vmdk.SESPARSE_GRAIN_TYPE_ZERO: + if grain_type == c_vmdk.SESPARSE_GRAIN_TYPE_ZERO: # Sparse, zero grain return 1 - elif grain_type == c_vmdk.SESPARSE_GRAIN_TYPE_ALLOCATED: + if grain_type == c_vmdk.SESPARSE_GRAIN_TYPE_ALLOCATED: # Allocated cluster_sector_hi = (grain_entry & 0x0FFF000000000000) >> 48 cluster_sector_lo = (grain_entry & 0x0000FFFFFFFFFFFF) << 12 cluster_sector = cluster_sector_hi | cluster_sector_lo return self.header.grains_offset + cluster_sector * self.header.grain_size - else: - return grain_entry - else: - return 0 - def get_runs(self, sector, count): + raise ValueError("Unknown grain type") + + return grain_entry + + return 0 + + def get_runs(self, sector: int, count: int) -> list[tuple[int, int, int, int | None]]: disk_sector = sector - self.sector_offset run_type = None @@ -283,9 +285,7 @@ def get_runs(self, sector, count): grain_sector = self._lookup_grain(grain) read_sector_count = min(read_count, self.header.grain_size - grain_offset) - if run_type == 0 and grain_sector == 0: - run_count += read_sector_count - elif run_type == 1 and grain_sector == 1: + if (run_type == 0 and grain_sector == 0) or (run_type == 1 and grain_sector == 1): run_count += read_sector_count elif run_type and run_type > 1 and grain_sector == next_grain_sector: next_grain_sector += self.header.grain_size @@ -317,7 +317,7 @@ def get_runs(self, sector, count): return runs - def read_sectors(self, sector, count): + def read_sectors(self, sector: int, count: int) -> bytes: log.debug("SparseDisk::read_sectors(0x%x, 0x%x)", sector, count) runs = self.get_runs(sector, count) @@ -363,7 +363,7 @@ def read_sectors(self, sector, count): return b"".join(sectors_read) - def _read_compressed_grain(self, sector): + def _read_compressed_grain(self, sector: int) -> bytes: self.fh.seek(sector * SECTOR_SIZE) buf = self.fh.read(SECTOR_SIZE) @@ -385,7 +385,7 @@ def _read_compressed_grain(self, sector): class SparseExtentHeader: - def __init__(self, fh): + def __init__(self, fh: BinaryIO): magic = fh.read(4) fh.seek(-4, io.SEEK_CUR) @@ -398,7 +398,7 @@ def __init__(self, fh): else: raise NotImplementedError("Unsupported sparse extent") - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: return getattr(self.hdr, attr) @@ -461,7 +461,7 @@ def parse(cls, vmdk_config: str) -> DiskDescriptor: Resources: - https://github.com/libyal/libvmdk/blob/main/documentation/VMWare%20Virtual%20Disk%20Format%20(VMDK).asciidoc - """ # noqa: E501 + """ descriptor_settings = {} extents: list[ExtentDescriptor] = [] @@ -474,7 +474,7 @@ def parse(cls, vmdk_config: str) -> DiskDescriptor: if not line or line.startswith("#"): continue - if line.startswith("RW ") or line.startswith("RDONLY ") or line.startswith("NOACCESS "): + if line.startswith(("RW ", "RDONLY ", "NOACCESS ")): match = RE_EXTENT_DESCRIPTOR.search(line) if not match: @@ -529,7 +529,7 @@ def __str__(self) -> str: return str_template.format(descriptor_settings, extents, disk_db) -def open_parent(path, filename_hint): +def open_parent(path: Path, filename_hint: str) -> VMDK: try: filename_hint = filename_hint.replace("\\", "/") hint_path, _, filename = filename_hint.rpartition("/") @@ -539,6 +539,6 @@ def open_parent(path, filename_hint): filepath = path.parent.joinpath(hint_path_name).joinpath(filename) vmdk = VMDK(filepath) except Exception as err: - raise IOError("Failed to open parent disk with hint {} from path {}: {}".format(filename_hint, path, err)) + raise IOError(f"Failed to open parent disk with hint {filename_hint} from path {path}: {err}") return vmdk diff --git a/dissect/hypervisor/tools/envelope.py b/dissect/hypervisor/tools/envelope.py index 5dddf90..c83350b 100644 --- a/dissect/hypervisor/tools/envelope.py +++ b/dissect/hypervisor/tools/envelope.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import argparse import sys from pathlib import Path @@ -5,7 +7,7 @@ from dissect.hypervisor.util.envelope import Envelope, KeyStore -def main(): +def main() -> int: parser = argparse.ArgumentParser(description="ESXi envelope file decrypter") parser.add_argument("envelope", type=Path, help="envelope file") parser.add_argument("-ks", "--keystore", type=Path, required=True, help="keystore file") @@ -22,6 +24,8 @@ def main(): with args.output.open("wb") as fhout: fhout.write(envelope.decrypt(keystore.key)) + return 0 + if __name__ == "__main__": try: diff --git a/dissect/hypervisor/util/envelope.py b/dissect/hypervisor/util/envelope.py index 0dc62da..07d130e 100644 --- a/dissect/hypervisor/util/envelope.py +++ b/dissect/hypervisor/util/envelope.py @@ -3,11 +3,12 @@ # - /usr/lib/vmware/tpm/bin/keypersist # - /lib/libvmlibs.so +from __future__ import annotations + import hashlib import io from base64 import b64decode -from collections import namedtuple -from typing import BinaryIO, Dict +from typing import BinaryIO, NamedTuple from urllib.parse import unquote from uuid import UUID @@ -97,7 +98,10 @@ DECRYPT_CHUNK_SIZE = 1024 * 1024 * 4 -EnvelopeAttribute = namedtuple("EnvelopeAttribute", ("type", "flag", "value")) +class EnvelopeAttribute(NamedTuple): + type: int + flag: int + value: bytes class Envelope: @@ -142,7 +146,7 @@ def __init__(self, fh: BinaryIO, verify: bool = True): self.size = size - (2 * ENVELOPE_BLOCK_SIZE) self.data = RangeStream(self.fh, ENVELOPE_BLOCK_SIZE, self.size) - def decrypt(self, key: bytes, aad: bytes = None) -> bytes: + def decrypt(self, key: bytes, aad: bytes | None = None) -> bytes: """Decrypt the data in this envelope. Arguments: @@ -200,7 +204,7 @@ def decrypt(self, key: bytes, aad: bytes = None) -> bytes: class KeyStore: """Implements a file based keystore as used in ESXi.""" - def __init__(self, store: Dict[str, str]): + def __init__(self, store: dict[str, str]): self.store = store self.mode = self.store.get("mode", None) @@ -235,7 +239,7 @@ def key(self) -> bytes: return self._key @classmethod - def from_text(cls, text: str): + def from_text(cls, text: str) -> KeyStore: """Parse a key store from a string. Arguments: @@ -268,7 +272,7 @@ def from_text(cls, text: str): return cls(store) -def _read_envelope_attributes(buf: BinaryIO) -> Dict[str, EnvelopeAttribute]: +def _read_envelope_attributes(buf: BinaryIO) -> dict[str, EnvelopeAttribute]: attributes = {} while True: try: @@ -320,7 +324,7 @@ def _pack_envelope_header(envelope: Envelope, block_size: int = ENVELOPE_BLOCK_S return stream.getvalue() -def _pack_attributes(stream: BinaryIO, attributes: Dict[str, EnvelopeAttribute]): +def _pack_attributes(stream: BinaryIO, attributes: dict[str, EnvelopeAttribute]) -> None: for name, attribute in attributes.items(): c_envelope.AttributeType.write(stream, attribute.type) c_envelope.uint8.write(stream, attribute.flag) diff --git a/dissect/hypervisor/util/vmtar.py b/dissect/hypervisor/util/vmtar.py index 86009a3..4b79203 100644 --- a/dissect/hypervisor/util/vmtar.py +++ b/dissect/hypervisor/util/vmtar.py @@ -1,6 +1,8 @@ # References: # - /bin/vmtar +from __future__ import annotations + import struct import tarfile @@ -12,8 +14,13 @@ class VisorTarInfo(tarfile.TarInfo): an offset specified in the header. """ + is_visor: bool + offset_data: int | None + textPgs: int | None + fixUpPgs: int | None + @classmethod - def frombuf(cls, buf, encoding, errors): + def frombuf(cls, buf: bytes, encoding: str, errors: str) -> VisorTarInfo: obj = super().frombuf(buf, encoding, errors) obj.is_visor = buf[257:264] == b"visor " @@ -28,7 +35,7 @@ def frombuf(cls, buf, encoding, errors): return obj - def _proc_member(self, tarfile): + def _proc_member(self, tarfile: tarfile.TarFile) -> VisorTarInfo | tarfile.TarInfo: if self.is_visor and self.offset_data: # Don't advance the offset with the filesize tarfile.offset = tarfile.fileobj.tell() @@ -38,13 +45,13 @@ def _proc_member(self, tarfile): self._apply_pax_info(tarfile.pax_headers, tarfile.encoding, tarfile.errors) return self - else: - return super()._proc_member(tarfile) + + return super()._proc_member(tarfile) -def VisorTarFile(*args, **kwargs): - return tarfile.TarFile(tarinfo=VisorTarInfo, *args, **kwargs) +def VisorTarFile(*args, **kwargs) -> tarfile.TarFile: + return tarfile.TarFile(*args, **kwargs, tarinfo=VisorTarInfo) -def open(*args, **kwargs): - return tarfile.open(tarinfo=VisorTarInfo, *args, **kwargs) +def open(*args, **kwargs) -> tarfile.TarFile: + return tarfile.open(*args, **kwargs, tarinfo=VisorTarInfo) diff --git a/pyproject.toml b/pyproject.toml index 153bfb6..8a8d670 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,13 +49,56 @@ dev = [ [project.scripts] envelope-decrypt = "dissect.hypervisor.tools.envelope:main" -[tool.black] +[tool.ruff] line-length = 120 +required-version = ">=0.9.0" -[tool.isort] -profile = "black" -known_first_party = ["dissect.hypervisor"] -known_third_party = ["dissect"] +[tool.ruff.format] +docstring-code-format = true + +[tool.ruff.lint] +select = [ + "F", + "E", + "W", + "I", + "UP", + "YTT", + "ANN", + "B", + "C4", + "DTZ", + "T10", + "FA", + "ISC", + "G", + "INP", + "PIE", + "PYI", + "PT", + "Q", + "RSE", + "RET", + "SLOT", + "SIM", + "TID", + "TCH", + "PTH", + "PLC", + "TRY", + "FLY", + "PERF", + "FURB", + "RUF", +] +ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003"] + +[tool.ruff.lint.per-file-ignores] +"tests/docs/**" = ["INP001"] + +[tool.ruff.lint.isort] +known-first-party = ["dissect.hypervisor"] +known-third-party = ["dissect"] [tool.setuptools] license-files = ["LICENSE", "COPYRIGHT"] diff --git a/tests/conftest.py b/tests/conftest.py index 3acd221..91e0069 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,22 +1,27 @@ +from __future__ import annotations + import gzip -import os -from typing import BinaryIO, Iterator, TextIO +from pathlib import Path +from typing import TYPE_CHECKING, BinaryIO, TextIO import pytest +if TYPE_CHECKING: + from collections.abc import Iterator + -def absolute_path(filename) -> str: - return os.path.join(os.path.dirname(__file__), filename) +def absolute_path(filename: str) -> Path: + return Path(__file__).parent / filename def open_file(name: str, mode: str = "rb") -> Iterator[BinaryIO]: - with open(absolute_path(name), mode) as f: - yield f + with absolute_path(name).open(mode) as fh: + yield fh def open_file_gz(name: str, mode: str = "rb") -> Iterator[BinaryIO]: - with gzip.GzipFile(absolute_path(name), mode) as f: - yield f + with gzip.GzipFile(absolute_path(name), mode) as fh: + yield fh @pytest.fixture @@ -66,17 +71,17 @@ def sesparse_vmdk() -> Iterator[BinaryIO]: @pytest.fixture def plain_hdd() -> Iterator[str]: - yield absolute_path("data/plain.hdd") + return absolute_path("data/plain.hdd") @pytest.fixture def expanding_hdd() -> Iterator[str]: - yield absolute_path("data/expanding.hdd") + return absolute_path("data/expanding.hdd") @pytest.fixture def split_hdd() -> Iterator[str]: - yield absolute_path("data/split.hdd") + return absolute_path("data/split.hdd") @pytest.fixture diff --git a/tests/test_envelope.py b/tests/test_envelope.py index 01e4db5..66bd8dc 100644 --- a/tests/test_envelope.py +++ b/tests/test_envelope.py @@ -1,4 +1,7 @@ +from __future__ import annotations + import hashlib +from typing import BinaryIO import pytest @@ -10,7 +13,7 @@ ) -def test_envelope_keystore(keystore): +def test_envelope_keystore(keystore: BinaryIO) -> None: store = KeyStore.from_text(keystore.read()) assert store.store[".encoding"] == "UTF-8" @@ -28,7 +31,7 @@ def test_envelope_keystore(keystore): assert store._key == bytes.fromhex("ae29634dca8627013f7c7cf2d05b4d5cc444d42cd4e8acbaa4fb815dda3b3066") -def test_envelope(envelope): +def test_envelope(envelope: BinaryIO) -> None: ev = Envelope(envelope) assert ev.key_info == "7e62cec5-6aef-4d7e-838b-cae32eefd251" @@ -40,7 +43,7 @@ def test_envelope(envelope): @pytest.mark.skipif((not HAS_PYCRYPTODOME and not HAS_PYSTANDALONE), reason="No crypto module available") -def test_envelope_decrypt(envelope, keystore): +def test_envelope_decrypt(envelope: BinaryIO, keystore: BinaryIO) -> None: ev = Envelope(envelope) store = KeyStore.from_text(keystore.read()) diff --git a/tests/test_hdd.py b/tests/test_hdd.py index 1ee6e6a..98861f7 100644 --- a/tests/test_hdd.py +++ b/tests/test_hdd.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import gzip from pathlib import Path from typing import BinaryIO @@ -8,7 +10,7 @@ Path_open = Path.open -def mock_open_gz(self, *args, **kwargs) -> BinaryIO: +def mock_open_gz(self: Path, *args, **kwargs) -> BinaryIO: if self.suffix.lower() != ".hds": return Path_open(self, *args, **kwargs) diff --git a/tests/test_hyperv.py b/tests/test_hyperv.py index 4cd0505..ea494f6 100644 --- a/tests/test_hyperv.py +++ b/tests/test_hyperv.py @@ -1,7 +1,11 @@ +from __future__ import annotations + +from typing import BinaryIO + from dissect.hypervisor.descriptor.hyperv import HyperVFile -def test_hyperv_vmcx(vmcx): +def test_hyperv_vmcx(vmcx: BinaryIO) -> None: hf = HyperVFile(vmcx) assert hf.header is hf.headers[0] @@ -18,7 +22,7 @@ def test_hyperv_vmcx(vmcx): assert len(obj["configuration"]["settings"].keys()) == 6 -def test_hyperv_vmrs(vmrs): +def test_hyperv_vmrs(vmrs: BinaryIO) -> None: hf = HyperVFile(vmrs) assert hf.header is hf.headers[0] diff --git a/tests/test_ovf.py b/tests/test_ovf.py index bd3b13a..d3a71e8 100644 --- a/tests/test_ovf.py +++ b/tests/test_ovf.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from io import StringIO from dissect.hypervisor.descriptor.ovf import OVF @@ -163,6 +165,6 @@ """ # noqa -def test_ovf(): +def test_ovf() -> None: ovf = OVF(StringIO(TEST_OVF)) assert list(ovf.disks()) == ["disk1.vmdk", "disk2.vmdk", "disk3.vmdk"] diff --git a/tests/test_pvs.py b/tests/test_pvs.py index 2bec592..db163ec 100644 --- a/tests/test_pvs.py +++ b/tests/test_pvs.py @@ -1,9 +1,11 @@ +from __future__ import annotations + from io import StringIO from dissect.hypervisor.descriptor.pvs import PVS -def test_pvs(): +def test_pvs() -> None: xml = """ diff --git a/tests/test_vbox.py b/tests/test_vbox.py index be2d107..7702ffd 100644 --- a/tests/test_vbox.py +++ b/tests/test_vbox.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from io import StringIO from dissect.hypervisor.descriptor.vbox import VBox diff --git a/tests/test_vhd.py b/tests/test_vhd.py index fe68d93..4b35713 100644 --- a/tests/test_vhd.py +++ b/tests/test_vhd.py @@ -1,7 +1,11 @@ +from __future__ import annotations + +from typing import BinaryIO + from dissect.hypervisor.disk.vhd import VHD, DynamicDisk, FixedDisk -def test_vhd_fixed(fixed_vhd): +def test_vhd_fixed(fixed_vhd: BinaryIO) -> None: vhd = VHD(fixed_vhd) assert vhd.size == 10485760 assert isinstance(vhd.disk, FixedDisk) @@ -26,10 +30,10 @@ def test_vhd_fixed(fixed_vhd): ) vhd.seek(0x200000) - assert vhd.read(512) == b"\xFF" * 512 + assert vhd.read(512) == b"\xff" * 512 -def test_vhd_dynamic(dynamic_vhd): +def test_vhd_dynamic(dynamic_vhd: BinaryIO) -> None: vhd = VHD(dynamic_vhd) assert vhd.size == 10485760 assert isinstance(vhd.disk, DynamicDisk) @@ -54,5 +58,5 @@ def test_vhd_dynamic(dynamic_vhd): ) vhd.seek(0x200000) - assert vhd.read(512) == b"\xFF" * 512 + assert vhd.read(512) == b"\xff" * 512 assert vhd.disk.read_sectors(0x3FFF, 16) == (b"\x00" * 512 * 16) diff --git a/tests/test_vhdx.py b/tests/test_vhdx.py index 4c853c7..1fc03f8 100644 --- a/tests/test_vhdx.py +++ b/tests/test_vhdx.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import BinaryIO from uuid import UUID import pytest @@ -5,7 +8,7 @@ from dissect.hypervisor.disk.vhdx import VHDX, _iter_partial_runs, c_vhdx -def test_vhdx_fixed(fixed_vhdx): +def test_vhdx_fixed(fixed_vhdx: BinaryIO) -> None: v = VHDX(fixed_vhdx) assert v.size == 0xA00000 @@ -35,14 +38,14 @@ def test_vhdx_fixed(fixed_vhdx): ) v.seek(0x200000) - assert v.read(512) == b"\xFF" * 512 + assert v.read(512) == b"\xff" * 512 for block in range(v.bat._pb_count): block_entry = v.bat.pb(block) assert block_entry.state == c_vhdx.PAYLOAD_BLOCK_FULLY_PRESENT -def test_vhdx_dynamic(dynamic_vhdx): +def test_vhdx_dynamic(dynamic_vhdx: BinaryIO) -> None: v = VHDX(dynamic_vhdx) assert v.size == 0xA00000 @@ -72,30 +75,30 @@ def test_vhdx_dynamic(dynamic_vhdx): ) v.seek(0x200000) - assert v.read(512) == b"\xFF" * 512 + assert v.read(512) == b"\xff" * 512 -def test_vhdx_differencing(differencing_vhdx): - with pytest.raises(IOError): +def test_vhdx_differencing(differencing_vhdx: BinaryIO) -> None: + with pytest.raises(IOError, match="Failed to open parent disk with locator"): VHDX(differencing_vhdx) @pytest.mark.parametrize( - "test_input,expected", + ("test_input", "expected"), [ - ((b"\xFF", 0, 8), [(1, 8)]), - ((b"\xFF", 4, 4), [(1, 4)]), + ((b"\xff", 0, 8), [(1, 8)]), + ((b"\xff", 4, 4), [(1, 4)]), ((b"\x00", 0, 8), [(0, 8)]), ((b"\x00", 4, 4), [(0, 4)]), - ((b"\xFF\x00", 0, 8), [(1, 8)]), - ((b"\xFF\x00", 4, 8), [(1, 4), (0, 4)]), + ((b"\xff\x00", 0, 8), [(1, 8)]), + ((b"\xff\x00", 4, 8), [(1, 4), (0, 4)]), ((b"\x00\x00", 0, 12), [(0, 12)]), - ((b"\x00\xFF", 4, 8), [(0, 4), (1, 4)]), - ((b"\xF0\xF0", 0, 16), [(0, 4), (1, 4), (0, 4), (1, 4)]), - ((b"\x0F\x0F", 0, 16), [(1, 4), (0, 4), (1, 4), (0, 4)]), + ((b"\x00\xff", 4, 8), [(0, 4), (1, 4)]), + ((b"\xf0\xf0", 0, 16), [(0, 4), (1, 4), (0, 4), (1, 4)]), + ((b"\x0f\x0f", 0, 16), [(1, 4), (0, 4), (1, 4), (0, 4)]), ((b"\x00", 0, 6), [(0, 6)]), ((b"\x00", 1, 6), [(0, 6)]), ], ) -def test_vhdx_partial_runs(test_input, expected): +def test_vhdx_partial_runs(test_input: tuple[bytes, int, int], expected: list[tuple[int, int]]) -> None: assert list(_iter_partial_runs(*test_input)) == expected diff --git a/tests/test_vmdk.py b/tests/test_vmdk.py index e43546e..467a872 100644 --- a/tests/test_vmdk.py +++ b/tests/test_vmdk.py @@ -1,10 +1,14 @@ +from __future__ import annotations + +from typing import BinaryIO + import pytest from dissect.hypervisor.disk.c_vmdk import c_vmdk from dissect.hypervisor.disk.vmdk import VMDK, DiskDescriptor, ExtentDescriptor -def test_vmdk_sesparse(sesparse_vmdk): +def test_vmdk_sesparse(sesparse_vmdk: BinaryIO) -> None: vmdk = VMDK(sesparse_vmdk) disk = vmdk.disks[0] @@ -23,7 +27,7 @@ def test_vmdk_sesparse(sesparse_vmdk): @pytest.mark.parametrize( - "extent_description, expected_extents", + ("extent_description", "expected_extents"), [ ( 'RW 123456789 SPARSE "disk.vmdk"', @@ -188,12 +192,12 @@ def test_vmdk_sesparse(sesparse_vmdk): "emoji-four-parts", ), ) -def test_vmdk_extent_description(extent_description: str, expected_extents: list) -> None: +def test_vmdk_extent_description(extent_description: str, expected_extents: list[ExtentDescriptor]) -> None: """test if we correctly parse VMDK sparse and flat extent descriptions. Resources: - https://github.com/libyal/libvmdk/blob/main/documentation/VMWare%20Virtual%20Disk%20Format%20(VMDK).asciidoc#22-extent-descriptions - """ # noqa: E501 + """ descriptor = DiskDescriptor.parse(extent_description) assert descriptor.extents == expected_extents diff --git a/tests/test_vmtar.py b/tests/test_vmtar.py index 72ceb52..a3769fa 100644 --- a/tests/test_vmtar.py +++ b/tests/test_vmtar.py @@ -1,7 +1,11 @@ +from __future__ import annotations + +from typing import BinaryIO + from dissect.hypervisor.util import vmtar -def test_vmtar(vgz): +def test_vmtar(vgz: BinaryIO) -> None: tar = vmtar.open(fileobj=vgz) members = {member.name: member for member in tar.getmembers()} diff --git a/tests/test_vmx.py b/tests/test_vmx.py index 3894e40..403f13a 100644 --- a/tests/test_vmx.py +++ b/tests/test_vmx.py @@ -1,9 +1,13 @@ +from __future__ import annotations + +from typing import BinaryIO + import pytest from dissect.hypervisor.descriptor.vmx import HAS_PYCRYPTODOME, HAS_PYSTANDALONE, VMX -def test_vmx(): +def test_vmx() -> None: data_scsi = """ scsi0.virtualDev = "lsisas1068" scsi0.present = "TRUE" @@ -62,7 +66,7 @@ def test_vmx(): @pytest.mark.skipif((not HAS_PYCRYPTODOME and not HAS_PYSTANDALONE), reason="No crypto module available") -def test_vmx_encrypted(encrypted_vmx): +def test_vmx_encrypted(encrypted_vmx: BinaryIO) -> None: vmx = VMX.parse(encrypted_vmx.read().decode()) assert vmx.encrypted diff --git a/tox.ini b/tox.ini index bfcf133..17e3629 100644 --- a/tox.ini +++ b/tox.ini @@ -32,32 +32,19 @@ commands = [testenv:fix] package = skip deps = - black==23.1.0 - isort==5.11.4 + ruff==0.9.2 commands = - black dissect tests - isort dissect tests + ruff format dissect tests [testenv:lint] package = skip deps = - black==23.1.0 - flake8 - flake8-black - flake8-isort - isort==5.11.4 + ruff==0.9.2 vermin commands = - flake8 dissect tests + ruff check dissect tests vermin -t=3.9- --no-tips --lint dissect tests -[flake8] -max-line-length = 120 -extend-ignore = - # See https://github.com/PyCQA/pycodestyle/issues/373 - E203, -statistics = True - [testenv:docs-build] allowlist_externals = make deps =