Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flow/record/adapter/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TYPE_CHECKING

from flow.broker import Publisher, Subscriber

from flow.record.adapter import AbstractReader, AbstractWriter

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion flow/record/adapter/csvfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __iter__(self) -> Iterator[Record]:
ctx = get_app_context()
selector = self.selector
for row in self.reader:
rdict = dict(zip(self.fields, row))
rdict = dict(zip(self.fields, row, strict=False))
record = self.desc.init_from_dict(rdict)
if match_record_with_context(record, selector, ctx):
yield record
2 changes: 1 addition & 1 deletion flow/record/adapter/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def read_table(self, table_name: str) -> Iterator[Record]:
row[idx] = None
elif isinstance(value, str):
row[idx] = value.encode(errors="surrogateescape")
yield descriptor_cls.init_from_dict(dict(zip(fnames, row)))
yield descriptor_cls.init_from_dict(dict(zip(fnames, row, strict=False)))

def __iter__(self) -> Iterator[Record]:
"""Iterate over all tables in the database and yield records."""
Expand Down
2 changes: 1 addition & 1 deletion flow/record/adapter/xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def __iter__(self) -> Iterator[Record]:
for col in row
if col.value and not col.value.startswith("_")
]
desc = record.RecordDescriptor(desc_name, list(zip(field_types, field_names)))
desc = record.RecordDescriptor(desc_name, list(zip(field_types, field_names, strict=False)))
continue

record_values = []
Expand Down
5 changes: 2 additions & 3 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
TYPE_CHECKING,
Any,
BinaryIO,
Callable,
)
from urllib.parse import parse_qsl, urlparse

Expand Down Expand Up @@ -60,7 +59,7 @@
from flow.record.whitelist import WHITELIST, WHITELIST_TREE

if TYPE_CHECKING:
from collections.abc import Iterator, Mapping, Sequence
from collections.abc import Callable, Iterator, Mapping, Sequence

from flow.record.adapter import AbstractReader, AbstractWriter

