Merged
103 changes: 103 additions & 0 deletions CONNTRACK_CONFIG.md
@@ -0,0 +1,103 @@
# Conntrack Configuration

This document describes the configuration options available for the conntrack aggregator.

## Environment Variables

The conntrack aggregator can be configured using environment variables with the `CONNTRACK_` prefix:

| Variable | Default | Description |
|----------|---------|-------------|
| `CONNTRACK_EVENT_CHAN_SIZE` | `524288` | Event channel buffer size (512Ki entries) |
| `CONNTRACK_EVENT_WORKER_COUNT` | `100` | Number of event worker goroutines |
| `CONNTRACK_DESTROY_FLUSH_INTERVAL` | `50ms` | Interval for flushing destroy deltas |
| `CONNTRACK_DESTROY_DELTA_CAP` | `200000` | Maximum destroy delta entries |
| `CONNTRACK_DROPS_WARN_THRESHOLD` | `10000` | Threshold for missed events warning |
| `CONNTRACK_READ_BUFFER_SIZE` | `67108864` | Read buffer size (64MB) |
| `CONNTRACK_WRITE_BUFFER_SIZE` | `67108864` | Write buffer size (64MB) |
| `CONNTRACK_HEALTH_CHECK_INTERVAL` | `5m` | Health check interval |
| `CONNTRACK_GRACEFUL_TIMEOUT` | `30s` | Graceful shutdown timeout |

## Usage Examples

### Basic Configuration

```bash
# Set custom buffer sizes
export CONNTRACK_EVENT_CHAN_SIZE=1048576
export CONNTRACK_EVENT_WORKER_COUNT=200

# Run the exporter
./openvswitch_exporter
```

### High-Throughput Environment

For environments with high conntrack event rates (>1M events/sec):

```bash
export CONNTRACK_EVENT_CHAN_SIZE=1048576      # 1Mi-entry buffer
export CONNTRACK_EVENT_WORKER_COUNT=200 # More workers
export CONNTRACK_DESTROY_FLUSH_INTERVAL=25ms # Faster flushing
export CONNTRACK_DESTROY_DELTA_CAP=500000 # Larger delta cap
export CONNTRACK_READ_BUFFER_SIZE=134217728 # 128MB read buffer
export CONNTRACK_WRITE_BUFFER_SIZE=134217728 # 128MB write buffer
```

### Low-Resource Environment

For environments with limited resources:

```bash
export CONNTRACK_EVENT_CHAN_SIZE=65536        # 64Ki-entry buffer
export CONNTRACK_EVENT_WORKER_COUNT=50 # Fewer workers
export CONNTRACK_DESTROY_FLUSH_INTERVAL=100ms # Slower flushing
export CONNTRACK_DESTROY_DELTA_CAP=50000 # Smaller delta cap
export CONNTRACK_READ_BUFFER_SIZE=16777216 # 16MB read buffer
export CONNTRACK_WRITE_BUFFER_SIZE=16777216 # 16MB write buffer
```

### Development/Testing

For development and testing:

```bash
export CONNTRACK_GRACEFUL_TIMEOUT=5s # Faster shutdown
export CONNTRACK_HEALTH_CHECK_INTERVAL=1m # More frequent health checks
```

## Configuration Validation

The configuration system includes validation:

- **Positive values**: All numeric values must be positive
- **Valid durations**: Time values must be valid Go durations
- **Range checks**: Values are checked for reasonable ranges

Invalid values will fall back to defaults with a warning logged.

## Migration from Hardcoded Constants

The following hardcoded constants have been replaced:

| Old Constant | New Environment Variable | Default Value |
|--------------|-------------------------|---------------|
| `eventChanSize = 512 * 1024` | `CONNTRACK_EVENT_CHAN_SIZE` | `524288` |
| `eventWorkerCount = 100` | `CONNTRACK_EVENT_WORKER_COUNT` | `100` |
| `destroyFlushIntvl = 50ms` | `CONNTRACK_DESTROY_FLUSH_INTERVAL` | `50ms` |
| `destroyDeltaCap = 200000` | `CONNTRACK_DESTROY_DELTA_CAP` | `200000` |
| `dropsWarnThreshold = 10000` | `CONNTRACK_DROPS_WARN_THRESHOLD` | `10000` |
| Buffer sizes `64MB` | `CONNTRACK_READ_BUFFER_SIZE` / `WRITE_BUFFER_SIZE` | `67108864` |
| Health check `5m` | `CONNTRACK_HEALTH_CHECK_INTERVAL` | `5m` |
| Graceful timeout `30s` | `CONNTRACK_GRACEFUL_TIMEOUT` | `30s` |

