diff --git a/.gitignore b/.gitignore index ba8b8ca..21442e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules .DS_Store yarn.lock +.vscode \ No newline at end of file diff --git a/stream.js b/stream.js index b066dce..bbd9d46 100644 --- a/stream.js +++ b/stream.js @@ -1,189 +1,196 @@ -'use strict' +"use strict"; -var Transform = require('readable-stream').Transform -var duplexify = require('duplexify') -var WS = require('ws') -var Buffer = require('safe-buffer').Buffer +var Transform = require("readable-stream").Transform; +var duplexify = require("duplexify"); +var WS = require("ws"); +var Buffer = require("safe-buffer").Buffer; -module.exports = WebSocketStream +module.exports = WebSocketStream; -function buildProxy (options, socketWrite, socketEnd) { +function buildProxy(options, socketWrite, socketEnd) { var proxy = new Transform({ - objectMode: options.objectMode - }) + objectMode: options.objectMode, + }); - proxy._write = socketWrite - proxy._flush = socketEnd + proxy._write = socketWrite; + proxy._flush = socketEnd; - return proxy + return proxy; } function WebSocketStream(target, protocols, options) { - var stream, socket + var stream, socket; - var isBrowser = process.title === 'browser' - var isNative = !!global.WebSocket - var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode + var isBrowser = + process.title === "browser" || process.title === "reactnative"; + var isNative = !!global.WebSocket; + var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode; - if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) { + if (protocols && !Array.isArray(protocols) && "object" === typeof protocols) { // accept the "options" Object as the 2nd argument - options = protocols - protocols = null + options = protocols; + protocols = null; - if (typeof options.protocol === 'string' || Array.isArray(options.protocol)) { + if ( + typeof options.protocol === "string" || + Array.isArray(options.protocol) + ) { protocols = options.protocol; } } - if (!options) options = {} + if (!options) options = {}; if (options.objectMode === undefined) { - options.objectMode = !(options.binary === true || options.binary === undefined) + options.objectMode = !( + options.binary === true || options.binary === undefined + ); } - var proxy = buildProxy(options, socketWrite, socketEnd) + var proxy = buildProxy(options, socketWrite, socketEnd); if (!options.objectMode) { - proxy._writev = writev + proxy._writev = writev; } // browser only: sets the maximum socket buffer size before throttling - var bufferSize = options.browserBufferSize || 1024 * 512 + var bufferSize = options.browserBufferSize || 1024 * 512; // browser only: how long to wait when throttling - var bufferTimeout = options.browserBufferTimeout || 1000 + var bufferTimeout = options.browserBufferTimeout || 1000; // use existing WebSocket object that was passed in - if (typeof target === 'object') { - socket = target - // otherwise make a new one + if (typeof target === "object") { + socket = target; + // otherwise make a new one } else { // special constructor treatment for native websockets in browsers, see // https://github.com/maxogden/websocket-stream/issues/82 - if (isNative && isBrowser) { - socket = new WS(target, protocols) + var additionalHeaders = !!(options && options.headers); + if (isNative && isBrowser && !additionalHeaders) { + socket = new WS(target, protocols); } else { - socket = new WS(target, protocols, options) + socket = new WS(target, protocols, options); } - socket.binaryType = 'arraybuffer' + socket.binaryType = "arraybuffer"; } - + // according to https://github.com/baygeldin/ws-streamify/issues/1 // Nodejs WebSocketServer cause memory leak // Handlers like onerror, onclose, onmessage and onopen are accessible via setter/getter // And setter first of all fires removeAllListeners, that doesnt make inner array of clients on WebSocketServer cleared ever - var eventListenerSupport = ('undefined' === typeof socket.addEventListener) + var eventListenerSupport = "undefined" === typeof socket.addEventListener; // was already open when passed in if (socket.readyState === socket.OPEN) { - stream = proxy + stream = proxy; } else { - stream = stream = duplexify(undefined, undefined, options) + stream = stream = duplexify(undefined, undefined, options); if (!options.objectMode) { - stream._writev = writev + stream._writev = writev; } - + if (eventListenerSupport) { - socket.addEventListener('open', onopen) + socket.addEventListener("open", onopen); } else { - socket.onopen = onopen + socket.onopen = onopen; } } - stream.socket = socket + stream.socket = socket; if (eventListenerSupport) { - socket.addEventListener('close', onclose) - socket.addEventListener('error', onerror) - socket.addEventListener('message', onmessage) + socket.addEventListener("close", onclose); + socket.addEventListener("error", onerror); + socket.addEventListener("message", onmessage); } else { - socket.onclose = onclose - socket.onerror = onerror - socket.onmessage = onmessage + socket.onclose = onclose; + socket.onerror = onerror; + socket.onmessage = onmessage; } - proxy.on('close', destroy) + proxy.on("close", destroy); - var coerceToBuffer = !options.objectMode + var coerceToBuffer = !options.objectMode; function socketWriteNode(chunk, enc, next) { // avoid errors, this never happens unless // destroy() is called if (socket.readyState !== socket.OPEN) { - next() - return + next(); + return; } - if (coerceToBuffer && typeof chunk === 'string') { - chunk = Buffer.from(chunk, 'utf8') + if (coerceToBuffer && typeof chunk === "string") { + chunk = Buffer.from(chunk, "utf8"); } - socket.send(chunk, next) + socket.send(chunk, next); } function socketWriteBrowser(chunk, enc, next) { if (socket.bufferedAmount > bufferSize) { - setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) - return + setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next); + return; } - if (coerceToBuffer && typeof chunk === 'string') { - chunk = Buffer.from(chunk, 'utf8') + if (coerceToBuffer && typeof chunk === "string") { + chunk = Buffer.from(chunk, "utf8"); } try { - socket.send(chunk) - } catch(err) { - return next(err) + socket.send(chunk); + } catch (err) { + return next(err); } - next() + next(); } function socketEnd(done) { - socket.close() - done() + socket.close(); + done(); } function onopen() { - stream.setReadable(proxy) - stream.setWritable(proxy) - stream.emit('connect') + stream.setReadable(proxy); + stream.setWritable(proxy); + stream.emit("connect"); } function onclose() { - stream.end() - stream.destroy() + stream.end(); + stream.destroy(); } function onerror(err) { - stream.destroy(err) + stream.destroy(err); } function onmessage(event) { - var data = event.data - if (data instanceof ArrayBuffer) data = Buffer.from(data) - else data = Buffer.from(data, 'utf8') - proxy.push(data) + var data = event.data; + if (data instanceof ArrayBuffer) data = Buffer.from(data); + else data = Buffer.from(data, "utf8"); + proxy.push(data); } function destroy() { - socket.close() + socket.close(); } // this is to be enabled only if objectMode is false - function writev (chunks, cb) { - var buffers = new Array(chunks.length) + function writev(chunks, cb) { + var buffers = new Array(chunks.length); for (var i = 0; i < chunks.length; i++) { - if (typeof chunks[i].chunk === 'string') { - buffers[i] = Buffer.from(chunks[i], 'utf8') + if (typeof chunks[i].chunk === "string") { + buffers[i] = Buffer.from(chunks[i], "utf8"); } else { - buffers[i] = chunks[i].chunk + buffers[i] = chunks[i].chunk; } } - this._write(Buffer.concat(buffers), 'binary', cb) + this._write(Buffer.concat(buffers), "binary", cb); } - return stream + return stream; }