# Retained Evaluators Tutorial
REEF allows applications to reuse resources by retaining Evaluators. This tutorial shows how to retain Evaluators through an example distributed shell, which runs each submitted user command on the specified nodes. The Driver runs each command as new Tasks on existing Evaluators, avoiding the overhead of creating new Evaluators for every command. The tutorial also covers communication between REEF's components: between the Client and the Driver, and between the Driver and the Evaluators.
You can find the code in the reef-examples directory.
## Prerequisite

Writing Hello REEF application: that tutorial introduces the basic components (Task, Driver, Handler, ...).
## Launch
Launch is the entry point of this application; execution starts in the Launch.main() method. Most of this class builds the Tang configuration for a REEF Client.
The application reads the following arguments at execution time:

- Command: the shell command to run
- NumRuns: the number of times to run the command
- NumEval: the number of Evaluators to request
- Local: whether to run on the local runtime (otherwise YARN)

The arguments are injected via Tang. Each one is declared as a named parameter, for example Command:
```java
@NamedParameter(doc = "The shell command", short_name = "cmd", default_value = "*INTERACTIVE*")
public static final class Command implements Name<String> {
}
```
We define one class per argument. Each class has an empty body and implements the Name<T> interface, where T is the argument's type. For example, NumRuns implements Name<Integer> and Local implements Name<Boolean>.
The @NamedParameter annotation gives Tang more information about the parameter: its default value (default_value), the short name used on the command line (short_name), and a description for documentation (doc).
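To see what the annotation buys us, here is a REEF-free sketch of the same idea: a class-level annotation carries a doc string, a short name, and a default, and a resolver falls back to the default when the short name is absent from the command line. SketchNamedParameter, SketchName, SketchNumRuns, and SketchParameterResolver are hypothetical names invented for this illustration, and the default of "1" runs is made up. Tang's real resolution (type parsing, injection) is richer and is not reproduced here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;

// Hypothetical stand-in for Tang's @NamedParameter (illustration only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchNamedParameter {
  String doc();
  String shortName();
  String defaultValue();
}

// Hypothetical stand-in for Tang's Name<T> marker interface.
interface SketchName<T> {
}

// One empty class per argument, as in Launch; the default "1" is invented.
@SketchNamedParameter(doc = "Number of times to run the command",
    shortName = "num_runs", defaultValue = "1")
final class SketchNumRuns implements SketchName<Integer> {
}

final class SketchParameterResolver {
  private SketchParameterResolver() {
  }

  // Returns the value given on the command line under the parameter's short
  // name, or the annotated default when the short name is absent.
  static String resolve(Class<?> param, Map<String, String> commandLine) {
    SketchNamedParameter meta = param.getAnnotation(SketchNamedParameter.class);
    if (meta == null) {
      throw new IllegalArgumentException(param.getName() + " is not a named parameter");
    }
    return commandLine.getOrDefault(meta.shortName(), meta.defaultValue());
  }
}
```

For instance, resolving SketchNumRuns against an empty map yields "1", while a map containing "num_runs" mapped to "5" yields "5".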
The getClientConfiguration() method merges runtimeConfiguration, clientConfiguration, and a copy of commandLineConf produced by cloneCommandLineConfiguration():
```java
return Tang.Factory.getTang()
    .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
        cloneCommandLineConfiguration(commandLineConf))
    .build();
```
- runtimeConfiguration determines whether the application runs on the local runtime or on YARN:

```java
final Configuration runtimeConfiguration;
if (isLocal) {
  LOG.log(Level.INFO, "Running on the local runtime");
  runtimeConfiguration = LocalRuntimeConfiguration.CONF
      .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
      .build();
} else {
  LOG.log(Level.INFO, "Running on YARN");
  runtimeConfiguration = YarnClientConfiguration.CONF.build();
}
```

- clientConfiguration binds a handler to each event the Client can receive about the job Driver:

```java
final Configuration clientConfiguration = ClientConfiguration.CONF
    .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
    .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
    .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
    .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
    .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
    .build();
```

- commandLineConf holds the parsed command-line arguments. cloneCommandLineConfiguration() copies Command, NumRuns, and NumEval into a fresh configuration so that they can be injected into the Client:

```java
final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
cb.bindNamedParameter(Command.class, injector.getNamedInstance(Command.class));
cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class)));
cb.bindNamedParameter(NumEval.class, String.valueOf(injector.getNamedInstance(NumEval.class)));
return cb.build();
```
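The merge can be pictured with plain maps. The sketch below is a model under stated assumptions, not Tang: each configuration is a Map from parameter name to value, merging refuses to bind the same key to two different values (our understanding of how Tang treats a repeated named-parameter binding), and cloneSelected() copies only the keys it is given, the way cloneCommandLineConfiguration() copies Command, NumRuns, and NumEval but not Local. SketchConfigMerge and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SketchConfigMerge {
  private SketchConfigMerge() {
  }

  // Merges configurations left to right; the same key bound to two
  // different values is treated as an error rather than silently overridden.
  @SafeVarargs
  static Map<String, String> merge(Map<String, String>... parts) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> part : parts) {
      for (Map.Entry<String, String> e : part.entrySet()) {
        String previous = merged.putIfAbsent(e.getKey(), e.getValue());
        if (previous != null && !previous.equals(e.getValue())) {
          throw new IllegalStateException("Conflicting binding for " + e.getKey());
        }
      }
    }
    return merged;
  }

  // Copies only the listed keys into a fresh map, skipping absent ones.
  static Map<String, String> cloneSelected(Map<String, String> source, List<String> keys) {
    Map<String, String> copy = new HashMap<>();
    for (String key : keys) {
      if (source.containsKey(key)) {
        copy.put(key, source.get(key));
      }
    }
    return copy;
  }
}
```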
## JobClient

JobClient launches the job Driver and sends it the commands to run as Tasks.

In the Launch section, we bound the arguments into the Client configuration. Here they are injected into the constructor of JobClient:
```java
@Inject
JobClient(final REEF reef,
          @Parameter(Launch.Command.class) final String command,
          @Parameter(Launch.NumRuns.class) final Integer numRuns,
          @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException {
  ...
}
```
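Setting up the Driver configuration is almost the same as in the Writing Hello REEF application tutorial (see Prerequisite).

Commands are submitted by the submitTask() method, which encodes the command string and sends it to the Driver as a message through runningJob. We will see below where the Driver receives this message.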
```java
private synchronized void submitTask(final String cmd) {
  ...
  this.runningJob.send(CODEC.encode(cmd));
  ...
}
```
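The Client and the Driver must agree on how a command string becomes bytes: submitTask() calls CODEC.encode(cmd), and the Driver later calls CODEC.decode(message). Assuming ObjectSerializableCodec is built on standard Java serialization, a stand-in codec can be sketched as follows. SketchSerializableCodec is a hypothetical name, not the REEF class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical codec based on Java serialization (a stand-in, not REEF's class).
final class SketchSerializableCodec<T extends Serializable> {

  // Serializes the object into a byte array suitable for sending as a message.
  byte[] encode(T obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Closing the ObjectOutputStream above flushed everything into 'bytes'.
    return bytes.toByteArray();
  }

  // Reverses encode(); the caller must know which type was encoded.
  @SuppressWarnings("unchecked")
  T decode(byte[] buf) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Because both sides use the same codec type, whatever the Client encodes, the Driver decodes back to an equal String.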
## Shell Task

The Shell Task executes the given shell command and returns its output to the Driver.

The Task starts a Process to run the command and reads its standard output line by line, appending each line to a StringBuilder until the stream is exhausted:
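```java
// Accumulates the command's standard output, one line at a time.
final StringBuilder sb = new StringBuilder();
final Process proc = Runtime.getRuntime().exec(cmd);
try (final BufferedReader input =
         new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
  String line;
  while ((line = input.readLine()) != null) {
    sb.append(line).append('\n');
  }
}
```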
The result is returned as a byte array encoded with ObjectSerializableCodec:

```java
final ObjectSerializableCodec<String> codec = new ObjectSerializableCodec<>();
return codec.encode(sb.toString());
```
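Outside REEF, the Task body can be tried as an ordinary method. This sketch (SketchShellRunner is a hypothetical name) swaps Runtime.exec(String) for ProcessBuilder with an explicit argument list, merges stderr into stdout, and waits for the process to exit after draining its output; those are choices of this sketch, not of the example. Neither version goes through a shell, so pipes and redirections would need an explicit List.of("sh", "-c", cmd).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical, REEF-free sketch of the shell Task body.
final class SketchShellRunner {
  private SketchShellRunner() {
  }

  // Runs the command and returns its output, with '\n' after every line read.
  static String run(List<String> command) {
    try {
      Process proc = new ProcessBuilder(command)
          .redirectErrorStream(true) // merge stderr into stdout (choice of this sketch)
          .start();
      StringBuilder sb = new StringBuilder();
      try (BufferedReader input = new BufferedReader(
          new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = input.readLine()) != null) {
          sb.append(line).append('\n');
        }
      }
      proc.waitFor(); // reap the child once its output has been drained
      return sb.toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while running " + command, e);
    }
  }
}
```

On a Unix-like system, run(List.of("echo", "hello")) returns "hello\n".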
## JobDriver

JobDriver implements the Driver. It transitions between four states:

- INIT: Initial state, ready to request the Evaluators.
- WAIT_EVALUATORS: Wait for each Evaluator to be allocated and its Context activated.
- READY: Ready to submit a new Task.
- WAIT_TASKS: Wait for Tasks to terminate.
On startup (INIT state), the Driver starts the process of setting up the Evaluators (WAIT_EVALUATORS state). The Driver requests that all Evaluators be allocated. Once allocated, the Driver activates a Context on each allocated Evaluator. When this Evaluator setup is complete (READY state) the next user command is submitted as a Task to be run on the Active Context at each Evaluator. The Driver waits for all Tasks to successfully complete (WAIT_TASKS state). When all Tasks are completed, the Driver returns the results to the Client and is then ready to submit the next Task (READY state), again via the Active Context.
More details on this process are provided below.
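The four states and the transitions described above can be modeled as a small state machine. This is a hypothetical sketch (SketchDriverState and SketchDriverLifecycle are invented names): it counts active contexts and completed Tasks and rejects events that arrive in the wrong state, much as JobDriver checks its state with assert.

```java
// Hypothetical model of the JobDriver life cycle; not REEF code.
enum SketchDriverState { INIT, WAIT_EVALUATORS, READY, WAIT_TASKS }

final class SketchDriverLifecycle {
  private final int numEvaluators;
  private int activeContexts = 0;
  private int completedTasks = 0;
  private SketchDriverState state = SketchDriverState.INIT;

  SketchDriverLifecycle(int numEvaluators) {
    this.numEvaluators = numEvaluators;
  }

  SketchDriverState state() {
    return state;
  }

  // Driver start: the Evaluators are requested.
  void onStart() {
    require(SketchDriverState.INIT);
    state = SketchDriverState.WAIT_EVALUATORS;
  }

  // A Context became active; READY once every requested Evaluator has one.
  void onContextActive() {
    require(SketchDriverState.WAIT_EVALUATORS);
    if (++activeContexts == numEvaluators) {
      state = SketchDriverState.READY;
    }
  }

  // A command was submitted as one Task per active Context.
  void onSubmit() {
    require(SketchDriverState.READY);
    completedTasks = 0;
    state = SketchDriverState.WAIT_TASKS;
  }

  // One Task finished; after the last one, results go back and we are READY again.
  void onTaskCompleted() {
    require(SketchDriverState.WAIT_TASKS);
    if (++completedTasks == activeContexts) {
      state = SketchDriverState.READY;
    }
  }

  private void require(SketchDriverState expected) {
    if (state != expected) {
      throw new IllegalStateException("Expected " + expected + " but was " + state);
    }
  }
}
```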
Here is the constructor of JobDriver:

```java
@Inject
JobDriver(final JobMessageObserver jobMessageObserver,
          final EvaluatorRequestor evaluatorRequestor,
          final @Parameter(Launch.NumEval.class) Integer numEvaluators) {
  this.jobMessageObserver = jobMessageObserver;
  this.evaluatorRequestor = evaluatorRequestor;
  this.numEvaluators = numEvaluators;
}
```
- EvaluatorRequestor submits a request for the Evaluators when the Driver starts (see StartHandler).
- JobMessageObserver sends results from the Driver back to the Client; we will see how it is used below.
JobDriver stores the running ActiveContexts in a map keyed by context ID:

```java
private final Map<String, ActiveContext> contexts = new HashMap<>();
```
When an ActiveContext event arrives, the Driver adds the newly activated context to this map:

```java
public void onNext(final ActiveContext context) {
  ...
  JobDriver.this.contexts.put(context.getId(), context);
  ...
}
```
To reuse the running Evaluators, the Driver submits each new command Task through the stored ActiveContext objects. A Task submitted this way reuses the Evaluator under its current Context. This contrasts with the HelloREEF example, which runs a single Task on an Evaluator by submitting it together with a new Context to the AllocatedEvaluator.
```java
private void submit(final String command) {
  ...
  assert (this.state == State.READY);
  ...
  for (final ActiveContext context : this.contexts.values()) {
    this.submit(context, command);
  }
}

private void submit(final ActiveContext context, final String command) {
  ...
  cb.bindNamedParameter(Launch.Command.class, command);
  context.submitTask(cb.build());
  ...
}
```
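The reuse pattern can be exercised without a cluster. In this hypothetical sketch, SketchContext stands in for ActiveContext and simply records the commands submitted to it, and SketchRetainedDriver mirrors the contexts map, the ActiveContext handler, and submit(command). The context IDs follow the eval.getId() + "_context" convention of the Driver's AllocatedEvaluatorHandler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ActiveContext: records every command run on it.
final class SketchContext {
  final String id;
  final List<String> submittedCommands = new ArrayList<>();

  SketchContext(String id) {
    this.id = id;
  }

  void submitTask(String command) {
    submittedCommands.add(command);
  }
}

final class SketchRetainedDriver {
  private final Map<String, SketchContext> contexts = new HashMap<>();
  int activations = 0; // how many contexts were ever activated

  // Mirrors the ActiveContext handler: remember each context by its ID.
  void onActiveContext(SketchContext context) {
    contexts.put(context.id, context);
    activations++;
  }

  // Mirrors submit(command): one Task per retained context, no new Evaluators.
  void submit(String command) {
    for (SketchContext context : contexts.values()) {
      context.submitTask(command);
    }
  }

  SketchContext context(String id) {
    return contexts.get(id);
  }
}
```

Submitting two commands produces two Tasks on each of the same contexts, while the activation count stays at the number of Evaluators.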
When a new Evaluator is allocated, AllocatedEvaluatorHandler submits a new Context to it:

```java
final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
  @Override
  public void onNext(final AllocatedEvaluator eval) {
    ...
    eval.submitContext(ContextConfiguration.CONF.set(
        ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
    ...
  }
}
```
When an Evaluator fails, the Driver receives a FailedEvaluator event. The handler removes the failed contexts from the active-context map and then throws a RuntimeException.

```java
final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
  @Override
  public void onNext(final FailedEvaluator eval) {
    ...
    // Forget every context that lived on the failed Evaluator.
    for (final FailedContext failedContext : eval.getFailedContextList()) {
      JobDriver.this.contexts.remove(failedContext.getId());
    }
    throw new RuntimeException("Failed Evaluator: " + eval.getId(), eval.getEvaluatorException());
  }
}
```
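The bookkeeping in FailedEvaluatorHandler is easy to isolate. In this hypothetical sketch (SketchFailureHandling is an invented name, and context and Evaluator IDs are plain strings), the handler removes every failed context from the map before rethrowing, so the map no longer lists contexts that are gone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SketchFailureHandling {
  // Context ID -> owning Evaluator ID; stands in for JobDriver.contexts.
  final Map<String, String> contexts = new HashMap<>();

  // Mirrors FailedEvaluatorHandler: drop the failed contexts, then fail loudly.
  void onFailedEvaluator(String evaluatorId, List<String> failedContextIds, Exception cause) {
    for (String contextId : failedContextIds) {
      contexts.remove(contextId);
    }
    throw new RuntimeException("Failed Evaluator: " + evaluatorId, cause);
  }
}
```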
The Driver handles messages from the Client in ClientMessageHandler; each message carries a command to execute.

```java
final class ClientMessageHandler implements EventHandler<byte[]> {
  @Override
  public void onNext(final byte[] message) {
    synchronized (JobDriver.this) {
      final String command = CODEC.decode(message);
      ...
      if (JobDriver.this.state == State.READY) {
        JobDriver.this.submit(command);
      } else {
        // Not ready yet: save the command until the Evaluators are ready.
        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
        JobDriver.this.cmd = command;
      }
      ...
    }
  }
}
```
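The branch that saves the command amounts to a one-slot buffer. The hypothetical sketch below (SketchCommandBuffer is an invented name) assumes the saved command is submitted as soon as the Driver becomes READY; the excerpt above only shows the command being stored, so the flush step is our assumption. Like JobDriver.cmd, the buffer holds a single command, so a second early message would replace the first.

```java
import java.util.ArrayList;
import java.util.List;

final class SketchCommandBuffer {
  private boolean ready = false;
  private String pendingCommand = null; // like JobDriver.cmd: one saved command at most
  final List<String> submitted = new ArrayList<>();

  // Mirrors ClientMessageHandler: submit now if READY, otherwise remember it.
  synchronized void onClientMessage(String command) {
    if (ready) {
      submitted.add(command);
    } else {
      pendingCommand = command;
    }
  }

  // Assumed hook, called once all contexts are active: flush the saved command.
  synchronized void onReady() {
    ready = true;
    if (pendingCommand != null) {
      submitted.add(pendingCommand);
      pendingCommand = null;
    }
  }
}
```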
When all Tasks have completed, the Driver reports the results back to the Client in returnResults(), which concatenates the collected results and sends them with the sendMessageToClient() method of jobMessageObserver:

```java
private void returnResults() {
  final StringBuilder sb = new StringBuilder();
  for (final String result : this.results) {
    sb.append(result);
  }
  this.results.clear();
  LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
  this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString()));
}
```
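returnResults() runs only once every Task of the current command has reported back. The hypothetical sketch below (SketchResultCollector is an invented name, and a Consumer<String> stands in for jobMessageObserver.sendMessageToClient()) shows that gating: outputs accumulate until the expected number of Tasks has completed, then they are concatenated, cleared, and sent as one message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SketchResultCollector {
  private final int expectedTasks;
  private final List<String> results = new ArrayList<>();
  private final Consumer<String> toClient; // stand-in for sendMessageToClient()

  SketchResultCollector(int expectedTasks, Consumer<String> toClient) {
    this.expectedTasks = expectedTasks;
    this.toClient = toClient;
  }

  // Called once per completed Task; sends one combined message after the last.
  synchronized void onTaskCompleted(String output) {
    results.add(output);
    if (results.size() == expectedTasks) {
      StringBuilder sb = new StringBuilder();
      for (String result : results) {
        sb.append(result);
      }
      results.clear(); // ready for the next command's Tasks
      toClient.accept(sb.toString());
    }
  }
}
```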