diff --git a/lib/gearmanode/job-server.js b/lib/gearmanode/job-server.js index 3afd62b..bfab202 100644 --- a/lib/gearmanode/job-server.js +++ b/lib/gearmanode/job-server.js @@ -300,22 +300,21 @@ JobServer.prototype._processData = function (chunk) { return; } + if (this.hasOwnProperty('segmentedPacket')) {// if has segmentedPacket, it should be joined with chunk firstly + chunk = Buffer.concat([this.segmentedPacket, chunk]); + delete this.segmentedPacket; + } + if (chunk.length < protocol.CONSTANTS.HEADER_LEN) { // it's only header fragment, the rest should come in next packet - this.headerfrag = chunk; + this.segmentedPacket = chunk; return; } - if (chunk.readUInt32BE(0) !== protocol.CONSTANTS.HEADER_RESP) { - if (this.hasOwnProperty('segmentedPacket')) { - chunk = Buffer.concat([this.segmentedPacket, chunk]); - } else if (this.hasOwnProperty('headerfrag')) { - chunk = Buffer.concat([this.headerfrag, chunk]); - delete this.headerfrag; - } else { // not previous packet stored to be concatenated -> it must be error - // OUT OF SYNC! - this.clientOrWorker._unrecoverableError('out of sync with server'); - return; - } + if (chunk.readUInt32BE(0) != protocol.CONSTANTS.HEADER_RESP + && chunk.readUInt32BE(0) != protocol.CONSTANTS.HEADER_REQ) { + // OUT OF SYNC! + this.clientOrWorker._unrecoverableError('out of sync with server'); + return; } var packetType = chunk.readUInt32BE(4); @@ -336,16 +335,11 @@ JobServer.prototype._processData = function (chunk) { if (JobServer.logger.isLevelEnabled('verbose')) { JobServer.logger.log('verbose', 'joined packet found, responseSize=%d, packetLen=%d', responseLength, chunk.length); } - nextChunk = new Buffer(chunk.slice(responseLength)); - if (nextChunk.length >= protocol.CONSTANTS.HEADER_LEN) { // header complete - var self = this; - process.nextTick(function() { - self._processData(nextChunk); - }); - } else { // it's only header fragment, the rest should come in next packet - this.headerfrag = nextChunk; - } - + var self = this; + var nextChunk = new Buffer(chunk.slice(responseLength)); + process.nextTick(function() { + self._processData(nextChunk); + }); chunk = chunk.slice(0, responseLength); } @@ -353,11 +347,9 @@ JobServer.prototype._processData = function (chunk) { var parsedPacket; if (protocol.DEFINITION[packetCode] !== undefined && (protocol.CONSTANTS.TYPE_RESP & protocol.DEFINITION[packetCode][1])) { - delete this.segmentedPacket; // clear cached previous segments if exist or not parsedPacket = protocol.parsePacket(chunk, protocol.DEFINITION[packetCode][2]); } - var handle, job, processed, status; switch (packetType) { diff --git a/lib/gearmanode/job.js b/lib/gearmanode/job.js index 9b6fd30..d99fe47 100644 --- a/lib/gearmanode/job.js +++ b/lib/gearmanode/job.js @@ -86,7 +86,7 @@ var Job = exports.Job = function (clientOrWorker, options) { if (this.clientOrWorker._type === 'Worker') { // VALIDATION common.mixin({ handle: 'mandatory', jobServerUid: 'mandatory' }, pattern); - common.verifyAndSanitizeOptions(options, pattern); + returned = common.verifyAndSanitizeOptions(options, pattern); if (returned instanceof Error) { return returned; } this.handle = options.handle; diff --git a/test/test-job-server.js b/test/test-job-server.js index 26b4390..03c5e08 100644 --- a/test/test-job-server.js +++ b/test/test-job-server.js @@ -237,10 +237,9 @@ describe('JobServer', function() { js.connected = true; js._processData(chunk); should.not.exist(js.segmentedPacket); - should.not.exist(js.headerfrag); js.clientOrWorker._response.calledOnce.should.be.true; js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP); - }) + }) it('should process one packet with two messages correctly', function(done) { var chunk1 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00]); // NOOP var chunk2 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00]); // NO_JOB @@ -252,7 +251,6 @@ describe('JobServer', function() { // so the asserts must be in the next tick process.nextTick(function() { should.not.exist(js.segmentedPacket); - should.not.exist(js.headerfrag); js.clientOrWorker._response.calledTwice.should.be.true; js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP); js.clientOrWorker._response.getCall(1).args[1].should.equal(protocol.PACKET_TYPES.NO_JOB); @@ -267,11 +265,26 @@ describe('JobServer', function() { js.connected = true; js._processData(chunk1); should.exist(js.segmentedPacket); - should.not.exist(js.headerfrag); js.clientOrWorker._response.called.should.be.false; js._processData(chunk2); should.not.exist(js.segmentedPacket); - should.not.exist(js.headerfrag); + js.clientOrWorker._response.calledOnce.should.be.true; + js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.JOB_ASSIGN); + }) + it('should process more packet(length less than the header length) with one messages correctly', function() { + var chunk1 = new Buffer([0x00, 0x52, 0x45, 0x53, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x14]); // JOB_ASSIGN + var chunk2 = new Buffer([0x48, 0x3a, 0x6c, 0x61, 0x70, 0x3a, 0x31, 0x00, 0x72, 0x65, 0x76, 0x65, 0x72, 0x73, 0x65, 0x00]); + var chunk3 = new Buffer([0x74, 0x65, 0x73, 0x74]); + js.clientOrWorker._response = sinon.spy(); + js.connected = true; + js._processData(chunk1); + should.exist(js.segmentedPacket); + js.clientOrWorker._response.called.should.be.false; + js._processData(chunk2); + should.exist(js.segmentedPacket); + js.clientOrWorker._response.called.should.be.false; + js._processData(chunk3); + should.not.exist(js.segmentedPacket); js.clientOrWorker._response.calledOnce.should.be.true; js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.JOB_ASSIGN); }) @@ -282,11 +295,12 @@ describe('JobServer', function() { js.clientOrWorker._response = sinon.spy(); js.connected = true; js._processData(chunk1); - should.not.exist(js.segmentedPacket); - should.exist(js.headerfrag); + should.exist(js.segmentedPacket); js._processData(chunk2); - should.not.exist(js.segmentedPacket); - should.exist(js.headerfrag); // because last 3 bytes are identified as splitted header again + process.nextTick(function() { + should.exist(js.segmentedPacket); + done(); + }); js.clientOrWorker._response.calledOnce.should.be.true; js.clientOrWorker._response.getCall(0).args[1].should.equal(protocol.PACKET_TYPES.NOOP); })