diff --git a/balloony.go b/balloony.go index 1289c73..6ee9b91 100644 --- a/balloony.go +++ b/balloony.go @@ -32,6 +32,7 @@ var message_unusual = "Unusual Sonde Detected!" var receivers []Point var receiversMutex sync.RWMutex +var seenSerials = newSerialTracker() const defaultReceiversUpdateInterval = 12 * 60 * 60 // 12 hours in seconds const zeroWidthSpace = "\u200B" @@ -64,6 +65,8 @@ var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Me // In most situations, we only get 1 packet, but we still handle it with a foreach in the situation where we have a multi-sdr receiver for _, pkt := range pkts { + wasSeen := seenSerials.wasSeenRecently(pkt.Serial) + seenSerials.markSeen(pkt.Serial, pkt.TimeReceived) // TEST: Test the nearest point functionality // closest, dist, err := FindClosestPoint(pkt.Lat, pkt.Lon, launchSites) // if err != nil { @@ -75,9 +78,9 @@ var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Me // This is the main processing loop for incoming sondehub packets // Check to see if the sonde is inside of our area of interest if !InsidePoly([]float64{pkt.Lon, pkt.Lat}, boundaryPts) { - // Skip packets that are outside the defined boundary - if !bypassLocationFilter { - return + // Skip packets outside the boundary unless we have seen this serial recently. + if !bypassLocationFilter && !wasSeen { + continue } } @@ -447,8 +450,9 @@ func main() { panic(err) } - // Start the receivers updater goroutine + // Start periodic background workers startReceiversUpdater() + startStaleSeenSerialsCleanup(seenSerials) // Wait for Ctrl+C (SIGINT) to exit c := make(chan os.Signal, 1) diff --git a/tracker.go b/tracker.go new file mode 100644 index 0000000..d82c4f6 --- /dev/null +++ b/tracker.go @@ -0,0 +1,65 @@ +package main + +import ( + "sync" + "time" +) + +const ( + staleSeenTTL = time.Hour + staleCleanupInterval = time.Hour +) + +// serialTracker keeps a lightweight in-memory record of recently seen serials. +type serialTracker struct { + mu sync.RWMutex + serials map[string]struct{} + lastSeen map[string]time.Time +} + +func newSerialTracker() *serialTracker { + return &serialTracker{ + serials: make(map[string]struct{}), + lastSeen: make(map[string]time.Time), + } +} + +// wasSeenRecently reports whether a serial exists in memory. +func (t *serialTracker) wasSeenRecently(serial string) bool { + t.mu.RLock() + _, ok := t.serials[serial] + t.mu.RUnlock() + return ok +} + +// markSeen updates the serial timestamp to "now". +func (t *serialTracker) markSeen(serial string, now time.Time) { + t.mu.Lock() + t.serials[serial] = struct{}{} + t.lastSeen[serial] = now + t.mu.Unlock() +} + +// removeStale drops serials that have not been seen within maxAge. +func (t *serialTracker) removeStale(now time.Time, maxAge time.Duration) { + cutoff := now.Add(-maxAge) + t.mu.Lock() + for serial, seenAt := range t.lastSeen { + if seenAt.Before(cutoff) { + delete(t.lastSeen, serial) + delete(t.serials, serial) + } + } + t.mu.Unlock() +} + +func startStaleSeenSerialsCleanup(tracker *serialTracker) { + go func() { + ticker := time.NewTicker(staleCleanupInterval) + defer ticker.Stop() + + for now := range ticker.C { + tracker.removeStale(now, staleSeenTTL) + } + }() +}