diff --git a/lib/Dicer.js b/lib/Dicer.js index e385a94..717277f 100644 --- a/lib/Dicer.js +++ b/lib/Dicer.js @@ -48,44 +48,8 @@ class Dicer extends Writable { this._part.emit('header', header); }); this._hparser.on('error', (err) => { - if (this._part && !this._ignoreData) { - this._part.emit('error', err); - this._part.push(null); - } - }); - } - - emit(ev) { - if (ev !== 'finish' || this._realFinish) { - Writable.prototype.emit.apply(this, arguments); - return; - } - - if (this._finished) - return; - - process.nextTick(() => { - this.emit('error', new Error('Unexpected end of multipart data')); - - if (this._part && !this._ignoreData) { - const type = (this._isPreamble ? 'Preamble' : 'Part'); - this._part.emit( - 'error', - new Error(`${type} terminated early due to ` - + 'unexpected end of multipart data') - ); - this._part.push(null); - process.nextTick(() => { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - }); - return; - } - - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; + if (this._part && !this._ignoreData) + this._part.destroy(err); }); } @@ -97,9 +61,7 @@ class Dicer extends Writable { if (this._headerFirst && this._isPreamble) { if (!this._part) { this._part = new PartStream(this._partOpts); - if (this._events.preamble) - this.emit('preamble', this._part); - else + if (!this.emit('preamble', this._part)) ignore(this); } const r = this._hparser.push(data); @@ -123,6 +85,34 @@ class Dicer extends Writable { cb(); } + _final(cb) { + if (this._finished) { + if (this._parts !== 0) { + this._pause = true; + this._cb = cb; + return; + } + + cb(); + return; + } + + if (this._part && !this._ignoreData) { + const type = (this._isPreamble ? 'Preamble' : 'Part'); + this._part.destroy( + new Error(`${type} terminated early due to ` + + 'unexpected end of multipart data') + ); + ignore(this); + } + + // Node <= 12 compatibility, otherwise `part`'s 'error'/'end' have no chance + // to emit. + process.nextTick(() => { + cb(new Error('Unexpected end of multipart data')); + }); + } + reset() { this._part = undefined; this._bparser = undefined; @@ -154,16 +144,14 @@ function onInfo(isMatch, data, start, end) { } } if (this._dashes === 2) { - if ((start + i) < end && this._events.trailer) + if ((start + i) < end && this.listenerCount('trailer')) this.emit('trailer', data.slice(start + i, end)); this.reset(); this._finished = true; // No more parts will be added - if (this._parts === 0) { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - } + if (this._parts === 0) + unpause(this); + } if (this._dashes) return; @@ -176,9 +164,7 @@ function onInfo(isMatch, data, start, end) { unpause(this); }; ev = this._isPreamble ? 'preamble' : 'part'; - if (this._events[ev]) - this.emit(ev, this._part); - else + if (!this.emit(ev, this._part)) ignore(this); if (!this._isPreamble) this._inHeader = true; @@ -205,15 +191,8 @@ function onInfo(isMatch, data, start, end) { } else { ++this._parts; this._part.on('end', () => { - if (--this._parts === 0) { - if (this._finished) { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - } else { - unpause(this); - } - } + if (--this._parts === 0) + unpause(this); }); } this._part.push(null); diff --git a/test/test-multipart.js b/test/test-multipart.js index d696c92..f260c99 100644 --- a/test/test-multipart.js +++ b/test/test-multipart.js @@ -84,6 +84,13 @@ function next() { header: undefined }; + const onPreambleEnd = () => { + if (preamble.body) + preamble.body = Buffer.concat(preamble.body, preamble.bodylen); + if (preamble.body || preamble.header) + state.preamble = preamble; + }; + p.on('header', (h) => { preamble.header = h; if (v.setBoundary) @@ -100,12 +107,8 @@ function next() { preamble.bodylen += data.length; }).on('error', (err) => { preamble.error = err; - }).on('end', () => { - if (preamble.body) - preamble.body = Buffer.concat(preamble.body, preamble.bodylen); - if (preamble.body || preamble.header) - state.preamble = preamble; - }); + onPreambleEnd(); + }).on('end', onPreambleEnd); }); dicer.on('part', (p) => { const part = { @@ -115,6 +118,12 @@ function next() { header: undefined }; + const onPartEnd = () => { + if (part.body) + part.body = Buffer.concat(part.body, part.bodylen); + state.parts.push(part); + }; + p.on('header', (h) => { part.header = h; }).on('data', (data) => { @@ -126,15 +135,23 @@ function next() { }).on('error', (err) => { part.error = err; ++partErrors; - }).on('end', () => { - if (part.body) - part.body = Buffer.concat(part.body, part.bodylen); - state.parts.push(part); - }); - }).on('error', (err) => { - error = err; - }).on('finish', () => { - assert(finishes++ === 0, makeMsg(v.what, 'finish emitted multiple times')); + onPartEnd(); + }).on('end', onPartEnd); + }).on('error', onFinish).on('finish', onFinish); + + function onFinish(err) { + if (err) { + assert( + error === undefined, + makeMsg(v.what, 'error emitted multiple times') + ); + error = err; + } + + // Node <= 12 emits both 'error' and 'end', while Node > 14 emits only + // 'error'. + if (finishes++ > 0) + return; if (v.dicerError) { assert(error !== undefined, makeMsg(v.what, 'Expected error')); @@ -242,7 +259,7 @@ function next() { } ++t; next(); - }); + } fs.createReadStream(fixtureBase + '/original').pipe(dicer); } diff --git a/test/test-pipeline.js b/test/test-pipeline.js new file mode 100644 index 0000000..34822be --- /dev/null +++ b/test/test-pipeline.js @@ -0,0 +1,30 @@ +'use strict'; + +const assert = require('assert'); + +const { Readable, pipeline } = require('stream'); +const Dicer = require('..'); + +const r = new Readable({ read() {} }); +const d = new Dicer({ boundary: 'a' }); + +let isFinished = false; + +d.on('part', async (part) => { + part.resume(); +}); + +r.push('--a\r\nA: 1\r\nB: 1\r\n\r\n123\r\n--a\r\n\r\n456\r\n--a--\r\n'); +setImmediate(() => { + r.push(null); +}); + +pipeline(r, d, (error) => { + assert(isFinished === false, 'Double-invocation of pipeline callback'); + assert(error === undefined, 'Unexpected pipeline error'); + isFinished = true; +}); + +process.on('exit', () => { + assert(isFinished === true, 'Should finish before exiting'); +});