Skip to content
5 changes: 5 additions & 0 deletions cabarchive/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,10 @@ def save(self, compress: bool = False, sort: bool = True) -> bytes:
"""
return CabArchiveWriter(self, compress=compress, sort=sort).write()

@property
def size(self) -> int:
    """Returns cabinet uncompressed data size"""
    # Accumulate the uncompressed length of every contained CabFile.
    total: int = 0
    for cffile in self.values():
        total += len(cffile)
    return total

def __repr__(self) -> str:
    """Return a debug string listing the string form of each contained file."""
    members = [str(self[cabfile]) for cabfile in self]
    return "CabArchive({})".format(members)
59 changes: 26 additions & 33 deletions cabarchive/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,12 @@
import sys
import os
import argparse
import tempfile
import subprocess
import glob

sys.path.append(os.path.realpath("."))

from cabarchive import CabArchive, CabFile, NotSupportedError


def repack(arc: CabArchive, arg: str) -> None:
    """Extract *arg* with the external cabextract tool and load the files into *arc*."""
    with tempfile.TemporaryDirectory("cabarchive") as tmpdir:
        print(f"Extracting to {tmpdir}")
        # cabextract can recover archives this parser cannot handle natively
        subprocess.call(["cabextract", "--fix", "--quiet", "--directory", tmpdir, arg])
        for fn in glob.iglob(os.path.join(tmpdir, "**"), recursive=True):
            try:
                f = open(fn, "rb")
            except IsADirectoryError as _:
                # recursive glob yields directories too; only add regular files
                continue
            with f:
                fn_noprefix = fn[len(tmpdir) + 1 :]
                print(f"Adding: {fn_noprefix}")
                arc[fn_noprefix] = CabFile(f.read())


def main():
parser = argparse.ArgumentParser(description="Process cabinet archives.")
parser.add_argument(
Expand All @@ -42,9 +25,9 @@ def main():
default=False,
)
parser.add_argument(
"--autorepack",
"--create",
action="store_true",
help="Repack using cabextract when required",
help="create an archive",
default=False,
)
parser.add_argument(
Expand All @@ -65,27 +48,37 @@ def main():
return 1

args, argv = parser.parse_known_args()
for arg in argv:
arc = CabArchive()
try:
with open(arg, "rb") as f:
arc.parse(f.read())
except NotSupportedError as e:
if not args.autorepack:
print(f"Failed to parse: {str(e)}; perhaps try --autorepack")
if args.decompress:
for fn in argv:
arc = CabArchive()
try:
with open(fn, "rb") as f:
arc.parse(f.read())
except NotSupportedError as e:
print(f"Failed to parse: {str(e)}")
return 1
repack(arc, arg)
print(f"Parsing {arg}:")
if args.info:
for fn in arc:
print(fn)
if args.decompress:
print(f"Parsing {fn}:")
if args.info:
for fn in arc:
print(fn)
for fn in arc:
path = os.path.join(args.outdir, fn)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
print(f"Writing {fn}:")
f.write(arc[fn].buf)
elif args.create:
arc = CabArchive()
try:
print(f"Creating {argv[0]}:")
except IndexError:
print("Expected: ARCHIVE [FILE]...")
return 1
for fn in argv[1:]:
with open(fn, "rb") as f:
arc[os.path.basename(fn)] = CabFile(buf=f.read())
with open(argv[0], "wb") as f:
f.write(arc.save())

return 0

Expand Down
1 change: 0 additions & 1 deletion cabarchive/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def _is_ascii(text: str) -> bool:


class CabFile:

"""An object representing a file in a Cab archive

Any number of CabFile instances can be stored in a CabArchive.
Expand Down
21 changes: 15 additions & 6 deletions cabarchive/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ class CabArchiveParser:
def __init__(self, cfarchive: "CabArchive", flattern: bool = False):
self.cfarchive: "CabArchive" = cfarchive
self.flattern: bool = flattern
self._folder_data: List[bytearray] = []
self._folder_data: List[bytes] = []
self._buf: bytes = b""
self._header_reserved: bytes = b""
self._zdict: Optional[bytes] = None
self._rsvd_block: int = 0
self._ndatabsz: int = 0

def parse_cffile(self, offset: int) -> int:
"""Parse a CFFILE entry"""
Expand Down Expand Up @@ -103,10 +104,14 @@ def parse_cffolder(self, idx: int, offset: int) -> None:
raise NotSupportedError("LZX compression not supported")
raise NotSupportedError(f"Compression type 0x{compression:x} not supported")

# parse CDATA
self._folder_data.append(bytearray())
for _ in range(ndatab):
offset += self.parse_cfdata(idx, offset, compression)
# parse CDATA, either using the stream offset or the per-spec CFFOLDER.ndatab
self._folder_data.append(bytes())
if self._ndatabsz:
while offset < self._ndatabsz:
offset += self.parse_cfdata(idx, offset, compression)
else:
for _ in range(ndatab):
offset += self.parse_cfdata(idx, offset, compression)

