gtoonstra edited this page Jun 24, 2015 · 7 revisions

##File system

To remap, all files appear to be local. If a mount point is actually backed by a distributed file system, remap is not aware of it. Replication and related concerns are left to the dfs implementation.

The suggested dfs (distributed file system) for remap is cephfs.

You are not obliged to use a dfs. You can just run remap on a local computer instead and use a local hard disk for datasets up to 500GB or so.

For small clusters of roughly 3-5 nodes, alternatives are nfs, samba and similar clients. In that case you would probably map /remote/data and /remote/im to separate nodes. (see bottom of page: Design)

##Messaging

To coordinate tasks such as starting nodes, remap uses a pub/sub messaging system built on the "nanomsg" library. All message data is json, preceded by a string message prefix that is explained below. Because nanomsg and plain strings are used, future components written in other languages can be added without compiling in specific libraries.

Messages are communicated in different scopes, which compartmentalizes where they go. The scopes are determined by the boundaries at which the infrastructure manager places message brokers. Every app running remap connects to exactly one broker. Bonjour is used to advertise the location of the broker on a local network.

Brokers keep a list of the sender ids of the cores on their local network.

Message topics follow one of two templates. The first uses a naming convention that encodes the scope, msgtype and sender id:

[scope].[msgtype].[senderid] {data}

* scope = one of global, notlocal or local:
  * global = all nodes
  * notlocal = all nodes except local
  * local = only local nodes
* msgtype = the name of the message
* senderid = the id of the sender
* data = a dictionary of elements
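
As a minimal sketch of the first template, the snippet below builds and parses such a message as a plain string. The function names, the example message type `showhands`, and the rule that scope and msgtype contain no dots or spaces are assumptions made for illustration; they are not taken from the remap code base. `should_deliver` is one possible reading of the three scopes from the point of view of a receiving broker.

```python
import json

SCOPES = ("global", "notlocal", "local")


def format_message(scope, msgtype, senderid, data):
    """Build '<scope>.<msgtype>.<senderid> <json>' as one string."""
    if scope not in SCOPES:
        raise ValueError("unknown scope: %s" % scope)
    return "%s.%s.%s %s" % (scope, msgtype, senderid, json.dumps(data))


def parse_message(raw):
    """Split a raw message into (scope, msgtype, senderid, data)."""
    # The prefix holds no spaces (assumption), so the first space
    # separates the prefix from the json payload.
    prefix, _, payload = raw.partition(" ")
    scope, msgtype, senderid = prefix.split(".", 2)
    return scope, msgtype, senderid, json.loads(payload)


def should_deliver(scope, sender_is_local):
    """Assumed interpretation of the scopes for a receiving broker."""
    if scope == "global":
        return True
    if scope == "local":
        return sender_is_local
    if scope == "notlocal":
        return not sender_is_local
    raise ValueError("unknown scope: %s" % scope)
```

Keeping the prefix as a plain dotted string means a pub/sub subscriber can filter on it with a simple prefix match, without having to decode the json payload.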

The second template sends a message from an endpoint that is not yet identified directly to a known endpoint. It is intended for informal communication where sender and recipient do not yet know each other:

[recipientid].[msgtype].[senderid] {data}

* recipientid = the id of the intended receiver for the message
* msgtype = name of the message
* senderid = the id of the sender

This allows brokers to verify if a core exists on their local network by inspecting the recipient id. If it does, then the message is routed only there and does not get transferred to other brokers.

Brokers talk to each other through another pub/sub gateway. Each broker subscribes with the other brokers, on behalf of its clients, to messages addressed to those clients' recipient ids. A broker therefore only maintains the list of active clients on its own local network. If a recipient id is not known locally, it publishes the message to the other brokers it knows.

This way the broker can simply publish the message with a recipient to that pub/sub bus and forget about it. The message will only go to the broker that subscribed with that recipient id. This means the following:

  • cores that communicate informally can find each other without the recipient having to deal with thousands of connection requests (connection storm)
  • this protocol is something like a survey or an action where someone wants to see a show of hands. More formal communication hereafter uses different protocols.
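
The routing rule for recipient-addressed messages can be sketched in plain Python, without any sockets. The `Broker` class, its method names and the in-memory inboxes are hypothetical stand-ins for the nanomsg pub/sub sockets; the subscription filter between brokers is simulated by checking each peer's client list. Recipient ids are assumed not to contain dots.

```python
class Broker:
    """Toy broker: routes by the recipient id at the start of the topic."""

    def __init__(self, name):
        self.name = name
        self.local_clients = {}  # recipientid -> inbox (list of raw messages)
        self.peers = []          # other brokers reachable over the gateway

    def register(self, clientid):
        """Record a core that lives on this broker's local network."""
        self.local_clients[clientid] = []

    def connect(self, other):
        """Link two brokers in both directions."""
        self.peers.append(other)
        other.peers.append(self)

    def route(self, raw):
        """Deliver locally if possible, else hand over to the subscribed peer."""
        recipientid = raw.split(".", 1)[0]
        if recipientid in self.local_clients:
            self.local_clients[recipientid].append(raw)
            return "local"
        # Simulated inter-broker pub/sub: only the peer that subscribed
        # for this recipient id picks the message up.
        for peer in self.peers:
            if recipientid in peer.local_clients:
                peer.local_clients[recipientid].append(raw)
                return peer.name
        return None  # nobody subscribed: the message is dropped
```

For example, with brokers `a` and `b` connected and `core-2` registered on `b`, calling `a.route('core-2.hello.init-9 {}')` places the message in `core-2`'s inbox on `b` and nowhere else.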

Every broker has two sets of pub/sub sockets, one to talk to its local clients and another to talk to other brokers on the network. This means that the only thing to be configured is the broker. It also makes the broker a single point of failure, but it greatly reduces the complexity of the system. There are some failover possibilities here using bonjour networking: if another bonjour record comes online, pub/sub connections can be retargeted at the new server.

How an app initiator starts a new run:

  • initiator sends out a 'global' message to query for available cores on the network. This returns two sets: "cores available immediately" and "cores that need to be killed first". The second set only contains cores on nodes where the running work has a lower priority level.
  • every potential core receives the message and figures out if it can serve the request. If cores are busy and of a higher priority, the node returns nothing.
  • with the data the initiator has now, the initiator figures out a plan for the job by committing resources. Note that these committed resources can change throughout the lifetime of the plan.
  • the initiator sends a task description to each node with the work to do.
  • the nodes start the cores, the cores grab a unit of work and start executing.
  • the initiator waits for results to drop in and keeps track of finished work.
  • the initiator starts the reduce jobs after all mapper jobs have finished.
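
The planning and bookkeeping steps above can be sketched as follows. The response layout (`available` and `interruptible` counts per node), the round-robin assignment and the function names are assumptions chosen for illustration; the source does not specify how the initiator builds its plan.

```python
def plan_job(responses, work_units):
    """Assign work units to nodes, preferring immediately available cores.

    responses: {nodeid: {"available": int, "interruptible": int}}
    returns:   {nodeid: [work_unit, ...]}
    """
    slots = []
    # Cores that are free right now come first ...
    for nodeid, answer in sorted(responses.items()):
        slots += [nodeid] * answer.get("available", 0)
    # ... followed by cores that would have to be killed first.
    for nodeid, answer in sorted(responses.items()):
        slots += [nodeid] * answer.get("interruptible", 0)
    if not slots:
        raise RuntimeError("no cores answered the query")

    plan = {}
    for i, unit in enumerate(work_units):
        plan.setdefault(slots[i % len(slots)], []).append(unit)
    return plan


def mappers_done(plan, finished):
    """True once every planned unit has been reported as finished."""
    planned = {unit for units in plan.values() for unit in units}
    return planned <= set(finished)
```

The initiator would only start the reduce jobs once `mappers_done` returns `True`. Because committed resources can change during the lifetime of the plan, a real initiator would also need to re-plan units whose cores disappear, which this sketch leaves out.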

##Nodes

Nodes manage cores. Each node has a 'bus' connection with all possible running cores. The node discovers which broker to connect to through bonjour, and manages the actual network traffic with a bit of local rerouting and snooping of data.

##Cores

Cores perform the actual map or reduce work. They only connect to the node daemon. If anything goes wrong, a core simply disappears.

##Example in code

This is a standalone MapReduce algorithm in Python, written without any distributed infrastructure, that demonstrates what remap is trying to achieve at a larger scale:

https://github.com/gtoonstra/remap/blob/master/dev/basic_elements.py

This code eventually reduces to something like this, for simple text files. More complex file formats of course inflate the code:

https://github.com/gtoonstra/remap/blob/master/examples/wordcount/wordcount.py
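
For readers who do not want to follow the links, here is a minimal word count in the same map, shuffle, reduce shape. It is an independent sketch and not the code from either linked file; all function names are made up for this example.

```python
from collections import defaultdict


def map_words(line):
    """Map step: emit (word, 1) for every word on a line."""
    for word in line.lower().split():
        yield word, 1


def shuffle(pairs):
    """Group all emitted values by key, as the framework would do."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_counts(word, counts):
    """Reduce step: sum the counts collected for one word."""
    return word, sum(counts)


def wordcount(lines):
    """Run map, shuffle and reduce in a single process."""
    pairs = (pair for line in lines for pair in map_words(line))
    return dict(reduce_counts(w, c) for w, c in shuffle(pairs).items())
```

In a distributed run, the map and reduce functions would stay the same while the shuffle step is replaced by intermediate files on the shared file system.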
