Skip to content

feat: add DynamicBatcher for batching items across caller threads#35849

Open
bjorncs wants to merge 1 commit intomasterfrom
bjorncs/dynamic-batching-embedder
Open

feat: add DynamicBatcher for batching items across caller threads#35849
bjorncs wants to merge 1 commit intomasterfrom
bjorncs/dynamic-batching-embedder

Conversation

@bjorncs
Copy link
Member

@bjorncs bjorncs commented Feb 11, 2026

Utility that groups items from multiple caller threads into batches partitioned by a caller-supplied key. One caller thread per batch acts as executor while others wait for the result. Batches are dispatched when reaching max size or max delay timeout.

@bjorncs bjorncs requested a review from havardpe February 12, 2026 12:58
Utility that groups items from multiple caller threads into batches
partitioned by a caller-supplied key. One caller thread per batch
acts as executor while others wait for the result. Batches are
dispatched when reaching max size or max delay timeout.
@bjorncs bjorncs force-pushed the bjorncs/dynamic-batching-embedder branch from e7f456e to cd05b8c Compare February 13, 2026 17:10
@bjorncs bjorncs requested a review from havardpe February 13, 2026 17:11
Copy link
Member

@havardpe havardpe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks good. I only have some minor comments

synchronized (batch.monitor) {
onWaiting.run();
for (long deadlineMs = batch.deadline.toEpochMilli(), now = timer.instant().toEpochMilli();
!batch.isDone && now < deadlineMs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is enough to check isFilling here, since we end up waiting for completion further down unless we become the executor

try { batch.monitor.wait(deadlineMs - now); }
catch (InterruptedException e) {}
}
if (!batch.isDone && batch.isFilling) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isFilling already implies not done and is the thing we should check for here

return awaitResult(batch, index);
}

/** Wakes up all waiting threads. For unit testing. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wake the threads waiting on batches not yet removed from the map, which may not be all of them, but the test might be written in such a way that the comment becomes true.

}
}

private Batch<K, I, O> startNewBatch(K key, I input) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider HasBatchesLock suffix on method name to indicate which lock we expect the caller to have

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants