Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ jobs:
BLNK_REDIS_DNS: localhost:6379

- name: Test
run: go test -v ./
run: go test -v ./...
6 changes: 3 additions & 3 deletions api/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func TestUpdateMetadata(t *testing.T) {
}

t.Run("Missing entity ID", func(t *testing.T) {
req := httptest.NewRequest("PATCH", "/metadata/update/", nil)
req := httptest.NewRequest("POST", "/metadata", nil)
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNotFound, resp.Code)
})

t.Run("Invalid JSON body", func(t *testing.T) {
req := httptest.NewRequest("PATCH", "/metadata/update/ldg_123", bytes.NewReader([]byte("invalid json")))
req := httptest.NewRequest("POST", "/ldg_123/metadata", bytes.NewReader([]byte("invalid json")))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand All @@ -55,7 +55,7 @@ func TestUpdateMetadata(t *testing.T) {
WrongField: "value",
}
payloadBytes, _ := request.ToJsonReq(&payload)
req := httptest.NewRequest("PATCH", "/metadata/update/ldg_123", payloadBytes)
req := httptest.NewRequest("POST", "/ldg_123/metadata", payloadBytes)
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand Down
12 changes: 12 additions & 0 deletions api/reconciliation_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (a Api) StartReconciliation(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if len(req.MatchingRuleIDs) == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "matching_rule_ids is required"})
return
}

reconciliationID, err := a.blnk.StartReconciliation(c.Request.Context(), req.UploadID, req.Strategy, req.GroupingCriteria, req.MatchingRuleIDs, req.DryRun)
if err != nil {
Expand Down Expand Up @@ -114,6 +118,14 @@ func (a Api) InstantReconciliation(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if len(req.ExternalTransactions) == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "external_transactions is required"})
return
}
if len(req.MatchingRuleIDs) == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "matching_rule_ids is required"})
return
}

reconciliationID, err := a.blnk.StartInstantReconciliation(
c.Request.Context(),
Expand Down
20 changes: 10 additions & 10 deletions api/reconciliation_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestStartReconciliation(t *testing.T) {
Strategy: "",
}
payloadBytes, _ := request.ToJsonReq(&payload)
req := httptest.NewRequest("POST", "/reconciliations", payloadBytes)
req := httptest.NewRequest("POST", "/reconciliation/start", payloadBytes)
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusBadRequest, resp.Code)
})

t.Run("Invalid JSON", func(t *testing.T) {
req := httptest.NewRequest("POST", "/reconciliations", bytes.NewReader([]byte("invalid json")))
req := httptest.NewRequest("POST", "/reconciliation/start", bytes.NewReader([]byte("invalid json")))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand All @@ -74,15 +74,15 @@ func TestInstantReconciliation(t *testing.T) {
MatchingRuleIDs: []string{"mr_test"},
}
payloadBytes, _ := request.ToJsonReq(&payload)
req := httptest.NewRequest("POST", "/reconciliations/instant", payloadBytes)
req := httptest.NewRequest("POST", "/reconciliation/start-instant", payloadBytes)
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusBadRequest, resp.Code)
})

