Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions dissect/util/compression/lzop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

import io
from typing import BinaryIO

from dissect.util.compression import lzo

HEADER_HAS_FILTER = 0x00000800


def decompress(src: bytes | BinaryIO) -> bytes:
"""LZOP decompress from a file-like object or bytes.

Args:
src: File-like object or bytes to decompress.

Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
src = io.BytesIO(src)

if src.read(9) != b"\x89LZO\x00\x0d\x0a\x1a\x0a":
raise ValueError("Invalid LZOP header")

version = int.from_bytes(src.read(2), "big")
src.seek(5, io.SEEK_CUR) # Skip library version (2), 'need to be extracted' version (2) and method (1)
if version >= 0x0940:
src.seek(1, io.SEEK_CUR) # Skip level (1)

if int.from_bytes(src.read(4), "big") & HEADER_HAS_FILTER:
src.seek(4, io.SEEK_CUR) # Skip filter info (4)

src.seek(8, io.SEEK_CUR) # Skip mode (4) and mtime_low (4)
if version >= 0x0940:
src.seek(4, io.SEEK_CUR) # Skip mtime_high (4)

i = src.read(1)[0]
src.seek(i + 4, io.SEEK_CUR) # Skip filename and checksum

result = []
while True:
uncompressed_block_size = int.from_bytes(src.read(4), "big")

if uncompressed_block_size == 0:
break

compressed_block_size = int.from_bytes(src.read(4), "big")
src.seek(4, io.SEEK_CUR) # Skip checksum

buf = src.read(compressed_block_size)

if uncompressed_block_size == compressed_block_size:
# Uncompressed block
result.append(buf)
else:
# Compressed block
result.append(lzo.decompress(buf, header=False, buflen=uncompressed_block_size))

return b"".join(result)
61 changes: 61 additions & 0 deletions tests/compression/test_lzop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

import hashlib
from typing import TYPE_CHECKING

import pytest

from dissect.util.compression import lzop

if TYPE_CHECKING:
from pytest_benchmark.fixture import BenchmarkFixture


PARAMS = (
("data", "digest"),
[
pytest.param(
"894c5a4f000d0a1a0a104020a00940010503000001000081a4699f2a5700"
"00000005736d616c6c515206300000012c00000021884d72d90361626361"
"626320f314000f6162636162636162636162636162636162631100000000"
"0000",
"d9f5aeb06abebb3be3f38adec9a2e3b94228d52193be923eb4e24c9b56ee0930",
id="basic",
),
pytest.param(
"894c5a4f000d0a1a0a104020a00940010503000001000081a4699f2a6e00"
"000000056c61726765521e0639000036df00000213975059030000b34c6f"
"72656d20697073756d20646f6c6f722073697420616d657420636f6e7365"
"6374657475722061646970697363696e6720656c69742e20517569737175"
"652066617563696275732065782073617069656e2076697461652070656c"
"6c656e7465737175652073656d20706c6163657261742e20496e20696420"
"637572737573206d69207072657469756d2074656c6c7573206475697320"
"636f6e76616c6c69732e2054656d707573206c656f2065752061656e6561"
"6e20736564206469616d2075726e612074656d706f722e2050756c76696e"
"617220766976616d7573206672696e67696c6c61206c61637573206e6563"
"206d6574757320626962656e64756d20656765737461732e20496163756c"
"6973206d61737361206e69736c206d616c657375616461206c6163696e69"
"6120696e7465676572206e756e6320706f73756572652e2055742068656e"
"6472657269742073656d7065722076656c20636c61737320617074656e74"
"2074616369746920736f63696f7371752e204164206c69746f726120746f"
"727175656e742070657220636f6e75626961206e6f7374726120696e6365"
"70746f732068696d656e61656f732e0a0a4c6f72656d20697073756d2064"
"200000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000020d8060c746f"
"732068696d656e61656f732e0a11000000000000",
"73d3dd96ca2e2f0144a117019256d770ee7c6febeaee09b24956c723ae22b529",
id="large",
),
],
)


@pytest.mark.parametrize(*PARAMS)
def test_lzop_decompress(data: str, digest: str) -> None:
assert hashlib.sha256(lzop.decompress(bytes.fromhex(data))).hexdigest() == digest


@pytest.mark.benchmark
@pytest.mark.parametrize(*PARAMS)
def test_benchmark_lzop_decompress(data: str, digest: str, benchmark: BenchmarkFixture) -> None:
assert hashlib.sha256(benchmark(lzop.decompress, bytes.fromhex(data))).hexdigest() == digest
Loading