diff --git a/decoder/main.go b/decoder/main.go index 42ec4c1b..4f7b9c38 100644 --- a/decoder/main.go +++ b/decoder/main.go @@ -415,27 +415,17 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca } else { updateTime := wild.Timestamp / 1000 if pokemon.isNewRecord() || pokemon.wildSignificantUpdate(wild.Data, updateTime) { - go func(wildPokemon *pogo.WildPokemonProto, cellId int64, timestampMs int64) { - time.Sleep(15 * time.Second) - pokemonMutex, _ := pokemonStripedMutex.GetLock(encounterId) - pokemonMutex.Lock() - defer pokemonMutex.Unlock() - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - if pokemon, err := getOrCreatePokemonRecord(ctx, db, encounterId); err != nil { - log.Errorf("getOrCreatePokemonRecord: %s", err) - } else { - // Update if there is still a change required & this update is the most recent - if pokemon.wildSignificantUpdate(wildPokemon, updateTime) && pokemon.Updated.ValueOrZero() < updateTime { - log.Debugf("DELAYED UPDATE: Updating pokemon %d from wild", encounterId) - - pokemon.updateFromWild(ctx, db, wildPokemon, cellId, weatherLookup, timestampMs, username) - savePokemonRecordAsAtTime(ctx, db, pokemon, false, true, true, updateTime) - } - } - }(wild.Data, int64(wild.Cell), wild.Timestamp) + // The sweeper will process it after timeout if no encounter arrives + pending := &PendingPokemon{ + EncounterId: encounterId, + WildPokemon: wild.Data, + CellId: int64(wild.Cell), + TimestampMs: wild.Timestamp, + UpdateTime: updateTime, + WeatherLookup: weatherLookup, + Username: username, + } + pokemonPendingQueue.AddPending(pending) } } } diff --git a/decoder/pending_pokemon.go b/decoder/pending_pokemon.go new file mode 100644 index 00000000..4a22f070 --- /dev/null +++ b/decoder/pending_pokemon.go @@ -0,0 +1,166 @@ +package decoder + +import ( + "context" + "sync" + "time" + + "golbat/db" + "golbat/pogo" + + log "github.com/sirupsen/logrus" +) + +// PendingPokemon stores wild pokemon data awaiting a potential encounter +type PendingPokemon struct { + EncounterId uint64 + WildPokemon *pogo.WildPokemonProto + CellId int64 + TimestampMs int64 + UpdateTime int64 + WeatherLookup map[int64]pogo.GameplayWeatherProto_WeatherCondition + Username string + ReceivedAt time.Time +} + +// PokemonPendingQueue manages pokemon awaiting encounter data +type PokemonPendingQueue struct { + mu sync.RWMutex + pending map[uint64]*PendingPokemon + timeout time.Duration +} + +// NewPokemonPendingQueue creates a new pending queue with the specified timeout +func NewPokemonPendingQueue(timeout time.Duration) *PokemonPendingQueue { + return &PokemonPendingQueue{ + pending: make(map[uint64]*PendingPokemon), + timeout: timeout, + } +} + +// AddPending stores a wild pokemon awaiting encounter data. +// Returns true if the pokemon was added, false if it already exists. +func (q *PokemonPendingQueue) AddPending(p *PendingPokemon) bool { + q.mu.Lock() + defer q.mu.Unlock() + + // Only add if not already present (first sighting wins) + if _, exists := q.pending[p.EncounterId]; exists { + return false + } + + p.ReceivedAt = time.Now() + q.pending[p.EncounterId] = p + return true +} + +// TryComplete attempts to retrieve and remove a pending pokemon for an encounter. +// Returns the pending pokemon and true if found, nil and false otherwise. +func (q *PokemonPendingQueue) TryComplete(encounterId uint64) (*PendingPokemon, bool) { + q.mu.Lock() + defer q.mu.Unlock() + + p, exists := q.pending[encounterId] + if exists { + delete(q.pending, encounterId) + } + return p, exists +} + +// Remove removes a pending pokemon without processing it. +func (q *PokemonPendingQueue) Remove(encounterId uint64) { + q.mu.Lock() + delete(q.pending, encounterId) + q.mu.Unlock() +} + +// Size returns the current number of pending pokemon +func (q *PokemonPendingQueue) Size() int { + q.mu.RLock() + defer q.mu.RUnlock() + return len(q.pending) +} + +// collectExpired removes and returns all entries older than timeout +func (q *PokemonPendingQueue) collectExpired() []*PendingPokemon { + cutoff := time.Now().Add(-q.timeout) + + q.mu.Lock() + defer q.mu.Unlock() + + var expired []*PendingPokemon + for id, p := range q.pending { + if p.ReceivedAt.Before(cutoff) { + expired = append(expired, p) + delete(q.pending, id) + } + } + + return expired +} + +// StartSweeper starts a background goroutine that processes expired entries +func (q *PokemonPendingQueue) StartSweeper(ctx context.Context, interval time.Duration, dbDetails db.DbDetails) { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Info("Pokemon pending queue sweeper stopped") + return + case <-ticker.C: + expired := q.collectExpired() + if len(expired) > 0 { + log.Debugf("Processing %d expired pending pokemon", len(expired)) + q.processExpired(ctx, dbDetails, expired) + } + } + } + }() +} + +// processExpired handles pokemon that didn't receive an encounter within the timeout +func (q *PokemonPendingQueue) processExpired(ctx context.Context, dbDetails db.DbDetails, expired []*PendingPokemon) { + for _, p := range expired { + pokemonMutex, _ := pokemonStripedMutex.GetLock(p.EncounterId) + pokemonMutex.Lock() + + processCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + + pokemon, err := getOrCreatePokemonRecord(processCtx, dbDetails, p.EncounterId) + if err != nil { + log.Errorf("getOrCreatePokemonRecord in sweeper: %s", err) + cancel() + pokemonMutex.Unlock() + continue + } + + // Update if there is still a change required & this update is the most recent + if pokemon.wildSignificantUpdate(p.WildPokemon, p.UpdateTime) && pokemon.Updated.ValueOrZero() < p.UpdateTime { + log.Debugf("DELAYED UPDATE: Updating pokemon %d from wild (sweeper)", p.EncounterId) + + pokemon.updateFromWild(processCtx, dbDetails, p.WildPokemon, p.CellId, p.WeatherLookup, p.TimestampMs, p.Username) + savePokemonRecordAsAtTime(processCtx, dbDetails, pokemon, false, true, true, p.UpdateTime) + } + + cancel() + pokemonMutex.Unlock() + } +} + +// Global pending queue instance +var pokemonPendingQueue *PokemonPendingQueue + +// InitPokemonPendingQueue initializes the global pending queue +func InitPokemonPendingQueue(ctx context.Context, dbDetails db.DbDetails, timeout time.Duration, sweepInterval time.Duration) { + pokemonPendingQueue = NewPokemonPendingQueue(timeout) + pokemonPendingQueue.StartSweeper(ctx, sweepInterval, dbDetails) + log.Infof("Pokemon pending queue started with %v timeout and %v sweep interval", timeout, sweepInterval) +} + +// GetPokemonPendingQueue returns the global pending queue instance +func GetPokemonPendingQueue() *PokemonPendingQueue { + return pokemonPendingQueue +} diff --git a/decoder/pokemon.go b/decoder/pokemon.go index 9244cc2b..33da3fa6 100644 --- a/decoder/pokemon.go +++ b/decoder/pokemon.go @@ -1400,6 +1400,11 @@ func UpdatePokemonRecordWithEncounterProto(ctx context.Context, db db.DbDetails, encounterId := encounter.Pokemon.EncounterId + // Remove from pending queue - encounter arrived so no need for delayed wild update + if pokemonPendingQueue != nil { + pokemonPendingQueue.Remove(encounterId) + } + pokemonMutex, _ := pokemonStripedMutex.GetLock(encounterId) pokemonMutex.Lock() defer pokemonMutex.Unlock() diff --git a/main.go b/main.go index 02cae765..209599b8 100644 --- a/main.go +++ b/main.go @@ -210,6 +210,7 @@ func main() { _ = decoder.WatchMasterFileData() } decoder.LoadStatsGeofences() + decoder.InitPokemonPendingQueue(ctx, dbDetails, 30*time.Second, 5*time.Second) InitDeviceCache() wg.Add(1)