diff --git a/config/config.go b/config/config.go index 082ee2d9..e0320c5b 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type cleanup struct { Quests bool `koanf:"quests"` Incidents bool `koanf:"incidents"` Tappables bool `koanf:"tappables"` + Hyperlocals bool `koanf:"hyperlocals"` Stats bool `koanf:"stats"` StatsDays int `koanf:"stats_days"` DeviceHours int `koanf:"device_hours"` @@ -124,18 +125,19 @@ type tuning struct { } type scanRule struct { - Areas []string `koanf:"areas"` - AreaNames []geo.AreaName `koanf:"-"` - ScanContext []string `koanf:"context"` - ProcessPokemon *bool `koanf:"pokemon"` - ProcessWilds *bool `koanf:"wild_pokemon"` - ProcessNearby *bool `koanf:"nearby_pokemon"` - ProcessWeather *bool `koanf:"weather"` - ProcessCells *bool `koanf:"cells"` - ProcessPokestops *bool `koanf:"pokestops"` - ProcessGyms *bool `koanf:"gyms"` - ProcessStations *bool `koanf:"stations"` - ProcessTappables *bool `koanf:"tappables"` + Areas []string `koanf:"areas"` + AreaNames []geo.AreaName `koanf:"-"` + ScanContext []string `koanf:"context"` + ProcessPokemon *bool `koanf:"pokemon"` + ProcessWilds *bool `koanf:"wild_pokemon"` + ProcessNearby *bool `koanf:"nearby_pokemon"` + ProcessWeather *bool `koanf:"weather"` + ProcessCells *bool `koanf:"cells"` + ProcessPokestops *bool `koanf:"pokestops"` + ProcessGyms *bool `koanf:"gyms"` + ProcessStations *bool `koanf:"stations"` + ProcessTappables *bool `koanf:"tappables"` + ProcessHyperlocals *bool `koanf:"hyperlocals"` } var Config configDefinition diff --git a/config/reader.go b/config/reader.go index a8c5c77a..0a2ec4cd 100644 --- a/config/reader.go +++ b/config/reader.go @@ -44,6 +44,7 @@ func ReadConfig() (configDefinition, error) { Quests: true, Incidents: true, Tappables: true, + Hyperlocals: true, StatsDays: 7, DeviceHours: 24, }, diff --git a/decoder/hyperlocal.go b/decoder/hyperlocal.go new file mode 100644 index 00000000..0c3f3c85 --- /dev/null +++ b/decoder/hyperlocal.go @@ -0,0 +1,127 @@ +package decoder + +import ( + "context" + "database/sql" + "errors" + "golbat/db" + "golbat/pogo" + + "github.com/jellydator/ttlcache/v3" + log "github.com/sirupsen/logrus" +) + +// Hyperlocal struct for hyperlocal experiment data +type Hyperlocal struct { + ExperimentId int32 `db:"experiment_id" json:"experiment_id"` + StartMs int64 `db:"start_ms" json:"start_ms"` + EndMs int64 `db:"end_ms" json:"end_ms"` + Lat float64 `db:"lat" json:"lat"` + Lon float64 `db:"lon" json:"lon"` + RadiusM float64 `db:"radius_m" json:"radius_m"` + ChallengeBonusKey string `db:"challenge_bonus_key" json:"challenge_bonus_key"` + UpdatedMs int64 `db:"updated_ms" json:"updated_ms"` +} + +// HyperlocalKey represents the composite primary key for hyperlocal records +type HyperlocalKey struct { + ExperimentId int32 `json:"experiment_id"` + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` +} + +func (h *Hyperlocal) getKey() HyperlocalKey { + return HyperlocalKey{ + ExperimentId: h.ExperimentId, + Lat: h.Lat, + Lon: h.Lon, + } +} + +func (h *Hyperlocal) updateFromHyperlocalProto(data *pogo.HyperlocalExperimentClientProto, timestampMs int64) { + h.ExperimentId = data.ExperimentId + h.StartMs = data.StartMs + h.EndMs = data.EndMs + h.Lat = data.LatDegrees + h.Lon = data.LngDegrees + h.RadiusM = data.EventRadiusM + h.ChallengeBonusKey = data.ChallengeBonusKey + h.UpdatedMs = timestampMs +} + +func getHyperlocalRecord(ctx context.Context, db db.DbDetails, key HyperlocalKey) (*Hyperlocal, error) { + // Check cache first using HyperlocalKey directly + if cachedItem := hyperlocalCache.Get(key); cachedItem != nil { + hyperlocal := cachedItem.Value() + return &hyperlocal, nil + } + + hyperlocal := Hyperlocal{} + err := db.GeneralDb.GetContext(ctx, &hyperlocal, + `SELECT experiment_id, start_ms, end_ms, lat, lon, radius_m, challenge_bonus_key, updated_ms + FROM hyperlocal + WHERE experiment_id = ? AND lat = ? AND lon = ?`, key.ExperimentId, key.Lat, key.Lon) + statsCollector.IncDbQuery("select hyperlocal", err) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + + if err != nil { + return nil, err + } + return &hyperlocal, nil +} + +func saveHyperlocalRecord(ctx context.Context, details db.DbDetails, hyperlocal *Hyperlocal) { + key := hyperlocal.getKey() + oldHyperlocal, _ := getHyperlocalRecord(ctx, details, key) + + if oldHyperlocal != nil && !hasChangesHyperlocal(oldHyperlocal, hyperlocal) { + return + } + + if oldHyperlocal == nil { + res, err := details.GeneralDb.NamedExecContext(ctx, ` + INSERT INTO hyperlocal ( + experiment_id, start_ms, end_ms, lat, lon, radius_m, challenge_bonus_key, updated_ms + ) VALUES ( + :experiment_id, :start_ms, :end_ms, :lat, :lon, :radius_m, :challenge_bonus_key, :updated_ms + ) + `, hyperlocal) + statsCollector.IncDbQuery("insert hyperlocal", err) + if err != nil { + log.Errorf("insert hyperlocal %+v: %s", key, err) + return + } + _ = res + } else { + res, err := details.GeneralDb.NamedExecContext(ctx, ` + UPDATE hyperlocal SET + start_ms = :start_ms, + end_ms = :end_ms, + radius_m = :radius_m, + challenge_bonus_key = :challenge_bonus_key, + updated_ms = :updated_ms + WHERE experiment_id = :experiment_id AND lat = :lat AND lon = :lon + `, hyperlocal) + statsCollector.IncDbQuery("update hyperlocal", err) + if err != nil { + log.Errorf("update hyperlocal %+v: %s", key, err) + return + } + _ = res + } + hyperlocalCache.Set(key, *hyperlocal, ttlcache.DefaultTTL) +} + +func hasChangesHyperlocal(old *Hyperlocal, new *Hyperlocal) bool { + return old.StartMs != new.StartMs || + old.EndMs != new.EndMs || + old.RadiusM != new.RadiusM || + old.ChallengeBonusKey != new.ChallengeBonusKey || + old.UpdatedMs < new.UpdatedMs +} + +func ClearHyperlocalCache() { + hyperlocalCache.DeleteAll() +} diff --git a/decoder/main.go b/decoder/main.go index 0b87aaca..091d8c3c 100644 --- a/decoder/main.go +++ b/decoder/main.go @@ -53,6 +53,11 @@ type RawMapPokemonData struct { Timestamp int64 } +type RawHyperlocalData struct { + Data *pogo.HyperlocalExperimentClientProto + Timestamp int64 +} + type webhooksSenderInterface interface { AddMessage(whType webhooks.WebhookType, message any, areas []geo.AreaName) } @@ -72,6 +77,7 @@ var playerCache *ttlcache.Cache[string, Player] var routeCache *ttlcache.Cache[string, Route] var diskEncounterCache *ttlcache.Cache[uint64, *pogo.DiskEncounterOutProto] var getMapFortsCache *ttlcache.Cache[string, *pogo.GetMapFortsOutProto_FortProto] +var hyperlocalCache *ttlcache.Cache[HyperlocalKey, Hyperlocal] var gymStripedMutex = stripedmutex.New(128) var pokestopStripedMutex = stripedmutex.New(128) @@ -82,6 +88,7 @@ var pokemonStripedMutex = intstripedmutex.New(1024) var weatherStripedMutex = intstripedmutex.New(128) var s2cellStripedMutex = stripedmutex.New(1024) var routeStripedMutex = stripedmutex.New(128) +var hyperlocalStripedMutex = intstripedmutex.New(128) var s2CellLookup = sync.Map{} @@ -168,6 +175,11 @@ func initDataCache() { ttlcache.WithTTL[string, Route](60 * time.Minute), ) go routeCache.Start() + + hyperlocalCache = ttlcache.New[HyperlocalKey, Hyperlocal]( + ttlcache.WithTTL[HyperlocalKey, Hyperlocal](60 * time.Minute), + ) + go hyperlocalCache.Start() } func InitialiseOhbem() { @@ -346,6 +358,39 @@ func UpdateStationBatch(ctx context.Context, db db.DbDetails, scanParameters Sca } } +func UpdateHyperlocalBatch(ctx context.Context, db db.DbDetails, scanParameters ScanParameters, p []RawHyperlocalData) { + if len(p) <= 0 { + return + } + + for _, raw := range p { + key := HyperlocalKey{ + ExperimentId: raw.Data.GetExperimentId(), + Lat: raw.Data.GetLatDegrees(), + Lon: raw.Data.GetLngDegrees(), + } + + hyperlocalMutex, _ := hyperlocalStripedMutex.GetLock(uint64(key.ExperimentId) ^ math.Float64bits(key.Lat) ^ math.Float64bits(key.Lon)) + hyperlocalMutex.Lock() + + hyperlocal, err := getHyperlocalRecord(ctx, db, key) + if err != nil { + log.Errorf("getHyperlocalRecord: %s", err) + hyperlocalMutex.Unlock() + continue + } + + if hyperlocal == nil { + hyperlocal = &Hyperlocal{} + } + + hyperlocal.updateFromHyperlocalProto(raw.Data, raw.Timestamp) + saveHyperlocalRecord(ctx, db, hyperlocal) + + hyperlocalMutex.Unlock() + } +} + func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters ScanParameters, wildPokemonList []RawWildPokemonData, nearbyPokemonList []RawNearbyPokemonData, mapPokemonList []RawMapPokemonData, weather []*pogo.ClientWeatherProto, username string) { weatherLookup := make(map[int64]pogo.GameplayWeatherProto_WeatherCondition) for _, weatherProto := range weather { diff --git a/decoder/pokestop.go b/decoder/pokestop.go index 2f4be91b..e04f4373 100644 --- a/decoder/pokestop.go +++ b/decoder/pokestop.go @@ -438,7 +438,7 @@ func (stop *Pokestop) updatePokestopFromQuestProto(questProto *pogo.FortSearchOu } else { infoData["pokemon_id"] = int(info.GetPokemonId()) } - if info.ShinyProbability > 0.0 { + if info.ShinyProbability != 0.0 { infoData["shiny_probability"] = info.ShinyProbability } if display := info.PokemonDisplay; display != nil { diff --git a/decoder/scanarea.go b/decoder/scanarea.go index 1188994e..7d2ef13d 100644 --- a/decoder/scanarea.go +++ b/decoder/scanarea.go @@ -7,15 +7,16 @@ import ( ) type ScanParameters struct { - ProcessPokemon bool - ProcessWild bool - ProcessNearby bool - ProcessWeather bool - ProcessPokestops bool - ProcessGyms bool - ProcessStations bool - ProcessCells bool - ProcessTappables bool + ProcessPokemon bool + ProcessWild bool + ProcessNearby bool + ProcessWeather bool + ProcessPokestops bool + ProcessGyms bool + ProcessStations bool + ProcessHyperlocals bool + ProcessCells bool + ProcessTappables bool } func FindScanConfiguration(scanContext string, lat, lon float64) ScanParameters { @@ -54,27 +55,29 @@ func FindScanConfiguration(scanContext string, lat, lon float64) ScanParameters return *value } return ScanParameters{ - ProcessPokemon: defaultTrue(rule.ProcessPokemon), - ProcessWild: defaultTrue(rule.ProcessWilds), - ProcessNearby: defaultTrue(rule.ProcessNearby), - ProcessCells: defaultTrue(rule.ProcessCells), - ProcessWeather: defaultTrue(rule.ProcessWeather), - ProcessPokestops: defaultTrue(rule.ProcessPokestops), - ProcessGyms: defaultTrue(rule.ProcessGyms), - ProcessStations: defaultTrue(rule.ProcessStations), - ProcessTappables: defaultTrue(rule.ProcessTappables), + ProcessPokemon: defaultTrue(rule.ProcessPokemon), + ProcessWild: defaultTrue(rule.ProcessWilds), + ProcessNearby: defaultTrue(rule.ProcessNearby), + ProcessCells: defaultTrue(rule.ProcessCells), + ProcessWeather: defaultTrue(rule.ProcessWeather), + ProcessPokestops: defaultTrue(rule.ProcessPokestops), + ProcessGyms: defaultTrue(rule.ProcessGyms), + ProcessStations: defaultTrue(rule.ProcessStations), + ProcessTappables: defaultTrue(rule.ProcessTappables), + ProcessHyperlocals: defaultTrue(rule.ProcessHyperlocals), } } return ScanParameters{ - ProcessPokemon: true, - ProcessWild: true, - ProcessNearby: true, - ProcessCells: true, - ProcessWeather: true, - ProcessGyms: true, - ProcessPokestops: true, - ProcessStations: true, - ProcessTappables: true, + ProcessPokemon: true, + ProcessWild: true, + ProcessNearby: true, + ProcessCells: true, + ProcessWeather: true, + ProcessGyms: true, + ProcessPokestops: true, + ProcessStations: true, + ProcessTappables: true, + ProcessHyperlocals: true, } } diff --git a/main.go b/main.go index abf98bc7..d27e086b 100644 --- a/main.go +++ b/main.go @@ -226,6 +226,10 @@ func main() { StartTappableExpiry(db) } + if cfg.Cleanup.Hyperlocals == true { + StartHyperlocalExpiry(db) + } + if cfg.Cleanup.Quests == true { StartQuestExpiry(db) } @@ -825,6 +829,7 @@ func decodeGMO(ctx context.Context, protoData *ProtoData, scanParameters decoder var newMapPokemon []decoder.RawMapPokemonData var newMapCells []uint64 var cellsToBeCleaned []uint64 + var newHyperlocals []decoder.RawHyperlocalData for _, mapCell := range decodedGmo.MapCell { if isCellNotEmpty(mapCell) { @@ -849,6 +854,9 @@ func decodeGMO(ctx context.Context, protoData *ProtoData, scanParameters decoder for _, station := range mapCell.Stations { newStations = append(newStations, decoder.RawStationData{Cell: mapCell.S2CellId, Data: station}) } + for _, hyperlocal := range mapCell.HyperlocalExperiment { + newHyperlocals = append(newHyperlocals, decoder.RawHyperlocalData{Data: hyperlocal, Timestamp: mapCell.AsOfTimeMs}) + } } if scanParameters.ProcessGyms || scanParameters.ProcessPokestops { @@ -863,6 +871,9 @@ func decodeGMO(ctx context.Context, protoData *ProtoData, scanParameters decoder if scanParameters.ProcessStations { decoder.UpdateStationBatch(ctx, dbDetails, scanParameters, newStations) } + if scanParameters.ProcessHyperlocals { + decoder.UpdateHyperlocalBatch(ctx, dbDetails, scanParameters, newHyperlocals) + } if scanParameters.ProcessCells { decoder.UpdateClientMapS2CellBatch(ctx, dbDetails, newMapCells) diff --git a/sql/48_hyperlocal.up.sql b/sql/48_hyperlocal.up.sql new file mode 100644 index 00000000..26e32592 --- /dev/null +++ b/sql/48_hyperlocal.up.sql @@ -0,0 +1,14 @@ +CREATE TABLE `hyperlocal` ( + `experiment_id` INT NOT NULL, + `start_ms` BIGINT NOT NULL, + `end_ms` BIGINT NOT NULL, + `lat` DOUBLE(18,14) NOT NULL, + `lon` DOUBLE(18,14) NOT NULL, + `radius_m` DOUBLE(18,14) NOT NULL, + `challenge_bonus_key` VARCHAR(255) NOT NULL, + `updated_ms` BIGINT NOT NULL, + PRIMARY KEY(`experiment_id`,`lat`,`lon`), + KEY `ix_end_ms` (`end_ms`,`lat`,`lon`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_general_ci; diff --git a/stats.go b/stats.go index 1cf549f0..86d30011 100644 --- a/stats.go +++ b/stats.go @@ -215,6 +215,36 @@ func StartTappableExpiry(db *sqlx.DB) { }() } +func StartHyperlocalExpiry(db *sqlx.DB) { + ticker := time.NewTicker(time.Hour + 30*time.Minute) + go func() { + for { + <-ticker.C + start := time.Now() + + var result sql.Result + var err error + + // Delete expired hyperlocals where end_ms is in the past + result, err = db.Exec("DELETE FROM hyperlocal WHERE end_ms < UNIX_TIMESTAMP() * 1000;") + + elapsed := time.Since(start) + + if err != nil { + log.Errorf("DB - Cleanup of hyperlocal table error %s", err) + } else { + rows, _ := result.RowsAffected() + log.Infof("DB - Cleanup of hyperlocal table took %s (%d rows)", elapsed, rows) + + // Clear the hyperlocal cache if we deleted any rows + if rows > 0 { + decoder.ClearHyperlocalCache() + } + } + } + }() +} + func StartQuestExpiry(db *sqlx.DB) { ticker := time.NewTicker(time.Hour + 1*time.Minute) go func() {