Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.AfterMethod;
Expand All @@ -48,6 +49,7 @@
* These tests verify that the hybrid approach with StampedLock and concurrent data structures
* correctly handles concurrent access patterns without deadlocks, race conditions, or data corruption.
*/
@Slf4j
public class BucketDelayedDeliveryTrackerThreadSafetyTest {

private BucketDelayedDeliveryTracker tracker;
Expand Down Expand Up @@ -97,83 +99,99 @@ public void setUp() throws Exception {

@AfterMethod
public void tearDown() throws Exception {
if (tracker != null) {
tracker.close();
}
// First shutdown executor to stop all threads
if (executorService != null) {
assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS),
"Executor should shutdown cleanly");
}
// Then close tracker safely after all threads stopped
if (tracker != null) {
tracker.close();
}
}

/**
* Test concurrent containsMessage() calls while adding messages.
* Test concurrent containsMessage() calls while adding messages sequentially.
* This tests the StampedLock optimistic read performance under contention.
* addMessage is executed sequentially (as in real scenarios), while containsMessage is concurrent.
*/
@Test
public void testConcurrentContainsMessageWithWrites() throws Exception {
final int numThreads = 16;
final int operationsPerThread = 1000; // Restore to test bucket creation properly
final int numReadThreads = 8;
final int readsPerThread = 1000;
final int totalMessages = 5000;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(numThreads);
final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
final AtomicInteger errors = new AtomicInteger(0);
final AtomicReference<Exception> firstException = new AtomicReference<>();
final AtomicInteger messagesAdded = new AtomicInteger(0);

// Start reader threads
for (int i = 0; i < numThreads / 2; i++) {
// Start reader threads - these will run concurrently
for (int i = 0; i < numReadThreads; i++) {
final int threadId = i;
executorService.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < operationsPerThread; j++) {
long ledgerId = threadId * 1000 + j;
long entryId = j;
// Continuously read for a period while messages are being added
long endTime = System.currentTimeMillis() + 10000; // Read for 10 seconds
int readCount = 0;
while (System.currentTimeMillis() < endTime && readCount < readsPerThread) {
// Check for messages across the range that might be added
long ledgerId = 1000 + (readCount % totalMessages);
long entryId = readCount % 100;
// This should not throw exceptions or block indefinitely
tracker.containsMessage(ledgerId, entryId);
readCount++;
if (readCount % 100 == 0) {
Thread.sleep(1); // Small delay to allow writes
}
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
} finally {
doneLatch.countDown();
readersDone.countDown();
}
});
}

// Start writer threads
for (int i = numThreads / 2; i < numThreads; i++) {
final int threadId = i;
executorService.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < operationsPerThread; j++) {
long ledgerId = threadId * 1000 + j;
long entryId = j;
long deliverAt = System.currentTimeMillis() + 10000; // 10s delay
tracker.addMessage(ledgerId, entryId, deliverAt);
// Start the single writer thread - sequential addMessage calls
executorService.submit(() -> {
try {
startLatch.await();
for (int i = 0; i < totalMessages; i++) {
long ledgerId = 1000 + i; // Sequential ledgerId
long entryId = i % 100;
long deliverAt = System.currentTimeMillis() + 10000; // 10s delay
boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
if (added) {
messagesAdded.incrementAndGet();
}
// Small delay to simulate real processing time
if (i % 100 == 0) {
Thread.sleep(1);
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
} finally {
doneLatch.countDown();
}
});
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
}
});

startLatch.countDown();
assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");

if (errors.get() > 0) {
Exception exception = firstException.get();
if (exception != null) {
System.err.println("First exception caught: " + exception.getMessage());
log.error("First exception caught: " + exception.getMessage());
exception.printStackTrace();
}
}
assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
assertTrue(messagesAdded.get() > 0, "Some messages should have been added");
}

/**
Expand Down
Loading