t.Run("Invalid JSON", func(t *testing.T) {
req := httptest.NewRequest("POST", "/reconciliations/instant", bytes.NewReader([]byte("invalid json")))
req := httptest.NewRequest("POST", "/reconciliation/start-instant", bytes.NewReader([]byte("invalid json")))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand All @@ -97,7 +97,7 @@ func TestGetReconciliation(t *testing.T) {
}

t.Run("Reconciliation not found", func(t *testing.T) {
req := httptest.NewRequest("GET", "/reconciliations/rec_nonexistent", nil)
req := httptest.NewRequest("GET", "/reconciliation/rec_nonexistent", nil)
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNotFound, resp.Code)
Expand All @@ -111,7 +111,7 @@ func TestCreateMatchingRule(t *testing.T) {
}

t.Run("Invalid JSON", func(t *testing.T) {
req := httptest.NewRequest("POST", "/matching-rules", bytes.NewReader([]byte("invalid json")))
req := httptest.NewRequest("POST", "/reconciliation/matching-rules", bytes.NewReader([]byte("invalid json")))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand All @@ -134,15 +134,15 @@ func TestUpdateMatchingRule(t *testing.T) {
Description: "Updated description",
}
payloadBytes, _ := request.ToJsonReq(&payload)
req := httptest.NewRequest("PUT", "/matching-rules/", payloadBytes)
req := httptest.NewRequest("PUT", "/reconciliation/matching-rules/", payloadBytes)
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNotFound, resp.Code)
})

t.Run("Invalid JSON", func(t *testing.T) {
req := httptest.NewRequest("PUT", "/matching-rules/mr_test", bytes.NewReader([]byte("invalid json")))
req := httptest.NewRequest("PUT", "/reconciliation/matching-rules/mr_test", bytes.NewReader([]byte("invalid json")))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
Expand All @@ -157,14 +157,14 @@ func TestDeleteMatchingRule(t *testing.T) {
}

t.Run("Missing rule ID", func(t *testing.T) {
req := httptest.NewRequest("DELETE", "/matching-rules/", nil)
req := httptest.NewRequest("DELETE", "/reconciliation/matching-rules/", nil)
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNotFound, resp.Code)
})

t.Run("Delete non-existent rule", func(t *testing.T) {
req := httptest.NewRequest("DELETE", "/matching-rules/mr_nonexistent", nil)
req := httptest.NewRequest("DELETE", "/reconciliation/matching-rules/mr_nonexistent", nil)
resp := httptest.NewRecorder()
router.ServeHTTP(resp, req)
assert.Equal(t, http.StatusInternalServerError, resp.Code)
Expand Down
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ var (
MaxQueueSize: 1000,
MaxWorkers: 10,
LockDuration: 30 * time.Minute,
LockWaitTimeout: 3 * time.Second,
IndexQueuePrefix: "transactions",
EnableCoalescing: false,
EnableCoalescing: true,
EnableQueuedChecks: false,
DisableBatchReferenceCheck: false,
}
Expand Down Expand Up @@ -156,6 +157,7 @@ type TransactionConfig struct {
MaxQueueSize int `json:"max_queue_size" envconfig:"BLNK_TRANSACTION_MAX_QUEUE_SIZE"`
MaxWorkers int `json:"max_workers" envconfig:"BLNK_TRANSACTION_MAX_WORKERS"`
LockDuration time.Duration `json:"lock_duration" envconfig:"BLNK_TRANSACTION_LOCK_DURATION"`
LockWaitTimeout time.Duration `json:"lock_wait_timeout" envconfig:"BLNK_TRANSACTION_LOCK_WAIT_TIMEOUT"`
IndexQueuePrefix string `json:"index_queue_prefix" envconfig:"BLNK_TRANSACTION_INDEX_QUEUE_PREFIX"`
EnableCoalescing bool `json:"enable_coalescing" envconfig:"BLNK_TRANSACTION_ENABLE_COALESCING"`
EnableQueuedChecks bool `json:"enable_queued_checks" envconfig:"BLNK_TRANSACTION_ENABLE_QUEUED_CHECKS"`
Expand Down Expand Up @@ -339,6 +341,14 @@ func (cnf *Configuration) setTransactionDefaults() {
} else {
cnf.Transaction.LockDuration = cnf.Transaction.LockDuration * time.Second
}
if cnf.Transaction.LockWaitTimeout == 0 {
cnf.Transaction.LockWaitTimeout = defaultTransaction.LockWaitTimeout
} else if cnf.Transaction.LockWaitTimeout > 0 && cnf.Transaction.LockWaitTimeout < time.Second {
cnf.Transaction.LockWaitTimeout = cnf.Transaction.LockWaitTimeout * time.Second
}
if !cnf.Transaction.EnableCoalescing {
cnf.Transaction.EnableCoalescing = defaultTransaction.EnableCoalescing
}
if cnf.Transaction.IndexQueuePrefix == "" {
cnf.Transaction.IndexQueuePrefix = defaultTransaction.IndexQueuePrefix
}
Expand Down
37 changes: 37 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"os"
"testing"
"time"
)

func TestValidateAndAddDefaults(t *testing.T) {
Expand Down Expand Up @@ -76,6 +77,36 @@ func TestValidateAndAddDefaults(t *testing.T) {
if cnf.Server.Port != DEFAULT_PORT {
t.Errorf("Expected default port %s, got %s", DEFAULT_PORT, cnf.Server.Port)
}
if cnf.Transaction.LockWaitTimeout != 3*time.Second {
t.Errorf("Expected default lock wait timeout %s, got %s", 3*time.Second, cnf.Transaction.LockWaitTimeout)
}
if !cnf.Transaction.EnableCoalescing {
t.Errorf("Expected coalescing to default to enabled")
}
}

func TestValidateAndAddDefaults_TransactionLockWaitTimeout(t *testing.T) {
cnf := Configuration{
ProjectName: "Test Project",
DataSource: DataSourceConfig{
Dns: "some-dns",
},
Redis: RedisConfig{
Dns: "localhost:6379",
},
Transaction: TransactionConfig{
LockWaitTimeout: 12,
},
}

err := cnf.validateAndAddDefaults()
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}

if cnf.Transaction.LockWaitTimeout != 12*time.Second {
t.Fatalf("Expected LockWaitTimeout to be 12s, got %s", cnf.Transaction.LockWaitTimeout)
}
}

func TestLoadConfigFromFile(t *testing.T) {
Expand All @@ -95,6 +126,9 @@ func TestLoadConfigFromFile(t *testing.T) {
Redis: RedisConfig{
Dns: "temp-redis",
},
Transaction: TransactionConfig{
LockWaitTimeout: 7,
},
}
if err := json.NewEncoder(tmpFile).Encode(sampleConfig); err != nil {
t.Fatalf("Unable to write to temporary file: %v", err)
Expand Down Expand Up @@ -125,6 +159,9 @@ func TestLoadConfigFromFile(t *testing.T) {
if loadedConfig.DataSource.Dns != "temp-dns" {
t.Errorf("Expected DataSource.Dns to be 'temp-dns', got '%s'", loadedConfig.DataSource.Dns)
}
if loadedConfig.Transaction.LockWaitTimeout != 7*time.Second {
t.Errorf("Expected LockWaitTimeout to be '7s', got '%s'", loadedConfig.Transaction.LockWaitTimeout)
}
}

func TestInitConfig(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions internal/hotpairs/hotpairs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package hotpairs

import (
"context"
"fmt"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
redlock "github.com/blnkfinance/blnk/internal/lock"
"github.com/blnkfinance/blnk/model"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -156,3 +158,9 @@ func TestQueueLaneFromMetadata(t *testing.T) {
require.Equal(t, LaneNormal, QueueLaneFromMetadata(nil))
require.Equal(t, LaneHot, QueueLaneFromMetadata(map[string]interface{}{QueueLaneMetaKey: LaneHot}))
}

func TestIsLockContentionError(t *testing.T) {
require.True(t, IsLockContentionError(fmt.Errorf("wrapped: %w", redlock.ErrLockHeld)))
require.True(t, IsLockContentionError(fmt.Errorf("wrapped: %w", redlock.ErrLockWaitTimeout)))
require.False(t, IsLockContentionError(context.DeadlineExceeded))
}
8 changes: 8 additions & 0 deletions internal/hotpairs/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package hotpairs

import (
"context"
"errors"
"strings"

redlock "github.com/blnkfinance/blnk/internal/lock"
"github.com/blnkfinance/blnk/model"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -164,6 +166,12 @@ func IsLockContentionError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, redlock.ErrLockHeld) || errors.Is(err, redlock.ErrLockWaitTimeout) {
return true
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "already held") ||
strings.Contains(msg, "failed to acquire lock") ||
Expand Down
Loading
Loading