From 1731bdd1d130331615830ed9e6df5256aa0aab7a Mon Sep 17 00:00:00 2001 From: Alexandr Zarubkin Date: Wed, 1 Oct 2025 14:59:12 +0300 Subject: [PATCH] Streaming ZIP: avoid hang during central directory write and reduce CPU Problem: Download of ZIP file containing 50000 files stalled while zipfile wrote the central directory; Chrome spinner kept spinning and Python hit 100% CPU. Root cause: bytes are immutable; _buffer += b caused O(n^2) copying during many small writes at close time. No data was yielded until the end. Changes Reworked ZipflyStream to be queue-backed with chunked staging: * Use a mutable bytearray (_staging) to append writes in-place and cut fixed-size chunks to a bounded queue.Queue (default 64 chunks). * Convert chunks to immutable bytes before enqueue for thread-safety and to decouple memory from _staging. * Added flush() to push remaining bytes and close() to flush and enqueue a None sentinel. * Track total written size without concatenating buffers. Updated ZipFly.generator() to stream concurrently: * Start a background writer thread that builds the ZIP (files + central directory) and writes into ZipflyStream. * Drain the queue in the generator and yield chunks until the sentinel is seen; then join() the writer. * Set _buffer_size from the stream after completion. Small import additions: threading, queue. ZipflyStream.__init__ now takes chunksize (internal use only). --- zipfly/zipfly.py | 108 +++++++++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 41 deletions(-) diff --git a/zipfly/zipfly.py b/zipfly/zipfly.py index 9496270..08a8853 100644 --- a/zipfly/zipfly.py +++ b/zipfly/zipfly.py @@ -3,8 +3,9 @@ # v import io -import stat import zipfile +import threading +import queue ZIP64_LIMIT = (1 << 31) + 1 @@ -21,9 +22,11 @@ class ZipflyStream(io.RawIOBase): RawIOBase to provide an interface to files in the machine’s file system. """ - def __init__(self): - self._buffer = b'' + def __init__(self, chunksize): + self._queue = queue.Queue(maxsize=64) + self._staging = bytearray() self._size = 0 + self._chunksize = chunksize def writable(self): return True @@ -31,18 +34,33 @@ def writable(self): def write(self, b): if self.closed: raise RuntimeError("ZipFly stream was closed!") - self._buffer += b + # Accumulate data into staging buffer and emit fixed-size chunks to the queue + self._staging.extend(b) + self._size += len(b) + while len(self._staging) >= self._chunksize: + chunk = bytes(self._staging[:self._chunksize]) + del self._staging[:self._chunksize] + self._queue.put(chunk) return len(b) def get(self): - chunk = self._buffer - self._buffer = b'' - self._size += len(chunk) - return chunk + return self._queue.get() def size(self): return self._size + def flush(self): + self._queue.put(bytes(self._staging)) + self._staging.clear() + + def close(self): + if not self.closed: + # Flush remaining data and signal end-of-stream + self.flush() + # Sentinel to mark end + self._queue.put(None) + return super().close() + class ZipFly: @@ -178,54 +196,62 @@ def buffer_prediction_size(self): def generator(self): # stream - stream = ZipflyStream() + stream = ZipflyStream(self.chunksize) - with zipfile.ZipFile( - stream, - mode = self.mode, - compression = self.compression, - allowZip64 = self.allowZip64,) as zf: + def writer(): + with zipfile.ZipFile( + stream, + mode = self.mode, + compression = self.compression, + allowZip64 = self.allowZip64,) as zf: - for path in self.paths: + for path in self.paths: - if not self.filesystem in path: + if not self.filesystem in path: - raise RuntimeError(f"'{self.filesystem}' key is required") + raise RuntimeError(f"'{self.filesystem}' key is required") - """ - filesystem should be the path to a file or directory on the filesystem. - arcname is the name which it will have within the archive (by default, - this will be the same as filename - """ + """ + filesystem should be the path to a file or directory on the filesystem. + arcname is the name which it will have within the archive (by default, + this will be the same as filename + """ - if not self.arcname in path: + if not self.arcname in path: - # arcname will be default path - path[self.arcname] = path[self.filesystem] + # arcname will be default path + path[self.arcname] = path[self.filesystem] - z_info = zipfile.ZipInfo.from_file( - path[self.filesystem], - path[self.arcname] - ) + z_info = zipfile.ZipInfo.from_file( + path[self.filesystem], + path[self.arcname] + ) - with open( path[self.filesystem], 'rb' ) as e: - # Read from filesystem: - with zf.open( z_info, mode=self.mode ) as d: + with open( path[self.filesystem], 'rb' ) as e: + # Read from filesystem: + with zf.open( z_info, mode=self.mode ) as d: - for chunk in iter( lambda: e.read(self.chunksize), b'' ): + for chunk in iter( lambda: e.read(self.chunksize), b'' ): - d.write(chunk) - yield stream.get() + d.write(chunk) + self.set_comment(self.comment) + zf.comment = self.comment - self.set_comment(self.comment) - zf.comment = self.comment + # Ensure remaining bytes are flushed and end-of-stream is signaled + stream.close() - yield stream.get() - self._buffer_size = stream.size() + t = threading.Thread(target=writer, daemon=True) + t.start() - # Flush and close this stream. - stream.close() + while True: + chunk = stream.get() + if chunk is None: + break + yield chunk + + t.join() + self._buffer_size = stream.size() def get_size(self):