HEADER_HAS_FILTER = 0x00000800

# Other header flag bits that change the on-disk layout (values as used by the lzop reference implementation).
_HEADER_HAS_EXTRA_FIELD = 0x00000040
_BLOCK_HAS_ADLER32_D = 0x00000001  # Adler-32 of the uncompressed block data
_BLOCK_HAS_ADLER32_C = 0x00000002  # Adler-32 of the compressed block data
_BLOCK_HAS_CRC32_D = 0x00000100  # CRC-32 of the uncompressed block data
_BLOCK_HAS_CRC32_C = 0x00000200  # CRC-32 of the compressed block data

_MAGIC = b"\x89LZO\x00\x0d\x0a\x1a\x0a"


def _read_exact(src: BinaryIO, size: int) -> bytes:
    """Read exactly ``size`` bytes from ``src``, raising ``ValueError`` if the stream ends early."""
    data = src.read(size)
    if len(data) != size:
        raise ValueError("Truncated LZOP stream")
    return data


def decompress(src: bytes | BinaryIO) -> bytes:
    """LZOP decompress from a file-like object or bytes.

    The header is parsed only far enough to locate the first block; header and block checksums are
    skipped, not verified. File-like objects only need to support ``read()``, they do not have to be seekable.

    Args:
        src: File-like object or bytes to decompress.

    Returns:
        The decompressed data.

    Raises:
        ValueError: If the magic is invalid, the stream ends in the middle of a header field or block,
            or a block declares an impossible compressed size.
    """
    if not hasattr(src, "read"):
        src = io.BytesIO(src)

    if src.read(len(_MAGIC)) != _MAGIC:
        raise ValueError("Invalid LZOP header")

    version = int.from_bytes(_read_exact(src, 2), "big")
    # Some header fields only exist from format version 0x0940 onwards
    is_v940 = version >= 0x0940

    # Skip library version (2), 'need to be extracted' version (2, v0x0940+), method (1) and level (1, v0x0940+)
    _read_exact(src, 6 if is_v940 else 3)

    flags = int.from_bytes(_read_exact(src, 4), "big")
    if flags & HEADER_HAS_FILTER:
        _read_exact(src, 4)  # Skip filter info (4)

    # Skip mode (4), mtime_low (4) and mtime_high (4, v0x0940+)
    _read_exact(src, 12 if is_v940 else 8)

    name_length = _read_exact(src, 1)[0]
    _read_exact(src, name_length + 4)  # Skip filename and header checksum

    if flags & _HEADER_HAS_EXTRA_FIELD:
        extra_length = int.from_bytes(_read_exact(src, 4), "big")
        _read_exact(src, extra_length + 4)  # Skip extra field and its checksum

    # Checksums of the uncompressed data are present on every block, checksums of the compressed data
    # only on blocks that are actually stored compressed.
    data_checksum_size = 4 * bool(flags & _BLOCK_HAS_ADLER32_D) + 4 * bool(flags & _BLOCK_HAS_CRC32_D)
    compressed_checksum_size = 4 * bool(flags & _BLOCK_HAS_ADLER32_C) + 4 * bool(flags & _BLOCK_HAS_CRC32_C)

    result = []
    while True:
        size_field = src.read(4)
        if not size_field:
            # Be lenient towards streams that end on a block boundary without the zero-size end marker
            break
        if len(size_field) != 4:
            raise ValueError("Truncated LZOP stream")

        uncompressed_block_size = int.from_bytes(size_field, "big")
        if uncompressed_block_size == 0:
            break

        compressed_block_size = int.from_bytes(_read_exact(src, 4), "big")
        if not 0 < compressed_block_size <= uncompressed_block_size:
            raise ValueError("Invalid LZOP block size")

        # A block that did not shrink is stored as-is
        is_stored = compressed_block_size == uncompressed_block_size

        _read_exact(src, data_checksum_size + (0 if is_stored else compressed_checksum_size))  # Skip checksums
        buf = _read_exact(src, compressed_block_size)

        if is_stored:
            result.append(buf)
        else:
            result.append(lzo.decompress(buf, header=False, buflen=uncompressed_block_size))

    return b"".join(result)
# Each case is a hex-encoded LZOP stream paired with the SHA-256 of its expected decompressed contents.
_BASIC_CASE = pytest.param(
    "894c5a4f000d0a1a0a104020a00940010503000001000081a4699f2a5700"
    "00000005736d616c6c515206300000012c00000021884d72d90361626361"
    "626320f314000f6162636162636162636162636162636162631100000000"
    "0000",
    "d9f5aeb06abebb3be3f38adec9a2e3b94228d52193be923eb4e24c9b56ee0930",
    id="basic",
)

_LARGE_CASE = pytest.param(
    "894c5a4f000d0a1a0a104020a00940010503000001000081a4699f2a6e00"
    "000000056c61726765521e0639000036df00000213975059030000b34c6f"
    "72656d20697073756d20646f6c6f722073697420616d657420636f6e7365"
    "6374657475722061646970697363696e6720656c69742e20517569737175"
    "652066617563696275732065782073617069656e2076697461652070656c"
    "6c656e7465737175652073656d20706c6163657261742e20496e20696420"
    "637572737573206d69207072657469756d2074656c6c7573206475697320"
    "636f6e76616c6c69732e2054656d707573206c656f2065752061656e6561"
    "6e20736564206469616d2075726e612074656d706f722e2050756c76696e"
    "617220766976616d7573206672696e67696c6c61206c61637573206e6563"
    "206d6574757320626962656e64756d20656765737461732e20496163756c"
    "6973206d61737361206e69736c206d616c657375616461206c6163696e69"
    "6120696e7465676572206e756e6320706f73756572652e2055742068656e"
    "6472657269742073656d7065722076656c20636c61737320617074656e74"
    "2074616369746920736f63696f7371752e204164206c69746f726120746f"
    "727175656e742070657220636f6e75626961206e6f7374726120696e6365"
    "70746f732068696d656e61656f732e0a0a4c6f72656d20697073756d2064"
    "200000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000020d8060c746f"
    "732068696d656e61656f732e0a11000000000000",
    "73d3dd96ca2e2f0144a117019256d770ee7c6febeaee09b24956c723ae22b529",
    id="large",
)

# Argument names and cases, unpacked straight into ``pytest.mark.parametrize``.
PARAMS = (
    ("data", "digest"),
    [_BASIC_CASE, _LARGE_CASE],
)


def _sha256_hex(buf: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of ``buf``."""
    return hashlib.sha256(buf).hexdigest()


@pytest.mark.parametrize(*PARAMS)
def test_lzop_decompress(data: str, digest: str) -> None:
    """Decompressing each sample stream yields data with the expected SHA-256 digest."""
    compressed = bytes.fromhex(data)
    assert _sha256_hex(lzop.decompress(compressed)) == digest


@pytest.mark.benchmark
@pytest.mark.parametrize(*PARAMS)
def test_benchmark_lzop_decompress(data: str, digest: str, benchmark: BenchmarkFixture) -> None:
    """Benchmark ``lzop.decompress`` on each sample stream and check the result's digest."""
    compressed = bytes.fromhex(data)
    assert _sha256_hex(benchmark(lzop.decompress, compressed)) == digest