Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions routers/middleware/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func DynamicAuthMiddleware(c *gin.Context) {
func OnlySenderMiddleware(c *gin.Context) {
scope, ok := c.Get("sender")

if !ok && scope == nil {
if !ok || scope == nil {
u.APIResponse(c, http.StatusUnauthorized, "error", "Invalid API key or token", nil)
c.Abort()
return
Expand All @@ -478,7 +478,7 @@ func OnlySenderMiddleware(c *gin.Context) {
func OnlyProviderMiddleware(c *gin.Context) {
scope, ok := c.Get("provider")

if !ok && scope == nil {
if !ok || scope == nil {
u.APIResponse(c, http.StatusUnauthorized, "error", "Invalid API key or token", nil)
c.Abort()
return
Expand Down
68 changes: 64 additions & 4 deletions services/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -206,6 +207,10 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context,

// TODO: add also the checks for all the currencies that a provider has

// First pass: collect all valid token entries for each provider
providerTokenEntries := make(map[string][]string) // map[providerID][]queueEntry
maxTokensPerProvider := 0

for _, provider := range providers {
exists, err := provider.QueryProviderCurrencies().
Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(bucket.Edges.Currency.ID))).
Expand Down Expand Up @@ -234,6 +239,8 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context,
}

tokenSymbols := []string{}
providerEntries := []string{}

for _, orderToken := range orderTokens {
if utils.ContainsString(tokenSymbols, orderToken.Edges.Token.Symbol) {
continue
Expand Down Expand Up @@ -269,18 +276,71 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context,

// Serialize the provider ID, token, rate, min and max order amount into a single string
data := fmt.Sprintf("%s:%s:%s:%s:%s", provider.ID, orderToken.Edges.Token.Symbol, rate, orderToken.MinOrderAmount, orderToken.MaxOrderAmount)
providerEntries = append(providerEntries, data)
}

// Track this provider's entries if they have any valid tokens
if len(providerEntries) > 0 {
providerTokenEntries[provider.ID] = providerEntries
if len(providerEntries) > maxTokensPerProvider {
maxTokensPerProvider = len(providerEntries)
}
}
}

// Cap maximum slots per provider at 20 to prevent excessive queue size
const maxSlotsPerProvider = 20
targetSlotsPerProvider := maxTokensPerProvider
if targetSlotsPerProvider > maxSlotsPerProvider {
targetSlotsPerProvider = maxSlotsPerProvider
}

// If no providers have valid entries, exit early
if targetSlotsPerProvider == 0 {
logger.WithFields(logger.Fields{
"CurrencyCode": bucket.Edges.Currency.Code,
"MinAmount": bucket.MinAmount,
"MaxAmount": bucket.MaxAmount,
}).Warnf("no valid provider entries found for bucket")
return
}

// Second pass: distribute slots equally by cycling through each provider's tokens
// Sort provider IDs for deterministic iteration order (aids debugging and testing)
providerIDs := make([]string, 0, len(providerTokenEntries))
for providerID := range providerTokenEntries {
providerIDs = append(providerIDs, providerID)
}
sort.Strings(providerIDs)

for _, providerID := range providerIDs {
entries := providerTokenEntries[providerID]
numEntries := len(entries)
for i := 0; i < targetSlotsPerProvider; i++ {
// Cycle through available entries using modulo
data := entries[i%numEntries]

// Enqueue the serialized data into the circular queue
err = storage.RedisClient.RPush(ctx, redisKey, data).Err()
err := storage.RedisClient.RPush(ctx, redisKey, data).Err()
if err != nil && err != context.Canceled {
logger.WithFields(logger.Fields{
"Error": fmt.Sprintf("%v", err),
"Key": redisKey,
"Data": data,
"Error": fmt.Sprintf("%v", err),
"Key": redisKey,
"Data": data,
"ProviderID": providerID,
}).Errorf("failed to enqueue provider data to circular queue")
}
}
}

logger.WithFields(logger.Fields{
"CurrencyCode": bucket.Edges.Currency.Code,
"MinAmount": bucket.MinAmount,
"MaxAmount": bucket.MaxAmount,
"NumProviders": len(providerTokenEntries),
"TargetSlotsPerProvider": targetSlotsPerProvider,
"TotalQueueSize": len(providerTokenEntries) * targetSlotsPerProvider,
}).Infof("successfully created priority queue with equal slot distribution")
}

// AssignLockPaymentOrders assigns lock payment orders to providers
Expand Down
Loading
Loading