Skip to content

Commit cbc35b2

Browse files
Jesper Svennevidjsvennevid
authored andcommitted
Added lock-support for jobs using the redis backend
1 parent f814eaa commit cbc35b2

File tree

2 files changed

+60
-17
lines changed

2 files changed

+60
-17
lines changed

lib/backends/redis.js

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var redis_lock = "" +
1616
"end";
1717

1818
var redis_unlock = "" +
19-
"if redis.call('get', KEYS[1]) == ARGV[1] then\n" +
19+
"if redis.call('get', KEYS[1]) == ARGV[1]\n" +
2020
"then\n" +
2121
" return redis.call('del', KEYS[1])\n" +
2222
"else\n" +
@@ -103,23 +103,50 @@ _.extend(ProducerBackend.prototype, {
103103
return;
104104
}
105105

106-
var message = this.queue.shift();
107-
this.pending[message.payload.index] = message;
108-
message.payload.sender = this.producerId;
106+
var run = _.bind(function () {
107+
this.pending[message.payload.index] = message;
108+
message.payload.sender = this.producerId;
109+
110+
var jobQueue = this.options.prefix + "jobs";
111+
this.producerClient.multi().rpush(jobQueue, JSON.stringify(message.payload), _.bind(function (err) {
112+
if (err) {
113+
if (message.payload.index) {
114+
delete this.pending[message.payload.index];
115+
this.queue.unshift(message);
116+
}
117+
delete message.payload.sender;
118+
119+
process.nextTick(_.bind(function () {
120+
this.processQueue();
121+
}, this));
122+
} else {
123+
this.processQueue();
124+
}
125+
}, this)).expire(jobQueue, 20).exec();
126+
}, this);
109127

110-
var jobQueue = this.options.prefix + "jobs";
111-
this.producerClient.multi().rpush(jobQueue, JSON.stringify(message.payload), _.bind(function (err) {
112-
if (err) {
113-
if (message.payload.index) {
114-
delete this.pending[message.payload.index];
128+
var message = this.queue.shift();
129+
if ("lock" in message.payload.options) {
130+
var lockName = this.options.prefix + "locks." + message.payload.options.lock;
131+
var lockData = this.producerId + '.' + message.payload.index;
132+
this.producerClient.eval(redis_lock, 1, lockName, lockData, Math.floor(message.lock_timeout * 2 / 1000), _.bind(function (err, response) {
133+
if (err) {
115134
this.queue.unshift(message);
135+
return;
116136
}
117-
delete message.payload.sender;
118137

119-
} else {
120-
this.processQueue();
121-
}
122-
}, this)).expire(jobQueue, 20).exec();
138+
if (response > 0) {
139+
run();
140+
} else {
141+
this.frontend.onResponse(message, { error: "locked" });
142+
process.nextTick(_.bind(function () {
143+
this.processQueue();
144+
}, this));
145+
}
146+
}, this));
147+
} else {
148+
run();
149+
}
123150
},
124151

125152
scheduleResponse: function () {
@@ -133,9 +160,21 @@ _.extend(ProducerBackend.prototype, {
133160
var response = JSON.parse(data[1]);
134161
var message = this.pending[response.index];
135162
if (message) {
136-
delete this.pending[response.index];
137-
this.frontend.onResponse(message, response);
138-
delete message.payload.index;
163+
var complete = _.bind(function () {
164+
delete this.pending[response.index];
165+
this.frontend.onResponse(message, response);
166+
delete message.payload.index;
167+
}, this);
168+
169+
if ("lock" in message.payload.options) {
170+
var lockName = this.options.prefix + "locks." + message.payload.options.lock;
171+
var lockData = this.producerId + '.' + message.payload.index;
172+
this.producerClient.eval(redis_unlock, 1, lockName, lockData, function (err) {
173+
complete();
174+
});
175+
} else {
176+
complete();
177+
}
139178
}
140179
}
141180

lib/workqueue.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ _.extend(Producer.prototype, {
7070
callback: callback
7171
};
7272

73+
if ("lock" in message.payload.options) {
74+
message.lock_timeout = this.options.timeout;
75+
}
76+
7377
message.timeout = setTimeout(_.bind(function () {
7478
this.backend.onTimeout(message.payload.index);
7579
delete message.payload.timeout;

0 commit comments

Comments
 (0)