Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6463412
refactor: remove unnecessary types
kaya-david Mar 19, 2026
2c06bef
refactor: replace uvloop.run with asyncio.Runner and configurable loo…
kaya-david Mar 19, 2026
70f1542
fix: fix config refresh, remove config scheduler, small adaptions
kaya-david Mar 25, 2026
4438165
refactor: remove loop_factory
kaya-david Mar 25, 2026
6cee075
feat: add asyncio exception handler for unhandled errors
kaya-david Mar 25, 2026
dea4dbd
feat: improve config refresh sync/async
kaya-david Mar 25, 2026
8ac6cd2
feat: improve config refresh setup and teardown logic + improve types…
kaya-david Mar 26, 2026
4815394
refactor: adjust naming to follow Python conventions (shadowing)
kaya-david Mar 26, 2026
a2c4183
refactor: remove print
kaya-david Mar 30, 2026
50ea0b8
refactor: remove unused import
kaya-david Mar 30, 2026
6b6990c
refactor: simplify config refresh
kaya-david Mar 31, 2026
d8422c1
refactor: annotation
kaya-david Mar 31, 2026
99ef937
refactor: improve exception handler
kaya-david Mar 31, 2026
74a1b43
fix: correct kafka delivery semantics and unify async shutdown lifecy…
kaya-david Apr 7, 2026
47ad525
fix: prevent race condition between SIGINT handler and benchmark flow
kaya-david Mar 31, 2026
47c22c8
refactor: simplify worker shutdown after timeout
kaya-david Mar 31, 2026
b61b83f
fix: clean up exporter port before and after logprep runs
kaya-david Mar 31, 2026
b97d924
refactor: restore _shut_down hook to preserve idempotent and extensib…
kaya-david Apr 2, 2026
99cd7ec
refactor: review issues
kaya-david Apr 2, 2026
6062160
refactor: remove unsubscribe call, as close() already handles cleanup…
kaya-david Apr 7, 2026
6d8bb81
feat: migrate to async AIOProducer and replace on_delivery callbacks …
kaya-david Apr 7, 2026
64656bf
refactor: fix review issue
kaya-david Apr 7, 2026
b87ba3b
refactor: rename module to logging_helpers to avoid stdlib name clash…
kaya-david Apr 7, 2026
d0d2d72
refactor: remove unused constant MAX_CONFIG_REFRESH_INTERVAL_DEVIATIO…
kaya-david Apr 7, 2026
db44fe8
refactor: fix review issue
kaya-david Apr 7, 2026
bcba7a9
refactor: guard cached _search_context on shutdown and remove unused …
kaya-david Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 4 additions & 37 deletions benchmark.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=C0103

"""
Benchmark runner for logprep (logprep-ng and non-ng).

Expand Down Expand Up @@ -285,33 +286,6 @@ def opensearch_count_processed(opensearch_url: str, processed_index: str) -> int
return int(resp.json()["count"])


def opensearch_debug_snapshot(opensearch_url: str) -> None:
    """
    Print a small OpenSearch state snapshot for debugging.

    Queries the ``_cat/indices``, ``_cat/count`` and ``_cat/aliases``
    endpoints and prints their plain-text responses. Best-effort only:
    every request failure is caught and reported instead of raised, so
    this helper never aborts the benchmark flow.

    Parameters
    ----------
    opensearch_url : str
        Base URL of the OpenSearch instance (without a trailing slash).
    """
    # One request per _cat endpoint; the three original copy-pasted
    # try/except blocks collapse into a single loop.
    for endpoint in ("_cat/indices", "_cat/count", "_cat/aliases"):
        try:
            resp = requests.get(f"{opensearch_url}/{endpoint}?v", timeout=10)
            print(f"\n--- {endpoint} ---")
            print(resp.text)
        except Exception as exc:  # deliberately broad: best-effort debug output
            print(f"\n--- {endpoint} (failed) ---\n{exc}")


def reset_prometheus_dir(path: str) -> None:
"""
Recreate PROMETHEUS_MULTIPROC_DIR.
Expand All @@ -334,8 +308,8 @@ def resolve_pipeline_config(ng: int) -> Path:
Pipeline config path.
"""
if ng == 1:
return Path("./examples/exampledata/config/_benchmark_ng_pipeline.yml")
return Path("./examples/exampledata/config/_benchmark_non_ng_pipeline.yml")
return Path("./examples/exampledata/config/ng_pipeline.yml")
return Path("./examples/exampledata/config/pipeline.yml")


def read_vm_max_map_count() -> int:
Expand Down Expand Up @@ -622,27 +596,20 @@ def benchmark_run(

time.sleep(sleep_after_logprep_start_s)

print("\n=== OpenSearch snapshot (before measurement) ===")
opensearch_debug_snapshot(opensearch_url)

baseline = opensearch_count_processed(opensearch_url, processed_index)
startup_s = time.time() - t_startup

t_run = time.time()
time.sleep(run_seconds)
window_s = time.time() - t_run

kill_hard(logprep_proc)

window_s = time.time() - t_run
logprep_proc = None
_current_logprep_proc = None

# ensure near-real-time writes are visible to _count before measuring
opensearch_refresh(opensearch_url, processed_index)

print("\n=== OpenSearch snapshot (after run / after refresh) ===")
opensearch_debug_snapshot(opensearch_url)

after = opensearch_count_processed(opensearch_url, processed_index)
processed = max(0, after - baseline)

Expand Down
2 changes: 1 addition & 1 deletion logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def _shut_down(self) -> None:
self._clear_scheduled_jobs()
self._clear_properties()

async def shut_down(self):
def shut_down(self):
"""Stop processing of this component.

