feat(uptime): Add ability to use queues to manage parallelism #2
base: kafka-consumer-parallel-before
Conversation
One potential problem with batch processing is that any single slow item can clog up the whole batch. This PR implements a queueing method instead: we keep N queues, each with its own worker. There's still a chance of an individual item backlogging one queue, but we can increase concurrency here to reduce the chances of that happening.
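The N-queue idea can be sketched roughly like this (the `QueuePool` class and its names here are illustrative, not the PR's actual `FixedQueuePool` implementation): items are routed by key hash, so items with the same key stay ordered on one queue, while a slow item only delays its own queue rather than the whole batch.

```python
import queue
import threading
from typing import Callable

class QueuePool:
    """Illustrative sketch: N queues, each drained by its own worker
    thread. Items are routed by key hash, so a slow item only blocks
    the one queue it landed on."""

    def __init__(self, num_queues: int, process: Callable) -> None:
        self.queues: list[queue.Queue] = [queue.Queue() for _ in range(num_queues)]
        self.threads = [
            threading.Thread(target=self._worker, args=(q, process), daemon=True)
            for q in self.queues
        ]
        for t in self.threads:
            t.start()

    def submit(self, key: str, item) -> None:
        # Same key -> same queue, preserving per-key ordering.
        self.queues[hash(key) % len(self.queues)].put(item)

    def _worker(self, q: queue.Queue, process: Callable) -> None:
        while True:
            item = q.get()
            if item is None:  # sentinel: stop this worker
                break
            process(item)

    def close(self) -> None:
        for q in self.queues:
            q.put(None)  # one sentinel per queue wakes every worker
        for t in self.threads:
            t.join()
```

The hash routing is what preserves per-key ordering without a global lock; total throughput then scales with the number of queues.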
```python
    """

    def __init__(self) -> None:
        self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
```
[Performance] Unbounded Offset Tracking Sets
- Problem: The `OffsetTracker` stores all observed and outstanding offsets in sets (`all_offsets`, `outstanding`) without size limits, leading to excessive memory consumption and potential OOM errors.
- Fix: Implement a bounded size or an eviction policy for these sets to prevent unbounded memory growth.
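One way to bound the set without losing offsets is backpressure: block producers when the tracked set is full instead of evicting entries. A minimal sketch (the `BoundedOffsetSet` wrapper is hypothetical, not part of the PR):

```python
import threading

class BoundedOffsetSet:
    """Hypothetical sketch: callers adding offsets block once the set
    reaches max_size, and are released when committed offsets are
    discarded. This bounds memory via backpressure rather than eviction,
    so no offset is ever silently dropped."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._offsets: set[int] = set()
        self._max_size = max_size
        self._not_full = threading.Condition()

    def add(self, offset: int) -> None:
        with self._not_full:
            # Block until there is room, providing natural backpressure.
            self._not_full.wait_for(lambda: len(self._offsets) < self._max_size)
            self._offsets.add(offset)

    def discard_upto(self, committed: int) -> None:
        with self._not_full:
            self._offsets = {o for o in self._offsets if o > committed}
            self._not_full.notify_all()  # wake any blocked producers
```

Blocking is usually preferable to eviction here, since evicting an uncommitted offset would corrupt the commit bookkeeping.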
```python
        while not self.shutdown:
            try:
                work_item = self.work_queue.get()
            except queue.ShutDown:
```
[Bug] Incorrect Queue Shutdown Mechanism
- Problem: `queue.ShutDown` and `Queue.shutdown()` only exist on Python 3.13+. On earlier versions, `OrderedQueueWorker` will fail at `except queue.ShutDown:` with an `AttributeError`, and `FixedQueuePool`'s `q.shutdown()` call will also raise, preventing graceful worker termination and causing resource leaks.
- Fix: Replace `queue.ShutDown` with a sentinel value, or use `queue.Empty` with a timeout and a `self.shutdown` flag; remove the `q.shutdown()` call on pre-3.13 Pythons and ensure workers are joined.
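The sentinel-based alternative works on any Python version. A minimal sketch (the `Worker` class here is illustrative, not the PR's `OrderedQueueWorker`):

```python
import queue
import threading

_SHUTDOWN = object()  # unique sentinel that tells a worker to exit

class Worker(threading.Thread):
    """Sketch: drain a queue until the shutdown sentinel arrives,
    then exit cleanly so the thread can be joined."""

    def __init__(self, work_queue: queue.Queue) -> None:
        super().__init__(daemon=True)
        self.work_queue = work_queue
        self.processed: list = []

    def run(self) -> None:
        while True:
            item = self.work_queue.get()
            if item is _SHUTDOWN:
                break  # graceful exit; no exception machinery needed
            self.processed.append(item)

q: queue.Queue = queue.Queue()
w = Worker(q)
w.start()
for i in range(3):
    q.put(i)
q.put(_SHUTDOWN)   # wake the worker and tell it to stop
w.join(timeout=5)  # confirm the thread actually terminated
```

The sentinel also guarantees all items enqueued before shutdown are processed, since it sits behind them in FIFO order.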
```python
        start = max(last_committed + 1, min_offset)

        highest_committable = last_committed
        for offset in range(start, max_offset + 1):
```
[Performance] Inefficient Linear Scan for Committable Offsets
- Problem: The `get_committable_offsets` function uses a linear scan, which can be a performance bottleneck for large gaps between `last_committed` and `max_offset`.
- Fix: Optimize by tracking contiguous blocks of completed offsets or using a data structure that identifies the committable frontier faster.
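One common shape for this fix is a min-heap of completed offsets: the commit frontier advances only while the heap's minimum is exactly the next expected offset, so gaps are skipped without scanning every offset in the range. A sketch under assumed names (`advance_committable` is illustrative, not the PR's API):

```python
import heapq

def advance_committable(last_committed: int, completed: list[int]) -> int:
    """Sketch: advance the commit frontier using a min-heap of completed
    offsets. Work done is proportional to the offsets actually popped,
    not to the size of the gap between last_committed and max_offset."""
    heapq.heapify(completed)
    frontier = last_committed
    # Pop while the smallest completed offset is contiguous with the frontier.
    while completed and completed[0] == frontier + 1:
        frontier = heapq.heappop(completed)
    return frontier

# Offsets 5 and 6 are done but 7 is missing, so the frontier stops at 6:
# advance_committable(4, [6, 5, 8]) -> 6
```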
```python
        with self._get_partition_lock(partition):
            self.last_committed[partition] = offset
            # Remove all offsets <= committed offset
            self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
```
[Performance] Inefficient Set Reconstruction for Offset Cleanup
- Problem: Reconstructing `self.all_offsets[partition]` by iterating and filtering is computationally expensive for large sets, impacting commit-loop performance.
- Fix: Explore alternative data structures or methods for more efficient range-based deletion or filtering if `all_offsets` is expected to be large.
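If the per-partition offsets were kept in a sorted list instead of a set, everything at or below the committed offset could be dropped with one binary search plus a slice, rather than filtering every element. A sketch with an assumed helper name:

```python
from bisect import bisect_right

def drop_committed(sorted_offsets: list[int], committed: int) -> list[int]:
    """Sketch: given offsets kept in sorted order, remove everything
    <= committed with an O(log n) bisect and a single slice, instead of
    an O(n) filter that touches every element."""
    cut = bisect_right(sorted_offsets, committed)
    return sorted_offsets[cut:]
```

The trade-off is that insertion into a sorted list costs O(n) in the worst case, so this wins when commits (range deletes) dominate individual offset insertions.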
```python
        self.workers: list[OrderedQueueWorker[T]] = []

        for i in range(num_queues):
            work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
```
[Performance] Unbounded Queues Cause Resource Exhaustion
- Problem: `queue.Queue()` instances without a `maxsize` can grow indefinitely, leading to excessive memory consumption and OutOfMemory errors.
- Fix: Define a reasonable `maxsize` for queues to prevent memory exhaustion and provide natural backpressure.

```python
work_queue: queue.Queue[WorkItem[T]] = queue.Queue(maxsize=some_configurable_limit)
```
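With a `maxsize` set, the backpressure behavior comes from the standard library: a blocking `put()` waits for room, while `put_nowait()` raises `queue.Full` immediately. A small self-contained illustration (the limit of 2 is arbitrary):

```python
import queue

q: queue.Queue[int] = queue.Queue(maxsize=2)  # illustrative limit
q.put(1)
q.put(2)

overflowed = False
try:
    q.put_nowait(3)  # queue is full: backpressure kicks in
except queue.Full:
    overflowed = True
```

In the consumer this would translate to the polling thread slowing down (or pausing partitions) whenever a worker queue fills up, instead of memory growing without bound.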