Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ea06b58
feat: add async pipeline manager POC
kaya-david Feb 18, 2026
b7a200b
add debug launch configuration for ng
mhoff Feb 24, 2026
0a3350e
add integration draft
mhoff Feb 24, 2026
ee5607c
remove isolated poc code
mhoff Feb 26, 2026
0ab26ac
fix logging error
mhoff Feb 26, 2026
a0fc08d
refactor to use more composition and less events in top-level code
mhoff Feb 27, 2026
b5b1e8b
feat: make confluent_kafka async; first steps towards async input han…
kaya-david Mar 4, 2026
4ecb222
feat: add pipeline configs for benchmark
kaya-david Mar 4, 2026
e0ab0f1
WIP: initial async steps for sender and opensearch output
kaya-david Mar 4, 2026
89e8c13
WIP
kaya-david Mar 4, 2026
eabae13
WIP
kaya-david Mar 5, 2026
ca4b1fc
use async setup in ng-world; tests TBD
mhoff Mar 8, 2026
9f57cdb
rename debug configurations and use internalConsole
mhoff Mar 9, 2026
06272de
show taskName in log messages
mhoff Mar 9, 2026
9098932
fix benchmark pipeline configs
mhoff Mar 9, 2026
29ed218
update debug logs
mhoff Mar 9, 2026
333ed60
fix benchmark configs for MacOS
mhoff Mar 9, 2026
dc36fa3
ensure taskName property is set for older python versions
mhoff Mar 9, 2026
92df15b
avoid sending events multiple times in sender
mhoff Mar 9, 2026
9442f00
add draft for store_batch
mhoff Mar 9, 2026
b5cd793
WIP: small adaptions + benchmark results with adapted pipeline config…
kaya-david Mar 9, 2026
55ebd80
WIP: benchmark results with adapted pipeline configs -> opensearch me…
kaya-david Mar 9, 2026
ae79764
fix wrong import
mhoff Mar 10, 2026
79290e3
set MAX_QUEUE_SIZE to BATCH_SIZE and increase input_worker batch_size
mhoff Mar 10, 2026
49e198f
disentangle EventBacklog and input
mhoff Mar 10, 2026
e8dad3a
docs: benchmark logs
kaya-david Mar 10, 2026
7a946ec
remove terminal out_queue
mhoff Mar 10, 2026
6fd235d
add error logs for worker flush timers
mhoff Mar 10, 2026
b1b23da
include shutdown time in performance measurement
mhoff Mar 11, 2026
c8774ef
distribute events to different queues after processing; add debug log…
mhoff Mar 11, 2026
2265739
Cleanup opensearch output (#947)
Pablu23 Apr 2, 2026
6463412
refactor: remove unnecessary types
kaya-david Mar 19, 2026
2c06bef
refactor: replace uvloop.run with asyncio.Runner and configurable loo…
kaya-david Mar 19, 2026
70f1542
fix: fix config refresh, remove config scheduler, small adaptions
kaya-david Mar 25, 2026
4438165
refactor: remove loop_factory
kaya-david Mar 25, 2026
6cee075
feat: add asyncio exception handler for unhandled errors
kaya-david Mar 25, 2026
dea4dbd
feat: improve config refresh sync/async
kaya-david Mar 25, 2026
8ac6cd2
feat: improve config refresh setup and teardown logic + improve types…
kaya-david Mar 26, 2026
4815394
refactor: adjust naming to follow Python conventions (shadowing)
kaya-david Mar 26, 2026
a2c4183
refactor: remove print
kaya-david Mar 30, 2026
50ea0b8
refactor: remove unused import
kaya-david Mar 30, 2026
6b6990c
refactor: simplify config refresh
kaya-david Mar 31, 2026
d8422c1
refactor: annotation
kaya-david Mar 31, 2026
99ef937
refactor: improve exception handler
kaya-david Mar 31, 2026
74a1b43
fix: correct kafka delivery semantics and unify async shutdown lifecy…
kaya-david Apr 7, 2026
47ad525
fix: prevent race condition between SIGINT handler and benchmark flow
kaya-david Mar 31, 2026
47c22c8
refactor: simplify worker shutdown after timeout
kaya-david Mar 31, 2026
b61b83f
fix: clean up exporter port before and after logprep runs
kaya-david Mar 31, 2026
b97d924
refactor: restore _shut_down hook to preserve idempotent and extensib…
kaya-david Apr 2, 2026
99cd7ec
refactor: review issues
kaya-david Apr 2, 2026
6062160
refactor: remove unsubscribe call, as close() already handles cleanup…
kaya-david Apr 7, 2026
6d8bb81
feat: migrate to async AIOProducer and replace on_delivery callbacks …
kaya-david Apr 7, 2026
64656bf
refactor: fix review issue
kaya-david Apr 7, 2026
b87ba3b
refactor: rename module to logging_helpers to avoid stdlib name clash…
kaya-david Apr 7, 2026
d0d2d72
refactor: remove unused constant MAX_CONFIG_REFRESH_INTERVAL_DEVIATIO…
kaya-david Apr 7, 2026
db44fe8
refactor: fix review issue
kaya-david Apr 7, 2026
bcba7a9
refactor: guard cached _search_context on shutdown and remove unused …
kaya-david Apr 7, 2026
f669bca
fix: make callbacks async
kaya-david Apr 7, 2026
e600bf5
fix: set event state to STORING_IN_OUTPUT
kaya-david Apr 7, 2026
a4530ab
fix: fix config refresh, remove config scheduler, small adaptions
kaya-david Mar 25, 2026
5c4cf30
WIP
kaya-david Mar 31, 2026
42d99da
WIP
kaya-david Apr 2, 2026
5ce902d
WIP
kaya-david Apr 7, 2026
8307ed7
WIP
kaya-david Apr 8, 2026
3cd5cee
fix: avoid adding None items in worker queue mode as well
kaya-david Apr 9, 2026
615617d
remove benchmark results and extra scripts
mhoff Apr 10, 2026
74107d2
disentangle ng and non-ng
mhoff Apr 10, 2026
2863ad9
cleanup, simplify and fix mypy issues
mhoff Apr 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"version": "0.2.0",
"configurations": [
{
"name": "Debug non-ng example pipeline",
"name": "non-ng example pipeline",
"type": "debugpy",
"request": "launch",
"program": "logprep/run_logprep.py",
"console": "integratedTerminal",
"console": "internalConsole",
"args": [
"run",
"examples/exampledata/config/pipeline.yml"
Expand All @@ -15,6 +15,21 @@
"PROMETHEUS_MULTIPROC_DIR": "tmp/logprep"
},
"justMyCode": false
},
{
"name": "ng example pipeline",
"type": "debugpy",
"request": "launch",
"program": "logprep/run_ng.py",
"console": "internalConsole",
"args": [
"run",
"examples/exampledata/config/ng_pipeline.yml"
],
"env": {
"PROMETHEUS_MULTIPROC_DIR": "tmp/logprep"
},
"justMyCode": false
}
]
}
1 change: 1 addition & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ def benchmark_run(
binary = "logprep-ng" if ng == 1 else "logprep"

t_startup = time.time()

logprep_proc = popen_cmd([binary, "run", str(pipeline_config)], env=env)
_current_logprep_proc = logprep_proc

Expand Down
125 changes: 125 additions & 0 deletions examples/exampledata/config/_benchmark_ng_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
version: 2
process_count: 1
timeout: 5.0
restart_count: 2
config_refresh_interval: 5
error_backlog_size: 15000
logger:
level: DEBUG
format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
loggers:
"py.warnings": {"level": "ERROR"}
"Runner": {"level": "INFO"}
"Processor": {"level": "ERROR"}
"Exporter": {"level": "ERROR"}
"uvicorn": {"level": "ERROR"}
"uvicorn.access": {"level": "ERROR"}
"OpenSearchOutput": {"level": "DEBUG"}
"KafkaOutput": {"level": "ERROR"}
"Input": {"level": "ERROR"}
metrics:
enabled: true
port: 8001
pipeline:
- labelername:
type: ng_labeler
schema: examples/exampledata/rules/labeler/schema.json
include_parent_labels: true
rules:
- examples/exampledata/rules/labeler/rules
- dissector:
type: ng_dissector
rules:
- examples/exampledata/rules/dissector/rules
- dropper:
type: ng_dropper
rules:
- examples/exampledata/rules/dropper/rules
- filter: "test_dropper"
dropper:
drop:
- drop_me
description: "..."
- pre_detector:
type: ng_pre_detector
rules:
- examples/exampledata/rules/pre_detector/rules
outputs:
- opensearch: sre
tree_config: examples/exampledata/rules/pre_detector/tree_config.json
alert_ip_list_path: examples/exampledata/rules/pre_detector/alert_ips.yml
- amides:
type: ng_amides
rules:
- examples/exampledata/rules/amides/rules
models_path: examples/exampledata/models/model.zip
num_rule_attributions: 10
max_cache_entries: 1000000
decision_threshold: 0.32
- pseudonymizer:
type: ng_pseudonymizer
pubkey_analyst: examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem
pubkey_depseudo: examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem
regex_mapping: examples/exampledata/rules/pseudonymizer/regex_mapping.yml
hash_salt: a_secret_tasty_ingredient
outputs:
- opensearch: pseudonyms
rules:
- examples/exampledata/rules/pseudonymizer/rules/
max_cached_pseudonyms: 1000000
- calculator:
type: ng_calculator
rules:
- filter: "test_label: execute"
calculator:
target_field: "calculation"
calc: "1 + 1"
input:
kafka:
type: ng_confluentkafka_input
topic: consumer
kafka_config:
bootstrap.servers: 127.0.0.1:9092
group.id: cgroup3
enable.auto.commit: "true"
auto.commit.interval.ms: "10000"
enable.auto.offset.store: "false"
queued.min.messages: "100000"
queued.max.messages.kbytes: "65536"
statistics.interval.ms: "60000"
preprocessing:
version_info_target_field: Logprep_version_info
log_arrival_time_target_field: event.ingested
hmac:
target: <RAW_MSG>
key: "thisisasecureandrandomkey"
output_field: Full_event
output:
opensearch:
type: ng_opensearch_output
hosts:
- 127.0.0.1:9200
default_index: processed
default_op_type: create
message_backlog_size: 2500
timeout: 10000
flush_timeout: 60
user: admin
secret: admin
desired_cluster_status: ["green", "yellow"]
chunk_size: 25
error_output:
opensearch:
type: ng_opensearch_output
hosts:
- 127.0.0.1:9200
default_index: errors
default_op_type: create
message_backlog_size: 2500
timeout: 10000
flush_timeout: 60
user: admin
secret: admin
desired_cluster_status: ["green", "yellow"]
chunk_size: 25
125 changes: 125 additions & 0 deletions examples/exampledata/config/_benchmark_non_ng_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
version: 2
process_count: 1
timeout: 5.0
restart_count: 2
config_refresh_interval: 5
error_backlog_size: 15000
logger:
level: DEBUG
format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
loggers:
"py.warnings": {"level": "ERROR"}
"Runner": {"level": "INFO"}
"Processor": {"level": "ERROR"}
"Exporter": {"level": "ERROR"}
"uvicorn": {"level": "ERROR"}
"uvicorn.access": {"level": "ERROR"}
"OpenSearchOutput": {"level": "DEBUG"}
"KafkaOutput": {"level": "ERROR"}
"Input": {"level": "ERROR"}
metrics:
enabled: true
port: 8001
pipeline:
- labelername:
type: labeler
schema: examples/exampledata/rules/labeler/schema.json
include_parent_labels: true
rules:
- examples/exampledata/rules/labeler/rules
- dissector:
type: dissector
rules:
- examples/exampledata/rules/dissector/rules
- dropper:
type: dropper
rules:
- examples/exampledata/rules/dropper/rules
- filter: "test_dropper"
dropper:
drop:
- drop_me
description: "..."
- pre_detector:
type: pre_detector
rules:
- examples/exampledata/rules/pre_detector/rules
outputs:
- opensearch: sre
tree_config: examples/exampledata/rules/pre_detector/tree_config.json
alert_ip_list_path: examples/exampledata/rules/pre_detector/alert_ips.yml
- amides:
type: amides
rules:
- examples/exampledata/rules/amides/rules
models_path: examples/exampledata/models/model.zip
num_rule_attributions: 10
max_cache_entries: 1000000
decision_threshold: 0.32
- pseudonymizer:
type: pseudonymizer
pubkey_analyst: examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem
pubkey_depseudo: examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem
regex_mapping: examples/exampledata/rules/pseudonymizer/regex_mapping.yml
hash_salt: a_secret_tasty_ingredient
outputs:
- opensearch: pseudonyms
rules:
- examples/exampledata/rules/pseudonymizer/rules/
max_cached_pseudonyms: 1000000
- calculator:
type: calculator
rules:
- filter: "test_label: execute"
calculator:
target_field: "calculation"
calc: "1 + 1"
input:
kafka:
type: confluentkafka_input
topic: consumer
kafka_config:
bootstrap.servers: 127.0.0.1:9092
group.id: cgroup3
enable.auto.commit: "true"
auto.commit.interval.ms: "10000"
enable.auto.offset.store: "false"
queued.min.messages: "100000"
queued.max.messages.kbytes: "65536"
statistics.interval.ms: "60000"
preprocessing:
version_info_target_field: Logprep_version_info
log_arrival_time_target_field: event.ingested
hmac:
target: <RAW_MSG>
key: "thisisasecureandrandomkey"
output_field: Full_event
output:
opensearch:
type: opensearch_output
hosts:
- 127.0.0.1:9200
default_index: processed
default_op_type: create
message_backlog_size: 2500
timeout: 10000
flush_timeout: 60
user: admin
secret: admin
desired_cluster_status: ["green", "yellow"]
chunk_size: 25
error_output:
opensearch:
type: opensearch_output
hosts:
- 127.0.0.1:9200
default_index: errors
default_op_type: create
message_backlog_size: 2500
timeout: 10000
flush_timeout: 60
user: admin
secret: admin
desired_cluster_status: ["green", "yellow"]
chunk_size: 25
1 change: 1 addition & 0 deletions examples/exampledata/config/ng_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ logger:
"uvicorn.access": {"level": "ERROR"}
"OpenSearchOutput": {"level": "DEBUG"}
"KafkaOutput": {"level": "ERROR"}
"Input": {"level": "ERROR"}
metrics:
enabled: true
port: 8001
Expand Down
2 changes: 1 addition & 1 deletion logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def metric_labels(self) -> dict:
"""Labels for the metrics"""
return {"component": self._config.type, "name": self.name, "description": "", "type": ""}

def __init__(self, name: str, configuration: "Config", pipeline_index: int | None = None):
def __init__(self, name: str, configuration: Config, pipeline_index: int | None = None):
self._config = configuration
self.name = name
self.pipeline_index = pipeline_index
Expand Down
2 changes: 1 addition & 1 deletion logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def result(self, value: ProcessorResult):
self._result = value

@property
def rules(self) -> list["Rule"]:
def rules(self) -> Sequence["Rule"]:
"""Returns all rules

Returns
Expand Down
8 changes: 4 additions & 4 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class HttpEndpoint(ABC):
# pylint: disable=too-many-arguments
def __init__(
self,
messages: mp.Queue,
messages: mp.Queue[dict],
original_event_field: dict[str, str] | None,
collect_meta: bool,
metafield_name: str,
Expand Down Expand Up @@ -336,7 +336,7 @@ def put_message(self, event: dict, metadata: dict):
class JSONHttpEndpoint(HttpEndpoint):
""":code:`json` endpoint to get json from request"""

_decoder = msgspec.json.Decoder()
_decoder: msgspec.json.Decoder[dict] = msgspec.json.Decoder()

@raise_request_exceptions
@basic_auth
Expand All @@ -360,7 +360,7 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
class JSONLHttpEndpoint(HttpEndpoint):
""":code:`jsonl` endpoint to get jsonl from request"""

_decoder = msgspec.json.Decoder()
_decoder: msgspec.json.Decoder[dict] = msgspec.json.Decoder()

@raise_request_exceptions
@basic_auth
Expand Down Expand Up @@ -555,7 +555,7 @@ def __attrs_post_init__(self):

__slots__: list[str] = ["target", "app", "http_server"]

messages: typing.Optional[Queue] = None
messages: Queue[dict] | None = None

_endpoint_registry: Mapping[str, type[HttpEndpoint]] = {
"json": JSONHttpEndpoint,
Expand Down
24 changes: 11 additions & 13 deletions logprep/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Factory:
"""Create components for logprep."""

@classmethod
def create(cls, configuration: dict) -> Component | None:
def create(cls, configuration: dict) -> Component:
"""Create component."""
if configuration == {} or configuration is None:
raise InvalidConfigurationError("The component definition is empty.")
Expand All @@ -23,16 +23,14 @@ def create(cls, configuration: dict) -> Component | None:
f"Found multiple component definitions ({', '.join(configuration.keys())}),"
+ " but there must be exactly one."
)
for component_name, component_configuration_dict in configuration.items():
if configuration == {} or component_configuration_dict is None:
raise InvalidConfigurationError(
f'The definition of component "{component_name}" is empty.'
)
if not isinstance(component_configuration_dict, dict):
raise InvalidConfigSpecificationError(component_name)
component = Configuration.get_class(component_name, component_configuration_dict)
component_configuration = Configuration.create(
component_name, component_configuration_dict
# we know configuration has exactly one entry
[(component_name, component_configuration_dict)] = configuration.items()
if component_configuration_dict is None:
raise InvalidConfigurationError(
f'The definition of component "{component_name}" is empty.'
)
return component(component_name, component_configuration)
return None
if not isinstance(component_configuration_dict, dict):
raise InvalidConfigSpecificationError(component_name)
component = Configuration.get_class(component_name, component_configuration_dict)
component_configuration = Configuration.create(component_name, component_configuration_dict)
return component(component_name, component_configuration)
Loading
Loading