Group Communication Tutorial
REEF provides a group communication library, GroupOperators, that applications can use to configure various patterns of group communication. In the Matrix Multiplication demo, the Tasks use the Scatter, Broadcast, and Reduce communication patterns. This tutorial covers how the communication is set up and used in the demo.
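To make the three patterns concrete, here is a minimal single-process sketch of what each one does to the data, in plain Java. It is not the GroupOperators API: GroupPatterns, scatter, broadcast, reduce, and concat are hypothetical names. Splitting the scattered data into contiguous, nearly equal chunks is an assumption made for illustration, not a statement about how REEF partitions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical in-memory model of Scatter, Broadcast and Reduce; not the REEF API. */
final class GroupPatterns {

  private GroupPatterns() {}

  /** Scatter: split the sender's list into n contiguous chunks, one per receiver. */
  static <T> List<List<T>> scatter(List<T> data, int n) {
    if (n <= 0) throw new IllegalArgumentException("n must be positive");
    List<List<T>> chunks = new ArrayList<>();
    int base = data.size() / n;
    int extra = data.size() % n;
    int start = 0;
    for (int i = 0; i < n; i++) {
      // The first `extra` receivers get one additional element.
      int len = base + (i < extra ? 1 : 0);
      chunks.add(new ArrayList<>(data.subList(start, start + len)));
      start += len;
    }
    return chunks;
  }

  /** Broadcast: every one of the n receivers gets the same value. */
  static <T> List<T> broadcast(T value, int n) {
    return new ArrayList<>(Collections.nCopies(n, value));
  }

  /** Reduce: fold one contribution per sender into a single value at the receiver. */
  static <T> T reduce(List<T> contributions, BinaryOperator<T> combine) {
    if (contributions.isEmpty()) throw new IllegalArgumentException("nothing to reduce");
    T acc = contributions.get(0);
    for (int i = 1; i < contributions.size(); i++) {
      acc = combine.apply(acc, contributions.get(i));
    }
    return acc;
  }

  /** Concatenation, the kind of reduce function the demo uses (VectorConcat). */
  static <T> List<T> concat(List<T> a, List<T> b) {
    List<T> out = new ArrayList<>(a);
    out.addAll(b);
    return out;
  }

  public static void main(String[] args) {
    List<Integer> rows = List.of(1, 2, 3, 4, 5, 6, 7);
    System.out.println(scatter(rows, 3));  // [[1, 2, 3], [4, 5], [6, 7]]
    System.out.println(broadcast("x", 3)); // [x, x, x]
    List<List<Integer>> parts = List.of(List.of(1, 2), List.of(3));
    System.out.println(reduce(parts, GroupPatterns::concat)); // [1, 2, 3]
  }
}
```

Reduce takes a combining function; in the demo that function concatenates the partial results rather than summing them, which is why main reduces with concat.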
Before continuing, see the Writing Hello REEF application tutorial, which introduces the basic components (Task, Driver, Handler, ...).
The example splits the input matrix into partial matrices, distributes them, and computes the partial products; it then gathers the partial results and combines them. The Tasks that run the partial computations are called Compute Tasks. The Task that divides the matrix and then receives and combines the computed partial results is called the Controller Task. The communication between the Controller Task and the Compute Tasks is configured using GroupOperators, as detailed below.
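The decomposition works because each entry of A·x depends on only one row of A: if the rows of A are split into contiguous blocks, multiplying each block by x yields a contiguous slice of A·x, and concatenating the slices in block order gives the full result column. The single-process sketch below checks that idea with plain double arrays. RowPartitionedMatMul, computeAx, and multiply are illustrative names (the demo's own computeAx works on REEF Vector objects), and the sketch assumes that the reduced result is concatenated in the same order in which the rows were scattered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Single-process sketch of the demo's row-partitioned algorithm (hypothetical names). */
final class RowPartitionedMatMul {

  private RowPartitionedMatMul() {}

  /** What one Compute Task does: dot each of its rows of A with the column x. */
  static double[] computeAx(double[][] partialA, double[] x) {
    double[] out = new double[partialA.length];
    for (int r = 0; r < partialA.length; r++) {
      double sum = 0.0;
      for (int c = 0; c < x.length; c++) {
        sum += partialA[r][c] * x[c];
      }
      out[r] = sum;
    }
    return out;
  }

  /** Multiplies A by each column of X using `tasks` row blocks; returns one result column per input column. */
  static List<double[]> multiply(double[][] a, List<double[]> columns, int tasks) {
    // "Scatter": split the rows of A into contiguous blocks, one per task.
    List<double[][]> blocks = new ArrayList<>();
    int base = a.length / tasks, extra = a.length % tasks, start = 0;
    for (int t = 0; t < tasks; t++) {
      int len = base + (t < extra ? 1 : 0);
      blocks.add(Arrays.copyOfRange(a, start, start + len));
      start += len;
    }
    List<double[]> result = new ArrayList<>();
    for (double[] x : columns) {        // "Broadcast": every block sees the same column
      double[] ax = new double[a.length];
      int offset = 0;
      for (double[][] block : blocks) { // "Reduce": concatenate partial products in block order
        double[] partial = computeAx(block, x);
        System.arraycopy(partial, 0, ax, offset, partial.length);
        offset += partial.length;
      }
      result.add(ax);
    }
    return result;
  }

  public static void main(String[] args) {
    double[][] a = {{1, 2}, {3, 4}, {5, 6}};
    List<double[]> cols = List.of(new double[] {1, 0}, new double[] {1, 1});
    for (double[] ax : multiply(a, cols, 2)) {
      System.out.println(Arrays.toString(ax));
    }
  }
}
```

With two Compute Tasks, the first block holds rows 0 and 1 and the second holds row 2; main prints [1.0, 3.0, 5.0] and then [3.0, 7.0, 11.0], one line per column of X.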
The Driver first requests Evaluators to run the Compute and Controller Tasks. In MatMultDriver.java:
final class StartHandler implements EventHandler<StartTime> {
  @Override
  public void onNext(final StartTime startTime) {
    ...
    MatMultDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
        .setNumber(computeTasks + controllerTasks)
        .setMemory(128)
        .build());
  }
  ...
}
By default, computeTasks is five and controllerTasks is one, so the Driver requests six Evaluators.
The Driver is responsible for configuring the Tasks for group communication and submitting them. MatMultDriver delegates this work to the TaskSubmitter, which assigns an ID to each Task and then uses GroupOperators to define the group communication patterns in terms of these IDs. At submission time, GroupOperators builds the correct configuration for each Task from its ID.
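One way to picture "builds the correct configuration for the Task, given its ID" is as a lookup in a topology description: for each operator, the Task whose ID is the root gets the root-side endpoint, and a Task listed among the leaves gets the other side. The sketch below is a hypothetical illustration of that idea, not how GroupOperators is implemented; TopologySketch, Kind, Op, and rolesFor are made-up names, and the Task IDs are invented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-Task role lookup; not the REEF GroupOperators implementation. */
final class TopologySketch {

  enum Kind {
    SCATTER("Scatter"), BROADCAST("Broadcast"), REDUCE("Reduce");

    final String label;

    Kind(String label) {
      this.label = label;
    }
  }

  /** One operator: a single root Task and the list of leaf Tasks. */
  static final class Op {
    final Kind kind;
    final String root;         // sender for Scatter/Broadcast, receiver for Reduce
    final List<String> leaves; // receivers for Scatter/Broadcast, senders for Reduce

    Op(Kind kind, String root, List<String> leaves) {
      this.kind = kind;
      this.root = root;
      this.leaves = List.copyOf(leaves);
    }
  }

  /** Returns the endpoint types the given Task needs, in operator order. */
  static List<String> rolesFor(String taskId, List<Op> ops) {
    List<String> roles = new ArrayList<>();
    for (Op op : ops) {
      // Data flows away from the root for Scatter/Broadcast and toward it for Reduce.
      boolean rootSends = op.kind != Kind.REDUCE;
      if (op.root.equals(taskId)) {
        roles.add(op.kind.label + (rootSends ? ".Sender" : ".Receiver"));
      } else if (op.leaves.contains(taskId)) {
        roles.add(op.kind.label + (rootSends ? ".Receiver" : ".Sender"));
      }
    }
    return roles;
  }

  public static void main(String[] args) {
    List<String> compute = List.of("ComputeTask-0", "ComputeTask-1", "ComputeTask-2");
    List<Op> ops = List.of(
        new Op(Kind.SCATTER, "ControllerTask", compute),
        new Op(Kind.BROADCAST, "ControllerTask", compute),
        new Op(Kind.REDUCE, "ControllerTask", compute));
    System.out.println(rolesFor("ControllerTask", ops)); // [Scatter.Sender, Broadcast.Sender, Reduce.Receiver]
    System.out.println(rolesFor("ComputeTask-1", ops));  // [Scatter.Receiver, Broadcast.Receiver, Reduce.Sender]
  }
}
```

The two lines printed by main correspond to the operator types that the Controller Task and the Compute Tasks receive through injection in this tutorial: the Task code only declares which endpoints it needs, and the per-ID configuration decides which side of each operator it is on.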
The Compute Tasks receive parts of the matrix for computation, so they must be running before the Controller Task starts, because scattering the matrix is the Controller Task's first step. The Driver enforces this order by tracking the Active Contexts (through the TaskSubmitter) and counting the running Tasks.
The TaskSubmitter sets up the Tasks for the Scatter, Broadcast, and Reduce operations using GroupOperators in its onNext() method. As a first step, it registers a unique Task ID for each Active Context in the NameService, mapping the ID to the Evaluator's host name and a port. The IDs are divided into computeTaskIds (one per Compute Task) and controllerId.
public void onNext(Iterable<ActiveContext> contexts) {
  ...
  for (ActiveContext context : contexts) {
    if (runnEvalCnt != -1) {
      ...
      final String hostAddr = context.getEvaluatorDescriptor().getNodeDescriptor()
          .getInetSocketAddress().getHostName();
      final int port = nsPorts.get(runnEvalCnt);
      final ComparableIdentifier compTaskId = computeTaskIds.get(runnEvalCnt);
      logger.log(Level.INFO, "Registering " + compTaskId + " with " + hostAddr + ":" + port);
      nameService.register(compTaskId, new InetSocketAddress(hostAddr, port));
      ...
    } else {
      ...
      nameService.register(controllerId, new InetSocketAddress(hostAddr, controllerPort));
    }
    ...
  }
  ...
}
Next, the group operators are added, with controllerId and computeTaskIds set as the sender or the receivers of each operator.
public void onNext(Iterable<ActiveContext> contexts) {
  ...
  operators = new GroupOperators(VectorCodec.class, VectorConcat.class,
      nameServiceAddr, nameServicePort, id2port);
  // Fluent syntax for adding the group communication
  // operators that are needed for the job
  operators.addScatter().setSender(controllerId)
      .setReceivers(computeTaskIds);
  operators.addBroadCast().setSender(controllerId)
      .setReceivers(computeTaskIds);
  // Each operator can override the default settings. The reduce
  // function here is the same as the default passed to the
  // constructor (VectorConcat); it is set explicitly to illustrate.
  operators.addReduce().setReceiver(controllerId)
      .setSenders(computeTaskIds).setRedFuncClass(VectorConcat.class);
  ...
Now that the operators are set up, the Driver submits the Compute Tasks. The group communication configuration for each Task is built by calling the GroupOperators method getConfig() with the Task ID that was registered in the NameService.
public void onNext(Iterable<ActiveContext> contexts) {
  ...
  for (int i = 0; i < contextList.size(); i++) {
    final ComparableIdentifier compTaskId = computeTaskIds.get(i);
    contextList.get(i).submitTask(getComputeTaskConfig(compTaskId));
  }
}
...
private Configuration getComputeTaskConfig(final ComparableIdentifier compTaskId) {
  ...
  final JavaConfigurationBuilder b = Tang.Factory.getTang().newConfigurationBuilder();
  b.addConfiguration(operators.getConfig(compTaskId));
  ...
}
Once all Compute Tasks have launched, MatMultDriver calls the submitControlTask() method to submit the Controller Task. (The bookkeeping for keeping track of how many Compute Tasks have been launched is explained later.)
public void submitControlTask() {
  try {
    final JavaConfigurationBuilder b = Tang.Factory.getTang().newConfigurationBuilder();
    b.addConfiguration(operators.getConfig(controllerId));
    ...
    controlerContext.submitTask(b.build());
    ...
  }
  ...
ControllerTask uses Scatter.Sender, Broadcast.Sender, and Reduce.Receiver to send the matrices and receive the reduced results. These operators are injected into its constructor by Tang, according to the configuration built by the TaskSubmitter at the Driver:
@Inject
public ControllerTask(Scatter.Sender<Vector> scatterSender,
    Broadcast.Sender<Vector> broadcastSender,
    Reduce.Receiver<Vector> reduceReceiver) {
  super();
  this.scatterSender = scatterSender;
  this.broadcastSender = broadcastSender;
  this.reduceReceiver = reduceReceiver;
  ...
}
ControllerTask drives the matrix multiplication with these operators: it scatters the input matrix A among the Compute Tasks so that each receives a row-wise part, broadcasts the number of columns of X followed by each column vector, and, for each column, receives the reduced (concatenated) partial products as one column of the output.
@Override
public byte[] call(byte[] memento) throws Exception {
  // Scatter the matrix A
  logger.log(Level.FINE, "Scattering A");
  scatterSender.send(A);
  logger.log(Level.FINE, "Finished Scattering A");
  List<Vector> result = new ArrayList<>();
  Vector sizeVec = new DenseVector(1);
  sizeVec.set(0, (double) X.size());
  // Broadcast the number of columns that will
  // be broadcast next
  broadcastSender.send(sizeVec);
  // Just use Iterable with a Matrix class
  for (Vector x : X) {
    // Broadcast each column
    broadcastSender.send(x);
    // Receive a concatenated vector of the
    // partial products computed by the Compute Tasks
    Vector Ax = reduceReceiver.reduce();
    // Accumulate the result
    result.add(Ax);
  }
  String resStr = resultString(A, X, result);
  return resStr.getBytes();
}
ComputeTask uses the counterparts of the Controller Task's operators: Scatter.Receiver, Broadcast.Receiver, and Reduce.Sender. Again, they are injected into the constructor by Tang:
@Inject
public ComputeTask(Scatter.Receiver<Vector> scatterReceiver,
    Broadcast.Receiver<Vector> broadcastReceiver,
    Reduce.Sender<Vector> reduceSender) {
  super();
  this.scatterReceiver = scatterReceiver;
  this.broadcastReceiver = broadcastReceiver;
  this.reduceSender = reduceSender;
}
With Scatter, ComputeTask receives the row-partitioned part of the matrix that it is responsible for. With Broadcast, it receives the number of columns and then each column vector. For each column vector, it computes the product of its partial matrix with that vector and sends this partial product to the Controller Task using Reduce:
@Override
public byte[] call(byte[] memento) throws Exception {
  // Receive the partial matrix (a subset of the rows of A)
  // used to compute the dot products
  logger.log(Level.FINE, "Waiting for scatterReceive");
  List<Vector> partialA = scatterReceiver.receive();
  logger.log(Level.FINE, "Received: " + partialA);
  // Receive the number of columns, i.e. how many
  // partial products must be computed
  Vector sizeVec = broadcastReceiver.receive();
  int size = (int) sizeVec.get(0);
  for (int i = 0; i < size; i++) {
    // Receive column vector
    Vector x = broadcastReceiver.receive();
    // Compute partial product Ax
    Vector partialAx = computeAx(partialA, x);
    // Send up the aggregation (concatenation) tree
    // to the controller task
    reduceSender.send(partialAx);
  }
  return null;
}
MatMultDriver waits for all Contexts to become active before assigning IDs and defining the group communication. Similarly, it waits for all Compute Tasks to be running before submitting the Controller Task. This section describes the bookkeeping the Driver does in these two situations.
ActiveContextHandler accumulates the active Contexts by passing each one to a BlockingEventHandler<ActiveContext> (contextAccumulator). In MatMultDriver.ActiveContextHandler:
final class ActiveContextHandler implements EventHandler<ActiveContext> {
  @Override
  public void onNext(final ActiveContext activeContext) {
    ...
    contextAccumulator.onNext(activeContext);
  }
}
The BlockingEventHandler constructor, BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination), takes two arguments. The handler accumulates incoming events and calls destination.onNext() with them once their number reaches expectedSize. In this example, the destination is the TaskSubmitter, so TaskSubmitter.onNext() sets up the group communication once all Contexts have been activated.
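The accumulate-then-fire behavior described above can be sketched with a small thread-safe class. This is an assumed, simplified model for illustration, not the Wake/REEF BlockingEventHandler source: BatchingHandler is a hypothetical name, java.util.function.Consumer stands in for REEF's EventHandler, and the sketch assumes the accumulated batch is handed off and reset once expectedSize events have arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical accumulate-then-fire handler; not the Wake/REEF implementation. */
final class BatchingHandler<T> implements Consumer<T> {
  private final int expectedSize;
  private final Consumer<Iterable<T>> destination;
  private final List<T> events = new ArrayList<>();

  BatchingHandler(int expectedSize, Consumer<Iterable<T>> destination) {
    if (expectedSize <= 0) throw new IllegalArgumentException("expectedSize must be positive");
    this.expectedSize = expectedSize;
    this.destination = destination;
  }

  /** Called once per event (e.g. per active Context), possibly from several threads. */
  @Override
  public void accept(T event) {
    List<T> batch = null;
    synchronized (events) {
      events.add(event);
      if (events.size() >= expectedSize) {
        // Take a snapshot and reset, so the destination sees each batch exactly once.
        batch = new ArrayList<>(events);
        events.clear();
      }
    }
    if (batch != null) {
      destination.accept(batch); // invoked outside the lock
    }
  }

  public static void main(String[] args) {
    List<String> fired = new ArrayList<>();
    BatchingHandler<String> handler = new BatchingHandler<>(3, batch -> batch.forEach(fired::add));
    handler.accept("ctx-0");
    handler.accept("ctx-1");
    System.out.println(fired); // []
    handler.accept("ctx-2");
    System.out.println(fired); // [ctx-0, ctx-1, ctx-2]
  }
}
```

Calling the destination outside the lock keeps a slow destination (such as one that builds configurations and submits Tasks) from blocking threads that deliver further events.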
To make sure all Compute Tasks are running, the Driver increments an AtomicInteger, compTasksRunning, on each RunningTask event. When the count equals the number of Compute Tasks, the Controller Task is submitted:
final class RunningTaskHandler implements EventHandler<RunningTask> {
  @Override
  public final void onNext(final RunningTask task) {
    LOG.log(Level.INFO, "Task \"{0}\" is running!", task.getId());
    if (compTasksRunning.incrementAndGet() == computeTasks) {
      // All compute tasks are running - launch controller task
      taskSubmitter.submitControlTask();
    }
  }
}
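An AtomicInteger with an exact == comparison is enough because RunningTask events may be delivered on different threads, and incrementAndGet() returns each count exactly once, so exactly one event observes the value computeTasks and triggers the submission. The sketch below simulates this with a thread pool. LaunchGate and simulate are hypothetical names, and a counting Runnable stands in for taskSubmitter.submitControlTask().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the RunningTaskHandler counter; not REEF code. */
final class LaunchGate {
  private final int computeTasks;
  private final AtomicInteger running = new AtomicInteger();
  private final Runnable onAllRunning;

  LaunchGate(int computeTasks, Runnable onAllRunning) {
    this.computeTasks = computeTasks;
    this.onAllRunning = onAllRunning;
  }

  /** Called once per RunningTask event, possibly from several threads at once. */
  void taskRunning() {
    // incrementAndGet() hands out each value 1, 2, ... exactly once, so only
    // the event that observes computeTasks runs the follow-up action.
    if (running.incrementAndGet() == computeTasks) {
      onAllRunning.run();
    }
  }

  /** Delivers computeTasks events from a thread pool; returns how often the action ran. */
  static int simulate(int computeTasks) {
    AtomicInteger submissions = new AtomicInteger();
    LaunchGate gate = new LaunchGate(computeTasks, submissions::incrementAndGet);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < computeTasks; i++) {
      pool.execute(gate::taskRunning);
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("events were not delivered in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return submissions.get();
  }

  public static void main(String[] args) {
    System.out.println(simulate(5)); // 1
  }
}
```

A plain int incremented with ++ could lose updates under concurrent events, and a >= comparison would fire again on any later RunningTask event (for example, the Controller Task's own), so the exact comparison keeps the submission one-shot.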