Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.logging.log4j.plugins.PluginAttribute;
import org.apache.logging.log4j.plugins.PluginFactory;
import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;

/**
* Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}.
* Factory for creating instances of BlockingQueues backed by JCTools {@link MpscBlockingConsumerArrayQueue} or {@link MpscArrayQueue}.
*
* @since 2.7
*/
Expand All @@ -43,6 +44,9 @@ private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {

@Override
public BlockingQueue<E> create(final int capacity) {
if (WaitStrategy.PARK.equals(waitStrategy)) {
return new MpscParkBlockingQueue<>(capacity);
}
return new MpscBlockingQueue<>(capacity, waitStrategy);
}

Expand All @@ -52,9 +56,6 @@ public static <E> JCToolsBlockingQueueFactory<E> createFactory(
return new JCToolsBlockingQueueFactory<>(waitStrategy);
}

/**
* BlockingQueue wrapper for JCTools multiple producer single consumer array queue.
*/
private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> {

private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
Expand Down Expand Up @@ -147,6 +148,69 @@ public E take() throws InterruptedException {
}
}

private static final class MpscParkBlockingQueue<E> extends MpscBlockingConsumerArrayQueue<E> implements BlockingQueue<E> {

private final JCToolsBlockingQueueFactory.WaitStrategy producerWaitStrategy;

MpscParkBlockingQueue(final int capacity) {
super(capacity);
this.producerWaitStrategy = JCToolsBlockingQueueFactory.WaitStrategy.PARK;
}

@Override
public int drainTo(final Collection<? super E> c) {
return drainTo(c, capacity());
}

@Override
public int drainTo(final Collection<? super E> c, final int maxElements) {
return drain(new Consumer<E>() {
@Override
public void accept(final E e) {
c.add(e);
}
}, maxElements);
}

@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
int idleCounter = 0;
final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
do {
if (offer(e)) {
return true;
} else if (System.nanoTime() - timeoutNanos > 0) {
return false;
}
idleCounter = producerWaitStrategy.idle(idleCounter);
} while (!Thread.interrupted()); //clear interrupted flag
throw new InterruptedException();
}

@Override
public void put(final E e) throws InterruptedException {
int idleCounter = 0;
do {
if (offer(e)) {
return;
}
idleCounter = producerWaitStrategy.idle(idleCounter);
} while (!Thread.interrupted()); //clear interrupted flag
throw new InterruptedException();
}

@Override
public boolean offer(final E e) {
//keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full.
return offerIfBelowThreshold(e, capacity() - 32);
}

@Override
public int remainingCapacity() {
return capacity() - size();
}
}

public enum WaitStrategy {
SPIN(new Idle() {
@Override
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
<activemq.version>5.15.9</activemq.version>
<!-- Allow Clirr severity to be overriden by the command-line option -DminSeverity=level -->
<minSeverity>info</minSeverity>
<jctoolsVersion>3.1.0</jctoolsVersion>
<jctoolsVersion>3.3.0</jctoolsVersion>
<junitVersion>4.13.1</junitVersion>
<junitJupiterVersion>5.7.0</junitJupiterVersion>
<mockitoVersion>3.5.5</mockitoVersion>
Expand Down
5 changes: 4 additions & 1 deletion src/site/asciidoc/manual/appenders.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ attribute, `spinPolicy`, which corresponds to the `SpinPolicy` enum.

|JCToolsBlockingQueue |This uses
https://jctools.github.io/JCTools/[JCTools], specifically the MPSC
bounded lock-free queue.
bounded blocking consumer lock-free queue if a park wait strategy is chosen, the MPSC
bounded lock-free queue otherwise.
The difference between the 2 queues is that the former is optimized to coordinate producers
to wakeup a sleeping consumer.

|LinkedTransferQueue |This uses the new in Java 7 implementation
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedTransferQueue.html[`LinkedTransferQueue`].
Expand Down