fix: memory leaks and race conditions with redis#689
Conversation
📝 WalkthroughWalkthroughReplaces ad-hoc HTTP clients with a shared singleton and adds graceful shutdown; centralizes Slack POST logic with request timeouts; reimplements crypto utilities in Go with stricter validation; introduces per-operation contexts/timeouts and parallelization in several tasks; restores tests to use an in-test email mock; plus widespread formatting/whitespace edits. Changes
Sequence Diagram(s)sequenceDiagram
participant Worker as RetryWorker
participant Store as DB/Store
participant Webhook as ExternalWebhook
participant Email as EmailService
participant Logger as Logger
Worker->>Store: Fetch failed webhook attempts (fetchCtx, 30s)
alt attempts found
loop per attempt
Worker->>Webhook: Send webhook (sendCtx, 30s)
Webhook-->>Worker: response (success / failure)
alt success
Worker->>Store: Save success (saveCtx, 10s)
Store-->>Worker: ack / err
else failure
Worker->>Email: Send failure email (emailCtx, 15s)
Email-->>Worker: ack / err (logged)
Worker->>Store: Save failure state (saveCtx, 10s)
Store-->>Worker: ack / err (logged)
end
end
end
Worker->>Logger: Log summary / continue
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
The base branch was changed.
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tasks/fulfillments_webhooks.go (1)
591-614:⚠️ Potential issue | 🟠 MajorEarly
returnstatements in the expired block abort the entire batch.Lines 594, 605, and 613 call
returnon errors within the expired-status branch. This means a single problematic attempt (e.g., malformed sender ID, unreachable email service) will prevent all subsequent attempts in the loop from being processed. This contradicts the goal of the PR ("log error but don't fail entire batch").These should follow the same log-and-continue pattern used on lines 620–626.
🐛 Proposed fix: log and continue instead of returning
if nextRetryTime.Sub(attempt.CreatedAt.Add(-baseDelay)) > maxCumulativeTime { attemptUpdate.SetStatus(webhookretryattempt.StatusExpired) uid, err := uuid.Parse(attempt.Payload["data"].(map[string]interface{})["senderId"].(string)) if err != nil { saveCancel() - return fmt.Errorf("RetryFailedWebhookNotifications.FailedExtraction: %w", err) + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.FailedExtraction") + continue } profile, err := storage.Client.SenderProfile. Query(). Where( senderprofile.IDEQ(uid), ). WithUser(). Only(saveCtx) if err != nil { saveCancel() - return fmt.Errorf("RetryFailedWebhookNotifications.CouldNotFetchProfile: %w", err) + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.CouldNotFetchProfile") + continue } emailService := email.NewEmailServiceWithProviders() _, err = emailService.SendWebhookFailureEmail(saveCtx, profile.Edges.User.Email, profile.Edges.User.FirstName) if err != nil { saveCancel() - return fmt.Errorf("RetryFailedWebhookNotifications.SendWebhookFailureEmail: %w", err) + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.SendWebhookFailureEmail") + // Continue to save the expired status even if email fails } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/fulfillments_webhooks.go` around lines 591 - 614, In the expired-status branch, don't abort the whole batch on per-attempt failures: replace the early returns after uuid.Parse, the SenderProfile Query().Only(saveCtx) error, and the SendWebhookFailureEmail error with the same log-and-continue behavior used later (lines 620–626). Specifically, after calling saveCancel(), log the error with context (include attempt payload senderId and/or attempt ID), then continue the loop instead of returning; apply this to the uuid.Parse failure, the profile fetch (storage.Client.SenderProfile.Query().Where(...).WithUser().Only(saveCtx)) failure, and the emailService.SendWebhookFailureEmail(saveCtx, ...) failure.tasks/indexing.go (1)
66-93:⚠️ Potential issue | 🔴 CriticalData race: concurrent goroutines write to the shared outer
errvariable.Lines 76 and 86 use
=(not:=), assigning to theerrdeclared at line 47 in the function scope. Since multiple goroutines run in parallel, this is a data race onerr. TheindexerInstanceis fine (declared per-goroutine at line 69), buterrmust be shadowed locally inside the goroutine.🐛 Proposed fix: shadow `err` inside the goroutine
go func(network *ent.Network) { defer wg.Done() // Use the provided context with timeout instead of creating a new unbounded one var indexerInstance types.Indexer + var err error if strings.HasPrefix(network.Identifier, "tron") {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/indexing.go` around lines 66 - 93, The goroutine is writing to the shared outer err variable (data race); fix it by declaring a local err inside the goroutine (shadowing the outer one) before calling indexer.NewIndexerStarknet and indexer.NewIndexerEVM (e.g., use local "err := ..." or "var err error" within the goroutine) so the assignments in the NewIndexerStarknet/NewIndexerEVM branches do not mutate the outer err; keep indexerInstance as the per-goroutine variable already declared in the goroutine.
🤖 Fix all issues with AI agents
Verify each finding against the current code and only fix it if needed.
In `@controllers/accounts/auth_test.go`:
- Around line 263-345: The test "with valid payload and failed welcome email" no
longer triggers an email failure because the code uses &mockEmailService{} which
always succeeds; update the test to either inject a failing mock that implements
SendWelcomeEmail (e.g., create failingMockEmailService with SendWelcomeEmail
returning an error) and use that in the controller setup so the SendGrid
httpmock is not required, or remove/rename the test; ensure you reference
mockEmailService, SendWelcomeEmail, and the test name when making the change so
the test actually asserts the controller's error-handling path for welcome email
failures.
In `@tasks/fulfillments_webhooks.go`:
- Around line 575-618: The single 10s context (saveCtx/saveCancel) is reused
across multiple sequential I/O calls (storage.Client.SenderProfile.Query().Only,
email.NewEmailServiceWithProviders().SendWebhookFailureEmail, and
attemptUpdate.Save) causing the final Save to risk context deadline exceeded;
instead create separate short-lived contexts for each network/DB call (e.g.,
ctxProfile with its own timeout for the SenderProfile Query, ctxEmail for
SendWebhookFailureEmail, and ctxSave for attemptUpdate.Save), use those contexts
in the respective calls, and ensure each corresponding cancel function is
deferred/called promptly so the expired-path status update reliably persists
even if earlier ops are slow.
- Around line 591-614: In the expired-status branch, don't abort the whole batch
on per-attempt failures: replace the early returns after uuid.Parse, the
SenderProfile Query().Only(saveCtx) error, and the SendWebhookFailureEmail error
with the same log-and-continue behavior used later (lines 620–626).
Specifically, after calling saveCancel(), log the error with context (include
attempt payload senderId and/or attempt ID), then continue the loop instead of
returning; apply this to the uuid.Parse failure, the profile fetch
(storage.Client.SenderProfile.Query().Where(...).WithUser().Only(saveCtx))
failure, and the emailService.SendWebhookFailureEmail(saveCtx, ...) failure.
In `@tasks/indexing.go`:
- Around line 26-27: The 10s root context/lock TTL is too short: update the
context created at context.WithTimeout (ctx, cancel) and the lock TTL variable
referenced near the same block (currently 10*time.Second) to a longer duration
(e.g., 30–60s) and make the lock TTL slightly longer than the context timeout;
alternatively implement a longer parent context and derive per-network child
contexts inside the network-loop so slow RPC calls (used by the gateway indexing
goroutines waited on by wg.Wait()) only cancel their own work instead of all
networks—apply the change around the ctx, cancel := context.WithTimeout(...)
call and the lock TTL declaration, and ensure wg.Wait() remains coordinated with
the new timeouts.
- Around line 62-65: The inner WaitGroup named wg shadows the outer wg (declared
in the outer loop) which is confusing; rename the inner variable (for example
change the inner var wg sync.WaitGroup to transferWg or transferWG) and update
all uses (Add, Done, Wait calls) inside that inner scope so the outer and inner
WaitGroups are clearly distinct (look for occurrences of wg used inside the
transfer/inner goroutine block and rename them consistently).
- Around line 216-255: ResolvePaymentOrderMishaps (and similarly
ProcessStuckValidatedOrders) currently spins off unbounded goroutines with go
func(...) and returns immediately; change these to use the same pattern as
TaskIndexBlockchainEvents by creating a parent context with timeout/cancel, a
sync.WaitGroup to wait for all worker goroutines, and a bounded concurrency
semaphore (e.g., buffered channel or worker pool) to limit simultaneous
goroutines; launch each worker with wg.Add(1) and defer wg.Done() inside the
goroutine, pass the cancellable ctx to resolveMissedEvents, collect errors via a
channel and close it when done, wait for wg.Wait(), cancel the ctx on timeout or
on fatal error, and return a consolidated error to callers instead of returning
nil immediately.
- Around line 66-93: The goroutine is writing to the shared outer err variable
(data race); fix it by declaring a local err inside the goroutine (shadowing the
outer one) before calling indexer.NewIndexerStarknet and indexer.NewIndexerEVM
(e.g., use local "err := ..." or "var err error" within the goroutine) so the
assignments in the NewIndexerStarknet/NewIndexerEVM branches do not mutate the
outer err; keep indexerInstance as the per-goroutine variable already declared
in the goroutine.
In `@tasks/startup.go`:
- Around line 27-34: The ReassignStaleOrderRequest goroutine currently uses a
plain for-range over orderRequestChan and can block on channel reads during
shutdown; update the loop inside ReassignStaleOrderRequest to an explicit for {
select { case msg, ok := <-orderRequestChan: if !ok { return } /* existing
handling */ case <-ctx.Done(): return } } so it returns promptly when ctx is
cancelled and still handles a closed channel correctly, preserving the existing
message processing logic and ensuring the deferred orderRequest.Close() runs
timely.
- Around line 15-17: The current code uses ctx := context.Background() which
cannot be cancelled, so a long-lived subscription in ReassignStaleOrderRequest
may never exit and defer orderRequest.Close() won't run on shutdown; change the
function to accept a cancellable context passed in from the application's
shutdown coordinator (e.g., signal.NotifyContext in main) instead of creating
context.Background() here, and use that context when calling
ReassignStaleOrderRequest and any blocking Redis/subscribe operations so they
can return when the context is cancelled and allow orderRequest.Close() to run.
In `@utils/crypto/crypto.go`:
- Around line 68-82: The DecryptPlain function slices the input into nonce and
ciphertext without checking length, which can panic for short inputs; before
doing nonce, ciphertext := ciphertext[:gcm.NonceSize()],
ciphertext[gcm.NonceSize():], add a guard that validates len(ciphertext) >=
gcm.NonceSize() and return a descriptive error (e.g., "ciphertext too short for
nonce") if not; ensure you reference gcm.NonceSize() for the check and return
the error from DecryptPlain so callers can handle malformed input.
- Around line 173-176: PublicKeyDecryptPlain currently dereferences
privateKeyBlock.Bytes without checking pem.Decode output, risking a nil panic;
update PublicKeyDecryptPlain to validate the pem.Decode result (privateKeyBlock
!= nil) before accessing .Bytes, returning a clear error when nil, mirroring the
nil-check pattern used in PublicKeyEncryptPlain, then proceed to call
x509.ParsePKCS1PrivateKey only after the nil check.
In `@utils/http_client.go`:
- Around line 21-30: The custom transport creation currently builds a fresh
http.Transport (assigned to transport) which drops default behaviors like
ProxyFromEnvironment; instead clone http.DefaultTransport (type assert to
*http.Transport) and copy it into your transport, then override only pooling
fields (MaxIdleConns, MaxIdleConnsPerHost, IdleConnTimeout) before assigning it
to httpClient.Transport; ensure you preserve other defaults such as Proxy,
DialContext, TLSHandshakeTimeout, and so on when constructing the transport used
by the httpClient inside the once.Do block.
🧹 Nitpick comments (4)
🤖 Fix all nitpicks with AI agents
Verify each finding against the current code and only fix it if needed. In `@tasks/fulfillments_webhooks.go`: - Around line 575-618: The single 10s context (saveCtx/saveCancel) is reused across multiple sequential I/O calls (storage.Client.SenderProfile.Query().Only, email.NewEmailServiceWithProviders().SendWebhookFailureEmail, and attemptUpdate.Save) causing the final Save to risk context deadline exceeded; instead create separate short-lived contexts for each network/DB call (e.g., ctxProfile with its own timeout for the SenderProfile Query, ctxEmail for SendWebhookFailureEmail, and ctxSave for attemptUpdate.Save), use those contexts in the respective calls, and ensure each corresponding cancel function is deferred/called promptly so the expired-path status update reliably persists even if earlier ops are slow. In `@tasks/indexing.go`: - Around line 62-65: The inner WaitGroup named wg shadows the outer wg (declared in the outer loop) which is confusing; rename the inner variable (for example change the inner var wg sync.WaitGroup to transferWg or transferWG) and update all uses (Add, Done, Wait calls) inside that inner scope so the outer and inner WaitGroups are clearly distinct (look for occurrences of wg used inside the transfer/inner goroutine block and rename them consistently). - Around line 216-255: ResolvePaymentOrderMishaps (and similarly ProcessStuckValidatedOrders) currently spins off unbounded goroutines with go func(...) and returns immediately; change these to use the same pattern as TaskIndexBlockchainEvents by creating a parent context with timeout/cancel, a sync.WaitGroup to wait for all worker goroutines, and a bounded concurrency semaphore (e.g., buffered channel or worker pool) to limit simultaneous goroutines; launch each worker with wg.Add(1) and defer wg.Done() inside the goroutine, pass the cancellable ctx to resolveMissedEvents, collect errors via a channel and close it when done, wait for wg.Wait(), cancel the ctx on timeout or on fatal error, and return a consolidated error to callers instead of returning nil immediately. In `@tasks/startup.go`: - Around line 15-17: The current code uses ctx := context.Background() which cannot be cancelled, so a long-lived subscription in ReassignStaleOrderRequest may never exit and defer orderRequest.Close() won't run on shutdown; change the function to accept a cancellable context passed in from the application's shutdown coordinator (e.g., signal.NotifyContext in main) instead of creating context.Background() here, and use that context when calling ReassignStaleOrderRequest and any blocking Redis/subscribe operations so they can return when the context is cancelled and allow orderRequest.Close() to run.tasks/fulfillments_webhooks.go (1)
575-618: Single 10s timeout is shared across multiple I/O operations in the expired path.When an attempt expires,
saveCtx(10s) is used sequentially for the profile DB query (line 602), the email send (line 609), and the final DB save (line 617). If the profile query or email service is slow, the save can fail withcontext deadline exceeded— silently dropping the status update.Consider either giving each operation its own short-lived context, or increasing the timeout for the expired path to account for three sequential I/O calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/fulfillments_webhooks.go` around lines 575 - 618, The single 10s context (saveCtx/saveCancel) is reused across multiple sequential I/O calls (storage.Client.SenderProfile.Query().Only, email.NewEmailServiceWithProviders().SendWebhookFailureEmail, and attemptUpdate.Save) causing the final Save to risk context deadline exceeded; instead create separate short-lived contexts for each network/DB call (e.g., ctxProfile with its own timeout for the SenderProfile Query, ctxEmail for SendWebhookFailureEmail, and ctxSave for attemptUpdate.Save), use those contexts in the respective calls, and ensure each corresponding cancel function is deferred/called promptly so the expired-path status update reliably persists even if earlier ops are slow.tasks/startup.go (1)
15-17: Consider using a cancellable context for graceful shutdown.
context.Background()provides no way to signal this long-lived subscription goroutine to stop. IfReassignStaleOrderRequestblocks indefinitely on the channel, thedefer orderRequest.Close()on line 29 will never fire during a graceful shutdown — unless the Redis client itself is closed externally first (which is fragile and order-dependent).Passing a cancellable context from the application's shutdown coordinator (e.g., one derived from
signal.NotifyContextinmain.go) would allow the subscription to be cleanly interrupted and the goroutine to exit promptly.-func SubscribeToRedisKeyspaceEvents() { +func SubscribeToRedisKeyspaceEvents(ctx context.Context) { - ctx := context.Background()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/startup.go` around lines 15 - 17, The current code uses ctx := context.Background() which cannot be cancelled, so a long-lived subscription in ReassignStaleOrderRequest may never exit and defer orderRequest.Close() won't run on shutdown; change the function to accept a cancellable context passed in from the application's shutdown coordinator (e.g., signal.NotifyContext in main) instead of creating context.Background() here, and use that context when calling ReassignStaleOrderRequest and any blocking Redis/subscribe operations so they can return when the context is cancelled and allow orderRequest.Close() to run.tasks/indexing.go (2)
62-65: Innerwgshadows outerwg— works but worth noting for clarity.The inner
var wg sync.WaitGroupon line 132 shadows the outerwgdeclared on line 63. This is functionally correct (inner goroutines wait on the inner WaitGroup, outer goroutines wait on the outer one), but can be confusing during maintenance. Consider renaming the inner one (e.g.,transferWg) to make the two scopes more obvious.Also applies to: 130-184
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/indexing.go` around lines 62 - 65, The inner WaitGroup named wg shadows the outer wg (declared in the outer loop) which is confusing; rename the inner variable (for example change the inner var wg sync.WaitGroup to transferWg or transferWG) and update all uses (Add, Done, Wait calls) inside that inner scope so the outer and inner WaitGroups are clearly distinct (look for occurrences of wg used inside the transfer/inner goroutine block and rename them consistently).
216-255:ResolvePaymentOrderMishapsandProcessStuckValidatedOrdersstill have unbounded goroutines.These functions launch goroutines with
go func(...)(lines 246, 411) without aWaitGroupor context timeout — the same pattern this PR fixes inTaskIndexBlockchainEvents. The function returnsnilimmediately, so callers have no visibility into errors or completion, and goroutines can accumulate if processing stalls. Consider applying the sameWaitGroup+ timeout pattern here for consistency.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/indexing.go` around lines 216 - 255, ResolvePaymentOrderMishaps (and similarly ProcessStuckValidatedOrders) currently spins off unbounded goroutines with go func(...) and returns immediately; change these to use the same pattern as TaskIndexBlockchainEvents by creating a parent context with timeout/cancel, a sync.WaitGroup to wait for all worker goroutines, and a bounded concurrency semaphore (e.g., buffered channel or worker pool) to limit simultaneous goroutines; launch each worker with wg.Add(1) and defer wg.Done() inside the goroutine, pass the cancellable ctx to resolveMissedEvents, collect errors via a channel and close it when done, wait for wg.Wait(), cancel the ctx on timeout or on fatal error, and return a consolidated error to callers instead of returning nil immediately.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…kchainEvents - Context timeout: 10s → 45s (accommodates multi-network RPC calls) - Lock TTL: 10s → 50s (prevents expiry before wg.Wait() completion) - Prevents concurrent execution when cron interval is 4s
…tdown Convert for-range loop to explicit select with context.Done() case to enable prompt graceful shutdown. Handles channel closure correctly and preserves all existing message processing logic.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tasks/fulfillments_webhooks.go (1)
589-617:⚠️ Potential issue | 🟠 MajorInconsistent error handling: profile-fetch and email-send errors still abort the entire batch.
Lines 606 and 616
return fmt.Errorf(...), which stops processing all remaining attempts. This contradicts the new resilience pattern applied to save errors (lines 626–631), where you log andcontinue. A single expired attempt with a missing profile or transient email failure will prevent retries of every subsequent webhook in the batch.Additionally, Line 589 uses unchecked type assertions (
attempt.Payload["data"].(map[string]interface{})["senderId"].(string)) which will panic if the payload shape is unexpected. Since this payload comes from stored data, a corrupted or schema-mismatched record would crash the entire task.Proposed fix: log-and-continue + safe type assertions
- uid, err := uuid.Parse(attempt.Payload["data"].(map[string]interface{})["senderId"].(string)) - if err != nil { - return fmt.Errorf("RetryFailedWebhookNotifications.FailedExtraction: %w", err) - } + data, ok := attempt.Payload["data"].(map[string]interface{}) + if !ok { + logger.WithFields(logger.Fields{ + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications: invalid payload structure, missing 'data'") + continue + } + senderIDStr, ok := data["senderId"].(string) + if !ok { + logger.WithFields(logger.Fields{ + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications: invalid payload structure, missing 'senderId'") + continue + } + uid, err := uuid.Parse(senderIDStr) + if err != nil { + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.FailedExtraction") + continue + }if err != nil { - return fmt.Errorf("RetryFailedWebhookNotifications.CouldNotFetchProfile: %w", err) + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.CouldNotFetchProfile") + continue }if err != nil { - return fmt.Errorf("RetryFailedWebhookNotifications.SendWebhookFailureEmail: %w", err) + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", err), + "AttemptID": attempt.ID, + }).Errorf("RetryFailedWebhookNotifications.SendWebhookFailureEmail") + continue }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/fulfillments_webhooks.go` around lines 589 - 617, The batch handler currently panics on bad payload shapes and aborts the whole batch when a single profile-fetch or email-send fails; change the extraction at the top (the use of attempt.Payload["data"].(map[string]interface{})["senderId"].(string)) to defensive parsing with ok/type checks and handle any extraction error by logging (with context: attempt.ID or attempt.Payload) and continue the loop instead of returning; likewise replace the error returns after storage.Client.SenderProfile.Only (profile fetch) and emailService.SendWebhookFailureEmail so they log the specific error (include uid/attempt id) and continue processing remaining attempts rather than returning; keep the existing profileCtx/emailCtx cancellations and also defensively check profile and profile.Edges.User for nil before using Email/FirstName and treat missing user as a log-and-continue case.
🧹 Nitpick comments (9)
controllers/accounts/auth_test.go (2)
384-404: Remove commented-out test code.This 20-line commented block for "test unsupported fiat" adds noise. If it's needed later, it can be recovered from git history; otherwise track it in an issue.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@controllers/accounts/auth_test.go` around lines 384 - 404, Remove the 20-line commented-out "test unsupported fiat" block in controllers/accounts/auth_test.go (the t.Run(...) block that builds a types.RegisterPayload and calls test.PerformRequest with router) to eliminate dead/commented test noise; if the test is needed later, rely on git history or open an issue instead of keeping it in the file.
196-260: Test name is misleading — no welcome-email-specific assertion.This test is identical to the first registration test (different email address) and makes no assertion about the welcome email being sent. The mock always succeeds silently. Consider either:
- Verifying the mock was called (e.g., add a call counter to
mockEmailService)- Or renaming this test to reflect what it actually validates
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@controllers/accounts/auth_test.go` around lines 196 - 260, The test named "with valid payload and welcome email" doesn't assert the welcome email was sent; either rename the t.Run string to reflect it only verifies registration and API key/profile creation (e.g., "with valid payload creates user, profiles and API keys") or modify the test to assert the email mock was invoked by adding a call counter or spy to mockEmailService (increment a counter on send, expose accessor, then assert the counter > 0 after PerformRequest). Update references to mockEmailService and the t.Run name accordingly.utils/crypto/crypto.go (3)
484-493: Redundant marshal→unmarshal round-trip and no-op error wrapping.
messageis already a deserializedinterface{}(fromjson.Unmarshalinside the decrypt functions). Re-marshaling it to JSON and unmarshaling into*PaymentOrderRecipientworks but is wasteful. Ajson.Decoderormapstructureconversion would be more direct.Also, Line 492 wraps the error with
fmt.Errorf("%w", err)which adds zero context — just returnerrdirectly.Minimal fix for the error wrapping
- err = json.Unmarshal(messageBytes, &recipient) - if err != nil { - return nil, fmt.Errorf("%w", err) - } + if err = json.Unmarshal(messageBytes, &recipient); err != nil { + return nil, fmt.Errorf("failed to unmarshal recipient: %w", err) + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@utils/crypto/crypto.go` around lines 484 - 493, The code does an unnecessary JSON marshal→unmarshal on the already-deserialized value `message` to build `*types.PaymentOrderRecipient` and wraps the unmarshal error with a no-op `fmt.Errorf("%w", err)`; instead, convert directly (e.g. use a map-to-struct decoder such as mapstructure.Decode or a json.Decoder/strong type assertion) to populate a `recipient` of type `*types.PaymentOrderRecipient` without re-marshalling, and when an error occurs return `err` directly (remove `fmt.Errorf("%w", err)`) — update the logic around the `recipient` variable and the decrypt functions that pass `message` to perform a direct conversion and simple error return.
440-457:isHybridEncryptedrelies on a magic number (256) for RSA key length detection.The hardcoded check
keyLen != 256assumes RSA-2048 is always used (256 bytes = 2048 bits). Consider extracting this as a named constant for clarity and to make future key-size changes easier to discover.+const rsaEncryptedKeySize = 256 // RSA-2048 produces 256-byte encrypted output + func isHybridEncrypted(data []byte) bool { ... - if keyLen != 256 { + if keyLen != rsaEncryptedKeySize {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@utils/crypto/crypto.go` around lines 440 - 457, The isHybridEncrypted function uses a hardcoded magic number 256 for the RSA key length; replace that literal with a descriptive constant (e.g., rsa2048KeyLenBytes or expectedRSAKeyLen) and use that constant in the keyLen comparison and the subsequent length check so future key-size changes are obvious; update any related comments and ensure the constant is defined near other crypto constants for discoverability.
131-158: Consider OAEP padding instead of PKCS1v15 for new encryption paths.
rsa.EncryptPKCS1v15/rsa.DecryptPKCS1v15are used throughout. PKCS#1 v1.5 is susceptible to padding-oracle attacks (Bleichenbacher). For any new encryption paths (e.g., the hybrid flow),rsa.EncryptOAEP/rsa.DecryptOAEPwould be more secure. Acknowledged this would be a breaking change for existing encrypted data, so flagging for awareness only.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@utils/crypto/crypto.go` around lines 131 - 158, The PublicKeyEncryptPlain function currently uses rsa.EncryptPKCS1v15 which is less secure; replace the PKCS#1 v1.5 call with rsa.EncryptOAEP using SHA-256: create a SHA-256 hash (sha256.New()), pass rand.Reader, the parsed *rsa.PublicKey, plaintext, and nil/optional label to rsa.EncryptOAEP, and return that result; also update any corresponding decryption path (e.g., calls using rsa.DecryptPKCS1v15) to rsa.DecryptOAEP to keep compatibility (note this is a breaking change for existing ciphertext), and ensure crypto/sha256 is imported for the hash.tasks/order_requests.go (4)
164-177:time.Sleepin retry loop blocks the goroutine and ignores context cancellation.Two minor concerns:
- Each
time.Sleep(250ms)is unconditional — including after the last iteration (i == 2), where the result is discarded and the loop exits. That wastes 250 ms on every DEL event that stays pending.- During graceful shutdown, the goroutine won't exit until the sleep finishes (up to 750 ms total). Since you added
ctx.Done()handling elsewhere, it would be consistent to respect it here too.A
select-based delay would fix both:Proposed fix
if isDelEvent { shouldSkip := false for i := 0; i < 3; i++ { currentOrder, err := storage.Client.PaymentOrder.Get(ctx, orderUUID) if err == nil && currentOrder != nil && currentOrder.Status != paymentorder.StatusPending { shouldSkip = true break } - time.Sleep(250 * time.Millisecond) + if i < 2 { // skip sleep on last iteration + select { + case <-time.After(250 * time.Millisecond): + case <-ctx.Done(): + return + } + } } if shouldSkip { continue } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/order_requests.go` around lines 164 - 177, The retry loop that checks storage.Client.PaymentOrder.Get when isDelEvent uses unconditional time.Sleep(250 * time.Millisecond) which can sleep after the final iteration and ignores ctx cancellation; replace the unconditional sleep with a select that either waits on time.After(250*time.Millisecond) or returns early when ctx.Done() is closed, and only perform the delay between iterations (i < 2) so you don't sleep after the last attempt; keep the existing shouldSkip logic and continue behavior unchanged.
291-300: Remove commented-out logger line, and consider hoisting service construction.
- Line 293: dead commented-out code — remove it to keep the file clean.
services.NewPriorityQueueService()(andbalance.New()at line 231) are instantiated on every message. If these constructors allocate clients or connections, this could be wasteful in a hot loop. Hoisting them above theforloop (or into the caller) would reuse the instance.Hoist service construction
func ReassignStaleOrderRequest(ctx context.Context, orderRequestChan <-chan *redis.Message) { + priorityQueueSvc := services.NewPriorityQueueService() + balanceSvc := balance.New() for { select {Then replace
services.NewPriorityQueueService()on line 291 andbalance.New()on line 231 with the pre-built instances.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/order_requests.go` around lines 291 - 300, Remove the dead commented logger line inside the ReassignStaleOrderRequest block and hoist service construction for heavy resources: create one instance of services.NewPriorityQueueService() and balance.New() outside (above) the main message-processing loop or in the caller, then replace inline calls to services.NewPriorityQueueService() and balance.New() inside the loop (and the AssignPaymentOrder call) with the reused instances (e.g., pqSvc.AssignPaymentOrder(...) and balanceSvc or similar) so constructors are not invoked per message; keep the existing error logging (logger.WithFields(...).Errorf(...)) unchanged except for removing the commented line.
210-246: Meta cleanup and balance release look correct; one small note on error visibility.The best-effort release + exclude-list push + meta deletion is well-structured. Silencing the
RPush/Delerrors with_, _is acceptable for best-effort, but theExpireAterror on line 224 is also silenced — inconsistent withreassignCancelledOrder(line 33) which logs that same failure. Consider logging atWarnlevel for parity, so TTL failures don't go unnoticed.Optional: log TTL failure
if metaProviderID != "" { excludeKey := fmt.Sprintf("order_exclude_list_%s", order.ID) _, _ = storage.RedisClient.RPush(ctx, excludeKey, metaProviderID).Result() - _ = storage.RedisClient.ExpireAt(ctx, excludeKey, time.Now().Add(orderConf.OrderRequestValidity*2)).Err() + if expErr := storage.RedisClient.ExpireAt(ctx, excludeKey, time.Now().Add(orderConf.OrderRequestValidity*2)).Err(); expErr != nil { + logger.WithFields(logger.Fields{ + "Error": fmt.Sprintf("%v", expErr), + "OrderID": order.ID.String(), + }).Warnf("ReassignStaleOrderRequest: failed to set TTL for order exclude list") + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/order_requests.go` around lines 210 - 246, The Redis ExpireAt call's error is being ignored while similar code in reassignCancelledOrder logs TTL failures; update the ExpireAt handling so that if storage.RedisClient.ExpireAt(ctx, excludeKey, time.Now().Add(orderConf.OrderRequestValidity*2)).Err() returns a non-nil error, you log it at Warn (include fields OrderID, ProviderID/excludeKey and the error) rather than silencing it; locate the block around metaProviderID != "" where excludeKey is set and change the anonymous discard of ExpireAt's error to a logger.Warnf/WithFields call matching the pattern used elsewhere in reassignCancelledOrder.
148-160: Payload parsing relies on_-split positional convention — consider a defensive prefix check.
strings.Split(msg.Payload, "_")followed by taking the last element works for keys likeorder_request_<uuid>, but would silently extract the wrong substring if the key format ever changes (e.g., a composite suffix). Theuuid.Parseguard catches garbage, which is good, but a quick prefix validation would make intent explicit and produce a clearer error message.Optional: add prefix guard
+ const orderRequestPrefix = "order_request_" + if !strings.HasPrefix(msg.Payload, orderRequestPrefix) { + logger.WithFields(logger.Fields{ + "Payload": msg.Payload, + }).Warnf("ReassignStaleOrderRequest: unexpected payload format, skipping") + continue + } + orderID := strings.TrimPrefix(msg.Payload, orderRequestPrefix) - key := strings.Split(msg.Payload, "_") - orderID := key[len(key)-1]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tasks/order_requests.go` around lines 148 - 160, The payload parsing currently splits msg.Payload by "_" and takes the last part which is brittle; before splitting, validate the expected prefix (e.g., strings.HasPrefix(msg.Payload, "order_request_")) and if present extract the substring after that prefix (assign to orderID) and pass it to uuid.Parse; if the prefix check fails, log a clear message via logger.WithFields (include msg.Payload) and continue—keep the existing uuid.Parse error handling for malformed UUIDs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@controllers/accounts/auth_test.go`:
- Around line 958-967: The test "FailsForEmptyResetToken" is using assert.Error
with its arguments reversed, so it always passes; replace the incorrect
assert.Error(t, errors.New("Invalid password reset token"), err) with the same
pattern used elsewhere: assert.NoError(t, err) to validate the PerformRequest
call succeeded, and then assert the response/body contains the expected error;
apply the identical change for the other two occurrences around the other
failing cases that call PerformRequest in this file (same pattern as other tests
using assert.NoError and then asserting response code/message).
In `@utils/crypto/crypto.go`:
- Around line 318-325: The code reads keyLen :=
binary.BigEndian.Uint32(encrypted[0:4]) and then does int(4+keyLen) which can
overflow and bypass the length check; update the logic in the function that
extracts encryptedKey/aesCiphertext to (1) first ensure encrypted has at least 4
bytes before reading the length, (2) promote keyLen to a wider integer (e.g.,
uint64 or int) and compute total := 4 + keyLen safely, (3) verify total <=
uint64(len(encrypted)) (or total <= len(encrypted) after safe cast) and reject
if not, and (4) then slice encryptedKey and aesCiphertext using the validated,
safely-cast index values (referencing variables keyLen, encryptedKey,
aesCiphertext where the slices are created).
---
Outside diff comments:
In `@tasks/fulfillments_webhooks.go`:
- Around line 589-617: The batch handler currently panics on bad payload shapes
and aborts the whole batch when a single profile-fetch or email-send fails;
change the extraction at the top (the use of
attempt.Payload["data"].(map[string]interface{})["senderId"].(string)) to
defensive parsing with ok/type checks and handle any extraction error by logging
(with context: attempt.ID or attempt.Payload) and continue the loop instead of
returning; likewise replace the error returns after
storage.Client.SenderProfile.Only (profile fetch) and
emailService.SendWebhookFailureEmail so they log the specific error (include
uid/attempt id) and continue processing remaining attempts rather than
returning; keep the existing profileCtx/emailCtx cancellations and also
defensively check profile and profile.Edges.User for nil before using
Email/FirstName and treat missing user as a log-and-continue case.
---
Nitpick comments:
In `@controllers/accounts/auth_test.go`:
- Around line 384-404: Remove the 20-line commented-out "test unsupported fiat"
block in controllers/accounts/auth_test.go (the t.Run(...) block that builds a
types.RegisterPayload and calls test.PerformRequest with router) to eliminate
dead/commented test noise; if the test is needed later, rely on git history or
open an issue instead of keeping it in the file.
- Around line 196-260: The test named "with valid payload and welcome email"
doesn't assert the welcome email was sent; either rename the t.Run string to
reflect it only verifies registration and API key/profile creation (e.g., "with
valid payload creates user, profiles and API keys") or modify the test to assert
the email mock was invoked by adding a call counter or spy to mockEmailService
(increment a counter on send, expose accessor, then assert the counter > 0 after
PerformRequest). Update references to mockEmailService and the t.Run name
accordingly.
In `@tasks/order_requests.go`:
- Around line 164-177: The retry loop that checks
storage.Client.PaymentOrder.Get when isDelEvent uses unconditional
time.Sleep(250 * time.Millisecond) which can sleep after the final iteration and
ignores ctx cancellation; replace the unconditional sleep with a select that
either waits on time.After(250*time.Millisecond) or returns early when
ctx.Done() is closed, and only perform the delay between iterations (i < 2) so
you don't sleep after the last attempt; keep the existing shouldSkip logic and
continue behavior unchanged.
- Around line 291-300: Remove the dead commented logger line inside the
ReassignStaleOrderRequest block and hoist service construction for heavy
resources: create one instance of services.NewPriorityQueueService() and
balance.New() outside (above) the main message-processing loop or in the caller,
then replace inline calls to services.NewPriorityQueueService() and
balance.New() inside the loop (and the AssignPaymentOrder call) with the reused
instances (e.g., pqSvc.AssignPaymentOrder(...) and balanceSvc or similar) so
constructors are not invoked per message; keep the existing error logging
(logger.WithFields(...).Errorf(...)) unchanged except for removing the commented
line.
- Around line 210-246: The Redis ExpireAt call's error is being ignored while
similar code in reassignCancelledOrder logs TTL failures; update the ExpireAt
handling so that if storage.RedisClient.ExpireAt(ctx, excludeKey,
time.Now().Add(orderConf.OrderRequestValidity*2)).Err() returns a non-nil error,
you log it at Warn (include fields OrderID, ProviderID/excludeKey and the error)
rather than silencing it; locate the block around metaProviderID != "" where
excludeKey is set and change the anonymous discard of ExpireAt's error to a
logger.Warnf/WithFields call matching the pattern used elsewhere in
reassignCancelledOrder.
- Around line 148-160: The payload parsing currently splits msg.Payload by "_"
and takes the last part which is brittle; before splitting, validate the
expected prefix (e.g., strings.HasPrefix(msg.Payload, "order_request_")) and if
present extract the substring after that prefix (assign to orderID) and pass it
to uuid.Parse; if the prefix check fails, log a clear message via
logger.WithFields (include msg.Payload) and continue—keep the existing
uuid.Parse error handling for malformed UUIDs.
In `@utils/crypto/crypto.go`:
- Around line 484-493: The code does an unnecessary JSON marshal→unmarshal on
the already-deserialized value `message` to build `*types.PaymentOrderRecipient`
and wraps the unmarshal error with a no-op `fmt.Errorf("%w", err)`; instead,
convert directly (e.g. use a map-to-struct decoder such as mapstructure.Decode
or a json.Decoder/strong type assertion) to populate a `recipient` of type
`*types.PaymentOrderRecipient` without re-marshalling, and when an error occurs
return `err` directly (remove `fmt.Errorf("%w", err)`) — update the logic around
the `recipient` variable and the decrypt functions that pass `message` to
perform a direct conversion and simple error return.
- Around line 440-457: The isHybridEncrypted function uses a hardcoded magic
number 256 for the RSA key length; replace that literal with a descriptive
constant (e.g., rsa2048KeyLenBytes or expectedRSAKeyLen) and use that constant
in the keyLen comparison and the subsequent length check so future key-size
changes are obvious; update any related comments and ensure the constant is
defined near other crypto constants for discoverability.
- Around line 131-158: The PublicKeyEncryptPlain function currently uses
rsa.EncryptPKCS1v15 which is less secure; replace the PKCS#1 v1.5 call with
rsa.EncryptOAEP using SHA-256: create a SHA-256 hash (sha256.New()), pass
rand.Reader, the parsed *rsa.PublicKey, plaintext, and nil/optional label to
rsa.EncryptOAEP, and return that result; also update any corresponding
decryption path (e.g., calls using rsa.DecryptPKCS1v15) to rsa.DecryptOAEP to
keep compatibility (note this is a breaking change for existing ciphertext), and
ensure crypto/sha256 is imported for the hash.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
controllers/accounts/auth_test.go (1)
383-403: Remove commented-out test code.This block has been commented out for a while. Either restore it as a proper test or delete it to reduce noise.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@controllers/accounts/auth_test.go` around lines 383 - 403, Remove the stale commented-out test block that wraps a t.Run and the RegisterPayload setup (the commented lines referencing t.Run, types.RegisterPayload, headers map, and test.PerformRequest) to reduce noise; alternatively, if intended to keep, restore it as an active test (e.g., a proper t.Run or TestRegisterUnsupportedFiat) by uncommenting and ensuring it asserts http.StatusInternalServerError against the router response.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@controllers/accounts/auth_test.go`:
- Around line 195-259: The "with valid payload and welcome email" test
duplicates the earlier registration test and doesn't assert welcome-email
behavior; update the test to either assert the email-sending call or remove it:
locate the test function named 'with valid payload and welcome email' in
auth_test.go and either (1) replace the no-op mock with a recording/mock spy for
SendWelcomeEmail (or the service method used by the Register handler) and assert
it was invoked with the expected user/email arguments after PerformRequest, or
(2) delete this duplicate test if email behavior is covered elsewhere; ensure
you reference the Register request via test.PerformRequest and the mocked
SendWelcomeEmail method used by the controller so the assertion targets the
correct symbol.
---
Nitpick comments:
In `@controllers/accounts/auth_test.go`:
- Around line 383-403: Remove the stale commented-out test block that wraps a
t.Run and the RegisterPayload setup (the commented lines referencing t.Run,
types.RegisterPayload, headers map, and test.PerformRequest) to reduce noise;
alternatively, if intended to keep, restore it as an active test (e.g., a proper
t.Run or TestRegisterUnsupportedFiat) by uncommenting and ensuring it asserts
http.StatusInternalServerError against the router response.
Memory Leak and Race Conditions Fix
Description
This pull request addresses critical memory leak issues that were causing the aggregator service to degrade over time and require frequent restarts. The service was experiencing 2.3-4.6 GB of memory growth per day due to unbounded goroutines, unmanaged HTTP connections, and missing resource cleanup. These changes implement proper resource lifecycle management and context cancellation to ensure stable long-term operation.
Key Issues Fixed
Unbounded Goroutine Accumulation in Indexing Task - The
TaskIndexBlockchainEvents()cron job (running every 4 seconds) was spawning goroutines without timeouts. If any indexing operation hung, goroutines accumulated indefinitely, consuming ~2-4 GB/day.Redis Subscription Memory Leak - The Redis
PSubscribe()connection created a persistent goroutine with no cleanup mechanism. If the subscription failed or needed to be shut down, the goroutine leaked indefinitely. This caused refunds for unknown reasons when orders weren't assigned to providers.RPC Client Connection Leaks - Multiple instances in
utils/userop.gowere creating new HTTP clients instead of reusing connections, causing connection pool exhaustion.Slack Notification HTTP Leaks - The Slack service used bare
http.Post()calls without timeouts, allowing stalled endpoints to hang indefinitely and leak connections.Missing Context Timeouts - Background tasks used
context.Background()directly without timeouts, allowing operations to hang indefinitely and exhaust system resources.No Graceful Shutdown - The application had no cleanup on exit, leaving HTTP client connections and other resources in an inconsistent state.
Changes Made
utils/http_client.go(NEW) - Created singleton HTTP client with proper connection pooling (100 max idle, 10 per host, 90s idle timeout). This prevents creating new clients repeatedly and ensures connections are reused.tasks/startup.go- Fixed Redis subscription cleanup by wrapping the goroutine with properdeferto close the subscription on exit, preventing goroutine leaks.tasks/indexing.go- Added context timeout (10 seconds) andsync.WaitGrouptoTaskIndexBlockchainEvents()to ensure all spawned goroutines complete before the function returns. This prevents goroutine accumulation.tasks/fulfillments_webhooks.go- Enabled context timeout (2 minutes) inRetryFailedWebhookNotifications()to prevent indefinite hanging on webhook retries.utils/userop.go- Replaced 5 instances of&http.Client{Transport: &http.Transport{}}withGetHTTPClient()to reuse the shared HTTP client for RPC operations across multiple functions, includingSendUserOperation(),SponsorUserOperation(),GetUserOperationByReceipt(),GetUserOperationStatus(), andgetStandardGasPrices().services/slack.go- AddedpostSlackJSON()helper method with 10-second timeout and switched all notification methods (SendUserSignupNotification(),SendActionFeedbackNotification(),SendSubmissionNotification()) to use the shared HTTP client viaGetHTTPClient()instead of creating new clients.main.go- Added import forutilspackage and graceful shutdown call toutils.CloseHTTPClient()to properly close idle HTTP connections on application exit.Impact
Memory Usage Before: 2.3-4.6 GB/day growth (service requires restart every 24-48 hours)
Memory Usage After: ~0 MB/day (stable, indefinite uptime)
Resource Improvements:
Testing
go build ./...)go vet ./...)go fmt ./utils ./tasks ./services)Verification Steps
To verify the fixes work correctly:
curl http://localhost:6060/debug/pprof/goroutine | head -1(should remain constant)lsof -p $(pgrep aggregator) | grep TCP | wc -l(should remain stable)Checklist
stable)By submitting this PR, I agree to Paycrest's Contributor Code of Conduct and Contribution Guide.
Summary Table
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Improvements