From 234cf19c45f90c741f4d2cf372c699fa2e7db610 Mon Sep 17 00:00:00 2001 From: Isaac Wolkerstorfer Date: Mon, 3 Jul 2017 14:50:54 -0700 Subject: [PATCH 1/3] Small test changes --- test/integration/cas.test.js | 3 ++- test/responder.js | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/cas.test.js b/test/integration/cas.test.js index 376cc30..76b1d31 100644 --- a/test/integration/cas.test.js +++ b/test/integration/cas.test.js @@ -4,7 +4,7 @@ var MemcacheClient = require('../../lib/memcacheclient') test("can use cas with memcache", function(t) { var server = new MemcacheServer() - var client = new MemcacheClient({ servers: [ server.host ] }) + var client = new MemcacheClient({ servers: [ server.host ], unref: false }) client.flush(0, function(err, result) { t.error(err, "should get no error") @@ -37,6 +37,7 @@ test("can use cas with memcache", function(t) { client.flush(0, function(err, result) { t.error(err, "should get no error") t.equals(result, "OK", "should get result OK for a FLUSH") + client.end() server.end() t.end() }) diff --git a/test/responder.js b/test/responder.js index 507971f..665c7b6 100644 --- a/test/responder.js +++ b/test/responder.js @@ -10,11 +10,9 @@ function Responder() { Responder.prototype = { send: function(message) { - console.log("send") this.input.push(message) }, recv: function() { - console.log("recv") var responder = this.responders.shift() return responder.apply(null, arguments) }, From 3a80f9d8bbb4f0d83fda1821599939234d15e1a9 Mon Sep 17 00:00:00 2001 From: Isaac Wolkerstorfer Date: Mon, 3 Jul 2017 14:51:07 -0700 Subject: [PATCH 2/3] Add tsconfig.json --- lib/commandqueue.js | 74 +++++++++---------- lib/connectionpool.js | 147 ++++++++++++++++---------------------- lib/delimitedstream.js | 2 +- lib/memcachesocket.js | 2 +- src/commandqueue.ts | 41 +++++++++++ src/connectionpool.ts | 87 ++++++++++++++++++++++ test/commandqueue.test.js | 4 +- tsconfig.json | 53 ++++++++++++++ 8 files changed, 280 insertions(+), 130 deletions(-) create mode 100644 src/commandqueue.ts create mode 100644 src/connectionpool.ts create mode 100644 tsconfig.json diff --git a/lib/commandqueue.js b/lib/commandqueue.js index cacaa3d..fb320c2 100644 --- a/lib/commandqueue.js +++ b/lib/commandqueue.js @@ -1,46 +1,38 @@ -module.exports = CommandQueue - +"use strict"; /** * CommandQueue manages a queue of commands to be executed on a resource. * Commands pushed in are themselves simply functions that take two arguments, * the resource and a callback for when they are finished. */ -function CommandQueue(resource) { - if (!(this instanceof CommandQueue)) - return new CommandQueue(resource) - - this.queue = [] - this.active = null - this.resource = resource -} - -CommandQueue.prototype = {} - -/** - * Add a command to be executed once the resource becomes available. If the queue - * is not currently busy, immediately invoke this command. - */ -CommandQueue.prototype.push = function push(command) { - this.queue.push(command) - if (!this.active) this._next() -} - -/** - * Internal - run the next command, if there is one - * - * If there is no next command, this.active will be set to undefined, marking - * the queue as idle - */ -CommandQueue.prototype._next = function _next() { - var command = this.active = this.queue.shift() - - // If there is no command to run, we idle - if (!command) return - - // When the command finishes, try to run the next command - var self = this - command(this.resource, function() { - self._next() - }) -} - +var CommandQueue = (function () { + function CommandQueue(resource) { + this.resource = resource; + this.queue = []; + } + /** + * Add a command to be executed once the resource becomes available. If the queue + * is not currently busy, immediately invoke this command. + */ + CommandQueue.prototype.push = function (command) { + this.queue.push(command); + if (!this.active) + this._next(); + }; + /** + * Internal - run the next command, if there is one + * + * If there is no next command, this.active will be set to undefined, marking + * the queue as idle + */ + CommandQueue.prototype._next = function () { + var _this = this; + this.active = this.queue.shift(); + // If there is no command to run, we idle + if (this.active === undefined) + return; + // When the command finishes, try to run the next command + this.active(this.resource, function () { return _this._next(); }); + }; + return CommandQueue; +}()); +module.exports = CommandQueue; diff --git a/lib/connectionpool.js b/lib/connectionpool.js index d0651c6..5b82fa2 100644 --- a/lib/connectionpool.js +++ b/lib/connectionpool.js @@ -1,85 +1,62 @@ -module.exports = ConnectionPool - -/** - * A ConnectionPool manages connections to a set of backend servers. - * - * It takes a connect function and a get_server function as arguments. - * - * - The connect function takes a server spec in the form of a string of - * "host:port" and returns an open connection. - * - The get_server function takes a key and returns the correct server spec. - */ -function ConnectionPool(connect, get_server) { - if (!connect) - throw "You must specify a connection function" - if (!get_server) - throw "You must specify a get_server function to map a key to a server" - - this.connections = {} - this.get_server = get_server - this.connect = connect -} - -ConnectionPool.prototype = {} - -/** - * Get a connection for use. Takes a key or array of keys, and for each - * connection, ensures it is open and passes it to the callback along with an - * array of keys the given connection is responsible for. - */ -ConnectionPool.prototype.use = function(keys, cb) { - keys = keys instanceof Array ? keys : [keys] - - var keysByServer = this._groupKeys(keys) - , server - - for (server in keysByServer) - if (keysByServer.hasOwnProperty(server)) - cb(this._connection(server), keysByServer[server]) -} - -/** - * Close all open connections. They will be re-opened on-demand. - * - * The optional "end" parameter is a function that will be passed each open - * connection to close it. - */ -ConnectionPool.prototype.reset = function(end) { - end = end || function(conn) { conn.end() } - - for (var key in this.connections) - if (this.connections.hasOwnProperty(key)) - end(this.connections[key]) - - this.connections = {} -} - -/** - * Internal - Group keys by the server they are assigned to - */ -ConnectionPool.prototype._groupKeys = function(keys) { - var keysByServer = {} - - for (var i = 0; i < keys.length; i++) { - var server = this.get_server(keys[i]) - if (!keysByServer[server]) keysByServer[server] = [] - keysByServer[server].push(keys[i]) - } - - return keysByServer -} - -/** - * Internal - get the connection for the given server - */ -ConnectionPool.prototype._connection = function(server) { - var self = this - function onEnd() { - delete self.connections[server] - } - - if (!this.connections.hasOwnProperty(server)) - this.connections[server] = this.connect(server, onEnd) - - return this.connections[server] -} +"use strict"; +var ConnectionPool = (function () { + function ConnectionPool(connect, get_server) { + this.connect = connect; + this.get_server = get_server; + this.connections = {}; + if (!connect) + throw "You must specify a connection function"; + if (!get_server) + throw "You must specify a get_server function to map a key to a server"; + } + /** + * Get a connection for use. Takes a key or array of keys, and for each + * connection, ensures it is open and passes it to the callback along with an + * array of keys the given connection is responsible for. + */ + ConnectionPool.prototype.use = function (keys, cb) { + keys = keys instanceof Array ? keys : [keys]; + var keysByServer = this._groupKeys(keys); + for (var server in keysByServer) + if (keysByServer.hasOwnProperty(server)) + cb(this._connection(server), keysByServer[server]); + }; + /** + * Close all open connections. They will be re-opened on-demand. + * + * The optional "end" parameter is a function that will be passed each open + * connection to close it. + */ + ConnectionPool.prototype.reset = function (end) { + end = end || function (conn) { conn.end(); }; + for (var key in this.connections) + if (this.connections.hasOwnProperty(key)) + end(this.connections[key]); + this.connections = {}; + }; + /** + * Internal - Group keys by the server they are assigned to + */ + ConnectionPool.prototype._groupKeys = function (keys) { + var keysByServer = {}; + for (var i = 0; i < keys.length; i++) { + var server = this.get_server(keys[i]); + if (!keysByServer[server]) + keysByServer[server] = []; + keysByServer[server].push(keys[i]); + } + return keysByServer; + }; + /** + * Internal - get the connection for the given server + */ + ConnectionPool.prototype._connection = function (server) { + var _this = this; + if (!this.connections.hasOwnProperty(server)) { + this.connections[server] = this.connect(server, function () { return delete _this.connections[server]; }); + } + return this.connections[server]; + }; + return ConnectionPool; +}()); +module.exports = ConnectionPool; diff --git a/lib/delimitedstream.js b/lib/delimitedstream.js index a1a4326..6ccbd25 100644 --- a/lib/delimitedstream.js +++ b/lib/delimitedstream.js @@ -15,7 +15,7 @@ function DelimitedStream(stream, delimiter) { this.stream = stream this.open = true this.delimiter = Buffer.isBuffer(delimiter) ? delimiter : Buffer(delimiter) - this.queue = CommandQueue(stream) + this.queue = new CommandQueue(stream) // {Buffer} Data that we received after the end of the last message we // returned. diff --git a/lib/memcachesocket.js b/lib/memcachesocket.js index 25f98fb..492e040 100644 --- a/lib/memcachesocket.js +++ b/lib/memcachesocket.js @@ -27,7 +27,7 @@ function MemcacheSocket(socket, options) { this.socket = socket this.compiler = TextCommandCompiler this.stream = MemcacheStream(this.socket) - this.queue = CommandQueue(this.stream) + this.queue = new CommandQueue(this.stream) } MemcacheSocket.prototype = { diff --git a/src/commandqueue.ts b/src/commandqueue.ts new file mode 100644 index 0000000..d70b80c --- /dev/null +++ b/src/commandqueue.ts @@ -0,0 +1,41 @@ +type Command = (resource: R, cb: () => void) => void + +/** + * CommandQueue manages a queue of commands to be executed on a resource. + * Commands pushed in are themselves simply functions that take two arguments, + * the resource and a callback for when they are finished. + */ +class CommandQueue { + + queue: Command[] = [] + active: Command | undefined + + constructor(public resource: R) {} + + /** + * Add a command to be executed once the resource becomes available. If the queue + * is not currently busy, immediately invoke this command. + */ + push(command: Command): void { + this.queue.push(command) + if (!this.active) this._next() + } + + /** + * Internal - run the next command, if there is one + * + * If there is no next command, this.active will be set to undefined, marking + * the queue as idle + */ + private _next(): void { + this.active = this.queue.shift() + + // If there is no command to run, we idle + if (this.active === undefined) return + + // When the command finishes, try to run the next command + this.active(this.resource, () => this._next()) + } +} + +export = CommandQueue diff --git a/src/connectionpool.ts b/src/connectionpool.ts new file mode 100644 index 0000000..4af7a68 --- /dev/null +++ b/src/connectionpool.ts @@ -0,0 +1,87 @@ +/** + * A ConnectionPool manages connections to a set of backend servers. + * + * It takes a connect function and a get_server function as arguments. + * + * - The connect function takes a server spec in the form of a string of + * "host:port" and returns an open connection. + * - The get_server function takes a key and returns the correct server spec. + */ +interface Endable { + end: () => void +} + +class ConnectionPool { + + connections: { [host: string]: Connection } = {} + + constructor( + public connect: (hostspec: string, onEnd: Function) => Connection, + public get_server: (key: string) => string + ) { + if (!connect) + throw "You must specify a connection function" + if (!get_server) + throw "You must specify a get_server function to map a key to a server" + } + + /** + * Get a connection for use. Takes a key or array of keys, and for each + * connection, ensures it is open and passes it to the callback along with an + * array of keys the given connection is responsible for. + */ + use(keys: string | string[], cb: (c: Connection, k: string[]) => void) { + keys = keys instanceof Array ? keys : [keys] + + let keysByServer = this._groupKeys(keys) + + for (const server in keysByServer) + if (keysByServer.hasOwnProperty(server)) + cb(this._connection(server), keysByServer[server]) + } + + /** + * Close all open connections. They will be re-opened on-demand. + * + * The optional "end" parameter is a function that will be passed each open + * connection to close it. + */ + reset(end: (c: Connection) => void) { + end = end || function(conn) { conn.end() } + + for (const key in this.connections) + if (this.connections.hasOwnProperty(key)) + end(this.connections[key]) + + this.connections = {} + } + + /** + * Internal - Group keys by the server they are assigned to + */ + private _groupKeys(keys: string[]): { [h: string]: string[] } { + let keysByServer: { [h: string]: string[] } = {} + + for (let i = 0; i < keys.length; i++) { + var server = this.get_server(keys[i]) + if (!keysByServer[server]) keysByServer[server] = [] + keysByServer[server].push(keys[i]) + } + + return keysByServer + } + + /** + * Internal - get the connection for the given server + */ + private _connection(server: string): Connection { + if (!this.connections.hasOwnProperty(server)) { + this.connections[server] = this.connect( + server, () => delete this.connections[server]) + } + + return this.connections[server] + } +} + +export = ConnectionPool diff --git a/test/commandqueue.test.js b/test/commandqueue.test.js index a70bb97..f7df9dc 100644 --- a/test/commandqueue.test.js +++ b/test/commandqueue.test.js @@ -1,9 +1,9 @@ var CommandQueue = require('../lib/commandqueue') - , test = require('tap').test +var test = require('tap').test test("CommandQueue makes sure each command gets access to the resource in order", function(t){ var results = [] - var queue = CommandQueue(results) + var queue = new CommandQueue(results) var calls = 0 function command(msg) { diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..6214bd2 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,53 @@ +{ + "compilerOptions": { + /* Basic Options */ + "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', or 'ESNEXT'. */ + "module": "commonjs", /* Specify module code generation: 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'. */ + // "lib": [], /* Specify library files to be included in the compilation: */ + // "allowJs": true, /* Allow javascript files to be compiled. */ + // "checkJs": true, /* Report errors in .js files. */ + // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', or 'react'. */ + // "declaration": true, /* Generates corresponding '.d.ts' file. */ + // "sourceMap": true, /* Generates corresponding '.map' file. */ + // "outFile": "./", /* Concatenate and emit output to single file. */ + "outDir": "./lib/", /* Redirect output structure to the directory. */ + "rootDir": "./src/", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */ + // "removeComments": true, /* Do not emit comments to output. */ + // "noEmit": true, /* Do not emit outputs. */ + // "importHelpers": true, /* Import emit helpers from 'tslib'. */ + // "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */ + // "isolatedModules": true, /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */ + + /* Strict Type-Checking Options */ + "strict": true /* Enable all strict type-checking options. */ + // "noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */ + // "strictNullChecks": true, /* Enable strict null checks. */ + // "noImplicitThis": true, /* Raise error on 'this' expressions with an implied 'any' type. */ + // "alwaysStrict": true, /* Parse in strict mode and emit "use strict" for each source file. */ + + /* Additional Checks */ + // "noUnusedLocals": true, /* Report errors on unused locals. */ + // "noUnusedParameters": true, /* Report errors on unused parameters. */ + // "noImplicitReturns": true, /* Report error when not all code paths in function return a value. */ + // "noFallthroughCasesInSwitch": true, /* Report errors for fallthrough cases in switch statement. */ + + /* Module Resolution Options */ + // "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */ + // "baseUrl": "./", /* Base directory to resolve non-absolute module names. */ + // "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */ + // "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */ + // "typeRoots": [], /* List of folders to include type definitions from. */ + // "types": [], /* Type declaration files to be included in compilation. */ + // "allowSyntheticDefaultImports": true, /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */ + + /* Source Map Options */ + // "sourceRoot": "./", /* Specify the location where debugger should locate TypeScript files instead of source locations. */ + // "mapRoot": "./", /* Specify the location where debugger should locate map files instead of generated locations. */ + // "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */ + // "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */ + + /* Experimental Options */ + // "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */ + // "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */ + } +} From 064825b2fb6ba64e94f34b5bb201add6bd3a1960 Mon Sep 17 00:00:00 2001 From: Isaac Wolkerstorfer Date: Mon, 3 Jul 2017 16:10:37 -0700 Subject: [PATCH 3/3] Make DelimitedStream typescript --- lib/delimitedstream.js | 313 +++++++++++++++++------------------ lib/memcachestream.js | 2 +- package.json | 1 + src/delimitedstream.ts | 179 ++++++++++++++++++++ test/delimitedstream.test.js | 28 ++-- 5 files changed, 341 insertions(+), 182 deletions(-) create mode 100644 src/delimitedstream.ts diff --git a/lib/delimitedstream.js b/lib/delimitedstream.js index 6ccbd25..acd7ecb 100644 --- a/lib/delimitedstream.js +++ b/lib/delimitedstream.js @@ -1,45 +1,24 @@ -var CommandQueue = require('./commandqueue') - -module.exports = DelimitedStream - -/** - * A DelimitedStream is a wrapper around a stream that provides two - * higher-level send and recv methods, designed to speak a protocol in which - * messages are terminated with a known control sequence, such as CRLF. - */ -function DelimitedStream(stream, delimiter) { - if (!(this instanceof DelimitedStream)) - return new DelimitedStream(stream, delimiter) - - var self = this - this.stream = stream - this.open = true - this.delimiter = Buffer.isBuffer(delimiter) ? delimiter : Buffer(delimiter) - this.queue = new CommandQueue(stream) - - // {Buffer} Data that we received after the end of the last message we - // returned. - this.leftovers = null - - this.stream.on('end', function() { - self.open = false - }) -} - -DelimitedStream.prototype = {} - -/** - * Write a message to the underlying stream, terminating it with the - * delimiter. - * - * Throws an error if the stream is closed - */ -DelimitedStream.prototype.send = function send(message) { - if (!this.open) throw new DisconnectedError("write") - this.stream.write(message) - this.stream.write(this.delimiter) -} - +"use strict"; +var __extends = (this && this.__extends) || (function () { + var extendStatics = Object.setPrototypeOf || + ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || + function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; }; + return function (d, b) { + extendStatics(d, b); + function __() { this.constructor = d; } + d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); + }; +})(); +var CommandQueue = require("./commandqueue"); +var DisconnectedError = (function (_super) { + __extends(DisconnectedError, _super); + function DisconnectedError(action) { + var _this = _super.call(this, "Attempted to " + action + " after the connection was closed") || this; + _this.name = "DisconnectedError"; + return _this; + } + return DisconnectedError; +}(Error)); /** * Finds the first location of a needle within a buffer, starting at an offset. * @param buffer {Buffer} @@ -48,129 +27,137 @@ DelimitedStream.prototype.send = function send(message) { * @returns {int} */ function indexOf(buffer, needle, offset) { - for (var i = offset || 0; i <= buffer.length - needle.length; i++) { - var found = true; - for (var j = 0; j < needle.length; j++) { - if (buffer[i + j] !== needle[j]) { - found = false; - break; - } + for (var i = offset || 0; i <= buffer.length - needle.length; i++) { + var found = true; + for (var j = 0; j < needle.length; j++) { + if (buffer[i + j] !== needle[j]) { + found = false; + break; + } + } + if (found) { + // Found a location where they are all the same + return i; + } } - - if (found) { - // Found a location where they are all the same - return i; - } - } - return -1; + return -1; } - /** - * Read a message from the underlying stream, terminated with the delimiter. - * - * If minimumBytes is specified, read a message of *at least* that, but possibly - * longer. Otherwise, return the first chunk that ends in the delimiter. Thus, - * not specifying minimumBytes is equivalent to 0. + * A DelimitedStream is a wrapper around a stream that provides two + * higher-level send and recv methods, designed to speak a protocol in which + * messages are terminated with a known control sequence, such as CRLF. */ -DelimitedStream.prototype.recv = function recv(minimumBytes, cb) { - var self = this - - if (typeof minimumBytes === 'function') cb = minimumBytes - minimumBytes|=0 - - // {Buffer[]} Data we have read from the stream already, stored here in case - // the underlying stream doesn't give us enough data to return in one chunk. - var acc = [] - var accTotalLength = 0; - - this.queue.push(function (stream, done) { - function consumeChunk(chunk) { - // Create a buffer in which to search for the delimiter: concat enough - // previous chunks that the delimiter can't be hiding on the border of two - // chunks. - // Importantly, we must use O(1) previous chunks, to keep in linear time. - var chunksToSearch = [chunk]; - var prependedBytes = 0; - for (var i = acc.length - 1; - i >= 0 && prependedBytes < self.delimiter.length; - i--) { - chunksToSearch.unshift(acc[i]) - prependedBytes += acc[i].length - } - var searchBuffer = Buffer.concat(chunksToSearch) - - // We only start the search after minimumBytes. - // If we haven't reached minimumBytes yet, this automatically fails fast. - var skippedBytes = accTotalLength - prependedBytes - var searchStart = minimumBytes - skippedBytes - searchStart = Math.max(0, searchStart) - var hitInSearchBuffer = indexOf(searchBuffer, self.delimiter, searchStart) - - if (hitInSearchBuffer === -1) { - // Didn't find the delimiter. Store the chunk and wait for more data. - acc.push(chunk) - accTotalLength += chunk.length - return false - } else { - // The delimiter was found, we can safely touch all of acc without - // losing linear time complexity. - acc.push(chunk) - var allData = Buffer.concat(acc) - - var hitInAllData = hitInSearchBuffer + skippedBytes - - // Put the remainder of chunk into leftovers, excluding the delimiter. - // This will often be empty, but that's ok. - self.leftovers = allData.slice(hitInAllData + self.delimiter.length) - - finish(null, allData.slice(0, hitInAllData)) - return true - } - } - - function read() { - var chunk = stream.read() - if (chunk) consumeChunk(chunk) +var DelimitedStream = (function () { + function DelimitedStream(stream, delimiter) { + var _this = this; + this.stream = stream; + this.open = true; + // {Buffer} Data that we received after the end of the last message we + // returned. + this.leftovers = null; + this.delimiter = Buffer.isBuffer(delimiter) ? delimiter : new Buffer(delimiter); + this.queue = new CommandQueue(stream); + this.stream.on('end', function () { return _this.open = false; }); } - - function finish(err, resp) { - // We may not have added these listeners, but it's safe to remove them - // anyway. - stream.removeListener('readable', read) - stream.removeListener('error', finish) - stream.removeListener('end', end) - cb(err, resp) - done() - } - - function end() { - finish(new DisconnectedError("read")) - } - - if (self.leftovers) { - // We have data from a previous read, consume it as if it was a read - var finished = consumeChunk(self.leftovers) - - if (finished) { - // The leftovers were enough to return a message. No need to touch the - // stream - return - } - } - - stream.on('readable', read) - stream.on('error', finish) - stream.on('end', end) - - if (!self.open) end() - else read() - }) - return true -} - -function DisconnectedError(action) { - this.name = "DisconnectedError"; - this.message = "Attempted to " + action + " after the connection was closed"; -} -DisconnectedError.prototye = Object.create(Error.prototype); -DelimitedStream.ReadAfterDisconnectedError = DisconnectedError; + /** + * Write a message to the underlying stream, terminating it with the + * delimiter. + * + * Throws an error if the stream is closed + */ + DelimitedStream.prototype.send = function (message) { + if (!this.open) + throw new DisconnectedError("write"); + this.stream.write(message); + this.stream.write(this.delimiter); + }; + DelimitedStream.prototype.recv = function (a, b) { + var _this = this; + var minimumBytes = (typeof a === 'number') ? a | 0 : 0; + var maybeCb = (typeof a === 'function') ? a : b; + if (maybeCb === undefined) { + throw new Error("No callback specified for recv"); + } + var cb = maybeCb; + // {Buffer[]} Data we have read from the stream already, stored here in case + // the underlying stream doesn't give us enough data to return in one chunk. + var acc = []; + var accTotalLength = 0; + this.queue.push(function (stream, done) { + var consumeChunk = function (chunk) { + // Create a buffer in which to search for the delimiter: concat enough + // previous chunks that the delimiter can't be hiding on the border of two + // chunks. + // Importantly, we must use O(1) previous chunks, to keep in linear time. + var chunksToSearch = [chunk]; + var prependedBytes = 0; + for (var i = acc.length - 1; i >= 0 && prependedBytes < _this.delimiter.length; i--) { + chunksToSearch.unshift(acc[i]); + prependedBytes += acc[i].length; + } + var searchBuffer = Buffer.concat(chunksToSearch); + // We only start the search after minimumBytes. + // If we haven't reached minimumBytes yet, this automatically fails fast. + var skippedBytes = accTotalLength - prependedBytes; + var searchStart = minimumBytes - skippedBytes; + searchStart = Math.max(0, searchStart); + var hitInSearchBuffer = indexOf(searchBuffer, _this.delimiter, searchStart); + if (hitInSearchBuffer === -1) { + // Didn't find the delimiter. Store the chunk and wait for more data. + acc.push(chunk); + accTotalLength += chunk.length; + return false; + } + else { + // The delimiter was found, we can safely touch all of acc without + // losing linear time complexity. + acc.push(chunk); + var allData = Buffer.concat(acc); + var hitInAllData = hitInSearchBuffer + skippedBytes; + // Put the remainder of chunk into leftovers, excluding the delimiter. + // This will often be empty, but that's ok. + _this.leftovers = allData.slice(hitInAllData + _this.delimiter.length); + finish(null, allData.slice(0, hitInAllData)); + return true; + } + }; + var read = function () { + var chunk = stream.read(); + if (chunk) + consumeChunk(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }; + var finish = function (err, resp) { + // We may not have added these listeners, but it's safe to remove them + // anyway. + stream.removeListener('readable', read); + stream.removeListener('error', finish); + stream.removeListener('end', end); + cb(err, resp); + done(); + }; + var end = function () { + finish(new DisconnectedError("read")); + }; + if (_this.leftovers) { + // We have data from a previous read, consume it as if it was a read + var finished = consumeChunk(_this.leftovers); + if (finished) { + // The leftovers were enough to return a message. No need to touch the + // stream + return; + } + } + stream.on('readable', read); + stream.on('error', finish); + stream.on('end', end); + if (!_this.open) + end(); + else + read(); + }); + return true; + }; + DelimitedStream.ReadAfterDisconnectedError = DisconnectedError; + return DelimitedStream; +}()); +module.exports = DelimitedStream; diff --git a/lib/memcachestream.js b/lib/memcachestream.js index 92c597a..ba8b9eb 100644 --- a/lib/memcachestream.js +++ b/lib/memcachestream.js @@ -11,7 +11,7 @@ function MemcacheStream(stream) { if (!(this instanceof MemcacheStream)) return new MemcacheStream(stream) - this.stream = DelimitedStream(stream, "\r\n") + this.stream = new DelimitedStream(stream, "\r\n") } MemcacheStream.prototype = {} diff --git a/package.json b/package.json index bcbb2f9..25b6d2f 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "author": "", "license": "MIT", "dependencies": { + "@types/node": "^8.0.7", "callback-timeout": "~0.1.0" }, "devDependencies": { diff --git a/src/delimitedstream.ts b/src/delimitedstream.ts new file mode 100644 index 0000000..eeb35be --- /dev/null +++ b/src/delimitedstream.ts @@ -0,0 +1,179 @@ +import CommandQueue = require('./commandqueue') + +type ReadCallback = (e: Error | null, b?: Buffer) => void + +class DisconnectedError extends Error { + constructor(action: string) { + super("Attempted to " + action + " after the connection was closed") + this.name = "DisconnectedError" + } +} + +/** + * Finds the first location of a needle within a buffer, starting at an offset. + * @param buffer {Buffer} + * @param needle {Buffer} + * @param offset {int} + * @returns {int} + */ +function indexOf(buffer: Buffer, needle: Buffer, offset: number): number { + for (var i = offset || 0; i <= buffer.length - needle.length; i++) { + var found = true; + for (var j = 0; j < needle.length; j++) { + if (buffer[i + j] !== needle[j]) { + found = false; + break; + } + } + + if (found) { + // Found a location where they are all the same + return i; + } + } + return -1; +} + +/** + * A DelimitedStream is a wrapper around a stream that provides two + * higher-level send and recv methods, designed to speak a protocol in which + * messages are terminated with a known control sequence, such as CRLF. + */ +class DelimitedStream { + static ReadAfterDisconnectedError = DisconnectedError + + open: boolean = true + // {Buffer} Data that we received after the end of the last message we + // returned. + private leftovers: Buffer | null = null + + readonly queue: CommandQueue + readonly delimiter: Buffer + + constructor(public stream: Stream, delimiter: Buffer | string) { + this.delimiter = Buffer.isBuffer(delimiter) ? delimiter : new Buffer(delimiter) + this.queue = new CommandQueue(stream) + this.stream.on('end', () => this.open = false) + } + + /** + * Write a message to the underlying stream, terminating it with the + * delimiter. + * + * Throws an error if the stream is closed + */ + send(message: Buffer): void { + if (!this.open) throw new DisconnectedError("write") + this.stream.write(message) + this.stream.write(this.delimiter) + } + + /** + * Read a message from the underlying stream, terminated with the delimiter. + * + * If minimumBytes is specified, read a message of *at least* that, but possibly + * longer. Otherwise, return the first chunk that ends in the delimiter. Thus, + * not specifying minimumBytes is equivalent to 0. + */ + recv(cb: ReadCallback): boolean + recv(minimumBytes: number, cb: ReadCallback): boolean + recv(a: number | ReadCallback, b?: ReadCallback): boolean { + const minimumBytes: number = (typeof a === 'number') ? a|0 : 0 + const maybeCb: ReadCallback | undefined = (typeof a === 'function') ? a : b + if (maybeCb === undefined) { + throw new Error("No callback specified for recv") + } + const cb: ReadCallback = maybeCb + + // {Buffer[]} Data we have read from the stream already, stored here in case + // the underlying stream doesn't give us enough data to return in one chunk. + var acc: Buffer[] = [] + var accTotalLength = 0 + + this.queue.push((stream: Stream, done: () => void) => { + const consumeChunk = (chunk: Buffer): boolean => { + // Create a buffer in which to search for the delimiter: concat enough + // previous chunks that the delimiter can't be hiding on the border of two + // chunks. + // Importantly, we must use O(1) previous chunks, to keep in linear time. + var chunksToSearch = [chunk]; + var prependedBytes = 0; + for (var i = acc.length - 1; + i >= 0 && prependedBytes < this.delimiter.length; + i--) { + chunksToSearch.unshift(acc[i]) + prependedBytes += acc[i].length + } + var searchBuffer = Buffer.concat(chunksToSearch) + + // We only start the search after minimumBytes. + // If we haven't reached minimumBytes yet, this automatically fails fast. + var skippedBytes = accTotalLength - prependedBytes + var searchStart = minimumBytes - skippedBytes + searchStart = Math.max(0, searchStart) + var hitInSearchBuffer = indexOf(searchBuffer, this.delimiter, searchStart) + + if (hitInSearchBuffer === -1) { + // Didn't find the delimiter. Store the chunk and wait for more data. + acc.push(chunk) + accTotalLength += chunk.length + return false + } else { + // The delimiter was found, we can safely touch all of acc without + // losing linear time complexity. + acc.push(chunk) + var allData = Buffer.concat(acc) + + var hitInAllData = hitInSearchBuffer + skippedBytes + + // Put the remainder of chunk into leftovers, excluding the delimiter. + // This will often be empty, but that's ok. + this.leftovers = allData.slice(hitInAllData + this.delimiter.length) + + finish(null, allData.slice(0, hitInAllData)) + return true + } + } + + const read = () => { + var chunk = stream.read() + if (chunk) consumeChunk(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)) + } + + const finish: ReadCallback = (err, resp) => { + // We may not have added these listeners, but it's safe to remove them + // anyway. + stream.removeListener('readable', read) + stream.removeListener('error', finish) + stream.removeListener('end', end) + cb(err, resp) + done() + } + + const end = () => { + finish(new DisconnectedError("read")) + } + + if (this.leftovers) { + // We have data from a previous read, consume it as if it was a read + var finished = consumeChunk(this.leftovers) + + if (finished) { + // The leftovers were enough to return a message. No need to touch the + // stream + return + } + } + + stream.on('readable', read) + stream.on('error', finish) + stream.on('end', end) + + if (!this.open) end() + else read() + }) + return true + } +} + +export = DelimitedStream diff --git a/test/delimitedstream.test.js b/test/delimitedstream.test.js index f7ee808..77042e1 100644 --- a/test/delimitedstream.test.js +++ b/test/delimitedstream.test.js @@ -4,7 +4,7 @@ var DelimitedStream = require('../lib/delimitedstream') test("read delimited messages", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, "----") + , testStream = new DelimitedStream(stream, "----") testStream.recv(function(err, msg1) { t.error(err, "no error") @@ -29,7 +29,7 @@ test("read delimited messages", function(t) { test("split delimiter", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, "----") + , testStream = new DelimitedStream(stream, "----") testStream.recv(function(err, msg1) { t.error(err, "no error") @@ -51,7 +51,7 @@ test("split delimiter", function(t) { test("process messages in-order", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, " ") + , testStream = new DelimitedStream(stream, " ") stream.write("hello ") @@ -73,7 +73,7 @@ test("process messages in-order", function(t) { test("write delimited messages", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, "----") + , testStream = new DelimitedStream(stream, "----") testStream.send("foobar") testStream.send("hello") @@ -86,7 +86,7 @@ test("write delimited messages", function(t) { test("handle errors in the stream", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, "----") + , testStream = new DelimitedStream(stream, "----") testStream.recv(function(err, msg1) { t.error(err, "no error") @@ -104,28 +104,20 @@ test("handle errors in the stream", function(t) { test("handle end-of-stream", function(t) { var stream = PassThrough() - , testStream = DelimitedStream(stream, "----") + , testStream = new DelimitedStream(stream, "----") testStream.recv(function(err, msg1) { t.error(err, "no error") t.equal(msg1.toString(), "msg1", "msg1 should be intact") + var expected_error = testStream.recv(function(err, msg) { - t.same(err, { - name: "DisconnectedError", - message: "Attempted to read after the connection was closed" - }, "got error reading during end") + t.same(err, new DelimitedStream.ReadAfterDisconnectedError("read"), "got error reading during end") t.type(msg, 'undefined', "no response") t.throws(function() { testStream.send("foo") - }, { - name: "DisconnectedError", - message: "Attempted to write after the connection was closed" - }, "got error writing after end") + }, new DelimitedStream.ReadAfterDisconnectedError("write"), "got error writing after end") testStream.recv(function(err, msg) { - t.same(err, { - name: "DisconnectedError", - message: "Attempted to read after the connection was closed" - }, "got error reading after end") + t.same(err, new DelimitedStream.ReadAfterDisconnectedError("read"), "got error reading after end") t.type(msg, 'undefined', "no response") t.end() })