diff --git a/cabarchive/archive.py b/cabarchive/archive.py index 847ecb5..48bc7f6 100644 --- a/cabarchive/archive.py +++ b/cabarchive/archive.py @@ -126,5 +126,10 @@ def save(self, compress: bool = False, sort: bool = True) -> bytes: """ return CabArchiveWriter(self, compress=compress, sort=sort).write() + @property + def size(self) -> int: + """Returns cabinet uncompressed data size""" + return sum(len(cffile) for cffile in self.values()) + def __repr__(self) -> str: return f"CabArchive({[str(self[cabfile]) for cabfile in self]})" diff --git a/cabarchive/cli.py b/cabarchive/cli.py index 3152cea..7aa246b 100755 --- a/cabarchive/cli.py +++ b/cabarchive/cli.py @@ -10,29 +10,12 @@ import sys import os import argparse -import tempfile -import subprocess -import glob sys.path.append(os.path.realpath(".")) from cabarchive import CabArchive, CabFile, NotSupportedError -def repack(arc: CabArchive, arg: str) -> None: - with tempfile.TemporaryDirectory("cabarchive") as tmpdir: - print(f"Extracting to {tmpdir}") - subprocess.call(["cabextract", "--fix", "--quiet", "--directory", tmpdir, arg]) - for fn in glob.iglob(os.path.join(tmpdir, "**"), recursive=True): - try: - with open(fn, "rb") as f: - fn_noprefix = fn[len(tmpdir) + 1 :] - print(f"Adding: {fn_noprefix}") - arc[fn_noprefix] = CabFile(f.read()) - except IsADirectoryError as _: - pass - - def main(): parser = argparse.ArgumentParser(description="Process cabinet archives.") parser.add_argument( @@ -42,9 +25,9 @@ def main(): default=False, ) parser.add_argument( - "--autorepack", + "--create", action="store_true", - help="Repack using cabextract when required", + help="create an archive", default=False, ) parser.add_argument( @@ -65,27 +48,37 @@ def main(): return 1 args, argv = parser.parse_known_args() - for arg in argv: - arc = CabArchive() - try: - with open(arg, "rb") as f: - arc.parse(f.read()) - except NotSupportedError as e: - if not args.autorepack: - print(f"Failed to parse: {str(e)}; perhaps try --autorepack") + if args.decompress: + for fn in argv: + arc = CabArchive() + try: + with open(fn, "rb") as f: + arc.parse(f.read()) + except NotSupportedError as e: + print(f"Failed to parse: {str(e)}") return 1 - repack(arc, arg) - print(f"Parsing {arg}:") - if args.info: - for fn in arc: - print(fn) - if args.decompress: + print(f"Parsing {fn}:") + if args.info: + for fn in arc: + print(fn) for fn in arc: path = os.path.join(args.outdir, fn) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: print(f"Writing {fn}:") f.write(arc[fn].buf) + elif args.create: + arc = CabArchive() + try: + print(f"Creating {argv[0]}:") + except IndexError: + print("Expected: ARCHIVE [FILE]...") + return 1 + for fn in argv[1:]: + with open(fn, "rb") as f: + arc[os.path.basename(fn)] = CabFile(buf=f.read()) + with open(argv[0], "wb") as f: + f.write(arc.save()) return 0 diff --git a/cabarchive/file.py b/cabarchive/file.py index fdad3de..6b34f55 100644 --- a/cabarchive/file.py +++ b/cabarchive/file.py @@ -18,7 +18,6 @@ def _is_ascii(text: str) -> bool: class CabFile: - """An object representing a file in a Cab archive Any number of CabFile instances can be stored in a CabArchive. diff --git a/cabarchive/parser.py b/cabarchive/parser.py index 385c31e..3e69140 100644 --- a/cabarchive/parser.py +++ b/cabarchive/parser.py @@ -30,11 +30,12 @@ class CabArchiveParser: def __init__(self, cfarchive: "CabArchive", flattern: bool = False): self.cfarchive: "CabArchive" = cfarchive self.flattern: bool = flattern - self._folder_data: List[bytearray] = [] + self._folder_data: List[bytes] = [] self._buf: bytes = b"" self._header_reserved: bytes = b"" self._zdict: Optional[bytes] = None self._rsvd_block: int = 0 + self._ndatabsz: int = 0 def parse_cffile(self, offset: int) -> int: """Parse a CFFILE entry""" @@ -103,10 +104,14 @@ def parse_cffolder(self, idx: int, offset: int) -> None: raise NotSupportedError("LZX compression not supported") raise NotSupportedError(f"Compression type 0x{compression:x} not supported") - # parse CDATA - self._folder_data.append(bytearray()) - for _ in range(ndatab): - offset += self.parse_cfdata(idx, offset, compression) + # parse CDATA, either using the stream offset or the per-spec CFFOLDER.ndatab + self._folder_data.append(bytes()) + if self._ndatabsz: + while offset < self._ndatabsz: + offset += self.parse_cfdata(idx, offset, compression) + else: + for _ in range(ndatab): + offset += self.parse_cfdata(idx, offset, compression) def parse_cfdata(self, idx: int, offset: int, compression: int) -> int: """Parse a CFDATA entry""" @@ -127,7 +132,7 @@ def parse_cfdata(self, idx: int, offset: int, compression: int) -> int: # verify checksum if checksum != 0: checksum_actual = _checksum_compute(buf_cfdata) - hdr = bytearray(struct.pack(" None: # read this so we can do round-trip self.cfarchive.set_id = set_id + # if the only folder is >= 2GB then CFFOLDER.ndatab will overflow + if len(self._buf) >= 0x8000 * 0xFFFF and nr_folders == 1: + self._ndatabsz = len(self._buf) + # parse CFFOLDER for i in range(nr_folders): self.parse_cffolder(i, offset) diff --git a/cabarchive/test_misc.py b/cabarchive/test_misc.py index f38c849..56508ab 100755 --- a/cabarchive/test_misc.py +++ b/cabarchive/test_misc.py @@ -25,7 +25,7 @@ def _check_range(data: bytes, expected: bytes) -> None: assert data assert expected - failures = 0 + failures: int = 0 if len(data) != len(expected): print(f"different sizes, got {len(data)} expected {len(expected)}") failures += 1 @@ -36,7 +36,8 @@ def _check_range(data: bytes, expected: bytes) -> None: if failures > 10: print("More than 10 failures, giving up...") break - assert failures == 0, "Data is not the same" + if failures: + raise ValueError("Data is not the same") class TestInfParser(unittest.TestCase): @@ -134,7 +135,6 @@ def test_large_compressed(self): hashlib.sha1(cff.buf).hexdigest(), "8497fe89c41871e3cbd7955e13321e056dfbd170", ) - _check_range(arc.save(compress=True), old) def test_multi_folder(self): # open a folder with multiple folders @@ -213,7 +213,7 @@ def test_create(self): b"\x20\x70\x72\x69\x6E\x74\x66\x28\x22\x57\x65\x6C\x63\x6F\x6D\x65" b"\x21\x5C\x6E\x22\x29\x3B\x0D\x0A\x7D\x0D\x0A\x0D\x0A" ) - _check_range(bytearray(data), bytearray(expected)) + _check_range(data, expected) # use cabextract to test validity try: diff --git a/cabarchive/utils.py b/cabarchive/utils.py index 39beaad..b438f60 100644 --- a/cabarchive/utils.py +++ b/cabarchive/utils.py @@ -7,6 +7,8 @@ # # pylint: disable=protected-access,too-few-public-methods +import struct + from typing import List FMT_CFHEADER = "<4sxxxxIxxxxIxxxxBBHHHHH" @@ -16,33 +18,29 @@ FMT_CFDATA = " List[bytearray]: +def _chunkify(arr: bytes, size: int) -> List[bytes]: """Split up a bytestream into chunks""" arrs = [] for i in range(0, len(arr), size): - chunk = bytearray(arr[i : i + size]) - arrs.append(chunk) + arrs.append(arr[i : i + size]) return arrs -def _checksum_compute(content: bytes, seed: int = 0) -> int: +def _checksum_compute(buf: bytes, seed: int = 0) -> int: """Compute the MS cabinet checksum""" - csum = seed - chunks = _chunkify(content, 4) - for chunk in chunks: - if len(chunk) == 4: - ul = chunk[0] - ul |= chunk[1] << 8 - ul |= chunk[2] << 16 - ul |= chunk[3] << 24 - else: + csum: int = seed + for offset in range(0, len(buf), 4): + try: + (ul,) = struct.unpack_from(" bytes: cffiles.extend(self.cfarchive.values()) # create linear CFDATA block - cfdata_linear = bytearray() - for f in cffiles: - if f.buf: - cfdata_linear += f.buf + if len(cffiles) > 1: + cfdata_linear = bytes() + for f in cffiles: + if f.buf: + cfdata_linear += f.buf + else: + cfdata_linear = cffiles[0].buf or bytes() # _chunkify and compress with a fixed size chunks = _chunkify(cfdata_linear, 0x8000) @@ -54,7 +57,7 @@ def write(self) -> bytes: chunks_zlib = [] for chunk in chunks: compressobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS) - chunk_zlib = bytearray(b"CK") + chunk_zlib = b"CK" chunk_zlib += compressobj.compress(chunk) chunk_zlib += compressobj.flush() chunks_zlib.append(chunk_zlib) @@ -97,7 +100,7 @@ def write(self) -> bytes: data += struct.pack( FMT_CFFOLDER, offset, # offset to CFDATA - len(chunks), # number of CFDATA blocks + min(len(chunks), 0xFFFF), # number of CFDATA blocks self.compress, ) # compression type @@ -126,7 +129,7 @@ def write(self) -> bytes: # first do the 'checksum' on the data, then the partial # header. slightly crazy, but anyway checksum = _checksum_compute(chunk_zlib) - hdr = bytearray(struct.pack(" bytes: ) # uncompressed bytes data += chunk_zlib - # return bytearray + # success return data