Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,33 @@
from flow.record.exceptions import RecordAdapterNotFound, RecordDescriptorError
from flow.record.utils import get_stdin, get_stdout

# lz4
try:
import lz4.frame as lz4

HAS_LZ4 = True
except ImportError:
HAS_LZ4 = False

# bzip2
try:
import bz2

HAS_BZ2 = True
except ImportError:
HAS_BZ2 = False
try:
import zstandard as zstd

# zstandard
try:
if sys.version_info >= (3, 14):
from compression import zstd # novermin
else:
from backports import zstd
HAS_ZSTD = True
except ImportError:
HAS_ZSTD = False

# fastavro
try:
import fastavro as avro # noqa

Expand Down Expand Up @@ -727,8 +735,7 @@ def open_stream(fp: BinaryIO, mode: str) -> BinaryIO:
elif HAS_LZ4 and peek_data[:4] == LZ4_MAGIC:
fp = lz4.open(fp, mode=mode)
elif HAS_ZSTD and peek_data[:4] == ZSTD_MAGIC:
dctx = zstd.ZstdDecompressor()
fp = dctx.stream_reader(fp)
fp = zstd.ZstdFile(fp, mode=mode)

return fp

Expand Down Expand Up @@ -804,13 +811,8 @@ def open_path(path: str, mode: str, clobber: bool = True) -> IO:
fp = lz4.open(path, mode)
elif path.endswith((".zstd", ".zst")):
if not HAS_ZSTD:
raise RuntimeError("zstandard python module not available")
if not out:
dctx = zstd.ZstdDecompressor()
fp = dctx.stream_reader(pathobj.open("rb"))
else:
cctx = zstd.ZstdCompressor()
fp = cctx.stream_writer(pathobj.open("wb"))
raise RuntimeError("backports.zstd python module not available")
fp = zstd.ZstdFile(path, mode)

# normal file or stdio for reading or writing
if not fp:
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repository = "https://github.com/fox-it/flow.record"
# Note: these compression libraries do not work well with pypy
compression = [
"lz4",
"zstandard; platform_python_implementation != 'PyPy'",
"backports.zstd; python_version < '3.14'",
]
elastic = [
"elasticsearch",
Expand Down Expand Up @@ -68,7 +68,7 @@ full = [
[dependency-groups]
compression = [
"lz4",
"zstandard; platform_python_implementation != 'PyPy'",
"backports.zstd; python_version < '3.14'",
]
elastic = [
"elasticsearch",
Expand Down
4 changes: 1 addition & 3 deletions tests/record/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ def test_compressed_writer_reader(tmp_path: Path, compression: str) -> None:
if compression == "lz4" and not HAS_LZ4:
pytest.skip("lz4 module not installed")
if compression == "zstd" and not HAS_ZSTD:
pytest.skip("zstandard module not installed")
pytest.skip("backports.zstd module not installed")

if compression == "lz4" and platform.python_implementation() == "PyPy":
pytest.skip("lz4 module not supported on PyPy")
if compression == "zstd" and platform.python_implementation() == "PyPy":
pytest.skip("zstandard module not supported on PyPy")

p = tmp_path.joinpath(f"{compression}-test")
p.mkdir()
Expand Down