Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions balloony.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var message_unusual = "Unusual Sonde Detected!"

var receivers []Point
var receiversMutex sync.RWMutex
var seenSerials = newSerialTracker()

const defaultReceiversUpdateInterval = 12 * 60 * 60 // 12 hours in seconds
const zeroWidthSpace = "\u200B"
Expand Down Expand Up @@ -64,6 +65,8 @@ var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Me

// In most situations, we only get 1 packet, but we still handle it with a foreach in the situation where we have a multi-sdr receiver
for _, pkt := range pkts {
wasSeen := seenSerials.wasSeenRecently(pkt.Serial)
seenSerials.markSeen(pkt.Serial, pkt.TimeReceived)
// TEST: Test the nearest point functionality
// closest, dist, err := FindClosestPoint(pkt.Lat, pkt.Lon, launchSites)
// if err != nil {
Expand All @@ -75,9 +78,9 @@ var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Me
// This is the main processing loop for incoming sondehub packets
// Check to see if the sonde is inside of our area of interest
if !InsidePoly([]float64{pkt.Lon, pkt.Lat}, boundaryPts) {
// Skip packets that are outside the defined boundary
if !bypassLocationFilter {
return
// Skip packets outside the boundary unless we have seen this serial recently.
if !bypassLocationFilter && !wasSeen {
continue
}
}

Expand Down Expand Up @@ -447,8 +450,9 @@ func main() {
panic(err)
}

// Start the receivers updater goroutine
// Start periodic background workers
startReceiversUpdater()
startStaleSeenSerialsCleanup(seenSerials)

// Wait for Ctrl+C (SIGINT) to exit
c := make(chan os.Signal, 1)
Expand Down
65 changes: 65 additions & 0 deletions tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"sync"
"time"
)

const (
staleSeenTTL = time.Hour
staleCleanupInterval = time.Hour
)

// serialTracker keeps a lightweight in-memory record of recently seen serials.
type serialTracker struct {
mu sync.RWMutex
serials map[string]struct{}
lastSeen map[string]time.Time
}

func newSerialTracker() *serialTracker {
return &serialTracker{
serials: make(map[string]struct{}),
lastSeen: make(map[string]time.Time),
}
}

// wasSeenRecently reports whether a serial exists in memory.
func (t *serialTracker) wasSeenRecently(serial string) bool {
t.mu.RLock()
_, ok := t.serials[serial]
t.mu.RUnlock()
return ok
}

// markSeen updates the serial timestamp to "now".
func (t *serialTracker) markSeen(serial string, now time.Time) {
t.mu.Lock()
t.serials[serial] = struct{}{}
t.lastSeen[serial] = now
t.mu.Unlock()
}

// removeStale drops serials that have not been seen within maxAge.
func (t *serialTracker) removeStale(now time.Time, maxAge time.Duration) {
cutoff := now.Add(-maxAge)
t.mu.Lock()
for serial, seenAt := range t.lastSeen {
if seenAt.Before(cutoff) {
delete(t.lastSeen, serial)
delete(t.serials, serial)
}
}
t.mu.Unlock()
}

func startStaleSeenSerialsCleanup(tracker *serialTracker) {
go func() {
ticker := time.NewTicker(staleCleanupInterval)
defer ticker.Stop()

for now := range ticker.C {
tracker.removeStale(now, staleSeenTTL)
}
}()
}