Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions decoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,27 +415,17 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca
} else {
updateTime := wild.Timestamp / 1000
if pokemon.isNewRecord() || pokemon.wildSignificantUpdate(wild.Data, updateTime) {
go func(wildPokemon *pogo.WildPokemonProto, cellId int64, timestampMs int64) {
time.Sleep(15 * time.Second)
pokemonMutex, _ := pokemonStripedMutex.GetLock(encounterId)
pokemonMutex.Lock()
defer pokemonMutex.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

if pokemon, err := getOrCreatePokemonRecord(ctx, db, encounterId); err != nil {
log.Errorf("getOrCreatePokemonRecord: %s", err)
} else {
// Update if there is still a change required & this update is the most recent
if pokemon.wildSignificantUpdate(wildPokemon, updateTime) && pokemon.Updated.ValueOrZero() < updateTime {
log.Debugf("DELAYED UPDATE: Updating pokemon %d from wild", encounterId)

pokemon.updateFromWild(ctx, db, wildPokemon, cellId, weatherLookup, timestampMs, username)
savePokemonRecordAsAtTime(ctx, db, pokemon, false, true, true, updateTime)
}
}
}(wild.Data, int64(wild.Cell), wild.Timestamp)
// The sweeper will process it after timeout if no encounter arrives
pending := &PendingPokemon{
EncounterId: encounterId,
WildPokemon: wild.Data,
CellId: int64(wild.Cell),
TimestampMs: wild.Timestamp,
UpdateTime: updateTime,
WeatherLookup: weatherLookup,
Username: username,
}
pokemonPendingQueue.AddPending(pending)
}
}
}
Expand Down
166 changes: 166 additions & 0 deletions decoder/pending_pokemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package decoder

import (
"context"
"sync"
"time"

"golbat/db"
"golbat/pogo"

log "github.com/sirupsen/logrus"
)

// PendingPokemon stores wild pokemon data awaiting a potential encounter
type PendingPokemon struct {
EncounterId uint64
WildPokemon *pogo.WildPokemonProto
CellId int64
TimestampMs int64
UpdateTime int64
WeatherLookup map[int64]pogo.GameplayWeatherProto_WeatherCondition
Username string
ReceivedAt time.Time
}

// PokemonPendingQueue manages pokemon awaiting encounter data
type PokemonPendingQueue struct {
mu sync.RWMutex
pending map[uint64]*PendingPokemon
timeout time.Duration
}

// NewPokemonPendingQueue creates a new pending queue with the specified timeout
func NewPokemonPendingQueue(timeout time.Duration) *PokemonPendingQueue {
return &PokemonPendingQueue{
pending: make(map[uint64]*PendingPokemon),
timeout: timeout,
}
}

// AddPending stores a wild pokemon awaiting encounter data.
// Returns true if the pokemon was added, false if it already exists.
func (q *PokemonPendingQueue) AddPending(p *PendingPokemon) bool {
q.mu.Lock()
defer q.mu.Unlock()

// Only add if not already present (first sighting wins)
if _, exists := q.pending[p.EncounterId]; exists {
return false
}

p.ReceivedAt = time.Now()
q.pending[p.EncounterId] = p
return true
}

// TryComplete attempts to retrieve and remove a pending pokemon for an encounter.
// Returns the pending pokemon and true if found, nil and false otherwise.
func (q *PokemonPendingQueue) TryComplete(encounterId uint64) (*PendingPokemon, bool) {
q.mu.Lock()
defer q.mu.Unlock()

p, exists := q.pending[encounterId]
if exists {
delete(q.pending, encounterId)
}
return p, exists
}

// Remove removes a pending pokemon without processing it.
func (q *PokemonPendingQueue) Remove(encounterId uint64) {
q.mu.Lock()
delete(q.pending, encounterId)
q.mu.Unlock()
}

// Size returns the current number of pending pokemon
func (q *PokemonPendingQueue) Size() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.pending)
}

