Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions lib/gearmanode/job-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,22 +300,21 @@ JobServer.prototype._processData = function (chunk) {
return;
}

if (this.hasOwnProperty('segmentedPacket')) {// if has segmentedPacket, it should be joined with chunk firstly
chunk = Buffer.concat([this.segmentedPacket, chunk]);
delete this.segmentedPacket;
}

if (chunk.length < protocol.CONSTANTS.HEADER_LEN) { // it's only header fragment, the rest should come in next packet
this.headerfrag = chunk;
this.segmentedPacket = chunk;
return;
}

if (chunk.readUInt32BE(0) !== protocol.CONSTANTS.HEADER_RESP) {
if (this.hasOwnProperty('segmentedPacket')) {
chunk = Buffer.concat([this.segmentedPacket, chunk]);
} else if (this.hasOwnProperty('headerfrag')) {
chunk = Buffer.concat([this.headerfrag, chunk]);
delete this.headerfrag;
} else { // not previous packet stored to be concatenated -> it must be error
// OUT OF SYNC!
this.clientOrWorker._unrecoverableError('out of sync with server');
return;
}
if (chunk.readUInt32BE(0) != protocol.CONSTANTS.HEADER_RESP
&& chunk.readUInt32BE(0) != protocol.CONSTANTS.HEADER_REQ) {
// OUT OF SYNC!
this.clientOrWorker._unrecoverableError('out of sync with server');
return;
}

var packetType = chunk.readUInt32BE(4);
Expand All @@ -336,28 +335,21 @@ JobServer.prototype._processData = function (chunk) {
if (JobServer.logger.isLevelEnabled('verbose')) {
JobServer.logger.log('verbose', 'joined packet found, responseSize=%d, packetLen=%d', responseLength, chunk.length);
}
nextChunk = new Buffer(chunk.slice(responseLength));
if (nextChunk.length >= protocol.CONSTANTS.HEADER_LEN) { // header complete
var self = this;
process.nextTick(function() {
self._processData(nextChunk);
});
} else { // it's only header fragment, the rest should come in next packet
this.headerfrag = nextChunk;
}

var self = this;
var nextChunk = new Buffer(chunk.slice(responseLength));
process.nextTick(function() {
self._processData(nextChunk);
});
chunk = chunk.slice(0, responseLength);
}

// parse packet if it is a response
var parsedPacket;
if (protocol.DEFINITION[packetCode] !== undefined
&& (protocol.CONSTANTS.TYPE_RESP & protocol.DEFINITION[packetCode][1])) {
delete this.segmentedPacket; // clear cached previous segments if exist or not
parsedPacket = protocol.parsePacket(chunk, protocol.DEFINITION[packetCode][2]);
}


var handle, job, processed, status;

switch (packetType) {
Expand Down
2 changes: 1 addition & 1 deletion lib/gearmanode/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var Job = exports.Job = function (clientOrWorker, options) {
if (this.clientOrWorker._type === 'Worker') {
// VALIDATION
common.mixin({ handle: 'mandatory', jobServerUid: 'mandatory' }, pattern);
common.verifyAndSanitizeOptions(options, pattern);
returned = common.verifyAndSanitizeOptions(options, pattern);
if (returned instanceof Error) { return returned; }

this.handle = options.handle;
Expand Down
32 changes: 23 additions & 9 deletions test/test-job-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ describe('JobServer', function() {
js.connected = true;
js._processData(chunk);
should.not.exist(js.segmentedPacket);
should.not.exist(js.headerfrag);
js.clientOrWorker._response.calledOnce.should.be.true;
js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP);
})
})
it('should process one packet with two messages correctly', function(done) {
var chunk1 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00]); // NOOP
var chunk2 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00]); // NO_JOB
Expand All @@ -252,7 +251,6 @@ describe('JobServer', function() {
// so the asserts must be in the next tick
process.nextTick(function() {
should.not.exist(js.segmentedPacket);
should.not.exist(js.headerfrag);
js.clientOrWorker._response.calledTwice.should.be.true;
js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP);
js.clientOrWorker._response.getCall(1).args[1].should.equal(protocol.PACKET_TYPES.NO_JOB);
Expand All @@ -267,11 +265,26 @@ describe('JobServer', function() {
js.connected = true;
js._processData(chunk1);
should.exist(js.segmentedPacket);
should.not.exist(js.headerfrag);
js.clientOrWorker._response.called.should.be.false;
js._processData(chunk2);
should.not.exist(js.segmentedPacket);
should.not.exist(js.headerfrag);
js.clientOrWorker._response.calledOnce.should.be.true;
js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.JOB_ASSIGN);
})
it('should process more packet(length less than the header length) with one messages correctly', function() {
var chunk1 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x14]); // JOB_ASSIGN
var chunk2 = new Buffer([0x48, 0x3a, 0x6c, 0x61, 0x70, 0x3a, 0x31, 0x00, 0x72, 0x65, 0x76, 0x65, 0x72, 0x73, 0x65, 0x00]);
var chunk3 = new Buffer([0x74, 0x65, 0x73, 0x74]);
js.clientOrWorker._response = sinon.spy();
js.connected = true;
js._processData(chunk1);
should.exist(js.segmentedPacket);
js.clientOrWorker._response.called.should.be.false;
js._processData(chunk2);
should.exist(js.segmentedPacket);
js.clientOrWorker._response.called.should.be.false;
js._processData(chunk3);
should.not.exist(js.segmentedPacket);
js.clientOrWorker._response.calledOnce.should.be.true;
js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.JOB_ASSIGN);
})
Expand All @@ -282,11 +295,12 @@ describe('JobServer', function() {
js.clientOrWorker._response = sinon.spy();
js.connected = true;
js._processData(chunk1);
should.not.exist(js.segmentedPacket);
should.exist(js.headerfrag);
should.exist(js.segmentedPacket);
js._processData(chunk2);
should.not.exist(js.segmentedPacket);
should.exist(js.headerfrag); // because last 3 bytes are identified as splitted header again
process.nextTick(function() {
should.exist(js.segmentedPacket);
done();
});
js.clientOrWorker._response.calledOnce.should.be.true;
js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP);
})
Expand Down