Optional: Called when stopping the pipeline
Expand Down
10 changes: 10 additions & 0 deletions logprep/abc/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ class Metrics(NgComponent.Metrics):
)
)
"""Number of errors that occurred while processing events"""

async def setup(self) -> None:
    """Set up the connector.

    Currently only delegates to the parent class's async setup.
    """

    await super().setup()

async def shut_down(self) -> None:
    """Shut down the connector and clean up resources.

    Currently only delegates to the parent class's async shutdown.
    """

    await super().shut_down()
8 changes: 6 additions & 2 deletions logprep/ng/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ class NgComponent(Component):
# This is unclean from an interface perspective, but works as long as the two worlds don't mix.

async def setup(self) -> None:
    """Set up the ng component.

    Async wrapper that delegates to the synchronous base-class setup so
    ng code can await component setup uniformly.
    """
    # The scraped diff left an unreachable docstring and call after
    # `return super().setup()`; this is the coherent current version.
    super().setup()

async def shut_down(self) -> None:
    """Shut down the ng component and clean up resources.

    Async wrapper that delegates to the synchronous base-class shutdown so
    ng code can await component teardown uniformly.
    """
    # The scraped diff left an unreachable docstring and call after
    # `return super().shut_down()`; this is the coherent current version.
    super().shut_down()

# pylint: enable=invalid-overridden-method,useless-parent-delegation
10 changes: 10 additions & 0 deletions logprep/ng/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,13 @@ def _add_hmac_to(
}
add_fields_to(event_dict, new_field)
return event_dict

async def setup(self) -> None:
    """Set up the input connector.

    Currently only delegates to the parent class's async setup.
    """

    await super().setup()

async def shut_down(self) -> None:
    """Shut down the input connector and clean up resources.

    Currently only delegates to the parent class's async shutdown.
    """

    await super().shut_down()
14 changes: 10 additions & 4 deletions logprep/ng/abc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ def wrapper(self, *args, **kwargs):

return wrapper

def _shut_down(self) -> None:
    """Flush buffered output, then perform the base-class teardown."""
    self.flush()
    return super()._shut_down()
async def setup(self) -> None:
    """Set up the output connector.

    Currently only delegates to the parent class's async setup.
    """

    await super().setup()

async def shut_down(self) -> None:
    """Shut down the output connector and clean up resources.

    Flushes pending output first, then delegates to the parent shutdown.
    """

    await self.flush()
    await super().shut_down()
7 changes: 7 additions & 0 deletions logprep/ng/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ def _write_target_field(self, event: dict, rule: "Rule", result: Any) -> None:
)

async def setup(self) -> None:
    """Prepare the processor for operation."""

    await super().setup()
    # Touch every rule's metrics object so the metrics are visible at startup.
    for processor_rule in self.rules:
        _ = processor_rule.metrics

async def shut_down(self) -> None:
    """Shut down the processor and run required cleanups.

    Currently only delegates to the parent class's async shutdown.
    """

    await super().shut_down()
36 changes: 18 additions & 18 deletions logprep/ng/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
auto.offset.reset: "earliest"
"""

# pylint: enable=line-too-long
import logging
import os
import typing
from functools import cached_property, partial
from socket import getfqdn
from types import MappingProxyType
from typing import Union
from types import MappingProxyType # pylint: disable=no-name-in-module

import msgspec
from attrs import define, field, validators
Expand Down Expand Up @@ -348,7 +346,7 @@ async def get_consumer(self, max_workers: int = 4) -> AIOConsumer:

return self._consumer

def _error_callback(self, error: KafkaException) -> None:
async def _error_callback(self, error: KafkaException) -> None:
"""Callback for generic/global error events, these errors are typically
to be considered informational since the client will automatically try to recover.
This callback is served upon calling client.poll()
Expand All @@ -361,7 +359,7 @@ def _error_callback(self, error: KafkaException) -> None:
self.metrics.number_of_errors += 1
logger.error("%s: %s", self.describe(), error)

