Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 56 additions & 36 deletions internal/conntrack/aggregator_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package conntrack

import (
"context"
"fmt"
"log"
"time"
Expand Down Expand Up @@ -46,10 +47,12 @@ func NewZoneMarkAggregator() (*ZoneMarkAggregator, error) {
log.Printf("Warning: Failed to set write buffer size: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
a := &ZoneMarkAggregator{
counts: make(map[ZoneMarkKey]int),
listenCli: listenCli,
stopCh: make(chan struct{}),
ctx: ctx,
cancel: cancel,
eventsCh: make(chan conntrack.Event, eventChanSize),
destroyDeltas: make(map[ZoneMarkKey]int),
lastEventTime: time.Now(),
Expand All @@ -68,19 +71,16 @@ func (a *ZoneMarkAggregator) Start() error {

for i := 0; i < eventWorkerCount; i++ {
a.wg.Go(func() error {
a.eventWorker()
return nil
return a.eventWorker(a.ctx)
})
}

a.wg.Go(func() error {
a.destroyFlusher()
return nil
return a.destroyFlusher(a.ctx)
})

a.wg.Go(func() error {
a.startHealthMonitoring()
return nil
return a.startHealthMonitoring(a.ctx)
})

return nil
Expand All @@ -95,7 +95,7 @@ func (a *ZoneMarkAggregator) startEventListener() error {
netfilter.GroupCTUpdate,
}

errCh, err := a.listenCli.Listen(libEvents, 10, groups)
errCh, err := a.listenCli.Listen(libEvents, 50, groups)
if err != nil {
return fmt.Errorf("failed to listen to conntrack events: %w", err)
}
Expand All @@ -106,9 +106,9 @@ func (a *ZoneMarkAggregator) startEventListener() error {

for {
select {
case <-a.stopCh:
case <-a.ctx.Done():
log.Printf("Stopping lib->bounded relay after %d lib events", eventCount)
return nil
return a.ctx.Err()
case e := <-errCh:
if e != nil {
log.Printf("conntrack listener error: %v", e)
Expand Down Expand Up @@ -145,28 +145,30 @@ func (a *ZoneMarkAggregator) startEventListener() error {
}

// eventWorker consumes events from eventsCh and handles them
func (a *ZoneMarkAggregator) eventWorker() {

func (a *ZoneMarkAggregator) eventWorker(ctx context.Context) error {
for {
select {
case <-a.stopCh:
return
case <-ctx.Done():
return ctx.Err()
case ev := <-a.eventsCh:
a.handleEvent(ev)
if err := a.handleEvent(ev); err != nil {
log.Printf("Error handling event: %v", err)
// Continue processing other events, but log the error
}
}
}
}

// handleEvent processes a single event.
func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) {
func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) error {
f := ev.Flow
key := ZoneMarkKey{Zone: f.Zone, Mark: f.Mark}

if ev.Type == conntrack.EventNew {
a.countsMu.Lock()
defer a.countsMu.Unlock()
a.counts[key]++
return
return nil
}

if ev.Type == conntrack.EventDestroy {
Expand All @@ -182,7 +184,7 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) {
defer a.countsMu.Unlock()
// Apply deltas immediately to minimize lag during extreme load
a.applyDeltasImmediatelyUnsafe(deltas)
return
return nil
}
// Log every 1000 DESTROY events to verify they're being received
if len(a.destroyDeltas)%1000 == 0 {
Expand All @@ -194,8 +196,10 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) {
log.Printf("Warning: destroyDeltas saturated (size=%d). missedEvents=%d", len(a.destroyDeltas), a.missedEvents.Load())
}
}
return
return nil
}

return nil
}

// applyDeltasImmediatelyUnsafe applies deltas immediately to minimize lag during extreme load
Expand All @@ -217,16 +221,16 @@ func (a *ZoneMarkAggregator) applyDeltasImmediatelyUnsafe(deltas map[ZoneMarkKey

// destroyFlusher periodically applies the aggregated DESTROY deltas into counts
// Uses adaptive flushing: more frequent during high event rates for minimal lag
func (a *ZoneMarkAggregator) destroyFlusher() {
func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) error {
ticker := time.NewTicker(destroyFlushIntvl)
defer ticker.Stop()

for {
select {
case <-a.stopCh:
case <-ctx.Done():
log.Printf("Destroy flusher stopping, final flush...")
a.flushDestroyDeltas()
return
return ctx.Err()
case <-ticker.C:
// Adaptive flushing: flush more frequently during high event rates
a.countsMu.RLock()
Expand Down Expand Up @@ -297,38 +301,54 @@ func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int {
}

// startHealthMonitoring periodically logs aggregator health
func (a *ZoneMarkAggregator) startHealthMonitoring() {
func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-a.stopCh:
return
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
a.performHealthCheck()
if err := a.performHealthCheck(); err != nil {
log.Printf("Health check error: %v", err)
// Continue monitoring even if health check fails
}
}
}
}

func (a *ZoneMarkAggregator) performHealthCheck() {
func (a *ZoneMarkAggregator) performHealthCheck() error {
missed := a.missedEvents.Load()

if missed > dropsWarnThreshold {
if err := a.RestartListener(); err != nil {
log.Printf("Health check: RestartListener failed: %v", err)
} else {
a.missedEvents.Store(0)
log.Printf("Health check: Listener restarted successfully")
return fmt.Errorf("failed to restart listener: %w", err)
}
a.missedEvents.Store(0)
log.Printf("Health check: Listener restarted successfully")
}
a.lastHealthCheck = time.Now()
return nil
}

// GetError reports any error recorded by the aggregator's goroutine group.
//
// NOTE(review): this is currently a stub — it always returns nil and never
// inspects the errgroup. Errors from worker goroutines are only surfaced by
// wg.Wait() inside Stop(), where they are logged. A真 non-blocking check here
// would require caching the first worker error (e.g. in an atomic.Value).
func (a *ZoneMarkAggregator) GetError() error {
	// Always nil for now; callers must rely on Stop() for error reporting.
	return nil
}

// Stop cancels listening and closes the connection.
func (a *ZoneMarkAggregator) Stop() {
close(a.stopCh)
a.wg.Wait() // Wait for all goroutines to exit cleanly
a.cancel() // Cancel the context to signal all goroutines to stop

// Wait for all goroutines to exit and check for errors
if err := a.wg.Wait(); err != nil {
log.Printf("Error from goroutine group: %v", err)
}

if a.listenCli != nil {
if err := a.listenCli.Close(); err != nil {
log.Printf("Error closing listenCli during cleanup: %v", err)
Expand All @@ -342,8 +362,8 @@ func (a *ZoneMarkAggregator) RestartListener() error {
a.listenerMu.Lock()
defer a.listenerMu.Unlock()

// Signal all goroutines to stop by closing stopCh
close(a.stopCh)
// Signal all goroutines to stop by canceling the context
a.cancel()

// Close the old connection to help goroutines exit faster
if a.listenCli != nil {
Expand All @@ -355,8 +375,8 @@ func (a *ZoneMarkAggregator) RestartListener() error {
// Wait for all goroutines to exit cleanly
a.wg.Wait()

// Create a new stopCh for the restarted goroutines
a.stopCh = make(chan struct{})
// Create a new context for the restarted goroutines
a.ctx, a.cancel = context.WithCancel(context.Background())

// Create new connection
listenCli, err := conntrack.Dial(nil)
Expand Down
116 changes: 116 additions & 0 deletions internal/conntrack/aggregator_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2017 DigitalOcean.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux

package conntrack

import (
"testing"
)

// TestZoneMarkAggregator exercises aggregator construction and the basic
// Snapshot accessor. Creating the aggregator opens a netlink conntrack
// socket, which needs elevated privileges, so the test skips (rather than
// silently returning) when that fails.
func TestZoneMarkAggregator(t *testing.T) {
	agg, err := NewZoneMarkAggregator()
	if err != nil {
		// Unprivileged environments cannot open the conntrack socket.
		t.Skipf("skipping: NewZoneMarkAggregator() requires netlink privileges: %v", err)
	}

	if agg == nil {
		t.Fatal("NewZoneMarkAggregator() returned nil aggregator")
	}

	// Register cleanup immediately after successful creation so the netlink
	// socket and goroutines are released even if an assertion below fails.
	// (Previously this ran only at the end, leaking resources on t.Fatal.)
	t.Cleanup(agg.Stop)

	// Snapshot must always return a non-nil (possibly empty) map.
	snapshot := agg.Snapshot()
	if snapshot == nil {
		t.Fatal("Snapshot() returned nil")
	}
}

// TestZoneMarkAggregatorSnapshot verifies the ZoneMarkKey-based snapshot:
// it must be a non-nil map, and every tracked per-(zone, mark) count must be
// positive. Skips when the conntrack socket cannot be opened (no privileges).
func TestZoneMarkAggregatorSnapshot(t *testing.T) {
	agg, err := NewZoneMarkAggregator()
	if err != nil {
		// Unprivileged environments cannot open the conntrack socket.
		t.Skipf("skipping: NewZoneMarkAggregator() requires netlink privileges: %v", err)
	}

	if agg == nil {
		t.Fatal("NewZoneMarkAggregator() returned nil aggregator")
	}

	// Register cleanup immediately after successful creation so the netlink
	// socket and goroutines are released even if an assertion below fails.
	// (Previously this ran only at the end, leaking resources on t.Fatal.)
	t.Cleanup(agg.Stop)

	snapshot := agg.Snapshot()
	if snapshot == nil {
		t.Fatal("Snapshot() returned nil")
	}

	if len(snapshot) == 0 {
		t.Log("Snapshot is empty (expected in test environment)")
	}

	// Counts are only ever incremented on NEW events and decremented by
	// applied DESTROY deltas; a non-positive count in the snapshot is a bug.
	for key, count := range snapshot {
		if count <= 0 {
			t.Errorf("Invalid count %d for key %+v", count, key)
		}
		t.Logf("Zone: %d, Mark: %d, Count: %d", key.Zone, key.Mark, count)
	}
}

// TestZMKeyComparison verifies that ZoneMarkKey is a comparable value type
// suitable for use as a map key: identical field values compare equal, any
// differing field breaks equality, and map lookups honor that equality.
func TestZMKeyComparison(t *testing.T) {
	base := ZoneMarkKey{Zone: 1, Mark: 100}
	same := ZoneMarkKey{Zone: 1, Mark: 100}
	diffZone := ZoneMarkKey{Zone: 2, Mark: 100}
	diffMark := ZoneMarkKey{Zone: 1, Mark: 200}

	// Struct equality: all fields equal <=> keys equal.
	if base != same {
		t.Error("Identical ZoneMarkKey structs should be equal")
	}
	if base == diffZone {
		t.Error("Different zone ZoneMarkKey structs should not be equal")
	}
	if base == diffMark {
		t.Error("Different mark ZoneMarkKey structs should not be equal")
	}

	// Map-key semantics: equal keys index the same bucket entry.
	counts := map[ZoneMarkKey]int{
		base:     5,
		diffZone: 10,
	}

	if counts[base] != 5 {
		t.Error("ZoneMarkKey should work as map key")
	}
	if counts[same] != 5 {
		t.Error("Equal ZoneMarkKey structs should map to same value")
	}
	if counts[diffZone] != 10 {
		t.Error("Different ZoneMarkKey should map to different value")
	}
}
42 changes: 0 additions & 42 deletions internal/conntrack/aggregator_stub.go

This file was deleted.

Loading