Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dissect/ntfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
"ACE",
"ACL",
"ATTRIBUTE_TYPE_CODE",
"NTFS",
"NTFS_SIGNATURE",
"Attribute",
"AttributeHeader",
"AttributeRecord",
"Index",
"IndexEntry",
"Mft",
"MftRecord",
"NTFS",
"NTFS_SIGNATURE",
"Secure",
"SecurityDescriptor",
"UsnJrnl",
Expand Down
66 changes: 33 additions & 33 deletions dissect/ntfs/attr.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import annotations

import io
from datetime import datetime
from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional
from typing import TYPE_CHECKING, Any, BinaryIO

from dissect.util.stream import RangeStream, RunlistStream
from dissect.util.ts import wintimestamp
Expand All @@ -18,6 +17,9 @@
from dissect.ntfs.util import ensure_volume, get_full_path, ts_to_ns

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import datetime

Check warning on line 21 in dissect/ntfs/attr.py

View check run for this annotation

Codecov / codecov/patch

dissect/ntfs/attr.py#L20-L21

Added lines #L20 - L21 were not covered by tests

from dissect.ntfs.mft import MftRecord


Expand All @@ -31,9 +33,9 @@
header: The AttributeHeader for this Attribute.
"""

__slots__ = ("record", "header", "attribute")
__slots__ = ("attribute", "header", "record")

def __init__(self, header: AttributeHeader, record: Optional[MftRecord] = None):
def __init__(self, header: AttributeHeader, record: MftRecord | None = None):
self.header = header
self.record = record
self.attribute = None
Expand All @@ -52,7 +54,7 @@
return f"<${self.header.type.name} name={self.header.name}>"

@classmethod
def from_fh(cls, fh: BinaryIO, record: Optional[MftRecord] = None) -> Attribute:
def from_fh(cls, fh: BinaryIO, record: MftRecord | None = None) -> Attribute:
"""Parse an attribute from a file-like object.

Args:
Expand All @@ -62,7 +64,7 @@
return cls(AttributeHeader(fh, 0, record), record)

@classmethod
def from_bytes(cls, data: bytes, record: Optional[MftRecord] = None) -> Attribute:
def from_bytes(cls, data: bytes, record: MftRecord | None = None) -> Attribute:
"""Parse an attribute from bytes.

Args:
Expand Down Expand Up @@ -120,9 +122,9 @@
offset: The offset in the file-like object to parse an attribute header from.
"""

__slots__ = ("record", "fh", "offset", "header")
__slots__ = ("fh", "header", "offset", "record")

def __init__(self, fh: BinaryIO, offset: int, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, offset: int, record: MftRecord | None = None):
self.fh = fh
self.offset = offset
self.record = record
Expand All @@ -134,7 +136,7 @@
return f"<${self.type.name} size={self.size}>"

@classmethod
def from_bytes(cls, data: bytes, record: Optional[MftRecord] = None) -> AttributeHeader:
def from_bytes(cls, data: bytes, record: MftRecord | None = None) -> AttributeHeader:
"""Parse an attribute header from bytes.

Args:
Expand Down Expand Up @@ -175,22 +177,22 @@
return self.header.Form.Resident.ValueLength if self.resident else self.header.Form.Nonresident.FileSize

@property
def allocated_size(self) -> Optional[int]:
def allocated_size(self) -> int | None:
"""Return the allocated size if non-resident, else None."""
return self.header.Form.Nonresident.AllocatedLength if not self.resident else None

@property
def lowest_vcn(self) -> Optional[int]:
def lowest_vcn(self) -> int | None:
"""Return the lowest VCN if non-resident, else None."""
return self.header.Form.Nonresident.LowestVcn if not self.resident else None

@property
def highest_vcn(self) -> Optional[int]:
def highest_vcn(self) -> int | None:
"""Return the highest VCN if non-resident, else None."""
return self.header.Form.Nonresident.HighestVcn if not self.resident else None

@property
def compression_unit(self) -> Optional[int]:
def compression_unit(self) -> int | None:
"""Return the compression unit if non-resident, else None."""
return self.header.Form.Nonresident.CompressionUnit if not self.resident else None

Expand Down Expand Up @@ -242,16 +244,16 @@
self.offset + self.header.Form.Resident.ValueOffset,
self.size,
)
else:
ntfs = self.record.ntfs if self.record else None
ensure_volume(ntfs)

return RunlistStream(
ntfs.fh,
self.dataruns(),
self.size,
ntfs.cluster_size,
)
ntfs = self.record.ntfs if self.record else None
ensure_volume(ntfs)

return RunlistStream(
ntfs.fh,
self.dataruns(),
self.size,
ntfs.cluster_size,
)

def data(self) -> bytes:
"""Read and return all the data of this attribute.
Expand All @@ -272,13 +274,11 @@

__slots__ = ("record",)

def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, record: MftRecord | None = None):
self.record = record

