diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java index 3bc96499bfdef..6a8ac244118d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.testng.annotations.AfterMethod; @@ -48,6 +49,7 @@ * These tests verify that the hybrid approach with StampedLock and concurrent data structures * correctly handles concurrent access patterns without deadlocks, race conditions, or data corruption. */ +@Slf4j public class BucketDelayedDeliveryTrackerThreadSafetyTest { private BucketDelayedDeliveryTracker tracker; @@ -97,83 +99,99 @@ public void setUp() throws Exception { @AfterMethod public void tearDown() throws Exception { - if (tracker != null) { - tracker.close(); - } + // First shutdown executor to stop all threads if (executorService != null) { assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS), "Executor should shutdown cleanly"); } + // Then close tracker safely after all threads stopped + if (tracker != null) { + tracker.close(); + } } /** - * Test concurrent containsMessage() calls while adding messages. + * Test concurrent containsMessage() calls while adding messages sequentially. * This tests the StampedLock optimistic read performance under contention. + * addMessage is executed sequentially (as in real scenarios), while containsMessage is concurrent. */ @Test public void testConcurrentContainsMessageWithWrites() throws Exception { - final int numThreads = 16; - final int operationsPerThread = 1000; // Restore to test bucket creation properly + final int numReadThreads = 8; + final int readsPerThread = 1000; + final int totalMessages = 5000; final CountDownLatch startLatch = new CountDownLatch(1); - final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final CountDownLatch readersDone = new CountDownLatch(numReadThreads); final AtomicInteger errors = new AtomicInteger(0); final AtomicReference firstException = new AtomicReference<>(); + final AtomicInteger messagesAdded = new AtomicInteger(0); - // Start reader threads - for (int i = 0; i < numThreads / 2; i++) { + // Start reader threads - these will run concurrently + for (int i = 0; i < numReadThreads; i++) { final int threadId = i; executorService.submit(() -> { try { startLatch.await(); - for (int j = 0; j < operationsPerThread; j++) { - long ledgerId = threadId * 1000 + j; - long entryId = j; + // Continuously read for a period while messages are being added + long endTime = System.currentTimeMillis() + 10000; // Read for 10 seconds + int readCount = 0; + while (System.currentTimeMillis() < endTime && readCount < readsPerThread) { + // Check for messages across the range that might be added + long ledgerId = 1000 + (readCount % totalMessages); + long entryId = readCount % 100; // This should not throw exceptions or block indefinitely tracker.containsMessage(ledgerId, entryId); + readCount++; + if (readCount % 100 == 0) { + Thread.sleep(1); // Small delay to allow writes + } } } catch (Exception e) { errors.incrementAndGet(); firstException.compareAndSet(null, e); e.printStackTrace(); } finally { - doneLatch.countDown(); + readersDone.countDown(); } }); } - // Start writer threads - for (int i = numThreads / 2; i < numThreads; i++) { - final int threadId = i; - executorService.submit(() -> { - try { - startLatch.await(); - for (int j = 0; j < operationsPerThread; j++) { - long ledgerId = threadId * 1000 + j; - long entryId = j; - long deliverAt = System.currentTimeMillis() + 10000; // 10s delay - tracker.addMessage(ledgerId, entryId, deliverAt); + // Start the single writer thread - sequential addMessage calls + executorService.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < totalMessages; i++) { + long ledgerId = 1000 + i; // Sequential ledgerId + long entryId = i % 100; + long deliverAt = System.currentTimeMillis() + 10000; // 10s delay + boolean added = tracker.addMessage(ledgerId, entryId, deliverAt); + if (added) { + messagesAdded.incrementAndGet(); + } + // Small delay to simulate real processing time + if (i % 100 == 0) { + Thread.sleep(1); } - } catch (Exception e) { - errors.incrementAndGet(); - firstException.compareAndSet(null, e); - e.printStackTrace(); - } finally { - doneLatch.countDown(); } - }); - } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } + }); startLatch.countDown(); - assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds"); if (errors.get() > 0) { Exception exception = firstException.get(); if (exception != null) { - System.err.println("First exception caught: " + exception.getMessage()); + log.error("First exception caught: " + exception.getMessage()); exception.printStackTrace(); } } assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + assertTrue(messagesAdded.get() > 0, "Some messages should have been added"); } /**