Skip to content

Commit f814eaa

Browse files
Jesper Svennevidjsvennevid
authored andcommitted
Added lock-support for jobs in the local backend (only run a single instance of a job at the same time)
Reordered timeout setup to allow for proper cleanup when a job cannot run due to a lock Added missing parameter to acquireMaster on local backend
1 parent 9475041 commit f814eaa

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

lib/backends/local.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ function ProducerBackend(frontend) {
66
this.pending = {};
77
this.active = {};
88
this.queue = [];
9+
this.locks = {};
910
this.isMaster = false;
1011
}
1112

@@ -15,6 +16,9 @@ _.extend(ProducerBackend.prototype, {
1516
var message = this.active[worker.id];
1617
delete this.active[worker.id];
1718
if (message) {
19+
if ("lock" in message.payload.options) {
20+
delete this.locks[message.payload.options.lock];
21+
}
1822
delete this.pending[message.payload.index];
1923
this.frontend.onResponse(message, response);
2024
}
@@ -36,6 +40,12 @@ _.extend(ProducerBackend.prototype, {
3640

3741
var message = this.queue[0];
3842

43+
if (("lock" in message.payload.options) && this.locks[message.payload.options.lock]) {
44+
this.queue.shift();
45+
this.frontend.onResponse(message, { error: "locked" });
46+
return;
47+
}
48+
3949
for (var id in cluster.workers) {
4050
if (!this.active[id]) {
4151
cluster.workers[id].send(message.payload);
@@ -52,16 +62,24 @@ _.extend(ProducerBackend.prototype, {
5262

5363
this.queue.shift();
5464
this.pending[message.payload.index] = message;
65+
if ("lock" in message.payload.options) {
66+
this.locks[message.payload.options.lock] = message;
67+
}
5568
},
5669

5770
onTimeout: function (id) {
5871
var pending = this.pending[id];
5972
if (pending) {
6073
delete this.pending[id];
74+
6175
if (pending.worker) {
6276
delete this.active[pending.worker];
6377
}
6478

79+
if ("lock" in pending.payload.options) {
80+
delete this.locks[pending.payload.options.lock];
81+
}
82+
6583
var worker = cluster.workers[pending.worker];
6684
if (worker) {
6785
worker.kill();
@@ -74,7 +92,7 @@ _.extend(ProducerBackend.prototype, {
7492
this.processQueue();
7593
},
7694

77-
acquireMaster: function (callback) {
95+
acquireMaster: function (timeout, callback) {
7896
this.isMaster = true;
7997
callback(true);
8098
},

lib/workqueue.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,22 @@ _.extend(Producer.prototype, {
6464
var message = {
6565
payload: {
6666
type: type,
67-
options: options,
67+
options: _.clone(options),
6868
index: this.index++
6969
},
7070
callback: callback
7171
};
7272

73-
this.backend.queueMessage(message);
74-
7573
message.timeout = setTimeout(_.bind(function () {
7674
this.backend.onTimeout(message.payload.index);
7775
delete message.payload.timeout;
7876
this.onResponse(message, {
79-
error: "job timeout",
77+
error: "timeout",
8078
index: message.payload.index
8179
});
8280
}, this), this.options.timeout);
8381

82+
this.backend.queueMessage(message);
8483
return message.payload.index;
8584
},
8685

0 commit comments

Comments
 (0)