From 6463412406de5e1e40b105da531f5bfed16b9415 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 19 Mar 2026 13:46:10 +0100 Subject: [PATCH 01/26] refactor: remove unnecessary types --- logprep/ng/util/worker/types.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/logprep/ng/util/worker/types.py b/logprep/ng/util/worker/types.py index 78593ed8c..61897cbe4 100644 --- a/logprep/ng/util/worker/types.py +++ b/logprep/ng/util/worker/types.py @@ -18,9 +18,7 @@ Input = TypeVar("Input") Output = TypeVar("Output") -SyncHandler = Callable[[list[Input]], list[Output]] AsyncHandler = Callable[[list[Input]], Coroutine[object, object, list[Output]]] -Handler = SyncHandler[Input, Output] | AsyncHandler[Input, Output] class SizeLimitedQueue(asyncio.Queue[T]): From 2c06beff21e70c6febcea2683687842a660ebbe5 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 19 Mar 2026 15:31:27 +0100 Subject: [PATCH 02/26] refactor: replace uvloop.run with asyncio.Runner and configurable loop_factory --- logprep/run_ng.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/logprep/run_ng.py b/logprep/run_ng.py index 3014710a1..e4c6fae5c 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -1,6 +1,7 @@ # pylint: disable=logging-fstring-interpolation """This module can be used to start the logprep.""" +import asyncio import logging import os import signal @@ -8,7 +9,6 @@ from multiprocessing import set_start_method import click -import uvloop from logprep.ng.runner import Runner from logprep.ng.util.configuration import Configuration, InvalidConfigurationError @@ -70,8 +70,8 @@ def run(configs: tuple[str], version=None) -> None: async def _run(configs: tuple[str], version=None): configuration = await _get_configuration(configs) - runner = Runner(configuration) - runner.setup_logging() + _runner = Runner(configuration) + _runner.setup_logging() if version: _print_version(configuration) for v in get_versions_string(configuration).split("\n"): @@ -83,7 +83,7 @@ async def _run(configs: tuple[str], version=None): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) logger.debug("Configuration loaded") - await runner.run() + await _runner.run() except SystemExit as error: logger.debug(f"Exit received with code {error.code}") sys.exit(error.code) @@ -95,12 +95,27 @@ async def _run(configs: tuple[str], version=None): logger.exception(f"A critical error occurred: {error}") # pragma: no cover else: logger.critical(f"A critical error occurred: {error}") - if runner: - runner.stop() + if _runner: + _runner.stop() sys.exit(EXITCODES.ERROR) # pylint: enable=broad-except - uvloop.run(_run(configs, version)) + def _get_loop_factory(mode: str): + match mode: + case "uvloop": + import uvloop + + logger.info("Using event loop: uvloop") + return uvloop.new_event_loop + case "asyncio": + logger.info("Using event loop: asyncio") + return asyncio.new_event_loop + + case _: + raise ValueError(f"Unknown loop mode: {mode}") + + with asyncio.Runner(loop_factory=_get_loop_factory(mode="uvloop")) as runner: + runner.run(_run(configs, version)) def signal_handler(__: int, _) -> None: From 70f15425e876e46a200fe2c4a8f776928e21cd54 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Wed, 25 Mar 2026 10:42:52 +0100 Subject: [PATCH 03/26] fix: fix config refresh, remove config scheduler, small adaptions --- logprep/ng/connector/confluent_kafka/input.py | 14 ++--- logprep/ng/manager.py | 21 +++---- logprep/ng/runner.py | 19 ++++-- logprep/ng/util/configuration.py | 59 +++++++++++++++---- logprep/ng/util/worker/worker.py | 27 +-------- logprep/run_ng.py | 22 +++---- pyproject.toml | 11 +++- 7 files changed, 98 insertions(+), 75 deletions(-) diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index 5211072b5..a05588132 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -28,14 +28,12 @@ auto.offset.reset: "earliest" """ -# pylint: enable=line-too-long import logging import os import typing from functools import cached_property, partial from socket import getfqdn -from types import MappingProxyType -from typing import Union +from types import MappingProxyType # pylint: disable=no-name-in-module import msgspec from attrs import define, field, validators @@ -81,7 +79,7 @@ DEFAULT_RETURN = 0 -logger = logging.getLogger("KafkaInput") +logger = logging.getLogger("KafkaInput") # pylint: disable=no-member class ConfluentKafkaInput(Input): @@ -396,7 +394,9 @@ def _stats_callback(self, stats_raw: str) -> None: ) def _commit_callback( - self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition] + self, + error: KafkaException | None, + topic_partitions: list[TopicPartition], ) -> None: """Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll() @@ -461,8 +461,6 @@ async def _get_raw_event(self, timeout: float) -> Message | None: # type: ignor message = await consumer.poll(timeout=timeout) except RuntimeError as error: raise FatalInputError(self, str(error)) from error - except Exception as error: # remove this - pass if message is None: return None if message.value() is None or message.partition() is None or message.offset() is None: @@ -602,7 +600,7 @@ async def _get_memberid(self) -> str | None: member_id = None try: consumer = await self.get_consumer() - member_id = consumer._consumer.memberid() + member_id = consumer._consumer.memberid() # pylint: disable=protected-access except RuntimeError as error: logger.error("Failed to retrieve member ID: %s", error) return member_id diff --git a/logprep/ng/manager.py b/logprep/ng/manager.py index 58ca56c6f..124827347 100644 --- a/logprep/ng/manager.py +++ b/logprep/ng/manager.py @@ -22,7 +22,7 @@ from logprep.ng.util.worker.types import SizeLimitedQueue from logprep.ng.util.worker.worker import Worker, WorkerOrchestrator -logger = logging.getLogger("PipelineManager") +logger = logging.getLogger("PipelineManager") # pylint: disable=no-member BATCH_SIZE = 2_500 @@ -40,6 +40,8 @@ def __init__(self, configuration: Configuration, shutdown_timeout_s: float) -> N self._shutdown_timeout_s = shutdown_timeout_s async def setup(self): + """Setup the pipeline manager.""" + self._event_backlog = SetEventBacklog() self._input_connector = cast(Input, Factory.create(self.configuration.input)) @@ -74,7 +76,7 @@ async def setup(self): self._queues = [] self._orchestrator = self._create_orchestrator() - def _create_orchestrator(self) -> WorkerOrchestrator: + def _create_orchestrator(self) -> WorkerOrchestrator: # pylint: disable=too-many-locals process_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) send_to_default_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) send_to_extras_queue = SizeLimitedQueue[LogEvent](maxsize=MAX_QUEUE_SIZE) @@ -194,14 +196,13 @@ async def run(self) -> None: try: await self._orchestrator.run() except CancelledError: - # TODO cancelling() > 0 is no safe discriminator; improve - current_task = asyncio.current_task() - if current_task and current_task.cancelling() > 0: - logger.debug("PipelineManager.run has been cancelled. Shutting down") - await self._shut_down() - else: - logger.error("Orchestrator has been cancelled. Shutting down") - await self._shut_down() + logger.debug("PipelineManager.run cancelled. Shutting down.") + await self._shut_down() + raise + except Exception: + logger.exception("PipelineManager.run failed. Shutting down.") + await self._shut_down() + raise async def _shut_down(self) -> None: """Shut down runner components, and required runner attributes.""" diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 7033fb17b..6d43a61a8 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -50,15 +50,21 @@ def __init__(self, configuration: Configuration) -> None: async def _refresh_configuration_gen(self) -> AsyncGenerator[Configuration, None]: self.config.schedule_config_refresh() + + self._running_config_version = self.config.version refresh_interval = self.config.config_refresh_interval + while True: self.config.refresh() if self.config.version != self._running_config_version: - yield self.config + logger.info(f"Detected new config version: {self.config.version}") + self._running_config_version = self.config.version refresh_interval = self.config.config_refresh_interval + yield self.config + if refresh_interval is not None: try: await asyncio.sleep( @@ -85,13 +91,14 @@ async def run(self) -> None: tg.create_task(TerminateTaskGroup.raise_on_event(self._stop_event)) async def start_pipeline(config: Configuration) -> asyncio.Task: - pipeline_manager = PipelineManager( + logger.debug(">>>>> Starting pipeline") + self._pipeline_manager = PipelineManager( config, shutdown_timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT ) - await pipeline_manager.setup() + await self._pipeline_manager.setup() return tg.create_task( - pipeline_manager.run(), + self._pipeline_manager.run(), name="pipeline_manager", ) @@ -136,7 +143,7 @@ def setup_logging(self) -> None: """ warnings.simplefilter("always", DeprecationWarning) - logging.captureWarnings(True) + logging.captureWarnings(True) # pylint: disable=no-member log_config = DEFAULT_LOG_CONFIG | asdict(self.config.logger) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) - logging.config.dictConfig(log_config) + logging.config.dictConfig(log_config) # pylint: disable=no-member diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 434147514..ca5c81091 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -190,6 +190,7 @@ group.id: test" """ +import asyncio import json import logging import os @@ -197,7 +198,6 @@ from copy import deepcopy from importlib.metadata import version from itertools import chain -from logging.config import dictConfig from pathlib import Path from typing import Any, Iterable, List, Optional, Sequence, Tuple @@ -232,21 +232,21 @@ ) from logprep.util.rule_loader import RuleLoader -logger = logging.getLogger("Config") +logger = logging.getLogger("Config") # pylint: disable=no-member class MyYAML(YAML): """helper class to dump yaml with ruamel.yaml""" def dump(self, data: Any, stream: Any | None = None, **kw: Any) -> Any: - inefficient = False if stream is None: - inefficient = True stream = StringIO() - YAML.dump(self, data, stream, **kw) - if inefficient: + YAML.dump(self, data, stream, **kw) return stream.getvalue() + YAML.dump(self, data, stream, **kw) + return None + yaml = MyYAML(pure=True) @@ -340,7 +340,7 @@ class LoggerConfig: compatible with :func:`logging.config.dictConfig`. """ - _LOG_LEVELS = ( + _log_levels = ( logging.NOTSET, # 0 logging.DEBUG, # 10 logging.INFO, # 20 @@ -358,7 +358,7 @@ class LoggerConfig: default="INFO", validator=[ validators.instance_of(str), - validators.in_([logging.getLevelName(level) for level in _LOG_LEVELS]), + validators.in_([logging.getLevelName(level) for level in _log_levels]), ], eq=False, ) @@ -443,7 +443,7 @@ def setup_logging(self) -> None: log_config = asdict(self) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) - dictConfig(log_config) + logging.config.dictConfig(log_config) def _set_loggers_levels(self) -> None: """Normalize per-logger configuration and preserve explicit levels. @@ -673,6 +673,22 @@ class Configuration: _config_failure: bool = field(default=False, repr=False, eq=False, init=False) + _background_tasks: set = field( + factory=set, + validator=validators.instance_of(set), + repr=False, + eq=False, + init=False, + ) + + _reload_lock: asyncio.Lock = field( + factory=asyncio.Lock, + validator=validators.instance_of(asyncio.Lock), + repr=False, + eq=False, + init=False, + ) + _unserializable_fields = ( "_getter", "_configs", @@ -680,6 +696,8 @@ class Configuration: "_scheduler", "_metrics", "_unserializable_fields", + "_reload_lock", + "_background_tasks", ) @define(kw_only=True) @@ -943,7 +961,28 @@ def schedule_config_refresh(self) -> None: if scheduler.jobs: scheduler.cancel_job(scheduler.jobs[0]) if isinstance(refresh_interval, int): - scheduler.every(refresh_interval).seconds.do(self.reload) + + async def _reload_wrapper() -> None: + if self._reload_lock.locked(): + logger.warning( + "config reload already running; skipping scheduled config reload run", + ) + return + + async with self._reload_lock: + try: + await self.reload() + except Exception: + logger.exception("config reload failed") + raise + + def _schedule_reload() -> None: + task = asyncio.create_task(_reload_wrapper()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + scheduler.every(refresh_interval).seconds.do(_schedule_reload) + logger.info("Config refresh interval is set to: %s seconds", refresh_interval) def refresh(self) -> None: diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index 1584257a2..5e2fca9e1 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -19,7 +19,7 @@ from logprep.ng.util.worker.types import AsyncHandler, SizeLimitedQueue -logger = logging.getLogger("Worker") +logger = logging.getLogger("Worker") # pylint: disable=no-member T = TypeVar("T") Input = TypeVar("Input") @@ -194,7 +194,6 @@ async def run(self, stop_event: asyncio.Event) -> None: while not stop_event.is_set(): item = await self.in_queue.get() await self.add(item) - # TODO is this await really necessary? await asyncio.sleep(0.0) else: while not stop_event.is_set(): @@ -203,7 +202,6 @@ async def run(self, stop_event: asyncio.Event) -> None: if item is not None: await self.add(item) - # TODO is this await really necessary? await asyncio.sleep(0.0) except asyncio.CancelledError: @@ -213,29 +211,6 @@ async def run(self, stop_event: asyncio.Event) -> None: await self.flush() -class TransferWorker(Worker[T, T]): - def __init__( - self, - name: str, - batch_size: int, - batch_interval_s: float, - in_queue: asyncio.Queue[T] | AsyncIterator[T], - out_queue: SizeLimitedQueue[T] | None = None, - ) -> None: - super().__init__( - name=name, - batch_size=batch_size, - batch_interval_s=batch_interval_s, - in_queue=in_queue, - out_queue=out_queue, - handler=self.__handle_noop, - ) - - async def __handle_noop(self, batch: list[T]) -> list[T]: - await asyncio.sleep(0) - return [e for e in batch if e is not None] - - class WorkerOrchestrator: """ Orchestrates a chain of workers. diff --git a/logprep/run_ng.py b/logprep/run_ng.py index e4c6fae5c..ed3c3eb92 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -3,7 +3,6 @@ import asyncio import logging -import os import signal import sys from multiprocessing import set_start_method @@ -20,7 +19,7 @@ init_yaml_loader_tags("safe", "rt") -logger = logging.getLogger("root") +logger = logging.getLogger("root") # pylint: disable=no-member def _print_version(config: "Configuration") -> None: @@ -68,16 +67,16 @@ def run(configs: tuple[str], version=None) -> None: CONFIG is a path to configuration file (filepath or URL). """ - async def _run(configs: tuple[str], version=None): - configuration = await _get_configuration(configs) + async def _run(configs_: tuple[str], version_=None): + configuration = await _get_configuration(configs_) _runner = Runner(configuration) _runner.setup_logging() - if version: + if version_: _print_version(configuration) for v in get_versions_string(configuration).split("\n"): logger.info(v) logger.debug(f"Metric export enabled: {configuration.metrics.enabled}") - logger.debug(f"Config path: {configs}") + logger.debug(f"Config path: {configs_}") try: if "pytest" not in sys.modules: # needed for not blocking tests signal.signal(signal.SIGTERM, signal_handler) @@ -91,10 +90,8 @@ async def _run(configs: tuple[str], version=None): except ExceptionGroup as error_group: logger.exception(f"Multiple errors occurred: {error_group}") except Exception as error: - if os.environ.get("DEBUG", False): - logger.exception(f"A critical error occurred: {error}") # pragma: no cover - else: - logger.critical(f"A critical error occurred: {error}") + logger.exception(f"A critical error occurred: {error}") + if _runner: _runner.stop() sys.exit(EXITCODES.ERROR) @@ -103,14 +100,11 @@ async def _run(configs: tuple[str], version=None): def _get_loop_factory(mode: str): match mode: case "uvloop": - import uvloop + import uvloop # pylint: disable=import-outside-toplevel - logger.info("Using event loop: uvloop") return uvloop.new_event_loop case "asyncio": - logger.info("Using event loop: asyncio") return asyncio.new_event_loop - case _: raise ValueError(f"Unknown loop mode: {mode}") diff --git a/pyproject.toml b/pyproject.toml index e353dff0a..97cbd5d81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -172,7 +172,16 @@ max-line-length=100 no-docstring-rgx="^test_|^.*TestCase|^_|^Test" [tool.pylint."MESAGES CONTROL"] -disable="too-few-public-methods,unsupported-membership-test" +disable = [ + "too-few-public-methods", + "unsupported-membership-test", + "too-many-positional-arguments", + "too-many-arguments", + "too-many-branches", + "too-many-instance-attributes", + "too-many-lines", + "line-too-long", +] [tool.pylint.DESIGN] min-public-methods=1 From 443816586ba5cc2e76e3773344399baf1cdb4787 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Wed, 25 Mar 2026 10:51:21 +0100 Subject: [PATCH 04/26] refactor: remove loop_factory --- logprep/run_ng.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/logprep/run_ng.py b/logprep/run_ng.py index ed3c3eb92..248bf0684 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -8,6 +8,7 @@ from multiprocessing import set_start_method import click +import uvloop from logprep.ng.runner import Runner from logprep.ng.util.configuration import Configuration, InvalidConfigurationError @@ -97,18 +98,7 @@ async def _run(configs_: tuple[str], version_=None): sys.exit(EXITCODES.ERROR) # pylint: enable=broad-except - def _get_loop_factory(mode: str): - match mode: - case "uvloop": - import uvloop # pylint: disable=import-outside-toplevel - - return uvloop.new_event_loop - case "asyncio": - return asyncio.new_event_loop - case _: - raise ValueError(f"Unknown loop mode: {mode}") - - with asyncio.Runner(loop_factory=_get_loop_factory(mode="uvloop")) as runner: + with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: runner.run(_run(configs, version)) From 6cee075281e610c8b84e8d3dce67a87e9ff50fa9 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Wed, 25 Mar 2026 11:25:18 +0100 Subject: [PATCH 05/26] feat: add asyncio exception handler for unhandled errors --- logprep/ng/connector/confluent_kafka/input.py | 2 +- logprep/ng/manager.py | 2 +- logprep/ng/runner.py | 4 +-- logprep/ng/util/__init__.py | 0 logprep/ng/util/async_helpers.py | 33 +++++++++++++++++++ logprep/ng/util/configuration.py | 2 +- logprep/ng/util/defaults.py | 2 +- .../util/{logging.py => logprep_logging.py} | 0 logprep/ng/util/worker/__init__.py | 0 logprep/ng/util/worker/worker.py | 2 +- logprep/run_ng.py | 7 +++- 11 files changed, 46 insertions(+), 8 deletions(-) create mode 100644 logprep/ng/util/__init__.py rename logprep/ng/util/{logging.py => logprep_logging.py} (100%) create mode 100644 logprep/ng/util/worker/__init__.py diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index a05588132..5765c0d4f 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -79,7 +79,7 @@ DEFAULT_RETURN = 0 -logger = logging.getLogger("KafkaInput") # pylint: disable=no-member +logger = logging.getLogger("KafkaInput") class ConfluentKafkaInput(Input): diff --git a/logprep/ng/manager.py b/logprep/ng/manager.py index 124827347..934f17ab3 100644 --- a/logprep/ng/manager.py +++ b/logprep/ng/manager.py @@ -22,7 +22,7 @@ from logprep.ng.util.worker.types import SizeLimitedQueue from logprep.ng.util.worker.worker import Worker, WorkerOrchestrator -logger = logging.getLogger("PipelineManager") # pylint: disable=no-member +logger = logging.getLogger("PipelineManager") BATCH_SIZE = 2_500 diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 6d43a61a8..6e91e7335 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -143,7 +143,7 @@ def setup_logging(self) -> None: """ warnings.simplefilter("always", DeprecationWarning) - logging.captureWarnings(True) # pylint: disable=no-member + logging.captureWarnings(True) log_config = DEFAULT_LOG_CONFIG | asdict(self.config.logger) os.environ["LOGPREP_LOG_CONFIG"] = json.dumps(log_config) - logging.config.dictConfig(log_config) # pylint: disable=no-member + logging.config.dictConfig(log_config) diff --git a/logprep/ng/util/__init__.py b/logprep/ng/util/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/logprep/ng/util/async_helpers.py b/logprep/ng/util/async_helpers.py index 875062148..972d723b8 100644 --- a/logprep/ng/util/async_helpers.py +++ b/logprep/ng/util/async_helpers.py @@ -2,6 +2,7 @@ import asyncio from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator, Callable +from logging import Logger from typing import Awaitable, TypeVar T = TypeVar("T") @@ -111,3 +112,35 @@ async def restart_task_on_iter( await cancel_task_and_wait(task, cancel_timeout_s) task = await task_factory(data) yield task + + +def asyncio_exception_handler( + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + context: dict, + logger: Logger, +) -> None: + """ + Handle unhandled exceptions reported by the asyncio event loop. + + Covers exceptions from background tasks, callbacks, and loop internals. + Does not handle exceptions from awaited coroutines (e.g. runner.run()). + + Args: + loop: The current event loop. + context: Asyncio error context (may contain message, exception, task/future). + logger: Logger used to record the error. + """ + + msg = context.get("message", "Unhandled exception in event loop") + exception = context.get("exception") + task = context.get("task") or context.get("future") + + logger.error(f"[asyncio] {msg}") + + if task: + logger.error(f"[asyncio] Task: {task!r}") + + if exception: + logger.error("[asyncio] Exception:", exc_info=exception) + else: + logger.error("[asyncio] Context: %s", context) diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index ca5c81091..3872a2291 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -232,7 +232,7 @@ ) from logprep.util.rule_loader import RuleLoader -logger = logging.getLogger("Config") # pylint: disable=no-member +logger = logging.getLogger("Config") class MyYAML(YAML): diff --git a/logprep/ng/util/defaults.py b/logprep/ng/util/defaults.py index b3060ecf3..a97bebfe9 100644 --- a/logprep/ng/util/defaults.py +++ b/logprep/ng/util/defaults.py @@ -36,7 +36,7 @@ class EXITCODES(IntEnum): "version": 1, "formatters": { "logprep": { - "class": "logprep.ng.util.logging.LogprepFormatter", + "class": "logprep.ng.util.logprep_logging.LogprepFormatter", "format": DEFAULT_LOG_FORMAT, "datefmt": DEFAULT_LOG_DATE_FORMAT, } diff --git a/logprep/ng/util/logging.py b/logprep/ng/util/logprep_logging.py similarity index 100% rename from logprep/ng/util/logging.py rename to logprep/ng/util/logprep_logging.py diff --git a/logprep/ng/util/worker/__init__.py b/logprep/ng/util/worker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index 5e2fca9e1..532a7fbd5 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -19,7 +19,7 @@ from logprep.ng.util.worker.types import AsyncHandler, SizeLimitedQueue -logger = logging.getLogger("Worker") # pylint: disable=no-member +logger = logging.getLogger("Worker") T = TypeVar("T") Input = TypeVar("Input") diff --git a/logprep/run_ng.py b/logprep/run_ng.py index 248bf0684..0f13f0bd8 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -5,12 +5,14 @@ import logging import signal import sys +from functools import partial from multiprocessing import set_start_method import click import uvloop from logprep.ng.runner import Runner +from logprep.ng.util.async_helpers import asyncio_exception_handler from logprep.ng.util.configuration import Configuration, InvalidConfigurationError from logprep.util.defaults import EXITCODES from logprep.util.helper import get_versions_string @@ -20,7 +22,7 @@ init_yaml_loader_tags("safe", "rt") -logger = logging.getLogger("root") # pylint: disable=no-member +logger = logging.getLogger("root") def _print_version(config: "Configuration") -> None: @@ -99,6 +101,9 @@ async def _run(configs_: tuple[str], version_=None): # pylint: enable=broad-except with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: + handler = partial(asyncio_exception_handler, logger=logger) + loop = runner.get_loop() + loop.set_exception_handler(handler) runner.run(_run(configs, version)) From dea4dbd4870db49ae5fcae95c37454b32274aef9 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Wed, 25 Mar 2026 14:08:36 +0100 Subject: [PATCH 06/26] feat: improve config refresh sync/async --- logprep/ng/runner.py | 59 +++++++++++++++++--------------- logprep/ng/util/configuration.py | 54 +++++++++++++++++++++++------ 2 files changed, 76 insertions(+), 37 deletions(-) diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 6e91e7335..23947d7fd 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -54,33 +54,38 @@ async def _refresh_configuration_gen(self) -> AsyncGenerator[Configuration, None self._running_config_version = self.config.version refresh_interval = self.config.config_refresh_interval - while True: - self.config.refresh() - - if self.config.version != self._running_config_version: - logger.info(f"Detected new config version: {self.config.version}") - - self._running_config_version = self.config.version - refresh_interval = self.config.config_refresh_interval - - yield self.config - - if refresh_interval is not None: - try: - await asyncio.sleep( - # realistic bad case: starting to sleep just a moment before scheduled time - # unlikely worst case: starting to sleep even after scheduled time - # (if yield takes some time and interval is short) - # --> compensate bad case by giving an upper boundary to the deviation - refresh_interval - * MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT - ) - except asyncio.CancelledError: - logger.debug("Config refresh cancelled. Exiting...") - raise - else: - logger.debug("Config refresh has been disabled.") - break + try: + while True: + self.config.refresh() + + if self.config.version != self._running_config_version: + logger.info(f"Detected new config version: {self.config.version}") + + self._running_config_version = self.config.version + refresh_interval = self.config.config_refresh_interval + + yield self.config + + if refresh_interval is not None: + try: + await asyncio.sleep( + # realistic bad case: starting to sleep just a moment before scheduled time + # unlikely worst case: starting to sleep even after scheduled time + # (if yield takes some time and interval is short) + # --> compensate bad case by giving an upper boundary to the deviation + refresh_interval + * MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT + ) + except asyncio.CancelledError: + logger.debug("Config refresh cancelled. Exiting...") + raise + else: + logger.debug("Config refresh has been disabled.") + break + except Exception: + raise + finally: + self.config.stop_config_refresh() async def run(self) -> None: """Run the runner and continuously process events until stopped.""" diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 3872a2291..d4bb41f71 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -202,6 +202,7 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple from attrs import asdict, define, field, fields, validators +from numpy.distutils.conv_template import named_re from requests import RequestException from ruamel.yaml import YAML from ruamel.yaml.compat import StringIO @@ -673,9 +674,9 @@ class Configuration: _config_failure: bool = field(default=False, repr=False, eq=False, init=False) - _background_tasks: set = field( - factory=set, - validator=validators.instance_of(set), + _refresh_task: asyncio.Task | None = field( + default=None, + validator=validators.optional(validators.instance_of(asyncio.Task)), repr=False, eq=False, init=False, @@ -696,8 +697,8 @@ class Configuration: "_scheduler", "_metrics", "_unserializable_fields", + "_refresh_task", "_reload_lock", - "_background_tasks", ) @define(kw_only=True) @@ -952,6 +953,12 @@ def schedule_config_refresh(self) -> None: if self.config_refresh_interval is None: if scheduler.jobs: scheduler.cancel_job(scheduler.jobs[0]) + + if self._refresh_task is not None: + self._refresh_task.cancel() + + self._refresh_task = None + return self.config_refresh_interval = max( @@ -960,9 +967,17 @@ def schedule_config_refresh(self) -> None: refresh_interval = self.config_refresh_interval if scheduler.jobs: scheduler.cancel_job(scheduler.jobs[0]) + + if self._refresh_task is not None: + self._refresh_task.cancel() + + self._refresh_task = None + if isinstance(refresh_interval, int): async def _reload_wrapper() -> None: + current_task = asyncio.current_task() + if self._reload_lock.locked(): logger.warning( "config reload already running; skipping scheduled config reload run", @@ -972,23 +987,42 @@ async def _reload_wrapper() -> None: async with self._reload_lock: try: await self.reload() + except asyncio.CancelledError: + logger.info("scheduled config reload task cancelled") + raise except Exception: - logger.exception("config reload failed") + logger.exception("scheduled config reload failed") raise + finally: + if self._refresh_task is current_task: + self._refresh_task = None def _schedule_reload() -> None: - task = asyncio.create_task(_reload_wrapper()) - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) + old_task = self._refresh_task - scheduler.every(refresh_interval).seconds.do(_schedule_reload) + if old_task is not None: + old_task.cancel() + + self._refresh_task = asyncio.create_task(_reload_wrapper()) - logger.info("Config refresh interval is set to: %s seconds", refresh_interval) + scheduler.every(refresh_interval).seconds.do(_schedule_reload) + logger.info(f"Config refresh interval is set to: {refresh_interval} seconds") def refresh(self) -> None: """Wrap the scheduler run_pending method hide the implementation details.""" self._scheduler.run_pending() + def stop_config_refresh(self) -> None: + """Stop scheduled config refresh.""" + + self._scheduler.clear() + + task = self._refresh_task + self._refresh_task = None + + if task is not None: + task.cancel() + def _set_attributes_from_configs(self) -> None: for attribute in filter(lambda x: x.repr, fields(self.__class__)): setattr( From 8ac6cd2ba1826e1cb9caf9f2c78b9b01c7a977fe Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 26 Mar 2026 07:18:16 +0100 Subject: [PATCH 07/26] feat: improve config refresh setup and teardown logic + improve types for Worker and some logs --- logprep/ng/runner.py | 4 ++-- logprep/ng/util/configuration.py | 2 ++ logprep/ng/util/worker/worker.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 23947d7fd..8990f9c44 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -84,8 +84,6 @@ async def _refresh_configuration_gen(self) -> AsyncGenerator[Configuration, None break except Exception: raise - finally: - self.config.stop_config_refresh() async def run(self) -> None: """Run the runner and continuously process events until stopped.""" @@ -131,6 +129,8 @@ async def start_pipeline(config: Configuration) -> asyncio.Task: logger.debug("Task group terminated") case _: raise + finally: + self.config.stop_config_refresh() logger.debug("End log processing.") diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index d4bb41f71..5cbebd6a9 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -1023,6 +1023,8 @@ def stop_config_refresh(self) -> None: if task is not None: task.cancel() + logger.debug("Config refresh task cancelled") + def _set_attributes_from_configs(self) -> None: for attribute in filter(lambda x: x.repr, fields(self.__class__)): setattr( diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index 532a7fbd5..d0ad36b1b 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -37,7 +37,7 @@ def __init__( batch_size: int, batch_interval_s: float, handler: AsyncHandler[Input, Output], - in_queue: asyncio.Queue[Input] | AsyncIterator[Input], + in_queue: SizeLimitedQueue[Input] | AsyncIterator[Input], out_queue: SizeLimitedQueue[Output] | None = None, ) -> None: self.name = name From 4815394b5bc3e61956d5a502a3fb3f553985f151 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 26 Mar 2026 07:33:13 +0100 Subject: [PATCH 08/26] refactor: adjust naming to follow Python conventions (shadowing) --- logprep/run_ng.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/logprep/run_ng.py b/logprep/run_ng.py index 0f13f0bd8..7146142fa 100644 --- a/logprep/run_ng.py +++ b/logprep/run_ng.py @@ -72,8 +72,8 @@ def run(configs: tuple[str], version=None) -> None: async def _run(configs_: tuple[str], version_=None): configuration = await _get_configuration(configs_) - _runner = Runner(configuration) - _runner.setup_logging() + runner_ = Runner(configuration) + runner_.setup_logging() if version_: _print_version(configuration) for v in get_versions_string(configuration).split("\n"): @@ -85,7 +85,7 @@ async def _run(configs_: tuple[str], version_=None): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) logger.debug("Configuration loaded") - await _runner.run() + await runner_.run() except SystemExit as error: logger.debug(f"Exit received with code {error.code}") sys.exit(error.code) @@ -95,8 +95,8 @@ async def _run(configs_: tuple[str], version_=None): except Exception as error: logger.exception(f"A critical error occurred: {error}") - if _runner: - _runner.stop() + if runner_: + runner_.stop() sys.exit(EXITCODES.ERROR) # pylint: enable=broad-except From a2c4183d1be0c734ada1f9640c7abb0169c8632a Mon Sep 17 00:00:00 2001 From: David Kaya Date: Mon, 30 Mar 2026 09:15:57 +0200 Subject: [PATCH 09/26] refactor: remove print --- logprep/ng/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 8990f9c44..42a38e7a1 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -94,7 +94,6 @@ async def run(self) -> None: tg.create_task(TerminateTaskGroup.raise_on_event(self._stop_event)) async def start_pipeline(config: Configuration) -> asyncio.Task: - logger.debug(">>>>> Starting pipeline") self._pipeline_manager = PipelineManager( config, shutdown_timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT ) From 50ea0b8d346b6b0a04f6a3252dea0b852764dda0 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Mon, 30 Mar 2026 11:27:59 +0200 Subject: [PATCH 10/26] refactor: remove unused import --- logprep/ng/util/configuration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 5cbebd6a9..9c8c6e7e1 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -202,7 +202,6 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple from attrs import asdict, define, field, fields, validators -from numpy.distutils.conv_template import named_re from requests import RequestException from ruamel.yaml import YAML from ruamel.yaml.compat import StringIO From 6b6990cbdbf60ed8376ca2309fa1a084889938b4 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 11:00:20 +0200 Subject: [PATCH 11/26] refactor: simplify config refresh --- logprep/ng/runner.py | 70 ++++++++-------- logprep/ng/util/configuration.py | 136 ++----------------------------- 2 files changed, 45 insertions(+), 161 deletions(-) diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 42a38e7a1..975beaa18 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -14,7 +14,7 @@ from logprep.ng.manager import PipelineManager from logprep.ng.util.async_helpers import TerminateTaskGroup, restart_task_on_iter from logprep.ng.util.configuration import Configuration -from logprep.ng.util.defaults import DEFAULT_LOG_CONFIG +from logprep.ng.util.defaults import DEFAULT_LOG_CONFIG, MIN_CONFIG_REFRESH_INTERVAL logger = logging.getLogger("Runner") @@ -46,44 +46,48 @@ def __init__(self, configuration: Configuration) -> None: self._task_group = asyncio.TaskGroup() self._stop_event = asyncio.Event() - self._pipeline_manager: PipelineManager | None = None - async def _refresh_configuration_gen(self) -> AsyncGenerator[Configuration, None]: - self.config.schedule_config_refresh() - self._running_config_version = self.config.version refresh_interval = self.config.config_refresh_interval - try: - while True: - self.config.refresh() + if refresh_interval is None: + logger.debug("Config refresh has been disabled.") + return + + loop = asyncio.get_running_loop() + next_run = loop.time() + refresh_interval + + while True: + sleep_time = next_run - loop.time() + if sleep_time < 0: + sleep_time = 0.0 + + try: + await asyncio.sleep(sleep_time) + except asyncio.CancelledError: + logger.debug("Config refresh cancelled. Exiting...") + raise + try: + await self.config.reload() + except asyncio.CancelledError: + logger.debug("Config reload cancelled. Exiting...") + raise + except Exception: + logger.exception("scheduled config reload failed") + raise + else: if self.config.version != self._running_config_version: logger.info(f"Detected new config version: {self.config.version}") - self._running_config_version = self.config.version - refresh_interval = self.config.config_refresh_interval - yield self.config - if refresh_interval is not None: - try: - await asyncio.sleep( - # realistic bad case: starting to sleep just a moment before scheduled time - # unlikely worst case: starting to sleep even after scheduled time - # (if yield takes some time and interval is short) - # --> compensate bad case by giving an upper boundary to the deviation - refresh_interval - * MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT - ) - except asyncio.CancelledError: - logger.debug("Config refresh cancelled. Exiting...") - raise - else: - logger.debug("Config refresh has been disabled.") - break - except Exception: - raise + refresh_interval = self.config.config_refresh_interval + if refresh_interval is None: + logger.debug("Config refresh has been disabled.") + break + + next_run += refresh_interval async def run(self) -> None: """Run the runner and continuously process events until stopped.""" @@ -94,13 +98,13 @@ async def run(self) -> None: tg.create_task(TerminateTaskGroup.raise_on_event(self._stop_event)) async def start_pipeline(config: Configuration) -> asyncio.Task: - self._pipeline_manager = PipelineManager( + pipeline_manager = PipelineManager( config, shutdown_timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT ) - await self._pipeline_manager.setup() + await pipeline_manager.setup() return tg.create_task( - self._pipeline_manager.run(), + pipeline_manager.run(), name="pipeline_manager", ) @@ -128,8 +132,6 @@ async def start_pipeline(config: Configuration) -> asyncio.Task: logger.debug("Task group terminated") case _: raise - finally: - self.config.stop_config_refresh() logger.debug("End log processing.") diff --git a/logprep/ng/util/configuration.py b/logprep/ng/util/configuration.py index 9c8c6e7e1..d674dfc97 100644 --- a/logprep/ng/util/configuration.py +++ b/logprep/ng/util/configuration.py @@ -190,7 +190,6 @@ group.id: test" """ -import asyncio import json import logging import os @@ -206,7 +205,6 @@ from ruamel.yaml import YAML from ruamel.yaml.compat import StringIO from ruamel.yaml.scanner import ScannerError -from schedule import Scheduler from logprep.abc.getter import Getter from logprep.factory import Factory @@ -663,41 +661,14 @@ class Configuration: validator=validators.instance_of(tuple), factory=tuple, repr=False, eq=False ) - _scheduler: Scheduler = field( - factory=Scheduler, - validator=validators.instance_of(Scheduler), - repr=False, - eq=False, - init=False, - ) - _config_failure: bool = field(default=False, repr=False, eq=False, init=False) - _refresh_task: asyncio.Task | None = field( - default=None, - validator=validators.optional(validators.instance_of(asyncio.Task)), - repr=False, - eq=False, - init=False, - ) - - _reload_lock: asyncio.Lock = field( - factory=asyncio.Lock, - validator=validators.instance_of(asyncio.Lock), - repr=False, - eq=False, - init=False, - ) - _unserializable_fields = ( "_getter", "_configs", "_config_failure", - "_scheduler", "_metrics", "_unserializable_fields", - "_refresh_task", - "_reload_lock", ) @define(kw_only=True) @@ -894,8 +865,16 @@ async def reload(self) -> None: errors: List[Exception] = [] try: new_config = await Configuration.from_sources(self.config_paths) + refresh_interval = ( + MIN_CONFIG_REFRESH_INTERVAL + if self.config_refresh_interval is None + else max( + self.config_refresh_interval, + MIN_CONFIG_REFRESH_INTERVAL, + ) + ) if new_config.config_refresh_interval is None: - new_config.config_refresh_interval = self.config_refresh_interval + new_config.config_refresh_interval = refresh_interval self._configs = new_config._configs # pylint: disable=protected-access self._set_attributes_from_configs() self._set_version_info_metric() @@ -925,105 +904,8 @@ def _set_config_refresh_interval(self, config_refresh_interval: int | None) -> N return config_refresh_interval = max(config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL) self.config_refresh_interval = config_refresh_interval - self.schedule_config_refresh() self._metrics.config_refresh_interval += config_refresh_interval - def schedule_config_refresh(self) -> None: - """ - Schedules a periodic configuration refresh based on the specified interval. - - Cancels any existing scheduled configuration refresh job and schedules a new one - using the current :code:`config_refresh_interval`. - The refresh job will call the :code:`reload` method at the specified interval - in seconds on invoking the :code:`refresh` method. - - Notes - ----- - - Only one configuration refresh job is scheduled at a time - - Any existing job is cancelled before scheduling a new one. - - The interval must be an integer representing seconds. - - Examples - -------- - >>> self.schedule_config_refresh() - Config refresh interval is set to: 60 seconds - """ - scheduler = self._scheduler - if self.config_refresh_interval is None: - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - - if self._refresh_task is not None: - self._refresh_task.cancel() - - self._refresh_task = None - - return - - self.config_refresh_interval = max( - self.config_refresh_interval, MIN_CONFIG_REFRESH_INTERVAL - ) - refresh_interval = self.config_refresh_interval - if scheduler.jobs: - scheduler.cancel_job(scheduler.jobs[0]) - - if self._refresh_task is not None: - self._refresh_task.cancel() - - self._refresh_task = None - - if isinstance(refresh_interval, int): - - async def _reload_wrapper() -> None: - current_task = asyncio.current_task() - - if self._reload_lock.locked(): - logger.warning( - "config reload already running; skipping scheduled config reload run", - ) - return - - async with self._reload_lock: - try: - await self.reload() - except asyncio.CancelledError: - logger.info("scheduled config reload task cancelled") - raise - except Exception: - logger.exception("scheduled config reload failed") - raise - finally: - if self._refresh_task is current_task: - self._refresh_task = None - - def _schedule_reload() -> None: - old_task = self._refresh_task - - if old_task is not None: - old_task.cancel() - - self._refresh_task = asyncio.create_task(_reload_wrapper()) - - scheduler.every(refresh_interval).seconds.do(_schedule_reload) - logger.info(f"Config refresh interval is set to: {refresh_interval} seconds") - - def refresh(self) -> None: - """Wrap the scheduler run_pending method hide the implementation details.""" - self._scheduler.run_pending() - - def stop_config_refresh(self) -> None: - """Stop scheduled config refresh.""" - - self._scheduler.clear() - - task = self._refresh_task - self._refresh_task = None - - if task is not None: - task.cancel() - - logger.debug("Config refresh task cancelled") - def _set_attributes_from_configs(self) -> None: for attribute in filter(lambda x: x.repr, fields(self.__class__)): setattr( From d8422c15cfbf45fa1b2b9c1e7017edb75f37c858 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 11:05:30 +0200 Subject: [PATCH 12/26] refactor: annotation --- logprep/ng/util/async_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logprep/ng/util/async_helpers.py b/logprep/ng/util/async_helpers.py index 972d723b8..842fd2f1c 100644 --- a/logprep/ng/util/async_helpers.py +++ b/logprep/ng/util/async_helpers.py @@ -115,7 +115,7 @@ async def restart_task_on_iter( def asyncio_exception_handler( - loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + _: asyncio.AbstractEventLoop, context: dict, logger: Logger, ) -> None: @@ -126,7 +126,7 @@ def asyncio_exception_handler( Does not handle exceptions from awaited coroutines (e.g. runner.run()). Args: - loop: The current event loop. + _: The current event loop. Currently not used. context: Asyncio error context (may contain message, exception, task/future). logger: Logger used to record the error. """ From 99ef937e8a31f60f61a25284405373ce53ea4e09 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 11:07:35 +0200 Subject: [PATCH 13/26] refactor: improve exception handler --- logprep/ng/util/async_helpers.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/logprep/ng/util/async_helpers.py b/logprep/ng/util/async_helpers.py index 842fd2f1c..016d58517 100644 --- a/logprep/ng/util/async_helpers.py +++ b/logprep/ng/util/async_helpers.py @@ -124,23 +124,21 @@ def asyncio_exception_handler( Covers exceptions from background tasks, callbacks, and loop internals. Does not handle exceptions from awaited coroutines (e.g. runner.run()). - - Args: - _: The current event loop. Currently not used. - context: Asyncio error context (may contain message, exception, task/future). - logger: Logger used to record the error. """ msg = context.get("message", "Unhandled exception in event loop") exception = context.get("exception") task = context.get("task") or context.get("future") - logger.error(f"[asyncio] {msg}") + logger.error(f"{msg}") if task: - logger.error(f"[asyncio] Task: {task!r}") + logger.error(f"Task: {task!r}") + + if isinstance(task, asyncio.Task): + logger.error(f"Task name: {task.get_name()}") if exception: - logger.error("[asyncio] Exception:", exc_info=exception) + logger.error(f"Unhandled exception: {exception!r}", exc_info=exception) else: - logger.error("[asyncio] Context: %s", context) + logger.error(f"Context: {context!r}") From 74a1b4358916cf28d5aee7414060f70ea24aa136 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 06:42:37 +0200 Subject: [PATCH 14/26] fix: correct kafka delivery semantics and unify async shutdown lifecycle across components - unify component lifecycle by introducing async setup/shut_down across NG components - remove legacy _shut_down pattern and simplify base Component shutdown logic - align Connector/Input/Output/Processor lifecycle interfaces - fix kafka output delivery semantics by setting DELIVERED only via on_delivery callback - improve kafka error handling (BufferError retry, KafkaException -> FAILED) - ensure proper resource cleanup (consumer unsubscribe/close, producer flush, opensearch context close) - improve worker shutdown by cancelling only unfinished tasks # Conflicts: # logprep/ng/connector/opensearch/output.py --- logprep/abc/component.py | 9 +-- logprep/abc/connector.py | 10 +++ logprep/ng/abc/component.py | 8 +- logprep/ng/abc/input.py | 10 +++ logprep/ng/abc/output.py | 14 +++- logprep/ng/abc/processor.py | 7 ++ logprep/ng/connector/confluent_kafka/input.py | 26 +++--- .../ng/connector/confluent_kafka/output.py | 79 +++++++++++++------ logprep/ng/connector/file/input.py | 4 +- logprep/ng/connector/http/input.py | 4 +- logprep/ng/manager.py | 7 +- logprep/ng/util/worker/worker.py | 10 ++- 12 files changed, 131 insertions(+), 57 deletions(-) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 6fa96c84a..4de66a42b 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -142,11 +142,7 @@ def _clear_properties(self) -> None: if hasattr(self, "__dict__"): self.__dict__.clear() - def _shut_down(self) -> None: - self._clear_scheduled_jobs() - self._clear_properties() - - async def shut_down(self): + def shut_down(self): """Stop processing of this component. Optional: Called when stopping the pipeline @@ -154,7 +150,8 @@ async def shut_down(self): """ if not self._is_shut_down: self._is_shut_down = True - self._shut_down() + self._clear_scheduled_jobs() + self._clear_properties() def health(self) -> bool: """Check the health of the component. diff --git a/logprep/abc/connector.py b/logprep/abc/connector.py index 7749687a3..839ac61cf 100644 --- a/logprep/abc/connector.py +++ b/logprep/abc/connector.py @@ -48,3 +48,13 @@ class Metrics(NgComponent.Metrics): ) ) """Number of errors that occurred while processing events""" + + async def setup(self) -> None: + """Set up the connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shutdown the connector and cleanup resources.""" + + await super().shut_down() diff --git a/logprep/ng/abc/component.py b/logprep/ng/abc/component.py index 551a7a391..0bb618e56 100644 --- a/logprep/ng/abc/component.py +++ b/logprep/ng/abc/component.py @@ -16,9 +16,13 @@ class NgComponent(Component): # This is unclean from an interface perspective, but works if the worlds doen't mix. async def setup(self) -> None: - return super().setup() + """Set up the ng component.""" + + super().setup() async def shut_down(self) -> None: - return super().shut_down() + """Shut down ng component and cleanup resources.""" + + super().shut_down() # pylint: enable=invalid-overridden-method,useless-parent-delegation diff --git a/logprep/ng/abc/input.py b/logprep/ng/abc/input.py index 547759467..9f908e0d0 100644 --- a/logprep/ng/abc/input.py +++ b/logprep/ng/abc/input.py @@ -504,3 +504,13 @@ def _add_hmac_to( } add_fields_to(event_dict, new_field) return event_dict + + async def setup(self) -> None: + """Set up the input connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down input components and cleanup resources.""" + + await super().shut_down() diff --git a/logprep/ng/abc/output.py b/logprep/ng/abc/output.py index 979945560..ab4f71050 100644 --- a/logprep/ng/abc/output.py +++ b/logprep/ng/abc/output.py @@ -144,7 +144,13 @@ def wrapper(self, *args, **kwargs): return wrapper - def _shut_down(self) -> None: - """Shut down the output connector.""" - self.flush() - return super()._shut_down() + async def setup(self) -> None: + """Set up the output connector.""" + + await super().setup() + + async def shut_down(self) -> None: + """Shut down the output connector and cleanup resources.""" + + await self.flush() + await super().shut_down() diff --git a/logprep/ng/abc/processor.py b/logprep/ng/abc/processor.py index cb730960b..d22602ef9 100644 --- a/logprep/ng/abc/processor.py +++ b/logprep/ng/abc/processor.py @@ -256,6 +256,13 @@ def _write_target_field(self, event: dict, rule: "Rule", result: Any) -> None: ) async def setup(self) -> None: + """Set up the processor.""" + await super().setup() for rule in self.rules: _ = rule.metrics # initialize metrics to show them on startup + + async def shut_down(self) -> None: + """Shut down the processor and run required cleanups""" + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index 5765c0d4f..f605d5108 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -346,7 +346,7 @@ async def get_consumer(self, max_workers: int = 4) -> AIOConsumer: return self._consumer - def _error_callback(self, error: KafkaException) -> None: + async def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically to be considered informational since the client will automatically try to recover. This callback is served upon calling client.poll() @@ -359,7 +359,7 @@ def _error_callback(self, error: KafkaException) -> None: self.metrics.number_of_errors += 1 logger.error("%s: %s", self.describe(), error) - def _stats_callback(self, stats_raw: str) -> None: + async def _stats_callback(self, stats_raw: str) -> None: """Callback for statistics data. This callback is triggered by poll() or flush every `statistics.interval.ms` (needs to be configured separately) @@ -393,7 +393,7 @@ def _stats_callback(self, stats_raw: str) -> None: "assignment_size", DEFAULT_RETURN ) - def _commit_callback( + async def _commit_callback( self, error: KafkaException | None, topic_partitions: list[TopicPartition], @@ -605,12 +605,6 @@ async def _get_memberid(self) -> str | None: logger.error("Failed to retrieve member ID: %s", error) return member_id - async def shut_down(self) -> None: - """Close consumer, which also commits kafka offsets.""" - consumer = await self.get_consumer() - await consumer.close() - super()._shut_down() - def health(self) -> bool: """Check the health of the component. @@ -635,7 +629,8 @@ async def acknowledge(self, events: list[LogEvent]): logger.debug("acknowledge called") async def setup(self): - """Set the component up.""" + """Set the confluent kafka input connector.""" + await super().setup() try: @@ -649,3 +644,14 @@ async def setup(self): ) except KafkaException as error: raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error + + async def shut_down(self) -> None: + """Shut down the confluent kafka input connector and cleanup resources.""" + + consumer = await self.get_consumer() + + if consumer is not None: + await consumer.unsubscribe() + await consumer.close() + + await super().shut_down() diff --git a/logprep/ng/connector/confluent_kafka/output.py b/logprep/ng/connector/confluent_kafka/output.py index e4356f08f..3ce1a75e8 100644 --- a/logprep/ng/connector/confluent_kafka/output.py +++ b/logprep/ng/connector/confluent_kafka/output.py @@ -285,14 +285,12 @@ def describe(self) -> str: async def store_batch( self, events: Sequence[Event], target: str | None = None - ) -> tuple[Sequence[Event], Sequence[Event]]: + ) -> Sequence[Event]: store_target = target if target is not None else self.config.topic for event in events: await self.store_custom(event, store_target) - return ( - [e for e in events if e.state == EventStateType.DELIVERED], - [e for e in events if e.state == EventStateType.FAILED], - ) + + return events async def store(self, event: Event) -> None: """Store a document in the producer topic. @@ -326,14 +324,52 @@ async def store_custom(self, event: Event, target: str) -> None: value=self._encoder.encode(document), on_delivery=partial(self.on_delivery, event), ) - logger.debug("Produced message %s to topic %s", str(document), target) - self._producer.poll(self.config.send_timeout) - self._producer.flush() + except BufferError: - # block program until buffer is empty or timeout is reached self._producer.flush(timeout=self.config.flush_timeout) logger.debug("Buffer full, flushing") + try: + self._producer.produce( + topic=target, + value=self._encoder.encode(document), + on_delivery=partial(self.on_delivery, event), + ) + except BufferError as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Message delivery failed after retry: %s", err) + self.metrics.number_of_errors += 1 + return + + except KafkaException as err: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Kafka exception during produce: %s", err) + self.metrics.number_of_errors += 1 + return + + logger.debug("Produced message %s to topic %s", str(document), target) + self._producer.poll(self.config.send_timeout) + + def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None: + """Callback for delivery reports.""" + + if err is not None: + event.state.current_state = EventStateType.FAILED + event.errors.append(err) + logger.error("Message delivery failed: %s", err) + self.metrics.number_of_errors += 1 + return + + event.state.current_state = EventStateType.DELIVERED + logger.debug( + "Message delivered to '%s' partition %s, offset %s", + msg.topic(), + msg.partition(), + msg.offset(), + ) + async def flush(self) -> None: """ensures that all messages are flushed. According to https://confluent-kafka-python.readthedocs.io/en/latest/#confluent_kafka.Producer.flush @@ -364,24 +400,17 @@ def health(self) -> bool: return super().health() async def setup(self) -> None: - """Set the component up.""" + """Set the confluent kafka output connector.""" + try: await super().setup() except KafkaException as error: raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error - def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None: - """Callback for delivery reports.""" - if err is not None: - event.state.current_state = EventStateType.FAILED - event.errors.append(err) - logger.error("Message delivery failed: %s", err) - self.metrics.number_of_errors += 1 - return - event.state.current_state = EventStateType.DELIVERED - logger.debug( - "Message delivered to '%s' partition %s, offset %s", - msg.topic(), - msg.partition(), - msg.offset(), - ) + async def shut_down(self) -> None: + """Shut down the confluent kafka output connector and cleanup resources.""" + + if "_producer" in self.__dict__: + await self.flush() + + await super().shut_down() diff --git a/logprep/ng/connector/file/input.py b/logprep/ng/connector/file/input.py index e13269c5b..715274745 100644 --- a/logprep/ng/connector/file/input.py +++ b/logprep/ng/connector/file/input.py @@ -191,7 +191,7 @@ async def setup(self) -> None: file_name=self.config.logfile_path, ) - def _shut_down(self) -> None: + async def shut_down(self) -> None: """Raises the Stop Event Flag that will stop the thread that monitors the logfile""" self.stop_flag.set() - return super()._shut_down() + await super().shut_down() diff --git a/logprep/ng/connector/http/input.py b/logprep/ng/connector/http/input.py index f206dd684..7e09bea14 100644 --- a/logprep/ng/connector/http/input.py +++ b/logprep/ng/connector/http/input.py @@ -356,11 +356,11 @@ async def _get_event(self, timeout: float) -> tuple: except queue.Empty: return None, None, None - def _shut_down(self): + async def shut_down(self): """Raises Uvicorn HTTP Server internal stop flag and waits to join""" if self.http_server: self.http_server.shut_down() - return super()._shut_down() + await super().shut_down() @cached_property def health_endpoints(self) -> list[str]: diff --git a/logprep/ng/manager.py b/logprep/ng/manager.py index 934f17ab3..ad697823a 100644 --- a/logprep/ng/manager.py +++ b/logprep/ng/manager.py @@ -197,14 +197,14 @@ async def run(self) -> None: await self._orchestrator.run() except CancelledError: logger.debug("PipelineManager.run cancelled. Shutting down.") - await self._shut_down() + await self.shut_down() raise except Exception: logger.exception("PipelineManager.run failed. Shutting down.") - await self._shut_down() + await self.shut_down() raise - async def _shut_down(self) -> None: + async def shut_down(self) -> None: """Shut down runner components, and required runner attributes.""" logger.debug( @@ -222,6 +222,7 @@ async def _shut_down(self) -> None: if self._sender is not None: await self._sender.shut_down() # self._input_connector.acknowledge() + await self._input_connector.shut_down() len_delivered_events = len(list(self._event_backlog.get(EventStateType.DELIVERED))) if len_delivered_events: diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index d0ad36b1b..a4e7e73bd 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -292,7 +292,7 @@ async def shut_down(self, timeout_s: float) -> None: current_task = asyncio.current_task() tasks_but_current = [t for t in self._worker_tasks if t is not current_task] - logger.debug("waiting for termination of %d tasks", len(tasks_but_current)) + logger.debug(f"waiting for termination of {len(tasks_but_current)} tasks") try: await asyncio.wait_for( @@ -300,11 +300,15 @@ async def shut_down(self, timeout_s: float) -> None: ) except TimeoutError: unfinished_workers = [w for w in tasks_but_current if not w.done()] - if len(unfinished_workers) > 0: + if unfinished_workers: logger.debug( "[%d/%d] did not stop gracefully. Cancelling: [%s]", len(unfinished_workers), len(tasks_but_current), ", ".join(map(asyncio.Task.get_name, unfinished_workers)), ) - await asyncio.gather(*tasks_but_current, return_exceptions=True) + + for worker in unfinished_workers: + worker.cancel() + + await asyncio.gather(*unfinished_workers, return_exceptions=True) From 47ad52529121845e4e92f8d051c0a34c6270c123 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 14:20:48 +0200 Subject: [PATCH 15/26] fix: prevent race condition between SIGINT handler and benchmark flow - remove docker compose teardown from SIGINT handler to avoid interfering with active OpenSearch requests - introduce coordinated shutdown via _shutdown_requested flag - add shutdown checkpoints to abort benchmark flow safely - ensure compose teardown happens only in controlled finally blocks - fix intermittent 503 errors during OpenSearch _count caused by concurrent shutdown --- benchmark.py | 115 ++++++++++++++++++++++++++-------------------- run_benchmarks.py | 11 +++-- 2 files changed, 73 insertions(+), 53 deletions(-) diff --git a/benchmark.py b/benchmark.py index 92b3118bd..546a0eab4 100644 --- a/benchmark.py +++ b/benchmark.py @@ -38,12 +38,17 @@ def _handle_sigint(signum, frame): """ - Handle Ctrl+C (SIGINT) and perform graceful shutdown. + Handle Ctrl+C (SIGINT) and request graceful shutdown. + + Avoid tearing down compose services directly from the signal handler, + because the main benchmark flow may still be interacting with them. + Cleanup is handled by the normal control flow / finally blocks. """ + del signum, frame # unused global _shutdown_requested _shutdown_requested = True - print("\n\n⚠ Ctrl+C detected. Shutting down benchmark...") + print("\n\n⚠ Ctrl+C detected. Stopping benchmark gracefully...") if _current_logprep_proc is not None: try: @@ -51,18 +56,13 @@ def _handle_sigint(signum, frame): except Exception: pass - if _current_compose_dir is not None and _current_env is not None: - try: - run_cmd( - ["docker", "compose", "down"], - cwd=_current_compose_dir, - env=_current_env, - ignore_error=True, - ) - except Exception: - pass - sys.exit(130) +def raise_if_shutdown_requested() -> None: + """ + Abort current benchmark flow if a shutdown was requested. + """ + if _shutdown_requested: + raise KeyboardInterrupt("Benchmark shutdown requested") # ------------------------- @@ -134,14 +134,10 @@ def print_benchmark_config(args: argparse.Namespace) -> None: for key in sorted(args_dict): value = args_dict[key] - # Format integers with underscore separator if isinstance(value, int): formatted = f"{value:_}" - - # Format list of integers (e.g. runs) elif isinstance(value, list) and all(isinstance(v, int) for v in value): formatted = "[" + ", ".join(f"{v:_}" for v in value) + "]" - else: formatted = value @@ -385,6 +381,7 @@ def wait_for_tcp(host: str, port: int, *, timeout_s: float, interval_s: float = last_err: OSError | None = None while time.time() < deadline: + raise_if_shutdown_requested() try: with socket.create_connection((host, port), timeout=2): return @@ -408,6 +405,7 @@ def wait_for_opensearch(opensearch_url: str, *, timeout_s: float, interval_s: fl last_err: Exception | None = None while time.time() < deadline: + raise_if_shutdown_requested() try: resp = requests.get(f"{opensearch_url}/_cluster/health", timeout=2) if resp.status_code == 200: @@ -436,6 +434,7 @@ def wait_for_kafka_topic( last_err: Exception | None = None while time.time() < deadline: + raise_if_shutdown_requested() try: proc = subprocess.run( [ @@ -569,11 +568,13 @@ def benchmark_run( _current_env = env try: + raise_if_shutdown_requested() ensure_vm_max_map_count() run_cmd(["docker", "compose", "down"], cwd=compose_dir, env=env) run_cmd(["docker", "volume", "rm", "compose_opensearch-data"], env=env, ignore_error=True) + raise_if_shutdown_requested() run_cmd( ["docker", "compose", "up", "-d", "--no-deps", *services], cwd=compose_dir, @@ -592,6 +593,8 @@ def benchmark_run( wait_for_tcp("127.0.0.1", 9200, timeout_s=float(sleep_after_compose_up_s)) wait_for_opensearch(opensearch_url, timeout_s=float(sleep_after_compose_up_s)) + raise_if_shutdown_requested() + batch_size = max(event_num // 10, 10) output_config = f'{{"bootstrap.servers": "{bootstrap_servers}"}}' @@ -612,7 +615,9 @@ def benchmark_run( env=env, ) + raise_if_shutdown_requested() time.sleep(sleep_after_generate_s) + raise_if_shutdown_requested() binary = "logprep-ng" if ng == 1 else "logprep" @@ -621,15 +626,18 @@ def benchmark_run( _current_logprep_proc = logprep_proc time.sleep(sleep_after_logprep_start_s) + raise_if_shutdown_requested() print("\n=== OpenSearch snapshot (before measurement) ===") opensearch_debug_snapshot(opensearch_url) + raise_if_shutdown_requested() baseline = opensearch_count_processed(opensearch_url, processed_index) startup_s = time.time() - t_startup t_run = time.time() time.sleep(run_seconds) + raise_if_shutdown_requested() kill_hard(logprep_proc) @@ -637,12 +645,13 @@ def benchmark_run( logprep_proc = None _current_logprep_proc = None - # ensure near-real-time writes are visible to _count before measuring + raise_if_shutdown_requested() opensearch_refresh(opensearch_url, processed_index) print("\n=== OpenSearch snapshot (after run / after refresh) ===") opensearch_debug_snapshot(opensearch_url) + raise_if_shutdown_requested() after = opensearch_count_processed(opensearch_url, processed_index) processed = max(0, after - baseline) @@ -838,36 +847,42 @@ def setup_output_tee(out_path: Path | None) -> None: if __name__ == "__main__": signal.signal(signal.SIGINT, _handle_sigint) - args_ = parse_args() - setup_output_tee(args_.out) - - print_benchmark_config(args_) - - pipeline_config_ = resolve_pipeline_config(args_.ng) - - results: list[RunResult] = [] - - benchmark_seconds = args_.runs - for run_idx, seconds in enumerate(benchmark_seconds, start=1): - print(f"----- Run Round {run_idx}: {seconds} seconds -----") - result = benchmark_run( - run_seconds=seconds, - ng=args_.ng, - event_num=args_.event_num, - prometheus_multiproc_dir=args_.prometheus_multiproc_dir, - compose_dir=args_.compose_dir, - pipeline_config=pipeline_config_, - gen_input_dir=args_.gen_input_dir, - bootstrap_servers=args_.bootstrap_servers, - sleep_after_compose_up_s=args_.sleep_after_compose_up_s, - sleep_after_generate_s=args_.sleep_after_generate_s, - sleep_after_logprep_start_s=args_.sleep_after_logprep_start_s, - opensearch_url=args_.opensearch_url, - processed_index=args_.processed_index, - services=args_.services, - ) - results.append(result) - print_single_run_result(result, event_num=args_.event_num) - print() + try: + args_ = parse_args() + setup_output_tee(args_.out) + + print_benchmark_config(args_) + + pipeline_config_ = resolve_pipeline_config(args_.ng) + + results: list[RunResult] = [] + + benchmark_seconds = args_.runs + for run_idx, seconds in enumerate(benchmark_seconds, start=1): + raise_if_shutdown_requested() + print(f"----- Run Round {run_idx}: {seconds} seconds -----") + result = benchmark_run( + run_seconds=seconds, + ng=args_.ng, + event_num=args_.event_num, + prometheus_multiproc_dir=args_.prometheus_multiproc_dir, + compose_dir=args_.compose_dir, + pipeline_config=pipeline_config_, + gen_input_dir=args_.gen_input_dir, + bootstrap_servers=args_.bootstrap_servers, + sleep_after_compose_up_s=args_.sleep_after_compose_up_s, + sleep_after_generate_s=args_.sleep_after_generate_s, + sleep_after_logprep_start_s=args_.sleep_after_logprep_start_s, + opensearch_url=args_.opensearch_url, + processed_index=args_.processed_index, + services=args_.services, + ) + results.append(result) + print_single_run_result(result, event_num=args_.event_num) + print() + + print_runs_table_and_summary(results) - print_runs_table_and_summary(results) + except KeyboardInterrupt: + print("\nBenchmark aborted.") + sys.exit(130) diff --git a/run_benchmarks.py b/run_benchmarks.py index 7a57df5cb..2f5f776f1 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -5,10 +5,10 @@ from datetime import datetime from pathlib import Path -PYTHON_VERSIONS = ["3.11"] # , "3.12", "3.13", "3.14"] +PYTHON_VERSIONS = ["3.11", "3.12", "3.13", "3.14"] MODES = [ - ("nonNG", "0"), ("asyncNG", "1"), + ("nonNG", "0"), ] @@ -32,9 +32,14 @@ def run_benchmarks() -> None: py, "benchmark.py", "--event-num", - "120000", + "250000", "--runs", "30", + "30", + "45", + "45", + "60", + "60", "--ng", ng_flag, "--out", From 47c22c85b70ed7708df49fe604b50bc54aa47970 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 14:29:12 +0200 Subject: [PATCH 16/26] refactor: simplify worker shutdown after timeout - remove docker compose teardown from SIGINT handler to avoid interfering with active OpenSearch requests - introduce coordinated shutdown via _shutdown_requested flag - add shutdown checkpoints to abort benchmark flow safely - ensure compose teardown happens only in controlled finally blocks - fix intermittent 503 errors during OpenSearch _count caused by concurrent shutdown --- logprep/ng/util/worker/worker.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index a4e7e73bd..687be98af 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -302,13 +302,9 @@ async def shut_down(self, timeout_s: float) -> None: unfinished_workers = [w for w in tasks_but_current if not w.done()] if unfinished_workers: logger.debug( - "[%d/%d] did not stop gracefully. Cancelling: [%s]", + "[%d/%d] did not stop gracefully. Awaiting cancellation: [%s]", len(unfinished_workers), len(tasks_but_current), ", ".join(map(asyncio.Task.get_name, unfinished_workers)), ) - - for worker in unfinished_workers: - worker.cancel() - await asyncio.gather(*unfinished_workers, return_exceptions=True) From b61b83fda5d2453fe713801304cab87228ba800a Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 31 Mar 2026 16:37:37 +0200 Subject: [PATCH 17/26] fix: clean up exporter port before and after logprep runs --- benchmark.py | 128 ++++++++++++++++++++++++++++++++++++++++++++++ run_benchmarks.py | 2 - 2 files changed, 128 insertions(+), 2 deletions(-) diff --git a/benchmark.py b/benchmark.py index 546a0eab4..a5fdce66f 100644 --- a/benchmark.py +++ b/benchmark.py @@ -35,6 +35,9 @@ _current_compose_dir: Path | None = None _current_env: dict[str, str] | None = None +# Exporter / metrics port used by logprep +EXPORTER_PORT = 8001 + def _handle_sigint(signum, frame): """ @@ -149,6 +152,7 @@ def print_benchmark_config(args: argparse.Namespace) -> None: print(f"{' ↳ mode':30s}: {mode}") print(f"{' ↳ pipeline_config':30s}: {pipeline_config}") + print(f"{'exporter_port':30s}: {EXPORTER_PORT}") print("================================\n") @@ -249,6 +253,118 @@ def kill_hard(proc: subprocess.Popen) -> None: proc.wait() +def is_tcp_port_open(host: str, port: int, timeout: float = 0.5) -> bool: + """ + Return True if a TCP connection to host:port can be established. + + Args: + host: Hostname or IP address. + port: TCP port. + timeout: Socket timeout in seconds. + + Returns: + True if the TCP port accepts connections, otherwise False. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(timeout) + return sock.connect_ex((host, port)) == 0 + + +def find_pids_listening_on_port(port: int) -> list[int]: + """ + Return a list of PIDs listening on the given TCP port. + + Uses lsof and returns an empty list if no processes are found. + + Args: + port: TCP port. + + Returns: + List of PIDs. + """ + result = subprocess.run( + ["lsof", "-ti", f":{port}"], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode not in (0, 1): + raise RuntimeError( + f"Failed to query listener processes on port {port}: {result.stderr.strip()}" + ) + + pids: list[int] = [] + for line in result.stdout.splitlines(): + line = line.strip() + if line.isdigit(): + pids.append(int(line)) + + return pids + + +def kill_processes_listening_on_port( + port: int, + *, + sigterm_wait_s: float = 1.0, + final_wait_s: float = 2.0, +) -> None: + """ + Kill processes listening on the given TCP port. + + First sends SIGTERM, then SIGKILL if the port is still occupied. + + Args: + port: TCP port to free. + sigterm_wait_s: Time to wait after SIGTERM. + final_wait_s: Time to wait after SIGKILL. + + Raises: + RuntimeError: If the port is still in use after cleanup. + """ + pids = find_pids_listening_on_port(port) + if not pids: + return + + print( + f"Port {port} is already in use. Terminating listener processes: {', '.join(map(str, pids))}" + ) + + for pid in pids: + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + pass + + deadline = time.time() + sigterm_wait_s + while time.time() < deadline: + if not is_tcp_port_open("127.0.0.1", port): + return + time.sleep(0.1) + + remaining_pids = find_pids_listening_on_port(port) + if remaining_pids: + print( + f"Port {port} still in use after SIGTERM. Sending SIGKILL to: " + f"{', '.join(map(str, remaining_pids))}" + ) + + for pid in remaining_pids: + try: + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + pass + + deadline = time.time() + final_wait_s + while time.time() < deadline: + if not is_tcp_port_open("127.0.0.1", port): + return + time.sleep(0.1) + + if is_tcp_port_open("127.0.0.1", port): + raise RuntimeError(f"Port {port} is still in use after cleanup.") + + def opensearch_refresh(opensearch_url: str, processed_index: str) -> None: """ Force a refresh of the processed index so counts reflect recent writes. @@ -621,6 +737,9 @@ def benchmark_run( binary = "logprep-ng" if ng == 1 else "logprep" + # Ensure exporter port is free before starting logprep. + kill_processes_listening_on_port(EXPORTER_PORT) + t_startup = time.time() logprep_proc = popen_cmd([binary, "run", str(pipeline_config)], env=env) _current_logprep_proc = logprep_proc @@ -641,6 +760,9 @@ def benchmark_run( kill_hard(logprep_proc) + # Ensure exporter port is released after forceful process termination. + kill_processes_listening_on_port(EXPORTER_PORT) + window_s = time.time() - t_run logprep_proc = None _current_logprep_proc = None @@ -662,6 +784,12 @@ def benchmark_run( finally: if logprep_proc is not None: kill_hard(logprep_proc) + + try: + kill_processes_listening_on_port(EXPORTER_PORT) + except Exception as exc: + print(f"Warning: failed to clean up exporter port {EXPORTER_PORT}: {exc}") + _current_logprep_proc = None run_cmd(["docker", "compose", "down"], cwd=compose_dir, env=env, ignore_error=True) diff --git a/run_benchmarks.py b/run_benchmarks.py index 2f5f776f1..8636e052d 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -38,8 +38,6 @@ def run_benchmarks() -> None: "30", "45", "45", - "60", - "60", "--ng", ng_flag, "--out", From b97d92446fe79018072ae333c7a532ef2aa7343d Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 2 Apr 2026 06:21:15 +0200 Subject: [PATCH 18/26] refactor: restore _shut_down hook to preserve idempotent and extensible shutdown semantics --- logprep/abc/component.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 4de66a42b..a80910464 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -142,6 +142,10 @@ def _clear_properties(self) -> None: if hasattr(self, "__dict__"): self.__dict__.clear() + def _shut_down(self) -> None: + self._clear_scheduled_jobs() + self._clear_properties() + def shut_down(self): """Stop processing of this component. @@ -150,8 +154,7 @@ def shut_down(self): """ if not self._is_shut_down: self._is_shut_down = True - self._clear_scheduled_jobs() - self._clear_properties() + self._shut_down() def health(self) -> bool: """Check the health of the component. From 99cd7ecace0fe3d1e6e2b893c85dc08671226d5c Mon Sep 17 00:00:00 2001 From: David Kaya Date: Thu, 2 Apr 2026 06:55:35 +0200 Subject: [PATCH 19/26] refactor: review issues --- logprep/ng/connector/confluent_kafka/input.py | 1 - pyproject.toml | 11 +---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index f605d5108..c5dd8005e 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -501,7 +501,6 @@ async def _get_event(self, timeout: float) -> tuple: """ message = await self._get_raw_event(timeout) - # assert None not in (message.value(), message.partition(), message.offset()) if message is None: return None, None, None diff --git a/pyproject.toml b/pyproject.toml index 97cbd5d81..e353dff0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -172,16 +172,7 @@ max-line-length=100 no-docstring-rgx="^test_|^.*TestCase|^_|^Test" [tool.pylint."MESAGES CONTROL"] -disable = [ - "too-few-public-methods", - "unsupported-membership-test", - "too-many-positional-arguments", - "too-many-arguments", - "too-many-branches", - "too-many-instance-attributes", - "too-many-lines", - "line-too-long", -] +disable="too-few-public-methods,unsupported-membership-test" [tool.pylint.DESIGN] min-public-methods=1 From 60621601bb48cc028c7dd6357035107d562f3a7b Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 08:46:57 +0200 Subject: [PATCH 20/26] refactor: remove unsubscribe call, as close() already handles cleanup (unsubscribe only needed for dynamic topic switching during runtime) --- logprep/ng/connector/confluent_kafka/input.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/logprep/ng/connector/confluent_kafka/input.py b/logprep/ng/connector/confluent_kafka/input.py index c5dd8005e..75d330d11 100644 --- a/logprep/ng/connector/confluent_kafka/input.py +++ b/logprep/ng/connector/confluent_kafka/input.py @@ -647,10 +647,7 @@ async def setup(self): async def shut_down(self) -> None: """Shut down the confluent kafka input connector and cleanup resources.""" - consumer = await self.get_consumer() - - if consumer is not None: - await consumer.unsubscribe() - await consumer.close() + if self._consumer is not None: + await self._consumer.close() await super().shut_down() From 6d8bb8100ac203b4af8f21ac5eef0964048d0ae1 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 11:13:24 +0200 Subject: [PATCH 21/26] feat: migrate to async AIOProducer and replace on_delivery callbacks with awaitable delivery futures --- .../ng/connector/confluent_kafka/output.py | 39 ++++--------------- 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/logprep/ng/connector/confluent_kafka/output.py b/logprep/ng/connector/confluent_kafka/output.py index 3ce1a75e8..914b7be9d 100644 --- a/logprep/ng/connector/confluent_kafka/output.py +++ b/logprep/ng/connector/confluent_kafka/output.py @@ -33,6 +33,7 @@ from attrs import define, field, validators from confluent_kafka import KafkaException, Message, Producer # type: ignore from confluent_kafka.admin import AdminClient +from confluent_kafka.aio import AIOProducer from logprep.metrics.metrics import GaugeMetric from logprep.ng.abc.event import Event @@ -235,8 +236,8 @@ def _admin(self) -> AdminClient: return AdminClient(admin_config) @cached_property - def _producer(self) -> Producer: - return Producer(self._kafka_config) + def _producer(self) -> AIOProducer: + return AIOProducer(self._kafka_config) def _error_callback(self, error: KafkaException) -> None: """Callback for generic/global error events, these errors are typically @@ -314,48 +315,22 @@ async def store_custom(self, event: Event, target: str) -> None: target : str Topic to store event data in. """ - event.state.current_state = EventStateType.STORING_IN_OUTPUT - document = event.data self.metrics.number_of_processed_events += 1 + try: - self._producer.produce( + delivery_future = await self._producer.produce( topic=target, value=self._encoder.encode(document), - on_delivery=partial(self.on_delivery, event), ) - - except BufferError: - self._producer.flush(timeout=self.config.flush_timeout) - logger.debug("Buffer full, flushing") - - try: - self._producer.produce( - topic=target, - value=self._encoder.encode(document), - on_delivery=partial(self.on_delivery, event), - ) - except BufferError as err: - event.state.current_state = EventStateType.FAILED - event.errors.append(err) - logger.error("Message delivery failed after retry: %s", err) - self.metrics.number_of_errors += 1 - return - + msg = await delivery_future except KafkaException as err: event.state.current_state = EventStateType.FAILED event.errors.append(err) logger.error("Kafka exception during produce: %s", err) self.metrics.number_of_errors += 1 return - - logger.debug("Produced message %s to topic %s", str(document), target) - self._producer.poll(self.config.send_timeout) - - def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None: - """Callback for delivery reports.""" - - if err is not None: + except Exception as err: event.state.current_state = EventStateType.FAILED event.errors.append(err) logger.error("Message delivery failed: %s", err) From 64656bf95594a473e1328a4a58fbbb3e66e42f99 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 11:36:49 +0200 Subject: [PATCH 22/26] refactor: fix review issue --- logprep/ng/util/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logprep/ng/util/worker/worker.py b/logprep/ng/util/worker/worker.py index 687be98af..8cbe75de7 100644 --- a/logprep/ng/util/worker/worker.py +++ b/logprep/ng/util/worker/worker.py @@ -292,7 +292,7 @@ async def shut_down(self, timeout_s: float) -> None: current_task = asyncio.current_task() tasks_but_current = [t for t in self._worker_tasks if t is not current_task] - logger.debug(f"waiting for termination of {len(tasks_but_current)} tasks") + logger.debug("waiting for termination of %d tasks", len(tasks_but_current)) try: await asyncio.wait_for( From b87ba3bdf1072268177443d76443775d97aac2dc Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 11:46:56 +0200 Subject: [PATCH 23/26] refactor: rename module to logging_helpers to avoid stdlib name clash and align with project naming conventions --- logprep/ng/util/defaults.py | 2 +- logprep/ng/util/{logprep_logging.py => logging_helpers.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename logprep/ng/util/{logprep_logging.py => logging_helpers.py} (100%) diff --git a/logprep/ng/util/defaults.py b/logprep/ng/util/defaults.py index a97bebfe9..62f9cd1aa 100644 --- a/logprep/ng/util/defaults.py +++ b/logprep/ng/util/defaults.py @@ -36,7 +36,7 @@ class EXITCODES(IntEnum): "version": 1, "formatters": { "logprep": { - "class": "logprep.ng.util.logprep_logging.LogprepFormatter", + "class": "logprep.ng.util.logging_helpers.LogprepFormatter", "format": DEFAULT_LOG_FORMAT, "datefmt": DEFAULT_LOG_DATE_FORMAT, } diff --git a/logprep/ng/util/logprep_logging.py b/logprep/ng/util/logging_helpers.py similarity index 100% rename from logprep/ng/util/logprep_logging.py rename to logprep/ng/util/logging_helpers.py From d0d2d72b4e3af1a983d10408d3836314a9df2e84 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 11:51:34 +0200 Subject: [PATCH 24/26] refactor: remove unused constant MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT --- logprep/ng/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/logprep/ng/runner.py b/logprep/ng/runner.py index 975beaa18..e4b6ddfe6 100644 --- a/logprep/ng/runner.py +++ b/logprep/ng/runner.py @@ -21,7 +21,6 @@ GRACEFUL_SHUTDOWN_TIMEOUT = 3 HARD_SHUTDOWN_TIMEOUT = 5 -MAX_CONFIG_REFRESH_INTERVAL_DEVIATION_PERCENT = 0.05 class Runner: From db44fe8f29d5049241a9c396d9e69372e07f4203 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 11:56:32 +0200 Subject: [PATCH 25/26] refactor: fix review issue --- benchmark.py | 284 ++++++++++----------------------------------------- 1 file changed, 54 insertions(+), 230 deletions(-) diff --git a/benchmark.py b/benchmark.py index a5fdce66f..aa49b5805 100644 --- a/benchmark.py +++ b/benchmark.py @@ -1,4 +1,5 @@ # pylint: disable=C0103 + """ Benchmark runner for logprep (logprep-ng and non-ng). @@ -35,23 +36,15 @@ _current_compose_dir: Path | None = None _current_env: dict[str, str] | None = None -# Exporter / metrics port used by logprep -EXPORTER_PORT = 8001 - def _handle_sigint(signum, frame): """ - Handle Ctrl+C (SIGINT) and request graceful shutdown. - - Avoid tearing down compose services directly from the signal handler, - because the main benchmark flow may still be interacting with them. - Cleanup is handled by the normal control flow / finally blocks. + Handle Ctrl+C (SIGINT) and perform graceful shutdown. """ - del signum, frame # unused global _shutdown_requested _shutdown_requested = True - print("\n\n⚠ Ctrl+C detected. Stopping benchmark gracefully...") + print("\n\n⚠ Ctrl+C detected. Shutting down benchmark...") if _current_logprep_proc is not None: try: @@ -59,13 +52,18 @@ def _handle_sigint(signum, frame): except Exception: pass + if _current_compose_dir is not None and _current_env is not None: + try: + run_cmd( + ["docker", "compose", "down"], + cwd=_current_compose_dir, + env=_current_env, + ignore_error=True, + ) + except Exception: + pass -def raise_if_shutdown_requested() -> None: - """ - Abort current benchmark flow if a shutdown was requested. - """ - if _shutdown_requested: - raise KeyboardInterrupt("Benchmark shutdown requested") + sys.exit(130) # ------------------------- @@ -137,10 +135,14 @@ def print_benchmark_config(args: argparse.Namespace) -> None: for key in sorted(args_dict): value = args_dict[key] + # Format integers with underscore separator if isinstance(value, int): formatted = f"{value:_}" + + # Format list of integers (e.g. runs) elif isinstance(value, list) and all(isinstance(v, int) for v in value): formatted = "[" + ", ".join(f"{v:_}" for v in value) + "]" + else: formatted = value @@ -152,7 +154,6 @@ def print_benchmark_config(args: argparse.Namespace) -> None: print(f"{' ↳ mode':30s}: {mode}") print(f"{' ↳ pipeline_config':30s}: {pipeline_config}") - print(f"{'exporter_port':30s}: {EXPORTER_PORT}") print("================================\n") @@ -253,118 +254,6 @@ def kill_hard(proc: subprocess.Popen) -> None: proc.wait() -def is_tcp_port_open(host: str, port: int, timeout: float = 0.5) -> bool: - """ - Return True if a TCP connection to host:port can be established. - - Args: - host: Hostname or IP address. - port: TCP port. - timeout: Socket timeout in seconds. - - Returns: - True if the TCP port accepts connections, otherwise False. - """ - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(timeout) - return sock.connect_ex((host, port)) == 0 - - -def find_pids_listening_on_port(port: int) -> list[int]: - """ - Return a list of PIDs listening on the given TCP port. - - Uses lsof and returns an empty list if no processes are found. - - Args: - port: TCP port. - - Returns: - List of PIDs. - """ - result = subprocess.run( - ["lsof", "-ti", f":{port}"], - capture_output=True, - text=True, - check=False, - ) - - if result.returncode not in (0, 1): - raise RuntimeError( - f"Failed to query listener processes on port {port}: {result.stderr.strip()}" - ) - - pids: list[int] = [] - for line in result.stdout.splitlines(): - line = line.strip() - if line.isdigit(): - pids.append(int(line)) - - return pids - - -def kill_processes_listening_on_port( - port: int, - *, - sigterm_wait_s: float = 1.0, - final_wait_s: float = 2.0, -) -> None: - """ - Kill processes listening on the given TCP port. - - First sends SIGTERM, then SIGKILL if the port is still occupied. - - Args: - port: TCP port to free. - sigterm_wait_s: Time to wait after SIGTERM. - final_wait_s: Time to wait after SIGKILL. - - Raises: - RuntimeError: If the port is still in use after cleanup. - """ - pids = find_pids_listening_on_port(port) - if not pids: - return - - print( - f"Port {port} is already in use. Terminating listener processes: {', '.join(map(str, pids))}" - ) - - for pid in pids: - try: - os.kill(pid, signal.SIGTERM) - except ProcessLookupError: - pass - - deadline = time.time() + sigterm_wait_s - while time.time() < deadline: - if not is_tcp_port_open("127.0.0.1", port): - return - time.sleep(0.1) - - remaining_pids = find_pids_listening_on_port(port) - if remaining_pids: - print( - f"Port {port} still in use after SIGTERM. Sending SIGKILL to: " - f"{', '.join(map(str, remaining_pids))}" - ) - - for pid in remaining_pids: - try: - os.kill(pid, signal.SIGKILL) - except ProcessLookupError: - pass - - deadline = time.time() + final_wait_s - while time.time() < deadline: - if not is_tcp_port_open("127.0.0.1", port): - return - time.sleep(0.1) - - if is_tcp_port_open("127.0.0.1", port): - raise RuntimeError(f"Port {port} is still in use after cleanup.") - - def opensearch_refresh(opensearch_url: str, processed_index: str) -> None: """ Force a refresh of the processed index so counts reflect recent writes. @@ -397,33 +286,6 @@ def opensearch_count_processed(opensearch_url: str, processed_index: str) -> int return int(resp.json()["count"]) -def opensearch_debug_snapshot(opensearch_url: str) -> None: - """ - Print a small OpenSearch state snapshot for debugging. - Never raises (best-effort). - """ - try: - r = requests.get(f"{opensearch_url}/_cat/indices?v", timeout=10) - print("\n--- _cat/indices ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/indices (failed) ---\n{e}") - - try: - r = requests.get(f"{opensearch_url}/_cat/count?v", timeout=10) - print("\n--- _cat/count ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/count (failed) ---\n{e}") - - try: - r = requests.get(f"{opensearch_url}/_cat/aliases?v", timeout=10) - print("\n--- _cat/aliases ---") - print(r.text) - except Exception as e: - print(f"\n--- _cat/aliases (failed) ---\n{e}") - - def reset_prometheus_dir(path: str) -> None: """ Recreate PROMETHEUS_MULTIPROC_DIR. @@ -446,8 +308,8 @@ def resolve_pipeline_config(ng: int) -> Path: Pipeline config path. """ if ng == 1: - return Path("./examples/exampledata/config/_benchmark_ng_pipeline.yml") - return Path("./examples/exampledata/config/_benchmark_non_ng_pipeline.yml") + return Path("./examples/exampledata/config/ng_pipeline.yml") + return Path("./examples/exampledata/config/pipeline.yml") def read_vm_max_map_count() -> int: @@ -497,7 +359,6 @@ def wait_for_tcp(host: str, port: int, *, timeout_s: float, interval_s: float = last_err: OSError | None = None while time.time() < deadline: - raise_if_shutdown_requested() try: with socket.create_connection((host, port), timeout=2): return @@ -521,7 +382,6 @@ def wait_for_opensearch(opensearch_url: str, *, timeout_s: float, interval_s: fl last_err: Exception | None = None while time.time() < deadline: - raise_if_shutdown_requested() try: resp = requests.get(f"{opensearch_url}/_cluster/health", timeout=2) if resp.status_code == 200: @@ -550,7 +410,6 @@ def wait_for_kafka_topic( last_err: Exception | None = None while time.time() < deadline: - raise_if_shutdown_requested() try: proc = subprocess.run( [ @@ -684,13 +543,11 @@ def benchmark_run( _current_env = env try: - raise_if_shutdown_requested() ensure_vm_max_map_count() run_cmd(["docker", "compose", "down"], cwd=compose_dir, env=env) run_cmd(["docker", "volume", "rm", "compose_opensearch-data"], env=env, ignore_error=True) - raise_if_shutdown_requested() run_cmd( ["docker", "compose", "up", "-d", "--no-deps", *services], cwd=compose_dir, @@ -709,8 +566,6 @@ def benchmark_run( wait_for_tcp("127.0.0.1", 9200, timeout_s=float(sleep_after_compose_up_s)) wait_for_opensearch(opensearch_url, timeout_s=float(sleep_after_compose_up_s)) - raise_if_shutdown_requested() - batch_size = max(event_num // 10, 10) output_config = f'{{"bootstrap.servers": "{bootstrap_servers}"}}' @@ -731,49 +586,30 @@ def benchmark_run( env=env, ) - raise_if_shutdown_requested() time.sleep(sleep_after_generate_s) - raise_if_shutdown_requested() binary = "logprep-ng" if ng == 1 else "logprep" - # Ensure exporter port is free before starting logprep. - kill_processes_listening_on_port(EXPORTER_PORT) - t_startup = time.time() logprep_proc = popen_cmd([binary, "run", str(pipeline_config)], env=env) _current_logprep_proc = logprep_proc time.sleep(sleep_after_logprep_start_s) - raise_if_shutdown_requested() - - print("\n=== OpenSearch snapshot (before measurement) ===") - opensearch_debug_snapshot(opensearch_url) - raise_if_shutdown_requested() baseline = opensearch_count_processed(opensearch_url, processed_index) startup_s = time.time() - t_startup t_run = time.time() time.sleep(run_seconds) - raise_if_shutdown_requested() + window_s = time.time() - t_run kill_hard(logprep_proc) - - # Ensure exporter port is released after forceful process termination. - kill_processes_listening_on_port(EXPORTER_PORT) - - window_s = time.time() - t_run logprep_proc = None _current_logprep_proc = None - raise_if_shutdown_requested() + # ensure near-real-time writes are visible to _count before measuring opensearch_refresh(opensearch_url, processed_index) - print("\n=== OpenSearch snapshot (after run / after refresh) ===") - opensearch_debug_snapshot(opensearch_url) - - raise_if_shutdown_requested() after = opensearch_count_processed(opensearch_url, processed_index) processed = max(0, after - baseline) @@ -784,12 +620,6 @@ def benchmark_run( finally: if logprep_proc is not None: kill_hard(logprep_proc) - - try: - kill_processes_listening_on_port(EXPORTER_PORT) - except Exception as exc: - print(f"Warning: failed to clean up exporter port {EXPORTER_PORT}: {exc}") - _current_logprep_proc = None run_cmd(["docker", "compose", "down"], cwd=compose_dir, env=env, ignore_error=True) @@ -975,42 +805,36 @@ def setup_output_tee(out_path: Path | None) -> None: if __name__ == "__main__": signal.signal(signal.SIGINT, _handle_sigint) - try: - args_ = parse_args() - setup_output_tee(args_.out) - - print_benchmark_config(args_) - - pipeline_config_ = resolve_pipeline_config(args_.ng) - - results: list[RunResult] = [] - - benchmark_seconds = args_.runs - for run_idx, seconds in enumerate(benchmark_seconds, start=1): - raise_if_shutdown_requested() - print(f"----- Run Round {run_idx}: {seconds} seconds -----") - result = benchmark_run( - run_seconds=seconds, - ng=args_.ng, - event_num=args_.event_num, - prometheus_multiproc_dir=args_.prometheus_multiproc_dir, - compose_dir=args_.compose_dir, - pipeline_config=pipeline_config_, - gen_input_dir=args_.gen_input_dir, - bootstrap_servers=args_.bootstrap_servers, - sleep_after_compose_up_s=args_.sleep_after_compose_up_s, - sleep_after_generate_s=args_.sleep_after_generate_s, - sleep_after_logprep_start_s=args_.sleep_after_logprep_start_s, - opensearch_url=args_.opensearch_url, - processed_index=args_.processed_index, - services=args_.services, - ) - results.append(result) - print_single_run_result(result, event_num=args_.event_num) - print() - - print_runs_table_and_summary(results) + args_ = parse_args() + setup_output_tee(args_.out) + + print_benchmark_config(args_) + + pipeline_config_ = resolve_pipeline_config(args_.ng) + + results: list[RunResult] = [] + + benchmark_seconds = args_.runs + for run_idx, seconds in enumerate(benchmark_seconds, start=1): + print(f"----- Run Round {run_idx}: {seconds} seconds -----") + result = benchmark_run( + run_seconds=seconds, + ng=args_.ng, + event_num=args_.event_num, + prometheus_multiproc_dir=args_.prometheus_multiproc_dir, + compose_dir=args_.compose_dir, + pipeline_config=pipeline_config_, + gen_input_dir=args_.gen_input_dir, + bootstrap_servers=args_.bootstrap_servers, + sleep_after_compose_up_s=args_.sleep_after_compose_up_s, + sleep_after_generate_s=args_.sleep_after_generate_s, + sleep_after_logprep_start_s=args_.sleep_after_logprep_start_s, + opensearch_url=args_.opensearch_url, + processed_index=args_.processed_index, + services=args_.services, + ) + results.append(result) + print_single_run_result(result, event_num=args_.event_num) + print() - except KeyboardInterrupt: - print("\nBenchmark aborted.") - sys.exit(130) + print_runs_table_and_summary(results) From bcba7a99a06eb9df2af8766a3c67667d47a045b7 Mon Sep 17 00:00:00 2001 From: David Kaya Date: Tue, 7 Apr 2026 12:11:15 +0200 Subject: [PATCH 26/26] refactor: guard cached _search_context on shutdown and remove unused @override decorator for consistency --- logprep/ng/connector/opensearch/output.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/logprep/ng/connector/opensearch/output.py b/logprep/ng/connector/opensearch/output.py index 9aa5efbe2..df99a74cf 100644 --- a/logprep/ng/connector/opensearch/output.py +++ b/logprep/ng/connector/opensearch/output.py @@ -45,7 +45,6 @@ helpers, ) from opensearchpy.serializer import JSONSerializer -from typing_extensions import override from logprep.abc.exceptions import LogprepException from logprep.ng.abc.event import Event @@ -356,6 +355,7 @@ async def health(self) -> bool: # type: ignore # TODO: fix mypy issue return False return super().health() and resp.get("status") in self.config.desired_cluster_status - @override async def shut_down(self): - await self._search_context.close() + if "_search_context" in self.__dict__: + await self._search_context.close() + await super().shut_down()