Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
.DS_Store
yarn.lock
.vscode
173 changes: 90 additions & 83 deletions stream.js
Original file line number Diff line number Diff line change
@@ -1,189 +1,196 @@
'use strict'
"use strict";

var Transform = require('readable-stream').Transform
var duplexify = require('duplexify')
var WS = require('ws')
var Buffer = require('safe-buffer').Buffer
var Transform = require("readable-stream").Transform;
var duplexify = require("duplexify");
var WS = require("ws");
var Buffer = require("safe-buffer").Buffer;

module.exports = WebSocketStream
module.exports = WebSocketStream;

function buildProxy (options, socketWrite, socketEnd) {
function buildProxy(options, socketWrite, socketEnd) {
var proxy = new Transform({
objectMode: options.objectMode
})
objectMode: options.objectMode,
});

proxy._write = socketWrite
proxy._flush = socketEnd
proxy._write = socketWrite;
proxy._flush = socketEnd;

return proxy
return proxy;
}

function WebSocketStream(target, protocols, options) {
var stream, socket
var stream, socket;

var isBrowser = process.title === 'browser'
var isNative = !!global.WebSocket
var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode
var isBrowser =
process.title === "browser" || process.title === "reactnative";
var isNative = !!global.WebSocket;
var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode;

if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
if (protocols && !Array.isArray(protocols) && "object" === typeof protocols) {
// accept the "options" Object as the 2nd argument
options = protocols
protocols = null
options = protocols;
protocols = null;

if (typeof options.protocol === 'string' || Array.isArray(options.protocol)) {
if (
typeof options.protocol === "string" ||
Array.isArray(options.protocol)
) {
protocols = options.protocol;
}
}

if (!options) options = {}
if (!options) options = {};

if (options.objectMode === undefined) {
options.objectMode = !(options.binary === true || options.binary === undefined)
options.objectMode = !(
options.binary === true || options.binary === undefined
);
}

var proxy = buildProxy(options, socketWrite, socketEnd)
var proxy = buildProxy(options, socketWrite, socketEnd);

if (!options.objectMode) {
proxy._writev = writev
proxy._writev = writev;
}

// browser only: sets the maximum socket buffer size before throttling
var bufferSize = options.browserBufferSize || 1024 * 512
var bufferSize = options.browserBufferSize || 1024 * 512;

// browser only: how long to wait when throttling
var bufferTimeout = options.browserBufferTimeout || 1000
var bufferTimeout = options.browserBufferTimeout || 1000;

// use existing WebSocket object that was passed in
if (typeof target === 'object') {
socket = target
// otherwise make a new one
if (typeof target === "object") {
socket = target;
// otherwise make a new one
} else {
// special constructor treatment for native websockets in browsers, see
// https://github.com/maxogden/websocket-stream/issues/82
if (isNative && isBrowser) {
socket = new WS(target, protocols)
var additionalHeaders = !!(options && options.headers);
if (isNative && isBrowser && !additionalHeaders) {
socket = new WS(target, protocols);
} else {
socket = new WS(target, protocols, options)
socket = new WS(target, protocols, options);
}

socket.binaryType = 'arraybuffer'
socket.binaryType = "arraybuffer";
}

// according to https://github.com/baygeldin/ws-streamify/issues/1
// Nodejs WebSocketServer cause memory leak
// Handlers like onerror, onclose, onmessage and onopen are accessible via setter/getter
// And setter first of all fires removeAllListeners, that doesnt make inner array of clients on WebSocketServer cleared ever
var eventListenerSupport = ('undefined' === typeof socket.addEventListener)
var eventListenerSupport = "undefined" === typeof socket.addEventListener;

// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
stream = proxy;
} else {
stream = stream = duplexify(undefined, undefined, options)
stream = stream = duplexify(undefined, undefined, options);
if (!options.objectMode) {
stream._writev = writev
stream._writev = writev;
}

if (eventListenerSupport) {
socket.addEventListener('open', onopen)
socket.addEventListener("open", onopen);
} else {
socket.onopen = onopen
socket.onopen = onopen;
}
}

stream.socket = socket
stream.socket = socket;

if (eventListenerSupport) {
socket.addEventListener('close', onclose)
socket.addEventListener('error', onerror)
socket.addEventListener('message', onmessage)
socket.addEventListener("close", onclose);
socket.addEventListener("error", onerror);
socket.addEventListener("message", onmessage);
} else {
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
socket.onclose = onclose;
socket.onerror = onerror;
socket.onmessage = onmessage;
}

proxy.on('close', destroy)
proxy.on("close", destroy);

var coerceToBuffer = !options.objectMode
var coerceToBuffer = !options.objectMode;

function socketWriteNode(chunk, enc, next) {
// avoid errors, this never happens unless
// destroy() is called
if (socket.readyState !== socket.OPEN) {
next()
return
next();
return;
}

if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
if (coerceToBuffer && typeof chunk === "string") {
chunk = Buffer.from(chunk, "utf8");
}
socket.send(chunk, next)
socket.send(chunk, next);
}

function socketWriteBrowser(chunk, enc, next) {
if (socket.bufferedAmount > bufferSize) {
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
return
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next);
return;
}

if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
if (coerceToBuffer && typeof chunk === "string") {
chunk = Buffer.from(chunk, "utf8");
}

try {
socket.send(chunk)
} catch(err) {
return next(err)
socket.send(chunk);
} catch (err) {
return next(err);
}

next()
next();
}

function socketEnd(done) {
socket.close()
done()
socket.close();
done();
}

function onopen() {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
stream.setReadable(proxy);
stream.setWritable(proxy);
stream.emit("connect");
}

function onclose() {
stream.end()
stream.destroy()
stream.end();
stream.destroy();
}

function onerror(err) {
stream.destroy(err)
stream.destroy(err);
}

function onmessage(event) {
var data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
var data = event.data;
if (data instanceof ArrayBuffer) data = Buffer.from(data);
else data = Buffer.from(data, "utf8");
proxy.push(data);
}

function destroy() {
socket.close()
socket.close();
}

// this is to be enabled only if objectMode is false
function writev (chunks, cb) {
var buffers = new Array(chunks.length)
function writev(chunks, cb) {
var buffers = new Array(chunks.length);
for (var i = 0; i < chunks.length; i++) {
if (typeof chunks[i].chunk === 'string') {
buffers[i] = Buffer.from(chunks[i], 'utf8')
if (typeof chunks[i].chunk === "string") {
buffers[i] = Buffer.from(chunks[i], "utf8");
} else {
buffers[i] = chunks[i].chunk
buffers[i] = chunks[i].chunk;
}
}

this._write(Buffer.concat(buffers), 'binary', cb)
this._write(Buffer.concat(buffers), "binary", cb);
}

return stream
return stream;
}