Skip to content
65 changes: 64 additions & 1 deletion pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package batch
import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"

cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)
Expand All @@ -16,10 +20,16 @@ type messageWithCallback struct {
callback func(error)
}

// seqnumKey identifies one sequence-number stream: the combination of a
// CloudEvent source and event type. It is comparable, so it can be used
// directly as a map key.
type seqnumKey struct {
	// source and eventType are populated from the event's Source and Type
	// when a sequence number is requested.
	source, eventType string
}

// Client is a batching client that accumulates messages and sends them in batches.
type Client struct {
client chipingress.Client
batchSize int
cloneEvent bool
maxConcurrentSends chan struct{}
batchInterval time.Duration
maxPublishTimeout time.Duration
Expand All @@ -31,6 +41,7 @@ type Client struct {
shutdownOnce sync.Once
batcherDone chan struct{}
cancelBatcher context.CancelFunc
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()
}

// Opt is a functional option for configuring the batch Client.
Expand All @@ -42,6 +53,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
client: client,
log: zap.NewNop().Sugar(),
batchSize: 10,
cloneEvent: true,
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
batchInterval: 100 * time.Millisecond,
Expand Down Expand Up @@ -123,15 +135,37 @@ func (b *Client) Stop() {
case <-ctx.Done(): // timeout or context cancelled
b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
}

// Release per-stream seqnum state to avoid unbounded growth from high-cardinality source/type values.
b.clearCounters()
})
}

// clearCounters removes every per-(source, type) sequence counter held in
// b.counters. Stop calls it so counter state does not outlive the client.
func (b *Client) clearCounters() {
	drop := func(k, _ any) bool {
		b.counters.Delete(k)
		// Returning true tells Range to continue with the next entry.
		return true
	}
	b.counters.Range(drop)
}

// seqnumFor returns the next sequence number for the given source+type pair.
// Each unique (source, type) pair has its own independent counter: the first
// call for a pair returns 1 and every later call returns one more than the
// previous call for that pair. A pair whose counter has been removed by
// clearCounters starts again from 1.
//
// The existing-counter case is served by Load so that a throwaway
// *atomic.Uint64 is not allocated on every call. LoadOrStore is used only on
// a miss, which still ensures that concurrent first callers for the same key
// share a single counter.
func (b *Client) seqnumFor(source, typ string) uint64 {
	key := seqnumKey{source: source, eventType: typ}

	counter, found := b.counters.Load(key)
	if !found {
		// If another goroutine stored a counter in the meantime, LoadOrStore
		// hands back that one and the fresh allocation is discarded.
		counter, _ = b.counters.LoadOrStore(key, new(atomic.Uint64))
	}

	// counters is declared as holding *atomic.Uint64 values, so the
	// assertion is expected to always succeed.
	return counter.(*atomic.Uint64).Add(1)
}

// QueueMessage queues a single message to the batch client with an optional callback.
// The callback will be invoked after the batch containing this message is sent.
// The callback receives an error parameter (nil on success).
// Callbacks are invoked from goroutines.
// Returns immediately without blocking; the message is dropped if the channel is full.
// Returns an error if the message was dropped.
// QueueMessage stamps/overwrites the "seqnum" extension on the event it buffers.
// By default, it clones the input event first (WithEventClone(true)) so caller-owned
// objects are not mutated and queued snapshots remain immutable under pointer reuse.
// If cloning is disabled via WithEventClone(false), the caller event is mutated in place.
func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error {
if event == nil {
return nil
Expand All @@ -144,8 +178,29 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err
default:
}

eventToQueue := event
if b.cloneEvent {
// Clone the caller-owned event so queued messages keep an immutable seqnum snapshot.
eventCopy, ok := proto.Clone(event).(*chipingress.CloudEventPb)
if !ok {
return errors.New("failed to clone event")
}
eventToQueue = eventCopy
}

// Stamp seqnum extension attribute using the event snapshot being queued.
seq := b.seqnumFor(eventToQueue.Source, eventToQueue.Type)
if eventToQueue.Attributes == nil {
eventToQueue.Attributes = make(map[string]*cepb.CloudEventAttributeValue)
}
eventToQueue.Attributes["seqnum"] = &cepb.CloudEventAttributeValue{
Attr: &cepb.CloudEventAttributeValue_CeString{
CeString: strconv.FormatUint(seq, 10),
},
}

msg := &messageWithCallback{
event: event,
event: eventToQueue,
callback: callback,
}

Expand Down Expand Up @@ -198,6 +253,14 @@ func WithBatchSize(batchSize int) Opt {
}
}

// WithEventClone sets whether QueueMessage copies each event before stamping
// its seqnum attribute and buffering it.
// The client default is true, which keeps caller-owned events untouched even
// when the caller reuses event pointers. Passing false makes QueueMessage
// stamp and buffer the caller's event directly.
func WithEventClone(clone bool) Opt {
	return func(bc *Client) { bc.cloneEvent = clone }
}

// WithMaxConcurrentSends sets the maximum number of concurrent batch send operations
func WithMaxConcurrentSends(maxConcurrentSends int) Opt {
return func(c *Client) {
Expand Down
Loading
Loading