diff --git a/src/main/java/gov/usgs/earthquake/distribution/AdminSocketServer.java b/src/main/java/gov/usgs/earthquake/distribution/AdminSocketServer.java
index 40eda448..2bd8f20f 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/AdminSocketServer.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/AdminSocketServer.java
@@ -19,7 +19,7 @@
/**
* Telnet to this socket to get a "command prompt".
- *
+ *
* @author jmfee
*/
public class AdminSocketServer extends DefaultConfigurable implements
@@ -74,7 +74,7 @@ public void shutdown() throws Exception {
/**
* Process a line of input.
- *
+ *
* @param line
* input
* @param out
@@ -144,10 +144,11 @@ public void onSocket(Socket socket) {
in = socket.getInputStream();
out = socket.getOutputStream();
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- String line = null;
- while ((line = br.readLine()) != null) {
- processLine(line, out);
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ processLine(line, out);
+ }
}
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "[" + getName()
diff --git a/src/main/java/gov/usgs/earthquake/distribution/EIDSNotificationSender.java b/src/main/java/gov/usgs/earthquake/distribution/EIDSNotificationSender.java
index c452f287..66934fb2 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/EIDSNotificationSender.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/EIDSNotificationSender.java
@@ -9,7 +9,6 @@
import java.io.File;
import java.util.Date;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExternalNotificationListener.java b/src/main/java/gov/usgs/earthquake/distribution/ExternalNotificationListener.java
index 4245ed67..ceb4f305 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/ExternalNotificationListener.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/ExternalNotificationListener.java
@@ -7,7 +7,7 @@
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
-import gov.usgs.util.StreamUtils;
+import gov.usgs.util.ProcessManager;
import gov.usgs.util.XmlUtils;
import java.io.File;
@@ -19,33 +19,31 @@
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An external process that is called when new products arrive.
- *
+ *
* The ExternalNotificationListener implements the Configurable interface and
* can use the following configuration parameters:
- *
+ *
*
* - command
* - (Required) The command to execute. This must be an executable command and
* may include arguments. Any product specific arguments are appended at the end
* of command.
- *
+ *
* - storage
* - (Required) A directory used to store all products. Each product is
* extracted into a separate directory within this directory and is referenced
* by the --directory=/path/to/directory argument when command is executed.
*
- *
+ *
*/
public class ExternalNotificationListener extends DefaultNotificationListener {
/** Logging object. */
- private static final Logger LOGGER = Logger
- .getLogger(ExternalNotificationListener.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(ExternalNotificationListener.class.getName());
/** Configuration parameter for storage directory product. */
public static final String STORAGE_NAME_PROPERTY = "storage";
@@ -66,7 +64,7 @@ public class ExternalNotificationListener extends DefaultNotificationListener {
/**
* Construct a new ExternalNotificationListener.
- *
+ *
* The listener must be configured with a FileProductStorage and command to
* function.
*/
@@ -75,41 +73,32 @@ public ExternalNotificationListener() {
/**
* Configure an ExternalNotificationListener using a Config object.
- *
- * @param config
- * the config containing a
+ *
+ * @param config the config containing a
*/
public void configure(Config config) throws Exception {
super.configure(config);
command = config.getProperty(COMMAND_PROPERTY);
if (command == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'command' is a required configuration property");
+ throw new ConfigurationException("[" + getName() + "] 'command' is a required configuration property");
}
LOGGER.config("[" + getName() + "] command is '" + command + "'");
// storage references an object in the global configuration
String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
- String storageDirectory = config
- .getProperty(STORAGE_DIRECTORY_PROPERTY);
+ String storageDirectory = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
if (storageName == null && storageDirectory == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'storage' is a required configuration property.");
+ throw new ConfigurationException("[" + getName() + "] 'storage' is a required configuration property.");
}
if (storageName != null) {
- LOGGER.config("[" + getName() + "] loading FileProductStorage '"
- + storageName + "'");
- storage = (FileProductStorage) Config.getConfig().getObject(
- storageName);
+ LOGGER.config("[" + getName() + "] loading FileProductStorage '" + storageName + "'");
+ storage = (FileProductStorage) Config.getConfig().getObject(storageName);
if (storage == null) {
- throw new ConfigurationException("[" + getName()
- + "] unable to load FileProductStorage '" + storageName
- + "'");
+ throw new ConfigurationException("[" + getName() + "] unable to load FileProductStorage '" + storageName + "'");
}
} else {
- LOGGER.config("[" + getName() + "] using storage directory '"
- + storageDirectory + "'");
+ LOGGER.config("[" + getName() + "] using storage directory '" + storageDirectory + "'");
storage = new FileProductStorage(new File(storageDirectory));
}
}
@@ -136,9 +125,8 @@ public void startup() throws Exception {
/**
* Append product arguments to the base command.
- *
- * @param product
- * the product used to generate arguments.
+ *
+ * @param product the product used to generate arguments.
* @return command as a string.
* @throws Exception
*/
@@ -150,34 +138,26 @@ public String getProductCommand(final Product product) throws Exception {
// get path to product in storage, should be a directory
File productDirectory = storage.getProductFile(id);
- buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT)
- .append(productDirectory.getCanonicalPath());
-
- buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
- .append(id.getType());
- buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
- .append(id.getCode());
- buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
- .append(id.getSource());
- buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
- .append(XmlUtils.formatDate(id.getUpdateTime()));
- buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
- .append(product.getStatus());
+ buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT).append(productDirectory.getCanonicalPath());
+
+ buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT).append(id.getType());
+ buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT).append(id.getCode());
+ buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT).append(id.getSource());
+ buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT).append(XmlUtils.formatDate(id.getUpdateTime()));
+ buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT).append(product.getStatus());
if (product.isDeleted()) {
buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
}
if (product.getTrackerURL() != null) {
- buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
- .append(product.getTrackerURL().toString());
+ buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT).append(product.getTrackerURL().toString());
}
Map props = product.getProperties();
Iterator iter = props.keySet().iterator();
while (iter.hasNext()) {
String name = iter.next();
- buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT)
- .append(name).append("=")
+ buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT).append(name).append("=")
.append(props.get(name).replace("\"", "\\\"")).append("\"");
}
@@ -187,8 +167,7 @@ public String getProductCommand(final Product product) throws Exception {
String relation = iter.next();
Iterator iter2 = links.get(relation).iterator();
while (iter2.hasNext()) {
- buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT)
- .append(relation).append("=")
+ buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT).append(relation).append("=")
.append(iter2.next().toString());
}
}
@@ -196,13 +175,11 @@ public String getProductCommand(final Product product) throws Exception {
Content content = product.getContents().get("");
if (content != null) {
buf.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
- buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
- .append(content.getContentType());
+ buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT).append(content.getContentType());
}
if (product.getSignature() != null) {
- buf.append(" ").append(SIGNATURE_ARGUMENT)
- .append(product.getSignature());
+ buf.append(" ").append(SIGNATURE_ARGUMENT).append(product.getSignature());
}
return buf.toString();
@@ -210,16 +187,13 @@ public String getProductCommand(final Product product) throws Exception {
/**
* Split a command string into a command array.
- *
- * This version uses a StringTokenizer to split arguments. Quoted arguments
- * are supported (single or double), with quotes removed before passing to
- * runtime. Double quoting arguments will preserve quotes when passing to
- * runtime.
- *
- * @param command
- * command to run.
- * @return Array of arguments suitable for passing to
- * Runtime.exec(String[]).
+ *
+ * This version uses a StringTokenizer to split arguments. Quoted arguments are
+ * supported (single or double), with quotes removed before passing to runtime.
+ * Double quoting arguments will preserve quotes when passing to runtime.
+ *
+ * @param command command to run.
+ * @return Array of arguments suitable for passing to Runtime.exec(String[]).
*/
protected static String[] splitCommand(final String command) {
List arguments = new LinkedList();
@@ -242,8 +216,7 @@ protected static String[] splitCommand(final String command) {
if (currentArgument.endsWith("\"")) {
// that has balanced quotes
// remove quotes and add argument
- currentArgument = currentArgument.substring(1,
- currentArgument.length() - 1);
+ currentArgument = currentArgument.substring(1, currentArgument.length() - 1);
} else {
// unbalanced quotes, keep going
continue;
@@ -253,8 +226,7 @@ protected static String[] splitCommand(final String command) {
if (currentArgument.endsWith("'")) {
// that has balanced quotes
// remove quotes and add argument
- currentArgument = currentArgument.substring(1,
- currentArgument.length() - 1);
+ currentArgument = currentArgument.substring(1, currentArgument.length() - 1);
} else {
// unbalanced quotes, keep going
continue;
@@ -275,7 +247,7 @@ protected static String[] splitCommand(final String command) {
/**
* Call the external process for this product.
- *
+ *
* @param product
* @throws Exception
*/
@@ -284,122 +256,47 @@ public void onProduct(final Product product) throws Exception {
try {
storage.storeProduct(product);
} catch (ProductAlreadyInStorageException e) {
- LOGGER.info("[" + getName() + "] product already in storage "
- + product.getId().toString());
+ LOGGER.info("[" + getName() + "] product already in storage " + product.getId().toString());
}
// now run command
- String productCommand = null;
- Process process = null;
- int exitValue = -1;
+ runProductCommand(getProductCommand(product), product);
+ }
- try {
- productCommand = getProductCommand(product);
- LOGGER.info("[" + getName() + "] running command " + productCommand);
- process = Runtime.getRuntime().exec(productCommand);
+ /**
+ * Run a product command.
+ *
+ * @param command command and arguments.
+ * @param product product, when set and empty content (path "") is defined, the
+ * content is provided to the command on stdin.
+ * @throws Exception
+ */
+ public void runProductCommand(final String command, final Product product) throws Exception {
+ // execute
+ LOGGER.info("[" + getName() + "] running command " + command);
- // inline product content, may or may not be null
+ ProcessManager process = new ProcessManager(command, this.getTimeout());
+ // Stream content over stdin if it exists
+ if (product != null) {
Content content = product.getContents().get("");
if (content != null) {
- StreamUtils.transferStream(content.getInputStream(),
- process.getOutputStream());
- } else {
- // need to close process stdin either way
- StreamUtils.closeStream(process.getOutputStream());
+ process.stdin = content.getInputStream();
}
-
- // maybe log/capture process input/error streams
- // or switch to "Command"
-
- exitValue = process.waitFor();
- } catch (Exception e) {
- if (process != null) {
- // make sure to kill zombies
- process.destroy();
- }
-
- // signal that process did not exit normally
- exitValue = -1;
-
- // give subclasses chance to handle exception
- commandException(product, productCommand, e);
}
-
- // if process exited normally
- if (exitValue != -1) {
- // give subclasses chance to handle exitValue, which may be non-zero
- commandComplete(product, productCommand, exitValue);
+ int exitValue = process.call();
+ LOGGER.info("[" + getName() + "] command '" + command + "' exited with status '" + exitValue + "'");
+ if (exitValue != 0) {
+ byte[] errorOutput = process.stderr.toByteArray();
+ LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" + new String(errorOutput) + "'");
}
- }
-
- /**
- * Called when the command finishes executing normally.
- *
- * This implementation throws a NotificationListenerException if the
- * exitValue is non-zero.
- *
- * @param product
- * the product being processed.
- * @param command
- * the generated command, as a string.
- * @param exitValue
- * the exit status of the process.
- * @throws Exception
- * When re-notification should occur, based on maxTries, or none
- * if done.
- */
- public void commandComplete(final Product product, final String command,
- final int exitValue) throws Exception {
- LOGGER.info("[" + getName() + "] command '" + command
- + "' exited with status '" + exitValue + "'");
// send heartbeat info
HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
- HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
- Integer.toString(exitValue));
+ HeartbeatListener.sendHeartbeatMessage(getName(), "exit value", Integer.toString(exitValue));
if (exitValue != 0) {
- throw new NotificationListenerException("[" + getName()
- + "] command exited with status " + exitValue);
- }
- }
-
- /**
- * Called when an exception occurs while running command.
- *
- * This implementation throws a NotificationListenerException with exception
- * as the cause.
- *
- * @param product
- * product being processed
- * @param productCommand
- * command that was built
- * @param exception
- * exception that was thrown during execution. This will be an
- * InterruptedException if the process timed out.
- * @throws Exception
- * When re-notification should occur, based on maxTries, or none
- * if done.
- */
- public void commandException(final Product product,
- final String productCommand, final Exception exception)
- throws Exception {
- if (exception instanceof InterruptedException) {
- LOGGER.warning("[" + getName() + "] command '" + productCommand
- + "' timed out");
- } else {
- LOGGER.log(Level.WARNING, "[" + getName()
- + "] exception running command '" + productCommand + "'",
- exception);
+ throw new NotificationListenerException("[" + getName() + "] command exited with status " + exitValue);
}
-
- // send heartbeat info
- HeartbeatListener.sendHeartbeatMessage(getName(), "exception",
- productCommand);
- HeartbeatListener.sendHeartbeatMessage(getName(), "exception class",
- exception.getClass().getName());
-
- throw new NotificationListenerException(exception);
}
/**
@@ -410,8 +307,7 @@ public FileProductStorage getStorage() {
}
/**
- * @param storage
- * the storage to set
+ * @param storage the storage to set
*/
public void setStorage(FileProductStorage storage) {
this.storage = storage;
@@ -425,8 +321,7 @@ public String getCommand() {
}
/**
- * @param command
- * the command to set
+ * @param command the command to set
*/
public void setCommand(String command) {
this.command = command;
diff --git a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java
index 6c746db3..913e34bd 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java
@@ -1,7 +1,6 @@
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Product;
-import gov.usgs.earthquake.distribution.HeartbeatStatus;
import gov.usgs.util.Config;
import java.util.Date;
@@ -22,9 +21,9 @@
/**
* Heartbeat Listener stores heartbeat messages and writes them to a heartbeat
* file when a product is received
- *
+ *
* @author tene
- *
+ *
*/
public class HeartbeatListener extends DefaultNotificationListener {
@@ -63,7 +62,7 @@ public class HeartbeatListener extends DefaultNotificationListener {
/**
* Create a new HeartbeatListener.
- *
+ *
* Sets up the includeTypes list to contain "heartbeat".
*/
public HeartbeatListener() throws Exception {
@@ -119,7 +118,7 @@ public void onProduct(final Product product) throws Exception {
/**
* Send heartbeat data to heartbeat listener
- *
+ *
* @param component
* @param key
* @param value
@@ -151,7 +150,7 @@ public static void sendHeartbeatMessage(final String component,
/**
* Write heartbeat data for all components to the heartbeat file
- *
+ *
* @return true
* @throws IOException
*/
diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductTracker.java b/src/main/java/gov/usgs/earthquake/distribution/ProductTracker.java
index 63175127..07f22bef 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/ProductTracker.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/ProductTracker.java
@@ -4,7 +4,6 @@
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.ProductId;
-import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
@@ -26,12 +25,12 @@
/**
* Send updates and search sent updates about distribution status.
- *
+ *
* ProductDistribution clients to send status updates about received
* notifications, and processed products.
- *
+ *
* Search Example
- *
+ *
*
* ProductTracker tracker = new ProductTracker(new URL(
* "http://ehppdl1.cr.usgs.gov/tracker/"));
@@ -43,19 +42,19 @@
* List<ProductTrackerUpdate> updates = tracker.getUpdates(source, type, code,
* updateTime, className);
*
- *
+ *
* Update Example
- *
+ *
*
* Product product = ...;
* ProductTracker tracker = new ProductTracker(product.getTrackerURL()).;
- * ProductTrackerUpdate update = new ProductTrackerUpdate(product.getTrackerURL(),
- * product.getId(),
- * "my component name",
+ * ProductTrackerUpdate update = new ProductTrackerUpdate(product.getTrackerURL(),
+ * product.getId(),
+ * "my component name",
* "my component message");
* tracker.sendUpdate(update);
*
- *
+ *
*/
public class ProductTracker {
@@ -68,7 +67,7 @@ public class ProductTracker {
/**
* Set whether sending tracker updates is enabled from this host.
- *
+ *
* @param enabled
* true to send tracker updates, false to disable.
*/
@@ -103,7 +102,7 @@ public void setTrackerURL(URL trackerURL) {
/**
* Send an update to this ProductTracker.
- *
+ *
* @param update
* the update to send to the tracker.
* @return the update object processed by the tracker, including sequence
@@ -134,7 +133,7 @@ public ProductTrackerUpdate sendUpdate(final ProductTrackerUpdate update)
/**
* Send an update to this ProductTracker.
- *
+ *
* @param update
* the update to send to the tracker.
* @return the raw XML returned by the tracker, or null if unable to send.
@@ -193,10 +192,10 @@ public List getUpdates(final String source,
/**
* Search for updates on this tracker.
- *
+ *
* At least one field must be not null, or this method will return no
* updates.
- *
+ *
* @param source
* product source.
* @param type
@@ -222,7 +221,7 @@ public List getUpdates(final String source,
/**
* Search for updates on this tracker, returning raw xml.
- *
+ *
* @param source
* @param type
* @param code
@@ -266,7 +265,7 @@ public String getUpdateXML(final String source, final String type,
/**
* Send a custom tracker update message.
- *
+ *
* @param className
* the module that is sending the message.
* @param id
@@ -285,7 +284,7 @@ public ProductTrackerUpdate sendUpdate(final String className,
/**
* Send a productCreated update.
- *
+ *
* @param className
* the module that created the product.
* @param id
@@ -302,7 +301,7 @@ public ProductTrackerUpdate productCreated(final String className,
/**
* Send a productIndexed update.
- *
+ *
* @param className
* the module that indexed the product.
* @param id
@@ -319,7 +318,7 @@ public ProductTrackerUpdate productIndexed(final String className,
/**
* Send a notificationSent update.
- *
+ *
* @param className
* the module that sent the notification.
* @param notification
@@ -337,7 +336,7 @@ public ProductTrackerUpdate notificationSent(final String className,
/**
* Send a notificationReceived update.
- *
+ *
* @param className
* the module that received the notification.
* @param notification
@@ -355,7 +354,7 @@ public ProductTrackerUpdate notificationReceived(final String className,
/**
* Send a productDownloaded update.
- *
+ *
* @param className
* the module that downloaded the product.
* @param id
@@ -373,7 +372,7 @@ public ProductTrackerUpdate productDownloaded(final String className,
/**
* Send a productReceived update.
- *
+ *
* @param className
* the module that received the product.
* @param id
@@ -391,7 +390,7 @@ public ProductTrackerUpdate productReceived(final String className,
/**
* Send an exception update.
- *
+ *
* @param className
* the module that encountered an exception.
* @param id
@@ -411,7 +410,7 @@ public ProductTrackerUpdate exception(final String className,
/**
* Encode data for a HTTP Post.
- *
+ *
* @param data
* a map containing name value pairs for encoding.
* @return a string of encoded data.
@@ -434,7 +433,7 @@ public static String encodeURLData(final Map data)
/**
* Execute a HTTP Post.
- *
+ *
* @param url
* the target url.
* @param data
@@ -469,7 +468,7 @@ public static String post(final URL url, final Map data)
/**
* Parse xml received from a ProductTracker using a ProductTrackerParser.
- *
+ *
* @param trackerURL
* the trackerURL being parsed (so updates are flagged as from
* this tracker).
@@ -497,7 +496,7 @@ public List parseTrackerResponse(
/**
* Command Line Interface to ProductTracker.
- *
+ *
* @param args
* @throws Exception
*/
diff --git a/src/main/java/gov/usgs/earthquake/geoserve/ANSSRegionsFactory.java b/src/main/java/gov/usgs/earthquake/geoserve/ANSSRegionsFactory.java
index a38fa0dd..c48c8883 100644
--- a/src/main/java/gov/usgs/earthquake/geoserve/ANSSRegionsFactory.java
+++ b/src/main/java/gov/usgs/earthquake/geoserve/ANSSRegionsFactory.java
@@ -12,7 +12,6 @@
import javax.json.Json;
import javax.json.JsonObject;
-import gov.usgs.earthquake.geoserve.GeoserveLayersService;
import gov.usgs.earthquake.qdm.Regions;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StreamUtils;
@@ -23,7 +22,7 @@
*
* Simplest usage:
* ANSSRegionsFactory.getFactory().getRegions()
- *
+ *
* Regions are not fetched until {@link #startup()}
* (or {@link #fetchRegions()}) is called.
*/
@@ -73,7 +72,7 @@ public ANSSRegionsFactory (final GeoserveLayersService geoserveLayersService) {
}
/**
- * Get the global ANSSRegionsFactory,
+ * Get the global ANSSRegionsFactory,
* creating and starting if needed.
*/
public static synchronized ANSSRegionsFactory getFactory() {
diff --git a/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java
index 566bc60a..82aad841 100644
--- a/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java
+++ b/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java
@@ -15,7 +15,7 @@
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
-import gov.usgs.util.StreamUtils;
+import gov.usgs.util.ProcessManager;
import gov.usgs.util.XmlUtils;
import java.io.File;
@@ -24,51 +24,47 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* ExternalIndexerListener triggers external, non-Java listener processes.
- *
- * Provides a translation to a command-line interface
- * for the product indexer to speak with external, non-Java listeners.
- *
+ *
+ * Provides a translation to a command-line interface for the product indexer to
+ * speak with external, non-Java listeners.
+ *
* As a child-class of the AbstractListener, this also accepts the following
* configration parameters:
- *
+ *
*
* - command
* - (Required) The command to execute. This must be an executable command and
* may include arguments. Any product-specific arguments are appended at the end
* of command.
- *
+ *
* - storage
* - (Required) A directory used to store all products. Each product is
* extracted into a separate directory within this directory and is referenced
* by the --directory=/path/to/directory argument when command is executed.
- *
+ *
* - processUnassociated
* - (Optional, Default = false) Whether or not to process unassociated
* products. Valid values are "true" and "false".
- *
+ *
* - processPreferredOnly
* - (Optional, Default = false) Whether or not to process only preferred
* products of the type accepted by this listener. Valid values are "true" and
* "false".
- *
+ *
* - autoArchive
* - (Optional, Default = false) Whether or not to archive products from
* storage when they are archived by the indexer.
- *
+ *
*
*/
-public class ExternalIndexerListener extends DefaultIndexerListener implements
- IndexerListener {
+public class ExternalIndexerListener extends DefaultIndexerListener {
- private static final Logger LOGGER = Logger
- .getLogger(ExternalIndexerListener.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(ExternalIndexerListener.class.getName());
public static final String EVENT_ACTION_ARGUMENT = "--action=";
public static final String EVENT_IDS_ARGUMENT = "--eventids=";
@@ -108,9 +104,9 @@ public class ExternalIndexerListener extends DefaultIndexerListener implements
/**
* Construct a new ExternalIndexerListener object
- *
- * The listener must be configured with a FileProductStorage and a command
- * to function.
+ *
+ * The listener must be configured with a FileProductStorage and a command to
+ * function.
*/
public ExternalIndexerListener() {
super();
@@ -118,7 +114,7 @@ public ExternalIndexerListener() {
/*
* (non-Javadoc)
- *
+ *
* @see gov.usgs.earthquake.indexer.IndexerListener#onIndexerEvent(gov.usgs.
* earthquake.indexer.IndexerEvent)
*/
@@ -130,8 +126,7 @@ public void onIndexerEvent(IndexerEvent change) throws Exception {
// store product first
Product product = storeProduct(change.getProduct());
- for (Iterator changeIter = change
- .getIndexerChanges().iterator(); changeIter.hasNext();) {
+ for (Iterator changeIter = change.getIndexerChanges().iterator(); changeIter.hasNext();) {
IndexerChange indexerChange = changeIter.next();
// check if we should process this change
@@ -140,16 +135,14 @@ public void onIndexerEvent(IndexerEvent change) throws Exception {
}
// build command
- final String indexerCommand = getProductSummaryCommand(change,
- indexerChange);
+ final String indexerCommand = getProductSummaryCommand(change, indexerChange);
runProductCommand(indexerCommand, product);
}
}
if (autoArchive) {
- Iterator changeIter = change.getIndexerChanges()
- .iterator();
+ Iterator changeIter = change.getIndexerChanges().iterator();
ProductStorage storage = getStorage();
while (changeIter.hasNext()) {
IndexerChange nextChange = changeIter.next();
@@ -157,24 +150,17 @@ public void onIndexerEvent(IndexerEvent change) throws Exception {
// one product being archived
if (change.getSummary() != null) {
ProductId productId = change.getSummary().getId();
- LOGGER.log(Level.FINER,
- "[" + getName() + "] auto archiving product "
- + productId.toString());
+ LOGGER.log(Level.FINER, "[" + getName() + "] auto archiving product " + productId.toString());
storage.removeProduct(productId);
}
} else if (nextChange.getType() == IndexerChangeType.EVENT_ARCHIVED) {
// all products on event being archived
Event changeEvent = nextChange.getOriginalEvent();
- LOGGER.log(Level.FINER,
- "[" + getName() + "] auto archiving event "
- + changeEvent.getEventId() + " products");
- Iterator productIter = changeEvent
- .getAllProductList().iterator();
+ LOGGER.log(Level.FINER, "[" + getName() + "] auto archiving event " + changeEvent.getEventId() + " products");
+ Iterator productIter = changeEvent.getAllProductList().iterator();
while (productIter.hasNext()) {
ProductId productId = productIter.next().getId();
- LOGGER.log(Level.FINER,
- "[" + getName() + "] auto archiving product "
- + productId.toString());
+ LOGGER.log(Level.FINER, "[" + getName() + "] auto archiving product " + productId.toString());
storage.removeProduct(productId);
}
}
@@ -196,8 +182,7 @@ public Product storeProduct(final Product product) throws Exception {
getStorage().storeProduct(product);
listenerProduct = getStorage().getProduct(product.getId());
} else {
- LOGGER.finer("[" + getName()
- + "] Change product is null. Probably archiving.");
+ LOGGER.finer("[" + getName() + "] Change product is null. Probably archiving.");
}
} catch (ProductAlreadyInStorageException paise) {
LOGGER.info("[" + getName() + "] product already in storage");
@@ -212,70 +197,42 @@ public Product storeProduct(final Product product) throws Exception {
* Run a product command.
*
* @param command command and arguments.
- * @param product product, when set and empty content (path "") is defined,
- * the content is provided to the command on stdin.
+ * @param product product, when set and empty content (path "") is defined, the
+ * content is provided to the command on stdin.
* @throws Exception
*/
public void runProductCommand(final String command, final Product product) throws Exception {
// execute
LOGGER.info("[" + getName() + "] running command " + command);
- final Process process = Runtime.getRuntime().exec(command);
+ ProcessManager process = new ProcessManager(command, this.getTimeout());
// Stream content over stdin if it exists
if (product != null) {
Content content = product.getContents().get("");
if (content != null) {
- StreamUtils.transferStream(content.getInputStream(),
- process.getOutputStream());
+ process.stdin = content.getInputStream();
}
}
-
- // Close the output stream
- StreamUtils.closeStream(process.getOutputStream());
-
- Timer commandTimer = new Timer();
- if (this.getTimeout() > 0) {
- // Schedule process destruction for commandTimeout
- // milliseconds in the future
- commandTimer.schedule(new TimerTask() {
- public void run() {
- LOGGER.warning("[" + getName()
- + "] command timeout '" + command
- + "', destroying process.");
- process.destroy();
- }
- }, this.getTimeout());
+ int exitValue = process.call();
+ LOGGER.info("[" + getName() + "] command '" + command + "' exited with status '" + exitValue + "'");
+ if (exitValue != 0) {
+ byte[] errorOutput = process.stderr.toByteArray();
+ LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" + new String(errorOutput) + "'");
}
- // Wait for process to complete
- process.waitFor();
- // Cancel the timer if it was not triggered
- commandTimer.cancel();
- LOGGER.info("[" + getName() + "] command '" + command
- + "' exited with status '" + process.exitValue() + "'");
- if (process.exitValue() != 0) {
- byte[] errorOutput = StreamUtils.readStream(process.getErrorStream());
- LOGGER.fine("[" + getName() + "] command '" + command + "' stderr output '" +
- new String(errorOutput) + "'");
- }
- StreamUtils.closeStream(process.getErrorStream());
-
// send heartbeat info
HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
- HeartbeatListener.sendHeartbeatMessage(getName(), "exit value",
- Integer.toString(process.exitValue()));
+ HeartbeatListener.sendHeartbeatMessage(getName(), "exit value", Integer.toString(exitValue));
}
/**
* Get the product command and add the indexer arguments to it.
- *
- * @param change
- * The IndexerEvent received by the ExternalIndexerListener
+ *
+ * @param change The IndexerEvent received by the ExternalIndexerListener
* @return the command to execute with its arguments as a string
* @throws Exception
*/
- public String getProductSummaryCommand(IndexerEvent change,
- IndexerChange indexerChange) throws Exception {
+ public String getProductSummaryCommand(IndexerEvent change, IndexerChange indexerChange) throws Exception {
ProductSummary summary = change.getSummary();
Event event = indexerChange.getNewEvent();
@@ -287,9 +244,7 @@ public String getProductSummaryCommand(IndexerEvent change,
String command = getProductSummaryCommand(event, summary);
// Tells external indexer what type of index event occurred.
- command = command + " " +
- ExternalIndexerListener.EVENT_ACTION_ARGUMENT +
- indexerChange.getType().toString();
+ command = command + " " + ExternalIndexerListener.EVENT_ACTION_ARGUMENT + indexerChange.getType().toString();
return command;
}
@@ -312,32 +267,23 @@ public String getProductSummaryCommand(Event event, ProductSummary summary) thro
indexerCommand.append(getProductSummaryArguments(summary));
}
-
Product product = null;
try {
product = getStorage().getProduct(summary.getId());
} catch (Exception e) {
// when archiving product may not exist
- LOGGER.log(
- Level.FINE,
- "Exception retreiving product from storage, probably archiving",
- e);
+ LOGGER.log(Level.FINE, "Exception retreiving product from storage, probably archiving", e);
}
if (product != null) {
// Can only add these arguments if there is a product
Content content = product.getContents().get("");
if (content != null) {
- indexerCommand.append(" ").append(
- CLIProductBuilder.CONTENT_ARGUMENT);
- indexerCommand.append(" ")
- .append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT)
- .append(content.getContentType());
+ indexerCommand.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
+ indexerCommand.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT).append(content.getContentType());
}
if (product.getSignature() != null) {
- indexerCommand
- .append(" ")
- .append(ExternalNotificationListener.SIGNATURE_ARGUMENT)
+ indexerCommand.append(" ").append(ExternalNotificationListener.SIGNATURE_ARGUMENT)
.append(product.getSignature());
}
@@ -356,14 +302,9 @@ public String getEventArguments(final Event event) {
StringBuffer buf = new StringBuffer();
EventSummary eventSummary = event.getEventSummary();
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_ID_ARGUMENT)
- .append(eventSummary.getId());
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_EVENTSOURCE_ARGUMENT)
- .append(eventSummary.getSource());
- buf.append(" ")
- .append(ExternalIndexerListener.PREFERRED_EVENTSOURCECODE_ARGUMENT)
+ buf.append(" ").append(ExternalIndexerListener.PREFERRED_ID_ARGUMENT).append(eventSummary.getId());
+ buf.append(" ").append(ExternalIndexerListener.PREFERRED_EVENTSOURCE_ARGUMENT).append(eventSummary.getSource());
+ buf.append(" ").append(ExternalIndexerListener.PREFERRED_EVENTSOURCECODE_ARGUMENT)
.append(eventSummary.getSourceCode());
Map> eventids = event.getAllEventCodes(true);
Iterator sourceIter = eventids.keySet().iterator();
@@ -380,20 +321,15 @@ public String getEventArguments(final Event event) {
}
}
- buf.append(" ").append(PREFERRED_MAGNITUDE_ARGUMENT)
- .append(eventSummary.getMagnitude());
- buf.append(" ").append(PREFERRED_LATITUDE_ARGUMENT)
- .append(eventSummary.getLatitude());
- buf.append(" ").append(PREFERRED_LONGITUDE_ARGUMENT)
- .append(eventSummary.getLongitude());
- buf.append(" ").append(PREFERRED_DEPTH_ARGUMENT)
- .append(eventSummary.getDepth());
+ buf.append(" ").append(PREFERRED_MAGNITUDE_ARGUMENT).append(eventSummary.getMagnitude());
+ buf.append(" ").append(PREFERRED_LATITUDE_ARGUMENT).append(eventSummary.getLatitude());
+ buf.append(" ").append(PREFERRED_LONGITUDE_ARGUMENT).append(eventSummary.getLongitude());
+ buf.append(" ").append(PREFERRED_DEPTH_ARGUMENT).append(eventSummary.getDepth());
String eventTime = null;
if (event.getTime() != null) {
eventTime = XmlUtils.formatDate(event.getTime());
}
- buf.append(" ").append(PREFERRED_ORIGIN_TIME_ARGUMENT)
- .append(eventTime);
+ buf.append(" ").append(PREFERRED_ORIGIN_TIME_ARGUMENT).append(eventTime);
return buf.toString();
}
@@ -410,33 +346,22 @@ public String getProductSummaryArguments(final ProductSummary summary) throws IO
File productDirectory = getStorage().getProductFile(summary.getId());
if (productDirectory.exists()) {
// Add the directory argument
- buf.append(" ")
- .append(CLIProductBuilder.DIRECTORY_ARGUMENT)
- .append(productDirectory.getCanonicalPath());
+ buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT).append(productDirectory.getCanonicalPath());
}
// Add arguments from summary
- buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT)
- .append(summary.getType());
- buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT)
- .append(summary.getCode());
- buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT)
- .append(summary.getSource());
- buf.append(" ")
- .append(CLIProductBuilder.UPDATE_TIME_ARGUMENT)
- .append(XmlUtils.formatDate(summary.getUpdateTime()));
- buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT)
- .append(summary.getStatus());
+ buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT).append(summary.getType());
+ buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT).append(summary.getCode());
+ buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT).append(summary.getSource());
+ buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT).append(XmlUtils.formatDate(summary.getUpdateTime()));
+ buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT).append(summary.getStatus());
if (summary.isDeleted()) {
- buf.append(" ")
- .append(CLIProductBuilder.DELETE_ARGUMENT);
+ buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
}
// Add optional tracker URL argument
if (summary.getTrackerURL() != null) {
- buf.append(" ")
- .append(CLIProductBuilder.TRACKER_URL_ARGUMENT)
- .append(summary.getTrackerURL());
+ buf.append(" ").append(CLIProductBuilder.TRACKER_URL_ARGUMENT).append(summary.getTrackerURL());
}
// Add property arguments
@@ -444,10 +369,8 @@ public String getProductSummaryArguments(final ProductSummary summary) throws IO
Iterator iter = props.keySet().iterator();
while (iter.hasNext()) {
String name = iter.next();
- buf.append(" \"")
- .append(CLIProductBuilder.PROPERTY_ARGUMENT).append(name)
- .append("=").append(props.get(name).replace("\"", "\\\""))
- .append("\"");
+ buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT).append(name).append("=")
+ .append(props.get(name).replace("\"", "\\\"")).append("\"");
}
// Add link arguments
@@ -457,9 +380,7 @@ public String getProductSummaryArguments(final ProductSummary summary) throws IO
String relation = iter.next();
Iterator iter2 = links.get(relation).iterator();
while (iter2.hasNext()) {
- buf.append(" ")
- .append(CLIProductBuilder.LINK_ARGUMENT)
- .append(relation).append("=")
+ buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT).append(relation).append("=")
.append(iter2.next().toString());
}
}
@@ -469,17 +390,15 @@ public String getProductSummaryArguments(final ProductSummary summary) throws IO
/**
* Configure an ExternalNotificationListener using a Config object.
- *
- * @param config
- * the config containing a
+ *
+ * @param config the config containing a
*/
public void configure(Config config) throws Exception {
super.configure(config);
command = config.getProperty(COMMAND_PROPERTY);
if (command == null) {
- throw new ConfigurationException("[" + getName()
- + "] 'command' is a required configuration property");
+ throw new ConfigurationException("[" + getName() + "] 'command' is a required configuration property");
}
LOGGER.config("[" + getName() + "] command is '" + command + "'");
@@ -487,29 +406,22 @@ public void configure(Config config) throws Exception {
String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
String directoryName = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
if (storageName == null && directoryName == null) {
- throw new ConfigurationException("[" + getName()
- + "] one of 'storage' or 'storageDirectory' is required");
+ throw new ConfigurationException("[" + getName() + "] one of 'storage' or 'storageDirectory' is required");
}
if (storageName != null) {
- LOGGER.config("[" + getName() + "] loading FileProductStorage '"
- + storageName + "'");
- storage = (FileProductStorage) Config.getConfig().getObject(
- storageName);
+ LOGGER.config("[" + getName() + "] loading FileProductStorage '" + storageName + "'");
+ storage = (FileProductStorage) Config.getConfig().getObject(storageName);
if (storage == null) {
- throw new ConfigurationException("[" + getName()
- + "] unable to load FileProductStorage '" + storageName
- + "'");
+ throw new ConfigurationException("[" + getName() + "] unable to load FileProductStorage '" + storageName + "'");
}
} else {
- LOGGER.config("[" + getName() + "] using storage directory '"
- + directoryName + "'");
+ LOGGER.config("[" + getName() + "] using storage directory '" + directoryName + "'");
storage = new FileProductStorage(new File(directoryName));
storage.setName(getName() + "-storage");
}
- autoArchive = Boolean.valueOf(config.getProperty(AUTO_ARCHIVE_PROPERTY,
- AUTO_ARCHIVE_DEFAULT));
+ autoArchive = Boolean.valueOf(config.getProperty(AUTO_ARCHIVE_PROPERTY, AUTO_ARCHIVE_DEFAULT));
LOGGER.config("[" + getName() + "] autoArchive = " + autoArchive);
}
@@ -541,8 +453,7 @@ public FileProductStorage getStorage() {
}
/**
- * @param storage
- * the storage to set
+ * @param storage the storage to set
*/
public void setStorage(FileProductStorage storage) {
this.storage = storage;
@@ -556,8 +467,7 @@ public String getCommand() {
}
/**
- * @param command
- * the command to set
+ * @param command the command to set
*/
public void setCommand(String command) {
this.command = command;
@@ -571,8 +481,7 @@ public boolean isAutoArchive() {
}
/**
- * @param autoArchive
- * the autoArchive to set
+ * @param autoArchive the autoArchive to set
*/
public void setAutoArchive(boolean autoArchive) {
this.autoArchive = autoArchive;
diff --git a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java
index 56807439..4f1f43e1 100644
--- a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java
+++ b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java
@@ -12,9 +12,9 @@
/**
* ReliableIndexerListener listens for product changes by the indexer, then handles the new products independently in a background thread.
- *
+ *
* This class does little more than output logs for the products it has seen; it is designed to be extended.
- *
+ *
* Several useful methods are availble to be overridden or otherwise used:
*
* - onBeforeProcessThreadStart
@@ -22,16 +22,16 @@
* - getNextProducts
* - processProducts
*
- *
+ *
* This class accepts an index for querying in config:
- *
+ *
*
* - index
* - (Required) The index to use for product querying.
*
*/
-public class ReliableIndexerListener extends DefaultIndexerListener implements IndexerListener, Runnable {
+public class ReliableIndexerListener extends DefaultIndexerListener implements Runnable {
protected static final Logger LOGGER = Logger
.getLogger(ReliableIndexerListener.class.getName());
@@ -47,7 +47,7 @@ public class ReliableIndexerListener extends DefaultIndexerListener implements I
/**
* Sets up an object on start
- *
+ *
* @param config configuration
*
* @throws Exception if missing product index
@@ -68,13 +68,13 @@ public void configure(Config config) throws Exception {
}
}
- /**
+ /**
* Wakes thread when indexer makes changes
- *
+ *
* @param delta Indexer Event - not used
*
* @throws Exception if something goes wrong
- */
+ */
public void onIndexerEvent(IndexerEvent delta) throws Exception {
//Synchronized on the syncObject so we don't miss events
synchronized (syncObject) {
@@ -149,7 +149,7 @@ public void run() {
/**
* Starts thread
- *
+ *
* Calls onBeforeProcessThreadStart() in case subclasses want to add functionality
*
* @throws Exception if there's a thread issue
@@ -208,7 +208,7 @@ public long getLastIndexId() {
*/
public void setLastIndexId(final long lastIndexId) {
this.lastIndexId = lastIndexId;
- }
+ }
/**
* Run before thread start.
@@ -266,5 +266,5 @@ public void processProduct(final ProductSummary product) throws Exception {
}
-
+
}
diff --git a/src/main/java/gov/usgs/earthquake/nats/NATSClient.java b/src/main/java/gov/usgs/earthquake/nats/NATSClient.java
index 0e78abcf..db8d23ca 100644
--- a/src/main/java/gov/usgs/earthquake/nats/NATSClient.java
+++ b/src/main/java/gov/usgs/earthquake/nats/NATSClient.java
@@ -6,7 +6,6 @@
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
-import java.math.BigInteger;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.security.MessageDigest;
diff --git a/src/main/java/gov/usgs/earthquake/nats/NATSStreamingNotificationSender.java b/src/main/java/gov/usgs/earthquake/nats/NATSStreamingNotificationSender.java
index 94acc388..2cd3381a 100644
--- a/src/main/java/gov/usgs/earthquake/nats/NATSStreamingNotificationSender.java
+++ b/src/main/java/gov/usgs/earthquake/nats/NATSStreamingNotificationSender.java
@@ -3,7 +3,6 @@
import gov.usgs.earthquake.distribution.*;
import gov.usgs.util.Config;
-import javax.security.auth.login.Configuration;
import java.util.logging.Level;
import java.util.logging.Logger;
diff --git a/src/main/java/gov/usgs/util/ProcessManager.java b/src/main/java/gov/usgs/util/ProcessManager.java
new file mode 100644
index 00000000..87975abf
--- /dev/null
+++ b/src/main/java/gov/usgs/util/ProcessManager.java
@@ -0,0 +1,104 @@
+package gov.usgs.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+
+public class ProcessManager implements Callable {
+
+ public String command;
+ public long timeout;
+ // streams to read/write
+ public InputStream stdin = null;
+ public ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ public ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ // track result
+ public boolean timedOut = false;
+ public List errors = new ArrayList();
+
+ public ProcessManager(final String command, final long timeout) {
+ this.command = command;
+ this.timeout = timeout;
+ }
+
+ public Integer call() {
+ final Process process;
+ // start process
+ try {
+ process = Runtime.getRuntime().exec(this.command);
+ } catch (IOException e) {
+ errors.add(e);
+ return -1;
+ }
+ // schedule timeout
+ Timer commandTimer = new Timer();
+ if (this.timeout > 0) {
+ commandTimer.schedule(new TimerTask() {
+ public void run() {
+ process.destroy();
+ }
+ }, this.timeout);
+ }
+ // io threads
+ final Thread stdinThread = new Thread(() -> {
+ if (this.stdin != null) {
+ try (final OutputStream out = process.getOutputStream()) {
+ StreamUtils.transferStream(this.stdin, out);
+ } catch (Exception e) {
+ this.errors.add(e);
+ }
+ }
+ });
+ final Thread stdoutThread = new Thread(() -> {
+ try (final InputStream in = process.getInputStream()) {
+ StreamUtils.transferStream(in, this.stdout);
+ } catch (Exception e) {
+ this.errors.add(e);
+ }
+ });
+ final Thread stderrThread = new Thread(() -> {
+ try (final InputStream in = process.getErrorStream()) {
+ StreamUtils.transferStream(in, this.stderr);
+ } catch (Exception e) {
+ this.errors.add(e);
+ }
+ });
+ // start io threads
+ stdinThread.start();
+ stdoutThread.start();
+ stderrThread.start();
+ // Wait for process to complete
+ try {
+ process.waitFor();
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ // Cancel the timer if it was not triggered
+ commandTimer.cancel();
+ // join io threads
+ try {
+ stdinThread.join();
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ try {
+ stdoutThread.join();
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ try {
+ stderrThread.join();
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ // return exit code
+ return process.exitValue();
+ }
+
+}
diff --git a/src/test/java/gov/usgs/earthquake/distribution/HeartbeatListenerTest.java b/src/test/java/gov/usgs/earthquake/distribution/HeartbeatListenerTest.java
index e71fed75..836c765e 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/HeartbeatListenerTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/HeartbeatListenerTest.java
@@ -1,6 +1,5 @@
package gov.usgs.earthquake.distribution;
-import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.util.Config;
import org.junit.Assert;
import org.junit.Test;
@@ -10,13 +9,13 @@ public class HeartbeatListenerTest {
/**
* JSON Heartbeat Listener test should pass
- *
+ *
* @throws Exception
*/
@Test
public void testJSONHeartbeatListener() throws Exception {
HeartbeatListener objListener;
-
+
try {
Config config = new Config();
config.setProperty("heartbeatFilename", "heartbeat.dat");
@@ -54,13 +53,13 @@ public void testJSONHeartbeatListener() throws Exception {
/**
* Write Heartbeat File test should pass
- *
+ *
* @throws Exception
*/
@Test
public void testWriteHeartbeat() throws Exception {
HeartbeatListener objListener;
-
+
try {
Config config = new Config();
@@ -83,7 +82,7 @@ public void testWriteHeartbeat() throws Exception {
"value42");
HeartbeatListener.sendHeartbeatMessage("TestComponent4", "key43",
"value43");
-
+
objListener.writeHeartbeat();
} catch (Exception e) {
@@ -94,13 +93,13 @@ public void testWriteHeartbeat() throws Exception {
/**
* Manual clear of heartbeat data test should pass
- *
+ *
* @throws Exception
*/
@Test
public void testManualClearHeartbeat() throws Exception {
HeartbeatListener objListener;
-
+
long delayMillSec = 1000L;
long delayTimeEnd = System.currentTimeMillis() + delayMillSec;
@@ -145,13 +144,13 @@ public void testManualClearHeartbeat() throws Exception {
/**
* Automatic clear of heartbeat data test should pass
- *
+ *
* @throws Exception
*/
@Test
public void testAutoClearHeartbeat() throws Exception {
HeartbeatListener objListener;
-
+
long delayMillSec = 5000L;
long delayTimeEnd = System.currentTimeMillis() + delayMillSec;
diff --git a/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java b/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
index 3c39b517..e1b94529 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
@@ -19,8 +19,6 @@
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
-import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
-import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.util.FileUtils;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
@@ -230,7 +228,7 @@ public Product getProduct() {
/**
* Modified so onNotification sleeps longer than the timeout.
- *
+ *
* This led to a synchronization issue with ExecutorTask, and this test
* hangs when ExecutorTask remains synchronized... Without
* synchronzation on ExecutorTask, this works as expected.
diff --git a/src/test/java/gov/usgs/earthquake/distribution/WebSocketNotificationReceiverTest.java b/src/test/java/gov/usgs/earthquake/distribution/WebSocketNotificationReceiverTest.java
index 9cc9bf12..c1aad102 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/WebSocketNotificationReceiverTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/WebSocketNotificationReceiverTest.java
@@ -6,7 +6,6 @@
import javax.json.Json;
import javax.json.JsonObject;
-import java.util.Date;
public class WebSocketNotificationReceiverTest {
diff --git a/src/test/java/gov/usgs/earthquake/shakemap/GridXMLHandlerTest.java b/src/test/java/gov/usgs/earthquake/shakemap/GridXMLHandlerTest.java
index 85cefac5..36d2994d 100644
--- a/src/test/java/gov/usgs/earthquake/shakemap/GridXMLHandlerTest.java
+++ b/src/test/java/gov/usgs/earthquake/shakemap/GridXMLHandlerTest.java
@@ -8,12 +8,11 @@
import org.junit.Test;
import gov.usgs.util.StreamUtils;
-import gov.usgs.earthquake.shakemap.GridXMLHandler;
import org.junit.Before;
public class GridXMLHandlerTest {
-
+
private GridXMLHandler module = null;
@Before