Skip to content

Commit 9475041

Browse files
Jesper Svennevidjsvennevid
authored andcommitted
Added support for a "master" producer, in case certain jobs should only be scheduled by a single producer
* Local backend automatically elects the Producer as master * Redis backend acquires and refreshes a distributed mutex to track the current master
1 parent 6d8ddf3 commit 9475041

File tree

3 files changed

+118
-28
lines changed

3 files changed

+118
-28
lines changed

lib/backends/local.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ function ProducerBackend(frontend) {
66
this.pending = {};
77
this.active = {};
88
this.queue = [];
9+
this.isMaster = false;
910
}
1011

1112
_.extend(ProducerBackend.prototype, {
@@ -71,6 +72,15 @@ _.extend(ProducerBackend.prototype, {
7172
});
7273
}
7374
this.processQueue();
75+
},
76+
77+
acquireMaster: function (callback) {
78+
this.isMaster = true;
79+
callback(true);
80+
},
81+
82+
releaseMaster: function () {
83+
this.isMaster = false
7484
}
7585
});
7686

lib/backends/redis.js

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,27 @@ var _ = require('underscore'),
22
cluster = require('cluster'),
33
redis = require('redis');
44

5+
var redis_lock = "" +
6+
"local result = redis.call('set', KEYS[1], ARGV[1], 'nx', 'ex', ARGV[2])\n" +
7+
"if result then\n" +
8+
" return 1\n" +
9+
"else\n" +
10+
" if redis.call('get', KEYS[1]) == ARGV[1] then\n" +
11+
" redis.call('expire', KEYS[1], ARGV[2])\n" +
12+
" return 2\n" +
13+
" else\n" +
14+
" return 0\n" +
15+
" end\n" +
16+
"end";
17+
18+
var redis_unlock = "" +
19+
"if redis.call('get', KEYS[1]) == ARGV[1] then\n" +
20+
"then\n" +
21+
" return redis.call('del', KEYS[1])\n" +
22+
"else\n" +
23+
" return 0\n" +
24+
"end";
25+
526
function ProducerBackend(frontend, options) {
627
options = _.clone(options) || {};
728

@@ -17,6 +38,7 @@ function ProducerBackend(frontend, options) {
1738
this.active = {};
1839
this.queue = [];
1940
this.producerId = -1;
41+
this.isMaster = false;
2042
this.choked = false;
2143

2244
var producerClient = this.producerClient = redis.createClient(options.port, options.host);
@@ -134,6 +156,35 @@ _.extend(ProducerBackend.prototype, {
134156
});
135157
}
136158
this.processQueue();
159+
},
160+
161+
acquireMaster: function (timeout, callback) {
162+
if (this.producerId < 0) {
163+
callback(false);
164+
return;
165+
}
166+
167+
var masterLock = this.options.prefix + "master";
168+
this.producerClient.eval(redis_lock, 1, masterLock, this.producerId, Math.floor(timeout / 1000), _.bind(function (err, response) {
169+
if (err) {
170+
callback(false);
171+
return;
172+
}
173+
174+
this.isMaster = response > 0;
175+
callback(this.isMaster);
176+
}, this));
177+
},
178+
179+
releaseMaster: function () {
180+
this.isMaster = false;
181+
182+
if (this.producerId < 0) {
183+
return;
184+
}
185+
186+
var masterLock = this.options.prefix + "master";
187+
this.producerClient.eval(redis_unlock, 1, masterLock, this.producerId);
137188
}
138189
});
139190

lib/workqueue.js

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,44 +10,56 @@ function Producer(options) {
1010
type: 'local',
1111
workers: require('os').cpus().length,
1212
timeout: 10000,
13+
masterTimeout: 10000,
1314
redis: {},
1415
local: {}
1516
});
1617

17-
this.index = 0;
1818
this.options = options;
19-
this.debugPort = 0;
20-
this.debugMode = _.some(process.execArgv, _.bind(function (s) {
21-
var offset = s.indexOf('--debug');
22-
if (offset !== -1) {
23-
var valueOffset = s.indexOf('=', offset);
24-
if (valueOffset !== -1) {
25-
this.debugPort = parseInt(s.slice(valueOffset+1));
19+
20+
this.initialize();
21+
}
22+
23+
_.extend(Producer.prototype, {
24+
initialize: function () {
25+
this.index = 0;
26+
this.debugPort = 0;
27+
this.debugMode = _.some(process.execArgv, _.bind(function (s) {
28+
var offset = s.indexOf('--debug');
29+
if (offset !== -1) {
30+
var valueOffset = s.indexOf('=', offset);
31+
if (valueOffset !== -1) {
32+
this.debugPort = parseInt(s.slice(valueOffset+1));
33+
}
34+
return true;
2635
}
27-
return true;
36+
return false;
37+
}, this));
38+
39+
if (this.debugMode) {
40+
cluster.setupMaster({
41+
execArgv: process.execArgv.filter(function (s) {
42+
return s.indexOf('--debug') === -1;
43+
})
44+
});
2845
}
29-
return false;
30-
}, this));
31-
32-
if (this.debugMode) {
33-
cluster.setupMaster({
34-
execArgv: process.execArgv.filter(function (s) {
35-
return s.indexOf('--debug') === -1;
36-
})
37-
});
38-
}
3946

40-
switch (options.type) {
41-
default:case 'local': this.backend = new local_backend.ProducerBackend(this, options.local); break;
42-
case 'redis': this.backend = new redis_backend.ProducerBackend(this, options.redis); break;
43-
}
47+
switch (this.options.type) {
48+
default:case 'local': this.backend = new local_backend.ProducerBackend(this, this.options.local); break;
49+
case 'redis': this.backend = new redis_backend.ProducerBackend(this, this.options.redis); break;
50+
}
4451

45-
for (var i = 0; i < options.workers; ++i) {
46-
this.startWorker();
47-
}
48-
}
52+
for (var i = 0; i < this.options.workers; ++i) {
53+
this.startWorker();
54+
}
55+
56+
this.acquireMaster();
57+
},
58+
59+
destroy: function () {
60+
this.releaseMaster();
61+
},
4962

50-
_.extend(Producer.prototype, {
5163
post: function (type, options, callback) {
5264
var message = {
5365
payload: {
@@ -94,6 +106,23 @@ _.extend(Producer.prototype, {
94106
clearTimeout(message.timeout);
95107
}
96108
message.callback(response.error);
109+
},
110+
111+
acquireMaster: function () {
112+
this.backend.acquireMaster(this.options.masterTimeout * 2, _.bind(function (success) {
113+
this.masterTimer = setTimeout(_.bind(this.acquireMaster, this), this.options.masterTimeout + Math.floor(Math.random() * this.options.masterTimeout * 0.5));
114+
}, this));
115+
},
116+
117+
releaseMaster: function () {
118+
clearTimeout(this.masterTimer);
119+
delete this.masterTimer;
120+
121+
this.backend.releaseMaster();
122+
},
123+
124+
isMaster: function () {
125+
return this.backend.isMaster;
97126
}
98127
});
99128

0 commit comments

Comments
 (0)