def parse_cfdata(self, idx: int, offset: int, compression: int) -> int:
"""Parse a CFDATA entry"""
Expand All @@ -127,7 +132,7 @@ def parse_cfdata(self, idx: int, offset: int, compression: int) -> int:
# verify checksum
if checksum != 0:
checksum_actual = _checksum_compute(buf_cfdata)
hdr = bytearray(struct.pack("<HH", blob_comp, blob_uncomp))
hdr = struct.pack("<HH", blob_comp, blob_uncomp)
checksum_actual = _checksum_compute(hdr, checksum_actual)
if checksum_actual != checksum:
raise CorruptionError(
Expand Down Expand Up @@ -244,6 +249,10 @@ def parse(self, buf: bytes) -> None:
# read this so we can do round-trip
self.cfarchive.set_id = set_id

# if the only folder is >= 2GB then CFFOLDER.ndatab will overflow
if len(self._buf) >= 0x8000 * 0xFFFF and nr_folders == 1:
self._ndatabsz = len(self._buf)

# parse CFFOLDER
for i in range(nr_folders):
self.parse_cffolder(i, offset)
Expand Down
8 changes: 4 additions & 4 deletions cabarchive/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
def _check_range(data: bytes, expected: bytes) -> None:
assert data
assert expected
failures = 0
failures: int = 0
if len(data) != len(expected):
print(f"different sizes, got {len(data)} expected {len(expected)}")
failures += 1
Expand All @@ -36,7 +36,8 @@ def _check_range(data: bytes, expected: bytes) -> None:
if failures > 10:
print("More than 10 failures, giving up...")
break
assert failures == 0, "Data is not the same"
if failures:
raise ValueError("Data is not the same")


class TestInfParser(unittest.TestCase):
Expand Down Expand Up @@ -134,7 +135,6 @@ def test_large_compressed(self):
hashlib.sha1(cff.buf).hexdigest(),
"8497fe89c41871e3cbd7955e13321e056dfbd170",
)
_check_range(arc.save(compress=True), old)

def test_multi_folder(self):
# open a folder with multiple folders
Expand Down Expand Up @@ -213,7 +213,7 @@ def test_create(self):
b"\x20\x70\x72\x69\x6E\x74\x66\x28\x22\x57\x65\x6C\x63\x6F\x6D\x65"
b"\x21\x5C\x6E\x22\x29\x3B\x0D\x0A\x7D\x0D\x0A\x0D\x0A"
)
_check_range(bytearray(data), bytearray(expected))
_check_range(data, expected)

# use cabextract to test validity
try:
Expand Down
36 changes: 17 additions & 19 deletions cabarchive/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#
# pylint: disable=protected-access,too-few-public-methods

import struct

from typing import List

FMT_CFHEADER = "<4sxxxxIxxxxIxxxxBBHHHHH"
Expand All @@ -16,33 +18,29 @@
FMT_CFDATA = "<IHH"


def _chunkify(arr: bytes, size: int) -> List[bytearray]:
def _chunkify(arr: bytes, size: int) -> List[bytes]:
"""Split up a bytestream into chunks"""
arrs = []
for i in range(0, len(arr), size):
chunk = bytearray(arr[i : i + size])
arrs.append(chunk)
arrs.append(arr[i : i + size])
return arrs


def _checksum_compute(content: bytes, seed: int = 0) -> int:
def _checksum_compute(buf: bytes, seed: int = 0) -> int:
"""Compute the MS cabinet checksum"""
csum = seed
chunks = _chunkify(content, 4)
for chunk in chunks:
if len(chunk) == 4:
ul = chunk[0]
ul |= chunk[1] << 8
ul |= chunk[2] << 16
ul |= chunk[3] << 24
else:
csum: int = seed
for offset in range(0, len(buf), 4):
try:
(ul,) = struct.unpack_from("<I", buf, offset)
except struct.error:
left: int = len(buf) - offset
# WTF: I can only assume this is a typo from the original
# author of the cabinet file specification
if len(chunk) == 3:
ul = (chunk[0] << 16) | (chunk[1] << 8) | chunk[2]
elif len(chunk) == 2:
ul = (chunk[0] << 8) | chunk[1]
elif len(chunk) == 1:
ul = chunk[0]
if left == 3:
ul = (buf[offset + 0] << 16) | (buf[offset + 1] << 8) | buf[offset + 2]
elif left == 2:
ul = (buf[offset + 0] << 8) | buf[offset + 1]
elif left == 1:
ul = buf[offset + 0]
csum ^= ul
return csum
19 changes: 11 additions & 8 deletions cabarchive/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,21 @@ def write(self) -> bytes:
cffiles.extend(self.cfarchive.values())

# create linear CFDATA block
cfdata_linear = bytearray()
for f in cffiles:
if f.buf:
cfdata_linear += f.buf
if len(cffiles) > 1:
cfdata_linear = bytes()
for f in cffiles:
if f.buf:
cfdata_linear += f.buf
else:
cfdata_linear = cffiles[0].buf or bytes()

# _chunkify and compress with a fixed size
chunks = _chunkify(cfdata_linear, 0x8000)
if self.compress:
chunks_zlib = []
for chunk in chunks:
compressobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
chunk_zlib = bytearray(b"CK")
chunk_zlib = b"CK"
chunk_zlib += compressobj.compress(chunk)
chunk_zlib += compressobj.flush()
chunks_zlib.append(chunk_zlib)
Expand Down Expand Up @@ -97,7 +100,7 @@ def write(self) -> bytes:
data += struct.pack(
FMT_CFFOLDER,
offset, # offset to CFDATA
len(chunks), # number of CFDATA blocks
min(len(chunks), 0xFFFF), # number of CFDATA blocks
self.compress,
) # compression type

Expand Down Expand Up @@ -126,7 +129,7 @@ def write(self) -> bytes:
# first do the 'checksum' on the data, then the partial
# header. slightly crazy, but anyway
checksum = _checksum_compute(chunk_zlib)
hdr = bytearray(struct.pack("<HH", len(chunk_zlib), len(chunk)))
hdr = struct.pack("<HH", len(chunk_zlib), len(chunk))
checksum = _checksum_compute(hdr, checksum)
data += struct.pack(
FMT_CFDATA,
Expand All @@ -136,5 +139,5 @@ def write(self) -> bytes:
) # uncompressed bytes
data += chunk_zlib

# return bytearray
# success
return data