Conversation
Previously it was possible to add the contents of a directory at the top
level of a zip by using an `arcname` of `"."`, but this wasn't documented
or discoverable. This commit adds explicit support for giving a
directory an `arcname` of `"/"`, thereby putting all of
its contents at the top level.
For example, to add every file in `/path/to/files/` to the top level of
a zip:
```
ZipStream.from_path("/path/to/files", arcname="/")
```
This also fixes an issue where leading path separators were not being
stripped in the same way as the `zipfile` module, potentially leading to
size mismatches.
Because directories are never compressed (there's no data to compress), the file headers and the metadata returned by `get_info()` and `info_list()` should reflect this.
This looks excellent and I look forward to the merge!
Pull request overview
This PR adds async functionality to ZipStream, introduces support for Zstandard compression (Python 3.14+), and includes various bug fixes and improvements related to path handling and arcname sanitization.
Changes:
- Implements a non-blocking `AsyncZipStream.add()` method using async iterables with a queue-based producer-consumer pattern
- Adds Zstandard compression support for Python 3.14+ with appropriate validation and version checking
- Improves path handling by stripping leading path separators and allowing `"/"` as an arcname for adding directory contents at the zip root level
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| zipstream/ng.py | Core changes including async implementation, zstd support, refactored compression validation, improved flag constants, arcname sanitization, and directory compression enforcement |
| tests/test_zipstream.py | Added zstd compression tests, updated path handling tests for new "/" arcname behavior, updated error message assertions, fixed spelling in comments |
| setup.py | Bumped version to 1.9.0, updated license to SPDX format, added Python 3.13 and 3.14 classifiers, removed license classifier |
| CHANGELOG.md | Added changelog entries for v1.8.0 and v1.9.0 with version links |
| README.md | Fixed syntax error in example code, updated CI badge URL |
| .github/workflows/tests.yml | Updated Python test matrix to 3.8-3.14, removed 3.7 |
```
if not arcname:
    raise ValueError("A valid arcname for the directory is required")

if arcname[-1] != "/":
    arcname += "/"
```
After sanitization with `lstrip("/")`, `arcname` can become an empty string (e.g., if it was `"/"` or `"//"`). Line 837 accesses `arcname[-1]`, which would raise an IndexError on an empty string; the emptiness check at line 834 prevents this, so it is not actually a bug, but the correctness depends on the check at line 834 running before lines 837-838. Consider adding a comment explaining the order dependency so the logic is clearly intentional.
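A minimal sketch of that alternative, using the names from the diff above (the guard must stay before the `arcname[-1]` access):
```
arcname = arcname.lstrip("/")
# NOTE: this emptiness check must run before the arcname[-1] access below;
# an arcname of "/" or "//" sanitizes to "" and would raise an IndexError.
if not arcname:
    raise ValueError("A valid arcname for the directory is required")
if arcname[-1] != "/":
    arcname += "/"
```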
| "Programming Language :: Python :: 3.11", | ||
| "Programming Language :: Python :: 3.12", | ||
| "Programming Language :: Python :: 3.13", | ||
| "Programming Language :: Python :: 3.14", |
The removed classifier "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)" should be added back to maintain consistency with the license field. While the license field was updated to use the SPDX identifier "LGPL-3.0-only", the classifier provides important metadata for package managers and should be retained.
| "Programming Language :: Python :: 3.14", | |
| "Programming Language :: Python :: 3.14", | |
| "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", |
```
q = _queue_from_async_iterable(data)

# Build a *synchronous* generator that yields from the queue.
def sync_gen() -> Iterable[bytes]:
```
The return type annotation for sync_gen is imprecise. The function is a generator that yields bytes, so it should be annotated as Generator[bytes, None, None] from typing rather than Iterable[bytes]. While Iterable is technically correct, Generator is more precise and matches the actual implementation.
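A sketch of the more precise annotation (assuming the `typing` import is adjusted accordingly):
```
from typing import Generator

def sync_gen() -> Generator[bytes, None, None]:
    while True:
        chunk = q.get()
        if chunk is None:
            break
        yield chunk
```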
```
from zipfile import ZIP_ZSTANDARD, ZSTANDARD_VERSION
from compression.zstd import CompressionParameter
ZSTD_LEVEL_BOUNDS = CompressionParameter.compression_level.bounds()
```
The import statement assumes that Python 3.14+ will have a 'compression.zstd' module available in the standard library. However, this module path appears to be hypothetical and may not match the actual implementation when Python 3.14 is released. The standard zipfile module typically imports compression modules directly (e.g., zlib, bz2, lzma). This should be verified against the actual Python 3.14 implementation or documentation, as importing a non-existent module will cause an ImportError.
Suggested change:
```
try:
    from zipfile import ZIP_ZSTANDARD, ZSTANDARD_VERSION
    from compression.zstd import CompressionParameter
except ImportError:
    # Fall back to compatibility mode if zstd support is not available
    PY313_COMPAT = True
    warnings.warn(
        "Zstandard compression is not available (failed to import 'compression.zstd'); "
        "disabling ZIP_ZSTANDARD support.",
        RuntimeWarning,
    )
else:
    ZSTD_LEVEL_BOUNDS = CompressionParameter.compression_level.bounds()
```
```
async def add(
    self,
    data: AsyncIterable[bytes],
    *args,
    **kwargs,
) -> None:
    """
    Add *data* to the zip archive where *data* is an asynchronous
    iterable of ``bytes`` (e.g. an async generator that yields HTTP
    chunks).

    The method works by:

    1. Creating a thread-safe ``queue.Queue`` that is filled by a
       background ``asyncio`` task running in the current event loop.
    2. Passing a synchronous generator that reads from that queue
       to the underlying ``ZipStream.add`` method (which runs in a
       thread via ``to_thread``).

    This keeps the event loop non-blocking while the zip compression
    happens in a separate thread.
    """
    # Turn the async iterable into a queue that the thread can read.
    q = _queue_from_async_iterable(data)

    # Build a *synchronous* generator that yields from the queue.
    def sync_gen() -> Iterable[bytes]:
        while True:
            chunk = q.get()  # blocks until a chunk is available
            if chunk is None:  # sentinel → end of stream
                break
            yield chunk

    # Delegate to the original (thread-based) ``add`` implementation.
    # ``to_thread`` runs the whole call in a worker thread, so the
    # zip compression stays off the event loop.
    await to_thread(self._zip.add, sync_gen(), *args, **kwargs)
```
The new AsyncZipStream.add() method implementation lacks test coverage. Given that this is highlighted as a major feature in the PR description ("I can now provide you with an implementation of AsyncZipStream.add() that is actually non-blocking"), there should be tests verifying that the async add method works correctly with async iterables, handles errors properly, and maintains the non-blocking behavior.
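A minimal sketch of such a test (hypothetical: it assumes pytest-asyncio is available and that AsyncZipStream can be consumed as an async iterator of zip chunks, which should be checked against the actual API):
```
import pytest

@pytest.mark.asyncio
async def test_add_async_iterable():
    async def chunks():
        yield b"hello "
        yield b"world"

    zs = AsyncZipStream()
    await zs.add(chunks(), "greeting.txt")

    # Any non-empty zip stream starts with the local file header magic "PK".
    data = b"".join([c async for c in zs])
    assert data.startswith(b"PK")
```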
```
            q.put(chunk)
    finally:
        q.put(None)  # signal that the async iterable is finished
```
The queue.put() operation could block indefinitely if the queue has a maxsize set and the consumer (the sync_gen thread) is slower than the producer. This creates a potential deadlock because the async producer will be blocked waiting to put data, while the consumer thread may be waiting for something else. Consider using q.put(chunk, block=False) with appropriate error handling or using an asyncio.Queue instead of queue.Queue for better async compatibility.
Suggested change:
```
            # Use non-blocking put and yield to the event loop while waiting
            # for space in the queue to avoid blocking the event loop thread.
            while True:
                try:
                    q.put(chunk, block=False)
                    break
                except queue.Full:
                    # Queue is full; yield control and retry.
                    await asyncio.sleep(0)
    finally:
        # Ensure the EOF sentinel is eventually enqueued, even if the queue
        # is temporarily full, without blocking the event loop thread.
        while True:
            try:
                q.put(None, block=False)  # signal that the async iterable is finished
                break
            except queue.Full:
                await asyncio.sleep(0)
```
```
# Schedule the producer in the *current* event loop.
# ``asyncio.create_task`` returns immediately; the task runs
# concurrently with everything else.
asyncio.create_task(_producer())
```
The async task created by asyncio.create_task is not awaited or tracked. If an exception occurs in the _producer task, it will be silently ignored and the queue consumer will hang waiting for data that never arrives. The task should be stored and its result should be checked, or at minimum, exception handling should be added to ensure the sentinel None is always sent even on error.
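One possible sketch (hypothetical): keep a reference to the task and retrieve its exception in a done-callback; the sentinel is already sent in `_producer`'s `finally` block, so the consumer won't hang:
```
import asyncio
import logging

task = asyncio.create_task(_producer())

def _on_done(t: asyncio.Task) -> None:
    # Retrieving the exception also avoids "Task exception was never
    # retrieved" warnings from asyncio.
    if not t.cancelled() and t.exception() is not None:
        logging.getLogger(__name__).error(
            "async zip producer failed", exc_info=t.exception()
        )

task.add_done_callback(_on_done)
```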
```
# directories are always stored
if kwargs["arcname"][-1] == "/":
    kwargs["compress_type"] = ZIP_STORED
```
The directory compression type enforcement at line 1069 happens after the compression validation and default value removal (lines 1062-1065). This means if a user explicitly specifies compress_type=ZIP_DEFLATED for a directory, the validation will pass, but then line 1069 silently changes it to ZIP_STORED. This could be confusing behavior. Consider either: 1) raising a warning/error if a non-STORED compression is specified for a directory, or 2) enforcing ZIP_STORED before the validation/default removal logic so the behavior is consistent.
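A sketch of option 1 (hypothetical, reusing the names from the diff above): warn instead of silently overriding the caller's choice:
```
import warnings

# directories are always stored; warn instead of silently overriding
if kwargs["arcname"][-1] == "/":
    if kwargs.get("compress_type", ZIP_STORED) != ZIP_STORED:
        warnings.warn(
            "compress_type is ignored for directories; using ZIP_STORED",
            UserWarning,
        )
    kwargs["compress_type"] = ZIP_STORED
```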
```
# check compress level set
assert info[0]["compress_level"] == -7
assert info[1]["compress_level"] == None
```
Testing for None should use the 'is' operator.
| assert info[1]["compress_level"] == None | |
| assert info[1]["compress_level"] is None |
| zs.add(test, "default.txt") | ||
| zs.add(test, "22.txt", compress_level=22) | ||
|
|
||
| data = bytes(zs) |
The variable `data` is assigned but never used.
Suggested change:
```
bytes(zs)
```
Hi everyone,
good news from the async front: I can now provide you with an implementation of AsyncZipStream.add() that is actually non-blocking. I hope to get your feedback on this very soon.