diff --git a/src/main/java/com/yahoo/storm/yarn/StormMasterServerHandler.java b/src/main/java/com/yahoo/storm/yarn/StormMasterServerHandler.java index 37b07d6..3eae371 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormMasterServerHandler.java +++ b/src/main/java/com/yahoo/storm/yarn/StormMasterServerHandler.java @@ -21,7 +21,9 @@ import java.net.UnknownHostException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import backtype.storm.utils.Utils; import org.apache.thrift7.TException; import org.json.simple.JSONValue; import org.slf4j.Logger; @@ -38,12 +40,14 @@ public class StormMasterServerHandler implements StormMaster.Iface { Map _storm_conf; StormAMRMClient _client; MasterServer _masterServer; + AtomicInteger curNumNeedSupervisors; StormMasterServerHandler(@SuppressWarnings("rawtypes") Map storm_conf, StormAMRMClient client) { _storm_conf = storm_conf; setStormHostConf(); Util.rmNulls(_storm_conf); _client = client; + curNumNeedSupervisors = new AtomicInteger(0); } void init(MasterServer masterServer) { @@ -102,6 +106,7 @@ public void setStormConf(String storm_conf) throws TException { @Override public void addSupervisors(int number) throws TException { LOG.info("adding "+number+" supervisors..."); + curNumNeedSupervisors.addAndGet(number); _client.addSupervisors(number); } @@ -215,7 +220,10 @@ public void stopUI() throws TException { @Override public void startSupervisors() throws TException { LOG.info("starting supervisors..."); - _client.startAllSupervisors(); + if (!_client.supervisorsAreToRun()) { + _client.addSupervisors(curNumNeedSupervisors.get()); + _client.startAllSupervisors(); + } } @Override