Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 27 additions & 35 deletions flow/record/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import gzip
import os
from pathlib import Path

from flow.record.base import (
IGNORE_FIELDS_FOR_COMPARISON,
Expand Down Expand Up @@ -39,71 +41,61 @@

__all__ = [
"IGNORE_FIELDS_FOR_COMPARISON",
"RECORD_VERSION",
"RECORDSTREAM_MAGIC",
"RECORD_VERSION",
"DynamicDescriptor",
"FieldType",
"Record",
"GroupedRecord",
"RecordDescriptor",
"JsonRecordPacker",
"PathTemplateWriter",
"Record",
"RecordAdapter",
"RecordArchiver",
"RecordDescriptor",
"RecordDescriptorError",
"RecordField",
"RecordReader",
"RecordWriter",
"RecordOutput",
"RecordPrinter",
"RecordPacker",
"JsonRecordPacker",
"RecordStreamWriter",
"RecordPrinter",
"RecordReader",
"RecordStreamReader",
"open_path_or_stream",
"RecordStreamWriter",
"RecordWriter",
"dynamic_fieldtype",
"extend_record",
"ignore_fields_for_comparison",
"iter_timestamped_records",
"open_path",
"open_path_or_stream",
"open_stream",
"ignore_fields_for_comparison",
"record_stream",
"set_ignored_fields_for_comparison",
"stream",
"dynamic_fieldtype",
"DynamicDescriptor",
"PathTemplateWriter",
"RecordArchiver",
"RecordDescriptorError",
"record_stream",
"extend_record",
"iter_timestamped_records",
]


class View:
fields = None

def __init__(self, fields):
self.fields = fields

def __iter__(self, fields):
pass


class RecordDateSplitter:
basepath = None
out = None

def __init__(self, basepath):
self.basepath = basepath
def __init__(self, basepath: str | Path):
self.basepath = Path(basepath)

Check warning on line 82 in flow/record/__init__.py

View check run for this annotation

Codecov / codecov/patch

flow/record/__init__.py#L82

Added line #L82 was not covered by tests
self.out = {}

def getstream(self, t):
def getstream(self, t: tuple[int, int, int]) -> RecordStreamWriter:
if t not in self.out:
path = os.path.join(self.basepath, "-".join(["{:2d}".format(v) for v in t]) + ".rec.gz")
path = self.basepath.joinpath("-".join([f"{v:2d}" for v in t]) + ".rec.gz")

Check warning on line 87 in flow/record/__init__.py

View check run for this annotation

Codecov / codecov/patch

flow/record/__init__.py#L87

Added line #L87 was not covered by tests
f = gzip.GzipFile(path, "wb")
rs = RecordStreamWriter(f)
self.out[t] = rs
return self.out[t]

def write(self, r):
def write(self, r: Record) -> None:
t = (r.ts.year, r.ts.month, r.ts.day)
rs = self.getstream(t)
rs.write(r)
rs.fp.flush()

def close(self):
def close(self) -> None:
for rs in self.out.values():
rs.close()
46 changes: 18 additions & 28 deletions flow/record/adapter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,53 @@
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__) # make this namespace extensible from other packages
import abc
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Iterator

Check warning on line 8 in flow/record/adapter/__init__.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/__init__.py#L8

Added line #L8 was not covered by tests

def with_metaclass(meta, *bases):
"""Create a base class with a metaclass. Python 2 and 3 compatible."""

# This requires a bit of explanation: the basic idea is to make a dummy
# metaclass for one level of class instantiation that replaces itself with
# the actual metaclass.
class metaclass(type):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)

@classmethod
def __prepare__(cls, name, this_bases):
return meta.__prepare__(name, bases)

return type.__new__(metaclass, "temporary_class", (), {})
from flow.record.base import Record

Check warning on line 10 in flow/record/adapter/__init__.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/__init__.py#L10

Added line #L10 was not covered by tests


class AbstractWriter(with_metaclass(abc.ABCMeta, object)):
class AbstractWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write(self, rec):
def write(self, rec: Record) -> None:
"""Write a record."""
raise NotImplementedError

@abc.abstractmethod
def flush(self):
def flush(self) -> None:
"""Flush any buffered writes."""
raise NotImplementedError

