diff --git a/benchmark.py b/benchmark.py index 92b3118bd..aa49b5805 100644 --- a/benchmark.py +++ b/benchmark.py @@ -1,4 +1,5 @@ # pylint: disable=C0103 + """ Benchmark runner for logprep (logprep-ng and non-ng). @@ -285,33 +286,6 @@ def opensearch_count_processed(opensearch_url: str, processed_index: str) -> int return int(resp.json()["count"]) -def opensearch_debug_snapshot(opensearch_url: str) -> None: - """ - Print a small OpenSearch state snapshot for debugging. - Never raises (best-effort). - """ - try: - r = requests.get(f"{opensearch_url}/_cat/indices?v", timeout=10) - print("\n--- _cat/indices ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/indices (failed) ---\n{e}") - - try: - r = requests.get(f"{opensearch_url}/_cat/count?v", timeout=10) - print("\n--- _cat/count ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/count (failed) ---\n{e}") - - try: - r = requests.get(f"{opensearch_url}/_cat/aliases?v", timeout=10) - print("\n--- _cat/aliases ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/aliases (failed) ---\n{e}") - - def reset_prometheus_dir(path: str) -> None: """ Recreate PROMETHEUS_MULTIPROC_DIR. @@ -334,8 +308,8 @@ def resolve_pipeline_config(ng: int) -> Path: Pipeline config path. 
""" if ng == 1: - return Path("./examples/exampledata/config/_benchmark_ng_pipeline.yml") - return Path("./examples/exampledata/config/_benchmark_non_ng_pipeline.yml") + return Path("./examples/exampledata/config/ng_pipeline.yml") + return Path("./examples/exampledata/config/pipeline.yml") def read_vm_max_map_count() -> int: @@ -622,27 +596,20 @@ def benchmark_run( time.sleep(sleep_after_logprep_start_s) - print("\n=== OpenSearch snapshot (before measurement) ===") - opensearch_debug_snapshot(opensearch_url) - baseline = opensearch_count_processed(opensearch_url, processed_index) startup_s = time.time() - t_startup t_run = time.time() time.sleep(run_seconds) + window_s = time.time() - t_run kill_hard(logprep_proc) - - window_s = time.time() - t_run logprep_proc = None _current_logprep_proc = None # ensure near-real-time writes are visible to _count before measuring opensearch_refresh(opensearch_url, processed_index) - print("\n=== OpenSearch snapshot (after run / after refresh) ===") - opensearch_debug_snapshot(opensearch_url) - after = opensearch_count_processed(opensearch_url, processed_index) processed = max(0, after - baseline) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 6fa96c84a..a80910464 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -146,7 +146,7 @@ def _shut_down(self) -> None: self._clear_scheduled_jobs() self._clear_properties() - async def shut_down(self): + def shut_down(self): """Stop processing of this component. 
Optional: Called when stopping the pipeline diff --git a/logprep/abc/connector.py b/logprep/abc/connector.py index 7749687a3..839ac61cf 100644 --- a/logprep/abc/connector.py +++ b/logprep/abc/connector.py @@ -48,3 +48,13 @@ class Metrics(NgComponent.Metrics): ) ) """Number of errors that occurred while processing events""" + + async def setup(self) -> None: + """Set up the connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shutdown the connector and cleanup resources.""" + + await super().shut_down() diff --git a/logprep/ng/abc/component.py b/logprep/ng/abc/component.py index 551a7a391..0bb618e56 100644 --- a/logprep/ng/abc/component.py +++ b/logprep/ng/abc/component.py @@ -16,9 +16,13 @@ class NgComponent(Component): # This is unclean from an interface perspective, but works if the worlds doen't mix. async def setup(self) -> None: - return super().setup() + """Set up the ng component.""" + + super().setup() async def shut_down(self) -> None: - return super().shut_down() + """Shut down ng component and cleanup resources.""" + + super().shut_down() # pylint: enable=invalid-overridden-method,useless-parent-delegation diff --git a/logprep/ng/abc/input.py b/logprep/ng/abc/input.py index 547759467..9f908e0d0 100644 --- a/logprep/ng/abc/input.py +++ b/logprep/ng/abc/input.py @@ -504,3 +504,13 @@ def _add_hmac_to( } add_fields_to(event_dict, new_field) return event_dict + + async def setup(self) -> None: + """Set up the input connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down input components and cleanup resources.""" + + await super().shut_down() diff --git a/logprep/ng/abc/output.py b/logprep/ng/abc/output.py index 979945560..ab4f71050 100644 --- a/logprep/ng/abc/output.py +++ b/logprep/ng/abc/output.py @@ -144,7 +144,13 @@ def wrapper(self, *args, **kwargs): return wrapper - def _shut_down(self) -> None: - """Shut down the output connector.""" - self.flush() - return super()._shut_down() + 
async def setup(self) -> None: + """Set up the output connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down the output connector and cleanup resources.""" + + await self.flush() + await super().shut_down() diff --git a/logprep/ng/abc/processor.py b/logprep/ng/abc/processor.py index cb730960b..d22602ef9 100644 --- a/logprep/ng/abc/processor.py +++ b/logprep/ng/abc/processor.py @@ -256,6 +256,13 @@ def _write_target_field(self, event: dict, rule: "Rule", result: Any) -> None: ) async def setup(self) -> None: + """Set up the processor.""" + await super().setup() for rule in self.rules: _ = rule.metrics # initialize metrics to show them on startup + + async def shut_down(self) -> None: + """Shut down the processor and run required cleanups""" + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index 5211072b5..75d330d11 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -28,14 +28,12 @@ auto.offset.reset: "earliest" """ -# pylint: enable=line-too-long import logging import os import typing from functools import cached_property, partial from socket import getfqdn -from types import MappingProxyType -from typing import Union +from types import MappingProxyType # pylint: disable=no-name-in-module import msgspec from attrs import define, field, validators @@ -348,7 +346,7 @@ async def get_consumer(self, max_workers: int = 4) -> AIOConsumer: return self._consumer - def _error_callback(self, error: KafkaException) -> None: + async def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically to be considered informational since the client will automatically try to recover. 
This callback is served upon calling client.poll() @@ -361,7 +359,7 @@ def _error_callback(self, error: KafkaException) -> None: self.metrics.number_of_errors += 1 logger.error("%s: %s", self.describe(), error) - def _stats_callback(self, stats_raw: str) -> None: + async def _stats_callback(self, stats_raw: str) -> None: """Callback for statistics data. This callback is triggered by poll() or flush every `statistics.interval.ms` (needs to be configured separately) @@ -395,8 +393,10 @@ def _stats_callback(self, stats_raw: str) -> None: "assignment_size", DEFAULT_RETURN ) - def _commit_callback( - self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition] + async def _commit_callback( + self, + error: KafkaException | None, + topic_partitions: list[TopicPartition], ) -> None: """Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll() @@ -461,8 +461,6 @@ async def _get_raw_event(self, timeout: float) -> Message | None: # type: ignor message = await consumer.poll(timeout=timeout) except RuntimeError as error: raise FatalInputError(self, str(error)) from error - except Exception as error: # remove this - pass if message is None: return None if message.value() is None or message.partition() is None or message.offset() is None: @@ -503,7 +501,6 @@ async def _get_event(self, timeout: float) -> tuple: """ message = await self._get_raw_event(timeout) - # assert None not in (message.value(), message.partition(), message.offset()) if message is None: return None, None, None @@ -602,17 +599,11 @@ async def _get_memberid(self) -> str | None: member_id = None try: consumer = await self.get_consumer() - member_id = consumer._consumer.memberid() + member_id = consumer._consumer.memberid() # pylint: disable=protected-access except RuntimeError as error: logger.error("Failed to retrieve member ID: %s", error) return member_id - async def shut_down(self) -> None: - """Close 
consumer, which also commits kafka offsets.""" - consumer = await self.get_consumer() - await consumer.close() - super()._shut_down() - def health(self) -> bool: """Check the health of the component. @@ -637,7 +628,8 @@ async def acknowledge(self, events: list[LogEvent]): logger.debug("acknowledge called") async def setup(self): - """Set the component up.""" + """Set up the confluent kafka input connector.""" + await super().setup() try: @@ -651,3 +643,11 @@ async def setup(self): ) except KafkaException as error: raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error + + async def shut_down(self) -> None: + """Shut down the confluent kafka input connector and cleanup resources.""" + + if self._consumer is not None: + await self._consumer.close() + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/output.py b/logprep/ng/connector/confluent_kafka/output.py index e4356f08f..914b7be9d 100644 --- a/logprep/ng/connector/confluent_kafka/output.py +++ b/logprep/ng/connector/confluent_kafka/output.py @@ -33,6 +33,7 @@ from attrs import define, field, validators from confluent_kafka import KafkaException, Message, Producer # type: ignore from confluent_kafka.admin import AdminClient +from confluent_kafka.aio import AIOProducer from logprep.metrics.metrics import GaugeMetric from logprep.ng.abc.event import Event @@ -235,8 +236,8 @@ def _admin(self) -> AdminClient: return AdminClient(admin_config) @cached_property - def _producer(self) -> Producer: - return Producer(self._kafka_config) + def _producer(self) -> AIOProducer: + return AIOProducer(self._kafka_config) def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically @@ -285,14 +286,12 @@ def describe(self) -> str: async def store_batch( self, events: Sequence[Event], target: str | None = None - ) -> tuple[Sequence[Event], Sequence[Event]]: + ) -> Sequence[Event]: store_target = target if target is
not None else self.config.topic for event in events: await self.store_custom(event, store_target) - return ( - [e for e in events if e.state == EventStateType.DELIVERED], - [e for e in events if e.state == EventStateType.FAILED], - ) + + return events async def store(self, event: Event) -> None: """Store a document in the producer topic. @@ -316,23 +315,35 @@ async def store_custom(self, event: Event, target: str) -> None: target : str Topic to store event data in. """ - event.state.current_state = EventStateType.STORING_IN_OUTPUT - document = event.data self.metrics.number_of_processed_events += 1 + try: - self._producer.produce( + delivery_future = await self._producer.produce( topic=target, value=self._encoder.encode(document), - on_delivery=partial(self.on_delivery, event), ) - logger.debug("Produced message %s to topic %s", str(document), target) - self._producer.poll(self.config.send_timeout) - self._producer.flush() - except BufferError: - # block program until buffer is empty or timeout is reached - self._producer.flush(timeout=self.config.flush_timeout) - logger.debug("Buffer full, flushing") + msg = await delivery_future + except KafkaException as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Kafka exception during produce: %s", err) + self.metrics.number_of_errors += 1 + return + except Exception as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Message delivery failed: %s", err) + self.metrics.number_of_errors += 1 + return + + event.state.current_state = EventStateType.DELIVERED + logger.debug( + "Message delivered to '%s' partition %s, offset %s", + msg.topic(), + msg.partition(), + msg.offset(), + ) async def flush(self) -> None: """ensures that all messages are flushed. 
According to @@ -364,24 +375,17 @@ def health(self) -> bool: return super().health() async def setup(self) -> None: - """Set the component up.""" + """Set up the confluent kafka output connector.""" + try: await super().setup() except KafkaException as error: raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error - def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None: - """Callback for delivery reports.""" - if err is not None: - event.state.current_state = EventStateType.FAILED - event.errors.append(err) - logger.error("Message delivery failed: %s", err) - self.metrics.number_of_errors += 1 - return - event.state.current_state = EventStateType.DELIVERED - logger.debug( - "Message delivered to '%s' partition %s, offset %s", - msg.topic(), - msg.partition(), - msg.offset(), - ) + async def shut_down(self) -> None: + """Shut down the confluent kafka output connector and cleanup resources.""" + + if "_producer" in self.__dict__: + await self.flush() + + await super().shut_down() diff --git a/logprep/ng/connector/file/input.py b/logprep/ng/connector/file/input.py index e13269c5b..715274745 100644 --- a/logprep/ng/connector/file/input.py +++ b/logprep/ng/connector/file/input.py @@ -191,7 +191,7 @@ async def setup(self) -> None: file_name=self.config.logfile_path, ) - def _shut_down(self) -> None: + async def shut_down(self) -> None: """Raises the Stop Event Flag that will stop the thread that monitors the logfile""" self.stop_flag.set() - return super()._shut_down() + await super().shut_down() diff --git a/logprep/ng/connector/http/input.py b/logprep/ng/connector/http/input.py index f206dd684..7e09bea14 100644 --- a/logprep/ng/connector/http/input.py +++ b/logprep/ng/connector/http/input.py @@ -356,11 +356,11 @@ async def _get_event(self, timeout: float) -> tuple: except queue.Empty: return None, None, None - def _shut_down(self): + async def shut_down(self): """Raises Uvicorn HTTP Server internal stop flag and waits
to join""" if self.http_server: self.http_server.shut_down() - return super()._shut_down() + await super().shut_down() @cached_property def health_endpoints(self) -> list[str]: diff --git a/logprep/ng/connector/opensearch/output.py b/logprep/ng/connector/opensearch/output.py index 9aa5efbe2..df99a74cf 100644 --- a/logprep/ng/connector/opensearch/output.py +++ b/logprep/ng/connector/opensearch/output.py @@ -45,7 +45,6 @@ helpers, ) from opensearchpy.serializer import JSONSerializer -from typing_extensions import override from logprep.abc.exceptions import LogprepException from logprep.ng.abc.event import Event @@ -356,6 +355,7 @@ async def health(self) -> bool: # type: ignore # TODO: fix mypy issue return False return super().health() and resp.get("status") in self.config.desired_cluster_status - @override async def shut_down(self): - await self._search_context.close() + if "_search_context" in self.__dict__: + await self._search_context.close() + await super().shut_down() diff --git a/logprep/ng/manager.py b/logprep/ng/manager.py index 58ca56c6f..ad697823a 100644 --- a/logprep/ng/manager.py +++ b/logprep/ng/manager.py @@ -40,6 +40,8 @@ def __init__(self, configuration: Configuration, shutdown_timeout_s: float) -> N self._shutdown_timeout_s = shutdown_timeout_s async def setup(self): + """Set up the pipeline manager.""" + self._event_backlog = SetEventBacklog() self._input_connector = cast(Input, Factory.create(self.configuration.input)) @@ -74,7 +76,7 @@ async def setup(self): self._queues = [] self._orchestrator = self._create_orchestrator() - def _create_orchestrator(self) -> WorkerOrchestrator: + def _create_orchestrator(self) -> WorkerOrchestrator: # pylint: disable=too-many-locals process_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) send_to_default_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) send_to_extras_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) @@ -194,16 +196,15 @@ async def run(self) -> None: try: await
self._orchestrator.run() except CancelledError: - # TODO cancelling() > 0 is no safe discriminator; improve - current_task = asyncio.current_task() - if current_task and current_task.cancelling() > 0: - logger.debug("PipelineManager.run has been cancelled. Shutting down") - await self._shut_down() - else: - logger.error("Orchestrator has been cancelled. Shutting down") - await self._shut_down() - - async def _shut_down(self) -> None: + logger.debug("PipelineManager.run cancelled. Shutting down.") + await self.shut_down() + raise + except Exception: + logger.exception("PipelineManager.run failed. Shutting down.") + await self.shut_down() + raise + + async def shut_down(self) -> None: """Shut down runner components, and required runner attributes.""" logger.debug( @@ -221,6 +222,7 @@ async def _shut_down(self) -> None: if self._sender is not None: await self._sender.shut_down() # self._input_connector.acknowledge() + await self._input_connector.shut_down() len_delivered_events = len(list(self._event_backlog.get(EventStateType.DELIVERED))) if len_delivered_events: diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 7033fb17b..e4b6ddfe6 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -14,14 +14,13 @@ from logprep.ng.manager import PipelineManager from logprep.ng.util.async_helpers import TerminateTaskGroup, restart_task_on_iter from logprep.ng.util.configuration import Configuration -from logprep.ng.util.defaults import DEFAULT_LOG_CONFIG +from logprep.ng.util.defaults import DEFAULT_LOG_CONFIG, MIN_CONFIG_REFRESH_INTERVAL logger = logging.getLogger("Runner") GRACEFUL_SHUTDOWN_TIMEOUT = 3 HARD_SHUTDOWN_TIMEOUT = 5 -MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT = 0.05 class Runner: @@ -46,36 +45,49 @@ def __init__(self, configuration: Configuration) -> None: self._task_group = asyncio.TaskGroup() self._stop_event = asyncio.Event() - self._pipeline_manager: PipelineManager | None = None - async def _refresh_configuration_gen(self) -> 
AsyncGenerator[Configuration, None]: - self.config.schedule_config_refresh() + self._running_config_version = self.config.version refresh_interval = self.config.config_refresh_interval - while True: - self.config.refresh() - if self.config.version != self._running_config_version: - yield self.config - self._running_config_version = self.config.version - refresh_interval = self.config.config_refresh_interval + if refresh_interval is None: + logger.debug("Config refresh has been disabled.") + return - if refresh_interval is not None: - try: - await asyncio.sleep( - # realistic bad case: starting to sleep just a moment before scheduled time - # unlikely worst case: starting to sleep even after scheduled time - # (if yield takes some time and interval is short) - # --> compensate bad case by giving an upper boundary to the deviation - refresh_interval - * MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT - ) - except asyncio.CancelledError: - logger.debug("Config refresh cancelled. Exiting...") - raise + loop = asyncio.get_running_loop() + next_run = loop.time() + refresh_interval + + while True: + sleep_time = next_run - loop.time() + if sleep_time < 0: + sleep_time = 0.0 + + try: + await asyncio.sleep(sleep_time) + except asyncio.CancelledError: + logger.debug("Config refresh cancelled. Exiting...") + raise + + try: + await self.config.reload() + except asyncio.CancelledError: + logger.debug("Config reload cancelled. 
Exiting...") + raise + except Exception: + logger.exception("scheduled config reload failed") + raise else: + if self.config.version != self._running_config_version: + logger.info(f"Detected new config version: {self.config.version}") + self._running_config_version = self.config.version + yield self.config + + refresh_interval = self.config.config_refresh_interval + if refresh_interval is None: logger.debug("Config refresh has been disabled.") break + next_run += refresh_interval + async def run(self) -> None: """Run the runner and continuously process events until stopped.""" self._running_config_version = self.config.version diff --git a/logprep/ng/util/__init__.py b/logprep/ng/util/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/logprep/ng/util/async_helpers.py b/logprep/ng/util/async_helpers.py index 875062148..016d58517 100644 --- a/logprep/ng/util/async_helpers.py +++ b/logprep/ng/util/async_helpers.py @@ -2,6 +2,7 @@ import asyncio from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator, Callable +from logging import Logger from typing import Awaitable, TypeVar T = TypeVar("T") @@ -111,3 +112,33 @@ async def restart_task_on_iter( await cancel_task_and_wait(task, cancel_timeout_s) task = await task_factory(data) yield task + + +def asyncio_exception_handler( + _: asyncio.AbstractEventLoop, + context: dict, + logger: Logger, +) -> None: + """ + Handle unhandled exceptions reported by the asyncio event loop. + + Covers exceptions from background tasks, callbacks, and loop internals. + Does not handle exceptions from awaited coroutines (e.g. runner.run()). 
+ """ + + msg = context.get("message", "Unhandled exception in event loop") + exception = context.get("exception") + task = context.get("task") or context.get("future") + + logger.error(f"{msg}") + + if task: + logger.error(f"Task: {task!r}") + + if isinstance(task, asyncio.Task): + logger.error(f"Task name: {task.get_name()}") + + if exception: + logger.error(f"Unhandled exception: {exception!r}", exc_info=exception) + else: + logger.error(f"Context: {context!r}") diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 434147514..d674dfc97 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -197,7 +197,6 @@ from copy import deepcopy from importlib.metadata import version from itertools import chain -from logging.config import dictConfig from pathlib import Path from typing import Any, Iterable, List, Optional, Sequence, Tuple @@ -206,7 +205,6 @@ from ruamel.yaml import YAML from ruamel.yaml.compat import StringIO from ruamel.yaml.scanner import ScannerError -from schedule import Scheduler from logprep.abc.getter import Getter from logprep.factory import Factory @@ -239,14 +237,14 @@ class MyYAML(YAML): """helper class to dump yaml with ruamel.yaml""" def dump(self, data: Any, stream: Any | None = None, **kw: Any) -> Any: - inefficient = False if stream is None: - inefficient = True stream = StringIO() - YAML.dump(self, data, stream, **kw) - if inefficient: + YAML.dump(self, data, stream, **kw) return stream.getvalue() + YAML.dump(self, data, stream, **kw) + return None + yaml = MyYAML(pure=True) @@ -340,7 +338,7 @@ class LoggerConfig: compatible with :func:`logging.config.dictConfig`. 
""" - _LOG_LEVELS = ( + _log_levels = ( logging.NOTSET, # 0 logging.DEBUG, # 10 logging.INFO, # 20 @@ -358,7 +356,7 @@ class LoggerConfig: default="INFO", validator=[ validators.instance_of(str), - validators.in_([logging.getLevelName(level) for level in _LOG_LEVELS]), + validators.in_([logging.getLevelName(level) for level in _log_levels]), ], eq=False, ) @@ -443,7 +441,7 @@ def setup_logging(self) -> None: log_config = asdict(self) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) - dictConfig(log_config) + logging.config.dictConfig(log_config) def _set_loggers_levels(self) -> None: """Normalize per-logger configuration and preserve explicit levels. @@ -663,21 +661,12 @@ class Configuration: validator=validators.instance_of(tuple), factory=tuple, repr=False, eq=False ) - _scheduler: Scheduler = field( - factory=Scheduler, - validator=validators.instance_of(Scheduler), - repr=False, - eq=False, - init=False, - ) - _config_failure: bool = field(default=False, repr=False, eq=False, init=False) _unserializable_fields = ( "_getter", "_configs", "_config_failure", - "_scheduler", "_metrics", "_unserializable_fields", ) @@ -876,8 +865,16 @@ async def reload(self) -> None: errors: List[Exception] = [] try: new_config = await Configuration.from_sources(self.config_paths) + refresh_interval = ( + MIN_CONFIG_REFRESH_INTERVAL + if self.config_refresh_interval is None + else max( + self.config_refresh_interval, + MIN_CONFIG_REFRESH_INTERVAL, + ) + ) if new_config.config_refresh_interval is None: - new_config.config_refresh_interval = self.config_refresh_interval + new_config.config_refresh_interval = refresh_interval self._configs = new_config._configs # pylint: disable=protected-access self._set_attributes_from_configs() self._set_version_info_metric() @@ -907,49 +904,8 @@ def _set_config_refresh_interval(self, config_refresh_interval: int | None) -> N return config_refresh_interval = max(config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL) 
self.config_refresh_interval = config_refresh_interval - self.schedule_config_refresh() self._metrics.config_refresh_interval += config_refresh_interval - def schedule_config_refresh(self) -> None: - """ - Schedules a periodic configuration refresh based on the specified interval. - - Cancels any existing scheduled configuration refresh job and schedules a new one - using the current :code:`config_refresh_interval`. - The refresh job will call the :code:`reload` method at the specified interval - in seconds on invoking the :code:`refresh` method. - - Notes - ----- - - Only one configuration refresh job is scheduled at a time - - Any existing job is cancelled before scheduling a new one. - - The interval must be an integer representing seconds. - - Examples - -------- - >>> self.schedule_config_refresh() - Config refresh interval is set to: 60 seconds - """ - scheduler = self._scheduler - if self.config_refresh_interval is None: - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - return - - self.config_refresh_interval = max( - self.config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL - ) - refresh_interval = self.config_refresh_interval - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - if isinstance(refresh_interval, int): - scheduler.every(refresh_interval).seconds.do(self.reload) - logger.info("Config refresh interval is set to: %s seconds", refresh_interval) - - def refresh(self) -> None: - """Wrap the scheduler run_pending method hide the implementation details.""" - self._scheduler.run_pending() - def _set_attributes_from_configs(self) -> None: for attribute in filter(lambda x: x.repr, fields(self.__class__)): setattr( diff --git a/logprep/ng/util/defaults.py b/logprep/ng/util/defaults.py index b3060ecf3..62f9cd1aa 100644 --- a/logprep/ng/util/defaults.py +++ b/logprep/ng/util/defaults.py @@ -36,7 +36,7 @@ class EXITCODES(IntEnum): "version": 1, "formatters": { "logprep": { - "class": "logprep.ng.util.logging.LogprepFormatter", + 
"class": "logprep.ng.util.logging_helpers.LogprepFormatter", "format": DEFAULT_LOG_FORMAT, "datefmt": DEFAULT_LOG_DATE_FORMAT, } diff --git a/logprep/ng/util/logging.py b/logprep/ng/util/logging_helpers.py similarity index 100% rename from logprep/ng/util/logging.py rename to logprep/ng/util/logging_helpers.py diff --git a/logprep/ng/util/worker/__init__.py b/logprep/ng/util/worker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/logprep/ng/util/worker/types.py b/logprep/ng/util/worker/types.py index 78593ed8c..61897cbe4 100644 --- a/logprep/ng/util/worker/types.py +++ b/logprep/ng/util/worker/types.py @@ -18,9 +18,7 @@ Input = TypeVar("Input") Output = TypeVar("Output") -SyncHandler = Callable[[list[Input]], list[Output]] AsyncHandler = Callable[[list[Input]], Coroutine[object, object, list[Output]]] -Handler = SyncHandler[Input, Output] | AsyncHandler[Input, Output] class SizeLimitedQueue(asyncio.Queue[T]): diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index 1584257a2..8cbe75de7 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -37,7 +37,7 @@ def __init__( batch_size: int, batch_interval_s: float, handler: AsyncHandler[Input, Output], - in_queue: asyncio.Queue[Input] | AsyncIterator[Input], + in_queue: SizeLimitedQueue[Input] | AsyncIterator[Input], out_queue: SizeLimitedQueue[Output] | None = None, ) -> None: self.name = name @@ -194,7 +194,6 @@ async def run(self, stop_event: asyncio.Event) -> None: while not stop_event.is_set(): item = await self.in_queue.get() await self.add(item) - # TODO is this await really necessary? await asyncio.sleep(0.0) else: while not stop_event.is_set(): @@ -203,7 +202,6 @@ async def run(self, stop_event: asyncio.Event) -> None: if item is not None: await self.add(item) - # TODO is this await really necessary? 
await asyncio.sleep(0.0) except asyncio.CancelledError: @@ -213,29 +211,6 @@ async def run(self, stop_event: asyncio.Event) -> None: await self.flush() -class TransferWorker(Worker[T, T]): - def __init__( - self, - name: str, - batch_size: int, - batch_interval_s: float, - in_queue: asyncio.Queue[T] | AsyncIterator[T], - out_queue: SizeLimitedQueue[T] | None = None, - ) -> None: - super().__init__( - name=name, - batch_size=batch_size, - batch_interval_s=batch_interval_s, - in_queue=in_queue, - out_queue=out_queue, - handler=self.__handle_noop, - ) - - async def __handle_noop(self, batch: list[T]) -> list[T]: - await asyncio.sleep(0) - return [e for e in batch if e is not None] - - class WorkerOrchestrator: """ Orchestrates a chain of workers. @@ -325,11 +300,11 @@ async def shut_down(self, timeout_s: float) -> None: ) except TimeoutError: unfinished_workers = [w for w in tasks_but_current if not w.done()] - if len(unfinished_workers) > 0: + if unfinished_workers: logger.debug( - "[%d/%d] did not stop gracefully. Cancelling: [%s]", + "[%d/%d] did not stop gracefully. 
Awaiting cancellation: [%s]", len(unfinished_workers), len(tasks_but_current), ", ".join(map(asyncio.Task.get_name, unfinished_workers)), ) - await asyncio.gather(*tasks_but_current, return_exceptions=True) + await asyncio.gather(*unfinished_workers, return_exceptions=True) diff --git a/logprep/run_ng.py b/logprep/run_ng.py index 3014710a1..7146142fa 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -1,16 +1,18 @@ # pylint: disable=logging-fstring-interpolation """This module can be used to start the logprep.""" +import asyncio import logging -import os import signal import sys +from functools import partial from multiprocessing import set_start_method import click import uvloop from logprep.ng.runner import Runner +from logprep.ng.util.async_helpers import asyncio_exception_handler from logprep.ng.util.configuration import Configuration, InvalidConfigurationError from logprep.util.defaults import EXITCODES from logprep.util.helper import get_versions_string @@ -68,22 +70,22 @@ def run(configs: tuple[str], version=None) -> None: CONFIG is a path to configuration file (filepath or URL). 
""" - async def _run(configs: tuple[str], version=None): - configuration = await _get_configuration(configs) - runner = Runner(configuration) - runner.setup_logging() - if version: + async def _run(configs_: tuple[str], version_=None): + configuration = await _get_configuration(configs_) + runner_ = Runner(configuration) + runner_.setup_logging() + if version_: _print_version(configuration) for v in get_versions_string(configuration).split("\n"): logger.info(v) logger.debug(f"Metric export enabled: {configuration.metrics.enabled}") - logger.debug(f"Config path: {configs}") + logger.debug(f"Config path: {configs_}") try: if "pytest" not in sys.modules: # needed for not blocking tests signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) logger.debug("Configuration loaded") - await runner.run() + await runner_.run() except SystemExit as error: logger.debug(f"Exit received with code {error.code}") sys.exit(error.code) @@ -91,16 +93,18 @@ async def _run(configs: tuple[str], version=None): except ExceptionGroup as error_group: logger.exception(f"Multiple errors occurred: {error_group}") except Exception as error: - if os.environ.get("DEBUG", False): - logger.exception(f"A critical error occurred: {error}") # pragma: no cover - else: - logger.critical(f"A critical error occurred: {error}") - if runner: - runner.stop() + logger.exception(f"A critical error occurred: {error}") + + if runner_: + runner_.stop() sys.exit(EXITCODES.ERROR) # pylint: enable=broad-except - uvloop.run(_run(configs, version)) + with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: + handler = partial(asyncio_exception_handler, logger=logger) + loop = runner.get_loop() + loop.set_exception_handler(handler) + runner.run(_run(configs, version)) def signal_handler(__: int, _) -> None: diff --git a/run_benchmarks.py b/run_benchmarks.py index 7a57df5cb..8636e052d 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -5,10 +5,10 @@ from datetime import 
datetime from pathlib import Path -PYTHON_VERSIONS = ["3.11"] # , "3.12", "3.13", "3.14"] +PYTHON_VERSIONS = ["3.11", "3.12", "3.13", "3.14"] MODES = [ - ("nonNG", "0"), ("asyncNG", "1"), + ("nonNG", "0"), ] @@ -32,9 +32,12 @@ def run_benchmarks() -> None: py, "benchmark.py", "--event-num", - "120000", + "250000", "--runs", "30", + "30", + "45", + "45", "--ng", ng_flag, "--out",