diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java index 9333611c646..8840937d152 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java @@ -26,9 +26,10 @@ import org.apache.logging.log4j.plugins.PluginAttribute; import org.apache.logging.log4j.plugins.PluginFactory; import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscBlockingConsumerArrayQueue; /** - * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}. + * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscBlockingConsumerArrayQueue} or {@link MpscArrayQueue}. * * @since 2.7 */ @@ -43,6 +44,9 @@ private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) { @Override public BlockingQueue create(final int capacity) { + if (WaitStrategy.PARK.equals(waitStrategy)) { + return new MpscParkBlockingQueue<>(capacity); + } return new MpscBlockingQueue<>(capacity, waitStrategy); } @@ -52,9 +56,6 @@ public static JCToolsBlockingQueueFactory createFactory( return new JCToolsBlockingQueueFactory<>(waitStrategy); } - /** - * BlockingQueue wrapper for JCTools multiple producer single consumer array queue. - */ private static final class MpscBlockingQueue extends MpscArrayQueue implements BlockingQueue { private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy; @@ -147,6 +148,69 @@ public E take() throws InterruptedException { } } + private static final class MpscParkBlockingQueue extends MpscBlockingConsumerArrayQueue implements BlockingQueue { + + private final JCToolsBlockingQueueFactory.WaitStrategy producerWaitStrategy; + + MpscParkBlockingQueue(final int capacity) { + super(capacity); + this.producerWaitStrategy = JCToolsBlockingQueueFactory.WaitStrategy.PARK; + } + + @Override + public int drainTo(final Collection c) { + return drainTo(c, capacity()); + } + + @Override + public int drainTo(final Collection c, final int maxElements) { + return drain(new Consumer() { + @Override + public void accept(final E e) { + c.add(e); + } + }, maxElements); + } + + @Override + public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { + int idleCounter = 0; + final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); + do { + if (offer(e)) { + return true; + } else if (System.nanoTime() - timeoutNanos > 0) { + return false; + } + idleCounter = producerWaitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); + } + + @Override + public void put(final E e) throws InterruptedException { + int idleCounter = 0; + do { + if (offer(e)) { + return; + } + idleCounter = producerWaitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); + } + + @Override + public boolean offer(final E e) { + //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full. + return offerIfBelowThreshold(e, capacity() - 32); + } + + @Override + public int remainingCapacity() { + return capacity() - size(); + } + } + public enum WaitStrategy { SPIN(new Idle() { @Override diff --git a/pom.xml b/pom.xml index ca5dc5813f8..913947e87b3 100644 --- a/pom.xml +++ b/pom.xml @@ -243,7 +243,7 @@ 5.15.9 info - 3.1.0 + 3.3.0 4.13.1 5.7.0 3.5.5 diff --git a/src/site/asciidoc/manual/appenders.adoc b/src/site/asciidoc/manual/appenders.adoc index 5fc0bf3e861..3174baa6b52 100644 --- a/src/site/asciidoc/manual/appenders.adoc +++ b/src/site/asciidoc/manual/appenders.adoc @@ -187,7 +187,10 @@ attribute, `spinPolicy`, which corresponds to the `SpinPolicy` enum. |JCToolsBlockingQueue |This uses https://jctools.github.io/JCTools/[JCTools], specifically the MPSC -bounded lock-free queue. +bounded blocking consumer lock-free queue if a park wait strategy is chosen, the MPSC +bounded lock-free queue otherwise. +The difference between the 2 queues is that the former is optimized to coordinate producers +to wakeup a sleeping consumer. |LinkedTransferQueue |This uses the new in Java 7 implementation https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedTransferQueue.html[`LinkedTransferQueue`].