diff --git a/example.js b/example.js index 5585c19..7ffe72b 100644 --- a/example.js +++ b/example.js @@ -1,17 +1,17 @@ -var duplexify = require('duplexify') -var http = require('http') +const duplexify = require('duplexify') +const http = require('http') -var request = function(opts) { - var req = http.request(opts) - var dup = duplexify() +const request = function (opts) { + const req = http.request(opts) + const dup = duplexify() dup.setWritable(req) - req.on('response', function(res) { + req.on('response', function (res) { dup.setReadable(res) }) return dup } -var req = request({ +const req = request({ method: 'GET', host: 'www.google.com', port: 80 diff --git a/index.js b/index.js index 3430fe4..3fc3e58 100644 --- a/index.js +++ b/index.js @@ -1,29 +1,27 @@ -var stream = require('readable-stream') -var eos = require('end-of-stream') -var inherits = require('inherits') -var shift = require('stream-shift') +const { Duplex, Readable, Writable } = require('stream') +const { Buffer } = require('buffer') +const eos = require('end-of-stream') +const shift = require('stream-shift') -var SIGNAL_FLUSH = (Buffer.from && Buffer.from !== Uint8Array.from) - ? Buffer.from([0]) - : new Buffer([0]) +const SIGNAL_FLUSH = Buffer.from([0]) -var onuncork = function(self, fn) { - if (self._corked) self.once('uncork', fn) +function onuncork (ctx, fn) { + if (ctx._corked) ctx.once('uncork', fn) else fn() } -var autoDestroy = function (self, err) { - if (self._autoDestroy) self.destroy(err) +function autoDestroy (ctx, err) { + if (ctx._autoDestroy) ctx.destroy(err) } -var destroyer = function(self, end) { - return function(err) { - if (err) autoDestroy(self, err.message === 'premature close' ? null : err) - else if (end && !self._ended) self.end() +function destroyer (ctx, end) { + return function (err) { + if (err) autoDestroy(ctx, err.message === 'premature close' ? null : err) + else if (end && !ctx._ended) ctx.end() } } -var end = function(ws, fn) { +function end (ws, fn) { if (!ws) return fn() if (ws._writableState && ws._writableState.finished) return fn() if (ws._writableState) return ws.end(fn) @@ -31,208 +29,252 @@ var end = function(ws, fn) { fn() } -var noop = function() {} - -var toStreams2 = function(rs) { - return new (stream.Readable)({objectMode:true, highWaterMark:16}).wrap(rs) +function toStreams2 (rs) { + return new Readable({ objectMode: true, highWaterMark: 16 }).wrap(rs) } -var Duplexify = function(writable, readable, opts) { - if (!(this instanceof Duplexify)) return new Duplexify(writable, readable, opts) - stream.Duplex.call(this, opts) - - this._writable = null - this._readable = null - this._readable2 = null - - this._autoDestroy = !opts || opts.autoDestroy !== false - this._forwardDestroy = !opts || opts.destroy !== false - this._forwardEnd = !opts || opts.end !== false - this._corked = 1 // start corked - this._ondrain = null - this._drained = false - this._forwarding = false - this._unwrite = null - this._unread = null - this._ended = false - - this.destroyed = false - - if (writable) this.setWritable(writable) - if (readable) this.setReadable(readable) -} +class Duplexing extends Duplex { + constructor (writable, readable, opts) { + super(opts) -inherits(Duplexify, stream.Duplex) + this._writable = null + this._readable = null + this._readable2 = null -Duplexify.obj = function(writable, readable, opts) { - if (!opts) opts = {} - opts.objectMode = true - opts.highWaterMark = 16 - return new Duplexify(writable, readable, opts) -} - -Duplexify.prototype.cork = function() { - if (++this._corked === 1) this.emit('cork') -} - -Duplexify.prototype.uncork = function() { - if (this._corked && --this._corked === 0) this.emit('uncork') -} + this._autoDestroy = !opts || opts.autoDestroy !== false + this._forwardDestroy = !opts || opts.destroy !== false + this._forwardEnd = !opts || opts.end !== false + this._corked = 1 // start corked + this._ondrain = null + this._drained = false + this._forwarding = false + this._unwrite = null + this._unread = null + this._ended = false -Duplexify.prototype.setWritable = function(writable) { - if (this._unwrite) this._unwrite() + this.destroyed = false - if (this.destroyed) { - if (writable && writable.destroy) writable.destroy() - return + if (writable) { this.setWritable(writable) } + if (readable) { this.setReadable(readable) } } - if (writable === null || writable === false) { - this.end() - return + cork () { + if (++this._corked === 1) { this.emit('cork') } } - var self = this - var unend = eos(writable, {writable:true, readable:false}, destroyer(this, this._forwardEnd)) + uncork () { + if (this._corked && --this._corked === 0) { this.emit('uncork') } + } - var ondrain = function() { - var ondrain = self._ondrain - self._ondrain = null - if (ondrain) ondrain() + setWritable (writable) { + if (this._unwrite) { + this._unwrite() + } + + if (this.destroyed) { + if (writable && writable.destroy) { + writable.destroy() + } + return + } + + if (writable === null || writable === false) { + this.end() + return + } + + const unend = eos(writable, { + writable: true, + readable: false + }, destroyer(this, this._forwardEnd)) + + const ondrain = () => { + const ondrain = this._ondrain + this._ondrain = null + if (ondrain) { ondrain() } + } + + const clear = () => { + this._writable.removeListener('drain', ondrain) + unend() + } + + if (this._unwrite) { + // force a drain on stream reset to avoid livelocks + process.nextTick(ondrain) + } + + this._writable = writable + this._writable.on('drain', ondrain) + this._unwrite = clear + + this.uncork() // always uncork setWritable } - var clear = function() { - self._writable.removeListener('drain', ondrain) - unend() + setReadable (readable) { + if (this._unread) { this._unread() } + + if (this.destroyed) { + if (readable && readable.destroy) { + readable.destroy() + } + return + } + + if (readable === null || readable === false) { + this.push(null) + this.resume() + return + } + + const unend = eos(readable, { + writable: false, + readable: true + }, destroyer(this)) + + const onreadable = () => { + this._forward() + } + + const onend = () => { + this.push(null) + } + + const clear = () => { + this._readable2.removeListener('readable', onreadable) + this._readable2.removeListener('end', onend) + unend() + } + + this._drained = true + this._readable = readable + this._readable2 = readable._readableState + ? readable + : toStreams2(readable) + this._readable2.on('readable', onreadable) + this._readable2.on('end', onend) + this._unread = clear + + this._forward() } - if (this._unwrite) process.nextTick(ondrain) // force a drain on stream reset to avoid livelocks + _read () { + this._drained = true + this._forward() + } - this._writable = writable - this._writable.on('drain', ondrain) - this._unwrite = clear + _forward () { + if (this._forwarding || !this._readable2 || !this._drained) { + return + } - this.uncork() // always uncork setWritable -} + this._forwarding = true -Duplexify.prototype.setReadable = function(readable) { - if (this._unread) this._unread() + let data - if (this.destroyed) { - if (readable && readable.destroy) readable.destroy() - return - } + while (this._drained && (data = shift(this._readable2)) !== null) { + if (this.destroyed) { continue } + this._drained = this.push(data) + } - if (readable === null || readable === false) { - this.push(null) - this.resume() - return + this._forwarding = false } - var self = this - var unend = eos(readable, {writable:false, readable:true}, destroyer(this)) - - var onreadable = function() { - self._forward() - } + destroy (err, cb) { + if (this.destroyed) { + return cb && cb(null) + } - var onend = function() { - self.push(null) - } + this.destroyed = true - var clear = function() { - self._readable2.removeListener('readable', onreadable) - self._readable2.removeListener('end', onend) - unend() + process.nextTick(() => { + this._destroy(err) + cb && cb(null) + }) } - this._drained = true - this._readable = readable - this._readable2 = readable._readableState ? readable : toStreams2(readable) - this._readable2.on('readable', onreadable) - this._readable2.on('end', onend) - this._unread = clear - - this._forward() -} + _destroy (err) { + if (err) { + const ondrain = this._ondrain + this._ondrain = null -Duplexify.prototype._read = function() { - this._drained = true - this._forward() -} + if (ondrain) { ondrain(err) } else { this.emit('error', err) } + } -Duplexify.prototype._forward = function() { - if (this._forwarding || !this._readable2 || !this._drained) return - this._forwarding = true + if (this._forwardDestroy) { + if (this._readable && this._readable.destroy) { + this._readable.destroy() + } - var data + if (this._writable && this._writable.destroy) { + this._writable.destroy() + } + } - while (this._drained && (data = shift(this._readable2)) !== null) { - if (this.destroyed) continue - this._drained = this.push(data) + this.emit('close') } - this._forwarding = false -} - -Duplexify.prototype.destroy = function(err, cb) { - if (!cb) cb = noop - if (this.destroyed) return cb(null) - this.destroyed = true - - var self = this - process.nextTick(function() { - self._destroy(err) - cb(null) - }) -} - -Duplexify.prototype._destroy = function(err) { - if (err) { - var ondrain = this._ondrain - this._ondrain = null - if (ondrain) ondrain(err) - else this.emit('error', err) + _write (data, enc, cb) { + if (this.destroyed) { return } + if (this._corked) { + return onuncork(this, this._write.bind(this, data, enc, cb)) + } + if (data === SIGNAL_FLUSH) { + return this._finish(cb) + } + + if (!this._writable) { + return cb() + } + + if (this._writable.write(data) === false) { + this._ondrain = cb + } else if (!this.destroyed) { + cb() + } } - if (this._forwardDestroy) { - if (this._readable && this._readable.destroy) this._readable.destroy() - if (this._writable && this._writable.destroy) this._writable.destroy() + _finish (cb) { + this.emit('preend') + + onuncork(this, () => { + end(this._forwardEnd && this._writable, () => { + // haxx to not emit prefinish twice + if (this._writableState.prefinished === false) { + this._writableState.prefinished = true + } + this.emit('prefinish') + onuncork(this, cb) + }) + }) } - this.emit('close') -} + end (data, enc, cb) { + if (typeof data === 'function') return this.end(null, null, data) + if (typeof enc === 'function') return this.end(data, null, enc) -Duplexify.prototype._write = function(data, enc, cb) { - if (this.destroyed) return - if (this._corked) return onuncork(this, this._write.bind(this, data, enc, cb)) - if (data === SIGNAL_FLUSH) return this._finish(cb) - if (!this._writable) return cb() + this._ended = true - if (this._writable.write(data) === false) this._ondrain = cb - else if (!this.destroyed) cb() -} + if (data) this.write(data) -Duplexify.prototype._finish = function(cb) { - var self = this - this.emit('preend') - onuncork(this, function() { - end(self._forwardEnd && self._writable, function() { - // haxx to not emit prefinish twice - if (self._writableState.prefinished === false) self._writableState.prefinished = true - self.emit('prefinish') - onuncork(self, cb) - }) - }) -} + if (!this._writableState.ending && !this._writableState.destroyed) { + this.write(SIGNAL_FLUSH) + } -Duplexify.prototype.end = function(data, enc, cb) { - if (typeof data === 'function') return this.end(null, null, data) - if (typeof enc === 'function') return this.end(data, null, enc) - this._ended = true - if (data) this.write(data) - if (!this._writableState.ending && !this._writableState.destroyed) this.write(SIGNAL_FLUSH) - return stream.Writable.prototype.end.call(this, cb) + return Writable.prototype.end.call(this, cb) + } + + static obj (writable, readable, opts = {}) { + opts.objectMode = true + opts.highWaterMark = 16 + return new Duplexing(writable, readable, opts) + } } -module.exports = Duplexify +// Use proxy to call class constructor without new keyword +// For backward compatibility +module.exports = new Proxy(Duplexing, { + apply (Target, thisArg, argumentsList) { + return new Target(...argumentsList) + } +}) diff --git a/package.json b/package.json index 2758df9..98db7ab 100644 --- a/package.json +++ b/package.json @@ -4,15 +4,13 @@ "description": "Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input", "main": "index.js", "dependencies": { - "end-of-stream": "^1.4.1", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1", - "stream-shift": "^1.0.0" + "end-of-stream": "^1.4.4", + "stream-shift": "^1.0.1" }, "devDependencies": { - "concat-stream": "^1.5.2", - "tape": "^4.0.0", - "through2": "^2.0.0" + "concat-stream": "^2.0.0", + "tape": "^5.3.1", + "through2": "^4.0.2" }, "scripts": { "test": "tape test.js" diff --git a/test.js b/test.js index 9341105..1408d7c 100644 --- a/test.js +++ b/test.js @@ -1,7 +1,7 @@ var tape = require('tape') var through = require('through2') var concat = require('concat-stream') -var stream = require('readable-stream') +var stream = require('stream') var net = require('net') var duplexify = require('./')