Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions flow/record/adapter/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import queue
import threading
from contextlib import suppress
from typing import TYPE_CHECKING

import urllib3
Expand Down Expand Up @@ -79,6 +80,8 @@
http_compress = boolean_argument(http_compress)
self.hash_record = boolean_argument(hash_record)
queue_size = int(queue_size)
request_timeout = int(request_timeout)
self.max_retries = int(max_retries)

if not uri.lower().startswith(("http://", "https://")):
uri = "http://" + uri
Expand All @@ -95,7 +98,7 @@
api_key=api_key,
request_timeout=request_timeout,
retry_on_timeout=True,
max_retries=max_retries,
max_retries=self.max_retries,
)

self.json_packer = JsonRecordPacker()
Expand All @@ -113,10 +116,9 @@
self.metadata_fields[arg_key[6:]] = arg_val

def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None:
log.error("Exception in thread: %s", exc)
self.exception = getattr(exc, "exc_value", exc)
self.exception = enrich_elastic_exception(self.exception)

Check warning on line 120 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L120

Added line #L120 was not covered by tests
self.event.set()
self.close()

def record_to_document(self, record: Record, index: str) -> dict:
"""Convert a record to a Elasticsearch compatible document dictionary"""
Expand Down Expand Up @@ -169,13 +171,13 @@
- https://elasticsearch-py.readthedocs.io/en/v8.17.1/helpers.html#elasticsearch.helpers.streaming_bulk
- https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/helpers/actions.py#L362
"""

for _ok, _item in elasticsearch.helpers.streaming_bulk(
self.es,
self.document_stream(),
raise_on_error=True,
raise_on_exception=True,
# Some settings have to be redefined because streaming_bulk does not inherit them from the self.es instance.
max_retries=3,
max_retries=self.max_retries,
):
pass

Expand All @@ -191,13 +193,17 @@
pass

def close(self) -> None:
self.queue.put(StopIteration)
self.event.wait()
if hasattr(self, "queue"):
self.queue.put(StopIteration)

if hasattr(self, "event"):
self.event.wait()

if hasattr(self, "es"):
self.es.close()
with suppress(Exception):
self.es.close()

if self.exception:
if hasattr(self, "exception") and self.exception:
raise self.exception


Expand All @@ -219,6 +225,8 @@
self.selector = selector
verify_certs = boolean_argument(verify_certs)
http_compress = boolean_argument(http_compress)
request_timeout = int(request_timeout)
max_retries = int(max_retries)

Check warning on line 229 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L228-L229

Added lines #L228 - L229 were not covered by tests

if not uri.lower().startswith(("http://", "https://")):
uri = "http://" + uri
Expand Down Expand Up @@ -253,3 +261,32 @@
def close(self) -> None:
if hasattr(self, "es"):
self.es.close()


def enrich_elastic_exception(exception: Exception) -> Exception:
"""Extend the exception with error information from Elastic.

Resources:
- https://elasticsearch-py.readthedocs.io/en/v8.17.1/exceptions.html
"""
errors = set()
if hasattr(exception, "errors"):
try:
for error in exception.errors:
index_dict = error.get("index", {})
status = index_dict.get("status")
error_dict = index_dict.get("error", {})
error_type = error_dict.get("type")
error_reason = error_dict.get("reason", "")

Check warning on line 280 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L272-L280

Added lines #L272 - L280 were not covered by tests

errors.add(f"({status} {error_type} {error_reason})")
except Exception:
errors.add("unable to extend errors")

Check warning on line 284 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L282-L284

Added lines #L282 - L284 were not covered by tests

# append errors to original exception message
error_str = ", ".join(errors)
original_message = exception.args[0] if exception.args else ""
new_message = f"{original_message} {error_str}"
exception.args = (new_message,) + exception.args[1:]

Check warning on line 290 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L287-L290

Added lines #L287 - L290 were not covered by tests

return exception

Check warning on line 292 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L292

Added line #L292 was not covered by tests
2 changes: 1 addition & 1 deletion flow/record/adapter/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
[SSL_VERIFY]: Whether to verify the server certificate when sending data over HTTPS. Defaults to True.
"""

