Conversation


@agarakan agarakan commented Dec 30, 2025

Description of the issue

Introduce RetryHeap and RetryHeapProcessor implementations for cloudwatchlogs log publishing

Description of changes

RetryHeap is implemented as a min-heap ordered on retryTimestamp, holding batched PutLogEvents requests that have already been attempted at least once and failed.

  1. Implements a min-heap on retryTimestamp
  2. Depends on the Go standard library's container/heap package
  3. Implements Push(), PopReady(), Size(), and Stop()
  4. Includes a semaphore to track heap size and block pushes when the retry heap reaches capacity
  5. Includes a mutex so that concurrent pushes to the heap do not race

RetryHeapProcessor will periodically scan for batches that are ready to be retried based on retryTimestamp and move them to the SenderPool tasks queue for processing by the sender pool.

  1. Scans the retry heap periodically (every 100 ms) for ready-to-retry batches
  2. Maintains a reference to the SenderPool so ready batches can be put back on its tasks queue
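The processor loop described above might look roughly like this. The poppable interface, task type, and the plain channel standing in for the SenderPool tasks queue are illustrative assumptions, not the PR's actual types.

```go
// Minimal sketch of the RetryHeapProcessor loop: a goroutine ticks every
// 100 ms, pops ready batches from the retry heap, and re-enqueues them on
// the sender pool's task queue.
package main

import (
	"fmt"
	"time"
)

type task struct{ id string }

// poppable is the slice of the RetryHeap API the processor needs.
type poppable interface {
	PopReady() []task
}

// fakeHeap stands in for the real retry heap in this demo.
type fakeHeap struct{ pending []task }

// PopReady drains everything pending, for the sake of the demo.
func (f *fakeHeap) PopReady() []task {
	out := f.pending
	f.pending = nil
	return out
}

// runProcessor scans the heap on every tick and forwards ready batches to
// the sender queue until stop is closed.
func runProcessor(h poppable, senderQueue chan<- task, stop <-chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for _, t := range h.PopReady() {
				senderQueue <- t
			}
		}
	}
}

func main() {
	h := &fakeHeap{pending: []task{{id: "batch-1"}, {id: "batch-2"}}}
	queue := make(chan task, 8)
	stop := make(chan struct{})
	go runProcessor(h, queue, stop)
	fmt.Println("requeued:", (<-queue).id, (<-queue).id)
	close(stop)
}
```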

Assumptions: The RetryHeap and RetryHeapProcessor will only be instantiated and used when concurrency is enabled.

No functional changes are introduced in this PR: it sets up RetryHeap and related components but does not instantiate them in the code yet.

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

New unit tests are passing

make lint
make fmt
make fmt-sh
make test

Requirements

Before committing your code, please complete the following steps.

  1. Run make fmt and make fmt-sh
  2. Run make lint

Integration Tests

To run integration tests against this PR, add the ready for testing label.

@agarakan agarakan requested a review from a team as a code owner December 30, 2025 16:52
@agarakan agarakan added the ready for testing Indicates this PR is ready for integration tests to run label Dec 30, 2025
@agarakan agarakan changed the base branch from enable-multithreaded-logging-by-default to poison-pill December 30, 2025 18:08
)

// retryHeapImpl implements heap.Interface for logEventBatch sorted by nextRetryTime
type retryHeapImpl []*logEventBatch
Contributor

nit: Add var _ heap.Interface = (*retryHeapImpl)(nil)

Contributor Author

Ack will update as follow up

}

// RetryHeap manages failed batches during their retry wait periods
type RetryHeap interface {
Contributor

nit: Add var _ RetryHeap = (*retryHeap)(nil)

Contributor Author

Ack will update as follow up

for _, batch := range readyBatches {
// Check if batch has expired
if batch.isExpired(p.maxRetryDuration) {
p.logger.Debugf("Dropping expired batch for %s/%s", batch.Group, batch.Stream)
Contributor

Contributor Author

Ack will update as follow up

)

// retryHeapImpl implements heap.Interface for logEventBatch sorted by nextRetryTime
type retryHeapImpl []*logEventBatch
Contributor

nit: Why do we have a retryHeapImpl and a retryHeap? Is there ever a case where we'd use one without the other? Is it better to operate on the []*logEventBatch directly instead of accessing it as a field of retryHeap?

Comment on lines +30 to +36
func (h *retryHeapImpl) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
Contributor

Seems like this is missing a step from the PriorityQueue example

https://pkg.go.dev/container/heap#Interface

func (pq *PriorityQueue) Pop() any {
	...
	old[n-1] = nil // don't stop the GC from reclaiming the item eventually
	...
}

Is there a reason we decided not to include it?
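For reference, a version of Pop with the step the reviewer points to: clearing the popped slot so the backing array does not keep the batch reachable. The item type here is a placeholder, not the PR's logEventBatch.

```go
package main

import "fmt"

type item struct{ name string }

type retryHeapImpl []*item

// Pop removes the last element, nil-ing out its slot first so the slice's
// backing array does not retain a reference to the popped item.
func (h *retryHeapImpl) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // don't stop the GC from reclaiming the item eventually
	*h = old[:n-1]
	return it
}

func main() {
	h := retryHeapImpl{{name: "a"}, {name: "b"}}
	popped := h.Pop().(*item)
	fmt.Println(popped.name, len(h)) // prints: b 1
}
```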

batch3 := newLogEventBatch(target, nil)
batch3.nextRetryTime = time.Now().Add(time.Hour) // Future time, won't be popped
heap.Push(batch3) // This should block
pushCompleted = true
Contributor

This seems like a data race since you're accessing the same variable on two separate threads/goroutines. Consider using an atomic or a channel.

}

// RetryHeap manages failed batches during their retry wait periods
type RetryHeap interface {
Contributor

So is the idea that each sender will have a reference to the RetryHeap to push failed batches onto and then there's a single RetryHeapProcessor that also has a reference to the RetryHeap?

// NewRetryHeap creates a new retry heap with the specified maximum size
func NewRetryHeap(maxSize int) RetryHeap {
rh := &retryHeap{
heap: make(retryHeapImpl, 0),
Contributor

nit: Would it make sense to initialize it with a capacity of maxSize? That way it doesn't need to reallocate on each push?
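The nit in one line: pre-size the backing slice so pushes up to maxSize never reallocate. (retryHeapImpl here stands in for the PR's type.)

```go
package main

import "fmt"

type retryHeapImpl []*struct{}

func main() {
	maxSize := 64
	// Length 0, capacity maxSize: appends up to maxSize reuse the same array.
	h := make(retryHeapImpl, 0, maxSize)
	fmt.Println(len(h), cap(h)) // prints: 0 64
}
```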

}

// Stop stops the retry heap
func (rh *retryHeap) Stop() {
Contributor

What is going to stop the retry heap? Does it stop when the RetryHeapProcessor stops?

}

// Wait for batches to become ready, then pop to release semaphore
time.Sleep(4 * time.Second)
Contributor

I know we use sleeps in other unit tests, but is there a way to make this less flaky? Is there a reason to wait since this isn't using the processor, so doesn't rely on the ticker? The initial batches could be expired when added like in the other tests.


// Wait for batches to become ready, then pop to release semaphore
time.Sleep(4 * time.Second)
heap.PopReady()
Contributor

Can we validate that the correct batches were taken off the heap?