@classmethod
def from_fh(
cls, fh: BinaryIO, attr_type: ATTRIBUTE_TYPE_CODE, record: Optional[MftRecord] = None
) -> AttributeRecord:
def from_fh(cls, fh: BinaryIO, attr_type: ATTRIBUTE_TYPE_CODE, record: MftRecord | None = None) -> AttributeRecord:
"""Parse an attribute from a file-like object.

Selects a more specific :class:`AttributeRecord` class if one is available for the given attribute type.
Expand All @@ -296,7 +296,7 @@

__slots__ = ("entries",)

def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, record: MftRecord | None = None):
super().__init__(fh, record)

offset = 0
Expand Down Expand Up @@ -343,7 +343,7 @@

__slots__ = ("attr",)

def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, record: MftRecord | None = None):
super().__init__(fh, record)
# Data can be less than the _STANDARD_INFORMATION structure size, so pad remaining fields with null bytes
data = fh.read().ljust(len(c_ntfs._STANDARD_INFORMATION), b"\x00")
Expand Down Expand Up @@ -413,7 +413,7 @@

__slots__ = ("attr",)

def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, record: MftRecord | None = None):
super().__init__(fh, record)
data = fh.read().ljust(len(c_ntfs.STANDARD_INFORMATION_EX), b"\x00")
self.attr = c_ntfs._FILE_NAME(data)
Expand Down Expand Up @@ -489,9 +489,9 @@
class ReparsePoint(AttributeRecord):
"""Specific :class:`AttributeRecord` parser for ``$REPARSE_POINT``."""

__slots__ = ("attr", "tag_header", "buffer")
__slots__ = ("attr", "buffer", "tag_header")

def __init__(self, fh: BinaryIO, record: Optional[MftRecord] = None):
def __init__(self, fh: BinaryIO, record: MftRecord | None = None):
super().__init__(fh, record)
self.attr = c_ntfs._REPARSE_DATA_BUFFER(fh)
data = io.BytesIO(fh.read(self.attr.ReparseDataLength))
Expand All @@ -512,7 +512,7 @@
return self.attr.ReparseTag

@property
def substitute_name(self) -> Optional[str]:
def substitute_name(self) -> str | None:
if not self.tag_header:
return None

Expand All @@ -521,7 +521,7 @@
return self.buffer[offset : offset + length].decode("utf-16-le")

@property
def print_name(self) -> Optional[str]:
def print_name(self) -> str | None:
if not self.tag_header:
return None

Expand Down
3 changes: 3 additions & 0 deletions dissect/ntfs/c_ntfs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import struct

from dissect.cstruct import cstruct
Expand Down Expand Up @@ -652,3 +654,4 @@
for i in range(size):
if value & (1 << i):
return i
return 0

Check warning on line 657 in dissect/ntfs/c_ntfs.py

View check run for this annotation

Codecov / codecov/patch

dissect/ntfs/c_ntfs.py#L657

Added line #L657 was not covered by tests
25 changes: 12 additions & 13 deletions dissect/ntfs/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io
from enum import Enum, auto
from functools import cached_property, lru_cache
from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Optional
from typing import TYPE_CHECKING, Any, BinaryIO, Callable

from dissect.ntfs.attr import AttributeRecord
from dissect.ntfs.c_ntfs import (
Expand All @@ -24,6 +24,8 @@
from dissect.ntfs.util import apply_fixup

if TYPE_CHECKING:
from collections.abc import Iterator

Check warning on line 27 in dissect/ntfs/index.py

View check run for this annotation

Codecov / codecov/patch

dissect/ntfs/index.py#L27

Added line #L27 was not covered by tests

from dissect.ntfs.mft import MftRecord


Expand Down Expand Up @@ -79,7 +81,7 @@
return IndexBuffer(self, self._index_stream, vcn << self._vcn_size_shift, self.root.bytes_per_index_buffer)

def search(
self, value: Any, exact: bool = True, cmp: Optional[Callable[[IndexEntry, Any], Match]] = None
self, value: Any, exact: bool = True, cmp: Callable[[IndexEntry, Any], Match] | None = None
) -> IndexEntry:
"""Perform a binary search on this index.

Expand Down Expand Up @@ -114,8 +116,7 @@
entry = _bsearch(entries, search_value, cmp)
if not entry.is_node or (not entry.is_end and cmp(entry, search_value) == Match.Equal):
break
else:
entries = list(self.index_buffer(entry.node_vcn).entries())
entries = list(self.index_buffer(entry.node_vcn).entries())

if exact and (entry.is_end or cmp(entry, search_value) != Match.Equal):
raise KeyError(f"Value not found: {value}")
Expand Down Expand Up @@ -216,7 +217,7 @@
buf = fh.read(size)

if len(buf) != size:
raise EOFError()
raise EOFError

if buf[:4] != b"INDX":
raise BrokenIndexError("Broken INDX header")
Expand Down Expand Up @@ -264,7 +265,7 @@
"""
record = self.index.record
if not record or not record.ntfs or not record.ntfs.mft:
raise MftNotAvailableError()
raise MftNotAvailableError

Check warning on line 268 in dissect/ntfs/index.py

View check run for this annotation

Codecov / codecov/patch

dissect/ntfs/index.py#L268

Added line #L268 was not covered by tests

return record.ntfs.mft.get(segment_reference(self.header.FileReference))

Expand All @@ -284,7 +285,7 @@
return self.buf[offset : offset + self.header.DataLength]

@cached_property
def attribute(self) -> Optional[AttributeRecord]:
def attribute(self) -> AttributeRecord | None:
"""Return the :class:`dissect.ntfs.attr.AttributeRecord` of the attribute contained in this entry."""
if self.key_length and self.index.root.attribute_type:
return AttributeRecord.from_fh(
Expand Down Expand Up @@ -366,10 +367,9 @@

if value < test_value:
return Match.Less
elif value == test_value:
if value == test_value:
return Match.Equal
else:
return Match.Greater
return Match.Greater


def _cmp_ulong(entry: IndexEntry, value: int) -> Match:
Expand All @@ -380,7 +380,6 @@

if value < test_value:
return Match.Less
elif value == test_value:
if value == test_value:
return Match.Equal
else:
return Match.Greater
return Match.Greater
Loading
Loading