Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 67 additions & 41 deletions zipfly/zipfly.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
# v

import io
import stat
import zipfile
import threading
import queue

ZIP64_LIMIT = (1 << 31) + 1

Expand All @@ -21,28 +22,45 @@ class ZipflyStream(io.RawIOBase):
RawIOBase to provide an interface to files in the machine’s file system.
"""

def __init__(self):
self._buffer = b''
def __init__(self, chunksize):
self._queue = queue.Queue(maxsize=64)
self._staging = bytearray()
self._size = 0
self._chunksize = chunksize

def writable(self):
return True

def write(self, b):
if self.closed:
raise RuntimeError("ZipFly stream was closed!")
self._buffer += b
# Accumulate data into staging buffer and emit fixed-size chunks to the queue
self._staging.extend(b)
self._size += len(b)
while len(self._staging) >= self._chunksize:
chunk = bytes(self._staging[:self._chunksize])
del self._staging[:self._chunksize]
self._queue.put(chunk)
return len(b)

def get(self):
chunk = self._buffer
self._buffer = b''
self._size += len(chunk)
return chunk
return self._queue.get()

def size(self):
return self._size

def flush(self):
self._queue.put(bytes(self._staging))
self._staging.clear()

def close(self):
if not self.closed:
# Flush remaining data and signal end-of-stream
self.flush()
# Sentinel to mark end
self._queue.put(None)
return super().close()


class ZipFly:

Expand Down Expand Up @@ -178,54 +196,62 @@ def buffer_prediction_size(self):
def generator(self):

# stream
stream = ZipflyStream()
stream = ZipflyStream(self.chunksize)

with zipfile.ZipFile(
stream,
mode = self.mode,
compression = self.compression,
allowZip64 = self.allowZip64,) as zf:
def writer():
with zipfile.ZipFile(
stream,
mode = self.mode,
compression = self.compression,
allowZip64 = self.allowZip64,) as zf:

for path in self.paths:
for path in self.paths:

if not self.filesystem in path:
if not self.filesystem in path:

raise RuntimeError(f"'{self.filesystem}' key is required")
raise RuntimeError(f"'{self.filesystem}' key is required")

"""
filesystem should be the path to a file or directory on the filesystem.
arcname is the name which it will have within the archive (by default,
this will be the same as filename
"""
"""
filesystem should be the path to a file or directory on the filesystem.
arcname is the name which it will have within the archive (by default,
this will be the same as filename
"""

if not self.arcname in path:
if not self.arcname in path:

# arcname will be default path
path[self.arcname] = path[self.filesystem]
# arcname will be default path
path[self.arcname] = path[self.filesystem]

z_info = zipfile.ZipInfo.from_file(
path[self.filesystem],
path[self.arcname]
)
z_info = zipfile.ZipInfo.from_file(
path[self.filesystem],
path[self.arcname]
)

with open( path[self.filesystem], 'rb' ) as e:
# Read from filesystem:
with zf.open( z_info, mode=self.mode ) as d:
with open( path[self.filesystem], 'rb' ) as e:
# Read from filesystem:
with zf.open( z_info, mode=self.mode ) as d:

for chunk in iter( lambda: e.read(self.chunksize), b'' ):
for chunk in iter( lambda: e.read(self.chunksize), b'' ):

d.write(chunk)
yield stream.get()
d.write(chunk)

self.set_comment(self.comment)
zf.comment = self.comment

self.set_comment(self.comment)
zf.comment = self.comment
# Ensure remaining bytes are flushed and end-of-stream is signaled
stream.close()

yield stream.get()
self._buffer_size = stream.size()
t = threading.Thread(target=writer, daemon=True)
t.start()

# Flush and close this stream.
stream.close()
while True:
chunk = stream.get()
if chunk is None:
break
yield chunk

t.join()
self._buffer_size = stream.size()


def get_size(self):
Expand Down