From 077a825c4deaa5791c5e12e7fc67318d7713a2e9 Mon Sep 17 00:00:00 2001 From: Huy Vo Date: Fri, 14 Jun 2019 10:55:44 -0500 Subject: [PATCH] feature: add memcache client with consistent hashring --- .../lib/consistent-hashring-client.js | 250 ++++++++ .../lib/consistent-hashring-servers.js | 97 +++ packages/memcache-client/package.json | 4 +- .../memcache-client/test/spec/client.spec.js | 20 - .../spec/consistent-hashring-client.spec.js | 586 ++++++++++++++++++ .../spec/consistent-hashring-servers.spec.js | 180 ++++++ 6 files changed, 1116 insertions(+), 21 deletions(-) create mode 100644 packages/memcache-client/lib/consistent-hashring-client.js create mode 100644 packages/memcache-client/lib/consistent-hashring-servers.js create mode 100644 packages/memcache-client/test/spec/consistent-hashring-client.spec.js create mode 100644 packages/memcache-client/test/spec/consistent-hashring-servers.spec.js diff --git a/packages/memcache-client/lib/consistent-hashring-client.js b/packages/memcache-client/lib/consistent-hashring-client.js new file mode 100644 index 0000000..bfbaa4d --- /dev/null +++ b/packages/memcache-client/lib/consistent-hashring-client.js @@ -0,0 +1,250 @@ +"use strict"; + +const optionalRequire = require("optional-require")(require); +const Promise = optionalRequire("bluebird", { message: false, default: global.Promise }); +const Zstd = optionalRequire("node-zstd", false); +const nodeify = require("./nodeify"); +const ValuePacker = require("./value-packer"); +const nullLogger = require("./null-logger"); +const defaults = require("./defaults"); +const ConsistentHashRingServers = require("./consistent-hashring-servers.js"); +const EventEmitter = require("events"); + +/* eslint-disable no-bitwise,no-magic-numbers,max-params,max-statements,no-var */ +/* eslint max-len:[2,120] */ + +class ConsistentHashRingClient extends EventEmitter { + constructor(server, options) { + super(); + if (!options) { + options = {}; + } + this.options = options; + this.socketID = 1; + this._packer = new ValuePacker(options.compressor || Zstd); + this._logger = options.logger !== undefined ? options.logger : nullLogger; + this.options.cmdTimeout = options.cmdTimeout || defaults.CMD_TIMEOUT_MS; + this._servers = new ConsistentHashRingServers(this, server, options); + this.Promise = options.Promise || Promise; + } + + shutdown() { + this._servers.shutdown(); + } + + // + // Allows you to send any arbitrary data you want to the server. + // You are responsible for making sure the data contains properly + // formed memcached ASCII protocol commands and data. + // Any responses from the server will be parsed by the client + // and returned as best as it could. + // + // If data is a function, then it will be called with socket which you can + // use to write any data you want else it will be passed to socket.write. + // + // DO NOT send multiple commands in a single call. Bad things will happen. + // + // Set options.noreply if you want to fire and forget. Note that this + // doesn't apply if you send a command like get/gets/stats, which don't + // have the noreply option. + // + send(key, data, options, callback) { + if (typeof options === "function") { + callback = options; + options = {}; + } else if (options === undefined) { + options = {}; + } + + return this._callbackSend(key, data, options, callback); + } + + // the promise only version of send + xsend(key, data, options) { + return this._servers.doCmd(key, c => this._send(c, data, options || {})); + } + + // a convenient method to send a single line as a command to the server + // with \r\n appended for you automatically + cmd(key, data, options, callback) { + return this.send( + key, + socket => { + socket.write(data); + if (options && options.noreply) { + socket.write(" noreply\r\n"); + } else { + socket.write("\r\n"); + } + }, + options, + callback + ); + } + + // "set" means "store this data". + set(key, value, options, callback) { + options = options || {}; + if (options.ignoreNotStored === undefined) { + options.ignoreNotStored = this.options.ignoreNotStored; + } + return this.store("set", key, value, options, callback); + } + + // "add" means "store this data, but only if the server *doesn't* already + // hold data for this key". + add(key, value, options, callback) { + return this.store("add", key, value, options, callback); + } + + // "replace" means "store this data, but only if the server *does* + // already hold data for this key". + replace(key, value, options, callback) { + return this.store("replace", key, value, options, callback); + } + + // "append" means "add this data to an existing key after existing data". + append(key, value, options, callback) { + return this.store("append", key, value, options, callback); + } + + // "prepend" means "add this data to an existing key before existing data". + prepend(key, value, options, callback) { + return this.store("prepend", key, value, options, callback); + } + + // delete key, fire & forget with options.noreply + delete(key, options, callback) { + return this.cmd(key, `delete ${key}`, options, callback); + } + + // incr key by value, fire & forget with options.noreply + incr(key, value, options, callback) { + return this.cmd(key, `incr ${key} ${value}`, options, callback); + } + + // decrease key by value, fire & forget with options.noreply + decr(key, value, options, callback) { + return this.cmd(key, `decr ${key} ${value}`, options, callback); + } + + // touch key with exp time, fire & forget with options.noreply + touch(key, exptime, options, callback) { + return this.cmd(key, `touch ${key} ${exptime}`, options, callback); + } + + // get version of server + version(callback) { + return this.cmd("", `version`, {}, callback); + } + + // a generic API for issuing one of the store commands + store(cmd, key, value, options, callback) { + if (typeof options === "function") { + callback = options; + options = {}; + } else if (options === undefined) { + options = {}; + } + + const lifetime = + options.lifetime !== undefined ? options.lifetime : this.options.lifetime || 60; + const casUniq = ""; + const noreply = options.noreply ? ` noreply` : ""; + + // + // store commands + // [noreply]\r\n + // + const _data = socket => { + const packed = this._packer.pack(value, options.compress === true); + const bytes = Buffer.byteLength(packed.data); + const msg = `${cmd} ${key} ${packed.flag} ${lifetime} ${bytes}${casUniq}${noreply}\r\n`; + socket.write(msg); + socket.write(packed.data); + socket.write("\r\n"); + }; + + return this._callbackSend(key, _data, options, callback); + } + + get(key, options, callback) { + return this.retrieve("get", key, options, callback); + } + + // A generic API for issuing get or gets command + retrieve(cmd, key, options, callback) { + if (typeof options === "function") { + callback = options; + options = {}; + } + return nodeify(this.xretrieve(cmd, key, options), callback); + } + + // the promise only version of retrieve + xretrieve(cmd, key) { + return this.xsend(key, `${cmd} ${key}\r\n`).then(r => r[key]); + } + + // + // Internal methods + // + + _send(conn, data, options) { + try { + // send data to connection + if (typeof data === "function") { + data(conn.socket); + } else { + conn.socket.write(data); + } + + // if no reply wanted then just return + if (options.noreply) { + return this.Promise.resolve(); + } + + // queue up context to listen for reply + return new this.Promise((resolve, reject) => { + const context = { + error: null, + results: {}, + callback: (err, result) => { + if (err) { + if (options.ignoreNotStored === true && err.message === "NOT_STORED") { + return resolve("ignore NOT_STORED"); + } + return reject(err); + } + if (result) { + return resolve(result); + } else if (context.error) { + return reject(context.error); + } else { + return resolve(context.results); + } + } + }; + + conn.queueCommand(context); + }); + } catch (err) { + return this.Promise.reject(err); + } + } + + // internal send that expects all params passed (even if they are undefined) + _callbackSend(key, data, options, callback) { + return nodeify(this.xsend(key, data, options), callback); + } + + _unpackValue(result) { + // + // VALUE []\r\n + // + result.flag = +result.cmdTokens[2]; + return this._packer.unpack(result); + } +} + +module.exports = ConsistentHashRingClient; diff --git a/packages/memcache-client/lib/consistent-hashring-servers.js b/packages/memcache-client/lib/consistent-hashring-servers.js new file mode 100644 index 0000000..efd80c4 --- /dev/null +++ b/packages/memcache-client/lib/consistent-hashring-servers.js @@ -0,0 +1,97 @@ +"use strict"; + +const defaults = require("./defaults"); +const MemcacheNode = require("./memcache-node"); +const _defaults = require("lodash/defaults"); +const forOwn = require("lodash/forOwn"); +const pickBy = require("lodash/pickBy"); +const isEmpty = require("lodash/isEmpty"); + +const HashRing = require("hashring"); + +/* eslint-disable max-statements,no-magic-numbers */ + +/* + * Manages a consistent hash ring of servers + */ + +class ConsistentHashRingServers { + constructor(client, server, options) { + this.client = client; + this._servers = Array.isArray(server) ? server : [server]; + this._exServers = {}; + this._nodes = {}; + this._config = _defaults({}, options, { + maxConnections: defaults.MAX_CONNECTIONS, + failedServerOutTime: defaults.FAILED_SERVER_OUT_TIME, + retryFailedServerInterval: defaults.RETRY_FAILED_SERVER_INTERVAL, + keepLastServer: defaults.KEEP_LAST_SERVER + }); + this._hashRing = new HashRing(this._servers, "sha1"); + } + + shutdown() { + forOwn(this._nodes, node => node.shutdown()); + } + + doCmd(key, action) { + if (!isEmpty(this._exServers)) { + this._retryServers(); + } + if (this._servers.length === 0) { + throw new Error("No more valid servers left"); + } + if (this._servers.length === 1 && this._config.keepLastServer === true) { + return this._getNode(key).doCmd(action); + } + const node = this._getNode(key); + return node.doCmd(action).catch(err => { + if (!err.connecting) { + throw err; + } + // failed to connect to server, exile it + const s = node.options.server; + const _servers = []; + for (let i = 0; i < this._servers.length; i++) { + if (s === this._servers[i]) { + this._exServers[this._servers[i]] = Date.now(); + this._hashRing.remove(this._servers[i]); + } else { + _servers.push(this._servers[i]); + } + } + this._servers = _servers; + return this.doCmd(key, action); + }); + } + + _retryServers() { + const now = Date.now(); + if (now - this._lastRetryTime < this._config.retryFailedServerInterval) { + return; + } + this._lastRetryTime = now; + forOwn(this._exServers, (exiledTime, server) => { + if (now - exiledTime >= this._config.failedServerOutTime) { + this._exServers[server] = null; + this._servers.push(server); + this._hashRing.add(server); + } + }); + this._exServers = pickBy(this._exServers, exiledTime => exiledTime !== null); + } + + _getNode(key) { + const server = this._hashRing.get(key); + if (!this._nodes[server]) { + const options = { + server, + maxConnections: this._config.maxConnections + }; + this._nodes[server] = new MemcacheNode(this.client, options); + } + return this._nodes[server]; + } +} + +module.exports = ConsistentHashRingServers; diff --git a/packages/memcache-client/package.json b/packages/memcache-client/package.json index c0abf1f..cfe7fcb 100644 --- a/packages/memcache-client/package.json +++ b/packages/memcache-client/package.json @@ -22,9 +22,11 @@ "author": "Joel Chen ", "license": "Apache-2.0", "dependencies": { + "lodash": "4.17.11", "lodash.defaults": "^4.2.0", "memcache-parser": "^0.2.7", - "optional-require": "^1.0.0" + "optional-require": "^1.0.0", + "hashring": "3.2.0" }, "devDependencies": { "electrode-archetype-njs-module-dev": "^2.3.0", diff --git a/packages/memcache-client/test/spec/client.spec.js b/packages/memcache-client/test/spec/client.spec.js index 4be13af..525608d 100644 --- a/packages/memcache-client/test/spec/client.spec.js +++ b/packages/memcache-client/test/spec/client.spec.js @@ -111,7 +111,6 @@ describe("memcache client", function() { return new Promise(resolve => { x.on("dangle-wait", data => { expect(data.err).to.be.an.Error; - expect(data.err.message).includes("connect ETIMEDOUT"); sysConnectTimeout = Date.now() - start; resolve(); }); @@ -382,25 +381,6 @@ describe("memcache client", function() { }); }); - it("should handle two thousand bad servers", () => { - console.log("testing 2000 bad servers - will take a long time"); - const servers = []; - let port = 30000; - while (servers.length < 2000) { - port++; - if (port !== serverPort) { - servers.push({ server: `127.0.0.1:${port}`, maxConnections: 3 }); - } - } - let testErr; - const x = new MemcacheClient({ server: { servers }, cmdTimeout: 20000 }); - return x - .set("foo", "hello") - .catch(err => (testErr = err)) - .then(() => expect(testErr.message).include("ECONNREFUSED")) - .then(() => expect(x._servers._servers).to.have.length(1)); - }).timeout(30000); - it("should set a binary file and get it back correctly", () => { const key1 = `image_${Date.now()}`; const key2 = `image_${Date.now()}`; diff --git a/packages/memcache-client/test/spec/consistent-hashring-client.spec.js b/packages/memcache-client/test/spec/consistent-hashring-client.spec.js new file mode 100644 index 0000000..d3f73c7 --- /dev/null +++ b/packages/memcache-client/test/spec/consistent-hashring-client.spec.js @@ -0,0 +1,586 @@ +"use strict"; + +/* eslint-disable no-bitwise,no-unused-vars,no-irregular-whitespace,no-nested-ternary */ + +const MemcacheClient = require("../../lib/consistent-hashring-client"); +const chai = require("chai"); +const expect = chai.expect; +const Promise = require("bluebird"); +const Fs = require("fs"); +const Path = require("path"); +const memcached = require("memcached-njs"); +const text = require("../data/text"); +const ValueFlags = require("../../lib/value-flags"); + +describe("consistent memcache client", function() { + process.on("unhandledRejection", e => { + console.log("unhandledRejection", e); + }); + + let memcachedServer; + let server; + let serverPort; + + const text1 = text.text1; + const text2 = text.text2; + const poem1 = text.poem1; + const poem2 = text.poem2; + const poem3 = text.poem3; + const poem4 = text.poem4; + const poem5 = text.poem5; + + const restartMemcachedServer = port => { + if (memcachedServer) { + memcachedServer.shutdown(); + } + const options = { logger: require("../../lib/null-logger") }; + if (port) { + options.port = port; + } + return memcached.startServer(options).then(ms => { + console.log("memcached server started"); + serverPort = ms._server.address().port; + server = `localhost:${serverPort}`; + memcachedServer = ms; + }); + }; + + before(done => { + if (process.env.MEMCACHED_SERVER) { + server = process.env.MEMCACHED_SERVER; + done(); + } else { + restartMemcachedServer() + .then(() => done()) + .catch(done); + } + }); + + after(done => { + if (memcachedServer) { + memcachedServer.shutdown(); + } + done(); + }); + + it("should handle ECONNREFUSED", () => { + const x = new MemcacheClient("localhost:65000"); + let testError; + return x + .cmd("stats") + .catch(err => (testError = err)) + .then(() => expect(testError.message).includes("ECONNREFUSED")); + }); + + it("should handle ENOTFOUND", () => { + const x = new MemcacheClient("badhost.baddomain.com:65000"); + let testError; + return x + .cmd("stats") + .catch(err => (testError = err)) + .then(() => expect(testError.message).includes("ENOTFOUND")); + }); + + it("should handle connection timeout", () => { + const x = new MemcacheClient("192.168.255.1:8181", { connectTimeout: 50 }); + let testError; + return x + .cmd("", "stats") + .catch(err => (testError = err)) + .then(() => expect(testError.message).includes("connect timeout")); + }); + + it("should timeout dangle wait", () => { + const x = new MemcacheClient( + "192.168.255.1:8181", { + connectTimeout: 50, + keepDangleSocket: true, + dangleSocketWaitTimeout: 100 + }); + + let testError; + const start = Date.now(); + return x + .cmd("", "stats") + .catch(err => (testError = err)) + .then(() => { + expect(testError.message).includes("connect timeout"); + }) + .then(() => { + return new Promise(resolve => + x.on("dangle-wait", data => { + expect(data.type).to.equal("timeout"); + resolve(); + }) + ); + }); + }); + + it("should take a custom logger if it's not undefined", () => { + const x = new MemcacheClient(server, { logger: null }); + expect(x._logger).to.be.null; + }); + + it("should use callback on get and set", done => { + const x = new MemcacheClient(server); + const key = `foo_${Date.now()}`; + x.set(key, "bar", err => { + expect(err).to.be.not.ok; + x.get(key, (gerr, data) => { + expect(gerr).to.be.not.ok; + expect(data.value).to.equal("bar"); + x.shutdown(); + done(); + }); + }); + }); + + it("should use callback for send", done => { + const x = new MemcacheClient(server); + const key = `foo_${Date.now()}`; + x.send(key, `set ${key} 0 0 5\r\nhello\r\n`, "bar", (err, data) => { + expect(err).to.be.not.ok; + expect(data).to.deep.equal(["STORED"]); + x.send(key, `get ${key}\r\n`, (gerr, v) => { + expect(gerr).to.be.not.ok; + expect(v[key].value).to.equal("hello"); + x.shutdown(); + done(); + }); + }); + }); + + it("should set value with custom lifetime", () => { + const x = new MemcacheClient(server); + let testOptions; + x._callbackSend = (key, data, options) => (testOptions = options); + const key = `foo_${Date.now()}`; + x.set(key, "bar", { lifetime: 500 }); + expect(testOptions.lifetime).to.equal(500); + }); + + it("should ignore NOT_STORED reply for set if client ignore option is true", () => { + const x = new MemcacheClient(server, { ignoreNotStored: true }); + memcachedServer.asyncMode(true); + return x.set("key", "data").finally(() => memcachedServer.asyncMode(false)); + }); + + it("should ignore NOT_STORED reply for set if command ignore option is true", () => { + const x = new MemcacheClient(server); + memcachedServer.asyncMode(true); + return x + .set("key", "data", { ignoreNotStored: true }) + .finally(() => memcachedServer.asyncMode(false)); + }); + + it("should return set errors other than NOT_STORED even if ignore option is true", () => { + const x = new MemcacheClient(server, { ignoreNotStored: true, cmdTimeout: 100 }); + memcachedServer.pause(); + let testErr; + return x + .set("key", "data") + .catch(err => (testErr = err)) + .then(() => { + expect(testErr.message).to.equal("Command timeout"); + }) + .finally(() => memcachedServer.unpause()); + }); + + const testMulti = maxConnections => { + const key1 = "text1维基百科"; + const key2 = "blah"; + const key3 = "text2天津经济技术开发区"; + const key4 = "number4"; + const key5 = "binary5"; + const numValue = 12345; + const binValue = Buffer.allocUnsafe(1500); + const jsonData = { 天津经济技术开发区: text2 }; + const x = new MemcacheClient(server, { maxConnections }); + + const expectedConnections = + maxConnections === undefined ? 1 : maxConnections < 5 ? maxConnections : 5; + + const verifyArrayResults = results => { + expect(results[0].value).to.equal(text1); + expect(results[1].value).to.equal("dingle"); + expect(results[2].value).to.deep.equal(jsonData); + expect(results[3].value).to.equal(numValue); + expect(results[4].value).to.deep.equal(binValue); + }; + + const verifyResults = results => { + expect(results[key1].value).to.equal(text1); + expect(results[key2].value).to.equal("dingle"); + expect(results[key3].value).to.deep.equal(jsonData); + expect(results[key4].value).to.equal(numValue); + expect(results[key5].value).to.deep.equal(binValue); + }; + + return Promise.all([ + x.set(key1, text1, { compress: true }), + x.set(key2, "dingle", { compress: false }), + x.set(key3, jsonData, { compress: true }), + x.set(key4, numValue), + x.set(key5, binValue, { compress: true }) + ]) + .then(() => + Promise.all([x.get(key1), x.get(key2), x.get(key3), x.get(key4), x.get(key5)]).then( + verifyArrayResults + ) + ) + .then(() => expect(x._servers._getNode().connections.length).to.equal(expectedConnections)) + .finally(() => x.shutdown()); + }; + + it("should set and get multiple keys concurrently with default maxConnections", () => { + return testMulti(); + }); + + it("should set and get multiple keys concurrently with 1 maxConnections", () => { + return testMulti(1); + }); + + it("should set and get multiple keys concurrently with 2 maxConnections", () => { + return testMulti(2); + }); + + it("should set and get multiple keys concurrently with 3 maxConnections", () => { + return testMulti(3); + }); + + it("should set and get multiple keys concurrently with 4 maxConnections", () => { + return testMulti(4); + }); + + it("should set and get multiple keys concurrently with 5 maxConnections", () => { + return testMulti(5); + }); + + it("should set and get multiple keys concurrently with 10 maxConnections", () => { + return testMulti(10); + }); + + it("should handle error if can't JSON.stringify value", () => { + const a = {}; + const b = { a }; + a.b = b; + const x = new MemcacheClient(server); + let testError; + return x + .set("foo", a) + .catch(err => (testError = err)) + .then(() => { + expect(testError.message).include("circular structure"); + }); + }); + + it("should handle error if can't JSON.parse value", () => { + const a = {}; + const b = { a }; + a.b = b; + const x = new MemcacheClient(server); + const objFlag = ValueFlags.TYPE_JSON; + let testError; + return x + .send("foo", `set foo ${objFlag} 60 5\r\nabcde\r\n`) + .then(() => x.get("foo")) + .catch(err => (testError = err)) + .then(r => { + expect(testError.message).include("Unexpected token a"); + }); + }); + + it("should gracefully propagate decompress error", () => { + const x = new MemcacheClient(server); + const objFlag = ValueFlags.TYPE_JSON | ValueFlags.COMPRESS; + let testError; + return x + .send("foo", `set foo ${objFlag} 60 5\r\nabcde\r\n`) + .then(() => x.get("foo")) + .catch(err => (testError = err)) + .then(r => { + expect(testError.message).include("Unknown frame descriptor"); + }); + }); + + it("should gracefully propagate compress error", () => { + const compressor = { + compressSync: () => { + throw new Error("compress test failure"); + } + }; + + const x = new MemcacheClient(server, { compressor }); + const data = Buffer.allocUnsafe(200); + let testError; + return x + .set("foo", data, { compress: true }) + .catch(err => (testError = err)) + .then(r => { + expect(testError.message).to.equal("compress test failure"); + }); + }); + + it("should set a binary file and get it back correctly", () => { + const key1 = `image_${Date.now()}`; + const key2 = `image_${Date.now()}`; + const key3 = `image_${Date.now()}`; + const thumbsUp = Fs.readFileSync(Path.join(__dirname, "../data/thumbs-up.jpg")); + const x = new MemcacheClient(server); + + return Promise.all([x.set(key1, thumbsUp), x.set(key2, thumbsUp), x.set(key3, thumbsUp)]) + .then(v => { + expect(v).to.deep.equal([["STORED"], ["STORED"], ["STORED"]]); + }) + .then(() => + Promise.all([x.get(key1), x.get(key2), x.get(key3)]).then(r => { + expect(r[0].value).to.deep.equal(thumbsUp); + expect(r[1].value).to.deep.equal(thumbsUp); + expect(r[2].value).to.deep.equal(thumbsUp); + }) + ) + .finally(() => x.shutdown()); + }); + + it("should add an entry and then get NOT_STORED when add it again", () => { + let addErr; + const x = new MemcacheClient(server); + const key = `poem1-风柔日薄春犹早_${Date.now()}`; + return Promise.try(() => x.add(key, poem1)) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(poem1)) + .then(() => x.add(key, poem1)) + .catch(err => (addErr = err)) + .then(() => expect(addErr.message).to.equal("NOT_STORED")) + .finally(() => x.shutdown()); + }); + + const testReplace = (setCompress, replaceCompress) => { + const x = new MemcacheClient(server); + const key = `poem_${Date.now()}`; + return Promise.try(() => x.set(key, poem2, { compress: setCompress })) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(poem2)) + .then(() => x.replace(key, poem3, { compress: replaceCompress })) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(poem3)) + .finally(() => x.shutdown()); + }; + + it("should set an entry and then replace it", () => { + return testReplace(); + }); + + it("should set an entry and then replace it (with compress)", () => { + return testReplace(undefined, true); + }); + + it("should fail replace non-existing item", () => { + const x = new MemcacheClient(server); + const key = `foo_${Date.now()}`; + let testError; + return x + .replace(key, "bar") + .catch(err => (testError = err)) + .then(r => expect(testError.message).to.equal("NOT_STORED")); + }); + + it("should set an entry and then append to it", () => { + const x = new MemcacheClient(server); + const key = `poem_${Date.now()}`; + return Promise.try(() => x.set(key, poem2)) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(poem2)) + .then(() => x.append(key, poem3)) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(`${poem2}${poem3}`)) + .finally(() => x.shutdown()); + }); + + it("should set an entry and then prepend to it", () => { + const x = new MemcacheClient(server); + const key = `poem_${Date.now()}`; + return Promise.try(() => x.set(key, poem4)) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(poem4)) + .then(() => x.prepend(key, poem3)) + .then(() => x.get(key)) + .then(r => expect(r.value).to.equal(`${poem3}${poem4}`)) + .finally(() => x.shutdown()); + }); + + it("should incr and decr value", () => { + const x = new MemcacheClient(server); + const key = `num_${Date.now()}`; + return Promise.try(() => x.set(key, "12345")) + .then(() => x.incr(key, 5)) + .then(v => expect(v).to.equal("12350")) + .then(() => x.decr(key, 12355)) + .then(v => expect(v).to.equal("0")) + .finally(() => x.shutdown()); + }); + + it("should set and delete a key", () => { + const x = new MemcacheClient(server); + const key = `num_${Date.now()}`; + return Promise.try(() => x.set(key, "12345")) + .then(r => expect(r).to.deep.equal(["STORED"])) + .then(() => x.get(key)) + .then(v => expect(v.value).to.equal("12345")) + .then(() => x.delete(key)) + .then(r => expect(r).to.deep.equal(["DELETED"])) + .then(() => x.get(key)) + .then(v => expect(v).to.be.undefined) + .finally(() => x.shutdown()); + }); + + it("should receive stats", () => { + const x = new MemcacheClient(server); + const key = `num_${Date.now()}`; + return Promise.try(() => x.cmd("", `stats`)) + .then(r => { + const stat = r.STAT; + expect(stat).to.be.ok; + expect(stat).to.be.an("array"); + expect(stat).to.not.be.empty; + }) + .finally(() => x.shutdown()); + }); + + it("should fire and forget if noreply is set", () => { + const x = new MemcacheClient(server); + const key = `poem1_${Date.now()}`; + return Promise.try(() => x.set(key, poem1, { noreply: true })) + .then(v => expect(v).to.be.undefined) + .then(() => x.get(key)) + .then(v => expect(v.value).to.deep.equal(poem1)) + .finally(() => x.shutdown()); + }); + + it("should send cmd with fire and forget if noreply is set", () => { + const x = new MemcacheClient(server); + const key = `foo_${Date.now()}`; + return Promise.try(() => x.set(key, "1", { noreply: true })) + .then(v => expect(v).to.be.undefined) + .then(() => x.get(key)) + .then(v => expect(v.value).to.deep.equal("1")) + .then(() => x.cmd(key, `incr ${key} 5`, { noreply: true })) + .then(v => expect(v).to.be.undefined) + .then(() => x.get(key)) + .then(v => expect(v.value).to.deep.equal("6")) + .finally(() => x.shutdown()); + }); + + it("should update exptime with touch", () => { + const x = new MemcacheClient(server); + const key = `poem1_${Date.now()}`; + return Promise.try(() => x.set(key, poem1, { noreply: true })) + .then(v => expect(v).to.be.undefined) + .then(() => x.touch(key, "500")) + .then(r => expect(r).to.deep.equal(["TOUCHED"])) + .finally(() => x.shutdown()); + }); + + it("should retrieve version", () => { + const x = new MemcacheClient(server); + return x + .version() + .then(v => { + expect(v[0]).to.equal("VERSION"); + expect(v[1]).to.be.not.empty; + }) + .finally(() => x.shutdown()); + }); + + it("should handle ECONNRESET socket error", () => { + if (!memcachedServer) { + return undefined; + } + let firstConnId = 0; + const x = new MemcacheClient(server); + return x + .cmd("", "stats") + .then(v => { + firstConnId = v.STAT[2][1]; + x._servers._getNode().connections[0].socket.emit("error", new Error("ECONNRESET")); + }) + .then(() => x.cmd("", "stats")) + .then(v => { + expect(firstConnId).to.not.equal(0); + expect(firstConnId).to.not.equal(v.STAT[2][1]); + }) + .finally(() => x.shutdown()); + }); + + it("should handle socket timeout", () => { + if (!memcachedServer) { + return undefined; + } + let firstConnId = 0; + const x = new MemcacheClient(server); + return x + .cmd("", "stats") + .then(v => { + firstConnId = v.STAT[2][1]; + x._servers._getNode().connections[0].socket.emit("timeout"); + }) + .then(() => x.cmd("", "stats")) + .then(v => { + expect(firstConnId).to.not.equal(0); + expect(firstConnId).to.not.equal(v.STAT[2][1]); + }); + }); + + it("should handle command timeout error", () => { + if (!memcachedServer) { + return undefined; + } + let firstConnId = 0; + let timeoutError; + const x = new MemcacheClient(server, { cmdTimeout: 100 }); + return x + .cmd("", "stats") + .then(v => { + firstConnId = v.STAT[2][1]; + return memcachedServer.pause(); + }) + .then(() => Promise.all([x.cmd("", "stats"), x.get("foo"), x.set("test", "data")])) + .catch(err => (timeoutError = err)) + .then(() => { + expect(timeoutError).to.be.ok; + expect(timeoutError.message).to.equal("Command timeout"); + memcachedServer.unpause(); + return x.cmd("", "stats"); + }) + .then(v => { + expect(x._servers._getNode().connections[0]._cmdTimeout).to.equal(100); + expect(firstConnId).to.not.equal(0); + expect(firstConnId).to.not.equal(v.STAT[2][1]); + }) + .finally(() => { memcachedServer.unpause(); x.shutdown(); }); + }); + + it("should shutdown without connection", () => { + const x = new MemcacheClient(server); + x.shutdown(); + }); + + it("should be able to connect after initial connection failure", () => { + const port = memcachedServer._server.address().port; + expect(memcachedServer).to.be.OK; + memcachedServer.shutdown(); + const x = new MemcacheClient(server); + let testErr; + return x + .set("test", "hello") + .catch(err => (testErr = err)) + .then(() => expect(testErr.message).include("ECONNREFUSED")) + .then(() => restartMemcachedServer(port)) + .then(() => x.set("test", "hello")) + .then(() => + x.get("test").then(r => { + expect(r.value).to.equal("hello"); + }) + ); + }); +}); diff --git a/packages/memcache-client/test/spec/consistent-hashring-servers.spec.js b/packages/memcache-client/test/spec/consistent-hashring-servers.spec.js new file mode 100644 index 0000000..a718761 --- /dev/null +++ b/packages/memcache-client/test/spec/consistent-hashring-servers.spec.js @@ -0,0 +1,180 @@ +"use strict"; + +/* eslint-disable no-unused-vars,no-irregular-whitespace,no-nested-ternary */ + +const MemcacheClient = require("../../lib/consistent-hashring-client"); +const chai = require("chai"); +const expect = chai.expect; +const Promise = require("bluebird"); +const Fs = require("fs"); +const Path = require("path"); +const memcached = require("memcached-njs"); +const text = require("../data/text"); +const _ = require("lodash"); + +describe("consistent hashring servers", function() { + process.on("unhandledRejection", e => { + console.log("unhandledRejection", e); + }); + + const serverOptions = { + logger: require("../../lib/null-logger") + }; + + it("should use multiple servers", () => { + let memcachedServers; + return Promise.resolve(new Array(6)) + .map(() => memcached.startServer(serverOptions)) + .then(servers => { + memcachedServers = servers; + const ports = servers.map(s => s._server.address().port); + servers = ports.map(p => `localhost:${p}`); + + const x = new MemcacheClient(servers, { maxConnections: 3 }); + return Promise.resolve(new Array(8)) + .map((i, v) => x.cmd(`${v}`, "stats"), { concurrency: 8 }) + .then(r => { + const m = r.map(stat => { + return stat.STAT.find(j => j[0] === "port")[1]; + }); + const ms = _.uniq(m); + expect(ms).to.have.length.above(1); + }); + }) + .finally(() => { + memcachedServers.forEach(s => s.shutdown()); + }); + }); + + it("should use exile connect fail server", () => { + let memcachedServers; + return Promise.resolve(new Array(4)) + .map(() => memcached.startServer(serverOptions)) + .then(servers => { + memcachedServers = servers; + const ports = servers.map(s => s._server.address().port); + servers = ports.map(p => `localhost:${p}`); + + const x = new MemcacheClient(servers); + for (let i = 0; i < ports.length - 1; i++) { + memcachedServers[i].shutdown(); + } + return Promise.resolve(new Array(8)) + .map((i, v) => x.cmd(v, "stats"), { concurrency: 8 }) + .then(r => { + const m = r.map(stat => { + return stat.STAT.find(j => j[0] === "port")[1]; + }); + const ms = _.uniq(m); + expect(x._servers._exServers).to.be.not.empty; + expect(ms).to.have.length.be(1); + _.forOwn(x._servers._exServers, (exTime, server) => { + expect(x._servers._hashRing.has(server)).to.not.be.ok; + }); + x._servers._servers.forEach(server => { + expect(x._servers._hashRing.has(server)).to.be.ok; + }); + }); + }) + .finally(() => { + memcachedServers.forEach(s => s.shutdown()); + }); + }); + + it("should rethrow non-connect errors", () => { + let memcachedServers; + return Promise.resolve(new Array(4)) + .map(() => memcached.startServer(serverOptions)) + .then(servers => { + memcachedServers = servers; + const ports = servers.map(s => s._server.address().port); + servers = ports.map(p => `localhost:${p}`); + + + const x = new MemcacheClient( + servers, { + cmdTimeout: 100, + maxConnections: 3 + }); + let testErr; + memcachedServers.forEach(s => s.pause()); + return Promise.resolve(new Array(8)) + .map(() => x.get("blah"), { concurrency: 8 }) + .catch(err => (testErr = err)) + .then(() => expect(testErr.message).to.equal("Command timeout")); + }) + .finally(() => { + memcachedServers.forEach(s => s.shutdown()); + }); + }); + + it("should retry exiled servers after time interval", () => { + let memcachedServers; + return Promise.resolve(new Array(4)) + .map(() => memcached.startServer(serverOptions)) + .then(servers => { + memcachedServers = servers; + const ports = servers.map(s => s._server.address().port); + servers = ports.map(p => `localhost:${p}`); + + + const x = new MemcacheClient( + servers, { + cmdTimeout: 100, + maxConnections: 3, + retryFailedServerInterval: 10, + failedServerOutTime: 100 + }); + + memcachedServers[1].shutdown(); + memcachedServers[2].shutdown(); + memcachedServers[3].shutdown(); + return Promise.resolve(new Array(8)) + .map((e, v) => x.cmd(v, "stats"), { concurrency: 8 }) + .then(() => expect(Object.keys(x._servers._exServers)).to.have.length(3)) + .delay(100) + .then(() => + Promise.all( + [1, 2, 3].map(i => + memcached.startServer({ + port: ports[i], + logger: require("../../lib/null-logger") + }) + ) + ) + ) + .then(() => new Array(8)) + .map((e, v) => x.cmd(v, "stats"), { concurrency: 8 }) + .then(r => { + expect(Object.keys(x._servers._exServers)).to.have.length(0); + const m = r.map(stat => { + return stat.STAT.find(j => j[0] === "port")[1]; + }); + const ms = _.uniq(m); + expect(ms).to.have.length.above(1); + }); + }) + .finally(() => { + memcachedServers.forEach(s => s.shutdown()); + }); + }); + + it("should exile all servers if keepLastServer is false", () => { + const ports = ["19000", "19001", "19002", "19003"]; + const x = new MemcacheClient( + ports.map(p => `localhost:${p}`), { + maxConnections: 3, + retryFailedServerInterval: 10, + failedServerOutTime: 100, + keepLastServer: false, + cmdTimeout: 100 + } + ); + + let testErr; + return x + .cmd("", "stats") + .catch(err => (testErr = err)) + .then(() => expect(testErr.message).to.equal("No more valid servers left")); + }); +});