diff --git a/.vscode/launch.json b/.vscode/launch.json index 1b682f69b..c1c36a3d8 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,11 +2,11 @@ "version": "0.2.0", "configurations": [ { - "name": "Debug non-ng example pipeline", + "name": "non-ng example pipeline", "type": "debugpy", "request": "launch", "program": "logprep/run_logprep.py", - "console": "integratedTerminal", + "console": "internalConsole", "args": [ "run", "examples/exampledata/config/pipeline.yml" @@ -15,6 +15,21 @@ "PROMETHEUS_MULTIPROC_DIR": "tmp/logprep" }, "justMyCode": false + }, + { + "name": "ng example pipeline", + "type": "debugpy", + "request": "launch", + "program": "logprep/run_ng.py", + "console": "internalConsole", + "args": [ + "run", + "examples/exampledata/config/ng_pipeline.yml" + ], + "env": { + "PROMETHEUS_MULTIPROC_DIR": "tmp/logprep" + }, + "justMyCode": false } ] } diff --git a/benchmark.py b/benchmark.py index aa49b5805..cbc258511 100644 --- a/benchmark.py +++ b/benchmark.py @@ -591,6 +591,7 @@ def benchmark_run( binary = "logprep-ng" if ng == 1 else "logprep" t_startup = time.time() + logprep_proc = popen_cmd([binary, "run", str(pipeline_config)], env=env) _current_logprep_proc = logprep_proc diff --git a/examples/exampledata/config/_benchmark_ng_pipeline.yml b/examples/exampledata/config/_benchmark_ng_pipeline.yml new file mode 100644 index 000000000..a8ed1b630 --- /dev/null +++ b/examples/exampledata/config/_benchmark_ng_pipeline.yml @@ -0,0 +1,125 @@ +version: 2 +process_count: 1 +timeout: 5.0 +restart_count: 2 +config_refresh_interval: 5 +error_backlog_size: 15000 +logger: + level: DEBUG + format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + loggers: + "py.warnings": {"level": "ERROR"} + "Runner": {"level": "INFO"} + "Processor": {"level": "ERROR"} + "Exporter": {"level": "ERROR"} + "uvicorn": {"level": "ERROR"} + "uvicorn.access": {"level": "ERROR"} + "OpenSearchOutput": {"level": "DEBUG"} + 
"KafkaOutput": {"level": "ERROR"} + "Input": {"level": "ERROR"} +metrics: + enabled: true + port: 8001 +pipeline: + - labelername: + type: ng_labeler + schema: examples/exampledata/rules/labeler/schema.json + include_parent_labels: true + rules: + - examples/exampledata/rules/labeler/rules + - dissector: + type: ng_dissector + rules: + - examples/exampledata/rules/dissector/rules + - dropper: + type: ng_dropper + rules: + - examples/exampledata/rules/dropper/rules + - filter: "test_dropper" + dropper: + drop: + - drop_me + description: "..." + - pre_detector: + type: ng_pre_detector + rules: + - examples/exampledata/rules/pre_detector/rules + outputs: + - opensearch: sre + tree_config: examples/exampledata/rules/pre_detector/tree_config.json + alert_ip_list_path: examples/exampledata/rules/pre_detector/alert_ips.yml + - amides: + type: ng_amides + rules: + - examples/exampledata/rules/amides/rules + models_path: examples/exampledata/models/model.zip + num_rule_attributions: 10 + max_cache_entries: 1000000 + decision_threshold: 0.32 + - pseudonymizer: + type: ng_pseudonymizer + pubkey_analyst: examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem + pubkey_depseudo: examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem + regex_mapping: examples/exampledata/rules/pseudonymizer/regex_mapping.yml + hash_salt: a_secret_tasty_ingredient + outputs: + - opensearch: pseudonyms + rules: + - examples/exampledata/rules/pseudonymizer/rules/ + max_cached_pseudonyms: 1000000 + - calculator: + type: ng_calculator + rules: + - filter: "test_label: execute" + calculator: + target_field: "calculation" + calc: "1 + 1" +input: + kafka: + type: ng_confluentkafka_input + topic: consumer + kafka_config: + bootstrap.servers: 127.0.0.1:9092 + group.id: cgroup3 + enable.auto.commit: "true" + auto.commit.interval.ms: "10000" + enable.auto.offset.store: "false" + queued.min.messages: "100000" + queued.max.messages.kbytes: "65536" + statistics.interval.ms: "60000" + 
preprocessing: + version_info_target_field: Logprep_version_info + log_arrival_time_target_field: event.ingested + hmac: + target: + key: "thisisasecureandrandomkey" + output_field: Full_event +output: + opensearch: + type: ng_opensearch_output + hosts: + - 127.0.0.1:9200 + default_index: processed + default_op_type: create + message_backlog_size: 2500 + timeout: 10000 + flush_timeout: 60 + user: admin + secret: admin + desired_cluster_status: ["green", "yellow"] + chunk_size: 25 +error_output: + opensearch: + type: ng_opensearch_output + hosts: + - 127.0.0.1:9200 + default_index: errors + default_op_type: create + message_backlog_size: 2500 + timeout: 10000 + flush_timeout: 60 + user: admin + secret: admin + desired_cluster_status: ["green", "yellow"] + chunk_size: 25 diff --git a/examples/exampledata/config/_benchmark_non_ng_pipeline.yml b/examples/exampledata/config/_benchmark_non_ng_pipeline.yml new file mode 100644 index 000000000..ac956e549 --- /dev/null +++ b/examples/exampledata/config/_benchmark_non_ng_pipeline.yml @@ -0,0 +1,125 @@ +version: 2 +process_count: 1 +timeout: 5.0 +restart_count: 2 +config_refresh_interval: 5 +error_backlog_size: 15000 +logger: + level: DEBUG + format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + loggers: + "py.warnings": {"level": "ERROR"} + "Runner": {"level": "INFO"} + "Processor": {"level": "ERROR"} + "Exporter": {"level": "ERROR"} + "uvicorn": {"level": "ERROR"} + "uvicorn.access": {"level": "ERROR"} + "OpenSearchOutput": {"level": "DEBUG"} + "KafkaOutput": {"level": "ERROR"} + "Input": {"level": "ERROR"} +metrics: + enabled: true + port: 8001 +pipeline: + - labelername: + type: labeler + schema: examples/exampledata/rules/labeler/schema.json + include_parent_labels: true + rules: + - examples/exampledata/rules/labeler/rules + - dissector: + type: dissector + rules: + - examples/exampledata/rules/dissector/rules + - dropper: + type: dropper + rules: + - 
examples/exampledata/rules/dropper/rules + - filter: "test_dropper" + dropper: + drop: + - drop_me + description: "..." + - pre_detector: + type: pre_detector + rules: + - examples/exampledata/rules/pre_detector/rules + outputs: + - opensearch: sre + tree_config: examples/exampledata/rules/pre_detector/tree_config.json + alert_ip_list_path: examples/exampledata/rules/pre_detector/alert_ips.yml + - amides: + type: amides + rules: + - examples/exampledata/rules/amides/rules + models_path: examples/exampledata/models/model.zip + num_rule_attributions: 10 + max_cache_entries: 1000000 + decision_threshold: 0.32 + - pseudonymizer: + type: pseudonymizer + pubkey_analyst: examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem + pubkey_depseudo: examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem + regex_mapping: examples/exampledata/rules/pseudonymizer/regex_mapping.yml + hash_salt: a_secret_tasty_ingredient + outputs: + - opensearch: pseudonyms + rules: + - examples/exampledata/rules/pseudonymizer/rules/ + max_cached_pseudonyms: 1000000 + - calculator: + type: calculator + rules: + - filter: "test_label: execute" + calculator: + target_field: "calculation" + calc: "1 + 1" +input: + kafka: + type: confluentkafka_input + topic: consumer + kafka_config: + bootstrap.servers: 127.0.0.1:9092 + group.id: cgroup3 + enable.auto.commit: "true" + auto.commit.interval.ms: "10000" + enable.auto.offset.store: "false" + queued.min.messages: "100000" + queued.max.messages.kbytes: "65536" + statistics.interval.ms: "60000" + preprocessing: + version_info_target_field: Logprep_version_info + log_arrival_time_target_field: event.ingested + hmac: + target: + key: "thisisasecureandrandomkey" + output_field: Full_event +output: + opensearch: + type: opensearch_output + hosts: + - 127.0.0.1:9200 + default_index: processed + default_op_type: create + message_backlog_size: 2500 + timeout: 10000 + flush_timeout: 60 + user: admin + secret: admin + desired_cluster_status: 
["green", "yellow"] + chunk_size: 25 +error_output: + opensearch: + type: opensearch_output + hosts: + - 127.0.0.1:9200 + default_index: errors + default_op_type: create + message_backlog_size: 2500 + timeout: 10000 + flush_timeout: 60 + user: admin + secret: admin + desired_cluster_status: ["green", "yellow"] + chunk_size: 25 diff --git a/examples/exampledata/config/ng_pipeline.yml b/examples/exampledata/config/ng_pipeline.yml index 5a173a0fb..fc8c4dbda 100644 --- a/examples/exampledata/config/ng_pipeline.yml +++ b/examples/exampledata/config/ng_pipeline.yml @@ -17,6 +17,7 @@ logger: "uvicorn.access": {"level": "ERROR"} "OpenSearchOutput": {"level": "DEBUG"} "KafkaOutput": {"level": "ERROR"} + "Input": {"level": "ERROR"} metrics: enabled: true port: 8001 diff --git a/logprep/abc/component.py b/logprep/abc/component.py index db80d43c4..a80910464 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -77,7 +77,7 @@ def metric_labels(self) -> dict: """Labels for the metrics""" return {"component": self._config.type, "name": self.name, "description": "", "type": ""} - def __init__(self, name: str, configuration: "Config", pipeline_index: int | None = None): + def __init__(self, name: str, configuration: Config, pipeline_index: int | None = None): self._config = configuration self.name = name self.pipeline_index = pipeline_index diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 45f3c4118..bd11c3bbf 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -151,7 +151,7 @@ def result(self, value: ProcessorResult): self._result = value @property - def rules(self) -> list["Rule"]: + def rules(self) -> Sequence["Rule"]: """Returns all rules Returns diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index 0bc924cc5..4ee2c13f9 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -266,7 +266,7 @@ class HttpEndpoint(ABC): # pylint: disable=too-many-arguments def 
__init__( self, - messages: mp.Queue, + messages: mp.Queue[dict], original_event_field: dict[str, str] | None, collect_meta: bool, metafield_name: str, @@ -336,7 +336,7 @@ def put_message(self, event: dict, metadata: dict): class JSONHttpEndpoint(HttpEndpoint): """:code:`json` endpoint to get json from request""" - _decoder = msgspec.json.Decoder() + _decoder: msgspec.json.Decoder[dict] = msgspec.json.Decoder() @raise_request_exceptions @basic_auth @@ -360,7 +360,7 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff class JSONLHttpEndpoint(HttpEndpoint): """:code:`jsonl` endpoint to get jsonl from request""" - _decoder = msgspec.json.Decoder() + _decoder: msgspec.json.Decoder[dict] = msgspec.json.Decoder() @raise_request_exceptions @basic_auth @@ -555,7 +555,7 @@ def __attrs_post_init__(self): __slots__: list[str] = ["target", "app", "http_server"] - messages: typing.Optional[Queue] = None + messages: Queue[dict] | None = None _endpoint_registry: Mapping[str, type[HttpEndpoint]] = { "json": JSONHttpEndpoint, diff --git a/logprep/factory.py b/logprep/factory.py index 3c6b18d5b..7e6e6db33 100644 --- a/logprep/factory.py +++ b/logprep/factory.py @@ -12,7 +12,7 @@ class Factory: """Create components for logprep.""" @classmethod - def create(cls, configuration: dict) -> Component | None: + def create(cls, configuration: dict) -> Component: """Create component.""" if configuration == {} or configuration is None: raise InvalidConfigurationError("The component definition is empty.") @@ -23,16 +23,14 @@ def create(cls, configuration: dict) -> Component | None: f"Found multiple component definitions ({', '.join(configuration.keys())})," + " but there must be exactly one." ) - for component_name, component_configuration_dict in configuration.items(): - if configuration == {} or component_configuration_dict is None: - raise InvalidConfigurationError( - f'The definition of component "{component_name}" is empty.' 
- ) - if not isinstance(component_configuration_dict, dict): - raise InvalidConfigSpecificationError(component_name) - component = Configuration.get_class(component_name, component_configuration_dict) - component_configuration = Configuration.create( - component_name, component_configuration_dict + # we know configuration has exactly one entry + [(component_name, component_configuration_dict)] = configuration.items() + if component_configuration_dict is None: + raise InvalidConfigurationError( + f'The definition of component "{component_name}" is empty.' ) - return component(component_name, component_configuration) - return None + if not isinstance(component_configuration_dict, dict): + raise InvalidConfigSpecificationError(component_name) + component = Configuration.get_class(component_name, component_configuration_dict) + component_configuration = Configuration.create(component_name, component_configuration_dict) + return component(component_name, component_configuration) diff --git a/logprep/ng/abc/component.py b/logprep/ng/abc/component.py new file mode 100644 index 000000000..0bb618e56 --- /dev/null +++ b/logprep/ng/abc/component.py @@ -0,0 +1,28 @@ +"""abstract module for components""" + +import logging + +from logprep.abc.component import Component + +logger = logging.getLogger("Component") + + +class NgComponent(Component): + """Abstract Component Class to define the Interface""" + + # pylint: disable=invalid-overridden-method, useless-parent-delegation + # TODO fork ng-based Component properly + # We override the setup to be async in the ng component tree. + # This is unclean from an interface perspective, but works if the worlds doen't mix. 
+ + async def setup(self) -> None: + """Set up the ng component.""" + + super().setup() + + async def shut_down(self) -> None: + """Shut down ng component and cleanup resources.""" + + super().shut_down() + + # pylint: enable=invalid-overridden-method,useless-parent-delegation diff --git a/logprep/ng/abc/connector.py b/logprep/ng/abc/connector.py new file mode 100644 index 000000000..d817d94c9 --- /dev/null +++ b/logprep/ng/abc/connector.py @@ -0,0 +1,50 @@ +"""abstract module for connectors""" + +from attrs import define, field + +from logprep.metrics.metrics import CounterMetric, HistogramMetric +from logprep.ng.abc.component import NgComponent as Component + + +class Connector(Component): + """Abstract Connector Class to define the Interface""" + + @define(kw_only=True) + class Config(Component.Config): + """Configuration for the connector""" + + @define(kw_only=True) + class Metrics(Component.Metrics): + """Tracks statistics about this connector""" + + number_of_processed_events: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of successful events", + name="number_of_processed_events", + ) + ) + """Number of successful events""" + + processing_time_per_event: HistogramMetric = field( + factory=lambda: HistogramMetric( + description="Time in seconds that it took to store an event", + name="processing_time_per_event", + ) + ) + """Time in seconds that it took to process an event""" + + number_of_warnings: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of warnings that occurred while storing events", + name="number_of_warnings", + ) + ) + """Number of warnings that occurred while processing events""" + + number_of_errors: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of errors that occurred while storing events", + name="number_of_errors", + ) + ) + """Number of errors that occurred while processing events""" diff --git a/logprep/ng/abc/event.py b/logprep/ng/abc/event.py 
index c8400e975..72a6523c0 100644 --- a/logprep/ng/abc/event.py +++ b/logprep/ng/abc/event.py @@ -9,6 +9,7 @@ from logprep.ng.event.event_state import EventState, EventStateType from logprep.util.helper import ( FieldValue, + Missing, add_fields_to, get_dotted_field_value, pop_dotted_field_value, @@ -21,6 +22,14 @@ class EventMetadata(ABC): """Abstract EventMetadata Class to define the Interface""" + @staticmethod + def from_dict(_: dict): + """ + Constructs a metadata object from the given dict. + Currently implemented as a placeholder for future development. + """ + return EventMetadata() + class Event(ABC): """ @@ -197,7 +206,7 @@ def get_dotted_field_value(self, dotted_field: str) -> Any: """ return get_dotted_field_value(self.data, dotted_field) - def pop_dotted_field_value(self, dotted_field: str) -> FieldValue: + def pop_dotted_field_value(self, dotted_field: str) -> FieldValue | Missing: """ Shortcut method that delegates to the global `pop_dotted_field_value` helper. diff --git a/logprep/ng/abc/input.py b/logprep/ng/abc/input.py index 67e0db9b9..57eafa182 100644 --- a/logprep/ng/abc/input.py +++ b/logprep/ng/abc/input.py @@ -1,3 +1,5 @@ +# pylint: disable=line-too-long + """This module provides the abstract base class for all input endpoints. New input endpoint types are created by implementing it. 
""" @@ -10,22 +12,19 @@ import typing import zlib from abc import abstractmethod -from collections.abc import Iterator +from collections.abc import AsyncIterator from copy import deepcopy from functools import cached_property from hmac import HMAC -from typing import Self from zoneinfo import ZoneInfo from attrs import define, field, validators -from logprep.abc.connector import Connector from logprep.abc.exceptions import LogprepException -from logprep.metrics.metrics import Metric -from logprep.ng.abc.event import EventBacklog +from logprep.ng.abc.connector import Connector +from logprep.ng.abc.event import EventMetadata from logprep.ng.event.event_state import EventStateType from logprep.ng.event.log_event import LogEvent -from logprep.ng.event.set_event_backlog import SetEventBacklog from logprep.processor.base.exceptions import FieldExistsWarning from logprep.util.converters import convert_from_dict from logprep.util.helper import ( @@ -87,7 +86,7 @@ class SourceDisconnectedWarning(InputWarning): """Lost (or failed to establish) contact with the source.""" -class InputIterator(Iterator): +class InputIterator(AsyncIterator): """Base Class for an input Iterator""" def __init__(self, input_connector: "Input", timeout: float): @@ -104,18 +103,7 @@ def __init__(self, input_connector: "Input", timeout: float): self.input_connector = input_connector self.timeout = timeout - def __iter__(self) -> Self: - """Return the iterator instance itself. - - Returns - ------- - Self - The iterator instance (self). - """ - - return self - - def __next__(self) -> LogEvent | None: + async def __anext__(self) -> LogEvent | None: """Return the next event in the Input Connector within the configured timeout. Returns @@ -123,7 +111,7 @@ def __next__(self) -> LogEvent | None: LogEvent | None The next event retrieved from the underlying data source. 
""" - event = self.input_connector.get_next(timeout=self.timeout) + event = await self.input_connector.get_next(timeout=self.timeout) logger.debug( "InputIterator fetching next event with timeout %s, is None: %s", self.timeout, @@ -160,7 +148,6 @@ class Config(Connector.Config): ) def __init__(self, name: str, configuration: "Input.Config") -> None: - self.event_backlog: EventBacklog = SetEventBacklog() super().__init__(name, configuration) @property @@ -193,17 +180,9 @@ def __call__(self, *, timeout: float) -> InputIterator: return InputIterator(self, timeout) - def acknowledge(self) -> None: - """Acknowledge all delivered events, so Input Connector can return final ACK state. - - As side effect, all older events with state ACKED has to be removed from `event_backlog` - before acknowledging new ones. - """ - - self.event_backlog.unregister(state_type=EventStateType.ACKED) - - for event in self.event_backlog.get(state_type=EventStateType.DELIVERED): - event.state.next_state() + @abstractmethod + async def acknowledge(self, events: list[LogEvent]) -> None: + """Acknowledge all delivered events, so Input Connector can return final ACK state.""" @property def _add_hmac(self) -> bool: @@ -254,23 +233,8 @@ def _add_full_event_to_target_field(self) -> bool: """Check and return if the event should be written into one singular field.""" return bool(self.config.preprocessing.add_full_event_to_target_field) - def _get_raw_event(self, timeout: float) -> bytes | None: # pylint: disable=unused-argument - """Implements the details how to get the raw event - - Parameters - ---------- - timeout : float - timeout - - Returns - ------- - raw_event : bytes - The retrieved raw event - """ - return None - @abstractmethod - def _get_event(self, timeout: float) -> tuple: + async def _get_event(self, timeout: float) -> tuple[dict, bytes, EventMetadata] | None: """Implements the details how to get the event Parameters @@ -280,30 +244,30 @@ def _get_event(self, timeout: float) -> tuple: 
Returns ------- - (event, raw_event, metadata) + (event, raw_event, metadata) | None """ - def _register_failed_event( + def _produce_failed_event( self, event: dict | None, - raw_event: bytes | None, - metadata: dict | None, + raw_event: bytes, + metadata: EventMetadata, error: Exception, - ) -> None: + ) -> LogEvent: """Helper method to register the failed event to event backlog.""" error_log_event = LogEvent( - data=event if isinstance(event, dict) else {}, - original=raw_event if raw_event is not None else b"", - metadata=metadata, # type: ignore + data=event if event is not None else {}, + original=raw_event, + metadata=metadata, ) error_log_event.errors.append(error) error_log_event.state.current_state = EventStateType.FAILED - self.event_backlog.register(events=[error_log_event]) + return error_log_event - @Metric.measure_time() - def get_next(self, timeout: float) -> LogEvent | None: + # @Metric.measure_time() + async def get_next(self, timeout: float) -> LogEvent | None: """Return the next document Parameters @@ -316,17 +280,14 @@ def get_next(self, timeout: float) -> LogEvent | None: input : LogEvent, None Input log data. 
""" - self.acknowledge() - event: dict | None = None - raw_event: bytes | None = None - metadata: dict | None = None + event_tuple = await self._get_event(timeout) - try: - event, raw_event, metadata = self._get_event(timeout) + if event_tuple is None: + return None - if event is None: - return None + event, raw_event, metadata = event_tuple + try: if not isinstance(event, dict): raise CriticalInputError(self, "not a dict", event) @@ -366,26 +327,25 @@ def get_next(self, timeout: float) -> LogEvent | None: except (FieldExistsWarning, TimeParserException) as error: raise CriticalInputError(self, error.args[0], event) from error except CriticalInputError as error: - self._register_failed_event( + self._produce_failed_event( event=event, raw_event=raw_event, - metadata=metadata, # type: ignore + metadata=metadata if metadata is not None else None, error=error, ) return None log_event = LogEvent( data=event, - original=raw_event, # type: ignore - metadata=metadata, # type: ignore + original=raw_event, + metadata=metadata, ) - self.event_backlog.register(events=[log_event]) - log_event.state.next_state() + log_event.state.current_state = EventStateType.RECEIVED return log_event - def batch_finished_callback(self) -> None: + async def batch_finished_callback(self) -> None: """Can be called by output connectors after processing a batch of one or more records.""" def _add_env_enrichment_to_event(self, event: dict, enrichments: dict) -> None: @@ -524,3 +484,13 @@ def _add_hmac_to( } add_fields_to(event_dict, new_field) return event_dict + + async def setup(self) -> None: + """Set up the input connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down input components and cleanup resources.""" + + await super().shut_down() diff --git a/logprep/ng/abc/output.py b/logprep/ng/abc/output.py index 8961e32ef..0fdf32bfd 100644 --- a/logprep/ng/abc/output.py +++ b/logprep/ng/abc/output.py @@ -3,13 +3,14 @@ """ from abc import abstractmethod +from 
collections.abc import Sequence from copy import deepcopy from typing import Any, Callable from attrs import define, field, validators -from logprep.abc.connector import Connector from logprep.abc.exceptions import LogprepException +from logprep.ng.abc.connector import Connector from logprep.ng.abc.event import Event from logprep.ng.event.event_state import EventStateType @@ -81,7 +82,7 @@ def __init__(self, name: str, configuration: "Connector.Config"): self.input_connector = None @abstractmethod - def store(self, event: Event) -> None: + async def store(self, event: Event) -> None: """Store the event in the output destination. Parameters @@ -91,7 +92,7 @@ def store(self, event: Event) -> None: """ @abstractmethod - def store_custom(self, event: Event, target: str) -> None: + async def store_custom(self, event: Event, target: str) -> None: """Store the event in the output destination. Parameters @@ -103,7 +104,26 @@ def store_custom(self, event: Event, target: str) -> None: """ @abstractmethod - def flush(self): + async def store_batch( + self, events: Sequence[Event], target: str | None = None + ) -> Sequence[Event]: + """Stores the events in the output destination. + + Parameters + ---------- + events : Sequence[Event] + Events to be stored. + target : str | None + Custom target for the events, defaults to None + + Returns + ------- + Sequence[Event] + Events after sending. + """ + + @abstractmethod + async def flush(self): """Write the backlog to the output destination. Needs to be implemented in child classes to ensure that the backlog is written to the output destination. 
@@ -124,7 +144,13 @@ def wrapper(self, *args, **kwargs): return wrapper - def _shut_down(self) -> None: - """Shut down the output connector.""" - self.flush() - return super()._shut_down() + async def setup(self) -> None: + """Set up the output connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down the output connector and cleanup resources.""" + + await self.flush() + await super().shut_down() diff --git a/logprep/ng/abc/processor.py b/logprep/ng/abc/processor.py index 6b074f3f4..cc093994f 100644 --- a/logprep/ng/abc/processor.py +++ b/logprep/ng/abc/processor.py @@ -9,9 +9,9 @@ from attrs import define, field, validators -from logprep.abc.component import Component from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import Metric +from logprep.ng.abc.component import NgComponent as Component from logprep.ng.event.log_event import LogEvent from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning from logprep.util.helper import ( @@ -255,7 +255,14 @@ def _write_target_field(self, event: dict, rule: "Rule", result: Any) -> None: overwrite_target=getattr(rule, "overwrite_target", False), ) - def setup(self) -> None: - super().setup() + async def setup(self) -> None: + """Set up the processor.""" + + await super().setup() for rule in self.rules: _ = rule.metrics # initialize metrics to show them on startup + + async def shut_down(self) -> None: + """Shut down the processor and run required cleanups""" + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index 5824f148f..f4aa377d0 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -28,14 +28,12 @@ auto.offset.reset: "earliest" """ -# pylint: enable=line-too-long import logging import os import typing from functools import cached_property, partial from socket import 
getfqdn -from types import MappingProxyType -from typing import Union +from types import MappingProxyType # pylint: disable=no-name-in-module import msgspec from attrs import define, field, validators @@ -44,14 +42,15 @@ OFFSET_END, OFFSET_INVALID, OFFSET_STORED, - Consumer, KafkaException, Message, TopicPartition, ) from confluent_kafka.admin import AdminClient +from confluent_kafka.aio import AIOConsumer from logprep.metrics.metrics import CounterMetric, GaugeMetric +from logprep.ng.abc.event import EventMetadata from logprep.ng.abc.input import ( CriticalInputError, CriticalInputParsingError, @@ -60,6 +59,7 @@ InputWarning, ) from logprep.ng.connector.confluent_kafka.metadata import ConfluentKafkaMetadata +from logprep.ng.event.log_event import LogEvent from logprep.util.validators import keys_in_validator DEFAULTS = { @@ -266,16 +266,23 @@ class Config(Input.Config): - Use SSL/mTLS encryption for data in transit. - Configure SASL or mTLS authentication for your Kafka clients. - Regularly rotate your Kafka credentials and secrets. - """ - _last_valid_record: Message | None + max_workers: int = field( + validator=validators.instance_of(int), + default=4, + ) + """ + The maximum number of concurrent worker tasks for message processing. + Should generally not exceed the number of topic partitions. + Defaults to 4. 
+ """ - __slots__ = ["_last_valid_record"] + __slots__ = ["_last_valid_record", "_consumer"] def __init__(self, name: str, configuration: "ConfluentKafkaInput.Config") -> None: super().__init__(name, configuration) - self._last_valid_record = None + self._last_valid_record: Message | None = None @property def config(self) -> Config: @@ -308,6 +315,23 @@ def _kafka_config(self) -> dict: ) return DEFAULTS | self.config.kafka_config | injected_config + async def setup(self): + """Set the confluent kafka input connector.""" + + await super().setup() + + try: + self._consumer = AIOConsumer(self._kafka_config, max_workers=self.config.max_workers) + + await self._consumer.subscribe( + [self.config.topic], + on_assign=self._assign_callback, + on_revoke=self._revoke_callback, + on_lost=self._lost_callback, + ) + except KafkaException as error: + raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error + @cached_property def _admin(self) -> AdminClient: """configures and returns the admin client @@ -323,18 +347,27 @@ def _admin(self) -> AdminClient: admin_config[key] = value return AdminClient(admin_config) - @cached_property - def _consumer(self) -> Consumer: - """configures and returns the consumer + async def get_consumer(self, max_workers: int = 4) -> AIOConsumer: + """ + Configures and returns the asynchronous Kafka consumer. + + Parameters + ---------- + max_workers : int, optional + Returns ------- - Consumer - confluent_kafka consumer object + AIOConsumer + The pre-configured aiokafka consumer object. 
""" - return Consumer(self._kafka_config) - def _error_callback(self, error: KafkaException) -> None: + if self._consumer is None: + self._consumer = AIOConsumer(self._kafka_config, max_workers=max_workers) + + return self._consumer + + async def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically to be considered informational since the client will automatically try to recover. This callback is served upon calling client.poll() @@ -347,7 +380,7 @@ def _error_callback(self, error: KafkaException) -> None: self.metrics.number_of_errors += 1 logger.error("%s: %s", self.describe(), error) - def _stats_callback(self, stats_raw: str) -> None: + async def _stats_callback(self, stats_raw: str) -> None: """Callback for statistics data. This callback is triggered by poll() or flush every `statistics.interval.ms` (needs to be configured separately) @@ -381,8 +414,10 @@ def _stats_callback(self, stats_raw: str) -> None: "assignment_size", DEFAULT_RETURN ) - def _commit_callback( - self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition] + async def _commit_callback( + self, + error: KafkaException | None, + topic_partitions: list[TopicPartition], ) -> None: """Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll() @@ -424,8 +459,7 @@ def describe(self) -> str: base_description = super().describe() return f"{base_description} - Kafka Input: {self.config.kafka_config['bootstrap.servers']}" - def _get_raw_event(self, timeout: float) -> Message | None: # type: ignore - # TODO type needs to be fixed + async def _get_raw_event(self, timeout: float) -> Message | None: """Get next raw Message from Kafka. Parameters @@ -444,7 +478,8 @@ def _get_raw_event(self, timeout: float) -> Message | None: # type: ignore Raises if an input is invalid or if it causes an error. 
""" try: - message = self._consumer.poll(timeout=timeout) + consumer = await self.get_consumer() + message = await consumer.poll(timeout=timeout) except RuntimeError as error: raise FatalInputError(self, str(error)) from error if message is None: @@ -463,7 +498,7 @@ def _get_raw_event(self, timeout: float) -> Message | None: # type: ignore return message - def _get_event(self, timeout: float) -> tuple: + async def _get_event(self, timeout: float) -> tuple[dict, bytes, EventMetadata] | None: """Parse the raw document from Kafka into a json. Parameters @@ -486,11 +521,10 @@ def _get_event(self, timeout: float) -> tuple: Raises if an input is invalid or if it causes an error. """ - message = self._get_raw_event(timeout) - # assert None not in (message.value(), message.partition(), message.offset()) + message = await self._get_raw_event(timeout) if message is None: - return None, None, None + return None raw_event = typing.cast(bytes, message.value()) @@ -522,7 +556,7 @@ def _enable_auto_offset_store(self) -> bool: def _enable_auto_commit(self) -> bool: return self.config.kafka_config.get("enable.auto.commit") == "true" - def batch_finished_callback(self) -> None: + async def batch_finished_callback(self) -> None: """Store offsets for last message referenced by `self._last_valid_records`. Should be called after delivering the current message to the output or error queue. 
""" @@ -533,14 +567,16 @@ def batch_finished_callback(self) -> None: if not self._last_valid_record: return try: - self._consumer.store_offsets(message=self._last_valid_record) + await self._consumer.store_offsets(message=self._last_valid_record) except KafkaException as error: raise InputWarning(self, f"{error}, {self._last_valid_record}") from error - def _assign_callback(self, _: Consumer, topic_partitions: list[TopicPartition]) -> None: + async def _assign_callback( + self, _: AIOConsumer, topic_partitions: list[TopicPartition] + ) -> None: for topic_partition in topic_partitions: offset, partition = topic_partition.offset, topic_partition.partition - member_id = self._get_memberid() + member_id = await self._get_memberid() logger.info( "%s was assigned to topic: %s | partition %s", member_id, @@ -554,23 +590,24 @@ def _assign_callback(self, _: Consumer, topic_partitions: list[TopicPartition]) self.metrics.committed_offsets.add_with_labels(offset, labels) self.metrics.current_offsets.add_with_labels(offset, labels) - def _revoke_callback(self, _: Consumer, topic_partitions: list[TopicPartition]) -> None: - + async def _revoke_callback( + self, _: AIOConsumer, topic_partitions: list[TopicPartition] + ) -> None: for topic_partition in topic_partitions: self.metrics.number_of_warnings += 1 - member_id = self._get_memberid() + member_id = await self._get_memberid() logger.warning( "%s to be revoked from topic: %s | partition %s", member_id, topic_partition.topic, topic_partition.partition, ) - self.batch_finished_callback() + await self.batch_finished_callback() - def _lost_callback(self, _: Consumer, topic_partitions: list[TopicPartition]) -> None: + async def _lost_callback(self, _: AIOConsumer, topic_partitions: list[TopicPartition]) -> None: for topic_partition in topic_partitions: self.metrics.number_of_warnings += 1 - member_id = self._get_memberid() + member_id = await self._get_memberid() logger.warning( "%s has lost topic: %s | partition %s - try to 
reassign", member_id, @@ -578,19 +615,14 @@ def _lost_callback(self, _: Consumer, topic_partitions: list[TopicPartition]) -> topic_partition.partition, ) - def _get_memberid(self) -> str | None: + async def _get_memberid(self) -> str | None: member_id = None try: - member_id = self._consumer.memberid() + member_id = self._consumer._consumer.memberid() # pylint: disable=protected-access except RuntimeError as error: logger.error("Failed to retrieve member ID: %s", error) return member_id - def _shut_down(self) -> None: - """Close consumer, which also commits kafka offsets.""" - self._consumer.close() - return super()._shut_down() - def health(self) -> bool: """Check the health of the component. @@ -611,15 +643,13 @@ def health(self) -> bool: return False return super().health() - def setup(self) -> None: - """Set the component up.""" - try: - self._consumer.subscribe( - [self.config.topic], - on_assign=self._assign_callback, - on_revoke=self._revoke_callback, - on_lost=self._lost_callback, - ) - super().setup() - except KafkaException as error: - raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error + async def acknowledge(self, events: list[LogEvent]): + logger.debug("acknowledge called") + + async def shut_down(self) -> None: + """Shut down the confluent kafka input connector and cleanup resources.""" + + if self._consumer is not None: + await self._consumer.close() + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/output.py b/logprep/ng/connector/confluent_kafka/output.py index 5ff83e786..aa55d3b32 100644 --- a/logprep/ng/connector/confluent_kafka/output.py +++ b/logprep/ng/connector/confluent_kafka/output.py @@ -25,6 +25,7 @@ import logging import typing +from collections.abc import Sequence from functools import cached_property, partial from socket import getfqdn from types import MappingProxyType @@ -32,10 +33,12 @@ from attrs import define, field, validators from confluent_kafka import KafkaException, 
Message, Producer # type: ignore from confluent_kafka.admin import AdminClient +from confluent_kafka.aio import AIOProducer -from logprep.metrics.metrics import GaugeMetric, Metric +from logprep.metrics.metrics import GaugeMetric from logprep.ng.abc.event import Event from logprep.ng.abc.output import FatalOutputError, Output +from logprep.ng.event.event_state import EventStateType from logprep.util.validators import keys_in_validator DEFAULTS = { @@ -195,6 +198,12 @@ class Config(Output.Config): - Regularly rotate your Kafka credentials and secrets. """ + __slots__ = ["_producer"] + + def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config"): + super().__init__(name, configuration) + self._producer: AIOProducer | None = None + @property def config(self) -> Config: """Provides the properly typed rule configuration object""" @@ -232,9 +241,20 @@ def _admin(self) -> AdminClient: admin_config[key] = value return AdminClient(admin_config) - @cached_property - def _producer(self) -> Producer: - return Producer(self._kafka_config) + def get_producer(self) -> AIOProducer: + """ + Configures and returns the asynchronous Kafka producer. + + Returns + ------- + AIOProducer + The pre-configured aiokafka producer object. + """ + + if self._producer is None: + self._producer = AIOProducer(self._kafka_config) + + return self._producer def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically @@ -281,7 +301,16 @@ def describe(self) -> str: f"{self.config.kafka_config.get('bootstrap.servers')}" ) - def store(self, event: Event) -> None: + async def store_batch( + self, events: Sequence[Event], target: str | None = None + ) -> Sequence[Event]: + store_target = target if target is not None else self.config.topic + for event in events: + await self.store_custom(event, store_target) + + return events + + async def store(self, event: Event) -> None: """Store a document in the producer topic. 
Parameters @@ -289,11 +318,11 @@ def store(self, event: Event) -> None: event : Event The event to store. """ - self.store_custom(event, self.config.topic) + await self.store_custom(event, self.config.topic) - @Output._handle_errors - @Metric.measure_time() - def store_custom(self, event: Event, target: str) -> None: + # @Output._handle_errors + # @Metric.measure_time() + async def store_custom(self, event: Event, target: str) -> None: """Write document to Kafka into target topic. Parameters @@ -303,29 +332,47 @@ def store_custom(self, event: Event, target: str) -> None: target : str Topic to store event data in. """ - event.state.next_state() + event.state.current_state = EventStateType.STORING_IN_OUTPUT + document = event.data self.metrics.number_of_processed_events += 1 + try: - self._producer.produce( + producer = self.get_producer() + delivery_future = await producer.produce( topic=target, value=self._encoder.encode(document), - on_delivery=partial(self.on_delivery, event), ) - logger.debug("Produced message %s to topic %s", str(document), target) - self._producer.poll(self.config.send_timeout) - except BufferError: - # block program until buffer is empty or timeout is reached - self._producer.flush(timeout=self.config.flush_timeout) - logger.debug("Buffer full, flushing") - - def flush(self) -> None: + msg = await delivery_future + except KafkaException as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Kafka exception during produce: %s", err) + self.metrics.number_of_errors += 1 + return + except Exception as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Message delivery failed: %s", err) + self.metrics.number_of_errors += 1 + return + + event.state.current_state = EventStateType.DELIVERED + logger.debug( + "Message delivered to '%s' partition %s, offset %s", + msg.topic(), + msg.partition(), + msg.offset(), + ) + + async def flush(self) -> None: 
"""ensures that all messages are flushed. According to https://confluent-kafka-python.readthedocs.io/en/latest/#confluent_kafka.Producer.flush flush without the timeout parameter will block until all messages are delivered. This ensures no messages will get lost on shutdown. """ - remaining_messages = self._producer.flush() + producer = self.get_producer() + remaining_messages = await producer.flush() if remaining_messages: self.metrics.number_of_errors += 1 logger.error( @@ -348,25 +395,18 @@ def health(self) -> bool: return False return super().health() - def setup(self) -> None: - """Set the component up.""" + async def setup(self) -> None: + """Set the confluent kafka output connector.""" + try: - super().setup() + await super().setup() except KafkaException as error: raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error - def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None: - """Callback for delivery reports.""" - if err is not None: - event.state.next_state(success=False) - event.errors.append(err) - logger.error("Message delivery failed: %s", err) - self.metrics.number_of_errors += 1 - return - event.state.next_state(success=True) - logger.debug( - "Message delivered to '%s' partition %s, offset %s", - msg.topic(), - msg.partition(), - msg.offset(), - ) + async def shut_down(self) -> None: + """Shut down the confluent kafka output connector and cleanup resources.""" + + await super().shut_down() + + if self._producer is not None: + await self._producer.close() diff --git a/logprep/ng/connector/file/input.py b/logprep/ng/connector/file/input.py index 72978330d..715274745 100644 --- a/logprep/ng/connector/file/input.py +++ b/logprep/ng/connector/file/input.py @@ -161,7 +161,7 @@ def _line_to_dict(self, input_line: str) -> dict: return {"message": input_line} return {} - def _get_event(self, timeout: float) -> tuple: + async def _get_event(self, timeout: float) -> tuple: """Returns the first message from 
the threadsafe queue""" try: message: dict = self._messages.get(timeout=timeout) @@ -170,12 +170,12 @@ def _get_event(self, timeout: float) -> tuple: except queue.Empty: return None, None, None - def setup(self) -> None: + async def setup(self) -> None: """Creates and starts the Thread that continuously monitors the given logfile. Right now this input connector is only started in the first process. It needs the class attribute pipeline_index before running setup in Pipeline Initiation""" - super().setup() + await super().setup() if not hasattr(self, "pipeline_index"): raise FatalInputError( self, "Necessary instance attribute `pipeline_index` could not be found." # type: ignore @@ -191,7 +191,7 @@ def setup(self) -> None: file_name=self.config.logfile_path, ) - def _shut_down(self) -> None: + async def shut_down(self) -> None: """Raises the Stop Event Flag that will stop the thread that monitors the logfile""" self.stop_flag.set() - return super()._shut_down() + await super().shut_down() diff --git a/logprep/ng/connector/http/input.py b/logprep/ng/connector/http/input.py index 358ee810b..e59755592 100644 --- a/logprep/ng/connector/http/input.py +++ b/logprep/ng/connector/http/input.py @@ -114,6 +114,7 @@ ) from logprep.factory_error import InvalidConfigurationError from logprep.metrics.metrics import CounterMetric, GaugeMetric +from logprep.ng.abc.event import EventMetadata from logprep.ng.abc.input import Input from logprep.util import http, rstr from logprep.util.credentials import CredentialsFactory @@ -272,7 +273,7 @@ def __attrs_post_init__(self): __slots__: list[str] = ["target", "app", "http_server"] - messages: typing.Optional[Queue] = None + messages: typing.Optional[Queue[dict]] = None _endpoint_registry: Mapping[str, type[HttpEndpoint]] = { "json": JSONHttpEndpoint, @@ -297,10 +298,10 @@ def config(self) -> Config: """Provides the properly typed rule configuration object""" return typing.cast(HttpInput.Config, self._config) - def setup(self) -> None: + 
async def setup(self) -> None: """setup starts the actual functionality of this connector.""" - super().setup() + await super().setup() if self.messages is None: raise ValueError("message queue `messages` has not been set") @@ -344,23 +345,23 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App: app.add_sink(endpoint, prefix=route_compile_helper(endpoint_path)) return app - def _get_event(self, timeout: float) -> tuple: + async def _get_event(self, timeout: float) -> tuple[dict, bytes, EventMetadata] | None: """Returns the first message from the queue""" - messages = typing.cast(Queue, self.messages) + messages = typing.cast(Queue[dict], self.messages) self.metrics.message_backlog_size += messages.qsize() try: message = messages.get(timeout=timeout) raw_message = str(message).encode("utf8") - return message, raw_message, None + return message, raw_message, EventMetadata.from_dict({}) except queue.Empty: - return None, None, None + return None - def _shut_down(self): + async def shut_down(self): """Raises Uvicorn HTTP Server internal stop flag and waits to join""" if self.http_server: self.http_server.shut_down() - return super()._shut_down() + await super().shut_down() @cached_property def health_endpoints(self) -> list[str]: diff --git a/logprep/ng/connector/jsonl/output.py b/logprep/ng/connector/jsonl/output.py index b93b0ed40..249f658e3 100644 --- a/logprep/ng/connector/jsonl/output.py +++ b/logprep/ng/connector/jsonl/output.py @@ -19,6 +19,7 @@ """ import json +import typing from attrs import define, field, validators @@ -61,11 +62,16 @@ def __init__(self, name: str, configuration: "Output.Config"): self.events = [] self.failed_events = [] - def setup(self): - super().setup() - open(self._config.output_file, "a+", encoding="utf8").close() - if self._config.output_file_custom: - open(self._config.output_file_custom, "a+", encoding="utf8").close() + @property + def config(self) -> Config: + """Provides the properly typed configuration object""" + 
return typing.cast(JsonlOutput.Config, self._config) + + async def setup(self): + await super().setup() + open(self.config.output_file, "a+", encoding="utf8").close() + if self.config.output_file_custom: + open(self.config.output_file_custom, "a+", encoding="utf8").close() @staticmethod def _write_json(filepath: str, line: dict): @@ -78,7 +84,7 @@ def store(self, event: Event) -> None: """Store the event in the output destination.""" event.state.next_state() self.events.append(event.data) - JsonlOutput._write_json(self._config.output_file, event.data) + JsonlOutput._write_json(self.config.output_file, event.data) self.metrics.number_of_processed_events += 1 event.state.next_state(success=True) @@ -89,8 +95,8 @@ def store_custom(self, event: Event, target: str) -> None: document = {target: event.data} self.events.append(document) - if self._config.output_file_custom: - JsonlOutput._write_json(self._config.output_file_custom, document) + if self.config.output_file_custom: + JsonlOutput._write_json(self.config.output_file_custom, document) self.metrics.number_of_processed_events += 1 event.state.next_state(success=True) diff --git a/logprep/ng/connector/opensearch/output.py b/logprep/ng/connector/opensearch/output.py index 315192eea..df99a74cf 100644 --- a/logprep/ng/connector/opensearch/output.py +++ b/logprep/ng/connector/opensearch/output.py @@ -33,18 +33,23 @@ import logging import ssl import typing +from collections.abc import Sequence from functools import cached_property from typing import List, Optional -import opensearchpy as search from attrs import define, field, validators -from opensearchpy import OpenSearchException, helpers +from opensearchpy import ( + AsyncOpenSearch, + OpenSearchException, + SerializationError, + helpers, +) from opensearchpy.serializer import JSONSerializer from logprep.abc.exceptions import LogprepException -from logprep.metrics.metrics import Metric from logprep.ng.abc.event import Event from logprep.ng.abc.output import Output 
+from logprep.ng.event.event_state import EventStateType logger = logging.getLogger("OpenSearchOutput") @@ -82,7 +87,7 @@ def dumps(self, data): try: return self._encoder.encode(data).decode("utf-8") except (ValueError, TypeError) as e: - raise search.exceptions.SerializationError(data, e) + raise SerializationError(data, e) def loads(self, s): return self._decoder.decode(s) @@ -132,17 +137,22 @@ class Config(Output.Config): thread_count: int = field( default=4, validator=(validators.instance_of(int), validators.gt(1)) ) - """Number of threads to use for bulk requests.""" + """Number of threads to use for bulk requests. + DEPRECATED: This argument is deprecated and doesn't do anything anymore, + it will be removed in the future""" queue_size: int = field( default=4, validator=(validators.instance_of(int), validators.gt(1)) ) - """Number of queue size to use for bulk requests.""" + """Number of queue size to use for bulk requests. + DEPRECATED: This argument is deprecated and doesn't do anything anymore, + it will be removed in the future""" chunk_size: int = field( default=500, validator=(validators.instance_of(int), validators.gt(1)) ) """Chunk size to use for bulk requests.""" max_chunk_bytes: int = field( - default=100 * 1024 * 1024, validator=(validators.instance_of(int), validators.gt(1)) + default=100 * 1024 * 1024, + validator=(validators.instance_of(int), validators.gt(1)), ) """Max chunk size to use for bulk requests. The default is 100MB.""" max_retries: int = field( @@ -155,16 +165,24 @@ class Config(Output.Config): """Desired cluster status for health check as list of strings. Default is ["green"]""" default_op_type: str = field( default="index", - validator=(validators.instance_of(str), validators.in_(["create", "index"])), + validator=( + validators.instance_of(str), + validators.in_(["create", "index"]), + ), ) """Default op_type for indexing documents.
Default is 'index', Consider using 'create' for data streams or to prevent overwriting existing documents.""" - __slots__ = ("_message_backlog",) + __slots__ = ("_message_backlog", "_flush_task") _message_backlog: list[Event] """List of messages to be sent to Opensearch.""" + @property + def _metrics(self) -> Output.Metrics: + """Provides the properly typed metrics object""" + return typing.cast(Output.Metrics, self.metrics) + @property def config(self) -> Config: """Provides the properly typed rule configuration object""" @@ -212,7 +230,7 @@ def http_auth(self) -> tuple | None: @cached_property def _search_context(self): """Returns the opensearch client.""" - return search.OpenSearch( + return AsyncOpenSearch( self.config.hosts, scheme=self.schema, http_auth=self.http_auth, @@ -228,12 +246,9 @@ def _search_context(self): def __init__(self, name: str, configuration: "OpensearchOutput.Config"): super().__init__(name, configuration) - self._message_backlog = [] - def setup(self): - super().setup() - flush_timeout = self.config.flush_timeout - self._schedule_task(task=self.flush, seconds=flush_timeout) + async def setup(self): + await super().setup() def describe(self) -> str: """Get name of Opensearch endpoint with the host. 
@@ -247,13 +262,12 @@ def describe(self) -> str: base_description = Output.describe(self) return f"{base_description} - Opensearch Output: {self.config.hosts}" - @Output._handle_errors - def store(self, event: Event) -> None: + async def store(self, event: Event) -> None: """Store a document in the index defined in the document or to the default index.""" - self.store_custom(event, event.data.get("_index", self.config.default_index)) + await self.store_batch([event], event.data.get("_index", self.config.default_index)) - @Output._handle_errors - def store_custom(self, event: Event, target: str) -> None: + # @Output._handle_errors + async def store_custom(self, event: Event, target: str) -> None: """Store document into backlog to be written into Opensearch with the target index. The target index is determined per document by parameter :code:`target`. @@ -264,76 +278,84 @@ def store_custom(self, event: Event, target: str) -> None: target : str Index to store the document in. """ - event.state.next_state() - document = event.data - document["_index"] = target - document["_op_type"] = document.get("_op_type", self.config.default_op_type) - self.metrics.number_of_processed_events += 1 - self._message_backlog.append(event) - self._write_to_search_context() - - def _write_to_search_context(self): - """Writes documents from a buffer into Opensearch indices. - - Writes documents in a bulk if the document buffer limit has been reached. - This reduces connections to Opensearch and improves performance. - """ - if len(self._message_backlog) >= self.config.message_backlog_size: - self.flush() - - @Metric.measure_time() - def flush(self): - if not self._message_backlog: - return - logger.debug("Flushing %d documents to Opensearch", len(self._message_backlog)) - self._bulk(self._search_context, self._message_backlog) - self._message_backlog.clear() - - def _bulk(self, client: search.OpenSearch, events: list[Event]) -> None: - """Bulk index documents into Opensearch. 
- Uses the parallel_bulk function from the opensearchpy library. - - the error information is stored in a document with the following structure: - - ```json - { - "op_type": { - "error": "error message", - "status": "status_code", - "exception": "exception message" + await self.store_batch([event], target) + + # @Output._handle_errors + async def store_batch( + self, events: Sequence[Event], target: str | None = None + ) -> Sequence[Event]: + logger.debug("store_batch called with %d events, target=%s", len(events), target) + target = target if target else self.config.default_index + + for event in events: + document = event.data + document["_index"] = document.get("_index", target) + document["_op_type"] = document.get("_op_type", self.config.default_op_type) + + event.state.current_state = EventStateType.STORING_IN_OUTPUT + + self._metrics.number_of_processed_events += len(events) + logger.debug("Flushing %d documents to Opensearch", len(events)) + await self._bulk(self._search_context, events) + return events + + # @Metric.measure_time() + async def flush(self): + logger.debug("flush is not required") + + async def _bulk(self, client: AsyncOpenSearch, events: Sequence[Event]) -> None: + """Bulk index documents into Opensearch. Uses the async_streaming_bulk function from the opensearchpy library.
+ The error information is stored in a document with the following structure: + json + { + "op_type": { + "error": "error message", + "status": "status_code", + "exception": "exception message" + } } } - } """ + kwargs = { "max_chunk_bytes": self.config.max_chunk_bytes, "chunk_size": self.config.chunk_size, - "queue_size": self.config.queue_size, - "thread_count": self.config.thread_count, "raise_on_error": False, "raise_on_exception": False, } + actions = (event.data for event in events) - for index, result in enumerate(helpers.parallel_bulk(client, actions, **kwargs)): # type: ignore - success, item = result + + index = 0 + async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs): + event = events[index] + index += 1 + if success: - events[index].state.next_state(success=True) + event.state.current_state = EventStateType.DELIVERED continue - op_type = item.get("_op_type", self.config.default_op_type) - error_info = item.get(op_type, {}) + + event.state.current_state = EventStateType.FAILED + + op_infos = item.values() + error_info = op_infos[0] if len(op_infos) > 0 else {} + error = BulkError(error_info.get("error", "Failed to index document"), **error_info) - event = events[index] - event.state.next_state(success=False) event.errors.append(error) - def health(self) -> bool: + async def health(self) -> bool: # type: ignore # TODO: fix mypy issue """Check the health of the component.""" try: - resp = self._search_context.cluster.health( + resp = await self._search_context.cluster.health( params={"timeout": self.config.health_timeout} ) except (OpenSearchException, ConnectionError) as error: logger.error("Health check failed: %s", error) - self.metrics.number_of_errors += 1 + self._metrics.number_of_errors += 1 return False return super().health() and resp.get("status") in self.config.desired_cluster_status + + async def shut_down(self): + if "_search_context" in self.__dict__: + await self._search_context.close() + await 
super().shut_down() diff --git a/logprep/ng/event/event_state.py b/logprep/ng/event/event_state.py index 266d7ea43..1dd9b45da 100644 --- a/logprep/ng/event/event_state.py +++ b/logprep/ng/event/event_state.py @@ -19,16 +19,26 @@ class EventStateType(StrEnum): PROCESSED = "processed" """The event has been processed by all pipeline processors.""" + STORING_IN_OUTPUT = "storing_in_output" + """The event is storing in the output connector.""" + STORED_IN_OUTPUT = "stored_in_output" """The event was successfully stored in the output connector.""" FAILED = "failed" """The event failed during processing or output storage.""" + STORING_IN_ERROR = "storing_in_error" + """The event is storing in the error output (e.g. error queue or + fallback output).""" + STORED_IN_ERROR = "stored_in_error" """The event was stored in the error output (e.g. error queue or fallback output).""" + DELIVERING = "delivering" + """The event is delivering to the target system or final destination.""" + DELIVERED = "delivered" """The event was delivered to the target system or final destination.""" diff --git a/logprep/ng/manager.py b/logprep/ng/manager.py new file mode 100644 index 000000000..1126db363 --- /dev/null +++ b/logprep/ng/manager.py @@ -0,0 +1,236 @@ +""" +Runner module +""" + +import asyncio +import logging +import typing +from asyncio import CancelledError +from typing import cast + +from logprep.factory import Factory +from logprep.ng.abc.input import Input +from logprep.ng.abc.output import Output +from logprep.ng.abc.processor import Processor +from logprep.ng.event.event_state import EventStateType +from logprep.ng.event.log_event import LogEvent +from logprep.ng.event.set_event_backlog import SetEventBacklog +from logprep.ng.pipeline import Pipeline +from logprep.ng.sender import Sender +from logprep.ng.util.async_helpers import report_event_state +from logprep.ng.util.configuration import Configuration +from logprep.ng.util.worker.types import SizeLimitedQueue +from 
logprep.ng.util.worker.worker import Worker, WorkerOrchestrator + +logger = logging.getLogger("PipelineManager") + + +BATCH_SIZE = 2_500 +BATCH_INTERVAL_S = 5 +MAX_QUEUE_SIZE = BATCH_SIZE + + +class PipelineManager: + """Orchestrator class managing pipeline inputs, processors and outputs""" + + def __init__(self, configuration: Configuration, shutdown_timeout_s: float) -> None: + """Initialize the component from the given `configuration`.""" + + self.configuration = configuration + self._shutdown_timeout_s = shutdown_timeout_s + + async def setup(self): + """Setup the pipeline manager.""" + + self._event_backlog = SetEventBacklog() + + self._input_connector = cast(Input, Factory.create(self.configuration.input)) + await self._input_connector.setup() + + processors = [ + typing.cast(Processor, Factory.create(processor_config)) + for processor_config in self.configuration.pipeline + ] + for processor in processors: + await processor.setup() + + self._pipeline = Pipeline(processors) + + output_connectors = [ + typing.cast(Output, Factory.create({output_name: output})) + for output_name, output in self.configuration.output.items() + ] + + error_output = ( + typing.cast(Output, Factory.create(self.configuration.error_output)) + if self.configuration.error_output + else None + ) + + if error_output is None: + logger.warning("No error output configured.") + + self._sender = Sender(outputs=output_connectors, error_output=error_output) + await self._sender.setup() + + self._queues = [] + self._orchestrator = self._create_orchestrator() + + def _create_orchestrator(self) -> WorkerOrchestrator: # pylint: disable=too-many-locals + process_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) + send_to_default_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) + send_to_extras_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) + send_to_error_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) + acknowledge_queue = 
SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) + self._queues = [ + process_queue, + send_to_default_queue, + send_to_extras_queue, + send_to_error_queue, + acknowledge_queue, + ] + + async def _report_event_state(batch: list[LogEvent]) -> list[LogEvent]: + return await report_event_state(logger, batch) + + async def transfer_batch(batch: list[LogEvent]) -> list[LogEvent]: + for event in batch: + event.state.current_state = EventStateType.RECEIVED + + _ = await _report_event_state(batch) + return batch + + input_worker: Worker[LogEvent, LogEvent] = Worker( + name="input_worker", + batch_size=500, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=self._input_connector(timeout=self.configuration.timeout), + out_queue=process_queue, + handler=transfer_batch, + ) + + async def _processor_handler(batch: list[LogEvent]) -> list[LogEvent]: + async def _handle(event: LogEvent): + # TODO make processing async + self._pipeline.process(event) + # TODO handle all possible states + if event.state != EventStateType.FAILED: + if event.extra_data: + await send_to_extras_queue.put(event) + else: + await send_to_default_queue.put(event) + else: + await send_to_error_queue.put(event) + + await asyncio.gather(*map(_handle, batch)) + + _ = await _report_event_state(batch) + return batch + + processing_worker: Worker[LogEvent, LogEvent] = Worker( + name="processing_worker", + batch_size=BATCH_SIZE, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=process_queue, + handler=_processor_handler, + ) + + async def _send_extras_handler(batch: list[LogEvent]) -> list[LogEvent]: + _ = await _report_event_state(batch) + return await self._sender.send_extras(batch) + + extra_output_worker: Worker[LogEvent, LogEvent] = Worker( + name="extra_output_worker", + batch_size=BATCH_SIZE, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=send_to_extras_queue, + out_queue=send_to_default_queue, + handler=_send_extras_handler, + ) + + async def _send_default_output_handler(batch: list[LogEvent]) -> 
list[LogEvent]: + _ = await _report_event_state(batch) + return await self._sender.send_default_output(batch) + + output_worker: Worker[LogEvent, LogEvent] = Worker( + name="output_worker", + batch_size=BATCH_SIZE, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=send_to_default_queue, + out_queue=acknowledge_queue, + handler=_send_default_output_handler, + ) + + async def _send_error_output_handler(batch: list[LogEvent]) -> list[LogEvent]: + await _report_event_state(batch) + await self._sender._send_and_flush_failed_events(batch) + return batch + + error_worker: Worker[LogEvent, LogEvent] = Worker( + name="error_worker", + batch_size=BATCH_SIZE, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=send_to_error_queue, + handler=_send_error_output_handler, + ) + + acknowledge_worker: Worker[LogEvent, LogEvent] = Worker( + name="acknowledge_worker", + batch_size=BATCH_SIZE, + batch_interval_s=BATCH_INTERVAL_S, + in_queue=acknowledge_queue, + handler=_report_event_state, + ) + + return WorkerOrchestrator( + workers=[ + input_worker, + processing_worker, + extra_output_worker, + output_worker, + error_worker, + acknowledge_worker, + ] + ) + + async def run(self) -> None: + """Run the runner and continuously process events until stopped.""" + + try: + await self._orchestrator.run() + except CancelledError: + logger.debug("PipelineManager.run cancelled. Shutting down.") + await self.shut_down() + raise + except Exception: + logger.exception("PipelineManager.run failed. 
Shutting down.") + await self.shut_down() + raise + + async def shut_down(self) -> None: + """Shut down runner components, and required runner attributes.""" + + logger.debug( + "Remaining items in queues: [%s]", ", ".join(f"{q.qsize()}" for q in self._queues) + ) + + if self._orchestrator is not None: + # TODO only a fraction of shutdown_timeout_s should be passed to the orchestrator + await self._orchestrator.shut_down(self._shutdown_timeout_s) + + logger.debug( + "Remaining items in queues: [%s]", ", ".join(f"{q.qsize()}" for q in self._queues) + ) + + if self._sender is not None: + await self._sender.shut_down() + # self._input_connector.acknowledge() + await self._input_connector.shut_down() + + len_delivered_events = len(list(self._event_backlog.get(EventStateType.DELIVERED))) + if len_delivered_events: + logger.error( + "Input connector has %d non-acked events in event_backlog.", len_delivered_events + ) + + logger.info("Runner shut down complete.") diff --git a/logprep/ng/pipeline.py b/logprep/ng/pipeline.py index ace4e8a9c..715de773d 100644 --- a/logprep/ng/pipeline.py +++ b/logprep/ng/pipeline.py @@ -1,90 +1,69 @@ """pipeline module for processing events through a series of processors.""" +import asyncio import logging -from collections.abc import Iterator -from functools import partial -from typing import Generator from logprep.ng.abc.processor import Processor +from logprep.ng.event.event_state import EventStateType from logprep.ng.event.log_event import LogEvent logger = logging.getLogger("Pipeline") -def _process_event(event: LogEvent | None, processors: list[Processor]) -> LogEvent: +def _process_event(event: LogEvent, processors: list[Processor]) -> LogEvent: """process all processors for one event""" - if event is None or not event.data: - return None - event.state.next_state() + event.state.current_state = EventStateType.PROCESSING for processor in processors: if not event.data: break processor.process(event) + 
def _process_event(event: LogEvent, processors: list[Processor]) -> LogEvent:
    """Run all processors over one event and update its state.

    The event is moved to PROCESSING, each processor mutates it in place
    (processing stops early if a processor empties ``event.data``), and the
    final state is PROCESSED or, if any processor recorded an error, FAILED.

    Parameters
    ----------
    event : LogEvent
        The event to process; modified in place.
    processors : list[Processor]
        Processors applied in order.

    Returns
    -------
    LogEvent
        The same event instance with its state updated.
    """
    event.state.current_state = EventStateType.PROCESSING
    for processor in processors:
        if not event.data:
            break
        processor.process(event)
        # Removed leftover debug line `event.errors.append(ValueError("test"))`
        # which unconditionally forced every event into the FAILED state.
    if not event.errors:
        event.state.current_state = EventStateType.PROCESSED
    else:
        event.state.current_state = EventStateType.FAILED
        logger.error("event failed: %s with errors: %s", event, event.errors)
    return event
class Pipeline:
    """Pipeline class to process events through a series of processors."""

    def __init__(
        self,
        processors: list[Processor],
    ) -> None:
        self.processors = processors

    def process(self, event: LogEvent) -> LogEvent:
        """Process the given event through the series of configured processors

        Parameters
        ----------
        event : LogEvent
            The event to be processed and modified in-place.

        Returns
        -------
        LogEvent
            The event which was presented as an input and modified in-place.
        """
        return _process_event(event, processors=self.processors)

    async def shut_down(self) -> None:
        """Shutdown the pipeline gracefully."""
        for processor in self.processors:
            await processor.shut_down()
        logger.info("All processors have been shut down.")
        logger.info("Pipeline has been shut down.")

    async def setup(self) -> None:
        """Set up all pipeline processors concurrently.

        All setups are started together; failures do not cancel the remaining
        setups. Unlike a bare ``gather(..., return_exceptions=True)`` whose
        result is discarded, setup errors are logged and the first one is
        re-raised so a misconfigured processor cannot be silently skipped.
        """
        results = await asyncio.gather(
            *(processor.setup() for processor in self.processors), return_exceptions=True
        )
        errors = [result for result in results if isinstance(result, BaseException)]
        for error in errors:
            logger.error("Processor setup failed: %s", error)
        if errors:
            raise errors[0]
        logger.info("Pipeline has been set up.")
typing.cast(AmidesRule, rule) cmdline = get_dotted_field_value(event, rule.source_fields[0]) if self._handle_missing_fields(event, rule, rule.source_fields, [cmdline]): return diff --git a/logprep/ng/processor/generic_resolver/processor.py b/logprep/ng/processor/generic_resolver/processor.py index 163392852..ab6dbba23 100644 --- a/logprep/ng/processor/generic_resolver/processor.py +++ b/logprep/ng/processor/generic_resolver/processor.py @@ -213,6 +213,6 @@ def _update_cache_metrics(self) -> None: self.metrics.num_cache_entries += cache_info.currsize self.metrics.cache_load += cache_info.currsize / self.max_cache_entries - def setup(self) -> None: - super().setup() + async def setup(self) -> None: + await super().setup() self._cache_metrics_skip_count = 0 diff --git a/logprep/ng/processor/geoip_enricher/processor.py b/logprep/ng/processor/geoip_enricher/processor.py index 70e4894a8..7694f0f87 100644 --- a/logprep/ng/processor/geoip_enricher/processor.py +++ b/logprep/ng/processor/geoip_enricher/processor.py @@ -95,8 +95,8 @@ def _city_db(self) -> database.Reader: logger.exception("failed to load GeoIP database") raise - def setup(self) -> None: - super().setup() + async def setup(self) -> None: + await super().setup() _ = self._city_db # trigger download def _try_getting_geoip_data(self, ip_string: str) -> dict: diff --git a/logprep/ng/processor/grokker/processor.py b/logprep/ng/processor/grokker/processor.py index c77191626..b72c7c1b6 100644 --- a/logprep/ng/processor/grokker/processor.py +++ b/logprep/ng/processor/grokker/processor.py @@ -110,9 +110,9 @@ def _apply_rules(self, event: dict, rule: Rule) -> None: if not matches: raise ProcessingWarning("no grok pattern matched", rule, event) - def setup(self) -> None: + async def setup(self) -> None: """Loads the action mapping. 
Has to be called before processing""" - super().setup() + await super().setup() custom_patterns_dir = self.config.custom_patterns_dir if re.search(r"http(s)?:\/\/.*?\.zip", custom_patterns_dir): with tempfile.TemporaryDirectory("grok") as patterns_tmp_path: diff --git a/logprep/ng/processor/labeler/processor.py b/logprep/ng/processor/labeler/processor.py index 370069955..991dfc47a 100644 --- a/logprep/ng/processor/labeler/processor.py +++ b/logprep/ng/processor/labeler/processor.py @@ -24,11 +24,13 @@ .. automodule:: logprep.processor.labeler.rule """ -from typing import Optional +import typing +from collections.abc import Iterable, Sequence from attrs import define, field, validators from logprep.ng.abc.processor import Processor +from logprep.processor.base.rule import Rule from logprep.processor.labeler.labeling_schema import LabelingSchema from logprep.processor.labeler.rule import LabelerRule from logprep.util.helper import add_fields_to, get_dotted_field_value @@ -43,9 +45,7 @@ class Config(Processor.Config): schema: str = field(validator=validators.instance_of(str)) """Path to a labeling schema file. For string format see :ref:`getters`.""" - include_parent_labels: Optional[bool] = field( - default=False, validator=validators.optional(validator=validators.instance_of(bool)) - ) + include_parent_labels: bool = field(default=False, validator=validators.instance_of(bool)) """If the option is deactivated only labels defined in a rule will be activated. Otherwise, also allowed labels in the path to the *root* of the corresponding category of a label will be added. 
@@ -58,24 +58,35 @@ class Config(Processor.Config): rule_class = LabelerRule - def __init__(self, name: str, configuration: Processor.Config) -> None: + def __init__(self, name: str, configuration: "Labeler.Config") -> None: self._schema = LabelingSchema.create_from_file(configuration.schema) super().__init__(name, configuration=configuration) - def setup(self) -> None: - super().setup() + @property + def config(self) -> Config: + """Provides the properly typed configuration object""" + return typing.cast(Labeler.Config, self._config) + + @property + def rules(self) -> Sequence[LabelerRule]: + """Returns all rules""" + return typing.cast(Sequence[LabelerRule], super().rules) + + async def setup(self) -> None: + await super().setup() for rule in self.rules: - if self._config.include_parent_labels: + if self.config.include_parent_labels: rule.add_parent_labels_from_schema(self._schema) rule.conforms_to_schema(self._schema) - def _apply_rules(self, event: dict, rule: LabelerRule) -> None: + def _apply_rules(self, event: dict, rule: Rule) -> None: """Applies the rule to the current event""" - fields = {key: value for key, value in rule.prefixed_label.items()} - add_fields_to(event, fields, rule=rule, merge_with_target=True) - # convert sets into sorted lists + rule = typing.cast(LabelerRule, rule) + add_fields_to(event, rule.prefixed_label, rule=rule, merge_with_target=True) + # we have already added (merged) the prefixed_labels with list values + # now we extract them to make them unique and sorted fields = { - key: sorted(set(get_dotted_field_value(event, key))) + key: sorted(set(typing.cast(Iterable, get_dotted_field_value(event, key)))) for key, _ in rule.prefixed_label.items() } add_fields_to(event, fields, rule=rule, overwrite_target=True) diff --git a/logprep/ng/processor/list_comparison/processor.py b/logprep/ng/processor/list_comparison/processor.py index bae57be27..bb3d16537 100644 --- a/logprep/ng/processor/list_comparison/processor.py +++ 
    async def setup(self) -> None:
        """Set up the processor and initialize each rule's comparison lists.

        Awaits the base-class setup first, then lets every rule resolve its
        list files against the configured ``list_search_base_path``.
        """
        await super().setup()
        for rule in self.rules:
            rule.init_list_comparison(self.config.list_search_base_path)
diff --git a/logprep/ng/processor/pseudonymizer/processor.py b/logprep/ng/processor/pseudonymizer/processor.py index ff68ff6a9..a55bde538 100644 --- a/logprep/ng/processor/pseudonymizer/processor.py +++ b/logprep/ng/processor/pseudonymizer/processor.py @@ -44,6 +44,8 @@ """ import re +import typing +from collections.abc import Sequence from functools import cached_property, lru_cache from itertools import chain from typing import Callable, Pattern @@ -56,15 +58,17 @@ from logprep.metrics.metrics import CounterMetric, GaugeMetric from logprep.ng.event.pseudonym_event import PseudonymEvent from logprep.ng.processor.field_manager.processor import FieldManager +from logprep.processor.base.rule import Rule from logprep.processor.pseudonymizer.rule import PseudonymizerRule from logprep.util.getter import GetterFactory from logprep.util.hasher import SHA256Hasher -from logprep.util.helper import add_fields_to, get_dotted_field_value +from logprep.util.helper import add_fields_to, get_dotted_field_values from logprep.util.pseudo.encrypter import ( DualPKCS1HybridCTREncrypter, DualPKCS1HybridGCMEncrypter, Encrypter, ) +from logprep.util.typing import is_lru_cached from logprep.util.url.url import extract_urls @@ -76,20 +80,16 @@ class Config(FieldManager.Config): """Pseudonymizer config""" outputs: tuple[dict[str, str]] = field( - validator=[ - validators.deep_iterable( - member_validator=[ - validators.instance_of(dict), - validators.deep_mapping( - key_validator=validators.instance_of(str), - value_validator=validators.instance_of(str), - mapping_validator=validators.max_len(1), - ), - ], - iterable_validator=validators.instance_of(tuple), - ), - validators.min_len(1), - ], + validator=validators.deep_iterable( + member_validator=[ + validators.deep_mapping( + key_validator=validators.instance_of(str), + value_validator=validators.instance_of(str), + mapping_validator=(validators.instance_of(dict), validators.max_len(1)), + ), + ], + 
iterable_validator=(validators.instance_of(tuple), validators.min_len(1)), + ), converter=tuple, ) """list of output mappings in form of :code:`output_name:topic`. @@ -193,27 +193,38 @@ def _hasher(self) -> SHA256Hasher: @cached_property def _encrypter(self) -> Encrypter: - if self._config.mode == "CTR": - encrypter = DualPKCS1HybridCTREncrypter() - else: - encrypter = DualPKCS1HybridGCMEncrypter() - encrypter.load_public_keys(self._config.pubkey_analyst, self._config.pubkey_depseudo) + encrypter = ( + DualPKCS1HybridCTREncrypter() + if self.config.mode == "CTR" + else DualPKCS1HybridGCMEncrypter() + ) + encrypter.load_public_keys(self.config.pubkey_analyst, self.config.pubkey_depseudo) return encrypter @cached_property def _regex_mapping(self) -> dict: - return GetterFactory.from_string(self._config.regex_mapping).get_yaml() + return GetterFactory.from_string(self.config.regex_mapping).get_dict() @cached_property def _get_pseudonym_dict_cached(self) -> Callable: - return lru_cache(maxsize=self._config.max_cached_pseudonyms)(self._pseudonymize) + return lru_cache(maxsize=self.config.max_cached_pseudonyms)(self._pseudonymize) @cached_property def _pseudonymize_url_cached(self) -> Callable: - return lru_cache(maxsize=self._config.max_cached_pseudonymized_urls)(self._pseudonymize_url) + return lru_cache(maxsize=self.config.max_cached_pseudonymized_urls)(self._pseudonymize_url) + + @property + def config(self) -> Config: + """Provides the properly typed configuration object""" + return typing.cast(Pseudonymizer.Config, self._config) + + @property + def rules(self) -> Sequence[PseudonymizerRule]: + """Returns all rules""" + return typing.cast(Sequence[PseudonymizerRule], super().rules) - def setup(self) -> None: - super().setup() + async def setup(self) -> None: + await super().setup() self._replace_regex_keywords_by_regex_expression() def _replace_regex_keywords_by_regex_expression(self) -> None: @@ -223,13 +234,12 @@ def 
_replace_regex_keywords_by_regex_expression(self) -> None: rule.pseudonyms[dotted_field] = re.compile(self._regex_mapping[regex_keyword]) elif isinstance(regex_keyword, str): # after the first run, the regex is compiled raise InvalidConfigurationError( - f"Regex keyword '{regex_keyword}' not found in regex_mapping '{self._config.regex_mapping}'" + f"Regex keyword '{regex_keyword}' not found in regex_mapping '{self.config.regex_mapping}'" ) - def _apply_rules(self, event: dict, rule: PseudonymizerRule) -> None: - source_dict = {} - for source_field in rule.pseudonyms: - source_dict[source_field] = get_dotted_field_value(event, source_field) + def _apply_rules(self, event: dict, rule: Rule) -> None: + rule = typing.cast(PseudonymizerRule, rule) + source_dict = get_dotted_field_values(event, rule.pseudonyms) self._handle_missing_fields(event, rule, source_dict.keys(), source_dict.values()) for dotted_field, field_value in source_dict.items(): @@ -242,7 +252,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule) -> None: for value in field_value ] else: - field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value) + field_value = self._pseudonymize_field(rule, dotted_field, regex, str(field_value)) add_fields_to( event, fields={dotted_field: field_value}, rule=rule, overwrite_target=True ) @@ -277,13 +287,13 @@ def _pseudonymize_string(self, value: str) -> str: if self.pseudonymized_pattern.match(value): return value pseudonym_dict = self._get_pseudonym_dict_cached(value) - pseudonym_event = PseudonymEvent(pseudonym_dict, outputs=self._config.outputs) + pseudonym_event = PseudonymEvent(pseudonym_dict, outputs=self.config.outputs) if pseudonym_event not in self._event.extra_data: self._event.extra_data.append(pseudonym_event) return self._wrap_hash(pseudonym_dict["pseudonym"]) def _pseudonymize(self, value: str) -> dict[str, str]: - hash_string = self._hasher.hash_str(value, salt=self._config.hash_salt) + hash_string = 
self._hasher.hash_str(value, salt=self.config.hash_salt) encrypted_origin = self._encrypter.encrypt(value) return {"pseudonym": hash_string, "origin": encrypted_origin} @@ -325,11 +335,15 @@ def _wrap_hash(self, hash_string: str) -> str: return self.HASH_PREFIX + hash_string + self.HASH_SUFFIX def _update_cache_metrics(self) -> None: - cache_info_pseudonyms = self._get_pseudonym_dict_cached.cache_info() - cache_info_urls = self._pseudonymize_url_cached.cache_info() - self.metrics.new_results += cache_info_pseudonyms.misses + cache_info_urls.misses - self.metrics.cached_results += cache_info_pseudonyms.hits + cache_info_urls.hits - self.metrics.num_cache_entries += cache_info_pseudonyms.currsize + cache_info_urls.currsize - self.metrics.cache_load += (cache_info_pseudonyms.currsize + cache_info_urls.currsize) / ( - cache_info_pseudonyms.maxsize + cache_info_urls.maxsize + caches = [ + f.cache_info() + for f in [self._get_pseudonym_dict_cached, self._pseudonymize_url_cached] + if is_lru_cached(f) + ] + + self.metrics.new_results += sum(c.misses for c in caches) + self.metrics.cached_results += sum(c.hits for c in caches) + self.metrics.num_cache_entries += sum(c.currsize for c in caches) + self.metrics.cache_load += (sum(c.currsize for c in caches)) / ( + sum(typing.cast(int, c.maxsize) for c in caches) ) diff --git a/logprep/ng/processor/template_replacer/processor.py b/logprep/ng/processor/template_replacer/processor.py index 3a08b4161..d53dabaf9 100644 --- a/logprep/ng/processor/template_replacer/processor.py +++ b/logprep/ng/processor/template_replacer/processor.py @@ -175,8 +175,8 @@ def _perform_replacement( overwrite_target=overwrite, ) - def setup(self) -> None: - super().setup() + async def setup(self) -> None: + await super().setup() self._target_field = self.config.pattern["target_field"] self._fields = self.config.pattern["fields"] self._initialize_replacement_mapping() diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 7bd90cdf9..23b6348e7 
100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -2,29 +2,27 @@ Runner module """ +import asyncio import json import logging -import logging.config import os import warnings -from typing import cast +from collections.abc import AsyncGenerator from attrs import asdict -from logprep.factory import Factory -from logprep.ng.abc.input import Input -from logprep.ng.abc.output import Output -from logprep.ng.abc.processor import Processor -from logprep.ng.event.event_state import EventStateType -from logprep.ng.event.set_event_backlog import SetEventBacklog -from logprep.ng.pipeline import Pipeline -from logprep.ng.sender import Sender +from logprep.ng.manager import PipelineManager +from logprep.ng.util.async_helpers import TerminateTaskGroup, restart_task_on_iter from logprep.ng.util.configuration import Configuration from logprep.ng.util.defaults import DEFAULT_LOG_CONFIG logger = logging.getLogger("Runner") +GRACEFUL_SHUTDOWN_TIMEOUT = 3 +HARD_SHUTDOWN_TIMEOUT = 5 + + class Runner: """Class, a singleton runner, responsible for running the log processing pipeline.""" @@ -42,195 +40,106 @@ def __init__(self, configuration: Configuration) -> None: Component wiring is deferred to `setup()` to preserve the required init order. """ - self.configuration = configuration - self._running_config_version = configuration.version - self._input_connector: Input | None = None - - # Initialized in `setup()`; updated by runner logic thereafter: - self.should_exit: bool | None = None - self.sender: Sender | None = None - - self.setup() - - def _initialize_pipeline(self) -> Pipeline: - """Initialize the pipeline from the given `configuration`. 
- - This method performs the following tasks: - - - Creates components based on the configuration: - - input connector - - processors - - - Sets up the input connector: - - attaches an event backlog - - calls its `setup()` method - - initializes its iterator with the configured timeout - - - Validates that: - - an input connector is configured - - all processors are properly configured - - - Instantiates the `Pipeline` with: - - the input connector iterator - - the list of processors - - Returns - ------- - Pipeline - The instantiated pipeline instance (not yet set up). - """ - - self._input_connector = cast(Input, Factory.create(self.configuration.input)) - self._input_connector.event_backlog = SetEventBacklog() - self._input_connector.setup() - - input_iterator = self._input_connector(timeout=self.configuration.timeout) - processors = cast( - list[Processor], - [Factory.create(processor_config) for processor_config in self.configuration.pipeline], - ) - - return Pipeline( - log_events_iter=input_iterator, - processors=cast(list[Processor], processors), - ) - - def _initialize_sender(self) -> Sender: - """Initialize the sender from the given `configuration`. 
    async def _refresh_configuration_gen(
        self, initial_config_version: str | None = None
    ) -> AsyncGenerator[Configuration, None]:
        """Periodically reload the configuration, yielding it on version changes.

        Parameters
        ----------
        initial_config_version : str | None
            Version string the currently running pipeline was started with;
            yields occur only when the reloaded version differs from it.

        Yields
        ------
        Configuration
            The reloaded configuration whenever its version changed.
        """
        current_config_version = initial_config_version
        refresh_interval = self.config.config_refresh_interval

        if refresh_interval is None:
            logger.debug("Config refresh has been disabled.")
            return

        # fixed-rate schedule: next_run advances by refresh_interval per cycle
        loop = asyncio.get_running_loop()
        next_run = loop.time() + refresh_interval

        while True:
            sleep_time = next_run - loop.time()
            if sleep_time < 0:
                # we are behind schedule; run immediately instead of sleeping
                sleep_time = 0.0

            try:
                await asyncio.sleep(sleep_time)
            except asyncio.CancelledError:
                logger.debug("Config refresh cancelled. Exiting...")
                raise

            try:
                await self.config.reload()
            except asyncio.CancelledError:
                logger.debug("Config reload cancelled. Exiting...")
                raise
            except Exception:
                # NOTE(review): a failed reload terminates the refresh loop —
                # presumably intentional (restart semantics); confirm that
                # transient reload errors should not be retried here.
                logger.exception("scheduled config reload failed")
                raise
            else:
                if self.config.version != current_config_version:
                    logger.info("Detected new config version: %s", self.config.version)
                    current_config_version = self.config.version
                    yield self.config

                # the interval itself may change with the new configuration
                refresh_interval = self.config.config_refresh_interval
                if refresh_interval is None:
                    logger.debug("Config refresh has been disabled.")
                    break

            next_run += refresh_interval
"Could not gracefully shut down pipeline manager within timeframe", + exc_info=True, + ) + raise + except ExceptionGroup as eg: + if not eg.exceptions or len(eg.exceptions) > 1: + raise + match list(eg.exceptions)[0]: + case TerminateTaskGroup(): + logger.debug("Task group terminated") + case _: + raise - logger.debug("Check configuration change before processing a batch of events.") - self.configuration.refresh() - - if self.configuration.version != self._running_config_version: - self.reload() - - logger.debug("Process next batch of events.") - self._process_events() - - self.shut_down() logger.debug("End log processing.") - def _process_events(self) -> None: - """Process a batch of events got from sender iterator.""" - - logger.debug("Start log processing.") - - sender = cast(Sender, self.sender) - logger.debug(f"Get batch of events from sender (batch_size={sender.batch_size}).") - for event in sender: - if event is None: - continue - - if event.state == EventStateType.FAILED: - logger.error("event failed: %s", event) - else: - logger.debug("event processed: %s", event.state) - - logger.debug("Finished processing batch of events.") - - def setup(self) -> None: - """Set up the runner, its components, and required runner attributes.""" - - self.sender = self._initialize_sender() - self.sender.setup() - self.should_exit = False - - logger.info("Runner set up complete.") - - def shut_down(self) -> None: - """Shut down runner components, and required runner attributes.""" - - self.should_exit = True - cast(Sender, self.sender).shut_down() - self.sender = None - - input_connector = cast(Input, self._input_connector) - input_connector.acknowledge() - - len_delivered_events = len(input_connector.event_backlog.get(EventStateType.DELIVERED)) - if len_delivered_events: - logger.error( - f"Input connector has {len_delivered_events} non-acked events in event_backlog." 
- ) - - logger.info("Runner shut down complete.") - def stop(self) -> None: - """Stop the runner and signal the underlying processing pipeline to exit. - - This method sets the `should_exit` flag to True, which will cause the - runner and its components to stop gracefully. - """ + """Stop the runner and signal the underlying processing pipeline to exit.""" logger.info("Stopping runner and exiting...") - self.should_exit = True + self._stop_event.set() def setup_logging(self) -> None: """Setup the logging configuration. @@ -241,18 +150,6 @@ def setup_logging(self) -> None: warnings.simplefilter("always", DeprecationWarning) logging.captureWarnings(True) - log_config = DEFAULT_LOG_CONFIG | asdict(self.configuration.logger) + log_config = DEFAULT_LOG_CONFIG | asdict(self.config.logger) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) logging.config.dictConfig(log_config) - - def reload(self) -> None: - """Reload the log processing pipeline.""" - - logger.info("Reloading log processing pipeline...") - - self.shut_down() - self.setup() - - self._running_config_version = self.configuration.version - self.configuration.schedule_config_refresh() - logger.info("Finished reloading log processing pipeline.") diff --git a/logprep/ng/sender.py b/logprep/ng/sender.py index ece8b4616..a5dd5a3fb 100644 --- a/logprep/ng/sender.py +++ b/logprep/ng/sender.py @@ -1,16 +1,16 @@ """sender module""" +import asyncio import logging -from collections.abc import Iterator -from itertools import islice -from typing import Generator +import typing +from collections import defaultdict +from collections.abc import Sequence from logprep.ng.abc.event import ExtraDataEvent from logprep.ng.abc.output import Output from logprep.ng.event.error_event import ErrorEvent from logprep.ng.event.event_state import EventStateType from logprep.ng.event.log_event import LogEvent -from logprep.ng.pipeline import Pipeline logger = logging.getLogger("Sender") @@ -23,91 +23,77 @@ def __str__(self) -> str: 
return f"{self.message}: {self.exceptions}" -class Sender(Iterator): +class Sender: """Sender class to handle sending events to configured outputs.""" def __init__( self, - pipeline: Pipeline, outputs: list[Output], error_output: Output | None = None, - process_count: int = 3, ) -> None: - self.pipeline = pipeline self._outputs = {output.name: output for output in outputs} self._default_output = [output for output in outputs if output.default][0] self._error_output = error_output - self.batch_size = process_count - self.should_exit = False - - def __next__(self) -> LogEvent | ErrorEvent: - """not implemented, use iter()""" - raise NotImplementedError("Use iter() to get events from the Sender.") - - def __iter__(self) -> Generator[LogEvent | ErrorEvent, None, None]: - """Iterate over processed events.""" - while True: - logger.debug("Sender iterating") - batch = list(islice(self.pipeline, self.batch_size)) - self._send_and_flush_processed_events(batch_events=batch) - if self._error_output: - self._send_and_flush_failed_events(batch_events=batch) - if self.should_exit: - logger.debug("Sender exiting") - self.shut_down() - return - yield from batch - - def _send_and_flush_failed_events(self, batch_events: list[LogEvent]) -> None: - error_events = [ - self._send_failed(event) - for event in batch_events - if event is not None and event.state == EventStateType.FAILED - ] - if not error_events: - return - self._error_output.flush() # type: ignore[union-attr] + async def send_extras(self, batch_events: Sequence[LogEvent]) -> Sequence[LogEvent]: + output_buffers: dict[str, dict[str, list[ExtraDataEvent]]] = { + output_name: defaultdict(list) for output_name in self._outputs.keys() + } + + for event in batch_events: + for extra in typing.cast(Sequence[ExtraDataEvent], event.extra_data): + for output in extra.outputs: + for name, target in output.items(): + try: + output_buffers[name][target].append(extra) + except KeyError as error: + raise ValueError(f"Output {name} not 
configured.") from error + + results = await asyncio.gather( + *( + self._outputs[name].store_batch(events, target) + for name, target_events in output_buffers.items() + for target, events in target_events.items() + ), + return_exceptions=True, + ) + for r in results: + if isinstance(r, Exception): + logger.exception("Error while sending processed event", exc_info=r) + + # TODO: filter and handle successful + failed + # succeed_events, failed_events = ( + # [e for e in batch_events if e.state == EventStateType.DELIVERED], + # [e for e in batch_events if e.state == EventStateType.FAILED], + # ) + # assert len(succeed_events) + len(failed_events) == len(batch_events), "Lost events in batch" + + logger.debug("return send_extras %d", len(batch_events)) + + return batch_events + + async def send_default_output(self, batch_events: Sequence[LogEvent]) -> Sequence[LogEvent]: + logger.debug("send_default_output %d", len(batch_events)) + return await self._default_output.store_batch(batch_events) # type: ignore + + async def _send_and_flush_failed_events(self, batch_events: list[LogEvent]) -> None: + # send in parallel (minimal change vs. 
serial list comprehension) + error_events = await asyncio.gather(*(self._send_failed(event) for event in batch_events)) + + await self._error_output.flush() # type: ignore[union-attr] + failed_error_events = [ - event for event in error_events if event.state == EventStateType.FAILED + event for event in error_events if event.state is EventStateType.FAILED ] for error_event in failed_error_events: logger.error("Error during sending to error output: %s", error_event) - def _send_and_flush_processed_events(self, batch_events: list[LogEvent]) -> None: - processed_events = [ - self._send_processed(event) - for event in batch_events - if event is not None and event.state == EventStateType.PROCESSED - ] - if not processed_events: - return - for output in self._outputs.values(): - output.flush() - - def _send_extra_data(self, event: LogEvent) -> None: - extra_data_events: list[ExtraDataEvent] = event.extra_data - for extra_data_event in extra_data_events: - for output in extra_data_event.outputs: - for output_name, output_target in output.items(): - if output_name in self._outputs: - self._outputs[output_name].store_custom(extra_data_event, output_target) - else: - raise ValueError(f"Output {output_name} not configured.") - - def _send_processed(self, event: LogEvent) -> LogEvent: - if event.extra_data: - self._send_extra_data(event) - self._default_output.store(event) - return event - - def _send_failed(self, event: LogEvent) -> ErrorEvent: + async def _send_failed(self, event: LogEvent) -> ErrorEvent: """Send the event to the error output. If event can't be sent, it will be logged as an error. 
""" - error_event = self._get_error_event(event) - self._error_output.store(error_event) # type: ignore[union-attr] + await self._error_output.store(error_event) # type: ignore[union-attr] return error_event def _get_error_event(self, event: LogEvent) -> ErrorEvent: @@ -121,35 +107,19 @@ def _get_error_event(self, event: LogEvent) -> ErrorEvent: ) return ErrorEvent(log_event=event, reason=reason, state=EventStateType.PROCESSED) - def shut_down(self) -> None: + async def shut_down(self) -> None: """Shutdown all outputs gracefully.""" - - self.stop() for _, output in self._outputs.items(): - output.shut_down() + await output.shut_down() if self._error_output: - self._error_output.shut_down() + await self._error_output.shut_down() logger.info("All outputs have been shut down.") - - self.pipeline.shut_down() logger.info("Sender has been shut down.") - def setup(self) -> None: + async def setup(self) -> None: """Setup all outputs.""" for _, output in self._outputs.items(): - output.setup() + await output.setup() if self._error_output: - self._error_output.setup() + await self._error_output.setup() logger.info("All outputs have been set up.") - self.pipeline.setup() - - def stop(self) -> None: - """Request the sender to stop iteration. - - Calling stop() sets the should_exit flag. The sender will finish processing - the current batch and exit on the next iteration (i.e., the next next() call). - If you need to enforce an immediate stop, use shut_down() instead. 
- """ - - self.should_exit = True - logger.info("Sender stop signal received.") diff --git a/logprep/ng/util/__init__.py b/logprep/ng/util/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/logprep/ng/util/async_helpers.py b/logprep/ng/util/async_helpers.py new file mode 100644 index 000000000..51fefec54 --- /dev/null +++ b/logprep/ng/util/async_helpers.py @@ -0,0 +1,162 @@ +"""A collection of helper utilitites for async code""" + +import asyncio +from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator, Callable +from logging import Logger +from typing import Awaitable, TypeVar + +from logprep.ng.event.log_event import LogEvent +from logprep.ng.util.events import partition_by_state + +T = TypeVar("T") +D = TypeVar("D") + + +TaskFactory = Callable[[D], Awaitable[asyncio.Task[T]]] + + +class TerminateTaskGroup(Exception): + """Exception raised to terminate a task group.""" + + @staticmethod + async def raise_on_timeout(timeout_s: float, msg: str | None = None): + """Raises this exception type as soon as the timeout (in seconds) expires. + + Parameters + ---------- + timeout_s : float + Number of seconds after which the exception should be raised + msg : str | None, optional + Message for the exception, by default None + + Raises + ------ + TerminateTaskGroup + The exception for terminating the task group. + """ + await asyncio.sleep(timeout_s) + raise TerminateTaskGroup(msg) + + @staticmethod + async def raise_on_event(event: asyncio.Event, msg: str | None = None): + """Raises this exception type as soon as the event is set. + + Parameters + ---------- + event : asyncio.Event + Triggering event for the exception + msg : str | None, optional + Message for the exception, by default None + + Raises + ------ + TerminateTaskGroup + The exception for terminating the task group. 
+ """ + await event.wait() + raise TerminateTaskGroup(msg) + + +async def cancel_task_and_wait(task: asyncio.Task[T], timeout_s: float) -> None: + """Cancels the given task and waits for it to actually stop. + Raises a :code:`TimeoutError` if timeout expires. + A :code:`CancelledError` will only be raised if the parent task is cancelled. + + Parameters + ---------- + task : asyncio.Task[T] + The task to cancel + timeout_s : float + The timeout in seconds to wait + + Raises + ------ + TimeoutError + Raised if the timeout expires and the task is still not done. + """ + task.cancel() + done, _ = await asyncio.wait([task], timeout=timeout_s) + if not done: + raise TimeoutError(f"Task {task.get_name()} did not stop in time after cancellation") + + +async def restart_task_on_iter( + source: AsyncIterator[D] | AsyncIterable[D], + task_factory: TaskFactory, + cancel_timeout_s: float, + inital_task: asyncio.Task[T] | None = None, +) -> AsyncGenerator[asyncio.Task[T], None]: + """Consumes an iterable data source and ensures that there is always one task executing on the latest data. 
+ + Parameters + ---------- + source : AsyncIterator[D] | AsyncIterable[D] + The data source producing parameters for the spawned tasks + task_factory : Callable[[D], asyncio.Task[T]] + The factory to create new tasks from new data items + cancel_timeout_s : float + The number of seconds after which task cancellation is deemed not successful + inital_task : asyncio.Task[T] | None, optional + The initial task, by default None + + Returns + ------- + AsyncGenerator[asyncio.Task[T], None] + The stream of tasks which result from spawning fresh tasks on new data + + Yields + ------ + Iterator[AsyncGenerator[asyncio.Task[T], None]] + The stream of tasks which result from spawning fresh tasks on new data + """ + task = inital_task + async for data in source: + if task is not None: + await cancel_task_and_wait(task, cancel_timeout_s) + task = await task_factory(data) + yield task + + +def asyncio_exception_handler( + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + context: dict, + logger: Logger, +) -> None: + """ + Handle unhandled exceptions reported by the asyncio event loop. + + Covers exceptions from background tasks, callbacks, and loop internals. + Does not handle exceptions from awaited coroutines (e.g. runner.run()). + + Args: + loop: The current event loop. + context: Asyncio error context (may contain message, exception, task/future). + logger: Logger used to record the error. 
+ """ + + msg = context.get("message", "Unhandled exception in event loop") + exception = context.get("exception") + task = context.get("task") or context.get("future") + + logger.error(f"{msg}") + + if task: + logger.error(f"Task: {task!r}") + + if isinstance(task, asyncio.Task): + logger.error(f"Task name: {task.get_name()}") + + if exception: + logger.error(f"Unhandled exception: {exception!r}", exc_info=exception) + else: + logger.error(f"Context: {context!r}") + + +async def report_event_state(logger: Logger, batch: list[LogEvent]) -> list[LogEvent]: + events_by_state = partition_by_state(batch) + logger.info( + "Finished processing %d events: %s", + len(batch), + ", ".join(f"#{state}={len(events)}" for state, events in events_by_state.items()), + ) + return batch diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 37b2aedb4..d674dfc97 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -193,10 +193,10 @@ import json import logging import os +import typing from copy import deepcopy from importlib.metadata import version from itertools import chain -from logging.config import dictConfig from pathlib import Path from typing import Any, Iterable, List, Optional, Sequence, Tuple @@ -205,14 +205,13 @@ from ruamel.yaml import YAML from ruamel.yaml.compat import StringIO from ruamel.yaml.scanner import ScannerError -from schedule import Scheduler -from logprep.abc.component import Component from logprep.abc.getter import Getter -from logprep.abc.processor import Processor from logprep.factory import Factory from logprep.factory_error import FactoryError, InvalidConfigurationError from logprep.metrics.metrics import CounterMetric, GaugeMetric +from logprep.ng.abc.component import NgComponent +from logprep.ng.abc.processor import Processor from logprep.ng.util.defaults import ( DEFAULT_CONFIG_LOCATION, DEFAULT_LOG_CONFIG, @@ -238,14 +237,14 @@ class MyYAML(YAML): """helper class to dump yaml with 
ruamel.yaml""" def dump(self, data: Any, stream: Any | None = None, **kw: Any) -> Any: - inefficient = False if stream is None: - inefficient = True stream = StringIO() - YAML.dump(self, data, stream, **kw) - if inefficient: + YAML.dump(self, data, stream, **kw) return stream.getvalue() + YAML.dump(self, data, stream, **kw) + return None + yaml = MyYAML(pure=True) @@ -253,9 +252,9 @@ def dump(self, data: Any, stream: Any | None = None, **kw: Any) -> Any: class InvalidConfigurationErrors(InvalidConfigurationError): """Raise for multiple Configuration related exceptions.""" - errors: List[InvalidConfigurationError] + errors: Sequence[InvalidConfigurationError] - def __init__(self, errors: List[Exception]) -> None: + def __init__(self, errors: Sequence[Exception]) -> None: unique_errors = [] for error in errors: if not isinstance(error, InvalidConfigurationError): @@ -339,7 +338,7 @@ class LoggerConfig: compatible with :func:`logging.config.dictConfig`. """ - _LOG_LEVELS = ( + _log_levels = ( logging.NOTSET, # 0 logging.DEBUG, # 10 logging.INFO, # 20 @@ -357,7 +356,7 @@ class LoggerConfig: default="INFO", validator=[ validators.instance_of(str), - validators.in_([logging.getLevelName(level) for level in _LOG_LEVELS]), + validators.in_([logging.getLevelName(level) for level in _log_levels]), ], eq=False, ) @@ -442,7 +441,7 @@ def setup_logging(self) -> None: log_config = asdict(self) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) - dictConfig(log_config) + logging.config.dictConfig(log_config) def _set_loggers_levels(self) -> None: """Normalize per-logger configuration and preserve explicit levels. 
@@ -653,7 +652,6 @@ class Configuration: _metrics: "Configuration.Metrics" = field(init=False, repr=False, eq=False) _getter: Getter = field( - validator=validators.instance_of(Getter), default=GetterFactory.from_string(DEFAULT_CONFIG_LOCATION), repr=False, eq=False, @@ -663,27 +661,18 @@ class Configuration: validator=validators.instance_of(tuple), factory=tuple, repr=False, eq=False ) - _scheduler: Scheduler = field( - factory=Scheduler, - validator=validators.instance_of(Scheduler), - repr=False, - eq=False, - init=False, - ) - _config_failure: bool = field(default=False, repr=False, eq=False, init=False) _unserializable_fields = ( "_getter", "_configs", "_config_failure", - "_scheduler", "_metrics", "_unserializable_fields", ) @define(kw_only=True) - class Metrics(Component.Metrics): + class Metrics(NgComponent.Metrics): """Metrics for the Logprep Runner.""" version_info: GaugeMetric = field( @@ -780,7 +769,7 @@ def from_source(cls, config_path: str) -> "Configuration": return config @classmethod - def from_sources(cls, config_paths: Iterable[str] | None = None) -> "Configuration": + async def from_sources(cls, config_paths: Iterable[str] | None = None) -> "Configuration": """Creates configuration from a list of configuration sources. Parameters @@ -822,7 +811,7 @@ def from_sources(cls, config_paths: Iterable[str] | None = None) -> "Configurati except InvalidConfigurationErrors as error: errors = [*errors, *error.errors] try: - configuration._verify() + await configuration._verify() except InvalidConfigurationErrors as error: errors = [*errors, *error.errors] if errors: @@ -845,7 +834,7 @@ def as_yaml(self) -> str: """Return the configuration as yaml string.""" return yaml.dump(self.as_dict()) - def reload(self) -> None: + async def reload(self) -> None: """Reload the application's configuration from the configured sources. 
This method attempts to rebuild the configuration from all paths listed in @@ -875,9 +864,17 @@ def reload(self) -> None: errors: List[Exception] = [] try: - new_config = Configuration.from_sources(self.config_paths) + new_config = await Configuration.from_sources(self.config_paths) + refresh_interval = ( + MIN_CONFIG_REFRESH_INTERVAL + if self.config_refresh_interval is None + else max( + self.config_refresh_interval, + MIN_CONFIG_REFRESH_INTERVAL, + ) + ) if new_config.config_refresh_interval is None: - new_config.config_refresh_interval = self.config_refresh_interval + new_config.config_refresh_interval = refresh_interval self._configs = new_config._configs # pylint: disable=protected-access self._set_attributes_from_configs() self._set_version_info_metric() @@ -907,49 +904,8 @@ def _set_config_refresh_interval(self, config_refresh_interval: int | None) -> N return config_refresh_interval = max(config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL) self.config_refresh_interval = config_refresh_interval - self.schedule_config_refresh() self._metrics.config_refresh_interval += config_refresh_interval - def schedule_config_refresh(self) -> None: - """ - Schedules a periodic configuration refresh based on the specified interval. - - Cancels any existing scheduled configuration refresh job and schedules a new one - using the current :code:`config_refresh_interval`. - The refresh job will call the :code:`reload` method at the specified interval - in seconds on invoking the :code:`refresh` method. - - Notes - ----- - - Only one configuration refresh job is scheduled at a time - - Any existing job is cancelled before scheduling a new one. - - The interval must be an integer representing seconds. 
- - Examples - -------- - >>> self.schedule_config_refresh() - Config refresh interval is set to: 60 seconds - """ - scheduler = self._scheduler - if self.config_refresh_interval is None: - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - return - - self.config_refresh_interval = max( - self.config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL - ) - refresh_interval = self.config_refresh_interval - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - if isinstance(refresh_interval, int): - scheduler.every(refresh_interval).seconds.do(self.reload) - logger.info("Config refresh interval is set to: %s seconds", refresh_interval) - - def refresh(self) -> None: - """Wrap the scheduler run_pending method hide the implementation details.""" - self._scheduler.run_pending() - def _set_attributes_from_configs(self) -> None: for attribute in filter(lambda x: x.repr, fields(self.__class__)): setattr( @@ -1009,7 +965,7 @@ def _get_last_non_default_value(configs: Sequence["Configuration"], attribute: s return values[-1] return getattr(Configuration(), attribute) - def _verify(self) -> None: + async def _verify(self) -> None: """Verify the configuration.""" errors: list[Exception] = [] try: @@ -1038,8 +994,8 @@ def _verify(self) -> None: errors.append(error) for processor_config in self.pipeline: try: - processor = Factory.create(deepcopy(processor_config)) - processor.setup() + processor = typing.cast(Processor, Factory.create(deepcopy(processor_config))) + await processor.setup() self._verify_rules(processor) except ( FactoryError, @@ -1055,9 +1011,9 @@ def _verify(self) -> None: self._verify_processor_outputs(processor_config) except Exception as error: # pylint: disable=broad-except errors.append(error) - if ENV_NAME_LOGPREP_CREDENTIALS_FILE in os.environ: + credentials_file_path = os.environ.get(ENV_NAME_LOGPREP_CREDENTIALS_FILE) + if credentials_file_path is not None: try: - credentials_file_path = os.environ.get(ENV_NAME_LOGPREP_CREDENTIALS_FILE) _ 
from collections import defaultdict
from collections.abc import Sequence
from typing import TypeVar

E_co = TypeVar("E_co", bound="Event", covariant=True)


def partition_by_state(events: Sequence[E_co]) -> "dict[EventStateType, list[E_co]]":
    """Group *events* into buckets keyed by each event's current state.

    Returns a ``defaultdict(list)`` mapping ``event.state.current_state`` to
    the events currently in that state, preserving input order per bucket.
    """
    buckets: "dict[EventStateType, list[E_co]]" = defaultdict(list)
    for event in events:
        buckets[event.state.current_state].append(event)
    return buckets
class LogprepFormatter(NonNgLogprepFormatter):
    """
    A custom formatter for logprep logging with additional attributes.

    The format string may use any standard ``LogRecord`` attribute listed in
    the Python logging documentation. Additionally, the formatter provides
    the following logprep specific attributes:

    .. table::

        +-----------------------+--------------------------------------------------+
        | attribute             | description                                      |
        +=======================+==================================================+
        | %(hostname)           | (Logprep specific) The hostname of the machine   |
        |                       | where the log was emitted                        |
        +-----------------------+--------------------------------------------------+
        | %(taskName)           | The name of the executing asyncio task.          |
        +-----------------------+--------------------------------------------------+

    """

    def format(self, record):
        """Format *record*, back-filling ``record.taskName`` for Python < 3.12.

        ``taskName`` became a built-in LogRecord attribute in Python 3.12; on
        older interpreters it is patched in from the running asyncio task,
        falling back to the thread name when no task is running.
        """
        # Narrow handling instead of a blanket ``except Exception``:
        # current_task() raises RuntimeError without a running loop and
        # returns None when called outside a task — handle both explicitly
        # so unrelated errors are no longer silently swallowed.
        try:
            current = asyncio.current_task()
        except RuntimeError:
            current = None
        if current is not None:
            record.taskName = current.get_name()
        else:
            record.taskName = threading.current_thread().name
        return super().format(record)


class LogprepMPQueueListener(NonNgLogprepMPQueueListener):
    """Logprep specific QueueListener that uses multiprocessing instead of threading"""
+ """ + + def __init__(self, maxsize: int) -> None: + if maxsize <= 0: + raise ValueError("Queue must be bounded") + super().__init__(maxsize=maxsize) diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py new file mode 100644 index 000000000..0254ff5da --- /dev/null +++ b/logprep/ng/util/worker/worker.py @@ -0,0 +1,311 @@ +""" +Worker execution and batching mechanics. + +This module provides the standalone Worker abstraction responsible for +input consumption, deterministic batching, optional batch processing, +and cooperative shutdown behavior. + +The worker is intentionally decoupled from pipeline orchestration logic +and focuses solely on predictable buffering, flushing, and backpressure +interaction with the output queue. +""" + +import asyncio +import logging +from asyncio import AbstractEventLoop +from collections import deque +from collections.abc import AsyncIterator +from typing import Any, Generic, TypeVar + +from logprep.ng.util.worker.types import AsyncHandler, SizeLimitedQueue + +logger = logging.getLogger("Worker") # pylint: disable=no-member + +T = TypeVar("T") +Input = TypeVar("Input") +Output = TypeVar("Output") + + +class Worker(Generic[Input, Output]): + """ + Generic batching worker with cooperative shutdown semantics. + """ + + def __init__( + self, + name: str, + batch_size: int, + batch_interval_s: float, + handler: AsyncHandler[Input, Output], + in_queue: SizeLimitedQueue[Input] | AsyncIterator[Input], + out_queue: SizeLimitedQueue[Output] | None = None, + ) -> None: + self.name = name + + self._handler = handler + + self.in_queue = in_queue + self.out_queue = out_queue + + self._batch_interval_s = batch_interval_s + self._batch_size = batch_size + + self._batch_buffer: deque[Input] = deque() + self._buffer_lock = asyncio.Lock() # TODO is locking really required? + + self._flush_timer: asyncio.Task[None] | None = None + + def _start_timer_locked(self) -> None: + """ + Arm or re-arm the batch timer. 
+ + Must be called with _buffer_lock held. Ensures that at most one + timer task is active for the current batch window. + """ + if self._flush_timer: + if self._flush_timer.done(): + exc = self._flush_timer.exception() + if exc is not None: + logger.error("flush timer task has failed", exc_info=exc) + else: + self._flush_timer.cancel() + self._flush_timer = asyncio.create_task(self._flush_after_interval()) + + def _cancel_timer_if_needed(self) -> None: + """ + Cancel the active timer task if it is still pending. + + Avoids cancelling the currently executing timer task to prevent + self-cancellation race conditions. + """ + t = self._flush_timer + if not t: + return + if t.done(): + exc = t.exception() + if exc is not None: + logger.error("flush timer task has failed", exc_info=exc) + return + if t is asyncio.current_task(): + return + t.cancel() + + async def _flush_after_interval(self) -> None: + """ + Timer coroutine responsible for time-based batch flushing. + + Sleeps for the configured interval and flushes the buffered items + if the batch has not already been drained by the size trigger. + """ + try: + await asyncio.sleep(self._batch_interval_s) + except asyncio.CancelledError: + return + + batch: list[Input] | None = None + async with self._buffer_lock: + if self._batch_buffer: + batch = self._drain_locked() + if self._flush_timer is asyncio.current_task(): + self._flush_timer = None + + if batch: + logger.debug("Flushing messages based on timer") + await self._flush_batch(batch) + + def _drain_locked(self) -> list[Input]: + """ + Drain the current buffer contents. + + Must be called with _buffer_lock held. Cancels any active timer + and returns a snapshot of buffered items. + """ + batch = list(self._batch_buffer) + self._batch_buffer.clear() + self._cancel_timer_if_needed() + self._flush_timer = None + return batch + + async def add(self, item: Input) -> None: + """ + Add a single item to the batch buffer. 
+ + May trigger a flush if the size threshold is reached. Starts the + batch timer when the first item of a new batch arrives. + """ + batch_to_flush: list[Input] | None = None + + async with self._buffer_lock: + self._batch_buffer.append(item) + + if len(self._batch_buffer) == 1: + self._start_timer_locked() + + if len(self._batch_buffer) >= self._batch_size: + batch_to_flush = self._drain_locked() + + if batch_to_flush: + logger.debug("Flushing messages based on backlog size") + await self._flush_batch(batch_to_flush) + + async def flush(self) -> None: + """ + Force flushing of buffered items. + + Drains and processes the current buffer regardless of size or + timer state. + """ + + batch_to_flush: list[Input] | None = None + async with self._buffer_lock: + if self._batch_buffer: + batch_to_flush = self._drain_locked() + if batch_to_flush: + logger.debug("Flushing messages based on manual trigger") + await self._flush_batch(batch_to_flush) + + async def _process_batch(self, batch: list[Input]) -> list[Output]: + return await self._handler(batch) + + async def _flush_batch(self, batch: list[Input]) -> None: + """ + Process and forward a completed batch. + + Applies the optional handler and forwards the resulting items to + the output queue if configured. + """ + batch_result: list[Output] = await self._process_batch(batch) + + if self.out_queue is not None: + for item in batch_result: + await self.out_queue.put(item) + + await asyncio.sleep(0) + + async def run(self, stop_event: asyncio.Event) -> None: + """ + Execute the worker processing loop. + + Continuously consumes items until stop_event is set or the task is + cancelled. Ensures a final buffer flush during shutdown. 
+ """ + + try: + if isinstance(self.in_queue, asyncio.Queue): + while not stop_event.is_set(): + item = await self.in_queue.get() + + if item is not None: + await self.add(item) + + await asyncio.sleep(0.0) + else: + while not stop_event.is_set(): + item = await anext(self.in_queue) + + if item is not None: + await self.add(item) + + await asyncio.sleep(0.0) + + except asyncio.CancelledError: + logger.debug("Worker cancelled") + raise + finally: + await self.flush() + + +class WorkerOrchestrator: + """ + Orchestrates a chain of workers. + + Lifecycle: + - run(): start workers + background tasks and wait until stop_event is set + - shut_down(): stop workers + background tasks and end manager lifetime + """ + + def __init__( + self, + workers: list[Worker], + loop: AbstractEventLoop | None = None, + ) -> None: + """ + Initialize the manager with a worker chain and optional event loop. + """ + self._loop: AbstractEventLoop = loop if loop is not None else asyncio.get_event_loop() + self._workers: list[Worker] = workers + + self._stop_event = asyncio.Event() + + self._worker_tasks: set[asyncio.Task[Any]] = set() + + self._exceptions: list[BaseException] = [] + self._reload_lock = asyncio.Lock() + + def _setup(self) -> None: + """Perform manager initialization steps that require a fully constructed instance.""" + + def run_workers(self) -> None: + """ + Start worker tasks (data-plane). + + Worker tasks may be restarted on reload; background tasks are not. 
+ """ + for worker in self._workers: + t = self._loop.create_task(worker.run(self._stop_event), name=worker.name) + self._add_worker_task(t) + + def _add_worker_task(self, task: asyncio.Task[Any]) -> None: + self.exceptions_ = """Track a worker task and fail-fast on exceptions.""" + + def _done(t: asyncio.Task[Any]) -> None: + self._worker_tasks.discard(t) + + if t.cancelled(): + return + + exc = t.exception() + if exc is not None: + self._exceptions.append(exc) + self._stop_event.set() + + task.add_done_callback(_done) + self._worker_tasks.add(task) + + async def run(self) -> None: + """ + Run the manager until stop_event is set. + + Starts workers and background tasks and then blocks waiting for shutdown. + """ + + self._setup() + self.run_workers() + + await self._stop_event.wait() + + async def shut_down(self, timeout_s: float) -> None: + """ + Fully shut down the manager. + + Stops workers and background tasks, clears registrations, and signals stop_event + so run() can exit. + """ + self._stop_event.set() + + logger.debug("waiting for termination of %d tasks", len(self._worker_tasks)) + + try: + await asyncio.wait_for( + asyncio.gather(*self._worker_tasks, return_exceptions=True), timeout_s + ) + except TimeoutError: + unfinished_workers = [w for w in self._worker_tasks if not w.done()] + if unfinished_workers: + logger.debug( + "[%d/%d] did not stop gracefully. Awaiting cancellation: [%s]", + len(unfinished_workers), + len(self._worker_tasks), + ", ".join(map(asyncio.Task.get_name, unfinished_workers)), + ) + await asyncio.gather(*unfinished_workers, return_exceptions=True) diff --git a/logprep/processor/labeler/processor.py b/logprep/processor/labeler/processor.py index 52cdfac21..57c8d8be5 100644 --- a/logprep/processor/labeler/processor.py +++ b/logprep/processor/labeler/processor.py @@ -24,11 +24,13 @@ .. 
automodule:: logprep.processor.labeler.rule """ -from typing import Optional +import typing +from collections.abc import Iterable, Sequence from attrs import define, field, validators from logprep.abc.processor import Processor +from logprep.processor.base.rule import Rule from logprep.processor.labeler.labeling_schema import LabelingSchema from logprep.processor.labeler.rule import LabelerRule from logprep.util.helper import add_fields_to, get_dotted_field_value @@ -58,9 +60,7 @@ class Config(Processor.Config): authenticity and integrity of the loaded values. """ - include_parent_labels: Optional[bool] = field( - default=False, validator=validators.optional(validator=validators.instance_of(bool)) - ) + include_parent_labels: bool = field(default=False, validator=validators.instance_of(bool)) """If the option is deactivated only labels defined in a rule will be activated. Otherwise, also allowed labels in the path to the *root* of the corresponding category of a label will be added. @@ -73,10 +73,20 @@ class Config(Processor.Config): rule_class = LabelerRule - def __init__(self, name: str, configuration: Processor.Config): + def __init__(self, name: str, configuration: "Labeler.Config"): self._schema = LabelingSchema.create_from_file(configuration.schema) super().__init__(name, configuration=configuration) + @property + def config(self) -> Config: + """Provides the properly typed configuration object""" + return typing.cast(Labeler.Config, self._config) + + @property + def rules(self) -> Sequence[LabelerRule]: + """Returns all rules""" + return typing.cast(Sequence[LabelerRule], super().rules) + def setup(self): super().setup() for rule in self.rules: @@ -84,13 +94,14 @@ def setup(self): rule.add_parent_labels_from_schema(self._schema) rule.conforms_to_schema(self._schema) - def _apply_rules(self, event, rule): + def _apply_rules(self, event: dict, rule: Rule) -> None: """Applies the rule to the current event""" - fields = {key: value for key, value in 
rule.prefixed_label.items()} - add_fields_to(event, fields, rule=rule, merge_with_target=True) - # convert sets into sorted lists + rule = typing.cast(LabelerRule, rule) + add_fields_to(event, rule.prefixed_label, rule=rule, merge_with_target=True) + # we have already added (merged) the prefixed_labels with list values + # now we extract them to make them unique and sorted fields = { - key: sorted(set(get_dotted_field_value(event, key))) + key: sorted(set(typing.cast(Iterable, get_dotted_field_value(event, key)))) for key, _ in rule.prefixed_label.items() } add_fields_to(event, fields, rule=rule, overwrite_target=True) diff --git a/logprep/processor/pseudonymizer/processor.py b/logprep/processor/pseudonymizer/processor.py index 020968cba..899b4732c 100644 --- a/logprep/processor/pseudonymizer/processor.py +++ b/logprep/processor/pseudonymizer/processor.py @@ -33,6 +33,8 @@ """ import re +import typing +from collections.abc import Sequence from functools import cached_property, lru_cache from itertools import chain from typing import Pattern @@ -47,12 +49,13 @@ from logprep.processor.pseudonymizer.rule import PseudonymizerRule from logprep.util.getter import GetterFactory from logprep.util.hasher import SHA256Hasher -from logprep.util.helper import add_fields_to, get_dotted_field_value +from logprep.util.helper import add_fields_to, get_dotted_field_values from logprep.util.pseudo.encrypter import ( DualPKCS1HybridCTREncrypter, DualPKCS1HybridGCMEncrypter, Encrypter, ) +from logprep.util.typing import is_lru_cached from logprep.util.url.url import extract_urls @@ -64,20 +67,16 @@ class Config(FieldManager.Config): """Pseudonymizer config""" outputs: tuple[dict[str, str]] = field( - validator=[ - validators.deep_iterable( - member_validator=[ - validators.instance_of(dict), - validators.deep_mapping( - key_validator=validators.instance_of(str), - value_validator=validators.instance_of(str), - mapping_validator=validators.max_len(1), - ), - ], - 
iterable_validator=validators.instance_of(tuple), - ), - validators.min_len(1), - ], + validator=validators.deep_iterable( + member_validator=[ + validators.deep_mapping( + key_validator=validators.instance_of(str), + value_validator=validators.instance_of(str), + mapping_validator=(validators.instance_of(dict), validators.max_len(1)), + ), + ], + iterable_validator=(validators.instance_of(tuple), validators.min_len(1)), + ), converter=tuple, ) """list of output mappings in form of :code:`output_name:topic`. @@ -227,24 +226,35 @@ def _hasher(self): @cached_property def _encrypter(self) -> Encrypter: - if self._config.mode == "CTR": - encrypter = DualPKCS1HybridCTREncrypter() - else: - encrypter = DualPKCS1HybridGCMEncrypter() - encrypter.load_public_keys(self._config.pubkey_analyst, self._config.pubkey_depseudo) + encrypter = ( + DualPKCS1HybridCTREncrypter() + if self.config.mode == "CTR" + else DualPKCS1HybridGCMEncrypter() + ) + encrypter.load_public_keys(self.config.pubkey_analyst, self.config.pubkey_depseudo) return encrypter @cached_property def _regex_mapping(self) -> dict: - return GetterFactory.from_string(self._config.regex_mapping).get_yaml() + return GetterFactory.from_string(self.config.regex_mapping).get_dict() @cached_property def _get_pseudonym_dict_cached(self): - return lru_cache(maxsize=self._config.max_cached_pseudonyms)(self._pseudonymize) + return lru_cache(maxsize=self.config.max_cached_pseudonyms)(self._pseudonymize) @cached_property def _pseudonymize_url_cached(self): - return lru_cache(maxsize=self._config.max_cached_pseudonymized_urls)(self._pseudonymize_url) + return lru_cache(maxsize=self.config.max_cached_pseudonymized_urls)(self._pseudonymize_url) + + @property + def config(self) -> Config: + """Provides the properly typed configuration object""" + return typing.cast(Pseudonymizer.Config, self._config) + + @property + def rules(self) -> Sequence[PseudonymizerRule]: + """Returns all rules""" + return 
typing.cast(Sequence[PseudonymizerRule], super().rules) def setup(self): super().setup() @@ -257,13 +267,12 @@ def _replace_regex_keywords_by_regex_expression(self): rule.pseudonyms[dotted_field] = re.compile(self._regex_mapping[regex_keyword]) elif isinstance(regex_keyword, str): # after the first run, the regex is compiled raise InvalidConfigurationError( - f"Regex keyword '{regex_keyword}' not found in regex_mapping '{self._config.regex_mapping}'" + f"Regex keyword '{regex_keyword}' not found in regex_mapping '{self.config.regex_mapping}'" ) def _apply_rules(self, event: dict, rule: PseudonymizerRule): - source_dict = {} - for source_field in rule.pseudonyms: - source_dict[source_field] = get_dotted_field_value(event, source_field) + rule = typing.cast(PseudonymizerRule, rule) + source_dict = get_dotted_field_values(event, rule.pseudonyms) self._handle_missing_fields(event, rule, source_dict.keys(), source_dict.values()) for dotted_field, field_value in source_dict.items(): @@ -276,7 +285,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule): for value in field_value ] else: - field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value) + field_value = self._pseudonymize_field(rule, dotted_field, regex, str(field_value)) add_fields_to( event, fields={dotted_field: field_value}, rule=rule, overwrite_target=True ) @@ -311,13 +320,13 @@ def _pseudonymize_string(self, value: str) -> str: if self.pseudonymized_pattern.match(value): return value pseudonym_dict = self._get_pseudonym_dict_cached(value) - extra = (pseudonym_dict, self._config.outputs) + extra = (pseudonym_dict, self.config.outputs) if extra not in self.result.data: self.result.data.append(extra) return self._wrap_hash(pseudonym_dict["pseudonym"]) def _pseudonymize(self, value): - hash_string = self._hasher.hash_str(value, salt=self._config.hash_salt) + hash_string = self._hasher.hash_str(value, salt=self.config.hash_salt) encrypted_origin = self._encrypter.encrypt(value) 
return {"pseudonym": hash_string, "origin": encrypted_origin} @@ -358,12 +367,16 @@ def _pseudonymize_url(self, url_string: str) -> str: def _wrap_hash(self, hash_string: str) -> str: return self.HASH_PREFIX + hash_string + self.HASH_SUFFIX - def _update_cache_metrics(self): - cache_info_pseudonyms = self._get_pseudonym_dict_cached.cache_info() - cache_info_urls = self._pseudonymize_url_cached.cache_info() - self.metrics.new_results += cache_info_pseudonyms.misses + cache_info_urls.misses - self.metrics.cached_results += cache_info_pseudonyms.hits + cache_info_urls.hits - self.metrics.num_cache_entries += cache_info_pseudonyms.currsize + cache_info_urls.currsize - self.metrics.cache_load += (cache_info_pseudonyms.currsize + cache_info_urls.currsize) / ( - cache_info_pseudonyms.maxsize + cache_info_urls.maxsize + def _update_cache_metrics(self) -> None: + caches = [ + f.cache_info() + for f in [self._get_pseudonym_dict_cached, self._pseudonymize_url_cached] + if is_lru_cached(f) + ] + + self.metrics.new_results += sum(c.misses for c in caches) + self.metrics.cached_results += sum(c.hits for c in caches) + self.metrics.num_cache_entries += sum(c.currsize for c in caches) + self.metrics.cache_load += (sum(c.currsize for c in caches)) / ( + sum(typing.cast(int, c.maxsize) for c in caches) ) diff --git a/logprep/run_ng.py b/logprep/run_ng.py index 8db477f0a..7146142fa 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -1,15 +1,18 @@ # pylint: disable=logging-fstring-interpolation """This module can be used to start the logprep.""" +import asyncio import logging -import os import signal import sys +from functools import partial from multiprocessing import set_start_method import click +import uvloop from logprep.ng.runner import Runner +from logprep.ng.util.async_helpers import asyncio_exception_handler from logprep.ng.util.configuration import Configuration, InvalidConfigurationError from logprep.util.defaults import EXITCODES from logprep.util.helper import 
get_versions_string @@ -27,9 +30,9 @@ def _print_version(config: "Configuration") -> None: sys.exit(EXITCODES.SUCCESS) -def _get_configuration(config_paths: tuple[str]) -> Configuration: +async def _get_configuration(config_paths: tuple[str]) -> Configuration: try: - config = Configuration.from_sources(config_paths) + config = await Configuration.from_sources(config_paths) logger.info("Log level set to '%s'", config.logger.level) return config except InvalidConfigurationError as error: @@ -66,34 +69,42 @@ def run(configs: tuple[str], version=None) -> None: CONFIG is a path to configuration file (filepath or URL). """ - configuration = _get_configuration(configs) - runner = Runner(configuration) - runner.setup_logging() - if version: - _print_version(configuration) - for version in get_versions_string(configuration).split("\n"): - logger.info(version) - logger.debug(f"Metric export enabled: {configuration.metrics.enabled}") - logger.debug(f"Config path: {configs}") - try: - if "pytest" not in sys.modules: # needed for not blocking tests - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - logger.debug("Configuration loaded") - runner.run() - except SystemExit as error: - logger.debug(f"Exit received with code {error.code}") - sys.exit(error.code) - # pylint: disable=broad-except - except Exception as error: - if os.environ.get("DEBUG", False): - logger.exception(f"A critical error occurred: {error}") # pragma: no cover - else: - logger.critical(f"A critical error occurred: {error}") - if runner: - runner.stop() - sys.exit(EXITCODES.ERROR) - # pylint: enable=broad-except + + async def _run(configs_: tuple[str], version_=None): + configuration = await _get_configuration(configs_) + runner_ = Runner(configuration) + runner_.setup_logging() + if version_: + _print_version(configuration) + for v in get_versions_string(configuration).split("\n"): + logger.info(v) + logger.debug(f"Metric export enabled: 
{configuration.metrics.enabled}") + logger.debug(f"Config path: {configs_}") + try: + if "pytest" not in sys.modules: # needed for not blocking tests + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + logger.debug("Configuration loaded") + await runner_.run() + except SystemExit as error: + logger.debug(f"Exit received with code {error.code}") + sys.exit(error.code) + # pylint: disable=broad-except + except ExceptionGroup as error_group: + logger.exception(f"Multiple errors occurred: {error_group}") + except Exception as error: + logger.exception(f"A critical error occurred: {error}") + + if runner_: + runner_.stop() + sys.exit(EXITCODES.ERROR) + # pylint: enable=broad-except + + with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: + handler = partial(asyncio_exception_handler, logger=logger) + loop = runner.get_loop() + loop.set_exception_handler(handler) + runner.run(_run(configs, version)) def signal_handler(__: int, _) -> None: diff --git a/logprep/util/helper.py b/logprep/util/helper.py index 131ff3033..da10d5fc7 100644 --- a/logprep/util/helper.py +++ b/logprep/util/helper.py @@ -25,6 +25,7 @@ from logprep.util.defaults import DEFAULT_CONFIG_LOCATION if TYPE_CHECKING: # pragma: no cover + from logprep.ng.util.configuration import Configuration as NgConfiguration from logprep.processor.base.rule import Rule from logprep.util.configuration import Configuration @@ -144,6 +145,7 @@ def _add_field_to( elif isinstance(existing_value, (int, float, str, bool)) and isinstance(content, list): target_parent[target_key] = [existing_value, *content] else: + # FIXME combining ll 117 & 135, overwrite_target can never be True here if not overwrite_target: raise FieldExistsWarning(rule, event, [target_field]) target_parent[target_key] = [existing_value, content] @@ -738,7 +740,9 @@ def get_source_fields_dict(event, rule): return source_field_dict -def get_versions_string(config: Optional["Configuration"] = None) -> 
str: +def get_versions_string( + config: Optional["Configuration"] | Optional["NgConfiguration"] = None, +) -> str: """ Prints the version and exists. If a configuration was found then it's version is printed as well diff --git a/pyproject.toml b/pyproject.toml index 07ba18812..e353dff0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,7 +67,7 @@ dependencies = [ "luqum<2", "more-itertools==8.10.0", "numpy>=1.26.0,<3", - "opensearch-py<4", + "opensearch-py[async]<4", "prometheus_client<1", "protobuf>=3.20.2,<7", "pycryptodome<4", @@ -162,7 +162,7 @@ before-build = "curl -sSf https://sh.rustup.rs | sh -s -- -y" environment = 'PATH=$HOME/.cargo/bin:$PATH' [tool.pylint.MAIN] -ignore = ".venv" +ignore = ".venv, poc" fail-under = 9.5 [tool.pylint.FORMAT] @@ -182,7 +182,7 @@ max-attributes=12 [tool.pylint.CLASSES] # List of method names used to declare (i.e. assign) instance attributes. -defining-attr-methods="__init__,__new__,setup" +defining-attr-methods="__init__,__new__,setup,_setup" [tool.mypy] # use imported type information but don't report issues if not a file under check diff --git a/run_benchmarks.py b/run_benchmarks.py new file mode 100644 index 000000000..f8d7facac --- /dev/null +++ b/run_benchmarks.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +PYTHON_VERSIONS = ["3.11"] # , "3.12", "3.13", "3.14"] +MODES = [ + ("asyncNG", "1"), + # ("nonNG", "0"), +] + + +def run_benchmarks() -> None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path("benchmark_results") / timestamp + out_dir.mkdir(parents=True, exist_ok=True) + + print(f"Results directory: {out_dir}\n") + + commands = [] + + for mode_name, ng_flag in MODES: + for py in PYTHON_VERSIONS: + outfile = out_dir / f"{mode_name}_python{py}.txt" + + cmd = [ + "uv", + "run", + "--python", + py, + "benchmark.py", + "--event-num", + "3210", + "--runs", + "60", + "--ng", + ng_flag, + "--out", + 
str(outfile), + ] + + commands.append(cmd) + + for i, cmd in enumerate(commands, start=1): + print(f"=== Run {i}/{len(commands)} ===") + print(" ".join(cmd)) + + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print(f"Run failed with exit code {e.returncode}") + sys.exit(e.returncode) + + print("\nAll benchmark runs finished.") + + +if __name__ == "__main__": + run_benchmarks() diff --git a/uv.lock b/uv.lock index e0d2b2af7..fd9adfdd8 100644 --- a/uv.lock +++ b/uv.lock @@ -1392,7 +1392,7 @@ dependencies = [ { name = "more-itertools" }, { name = "msgspec" }, { name = "numpy" }, - { name = "opensearch-py" }, + { name = "opensearch-py", extra = ["async"] }, { name = "pandas" }, { name = "prometheus-client" }, { name = "protobuf" }, @@ -1474,7 +1474,7 @@ requires-dist = [ { name = "nbsphinx", marker = "extra == 'doc'", specifier = ">=0.9" }, { name = "numpy", specifier = ">=1.26.0,<3" }, { name = "openpyxl", marker = "extra == 'doc'" }, - { name = "opensearch-py", specifier = "<4" }, + { name = "opensearch-py", extras = ["async"], specifier = "<4" }, { name = "pandas", specifier = "<3" }, { name = "pre-commit", marker = "extra == 'dev'" }, { name = "prometheus-client", specifier = "<1" }, @@ -2140,6 +2140,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/a1/293c8ad81768ad625283d960685bde07c6302abf20a685e693b48ab6eb91/opensearch_py-3.1.0-py3-none-any.whl", hash = "sha256:e5af83d0454323e6ea9ddee8c0dcc185c0181054592d23cb701da46271a3b65b", size = 385729, upload-time = "2025-11-20T16:37:34.941Z" }, ] +[package.optional-dependencies] +async = [ + { name = "aiohttp" }, +] + [[package]] name = "orderly-set" version = "5.5.0"