Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ingest/ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,12 +736,12 @@ func (t *LedgerTransaction) InnerTransactionHash() (string, bool) {
return hex.EncodeToString(innerHash[:]), true
}

func (t *LedgerTransaction) NewMaxFee() (uint32, bool) {
func (t *LedgerTransaction) NewMaxFee() (int64, bool) {
if !t.Envelope.IsFeeBump() {
return 0, false
}

return uint32(t.Envelope.FeeBumpFee()), true
return t.Envelope.FeeBumpFee(), true
}

func (t *LedgerTransaction) Successful() bool {
Expand Down
4 changes: 2 additions & 2 deletions ingest/ledger_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,10 @@ func TestTransactionHelperFunctions(t *testing.T) {
assert.Equal(t, false, ok)
assert.Equal(t, "", innerTransactionHash)

var newMaxFee uint32
var newMaxFee int64
newMaxFee, ok = transaction.NewMaxFee()
assert.Equal(t, false, ok)
assert.Equal(t, uint32(0), newMaxFee)
assert.Equal(t, int64(0), newMaxFee)

assert.Equal(t, true, transaction.Successful())
}
Expand Down
27 changes: 18 additions & 9 deletions processors/token_transfer/token_transfer_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math/big"

"github.com/stellar/go-stellar-sdk/amount"
assetProto "github.com/stellar/go-stellar-sdk/asset"
Expand Down Expand Up @@ -993,14 +994,20 @@ For more details, on why this is needed, refer - https://github.com/stellar/stel

The maybeGenerateMintOrBurnEvents function takes in an account and an asset, but in reality, this will only be called for operationSourceAccount and strictly for XLM
*/
func (p *EventsProcessor) maybeGenerateMintOrBurnEventsForReconciliation(tx ingest.LedgerTransaction, opIndex uint32, changesMap, eventsMap map[balanceKey]int64, account xdr.MuxedAccount, asset xdr.Asset) (*TokenTransferEvent, error) {
func (p *EventsProcessor) maybeGenerateMintOrBurnEventsForReconciliation(tx ingest.LedgerTransaction, opIndex uint32, changesMap, eventsMap map[balanceKey]*big.Int, account xdr.MuxedAccount, asset xdr.Asset) (*TokenTransferEvent, error) {
accountStr := account.ToAccountId().Address()
// Create the balance key for this account and XLM asset
key := balanceKey{holder: accountStr, asset: asset.StringCanonical()}

// Get the balance changes from both maps
// Get the balance changes from both maps (nil means zero)
changesBalance := changesMap[key]
if changesBalance == nil {
changesBalance = new(big.Int)
}
eventsBalance := eventsMap[key]
if eventsBalance == nil {
eventsBalance = new(big.Int)
}

/*
Highlighting all possible scenarios:
Expand Down Expand Up @@ -1053,11 +1060,10 @@ func (p *EventsProcessor) maybeGenerateMintOrBurnEventsForReconciliation(tx inge
*/

// Both maps have entries for this account/asset
diff := changesBalance - eventsBalance
// Not in either map, no difference
diff := new(big.Int).Sub(changesBalance, eventsBalance)

// If no difference, no mint or burn needs to be emitted
if diff == 0 {
if diff.Sign() == 0 {
return nil, nil
}

Expand All @@ -1067,12 +1073,12 @@ func (p *EventsProcessor) maybeGenerateMintOrBurnEventsForReconciliation(tx inge
protoAsset := assetProto.NewProtoAsset(asset)

// Generate appropriate event based on the difference
if diff > 0 {
if diff.Sign() > 0 {
// changesMap shows more XLM than eventsMap - need to MINT
mintOrBurnEvent = NewMintEvent(meta, accountStr, amount.String64Raw(xdr.Int64(diff)), protoAsset)
mintOrBurnEvent = NewMintEvent(meta, accountStr, diff.String(), protoAsset)
} else {
// changesMap shows less XLM than eventsMap - need to BURN
mintOrBurnEvent = NewBurnEvent(meta, accountStr, amount.String64Raw(xdr.Int64(-diff)), protoAsset)
mintOrBurnEvent = NewBurnEvent(meta, accountStr, new(big.Int).Abs(diff).String(), protoAsset)
}

return mintOrBurnEvent, nil
Expand All @@ -1085,7 +1091,10 @@ func (p *EventsProcessor) generateXlmReconciliationEvents(tx ingest.LedgerTransa
return nil, fmt.Errorf("failed to get operation changes for operation Index: %v: %w", opIndex, err)
}
changesMap := findBalanceDeltasFromChanges(operationChanges)
eventsMap := findBalanceDeltasFromEvents(operationEvents)
eventsMap, err := findBalanceDeltasFromEvents(operationEvents)
if err != nil {
return nil, fmt.Errorf("failed to compute event balance deltas for operation Index: %v: %w", opIndex, err)
}
operationSrcAccount := operationSourceAccount(tx, op)

return p.maybeGenerateMintOrBurnEventsForReconciliation(tx, opIndex, changesMap, eventsMap, operationSrcAccount, xlmAsset)
Expand Down
103 changes: 72 additions & 31 deletions processors/token_transfer/verify_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package token_transfer
import (
"fmt"
"io"
"math/big"

"github.com/google/go-cmp/cmp"
"github.com/stellar/go-stellar-sdk/amount"
"github.com/stellar/go-stellar-sdk/ingest"
"github.com/stellar/go-stellar-sdk/strkey"
"github.com/stellar/go-stellar-sdk/xdr"
Expand All @@ -18,18 +18,25 @@ type balanceKey struct {
}

// updateBalanceMap updates the map and removes the entry if the value becomes 0
func updateBalanceMap(m map[balanceKey]int64, key balanceKey, delta int64) {
func updateBalanceMap(m map[balanceKey]*big.Int, key balanceKey, delta *big.Int) {
// We dont include movement to/from contract address is balance delta tracking, since there is no standard way to derive/verify from contractData
if strkey.IsValidContractAddress(key.holder) {
return
}
m[key] += delta
if m[key] == 0 {
delete(m, key)
if delta.Sign() == 0 {
return
}
if existing, ok := m[key]; ok {
existing.Add(existing, delta)
if existing.Sign() == 0 {
delete(m, key)
}
} else {
m[key] = new(big.Int).Set(delta)
}
}

func fetchAccountDeltaFromChange(change ingest.Change, m map[balanceKey]int64) {
func fetchAccountDeltaFromChange(change ingest.Change, m map[balanceKey]*big.Int) {
var accountKey string
var pre, post xdr.Int64

Expand All @@ -44,11 +51,11 @@ func fetchAccountDeltaFromChange(change ingest.Change, m map[balanceKey]int64) {
post = entry.Balance
}

delta := int64(post - pre)
delta := big.NewInt(int64(post - pre))
updateBalanceMap(m, balanceKey{holder: accountKey, asset: xlmAsset.StringCanonical()}, delta)
}

func fetchTrustlineDeltaFromChange(change ingest.Change, m map[balanceKey]int64) {
func fetchTrustlineDeltaFromChange(change ingest.Change, m map[balanceKey]*big.Int) {
var trustlineKey string
var asset string
var pre, post xdr.Int64
Expand All @@ -72,11 +79,11 @@ func fetchTrustlineDeltaFromChange(change ingest.Change, m map[balanceKey]int64)
asset = entry.Asset.ToAsset().StringCanonical()
}

delta := int64(post - pre)
delta := big.NewInt(int64(post - pre))
updateBalanceMap(m, balanceKey{holder: trustlineKey, asset: asset}, delta)
}

func fetchClaimableDeltaFromChange(change ingest.Change, m map[balanceKey]int64) {
func fetchClaimableDeltaFromChange(change ingest.Change, m map[balanceKey]*big.Int) {
var cbKey string
var asset string
var pre, post xdr.Int64
Expand All @@ -94,11 +101,11 @@ func fetchClaimableDeltaFromChange(change ingest.Change, m map[balanceKey]int64)
post = entry.Amount
}

delta := int64(post - pre)
delta := big.NewInt(int64(post - pre))
updateBalanceMap(m, balanceKey{holder: cbKey, asset: asset}, delta)
}

func fetchLiquidityPoolDeltaFromChange(change ingest.Change, m map[balanceKey]int64) {
func fetchLiquidityPoolDeltaFromChange(change ingest.Change, m map[balanceKey]*big.Int) {
var lpKey string
var assetA, assetB string
var preA, preB, postA, postB xdr.Int64
Expand All @@ -119,16 +126,16 @@ func fetchLiquidityPoolDeltaFromChange(change ingest.Change, m map[balanceKey]in
postA, postB = cp.ReserveA, cp.ReserveB
}

deltaA := int64(postA - preA)
deltaB := int64(postB - preB)
deltaA := big.NewInt(int64(postA - preA))
deltaB := big.NewInt(int64(postB - preB))

updateBalanceMap(m, balanceKey{holder: lpKey, asset: assetA}, deltaA)
updateBalanceMap(m, balanceKey{holder: lpKey, asset: assetB}, deltaB)
}

// findBalanceDeltasFromChanges aggregates all balance changes from ledger entry changes
func findBalanceDeltasFromChanges(changes []ingest.Change) map[balanceKey]int64 {
hashmap := make(map[balanceKey]int64)
func findBalanceDeltasFromChanges(changes []ingest.Change) map[balanceKey]*big.Int {
hashmap := make(map[balanceKey]*big.Int)
for _, change := range changes {
switch change.Type {
case xdr.LedgerEntryTypeAccount:
Expand All @@ -144,9 +151,18 @@ func findBalanceDeltasFromChanges(changes []ingest.Change) map[balanceKey]int64
return hashmap
}

// parseAmount parses a raw amount string into a *big.Int.
func parseAmount(amountStr string, eventType string) (*big.Int, error) {
amt, ok := new(big.Int).SetString(amountStr, 10)
if !ok {
return nil, fmt.Errorf("invalid amount %q in %s event", amountStr, eventType)
}
return amt, nil
}

// findBalanceDeltasFromEvents aggregates all balance changes from token transfer events
func findBalanceDeltasFromEvents(events []*TokenTransferEvent) map[balanceKey]int64 {
hashmap := make(map[balanceKey]int64)
func findBalanceDeltasFromEvents(events []*TokenTransferEvent) (map[balanceKey]*big.Int, error) {
hashmap := make(map[balanceKey]*big.Int)

for _, event := range events {
if event.GetAsset() == nil { // needed check for custom token events which won't have an asset
Expand All @@ -158,50 +174,65 @@ func findBalanceDeltasFromEvents(events []*TokenTransferEvent) map[balanceKey]in
ev := event.GetFee()
address := ev.From
asset := xlmAsset.StringCanonical()
amt := amount.MustParseInt64Raw(ev.Amount)
amt, err := parseAmount(ev.Amount, string(event.GetEventType()))
if err != nil {
return nil, err
}
// Address' balance reduces by amt in FEE
updateBalanceMap(hashmap, balanceKey{holder: address, asset: asset}, -amt)
updateBalanceMap(hashmap, balanceKey{holder: address, asset: asset}, new(big.Int).Neg(amt))

case *TokenTransferEvent_Transfer:
ev := event.GetTransfer()
fromAddress := ev.From
toAddress := ev.To
amt := amount.MustParseInt64Raw(ev.Amount)
amt, err := parseAmount(ev.Amount, string(event.GetEventType()))
if err != nil {
return nil, err
}
asset := event.GetAsset().ToXdrAsset().StringCanonical()
// FromAddress' balance reduces by amt in TRANSFER
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, -amt)
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, new(big.Int).Neg(amt))
// ToAddress' balance increases by amt in TRANSFER
updateBalanceMap(hashmap, balanceKey{holder: toAddress, asset: asset}, amt)

case *TokenTransferEvent_Mint:
ev := event.GetMint()
toAddress := ev.To
asset := event.GetAsset().ToXdrAsset().StringCanonical()
amt := amount.MustParseInt64Raw(ev.Amount)
amt, err := parseAmount(ev.Amount, string(event.GetEventType()))
if err != nil {
return nil, err
}
// ToAddress' balance increases by amt in MINT
updateBalanceMap(hashmap, balanceKey{holder: toAddress, asset: asset}, amt)

case *TokenTransferEvent_Burn:
ev := event.GetBurn()
fromAddress := ev.From
asset := event.GetAsset().ToXdrAsset().StringCanonical()
amt := amount.MustParseInt64Raw(ev.Amount)
amt, err := parseAmount(ev.Amount, string(event.GetEventType()))
if err != nil {
return nil, err
}
// FromAddress' balance reduces by amt in BURN
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, -amt)
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, new(big.Int).Neg(amt))

case *TokenTransferEvent_Clawback:
ev := event.GetClawback()
fromAddress := ev.From
asset := event.GetAsset().ToXdrAsset().StringCanonical()
amt := amount.MustParseInt64Raw(ev.Amount)
amt, err := parseAmount(ev.Amount, string(event.GetEventType()))
if err != nil {
return nil, err
}
// FromAddress' balance reduces by amt in CLAWBACK
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, -amt)
updateBalanceMap(hashmap, balanceKey{holder: fromAddress, asset: asset}, new(big.Int).Neg(amt))

default:
panic(fmt.Errorf("unknown event type %s", event.GetEventType()))
return nil, fmt.Errorf("unknown event type %s", event.GetEventType())
}
}
return hashmap
return hashmap, nil
}

func VerifyEvents(ledger xdr.LedgerCloseMeta, passphrase string, readFromUnifiedEvents bool) error {
Expand All @@ -216,6 +247,13 @@ func VerifyEvents(ledger xdr.LedgerCloseMeta, passphrase string, readFromUnified
return fmt.Errorf("error creating transaction reader: %w", err)
}

bigIntComparer := cmp.Comparer(func(a, b *big.Int) bool {
if a == nil || b == nil {
return a == b
}
return a.Cmp(b) == 0
})

for {
var tx ingest.LedgerTransaction
var events []*TokenTransferEvent
Expand Down Expand Up @@ -246,10 +284,13 @@ func VerifyEvents(ledger xdr.LedgerCloseMeta, passphrase string, readFromUnified
changes := append(feeChanges, txChanges...)
changes = append(changes, postTxApplyFeeChanges...)

txEventsMap := findBalanceDeltasFromEvents(events)
txEventsMap, err := findBalanceDeltasFromEvents(events)
if err != nil {
return fmt.Errorf("verifyEventsError: %w", err)
}
txChangesMap := findBalanceDeltasFromChanges(changes)

if diff := cmp.Diff(txEventsMap, txChangesMap); diff != "" {
if diff := cmp.Diff(txEventsMap, txChangesMap, bigIntComparer); diff != "" {
return fmt.Errorf("balance delta mismatch between events and ledger changes for ledgerSequence: %v, closedAt: %v, txHash: %v\n"+
"('-' indicates missing or different in events, '+' indicates missing or different in ledger changes)\n%s", ledger.LedgerSequence(), ledger.ClosedAt(), txHash, diff)
}
Expand Down
Loading
Loading