base: main
CHIA-3856 Use an adapted version of deficit round robin algorithm in TransactionQueue's pop #20351
Conversation
coveralls commented:
Pull Request Test Coverage Report for Build 20440722378
arvidn left a comment:
My understanding of your implementation is that it:

1. scans for a transaction whose cost <= the peer's deficit counter
2. if found, ingests the transaction and decrements the deficit counter by the cost
3. if not found, increments the deficit counter of all peers by the lowest top-transaction cost, and goes back to 1
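That loop can be sketched roughly as follows (a hypothetical helper with made-up names, not the PR's actual code; it assumes each peer holds a FIFO of `(cost, tx)` pairs):

```python
from collections import deque

def drr_pop(queues: dict[str, deque], deficits: dict[str, int]):
    """One pop of the deficit round robin loop described above.

    queues maps peer id -> deque of (cost, tx) pairs; deficits maps peer id
    -> accumulated deficit counter (in CLVM cost units). Sketch only, not
    the PR's actual implementation.
    """
    while True:
        # 1. scan for a peer whose top transaction fits within its deficit
        for peer, q in queues.items():
            if q and q[0][0] <= deficits[peer]:
                cost, tx = q.popleft()
                deficits[peer] -= cost  # 2. charge the cost against the deficit
                return tx
        # 3. nothing fits: raise every deficit by the cheapest top transaction
        tops = [q[0][0] for q in queues.values() if q]
        if not tops:
            return None  # all queues empty
        bump = min(tops)
        for peer in deficits:
            deficits[peer] += bump
```

Cheaper transactions become serviceable after fewer deficit bumps, which is what gives the cost-aware fairness.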
I don't think you need the cursor for fairness anymore, you'll get fairness anyway, since you track the deficit counters.
I think it could be made a bit more efficient by reducing the linear scan to a heap pop, though it may introduce more complexity (this is not a complete thought). Imagine every peer had a sort key equal to its deficit_counter - cost_of_top_tx. You could keep a priority queue (really, a heap would suffice) of those peers; every time a deficit counter is adjusted, the peer is re-sorted, and you always pop from the top, which is O(1). You'd still need to figure out when to increment the deficit counters and by how much, so maybe this wouldn't be so much simpler.
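A rough sketch of that heap idea (a hypothetical helper; since `heapq` is a min-heap, the key here is `cost_of_top_tx - deficit_counter`, the negation of the key suggested above, so the peer needing the least additional deficit pops first):

```python
import heapq

def heap_pick(peers: dict[str, tuple[int, int]]) -> str:
    """Pick the peer whose top transaction is closest to being serviceable.

    peers maps peer id -> (top_tx_cost, deficit). A key <= 0 means the
    transaction can be sent immediately. Rebuilding the heap on every call
    is O(n); a real implementation would keep a live heap and re-sort only
    the peers whose deficit counters changed.
    """
    heap = [(cost - deficit, peer) for peer, (cost, deficit) in peers.items()]
    heapq.heapify(heap)
    _, best = heapq.heappop(heap)
    return best
```

Note that `heapq` has no decrease-key operation, so a live-heap version would likely need lazy deletion (push updated entries and skip stale ones on pop).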
```python
log: logging.Logger
_max_tx_clvm_cost: uint64
# Map of peer ID to deficit counter in the context of deficit round robin
_deficit_counters: dict[bytes32, int]
```
isn't _queue_dict also a map of the same peer IDs?
It would seem cheaper and simpler to stick this int in that dict instead. Am I missing something?
```python
self._index_to_peer_map = new_peer_map
if result is not None:
    return result
# Map of peer ID to its top transaction's advertised cost
```
I think this deserves a comment explaining the idea behind this behavior: that we want to service transactions fairly between peers, based on cost.
```python
if tx_info is None:
    top_tx_advertised_cost = max(
        (t.advertised_cost for t in entry.peers_with_tx.values()), default=self._max_tx_clvm_cost
    )
else:
    top_tx_advertised_cost = tx_info.advertised_cost
```
I think this case warrants a comment. This is where we don't know the cost (or fee) for a transaction, so we want to assume a very high cost. I don't think we should search peers_with_tx; we should just assume it has a really high cost. This is just for backwards compatibility, right?
Suggested change:

```diff
-if tx_info is None:
-    top_tx_advertised_cost = max(
-        (t.advertised_cost for t in entry.peers_with_tx.values()), default=self._max_tx_clvm_cost
-    )
-else:
-    top_tx_advertised_cost = tx_info.advertised_cost
+top_tx_advertised_cost = self._max_tx_clvm_cost if tx_info is None else tx_info.advertised_cost
```
Force-pushed: 0d5e94e to 2811d66
```python
new_peer_map.append(peer_id)
self._index_to_peer_map = new_peer_map
if len(self._index_to_peer_map) > 0:
    self._list_cursor %= len(self._index_to_peer_map)
```
Cursor adjustment after cleanup skips peers incorrectly
The _cleanup_peer_queues method uses simple modulo to adjust _list_cursor after removing empty peer queues, but this doesn't correctly account for peers removed before the cursor position. For example, if peers are [A, B, C, D] with cursor=2 pointing to C, and only B is removed, the new list becomes [A, C, D] where C is now at index 1. But the cursor becomes 2 % 3 = 2, pointing to D instead of C. This breaks the round-robin fairness guarantee by skipping peers in the iteration order, giving preferential treatment to some peers over others.
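One way to fix this (a sketch with hypothetical names, assuming the cursor indexes into `_index_to_peer_map`) is to shift the cursor left by the number of removed peers that sat before it:

```python
def adjust_cursor(old_peers: list[str], cursor: int, removed: set[str]) -> tuple[list[str], int]:
    """Rebuild the peer list without the removed peers, keeping the cursor
    on the same peer (or wrapping to a surviving one if the peer under the
    cursor was itself removed). Sketch only, not the PR's code."""
    new_peers = [p for p in old_peers if p not in removed]
    # count removed entries that preceded the cursor and shift it left
    cursor -= sum(1 for p in old_peers[:cursor] if p in removed)
    if new_peers:
        cursor %= len(new_peers)
    else:
        cursor = 0
    return new_peers, cursor
```

With the example above ([A, B, C, D], cursor on C, B removed), this keeps the cursor on C instead of skipping to D.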
```python
tx_info = entry.peers_with_tx.get(peer_id)
# If we don't know the cost information for this transaction
# we fallback to the highest cost.
top_tx_advertised_cost = self._max_tx_clvm_cost if tx_info is None else tx_info.advertised_cost
```
Inconsistent handling of zero advertised cost between methods
The put() method checks tx_info is not None and tx_info.advertised_cost > 0 to handle both missing cost info AND zero/negative cost, falling back to infinity priority in either case. However, pop() only checks tx_info is None for the fallback to _max_tx_clvm_cost. If tx_info exists but advertised_cost is 0 or negative, pop() would use that value directly instead of falling back to _max_tx_clvm_cost. This inconsistency means a transaction with advertised_cost <= 0 gets lowest priority in its peer's queue but is treated as "free" (zero cost) in the deficit calculation, potentially allowing unfair bypass of the deficit round robin fairness mechanism.
```python
@dataclass
class NormalPriorityQueue:
    priority_queue: PriorityQueue[tuple[float, TransactionQueueEntry]]
    deficit: int
```
I think deficit warrants a comment; the unit is CLVM cost, for instance.
```python
@dataclass
class NormalPriorityQueue:
    priority_queue: PriorityQueue[tuple[float, TransactionQueueEntry]]
```
I think this member also warrants a comment. I imagine that float is fee per cost, or is it cost per fee? Or maybe negative fee-per-cost.
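The requested comments might look something like this (a sketch; whether the float really is negated fee-per-cost is a guess the PR author should confirm):

```python
from dataclasses import dataclass, field
from queue import PriorityQueue

@dataclass
class NormalPriorityQueue:
    # entries are (priority, tx); priority is assumed here to be negated
    # fee-per-cost, so higher-paying transactions sort first in the min-heap
    priority_queue: PriorityQueue = field(default_factory=PriorityQueue)
    # deficit counter for deficit round robin, in units of CLVM cost
    deficit: int = 0
```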
```python
log: logging.Logger
_max_tx_clvm_cost: uint64
# Each 100 pops we do a cleanup of empty peer queues
_cleanup_counter: int
```
why did you need to add deferred cleanup?
```python
def put(self, tx: TransactionQueueEntry, peer_id: bytes32 | None, high_priority: bool = False) -> None:
    if peer_id is None or high_priority:  # when it's local there is no peer_id.
        self._high_priority_queue.put(tx)
```
If you add:

```python
self._queue_length.release()
return
```

here, you can de-indent the else-block.
```diff
     self._normal_priority_queues[peer_id] = NormalPriorityQueue(PriorityQueue(), 0)
     self._index_to_peer_map.append(peer_id)
-if self._queue_dict[peer_id].qsize() < self.peer_size_limit:
+if self._normal_priority_queues[peer_id].priority_queue.qsize() < self.peer_size_limit:
```
if you invert this check and throw, the rest of the code can be de-indented
```python
raise TransactionQueueFull(f"Transaction queue full for peer {peer_id}")
self._queue_length.release()  # increment semaphore to indicate that we have a new item in the queue

def _cleanup_peer_queues(self) -> None:
```
was there no cleanup earlier? Was this a memory leak?
Purpose:

Converts `TransactionQueue`'s `pop` from a simple round robin across peers to a deficit round robin.

Current Behavior:

`TransactionQueue`'s `pop` implements a simple round robin across peers.

New Behavior:

`TransactionQueue`'s `pop` implements an adapted version of the deficit round robin algorithm.

Testing Notes:

Note

Introduces adapted deficit round-robin scheduling for transaction processing to improve per-peer fairness and cost-aware selection.

- `NormalPriorityQueue` with per-peer priority queues and `deficit` counters; `pop()` cycles peers, sending a tx when the deficit covers its advertised cost, otherwise increments deficits by the lowest top-tx cost
- `inf` priority when cost is unknown, and use `max_tx_clvm_cost` for deficit calculations
- `max_tx_clvm_cost` parameter to `TransactionQueue` (used as `MAX_BLOCK_COST_CLVM // 2`); wire through `FullNode` initialization

Written by Cursor Bugbot for commit 2811d66. This will update automatically on new commits.