diff --git a/dissect/ntfs/__init__.py b/dissect/ntfs/__init__.py index ccac451..412fbe1 100644 --- a/dissect/ntfs/__init__.py +++ b/dissect/ntfs/__init__.py @@ -10,6 +10,8 @@ "ACE", "ACL", "ATTRIBUTE_TYPE_CODE", + "NTFS", + "NTFS_SIGNATURE", "Attribute", "AttributeHeader", "AttributeRecord", @@ -17,8 +19,6 @@ "IndexEntry", "Mft", "MftRecord", - "NTFS", - "NTFS_SIGNATURE", "Secure", "SecurityDescriptor", "UsnJrnl", diff --git a/dissect/ntfs/attr.py b/dissect/ntfs/attr.py index 2ca4d65..8bb69a2 100644 --- a/dissect/ntfs/attr.py +++ b/dissect/ntfs/attr.py @@ -1,8 +1,7 @@ from __future__ import annotations import io -from datetime import datetime -from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional +from typing import TYPE_CHECKING, Any, BinaryIO from dissect.util.stream import RangeStream, RunlistStream from dissect.util.ts import wintimestamp @@ -18,6 +17,9 @@ from dissect.ntfs.util import ensure_volume, get_full_path, ts_to_ns if TYPE_CHECKING: + from collections.abc import Iterator + from datetime import datetime + from dissect.ntfs.mft import MftRecord @@ -31,9 +33,9 @@ class Attribute: header: The AttributeHeader for this Attribute. """ - __slots__ = ("record", "header", "attribute") + __slots__ = ("attribute", "header", "record") - def __init__(self, header: AttributeHeader, record: Optional[MftRecord] = None): + def __init__(self, header: AttributeHeader, record: MftRecord | None = None): self.header = header self.record = record self.attribute = None @@ -52,7 +54,7 @@ def __repr__(self) -> str: return f"<${self.header.type.name} name={self.header.name}>" @classmethod - def from_fh(cls, fh: BinaryIO, record: Optional[MftRecord] = None) -> Attribute: + def from_fh(cls, fh: BinaryIO, record: MftRecord | None = None) -> Attribute: """Parse an attribute from a file-like object. Args: @@ -62,7 +64,7 @@ def from_fh(cls, fh: BinaryIO, record: Optional[MftRecord] = None) -> Attribute: return cls(AttributeHeader(fh, 0, record), record) @classmethod - def from_bytes(cls, data: bytes, record: Optional[MftRecord] = None) -> Attribute: + def from_bytes(cls, data: bytes, record: MftRecord | None = None) -> Attribute: """Parse an attribute from bytes. Args: @@ -120,9 +122,9 @@ class AttributeHeader: offset: The offset in the file-like object to parse an attribute header from. """ - __slots__ = ("record", "fh", "offset", "header") + __slots__ = ("fh", "header", "offset", "record") - def __init__(self, fh: BinaryIO, offset: int, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, offset: int, record: MftRecord | None = None): self.fh = fh self.offset = offset self.record = record @@ -134,7 +136,7 @@ def __repr__(self) -> str: return f"<${self.type.name} size={self.size}>" @classmethod - def from_bytes(cls, data: bytes, record: Optional[MftRecord] = None) -> AttributeHeader: + def from_bytes(cls, data: bytes, record: MftRecord | None = None) -> AttributeHeader: """Parse an attribute header from bytes. Args: @@ -175,22 +177,22 @@ def size(self) -> int: return self.header.Form.Resident.ValueLength if self.resident else self.header.Form.Nonresident.FileSize @property - def allocated_size(self) -> Optional[int]: + def allocated_size(self) -> int | None: """Return the allocated size if non-resident, else None.""" return self.header.Form.Nonresident.AllocatedLength if not self.resident else None @property - def lowest_vcn(self) -> Optional[int]: + def lowest_vcn(self) -> int | None: """Return the lowest VCN if non-resident, else None.""" return self.header.Form.Nonresident.LowestVcn if not self.resident else None @property - def highest_vcn(self) -> Optional[int]: + def highest_vcn(self) -> int | None: """Return the highest VCN if non-resident, else None.""" return self.header.Form.Nonresident.HighestVcn if not self.resident else None @property - def compression_unit(self) -> Optional[int]: + def compression_unit(self) -> int | None: """Return the compression unit if non-resident, else None.""" return self.header.Form.Nonresident.CompressionUnit if not self.resident else None @@ -242,16 +244,16 @@ def open(self) -> BinaryIO: self.offset + self.header.Form.Resident.ValueOffset, self.size, ) - else: - ntfs = self.record.ntfs if self.record else None - ensure_volume(ntfs) - return RunlistStream( - ntfs.fh, - self.dataruns(), - self.size, - ntfs.cluster_size, - ) + ntfs = self.record.ntfs if self.record else None + ensure_volume(ntfs) + + return RunlistStream( + ntfs.fh, + self.dataruns(), + self.size, + ntfs.cluster_size, + ) def data(self) -> bytes: """Read and return all the data of this attribute. @@ -272,13 +274,11 @@ class AttributeRecord: __slots__ = ("record",) - def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, record: MftRecord | None = None): self.record = record @classmethod - def from_fh( - cls, fh: BinaryIO, attr_type: ATTRIBUTE_TYPE_CODE, record: Optional[MftRecord] = None - ) -> AttributeRecord: + def from_fh(cls, fh: BinaryIO, attr_type: ATTRIBUTE_TYPE_CODE, record: MftRecord | None = None) -> AttributeRecord: """Parse an attribute from a file-like object. Selects a more specific :class:`AttributeRecord` class if one is available for the given attribute type. @@ -296,7 +296,7 @@ class AttributeList(AttributeRecord): __slots__ = ("entries",) - def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, record: MftRecord | None = None): super().__init__(fh, record) offset = 0 @@ -343,7 +343,7 @@ class StandardInformation(AttributeRecord): __slots__ = ("attr",) - def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, record: MftRecord | None = None): super().__init__(fh, record) # Data can be less than the _STANDARD_INFORMATION structure size, so pad remaining fields with null bytes data = fh.read().ljust(len(c_ntfs._STANDARD_INFORMATION), b"\x00") @@ -413,7 +413,7 @@ class FileName(AttributeRecord): __slots__ = ("attr",) - def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, record: MftRecord | None = None): super().__init__(fh, record) data = fh.read().ljust(len(c_ntfs.STANDARD_INFORMATION_EX), b"\x00") self.attr = c_ntfs._FILE_NAME(data) @@ -489,9 +489,9 @@ def full_path(self) -> str: class ReparsePoint(AttributeRecord): """Specific :class:`AttributeRecord` parser for ``$REPARSE_POINT``.""" - __slots__ = ("attr", "tag_header", "buffer") + __slots__ = ("attr", "buffer", "tag_header") - def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None): + def __init__(self, fh: BinaryIO, record: MftRecord | None = None): super().__init__(fh, record) self.attr = c_ntfs._REPARSE_DATA_BUFFER(fh) data = io.BytesIO(fh.read(self.attr.ReparseDataLength)) @@ -512,7 +512,7 @@ def tag(self) -> IO_REPARSE_TAG: return self.attr.ReparseTag @property - def substitute_name(self) -> Optional[str]: + def substitute_name(self) -> str | None: if not self.tag_header: return None @@ -521,7 +521,7 @@ def substitute_name(self) -> Optional[str]: return self.buffer[offset : offset + length].decode("utf-16-le") @property - def print_name(self) -> Optional[str]: + def print_name(self) -> str | None: if not self.tag_header: return None diff --git a/dissect/ntfs/c_ntfs.py b/dissect/ntfs/c_ntfs.py index 83877f8..40924d4 100644 --- a/dissect/ntfs/c_ntfs.py +++ b/dissect/ntfs/c_ntfs.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import struct from dissect.cstruct import cstruct @@ -652,3 +654,4 @@ def bsf(value: int, size: int = 32) -> int: for i in range(size): if value & (1 << i): return i + return 0 diff --git a/dissect/ntfs/index.py b/dissect/ntfs/index.py index c06b758..ec01ca3 100644 --- a/dissect/ntfs/index.py +++ b/dissect/ntfs/index.py @@ -3,7 +3,7 @@ import io from enum import Enum, auto from functools import cached_property, lru_cache -from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Optional +from typing import TYPE_CHECKING, Any, BinaryIO, Callable from dissect.ntfs.attr import AttributeRecord from dissect.ntfs.c_ntfs import ( @@ -24,6 +24,8 @@ from dissect.ntfs.util import apply_fixup if TYPE_CHECKING: + from collections.abc import Iterator + from dissect.ntfs.mft import MftRecord @@ -79,7 +81,7 @@ def index_buffer(self, vcn: int) -> IndexBuffer: return IndexBuffer(self, self._index_stream, vcn << self._vcn_size_shift, self.root.bytes_per_index_buffer) def search( - self, value: Any, exact: bool = True, cmp: Optional[Callable[[IndexEntry, Any], Match]] = None + self, value: Any, exact: bool = True, cmp: Callable[[IndexEntry, Any], Match] | None = None ) -> IndexEntry: """Perform a binary search on this index. @@ -114,8 +116,7 @@ def search( entry = _bsearch(entries, search_value, cmp) if not entry.is_node or (not entry.is_end and cmp(entry, search_value) == Match.Equal): break - else: - entries = list(self.index_buffer(entry.node_vcn).entries()) + entries = list(self.index_buffer(entry.node_vcn).entries()) if exact and (entry.is_end or cmp(entry, search_value) != Match.Equal): raise KeyError(f"Value not found: {value}") @@ -216,7 +217,7 @@ def __init__(self, index: Index, fh: BinaryIO, offset: int, size: int): buf = fh.read(size) if len(buf) != size: - raise EOFError() + raise EOFError if buf[:4] != b"INDX": raise BrokenIndexError("Broken INDX header") @@ -264,7 +265,7 @@ def dereference(self) -> MftRecord: """ record = self.index.record if not record or not record.ntfs or not record.ntfs.mft: - raise MftNotAvailableError() + raise MftNotAvailableError return record.ntfs.mft.get(segment_reference(self.header.FileReference)) @@ -284,7 +285,7 @@ def data(self) -> bytes: return self.buf[offset : offset + self.header.DataLength] @cached_property - def attribute(self) -> Optional[AttributeRecord]: + def attribute(self) -> AttributeRecord | None: """Return the :class:`dissect.ntfs.attr.AttributeRecord` of the attribute contained in this entry.""" if self.key_length and self.index.root.attribute_type: return AttributeRecord.from_fh( @@ -366,10 +367,9 @@ def _cmp_filename(entry: IndexEntry, value: str) -> Match: if value < test_value: return Match.Less - elif value == test_value: + if value == test_value: return Match.Equal - else: - return Match.Greater + return Match.Greater def _cmp_ulong(entry: IndexEntry, value: int) -> Match: @@ -380,7 +380,6 @@ def _cmp_ulong(entry: IndexEntry, value: int) -> Match: if value < test_value: return Match.Less - elif value == test_value: + if value == test_value: return Match.Equal - else: - return Match.Greater + return Match.Greater diff --git a/dissect/ntfs/mft.py b/dissect/ntfs/mft.py index 152b0d5..b978268 100644 --- a/dissect/ntfs/mft.py +++ b/dissect/ntfs/mft.py @@ -4,7 +4,7 @@ from functools import cached_property, lru_cache from io import BytesIO from operator import itemgetter -from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional, Union +from typing import TYPE_CHECKING, BinaryIO from dissect.ntfs.attr import Attribute, AttributeHeader from dissect.ntfs.c_ntfs import ( @@ -30,6 +30,8 @@ from dissect.ntfs.util import AttributeCollection, AttributeMap, apply_fixup if TYPE_CHECKING: + from collections.abc import Iterator + from dissect.ntfs.ntfs import NTFS @@ -41,13 +43,13 @@ class Mft: ntfs: An optional NTFS class instance. """ - def __init__(self, fh: BinaryIO, ntfs: Optional[NTFS] = None): + def __init__(self, fh: BinaryIO, ntfs: NTFS | None = None): self.fh = fh self.ntfs = ntfs self.get = lru_cache(4096)(self.get) - def __call__(self, ref, *args, **kwargs) -> MftRecord: + def __call__(self, ref: int | str | c_ntfs._MFT_SEGMENT_REFERENCE, *args, **kwargs) -> MftRecord: return self.get(ref, *args, **kwargs) @cached_property @@ -55,7 +57,7 @@ def root(self) -> MftRecord: """Return the root directory MFT record.""" return self.get(FILE_NUMBER_ROOT) - def _get_path(self, path: str, root: Optional[MftRecord] = None) -> MftRecord: + def _get_path(self, path: str, root: MftRecord | None = None) -> MftRecord: """Resolve a file path to the correct MFT record. Args: @@ -89,7 +91,7 @@ def _get_path(self, path: str, root: Optional[MftRecord] = None) -> MftRecord: return node - def get(self, ref: Union[int, str, c_ntfs._MFT_SEGMENT_REFERENCE], root: Optional[MftRecord] = None) -> MftRecord: + def get(self, ref: int | str | c_ntfs._MFT_SEGMENT_REFERENCE, root: MftRecord | None = None) -> MftRecord: """Retrieve an MFT record using a variety of methods. Supported references are: @@ -113,10 +115,11 @@ def get(self, ref: Union[int, str, c_ntfs._MFT_SEGMENT_REFERENCE], root: Optiona record = MftRecord.from_fh(self.fh, ref * record_size, ntfs=self.ntfs) record.segment = ref return record - elif isinstance(ref, str): + + if isinstance(ref, str): return self._get_path(ref, root) - else: - raise TypeError(f"Invalid MFT reference: {ref!r}") + + raise TypeError(f"Invalid MFT reference: {ref!r}") def segments(self, start: int = 0, end: int = -1) -> Iterator[MftRecord]: """Yield all valid MFT records, regardless if they're allocated or not. @@ -134,7 +137,7 @@ def segments(self, start: int = 0, end: int = -1) -> Iterator[MftRecord]: for segment in range(start, end + step, step): try: yield self.get(segment) - except Error: + except Error: # noqa: PERF203 continue except EOFError: break @@ -147,16 +150,16 @@ class MftRecord: """ def __init__(self): - self.ntfs: Optional[NTFS] = None - self.segment: Optional[int] = None - self.offset: Optional[int] = None - self.data: Optional[bytes] = None - self.header: Optional[c_ntfs._FILE_RECORD_SEGMENT_HEADER] = None + self.ntfs: NTFS | None = None + self.segment: int | None = None + self.offset: int | None = None + self.data: bytes | None = None + self.header: c_ntfs._FILE_RECORD_SEGMENT_HEADER | None = None def __repr__(self) -> str: return f"" - def __eq__(self, other: Any) -> bool: + def __eq__(self, other: object) -> bool: if isinstance(other, MftRecord): return self.segment == other.segment and self.header.SequenceNumber == other.header.SequenceNumber return False @@ -164,7 +167,7 @@ def __eq__(self, other: Any) -> bool: __hash__ = object.__hash__ @classmethod - def from_fh(cls, fh: BinaryIO, offset: int, ntfs: Optional[NTFS] = None) -> MftRecord: + def from_fh(cls, fh: BinaryIO, offset: int, ntfs: NTFS | None = None) -> MftRecord: """Parse an MFT record from a file-like object. Args: @@ -182,7 +185,7 @@ def from_fh(cls, fh: BinaryIO, offset: int, ntfs: Optional[NTFS] = None) -> MftR return obj @classmethod - def from_bytes(cls, data: bytes, ntfs: Optional[NTFS] = None) -> MftRecord: + def from_bytes(cls, data: bytes, ntfs: NTFS | None = None) -> MftRecord: """Parse an MFT record from bytes. Args: @@ -212,7 +215,7 @@ def get(self, path: str) -> MftRecord: MftNotAvailableError: If no MFT is available. """ if not self.ntfs or not self.ntfs.mft: - raise MftNotAvailableError() + raise MftNotAvailableError return self.ntfs.mft.get(path, root=self) @cached_property @@ -264,7 +267,7 @@ def resident(self) -> bool: return any(attr.header.resident for attr in self.attributes[ATTRIBUTE_TYPE_CODE.DATA]) @cached_property - def filename(self) -> Optional[str]: + def filename(self) -> str | None: """Return the first file name, or ``None`` if this record has no file names.""" filenames = self.filenames() return filenames[0] if filenames else None @@ -282,7 +285,7 @@ def filenames(self, ignore_dos: bool = False) -> list[str]: result.append((attr.flags, attr.file_name)) return [item[1] for item in sorted(result, key=itemgetter(0))] - def full_path(self, ignore_dos: bool = False) -> Optional[str]: + def full_path(self, ignore_dos: bool = False) -> str | None: """Return the first full path, or ``None`` if this record has no file names. Args: @@ -353,7 +356,7 @@ def reparse_point_record(self) -> MftRecord: raise NotAReparsePointError(f"{self!r} is not a reparse point") if not self.ntfs or not self.ntfs.mft: - raise MftNotAvailableError() + raise MftNotAvailableError reparse_point = self.attributes[ATTRIBUTE_TYPE_CODE.REPARSE_POINT] @@ -442,7 +445,7 @@ def index(self, name: str) -> Index: """ return Index(self, name) - def iterdir(self, dereference: bool = False, ignore_dos: bool = False) -> Iterator[Union[IndexEntry, MftRecord]]: + def iterdir(self, dereference: bool = False, ignore_dos: bool = False) -> Iterator[IndexEntry | MftRecord]: """Yield directory entries of this record. Args: @@ -460,7 +463,7 @@ def iterdir(self, dereference: bool = False, ignore_dos: bool = False) -> Iterat continue yield entry.dereference() if dereference else entry - def listdir(self, dereference: bool = False, ignore_dos: bool = False) -> dict[str, Union[IndexEntry, MftRecord]]: + def listdir(self, dereference: bool = False, ignore_dos: bool = False) -> dict[str, IndexEntry | MftRecord]: """Return a dictionary of the directory entries of this record. Args: diff --git a/dissect/ntfs/ntfs.py b/dissect/ntfs/ntfs.py index a74c9fd..2f44270 100644 --- a/dissect/ntfs/ntfs.py +++ b/dissect/ntfs/ntfs.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import BinaryIO, Iterator, Optional +from typing import TYPE_CHECKING, BinaryIO from dissect.ntfs.c_ntfs import ( ATTRIBUTE_TYPE_CODE, @@ -17,6 +19,9 @@ from dissect.ntfs.secure import Secure from dissect.ntfs.usnjrnl import UsnJrnl +if TYPE_CHECKING: + from collections.abc import Iterator + class NTFS: """Implementation for Microsoft NTFS. @@ -36,11 +41,11 @@ class NTFS: def __init__( self, - fh: Optional[BinaryIO] = None, - boot: Optional[BinaryIO] = None, - mft: Optional[BinaryIO] = None, - usnjrnl: Optional[BinaryIO] = None, - sds: Optional[BinaryIO] = None, + fh: BinaryIO | None = None, + boot: BinaryIO | None = None, + mft: BinaryIO | None = None, + usnjrnl: BinaryIO | None = None, + sds: BinaryIO | None = None, ): self.fh = fh @@ -130,11 +135,11 @@ def __init__( pass @cached_property - def serial(self) -> Optional[int]: + def serial(self) -> int | None: return self.boot_sector.SerialNumber if self.boot_sector else None @cached_property - def volume_name(self) -> Optional[str]: + def volume_name(self) -> str | None: if not self.mft: return None diff --git a/dissect/ntfs/secure.py b/dissect/ntfs/secure.py index 380186e..70fc872 100644 --- a/dissect/ntfs/secure.py +++ b/dissect/ntfs/secure.py @@ -2,13 +2,17 @@ import io from functools import lru_cache -from typing import BinaryIO, Iterator +from typing import TYPE_CHECKING, BinaryIO from uuid import UUID from dissect.util.sid import read_sid from dissect.ntfs.c_ntfs import ACE_OBJECT_FLAGS, ACE_TYPE, c_ntfs -from dissect.ntfs.mft import MftRecord + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dissect.ntfs.mft import MftRecord class Secure: @@ -21,7 +25,7 @@ class Secure: sds: A file-like object of the ``$SDS`` stream, used when opening from separate system files. """ - def __init__(self, record: MftRecord = None, sds: BinaryIO = None): + def __init__(self, record: MftRecord = None, sds: BinaryIO | None = None): self.record = record self.sds = None self.sii = None @@ -187,18 +191,17 @@ def __init__(self, fh: BinaryIO): def __repr__(self) -> str: if self.is_standard_ace: return f"<{self.header.AceType.name} mask=0x{self.mask:x} sid={self.sid}>" - elif self.is_compound_ace: + if self.is_compound_ace: return ( f"<{self.header.AceType.name} mask=0x{self.mask:x} type={self.compound_type.name}" f" server_sid={self.server_sid} client_sid={self.sid}>" ) - elif self.is_object_ace: + if self.is_object_ace: return ( f"<{self.header.AceType.name} mask=0x{self.mask:x} flags={self.flags} object_type={self.object_type}" f" inherited_object_type={self.inherited_object_type} sid={self.sid}>" ) - else: - return f"" + return f"" @property def type(self) -> ACE_TYPE: diff --git a/dissect/ntfs/stream.py b/dissect/ntfs/stream.py index acff539..68b6610 100644 --- a/dissect/ntfs/stream.py +++ b/dissect/ntfs/stream.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io from typing import BinaryIO diff --git a/dissect/ntfs/usnjrnl.py b/dissect/ntfs/usnjrnl.py index 3553caa..807c427 100644 --- a/dissect/ntfs/usnjrnl.py +++ b/dissect/ntfs/usnjrnl.py @@ -1,18 +1,20 @@ from __future__ import annotations -from datetime import datetime from functools import cached_property -from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional +from typing import TYPE_CHECKING, Any, BinaryIO from dissect.util.stream import RunlistStream from dissect.util.ts import wintimestamp from dissect.ntfs.c_ntfs import USN_PAGE_SIZE, c_ntfs, segment_reference from dissect.ntfs.exceptions import Error -from dissect.ntfs.mft import MftRecord from dissect.ntfs.util import ts_to_ns if TYPE_CHECKING: + from collections.abc import Iterator + from datetime import datetime + + from dissect.ntfs.mft import MftRecord from dissect.ntfs.ntfs import NTFS @@ -24,7 +26,7 @@ class UsnJrnl: ntfs: An optional :class:`~dissect.ntfs.ntfs.NTFS` class instance, used for resolving file paths. """ - def __init__(self, fh: BinaryIO, ntfs: Optional[NTFS] = None): + def __init__(self, fh: BinaryIO, ntfs: NTFS | None = None): self.fh = fh self.ntfs = ntfs @@ -105,13 +107,13 @@ def __getattr__(self, attr: str) -> Any: return getattr(self.record, attr) @cached_property - def file(self) -> Optional[MftRecord]: + def file(self) -> MftRecord | None: if self.usnjrnl.ntfs and self.usnjrnl.ntfs.mft: return self.usnjrnl.ntfs.mft(self.record.FileReferenceNumber) return None @cached_property - def parent(self) -> Optional[MftRecord]: + def parent(self) -> MftRecord | None: if self.usnjrnl.ntfs and self.usnjrnl.ntfs.mft: return self.usnjrnl.ntfs.mft(self.record.ParentFileReferenceNumber) return None @@ -133,12 +135,10 @@ def full_path(self) -> str: ref = segment_reference(self.record.ParentFileReferenceNumber) if parent is None: - parent_path = ( - f"" - ) + parent_path = f"" elif parent.header.SequenceNumber == self.record.ParentFileReferenceNumber.SequenceNumber: parent_path = parent.full_path() else: - parent_path = f"" + parent_path = f"" - return "\\".join([parent_path, self.filename]) + return f"{parent_path}\\{self.filename}" diff --git a/dissect/ntfs/util.py b/dissect/ntfs/util.py index 2051eb0..4b0f302 100644 --- a/dissect/ntfs/util.py +++ b/dissect/ntfs/util.py @@ -2,7 +2,7 @@ import struct from collections import UserDict -from typing import TYPE_CHECKING, Any, BinaryIO, Optional, Union +from typing import TYPE_CHECKING, Any, BinaryIO from dissect.cstruct import Enum from dissect.util.stream import RunlistStream @@ -44,12 +44,12 @@ def __getattr__(self, attr: str) -> AttributeCollection: return super().__getattribute__(attr) - def __getitem__(self, item: Union[ATTRIBUTE_TYPE_CODE, int]) -> AttributeCollection: + def __getitem__(self, item: ATTRIBUTE_TYPE_CODE | int) -> AttributeCollection: if isinstance(item, Enum): item = item.value return self.data.get(item, AttributeCollection()) - def __contains__(self, key: Union[ATTRIBUTE_TYPE_CODE, int]) -> bool: + def __contains__(self, key: ATTRIBUTE_TYPE_CODE | int) -> bool: if isinstance(key, Enum): key = key.value return super().__contains__(key) @@ -132,13 +132,13 @@ def open(self, allocated: bool = False) -> BinaryIO: ntfs.cluster_size, attrs[0].header.compression_unit, ) - else: - return RunlistStream( - ntfs.fh, - runs, - size, - ntfs.cluster_size, - ) + + return RunlistStream( + ntfs.fh, + runs, + size, + ntfs.cluster_size, + ) def size(self, allocated: bool = False) -> int: """Retrieve the data stream size for this list of attributes. @@ -169,7 +169,7 @@ def dataruns(self) -> list[tuple[int, int]]: def _get_stream_attrs(self) -> list[Attribute]: return sorted((attr for attr in self if not attr.header.resident), key=lambda attr: attr.header.lowest_vcn) - def _get_dataruns(self, attrs: Optional[list[Attribute]] = None) -> list[tuple[int, int]]: + def _get_dataruns(self, attrs: list[Attribute] | None = None) -> list[tuple[int, int]]: attrs = attrs or self._get_stream_attrs() runs = [] @@ -232,10 +232,10 @@ def ensure_volume(ntfs: NTFS) -> None: VolumeNotAvailableError: If a volume is not available. """ if not ntfs or not ntfs.fh: - raise VolumeNotAvailableError() + raise VolumeNotAvailableError -def get_full_path(mft: Mft, name: str, parent: c_ntfs._MFT_SEGMENT_REFERENCE, seen: set[str] = None) -> str: +def get_full_path(mft: Mft, name: str, parent: c_ntfs._MFT_SEGMENT_REFERENCE, seen: set[str] | None = None) -> str: """Walk up parent file references to construct a full path. Args: @@ -264,7 +264,7 @@ def get_full_path(mft: Mft, name: str, parent: c_ntfs._MFT_SEGMENT_REFERENCE, se try: record = mft.get(parent_ref) if not record.filename: - raise FilenameNotAvailableError("No filename") + raise FilenameNotAvailableError("No filename") # noqa: TRY301 if record.header.SequenceNumber != parent.SequenceNumber: path.append(f"") diff --git a/pyproject.toml b/pyproject.toml index a9a502f..8cd8558 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,13 +41,56 @@ dev = [ "dissect.util>=3.0.dev,<4.0.dev", ] -[tool.black] +[tool.ruff] line-length = 120 +required-version = ">=0.9.0" -[tool.isort] -profile = "black" -known_first_party = ["dissect.ntfs"] -known_third_party = ["dissect"] +[tool.ruff.format] +docstring-code-format = true + +[tool.ruff.lint] +select = [ + "F", + "E", + "W", + "I", + "UP", + "YTT", + "ANN", + "B", + "C4", + "DTZ", + "T10", + "FA", + "ISC", + "G", + "INP", + "PIE", + "PYI", + "PT", + "Q", + "RSE", + "RET", + "SLOT", + "SIM", + "TID", + "TCH", + "PTH", + "PLC", + "TRY", + "FLY", + "PERF", + "FURB", + "RUF", +] +ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003"] + +[tool.ruff.lint.per-file-ignores] +"tests/docs/**" = ["INP001"] + +[tool.ruff.lint.isort] +known-first-party = ["dissect.ntfs"] +known-third-party = ["dissect"] [tool.setuptools] license-files = ["LICENSE", "COPYRIGHT"] diff --git a/tests/conftest.py b/tests/conftest.py index bb81f1e..7a22967 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,20 @@ +from __future__ import annotations + import csv import gzip import io -import os -from typing import BinaryIO, Iterator +from pathlib import Path +from typing import TYPE_CHECKING, BinaryIO import pytest from dissect.util.stream import MappingStream +if TYPE_CHECKING: + from collections.abc import Iterator + -def absolute_path(filename: str) -> str: - return os.path.join(os.path.dirname(__file__), filename) +def absolute_path(filename: str) -> Path: + return Path(__file__).parent / filename def open_file_gz(name: str, mode: str = "rb") -> Iterator[BinaryIO]: @@ -43,7 +48,7 @@ def boot_2m_bin() -> Iterator[BinaryIO]: @pytest.fixture -def ntfs_fragmented_mft_fh() -> Iterator[BinaryIO]: +def ntfs_fragmented_mft_fh() -> BinaryIO: # Test data from https://github.com/msuhanov/ntfs-samples # This is from the file ntfs_extremely_fragmented_mft.raw which has, as the name implies, a heavily fragmented MFT # The entire file is way too large, so only take just enough data that we actually need to make dissect.ntfs happy @@ -55,4 +60,4 @@ def ntfs_fragmented_mft_fh() -> Iterator[BinaryIO]: buf = bytes.fromhex(data) stream.add(int(offset), len(buf), io.BytesIO(buf), 0) - yield stream + return stream diff --git a/tests/test_attr.py b/tests/test_attr.py index 43083a5..9b930e4 100644 --- a/tests/test_attr.py +++ b/tests/test_attr.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import pytest diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 6f1d857..8fe0d33 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import pytest from dissect.ntfs import exceptions @pytest.mark.parametrize( - "exc, std", + ("exc", "std"), [ (exceptions.FileNotFoundError, FileNotFoundError), (exceptions.IsADirectoryError, IsADirectoryError), diff --git a/tests/test_index.py b/tests/test_index.py index e37da27..1639ffe 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io import struct from typing import BinaryIO diff --git a/tests/test_mft.py b/tests/test_mft.py index 0b430c0..6800775 100644 --- a/tests/test_mft.py +++ b/tests/test_mft.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io from typing import BinaryIO diff --git a/tests/test_ntfs.py b/tests/test_ntfs.py index 8fb8f7e..b5c7a5b 100644 --- a/tests/test_ntfs.py +++ b/tests/test_ntfs.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from io import BytesIO from typing import BinaryIO diff --git a/tests/test_secure.py b/tests/test_secure.py index 008b44e..baafabf 100644 --- a/tests/test_secure.py +++ b/tests/test_secure.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import BinaryIO import pytest @@ -58,5 +60,5 @@ def test_secure_complex_acl(sds_complex_bin: BinaryIO) -> None: def test_secure_fail() -> None: - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Either record or SDS stream is required"): Secure() diff --git a/tests/test_usnjrnl.py b/tests/test_usnjrnl.py index 6a33916..8e8aaac 100644 --- a/tests/test_usnjrnl.py +++ b/tests/test_usnjrnl.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from io import BytesIO from dissect.ntfs.usnjrnl import UsnRecord diff --git a/tests/test_util.py b/tests/test_util.py index df7eb87..a154ed9 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dissect.ntfs.attr import Attribute from dissect.ntfs.c_ntfs import ATTRIBUTE_TYPE_CODE from dissect.ntfs.util import AttributeMap, apply_fixup @@ -7,7 +9,7 @@ def test_fixup() -> None: buf = bytearray( b"FILE\x30\x00" + (b"\x00" * 42) - + b"\x02\x00\xFF\x00\xFE\x00" + + b"\x02\x00\xff\x00\xfe\x00" + (b"\x00" * 456) + b"\x02\x00" + (b"\x00" * 510) @@ -15,13 +17,13 @@ def test_fixup() -> None: ) fixed = apply_fixup(buf) - assert fixed[510:512] == b"\xFF\x00" - assert fixed[1022:1024] == b"\xFE\x00" + assert fixed[510:512] == b"\xff\x00" + assert fixed[1022:1024] == b"\xfe\x00" buf = bytearray( b"FILE\x30\x00" + (b"\x00" * 42) - + b"\x02\x00\xFF\x00\xFE\x00\xFD\x00\xFC\x00" + + b"\x02\x00\xff\x00\xfe\x00\xfd\x00\xfc\x00" + (b"\x00" * 452) + b"\x02\x00" + (b"\x00" * 510) @@ -33,10 +35,10 @@ def test_fixup() -> None: ) fixed = apply_fixup(buf) - assert fixed[510:512] == b"\xFF\x00" - assert fixed[1022:1024] == b"\xFE\x00" - assert fixed[1534:1536] == b"\xFD\x00" - assert fixed[2046:2048] == b"\xFC\x00" + assert fixed[510:512] == b"\xff\x00" + assert fixed[1022:1024] == b"\xfe\x00" + assert fixed[1534:1536] == b"\xfd\x00" + assert fixed[2046:2048] == b"\xfc\x00" def test_attribute_map() -> None: diff --git a/tox.ini b/tox.ini index bfcf133..17e3629 100644 --- a/tox.ini +++ b/tox.ini @@ -32,32 +32,19 @@ commands = [testenv:fix] package = skip deps = - black==23.1.0 - isort==5.11.4 + ruff==0.9.2 commands = - black dissect tests - isort dissect tests + ruff format dissect tests [testenv:lint] package = skip deps = - black==23.1.0 - flake8 - flake8-black - flake8-isort - isort==5.11.4 + ruff==0.9.2 vermin commands = - flake8 dissect tests + ruff check dissect tests vermin -t=3.9- --no-tips --lint dissect tests -[flake8] -max-line-length = 120 -extend-ignore = - # See https://github.com/PyCQA/pycodestyle/issues/373 - E203, -statistics = True - [testenv:docs-build] allowlist_externals = make deps =