def _stats_callback(self, stats_raw: str) -> None:
async def _stats_callback(self, stats_raw: str) -> None:
"""Callback for statistics data. This callback is triggered by poll()
or flush every `statistics.interval.ms` (needs to be configured separately)

Expand Down Expand Up @@ -395,8 +393,10 @@ def _stats_callback(self, stats_raw: str) -> None:
"assignment_size", DEFAULT_RETURN
)

def _commit_callback(
self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition]
async def _commit_callback(
self,
error: KafkaException | None,
topic_partitions: list[TopicPartition],
) -> None:
"""Callback used to indicate success or failure of asynchronous and
automatic commit requests. This callback is served upon calling consumer.poll()
Expand Down Expand Up @@ -461,8 +461,6 @@ async def _get_raw_event(self, timeout: float) -> Message | None: # type: ignor
message = await consumer.poll(timeout=timeout)
except RuntimeError as error:
raise FatalInputError(self, str(error)) from error
except Exception as error: # remove this
pass
if message is None:
return None
if message.value() is None or message.partition() is None or message.offset() is None:
Expand Down Expand Up @@ -503,7 +501,6 @@ async def _get_event(self, timeout: float) -> tuple:
"""

message = await self._get_raw_event(timeout)
# assert None not in (message.value(), message.partition(), message.offset())

if message is None:
return None, None, None
Expand Down Expand Up @@ -602,17 +599,11 @@ async def _get_memberid(self) -> str | None:
member_id = None
try:
consumer = await self.get_consumer()
member_id = consumer._consumer.memberid()
member_id = consumer._consumer.memberid() # pylint: disable=protected-access
except RuntimeError as error:
logger.error("Failed to retrieve member ID: %s", error)
return member_id

async def shut_down(self) -> None:
    """Close the kafka consumer, then run the component shutdown.

    Closes the consumer only if one was actually created: calling
    ``get_consumer()`` here would lazily construct a brand-new consumer
    just to close it again during shutdown. ``close()`` also commits
    kafka offsets, so no separate commit call is needed.
    """
    if self._consumer is not None:
        await self._consumer.close()
    super()._shut_down()

def health(self) -> bool:
"""Check the health of the component.

Expand All @@ -637,7 +628,8 @@ async def acknowledge(self, events: list[LogEvent]):
logger.debug("acknowledge called")

async def setup(self):
"""Set the component up."""
"""Set the confluent kafka input connector."""

await super().setup()

try:
Expand All @@ -651,3 +643,11 @@ async def setup(self):
)
except KafkaException as error:
raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error

async def shut_down(self) -> None:
    """Shut down the confluent kafka input connector and cleanup resources."""

    # Only close a consumer that was actually created; calling get_consumer()
    # here would lazily build a new consumer just to close it again. close()
    # also commits kafka offsets, so no separate commit/unsubscribe is needed.
    if self._consumer is not None:
        await self._consumer.close()

    await super().shut_down()
72 changes: 38 additions & 34 deletions logprep/ng/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from attrs import define, field, validators
from confluent_kafka import KafkaException, Message, Producer # type: ignore
from confluent_kafka.admin import AdminClient
from confluent_kafka.aio import AIOProducer

from logprep.metrics.metrics import GaugeMetric
from logprep.ng.abc.event import Event
Expand Down Expand Up @@ -235,8 +236,8 @@ def _admin(self) -> AdminClient:
return AdminClient(admin_config)

@cached_property
def _producer(self) -> AIOProducer:
    """Lazily build the async kafka producer from the connector config.

    As a ``cached_property`` it is created on first access and reused;
    shutdown code checks ``"_producer" in self.__dict__`` to detect
    whether it was ever created.
    """
    # The scraped diff interleaved the old sync Producer variant as a dead
    # duplicate definition; this is the coherent AIOProducer version.
    return AIOProducer(self._kafka_config)

