diff --git a/lib/drivers/mysql.js b/lib/drivers/mysql.js index 11e44f5..e9ad6b6 100644 --- a/lib/drivers/mysql.js +++ b/lib/drivers/mysql.js @@ -4,33 +4,62 @@ module.exports = function MysqlDriver() { return { connect: function connect(opts, callback) { - const conn = mysql.createConnection(opts) - conn.connect(callback) - return conn + if (callback) { + process.nextTick(function () { + callback(null) + }) + } + + opts.connectionLimit = 100 + return mysql.createPool(opts) }, - close: function close(connection, callback) { - return connection.end(callback) + close: function close(pool, callback) { + pool.end(callback) }, - getQueryFn: function getQueryFn(connection) { + getQueryFn: function getQueryFn(pool) { return function (sql, params, callback) { const opts = { sql: sql } - return connection.query(opts, params, callback) + + pool.getConnection(function (err, connection) { + if (err) { + if (callback) + return callback(err) + else + return console.error(err) + } + + return connection.query(opts, params, function (err, rows) { + connection.release() + if (callback) callback(err, rows) + }) + }) } }, - getStreamFn: function getStreamFn(connection) { + getStreamFn: function getStreamFn(pool) { // will be in the context of `db` // expected to return a stream return function streamQuery(sql, opts) { const stream = new Stream() - const queryStream = this.query(sql) const handlers = this.driver.streamHandlers(stream, opts) - stream.pause = connection.pause.bind(connection) - stream.resume = connection.resume.bind(connection) + pool.getConnection(function (err, connection) { + if (err) return stream.emit('error', err) + + function endCallback(err) { + connection.release() + handlers.done(err) + } + + const queryStream = connection.query(sql) + + stream.pause = connection.pause.bind(connection) + stream.resume = connection.resume.bind(connection) + + queryStream.on('result', handlers.row) + queryStream.on('end', endCallback) + queryStream.on('error', stream.emit.bind(stream, 'error')) + }) - queryStream.on('result', handlers.row) - queryStream.on('end', handlers.done) - queryStream.on('error', stream.emit.bind(stream, 'error')) return stream } },