diff --git a/wake/src/main/java/com/microsoft/wake/impl/ForkPoolStage.java b/wake/src/main/java/com/microsoft/wake/impl/ForkPoolStage.java index fa9be24..629834f 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/ForkPoolStage.java +++ b/wake/src/main/java/com/microsoft/wake/impl/ForkPoolStage.java @@ -49,20 +49,19 @@ public class ForkPoolStage extends AbstractEStage { @Inject public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) String stageName, - @Parameter(StageConfiguration.StageHandler.class) EventHandler handler, - WakeSharedPool sharedPool - ) { + @Parameter(StageConfiguration.StageHandler.class) EventHandler handler, + WakeSharedPool sharedPool) { super(stageName); this.pool = sharedPool; this.handler = handler; //TODO: should WakeSharedPool register its stages? - StageManager.instance().register(this); + StageManager.instance().register(name, this); } @Inject public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler handler, - WakeSharedPool sharedPool) { + WakeSharedPool sharedPool) { this(ForkPoolStage.class.getName(), handler, sharedPool); } diff --git a/wake/src/main/java/com/microsoft/wake/impl/SingleThreadStage.java b/wake/src/main/java/com/microsoft/wake/impl/SingleThreadStage.java index 7b19f94..a8565fb 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/SingleThreadStage.java +++ b/wake/src/main/java/com/microsoft/wake/impl/SingleThreadStage.java @@ -71,7 +71,7 @@ public SingleThreadStage(final @Parameter(StageName.class) String name, thread = new Thread(new Producer(name, queue, handler, interrupted)); thread.setName("SingleThreadStage<" + name + ">"); thread.start(); - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /** diff --git a/wake/src/main/java/com/microsoft/wake/impl/StageManager.java b/wake/src/main/java/com/microsoft/wake/impl/StageManager.java index bff73a4..456ee6c 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/StageManager.java +++ b/wake/src/main/java/com/microsoft/wake/impl/StageManager.java @@ -18,7 +18,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,25 +35,29 @@ public final class StageManager implements Stage { private static final Logger LOG = Logger.getLogger(StageManager.class.getName()); - + private final UUID id = UUID.randomUUID(); + private static StageManager instance = new StageManager(); + private final Map stageNames; private final List stages; private final AtomicBoolean closed = new AtomicBoolean(false); StageManager() { stages = Collections.synchronizedList(new ArrayList()); - LOG.log(Level.FINE, "StageManager adds a shutdown hook"); + stageNames = Collections.synchronizedMap(new HashMap()); + + LOG.log(Level.FINE, "StageManager {0} adds a shutdown hook", id); Runtime.getRuntime().addShutdownHook(new Thread( new Runnable() { @Override public void run() { try { - LOG.log(Level.FINEST, "Shutdown hook : closing stages"); + LOG.log(Level.FINEST, "{0} Shutdown hook : closing stages", id); StageManager.instance().close(); - LOG.log(Level.FINEST, "Shutdown hook : closed stages"); + LOG.log(Level.FINEST, "{0} Shutdown hook : closed stages", id); } catch (Exception e) { - LOG.log(Level.WARNING, "StageManager close failure " + e.getMessage()); + LOG.log(Level.WARNING, "StageManager {0} close failure {1}", new Object[] {id, e.getMessage()}); } } } @@ -61,17 +68,27 @@ public static StageManager instance() { return instance; } + @Deprecated public void register(Stage stage) { - LOG.log(Level.FINEST, "StageManager adds stage " + stage); + LOG.log(Level.FINE, "StageManager {0} adds stage {1}", new Object[] {id, stage.getClass().getName()}); + + stages.add(stage); + stageNames.put(stage, "unknown"); + } + + public void register(String name, Stage stage) { + LOG.log(Level.FINE, "StageManager {0} adds stage {1} name {2}", new Object[] {id, stage.getClass().getName(), name}); stages.add(stage); + stageNames.put(stage, name); } @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { - for (Stage stage : stages) { - LOG.log(Level.FINEST, "Closing {0}", stage); + for (final Stage stage : stages) { + final String name = stageNames.get(stage); + LOG.log(Level.FINEST, "{0} Closing stage {1} name {2}", new Object[] {id, stage.getClass().getName(), name}); stage.close(); } } diff --git a/wake/src/main/java/com/microsoft/wake/impl/SyncStage.java b/wake/src/main/java/com/microsoft/wake/impl/SyncStage.java index 12cec5d..e644481 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/SyncStage.java +++ b/wake/src/main/java/com/microsoft/wake/impl/SyncStage.java @@ -75,7 +75,7 @@ public SyncStage(final @Parameter(StageName.class) String name, super(name); this.handler = handler; this.errorHandler = errorHandler; - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /** diff --git a/wake/src/main/java/com/microsoft/wake/impl/ThreadPoolStage.java b/wake/src/main/java/com/microsoft/wake/impl/ThreadPoolStage.java index 97292af..92c8ebe 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/ThreadPoolStage.java +++ b/wake/src/main/java/com/microsoft/wake/impl/ThreadPoolStage.java @@ -84,7 +84,7 @@ public ThreadPoolStage(final @Parameter(StageName.class) String name, throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0"); this.numThreads = numThreads; this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name)); - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /** @@ -163,7 +163,7 @@ public ThreadPoolStage(final @Parameter(StageName.class) String name, this.errorHandler = errorHandler; this.numThreads = 0; this.executor = executor; - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /** diff --git a/wake/src/main/java/com/microsoft/wake/impl/TimerStage.java b/wake/src/main/java/com/microsoft/wake/impl/TimerStage.java index 1f0d32d..d20ffa8 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/TimerStage.java +++ b/wake/src/main/java/com/microsoft/wake/impl/TimerStage.java @@ -109,7 +109,7 @@ public void run() { } }, initialDelay, period, TimeUnit.MILLISECONDS); - StageManager.instance().register(this); + StageManager.instance().register(name, this); } diff --git a/wake/src/main/java/com/microsoft/wake/impl/WakeSharedPool.java b/wake/src/main/java/com/microsoft/wake/impl/WakeSharedPool.java index d3ae868..6f93286 100644 --- a/wake/src/main/java/com/microsoft/wake/impl/WakeSharedPool.java +++ b/wake/src/main/java/com/microsoft/wake/impl/WakeSharedPool.java @@ -69,7 +69,7 @@ public void uncaughtException(Thread t, Throwable e) { true); // register it with the StageManager, since the pool is meant to back stages - StageManager.instance().register(this); + StageManager.instance().register("WakeSharedPool", this); } @Inject diff --git a/wake/src/main/java/com/microsoft/wake/remote/impl/DefaultRemoteManagerImplementation.java b/wake/src/main/java/com/microsoft/wake/remote/impl/DefaultRemoteManagerImplementation.java index b3bbb76..68b3314 100644 --- a/wake/src/main/java/com/microsoft/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/wake/src/main/java/com/microsoft/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -89,8 +89,8 @@ public DefaultRemoteManagerImplementation( handlerContainer.setTransport(transport); myIdentifier = new SocketRemoteIdentifier((InetSocketAddress) transport.getLocalAddress()); reSendStage = new RemoteSenderStage(codec, transport); - StageManager.instance().register(this); - LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2}", new Object[]{this.name, myIdentifier, counter.incrementAndGet()}); + StageManager.instance().register(name, this); + LOG.log(Level.FINEST, "RemoteManager: {0} myid {1} counter {2}", new Object[]{this.name, myIdentifier, counter.incrementAndGet()}); } @Inject @@ -157,7 +157,7 @@ public EventHandler getHandler(RemoteIdentifier destinationIdentifier, public AutoCloseable registerHandler(RemoteIdentifier sourceIdentifier, Class messageType, EventHandler theHandler) { if (LOG.isLoggable(Level.FINE)) - LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}", new Object[]{this.name, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()}); + LOG.log(Level.FINE, "RemoteManager: {0} myid: {1} remoteid: {2} messageType: {3} handler: {4}", new Object[] {this.name, myIdentifier, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()}); return handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler); } @@ -172,7 +172,7 @@ public AutoCloseable registerHandler(RemoteIdentifier sourceIde public AutoCloseable registerHandler(Class messageType, EventHandler> theHandler) { if (LOG.isLoggable(Level.FINE)) - LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}", new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()}); + LOG.log(Level.FINE, "RemoteManager: {0} myid: {1} messageType: {2} handler: {3}", new Object[]{this.name, myIdentifier, messageType.getName(), theHandler.getClass().getName()}); return handlerContainer.registerHandler(messageType, theHandler); } @@ -199,31 +199,31 @@ public RemoteIdentifier getMyIdentifier() { @Override public void close() { if (closed.compareAndSet(false, true)) { - LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}", new Object[]{this.name, myIdentifier}); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing remote manager", new Object[] {name, myIdentifier}); final Runnable closeRunnable = new Runnable() { @Override public void run() { try { - LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing sender stage", new Object[] {name, myIdentifier}); reSendStage.close(); - LOG.log(Level.FINE, "Closed the remote sender stage"); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the remote sender stage", new Object[] {name, myIdentifier}); } catch (final Exception e) { LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e); } try { - LOG.log(Level.FINE, "Closing transport {0}", myIdentifier); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing transport", new Object[] {name, myIdentifier}); transport.close(); - LOG.log(Level.FINE, "Closed the transport"); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the transport", new Object[] {name, myIdentifier}); } catch (final Exception e) { LOG.log(Level.SEVERE, "Unable to close the transport.", e); } try { - LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closing receiver stage", new Object[] {name, myIdentifier}); reRecvStage.close(); - LOG.log(Level.FINE, "Closed the remote receiver stage"); + LOG.log(Level.FINE, "RemoteManager: {0} {1} Closed the remote receiver stage", new Object[] {name, myIdentifier}); } catch (final Exception e) { LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e); } @@ -235,7 +235,7 @@ public void run() { closeExecutor.submit(closeRunnable); closeExecutor.shutdown(); if (!closeExecutor.isShutdown()) { - LOG.log(Level.SEVERE, "close executor did not shutdown properly."); + LOG.log(Level.SEVERE, "RemoteManager: {0} {1} close executor did not shutdown properly.", new Object[] {name, myIdentifier}); } @@ -250,9 +250,9 @@ public void run() { if (closeExecutor.isTerminated()) { - LOG.log(Level.FINE, "close executor did terminate properly."); + LOG.log(Level.FINE, "RemoteManager: {0} {1} close executor did terminate properly.", new Object[] {name, myIdentifier}); } else { - LOG.log(Level.SEVERE, "close executor did not terminate properly."); + LOG.log(Level.SEVERE, "RemoteManager: {0} {1} close executor did not terminate properly.", new Object[] {name, myIdentifier}); } } diff --git a/wake/src/main/java/com/microsoft/wake/remote/impl/RemoteEvent.java b/wake/src/main/java/com/microsoft/wake/remote/impl/RemoteEvent.java index 7c36c65..b8f3dab 100644 --- a/wake/src/main/java/com/microsoft/wake/remote/impl/RemoteEvent.java +++ b/wake/src/main/java/com/microsoft/wake/remote/impl/RemoteEvent.java @@ -25,7 +25,6 @@ */ public class RemoteEvent { - //private static final AtomicLong curSeq = new AtomicLong(0); private SocketAddress localAddr; private SocketAddress remoteAddr; private String src; @@ -33,25 +32,6 @@ public class RemoteEvent { private final T event; private final long seq; - /** - * Constructs a remote event - * - * @param localAddr the local socket address - * @param remoteAddr the remote socket address - * @param src the source - * @param sink the remote sink - * @param event the event - */ - /*public RemoteEvent(SocketAddress localAddr, SocketAddress remoteAddr, String src, String sink, T event) { - this.localAddr = localAddr; - this.remoteAddr = remoteAddr; - this.src = src; - this.sink = sink; - this.event = event; - this.seq = curSeq.getAndIncrement(); - } - */ - /** * Constructs a remote event * @@ -180,7 +160,7 @@ public String toString() { builder.append(" seq="); builder.append(seq); builder.append(" event="); - builder.append(event); + builder.append(event.toString()); return builder.toString(); } diff --git a/wake/src/main/java/com/microsoft/wake/remote/transport/netty/NettyChannelHandler.java b/wake/src/main/java/com/microsoft/wake/remote/transport/netty/NettyChannelHandler.java index 9b7de9e..6c4c82e 100644 --- a/wake/src/main/java/com/microsoft/wake/remote/transport/netty/NettyChannelHandler.java +++ b/wake/src/main/java/com/microsoft/wake/remote/transport/netty/NettyChannelHandler.java @@ -54,8 +54,8 @@ public NettyChannelHandler(String tag, ChannelGroup channelGroup, NettyEventList @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { - if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, tag + " " + e.toString() + " local: " + e.getChannel().getLocalAddress() + - " remote: " + e.getChannel().getRemoteAddress()); + if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} {1} local: {2} remote: {3}", + new Object[] {tag, e.toString(), e.getChannel().getLocalAddress(), e.getChannel().getRemoteAddress()}); } super.handleUpstream(ctx, e); } @@ -108,10 +108,8 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.log(Level.WARNING, "Unexpected exception from downstream. " + " channel: " + e.getChannel() + - " local: " + e.getChannel().getLocalAddress() + - " remote: " + e.getChannel().getRemoteAddress() + - " cause: " + e.getCause()); + LOG.log(Level.WARNING, "{0} unexpected exception from downstream. channel: {1} local: {2} remote: {3} cause: {4}", + new Object[] {tag, e.getChannel(), e.getChannel().getLocalAddress(), e.getChannel().getRemoteAddress(), e.getCause()}); e.getCause().printStackTrace(); e.getChannel().close(); listener.exceptionCaught(e); diff --git a/wake/src/main/java/com/microsoft/wake/rx/impl/RxSyncStage.java b/wake/src/main/java/com/microsoft/wake/rx/impl/RxSyncStage.java index 6ce7078..9df4650 100644 --- a/wake/src/main/java/com/microsoft/wake/rx/impl/RxSyncStage.java +++ b/wake/src/main/java/com/microsoft/wake/rx/impl/RxSyncStage.java @@ -55,7 +55,7 @@ public RxSyncStage(@Parameter(StageName.class) String name, @Parameter(StageObserver.class) Observer observer) { super(name); this.observer = observer; - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /** diff --git a/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java b/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java index 35748e6..337f2e2 100644 --- a/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java +++ b/wake/src/main/java/com/microsoft/wake/rx/impl/RxThreadPoolStage.java @@ -91,7 +91,7 @@ public RxThreadPoolStage(@Parameter(StageName.class) final String name, tf = new DefaultThreadFactory(name); this.executor = Executors.newFixedThreadPool(numThreads, tf); this.completionExecutor = Executors.newSingleThreadExecutor(tf); - StageManager.instance().register(this); + StageManager.instance().register(name, this); } /**