diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index a7898917..24a4fbf8 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -1,14 +1,14 @@ from __future__ import annotations import hashlib +import json import logging import queue +import sys import threading from contextlib import suppress from typing import TYPE_CHECKING -import urllib3 - try: import elasticsearch import elasticsearch.helpers @@ -85,7 +85,7 @@ def __init__( self.max_retries = int(max_retries) if not uri.lower().startswith(("http://", "https://")): - uri = "http://" + uri + uri = "https://" + uri self.queue: queue.Queue[Record | StopIteration] = queue.Queue(maxsize=queue_size) self.event = threading.Event() @@ -95,6 +95,7 @@ def __init__( self.es = elasticsearch.Elasticsearch( uri, verify_certs=verify_certs, + ssl_show_warn=verify_certs, http_compress=http_compress, api_key=api_key, request_timeout=request_timeout, @@ -107,10 +108,6 @@ def __init__( self.thread = threading.Thread(target=self.streaming_bulk_thread) self.thread.start() - if not verify_certs: - # Disable InsecureRequestWarning of urllib3, caused by the verify_certs flag. - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - self.metadata_fields = {} for arg_key, arg_val in kwargs.items(): if arg_key.startswith("_meta_"): @@ -118,7 +115,13 @@ def __init__( def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None: self.exception = getattr(exc, "exc_value", exc) - self.exception = enrich_elastic_exception(self.exception) + + # version guard for add_note(), which was added in Python 3.11 + # TODO: Remove version guard after dropping support for Python 3.10 + if sys.version_info >= (3, 11): + for note in create_elasticsearch_error_notes(getattr(self.exception, "errors", []), max_notes=5): + self.exception.add_note(note) + self.event.set() def record_to_document(self, record: Record, index: str) -> dict: @@ -230,11 +233,12 @@ def __init__( max_retries = int(max_retries) if not uri.lower().startswith(("http://", "https://")): - uri = "http://" + uri + uri = "https://" + uri self.es = elasticsearch.Elasticsearch( uri, verify_certs=verify_certs, + ssl_show_warn=verify_certs, http_compress=http_compress, api_key=api_key, request_timeout=request_timeout, @@ -242,10 +246,6 @@ def __init__( max_retries=max_retries, ) - if not verify_certs: - # Disable InsecureRequestWarning of urllib3, caused by the verify_certs flag. - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - def __iter__(self) -> Iterator[Record]: ctx = get_app_context() selector = self.selector @@ -266,30 +266,69 @@ def close(self) -> None: self.es.close() -def enrich_elastic_exception(exception: Exception) -> Exception: - """Extend the exception with error information from Elastic. +def create_elasticsearch_error_notes(errors: list[dict] | dict, max_notes: int = 0) -> list[str]: + """Convert Elasticsearch Exception errors into pretty formatted notes. Resources: - https://elasticsearch-py.readthedocs.io/en/v8.17.1/exceptions.html + + Arguments: + errors: A list of error items from an Elasticsearch exception, or a single error + max_notes: Maximum number of notes to create. If 0, all errors will be converted into notes. + + Returns: + A list of formatted error notes. """ - errors = set() - if hasattr(exception, "errors"): + if isinstance(errors, dict): + errors = [errors] + + notes = [] + for idx, error in enumerate(errors, 1): + # Extract index information + index = error.get("index", {}) + index_name = index.get("_index", "unknown _index") + doc_id = index.get("_id", "unknown _id") + status = index.get("status") + + # Extract error details + error = index.get("error", {}) + error_type = error.get("type", "unknown error type") + error_reason = error.get("reason", "unknown reason") + + # Create formatted note + note_parts = [ + f"Error {idx}, {error_type!r} ({status=}):", + f" index: {index_name}", + f" document_id: {doc_id}", + f" reason: {error_reason}", + ] + + # Include caused_by information if available + if caused_by := error.get("caused_by"): + cause_type = caused_by.get("type") + cause_reason = caused_by.get("reason") + note_parts.append(f" caused_by: {cause_type}, reason: {cause_reason}") + + # Extract the record_descriptor name from the "data" field if possible try: - for error in exception.errors: - index_dict = error.get("index", {}) - status = index_dict.get("status") - error_dict = index_dict.get("error", {}) - error_type = error_dict.get("type") - error_reason = error_dict.get("reason", "") - - errors.add(f"({status} {error_type} {error_reason})") + data = json.loads(index.get("data", "{}")) + record_metadata = data.pop("_record_metadata", {}) + descriptor = record_metadata.get("descriptor", {}) + if descriptor_name := descriptor.get("name"): + note_parts.append(f" descriptor_name: {descriptor_name}") + if data: + note_parts.append(f" data: {json.dumps(data)}") except Exception: - errors.add("unable to extend errors") + # failed to get descriptor_name and data, ignore + pass + + notes.append("\n".join(note_parts) + "\n") - # append errors to original exception message - error_str = ", ".join(errors) - original_message = exception.args[0] if exception.args else "" - new_message = f"{original_message} {error_str}" - exception.args = (new_message, *exception.args[1:]) + # if max_notes is reached, stop processing and add a final note about remaining errors + if max_notes > 0 and idx >= max_notes: + remaining = len(errors) - idx + if remaining > 0: + notes.append(f"... and {remaining} more error(s) not shown.") + break - return exception + return notes diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 417c7490..974cbf6f 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -433,10 +433,17 @@ def main(argv: list[str] | None = None) -> int: return ret -def print_error(e: Exception) -> None: - log.error("rdump encountered a fatal error: %s", e) +def print_error(exc: Exception) -> None: + log.error("rdump encountered a fatal error: %s", exc) + if log.isEnabledFor(LOGGING_TRACE_LEVEL): - log.exception("Full traceback") + raise + + # Print any additional notes attached to the exception (e.g. from adapters) at warning level + for note in getattr(exc, "__notes__", []): + log.error(note) + + log.warning("To show full traceback, run with -vvv") if __name__ == "__main__": diff --git a/tests/adapter/test_elastic.py b/tests/adapter/test_elastic.py index 6df5384b..f5754e7b 100644 --- a/tests/adapter/test_elastic.py +++ b/tests/adapter/test_elastic.py @@ -1,12 +1,15 @@ +# ruff: noqa: E501 from __future__ import annotations import json +import sys from typing import TYPE_CHECKING import pytest +from elasticsearch.helpers import BulkIndexError from flow.record import RecordDescriptor -from flow.record.adapter.elastic import ElasticWriter +from flow.record.adapter.elastic import ElasticWriter, create_elasticsearch_error_notes if TYPE_CHECKING: from flow.record.base import Record @@ -57,3 +60,149 @@ def test_elastic_writer_metadata(record: Record) -> None: } ), } + + +def test_elastic_writer_metadata_exception() -> None: + with ElasticWriter(uri="elasticsearch:9200") as writer: + writer.excepthook( + BulkIndexError( + "1 document(s) failed to index.", + errors=[ + { + "index": { + "_index": "example-index", + "_id": "bWFkZSB5b3UgbG9vayDwn5GA", + "status": 400, + "error": { + "type": "document_parsing_exception", + "reason": "[1:225] failed to parse field [example] of type [long] in document with id " + "'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo'", + "caused_by": { + "type": "illegal_argument_exception", + "reason": 'For input string: "Foo"', + }, + }, + "data": '{"example":"Foo","_record_metadata":{"descriptor":{"name":"example/record",' + '"hash":1234567890},"source":"/path/to/source","classification":null,' + '"generated":"2025-12-31T12:34:56.789012+00:00","version":1}}', + } + } + ], + ) + ) + + with pytest.raises(BulkIndexError) as exc_info: + writer.__exit__() + + writer.exception = None + exception = exc_info.value + assert isinstance(exception, BulkIndexError) + + # version guard for __notes__ attribute, which was added in Python 3.11 + # TODO: Remove after we drop support for Python 3.10 + if sys.version_info >= (3, 11): + assert exception.__notes__ == [ + """\ +Error 1, 'document_parsing_exception' (status=400): + index: example-index + document_id: bWFkZSB5b3UgbG9vayDwn5GA + reason: [1:225] failed to parse field [example] of type [long] in document with id 'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo' + caused_by: illegal_argument_exception, reason: For input string: "Foo" + descriptor_name: example/record + data: {"example": "Foo"} +""" + ] + + +def test_create_elastic_notes() -> None: + exception = BulkIndexError( + "1 document(s) failed to index.", + errors=[ + { + "index": { + "_index": "example-index", + "_id": "bWFkZSB5b3UgbG9vayDwn5GA", + "status": 400, + "error": { + "type": "document_parsing_exception", + "reason": "[1:225] failed to parse field [example] of type [long] in document with id " + "'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo'", + "caused_by": { + "type": "illegal_argument_exception", + "reason": 'For input string: "Foo"', + }, + }, + "data": '{"example":"Foo","_record_metadata":{"descriptor":{"name":"example/record",' + '"hash":1234567890},"source":"/path/to/source","classification":null,' + '"generated":"2025-12-31T12:34:56.789012+00:00","version":1}}', + }, + }, + { + "index": { + "_index": "my-index", + "_id": "4XuIRpwBbjwxMKSCr8TE", + "status": 400, + "error": { + "type": "document_parsing_exception", + "reason": "[1:150] failed to parse field [content] of type [date] in document with id '4XuIRpwBbjwxMKSCr8TE'. Preview of field's value: 'This is the content of a sampe pastebin record'", + "caused_by": { + "type": "illegal_argument_exception", + "reason": "failed to parse date field [This is the content of a sampe pastebin record] with format [strict_date_optional_time||epoch_millis]", + "caused_by": { + "type": "date_time_parse_exception", + "reason": "Failed to parse with all enclosed parsers", + }, + }, + }, + "data": '{"key": "Q42eWSaF", "date": "2019-03-19T09:09:47+00:00", "expire_date": "1970-01-01T00:00:00+00:00", "title": "A sample pastebin record", "content": "This is the content of a sampe pastebin record", "user": "", "syntax": "text", "_record_metadata": {"descriptor": {"name": "text/paste", "hash": 831446724}, "source": "external/pastebin", "classification": "PUBLIC", "generated": "2019-03-19T09:11:04.706581+00:00", "version": 1}}', + } + }, + ], + ) + errors = exception.errors + assert len(errors) == 2 + + # Test with max_notes=1, which should only include the first error and a summary note about the remaining errors + notes = create_elasticsearch_error_notes(errors, max_notes=1) + assert len(notes) == 2 + assert ( + notes[0] + == """\ +Error 1, 'document_parsing_exception' (status=400): + index: example-index + document_id: bWFkZSB5b3UgbG9vayDwn5GA + reason: [1:225] failed to parse field [example] of type [long] in document with id 'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo' + caused_by: illegal_argument_exception, reason: For input string: "Foo" + descriptor_name: example/record + data: {"example": "Foo"} +""" + ) + assert notes[-1] == "... and 1 more error(s) not shown." + + # Test with max_notes=2, which should show both errors without the summary note + notes = create_elasticsearch_error_notes(errors, max_notes=2) + assert len(notes) == 2 + assert ( + notes[0] + == """\ +Error 1, 'document_parsing_exception' (status=400): + index: example-index + document_id: bWFkZSB5b3UgbG9vayDwn5GA + reason: [1:225] failed to parse field [example] of type [long] in document with id 'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo' + caused_by: illegal_argument_exception, reason: For input string: "Foo" + descriptor_name: example/record + data: {"example": "Foo"} +""" + ) + assert ( + notes[1] + == """\ +Error 2, 'document_parsing_exception' (status=400): + index: my-index + document_id: 4XuIRpwBbjwxMKSCr8TE + reason: [1:150] failed to parse field [content] of type [date] in document with id '4XuIRpwBbjwxMKSCr8TE'. Preview of field's value: 'This is the content of a sampe pastebin record' + caused_by: illegal_argument_exception, reason: failed to parse date field [This is the content of a sampe pastebin record] with format [strict_date_optional_time||epoch_millis] + descriptor_name: text/paste + data: {"key": "Q42eWSaF", "date": "2019-03-19T09:09:47+00:00", "expire_date": "1970-01-01T00:00:00+00:00", "title": "A sample pastebin record", "content": "This is the content of a sampe pastebin record", "user": "", "syntax": "text"} +""" + ) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 9f4df6b9..6a364bda 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -20,6 +20,7 @@ from flow.record.adapter.line import field_types_for_record_descriptor from flow.record.fieldtypes import flow_record_tz from flow.record.tools import rdump +from flow.record.utils import LOGGING_TRACE_LEVEL from tests._utils import generate_plain_records @@ -870,3 +871,36 @@ def test_rdump_invalid_stdin_pipe(stdin_bytes: bytes) -> None: assert pipe.returncode == 1, "rdump should exit with error code 1 on invalid input" assert b"rdump encountered a fatal error: Could not find adapter for file-like object" in stderr assert b"Processed 0 records (matched=0, unmatched=0)" in stdout + + +@pytest.mark.skipif(sys.version_info < (3, 11), reason="skip on python 3.10 or lower") +def test_rdump_print_error_notes( + tmp_path: Path, + capsys: pytest.CaptureFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that rdump prints error notes when an exception occurs.""" + + path = tmp_path / "test.records" + path.touch() # create an empty file + + exc = ValueError("something went wrong") + exc.add_note("Check the input format") + + with mock.patch("flow.record.tools.rdump.RecordWriter", side_effect=exc): + rdump.main([str(path)]) + _out, err = capsys.readouterr() + + assert "something went wrong" in err + assert "Check the input format" in err + assert "To show full traceback, run with -vvv" in err + + # with full traceback + with ( + caplog.at_level(LOGGING_TRACE_LEVEL), + mock.patch("flow.record.tools.rdump.RecordWriter", side_effect=exc), + pytest.raises(ValueError, match="something went wrong\nCheck the input format"), + ): + rdump.main([str(path), "-vvv"]) + + capsys.readouterr()