Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions apps/chain_pusher/pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pusher
import (
"context"
"fmt"
"maps"
"math/big"
"time"

Expand All @@ -17,6 +18,8 @@ const (
contractChChannelBufferSize = 128
)

type updateBatch map[types.InternalEncodedAssetID]types.AggregatedSignedPrice

// Pusher is a struct that contains the configuration for the Pusher.
type Pusher struct {
storkWsEndpoint string
Expand Down Expand Up @@ -116,7 +119,7 @@ func (p *Pusher) Run(ctx context.Context) {
ticker := time.NewTicker(p.batchingWindowDuration)
defer ticker.Stop()

pushCh := make(chan map[types.InternalEncodedAssetID]types.AggregatedSignedPrice, 128)
pushCh := make(chan updateBatch, 128)

// use separate goroutine to handle push updates to avoid blocking the main loop
go func() {
Expand All @@ -125,8 +128,9 @@ func (p *Pusher) Run(ctx context.Context) {
case <-ctx.Done():
return
case pushUpdates := <-pushCh:
// synchronous push updates to ensure update order is maintained
p.handlePushUpdates(ctx, pushUpdates, contractCh)
// drain the channel and merge all pending batches so only the latest update per asset is pushed
merged := drainAndMerge(pushUpdates, pushCh)
p.handlePushUpdates(ctx, merged, contractCh)
}
}
}()
Expand Down Expand Up @@ -154,6 +158,19 @@ func (p *Pusher) Run(ctx context.Context) {
}
}

// drainAndMerge takes an initial batch and drains any additional pending batches from the channel,
// merging them so that only the latest update per asset is kept.
func drainAndMerge(initial updateBatch, ch <-chan updateBatch) updateBatch {
merged := make(updateBatch, len(initial))
maps.Copy(merged, initial)

for batch := range ch {
maps.Copy(merged, batch)
}

return merged
}

func (p *Pusher) pullWithTimeout(
ctx context.Context, encodedAssetIDs []types.InternalEncodedAssetID,
) (map[types.InternalEncodedAssetID]types.InternalTemporalNumericValue, error) {
Expand All @@ -169,7 +186,7 @@ func (p *Pusher) pullWithTimeout(
}

func (p *Pusher) pushWithTimeout(
ctx context.Context, nextUpdate map[types.InternalEncodedAssetID]types.AggregatedSignedPrice,
ctx context.Context, nextUpdate updateBatch,
) error {
pullCtx, pullCancel := context.WithTimeout(ctx, defaultNetworkTimeout)
defer pullCancel()
Expand Down Expand Up @@ -310,7 +327,7 @@ func (p *Pusher) initializeAssets() (*types.AssetConfig, []shared.AssetID, []typ
// handlePushUpdates processes updates and pushes them to the contract.
func (p *Pusher) handlePushUpdates(
ctx context.Context,
updates map[types.InternalEncodedAssetID]types.AggregatedSignedPrice,
updates updateBatch,
contractCh chan<- map[types.InternalEncodedAssetID]types.InternalTemporalNumericValue,
) {
if len(updates) > 0 {
Expand Down
Loading