@abc.abstractmethod
def close(self):
def close(self) -> None:
"""Close the Writer, no more writes will be possible."""
raise NotImplementedError

def __del__(self):
def __del__(self) -> None:
self.close()

def __enter__(self):
def __enter__(self) -> AbstractWriter: # noqa: PYI034
return self

def __exit__(self, *args):
def __exit__(self, *args) -> None:
self.flush()
self.close()


class AbstractReader(with_metaclass(abc.ABCMeta, object)):
class AbstractReader(metaclass=abc.ABCMeta):
@abc.abstractmethod
def __iter__(self):
def __iter__(self) -> Iterator[Record]:
"""Return a record iterator."""
raise NotImplementedError

def close(self):
def close(self) -> None: # noqa: B027
"""Close the Reader, can be overriden to properly free resources."""
pass

def __enter__(self):
def __enter__(self) -> AbstractReader: # noqa: PYI034
return self

def __exit__(self, *args):
def __exit__(self, *args) -> None:
self.close()
17 changes: 12 additions & 5 deletions flow/record/adapter/archive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.stream import RecordArchiver

if TYPE_CHECKING:
from flow.record.base import Record

Check warning on line 9 in flow/record/adapter/archive.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/archive.py#L9

Added line #L9 was not covered by tests

__usage__ = """
Record archiver adapter, writes records to YYYY/mm/dd directories (writer only)
---
Expand All @@ -12,27 +19,27 @@
class ArchiveWriter(AbstractWriter):
writer = None

def __init__(self, path, **kwargs):
def __init__(self, path: str, **kwargs):
self.path = path

path_template = kwargs.get("path_template")
name = kwargs.get("name")

self.writer = RecordArchiver(self.path, path_template=path_template, name=name)

def write(self, r):
def write(self, r: Record) -> None:
self.writer.write(r)

def flush(self):
def flush(self) -> None:
# RecordArchiver already flushes after every write
pass

def close(self):
def close(self) -> None:
if self.writer:
self.writer.close()
self.writer = None


class ArchiveReader(AbstractReader):
def __init__(self, path, **kwargs):
def __init__(self, path: str, **kwargs):
raise NotImplementedError
33 changes: 18 additions & 15 deletions flow/record/adapter/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
from datetime import datetime, timedelta, timezone
from importlib.util import find_spec
from typing import Any, Iterator
from typing import TYPE_CHECKING, Any, BinaryIO

import fastavro

Expand All @@ -12,6 +12,10 @@
from flow.record.selector import make_selector
from flow.record.utils import is_stdout

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

Check warning on line 17 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L16-L17

Added lines #L16 - L17 were not covered by tests

__usage__ = """
Apache AVRO adapter
---
Expand Down Expand Up @@ -52,7 +56,7 @@
fp = None
writer = None

def __init__(self, path, key=None, **kwargs):
def __init__(self, path: str | Path | BinaryIO, **kwargs):
self.fp = record.open_path_or_stream(path, "wb")

self.desc = None
Expand All @@ -69,11 +73,11 @@
self.writer = fastavro.write.Writer(self.fp, self.parsed_schema, codec=self.codec)

if self.desc != r._desc:
raise Exception("Mixed record types")
raise ValueError("Mixed record types")

Check warning on line 76 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L76

Added line #L76 was not covered by tests

self.writer.write(r._packdict())

