
feat: implement bulk transaction processing with BulkRecorder#1940

Open
Ygohr wants to merge 2 commits into develop from feature/mdz-1903-2

Conversation


@Ygohr Ygohr commented Mar 21, 2026

Summary

Implements bulk processing for RabbitMQ transaction messages at the consumer layer. Messages are collected based on configurable batch size and flush timeout, then persisted using multi-row INSERT/UPDATE statements with ON CONFLICT for idempotency. This replaces the N+1 INSERT pattern with batched operations, reducing database round-trips from ~300 to ~2-5 for 50 transactions.

Motivation

The current transaction processing inserts each transaction and operation individually (1 INSERT per entity), causing significant performance overhead:

  • 50 transactions with 5 operations each = 300 database round-trips
  • 50 individual RabbitMQ ACKs

The bulk approach reduces this to:

  • 1 bulk transaction INSERT (50 rows)
  • 1 bulk operation INSERT (250 rows)
  • 1 bulk update for status transitions
  • 1 RabbitMQ bulk ACK

Expected throughput improvement: 10-100x depending on bulk size.

Semantic Decision

  • Type: Feature (new capability)
  • Scope: Transaction component only
  • Breaking: No - bulk mode is additive and stays inactive while async processing is disabled
  • Backward Compatible: Yes - existing behavior is unchanged when RABBITMQ_TRANSACTION_ASYNC=false

Changes

New Files

| File | Purpose |
| --- | --- |
| components/transaction/internal/adapters/rabbitmq/bulk_collector.go | Accumulates RabbitMQ messages with size/timeout-based flush triggers |
| components/transaction/internal/adapters/rabbitmq/bulk_collector_test.go | Unit tests for BulkCollector (582 lines) |
| components/transaction/internal/services/command/create-bulk-transaction-operations-async.go | Orchestrates bulk insert/update with fallback to individual processing |
| components/transaction/internal/services/command/create-bulk-transaction-operations-async_test.go | Unit tests for bulk processing (927 lines) |
| components/transaction/internal/adapters/postgres/transaction/transaction_updatebulk_test.go | Unit tests for bulk update repository methods (535 lines) |
| components/transaction/internal/bootstrap/config_test.go | Unit tests for bulk config validation (162 lines) |
| pkg/repository/bulk.go | Shared types: BulkInsertResult, BulkUpdateResult, DBExecutor interface |

Environment Variables - Bulk Recorder

| Variable | Type | Default | Description |
| --- | --- | --- | --- |
| BULK_RECORDER_ENABLED | bool | true | Enable bulk mode (requires RABBITMQ_TRANSACTION_ASYNC=true) |
| BULK_RECORDER_SIZE | int | workers × prefetch | Messages per bulk batch |
| BULK_RECORDER_FLUSH_TIMEOUT_MS | int | 100 | Max wait before flushing an incomplete batch |
| BULK_RECORDER_MAX_ROWS_PER_INSERT | int | 1000 | Max rows per INSERT statement (PostgreSQL parameter limit) |
| BULK_RECORDER_FALLBACK_ENABLED | bool | true | Fall back to individual inserts on failure |

Refactored Code

| Before | After |
| --- | --- |
| Individual transaction INSERT per message | Multi-row INSERT with ON CONFLICT (id) DO NOTHING via CreateBulk() |
| Individual operation INSERT per operation | Multi-row INSERT with ON CONFLICT (id) DO NOTHING via CreateBulk() |
| Individual status UPDATE per transaction | Batched UPDATE with CTE grouping by status via UpdateBulk() |
| N individual RabbitMQ ACKs | Single bulk ACK after batch processing |

Repository Interface Changes

| Repository | New Methods |
| --- | --- |
| Transaction | CreateBulk(ctx, transactions), CreateBulkTx(ctx, tx, transactions), UpdateBulk(ctx, transactions), UpdateBulkTx(ctx, tx, transactions) |
| Operation | CreateBulk(ctx, operations), CreateBulkTx(ctx, tx, operations) |

Key Implementation Details

  1. BulkCollector (bulk_collector.go):

    • Thread-safe message accumulation with mutex protection
    • Dual flush triggers: size threshold OR timeout expiration
    • Graceful shutdown flushes pending messages
    • Configurable error handler for flush failures
  2. Bulk Insert (transaction.postgresql.go):

    • Uses Squirrel query builder for multi-row INSERT
    • ON CONFLICT (id) DO NOTHING for idempotency
    • Chunking for large batches (default 1000 rows)
    • Sorts by ID before INSERT to prevent deadlocks
  3. Bulk Update (transaction.postgresql.go):

    • CTE-based UPDATE grouping transactions by target status
    • Only updates rows where status differs (skips unchanged)
    • Returns affected count for observability
  4. Fallback Safety (create-bulk-transaction-operations-async.go):

    • On bulk failure, reverts to CreateBalanceTransactionOperationsAsync per message
    • Duplicates (unique violation) treated as success
    • Partial success supported - processes what it can

Test Plan

  • Unit tests for BulkCollector - size/timeout triggers, shutdown flush, error handling
  • Unit tests for CreateBulk - insert counts, duplicate handling, chunking
  • Unit tests for UpdateBulk - status transitions, unchanged detection
  • Unit tests for CreateBulkTransactionOperationsAsync - orchestration, fallback
  • Unit tests for bulk config parsing and defaults
  • Verify mock interfaces updated for new repository methods

@Ygohr Ygohr self-assigned this Mar 21, 2026
@Ygohr Ygohr requested review from a team as code owners March 21, 2026 00:18
@Ygohr Ygohr added the back-end Back-end Issues label Mar 21, 2026
@Ygohr Ygohr linked an issue Mar 21, 2026 that may be closed by this pull request
@lerian-studio (Contributor)

This PR is very large (13 files, 3397 lines changed). Consider breaking it into smaller PRs for easier review.


coderabbitai bot commented Mar 21, 2026

Walkthrough

This pull request introduces bulk transaction processing across multiple layers of the transaction component. It adds environment configuration for tuning the bulk recorder (enablement, flush timeout, row limits, and fallback behavior) and implements bulk database operations, including chunked bulk transaction updates in PostgreSQL. It also introduces a RabbitMQ-based bulk message collector with configurable size and timeout thresholds, and configuration resolution with computed defaults based on worker/prefetch counts. A new service-level bulk transaction command classifies payloads into inserts versus updates, falls back to individual processing when bulk operations fail, and performs downstream balance updates and metadata processing. Additionally, defensive nil-checks are added in transaction creation and update paths, and a new BulkUpdateResult type tracks bulk update metrics (attempted, updated, and unchanged rows).

Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Service as CreateBulkTransactionOperationsAsync
    participant DB as PostgreSQL Repository
    participant Cache as Balance Service
    participant Queue as RabbitMQ Producer
    participant Redis as Redis Cache

    Client->>Service: Bulk Transaction Payloads
    Note over Service: Classify (Insert vs Update)
    Note over Service: Sort by ID

    Service->>DB: BulkInsert Transactions
    alt Bulk Insert Success
        DB-->>Service: Inserted Count
        Service->>DB: BulkInsert Operations
        DB-->>Service: Inserted Count
        Service->>DB: BulkUpdate Statuses (if transitions)
        DB-->>Service: Updated Count
    else Bulk Insert Fails
        Service->>Service: Fallback to Individual Processing
        loop For Each Payload
            Service->>Service: CreateBalanceTransactionOperationsAsync (per-item)
        end
    end

    alt Balance Updates Needed
        Service->>Cache: Update Balances (per payload)
        Cache-->>Service: Success/Error
    end

    Service->>Queue: Send Metadata & Events
    Service->>Redis: Cleanup Entries

    Service-->>Client: BulkResult (counts + fallback info)
```
```mermaid
sequenceDiagram
    participant Producer as Message Producer
    participant Collector as BulkCollector
    participant Timer as Timeout Timer
    participant Callback as Flush Callback
    participant Handler as Error Handler

    Producer->>Collector: Add(message)
    activate Collector
    Note over Collector: Buffer message

    alt Buffer reaches bulkSize
        Collector->>Callback: Call with buffered messages
    else flushTimeout elapses
        Collector->>Timer: Reset timeout
        Collector->>Callback: Call with buffered messages (if any)
    end

    activate Callback
    alt Callback succeeds
        Callback-->>Collector: nil
    else Callback fails
        Callback-->>Collector: error
        Collector->>Handler: Call error handler
        Handler->>Handler: Handle error
    end
    deactivate Callback

    Note over Collector: Clear buffer
    deactivate Collector
```

Suggested reviewers

  • gandalf-at-lerian
  • ClaraTersi
🚥 Pre-merge checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'feat: implement bulk transaction processing with BulkRecorder' accurately summarizes the main feature addition and is clear and specific. |
| Description check | ✅ Passed | The PR description is comprehensive and well-structured, covering summary, motivation, semantic decision, changes, and test plan despite not following the exact template structure. |


@lerian-studio (Contributor)

📊 Unit Test Coverage Report: midaz-transaction

| Metric | Value |
| --- | --- |
| Overall Coverage | 85.2% ✅ PASS |
| Threshold | 85% |

Coverage by Package

| Package | Coverage |
| --- | --- |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/grpc/in | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/http/in | 78.5% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/mongodb | 66.7% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/assetrate | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/balance | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/operation | 90.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/operationroute | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/transaction | 97.4% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/transactionroute | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/rabbitmq | 91.6% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/redis/balance | 100.0% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/services/command | 89.6% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/services/query | 95.2% |
| github.com/LerianStudio/midaz/v3/components/transaction/internal/services | 100.0% |

Generated by Go PR Analysis workflow

@lerian-studio (Contributor)

🔒 Security Scan Results — crm

Trivy

Filesystem Scan

✅ No vulnerabilities or secrets found.

Docker Image Scan

✅ No vulnerabilities found.


Docker Hub Health Score Compliance

✅ Policies — 4/4 met

| Policy | Status |
| --- | --- |
| Default non-root user | ✅ Passed |
| No fixable critical/high CVEs | ✅ Passed |
| No high-profile vulnerabilities | ✅ Passed |
| No AGPL v3 licenses | ✅ Passed |

🔍 View full scan logs

@lerian-studio (Contributor)

🔒 Security Scan Results — ledger

Trivy

Filesystem Scan

✅ No vulnerabilities or secrets found.

Docker Image Scan

✅ No vulnerabilities found.


Docker Hub Health Score Compliance

✅ Policies — 4/4 met

| Policy | Status |
| --- | --- |
| Default non-root user | ✅ Passed |
| No fixable critical/high CVEs | ✅ Passed |
| No high-profile vulnerabilities | ✅ Passed |
| No AGPL v3 licenses | ✅ Passed |

🔍 View full scan logs

@lerian-studio (Contributor)

🔒 Security Scan Results — onboarding

Trivy

Filesystem Scan

✅ No vulnerabilities or secrets found.

Docker Image Scan

✅ No vulnerabilities found.


Docker Hub Health Score Compliance

✅ Policies — 4/4 met

| Policy | Status |
| --- | --- |
| Default non-root user | ✅ Passed |
| No fixable critical/high CVEs | ✅ Passed |
| No high-profile vulnerabilities | ✅ Passed |
| No AGPL v3 licenses | ✅ Passed |

🔍 View full scan logs

@lerian-studio (Contributor)

🔒 Security Scan Results — transaction

Trivy

Filesystem Scan

✅ No vulnerabilities or secrets found.

Docker Image Scan

✅ No vulnerabilities found.


Docker Hub Health Score Compliance

✅ Policies — 4/4 met

| Policy | Status |
| --- | --- |
| Default non-root user | ✅ Passed |
| No fixable critical/high CVEs | ✅ Passed |
| No high-profile vulnerabilities | ✅ Passed |
| No AGPL v3 licenses | ✅ Passed |

🔍 View full scan logs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go`:
- Around line 485-499: The result.Attempted is prefilled with len(transactions)
before any chunk runs, overstating attempts on partial failure; change
initialization and update Attempted only as chunks are actually sent to
ExecContext (or increment by the chunk size just before each ExecContext call
inside the for loop in transaction.postgresql.go) so that if ctx.Done() triggers
before a chunk is executed the returned repository.BulkUpdateResult.Attempted
reflects only the rows that were actually submitted; ensure this update happens
right before the ExecContext call and that the early-return path (select on
ctx.Done()) returns the current Attempted count.
- Around line 541-568: The batched UPDATE only matches on t.id and therefore can
update rows across organizations/ledgers; modify the VALUES and WHERE to
preserve org/ledger scoping: when building valuesClauses and args in the loop
(where TransactionPostgreSQLModel record is created), include
record.OrganizationID and record.LedgerID in each tuple (so each row becomes
($1::uuid, $2::uuid, $3::uuid, $4, $5, $6::timestamp) etc.), update the args
capacity to len(transactions)*6 and append record.OrganizationID and
record.LedgerID before record.ID/status/... in the same stable order, and update
the final query variable to define AS v(id, organization_id, ledger_id,
new_status, new_status_description, new_updated_at) and add WHERE t.id = v.id
AND t.organization_id = v.organization_id AND t.ledger_id = v.ledger_id AND
t.status != v.new_status AND t.deleted_at IS NULL so the batch honors the same
org/ledger guard as the single-row Update path.

In `@components/transaction/internal/adapters/rabbitmq/bulk_collector.go`:
- Around line 237-257: cleanupOnExit currently passes the possibly cancelled ctx
into executeFlush, which can cause the final batch to be dropped; modify
cleanupOnExit so that before calling executeFlush it creates a fresh context
with a short timeout (e.g., via context.WithTimeout or context.Background plus
WithTimeout) and uses that new ctx for executeFlush, then ensure you cancel the
timeout ctx after executeFlush returns; keep the existing locking and message
swap logic in cleanupOnExit and only change the context passed to executeFlush.

In
`@components/transaction/internal/services/command/create-bulk-transaction-operations-async.go`:
- Around line 103-105: The bulk path currently calls processMetadataAndEvents
for the entire payload batch even when DB returned no-op/duplicate rows; change
the flow in createBulkTransactionOperationsAsync so you first examine the bulk
insert/update result (the structure that indicates per-row status — e.g.,
duplicate/unchanged flags or affected row counts) and filter payloads to only
those with actual changes, then call uc.processMetadataAndEvents(ctx, logger,
filteredPayloads); ensure the filter logic references the result fields used in
the diff (the bulk operation result and the payload items) so no metadata/events
are produced for rows the DB ignored.
- Around line 379-399: The loop currently swallows errors from
uc.UpdateTransactionStatus and returns nil, so small-batch updates can be
partially persisted while the caller is told everything succeeded; change the
logic in the function that processes individual transactions (the loop that
calls uc.UpdateTransactionStatus for each tx) to record failures (e.g.
failureCount or slice of errs), log each failure as now, and after the loop set
result.TransactionsUpdated as you already do but if any UpdateTransactionStatus
returned an error return a non-nil error (aggregated or the first error) instead
of nil so the caller knows the batch wasn’t fully applied; keep uses of
uc.UpdateTransactionStatus, result.TransactionsUpdated, and the existing
logger.Log calls.
- Around line 176-191: The loop currently aliases payload.Transaction into tx
and mutates it, corrupting the original payloads used later by
fallbackToIndividualProcessing; instead create a new transaction copy inside the
loop (a shallow or deep copy as appropriate) and mutate that copy when
classifying and when calling uc.prepareTransactionForInsert, leaving
payload.Transaction untouched; keep the classification logic using
uc.isStatusTransition and append the copied transaction to toUpdate.transactions
or toInsert.transactions so fallbackToIndividualProcessing still has the
original, unmodified payloads.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a72ca4a0-4ad1-440c-a188-ff5dd073ee5c

📥 Commits

Reviewing files that changed from the base of the PR and between 21113b8 and 6f1266c.

📒 Files selected for processing (13)
  • components/transaction/.env.example
  • components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go
  • components/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.go
  • components/transaction/internal/adapters/postgres/transaction/transaction_updatebulk_test.go
  • components/transaction/internal/adapters/rabbitmq/bulk_collector.go
  • components/transaction/internal/adapters/rabbitmq/bulk_collector_test.go
  • components/transaction/internal/bootstrap/config.go
  • components/transaction/internal/bootstrap/config_test.go
  • components/transaction/internal/services/command/create-balance-transaction-operations-async.go
  • components/transaction/internal/services/command/create-bulk-transaction-operations-async.go
  • components/transaction/internal/services/command/create-bulk-transaction-operations-async_test.go
  • components/transaction/internal/services/command/update-transaction.go
  • pkg/repository/bulk.go

Comment on lines +485 to +499

```go
result := &repository.BulkUpdateResult{Attempted: int64(len(transactions))}

// Chunk into bulks of ~1,000 rows to stay within PostgreSQL's parameter limit
// Transaction update uses 4 columns (id, status, status_description, updated_at), so 1,000 rows = 4,000 parameters
const chunkSize = 1000

for i := 0; i < len(transactions); i += chunkSize {
	// Check for context cancellation between chunks
	select {
	case <-ctx.Done():
		libOpentelemetry.HandleSpanError(span, "Context cancelled during bulk update", ctx.Err())
		logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf("Context cancelled during bulk update: %v", ctx.Err()))

		// Return partial result; Unchanged stays 0 since remaining items were not processed
		return result, ctx.Err()
```

⚠️ Potential issue | 🟡 Minor

Attempted is overstated on partial failures.

repository.BulkUpdateResult.Attempted is documented as “rows sent to UPDATE”, but Line 485 pre-fills it with the full input before any chunk runs. If the context is canceled before the first ExecContext, the returned partial result still says every row was attempted, which makes the metric unreliable for retries and diagnostics.
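
A hedged sketch of the accounting this comment asks for, counting Attempted only as chunks are submitted; chunkAttempts and cancelAt are illustrative stand-ins for the real loop and for context cancellation:

```go
package main

import "fmt"

type bulkUpdateResult struct{ Attempted int64 }

// chunkAttempts increments Attempted per chunk just before the chunk would be
// sent, rather than prefilling with the full input size. cancelAt simulates
// ctx.Done() firing before the chunk at that index executes (-1 = never).
func chunkAttempts(total, chunkSize, cancelAt int) *bulkUpdateResult {
	result := &bulkUpdateResult{}

	for i, chunk := 0, 0; i < total; i, chunk = i+chunkSize, chunk+1 {
		if chunk == cancelAt {
			return result // partial result reflects only submitted rows
		}

		end := i + chunkSize
		if end > total {
			end = total
		}

		result.Attempted += int64(end - i) // count just before ExecContext
	}

	return result
}

func main() {
	fmt.Println(chunkAttempts(2500, 1000, -1).Attempted) // all chunks run: 2500
	fmt.Println(chunkAttempts(2500, 1000, 1).Attempted)  // cancelled before 2nd chunk: 1000
}
```

With this shape, a cancellation before the first ExecContext reports Attempted = 0, which is what retry and diagnostics logic needs.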


Comment on lines +541 to +568

```go
// Build parameterized VALUES list for the batched update
// Each row has 4 values: id, status, status_description, updated_at
updatedAt := time.Now()
args := make([]any, 0, len(transactions)*4)
valuesClauses := make([]string, 0, len(transactions))

for i, tx := range transactions {
	record := &TransactionPostgreSQLModel{}
	record.FromEntity(tx)

	// Calculate parameter positions (1-indexed for PostgreSQL)
	baseIdx := i * 4
	valuesClauses = append(valuesClauses, fmt.Sprintf("($%d::uuid, $%d, $%d, $%d::timestamp)",
		baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4))

	args = append(args, record.ID, record.Status, record.StatusDescription, updatedAt)
}

// Build the batched UPDATE query using UPDATE...FROM (VALUES...)
// This performs all updates in a single database round-trip
query := fmt.Sprintf(`UPDATE %s t
	SET status = v.new_status,
	    status_description = v.new_status_description,
	    updated_at = v.new_updated_at
	FROM (VALUES %s) AS v(id, new_status, new_status_description, new_updated_at)
	WHERE t.id = v.id
	  AND t.status != v.new_status
	  AND t.deleted_at IS NULL`,
```

⚠️ Potential issue | 🟠 Major

Preserve org/ledger scoping in the batched UPDATE.

The single-row Update path guards on organization_id, ledger_id, and id, but this query matches only on t.id. That is a regression from the safety boundary enforced on Line 1062 to Line 1066: a misrouted or malformed payload can update a transaction whose org/ledger do not match the message.

🔐 Suggested fix

```diff
-	// Each row has 4 values: id, status, status_description, updated_at
+	// Each row has 6 values: id, organization_id, ledger_id, status, status_description, updated_at
 	updatedAt := time.Now()
-	args := make([]any, 0, len(transactions)*4)
+	args := make([]any, 0, len(transactions)*6)
 	valuesClauses := make([]string, 0, len(transactions))

 	for i, tx := range transactions {
 		record := &TransactionPostgreSQLModel{}
 		record.FromEntity(tx)

-		// Calculate parameter positions (1-indexed for PostgreSQL)
-		baseIdx := i * 4
-		valuesClauses = append(valuesClauses, fmt.Sprintf("($%d::uuid, $%d, $%d, $%d::timestamp)",
-			baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4))
-
-		args = append(args, record.ID, record.Status, record.StatusDescription, updatedAt)
+		// Calculate parameter positions (1-indexed for PostgreSQL)
+		baseIdx := i * 6
+		valuesClauses = append(valuesClauses, fmt.Sprintf(
+			"($%d::uuid, $%d::uuid, $%d::uuid, $%d, $%d, $%d::timestamp)",
+			baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4, baseIdx+5, baseIdx+6,
+		))
+
+		args = append(
+			args,
+			record.ID,
+			record.OrganizationID,
+			record.LedgerID,
+			record.Status,
+			record.StatusDescription,
+			updatedAt,
+		)
 	}

 	// Build the batched UPDATE query using UPDATE...FROM (VALUES...)
 	// This performs all updates in a single database round-trip
 	query := fmt.Sprintf(`UPDATE %s t
 		SET status = v.new_status,
 		    status_description = v.new_status_description,
 		    updated_at = v.new_updated_at
-		FROM (VALUES %s) AS v(id, new_status, new_status_description, new_updated_at)
+		FROM (VALUES %s) AS v(id, organization_id, ledger_id, new_status, new_status_description, new_updated_at)
 		WHERE t.id = v.id
+		  AND t.organization_id = v.organization_id
+		  AND t.ledger_id = v.ledger_id
 		  AND t.status != v.new_status
 		  AND t.deleted_at IS NULL`,
```

Comment on lines +237 to +257

```go
// cleanupOnExit performs final flush and cleanup when the loop exits.
func (bc *BulkCollector) cleanupOnExit(ctx context.Context, timer *time.Timer) {
	if timer != nil {
		timer.Stop()
	}

	bc.mu.Lock()
	if len(bc.messages) > 0 {
		messages := bc.messages
		bc.messages = make([]amqp.Delivery, 0, bc.bulkSize)
		bc.mu.Unlock()

		bc.executeFlush(ctx, messages)
	} else {
		bc.mu.Unlock()
	}

	bc.mu.Lock()
	bc.started = false
	bc.mu.Unlock()
}
```

⚠️ Potential issue | 🟡 Minor

Final flush uses potentially cancelled context.

When the collector exits due to context cancellation (line 152-153), cleanupOnExit is called with the cancelled context. The executeFlush on line 249 passes this cancelled context to the flush callback, which may cause the final batch to fail if the callback respects context cancellation.

Consider whether the final flush should use a fresh context (e.g., context.Background() with a timeout) to ensure remaining messages are persisted during graceful shutdown.

🛡️ Proposed fix to use a timeout context for final flush

```diff
 // cleanupOnExit performs final flush and cleanup when the loop exits.
 func (bc *BulkCollector) cleanupOnExit(ctx context.Context, timer *time.Timer) {
 	if timer != nil {
 		timer.Stop()
 	}

+	// Use a fresh context for final flush to ensure completion even if original was cancelled
+	flushCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
 	bc.mu.Lock()
 	if len(bc.messages) > 0 {
 		messages := bc.messages
 		bc.messages = make([]amqp.Delivery, 0, bc.bulkSize)
 		bc.mu.Unlock()

-		bc.executeFlush(ctx, messages)
+		bc.executeFlush(flushCtx, messages)
 	} else {
 		bc.mu.Unlock()
 	}
```

Comment on lines +103 to +105

```go
// Process metadata and send events
uc.processMetadataAndEvents(ctx, logger, payloads)
```


⚠️ Potential issue | 🟠 Major

The bulk path still emits side effects for no-op rows.

processMetadataAndEvents runs for the entire batch even when bulk insert/update reported duplicates or unchanged rows. On retried messages that means re-creating metadata and re-publishing events for rows the DB deliberately ignored, so the consumer is no longer idempotent at the side-effect boundary.
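
A hedged sketch of the filtering step this comment suggests; the changed set is an illustrative stand-in for whatever per-row status the bulk result actually exposes:

```go
package main

import "fmt"

type payload struct {
	ID string
}

// filterChanged keeps only payloads whose rows the DB actually inserted or
// updated, so metadata and events are not re-emitted for rows ignored by
// ON CONFLICT DO NOTHING or the unchanged-status guard.
func filterChanged(payloads []payload, changed map[string]bool) []payload {
	out := make([]payload, 0, len(payloads))

	for _, p := range payloads {
		if changed[p.ID] {
			out = append(out, p)
		}
	}

	return out
}

func main() {
	payloads := []payload{{ID: "tx-1"}, {ID: "tx-2"}, {ID: "tx-3"}}

	// tx-2 was a redelivered duplicate the DB deliberately ignored.
	changed := map[string]bool{"tx-1": true, "tx-3": true}

	for _, p := range filterChanged(payloads, changed) {
		fmt.Println(p.ID) // prints tx-1, then tx-3
	}
}
```

Feeding only the filtered slice into processMetadataAndEvents restores idempotency at the side-effect boundary.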


Comment on lines +176 to +191

```go
for _, payload := range payloads {
	if payload.Transaction == nil {
		continue
	}

	tx := payload.Transaction
	tx.Body = pkgTransaction.Transaction{}

	// Determine if this is an insert or an update
	if uc.isStatusTransition(payload) {
		// Status transition: PENDING -> APPROVED/CANCELED
		toUpdate.transactions = append(toUpdate.transactions, tx)
	} else {
		// Normal insert
		uc.prepareTransactionForInsert(tx)
		toInsert.transactions = append(toInsert.transactions, tx)
```

⚠️ Potential issue | 🔴 Critical

Don't mutate the original payload during classification.

tx := payload.Transaction aliases the same object stored in payloads, so tx.Body = ... and prepareTransactionForInsert(tx) rewrite the message in place. If bulk persistence fails, fallbackToIndividualProcessing replays a modified payload instead of the original one, and new PENDING inserts lose their body even though prepareTransactionForInsert says that body should be kept.

🛠️ Suggested fix
-		tx := payload.Transaction
-		tx.Body = pkgTransaction.Transaction{}
+		txCopy := *payload.Transaction
+		tx := &txCopy
 
 		// Determine if this is an insert or an update
 		if uc.isStatusTransition(payload) {
 			// Status transition: PENDING -> APPROVED/CANCELED
 			toUpdate.transactions = append(toUpdate.transactions, tx)
 func (uc *UseCase) prepareTransactionForInsert(tx *transaction.Transaction) {
 	switch tx.Status.Code {
 	case constant.CREATED:
+		tx.Body = pkgTransaction.Transaction{}
 		description := constant.APPROVED
 		tx.Status = transaction.Status{
 			Code:        description,
 			Description: &description,
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/transaction/internal/services/command/create-bulk-transaction-operations-async.go`
around lines 176-191: the loop currently aliases payload.Transaction into tx
and mutates it, corrupting the original payloads used later by
fallbackToIndividualProcessing. Instead, create a new transaction copy inside the
loop (a shallow or deep copy, as appropriate) and mutate that copy when
classifying and when calling uc.prepareTransactionForInsert, leaving
payload.Transaction untouched. Keep the classification logic using
uc.isStatusTransition and append the copied transaction to toUpdate.transactions
or toInsert.transactions, so fallbackToIndividualProcessing still has the
original, unmodified payloads.
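
A minimal illustration of the copy-before-mutate pattern this finding calls for (the types here are simplified stand-ins, not the PR's real models):

```go
package main

// transaction and payloadItem are simplified stand-ins for the real models.
type transaction struct {
	ID   string
	Body string
}

type payloadItem struct {
	Transaction *transaction
}

// classify mutates a shallow copy of each transaction, so the original
// payloads remain intact for a later fallback replay.
func classify(payloads []payloadItem) []*transaction {
	var toInsert []*transaction
	for _, p := range payloads {
		if p.Transaction == nil {
			continue
		}

		txCopy := *p.Transaction // copy, not alias
		txCopy.Body = ""         // safe: leaves p.Transaction untouched
		toInsert = append(toInsert, &txCopy)
	}

	return toInsert
}
```

Note the caveat: if the real transaction struct contains pointer or slice fields that also get mutated, a shallow copy is not enough and a deep copy would be needed instead.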

Comment on lines +379 to +399
	for _, tx := range transactions {
		_, err := uc.UpdateTransactionStatus(ctx, tx)
		if err != nil {
			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf(
				"Failed to update transaction %s status: %v", tx.ID, err,
			))

			continue
		}

		updated++
	}

	result.TransactionsUpdated = updated

	logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf(
		"Individual updated transactions: attempted=%d, updated=%d",
		result.TransactionsUpdateAttempted, updated,
	))

	return nil

⚠️ Potential issue | 🔴 Critical

Don't treat failed small-batch status updates as success.

For batches below bulkUpdateThreshold, any UpdateTransactionStatus error is only logged and then discarded. The caller receives nil, proceeds with follow-up work, and can ACK the batch even though some or all status transitions were never persisted.

🧯 Suggested fix
 	for _, tx := range transactions {
 		_, err := uc.UpdateTransactionStatus(ctx, tx)
 		if err != nil {
 			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf(
 				"Failed to update transaction %s status: %v", tx.ID, err,
 			))
-
-			continue
+			return fmt.Errorf("update transaction %s status: %w", tx.ID, err)
 		}
 
 		updated++
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

	for _, tx := range transactions {
		_, err := uc.UpdateTransactionStatus(ctx, tx)
		if err != nil {
			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf(
				"Failed to update transaction %s status: %v", tx.ID, err,
			))

			continue
		}

		updated++
	}

	result.TransactionsUpdated = updated

	logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf(
		"Individual updated transactions: attempted=%d, updated=%d",
		result.TransactionsUpdateAttempted, updated,
	))

	return nil

After:

	for _, tx := range transactions {
		_, err := uc.UpdateTransactionStatus(ctx, tx)
		if err != nil {
			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf(
				"Failed to update transaction %s status: %v", tx.ID, err,
			))

			return fmt.Errorf("update transaction %s status: %w", tx.ID, err)
		}

		updated++
	}

	result.TransactionsUpdated = updated

	logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf(
		"Individual updated transactions: attempted=%d, updated=%d",
		result.TransactionsUpdateAttempted, updated,
	))

	return nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/transaction/internal/services/command/create-bulk-transaction-operations-async.go`
around lines 379-399: the loop currently swallows errors from
uc.UpdateTransactionStatus and returns nil, so small-batch updates can be
partially persisted while the caller is told everything succeeded. Change the
loop that calls uc.UpdateTransactionStatus for each tx to record failures (e.g.
a failure count or a slice of errors), log each failure as it does now, and after
the loop set result.TransactionsUpdated as before; but if any
UpdateTransactionStatus call returned an error, return a non-nil error
(aggregated, or the first one) instead of nil so the caller knows the batch was
not fully applied. Keep the uses of uc.UpdateTransactionStatus,
result.TransactionsUpdated, and the existing logger.Log calls.

Labels

back-end (Back-end Issues), size/XL

Development

Successfully merging this pull request may close these issues.

Bulk recorder

2 participants