diff --git a/wake/src/main/java/com/microsoft/wake/rx/AbstractRxStage.java b/wake/src/main/java/com/microsoft/wake/rx/AbstractRxStage.java index 002a65e..03e54d5 100644 --- a/wake/src/main/java/com/microsoft/wake/rx/AbstractRxStage.java +++ b/wake/src/main/java/com/microsoft/wake/rx/AbstractRxStage.java @@ -26,6 +26,8 @@ */ public abstract class AbstractRxStage implements RxStage { + protected final AtomicBoolean completing; + protected final AtomicBoolean completed; protected final AtomicBoolean closed; protected final String name; protected final Meter inMeter; @@ -37,6 +39,8 @@ public abstract class AbstractRxStage implements RxStage { * @param stageName the stage name */ public AbstractRxStage(String stageName) { + this.completing = new AtomicBoolean(false); + this.completed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false); this.name = stageName; this.inMeter = new Meter(stageName+"_in"); @@ -50,6 +54,9 @@ public AbstractRxStage(String stageName) { * input must call this each time an event is input. */ protected void beforeOnNext() { + if(completing.get()) { + throw new IllegalStateException("Can't call onNext() after onCompleted() or onError() has been called!"); + } inMeter.mark(1); } @@ -80,4 +87,13 @@ public Meter getInMeter() { public Meter getOutMeter() { return outMeter; } + @Override + public void close() throws Exception { + if(!completed.get()) { + throw new IllegalStateException("Can't close until onCompleted() or onError() has returned!"); + } + if(!closed.compareAndSet(false, true)) { + throw new IllegalStateException("Can't close a stage more than once!"); + } + } } diff --git a/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java b/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java index dd5c608..c7149f6 100644 --- a/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java +++ b/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java @@ -20,7 +20,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -147,6 +146,10 @@ public void run() { } private void submitCompletion(final Runnable r) { + if(!completing.compareAndSet(false, true)) { + throw new IllegalStateException("Cannot call onError / onCompleted more than once!"); + } + executor.shutdown(); completionExecutor.submit(new Runnable() { @@ -154,9 +157,8 @@ private void submitCompletion(final Runnable r) { public void run() { try { // no timeout for completion, only close() - if (!executor.awaitTermination(3153600000L, TimeUnit.SECONDS)) { - LOG.log(Level.SEVERE, "Executor terminated due to unrequired timeout"); - observer.onError(new TimeoutException()); + while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) { + LOG.log(Level.SEVERE, "Executor still has not shut down: " + this); } } catch (InterruptedException e) { e.printStackTrace(); @@ -165,6 +167,11 @@ public void run() { r.run(); } }); + + if(!completed.compareAndSet(false, true)) { + throw new IllegalStateException("Cannot call onError / onCompleted more than once!"); + } + } /** @@ -174,19 +181,18 @@ public void run() { */ @Override public void close() throws Exception { - if (closed.compareAndSet(false, true)) { - executor.shutdown(); - completionExecutor.shutdown(); - if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { - LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List droppedRunnables = executor.shutdownNow(); - LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); - } - if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { - LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List droppedRunnables = completionExecutor.shutdownNow(); - LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks."); - } + super.close(); +// executor.shutdown(); + completionExecutor.shutdown(); + if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { + LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); + List droppedRunnables = executor.shutdownNow(); + LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); + } + if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { + LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); + List droppedRunnables = completionExecutor.shutdownNow(); + LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks."); } }