Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions wake/src/main/java/com/microsoft/wake/rx/AbstractRxStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
public abstract class AbstractRxStage<T> implements RxStage<T> {

protected final AtomicBoolean completing;
protected final AtomicBoolean completed;
protected final AtomicBoolean closed;
protected final String name;
protected final Meter inMeter;
Expand All @@ -37,6 +39,8 @@ public abstract class AbstractRxStage<T> implements RxStage<T> {
* @param stageName the stage name
*/
public AbstractRxStage(String stageName) {
this.completing = new AtomicBoolean(false);
this.completed = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
this.name = stageName;
this.inMeter = new Meter(stageName+"_in");
Expand All @@ -50,6 +54,9 @@ public AbstractRxStage(String stageName) {
* input must call this each time an event is input.
*/
protected void beforeOnNext() {
if(completing.get()) {
throw new IllegalStateException("Can't call onNext() after onCompleted() or onError() has been called!");
}
inMeter.mark(1);
}

Expand Down Expand Up @@ -80,4 +87,13 @@ public Meter getInMeter() {
public Meter getOutMeter() {
return outMeter;
}
@Override
public void close() throws Exception {
if(!completed.get()) {
throw new IllegalStateException("Can't close until onCompleted() or onError() has returned!");
}
if(!closed.compareAndSet(false, true)) {
throw new IllegalStateException("Can't close a stage more than once!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -147,16 +146,19 @@ public void run() {
}

private void submitCompletion(final Runnable r) {
if(!completing.compareAndSet(false, true)) {
throw new IllegalStateException("Cannot call onError / onCompleted more than once!");
}

executor.shutdown();
completionExecutor.submit(new Runnable() {

@Override
public void run() {
try {
// no timeout for completion, only close()
if (!executor.awaitTermination(3153600000L, TimeUnit.SECONDS)) {
LOG.log(Level.SEVERE, "Executor terminated due to unrequired timeout");
observer.onError(new TimeoutException());
while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
LOG.log(Level.SEVERE, "Executor still has not shut down: " + this);
}
} catch (InterruptedException e) {
e.printStackTrace();
Expand All @@ -165,6 +167,11 @@ public void run() {
r.run();
}
});

if(!completed.compareAndSet(false, true)) {
throw new IllegalStateException("Cannot call onError / onCompleted more than once!");
}

}

/**
Expand All @@ -174,19 +181,18 @@ public void run() {
*/
@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
executor.shutdown();
completionExecutor.shutdown();
if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
List<Runnable> droppedRunnables = executor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
}
if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
List<Runnable> droppedRunnables = completionExecutor.shutdownNow();
LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks.");
}
super.close();
// executor.shutdown();
completionExecutor.shutdown();
if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
List<Runnable> droppedRunnables = executor.shutdownNow();
LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
}
if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
List<Runnable> droppedRunnables = completionExecutor.shutdownNow();
LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks.");
}
}

Expand Down