diff --git a/lib/distributor/distributor.go b/lib/distributor/distributor.go
index fa80ba4..1409350 100644
--- a/lib/distributor/distributor.go
+++ b/lib/distributor/distributor.go
@@ -71,7 +71,7 @@ func (d *Distributor) HandleEvent(event provider.Event) {
 
 	// Forward to all consumers
 	for _, c := range consumers {
-		if err := c.HandleEvent(event); err != nil {
+		if err := safeHandleEvent(c, event); err != nil {
 			log.WithFields(log.Fields{
 				"consumer": c.Name(),
 				"error":    err,
@@ -123,6 +123,27 @@ func (d *Distributor) Correlator() *enrichment.Correlator {
 	return d.correlator
 }
 
+// safeHandleEvent wraps a consumer's HandleEvent in panic recovery so that
+// a single misbehaving consumer cannot crash the entire event pipeline.
+//
+// A recovered panic is returned as an error naming the consumer. When the
+// panic value is itself an error it is wrapped with %w, so the caller can
+// still match it with errors.Is / errors.As; any other value is rendered
+// with %v. The resulting message text is the same in both cases.
+func safeHandleEvent(c EventConsumer, event provider.Event) (err error) {
+	defer func() {
+		r := recover()
+		if r == nil {
+			return
+		}
+		if cause, ok := r.(error); ok {
+			err = fmt.Errorf("panic in consumer %s: %w", c.Name(), cause)
+			return
+		}
+		err = fmt.Errorf("panic in consumer %s: %v", c.Name(), r)
+	}()
+	return c.HandleEvent(event)
+}
+
 func enrichmentKey(id provider.EventIdentifier) string {
 	return fmt.Sprintf("%s:%d", id.ProviderName, id.EventID)
 }
diff --git a/lib/distributor/integration_test.go b/lib/distributor/integration_test.go
index b90466c..935fe5a 100644
--- a/lib/distributor/integration_test.go
+++ b/lib/distributor/integration_test.go
@@ -438,6 +438,56 @@ func decodeJSONLines(t *testing.T, buf *bytes.Buffer) []map[string]interface{} {
 	return lines
 }
 
+// panicConsumer always panics when handling an event.
+type panicConsumer struct{}
+
+func (p *panicConsumer) Name() string      { return "panic-consumer" }
+func (p *panicConsumer) Initialize() error { return nil }
+func (p *panicConsumer) HandleEvent(_ provider.Event) error {
+	panic("deliberate panic in consumer")
+}
+func (p *panicConsumer) Close() error { return nil }
+
+// countConsumer counts events received.
+type countConsumer struct {
+	count int
+}
+
+func (c *countConsumer) Name() string                       { return "count-consumer" }
+func (c *countConsumer) Initialize() error                  { return nil }
+func (c *countConsumer) HandleEvent(_ provider.Event) error { c.count++; return nil }
+func (c *countConsumer) Close() error                       { return nil }
+
+// TestDistributorPanicRecovery verifies that a panicking consumer does not
+// crash the distributor or prevent other consumers from receiving events.
+func TestDistributorPanicRecovery(t *testing.T) {
+	enricher := enrichment.NewEventEnricher()
+	correlator, err := enrichment.NewCorrelator(128)
+	if err != nil {
+		t.Fatalf("NewCorrelator: %v", err)
+	}
+
+	d := distributor.New(enricher, correlator)
+
+	counter := &countConsumer{}
+	d.RegisterConsumer(&panicConsumer{}) // registered first — panics on every event
+	d.RegisterConsumer(counter)          // registered second — must still receive events
+
+	// Create a minimal event and send it through the distributor.
+	eventDir := t.TempDir()
+	writeFile(t, eventDir, "event.jsonl", `{"EventID":1,"ProviderName":"LinuxEBPF","ProcessId":1234,"Image":"/usr/bin/test","CommandLine":"test"}`)
+
+	rp := replay.New(filepath.Join(eventDir, "event.jsonl"))
+	rp.SendEvents(d.HandleEvent)
+
+	if counter.count == 0 {
+		t.Fatal("count-consumer received 0 events; panic in first consumer killed the pipeline")
+	}
+	if d.Processed() == 0 {
+		t.Fatal("distributor processed 0 events after panic recovery")
+	}
+}
+
 func init() {
 	// Suppress standard logger output during tests.
 	log.SetOutput(io.Discard)