Skip to content
This repository was archived by the owner on Jan 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 33 additions & 41 deletions lib/commandqueue.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
module.exports = CommandQueue

"use strict";
/**
* CommandQueue manages a queue of commands to be executed on a resource.
* Commands pushed in are themselves simply functions that take two arguments,
* the resource and a callback for when they are finished.
*/
function CommandQueue(resource) {
if (!(this instanceof CommandQueue))
return new CommandQueue(resource)

this.queue = []
this.active = null
this.resource = resource
}

CommandQueue.prototype = {}

/**
* Add a command to be executed once the resource becomes available. If the queue
* is not currently busy, immediately invoke this command.
*/
CommandQueue.prototype.push = function push(command) {
this.queue.push(command)
if (!this.active) this._next()
}

/**
* Internal - run the next command, if there is one
*
* If there is no next command, this.active will be set to undefined, marking
* the queue as idle
*/
CommandQueue.prototype._next = function _next() {
var command = this.active = this.queue.shift()

// If there is no command to run, we idle
if (!command) return

// When the command finishes, try to run the next command
var self = this
command(this.resource, function() {
self._next()
})
}

var CommandQueue = (function () {
function CommandQueue(resource) {
this.resource = resource;
this.queue = [];
}
/**
* Add a command to be executed once the resource becomes available. If the queue
* is not currently busy, immediately invoke this command.
*/
CommandQueue.prototype.push = function (command) {
this.queue.push(command);
if (!this.active)
this._next();
};
/**
* Internal - run the next command, if there is one
*
* If there is no next command, this.active will be set to undefined, marking
* the queue as idle
*/
CommandQueue.prototype._next = function () {
var _this = this;
this.active = this.queue.shift();
// If there is no command to run, we idle
if (this.active === undefined)
return;
// When the command finishes, try to run the next command
this.active(this.resource, function () { return _this._next(); });
};
return CommandQueue;
}());
module.exports = CommandQueue;
147 changes: 62 additions & 85 deletions lib/connectionpool.js
Original file line number Diff line number Diff line change
@@ -1,85 +1,62 @@
module.exports = ConnectionPool

/**
* A ConnectionPool manages connections to a set of backend servers.
*
* It takes a connect function and a get_server function as arguments.
*
* - The connect function takes a server spec in the form of a string of
* "host:port" and returns an open connection.
* - The get_server function takes a key and returns the correct server spec.
*/
function ConnectionPool(connect, get_server) {
if (!connect)
throw "You must specify a connection function"
if (!get_server)
throw "You must specify a get_server function to map a key to a server"

this.connections = {}
this.get_server = get_server
this.connect = connect
}

ConnectionPool.prototype = {}

/**
* Get a connection for use. Takes a key or array of keys, and for each
* connection, ensures it is open and passes it to the callback along with an
* array of keys the given connection is responsible for.
*/
ConnectionPool.prototype.use = function(keys, cb) {
keys = keys instanceof Array ? keys : [keys]

var keysByServer = this._groupKeys(keys)
, server

for (server in keysByServer)
if (keysByServer.hasOwnProperty(server))
cb(this._connection(server), keysByServer[server])
}

/**
* Close all open connections. They will be re-opened on-demand.
*
* The optional "end" parameter is a function that will be passed each open
* connection to close it.
*/
ConnectionPool.prototype.reset = function(end) {
end = end || function(conn) { conn.end() }

for (var key in this.connections)
if (this.connections.hasOwnProperty(key))
end(this.connections[key])

this.connections = {}
}

/**
* Internal - Group keys by the server they are assigned to
*/
ConnectionPool.prototype._groupKeys = function(keys) {
var keysByServer = {}

for (var i = 0; i < keys.length; i++) {
var server = this.get_server(keys[i])
if (!keysByServer[server]) keysByServer[server] = []
keysByServer[server].push(keys[i])
}

return keysByServer
}

/**
* Internal - get the connection for the given server
*/
ConnectionPool.prototype._connection = function(server) {
var self = this
function onEnd() {
delete self.connections[server]
}

if (!this.connections.hasOwnProperty(server))
this.connections[server] = this.connect(server, onEnd)

return this.connections[server]
}
"use strict";
var ConnectionPool = (function () {
function ConnectionPool(connect, get_server) {
this.connect = connect;
this.get_server = get_server;
this.connections = {};
if (!connect)
throw "You must specify a connection function";
if (!get_server)
throw "You must specify a get_server function to map a key to a server";
}
/**
* Get a connection for use. Takes a key or array of keys, and for each
* connection, ensures it is open and passes it to the callback along with an
* array of keys the given connection is responsible for.
*/
ConnectionPool.prototype.use = function (keys, cb) {
keys = keys instanceof Array ? keys : [keys];
var keysByServer = this._groupKeys(keys);
for (var server in keysByServer)
if (keysByServer.hasOwnProperty(server))
cb(this._connection(server), keysByServer[server]);
};
/**
* Close all open connections. They will be re-opened on-demand.
*
* The optional "end" parameter is a function that will be passed each open
* connection to close it.
*/
ConnectionPool.prototype.reset = function (end) {
end = end || function (conn) { conn.end(); };
for (var key in this.connections)
if (this.connections.hasOwnProperty(key))
end(this.connections[key]);
this.connections = {};
};
/**
* Internal - Group keys by the server they are assigned to
*/
ConnectionPool.prototype._groupKeys = function (keys) {
var keysByServer = {};
for (var i = 0; i < keys.length; i++) {
var server = this.get_server(keys[i]);
if (!keysByServer[server])
keysByServer[server] = [];
keysByServer[server].push(keys[i]);
}
return keysByServer;
};
/**
* Internal - get the connection for the given server
*/
ConnectionPool.prototype._connection = function (server) {
var _this = this;
if (!this.connections.hasOwnProperty(server)) {
this.connections[server] = this.connect(server, function () { return delete _this.connections[server]; });
}
return this.connections[server];
};
return ConnectionPool;
}());
module.exports = ConnectionPool;
Loading