log = logging.getLogger(__package__)
log = logging.getLogger(__name__)

# Amount of records to bundle into a single request when sending data over HTTP(S).
RECORD_BUFFER_LIMIT = 20
Expand Down
2 changes: 1 addition & 1 deletion flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@

from flow.record.adapter import AbstractReader, AbstractWriter

log = logging.getLogger(__package__)
log = logging.getLogger(__name__)
_utcnow = functools.partial(datetime.now, timezone.utc)

RECORD_VERSION = 1
Expand Down
2 changes: 1 addition & 1 deletion flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from flow.record.exceptions import RecordDescriptorNotFound
from flow.record.utils import EventHandler

log = logging.getLogger(__package__)
log = logging.getLogger(__name__)


class JsonRecordPacker:
Expand Down
23 changes: 17 additions & 6 deletions flow/record/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
from flow.record.fieldtypes import fieldtype_for_value
from flow.record.packer import RecordPacker
from flow.record.selector import make_selector
from flow.record.utils import is_stdout
from flow.record.utils import LOGGING_TRACE_LEVEL, is_stdout

if TYPE_CHECKING:
from collections.abc import Iterator

from flow.record.adapter import AbstractWriter

log = logging.getLogger(__package__)
log = logging.getLogger(__name__)

aRepr = reprlib.Repr()
aRepr.maxother = 255
Expand Down Expand Up @@ -146,8 +146,11 @@
def record_stream(sources: list[str], selector: str | None = None) -> Iterator[Record]:
"""Return a Record stream generator from the given Record sources.

Exceptions in a Record source will be caught so the stream is not interrupted.
If there are multiple sources, exceptions are caught and logged, and the stream continues with the next source.
"""

trace = log.isEnabledFor(LOGGING_TRACE_LEVEL)

log.debug("Record stream with selector: %r", selector)
for src in sources:
# Inform user that we are reading from stdin
Expand All @@ -161,12 +164,20 @@
yield from reader
reader.close()
except IOError as e:
log.exception("%s(%r): %s", reader, src, e) # noqa: TRY401
if len(sources) == 1:
raise

Check warning on line 168 in flow/record/stream.py

View check run for this annotation

Codecov / codecov/patch

flow/record/stream.py#L167-L168

Added lines #L167 - L168 were not covered by tests
else:
log.error("%s(%r): %s", reader, src, e)
if trace:
log.exception("Full traceback")

Check warning on line 172 in flow/record/stream.py

View check run for this annotation

Codecov / codecov/patch

flow/record/stream.py#L170-L172

Added lines #L170 - L172 were not covered by tests
except KeyboardInterrupt:
raise
except Exception as e:
log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e))
continue
if len(sources) == 1:
raise

Check warning on line 177 in flow/record/stream.py

View check run for this annotation

Codecov / codecov/patch

flow/record/stream.py#L176-L177

Added lines #L176 - L177 were not covered by tests
else:
log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e))
continue

Check warning on line 180 in flow/record/stream.py

View check run for this annotation

Codecov / codecov/patch

flow/record/stream.py#L179-L180

Added lines #L179 - L180 were not covered by tests


class PathTemplateWriter:
Expand Down
66 changes: 60 additions & 6 deletions flow/record/tools/rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from flow.record import RecordWriter, iter_timestamped_records, record_stream
from flow.record.selector import make_selector
from flow.record.stream import RecordFieldRewriter
from flow.record.utils import catch_sigpipe
from flow.record.utils import LOGGING_TRACE_LEVEL, catch_sigpipe

try:
from flow.record.version import version
Expand All @@ -30,6 +30,15 @@
except ImportError:
HAS_TQDM = False

try:
import structlog

HAS_STRUCTLOG = True

except ImportError:
HAS_STRUCTLOG = False

Check warning on line 39 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L38-L39

