diff --git a/MANIFEST.in b/MANIFEST.in index 9ae349b..23519f8 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,4 @@ +exclude .gitattributes exclude .gitignore recursive-exclude .github/ * +recursive-exclude tests/_data/ * diff --git a/dissect/evidence/ewf/__init__.py b/dissect/evidence/ewf/__init__.py index 1be30e7..b09b7bf 100644 --- a/dissect/evidence/ewf/__init__.py +++ b/dissect/evidence/ewf/__init__.py @@ -3,18 +3,16 @@ from dissect.evidence.ewf.c_ewf import c_ewf from dissect.evidence.ewf.ewf import ( EWF, - EWFError, - EWFStream, HeaderSection, SectionDescriptor, Segment, TableSection, VolumeSection, ) +from dissect.evidence.ewf.stream import EWFStream __all__ = [ "EWF", - "EWFError", "EWFStream", "HeaderSection", "SectionDescriptor", diff --git a/dissect/evidence/ewf/c_ewf.py b/dissect/evidence/ewf/c_ewf.py index 84c21ea..2a72c56 100644 --- a/dissect/evidence/ewf/c_ewf.py +++ b/dissect/evidence/ewf/c_ewf.py @@ -25,65 +25,95 @@ }; typedef struct { - char signature[8]; - uint8 fields_start; - uint16 segment_number; - uint16 fields_end; -} EWFHeader; + char signature[8]; + uint8 fields_start; + uint16 segment_number; + uint16 fields_end; +} SegmentHeader; typedef struct { - char type[16]; - uint64 next; - uint64 size; - uint8 pad[40]; - uint32 checksum; -} EWFSectionDescriptor; + char type[16]; + uint64 next; + uint64 size; + uint8 pad[40]; + uint32 checksum; +} SectionDescriptor; typedef struct { - uint32 reserved_1; - uint32 chunk_count; - uint32 sector_count; - uint32 sector_size; - uint32 total_sector_count; - uint8 reserved[20]; - uint8 pad[45]; - char signature[5]; - uint32 checksum; -} EWFVolumeSectionSpec; + uint32 reserved_1; + uint32 number_of_chunks; + uint32 sectors_per_chunk; + uint32 bytes_per_sector; + uint32 number_of_sectors; + uint8 reserved[20]; + uint8 pad[45]; + char signature[5]; + uint32 checksum; +} VolumeSectionSmart; typedef struct { MediaType media_type; uint8 reserved_1[3]; - uint32 chunk_count; - uint32 sector_count; - uint32 sector_size; - uint64 total_sector_count; - uint32 num_cylinders; - uint32 num_heads; - uint32 num_sectors; + uint32 number_of_chunks; + uint32 sectors_per_chunk; + uint32 bytes_per_sector; + uint64 number_of_sectors; + uint32 chs_cylinders; + uint32 chs_heads; + uint32 chs_sectors; uint8 media_flags; uint8 unknown_1[3]; - uint32 palm_start_sector; + uint32 palm_volume_start_sector; uint32 unknown_2; - uint32 smart_start_sector; + uint32 smart_logs_start_sector; CompressionLevel compression_level; uint8 unknown_3[3]; uint32 error_granularity; uint32 unknown_4; - uint8 uuid[16]; + uint8 set_identifier[16]; uint8 pad[963]; char signature[5]; uint32 checksum; -} EWFVolumeSection; +} VolumeSection; + +typedef struct { + MediaType media_type; + uint8 unknown1[3]; + uint32 number_of_chunks; + uint32 sectors_per_chunk; + uint32 bytes_per_sector; + uint64 number_of_sectors; + uint32 chs_cylinders; + uint32 chs_heads; + uint32 chs_sectors; + MediaFlags media_flags; + uint8 unknown2[3]; + uint32 palm_volume_start_sector; + uint32 unknown3; + uint32 smart_logs_start_sector; + CompressionLevel compression_level; + uint8 unknown4[3]; + uint32 error_granularity; + uint32 unknown5; + uint8 set_identifier[16]; + char pad[963]; + char signature[5]; + uint32 checksum; +} DataSection; + +typedef struct { + uint32 number_of_entries; + uint32 _; + uint64 base_offset; + uint32 _; + uint32 checksum; +} TableSection; typedef struct { - uint32 num_entries; - uint32 _; - uint64 base_offset; - uint32 _; - uint32 checksum; - uint32 entries[num_entries]; -} EWFTableSection; + char md5[16]; + char unknown1[16]; + uint32 checksum; +} HashSection; """ c_ewf = cstruct().load(ewf_def) diff --git a/dissect/evidence/ewf/c_ewf.pyi b/dissect/evidence/ewf/c_ewf.pyi index f100653..6388ceb 100644 --- a/dissect/evidence/ewf/c_ewf.pyi +++ b/dissect/evidence/ewf/c_ewf.pyi @@ -22,7 +22,7 @@ class _c_ewf(__cs__.cstruct): Good = ... Best = ... - class EWFHeader(__cs__.Structure): + class SegmentHeader(__cs__.Structure): signature: __cs__.CharArray fields_start: _c_ewf.uint8 segment_number: _c_ewf.uint16 @@ -38,7 +38,7 @@ class _c_ewf(__cs__.cstruct): @overload def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... - class EWFSectionDescriptor(__cs__.Structure): + class SectionDescriptor(__cs__.Structure): type: __cs__.CharArray next: _c_ewf.uint64 size: _c_ewf.uint64 @@ -56,12 +56,12 @@ class _c_ewf(__cs__.cstruct): @overload def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... - class EWFVolumeSectionSpec(__cs__.Structure): + class VolumeSectionSmart(__cs__.Structure): reserved_1: _c_ewf.uint32 - chunk_count: _c_ewf.uint32 - sector_count: _c_ewf.uint32 - sector_size: _c_ewf.uint32 - total_sector_count: _c_ewf.uint32 + number_of_chunks: _c_ewf.uint32 + sectors_per_chunk: _c_ewf.uint32 + bytes_per_sector: _c_ewf.uint32 + number_of_sectors: _c_ewf.uint32 reserved: __cs__.Array[_c_ewf.uint8] pad: __cs__.Array[_c_ewf.uint8] signature: __cs__.CharArray @@ -70,10 +70,10 @@ class _c_ewf(__cs__.cstruct): def __init__( self, reserved_1: _c_ewf.uint32 | None = ..., - chunk_count: _c_ewf.uint32 | None = ..., - sector_count: _c_ewf.uint32 | None = ..., - sector_size: _c_ewf.uint32 | None = ..., - total_sector_count: _c_ewf.uint32 | None = ..., + number_of_chunks: _c_ewf.uint32 | None = ..., + sectors_per_chunk: _c_ewf.uint32 | None = ..., + bytes_per_sector: _c_ewf.uint32 | None = ..., + number_of_sectors: _c_ewf.uint32 | None = ..., reserved: __cs__.Array[_c_ewf.uint8] | None = ..., pad: __cs__.Array[_c_ewf.uint8] | None = ..., signature: __cs__.CharArray | None = ..., @@ -82,26 +82,26 @@ class _c_ewf(__cs__.cstruct): @overload def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... - class EWFVolumeSection(__cs__.Structure): + class VolumeSection(__cs__.Structure): media_type: _c_ewf.MediaType reserved_1: __cs__.Array[_c_ewf.uint8] - chunk_count: _c_ewf.uint32 - sector_count: _c_ewf.uint32 - sector_size: _c_ewf.uint32 - total_sector_count: _c_ewf.uint64 - num_cylinders: _c_ewf.uint32 - num_heads: _c_ewf.uint32 - num_sectors: _c_ewf.uint32 + number_of_chunks: _c_ewf.uint32 + sectors_per_chunk: _c_ewf.uint32 + bytes_per_sector: _c_ewf.uint32 + number_of_sectors: _c_ewf.uint64 + chs_cylinders: _c_ewf.uint32 + chs_heads: _c_ewf.uint32 + chs_sectors: _c_ewf.uint32 media_flags: _c_ewf.uint8 unknown_1: __cs__.Array[_c_ewf.uint8] - palm_start_sector: _c_ewf.uint32 + palm_volume_start_sector: _c_ewf.uint32 unknown_2: _c_ewf.uint32 - smart_start_sector: _c_ewf.uint32 + smart_logs_start_sector: _c_ewf.uint32 compression_level: _c_ewf.CompressionLevel unknown_3: __cs__.Array[_c_ewf.uint8] error_granularity: _c_ewf.uint32 unknown_4: _c_ewf.uint32 - uuid: __cs__.Array[_c_ewf.uint8] + set_identifier: __cs__.Array[_c_ewf.uint8] pad: __cs__.Array[_c_ewf.uint8] signature: __cs__.CharArray checksum: _c_ewf.uint32 @@ -110,23 +110,23 @@ class _c_ewf(__cs__.cstruct): self, media_type: _c_ewf.MediaType | None = ..., reserved_1: __cs__.Array[_c_ewf.uint8] | None = ..., - chunk_count: _c_ewf.uint32 | None = ..., - sector_count: _c_ewf.uint32 | None = ..., - sector_size: _c_ewf.uint32 | None = ..., - total_sector_count: _c_ewf.uint64 | None = ..., - num_cylinders: _c_ewf.uint32 | None = ..., - num_heads: _c_ewf.uint32 | None = ..., - num_sectors: _c_ewf.uint32 | None = ..., + number_of_chunks: _c_ewf.uint32 | None = ..., + sectors_per_chunk: _c_ewf.uint32 | None = ..., + bytes_per_sector: _c_ewf.uint32 | None = ..., + number_of_sectors: _c_ewf.uint64 | None = ..., + chs_cylinders: _c_ewf.uint32 | None = ..., + chs_heads: _c_ewf.uint32 | None = ..., + chs_sectors: _c_ewf.uint32 | None = ..., media_flags: _c_ewf.uint8 | None = ..., unknown_1: __cs__.Array[_c_ewf.uint8] | None = ..., - palm_start_sector: _c_ewf.uint32 | None = ..., + palm_volume_start_sector: _c_ewf.uint32 | None = ..., unknown_2: _c_ewf.uint32 | None = ..., - smart_start_sector: _c_ewf.uint32 | None = ..., + smart_logs_start_sector: _c_ewf.uint32 | None = ..., compression_level: _c_ewf.CompressionLevel | None = ..., unknown_3: __cs__.Array[_c_ewf.uint8] | None = ..., error_granularity: _c_ewf.uint32 | None = ..., unknown_4: _c_ewf.uint32 | None = ..., - uuid: __cs__.Array[_c_ewf.uint8] | None = ..., + set_identifier: __cs__.Array[_c_ewf.uint8] | None = ..., pad: __cs__.Array[_c_ewf.uint8] | None = ..., signature: __cs__.CharArray | None = ..., checksum: _c_ewf.uint32 | None = ..., @@ -134,20 +134,84 @@ class _c_ewf(__cs__.cstruct): @overload def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... - class EWFTableSection(__cs__.Structure): - num_entries: _c_ewf.uint32 + class DataSection(__cs__.Structure): + media_type: _c_ewf.MediaType + unknown1: __cs__.Array[_c_ewf.uint8] + number_of_chunks: _c_ewf.uint32 + sectors_per_chunk: _c_ewf.uint32 + bytes_per_sector: _c_ewf.uint32 + number_of_sectors: _c_ewf.uint64 + chs_cylinders: _c_ewf.uint32 + chs_heads: _c_ewf.uint32 + chs_sectors: _c_ewf.uint32 + media_flags: _c_ewf.MediaFlags + unknown2: __cs__.Array[_c_ewf.uint8] + palm_volume_start_sector: _c_ewf.uint32 + unknown3: _c_ewf.uint32 + smart_logs_start_sector: _c_ewf.uint32 + compression_level: _c_ewf.CompressionLevel + unknown4: __cs__.Array[_c_ewf.uint8] + error_granularity: _c_ewf.uint32 + unknown5: _c_ewf.uint32 + set_identifier: __cs__.Array[_c_ewf.uint8] + pad: __cs__.CharArray + signature: __cs__.CharArray + checksum: _c_ewf.uint32 + @overload + def __init__( + self, + media_type: _c_ewf.MediaType | None = ..., + unknown1: __cs__.Array[_c_ewf.uint8] | None = ..., + number_of_chunks: _c_ewf.uint32 | None = ..., + sectors_per_chunk: _c_ewf.uint32 | None = ..., + bytes_per_sector: _c_ewf.uint32 | None = ..., + number_of_sectors: _c_ewf.uint64 | None = ..., + chs_cylinders: _c_ewf.uint32 | None = ..., + chs_heads: _c_ewf.uint32 | None = ..., + chs_sectors: _c_ewf.uint32 | None = ..., + media_flags: _c_ewf.MediaFlags | None = ..., + unknown2: __cs__.Array[_c_ewf.uint8] | None = ..., + palm_volume_start_sector: _c_ewf.uint32 | None = ..., + unknown3: _c_ewf.uint32 | None = ..., + smart_logs_start_sector: _c_ewf.uint32 | None = ..., + compression_level: _c_ewf.CompressionLevel | None = ..., + unknown4: __cs__.Array[_c_ewf.uint8] | None = ..., + error_granularity: _c_ewf.uint32 | None = ..., + unknown5: _c_ewf.uint32 | None = ..., + set_identifier: __cs__.Array[_c_ewf.uint8] | None = ..., + pad: __cs__.CharArray | None = ..., + signature: __cs__.CharArray | None = ..., + checksum: _c_ewf.uint32 | None = ..., + ): ... + @overload + def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... + + class TableSection(__cs__.Structure): + number_of_entries: _c_ewf.uint32 _: _c_ewf.uint32 base_offset: _c_ewf.uint64 checksum: _c_ewf.uint32 - entries: __cs__.Array[_c_ewf.uint32] @overload def __init__( self, - num_entries: _c_ewf.uint32 | None = ..., + number_of_entries: _c_ewf.uint32 | None = ..., _: _c_ewf.uint32 | None = ..., base_offset: _c_ewf.uint64 | None = ..., checksum: _c_ewf.uint32 | None = ..., - entries: __cs__.Array[_c_ewf.uint32] | None = ..., + ): ... + @overload + def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... + + class HashSection(__cs__.Structure): + md5: __cs__.CharArray + unknown1: __cs__.CharArray + checksum: _c_ewf.uint32 + @overload + def __init__( + self, + md5: __cs__.CharArray | None = ..., + unknown1: __cs__.CharArray | None = ..., + checksum: _c_ewf.uint32 | None = ..., ): ... @overload def __init__(self, fh: bytes | memoryview | bytearray | BinaryIO, /): ... diff --git a/dissect/evidence/ewf/ewf.py b/dissect/evidence/ewf/ewf.py index e02ae41..d11259e 100644 --- a/dissect/evidence/ewf/ewf.py +++ b/dissect/evidence/ewf/ewf.py @@ -1,21 +1,20 @@ from __future__ import annotations -import logging -import os +import codecs import zlib from bisect import bisect_right -from functools import lru_cache +from functools import cached_property, lru_cache from pathlib import Path -from typing import BinaryIO - -from dissect.util.stream import AlignedStream +from typing import TYPE_CHECKING, BinaryIO +from dissect.evidence.adcrypt.adcrypt import ADCrypt, is_adcrypt from dissect.evidence.ewf import c_ewf -from dissect.evidence.exception import EWFError +from dissect.evidence.ewf.stream import EWFStream -log = logging.getLogger(__name__) -log.setLevel(os.getenv("DISSECT_LOG_EWF", "CRITICAL")) +if TYPE_CHECKING: + from types import TracebackType + from typing_extensions import Self MAX_OPEN_SEGMENTS = 128 @@ -29,7 +28,7 @@ def find_files(path: str | Path) -> list[Path]: ext = path.suffix if ext[1].upper() not in "ELS": - raise EWFError(f"Invalid EWF file: {path}") + raise ValueError(f"Invalid EWF file: {path}") ewfglob = f"[{ext[1]}-{'Z' if ext[1].isupper() else 'z'}]" if len(ext) == 4 else f"{ext[1]}[x-z]" @@ -37,50 +36,57 @@ def find_files(path: str | Path) -> list[Path]: class EWF: - """Expert Witness Disk Image Format.""" + """Expert Witness Disk Image Format. + + Args: + fh: A file handle, list of file handles, path or list of paths to the EWF segment files. + If a path is provided, all related segment files will be automatically discovered. + """ def __init__(self, fh: BinaryIO | list[BinaryIO] | Path | list[Path]): - fhs = [fh] if not isinstance(fh, list) else fh + fhs = find_files(fh) if isinstance(fh, Path) else [fh] if not isinstance(fh, list) else fh self.fh = fhs - self.header: HeaderSection = None - self.volume: VolumeSection = None - self._segments: dict[str, Segment] = {} - self._segment_offsets = [] - self._segment_lru = [] + self.header: HeaderSection | None = None + self.volume: VolumeSection | None = None - segment_offset = 0 + self.chunk_size = 0 + self.size = 0 - for i in range(len(fhs)): - try: - segment = self.segment(i) - except Exception: - log.exception("Failed to parse as EWF file: %s", fh) - continue + self._segments: dict[int, Segment] = {} + self._segment_lru: list[int] = [] - if segment.header and not self.header: - self.header = segment.header + self._chunk_lookup: list[int] = [] + self._chunk_map: list[tuple[int, int]] = [] - if segment.volume and not self.volume: - self.volume = segment.volume + if not self.fh: + raise ValueError("No segment files provided for EWF container") + + self.adcrypt = None - if segment_offset != 0: - self._segment_offsets.append(segment_offset) + first_segment = self.segment(0) + if is_adcrypt(first_segment.fh): + self.adcrypt = ADCrypt(first_segment.fh) + else: + self._open_ewf() - segment.offset = segment_offset * self.volume.sector_size - segment.sector_offset = segment_offset - segment_offset += segment.sector_count + self.read_chunk = lru_cache(128)(self.read_chunk) - if not self.header or not self.volume: - raise EWFError(f"Failed to load EWF: {fh}") + def __enter__(self) -> Self: + return self - self.chunk_size = self.volume.sector_count * self.volume.sector_size + def __exit__( + self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None + ) -> None: + self.close() - max_size = self.volume.chunk_count * self.volume.sector_count * self.volume.sector_size - last_table = self.segment(len(self.fh) - 1).tables[-1] - last_chunk_size = len(last_table.read_chunk(last_table.num_entries - 1)) + def is_adcrypt(self) -> bool: + """Return whether the AD1 container is ADCRYPT encrypted.""" + return self.adcrypt is not None - self.size = max_size - (self.chunk_size - last_chunk_size) + def is_locked(self) -> bool: + """Return whether the ADCRYPT container is locked.""" + return self.is_adcrypt() and self.adcrypt.is_locked() def segment(self, idx: int) -> Segment: # Poor mans LRU @@ -93,274 +99,370 @@ def segment(self, idx: int) -> Segment: oldest_idx = self._segment_lru.pop(0) oldest_segment = self._segments.pop(oldest_idx) - # Don't close it if we received it as a file-like object - if not hasattr(self.fh[oldest_idx], "read"): + # Only close file handles that we opened ourselves + if isinstance(self.fh[oldest_idx], Path): oldest_segment.fh.close() del oldest_segment fh = self.fh[idx] - if not hasattr(fh, "read"): - fh = fh.open("rb") if isinstance(fh, Path) else Path(fh).open("rb") # noqa: SIM115 - - segment = Segment(self, fh) - if self.volume and 0 < idx <= len(self._segment_offsets): - # We already have a known segment offset for this segment, so set it back - segment_offset = self._segment_offsets[idx - 1] - segment.offset = segment_offset * self.volume.sector_size - segment.sector_offset = segment_offset - else: - # Otherwise we're in the initialization loop (or we're idx == 0) - segment.offset = 0 - segment.sector_offset = 0 + if isinstance(fh, Path): + fh = fh.open("rb") + + if self.is_adcrypt() and not self.is_locked(): + fh = self.adcrypt.wrap(fh, idx) + segment = Segment(fh) self._segments[idx] = segment self._segment_lru.append(idx) return segment - def open(self) -> BinaryIO: - return EWFStream(self) + def unlock(self, *, passphrase: str | bytes | None = None, private_key: Path | bytes | None = None) -> None: + """Unlock the ADCRYPT container with a given passphrase or private key. + Args: + passphrase: The passphrase to unlock the container. + private_key: The private key to unlock the container. -class EWFStream(AlignedStream): - def __init__(self, ewf: EWF): - self.ewf = ewf - self.sector_size = self.ewf.volume.sector_size - super().__init__(ewf.size) + Raises: + RuntimeError: If required dependencies are missing. + ValueError: If unlocking failed. + """ + self.adcrypt.unlock(passphrase=passphrase, private_key=private_key) - def _read(self, offset: int, length: int) -> bytes: - result = [] + # Reset LRU + self._segments = {} + self._segment_lru = [] - sector_offset = offset // self.sector_size - sector_count = (length + self.sector_size - 1) // self.sector_size + # Open the EWF + self._open_ewf() - segment_idx = bisect_right(self.ewf._segment_offsets, sector_offset) + def _open_ewf(self) -> None: + """Open the EWF container and initialize the volume and chunk information.""" + self.volume = None - while sector_count > 0: - if segment_idx > len(self.ewf._segment_offsets): - raise EWFError(f"Missing EWF file for segment index: {segment_idx}") + chunk = 0 + for i in range(len(self.fh)): + segment = self.segment(i) + if segment.header.signature not in (b"EVF\x09\x0d\x0a\xff\x00", b"LVF\x09\x0d\x0a\xff\x00"): + raise ValueError(f"Invalid EWF signature in segment {i}, got {segment.header.signature!r}") - segment = self.ewf.segment(segment_idx) + if segment.number != i + 1: + raise ValueError(f"Invalid EWF segment number in segment {i}, got {segment.number}, expected {i + 1}") - segment_remaining_sectors = segment.sector_count - (sector_offset - segment.sector_offset) - segment_sectors = min(segment_remaining_sectors, sector_count) + if not self.volume and segment.volume: + self.volume = segment.volume + self.chunk_size = self.volume.sectors_per_chunk * self.volume.sector_size + elif not self.volume and not segment.volume: + raise ValueError("Missing expected volume section in first segment") - result.append(segment.read_sectors(sector_offset, segment_sectors)) - sector_offset += segment_sectors - sector_count -= segment_sectors + for j, table in enumerate(segment.tables): + chunk += table.number_of_entries + self._chunk_lookup.append(chunk) + self._chunk_map.append((i, j)) - segment_idx += 1 + max_size = self.volume.number_of_chunks * self.chunk_size + last_chunk = self.read_chunk(self.volume.number_of_chunks - 1) - return b"".join(result) + self.size = max_size - (self.chunk_size - len(last_chunk)) + def read_chunk(self, chunk: int) -> bytes: + """Read a chunk of data from the EWF container. -class Segment: - def __init__(self, ewf: EWF, fh: BinaryIO): - self.ewf = ewf - self.fh = fh + Args: + chunk: The chunk index to read. + """ + lookup_idx = bisect_right(self._chunk_lookup, chunk) + if lookup_idx >= len(self._chunk_map): + raise IndexError(f"Chunk {chunk} out of range") - fh.seek(0) - self.ewfheader = c_ewf.EWFHeader(fh) - self.header = ewf.header - self.volume = ewf.volume + segment_idx, table_idx = self._chunk_map[lookup_idx] - if self.ewfheader.signature not in (b"EVF\x09\x0d\x0a\xff\x00", b"LVF\x09\x0d\x0a\xff\x00"): - raise EWFError(f"Invalid signature, got {self.ewfheader.signature!r}") + segment = self.segment(segment_idx) + table = segment.tables[table_idx] - self.sections: list[SectionDescriptor] = [] - self.tables: list[TableSection] = [] - self.table_offsets = [] + chunk_offset = 0 if lookup_idx == 0 else self._chunk_lookup[lookup_idx - 1] + relative_chunk = chunk - chunk_offset - offset = 0 - sector_offset = 0 + entry = table.entries[relative_chunk] + offset_of_chunk = table.base_offset + (entry & 0x7FFFFFFF) + compressed = entry >> 31 == 1 - while True: - section = SectionDescriptor(fh) - self.sections.append(section) + # We don't know the chunk size, so try to determine it using the offset of the next chunk + # When it's the last chunk in the table though, this becomes trickier + # We have to check if the chunk data is preceding the table, or if it's contained within the table section + if relative_chunk == table.number_of_entries - 1: + # The chunk data is stored before the table section (probably in a sectors section) + if offset_of_chunk < table.descriptor.offset: + end_of_chunk = table.descriptor.offset + + # The chunk data is stored within the table section + elif offset_of_chunk < table.descriptor.offset + table.descriptor.size: + end_of_chunk = table.descriptor.offset + table.descriptor.size - if section.type in (b"header", b"header2") and not self.header: - self.header = HeaderSection(self, section) + else: + raise ValueError("Unknown size of last chunk") + else: + end_of_chunk = table.base_offset + (table.entries[relative_chunk + 1] & 0x7FFFFFFF) - if section.type in (b"disk", b"volume") and not self.volume: - self.volume = VolumeSection(self, section) + size_of_chunk = end_of_chunk - offset_of_chunk - if section.type == b"table": - table = TableSection(self, section) + # Uncompressed chunks have a 4 byte checksum at the end + if not compressed: + size_of_chunk -= 4 - if sector_offset != 0: - self.table_offsets.append(sector_offset) + segment.fh.seek(offset_of_chunk) + buf = segment.fh.read(size_of_chunk) + if compressed: + buf = zlib.decompress(buf) - table.offset = sector_offset * self.volume.sector_size - table.sector_offset = sector_offset - sector_offset += table.sector_count + return buf - self.tables.append(table) + def open(self) -> BinaryIO: + """Open a stream to read the EWF container contents.""" + if self.is_locked(): + raise ValueError("EWF container is locked by ADCRYPT") - if section.next == offset or section.type == b"done": - break + return EWFStream(self) - offset = section.next - fh.seek(offset) + def close(self) -> None: + """Close all segment file handles that we opened ourselves and clear the segment cache.""" + for idx, segment in self._segments.items(): + if not hasattr(self.fh[idx], "read"): + segment.fh.close() - self.chunk_count = sum([t.num_entries for t in self.tables]) - self.sector_count = self.chunk_count * self.volume.sector_count - self.size = self.chunk_count * self.volume.sector_count * self.volume.sector_size - self.sector_offset = None # Set later - self.offset = None # Set later + self._segments = {} + self._segment_lru = [] - def read_sectors(self, sector: int, count: int) -> bytes: - log.debug("Segment::read_sectors(0x%x, 0x%x)", sector, count) - segment_sector = sector - self.sector_offset - r = [] - table_idx = bisect_right(self.table_offsets, segment_sector) - while count > 0: - table = self.tables[table_idx] +class Segment: + """EWF segment.""" - table_remaining_sectors = table.sector_count - (segment_sector - table.sector_offset) - table_sectors = min(table_remaining_sectors, count) + def __init__(self, fh: BinaryIO): + self.fh = fh + + self.fh.seek(0) + self.header = c_ewf.SegmentHeader(fh) + self.number = self.header.segment_number + + @cached_property + def sections(self) -> list[Section]: + """Return all sections in this segment.""" + result = [] - r.append(table.read_sectors(segment_sector, table_sectors)) - segment_sector += table_sectors - count -= table_sectors + offset = len(c_ewf.SegmentHeader) + while True: + self.fh.seek(offset) + section = Section.from_fh(self.fh) + result.append(section) + + if section.descriptor.next == offset or section.descriptor.type == b"done": + break - table_idx += 1 + offset = section.descriptor.next - return b"".join(r) + return result + @cached_property + def headers(self) -> list[HeaderSection]: + """Return all header sections in this segment.""" + return [section for section in self.sections if isinstance(section, HeaderSection)] -class HeaderSection: - def __init__(self, segment: Segment, section: SectionDescriptor): - self.segment = segment - self.section = section + @cached_property + def tables(self) -> list[TableSection]: + """Return all table sections in this segment.""" + return [section for section in self.sections if type(section) is TableSection] # Ignore Table2Section - fh = segment.fh - fh.seek(section.data_offset) - self.data = zlib.decompress(fh.read(section.size)) + @cached_property + def volume(self) -> VolumeSection | None: + """Return the volume section in this segment, if present.""" + for section in self.sections: + if isinstance(section, VolumeSection): + return section + return None - if self.data[0] in (b"\xff", b"\xfe"): - self.data = self.data.decode("utf16") + +class SectionDescriptor: + """EWF section descriptor.""" + + def __init__(self, fh: BinaryIO): + self.fh = fh + + self.offset = fh.tell() + self.descriptor = c_ewf.SectionDescriptor(fh) def __repr__(self) -> str: - return f"" + return ( + f"" + ) + @property + def type(self) -> str: + """The type of the section.""" + return self.descriptor.type.rstrip(b"\x00").decode() -class VolumeSection: - def __init__(self, segment: Segment, section: SectionDescriptor): - self.segment = segment - self.section = section + @property + def next(self) -> int: + """The offset of the next section.""" + return self.descriptor.next - fh = segment.fh - fh.seek(section.data_offset) - data = c_ewf.EWFVolumeSection(fh) if section.size == 1052 else c_ewf.EWFVolumeSectionSpec(fh) + @property + def size(self) -> int: + """The size of the section data.""" + return (self.descriptor.size - len(c_ewf.SectionDescriptor)) if self.descriptor.size else 0 - self.volume = data - self.chunk_count = data.chunk_count - self.sector_count = data.sector_count - self.sector_size = data.sector_size + @property + def data(self) -> bytes: + """The raw data of the section.""" + self.fh.seek(self.offset + len(c_ewf.SectionDescriptor)) + return self.fh.read(self.size) + @property + def checksum(self) -> int: + """The checksum of the section data.""" + return self.descriptor.checksum -class TableSection: - def __init__(self, segment: Segment, section: SectionDescriptor): - self.segment = segment - self.section = section - fh = segment.fh - fh.seek(section.data_offset) +class Section: + """EWF section.""" - self.header = c_ewf.EWFTableSection(fh) - self.num_entries = self.header.num_entries - self.base_offset = self.header.base_offset - self.entries = self.header.entries + def __init__(self, descriptor: SectionDescriptor): + self.descriptor = descriptor - self.sector_count = self.num_entries * self.segment.volume.sector_count - self.size = self.sector_count * self.segment.volume.sector_size - self.sector_offset = None # Set later - self.offset = None # Set later + def __repr__(self) -> str: + return f"<{self.__class__.__name__} type={self.descriptor.type!r} size={self.descriptor.size:#x}>" - self.read_chunk = lru_cache(1024)(self.read_chunk) + @classmethod + def from_fh( + cls, fh: BinaryIO + ) -> Section | HeaderSection | VolumeSection | DataSection | TableSection | Table2Section | HashSection: + """Open a section from a file-like object.""" + descriptor = SectionDescriptor(fh) - def read_chunk(self, chunk: int) -> bytes: - log.debug("TableSection::read_chunk(0x%x)", chunk) + if descriptor.type in ("header", "header2"): + return HeaderSection(descriptor) - chunk_entry = self.entries[chunk] - chunk_offset = self.base_offset + (chunk_entry & 0x7FFFFFFF) - compressed = chunk_entry >> 31 == 1 + if descriptor.type in ("disk", "volume"): + return VolumeSection(descriptor) - # EWF sucks - # We don't know the chunk size, so try to determine it using the offset of the next chunk - # When it's the last chunk in the table though, this becomes trickier. - # We have to check if the chunk data is preceding the table, or if it's contained within the table section - # Then we can calculate the chunk size using these offsets - if chunk + 1 == self.num_entries: - # The chunk data is stored before the table section - if chunk_offset < self.section.offset: - chunk_size = self.section.offset - chunk_offset - # The chunk data is stored within the table section - elif chunk_offset < self.section.offset + self.section.size: - chunk_size = self.section.offset + self.section.size - chunk_offset - else: - raise EWFError("Unknown size of last chunk") - else: - chunk_size = self.base_offset + (self.entries[chunk + 1] & 0x7FFFFFFF) - chunk_offset + if descriptor.type == "data": + return DataSection(descriptor) - # Non compressed chunks have a 4 byte checksum - if not compressed: - chunk_size -= 4 + if descriptor.type in ("table"): + return TableSection(descriptor) - self.segment.fh.seek(chunk_offset) - buf = self.segment.fh.read(chunk_size) + if descriptor.type == "table2": + return Table2Section(descriptor) - if compressed: - buf = zlib.decompress(buf) + if descriptor.type == "hash": + return HashSection(descriptor) - return buf + return Section(descriptor) - def read_sectors(self, sector: int, count: int) -> bytes: - log.debug("TableSection::read_sectors(0x%x, 0x%x)", sector, count) - result = [] + @property + def type(self) -> str: + """The type of the section.""" + return self.descriptor.type - chunk_sector_count = self.segment.volume.sector_count - sector_size = self.segment.volume.sector_size + @cached_property + def data(self) -> bytes: + """The raw data of the section.""" + return self.descriptor.data - table_sector = sector - self.sector_offset - table_chunk = table_sector // chunk_sector_count - while count > 0: - table_sector_offset = table_sector % chunk_sector_count - chunk_remaining_sectors = chunk_sector_count - table_sector_offset - table_sectors = min(chunk_remaining_sectors, count) +class HeaderSection(Section): + """EWF header section.""" - chunk_pos = table_sector_offset * sector_size - chunk_end = chunk_pos + (table_sectors * sector_size) + @cached_property + def data(self) -> str: + """The header data.""" + data = zlib.decompress(self.descriptor.data) + return data.decode("utf16") if data[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE) else data.decode() - buf = self.read_chunk(table_chunk) - if chunk_pos != 0 or table_sectors != chunk_sector_count: - buf = buf[chunk_pos:chunk_end] - result.append(buf) - count -= table_sectors - table_sector += table_sectors - table_chunk += 1 +class VolumeSection(Section): + """EWF volume section.""" - return b"".join(result) + @cached_property + def data(self) -> c_ewf.VolumeSection | c_ewf.VolumeSectionSmart: + """The volume data.""" + return (c_ewf.VolumeSection if self.descriptor.size == len(c_ewf.VolumeSection) else c_ewf.VolumeSectionSmart)( + self.descriptor.data + ) + @property + def number_of_chunks(self) -> int: + """The number of chunks in the volume.""" + return self.data.number_of_chunks -class SectionDescriptor: - def __init__(self, fh: BinaryIO): - self.fh = fh + @property + def sectors_per_chunk(self) -> int: + """The number of sectors per chunk.""" + return self.data.sectors_per_chunk + + @property + def sector_size(self) -> int: + """The size of a sector in bytes.""" + return self.data.bytes_per_sector - self.offset = fh.tell() - descriptor = c_ewf.EWFSectionDescriptor(fh) - self.type = descriptor.type.rstrip(b"\x00") - self.next = descriptor.next - self.size = descriptor.size - len(c_ewf.EWFSectionDescriptor) - self.checksum = descriptor.checksum - self.data_offset = fh.tell() + +class DataSection(Section): + """EWF data section.""" + + @cached_property + def data(self) -> c_ewf.DataSection: + """The data section.""" + return c_ewf.DataSection(self.descriptor.data) + + +class TableSection(Section): + """EWF table section.""" def __repr__(self) -> str: return ( - f"" + f"<{self.__class__.__name__} " + f"type={self.descriptor.type!r} size={self.descriptor.size:#x} " + f"number_of_entries={self.number_of_entries} base_offset={self.base_offset:#x}>" ) + + @cached_property + def data(self) -> c_ewf.TableSection: + """The table section.""" + self.descriptor.fh.seek(self.descriptor.offset + len(c_ewf.SectionDescriptor)) + return c_ewf.TableSection(self.descriptor.fh) + + @cached_property + def entries(self) -> list[int]: + """The table entries.""" + self.descriptor.fh.seek(self.descriptor.offset + len(c_ewf.SectionDescriptor) + len(c_ewf.TableSection)) + return c_ewf.uint32[self.number_of_entries](self.descriptor.fh) + + @property + def number_of_entries(self) -> int: + """The number of entries in the table.""" + return self.data.number_of_entries + + @property + def base_offset(self) -> int: + """The base offset of the table.""" + return self.data.base_offset + + +class Table2Section(TableSection): + """EWF table2 section.""" + + +class HashSection(Section): + """EWF hash section.""" + + @cached_property + def data(self) -> c_ewf.HashSection: + """The hash section.""" + return c_ewf.HashSection(self.descriptor.data) + + @property + def md5(self) -> str: + """The MD5 hash of the section.""" + return self.data.md5.hex() diff --git a/dissect/evidence/ewf/stream.py b/dissect/evidence/ewf/stream.py new file mode 100644 index 0000000..ef73623 --- /dev/null +++ b/dissect/evidence/ewf/stream.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dissect.util.stream import AlignedStream + +if TYPE_CHECKING: + from dissect.evidence.ewf.ewf import EWF + + +class EWFStream(AlignedStream): + """Provide a stitched stream over all EWF segments.""" + + def __init__(self, ewf: EWF): + self.ewf = ewf + super().__init__(ewf.size, ewf.chunk_size) + + def _read(self, offset: int, length: int) -> bytes: + result = [] + + chunk, offset_in_chunk = divmod(offset, self.ewf.chunk_size) + + while length > 0: + buf = self.ewf.read_chunk(chunk) + read_size = min(length, self.ewf.chunk_size - offset_in_chunk) + result.append(buf[offset_in_chunk : offset_in_chunk + read_size]) + + offset_in_chunk = 0 + offset += read_size + length -= read_size + chunk += 1 + + return b"".join(result) diff --git a/dissect/evidence/exception.py b/dissect/evidence/exception.py index 50130b0..e098f0d 100644 --- a/dissect/evidence/exception.py +++ b/dissect/evidence/exception.py @@ -18,10 +18,6 @@ class NotASymlinkError(Error): pass -class EWFError(Error): - """Related to EWF (Expert Witness disk image Format)""" - - class InvalidSnapshot(Error): """Related to ASDF (Acquire Snapshot Data Format)""" diff --git a/pyproject.toml b/pyproject.toml index 9a09d58..4f8adae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ test = [ lint = [ "ruff==0.13.1", "vermin", + "typing_extensions", ] build = [ "build", diff --git a/tests/_data/ewf/encrypted-certificate/encrypted.E01 b/tests/_data/ewf/encrypted-certificate/encrypted.E01 new file mode 100644 index 0000000..fb35f66 --- /dev/null +++ b/tests/_data/ewf/encrypted-certificate/encrypted.E01 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:33754290c700d3e949a3e8cc6d3eed2a5f1f852b6583716289d2aff753c8f867 +size 13541 diff --git a/tests/_data/ewf/encrypted-certificate/encrypted.E01.txt b/tests/_data/ewf/encrypted-certificate/encrypted.E01.txt new file mode 100644 index 0000000..256f6cd --- /dev/null +++ b/tests/_data/ewf/encrypted-certificate/encrypted.E01.txt @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d57adb8bdb46ed43497627954c0031b0c77fb9ef4fe49ccc80670cc27d752fea +size 1566 diff --git a/tests/_data/ewf/encrypted-certificate/key b/tests/_data/ewf/encrypted-certificate/key new file mode 100644 index 0000000..55ec2fb --- /dev/null +++ b/tests/_data/ewf/encrypted-certificate/key @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b25ae23283b944d75442d3e68251965360f332d90be0ff4a6e705f14172fb3e5 +size 1679 diff --git a/tests/_data/ewf/encrypted-passphrase/encrypted.E01 b/tests/_data/ewf/encrypted-passphrase/encrypted.E01 new file mode 100644 index 0000000..afa319f --- /dev/null +++ b/tests/_data/ewf/encrypted-passphrase/encrypted.E01 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5938acbd0605408a6167cd1423a3a6a599e0dfe23c2241a71ebd46695d6ea0fa +size 13541 diff --git a/tests/_data/ewf/encrypted-passphrase/encrypted.E01.txt b/tests/_data/ewf/encrypted-passphrase/encrypted.E01.txt new file mode 100644 index 0000000..e5219cd --- /dev/null +++ b/tests/_data/ewf/encrypted-passphrase/encrypted.E01.txt @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b781b7a586752e8f33435137f71fa6ed578325209b10fb7081f95364a6acf7de +size 1574 diff --git a/tests/_data/ewf/ewf.E01 b/tests/_data/ewf/ewf.E01 deleted file mode 100644 index 964a591..0000000 --- a/tests/_data/ewf/ewf.E01 +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:b9b150a1f40024c7b0c3cf3c09cf809a8636419cab5f55ab4d4f78c918c1e082 -size 7630 diff --git a/tests/_data/ewf/segmented/image.E01 b/tests/_data/ewf/segmented/image.E01 new file mode 100644 index 0000000..69b1121 --- /dev/null +++ b/tests/_data/ewf/segmented/image.E01 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dff882f955c5cccb79690e9f5be42125c8b5875052f1d3d1dda1d7de597fb6c8 +size 1018464 diff --git a/tests/_data/ewf/segmented/image.E02 b/tests/_data/ewf/segmented/image.E02 new file mode 100644 index 0000000..78fd7a2 --- /dev/null +++ b/tests/_data/ewf/segmented/image.E02 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9bd439cdbb5158c1bbc4c12ccac98dc874655573977214763895eb043004f32e +size 1017681 diff --git a/tests/_data/ewf/segmented/image.E03 b/tests/_data/ewf/segmented/image.E03 new file mode 100644 index 0000000..df39941 --- /dev/null +++ b/tests/_data/ewf/segmented/image.E03 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:35bc51ba09665794b3852cfe57b9415c7aca901185e64abcd22e90c8b6ebb361 +size 1017681 diff --git a/tests/_data/ewf/segmented/image.E04 b/tests/_data/ewf/segmented/image.E04 new file mode 100644 index 0000000..9b26c1f --- /dev/null +++ b/tests/_data/ewf/segmented/image.E04 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b4823add0c226c42255be4c2cc09ee80cce4f03f107c8c2c7b84e48a46449fce +size 1017681 diff --git a/tests/_data/ewf/segmented/image.E05 b/tests/_data/ewf/segmented/image.E05 new file mode 100644 index 0000000..f4a9f74 --- /dev/null +++ b/tests/_data/ewf/segmented/image.E05 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:78844b5d5b80695ba6d8c0901c28820389ebd6bacb0330a934ca01ec06bcd8d1 +size 132761 diff --git a/tests/_data/ewf/single/image.E01 b/tests/_data/ewf/single/image.E01 new file mode 100644 index 0000000..04126db --- /dev/null +++ b/tests/_data/ewf/single/image.E01 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fbc3da213475f9692be6bf00fb68eb87adaab3195c204f21ea42efc3b2b35097 +size 13344 diff --git a/tests/_tools/ewf/generate.sh b/tests/_tools/ewf/generate.sh new file mode 100755 index 0000000..5b6acee --- /dev/null +++ b/tests/_tools/ewf/generate.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +set -euo pipefail + +readonly SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +readonly TESTS_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +readonly OUT_DIR="${TESTS_ROOT}/_data/ewf" + +log() { printf '[INFO] %s\n' "$*" >&2; } +warn() { printf '[WARN] %s\n' "$*" >&2; } +error() { printf '[ERROR] %s\n' "$*" >&2; } + +have() { command -v "$1" >/dev/null 2>&1; } + +require_tools() { + local -a tools=(ewfacquirestream dd xxd) + local missing=0 + + for t in "${tools[@]}"; do + if ! have "$t"; then + error "Missing required tool: $t" + missing=1 + fi + done + + if (( missing != 0 )); then + error "One or more required tools are missing. Aborting." + exit 1 + fi +} + +pattern() { + local size="$1" + + stream() { + while true; do + for i in $(seq 0 255); do + printf "`printf '%02x' "${i}"`%.0s" {0..4095} + done + done + } + + stream | xxd -r -ps | head -c "${size}" || true + + # Add a final message to test unaligned sizes + echo -n "kusjes van SRT<3" +} + +generate() { + local name="$1" + local size="$2" + local split="$3" + local compression="$4" + + local outdir="${OUT_DIR}/${name}" + mkdir -p "${outdir}" + + pattern "${size}" | ewfacquirestream \ + -t "${outdir}/image" \ + -S "${split}" \ + -c "${compression}" \ + >/dev/null 2>/dev/null + + log "Generated test case: ${outdir}" +} + +main() { + require_tools + + mkdir -p "${OUT_DIR}" + + generate "single" "$((4 * 1024 * 1024))" "256M" best + generate "segmented" "$((4 * 1024 * 1024))" "1M" none + + log "All test cases generated under: ${OUT_DIR}" +} + +main "$@" diff --git a/tests/conftest.py b/tests/conftest.py index 3cfa66f..ecd01b5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -76,8 +76,29 @@ def ad1_encrypted_certificate() -> list[Path]: @pytest.fixture -def ewf_data() -> Iterator[BinaryIO]: - yield from open_data("_data/ewf/ewf.E01") +def ewf_single() -> Iterator[BinaryIO]: + yield from open_data("_data/ewf/single/image.E01") + + +@pytest.fixture +def ewf_segmented() -> list[Path]: + return [ + absolute_path("_data/ewf/segmented/image.E01"), + absolute_path("_data/ewf/segmented/image.E02"), + absolute_path("_data/ewf/segmented/image.E03"), + absolute_path("_data/ewf/segmented/image.E04"), + absolute_path("_data/ewf/segmented/image.E05"), + ] + + +@pytest.fixture +def ewf_encrypted_passphrase() -> Iterator[BinaryIO]: + yield from open_data("_data/ewf/encrypted-passphrase/encrypted.E01") + + +@pytest.fixture +def ewf_encrypted_certificate() -> Iterator[BinaryIO]: + yield from open_data("_data/ewf/encrypted-certificate/encrypted.E01") @pytest.fixture diff --git a/tests/test_ewf.py b/tests/test_ewf.py index c8f3daa..7ffc0d4 100644 --- a/tests/test_ewf.py +++ b/tests/test_ewf.py @@ -1,41 +1,90 @@ from __future__ import annotations from typing import TYPE_CHECKING, BinaryIO -from unittest.mock import MagicMock, patch + +import pytest from dissect.evidence.ewf import ewf +from tests._utils import absolute_path if TYPE_CHECKING: - import pytest + from pathlib import Path -def test_ewf(ewf_data: BinaryIO) -> None: - e = ewf.EWF(ewf_data) +def test_ewf(ewf_single: BinaryIO) -> None: + e = ewf.EWF(ewf_single) - assert e.size == 4097 - assert e.open().read(4097) == (b"\xde\xad\xbe\xef" * 1024) + b"\n" + assert e.size == (4 * 1024 * 1024) + 16 + _assert_pattern(e.open(), e.size) + with pytest.raises(IndexError, match="Chunk 1337 out of range"): + e.read_chunk(1337) -@patch("dissect.evidence.ewf.ewf.Segment") -def test_ewf_open_segment(MockSegment: MagicMock, monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(ewf, "MAX_OPEN_SEGMENTS", 2) - mock_segment = MockSegment.return_value - mock_segment.volume.sector_size = 512 - mock_segment.sector_count = 2 +def test_ewf_segmented(ewf_segmented: list[Path]) -> None: + e = ewf.EWF(ewf_segmented) + + assert e.size == (4 * 1024 * 1024) + 16 + _assert_pattern(e.open(), e.size) - mock_fh = [MagicMock(), MagicMock(), MagicMock(), MagicMock()] - e = ewf.EWF(mock_fh) - assert e._segment_offsets == [2, 4, 6] - assert e._segment_lru == [2, 3] +def _assert_pattern(fh: BinaryIO, size: int) -> None: + for i in range(size // 4096): + expected = bytes([i % 256] * 4096) + assert fh.read(4096) == expected, f"Mismatch at offset {i * 4096:#x}" - tmp = e.segment(0) - assert tmp.offset == 0 - assert tmp.sector_offset == 0 - assert e._segment_lru == [3, 0] + if size % 4096: + assert fh.read() == b"kusjes van SRT<3" + + +def test_ewf_segment_lru(ewf_segmented: list[Path], monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(ewf, "MAX_OPEN_SEGMENTS", 2) - tmp = e.segment(1) - assert tmp.offset == 1024 - assert tmp.sector_offset == 2 + e = ewf.EWF(ewf_segmented) + assert e._segment_lru == [3, 4] + + e.segment(0) + assert e._segment_lru == [4, 0] + + e.segment(1) assert e._segment_lru == [0, 1] + + +def test_adcrypt_passphrase(ewf_encrypted_passphrase: BinaryIO) -> None: + """Test if we can decrypt ADCRYPT EWF images, in this example a single segment EWF image.""" + e = ewf.EWF(ewf_encrypted_passphrase) + + assert e.is_adcrypt() + assert e.is_locked() + + with pytest.raises(ValueError, match="EWF container is locked by ADCRYPT"): + e.open() + + with pytest.raises(ValueError, match="Unable to unlock: HMAC verification of passphrase failed"): + e.unlock(passphrase="asdf") + + e.unlock(passphrase="password") + + # FTK Imager strips our non-aligned footer... + assert e.size == (4 * 1024 * 1024) + _assert_pattern(e.open(), e.size) + + +def test_adcrypt_certificate(ewf_encrypted_certificate: BinaryIO) -> None: + """Test if we can decrypt ADCRYPT EWF images, in this example a single segment EWF image.""" + e = ewf.EWF(ewf_encrypted_certificate) + + assert e.is_adcrypt() + assert e.is_locked() + + with pytest.raises(ValueError, match="EWF container is locked by ADCRYPT"): + e.open() + + with pytest.raises(ValueError, match="Unable to unlock: HMAC verification of passphrase failed"): + e.unlock(passphrase="asdf") + + e.unlock(private_key=absolute_path("_data/ewf/encrypted-certificate/key")) + + # FTK Imager strips our non-aligned footer... + assert e.size == (4 * 1024 * 1024) + _assert_pattern(e.open(), e.size)