diff --git a/zipfly/zipfly.py b/zipfly/zipfly.py index 9496270..08a8853 100644 --- a/zipfly/zipfly.py +++ b/zipfly/zipfly.py @@ -3,8 +3,9 @@ # v import io -import stat import zipfile +import threading +import queue ZIP64_LIMIT = (1 << 31) + 1 @@ -21,9 +22,11 @@ class ZipflyStream(io.RawIOBase): RawIOBase to provide an interface to files in the machine’s file system. """ - def __init__(self): - self._buffer = b'' + def __init__(self, chunksize): + self._queue = queue.Queue(maxsize=64) + self._staging = bytearray() self._size = 0 + self._chunksize = chunksize def writable(self): return True @@ -31,18 +34,33 @@ def writable(self): def write(self, b): if self.closed: raise RuntimeError("ZipFly stream was closed!") - self._buffer += b + # Accumulate data into staging buffer and emit fixed-size chunks to the queue + self._staging.extend(b) + self._size += len(b) + while len(self._staging) >= self._chunksize: + chunk = bytes(self._staging[:self._chunksize]) + del self._staging[:self._chunksize] + self._queue.put(chunk) return len(b) def get(self): - chunk = self._buffer - self._buffer = b'' - self._size += len(chunk) - return chunk + return self._queue.get() def size(self): return self._size + def flush(self): + self._queue.put(bytes(self._staging)) + self._staging.clear() + + def close(self): + if not self.closed: + # Flush remaining data and signal end-of-stream + self.flush() + # Sentinel to mark end + self._queue.put(None) + return super().close() + class ZipFly: @@ -178,54 +196,62 @@ def buffer_prediction_size(self): def generator(self): # stream - stream = ZipflyStream() + stream = ZipflyStream(self.chunksize) - with zipfile.ZipFile( - stream, - mode = self.mode, - compression = self.compression, - allowZip64 = self.allowZip64,) as zf: + def writer(): + with zipfile.ZipFile( + stream, + mode = self.mode, + compression = self.compression, + allowZip64 = self.allowZip64,) as zf: - for path in self.paths: + for path in self.paths: - if not self.filesystem in path: + if not self.filesystem in path: - raise RuntimeError(f"'{self.filesystem}' key is required") + raise RuntimeError(f"'{self.filesystem}' key is required") - """ - filesystem should be the path to a file or directory on the filesystem. - arcname is the name which it will have within the archive (by default, - this will be the same as filename - """ + """ + filesystem should be the path to a file or directory on the filesystem. + arcname is the name which it will have within the archive (by default, + this will be the same as filename + """ - if not self.arcname in path: + if not self.arcname in path: - # arcname will be default path - path[self.arcname] = path[self.filesystem] + # arcname will be default path + path[self.arcname] = path[self.filesystem] - z_info = zipfile.ZipInfo.from_file( - path[self.filesystem], - path[self.arcname] - ) + z_info = zipfile.ZipInfo.from_file( + path[self.filesystem], + path[self.arcname] + ) - with open( path[self.filesystem], 'rb' ) as e: - # Read from filesystem: - with zf.open( z_info, mode=self.mode ) as d: + with open( path[self.filesystem], 'rb' ) as e: + # Read from filesystem: + with zf.open( z_info, mode=self.mode ) as d: - for chunk in iter( lambda: e.read(self.chunksize), b'' ): + for chunk in iter( lambda: e.read(self.chunksize), b'' ): - d.write(chunk) - yield stream.get() + d.write(chunk) + self.set_comment(self.comment) + zf.comment = self.comment - self.set_comment(self.comment) - zf.comment = self.comment + # Ensure remaining bytes are flushed and end-of-stream is signaled + stream.close() - yield stream.get() - self._buffer_size = stream.size() + t = threading.Thread(target=writer, daemon=True) + t.start() - # Flush and close this stream. - stream.close() + while True: + chunk = stream.get() + if chunk is None: + break + yield chunk + + t.join() + self._buffer_size = stream.size() def get_size(self):