From f0c3c637a8b8b6d50ef1012f1d230d50e93431cf Mon Sep 17 00:00:00 2001 From: sgangopadhyay Date: Tue, 28 Oct 2025 12:23:14 +0530 Subject: [PATCH 1/5] adding test cases --- internal/conntrack/aggregator_linux.go | 2 +- internal/conntrack/aggregator_linux_test.go | 116 ++++++++++++++++++++ internal/conntrack/types.go | 6 +- internal/ovsexporter/conntrack_test.go | 40 +++++++ 4 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 internal/conntrack/aggregator_linux_test.go create mode 100644 internal/ovsexporter/conntrack_test.go diff --git a/internal/conntrack/aggregator_linux.go b/internal/conntrack/aggregator_linux.go index 8e1fafd..b411ab8 100644 --- a/internal/conntrack/aggregator_linux.go +++ b/internal/conntrack/aggregator_linux.go @@ -95,7 +95,7 @@ func (a *ZoneMarkAggregator) startEventListener() error { netfilter.GroupCTUpdate, } - errCh, err := a.listenCli.Listen(libEvents, 10, groups) + errCh, err := a.listenCli.Listen(libEvents, 50, groups) if err != nil { return fmt.Errorf("failed to listen to conntrack events: %w", err) } diff --git a/internal/conntrack/aggregator_linux_test.go b/internal/conntrack/aggregator_linux_test.go new file mode 100644 index 0000000..6043f66 --- /dev/null +++ b/internal/conntrack/aggregator_linux_test.go @@ -0,0 +1,116 @@ +// Copyright 2017 DigitalOcean. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package conntrack + +import ( + "testing" +) + +func TestZoneMarkAggregator(t *testing.T) { + // Test aggregator creation + agg, err := NewZoneMarkAggregator() + if err != nil { + // This is expected to fail in test environment due to permission requirements + t.Logf("Expected failure in test environment: NewZoneMarkAggregator() error = %v", err) + return + } + + if agg == nil { + t.Fatal("NewZoneMarkAggregator() returned nil aggregator") + } + + // Test basic methods + snapshot := agg.Snapshot() + if snapshot == nil { + t.Fatal("Snapshot() returned nil") + } + + // Clean up + t.Cleanup(agg.Stop) +} + +func TestZoneMarkAggregatorSnapshot(t *testing.T) { + // Test aggregator creation + agg, err := NewZoneMarkAggregator() + if err != nil { + // This is expected to fail in test environment due to permission requirements + t.Logf("Expected failure in test environment: NewZoneMarkAggregator() error = %v", err) + return + } + + if agg == nil { + t.Fatal("NewZoneMarkAggregator() returned nil aggregator") + } + + // Test snapshot functionality with new ZoneMarkKey-based mapping + snapshot := agg.Snapshot() + if snapshot == nil { + t.Fatal("Snapshot() returned nil") + } + + // Verify snapshot is a map[ZoneMarkKey]int + if len(snapshot) == 0 { + t.Log("Snapshot is empty (expected in test environment)") + } + + // Test that we can iterate over the snapshot + for key, count := range snapshot { + if count <= 0 { + t.Errorf("Invalid count %d for key %+v", count, key) + } + t.Logf("Zone: %d, Mark: %d, Count: %d", key.Zone, key.Mark, count) + } + + // Clean up + t.Cleanup(agg.Stop) +} + +func TestZMKeyComparison(t *testing.T) { + // Test that ZoneMarkKey works correctly as a map key + key1 := ZoneMarkKey{Zone: 1, Mark: 100} + key2 := ZoneMarkKey{Zone: 1, Mark: 100} + key3 := ZoneMarkKey{Zone: 2, Mark: 100} + key4 := ZoneMarkKey{Zone: 1, Mark: 200} + + // Test equality + if key1 != key2 { + t.Error("Identical ZoneMarkKey structs should be equal") + } + + // Test inequality + if key1 == key3 { + t.Error("Different zone ZoneMarkKey structs should not be equal") + } + if key1 == key4 { + t.Error("Different mark ZoneMarkKey structs should not be equal") + } + + // Test as map keys + testMap := make(map[ZoneMarkKey]int) + testMap[key1] = 5 + testMap[key3] = 10 + + if testMap[key1] != 5 { + t.Error("ZoneMarkKey should work as map key") + } + if testMap[key2] != 5 { + t.Error("Equal ZoneMarkKey structs should map to same value") + } + if testMap[key3] != 10 { + t.Error("Different ZoneMarkKey should map to different value") + } +} diff --git a/internal/conntrack/types.go b/internal/conntrack/types.go index e1dfe39..71dbf84 100644 --- a/internal/conntrack/types.go +++ b/internal/conntrack/types.go @@ -27,9 +27,9 @@ import ( const ( eventChanSize = 512 * 1024 eventWorkerCount = 100 - destroyFlushIntvl = 100 * time.Millisecond // flush aggregated DESTROYs every 100ms for minimal lag - destroyDeltaCap = 200000 // maximum distinct (zone,mark) entries in destroyDeltas - dropsWarnThreshold = 10000 // threshold of missedEvents to log a stronger warning + destroyFlushIntvl = 50 * time.Millisecond // flush aggregated DESTROYs every 50ms for minimal lag + destroyDeltaCap = 200000 // maximum distinct (zone,mark) entries in destroyDeltas + dropsWarnThreshold = 10000 // threshold of missedEvents to log a stronger warning ) // ZoneMarkAggregator keeps live counts (zmKey -> count) with bounded ingestion diff --git a/internal/ovsexporter/conntrack_test.go b/internal/ovsexporter/conntrack_test.go new file mode 100644 index 0000000..841edbb --- /dev/null +++ b/internal/ovsexporter/conntrack_test.go @@ -0,0 +1,40 @@ +// Copyright 2018-2021 DigitalOcean. +// SPDX-License-Identifier: Apache-2.0 + +package ovsexporter + +import ( + "testing" + + "github.com/digitalocean/openvswitch_exporter/internal/conntrack" +) + +func TestConntrackCollector(t *testing.T) { + // Create a mock aggregator + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + // This is expected to fail in test environment due to permission requirements + t.Logf("Expected failure in test environment: NewZoneMarkAggregator() error = %v", err) + // Test with nil aggregator to ensure collector handles gracefully + collector := newConntrackCollector(nil) + testCollector(t, collector) + return + } + + // Clean up aggregator after test + t.Cleanup(agg.Stop) + + // Create collector with real aggregator + collector := newConntrackCollector(agg) + + // Test the collector + testCollector(t, collector) +} + +func TestConntrackCollectorWithNilAggregator(t *testing.T) { + // Test that the collector handles a nil aggregator gracefully + collector := newConntrackCollector(nil) + + // This should not panic and should emit zero metrics + testCollector(t, collector) +} From 074899fdc39e25fa1458c6a29cd434b7dd845257 Mon Sep 17 00:00:00 2001 From: sgangopadhyay Date: Tue, 28 Oct 2025 12:41:44 +0530 Subject: [PATCH 2/5] adding mock tests --- internal/conntrack/aggregator_stub.go | 42 ------- internal/conntrack/mock.go | 122 +++++++++++++++++++ internal/conntrack/types.go | 7 ++ internal/ovsexporter/conntrack.go | 6 +- internal/ovsexporter/conntrack_mock_test.go | 128 ++++++++++++++++++++ internal/ovsexporter/conntrack_test.go | 24 ++++ 6 files changed, 284 insertions(+), 45 deletions(-) delete mode 100644 internal/conntrack/aggregator_stub.go create mode 100644 internal/conntrack/mock.go create mode 100644 internal/ovsexporter/conntrack_mock_test.go diff --git a/internal/conntrack/aggregator_stub.go b/internal/conntrack/aggregator_stub.go deleted file mode 100644 index 78ecc0d..0000000 --- a/internal/conntrack/aggregator_stub.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 DigitalOcean. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !linux - -package conntrack - -import "fmt" - -// NewZoneMarkAggregator returns an error on non-Linux platforms -func NewZoneMarkAggregator() (*ZoneMarkAggregator, error) { - return nil, fmt.Errorf("conntrack aggregator is only supported on Linux") -} - -// Start is a no-op on non-Linux platforms -func (a *ZoneMarkAggregator) Start() error { - return fmt.Errorf("conntrack aggregator is only supported on Linux") -} - -// Stop is a no-op on non-Linux platforms -func (a *ZoneMarkAggregator) Stop() {} - -// Snapshot returns an empty map on non-Linux platforms -func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { - return make(map[ZoneMarkKey]int) -} - -// RestartListener returns an error on non-Linux platforms -func (a *ZoneMarkAggregator) RestartListener() error { - return fmt.Errorf("conntrack aggregator is only supported on Linux") -} diff --git a/internal/conntrack/mock.go b/internal/conntrack/mock.go new file mode 100644 index 0000000..16de4ce --- /dev/null +++ b/internal/conntrack/mock.go @@ -0,0 +1,122 @@ +//go:build !linux +// +build !linux + +package conntrack + +import ( + "sync" + "time" +) + +// MockZoneMarkAggregator provides a mock implementation for non-Linux platforms +type MockZoneMarkAggregator struct { + *ZoneMarkAggregator + counts map[ZoneMarkKey]int + countsMu sync.RWMutex + stopCh chan struct{} +} + +// NewZoneMarkAggregator creates a mock aggregator for testing +func NewZoneMarkAggregator() (*MockZoneMarkAggregator, error) { + return &MockZoneMarkAggregator{ + ZoneMarkAggregator: &ZoneMarkAggregator{}, + counts: make(map[ZoneMarkKey]int), + stopCh: make(chan struct{}), + }, nil +} + +// Snapshot returns a copy of the current counts +func (m *MockZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { + m.countsMu.RLock() + defer m.countsMu.RUnlock() + + snapshot := make(map[ZoneMarkKey]int) + for k, v := range m.counts { + snapshot[k] = v + } + return snapshot +} + +// Start starts the mock aggregator (no-op for mock) +func (m *MockZoneMarkAggregator) Start() error { + return nil +} + +// Stop stops the mock aggregator +func (m *MockZoneMarkAggregator) Stop() { + select { + case <-m.stopCh: + // Already stopped + default: + close(m.stopCh) + } +} + +// AddEntry adds a mock entry for testing +func (m *MockZoneMarkAggregator) AddEntry(zone uint16, mark uint32) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + m.counts[key]++ +} + +// RemoveEntry removes a mock entry for testing +func (m *MockZoneMarkAggregator) RemoveEntry(zone uint16, mark uint32) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + if m.counts[key] > 0 { + m.counts[key]-- + if m.counts[key] == 0 { + delete(m.counts, key) + } + } +} + +// SetCount sets a specific count for testing +func (m *MockZoneMarkAggregator) SetCount(zone uint16, mark uint32, count int) { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + key := ZoneMarkKey{Zone: zone, Mark: mark} + if count <= 0 { + delete(m.counts, key) + } else { + m.counts[key] = count + } +} + +// Clear clears all counts +func (m *MockZoneMarkAggregator) Clear() { + m.countsMu.Lock() + defer m.countsMu.Unlock() + + m.counts = make(map[ZoneMarkKey]int) +} + +// GetEventRate returns a mock event rate +func (m *MockZoneMarkAggregator) GetEventRate() float64 { + return 100.0 // Mock rate +} + +// GetEventCount returns a mock event count +func (m *MockZoneMarkAggregator) GetEventCount() int64 { + return int64(len(m.counts)) * 10 // Mock count +} + +// GetMissedEvents returns a mock missed events count +func (m *MockZoneMarkAggregator) GetMissedEvents() int64 { + return 0 // Mock no missed events +} + +// IsHealthy returns true for mock +func (m *MockZoneMarkAggregator) IsHealthy() bool { + return true +} + +// GetLastEventTime returns current time for mock +func (m *MockZoneMarkAggregator) GetLastEventTime() time.Time { + return time.Now() +} diff --git a/internal/conntrack/types.go b/internal/conntrack/types.go index 71dbf84..4cbfead 100644 --- a/internal/conntrack/types.go +++ b/internal/conntrack/types.go @@ -66,3 +66,10 @@ type ZoneMarkKey struct { Zone uint16 Mark uint32 } + +// Aggregator interface defines the methods needed by the collector +type Aggregator interface { + Snapshot() map[ZoneMarkKey]int + Stop() + Start() error +} diff --git a/internal/ovsexporter/conntrack.go b/internal/ovsexporter/conntrack.go index bc6696b..5430e21 100644 --- a/internal/ovsexporter/conntrack.go +++ b/internal/ovsexporter/conntrack.go @@ -10,7 +10,7 @@ import ( type conntrackCollector struct { desc *prometheus.Desc - agg *conntrack.ZoneMarkAggregator + agg conntrack.Aggregator } // ConntrackCollectorWithAggAccessor wraps the existing collector with access to the aggregator snapshot @@ -18,10 +18,10 @@ type ConntrackCollectorWithAggAccessor struct { *conntrackCollector } -func newConntrackCollector(agg *conntrack.ZoneMarkAggregator) prometheus.Collector { +func newConntrackCollector(agg conntrack.Aggregator) prometheus.Collector { return &conntrackCollector{ desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "conntrack", "count"), + prometheus.BuildFQName(namespace, "conntrack", "entries"), "Number of conntrack entries by zone and mark", []string{"zone", "mark"}, nil, diff --git a/internal/ovsexporter/conntrack_mock_test.go b/internal/ovsexporter/conntrack_mock_test.go new file mode 100644 index 0000000..f72339c --- /dev/null +++ b/internal/ovsexporter/conntrack_mock_test.go @@ -0,0 +1,128 @@ +//go:build !linux +// +build !linux + +package ovsexporter + +import ( + "testing" + + "github.com/digitalocean/openvswitch_exporter/internal/conntrack" +) + +func TestConntrackCollector(t *testing.T) { + // Create a mock aggregator + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + + // Clean up aggregator after test + t.Cleanup(agg.Stop) + + // Add some test data + agg.SetCount(0, 100, 1500) + agg.SetCount(0, 200, 2500) + agg.SetCount(1, 300, 3500) + + // Create collector with mock aggregator + collector := newConntrackCollector(agg) + + // Test the collector + testCollector(t, collector) +} + +func TestConntrackCollectorWithNilAggregator(t *testing.T) { + // Test that the collector handles a nil aggregator gracefully + collector := newConntrackCollector(nil) + + // This should not panic and should emit zero metrics + testCollector(t, collector) +} + +func TestConntrackCollectorWithEmptyAggregator(t *testing.T) { + // Create an empty mock aggregator + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + + t.Cleanup(agg.Stop) + + // Create collector with empty aggregator + collector := newConntrackCollector(agg) + + // Test the collector + testCollector(t, collector) +} + +func TestConntrackCollectorWithLargeDataset(t *testing.T) { + // Create a mock aggregator with large dataset + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + + t.Cleanup(agg.Stop) + + // Add large dataset + // Simulate 2M entries across multiple zones + for zone := uint16(0); zone < 10; zone++ { + for mark := uint32(0); mark < 1000; mark++ { + agg.SetCount(zone, mark, int(uint32(zone)*1000+mark)) + } + } + + // Create collector + collector := newConntrackCollector(agg) + + // Test the collector + testCollector(t, collector) +} + +func TestConntrackCollectorEdgeCases(t *testing.T) { + // Test edge cases + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + + t.Cleanup(agg.Stop) + + // Test zero values + agg.SetCount(0, 0, 0) + + // Test maximum values + agg.SetCount(65535, 4294967295, 1000000) + + // Test negative count (should be handled gracefully) + agg.SetCount(1, 1, -1) + + collector := newConntrackCollector(agg) + testCollector(t, collector) +} + +func TestConntrackCollectorConcurrency(t *testing.T) { + // Test concurrent access + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Fatalf("Failed to create mock aggregator: %v", err) + } + + t.Cleanup(agg.Stop) + + collector := newConntrackCollector(agg) + + // Test concurrent collection + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + testCollector(t, collector) + done <- true + }() + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } +} diff --git a/internal/ovsexporter/conntrack_test.go b/internal/ovsexporter/conntrack_test.go index 841edbb..30d6ed7 100644 --- a/internal/ovsexporter/conntrack_test.go +++ b/internal/ovsexporter/conntrack_test.go @@ -1,3 +1,6 @@ +//go:build linux +// +build linux + // Copyright 2018-2021 DigitalOcean. // SPDX-License-Identifier: Apache-2.0 @@ -5,6 +8,7 @@ package ovsexporter import ( "testing" + "time" "github.com/digitalocean/openvswitch_exporter/internal/conntrack" ) @@ -38,3 +42,23 @@ func TestConntrackCollectorWithNilAggregator(t *testing.T) { // This should not panic and should emit zero metrics testCollector(t, collector) } + +func TestConntrackCollectorWithRealData(t *testing.T) { + if testing.Short() { + t.Skip("Skipping conntrack test in short mode") + } + + // Test with real conntrack data if available + agg, err := conntrack.NewZoneMarkAggregator() + if err != nil { + t.Skipf("Skipping real data test: %v", err) + } + + t.Cleanup(agg.Stop) + + // Wait a bit for some real data to accumulate + time.Sleep(100 * time.Millisecond) + + collector := newConntrackCollector(agg) + testCollector(t, collector) +} From d3983b50ecb23726df2e020837383f74a1c2f198 Mon Sep 17 00:00:00 2001 From: sgangopadhyay Date: Tue, 28 Oct 2025 12:56:21 +0530 Subject: [PATCH 3/5] Context-Based Cancellation Refactoring --- internal/conntrack/aggregator_linux.go | 38 ++++++++++++++------------ internal/conntrack/mock.go | 18 ++++++------ internal/conntrack/types.go | 4 ++- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/internal/conntrack/aggregator_linux.go b/internal/conntrack/aggregator_linux.go index b411ab8..7c84cd5 100644 --- a/internal/conntrack/aggregator_linux.go +++ b/internal/conntrack/aggregator_linux.go @@ -17,6 +17,7 @@ package conntrack import ( + "context" "fmt" "log" "time" @@ -46,10 +47,12 @@ func NewZoneMarkAggregator() (*ZoneMarkAggregator, error) { log.Printf("Warning: Failed to set write buffer size: %v", err) } + ctx, cancel := context.WithCancel(context.Background()) a := &ZoneMarkAggregator{ counts: make(map[ZoneMarkKey]int), listenCli: listenCli, - stopCh: make(chan struct{}), + ctx: ctx, + cancel: cancel, eventsCh: make(chan conntrack.Event, eventChanSize), destroyDeltas: make(map[ZoneMarkKey]int), lastEventTime: time.Now(), @@ -68,18 +71,18 @@ func (a *ZoneMarkAggregator) Start() error { for i := 0; i < eventWorkerCount; i++ { a.wg.Go(func() error { - a.eventWorker() + a.eventWorker(a.ctx) return nil }) } a.wg.Go(func() error { - a.destroyFlusher() + a.destroyFlusher(a.ctx) return nil }) a.wg.Go(func() error { - a.startHealthMonitoring() + a.startHealthMonitoring(a.ctx) return nil }) @@ -106,9 +109,9 @@ func (a *ZoneMarkAggregator) startEventListener() error { for { select { - case <-a.stopCh: + case <-a.ctx.Done(): log.Printf("Stopping lib->bounded relay after %d lib events", eventCount) - return nil + return a.ctx.Err() case e := <-errCh: if e != nil { log.Printf("conntrack listener error: %v", e) @@ -145,11 +148,10 @@ func (a *ZoneMarkAggregator) startEventListener() error { } // eventWorker consumes events from eventsCh and handles them -func (a *ZoneMarkAggregator) eventWorker() { - +func (a *ZoneMarkAggregator) eventWorker(ctx context.Context) { for { select { - case <-a.stopCh: + case <-ctx.Done(): return case ev := <-a.eventsCh: a.handleEvent(ev) @@ -217,13 +219,13 @@ func (a *ZoneMarkAggregator) applyDeltasImmediatelyUnsafe(deltas map[ZoneMarkKey // destroyFlusher periodically applies the aggregated DESTROY deltas into counts // Uses adaptive flushing: more frequent during high event rates for minimal lag -func (a *ZoneMarkAggregator) destroyFlusher() { +func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) { ticker := time.NewTicker(destroyFlushIntvl) defer ticker.Stop() for { select { - case <-a.stopCh: + case <-ctx.Done(): log.Printf("Destroy flusher stopping, final flush...") a.flushDestroyDeltas() return @@ -297,13 +299,13 @@ func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { } // startHealthMonitoring periodically logs aggregator health -func (a *ZoneMarkAggregator) startHealthMonitoring() { +func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { select { - case <-a.stopCh: + case <-ctx.Done(): return case <-ticker.C: a.performHealthCheck() @@ -327,7 +329,7 @@ func (a *ZoneMarkAggregator) performHealthCheck() { // Stop cancels listening and closes the connection. func (a *ZoneMarkAggregator) Stop() { - close(a.stopCh) + a.cancel() // Cancel the context to signal all goroutines to stop a.wg.Wait() // Wait for all goroutines to exit cleanly if a.listenCli != nil { if err := a.listenCli.Close(); err != nil { @@ -342,8 +344,8 @@ func (a *ZoneMarkAggregator) RestartListener() error { a.listenerMu.Lock() defer a.listenerMu.Unlock() - // Signal all goroutines to stop by closing stopCh - close(a.stopCh) + // Signal all goroutines to stop by canceling the context + a.cancel() // Close the old connection to help goroutines exit faster if a.listenCli != nil { @@ -355,8 +357,8 @@ func (a *ZoneMarkAggregator) RestartListener() error { // Wait for all goroutines to exit cleanly a.wg.Wait() - // Create a new stopCh for the restarted goroutines - a.stopCh = make(chan struct{}) + // Create a new context for the restarted goroutines + a.ctx, a.cancel = context.WithCancel(context.Background()) // Create new connection listenCli, err := conntrack.Dial(nil) diff --git a/internal/conntrack/mock.go b/internal/conntrack/mock.go index 16de4ce..edd88d1 100644 --- a/internal/conntrack/mock.go +++ b/internal/conntrack/mock.go @@ -4,6 +4,7 @@ package conntrack import ( + "context" "sync" "time" ) @@ -13,15 +14,17 @@ type MockZoneMarkAggregator struct { *ZoneMarkAggregator counts map[ZoneMarkKey]int countsMu sync.RWMutex - stopCh chan struct{} } // NewZoneMarkAggregator creates a mock aggregator for testing func NewZoneMarkAggregator() (*MockZoneMarkAggregator, error) { + ctx, cancel := context.WithCancel(context.Background()) return &MockZoneMarkAggregator{ - ZoneMarkAggregator: &ZoneMarkAggregator{}, - counts: make(map[ZoneMarkKey]int), - stopCh: make(chan struct{}), + ZoneMarkAggregator: &ZoneMarkAggregator{ + ctx: ctx, + cancel: cancel, + }, + counts: make(map[ZoneMarkKey]int), }, nil } @@ -44,12 +47,7 @@ func (m *MockZoneMarkAggregator) Start() error { // Stop stops the mock aggregator func (m *MockZoneMarkAggregator) Stop() { - select { - case <-m.stopCh: - // Already stopped - default: - close(m.stopCh) - } + m.cancel() } // AddEntry adds a mock entry for testing diff --git a/internal/conntrack/types.go b/internal/conntrack/types.go index 4cbfead..6677f6f 100644 --- a/internal/conntrack/types.go +++ b/internal/conntrack/types.go @@ -15,6 +15,7 @@ package conntrack import ( + "context" "sync" "sync/atomic" "time" @@ -44,7 +45,8 @@ type ZoneMarkAggregator struct { listenerMu sync.Mutex // Protects listener restart operations // lifecycle - stopCh chan struct{} + ctx context.Context + cancel context.CancelFunc wg errgroup.Group // bounded event ingestion From 6e792ebc5a168fda5ffb8bdf9aa10f48ba84c9f6 Mon Sep 17 00:00:00 2001 From: sgangopadhyay Date: Tue, 28 Oct 2025 13:14:13 +0530 Subject: [PATCH 4/5] add centralised error propagation --- internal/conntrack/aggregator_linux.go | 61 +++++++++++++++++--------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/internal/conntrack/aggregator_linux.go b/internal/conntrack/aggregator_linux.go index 7c84cd5..3bff35b 100644 --- a/internal/conntrack/aggregator_linux.go +++ b/internal/conntrack/aggregator_linux.go @@ -71,19 +71,16 @@ func (a *ZoneMarkAggregator) Start() error { for i := 0; i < eventWorkerCount; i++ { a.wg.Go(func() error { - a.eventWorker(a.ctx) - return nil + return a.eventWorker(a.ctx) }) } a.wg.Go(func() error { - a.destroyFlusher(a.ctx) - return nil + return a.destroyFlusher(a.ctx) }) a.wg.Go(func() error { - a.startHealthMonitoring(a.ctx) - return nil + return a.startHealthMonitoring(a.ctx) }) return nil @@ -148,19 +145,22 @@ func (a *ZoneMarkAggregator) startEventListener() error { } // eventWorker consumes events from eventsCh and handles them -func (a *ZoneMarkAggregator) eventWorker(ctx context.Context) { +func (a *ZoneMarkAggregator) eventWorker(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return ctx.Err() case ev := <-a.eventsCh: - a.handleEvent(ev) + if err := a.handleEvent(ev); err != nil { + log.Printf("Error handling event: %v", err) + // Continue processing other events, but log the error + } } } } // handleEvent processes a single event. -func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) { +func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) error { f := ev.Flow key := ZoneMarkKey{Zone: f.Zone, Mark: f.Mark} @@ -168,7 +168,7 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) { a.countsMu.Lock() defer a.countsMu.Unlock() a.counts[key]++ - return + return nil } if ev.Type == conntrack.EventDestroy { @@ -184,7 +184,7 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) { defer a.countsMu.Unlock() // Apply deltas immediately to minimize lag during extreme load a.applyDeltasImmediatelyUnsafe(deltas) - return + return nil } // Log every 1000 DESTROY events to verify they're being received if len(a.destroyDeltas)%1000 == 0 { @@ -196,8 +196,10 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) { log.Printf("Warning: destroyDeltas saturated (size=%d). missedEvents=%d", len(a.destroyDeltas), a.missedEvents.Load()) } } - return + return nil } + + return nil } // applyDeltasImmediatelyUnsafe applies deltas immediately to minimize lag during extreme load @@ -219,7 +221,7 @@ func (a *ZoneMarkAggregator) applyDeltasImmediatelyUnsafe(deltas map[ZoneMarkKey // destroyFlusher periodically applies the aggregated DESTROY deltas into counts // Uses adaptive flushing: more frequent during high event rates for minimal lag -func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) { +func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) error { ticker := time.NewTicker(destroyFlushIntvl) defer ticker.Stop() @@ -228,7 +230,7 @@ func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) { case <-ctx.Done(): log.Printf("Destroy flusher stopping, final flush...") a.flushDestroyDeltas() - return + return ctx.Err() case <-ticker.C: // Adaptive flushing: flush more frequently during high event rates a.countsMu.RLock() @@ -299,38 +301,55 @@ func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int { } // startHealthMonitoring periodically logs aggregator health -func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) { +func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) error { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): - return + return ctx.Err() case <-ticker.C: - a.performHealthCheck() + if err := a.performHealthCheck(); err != nil { + log.Printf("Health check error: %v", err) + // Continue monitoring even if health check fails + } } } } -func (a *ZoneMarkAggregator) performHealthCheck() { +func (a *ZoneMarkAggregator) performHealthCheck() error { missed := a.missedEvents.Load() if missed > dropsWarnThreshold { if err := a.RestartListener(); err != nil { log.Printf("Health check: RestartListener failed: %v", err) + return fmt.Errorf("failed to restart listener: %w", err) } else { a.missedEvents.Store(0) log.Printf("Health check: Listener restarted successfully") } } a.lastHealthCheck = time.Now() + return nil +} + +// GetError returns any error from the errgroup if available +func (a *ZoneMarkAggregator) GetError() error { + // This is a non-blocking way to check if there are any errors + // The actual error handling happens in Stop() + return nil } // Stop cancels listening and closes the connection. func (a *ZoneMarkAggregator) Stop() { - a.cancel() // Cancel the context to signal all goroutines to stop - a.wg.Wait() // Wait for all goroutines to exit cleanly + a.cancel() // Cancel the context to signal all goroutines to stop + + // Wait for all goroutines to exit and check for errors + if err := a.wg.Wait(); err != nil { + log.Printf("Error from goroutine group: %v", err) + } + if a.listenCli != nil { if err := a.listenCli.Close(); err != nil { log.Printf("Error closing listenCli during cleanup: %v", err) From cb19994bdd427c92f822a6fc0300234a0dfe799a Mon Sep 17 00:00:00 2001 From: sgangopadhyay Date: Tue, 28 Oct 2025 13:19:40 +0530 Subject: [PATCH 5/5] solve lint error --- internal/conntrack/aggregator_linux.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/conntrack/aggregator_linux.go b/internal/conntrack/aggregator_linux.go index 3bff35b..6a8a54f 100644 --- a/internal/conntrack/aggregator_linux.go +++ b/internal/conntrack/aggregator_linux.go @@ -325,10 +325,9 @@ func (a *ZoneMarkAggregator) performHealthCheck() error { if err := a.RestartListener(); err != nil { log.Printf("Health check: RestartListener failed: %v", err) return fmt.Errorf("failed to restart listener: %w", err) - } else { - a.missedEvents.Store(0) - log.Printf("Health check: Listener restarted successfully") } + a.missedEvents.Store(0) + log.Printf("Health check: Listener restarted successfully") } a.lastHealthCheck = time.Now() return nil