From 3d7154767e85d1c0fca00443be032512baea3832 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Mon, 3 Mar 2025 11:42:56 +0100 Subject: [PATCH 01/13] improve exception logging --- flow/record/adapter/elastic.py | 48 +++++++++++++++++++++++------ flow/record/stream.py | 13 ++++++-- flow/record/tools/rdump.py | 56 +++++++++++++++++++++++++++++++--- pyproject.toml | 3 +- 4 files changed, 102 insertions(+), 18 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 6a74bf28..5b427e17 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -76,6 +76,8 @@ def __init__( http_compress = str(http_compress).lower() in ("1", "true") self.hash_record = str(hash_record).lower() in ("1", "true") queue_size = int(queue_size) + request_timeout = int(request_timeout) + self.max_retries = int(max_retries) if not uri.lower().startswith(("http://", "https://")): uri = "http://" + uri @@ -92,7 +94,7 @@ def __init__( api_key=api_key, request_timeout=request_timeout, retry_on_timeout=True, - max_retries=max_retries, + max_retries=self.max_retries, ) self.json_packer = JsonRecordPacker() @@ -112,10 +114,29 @@ def __init__( self.metadata_fields[arg_key[6:]] = arg_val def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None: - log.error("Exception in thread: %s", exc) self.exception = getattr(exc, "exc_value", exc) + + # Extend the exception with error information from Elastic. + # https://elasticsearch-py.readthedocs.io/en/v8.17.1/exceptions.html + if hasattr(self.exception, "errors"): + try: + unique_errors = set() + for error in self.exception.errors: + i = error.get("index", {}) + unique_errors.add( + f"({i.get('status')} {i.get('error', {}).get('type')} {i.get('error', {}).get('reason')})" + ) + except Exception: + unique_errors = ["unable to extend errors"] + self.exception.args = (f"{self.exception.args[0]} {', '.join(unique_errors)}",) + self.exception.args[1:] + self.event.set() - self.close() + + # Close will raise the exception we just set, but we want the rdump process to catch it instead. + try: + self.close() + except Exception: + pass def record_to_document(self, record: Record, index: str) -> dict: """Convert a record to a Elasticsearch compatible document dictionary""" @@ -168,13 +189,14 @@ def streaming_bulk_thread(self) -> None: - https://elasticsearch-py.readthedocs.io/en/v8.17.1/helpers.html#elasticsearch.helpers.streaming_bulk - https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/helpers/actions.py#L362 """ + logging.getLogger("elastic_transport").setLevel(logging.CRITICAL) + for _ok, _item in elasticsearch.helpers.streaming_bulk( self.es, self.document_stream(), raise_on_error=True, raise_on_exception=True, - # Some settings have to be redefined because streaming_bulk does not inherit them from the self.es instance. - max_retries=3, + max_retries=self.max_retries, ): pass @@ -190,13 +212,19 @@ def flush(self) -> None: pass def close(self) -> None: - self.queue.put(StopIteration) - self.event.wait() + if hasattr(self, "queue"): + self.queue.put(StopIteration) + + if hasattr(self, "event"): + self.event.wait() if hasattr(self, "es"): - self.es.close() + try: + self.es.close() + except Exception: + pass - if self.exception: + if hasattr(self, "exception") and self.exception: raise self.exception @@ -218,6 +246,8 @@ def __init__( self.selector = selector verify_certs = str(verify_certs).lower() in ("1", "true") http_compress = str(http_compress).lower() in ("1", "true") + request_timeout = int(request_timeout) + max_retries = int(max_retries) if not uri.lower().startswith(("http://", "https://")): uri = "http://" + uri diff --git a/flow/record/stream.py b/flow/record/stream.py index d2f77fe9..81727b8a 100644 --- a/flow/record/stream.py +++ b/flow/record/stream.py @@ -161,12 +161,19 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R yield from reader reader.close() except IOError as e: - log.exception("%s(%r): %s", reader, src, e) # noqa: TRY401 + if len(sources) == 1: + raise + else: + log.error("%s(%r): %s", reader, src, e) + log.debug("", exc_info=e) except KeyboardInterrupt: raise except Exception as e: - log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e)) - continue + if len(sources) == 1: + raise + else: + log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e)) + continue class PathTemplateWriter: diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index f9759355..5bcb2b0d 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -29,7 +29,15 @@ except ImportError: HAS_TQDM = False -log = logging.getLogger(__name__) +try: + import structlog + + log = structlog.get_logger(__name__) + HAS_STRUCTLOG = True + +except ImportError: + log = logging.getLogger(__name__) + HAS_STRUCTLOG = False def list_adapters() -> None: @@ -126,6 +134,11 @@ def main(argv: list[str] | None = None) -> int: action="store_true", help="Show progress bar (requires tqdm)", ) + output.add_argument( + "--stats", + action="store_true", + help="Show count of processed records", + ) advanced = parser.add_argument_group("advanced") advanced.add_argument( @@ -185,7 +198,20 @@ def main(argv: list[str] | None = None) -> int: levels = [logging.WARNING, logging.INFO, logging.DEBUG] level = levels[min(len(levels) - 1, args.verbose)] - logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s") + + if HAS_STRUCTLOG: + structlog.configure( + wrapper_class=structlog.make_filtering_bound_logger(level), + processors=( + [ + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.dev.ConsoleRenderer(colors=True, pad_event=10), + ] + ), + ) + else: + logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s") fields_to_exclude = args.exclude.split(",") if args.exclude else [] fields = args.fields.split(",") if args.fields else [] @@ -239,6 +265,7 @@ def main(argv: list[str] | None = None) -> int: count = 0 record_writer = None + ret = 0 try: record_writer = RecordWriter(uri) @@ -266,14 +293,33 @@ def main(argv: list[str] | None = None) -> int: else: record_writer.write(rec) + except Exception as e: + print_error(e) + + # Prevent throwing an exception twice when deconstructing the record writer. + if hasattr(record_writer, "exception") and record_writer.exception is e: + record_writer.exception = None + + ret = 1 + finally: if record_writer: - record_writer.__exit__() - if args.list: + # Exceptions raised in threads can be thrown when deconstructing the writer. + try: + record_writer.__exit__() + except Exception as e: + print_error(e) + + if (args.list or args.stats) and not args.progress: print(f"Processed {count} records") - return 0 + return ret + + +def print_error(e: Exception) -> None: + log.error("rdump encountered a fatal error: %s", e) + log.debug("", exc_info=e) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 9776758a..1155839d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ test = [ full = [ "flow.record[compression]", "tqdm", + "structlog", ] [project.scripts] @@ -116,7 +117,7 @@ select = [ "FURB", "RUF", ] -ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003"] +ignore = ["E203", "B904", "UP024", "ANN002", "ANN003", "ANN204", "ANN401", "SIM105", "TRY003", "TRY400"] [tool.ruff.lint.per-file-ignores] "tests/docs/**" = ["INP001"] From 4482acaaa1c971ae499c6168d971a12ce7db8679 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Mon, 3 Mar 2025 12:14:00 +0100 Subject: [PATCH 02/13] fix tests --- flow/record/tools/rdump.py | 1 - pyproject.toml | 1 + tests/test_rdump.py | 5 ++--- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 5bcb2b0d..8f108f8f 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -304,7 +304,6 @@ def main(argv: list[str] | None = None) -> int: finally: if record_writer: - # Exceptions raised in threads can be thrown when deconstructing the writer. try: record_writer.__exit__() diff --git a/pyproject.toml b/pyproject.toml index 1155839d..31ce9092 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ test = [ "duckdb; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb "pytz; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb "tqdm", + "structlog", ] full = [ "flow.record[compression]", diff --git a/tests/test_rdump.py b/tests/test_rdump.py index 783855a2..9dcb2b18 100644 --- a/tests/test_rdump.py +++ b/tests/test_rdump.py @@ -72,8 +72,8 @@ def test_rdump_pipe(tmp_path: Path) -> None: stderr=subprocess.PIPE, ) stdout, stderr = p2.communicate() - assert stdout.strip() == b"" - assert b"Are you perhaps entering record text, rather than a record stream?" in stderr.strip() + assert stderr.strip() == b"[reading from stdin]" + assert b"Are you perhaps entering record text, rather than a record stream?" in stdout.strip() # rdump test.records -w - | rdump -s 'r.count in (1, 3, 9)' -w filtered.records path2 = tmp_path / "filtered.records" @@ -721,4 +721,3 @@ def test_rdump_list_progress(tmp_path: Path, capsys: pytest.CaptureFixture) -> N # stdout should contain the RecordDescriptor definition and count assert "# " in captured.out - assert "Processed 100 records" in captured.out From 26601c12b36743406950a128143fe05e7b4b30ad Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 5 Mar 2025 11:05:12 +0100 Subject: [PATCH 03/13] make structlog use stderr just like cpython logger --- flow/record/tools/rdump.py | 1 + tests/test_rdump.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 8f108f8f..ded4b694 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -202,6 +202,7 @@ def main(argv: list[str] | None = None) -> int: if HAS_STRUCTLOG: structlog.configure( wrapper_class=structlog.make_filtering_bound_logger(level), + logger_factory=structlog.PrintLoggerFactory(file=sys.stderr), processors=( [ structlog.stdlib.add_log_level, diff --git a/tests/test_rdump.py b/tests/test_rdump.py index 9dcb2b18..781e1566 100644 --- a/tests/test_rdump.py +++ b/tests/test_rdump.py @@ -72,8 +72,8 @@ def test_rdump_pipe(tmp_path: Path) -> None: stderr=subprocess.PIPE, ) stdout, stderr = p2.communicate() - assert stderr.strip() == b"[reading from stdin]" - assert b"Are you perhaps entering record text, rather than a record stream?" in stdout.strip() + assert stdout.strip() == b"" + assert b"Are you perhaps entering record text, rather than a record stream?" in stderr.strip() # rdump test.records -w - | rdump -s 'r.count in (1, 3, 9)' -w filtered.records path2 = tmp_path / "filtered.records" From 62a9092eb621a5ff07bbd45ffd62b199c7b047cc Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:30:21 +0100 Subject: [PATCH 04/13] implement review feedback --- flow/record/adapter/elastic.py | 52 ++++++++++++++++++---------------- flow/record/stream.py | 5 ++-- flow/record/tools/rdump.py | 4 +-- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 5b427e17..291a423a 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import suppress import hashlib import logging import queue @@ -115,29 +116,9 @@ def __init__( def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None: self.exception = getattr(exc, "exc_value", exc) - - # Extend the exception with error information from Elastic. - # https://elasticsearch-py.readthedocs.io/en/v8.17.1/exceptions.html - if hasattr(self.exception, "errors"): - try: - unique_errors = set() - for error in self.exception.errors: - i = error.get("index", {}) - unique_errors.add( - f"({i.get('status')} {i.get('error', {}).get('type')} {i.get('error', {}).get('reason')})" - ) - except Exception: - unique_errors = ["unable to extend errors"] - self.exception.args = (f"{self.exception.args[0]} {', '.join(unique_errors)}",) + self.exception.args[1:] - + self.exception = enrich_elastic_exception(self.exception) self.event.set() - # Close will raise the exception we just set, but we want the rdump process to catch it instead. - try: - self.close() - except Exception: - pass - def record_to_document(self, record: Record, index: str) -> dict: """Convert a record to a Elasticsearch compatible document dictionary""" rdict = record._asdict() @@ -189,7 +170,6 @@ def streaming_bulk_thread(self) -> None: - https://elasticsearch-py.readthedocs.io/en/v8.17.1/helpers.html#elasticsearch.helpers.streaming_bulk - https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/helpers/actions.py#L362 """ - logging.getLogger("elastic_transport").setLevel(logging.CRITICAL) for _ok, _item in elasticsearch.helpers.streaming_bulk( self.es, @@ -219,10 +199,8 @@ def close(self) -> None: self.event.wait() if hasattr(self, "es"): - try: + with suppress(Exception): self.es.close() - except Exception: - pass if hasattr(self, "exception") and self.exception: raise self.exception @@ -284,3 +262,27 @@ def __iter__(self) -> Iterator[Record]: def close(self) -> None: if hasattr(self, "es"): self.es.close() + + +def enrich_elastic_exception(exception: Exception) -> Exception: + """Extend the exception with error information from Elastic. + + Resources: + - https://elasticsearch-py.readthedocs.io/en/v8.17.1/exceptions.html + """ + errors = set() + if hasattr(exception, "errors"): + try: + for error in exception.errors: + i = error.get("index", {}) + status = i.get("status") + type = i.get("error", {}).get("type") + reason = i.get("error", {}).get("reason") + + errors.add(f"({status} {type} {reason})") + except Exception: + errors.add("unable to extend errors") + + exception.args = (f"{exception.args[0]} {', '.join(errors)}",) + exception.args[1:] + + return exception diff --git a/flow/record/stream.py b/flow/record/stream.py index 81727b8a..83ff4ff1 100644 --- a/flow/record/stream.py +++ b/flow/record/stream.py @@ -146,8 +146,9 @@ def __iter__(self) -> Iterator[Record]: def record_stream(sources: list[str], selector: str | None = None) -> Iterator[Record]: """Return a Record stream generator from the given Record sources. - Exceptions in a Record source will be caught so the stream is not interrupted. + If there are multiple sources, exceptions are caught and logged, and the stream continues with the next source. """ + log.debug("Record stream with selector: %r", selector) for src in sources: # Inform user that we are reading from stdin @@ -165,7 +166,7 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R raise else: log.error("%s(%r): %s", reader, src, e) - log.debug("", exc_info=e) + log.debug("Full traceback", exc_info=e) except KeyboardInterrupt: raise except Exception as e: diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index ded4b694..f31a03f6 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -312,14 +312,14 @@ def main(argv: list[str] | None = None) -> int: print_error(e) if (args.list or args.stats) and not args.progress: - print(f"Processed {count} records") + print(f"Processed {count} records", file=sys.stderr) return ret def print_error(e: Exception) -> None: log.error("rdump encountered a fatal error: %s", e) - log.debug("", exc_info=e) + log.debug("Full traceback", exc_info=e) if __name__ == "__main__": From 8e401e385b8a3fe9072172e43a41e3f117f07adb Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:34:31 +0100 Subject: [PATCH 05/13] fix linter --- flow/record/adapter/elastic.py | 2 +- tox.ini | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 291a423a..5a4d69d8 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -1,10 +1,10 @@ from __future__ import annotations -from contextlib import suppress import hashlib import logging import queue import threading +from contextlib import suppress from typing import TYPE_CHECKING try: diff --git a/tox.ini b/tox.ini index 11864f38..ce04b63c 100644 --- a/tox.ini +++ b/tox.ini @@ -36,6 +36,7 @@ deps = ruff==0.9.2 commands = ruff format flow tests + ruff check --fix flow tests [testenv:lint] package = skip From 9e68edd77ad93e4c2a37481ea94914b09572c73f Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 5 Mar 2025 17:20:27 +0100 Subject: [PATCH 06/13] fix tests --- flow/record/tools/rdump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index f31a03f6..177e7b87 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -312,7 +312,7 @@ def main(argv: list[str] | None = None) -> int: print_error(e) if (args.list or args.stats) and not args.progress: - print(f"Processed {count} records", file=sys.stderr) + print(f"Processed {count} records", file=sys.stdout if args.list else sys.stderr) return ret From 35b1d1c644ce5dd71c02a81051c5801ef282bd5d Mon Sep 17 00:00:00 2001 From: Computer Network Investigation <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 6 Mar 2025 14:22:25 +0100 Subject: [PATCH 07/13] Apply suggestions from code review Co-authored-by: Yun Zheng Hu --- flow/record/adapter/elastic.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 5a4d69d8..55a8223a 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -274,15 +274,20 @@ def enrich_elastic_exception(exception: Exception) -> Exception: if hasattr(exception, "errors"): try: for error in exception.errors: - i = error.get("index", {}) - status = i.get("status") - type = i.get("error", {}).get("type") - reason = i.get("error", {}).get("reason") + index_dict = error.get("index", {}) + status = index_dict.get("status") + error_dict = index_dict.get("error", {}) + error_type = error_dict.get("type") + error_reason = error_dict.get("reason", "") - errors.add(f"({status} {type} {reason})") + errors.add(f"({status} {error_type} {error_reason})") except Exception: errors.add("unable to extend errors") - exception.args = (f"{exception.args[0]} {', '.join(errors)}",) + exception.args[1:] + # append errors to original exeption message + error_str = ", ".join(unique_errors) + original_message = self.exception.args[0] if self.exception.args else "" + new_message = f"{original_message} {error_str}" + self.exception.args = (new_message,) + self.exception.args[1:] return exception From 2fa8e81fb88100fedafe86708c8b6da474cc7254 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Tue, 17 Jun 2025 11:06:48 +0200 Subject: [PATCH 08/13] Install structlog handler to use with existing Python logging system --- flow/record/tools/rdump.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 177e7b87..4f010196 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -32,14 +32,15 @@ try: import structlog - log = structlog.get_logger(__name__) HAS_STRUCTLOG = True except ImportError: - log = logging.getLogger(__name__) HAS_STRUCTLOG = False +log = logging.getLogger(__name__) + + def list_adapters() -> None: failed = [] loader = flow.record.adapter.__loader__ @@ -198,21 +199,27 @@ def main(argv: list[str] | None = None) -> int: levels = [logging.WARNING, logging.INFO, logging.DEBUG] level = levels[min(len(levels) - 1, args.verbose)] + logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s") if HAS_STRUCTLOG: - structlog.configure( - wrapper_class=structlog.make_filtering_bound_logger(level), - logger_factory=structlog.PrintLoggerFactory(file=sys.stderr), - processors=( - [ + # We have structlog, configure Python logging to use it for rendering + console_renderer = structlog.dev.ConsoleRenderer() + handler = logging.StreamHandler() + handler.setFormatter( + structlog.stdlib.ProcessorFormatter( + processor=console_renderer, + foreign_pre_chain=[ + structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.processors.TimeStamper(fmt="iso"), - structlog.dev.ConsoleRenderer(colors=True, pad_event=10), - ] - ), + ], + ) ) - else: - logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s") + + # Clear existing handlers and add our structlog handler + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.addHandler(handler) fields_to_exclude = args.exclude.split(",") if args.exclude else [] fields = args.fields.split(",") if args.fields else [] From 73c6bad28e862c7fa3ae1aed062e61e736f55f7c Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Tue, 17 Jun 2025 11:24:46 +0200 Subject: [PATCH 09/13] ruf --fix first then ruff format --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index ce04b63c..0e0150df 100644 --- a/tox.ini +++ b/tox.ini @@ -35,8 +35,8 @@ package = skip deps = ruff==0.9.2 commands = - ruff format flow tests ruff check --fix flow tests + ruff format flow tests [testenv:lint] package = skip From 7fd21c6ac01bcc48e44274ed58ef2f08c67dd054 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Tue, 17 Jun 2025 11:25:26 +0200 Subject: [PATCH 10/13] Fix enrich_elastic_exception method --- flow/record/adapter/elastic.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 55a8223a..00ded5f0 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -285,9 +285,9 @@ def enrich_elastic_exception(exception: Exception) -> Exception: errors.add("unable to extend errors") # append errors to original exeption message - error_str = ", ".join(unique_errors) - original_message = self.exception.args[0] if self.exception.args else "" - new_message = f"{original_message} {error_str}" - self.exception.args = (new_message,) + self.exception.args[1:] + error_str = ", ".join(errors) + original_message = exception.args[0] if exception.args else "" + new_message = f"{original_message} {error_str}" + exception.args = (new_message,) + exception.args[1:] return exception From efea91879735dc0d027cc6d579dd896f067d1b73 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Tue, 17 Jun 2025 11:26:24 +0200 Subject: [PATCH 11/13] Use __name__ instead of __package__ for logging.getLogger() --- flow/record/adapter/splunk.py | 2 +- flow/record/base.py | 2 +- flow/record/jsonpacker.py | 2 +- flow/record/stream.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/record/adapter/splunk.py b/flow/record/adapter/splunk.py index 6b618c61..a3db661d 100644 --- a/flow/record/adapter/splunk.py +++ b/flow/record/adapter/splunk.py @@ -35,7 +35,7 @@ [SSL_VERIFY]: Whether to verify the server certificate when sending data over HTTPS. Defaults to True. """ -log = logging.getLogger(__package__) +log = logging.getLogger(__name__) # Amount of records to bundle into a single request when sending data over HTTP(S). RECORD_BUFFER_LIMIT = 20 diff --git a/flow/record/base.py b/flow/record/base.py index 47945d5c..3d6fb2b0 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -64,7 +64,7 @@ from flow.record.adapter import AbstractReader, AbstractWriter -log = logging.getLogger(__package__) +log = logging.getLogger(__name__) _utcnow = functools.partial(datetime.now, timezone.utc) RECORD_VERSION = 1 diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 01857bf2..0984446e 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -11,7 +11,7 @@ from flow.record.exceptions import RecordDescriptorNotFound from flow.record.utils import EventHandler -log = logging.getLogger(__package__) +log = logging.getLogger(__name__) class JsonRecordPacker: diff --git a/flow/record/stream.py b/flow/record/stream.py index 83ff4ff1..dee566cf 100644 --- a/flow/record/stream.py +++ b/flow/record/stream.py @@ -22,7 +22,7 @@ from flow.record.adapter import AbstractWriter -log = logging.getLogger(__package__) +log = logging.getLogger(__name__) aRepr = reprlib.Repr() aRepr.maxother = 255 From 0fc998a7f66cf7730c33b4d1f26adfae3275c6c5 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Tue, 17 Jun 2025 13:14:33 +0200 Subject: [PATCH 12/13] Add one extra logging level called LOGGING_TRACE_LEVEL Some exception tracebacks are only shown when using this log level. -v = INFO -vv = DEBUG -vvv = TRACE --- flow/record/stream.py | 7 +++++-- flow/record/tools/rdump.py | 7 ++++--- flow/record/utils.py | 2 ++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flow/record/stream.py b/flow/record/stream.py index dee566cf..4fa665f1 100644 --- a/flow/record/stream.py +++ b/flow/record/stream.py @@ -15,7 +15,7 @@ from flow.record.fieldtypes import fieldtype_for_value from flow.record.packer import RecordPacker from flow.record.selector import make_selector -from flow.record.utils import is_stdout +from flow.record.utils import LOGGING_TRACE_LEVEL, is_stdout if TYPE_CHECKING: from collections.abc import Iterator @@ -149,6 +149,8 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R If there are multiple sources, exceptions are caught and logged, and the stream continues with the next source. """ + trace = log.isEnabledFor(LOGGING_TRACE_LEVEL) + log.debug("Record stream with selector: %r", selector) for src in sources: # Inform user that we are reading from stdin @@ -166,7 +168,8 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R raise else: log.error("%s(%r): %s", reader, src, e) - log.debug("Full traceback", exc_info=e) + if trace: + log.exception("Full traceback") except KeyboardInterrupt: raise except Exception as e: diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 4f010196..aba342e4 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -14,7 +14,7 @@ from flow.record import RecordWriter, iter_timestamped_records, record_stream from flow.record.selector import make_selector from flow.record.stream import RecordFieldRewriter -from flow.record.utils import catch_sigpipe +from flow.record.utils import LOGGING_TRACE_LEVEL, catch_sigpipe try: from flow.record.version import version @@ -197,7 +197,7 @@ def main(argv: list[str] | None = None) -> int: args = parser.parse_args(argv) - levels = [logging.WARNING, logging.INFO, logging.DEBUG] + levels = [logging.WARNING, logging.INFO, logging.DEBUG, LOGGING_TRACE_LEVEL] level = levels[min(len(levels) - 1, args.verbose)] logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s") @@ -326,7 +326,8 @@ def main(argv: list[str] | None = None) -> int: def print_error(e: Exception) -> None: log.error("rdump encountered a fatal error: %s", e) - log.debug("Full traceback", exc_info=e) + if log.isEnabledFor(LOGGING_TRACE_LEVEL): + log.exception("Full traceback") if __name__ == "__main__": diff --git a/flow/record/utils.py b/flow/record/utils.py index 57d28949..da538dbd 100644 --- a/flow/record/utils.py +++ b/flow/record/utils.py @@ -7,6 +7,8 @@ from functools import wraps from typing import Any, BinaryIO, Callable, TextIO +LOGGING_TRACE_LEVEL = 5 + def get_stdout(binary: bool = False) -> TextIO | BinaryIO: """Return the stdout stream as binary or text stream. From f32f0eb80bb2d59eac39e8a26531406065d53829 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 30 Jun 2025 11:50:27 +0200 Subject: [PATCH 13/13] Update flow/record/adapter/elastic.py Co-authored-by: Erik Schamper <1254028+Schamper@users.noreply.github.com> --- flow/record/adapter/elastic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 024ae73f..d6c09324 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -283,7 +283,7 @@ def enrich_elastic_exception(exception: Exception) -> Exception: except Exception: errors.add("unable to extend errors") - # append errors to original exeption message + # append errors to original exception message error_str = ", ".join(errors) original_message = exception.args[0] if exception.args else "" new_message = f"{original_message} {error_str}"