def _error_callback(self, error: KafkaException) -> None:
"""Callback for generic/global error events, these errors are typically
Expand Down Expand Up @@ -285,14 +286,12 @@ def describe(self) -> str:

async def store_batch(
    self, events: Sequence[Event], target: str | None = None
) -> Sequence[Event]:
    """Store every event in the given topic and return the events.

    Parameters
    ----------
    events : Sequence[Event]
        Events to deliver; each event's state is updated by
        ``store_custom`` to reflect the delivery outcome.
    target : str | None
        Topic to produce to; falls back to the configured default topic.

    Returns
    -------
    Sequence[Event]
        The same events; callers inspect each event's state for results.
    """
    # The scraped diff interleaved the old tuple-returning signature with
    # the new one; this is the coherent current version.
    store_target = target if target is not None else self.config.topic
    for event in events:
        await self.store_custom(event, store_target)

    return events

async def store(self, event: Event) -> None:
"""Store a document in the producer topic.
Expand All @@ -316,23 +315,35 @@ async def store_custom(self, event: Event, target: str) -> None:
target : str
Topic to store event data in.
"""
event.state.current_state = EventStateType.STORING_IN_OUTPUT

document = event.data
self.metrics.number_of_processed_events += 1

try:
self._producer.produce(
delivery_future = await self._producer.produce(
topic=target,
value=self._encoder.encode(document),
on_delivery=partial(self.on_delivery, event),
)
logger.debug("Produced message %s to topic %s", str(document), target)
self._producer.poll(self.config.send_timeout)
self._producer.flush()
except BufferError:
# block program until buffer is empty or timeout is reached
self._producer.flush(timeout=self.config.flush_timeout)
logger.debug("Buffer full, flushing")
msg = await delivery_future
except KafkaException as err:
event.state.current_state = EventStateType.FAILED
event.errors.append(err)
logger.error("Kafka exception during produce: %s", err)
self.metrics.number_of_errors += 1
return
except Exception as err:
event.state.current_state = EventStateType.FAILED
event.errors.append(err)
logger.error("Message delivery failed: %s", err)
self.metrics.number_of_errors += 1
return

event.state.current_state = EventStateType.DELIVERED
logger.debug(
"Message delivered to '%s' partition %s, offset %s",
msg.topic(),
msg.partition(),
msg.offset(),
)

async def flush(self) -> None:
"""ensures that all messages are flushed. According to
Expand Down Expand Up @@ -364,24 +375,17 @@ def health(self) -> bool:
return super().health()

async def setup(self) -> None:
    """Set up the confluent kafka output connector.

    Delegates to the parent async setup; any ``KafkaException`` raised
    while setting up the producer is converted into a fatal output error.

    Raises
    ------
    FatalOutputError
        If the kafka producer could not be set up.
    """
    # The scraped diff left two interleaved docstrings (old + new); the
    # second was a stray no-op string statement. One docstring remains.
    try:
        await super().setup()
    except KafkaException as error:
        raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error

def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None:
    """Callback for delivery reports.

    Passed to ``produce`` (via ``partial``) and invoked by the kafka
    client with the delivery outcome of a single message.

    Parameters
    ----------
    event : Event
        Event whose delivery is reported; its state and error list are
        updated in place.
    err : KafkaException
        Delivery error, or ``None`` on success.
    msg : Message
        Delivered message; topic/partition/offset are used for logging.
    """
    if err is not None:
        # Failed delivery: mark the event, record the error, count it.
        event.state.current_state = EventStateType.FAILED
        event.errors.append(err)
        logger.error("Message delivery failed: %s", err)
        self.metrics.number_of_errors += 1
        return
    event.state.current_state = EventStateType.DELIVERED
    logger.debug(
        "Message delivered to '%s' partition %s, offset %s",
        msg.topic(),
        msg.partition(),
        msg.offset(),
    )
async def shut_down(self) -> None:
"""Shut down the confluent kafka output connector and cleanup resources."""

if "_producer" in self.__dict__:
await self.flush()
Comment on lines +388 to +389
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this? Shouldn't we just always flush — i.e., shouldn't the flush be agnostic to whether a producer exists or not? Also, I don't like this `if`; isn't there another way to check whether we have a producer?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_producer is a cached property and is only initialized on first access. A shut_down could technically occur before it was ever used (i.e. before the producer exists), which would cause a crash during flush. This check is therefore a precaution.


await super().shut_down()
4 changes: 2 additions & 2 deletions logprep/ng/connector/file/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def setup(self) -> None:
file_name=self.config.logfile_path,
)

async def shut_down(self) -> None:
    """Signal the logfile-monitoring thread to stop, then shut down.

    Sets the stop flag so the thread that monitors the logfile
    terminates, then delegates to the parent async shutdown.
    """
    # The scraped diff interleaved the old sync `_shut_down` definition and
    # its return with the new async lines; this is the coherent version.
    self.stop_flag.set()
    await super().shut_down()
Loading
Loading