Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions wake/src/main/java/com/microsoft/wake/impl/ForkPoolStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ public class ForkPoolStage<T> extends AbstractEStage<T> {

@Inject
public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) String stageName,
@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler,
WakeSharedPool sharedPool
) {
@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler,
WakeSharedPool sharedPool) {
super(stageName);
this.pool = sharedPool;
this.handler = handler;
//TODO: should WakeSharedPool register its stages?

StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

@Inject
public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler,
WakeSharedPool sharedPool) {
WakeSharedPool sharedPool) {
this(ForkPoolStage.class.getName(), handler, sharedPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public SingleThreadStage(final @Parameter(StageName.class) String name,
thread = new Thread(new Producer<T>(name, queue, handler, interrupted));
thread.setName("SingleThreadStage<" + name + ">");
thread.start();
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down
33 changes: 25 additions & 8 deletions wake/src/main/java/com/microsoft/wake/impl/StageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -32,25 +35,29 @@
public final class StageManager implements Stage {

private static final Logger LOG = Logger.getLogger(StageManager.class.getName());

private final UUID id = UUID.randomUUID();

private static StageManager instance = new StageManager();
private final Map<Stage, String> stageNames;
private final List<Stage> stages;
private final AtomicBoolean closed = new AtomicBoolean(false);

StageManager() {

stages = Collections.synchronizedList(new ArrayList<Stage>());
LOG.log(Level.FINE, "StageManager adds a shutdown hook");
stageNames = Collections.synchronizedMap(new HashMap<Stage, String>());

LOG.log(Level.FINE, "StageManager {0} adds a shutdown hook", id);
Runtime.getRuntime().addShutdownHook(new Thread(
new Runnable() {
@Override
public void run() {
try {
LOG.log(Level.FINEST, "Shutdown hook : closing stages");
LOG.log(Level.FINEST, "{0} Shutdown hook : closing stages", id);
StageManager.instance().close();
LOG.log(Level.FINEST, "Shutdown hook : closed stages");
LOG.log(Level.FINEST, "{0} Shutdown hook : closed stages", id);
} catch (Exception e) {
LOG.log(Level.WARNING, "StageManager close failure " + e.getMessage());
LOG.log(Level.WARNING, "StageManager {0} close failure {1}", new Object[] {id, e.getMessage()});
}
}
}
Expand All @@ -61,17 +68,27 @@ public static StageManager instance() {
return instance;
}

@Deprecated
public void register(Stage stage) {
LOG.log(Level.FINEST, "StageManager adds stage " + stage);
LOG.log(Level.FINE, "StageManager {0} adds stage {1}", new Object[] {id, stage.getClass().getName()});

stages.add(stage);
stageNames.put(stage, "unknown");
}

public void register(String name, Stage stage) {
LOG.log(Level.FINE, "StageManager {0} adds stage {1} name {2}", new Object[] {id, stage.getClass().getName(), name});

stages.add(stage);
stageNames.put(stage, name);
}

@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
for (Stage stage : stages) {
LOG.log(Level.FINEST, "Closing {0}", stage);
for (final Stage stage : stages) {
final String name = stageNames.get(stage);
LOG.log(Level.FINEST, "{0} Closing stage {1} name {2}", new Object[] {id, stage.getClass().getName(), name});
stage.close();
}
}
Expand Down
2 changes: 1 addition & 1 deletion wake/src/main/java/com/microsoft/wake/impl/SyncStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SyncStage(final @Parameter(StageName.class) String name,
super(name);
this.handler = handler;
this.errorHandler = errorHandler;
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ThreadPoolStage(final @Parameter(StageName.class) String name,
throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0");
this.numThreads = numThreads;
this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name));
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down Expand Up @@ -163,7 +163,7 @@ public ThreadPoolStage(final @Parameter(StageName.class) String name,
this.errorHandler = errorHandler;
this.numThreads = 0;
this.executor = executor;
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion wake/src/main/java/com/microsoft/wake/impl/TimerStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run() {
}

}, initialDelay, period, TimeUnit.MILLISECONDS);
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void uncaughtException(Thread t, Throwable e) {
true);

// register it with the StageManager, since the pool is meant to back stages
StageManager.instance().register(this);
StageManager.instance().register("WakeSharedPool", this);
}

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public <T> DefaultRemoteManagerImplementation(
handlerContainer.setTransport(transport);
myIdentifier = new SocketRemoteIdentifier((InetSocketAddress) transport.getLocalAddress());
reSendStage = new RemoteSenderStage(codec, transport);
StageManager.instance().register(this);
LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2}", new Object[]{this.name, myIdentifier, counter.incrementAndGet()});
StageManager.instance().register(name, this);
LOG.log(Level.FINEST, "RemoteManager: {0} myid {1} counter {2}", new Object[]{this.name, myIdentifier, counter.incrementAndGet()});
}

@Inject
Expand Down Expand Up @@ -157,7 +157,7 @@ public <T> EventHandler<T> getHandler(RemoteIdentifier destinationIdentifier,
public <T, U extends T> AutoCloseable registerHandler(RemoteIdentifier sourceIdentifier,
Class<U> messageType, EventHandler<T> theHandler) {
if (LOG.isLoggable(Level.FINE))
LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}", new Object[]{this.name, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()});
LOG.log(Level.FINE, "RemoteManager: {0} myid: {1} remoteid: {2} messageType: {3} handler: {4}", new Object[] {this.name, myIdentifier, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()});
return handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
}

Expand All @@ -172,7 +172,7 @@ public <T, U extends T> AutoCloseable registerHandler(RemoteIdentifier sourceIde
public <T, U extends T> AutoCloseable registerHandler(Class<U> messageType,
EventHandler<RemoteMessage<T>> theHandler) {
if (LOG.isLoggable(Level.FINE))
LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}", new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
LOG.log(Level.FINE, "RemoteManager: {0} myid: {1} messageType: {2} handler: {3}", new Object[]{this.name, myIdentifier, messageType.getName(), theHandler.getClass().getName()});
return handlerContainer.registerHandler(messageType, theHandler);
}

Expand All @@ -199,31 +199,31 @@ public RemoteIdentifier getMyIdentifier() {
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}", new Object[]{this.name, myIdentifier});
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing remote manager", new Object[] {name, myIdentifier});

final Runnable closeRunnable = new Runnable() {
@Override
public void run() {
try {
LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier);
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing sender stage", new Object[] {name, myIdentifier});
reSendStage.close();
LOG.log(Level.FINE, "Closed the remote sender stage");
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the remote sender stage", new Object[] {name, myIdentifier});
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e);
}

try {
LOG.log(Level.FINE, "Closing transport {0}", myIdentifier);
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing transport", new Object[] {name, myIdentifier});
transport.close();
LOG.log(Level.FINE, "Closed the transport");
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the transport", new Object[] {name, myIdentifier});
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Unable to close the transport.", e);
}

try {
LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier);
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing receiver stage", new Object[] {name, myIdentifier});
reRecvStage.close();
LOG.log(Level.FINE, "Closed the remote receiver stage");
LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the remote receiver stage", new Object[] {name, myIdentifier});
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
}
Expand All @@ -235,7 +235,7 @@ public void run() {
closeExecutor.submit(closeRunnable);
closeExecutor.shutdown();
if (!closeExecutor.isShutdown()) {
LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
LOG.log(Level.SEVERE, "RemoteManager: {0} {1} close executor did not shutdown properly.", new Object[] {name, myIdentifier});
}


Expand All @@ -250,9 +250,9 @@ public void run() {


if (closeExecutor.isTerminated()) {
LOG.log(Level.FINE, "close executor did terminate properly.");
LOG.log(Level.FINE, "RemoteManager: {0} {1} close executor did terminate properly.", new Object[] {name, myIdentifier});
} else {
LOG.log(Level.SEVERE, "close executor did not terminate properly.");
LOG.log(Level.SEVERE, "RemoteManager: {0} {1} close executor did not terminate properly.", new Object[] {name, myIdentifier});
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,13 @@
*/
public class RemoteEvent<T> {

//private static final AtomicLong curSeq = new AtomicLong(0);
private SocketAddress localAddr;
private SocketAddress remoteAddr;
private String src;
private String sink;
private final T event;
private final long seq;

/**
* Constructs a remote event
*
* @param localAddr the local socket address
* @param remoteAddr the remote socket address
* @param src the source
* @param sink the remote sink
* @param event the event
*/
/*public RemoteEvent(SocketAddress localAddr, SocketAddress remoteAddr, String src, String sink, T event) {
this.localAddr = localAddr;
this.remoteAddr = remoteAddr;
this.src = src;
this.sink = sink;
this.event = event;
this.seq = curSeq.getAndIncrement();
}
*/

/**
* Constructs a remote event
*
Expand Down Expand Up @@ -180,7 +160,7 @@ public String toString() {
builder.append(" seq=");
builder.append(seq);
builder.append(" event=");
builder.append(event);
builder.append(event.toString());
return builder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public NettyChannelHandler(String tag, ChannelGroup channelGroup, NettyEventList
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, tag + " " + e.toString() + " local: " + e.getChannel().getLocalAddress() +
" remote: " + e.getChannel().getRemoteAddress());
if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} {1} local: {2} remote: {3}",
new Object[] {tag, e.toString(), e.getChannel().getLocalAddress(), e.getChannel().getRemoteAddress()});
}
super.handleUpstream(ctx, e);
}
Expand Down Expand Up @@ -108,10 +108,8 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.log(Level.WARNING, "Unexpected exception from downstream. " + " channel: " + e.getChannel() +
" local: " + e.getChannel().getLocalAddress() +
" remote: " + e.getChannel().getRemoteAddress() +
" cause: " + e.getCause());
LOG.log(Level.WARNING, "{0} unexpected exception from downstream. channel: {1} local: {2} remote: {3} cause: {4}",
new Object[] {tag, e.getChannel(), e.getChannel().getLocalAddress(), e.getChannel().getRemoteAddress(), e.getCause()});
e.getCause().printStackTrace();
e.getChannel().close();
listener.exceptionCaught(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public RxSyncStage(@Parameter(StageName.class) String name,
@Parameter(StageObserver.class) Observer<T> observer) {
super(name);
this.observer = observer;
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public RxThreadPoolStage(@Parameter(StageName.class) final String name,
tf = new DefaultThreadFactory(name);
this.executor = Executors.newFixedThreadPool(numThreads, tf);
this.completionExecutor = Executors.newSingleThreadExecutor(tf);
StageManager.instance().register(this);
StageManager.instance().register(name, this);
}

/**
Expand Down