Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Check out the the scripts in the simulations/ directory for some examples.
// Create 20 new peers and point them at the seed (usually this would happen in 20 separate processes)
// To prevent having a single point of failure you would probably have multiple seeds
for(var i = 9001; i <= 9020;i++) {
//For IPv6 peers use the format [ad:dre::ss]:port. e.g. [::1]:9000
var g = new Gossiper(i, ['127.0.0.1:9000']);
g.start();

Expand Down
55 changes: 35 additions & 20 deletions lib/gossiper.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ var Gossiper = exports.Gossiper = function(port, seeds, ip_to_bind) {
this.seeds = seeds;
this.my_state = new PeerState();
this.scuttle = new Scuttle(this.peers, this.my_state);

this.handleNewPeers(seeds);
}

util.inherits(Gossiper, EventEmitter);

Object.defineProperty(Gossiper.prototype, "peer_name", {
get: function() {
if(net.isIPv6(this.address)) {
return ['[' + this.address + ']', this.port.toString()].join(':');
}
return [this.address, this.port.toString()].join(':');
}
,enumerable: true
});

Gossiper.prototype.start = function(callback) {
var self = this;

Expand All @@ -32,9 +39,9 @@ Gossiper.prototype.start = function(callback) {

// Bind to ip/port
if(this.ip_to_bind) {
this.peer_name = [this.address, this.port.toString()].join(':');
this.my_state.address = this.address;
this.my_state.port = this.port;
this.peers[this.peer_name] = this.my_state;
this.my_state.name = this.peer_name;
this.server.listen(this.port, this.ip_to_bind, callback);
} else {
// this is an ugly hack to get the hostname of the local machine
Expand All @@ -44,9 +51,10 @@ Gossiper.prototype.start = function(callback) {
var l = stdout.length;
var hostname = stdout.slice(0, l - 1);
dns.lookup(hostname, 4, function(err,address, family) {
self.peer_name = [address, self.port.toString()].join(':');
self.address = address;
this.my_state.address = this.address;
this.my_state.port = this.port;
self.peers[self.peer_name] = self.my_state;
self.my_state.name = self.peer_name;
self.server.listen(self.port, address, callback);
});
});
Expand Down Expand Up @@ -83,8 +91,8 @@ Gossiper.prototype.gossip = function() {

// Gossip to seed under certain conditions
if(live_peer && !this.seeds[live_peer] && this.livePeers().length < this.seeds.length) {
if(Math.random() < (this.seeds / this.allPeers.size())) {
this.gossipToPeer(chooseRandom(this.peers));
if(Math.random() < (this.seeds / this.peers.length)) {
this.gossipToPeer(this.chooseRandom(this.allPeers()));
}
}

Expand All @@ -104,8 +112,8 @@ Gossiper.prototype.chooseRandom = function(peers) {
}

Gossiper.prototype.gossipToPeer = function(peer) {
var a = peer.split(":");
var gosipee = new net.createConnection(a[1], a[0]);
var a = this.peers[peer];
var gosipee = new net.createConnection(a.port, a.address);
var self = this;
gosipee.on('connect', function(net_stream) {
var mp_stream = new msgpack.Stream(gosipee);
Expand Down Expand Up @@ -147,25 +155,32 @@ Gossiper.prototype.handleMessage = function(net_stream, mp_stream, msg) {
Gossiper.prototype.handleNewPeers = function(new_peers) {
var self = this;
for(var i in new_peers) {
var peer_name = new_peers[i];
this.peers[peer_name] = new PeerState(peer_name);
this.emit('new_peer', peer_name);

var peer = this.peers[peer_name];
this.listenToPeer(peer);
var peer_info;
var m = new_peers[i].match(/\[(.+)\]:([0-9]+)/);
if(m) {
peer_info = {ip: m[1], port: m[2]};
} else {
m = new_peers[i].split(':');
peer_info = {ip: m[0], port: m[1]};
}
var tp = new PeerState(peer_info.ip, peer_info.port);
this.peers[tp.name] = tp;
this.emit('new_peer', tp);

this.listenToPeer(tp);
}
}

Gossiper.prototype.listenToPeer = function(peer) {
var self = this;
peer.on('update', function(k,v) {
self.emit('update', peer.name, k, v);
self.emit('update', peer, k, v);
});
peer.on('peer_alive', function() {
self.emit('peer_alive', peer.name);
self.emit('peer_alive', peer);
});
peer.on('peer_failed', function() {
self.emit('peer_failed', peer.name);
self.emit('peer_failed', peer);
});
}

Expand Down
21 changes: 17 additions & 4 deletions lib/peer_state.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
var AccrualFailureDetector = require('./accrual_failure_detector').AccrualFailureDetector,
EventEmitter = require('events').EventEmitter,
util = require('util');
util = require('util'),
net = require('net');

var PeerState = exports.PeerState = function(name) {
var PeerState = exports.PeerState = function(address, port) {
EventEmitter.call(this);
this.max_version_seen = 0;
this.attrs = {};
this.detector = new AccrualFailureDetector();
this.alive = true;
this.heart_beat_version = 0;
this.PHI = 8;
this.name = name;
//this.name = name;
this.address = address;
this.port = port;

};

util.inherits(PeerState, EventEmitter);

Object.defineProperty(PeerState.prototype, "name", {
get: function() {
if(net.isIPv6(this.address)) {
return ['[' + this.address + ']', this.port.toString()].join(':');
}
return [this.address, this.port.toString()].join(':');
}
,enumerable: true
});

PeerState.prototype.updateWithDelta = function(k,v,n) {
// It's possibly to get the same updates more than once if we're gossiping with multiple peers at once
// ignore them
Expand Down
9 changes: 9 additions & 0 deletions test/gossiper.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,13 @@ module.exports = {
assert.deepEqual(['127.0.0.1:8010', 'howdy', 'yall'], update);
});
}
,'Bind to local ipv6 address': function(assert, beforeExit) {
var g = new Gossiper(8018, [], '::1');
g.start();
setTimeout(function() {
beforeExit(function() {
assert.deepEqual(g.server.address().address, '::1');
});
}, 2000);
}
}