def flush(self):
def flush(self) -> None:
if not self.writer:
self.writer = fastavro.write.Writer(
self.fp,
Expand All @@ -92,21 +96,21 @@
class AvroReader(AbstractReader):
fp = None

def __init__(self, path, selector=None, **kwargs):
def __init__(self, path: str, selector: str | None = None, **kwargs):
self.fp = record.open_path_or_stream(path, "rb")
self.selector = make_selector(selector)

self.reader = fastavro.reader(self.fp)
self.schema = self.reader.writer_schema
if not self.schema:
raise Exception("Missing Avro schema")
raise ValueError("Missing Avro schema")

Check warning on line 106 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L106

Added line #L106 was not covered by tests

self.desc = schema_to_descriptor(self.schema)

# Store the fieldnames that are of type "datetime"
self.datetime_fields = set(
self.datetime_fields = {
name for name, field in self.desc.get_all_fields().items() if field.typename == "datetime"
)
}

def __iter__(self) -> Iterator[record.Record]:
for obj in self.reader:
Expand Down Expand Up @@ -149,7 +153,7 @@
else:
avro_type = AVRO_TYPE_MAP.get(field_type)
if not avro_type:
raise Exception("Unsupported Avro type: {}".format(field_type))
raise ValueError(f"Unsupported Avro type: {field_type}")

Check warning on line 156 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L156

Added line #L156 was not covered by tests

field_schema["type"] = [avro_type, "null"]

Expand Down Expand Up @@ -190,16 +194,15 @@
if isinstance(t, dict):
if t.get("type") == "array":
item_type = avro_type_to_flow_type(t.get("items"))
return "{}[]".format(item_type)
else:
logical_type = t.get("logicalType")
if logical_type and ("time" in logical_type or "date" in logical_type):
return "datetime"
return f"{item_type}[]"
logical_type = t.get("logicalType")
if logical_type and ("time" in logical_type or "date" in logical_type):
return "datetime"

Check warning on line 200 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L197-L200

Added lines #L197 - L200 were not covered by tests

if t == "null":
continue

if t in RECORD_TYPE_MAP:
return RECORD_TYPE_MAP[t]

raise TypeError("Can't map avro type to flow type: {}".format(t))
raise TypeError(f"Can't map avro type to flow type: {t}")

Check warning on line 208 in flow/record/adapter/avro.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/avro.py#L208

Added line #L208 was not covered by tests
24 changes: 16 additions & 8 deletions flow/record/adapter/broker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from flow.broker import Publisher, Subscriber
from __future__ import annotations

Check warning on line 1 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L1

Added line #L1 was not covered by tests

from typing import TYPE_CHECKING

Check warning on line 3 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L3

Added line #L3 was not covered by tests

from flow.broker import Publisher, Subscriber

Check warning on line 5 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L5

Added line #L5 was not covered by tests
from flow.record.adapter import AbstractReader, AbstractWriter

if TYPE_CHECKING:
from collections.abc import Iterator

Check warning on line 9 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L8-L9

Added lines #L8 - L9 were not covered by tests

from flow.record.base import Record

Check warning on line 11 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L11

Added line #L11 was not covered by tests

__usage__ = """
PubSub adapter using flow.broker
---
Expand All @@ -13,23 +21,23 @@
class BrokerWriter(AbstractWriter):
publisher = None

def __init__(self, uri, source=None, classification=None, **kwargs):
def __init__(self, uri: str, source: str | None = None, classification: str | None = None, **kwargs):

Check warning on line 24 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L24

Added line #L24 was not covered by tests
self.publisher = Publisher(uri, **kwargs)
self.source = source
self.classification = classification

def write(self, r):
def write(self, r: Record) -> None:

Check warning on line 29 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L29

Added line #L29 was not covered by tests
record = r._replace(
_source=self.source or r._source,
_classification=self.classification or r._classification,
)
self.publisher.send(record)

def flush(self):
def flush(self) -> None:

Check warning on line 36 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L36

Added line #L36 was not covered by tests
if self.publisher:
self.publisher.flush()

def close(self):
def close(self) -> None:

Check warning on line 40 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L40

Added line #L40 was not covered by tests
if self.publisher:
if hasattr(self.publisher, "stop"):
# Requires flow.broker >= 1.1.1
Expand All @@ -42,14 +50,14 @@
class BrokerReader(AbstractReader):
subscriber = None

def __init__(self, uri, name=None, selector=None, **kwargs):
def __init__(self, uri: str, name: str | None = None, selector: str | None = None, **kwargs):

Check warning on line 53 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L53

Added line #L53 was not covered by tests
self.subscriber = Subscriber(uri, **kwargs)
self.subscription = self.subscriber.select(name, str(selector))

def __iter__(self):
def __iter__(self) -> Iterator[Record]:

Check warning on line 57 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L57

Added line #L57 was not covered by tests
return iter(self.subscription)

def close(self):
def close(self) -> None:

Check warning on line 60 in flow/record/adapter/broker.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/broker.py#L60

Added line #L60 was not covered by tests
if self.subscriber:
self.subscriber.stop()
self.subscriber = None
Loading
Loading