-
Notifications
You must be signed in to change notification settings - Fork 45
Open
Labels
Description
Describe the bug
aop/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java
Lines 287 to 303 in 06bf657
| public void removeQueue(AmqpQueue queue) { | |
| queues.remove(queue); | |
| if (exchangeType == Type.Direct) { | |
| for (Map.Entry<String, Set<AmqpQueue>> entry : bindingKeyQueueMap.entrySet()) { | |
| bindingKeyQueueMap.computeIfPresent(entry.getKey(), (k, v) -> { | |
| v.remove(queue); | |
| if (v.isEmpty()) { | |
| return null; | |
| } | |
| return v; | |
| }); | |
| } | |
| } | |
| updateExchangeProperties(); | |
| deleteCursor(queue.getName()); | |
| } | |
in removeQueue method, we remove value from a map when iterating it, which cause potential ConcurrentModificationException.
To Reproduce
bind queue with Direct exchange, then remove the queue
Expected behavior
queue removed successfully
Screenshots
none
Additional context
none
Reactions are currently unavailable