## Performance Impact

Configuration changes can significantly impact performance:

- **Larger buffers**: Better for high-throughput, uses more memory
- **More workers**: Better parallelism, uses more CPU
- **Faster flushing**: Lower latency, more CPU usage
- **Larger delta cap**: Handles bursts better, uses more memory

Choose settings based on your environment's characteristics and requirements.
61 changes: 58 additions & 3 deletions cmd/openvswitch_exporter/main.go
@@ -5,9 +5,14 @@
package main

import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/digitalocean/go-openvswitch/ovsnl"
"github.com/digitalocean/openvswitch_exporter/internal/ovsexporter"
@@ -38,9 +43,59 @@ func main() {
http.Redirect(w, r, *metricsPath, http.StatusMovedPermanently)
})

log.Printf("starting Open vSwitch exporter on %q", *metricsAddr)
// Create HTTP server
server := &http.Server{
Addr: *metricsAddr,
Handler: mux,
}

// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan,
syscall.SIGINT, // Ctrl+C
syscall.SIGTERM, // Termination request
syscall.SIGHUP, // Hang up (config reload)
syscall.SIGQUIT, // Quit signal
)

if err := http.ListenAndServe(*metricsAddr, mux); err != nil {
log.Fatalf("cannot start Open vSwitch exporter: %v", err)
// Start server in goroutine
go func() {
log.Printf("starting Open vSwitch exporter on %q", *metricsAddr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("cannot start Open vSwitch exporter: %v", err)
}
}()

// Wait for shutdown signal
sig := <-sigChan

switch sig {
case syscall.SIGHUP:
log.Printf("Received SIGHUP, reloading config...")
// TODO: Add config reload logic here; until then SIGHUP exits without reloading
log.Printf("Config reload not yet implemented, exiting")
return
case syscall.SIGQUIT:
log.Printf("Received SIGQUIT, shutting down immediately...")
// Immediate shutdown for SIGQUIT
default:
log.Printf("Received signal %v, stopping gracefully...", sig)
}

// Graceful shutdown with 15 second timeout
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

if err := server.Shutdown(ctx); err != nil {
log.Printf("Server shutdown error: %v", err)
}

// Close collector if it supports graceful shutdown
if closeable, ok := collector.(interface{ Close() error }); ok {
if err := closeable.Close(); err != nil {
log.Printf("Collector shutdown error: %v", err)
}
}

log.Printf("Exporter stopped")
}
69 changes: 53 additions & 16 deletions internal/conntrack/aggregator_linux.go
@@ -33,27 +33,32 @@ import (

// NewZoneMarkAggregator creates a new aggregator with its own listening connection.
func NewZoneMarkAggregator() (*ZoneMarkAggregator, error) {
return NewZoneMarkAggregatorWithConfig(LoadConfig())
}

// NewZoneMarkAggregatorWithConfig creates a new aggregator with custom configuration.
func NewZoneMarkAggregatorWithConfig(config *Config) (*ZoneMarkAggregator, error) {
// Create a separate connection for listening to events
listenCli, err := conntrack.Dial(nil)
if err != nil {
return nil, fmt.Errorf("failed to create listening connection: %w", err)
}

if err := listenCli.SetReadBuffer(64 * 1024 * 1024); err != nil { // 64MB buffer for 1.4M events/sec
if err := listenCli.SetReadBuffer(config.ReadBufferSize); err != nil {
log.Printf("Warning: Failed to set read buffer size: %v", err)
}
if err := listenCli.SetWriteBuffer(64 * 1024 * 1024); err != nil { // 64MB buffer for 1.4M events/sec
if err := listenCli.SetWriteBuffer(config.WriteBufferSize); err != nil {
log.Printf("Warning: Failed to set write buffer size: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
a := &ZoneMarkAggregator{
config: config,
counts: make(map[ZoneMarkKey]int),
listenCli: listenCli,
ctx: ctx,
cancel: cancel,
eventsCh: make(chan conntrack.Event, eventChanSize),
eventsCh: make(chan conntrack.Event, config.EventChanSize),
destroyDeltas: make(map[ZoneMarkKey]int),
lastEventTime: time.Now(),
lastHealthCheck: time.Now(),
@@ -69,7 +74,7 @@ func (a *ZoneMarkAggregator) Start() error {
return err
}

for i := 0; i < eventWorkerCount; i++ {
for i := 0; i < a.config.EventWorkerCount; i++ {
a.wg.Go(func() error {
return a.eventWorker(a.ctx)
})
@@ -174,7 +179,7 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) error {
if ev.Type == conntrack.EventDestroy {
a.deltaMu.Lock()
defer a.deltaMu.Unlock()
if len(a.destroyDeltas) < destroyDeltaCap {
if len(a.destroyDeltas) < a.config.DestroyDeltaCap {
a.destroyDeltas[key]++
if len(a.destroyDeltas) > 50000 { // If we have >50K deltas, flush immediately
deltas := a.destroyDeltas
@@ -192,7 +197,7 @@ func (a *ZoneMarkAggregator) handleEvent(ev conntrack.Event) error {
}
} else {
a.missedEvents.Add(1)
if a.missedEvents.Load()%dropsWarnThreshold == 0 {
if a.missedEvents.Load()%a.config.DropsWarnThreshold == 0 {
log.Printf("Warning: destroyDeltas saturated (size=%d). missedEvents=%d", len(a.destroyDeltas), a.missedEvents.Load())
}
}
@@ -222,7 +227,7 @@ func (a *ZoneMarkAggregator) applyDeltasImmediatelyUnsafe(deltas map[ZoneMarkKey
// destroyFlusher periodically applies the aggregated DESTROY deltas into counts
// Uses adaptive flushing: more frequent during high event rates for minimal lag
func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) error {
ticker := time.NewTicker(destroyFlushIntvl)
ticker := time.NewTicker(a.config.DestroyFlushIntvl)
defer ticker.Stop()

for {
Expand Down Expand Up @@ -250,7 +255,7 @@ func (a *ZoneMarkAggregator) destroyFlusher(ctx context.Context) error {
} else {
// Normal flush
a.flushDestroyDeltas()
ticker.Reset(destroyFlushIntvl) // Back to normal interval
ticker.Reset(a.config.DestroyFlushIntvl) // Back to normal interval
}
}
}
@@ -302,7 +307,7 @@ func (a *ZoneMarkAggregator) Snapshot() map[ZoneMarkKey]int {

// startHealthMonitoring periodically logs aggregator health
func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(a.config.HealthCheckIntvl)
defer ticker.Stop()

for {
@@ -321,7 +326,7 @@ func (a *ZoneMarkAggregator) startHealthMonitoring(ctx context.Context) error {
func (a *ZoneMarkAggregator) performHealthCheck() error {
missed := a.missedEvents.Load()

if missed > dropsWarnThreshold {
if missed > a.config.DropsWarnThreshold {
if err := a.RestartListener(); err != nil {
log.Printf("Health check: RestartListener failed: %v", err)
return fmt.Errorf("failed to restart listener: %w", err)
@@ -340,21 +345,53 @@ func (a *ZoneMarkAggregator) GetError() error {
return nil
}

// Stop cancels listening and closes the connection.
func (a *ZoneMarkAggregator) Stop() {
a.cancel() // Cancel the context to signal all goroutines to stop
// Stop cancels listening and closes the connection with graceful shutdown.
func (a *ZoneMarkAggregator) Stop() error {
return a.StopWithTimeout(a.config.GracefulTimeout)
}

// StopWithTimeout cancels listening and closes the connection with a configurable timeout.
func (a *ZoneMarkAggregator) StopWithTimeout(timeout time.Duration) error {
// Signal shutdown to all goroutines
a.cancel()

// Create a context with timeout for graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Channel to receive shutdown completion
done := make(chan error, 1)

// Wait for all goroutines to exit and check for errors
if err := a.wg.Wait(); err != nil {
log.Printf("Error from goroutine group: %v", err)
// Wait for goroutines to exit in a separate goroutine
go func() {
done <- a.wg.Wait()
}()

// Wait for either completion or timeout
select {
case err := <-done:
if err != nil {
log.Printf("Error from goroutine group during shutdown: %v", err)
// Continue with cleanup even if there were errors
}
case <-ctx.Done():
log.Printf("Graceful shutdown timeout exceeded (%v), forcing cleanup", timeout)
// Force close connections even if goroutines didn't exit cleanly
}

// Close the listening connection
if a.listenCli != nil {
if err := a.listenCli.Close(); err != nil {
log.Printf("Error closing listenCli during cleanup: %v", err)
}
a.listenCli = nil
}

// Final flush of any remaining deltas
a.flushDestroyDeltas()

log.Printf("Aggregator stopped gracefully")
return nil
}

// RestartListener attempts to restart the conntrack event listener