Persistent Evaluators Tutorial
This is the third REEF tutorial in the series. The first two sections are here:
In this tutorial we will write a REEF application very similar to the Distributed Shell example, but with a twist: our Driver will hold on to the Evaluators and run multiple Tasks on each of them. We will also demonstrate how the Client communicates with the Driver, and how the Driver communicates with the Evaluators.
The code for the application is in the REEF/reef-examples directory.
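
To make the idea of holding on to Evaluators concrete before looking at the real code, here is a minimal sketch in plain Java. It is only a toy model: `ToyEvaluator` and `runAll` are hypothetical names invented for this illustration and are not part of the REEF API, and the policy of sending every command to every evaluator is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of persistent evaluators; none of these types are REEF classes. */
final class PersistentEvaluatorSketch {

  /** Hypothetical stand-in for an Evaluator that outlives the Tasks it runs. */
  static final class ToyEvaluator {
    private final String id;
    private int tasksRun = 0;

    ToyEvaluator(final String id) {
      this.id = id;
    }

    /** Runs one task and stays alive so that it can accept the next one. */
    String run(final Supplier<String> task) {
      tasksRun++;
      return id + ": " + task.get();
    }

    int getTasksRun() {
      return tasksRun;
    }
  }

  /** Hypothetical driver loop: submit every command to every retained evaluator. */
  static List<String> runAll(final List<ToyEvaluator> evaluators,
                             final List<String> commands) {
    final List<String> results = new ArrayList<>();
    for (final String command : commands) {
      for (final ToyEvaluator evaluator : evaluators) {
        // The same evaluator object is reused for each command;
        // it is never released between tasks.
        results.add(evaluator.run(() -> "ran '" + command + "'"));
      }
    }
    return results;
  }

  public static void main(final String[] args) {
    final List<ToyEvaluator> evaluators =
        Arrays.asList(new ToyEvaluator("eval-0"), new ToyEvaluator("eval-1"));
    for (final String line : runAll(evaluators, Arrays.asList("date", "hostname"))) {
      System.out.println(line);
    }
  }
}
```

The point of the sketch is the reuse: each `ToyEvaluator` handles several tasks over its lifetime, whereas an application that releases its Evaluators after one Task would have to request new ones for every command.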
The application starts at the Launch.main() method. The code is very similar to that of the Distributed Shell Tutorial: most of it is about building the right TANG configuration for a REEF client. To that end, the launcher creates a ClientConfiguration object and also builds a runtime configuration for either YARN or the local environment:

    private static Configuration getClientConfiguration(final String[] args)
        throws BindException, InjectionException, IOException {
      final Configuration commandLineConf = parseCommandLine(args);
      final Configuration clientConfiguration = ClientConfiguration.CONF
          .set(ClientConfiguration.JOB_OBSERVER, JobClient.class)
          .set(ClientConfiguration.RUNTIME_ERROR_HANDLER, JobClient.class)
          .build();
      // TODO: Remove the injector, have stuff injected.
      final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
      final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
      final Configuration runtimeConfiguration;
      if (isLocal) {
        LOG.log(Level.INFO, "Running on the local runtime");
        runtimeConfiguration = LocalRuntimeConfiguration.CONF
            .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
            .build();
      } else {
        LOG.log(Level.INFO, "Running on YARN");
        runtimeConfiguration = YarnClientConfiguration.CONF
            .set(YarnClientConfiguration.REEF_JAR_FILE, EnvironmentUtils.getClassLocationFile(REEF.class))
            .build();
      }
      return Tang.Factory.getTang()
          .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
              cloneCommandLineConfiguration(commandLineConf))
          .build();
    }

The resulting Configuration can be used in TANG to instantiate the Client:

    public static void main(final String[] args) {
      try {
        final Configuration config = getClientConfiguration(args);
        LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
            ConfigurationFile.toConfigurationString(config));
        final Injector injector = Tang.Factory.getTang().newInjector(config);
        final JobClient client = injector.getInstance(JobClient.class);
        client.submit();
        client.waitForCompletion();
        LOG.info("Done!");
      } catch (final BindException | InjectionException | IOException ex) {
        LOG.log(Level.SEVERE, "Job configuration error", ex);
      }
    }

The Persistent Evaluator client, JobClient, runs on the client side and communicates with the Driver part of the application. Here again is the Launcher code that builds the Persistent Evaluator client Configuration:

    final Configuration clientConfiguration = ClientConfiguration.CONF
        .set(ClientConfiguration.JOB_OBSERVER, JobClient.class)
        .set(ClientConfiguration.RUNTIME_ERROR_HANDLER, JobClient.class)
        .build();

The Client constructs the configuration for the Driver:

    driverConfiguration =
        EnvironmentUtils.addClasspath(DriverConfiguration.CONF, DriverConfiguration.GLOBAL_LIBRARIES)
            .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
            .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
            .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
            .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
            .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
            .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
            .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
            .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
            .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
            .build();

...and submits it to the REEF framework:

    public void submit() {
      this.reef.submit(this.driverConfiguration);
    }
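
The Driver configuration above is essentially a table that binds each event to the handler that should react to it. The following plain-Java sketch illustrates that binding-and-dispatch pattern in isolation. It is not the TANG or REEF API: `ToyConfigurationBuilder` and `dispatch` are hypothetical names, events are plain strings, and lambdas stand in for the handler classes that the real configuration binds.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of binding named events to handlers; not the TANG or REEF API. */
final class HandlerBindingSketch {

  /** Hypothetical builder that mimics the chained set(...).build() style. */
  static final class ToyConfigurationBuilder {
    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();

    ToyConfigurationBuilder set(final String event, final Function<String, String> handler) {
      handlers.put(event, handler);
      return this;
    }

    /** Returns an immutable snapshot, so later set(...) calls cannot change it. */
    Map<String, Function<String, String>> build() {
      return Collections.unmodifiableMap(new LinkedHashMap<>(handlers));
    }
  }

  /** Hypothetical runtime side: look up the handler bound to an event and invoke it. */
  static String dispatch(final Map<String, Function<String, String>> configuration,
                         final String event, final String payload) {
    final Function<String, String> handler = configuration.get(event);
    if (handler == null) {
      return "no handler bound for " + event;
    }
    return handler.apply(payload);
  }

  public static void main(final String[] args) {
    final Map<String, Function<String, String>> configuration = new ToyConfigurationBuilder()
        .set("ON_EVALUATOR_ALLOCATED", id -> "submitting context to " + id)
        .set("ON_TASK_COMPLETED", id -> "task " + id + " done, evaluator kept for the next task")
        .build();
    System.out.println(dispatch(configuration, "ON_EVALUATOR_ALLOCATED", "eval-0"));
    System.out.println(dispatch(configuration, "ON_TASK_COMPLETED", "task-0"));
  }
}
```

In this toy version the application only declares which handler reacts to which event, and the runtime decides when to call it; the Driver configuration in the tutorial follows the same division of labour, with handler classes in place of lambdas.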