// collectExpired removes and returns all entries older than timeout
func (q *PokemonPendingQueue) collectExpired() []*PendingPokemon {
cutoff := time.Now().Add(-q.timeout)

q.mu.Lock()
defer q.mu.Unlock()

var expired []*PendingPokemon
for id, p := range q.pending {
if p.ReceivedAt.Before(cutoff) {
expired = append(expired, p)
delete(q.pending, id)
}
}

return expired
}

// StartSweeper starts a background goroutine that processes expired entries
func (q *PokemonPendingQueue) StartSweeper(ctx context.Context, interval time.Duration, dbDetails db.DbDetails) {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Info("Pokemon pending queue sweeper stopped")
return
case <-ticker.C:
expired := q.collectExpired()
if len(expired) > 0 {
log.Debugf("Processing %d expired pending pokemon", len(expired))
q.processExpired(ctx, dbDetails, expired)
}
}
}
}()
}

// processExpired handles pokemon that didn't receive an encounter within the timeout
func (q *PokemonPendingQueue) processExpired(ctx context.Context, dbDetails db.DbDetails, expired []*PendingPokemon) {
for _, p := range expired {
pokemonMutex, _ := pokemonStripedMutex.GetLock(p.EncounterId)
pokemonMutex.Lock()

processCtx, cancel := context.WithTimeout(ctx, 3*time.Second)

pokemon, err := getOrCreatePokemonRecord(processCtx, dbDetails, p.EncounterId)
if err != nil {
log.Errorf("getOrCreatePokemonRecord in sweeper: %s", err)
cancel()
pokemonMutex.Unlock()
continue
}

// Update if there is still a change required & this update is the most recent
if pokemon.wildSignificantUpdate(p.WildPokemon, p.UpdateTime) && pokemon.Updated.ValueOrZero() < p.UpdateTime {
log.Debugf("DELAYED UPDATE: Updating pokemon %d from wild (sweeper)", p.EncounterId)

pokemon.updateFromWild(processCtx, dbDetails, p.WildPokemon, p.CellId, p.WeatherLookup, p.TimestampMs, p.Username)
savePokemonRecordAsAtTime(processCtx, dbDetails, pokemon, false, true, true, p.UpdateTime)
}

cancel()
pokemonMutex.Unlock()
}
}

// Global pending queue instance
var pokemonPendingQueue *PokemonPendingQueue

// InitPokemonPendingQueue initializes the global pending queue
func InitPokemonPendingQueue(ctx context.Context, dbDetails db.DbDetails, timeout time.Duration, sweepInterval time.Duration) {
pokemonPendingQueue = NewPokemonPendingQueue(timeout)
pokemonPendingQueue.StartSweeper(ctx, sweepInterval, dbDetails)
log.Infof("Pokemon pending queue started with %v timeout and %v sweep interval", timeout, sweepInterval)
}

// GetPokemonPendingQueue returns the global pending queue instance
func GetPokemonPendingQueue() *PokemonPendingQueue {
return pokemonPendingQueue
}
5 changes: 5 additions & 0 deletions decoder/pokemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,11 @@ func UpdatePokemonRecordWithEncounterProto(ctx context.Context, db db.DbDetails,

encounterId := encounter.Pokemon.EncounterId

// Remove from pending queue - encounter arrived so no need for delayed wild update
if pokemonPendingQueue != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this should never be nilotherwise we would have forgotten to initialize it in the flow, right?

pokemonPendingQueue.Remove(encounterId)
}

pokemonMutex, _ := pokemonStripedMutex.GetLock(encounterId)
pokemonMutex.Lock()
defer pokemonMutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func main() {
_ = decoder.WatchMasterFileData()
}
decoder.LoadStatsGeofences()
decoder.InitPokemonPendingQueue(ctx, dbDetails, 30*time.Second, 5*time.Second)
InitDeviceCache()

wg.Add(1)
Expand Down