From b04569d32c49fd86af3b0a724889f2215c775145 Mon Sep 17 00:00:00 2001 From: "Punnoose, Roshan" Date: Fri, 3 Jan 2014 14:55:00 -0500 Subject: [PATCH 1/4] Track the running supervisors Keep track of the running supervisors within the StormAMRMClient. Make use of this list and the yarn blacklist feature to ensure that multiple supervisors do not allocate on the same node. --- .../com/yahoo/storm/yarn/MasterServer.java | 13 +- .../com/yahoo/storm/yarn/StormAMRMClient.java | 178 +++++++++++++----- 2 files changed, 143 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/yahoo/storm/yarn/MasterServer.java b/src/main/java/com/yahoo/storm/yarn/MasterServer.java index 27b8cae..55b1e32 100644 --- a/src/main/java/com/yahoo/storm/yarn/MasterServer.java +++ b/src/main/java/com/yahoo/storm/yarn/MasterServer.java @@ -80,10 +80,16 @@ public void run() { if (allocatedContainers.size() > 0) { // Add newly allocated containers to the client. LOG.info("HB: Received allocated containers (" + allocatedContainers.size() + ")"); - client.addAllocatedContainers(allocatedContainers); if (client.supervisorsAreToRun()) { LOG.info("HB: Supervisors are to run, so queueing (" + allocatedContainers.size() + ") containers..."); + for(Container allocatedContainer : allocatedContainers) { + if(client.addAllocatedContainer(allocatedContainer)){ + if(LOG.isDebugEnabled()) { + LOG.debug("HB: Queuing supervisor container["+allocatedContainer+"]"); + } + launcherQueue.add(allocatedContainer); + } + } } else { LOG.info("HB: Supervisors are to stop, so releasing all containers..."); client.stopAllSupervisors(); @@ -95,6 +101,9 @@ public void run() { if (completedContainers.size() > 0 && client.supervisorsAreToRun()) { LOG.debug("HB: Containers completed (" + completedContainers.size() + "), so releasing them."); + for(ContainerStatus containerStatus : completedContainers) { + client.stopSupervisors(containerStatus.getContainerId()); + } 
client.startAllSupervisors(); } diff --git a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java index 1a357ec..195f379 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java +++ b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java @@ -16,43 +16,31 @@ package com.yahoo.storm.yarn; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.security.UserGroupInformation; +import backtype.storm.utils.Utils; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.Records; - +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMClient; import 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; - +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.utils.Utils; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; class StormAMRMClient extends AMRMClientImpl { private static final Logger LOG = LoggerFactory.getLogger(StormAMRMClient.class); @@ -61,7 +49,7 @@ class StormAMRMClient extends AMRMClientImpl { private final Map storm_conf; private final YarnConfiguration hadoopConf; private final Priority DEFAULT_PRIORITY = Records.newRecord(Priority.class); - private final Set containers; + private final BiMap runningSupervisors; private volatile boolean supervisorsAreToRun = false; private AtomicInteger numSupervisors; private Resource maxResourceCapability; @@ -76,8 +64,9 @@ public StormAMRMClient(ApplicationAttemptId appAttemptId, this.hadoopConf = hadoopConf; Integer pri = Utils.getInt(storm_conf.get(Config.MASTER_CONTAINER_PRIORITY)); this.DEFAULT_PRIORITY.setPriority(pri); - this.containers = new TreeSet(); numSupervisors = new AtomicInteger(0); + runningSupervisors = Maps.synchronizedBiMap(HashBiMap.create()); // start am nm client nmClient = (NMClientImpl) NMClient.createNMClient(); @@ -90,7 +79,33 @@ public synchronized void startAllSupervisors() { this.supervisorsAreToRun = true; this.addSupervisorsRequest(); } - + + /** + * Stopping a supervisor by {@link NodeId} + * @param nodeIds + */ + public synchronized void stopSupervisors(NodeId... 
nodeIds) { + if(LOG.isDebugEnabled()){ + LOG.debug( + "Stopping supervisors at nodes[" + Arrays.toString(nodeIds) + "], " + + "releasing all containers."); + } + releaseSupervisors(nodeIds); + } + + /** + * Need to be able to stop a supervisor by {@link ContainerId} + * @param containerIds supervisor containers to stop + */ + public synchronized void stopSupervisors(ContainerId... containerIds) { + if(LOG.isDebugEnabled()){ + LOG.debug("Stopping supervisors in containers[" + + Arrays.toString(containerIds) + "], " + + "releasing all containers."); + } + releaseSupervisors(containerIds); + } + public synchronized void stopAllSupervisors() { LOG.debug("Stopping all supervisors, releasing all containers..."); this.supervisorsAreToRun = false; @@ -107,29 +122,100 @@ private void addSupervisorsRequest() { super.addContainerRequest(req); } } - - public synchronized boolean addAllocatedContainers(List containers) { - for (int i=0; i it = this.containers.iterator(); - ContainerId id; - while (it.hasNext()) { - id = it.next().getId(); - LOG.debug("Releasing container (id:"+id+")"); - releaseAssignedContainer(id); - it.remove(); + Set nodeIds = runningSupervisors.keySet(); + this.releaseSupervisors(nodeIds.toArray(new NodeId[nodeIds.size()])); + } + + /** + * This is the main entry point to release a supervisor. + * @param nodeIds + */ + private synchronized void releaseSupervisors(NodeId... nodeIds) { + for(NodeId nodeId : nodeIds) { + //remove from running supervisors list + ContainerId containerId = removeRunningSupervisor(nodeId); + if(containerId != null) { + LOG.debug("Releasing container (id:"+containerId+")"); + //release the containers on the specified nodes + super.releaseAssignedContainer(containerId); + //increase the number of supervisors to request on the next heartbeat + numSupervisors.incrementAndGet(); + } } } - + + private synchronized void releaseSupervisors(ContainerId... 
containerIds) { + BiMap inverse = runningSupervisors.inverse(); + for(ContainerId containerId : containerIds) { + NodeId nodeId = inverse.get(containerId); + if(nodeId != null) { + this.releaseSupervisors(nodeId); + } + } + } + public synchronized boolean supervisorsAreToRun() { return this.supervisorsAreToRun; } @@ -208,6 +294,6 @@ else if (vis.equals("APPLICATION")) public void setMaxResource(Resource maximumResourceCapability) { this.maxResourceCapability = maximumResourceCapability; - LOG.info("Max Capability is now "+this.maxResourceCapability); + LOG.info("Max Capability is now " + this.maxResourceCapability); } } From 02239a27c5e72d3f9421cf3c3fe0fb6b4f05b94a Mon Sep 17 00:00:00 2001 From: "Punnoose, Roshan" Date: Fri, 3 Jan 2014 15:29:39 -0500 Subject: [PATCH 2/4] configure allocation of supervisor resource The supervisor resource size is now configurable through the storm-yarn configuration --- .../java/com/yahoo/storm/yarn/Config.java | 7 +++++- .../com/yahoo/storm/yarn/MasterServer.java | 2 -- .../com/yahoo/storm/yarn/StormAMRMClient.java | 22 ++++++++-------- src/main/java/com/yahoo/storm/yarn/Util.java | 25 +++++++++++++++++++ 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/yahoo/storm/yarn/Config.java b/src/main/java/com/yahoo/storm/yarn/Config.java index 0d1d6df..766eb6d 100644 --- a/src/main/java/com/yahoo/storm/yarn/Config.java +++ b/src/main/java/com/yahoo/storm/yarn/Config.java @@ -36,7 +36,12 @@ public class Config { //# of milliseconds to wait for YARN report on Storm Master host/port final public static String YARN_REPORT_WAIT_MILLIS = "yarn.report.wait.millis"; final public static String MASTER_HEARTBEAT_INTERVAL_MILLIS = "master.heartbeat.interval.millis"; - + + //size of the supervisor to request in yarn. 
This includes the supervisor + // and workers + final public static String SUPERVISOR_SIZE_MB = "supervisor.container.size-mb"; + final public static int DEFAULT_SUPERVISOR_SIZE = 8192; + @SuppressWarnings("rawtypes") static public Map readStormConfig() { return readStormConfig(null); diff --git a/src/main/java/com/yahoo/storm/yarn/MasterServer.java b/src/main/java/com/yahoo/storm/yarn/MasterServer.java index 55b1e32..3c89c56 100644 --- a/src/main/java/com/yahoo/storm/yarn/MasterServer.java +++ b/src/main/java/com/yahoo/storm/yarn/MasterServer.java @@ -171,8 +171,6 @@ public static void main(String[] args) throws Exception { RegisterApplicationMasterResponse resp = rmClient.registerApplicationMaster(addr.getHostName(), port, null); LOG.info("Got a registration response "+resp); - LOG.info("Max Capability "+resp.getMaximumResourceCapability()); - rmClient.setMaxResource(resp.getMaximumResourceCapability()); LOG.info("Starting HB thread"); server.initAndStartHeartbeat(rmClient, launcherQueue, (Integer) storm_conf diff --git a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java index 195f379..cb9c01d 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java +++ b/src/main/java/com/yahoo/storm/yarn/StormAMRMClient.java @@ -50,9 +50,9 @@ class StormAMRMClient extends AMRMClientImpl { private final YarnConfiguration hadoopConf; private final Priority DEFAULT_PRIORITY = Records.newRecord(Priority.class); private final BiMap runningSupervisors; + private final Resource supervisorResource; private volatile boolean supervisorsAreToRun = false; private AtomicInteger numSupervisors; - private Resource maxResourceCapability; private ApplicationAttemptId appAttemptId; private NMClientImpl nmClient; @@ -72,6 +72,14 @@ public StormAMRMClient(ApplicationAttemptId appAttemptId, nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient.init(hadoopConf); nmClient.start(); + + //get number of slots for 
supervisor + int numWorkersPerSupervisor = Util.getNumWorkers(storm_conf); + int supervisorSizeMB = Util.getSupervisorSizeMB(storm_conf); + //add 1 for the supervisor itself + supervisorResource = + Resource.newInstance(supervisorSizeMB, numWorkersPerSupervisor + 1); + LOG.info("Supervisors will allocate Yarn Resource["+supervisorResource+"]"); } public synchronized void startAllSupervisors() { @@ -115,7 +123,7 @@ public synchronized void stopAllSupervisors() { private void addSupervisorsRequest() { int num = numSupervisors.getAndSet(0); for (int i=0; i Date: Fri, 3 Jan 2014 16:54:31 -0500 Subject: [PATCH 3/4] Upgrade to Hadoop 2.2.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0992424..4824136 100644 --- a/pom.xml +++ b/pom.xml @@ -63,7 +63,7 @@ 0.9.0-wip21 - 2.1.0-beta + 2.2.0 From a9c6647326fbad6f70d41faa4ad8929121a9224e Mon Sep 17 00:00:00 2001 From: "Punnoose, Roshan" Date: Fri, 3 Jan 2014 17:00:26 -0500 Subject: [PATCH 4/4] lower default supervisor container size --- src/main/java/com/yahoo/storm/yarn/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/yahoo/storm/yarn/Config.java b/src/main/java/com/yahoo/storm/yarn/Config.java index 766eb6d..f69e38e 100644 --- a/src/main/java/com/yahoo/storm/yarn/Config.java +++ b/src/main/java/com/yahoo/storm/yarn/Config.java @@ -40,7 +40,7 @@ public class Config { //size of the supervisor to request in yarn. This includes the supervisor // and workers final public static String SUPERVISOR_SIZE_MB = "supervisor.container.size-mb"; - final public static int DEFAULT_SUPERVISOR_SIZE = 8192; + final public static int DEFAULT_SUPERVISOR_SIZE = 1024; @SuppressWarnings("rawtypes") static public Map readStormConfig() {