Expand Down Expand Up @@ -1000,7 +999,7 @@ def merge_record_descriptors(
field_map[fname] = ftype
if name is None and descriptors:
name = descriptors[0].name
return RecordDescriptor(name, zip(field_map.values(), field_map.keys()))
return RecordDescriptor(name, zip(field_map.values(), field_map.keys(), strict=False))


def extend_record(
Expand Down
11 changes: 1 addition & 10 deletions flow/record/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import sys
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
Expand Down Expand Up @@ -37,15 +36,7 @@ def fresh_app_context() -> Generator[AppContext, None, None]:
APP_CONTEXT.reset(token)


# Use slots=True on dataclass for better performance which requires Python 3.10 or later.
# This can be removed when we drop support for Python 3.9.
if sys.version_info >= (3, 10):
app_dataclass = dataclass(slots=True) # novermin
else:
app_dataclass = dataclass


@app_dataclass
@dataclass(slots=True)
class AppContext:
"""Context for the application, holding metrics like amount of processed records."""

Expand Down
11 changes: 5 additions & 6 deletions flow/record/fieldtypes/net/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
ip_interface,
ip_network,
)
from typing import Union

from flow.record.base import FieldType
from flow.record.fieldtypes import defang

_IPNetwork = Union[IPv4Network, IPv6Network]
_IPAddress = Union[IPv4Address, IPv6Address]
_IPInterface = Union[IPv4Interface, IPv6Interface]
_ConversionTypes = Union[str, int, bytes]
_IPTypes = Union[_IPNetwork, _IPAddress, _IPInterface]
_IPNetwork = IPv4Network | IPv6Network
_IPAddress = IPv4Address | IPv6Address
_IPInterface = IPv4Interface | IPv6Interface
_ConversionTypes = str | int | bytes
_IPTypes = _IPNetwork | _IPAddress | _IPInterface


class ipaddress(FieldType):
Expand Down
6 changes: 3 additions & 3 deletions flow/record/fieldtypes/net/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def addr_str(s: address | int | str) -> str:


def mask_to_bits(n: int) -> int:
return bin(n).count("1")
return n.bit_count()


def bits_to_mask(b: int) -> int:
Expand All @@ -51,7 +51,7 @@ def __init__(self, addr: str, netmask: int | None = None):
raise TypeError(f"Subnet() argument 1 must be string, not {type(addr).__name__}")

if netmask is None:
ip, sep, mask = addr.partition("/")
ip, _, mask = addr.partition("/")
self.mask = bits_to_mask(int(mask)) if mask else 0xFFFFFFFF
self.net = addr_long(ip)
else:
Expand Down Expand Up @@ -93,7 +93,7 @@ def __init__(self):
def load(self, path: str | Path) -> None:
with Path(path).open() as fh:
for line in fh:
entry, desc = line.split(" ", 1)
entry, _ = line.split(" ", 1)
self.subnets.append(subnet(entry))

def add(self, entry: str) -> None:
Expand Down
4 changes: 2 additions & 2 deletions flow/record/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import ast
import operator
import re
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any

from flow.record.base import GroupedRecord, Record, dynamic_fieldtype
from flow.record.fieldtypes import net
from flow.record.whitelist import WHITELIST, WHITELIST_TREE

if TYPE_CHECKING:
from collections.abc import Iterator
from collections.abc import Callable, Iterator

try:
import astor
Expand Down
5 changes: 4 additions & 1 deletion flow/record/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import sys
import warnings
from functools import wraps
from typing import Any, BinaryIO, Callable, TextIO
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO

if TYPE_CHECKING:
from collections.abc import Callable

LOGGING_TRACE_LEVEL = 5

Expand Down
117 changes: 57 additions & 60 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "flow.record"
description = "A library for defining and creating structured data (called records) that can be streamed to disk or piped to other tools that use flow.record"
readme = "README.md"
requires-python = ">=3.9.0"
requires-python = ">=3.10"
license = "AGPL-3.0-or-later"
license-files = ["LICENSE", "COPYRIGHT"]
authors = [
Expand Down Expand Up @@ -37,8 +37,7 @@ repository = "https://github.com/fox-it/flow.record"
# Note: these compression libraries do not work well with pypy
compression = [
"lz4",
"zstandard==0.23.0; platform_python_implementation == 'PyPy' and python_version == '3.9'", # Pin to last working for PyPy3.9
"zstandard; platform_python_implementation != 'PyPy' and python_version != '3.9'", # Otherwise, pick the latest
"zstandard; platform_python_implementation != 'PyPy'",
]
elastic = [
"elasticsearch",
Expand All @@ -47,12 +46,11 @@ geoip = [
"maxminddb",
]
avro = [
"cramjam<2.8.4; platform_python_implementation == 'PyPy' and python_version == '3.9'", # Pin to last working for PyPy3.9
"fastavro[snappy]",
]
duckdb = [
"duckdb",
"pytz", # duckdb requires pytz for timezone support
"duckdb; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
"pytz; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
]
splunk = [
"httpx",
Expand All @@ -66,13 +64,65 @@ full = [
"structlog",
]

# This list is duplicated due to https://github.com/fox-it/flow.record/pull/182#discussion_r2284582481
[dependency-groups]
compression = [
"lz4",
"zstandard; platform_python_implementation != 'PyPy'",
]
elastic = [
"elasticsearch",
]
geoip = [
"maxminddb",
]
avro = [
"fastavro[snappy]",
]
duckdb = [
"duckdb; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
"pytz; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
]
splunk = [
"httpx",
]
xlsx = [
"openpyxl",
]
test = [
{include-group = "compression"},
{include-group = "avro"},
{include-group = "elastic"},
{include-group = "xlsx"},
{include-group = "duckdb"},
"tqdm",
"structlog",
"pytest",
]
full = [
{include-group = "compression"},
"tqdm",
"structlog",
]
build = [
"build",
]
lint = [
"ruff==0.13.1",
"vermin",
]
dev = [
{include-group = "test"},
{include-group = "lint"},
]

[project.scripts]
rdump = "flow.record.tools.rdump:main"
rgeoip = "flow.record.tools.geoip:main"

[tool.ruff]
line-length = 120
required-version = ">=0.9.0"
required-version = ">=0.13.1"
extend-exclude = ["flow/record/version.py"]

[tool.ruff.format]
Expand Down Expand Up @@ -127,56 +177,3 @@ include = ["flow.*"]
[tool.setuptools_scm]
version_file = "flow/record/version.py"

# This list is duplicated due to https://github.com/fox-it/flow.record/pull/182#discussion_r2284582481
[dependency-groups]
compression = [
"lz4",
"zstandard==0.23.0; platform_python_implementation == 'PyPy' and python_version == '3.9'", # Pin to last working for PyPy3.9
"zstandard; platform_python_implementation != 'PyPy' and python_version != '3.9'", # Otherwise, pick the latest
]
elastic = [
"elasticsearch",
]
geoip = [
"maxminddb",
]
avro = [
"cramjam<2.8.4; platform_python_implementation == 'PyPy' and python_version == '3.9'", # Pin to last working for PyPy3.9
"fastavro[snappy]",
]
duckdb = [
"duckdb; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
"pytz; platform_python_implementation != 'PyPy'", # Don't install duckdb on PyPy
]
splunk = [
"httpx",
]
xlsx = [
"openpyxl",
]
test = [
{include-group = "compression"},
{include-group = "avro"},
{include-group = "elastic"},
{include-group = "xlsx"},
{include-group = "duckdb"},
"tqdm",
"structlog",
"pytest",
]
full = [
{include-group = "compression"},
"tqdm",
"structlog",
]
build = [
"build",
]
lint = [
"ruff==0.12.9",
"vermin",
]
dev = [
{include-group = "test"},
{include-group = "lint"},
]
6 changes: 5 additions & 1 deletion tests/adapter/test_splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ def mock_httpx_package(monkeypatch: pytest.MonkeyPatch) -> Iterator[MagicMock]:


@pytest.mark.parametrize(
("field", "escaped"), [*list(zip(escaped_fields, [True] * len(escaped_fields))), ("not_escaped", False)]
("field", "escaped"),
[
*list(zip(escaped_fields, [True] * len(escaped_fields), strict=False)),
("not_escaped", False),
],
)
def test_escape_field_name(field: str, escaped: bool) -> None:
if escaped:
Expand Down
2 changes: 1 addition & 1 deletion tests/adapter/test_sqlite_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def test_invalid_field_names_quoting(tmp_path: Path, invalid_field_name: str) ->

# However, these field names are invalid in flow.record and should raise an exception
with (
pytest.raises(RecordDescriptorError, match="Field .* is an invalid or reserved field name."),
pytest.raises(RecordDescriptorError, match=r"Field .* is an invalid or reserved field name."),
RecordReader(f"sqlite://{db}") as reader,
):
_ = next(iter(reader))
Expand Down
9 changes: 6 additions & 3 deletions tests/fieldtypes/test_fieldtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import posixpath
import types
from datetime import datetime, timedelta, timezone
from typing import Callable
from typing import TYPE_CHECKING

import pytest

Expand All @@ -30,6 +30,9 @@
)
from flow.record.fieldtypes import datetime as dt

if TYPE_CHECKING:
from collections.abc import Callable

UTC = timezone.utc

INT64_MAX = (1 << 63) - 1
Expand Down Expand Up @@ -1006,10 +1009,10 @@ def test_datetime_timezone_aware(tmp_path: pathlib.Path, record_filename: str) -


def test_datetime_comparisions() -> None:
with pytest.raises(TypeError, match=".* compare .*naive"):
with pytest.raises(TypeError, match=r".* compare .*naive"):
assert dt("2023-01-01") > datetime(2022, 1, 1) # noqa: DTZ001

with pytest.raises(TypeError, match=".* compare .*naive"):
with pytest.raises(TypeError, match=r".* compare .*naive"):
assert datetime(2022, 1, 1) < dt("2023-01-01") # noqa: DTZ001

assert dt("2023-01-01") > datetime(2022, 1, 1, tzinfo=UTC)
Expand Down
4 changes: 2 additions & 2 deletions tests/fieldtypes/test_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_field_ipaddress() -> None:
a = net.IPAddress("192.168.1.1")
assert a == "192.168.1.1"

with pytest.raises(ValueError, match=".* does not appear to be an IPv4 or IPv6 address"):
with pytest.raises(ValueError, match=r".* does not appear to be an IPv4 or IPv6 address"):
net.IPAddress("a.a.a.a")


Expand All @@ -27,7 +27,7 @@ def test_field_ipnetwork() -> None:
assert a == "192.168.1.0/24"

# Host bits set
with pytest.raises(ValueError, match=".* has host bits set"):
with pytest.raises(ValueError, match=r".* has host bits set"):
net.IPNetwork("192.168.1.10/24")


Expand Down
Loading
Loading