diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index aa9e42a880b..8f99b7b9cad 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -115,7 +115,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { private int exitCode = ExitCode.OK; private final ConcurrentLongHashMap masterKeyCache = - ConcurrentLongHashMap.newBuilder().autoShrink(true).build(); + ConcurrentLongHashMap.newBuilder().build(); protected StateManager stateManager; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java index f1372b28944..6433124b21c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java @@ -42,6 +42,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongFunction; + +import org.junit.Ignore; import org.junit.Test; /** @@ -348,6 +350,7 @@ public void testRehashingWithDeletes() { assertEquals(map.size(), n); } + @Test public void concurrentInsertions() throws Throwable { ConcurrentLongHashMap map = @@ -488,6 +491,170 @@ public void stressConcurrentInsertionsAndReads() throws Throwable { executor.shutdown(); } + @Ignore("autoShrink=true breaks this test, proper fix tbd") + @Test + public void stressConcurrentInsertionsAndReadsAutoShrink() throws Throwable { + ConcurrentLongHashMap map = + ConcurrentLongHashMap.newBuilder() + .concurrencyLevel(4) + .expectedItems(4) + .autoShrink(true) + .build(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int writeThreads = 8; + final int readThreads = 8; + final int n = 1_000_000; + String[] values = new String[] { + "v", + "vv", + "vvv", + "vvvv", + "vvvvv", + "vvvvvv", + "vvvvvvv", + "vvvvvvvv", + "vvvvvvvvv", + "vvvvvvvvvv", + }; + final int numValues = values.length; + + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + List> futures = new ArrayList<>(); + + System.out.println("Starting writes"); + for (int i = 0; i < writeThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]); + } + })); + } + + System.out.println("Starting reads"); + for (int i = 0; i < readThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + String value = map.get(key); + if (value != null) { + assertEquals(values[(int) Math.abs(key % numValues)], value); + } + } + })); + } + + System.out.println("Waiting for futures"); + int count = 0; + for (Future future : futures) { + future.get(); + count++; + if (count % 1000 == 0) { + System.out.println("Completed " + count + " futures out of " + futures.size()); + } + } + + assertEquals(map.size(), n * writeThreads); + + futures.clear(); + barrier.reset(); + + System.out.println("Starting removes"); + for (int i = 0; i < writeThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]); + map.remove(key); + String value = map.get(key); + assertNull(value); + + } + })); + } + + System.out.println("Starting reads 2"); + for (int i = 0; i < readThreads; i++) { + final int threadIdx = i; + + //for (int k = 0; k < 4; k++) { + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + String value = map.get(key); + if (value != null) { + assertEquals(values[(int) Math.abs(key % numValues)], value); + } + } + })); + //} + } + + System.out.println("Waiting for futures 2"); + count = 0; + for (Future future : futures) { + future.get(); + count++; + if (count % 1000 == 0) { + System.out.println("Completed " + count + " futures out of " + futures.size()); + } + } + futures.clear(); + + executor.shutdown(); + } + @Test public void testIteration() { ConcurrentLongHashMap map =