diff --git a/apps/chain_pusher/pkg/pusher/pusher.go b/apps/chain_pusher/pkg/pusher/pusher.go index c4374425..62ee1440 100644 --- a/apps/chain_pusher/pkg/pusher/pusher.go +++ b/apps/chain_pusher/pkg/pusher/pusher.go @@ -3,6 +3,7 @@ package pusher import ( "context" "fmt" + "maps" "math/big" "time" @@ -17,6 +18,8 @@ const ( contractChChannelBufferSize = 128 ) +type updateBatch map[types.InternalEncodedAssetID]types.AggregatedSignedPrice + // Pusher is a struct that contains the configuration for the Pusher. type Pusher struct { storkWsEndpoint string @@ -116,7 +119,7 @@ func (p *Pusher) Run(ctx context.Context) { ticker := time.NewTicker(p.batchingWindowDuration) defer ticker.Stop() - pushCh := make(chan map[types.InternalEncodedAssetID]types.AggregatedSignedPrice, 128) + pushCh := make(chan updateBatch, 128) // use separate goroutine to handle push updates to avoid blocking the main loop go func() { @@ -125,8 +128,9 @@ func (p *Pusher) Run(ctx context.Context) { case <-ctx.Done(): return case pushUpdates := <-pushCh: - // synchronous push updates to ensure update order is maintained - p.handlePushUpdates(ctx, pushUpdates, contractCh) + // drain the channel and merge all pending batches so only the latest update per asset is pushed + merged := drainAndMerge(pushUpdates, pushCh) + p.handlePushUpdates(ctx, merged, contractCh) } } }() @@ -154,6 +158,19 @@ func (p *Pusher) Run(ctx context.Context) { } } +// drainAndMerge takes an initial batch and drains any additional pending batches from the channel, +// merging them so that only the latest update per asset is kept. +func drainAndMerge(initial updateBatch, ch <-chan updateBatch) updateBatch { + merged := make(updateBatch, len(initial)) + maps.Copy(merged, initial) + + for batch := range ch { + maps.Copy(merged, batch) + } + + return merged +} + func (p *Pusher) pullWithTimeout( ctx context.Context, encodedAssetIDs []types.InternalEncodedAssetID, ) (map[types.InternalEncodedAssetID]types.InternalTemporalNumericValue, error) { @@ -169,7 +186,7 @@ func (p *Pusher) pullWithTimeout( } func (p *Pusher) pushWithTimeout( - ctx context.Context, nextUpdate map[types.InternalEncodedAssetID]types.AggregatedSignedPrice, + ctx context.Context, nextUpdate updateBatch, ) error { pullCtx, pullCancel := context.WithTimeout(ctx, defaultNetworkTimeout) defer pullCancel() @@ -310,7 +327,7 @@ func (p *Pusher) initializeAssets() (*types.AssetConfig, []shared.AssetID, []typ // handlePushUpdates processes updates and pushes them to the contract. func (p *Pusher) handlePushUpdates( ctx context.Context, - updates map[types.InternalEncodedAssetID]types.AggregatedSignedPrice, + updates updateBatch, contractCh chan<- map[types.InternalEncodedAssetID]types.InternalTemporalNumericValue, ) { if len(updates) > 0 {