Skip to content

Commit 6d8ddf3

Browse files
Jesper Svennevidjsvennevid
authored andcommitted
Added redis-backend
Moved shared functionality into frontend classes Added a few more tests
1 parent 3e30c75 commit 6d8ddf3

File tree

8 files changed

+403
-118
lines changed

8 files changed

+403
-118
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Minimalistic job queue with redis support
2+
=========================================

lib/backends/local.js

Lines changed: 37 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,31 @@
11
var _ = require('underscore'),
22
cluster = require('cluster');
33

4-
function ProducerBackend(options) {
5-
options = options || {};
6-
7-
_.defaults(options, {
8-
workers: require('os').cpus().length
9-
});
10-
11-
this.index = 0;
4+
function ProducerBackend(frontend) {
5+
this.frontend = frontend;
126
this.pending = {};
137
this.active = {};
148
this.queue = [];
15-
this.debugMode = _.some(process.execArgv, function (s) {
16-
return s.indexOf('--debug-brk') !== -1;
17-
});
18-
this.debugIndex = 59000; // TODO: grab debug port
19-
20-
if (this.debugMode) {
21-
cluster.setupMaster({
22-
execArgv: process.execArgv.filter(function (s) {
23-
return s.indexOf('--debug-brk') === -1;
24-
})
25-
});
26-
}
27-
28-
for (var i = 0; i < options.workers; ++i) {
29-
this.startWorker();
30-
}
319
}
3210

3311
_.extend(ProducerBackend.prototype, {
34-
post: function (type, options, callback) {
35-
var message = {
36-
payload: {
37-
type: type,
38-
options: options,
39-
index: this.index++
40-
},
41-
callback: callback
42-
};
43-
44-
this.queueMessage(message);
45-
this.processQueue();
46-
47-
return message.payload.index;
48-
},
49-
50-
setupMessageListener: function (worker) {
51-
worker.on('message', _.bind(function (message) {
52-
this.onMessage(worker.id, message);
53-
}, this));
54-
55-
worker.on('exit', _.bind(function (worker, code, signal) {
56-
this.startWorker();
12+
setupWorker: function (worker) {
13+
worker.on('message', _.bind(function (response) {
14+
var message = this.active[worker.id];
15+
delete this.active[worker.id];
16+
if (message) {
17+
delete this.pending[message.payload.index];
18+
this.frontend.onResponse(message, response);
19+
}
5720
this.processQueue();
5821
}, this));
59-
},
60-
61-
onMessage: function (id, message) {
62-
var pending = this.pending[message.index];
63-
64-
delete this.pending[message.index];
65-
delete this.active[id];
66-
67-
clearTimeout(pending.timeout);
68-
pending.callback(message.error);
6922
this.processQueue();
7023
},
7124

7225
queueMessage: function (message) {
26+
message.timestamp = new Date();
7327
this.queue.push(message);
28+
this.processQueue();
7429
},
7530

7631
processQueue: function () {
@@ -95,62 +50,46 @@ _.extend(ProducerBackend.prototype, {
9550
}
9651

9752
this.queue.shift();
98-
99-
message.timestamp = new Date();
100-
10153
this.pending[message.payload.index] = message;
102-
103-
message.timeout = setTimeout(_.bind(function () {
104-
delete this.pending[message.payload.index];
105-
delete this.active[message.worker];
106-
message.callback("timeout");
107-
108-
var worker = cluster.workers[message.worker];
109-
worker.kill();
110-
}, this), 10000);
11154
},
11255

113-
startWorker: function () {
114-
var worker;
115-
if (this.debugMode) {
116-
cluster.settings.execArgv.push('--debug=' + (this.debugIndex++));
117-
worker = cluster.fork();
118-
cluster.settings.execArgv.pop();
56+
onTimeout: function (id) {
57+
var pending = this.pending[id];
58+
if (pending) {
59+
delete this.pending[id];
60+
if (pending.worker) {
61+
delete this.active[pending.worker];
62+
}
63+
64+
var worker = cluster.workers[pending.worker];
65+
if (worker) {
66+
worker.kill();
67+
}
11968
} else {
120-
worker = cluster.fork();
69+
this.queue = _.filter(this.queue, function (message) {
70+
return message.payload.index != id;
71+
});
12172
}
122-
this.setupMessageListener(worker);
73+
this.processQueue();
12374
}
12475
});
12576

126-
function ConsumerBackend() {
127-
this.methods = {};
77+
function ConsumerBackend(frontend) {
78+
this.frontend = frontend;
12879
}
12980

13081
_.extend(ConsumerBackend.prototype, {
131-
registerMethod: function (type, method) {
132-
this.methods[type] = method;
133-
},
13482

13583
run: function () {
136-
process.on('message', _.bind(this.onMessage, this));
84+
process.on('message', _.bind(function (message) {
85+
this.frontend.onPayload(message);
86+
}, this));
13787
},
13888

139-
onMessage: function (message) {
140-
var method = this.methods[message.type];
141-
if (!method) {
142-
process.send({
143-
error: 'method not found',
144-
index: message.index
145-
});
146-
return;
147-
}
148-
149-
method(message.options, function (err) {
150-
process.send({
151-
error: err,
152-
index: message.index
153-
});
89+
onResponse: function (message, response) {
90+
process.send({
91+
error: response,
92+
index: message.index
15493
});
15594
}
15695
});

lib/backends/redis.js

Lines changed: 165 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,184 @@
1-
var _ = require('underscore');
1+
var _ = require('underscore'),
2+
cluster = require('cluster'),
3+
redis = require('redis');
24

3-
function ProducerBackend(options) {
4-
options = options || {};
5+
function ProducerBackend(frontend, options) {
6+
options = _.clone(options) || {};
57

68
_.defaults(options, {
79
host: "localhost",
810
port: 6379,
9-
prefix: "workqueue"
11+
prefix: "workqueue:",
1012
});
13+
14+
this.frontend = frontend;
15+
this.options = options;
16+
this.pending = {};
17+
this.active = {};
18+
this.queue = [];
19+
this.producerId = -1;
20+
this.choked = false;
21+
22+
var producerClient = this.producerClient = redis.createClient(options.port, options.host);
23+
var responseClient = this.responseClient = redis.createClient(options.port, options.host);
24+
25+
producerClient.on('ready', _.bind(function () {
26+
if (this.producerId < 0) {
27+
producerClient.incr(options.prefix + "producers", _.bind(function (err, data) {
28+
this.producerId = data;
29+
this.scheduleResponse();
30+
this.processQueue();
31+
}, this));
32+
} else {
33+
this.processQueue();
34+
}
35+
}, this));
36+
producerClient.on('close', _.bind(function () {
37+
this.producerId = -1;
38+
}));
39+
producerClient.on('error', _.bind(function () {
40+
this.producerId = -1;
41+
}, this));
42+
producerClient.on('drain', _.bind(function () {
43+
this.choked = false;
44+
this.processQueue();
45+
}, this));
46+
responseClient.on('ready', _.bind(function () {
47+
this.scheduleResponse();
48+
}, this));
49+
responseClient.on('error', _.bind(function () {
50+
}));
1151
}
1252

1353
_.extend(ProducerBackend.prototype, {
54+
setupWorker: function (worker) {
55+
},
56+
57+
queueMessage: function (message) {
58+
message.timestamp = new Date();
59+
this.queue.push(message);
60+
this.processQueue();
61+
},
62+
63+
processQueue: function () {
64+
if (this.queue.length == 0) {
65+
return;
66+
}
67+
68+
if (this.choked) {
69+
if (this.producerClient.command_queue.length > 0) {
70+
return;
71+
}
72+
this.choked = false;
73+
}
74+
75+
if (!this.producerClient.connected || (this.producerId < 0)) {
76+
return;
77+
}
78+
79+
if (this.producerClient.command_queue.length > 200) {
80+
this.choked = true;
81+
return;
82+
}
83+
84+
var message = this.queue.shift();
85+
this.pending[message.payload.index] = message;
86+
message.payload.sender = this.producerId;
87+
88+
var jobQueue = this.options.prefix + "jobs";
89+
this.producerClient.multi().rpush(jobQueue, JSON.stringify(message.payload), _.bind(function (err) {
90+
if (err) {
91+
if (message.payload.index) {
92+
delete this.pending[message.payload.index];
93+
this.queue.unshift(message);
94+
}
95+
delete message.payload.sender;
96+
97+
} else {
98+
this.processQueue();
99+
}
100+
}, this)).expire(jobQueue, 20).exec();
101+
},
102+
103+
scheduleResponse: function () {
104+
if (this.producerId < 0) {
105+
return;
106+
}
107+
108+
var responseQueue = this.options.prefix + "response." + this.producerId;
109+
this.responseClient.blpop(responseQueue, 5, _.bind(function (err, data) {
110+
if (!err && data) {
111+
var response = JSON.parse(data[1]);
112+
var message = this.pending[response.index];
113+
if (message) {
114+
delete this.pending[response.index];
115+
this.frontend.onResponse(message, response);
116+
delete message.payload.index;
117+
}
118+
}
119+
120+
process.nextTick(_.bind(function () {
121+
this.scheduleResponse();
122+
}, this));
123+
}, this));
124+
},
125+
126+
onTimeout: function (id) {
127+
var pending = this.pending[id];
128+
if (pending) {
129+
delete this.pending[id];
130+
delete pending.payload.index;
131+
} else {
132+
this.queue = _.filter(this.queue, function (message) {
133+
return message.payload.index != id;
134+
});
135+
}
136+
this.processQueue();
137+
}
14138
});
15139

16-
function ConsumerBackend(options) {
17-
options = options || {};
140+
function ConsumerBackend(frontend, options) {
141+
options = _.clone(options) || {};
18142

19143
_.defaults(options, {
20144
host: "localhost",
21145
port: 6379,
22-
prefix: "workqueue"
146+
prefix: "workqueue:"
23147
});
148+
149+
this.frontend = frontend;
150+
this.options = options;
24151
}
25152

26153
_.extend(ConsumerBackend.prototype, {
27-
});
154+
run: function () {
155+
var redisClient = this.redisClient = redis.createClient(this.options.port, this.options.host);
156+
redisClient.on('ready', _.bind(function () {
157+
this.reschedule();
158+
}, this));
159+
redisClient.on('error', _.bind(function () {
160+
}, this));
161+
},
162+
163+
reschedule: function () {
164+
var jobQueue = this.options.prefix + "jobs";
165+
this.redisClient.blpop(jobQueue, 0, _.bind(function (err, data) {
166+
if (err || !data) {
167+
this.reschedule();
168+
}
169+
this.frontend.onPayload(JSON.parse(data[1]));
170+
}, this));
171+
},
172+
173+
onResponse: function (message, response) {
174+
var responseQueue = this.options.prefix + "response." + message.sender;
175+
this.redisClient.multi().rpush(responseQueue, JSON.stringify({
176+
error: response,
177+
index: message.index
178+
})).expire(responseQueue, 20).exec();
179+
this.reschedule();
180+
}
181+
});
182+
183+
exports.ProducerBackend = ProducerBackend;
184+
exports.ConsumerBackend = ConsumerBackend;

0 commit comments

Comments
 (0)