From afd578092356f5b00574ec31ae943cac7085a77b Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 3 Aug 2025 16:23:57 +0800 Subject: [PATCH 1/2] [fix][broker] Fix IllegalArgumentException in bucket range validation for delayed message delivery --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 4 ++-- .../bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 3f0fcc516571f..b1b0cb43010e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -395,8 +395,8 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver } } - if (ledgerId < lastMutableBucket.startLedgerId || existBucket) { - // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range, + if (ledgerId < lastMutableBucket.endLedgerId || existBucket) { + // If (ledgerId < endLedgerId || existBucket) means that message index belongs to previous bucket range, // enter sharedBucketPriorityQueue directly sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId); lastMutableBucket.putIndexBit(ledgerId, entryId); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java index 3bc96499bfdef..59867e797b41e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java @@ -113,7 +113,7 @@ public void tearDown() throws Exception { @Test public void testConcurrentContainsMessageWithWrites() throws Exception { final int numThreads = 16; - final int operationsPerThread = 1000; // Restore to test bucket creation properly + final int operationsPerThread = 10000; // Restore to test bucket creation properly final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch doneLatch = new CountDownLatch(numThreads); final AtomicInteger errors = new AtomicInteger(0); From a02fe6f10ba16d0bf4a2a8eeca8ae6fe40a8d677 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Mon, 11 Aug 2025 21:05:27 +0800 Subject: [PATCH 2/2] fix(test): improve thread safety test for BucketDelayedDeliveryTracker --- .../bucket/BucketDelayedDeliveryTracker.java | 4 +- ...elayedDeliveryTrackerThreadSafetyTest.java | 86 +++++++++++-------- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index b1b0cb43010e4..3f0fcc516571f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -395,8 +395,8 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver } } - if (ledgerId < lastMutableBucket.endLedgerId || existBucket) { - // If (ledgerId < endLedgerId || existBucket) means that message index belongs to previous bucket range, + if (ledgerId < lastMutableBucket.startLedgerId || existBucket) { + // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range, // enter sharedBucketPriorityQueue directly sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId); lastMutableBucket.putIndexBit(ledgerId, entryId); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java index 59867e797b41e..6a8ac244118d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.testng.annotations.AfterMethod; @@ -48,6 +49,7 @@ * These tests verify that the hybrid approach with StampedLock and concurrent data structures * correctly handles concurrent access patterns without deadlocks, race conditions, or data corruption. */ +@Slf4j public class BucketDelayedDeliveryTrackerThreadSafetyTest { private BucketDelayedDeliveryTracker tracker; @@ -97,83 +99,99 @@ public void setUp() throws Exception { @AfterMethod public void tearDown() throws Exception { - if (tracker != null) { - tracker.close(); - } + // First shutdown executor to stop all threads if (executorService != null) { assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS), "Executor should shutdown cleanly"); } + // Then close tracker safely after all threads stopped + if (tracker != null) { + tracker.close(); + } } /** - * Test concurrent containsMessage() calls while adding messages. + * Test concurrent containsMessage() calls while adding messages sequentially. * This tests the StampedLock optimistic read performance under contention. + * addMessage is executed sequentially (as in real scenarios), while containsMessage is concurrent. */ @Test public void testConcurrentContainsMessageWithWrites() throws Exception { - final int numThreads = 16; - final int operationsPerThread = 10000; // Restore to test bucket creation properly + final int numReadThreads = 8; + final int readsPerThread = 1000; + final int totalMessages = 5000; final CountDownLatch startLatch = new CountDownLatch(1); - final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final CountDownLatch readersDone = new CountDownLatch(numReadThreads); final AtomicInteger errors = new AtomicInteger(0); final AtomicReference firstException = new AtomicReference<>(); + final AtomicInteger messagesAdded = new AtomicInteger(0); - // Start reader threads - for (int i = 0; i < numThreads / 2; i++) { + // Start reader threads - these will run concurrently + for (int i = 0; i < numReadThreads; i++) { final int threadId = i; executorService.submit(() -> { try { startLatch.await(); - for (int j = 0; j < operationsPerThread; j++) { - long ledgerId = threadId * 1000 + j; - long entryId = j; + // Continuously read for a period while messages are being added + long endTime = System.currentTimeMillis() + 10000; // Read for 10 seconds + int readCount = 0; + while (System.currentTimeMillis() < endTime && readCount < readsPerThread) { + // Check for messages across the range that might be added + long ledgerId = 1000 + (readCount % totalMessages); + long entryId = readCount % 100; // This should not throw exceptions or block indefinitely tracker.containsMessage(ledgerId, entryId); + readCount++; + if (readCount % 100 == 0) { + Thread.sleep(1); // Small delay to allow writes + } } } catch (Exception e) { errors.incrementAndGet(); firstException.compareAndSet(null, e); e.printStackTrace(); } finally { - doneLatch.countDown(); + readersDone.countDown(); } }); } - // Start writer threads - for (int i = numThreads / 2; i < numThreads; i++) { - final int threadId = i; - executorService.submit(() -> { - try { - startLatch.await(); - for (int j = 0; j < operationsPerThread; j++) { - long ledgerId = threadId * 1000 + j; - long entryId = j; - long deliverAt = System.currentTimeMillis() + 10000; // 10s delay - tracker.addMessage(ledgerId, entryId, deliverAt); + // Start the single writer thread - sequential addMessage calls + executorService.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < totalMessages; i++) { + long ledgerId = 1000 + i; // Sequential ledgerId + long entryId = i % 100; + long deliverAt = System.currentTimeMillis() + 10000; // 10s delay + boolean added = tracker.addMessage(ledgerId, entryId, deliverAt); + if (added) { + messagesAdded.incrementAndGet(); + } + // Small delay to simulate real processing time + if (i % 100 == 0) { + Thread.sleep(1); } - } catch (Exception e) { - errors.incrementAndGet(); - firstException.compareAndSet(null, e); - e.printStackTrace(); - } finally { - doneLatch.countDown(); } - }); - } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } + }); startLatch.countDown(); - assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds"); if (errors.get() > 0) { Exception exception = firstException.get(); if (exception != null) { - System.err.println("First exception caught: " + exception.getMessage()); + log.error("First exception caught: " + exception.getMessage()); exception.printStackTrace(); } } assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + assertTrue(messagesAdded.get() > 0, "Some messages should have been added"); } /**