From 6a922fde1cf4423916c6740aa46e2630b593d32a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:31:04 +0200 Subject: [PATCH 01/13] rm inherit --- index.js | 1 - package.json | 1 - 2 files changed, 2 deletions(-) diff --git a/index.js b/index.js index 3430fe4..40619e4 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,5 @@ var stream = require('readable-stream') var eos = require('end-of-stream') -var inherits = require('inherits') var shift = require('stream-shift') var SIGNAL_FLUSH = (Buffer.from && Buffer.from !== Uint8Array.from) diff --git a/package.json b/package.json index 2758df9..c042363 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,6 @@ "main": "index.js", "dependencies": { "end-of-stream": "^1.4.1", - "inherits": "^2.0.3", "readable-stream": "^3.1.1", "stream-shift": "^1.0.0" }, From 2e0de659495c820a88122d2a9d5e8dc4d3b92d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:31:26 +0200 Subject: [PATCH 02/13] convert to class --- index.js | 379 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 198 insertions(+), 181 deletions(-) diff --git a/index.js b/index.js index 40619e4..af98014 100644 --- a/index.js +++ b/index.js @@ -2,9 +2,7 @@ var stream = require('readable-stream') var eos = require('end-of-stream') var shift = require('stream-shift') -var SIGNAL_FLUSH = (Buffer.from && Buffer.from !== Uint8Array.from) - ? Buffer.from([0]) - : new Buffer([0]) +var SIGNAL_FLUSH = Buffer.from([0]) var onuncork = function(self, fn) { if (self._corked) self.once('uncork', fn) @@ -36,202 +34,221 @@ var toStreams2 = function(rs) { return new (stream.Readable)({objectMode:true, highWaterMark:16}).wrap(rs) } -var Duplexify = function(writable, readable, opts) { - if (!(this instanceof Duplexify)) return new Duplexify(writable, readable, opts) - stream.Duplex.call(this, opts) - - this._writable = null - this._readable = null - this._readable2 = null - - this._autoDestroy = !opts || opts.autoDestroy !== false - this._forwardDestroy = !opts || opts.destroy !== false - this._forwardEnd = !opts || opts.end !== false - this._corked = 1 // start corked - this._ondrain = null - this._drained = false - this._forwarding = false - this._unwrite = null - this._unread = null - this._ended = false - - this.destroyed = false - - if (writable) this.setWritable(writable) - if (readable) this.setReadable(readable) -} - -inherits(Duplexify, stream.Duplex) - -Duplexify.obj = function(writable, readable, opts) { - if (!opts) opts = {} - opts.objectMode = true - opts.highWaterMark = 16 - return new Duplexify(writable, readable, opts) -} +class Duplexing extends stream.Duplex { + constructor (writable, readable, opts) { + super(opts) -Duplexify.prototype.cork = function() { - if (++this._corked === 1) this.emit('cork') -} - -Duplexify.prototype.uncork = function() { - if (this._corked && --this._corked === 0) this.emit('uncork') -} + this._writable = null + this._readable = null + this._readable2 = null -Duplexify.prototype.setWritable = function(writable) { - if (this._unwrite) this._unwrite() - - if (this.destroyed) { - if (writable && writable.destroy) writable.destroy() - return + this._autoDestroy = !opts || opts.autoDestroy !== false + this._forwardDestroy = !opts || opts.destroy !== false + this._forwardEnd = !opts || opts.end !== false + this._corked = 1 // start corked + this._ondrain = null + this._drained = false + this._forwarding = false + this._unwrite = null + this._unread = null + this._ended = false + + this.destroyed = false + + if (writable) + this.setWritable(writable) + if (readable) + this.setReadable(readable) } - - if (writable === null || writable === false) { - this.end() - return + cork() { + if (++this._corked === 1) + this.emit('cork') } - - var self = this - var unend = eos(writable, {writable:true, readable:false}, destroyer(this, this._forwardEnd)) - - var ondrain = function() { - var ondrain = self._ondrain - self._ondrain = null - if (ondrain) ondrain() + uncork() { + if (this._corked && --this._corked === 0) + this.emit('uncork') } - - var clear = function() { - self._writable.removeListener('drain', ondrain) - unend() + setWritable(writable) { + if (this._unwrite) + this._unwrite() + + if (this.destroyed) { + if (writable && writable.destroy) + writable.destroy() + return + } + + if (writable === null || writable === false) { + this.end() + return + } + + var self = this + var unend = eos(writable, { writable: true, readable: false }, destroyer(this, this._forwardEnd)) + + var ondrain = function () { + var ondrain = self._ondrain + self._ondrain = null + if (ondrain) + ondrain() + } + + var clear = function () { + self._writable.removeListener('drain', ondrain) + unend() + } + + if (this._unwrite) + process.nextTick(ondrain) // force a drain on stream reset to avoid livelocks + + this._writable = writable + this._writable.on('drain', ondrain) + this._unwrite = clear + + this.uncork() // always uncork setWritable } - - if (this._unwrite) process.nextTick(ondrain) // force a drain on stream reset to avoid livelocks - - this._writable = writable - this._writable.on('drain', ondrain) - this._unwrite = clear - - this.uncork() // always uncork setWritable -} - -Duplexify.prototype.setReadable = function(readable) { - if (this._unread) this._unread() - - if (this.destroyed) { - if (readable && readable.destroy) readable.destroy() - return + setReadable(readable) { + if (this._unread) + this._unread() + + if (this.destroyed) { + if (readable && readable.destroy) + readable.destroy() + return + } + + if (readable === null || readable === false) { + this.push(null) + this.resume() + return + } + + var self = this + var unend = eos(readable, { writable: false, readable: true }, destroyer(this)) + + var onreadable = function () { + self._forward() + } + + var onend = function () { + self.push(null) + } + + var clear = function () { + self._readable2.removeListener('readable', onreadable) + self._readable2.removeListener('end', onend) + unend() + } + + this._drained = true + this._readable = readable + this._readable2 = readable._readableState ? readable : toStreams2(readable) + this._readable2.on('readable', onreadable) + this._readable2.on('end', onend) + this._unread = clear + + this._forward() } - - if (readable === null || readable === false) { - this.push(null) - this.resume() - return + _read() { + this._drained = true + this._forward() } + _forward() { + if (this._forwarding || !this._readable2 || !this._drained) + return + this._forwarding = true - var self = this - var unend = eos(readable, {writable:false, readable:true}, destroyer(this)) + var data - var onreadable = function() { - self._forward() - } + while (this._drained && (data = shift(this._readable2)) !== null) { + if (this.destroyed) + continue + this._drained = this.push(data) + } - var onend = function() { - self.push(null) + this._forwarding = false } - - var clear = function() { - self._readable2.removeListener('readable', onreadable) - self._readable2.removeListener('end', onend) - unend() + destroy(err, cb) { + if (!cb) + cb = noop + if (this.destroyed) + return cb(null) + this.destroyed = true + + var self = this + process.nextTick(function () { + self._destroy(err) + cb(null) + }) } - - this._drained = true - this._readable = readable - this._readable2 = readable._readableState ? readable : toStreams2(readable) - this._readable2.on('readable', onreadable) - this._readable2.on('end', onend) - this._unread = clear - - this._forward() -} - -Duplexify.prototype._read = function() { - this._drained = true - this._forward() -} - -Duplexify.prototype._forward = function() { - if (this._forwarding || !this._readable2 || !this._drained) return - this._forwarding = true - - var data - - while (this._drained && (data = shift(this._readable2)) !== null) { - if (this.destroyed) continue - this._drained = this.push(data) + _destroy(err) { + if (err) { + var ondrain = this._ondrain + this._ondrain = null + if (ondrain) + ondrain(err) + else + this.emit('error', err) + } + + if (this._forwardDestroy) { + if (this._readable && this._readable.destroy) + this._readable.destroy() + if (this._writable && this._writable.destroy) + this._writable.destroy() + } + + this.emit('close') } - - this._forwarding = false -} - -Duplexify.prototype.destroy = function(err, cb) { - if (!cb) cb = noop - if (this.destroyed) return cb(null) - this.destroyed = true - - var self = this - process.nextTick(function() { - self._destroy(err) - cb(null) - }) -} - -Duplexify.prototype._destroy = function(err) { - if (err) { - var ondrain = this._ondrain - this._ondrain = null - if (ondrain) ondrain(err) - else this.emit('error', err) + _write(data, enc, cb) { + if (this.destroyed) + return + if (this._corked) + return onuncork(this, this._write.bind(this, data, enc, cb)) + if (data === SIGNAL_FLUSH) + return this._finish(cb) + if (!this._writable) + return cb() + + if (this._writable.write(data) === false) + this._ondrain = cb + else if (!this.destroyed) + cb() } - - if (this._forwardDestroy) { - if (this._readable && this._readable.destroy) this._readable.destroy() - if (this._writable && this._writable.destroy) this._writable.destroy() + _finish(cb) { + var self = this + this.emit('preend') + onuncork(this, function () { + end(self._forwardEnd && self._writable, function () { + // haxx to not emit prefinish twice + if (self._writableState.prefinished === false) + self._writableState.prefinished = true + self.emit('prefinish') + onuncork(self, cb) + }) + }) } - this.emit('close') -} - -Duplexify.prototype._write = function(data, enc, cb) { - if (this.destroyed) return - if (this._corked) return onuncork(this, this._write.bind(this, data, enc, cb)) - if (data === SIGNAL_FLUSH) return this._finish(cb) - if (!this._writable) return cb() - - if (this._writable.write(data) === false) this._ondrain = cb - else if (!this.destroyed) cb() -} - -Duplexify.prototype._finish = function(cb) { - var self = this - this.emit('preend') - onuncork(this, function() { - end(self._forwardEnd && self._writable, function() { - // haxx to not emit prefinish twice - if (self._writableState.prefinished === false) self._writableState.prefinished = true - self.emit('prefinish') - onuncork(self, cb) - }) - }) -} + end(data, enc, cb) { + if (typeof data === 'function') + return this.end(null, null, data) + if (typeof enc === 'function') + return this.end(data, null, enc) + this._ended = true + if (data) + this.write(data) + if (!this._writableState.ending && !this._writableState.destroyed) + this.write(SIGNAL_FLUSH) + return stream.Writable.prototype.end.call(this, cb) + } -Duplexify.prototype.end = function(data, enc, cb) { - if (typeof data === 'function') return this.end(null, null, data) - if (typeof enc === 'function') return this.end(data, null, enc) - this._ended = true - if (data) this.write(data) - if (!this._writableState.ending && !this._writableState.destroyed) this.write(SIGNAL_FLUSH) - return stream.Writable.prototype.end.call(this, cb) + static obj(writable, readable, opts) { + if (!opts) + opts = {} + opts.objectMode = true + opts.highWaterMark = 16 + return new Duplexing(writable, readable, opts) + } } module.exports = Duplexify From 116d447ee59a9251555c931325a6660d03e8f0ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:31:37 +0200 Subject: [PATCH 03/13] proxy for backward comp --- index.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index af98014..c325ee5 100644 --- a/index.js +++ b/index.js @@ -251,4 +251,10 @@ class Duplexing extends stream.Duplex { } } -module.exports = Duplexify +// Use proxy to call class constructor without new keyword +// For backward compatibility +module.exports = new Proxy(Duplexing, { + apply (target, thisArg, argumentsList) { + return new target(...argumentsList); + } +}) From 6757bd0d0a12f805aa5ac1de73ebe825b77957ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:34:07 +0200 Subject: [PATCH 04/13] update dep --- package.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index c042363..9b5f327 100644 --- a/package.json +++ b/package.json @@ -4,14 +4,14 @@ "description": "Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input", "main": "index.js", "dependencies": { - "end-of-stream": "^1.4.1", - "readable-stream": "^3.1.1", - "stream-shift": "^1.0.0" + "end-of-stream": "^1.4.4", + "readable-stream": "^3.6.0", + "stream-shift": "^1.0.1" }, "devDependencies": { - "concat-stream": "^1.5.2", - "tape": "^4.0.0", - "through2": "^2.0.0" + "concat-stream": "^2.0.0", + "tape": "^5.3.1", + "through2": "^4.0.2" }, "scripts": { "test": "tape test.js" From a1d6bcd834c6fc0059bf6c916003b6e5fda7ce6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:43:15 +0200 Subject: [PATCH 05/13] standard --fix --- example.js | 14 ++--- index.js | 178 +++++++++++++++++++++++------------------------------ 2 files changed, 85 insertions(+), 107 deletions(-) diff --git a/example.js b/example.js index 5585c19..7ffe72b 100644 --- a/example.js +++ b/example.js @@ -1,17 +1,17 @@ -var duplexify = require('duplexify') -var http = require('http') +const duplexify = require('duplexify') +const http = require('http') -var request = function(opts) { - var req = http.request(opts) - var dup = duplexify() +const request = function (opts) { + const req = http.request(opts) + const dup = duplexify() dup.setWritable(req) - req.on('response', function(res) { + req.on('response', function (res) { dup.setReadable(res) }) return dup } -var req = request({ +const req = request({ method: 'GET', host: 'www.google.com', port: 80 diff --git a/index.js b/index.js index c325ee5..62583ad 100644 --- a/index.js +++ b/index.js @@ -1,26 +1,26 @@ -var stream = require('readable-stream') -var eos = require('end-of-stream') -var shift = require('stream-shift') +const stream = require('readable-stream') +const eos = require('end-of-stream') +const shift = require('stream-shift') -var SIGNAL_FLUSH = Buffer.from([0]) +const SIGNAL_FLUSH = Buffer.from([0]) -var onuncork = function(self, fn) { +const onuncork = function (self, fn) { if (self._corked) self.once('uncork', fn) else fn() } -var autoDestroy = function (self, err) { +const autoDestroy = function (self, err) { if (self._autoDestroy) self.destroy(err) } -var destroyer = function(self, end) { - return function(err) { +const destroyer = function (self, end) { + return function (err) { if (err) autoDestroy(self, err.message === 'premature close' ? null : err) else if (end && !self._ended) self.end() } } -var end = function(ws, fn) { +const end = function (ws, fn) { if (!ws) return fn() if (ws._writableState && ws._writableState.finished) return fn() if (ws._writableState) return ws.end(fn) @@ -28,10 +28,10 @@ var end = function(ws, fn) { fn() } -var noop = function() {} +const noop = function () {} -var toStreams2 = function(rs) { - return new (stream.Readable)({objectMode:true, highWaterMark:16}).wrap(rs) +const toStreams2 = function (rs) { + return new stream.Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } class Duplexing extends stream.Duplex { @@ -55,26 +55,23 @@ class Duplexing extends stream.Duplex { this.destroyed = false - if (writable) - this.setWritable(writable) - if (readable) - this.setReadable(readable) + if (writable) { this.setWritable(writable) } + if (readable) { this.setReadable(readable) } } - cork() { - if (++this._corked === 1) - this.emit('cork') + + cork () { + if (++this._corked === 1) { this.emit('cork') } } - uncork() { - if (this._corked && --this._corked === 0) - this.emit('uncork') + + uncork () { + if (this._corked && --this._corked === 0) { this.emit('uncork') } } - setWritable(writable) { - if (this._unwrite) - this._unwrite() + + setWritable (writable) { + if (this._unwrite) { this._unwrite() } if (this.destroyed) { - if (writable && writable.destroy) - writable.destroy() + if (writable && writable.destroy) { writable.destroy() } return } @@ -83,23 +80,21 @@ class Duplexing extends stream.Duplex { return } - var self = this - var unend = eos(writable, { writable: true, readable: false }, destroyer(this, this._forwardEnd)) + const self = this + const unend = eos(writable, { writable: true, readable: false }, destroyer(this, this._forwardEnd)) - var ondrain = function () { - var ondrain = self._ondrain + const ondrain = function () { + const ondrain = self._ondrain self._ondrain = null - if (ondrain) - ondrain() + if (ondrain) { ondrain() } } - var clear = function () { + const clear = function () { self._writable.removeListener('drain', ondrain) unend() } - if (this._unwrite) - process.nextTick(ondrain) // force a drain on stream reset to avoid livelocks + if (this._unwrite) { process.nextTick(ondrain) } // force a drain on stream reset to avoid livelocks this._writable = writable this._writable.on('drain', ondrain) @@ -107,13 +102,12 @@ class Duplexing extends stream.Duplex { this.uncork() // always uncork setWritable } - setReadable(readable) { - if (this._unread) - this._unread() + + setReadable (readable) { + if (this._unread) { this._unread() } if (this.destroyed) { - if (readable && readable.destroy) - readable.destroy() + if (readable && readable.destroy) { readable.destroy() } return } @@ -123,18 +117,18 @@ class Duplexing extends stream.Duplex { return } - var self = this - var unend = eos(readable, { writable: false, readable: true }, destroyer(this)) + const self = this + const unend = eos(readable, { writable: false, readable: true }, destroyer(this)) - var onreadable = function () { + const onreadable = function () { self._forward() } - var onend = function () { + const onend = function () { self.push(null) } - var clear = function () { + const clear = function () { self._readable2.removeListener('readable', onreadable) self._readable2.removeListener('end', onend) unend() @@ -149,102 +143,86 @@ class Duplexing extends stream.Duplex { this._forward() } - _read() { + + _read () { this._drained = true this._forward() } - _forward() { - if (this._forwarding || !this._readable2 || !this._drained) - return + + _forward () { + if (this._forwarding || !this._readable2 || !this._drained) { return } this._forwarding = true - var data + let data while (this._drained && (data = shift(this._readable2)) !== null) { - if (this.destroyed) - continue + if (this.destroyed) { continue } this._drained = this.push(data) } this._forwarding = false } - destroy(err, cb) { - if (!cb) - cb = noop - if (this.destroyed) - return cb(null) + + destroy (err, cb) { + if (!cb) { cb = noop } + if (this.destroyed) { return cb(null) } this.destroyed = true - var self = this + const self = this process.nextTick(function () { self._destroy(err) cb(null) }) } - _destroy(err) { + + _destroy (err) { if (err) { - var ondrain = this._ondrain + const ondrain = this._ondrain this._ondrain = null - if (ondrain) - ondrain(err) - else - this.emit('error', err) + if (ondrain) { ondrain(err) } else { this.emit('error', err) } } if (this._forwardDestroy) { - if (this._readable && this._readable.destroy) - this._readable.destroy() - if (this._writable && this._writable.destroy) - this._writable.destroy() + if (this._readable && this._readable.destroy) { this._readable.destroy() } + if (this._writable && this._writable.destroy) { this._writable.destroy() } } this.emit('close') } - _write(data, enc, cb) { - if (this.destroyed) - return - if (this._corked) - return onuncork(this, this._write.bind(this, data, enc, cb)) - if (data === SIGNAL_FLUSH) - return this._finish(cb) - if (!this._writable) - return cb() - - if (this._writable.write(data) === false) - this._ondrain = cb - else if (!this.destroyed) - cb() + + _write (data, enc, cb) { + if (this.destroyed) { return } + if (this._corked) { return onuncork(this, this._write.bind(this, data, enc, cb)) } + if (data === SIGNAL_FLUSH) { return this._finish(cb) } + if (!this._writable) { return cb() } + + if (this._writable.write(data) === false) { this._ondrain = cb } else if (!this.destroyed) { cb() } } - _finish(cb) { - var self = this + + _finish (cb) { + const self = this this.emit('preend') onuncork(this, function () { end(self._forwardEnd && self._writable, function () { // haxx to not emit prefinish twice - if (self._writableState.prefinished === false) - self._writableState.prefinished = true + if (self._writableState.prefinished === false) { self._writableState.prefinished = true } self.emit('prefinish') onuncork(self, cb) }) }) } - end(data, enc, cb) { - if (typeof data === 'function') - return this.end(null, null, data) - if (typeof enc === 'function') - return this.end(data, null, enc) + end (data, enc, cb) { + if (typeof data === 'function') { return this.end(null, null, data) } + if (typeof enc === 'function') { return this.end(data, null, enc) } this._ended = true - if (data) - this.write(data) - if (!this._writableState.ending && !this._writableState.destroyed) - this.write(SIGNAL_FLUSH) + if (data) { this.write(data) } + if (!this._writableState.ending && !this._writableState.destroyed) { this.write(SIGNAL_FLUSH) } return stream.Writable.prototype.end.call(this, cb) } - static obj(writable, readable, opts) { - if (!opts) - opts = {} + static obj (writable, readable, opts) { + if (!opts) { opts = {} } opts.objectMode = true opts.highWaterMark = 16 return new Duplexing(writable, readable, opts) @@ -255,6 +233,6 @@ class Duplexing extends stream.Duplex { // For backward compatibility module.exports = new Proxy(Duplexing, { apply (target, thisArg, argumentsList) { - return new target(...argumentsList); + return new target(...argumentsList) } }) From e0d56d2fd301e7b6736866634dd0767c5b4b0613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:46:45 +0200 Subject: [PATCH 06/13] default args --- index.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 62583ad..0447189 100644 --- a/index.js +++ b/index.js @@ -163,8 +163,7 @@ class Duplexing extends stream.Duplex { this._forwarding = false } - destroy (err, cb) { - if (!cb) { cb = noop } + destroy (err, cb = noop) { if (this.destroyed) { return cb(null) } this.destroyed = true @@ -221,8 +220,7 @@ class Duplexing extends stream.Duplex { return stream.Writable.prototype.end.call(this, cb) } - static obj (writable, readable, opts) { - if (!opts) { opts = {} } + static obj (writable, readable, opts = {}) { opts.objectMode = true opts.highWaterMark = 16 return new Duplexing(writable, readable, opts) From 59418403727d68dab57ea687f67364fd94687b0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:46:58 +0200 Subject: [PATCH 07/13] destruct --- index.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 0447189..9734be2 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,4 @@ -const stream = require('readable-stream') +const { Duplex, Readable, Writable } = require('readable-stream') const eos = require('end-of-stream') const shift = require('stream-shift') @@ -31,10 +31,10 @@ const end = function (ws, fn) { const noop = function () {} const toStreams2 = function (rs) { - return new stream.Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) + return new Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } -class Duplexing extends stream.Duplex { +class Duplexing extends Duplex { constructor (writable, readable, opts) { super(opts) @@ -217,7 +217,7 @@ class Duplexing extends stream.Duplex { this._ended = true if (data) { this.write(data) } if (!this._writableState.ending && !this._writableState.destroyed) { this.write(SIGNAL_FLUSH) } - return stream.Writable.prototype.end.call(this, cb) + return Writable.prototype.end.call(this, cb) } static obj (writable, readable, opts = {}) { From e63cf66b73c469ff57bbe40cf1f1f05a626a503f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:50:16 +0200 Subject: [PATCH 08/13] got rid of noop --- index.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 9734be2..3ed8cc0 100644 --- a/index.js +++ b/index.js @@ -28,8 +28,6 @@ const end = function (ws, fn) { fn() } -const noop = function () {} - const toStreams2 = function (rs) { return new Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } @@ -163,14 +161,14 @@ class Duplexing extends Duplex { this._forwarding = false } - destroy (err, cb = noop) { - if (this.destroyed) { return cb(null) } + destroy (err, cb) { + if (this.destroyed) { return cb && cb(null) } this.destroyed = true const self = this process.nextTick(function () { self._destroy(err) - cb(null) + cb && cb(null) }) } From 0a2e8e88e553059101c5d088f8907a11f2651aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:55:20 +0200 Subject: [PATCH 09/13] rm self variable --- index.js | 64 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/index.js b/index.js index 3ed8cc0..018ddde 100644 --- a/index.js +++ b/index.js @@ -4,19 +4,19 @@ const shift = require('stream-shift') const SIGNAL_FLUSH = Buffer.from([0]) -const onuncork = function (self, fn) { - if (self._corked) self.once('uncork', fn) +const onuncork = function (ctx, fn) { + if (ctx._corked) ctx.once('uncork', fn) else fn() } -const autoDestroy = function (self, err) { - if (self._autoDestroy) self.destroy(err) +const autoDestroy = function (ctx, err) { + if (ctx._autoDestroy) ctx.destroy(err) } -const destroyer = function (self, end) { +const destroyer = function (ctx, end) { return function (err) { - if (err) autoDestroy(self, err.message === 'premature close' ? null : err) - else if (end && !self._ended) self.end() + if (err) autoDestroy(ctx, err.message === 'premature close' ? null : err) + else if (end && !ctx._ended) ctx.end() } } @@ -78,17 +78,16 @@ class Duplexing extends Duplex { return } - const self = this const unend = eos(writable, { writable: true, readable: false }, destroyer(this, this._forwardEnd)) - const ondrain = function () { - const ondrain = self._ondrain - self._ondrain = null + const ondrain = () => { + const ondrain = this._ondrain + this._ondrain = null if (ondrain) { ondrain() } } - const clear = function () { - self._writable.removeListener('drain', ondrain) + const clear = () => { + this._writable.removeListener('drain', ondrain) unend() } @@ -115,20 +114,22 @@ class Duplexing extends Duplex { return } - const self = this - const unend = eos(readable, { writable: false, readable: true }, destroyer(this)) + const unend = eos(readable, { + writable: false, + readable: true + }, destroyer(this)) - const onreadable = function () { - self._forward() + const onreadable = () => { + this._forward() } - const onend = function () { - self.push(null) + const onend = () => { + this.push(null) } - const clear = function () { - self._readable2.removeListener('readable', onreadable) - self._readable2.removeListener('end', onend) + const clear = () => { + this._readable2.removeListener('readable', onreadable) + this._readable2.removeListener('end', onend) unend() } @@ -165,9 +166,8 @@ class Duplexing extends Duplex { if (this.destroyed) { return cb && cb(null) } this.destroyed = true - const self = this - process.nextTick(function () { - self._destroy(err) + process.nextTick(() => { + this._destroy(err) cb && cb(null) }) } @@ -197,14 +197,16 @@ class Duplexing extends Duplex { } _finish (cb) { - const self = this this.emit('preend') - onuncork(this, function () { - end(self._forwardEnd && self._writable, function () { + + onuncork(this, () => { + end(this._forwardEnd && this._writable, () => { // haxx to not emit prefinish twice - if (self._writableState.prefinished === false) { self._writableState.prefinished = true } - self.emit('prefinish') - onuncork(self, cb) + if (this._writableState.prefinished === false) { + this._writableState.prefinished = true + } + this.emit('prefinish') + onuncork(this, cb) }) }) } From cad429d87a319d09b87d1be8367977722f9012d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 20:59:25 +0200 Subject: [PATCH 10/13] standard fix, remove some `const` --- index.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/index.js b/index.js index 018ddde..b48d57f 100644 --- a/index.js +++ b/index.js @@ -4,23 +4,23 @@ const shift = require('stream-shift') const SIGNAL_FLUSH = Buffer.from([0]) -const onuncork = function (ctx, fn) { +function onuncork (ctx, fn) { if (ctx._corked) ctx.once('uncork', fn) else fn() } -const autoDestroy = function (ctx, err) { +function autoDestroy (ctx, err) { if (ctx._autoDestroy) ctx.destroy(err) } -const destroyer = function (ctx, end) { +function destroyer (ctx, end) { return function (err) { if (err) autoDestroy(ctx, err.message === 'premature close' ? null : err) else if (end && !ctx._ended) ctx.end() } } -const end = function (ws, fn) { +function end (ws, fn) { if (!ws) return fn() if (ws._writableState && ws._writableState.finished) return fn() if (ws._writableState) return ws.end(fn) @@ -28,7 +28,8 @@ const end = function (ws, fn) { fn() } -const toStreams2 = function (rs) { +/** @param {Readable} rs */ +function toStreams2 (rs) { return new Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } @@ -230,7 +231,7 @@ class Duplexing extends Duplex { // Use proxy to call class constructor without new keyword // For backward compatibility module.exports = new Proxy(Duplexing, { - apply (target, thisArg, argumentsList) { - return new target(...argumentsList) + apply (Target, thisArg, argumentsList) { + return new Target(...argumentsList) } }) From 2b1eec28f8de381512fab50d8d082cae8f5a7c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 21:12:29 +0200 Subject: [PATCH 11/13] fix long lines --- index.js | 80 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/index.js b/index.js index b48d57f..0d0a05c 100644 --- a/index.js +++ b/index.js @@ -28,7 +28,6 @@ function end (ws, fn) { fn() } -/** @param {Readable} rs */ function toStreams2 (rs) { return new Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } @@ -67,10 +66,14 @@ class Duplexing extends Duplex { } setWritable (writable) { - if (this._unwrite) { this._unwrite() } + if (this._unwrite) { + this._unwrite() + } if (this.destroyed) { - if (writable && writable.destroy) { writable.destroy() } + if (writable && writable.destroy) { + writable.destroy() + } return } @@ -79,7 +82,10 @@ class Duplexing extends Duplex { return } - const unend = eos(writable, { writable: true, readable: false }, destroyer(this, this._forwardEnd)) + const unend = eos(writable, { + writable: true, + readable: false + }, destroyer(this, this._forwardEnd)) const ondrain = () => { const ondrain = this._ondrain @@ -92,7 +98,10 @@ class Duplexing extends Duplex { unend() } - if (this._unwrite) { process.nextTick(ondrain) } // force a drain on stream reset to avoid livelocks + if (this._unwrite) { + // force a drain on stream reset to avoid livelocks + process.nextTick(ondrain) + } this._writable = writable this._writable.on('drain', ondrain) @@ -105,7 +114,9 @@ class Duplexing extends Duplex { if (this._unread) { this._unread() } if (this.destroyed) { - if (readable && readable.destroy) { readable.destroy() } + if (readable && readable.destroy) { + readable.destroy() + } return } @@ -136,7 +147,9 @@ class Duplexing extends Duplex { this._drained = true this._readable = readable - this._readable2 = readable._readableState ? readable : toStreams2(readable) + this._readable2 = readable._readableState + ? readable + : toStreams2(readable) this._readable2.on('readable', onreadable) this._readable2.on('end', onend) this._unread = clear @@ -150,7 +163,10 @@ class Duplexing extends Duplex { } _forward () { - if (this._forwarding || !this._readable2 || !this._drained) { return } + if (this._forwarding || !this._readable2 || !this._drained) { + return + } + this._forwarding = true let data @@ -164,7 +180,10 @@ class Duplexing extends Duplex { } destroy (err, cb) { - if (this.destroyed) { return cb && cb(null) } + if (this.destroyed) { + return cb && cb(null) + } + this.destroyed = true process.nextTick(() => { @@ -177,12 +196,18 @@ class Duplexing extends Duplex { if (err) { const ondrain = this._ondrain this._ondrain = null + if (ondrain) { ondrain(err) } else { this.emit('error', err) } } if (this._forwardDestroy) { - if (this._readable && this._readable.destroy) { this._readable.destroy() } - if (this._writable && this._writable.destroy) { this._writable.destroy() } + if (this._readable && this._readable.destroy) { + this._readable.destroy() + } + + if (this._writable && this._writable.destroy) { + this._writable.destroy() + } } this.emit('close') @@ -190,11 +215,22 @@ class Duplexing extends Duplex { _write (data, enc, cb) { if (this.destroyed) { return } - if (this._corked) { return onuncork(this, this._write.bind(this, data, enc, cb)) } - if (data === SIGNAL_FLUSH) { return this._finish(cb) } - if (!this._writable) { return cb() } + if (this._corked) { + return onuncork(this, this._write.bind(this, data, enc, cb)) + } + if (data === SIGNAL_FLUSH) { + return this._finish(cb) + } - if (this._writable.write(data) === false) { this._ondrain = cb } else if (!this.destroyed) { cb() } + if (!this._writable) { + return cb() + } + + if (this._writable.write(data) === false) { + this._ondrain = cb + } else if (!this.destroyed) { + cb() + } } _finish (cb) { @@ -213,11 +249,17 @@ class Duplexing extends Duplex { } end (data, enc, cb) { - if (typeof data === 'function') { return this.end(null, null, data) } - if (typeof enc === 'function') { return this.end(data, null, enc) } + if (typeof data === 'function') return this.end(null, null, data) + if (typeof enc === 'function') return this.end(data, null, enc) + this._ended = true - if (data) { this.write(data) } - if (!this._writableState.ending && !this._writableState.destroyed) { this.write(SIGNAL_FLUSH) } + + if (data) this.write(data) + + if (!this._writableState.ending && !this._writableState.destroyed) { + this.write(SIGNAL_FLUSH) + } + return Writable.prototype.end.call(this, cb) } From 4b2d8b4c1862afee1144860a1ac4e237fd59e436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 21:15:16 +0200 Subject: [PATCH 12/13] use built in stream --- index.js | 2 +- package.json | 1 - test.js | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 0d0a05c..409888c 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,4 @@ -const { Duplex, Readable, Writable } = require('readable-stream') +const { Duplex, Readable, Writable } = require('stream') const eos = require('end-of-stream') const shift = require('stream-shift') diff --git a/package.json b/package.json index 9b5f327..98db7ab 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,6 @@ "main": "index.js", "dependencies": { "end-of-stream": "^1.4.4", - "readable-stream": "^3.6.0", "stream-shift": "^1.0.1" }, "devDependencies": { diff --git a/test.js b/test.js index 9341105..1408d7c 100644 --- a/test.js +++ b/test.js @@ -1,7 +1,7 @@ var tape = require('tape') var through = require('through2') var concat = require('concat-stream') -var stream = require('readable-stream') +var stream = require('stream') var net = require('net') var duplexify = require('./') From aa593dc033e5a20f8c917144996b7f5016cdb039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Mon, 20 Sep 2021 21:17:02 +0200 Subject: [PATCH 13/13] no undef buf --- index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/index.js b/index.js index 409888c..3fc3e58 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,5 @@ const { Duplex, Readable, Writable } = require('stream') +const { Buffer } = require('buffer') const eos = require('end-of-stream') const shift = require('stream-shift')