From 5aa4c931dca22ca03671b17581b48b494120ca76 Mon Sep 17 00:00:00 2001 From: Claret Nnamocha Date: Tue, 28 Oct 2025 10:28:14 +0100 Subject: [PATCH 1/3] feat: implement equal slot distribution for providers in priority queue - Refactored CreatePriorityQueueForBucket to use two-pass algorithm - All providers now receive equal number of queue slots regardless of token count - Providers with fewer tokens have their tokens cycled proportionally - Added 20-slot cap per provider to prevent excessive queue sizes - Added comprehensive logging for distribution metrics - Added 4 unit tests covering equal distribution, capping, and edge cases Fixes issue where providers with more tokens dominated the queue --- services/priority_queue.go | 59 ++- services/priority_queue_test.go | 612 ++++++++++++++++++++++++++++++++ 2 files changed, 667 insertions(+), 4 deletions(-) diff --git a/services/priority_queue.go b/services/priority_queue.go index bba527dc9..f23559d4d 100644 --- a/services/priority_queue.go +++ b/services/priority_queue.go @@ -206,6 +206,10 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context, // TODO: add also the checks for all the currencies that a provider has + // First pass: collect all valid token entries for each provider + providerTokenEntries := make(map[string][]string) // map[providerID][]queueEntry + maxTokensPerProvider := 0 + for _, provider := range providers { exists, err := provider.QueryProviderCurrencies(). Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(bucket.Edges.Currency.ID))). @@ -234,6 +238,8 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context, } tokenSymbols := []string{} + providerEntries := []string{} + for _, orderToken := range orderTokens { if utils.ContainsString(tokenSymbols, orderToken.Edges.Token.Symbol) { continue @@ -269,18 +275,63 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context, // Serialize the provider ID, token, rate, min and max order amount into a single string data := fmt.Sprintf("%s:%s:%s:%s:%s", provider.ID, orderToken.Edges.Token.Symbol, rate, orderToken.MinOrderAmount, orderToken.MaxOrderAmount) + providerEntries = append(providerEntries, data) + } + + // Track this provider's entries if they have any valid tokens + if len(providerEntries) > 0 { + providerTokenEntries[provider.ID] = providerEntries + if len(providerEntries) > maxTokensPerProvider { + maxTokensPerProvider = len(providerEntries) + } + } + } + + // Cap maximum slots per provider at 20 to prevent excessive queue size + const maxSlotsPerProvider = 20 + targetSlotsPerProvider := maxTokensPerProvider + if targetSlotsPerProvider > maxSlotsPerProvider { + targetSlotsPerProvider = maxSlotsPerProvider + } + + // If no providers have valid entries, exit early + if targetSlotsPerProvider == 0 { + logger.WithFields(logger.Fields{ + "CurrencyCode": bucket.Edges.Currency.Code, + "MinAmount": bucket.MinAmount, + "MaxAmount": bucket.MaxAmount, + }).Warnf("no valid provider entries found for bucket") + return + } + + // Second pass: distribute slots equally by cycling through each provider's tokens + for providerID, entries := range providerTokenEntries { + numEntries := len(entries) + for i := 0; i < targetSlotsPerProvider; i++ { + // Cycle through available entries using modulo + data := entries[i%numEntries] // Enqueue the serialized data into the circular queue - err = storage.RedisClient.RPush(ctx, redisKey, data).Err() + err := storage.RedisClient.RPush(ctx, redisKey, data).Err() if err != nil && err != context.Canceled { logger.WithFields(logger.Fields{ - "Error": fmt.Sprintf("%v", err), - "Key": redisKey, - "Data": data, + "Error": fmt.Sprintf("%v", err), + "Key": redisKey, + "Data": data, + "ProviderID": providerID, }).Errorf("failed to enqueue provider data to circular queue") } } } + + logger.WithFields(logger.Fields{ + "CurrencyCode": bucket.Edges.Currency.Code, + "MinAmount": bucket.MinAmount, + "MaxAmount": bucket.MaxAmount, + "NumProviders": len(providerTokenEntries), + "TargetSlotsPerProvider": targetSlotsPerProvider, + "TotalQueueSize": len(providerTokenEntries) * targetSlotsPerProvider, + }).Infof("successfully created priority queue with equal slot distribution") } // AssignLockPaymentOrders assigns lock payment orders to providers diff --git a/services/priority_queue_test.go b/services/priority_queue_test.go index 2abeaaa63..8776d1569 100644 --- a/services/priority_queue_test.go +++ b/services/priority_queue_test.go @@ -578,4 +578,616 @@ func TestPriorityQueueTest(t *testing.T) { // close(orderRequestChan) // }) // }) + + t.Run("TestEqualSlotDistribution", func(t *testing.T) { + ctx := context.Background() + + // Create a second token for testing + token2Id, err := db.Client.Token. + Create(). + SetSymbol("TST2"). + SetContractAddress("0xd4E96eF8eee8678dBFf4d535E033Ed1a4F7605b8"). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + token2, err := db.Client.Token. + Query(). + Where(tokenEnt.IDEQ(token2Id)). + WithNetwork(). + Only(ctx) + assert.NoError(t, err) + + // Create a third token for testing + token3Id, err := db.Client.Token. + Create(). + SetSymbol("TST3"). + SetContractAddress("0xd4E96eF8eee8678dBFf4d535E033Ed1a4F7605b9"). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + token3, err := db.Client.Token. + Query(). + Where(tokenEnt.IDEQ(token3Id)). + WithNetwork(). + Only(ctx) + assert.NoError(t, err) + + // Create provider with multiple tokens (3 tokens) + multiTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "multitoken@test.com", + }) + assert.NoError(t, err) + + multiTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": multiTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://multitoken.com", + }) + assert.NoError(t, err) + + // Add 3 tokens to multiTokenProvider + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(100), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": multiTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": testCtxForPQ.token.ID, + }, + ) + assert.NoError(t, err) + + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(110), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": multiTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": token2.Edges.Network.Identifier, + "token_id": token2.ID, + }, + ) + assert.NoError(t, err) + + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(120), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": multiTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": token3.Edges.Network.Identifier, + "token_id": token3.ID, + }, + ) + assert.NoError(t, err) + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(multiTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create provider with single token + singleTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "singletoken@test.com", + }) + assert.NoError(t, err) + + singleTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": singleTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://singletoken.com", + }) + assert.NoError(t, err) + + // Add only 1 token to singleTokenProvider + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(105), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": singleTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": testCtxForPQ.token.ID, + }, + ) + assert.NoError(t, err) + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(singleTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create a bucket with both providers + bucket, err := test.CreateTestProvisionBucket(map[string]interface{}{ + "provider_id": multiTokenProviderProfile.ID, + "min_amount": testCtxForPQ.minAmount, + "max_amount": testCtxForPQ.maxAmount, + "currency_id": testCtxForPQ.currency.ID, + }) + assert.NoError(t, err) + + // Add second provider to the same bucket + _, err = db.Client.ProvisionBucket. + UpdateOneID(bucket.ID). + AddProviderProfileIDs(singleTokenProviderProfile.ID). + Save(ctx) + assert.NoError(t, err) + + _bucket, err := db.Client.ProvisionBucket. + Query(). + Where(provisionbucket.IDEQ(bucket.ID)). + WithCurrency(). + WithProviderProfiles(). + Only(ctx) + assert.NoError(t, err) + + // Create the priority queue + service.CreatePriorityQueueForBucket(ctx, _bucket) + + redisKey := fmt.Sprintf("bucket_%s_%s_%s", _bucket.Edges.Currency.Code, testCtxForPQ.minAmount, testCtxForPQ.maxAmount) + + // Verify the queue structure + data, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // Both providers should have equal slots (3 each, matching the max token count) + expectedSlots := 3 + totalExpectedSlots := expectedSlots * 2 // 2 providers + + assert.Equal(t, totalExpectedSlots, len(data), "Total slots should be %d (3 slots per provider x 2 providers)", totalExpectedSlots) + + // Count slots per provider + multiTokenSlots := 0 + singleTokenSlots := 0 + for _, entry := range data { + if strings.HasPrefix(entry, multiTokenProviderProfile.ID) { + multiTokenSlots++ + } else if strings.HasPrefix(entry, singleTokenProviderProfile.ID) { + singleTokenSlots++ + } + } + + assert.Equal(t, expectedSlots, multiTokenSlots, "Multi-token provider should have exactly %d slots", expectedSlots) + assert.Equal(t, expectedSlots, singleTokenSlots, "Single-token provider should have exactly %d slots", expectedSlots) + + // Verify that single token provider's token is repeated 3 times + singleTokenEntries := []string{} + for _, entry := range data { + if strings.HasPrefix(entry, singleTokenProviderProfile.ID) { + singleTokenEntries = append(singleTokenEntries, entry) + } + } + + // All entries should be identical (same token repeated) + for i := 1; i < len(singleTokenEntries); i++ { + assert.Equal(t, singleTokenEntries[0], singleTokenEntries[i], "Single-token provider entries should be identical (cycling)") + } + + // Verify multi-token provider has all 3 different tokens + multiTokenEntries := []string{} + tokenSymbolsFound := make(map[string]bool) + for _, entry := range data { + if strings.HasPrefix(entry, multiTokenProviderProfile.ID) { + multiTokenEntries = append(multiTokenEntries, entry) + parts := strings.Split(entry, ":") + if len(parts) >= 2 { + tokenSymbolsFound[parts[1]] = true + } + } + } + + assert.Equal(t, 3, len(tokenSymbolsFound), "Multi-token provider should have all 3 different tokens represented") + assert.True(t, tokenSymbolsFound["TST"], "Should contain TST token") + assert.True(t, tokenSymbolsFound["TST2"], "Should contain TST2 token") + assert.True(t, tokenSymbolsFound["TST3"], "Should contain TST3 token") + }) + + t.Run("TestSlotCapAt20", func(t *testing.T) { + ctx := context.Background() + + // Create a provider with many tokens (more than 20) + manyTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "manytoken@test.com", + }) + assert.NoError(t, err) + + manyTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": manyTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://manytoken.com", + }) + assert.NoError(t, err) + + // Add 25 tokens to test the cap + for i := 0; i < 25; i++ { + tokenId, err := db.Client.Token. + Create(). + SetSymbol(fmt.Sprintf("CAP%d", i)). + SetContractAddress(fmt.Sprintf("0xCAP%024d", i)). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(100 + float64(i)), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": manyTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": tokenId, + }, + ) + assert.NoError(t, err) + } + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(manyTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create a bucket with this provider + bucket, err := test.CreateTestProvisionBucket(map[string]interface{}{ + "provider_id": manyTokenProviderProfile.ID, + "min_amount": testCtxForPQ.minAmount, + "max_amount": testCtxForPQ.maxAmount, + "currency_id": testCtxForPQ.currency.ID, + }) + assert.NoError(t, err) + + _bucket, err := db.Client.ProvisionBucket. + Query(). + Where(provisionbucket.IDEQ(bucket.ID)). + WithCurrency(). + WithProviderProfiles(). + Only(ctx) + assert.NoError(t, err) + + // Create the priority queue + service.CreatePriorityQueueForBucket(ctx, _bucket) + + redisKey := fmt.Sprintf("bucket_%s_%s_%s", _bucket.Edges.Currency.Code, testCtxForPQ.minAmount, testCtxForPQ.maxAmount) + + // Verify the queue structure + data, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // Should be capped at 20 slots, not 25 + assert.Equal(t, 20, len(data), "Slots should be capped at 20 even though provider has 25 tokens") + + // Count unique tokens in the queue + tokenSymbolsFound := make(map[string]bool) + for _, entry := range data { + parts := strings.Split(entry, ":") + if len(parts) >= 2 { + tokenSymbolsFound[parts[1]] = true + } + } + + // Should have 20 unique tokens (cycling through the first 20) + assert.Equal(t, 20, len(tokenSymbolsFound), "Should have exactly 20 unique tokens in queue") + }) + + t.Run("TestNoValidProvidersExitEarly", func(t *testing.T) { + ctx := context.Background() + + // Create a provider with no tokens + noTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "notoken@test.com", + }) + assert.NoError(t, err) + + noTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": noTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://notoken.com", + }) + assert.NoError(t, err) + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(noTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create a bucket with this provider (no tokens) + bucket, err := test.CreateTestProvisionBucket(map[string]interface{}{ + "provider_id": noTokenProviderProfile.ID, + "min_amount": testCtxForPQ.minAmount, + "max_amount": testCtxForPQ.maxAmount, + "currency_id": testCtxForPQ.currency.ID, + }) + assert.NoError(t, err) + + _bucket, err := db.Client.ProvisionBucket. + Query(). + Where(provisionbucket.IDEQ(bucket.ID)). + WithCurrency(). + WithProviderProfiles(). + Only(ctx) + assert.NoError(t, err) + + // Create the priority queue (should exit early) + service.CreatePriorityQueueForBucket(ctx, _bucket) + + redisKey := fmt.Sprintf("bucket_%s_%s_%s", _bucket.Edges.Currency.Code, testCtxForPQ.minAmount, testCtxForPQ.maxAmount) + + // Verify the queue is empty + data, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + assert.Equal(t, 0, len(data), "Queue should be empty when no valid providers exist") + }) + + t.Run("TestTwoProvidersWithDifferentTokenCounts", func(t *testing.T) { + ctx := context.Background() + + // Create provider with 2 tokens + twoTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "twotoken@test.com", + }) + assert.NoError(t, err) + + twoTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": twoTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://twotoken.com", + }) + assert.NoError(t, err) + + // Create tokens + token4Id, err := db.Client.Token. + Create(). + SetSymbol("TST4"). + SetContractAddress("0xd4E96eF8eee8678dBFf4d535E033Ed1a4F7605c0"). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + token5Id, err := db.Client.Token. + Create(). + SetSymbol("TST5"). + SetContractAddress("0xd4E96eF8eee8678dBFf4d535E033Ed1a4F7605c1"). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + // Add 2 tokens to twoTokenProvider + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(100), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": twoTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": token4Id, + }, + ) + assert.NoError(t, err) + + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(105), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": twoTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": token5Id, + }, + ) + assert.NoError(t, err) + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(twoTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create provider with 5 tokens + fiveTokenProvider, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": "fivetoken@test.com", + }) + assert.NoError(t, err) + + fiveTokenProviderProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": fiveTokenProvider.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": "https://fivetoken.com", + }) + assert.NoError(t, err) + + // Add 5 tokens to fiveTokenProvider + for i := 0; i < 5; i++ { + tokenId, err := db.Client.Token. + Create(). + SetSymbol(fmt.Sprintf("TST%d", 10+i)). + SetContractAddress(fmt.Sprintf("0xd4E96eF8eee8678dBFf4d535E033Ed1a4F7605%02d", 10+i)). + SetDecimals(6). + SetNetworkID(testCtxForPQ.token.Edges.Network.ID). + SetIsEnabled(true). + SetBaseCurrency("KES"). + OnConflict(). + UpdateNewValues(). + ID(ctx) + assert.NoError(t, err) + + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(100 + float64(i)), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": fiveTokenProviderProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": tokenId, + }, + ) + assert.NoError(t, err) + } + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(fiveTokenProviderProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + // Create a bucket with both providers + bucket, err := test.CreateTestProvisionBucket(map[string]interface{}{ + "provider_id": twoTokenProviderProfile.ID, + "min_amount": testCtxForPQ.minAmount, + "max_amount": testCtxForPQ.maxAmount, + "currency_id": testCtxForPQ.currency.ID, + }) + assert.NoError(t, err) + + // Add second provider to the same bucket + _, err = db.Client.ProvisionBucket. + UpdateOneID(bucket.ID). + AddProviderProfileIDs(fiveTokenProviderProfile.ID). + Save(ctx) + assert.NoError(t, err) + + _bucket, err := db.Client.ProvisionBucket. + Query(). + Where(provisionbucket.IDEQ(bucket.ID)). + WithCurrency(). + WithProviderProfiles(). + Only(ctx) + assert.NoError(t, err) + + // Create the priority queue + service.CreatePriorityQueueForBucket(ctx, _bucket) + + redisKey := fmt.Sprintf("bucket_%s_%s_%s", _bucket.Edges.Currency.Code, testCtxForPQ.minAmount, testCtxForPQ.maxAmount) + + // Verify the queue structure + data, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // Both providers should have equal slots (5 each, matching the max token count) + expectedSlots := 5 + totalExpectedSlots := expectedSlots * 2 // 2 providers + + assert.Equal(t, totalExpectedSlots, len(data), "Total slots should be %d (5 slots per provider x 2 providers)", totalExpectedSlots) + + // Count slots per provider + twoTokenSlots := 0 + fiveTokenSlots := 0 + for _, entry := range data { + if strings.HasPrefix(entry, twoTokenProviderProfile.ID) { + twoTokenSlots++ + } else if strings.HasPrefix(entry, fiveTokenProviderProfile.ID) { + fiveTokenSlots++ + } + } + + assert.Equal(t, expectedSlots, twoTokenSlots, "Two-token provider should have exactly %d slots", expectedSlots) + assert.Equal(t, expectedSlots, fiveTokenSlots, "Five-token provider should have exactly %d slots", expectedSlots) + + // Verify that two-token provider cycles through its tokens + twoTokenEntries := []string{} + tokenSymbolsInTwoToken := make(map[string]int) + for _, entry := range data { + if strings.HasPrefix(entry, twoTokenProviderProfile.ID) { + twoTokenEntries = append(twoTokenEntries, entry) + parts := strings.Split(entry, ":") + if len(parts) >= 2 { + tokenSymbolsInTwoToken[parts[1]]++ + } + } + } + + // With 2 tokens and 5 slots: should have pattern like [T1, T2, T1, T2, T1] + assert.Equal(t, 2, len(tokenSymbolsInTwoToken), "Two-token provider should cycle through 2 unique tokens") + // Token counts should be 3 and 2 (or 2 and 3) + counts := []int{} + for _, count := range tokenSymbolsInTwoToken { + counts = append(counts, count) + } + assert.Contains(t, counts, 3, "One token should appear 3 times") + assert.Contains(t, counts, 2, "Other token should appear 2 times") + }) } From 761890b7a640a00229718edf5966a523bb7d3312 Mon Sep 17 00:00:00 2001 From: Claret Nnamocha Date: Tue, 28 Oct 2025 22:57:06 +0100 Subject: [PATCH 2/3] refactor: add deterministic provider ordering for reproducibility - Sort provider IDs before iteration to ensure consistent queue order - Prevents non-deterministic map iteration from affecting queue creation - Improves debugging experience with predictable queue ordering - Added TestDeterministicQueueOrder to verify consistent queue generation - Added containsString helper function for tests --- services/priority_queue.go | 11 ++- services/priority_queue_test.go | 142 ++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) diff --git a/services/priority_queue.go b/services/priority_queue.go index f23559d4d..4a8303437 100644 --- a/services/priority_queue.go +++ b/services/priority_queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sort" "strings" "time" @@ -305,7 +306,15 @@ func (s *PriorityQueueService) CreatePriorityQueueForBucket(ctx context.Context, } // Second pass: distribute slots equally by cycling through each provider's tokens - for providerID, entries := range providerTokenEntries { + // Sort provider IDs for deterministic iteration order (aids debugging and testing) + providerIDs := make([]string, 0, len(providerTokenEntries)) + for providerID := range providerTokenEntries { + providerIDs = append(providerIDs, providerID) + } + sort.Strings(providerIDs) + + for _, providerID := range providerIDs { + entries := providerTokenEntries[providerID] numEntries := len(entries) for i := 0; i < targetSlotsPerProvider; i++ { // Cycle through available entries using modulo diff --git a/services/priority_queue_test.go b/services/priority_queue_test.go index 8776d1569..e3b979f35 100644 --- a/services/priority_queue_test.go +++ b/services/priority_queue_test.go @@ -1190,4 +1190,146 @@ func TestPriorityQueueTest(t *testing.T) { assert.Contains(t, counts, 3, "One token should appear 3 times") assert.Contains(t, counts, 2, "Other token should appear 2 times") }) + + t.Run("TestDeterministicQueueOrder", func(t *testing.T) { + ctx := context.Background() + + // Create 3 providers with different IDs to test deterministic ordering + providers := []struct { + email string + host string + }{ + {"provider-c@test.com", "https://provider-c.com"}, + {"provider-a@test.com", "https://provider-a.com"}, + {"provider-b@test.com", "https://provider-b.com"}, + } + + providerProfiles := []*ent.ProviderProfile{} + + for _, p := range providers { + providerUser, err := test.CreateTestUser(map[string]interface{}{ + "scope": "provider", + "email": p.email, + }) + assert.NoError(t, err) + + providerProfile, err := test.CreateTestProviderProfile(map[string]interface{}{ + "user_id": providerUser.ID, + "currency_id": testCtxForPQ.currency.ID, + "host_identifier": p.host, + }) + assert.NoError(t, err) + + // Add 1 token to each provider + _, err = test.AddProviderOrderTokenToProvider( + map[string]interface{}{ + "fixed_conversion_rate": decimal.NewFromFloat(100), + "conversion_rate_type": "fixed", + "floating_conversion_rate": decimal.NewFromFloat(1.0), + "max_order_amount": decimal.NewFromFloat(1000), + "min_order_amount": decimal.NewFromFloat(1.0), + "provider": providerProfile, + "currency_id": testCtxForPQ.currency.ID, + "network": testCtxForPQ.token.Edges.Network.Identifier, + "token_id": testCtxForPQ.token.ID, + }, + ) + assert.NoError(t, err) + + // Update ProviderCurrencies with sufficient balance + _, err = db.Client.ProviderCurrencies. + Update(). + Where(providercurrencies.HasProviderWith(providerprofile.IDEQ(providerProfile.ID))). + Where(providercurrencies.HasCurrencyWith(fiatcurrency.IDEQ(testCtxForPQ.currency.ID))). + SetAvailableBalance(decimal.NewFromFloat(100000)). + SetTotalBalance(decimal.NewFromFloat(100000)). + Save(ctx) + assert.NoError(t, err) + + providerProfiles = append(providerProfiles, providerProfile) + } + + // Create a bucket with all providers + bucket, err := test.CreateTestProvisionBucket(map[string]interface{}{ + "provider_id": providerProfiles[0].ID, + "min_amount": testCtxForPQ.minAmount, + "max_amount": testCtxForPQ.maxAmount, + "currency_id": testCtxForPQ.currency.ID, + }) + assert.NoError(t, err) + + // Add other providers to the bucket + _, err = db.Client.ProvisionBucket. + UpdateOneID(bucket.ID). + AddProviderProfileIDs(providerProfiles[1].ID, providerProfiles[2].ID). + Save(ctx) + assert.NoError(t, err) + + _bucket, err := db.Client.ProvisionBucket. + Query(). + Where(provisionbucket.IDEQ(bucket.ID)). + WithCurrency(). + WithProviderProfiles(). + Only(ctx) + assert.NoError(t, err) + + // Create the queue multiple times and verify consistent ordering + redisKey := fmt.Sprintf("bucket_%s_%s_%s", _bucket.Edges.Currency.Code, testCtxForPQ.minAmount, testCtxForPQ.maxAmount) + + // First run + service.CreatePriorityQueueForBucket(ctx, _bucket) + firstRun, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // Delete and recreate + _, err = db.RedisClient.Del(ctx, redisKey).Result() + assert.NoError(t, err) + + // Second run + service.CreatePriorityQueueForBucket(ctx, _bucket) + secondRun, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // Delete and recreate + _, err = db.RedisClient.Del(ctx, redisKey).Result() + assert.NoError(t, err) + + // Third run + service.CreatePriorityQueueForBucket(ctx, _bucket) + thirdRun, err := db.RedisClient.LRange(ctx, redisKey, 0, -1).Result() + assert.NoError(t, err) + + // All runs should produce identical queue order + assert.Equal(t, firstRun, secondRun, "First and second runs should produce identical queue order") + assert.Equal(t, secondRun, thirdRun, "Second and third runs should produce identical queue order") + + // Verify providers are ordered deterministically (sorted by ID) + // Collect all provider IDs in order of appearance + seenProviders := []string{} + for _, entry := range firstRun { + parts := strings.Split(entry, ":") + if len(parts) > 0 { + providerID := parts[0] + if !containsString(seenProviders, providerID) { + seenProviders = append(seenProviders, providerID) + } + } + } + + // Verify the providers appear in sorted order + sortedProviders := make([]string, len(seenProviders)) + copy(sortedProviders, seenProviders) + sort.Strings(sortedProviders) + assert.Equal(t, sortedProviders, seenProviders, "Providers should appear in sorted ID order") + }) +} + +// Helper function for the test +func containsString(slice []string, str string) bool { + for _, s := range slice { + if s == str { + return true + } + } + return false } From 312031e2cb1a45b9c6a4b20b68b21dfec65302d4 Mon Sep 17 00:00:00 2001 From: Claret Nnamocha Date: Tue, 28 Oct 2025 23:15:18 +0100 Subject: [PATCH 3/3] fix: correct logical condition in OnlySender and OnlyProvider middleware - Update condition checks in OnlySenderMiddleware and OnlyProviderMiddleware to use logical OR instead of AND for better validation of scope. - Ensures proper handling of unauthorized access when the sender or provider is not set. --- routers/middleware/auth.go | 4 ++-- tasks/tasks_test.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/routers/middleware/auth.go b/routers/middleware/auth.go index f2f1bbd04..6f44a2f54 100644 --- a/routers/middleware/auth.go +++ b/routers/middleware/auth.go @@ -465,7 +465,7 @@ func DynamicAuthMiddleware(c *gin.Context) { func OnlySenderMiddleware(c *gin.Context) { scope, ok := c.Get("sender") - if !ok && scope == nil { + if !ok || scope == nil { u.APIResponse(c, http.StatusUnauthorized, "error", "Invalid API key or token", nil) c.Abort() return @@ -478,7 +478,7 @@ func OnlySenderMiddleware(c *gin.Context) { func OnlyProviderMiddleware(c *gin.Context) { scope, ok := c.Get("provider") - if !ok && scope == nil { + if !ok || scope == nil { u.APIResponse(c, http.StatusUnauthorized, "error", "Invalid API key or token", nil) c.Abort() return diff --git a/tasks/tasks_test.go b/tasks/tasks_test.go index bd4c3305f..3bf8186bf 100644 --- a/tasks/tasks_test.go +++ b/tasks/tasks_test.go @@ -130,7 +130,9 @@ func TestTasks(t *testing.T) { // Setup test data err := setup() - assert.NoError(t, err) + if !assert.NoError(t, err) { + t.FailNow() + } t.Run("RetryFailedWebhookNotifications", func(t *testing.T) { httpmock.Activate()