Added lines #L38 - L39 were not covered by tests


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -129,6 +138,11 @@
action="store_true",
help="Show progress bar (requires tqdm)",
)
output.add_argument(
"--stats",
action="store_true",
help="Show count of processed records",
)

advanced = parser.add_argument_group("advanced")
advanced.add_argument(
Expand Down Expand Up @@ -195,10 +209,30 @@

args = parser.parse_args(argv)

levels = [logging.WARNING, logging.INFO, logging.DEBUG]
levels = [logging.WARNING, logging.INFO, logging.DEBUG, LOGGING_TRACE_LEVEL]
level = levels[min(len(levels) - 1, args.verbose)]
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")

if HAS_STRUCTLOG:
# We have structlog, configure Python logging to use it for rendering
console_renderer = structlog.dev.ConsoleRenderer()
handler = logging.StreamHandler()
handler.setFormatter(
structlog.stdlib.ProcessorFormatter(
processor=console_renderer,
foreign_pre_chain=[
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
],
)
)

# Clear existing handlers and add our structlog handler
root_logger = logging.getLogger()
root_logger.handlers.clear()
root_logger.addHandler(handler)

fields_to_exclude = args.exclude.split(",") if args.exclude else []
fields = args.fields.split(",") if args.fields else []

Expand Down Expand Up @@ -252,6 +286,7 @@

count = 0
record_writer = None
ret = 0

try:
record_writer = RecordWriter(uri)
Expand Down Expand Up @@ -279,14 +314,33 @@
else:
record_writer.write(rec)

except Exception as e:
print_error(e)

Check warning on line 318 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L317-L318

Added lines #L317 - L318 were not covered by tests

# Prevent throwing an exception twice when deconstructing the record writer.
if hasattr(record_writer, "exception") and record_writer.exception is e:
record_writer.exception = None

Check warning on line 322 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L321-L322

Added lines #L321 - L322 were not covered by tests

ret = 1

Check warning on line 324 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L324

Added line #L324 was not covered by tests

finally:
if record_writer:
record_writer.__exit__()
# Exceptions raised in threads can be thrown when deconstructing the writer.
try:
record_writer.__exit__()
except Exception as e:
print_error(e)

Check warning on line 332 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L331-L332

Added lines #L331 - L332 were not covered by tests

if (args.list or args.stats) and not args.progress:
print(f"Processed {count} records", file=sys.stdout if args.list else sys.stderr)

return ret

if args.list:
print(f"Processed {count} records")

return 0
def print_error(e: Exception) -> None:
log.error("rdump encountered a fatal error: %s", e)
if log.isEnabledFor(LOGGING_TRACE_LEVEL):
log.exception("Full traceback")

Check warning on line 343 in flow/record/tools/rdump.py

View check run for this annotation

Codecov / codecov/patch

flow/record/tools/rdump.py#L341-L343

Added lines #L341 - L343 were not covered by tests


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions flow/record/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from functools import wraps
from typing import Any, BinaryIO, Callable, TextIO

LOGGING_TRACE_LEVEL = 5


def get_stdout(binary: bool = False) -> TextIO | BinaryIO:
"""Return the stdout stream as binary or text stream.
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ test = [
"duckdb; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"pytz; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"tqdm",
"structlog",
]
full = [
"flow.record[compression]",
"tqdm",
"structlog",
]

[project.scripts]
Expand Down Expand Up @@ -120,7 +122,7 @@ select = [
"FURB",
"RUF",
]
ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003"]
ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003", "TRY400"]

[tool.ruff.lint.per-file-ignores]
"tests/docs/**" = ["INP001"]
Expand Down
1 change: 0 additions & 1 deletion tests/test_rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,4 +720,3 @@ def test_rdump_list_progress(tmp_path: Path, capsys: pytest.CaptureFixture) -> N

# stdout should contain the RecordDescriptor definition and count
assert "# <RecordDescriptor test/rdump/progress, hash=eeb21156>" in captured.out
assert "Processed 100 records" in captured.out
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ package = skip
deps =
ruff==0.9.2
commands =
ruff check --fix flow tests
ruff format flow tests

[testenv:lint]
Expand Down