diff --git a/lib/metadata/event-debouncer.js b/lib/metadata/event-debouncer.js deleted file mode 100644 index fa4411080..000000000 --- a/lib/metadata/event-debouncer.js +++ /dev/null @@ -1,159 +0,0 @@ -"use strict"; - -const util = require("util"); -const utils = require("../utils"); -const promiseUtils = require("../promise-utils"); - -const _queueOverflowThreshold = 1000; - -/** - * Debounce protocol events by acting on those events with a sliding delay. - * @ignore - * @constructor - */ -class EventDebouncer { - /** - * Creates a new instance of the event debouncer. - * @param {Number} delay - * @param {Function} logger - */ - constructor(delay, logger) { - this._delay = delay; - this._logger = logger; - this._queue = null; - this._timeout = null; - } - - /** - * Adds a new event to the queue and moves the delay. - * @param {{ handler: Function, all: boolean|undefined, keyspace: String|undefined, - * cqlObject: String|null|undefined }} event - * @param {Boolean} processNow - * @returns {Promise} - */ - eventReceived(event, processNow) { - return new Promise((resolve, reject) => { - event.callback = promiseUtils.getCallback(resolve, reject); - this._queue = this._queue || { callbacks: [], keyspaces: {} }; - const delay = !processNow ? this._delay : 0; - if (event.all) { - // when an event marked with all is received, it supersedes all the rest of events - // a full update (hosts + keyspaces + tokens) is going to be made - this._queue.mainEvent = event; - } - if (this._queue.callbacks.length === _queueOverflowThreshold) { - // warn once - this._logger( - "warn", - util.format( - "Event debouncer queue exceeded %d events", - _queueOverflowThreshold, - ), - ); - } - this._queue.callbacks.push(event.callback); - if (this._queue.mainEvent) { - // a full refresh is scheduled and the callback was added, nothing else to do. - return this._slideDelay(delay); - } - // Insert at keyspace level - let keyspaceEvents = this._queue.keyspaces[event.keyspace]; - if (!keyspaceEvents) { - keyspaceEvents = this._queue.keyspaces[event.keyspace] = { - events: [], - }; - } - if (event.cqlObject === undefined) { - // a full refresh of the keyspace, supersedes all child keyspace events - keyspaceEvents.mainEvent = event; - } - keyspaceEvents.events.push(event); - this._slideDelay(delay); - }); - } - - /** - * @param {Number} delay - * @private - * */ - _slideDelay(delay) { - const self = this; - function process() { - const q = self._queue; - self._queue = null; - self._timeout = null; - processQueue(q); - } - if (delay === 0) { - // no delay, process immediately - if (this._timeout) { - clearTimeout(this._timeout); - } - return process(); - } - const previousTimeout = this._timeout; - // Add the new timeout before removing the previous one performs better - this._timeout = setTimeout(process, delay); - if (previousTimeout) { - clearTimeout(previousTimeout); - } - } - - /** - * Clears the timeout and invokes all pending callback. - */ - shutdown() { - if (!this._queue) { - return; - } - this._queue.callbacks.forEach(function (cb) { - cb(); - }); - this._queue = null; - clearTimeout(this._timeout); - this._timeout = null; - } -} - -/** - * @param {{callbacks: Array, keyspaces: Object, mainEvent: Object}} q - * @private - */ -function processQueue(q) { - if (q.mainEvent) { - // refresh all by invoking 1 handler and invoke all pending callbacks - return promiseUtils.toCallback(q.mainEvent.handler(), (err) => { - for (let i = 0; i < q.callbacks.length; i++) { - q.callbacks[i](err); - } - }); - } - - utils.each(Object.keys(q.keyspaces), function eachKeyspace(name, next) { - const keyspaceEvents = q.keyspaces[name]; - if (keyspaceEvents.mainEvent) { - // refresh a keyspace - return promiseUtils.toCallback( - keyspaceEvents.mainEvent.handler(), - function mainEventCallback(err) { - for (let i = 0; i < keyspaceEvents.events.length; i++) { - keyspaceEvents.events[i].callback(err); - } - - next(); - }, - ); - } - - // deal with individual handlers and callbacks - keyspaceEvents.events.forEach((event) => { - // sync handlers - event.handler(); - event.callback(); - }); - - next(); - }); -} - -module.exports = EventDebouncer; diff --git a/lib/metadata/index.js b/lib/metadata/index.js index 2cfb4a990..c75904845 100644 --- a/lib/metadata/index.js +++ b/lib/metadata/index.js @@ -1,20 +1,10 @@ "use strict"; -const events = require("events"); -const util = require("util"); - /** * Module containing classes and fields related to metadata. * @module metadata */ -const utils = require("../utils"); -const errors = require("../errors"); -const types = require("../types"); -const schemaParserFactory = require("./schema-parser"); -const promiseUtils = require("../promise-utils"); -const { TokenRange } = require("../token"); - /** * @const * @private @@ -59,241 +49,23 @@ class Metadata { * @param {ControlConnection} controlConnection Control connection used to retrieve information. */ constructor(options, controlConnection) { - if (!options) { - throw new errors.ArgumentError("Options are not defined"); - } - - Object.defineProperty(this, "options", { - value: options, - enumerable: false, - writable: false, - }); - Object.defineProperty(this, "controlConnection", { - value: controlConnection, - enumerable: false, - writable: false, - }); - this.keyspaces = {}; - this.initialized = false; - this._isDbaas = false; - this._schemaParser = schemaParserFactory.getByVersion( - options, - controlConnection, - this.getUdt.bind(this), - ); - this.log = utils.log; - this._preparedQueries = new PreparedQueries( - options.maxPrepared, - (...args) => this.log(...args), - ); - } - - /** - * Sets the cassandra version - * @internal - * @ignore - * @param {Array.} version - */ - setCassandraVersion(version) { - this._schemaParser = schemaParserFactory.getByVersion( - this.options, - this.controlConnection, - this.getUdt.bind(this), - version, - this._schemaParser, - ); - } - - /** - * Determines whether the cluster is provided as a service. - * @returns {boolean} true when the cluster is provided as a service (DataStax Astra), false when it's a - * different deployment (on-prem). - */ - isDbaas() { - return this._isDbaas; - } - - /** - * Sets the product type as DBaaS. - * @internal - * @ignore - */ - setProductTypeAsDbaas() { - this._isDbaas = true; - } - - /** - * Populates the information regarding primary replica per token, datacenters (+ racks) and sorted token ring. - * @ignore - * @param {HostMap} hosts - */ - buildTokens(hosts) { - if (!this.tokenizer) { - return this.log("error", "Tokenizer could not be determined"); - } - // Get a sorted array of tokens - const allSorted = []; - // Get a map of - const primaryReplicas = {}; - // Depending on the amount of tokens, this could be an expensive operation - const hostArray = hosts.values(); - const stringify = this.tokenizer.stringify; - const datacenters = {}; - hostArray.forEach((h) => { - if (!h.tokens) { - return; - } - h.tokens.forEach((tokenString) => { - const token = this.tokenizer.parse(tokenString); - utils.insertSorted(allSorted, token, (t1, t2) => - t1.compare(t2), - ); - primaryReplicas[stringify(token)] = h; - }); - let dc = datacenters[h.datacenter]; - if (!dc) { - dc = datacenters[h.datacenter] = { - hostLength: 0, - racks: new utils.HashSet(), - }; - } - dc.hostLength++; - dc.racks.add(h.rack); - }); - // Primary replica for given token - this.primaryReplicas = primaryReplicas; - // All the tokens in ring order - this.ring = allSorted; - // Build TokenRanges. - const tokenRanges = new Set(); - if (this.ring.length === 1) { - // If there is only one token, return the range ]minToken, minToken] - const min = this.tokenizer.minToken(); - tokenRanges.add(new TokenRange(min, min, this.tokenizer)); - } else { - for (let i = 0; i < this.ring.length; i++) { - const start = this.ring[i]; - const end = this.ring[(i + 1) % this.ring.length]; - tokenRanges.add(new TokenRange(start, end, this.tokenizer)); - } - } - this.tokenRanges = tokenRanges; - // Compute string versions as it's potentially expensive and frequently reused later - this.ringTokensAsStrings = new Array(allSorted.length); - for (let i = 0; i < allSorted.length; i++) { - this.ringTokensAsStrings[i] = stringify(allSorted[i]); - } - // Datacenter metadata (host length and racks) - this.datacenters = datacenters; } /** * Gets the keyspace metadata information and updates the internal state of the driver. - * - * If a `callback` is provided, the callback is invoked when the keyspaces metadata refresh completes. - * Otherwise, it returns a `Promise`. * @param {String} name Name of the keyspace. - * @param {Function} [callback] Optional callback. - */ - refreshKeyspace(name, callback) { - return promiseUtils.optionalCallback( - this._refreshKeyspace(name), - callback, - ); - } - - /** - * @param {String} name - * @private */ - async _refreshKeyspace(name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - this.log("info", util.format("Retrieving keyspace %s metadata", name)); - try { - const ksInfo = await this._schemaParser.getKeyspace(name); - if (!ksInfo) { - // the keyspace was dropped - delete this.keyspaces[name]; - return null; - } - // Tokens are lazily init on the keyspace, once a replica from that keyspace is retrieved. - this.keyspaces[ksInfo.name] = ksInfo; - return ksInfo; - } catch (err) { - this.log( - "error", - "There was an error while trying to retrieve keyspace information", - err, - ); - throw err; - } + refreshKeyspace(name) { + throw new Error("TODO: Not implemented"); } /** * Gets the metadata information of all the keyspaces and updates the internal state of the driver. - * - * If a `callback` is provided, the callback is invoked when the keyspace metadata refresh completes. - * Otherwise, it returns a `Promise`. - * @param {Boolean|Function} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not + * @param {Boolean} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not * connected at the moment. Default: true. - * @param {Function} [callback] Optional callback. - */ - refreshKeyspaces(waitReconnect, callback) { - if ( - typeof waitReconnect === "function" || - typeof waitReconnect === "undefined" - ) { - callback = waitReconnect; - waitReconnect = true; - } - if (!this.initialized) { - const err = this._uninitializedError(); - if (callback) { - return callback(err); - } - return Promise.reject(err); - } - return promiseUtils.optionalCallback( - this.refreshKeyspacesInternal(waitReconnect), - callback, - ); - } - - /** - * @param {Boolean} waitReconnect - * @returns {Promise>} - * @ignore - * @internal */ - async refreshKeyspacesInternal(waitReconnect) { - this.log("info", "Retrieving keyspaces metadata"); - try { - this.keyspaces = - await this._schemaParser.getKeyspaces(waitReconnect); - return this.keyspaces; - } catch (err) { - this.log( - "error", - "There was an error while trying to retrieve keyspaces information", - err, - ); - throw err; - } - } - - _getKeyspaceReplicas(keyspace) { - if (!keyspace.replicas) { - // Calculate replicas the first time for the keyspace - keyspace.replicas = keyspace.tokenToReplica( - this.tokenizer, - this.ringTokensAsStrings, - this.primaryReplicas, - this.datacenters, - ); - } - return keyspace.replicas; + refreshKeyspaces(waitReconnect) { + throw new Error("TODO: Not implemented"); } /** @@ -306,39 +78,7 @@ class Metadata { * @returns {Array} */ getReplicas(keyspaceName, token) { - if (!this.ring) { - return null; - } - if (Buffer.isBuffer(token)) { - token = this.tokenizer.hash(token); - } - if (token instanceof TokenRange) { - token = token.end; - } - let keyspace; - if (keyspaceName) { - keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - // the keyspace was not found, the metadata should be loaded beforehand - return null; - } - } - let i = utils.binarySearch(this.ring, token, (t1, t2) => - t1.compare(t2), - ); - if (i < 0) { - i = ~i; - } - if (i >= this.ring.length) { - // it circled back - i = i % this.ring.length; - } - const closestToken = this.ringTokensAsStrings[i]; - if (!keyspaceName) { - return [this.primaryReplicas[closestToken]]; - } - const replicas = this._getKeyspaceReplicas(keyspace); - return replicas[closestToken]; + throw new Error("TODO: Not implemented"); } /** @@ -346,7 +86,7 @@ class Metadata { * @returns {Set} The ranges of the ring or empty set if schema metadata is not enabled. */ getTokenRanges() { - return this.tokenRanges; + throw new Error("TODO: Not implemented"); } /** @@ -357,32 +97,7 @@ class Metadata { * @returns {Set|null} Ranges for the keyspace on this host or null if keyspace isn't found or hasn't been loaded. */ getTokenRangesForHost(keyspaceName, host) { - if (!this.ring) { - return null; - } - let keyspace; - if (keyspaceName) { - keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - // the keyspace was not found, the metadata should be loaded beforehand - return null; - } - } - // If the ring has only 1 token, just return the ranges as we should only have a single node cluster. - if (this.ring.length === 1) { - return this.getTokenRanges(); - } - const replicas = this._getKeyspaceReplicas(keyspace); - const ranges = new Set(); - // for each range, find replicas for end token, if replicas include host, add range. - this.tokenRanges.forEach((tokenRange) => { - const replicasForToken = - replicas[this.tokenizer.stringify(tokenRange.end)]; - if (replicasForToken.indexOf(host) !== -1) { - ranges.add(tokenRange); - } - }); - return ranges; + throw new Error("TODO: Not implemented"); } /** @@ -392,17 +107,7 @@ class Metadata { * @returns {Token} constructed token from the input buffer. */ newToken(components) { - if (!this.tokenizer) { - throw new Error( - "Partitioner not established. This should only happen if metadata was disabled or you have not connected yet.", - ); - } - if (Array.isArray(components)) { - return this.tokenizer.hash(Buffer.concat(components)); - } else if (util.isString(components)) { - return this.tokenizer.parse(components); - } - return this.tokenizer.hash(components); + throw new Error("TODO: Not implemented"); } /** @@ -412,23 +117,7 @@ class Metadata { * @returns TokenRange build range spanning from start (exclusive) to end (inclusive). */ newTokenRange(start, end) { - if (!this.tokenizer) { - throw new Error( - "Partitioner not established. This should only happen if metadata was disabled or you have not connected yet.", - ); - } - return new TokenRange(start, end, this.tokenizer); - } - - /** - * Gets the metadata information already stored associated to a prepared statement - * @param {String} keyspaceName - * @param {String} query - * @internal - * @ignore - */ - getPreparedInfo(keyspaceName, query) { - return this._preparedQueries.getOrAdd(keyspaceName, query); + throw new Error("TODO: Not implemented"); } /** @@ -436,339 +125,76 @@ class Metadata { * Following calls to the Client using the prepare flag will re-prepare the statements. */ clearPrepared() { - this._preparedQueries.clear(); - } - - /** @ignore */ - getPreparedById(id) { - return this._preparedQueries.getById(id); - } - - /** @ignore */ - setPreparedById(info) { - return this._preparedQueries.setById(info); - } - - /** @ignore */ - getAllPrepared() { - return this._preparedQueries.getAll(); - } - - /** @ignore */ - _uninitializedError() { - return new Error( - "Metadata has not been initialized. This could only happen if you have not connected yet.", - ); + throw new Error("TODO: Not implemented"); } /** * Gets the definition of an user-defined type. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same UDT definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the UDT. - * @param {Function} [callback] The callback to invoke when retrieval completes. */ - getUdt(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getUdt(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @returns {Promise} - * @private - */ - async _getUdt(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.udts; - } - return await this._schemaParser.getUdt(keyspaceName, name, cache); + getUdt(keyspaceName, name) { + throw new Error("TODO: Not implemented"); } /** * Gets the definition of a table. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same table definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the Table. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link TableMetadata} as - * second parameter. */ - getTable(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getTable(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private - */ - async _getTable(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - let virtual; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.tables; - virtual = keyspace.virtual; - } - return await this._schemaParser.getTable( - keyspaceName, - name, - cache, - virtual, - ); + getTable(keyspaceName, name) { + throw new Error("TODO: Not implemented"); } /** * Gets the definition of CQL functions for a given name. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the Function. - * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link SchemaFunction} - * as second parameter. - */ - getFunctions(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getFunctionsWrapper(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private */ - async _getFunctionsWrapper(keyspaceName, name) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql function name to retrieve the metadata", - ); - } - const functionsMap = await this._getFunctions( - keyspaceName, - name, - false, - ); - return Array.from(functionsMap.values()); + getFunctions(keyspaceName, name) { + throw new Error("TODO: Not implemented"); } /** * Gets a definition of CQL function for a given name and signature. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the Function * @param {Array.|Array.<{code, info}>} signature Array of types of the parameters. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link SchemaFunction} as second - * parameter. */ - getFunction(keyspaceName, name, signature, callback) { - return promiseUtils.optionalCallback( - this._getSingleFunction(keyspaceName, name, signature, false), - callback, - ); + getFunction(keyspaceName, name, signature) { + throw new Error("TODO: Not implemented"); } /** * Gets the definition of CQL aggregate for a given name. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same aggregates definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace - * @param {String} name Name of the Function - * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link Aggregate} as - * second parameter. - */ - getAggregates(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getAggregates(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private + * @param {String} name Name of the aggregate */ - async _getAggregates(keyspaceName, name) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql aggregate name to retrieve the metadata", - ); - } - const functionsMap = await this._getFunctions(keyspaceName, name, true); - return Array.from(functionsMap.values()); + getAggregates(keyspaceName, name) { + throw new Error("TODO: Not implemented"); } /** * Gets a definition of CQL aggregate for a given name and signature. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same aggregate definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the aggregate * @param {Array.|Array.<{code, info}>} signature Array of types of the parameters. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link Aggregate} as second parameter. */ - getAggregate(keyspaceName, name, signature, callback) { - return promiseUtils.optionalCallback( - this._getSingleFunction(keyspaceName, name, signature, true), - callback, - ); + getAggregate(keyspaceName, name, signature) { + throw new Error("TODO: Not implemented"); } /** * Gets the definition of a CQL materialized view for a given name. * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * * Note that, unlike the rest of the {@link Metadata} methods, this method does not cache the result for following * calls, as the current version of the Cassandra native protocol does not support schema change events for * materialized views. Each call to this method will produce one or more queries to the cluster. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the materialized view - * @param {Function} [callback] The callback with the err as a first parameter and the {@link MaterializedView} as - * second parameter. - */ - getMaterializedView(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getMaterializedView(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @returns {Promise} - * @private */ - async _getMaterializedView(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.views; - } - return await this._schemaParser.getMaterializedView( - keyspaceName, - name, - cache, - ); - } - - /** - * Gets a map of cql function definitions or aggregates based on signature. - * @param {String} keyspaceName - * @param {String} name Name of the function or aggregate - * @param {Boolean} aggregate - * @returns {Promise} - * @private - */ - async _getFunctions(keyspaceName, name, aggregate) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return new Map(); - } - cache = aggregate ? keyspace.aggregates : keyspace.functions; - } - return await this._schemaParser.getFunctions( - keyspaceName, - name, - aggregate, - cache, - ); - } - - /** - * Gets a single cql function or aggregate definition - * @param {String} keyspaceName - * @param {String} name - * @param {Array} signature - * @param {Boolean} aggregate - * @returns {Promise} - * @private - */ - async _getSingleFunction(keyspaceName, name, signature, aggregate) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql function name to retrieve the metadata", - ); - } - if (!Array.isArray(signature)) { - throw new errors.ArgumentError( - "Signature must be an array of types", - ); - } - signature = signature.map((item) => { - if (typeof item === "string") { - return item; - } - return types.getDataTypeNameByCode(item); - }); - const functionsMap = await this._getFunctions( - keyspaceName, - name, - aggregate, - ); - return functionsMap.get(signature.join(",")) || null; + getMaterializedView(keyspaceName, name) { + throw new Error("TODO: Not implemented"); } /** @@ -776,35 +202,11 @@ class Metadata { * query. The trace itself is stored in Cassandra in the `sessions` and * `events` table in the `system_traces` keyspace and can be * retrieve manually using the trace identifier. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. * @param {Uuid} traceId Identifier of the trace session. * @param {Number} [consistency] The consistency level to obtain the trace. - * @param {Function} [callback] The callback with the err as first parameter and the query trace as second parameter. */ - getTrace(traceId, consistency, callback) { - if (!callback && typeof consistency === "function") { - // Both callback and consistency are optional parameters - // In this case, the second parameter is the callback - callback = consistency; - consistency = null; - } - - return promiseUtils.optionalCallback( - this._getTrace(traceId, consistency), - callback, - ); - } - - /** - * @param {Uuid} traceId - * @param {Number} consistency - * @returns {Promise} - * @private - */ - async _getTrace() { - throw new Error(`TODO: Not implemented`); + getTrace(traceId, consistency) { + throw new Error("TODO: Not implemented"); } /** @@ -812,220 +214,11 @@ class Metadata { * * This method performs a one-time check only, without any form of retry; therefore * `protocolOptions.maxSchemaAgreementWaitSeconds` setting does not apply in this case. - * @param {Function} [callback] A function that is invoked with a value - * `true` when all hosts agree on the schema and `false` when there is no agreement or when + * @returns {Boolean} `true` when all hosts agree on the schema and `false` when there is no agreement or when * the check could not be performed (for example, if the control connection is down). - * @returns {Promise} Returns a `Promise` when a callback is not provided. The promise resolves to - * `true` when all hosts agree on the schema and `false` when there is no agreement or when - * the check could not be performed (for example, if the control connection is down). - */ - checkSchemaAgreement(callback) { - return promiseUtils.optionalCallback( - this._checkSchemaAgreement(), - callback, - ); - } - - /** - * Async-only version of check schema agreement. - * @private - */ - async _checkSchemaAgreement() { - const connection = this.controlConnection.connection; - if (!connection) { - return false; - } - try { - return await this.compareSchemaVersions(connection); - } catch (err) { - return false; - } - } - - /** - * Uses the metadata to fill the user provided parameter hints - * @param {String} keyspace - * @param {Array} hints - * @internal - * @ignore - */ - async adaptUserHints(keyspace, hints) { - if (!Array.isArray(hints)) { - return; - } - const udts = []; - // Check for udts and get the metadata - for (let i = 0; i < hints.length; i++) { - const hint = hints[i]; - if (typeof hint !== "string") { - continue; - } - - const type = types.dataTypes.getByName(hint); - this._checkUdtTypes(udts, type, keyspace); - hints[i] = type; - } - - for (const type of udts) { - const udtInfo = await this.getUdt( - type.info.keyspace, - type.info.name, - ); - if (!udtInfo) { - throw new TypeError( - "User defined type not found: " + - type.info.keyspace + - "." + - type.info.name, - ); - } - type.info = udtInfo; - } - } - - /** - * @param {Array} udts - * @param {{code, info}} type - * @param {string} keyspace - * @private - */ - _checkUdtTypes(udts, type, keyspace) { - if (type.code === types.dataTypes.udt) { - const udtName = type.info.split("."); - type.info = { - keyspace: udtName[0], - name: udtName[1], - }; - if (!type.info.name) { - if (!keyspace) { - throw new TypeError( - "No keyspace specified for udt: " + udtName.join("."), - ); - } - // use the provided keyspace - type.info.name = type.info.keyspace; - type.info.keyspace = keyspace; - } - udts.push(type); - return; - } - - if (!type.info) { - return; - } - if ( - type.code === types.dataTypes.list || - type.code === types.dataTypes.set - ) { - return this._checkUdtTypes(udts, type.info, keyspace); - } - if (type.code === types.dataTypes.map) { - this._checkUdtTypes(udts, type.info[0], keyspace); - this._checkUdtTypes(udts, type.info[1], keyspace); - } - } - - /** - * Uses the provided connection to query the schema versions and compare them. - * @param {Connection} connection - * @internal - * @ignore */ - async compareSchemaVersions() { - throw new Error(`TODO: Not implemented`); - } -} - -/** - * Allows to store prepared queries and retrieval by query or query id. - * @ignore - */ -class PreparedQueries { - /** - * @param {Number} maxPrepared - * @param {Function} logger - */ - constructor(maxPrepared, logger) { - this.length = 0; - this._maxPrepared = maxPrepared; - this._mapByKey = new Map(); - this._mapById = new Map(); - this._logger = logger; - } - - _getKey(keyspace, query) { - return (keyspace || "") + query; - } - - getOrAdd(keyspace, query) { - const key = this._getKey(keyspace, query); - let info = this._mapByKey.get(key); - if (info) { - return info; - } - - this._validateOverflow(); - - info = new events.EventEmitter(); - info.setMaxListeners(0); - info.query = query; - // The keyspace in which it was prepared - info.keyspace = keyspace; - this._mapByKey.set(key, info); - this.length++; - return info; - } - - _validateOverflow() { - if (this.length < this._maxPrepared) { - return; - } - - const toRemove = []; - this._logger( - "warning", - "Prepared statements exceeded maximum. This could be caused by preparing queries that contain parameters", - ); - - const toRemoveLength = this.length - this._maxPrepared + 1; - - for (const [key, info] of this._mapByKey) { - if (!info.queryId) { - // Only remove queries that contain queryId - continue; - } - - const length = toRemove.push([key, info]); - if (length >= toRemoveLength) { - break; - } - } - - for (const [key, info] of toRemove) { - this._mapByKey.delete(key); - this._mapById.delete(info.queryId.toString("hex")); - this.length--; - } - } - - setById(info) { - this._mapById.set(info.queryId.toString("hex"), info); - } - - getById(id) { - return this._mapById.get(id.toString("hex")); - } - - clear() { - this._mapByKey = new Map(); - this._mapById = new Map(); - this.length = 0; - } - - getAll() { - return Array.from(this._mapByKey.values()).filter( - (info) => !!info.queryId, - ); + checkSchemaAgreement() { + throw new Error("TODO: Not implemented"); } } diff --git a/lib/metadata/schema-parser.js b/lib/metadata/schema-parser.js deleted file mode 100644 index cabd675c7..000000000 --- a/lib/metadata/schema-parser.js +++ /dev/null @@ -1,1396 +0,0 @@ -"use strict"; -const util = require("util"); -const events = require("events"); -const types = require("../types"); -const utils = require("../utils"); -const errors = require("../errors"); -const promiseUtils = require("../promise-utils"); -const TableMetadata = require("./table-metadata"); -const Aggregate = require("./aggregate"); -const SchemaFunction = require("./schema-function"); -const Index = require("./schema-index"); -const MaterializedView = require("./materialized-view"); -const { format } = util; - -/** - * @module metadata/schemaParser - * @ignore - */ - -const _selectAllKeyspacesV1 = "SELECT * FROM system.schema_keyspaces"; -const _selectSingleKeyspaceV1 = - "SELECT * FROM system.schema_keyspaces where keyspace_name = '%s'"; -const _selectAllKeyspacesV2 = "SELECT * FROM system_schema.keyspaces"; -const _selectSingleKeyspaceV2 = - "SELECT * FROM system_schema.keyspaces where keyspace_name = '%s'"; -const _selectTableV1 = - "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name='%s' AND columnfamily_name='%s'"; -const _selectTableV2 = - "SELECT * FROM system_schema.tables WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectColumnsV1 = - "SELECT * FROM system.schema_columns WHERE keyspace_name='%s' AND columnfamily_name='%s'"; -const _selectColumnsV2 = - "SELECT * FROM system_schema.columns WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectIndexesV2 = - "SELECT * FROM system_schema.indexes WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectUdtV1 = - "SELECT * FROM system.schema_usertypes WHERE keyspace_name='%s' AND type_name='%s'"; -const _selectUdtV2 = - "SELECT * FROM system_schema.types WHERE keyspace_name='%s' AND type_name='%s'"; -const _selectFunctionsV1 = - "SELECT * FROM system.schema_functions WHERE keyspace_name = '%s' AND function_name = '%s'"; -const _selectFunctionsV2 = - "SELECT * FROM system_schema.functions WHERE keyspace_name = '%s' AND function_name = '%s'"; -const _selectAggregatesV1 = - "SELECT * FROM system.schema_aggregates WHERE keyspace_name = '%s' AND aggregate_name = '%s'"; -const _selectAggregatesV2 = - "SELECT * FROM system_schema.aggregates WHERE keyspace_name = '%s' AND aggregate_name = '%s'"; -const _selectMaterializedViewV2 = - "SELECT * FROM system_schema.views WHERE keyspace_name = '%s' AND view_name = '%s'"; - -const _selectAllVirtualKeyspaces = - "SELECT * FROM system_virtual_schema.keyspaces"; -const _selectSingleVirtualKeyspace = - "SELECT * FROM system_virtual_schema.keyspaces where keyspace_name = '%s'"; -const _selectVirtualTable = - "SELECT * FROM system_virtual_schema.tables where keyspace_name = '%s' and table_name='%s'"; -const _selectVirtualColumns = - "SELECT * FROM system_virtual_schema.columns where keyspace_name = '%s' and table_name='%s'"; - -/** - * @abstract - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc - * @constructor - * @ignore - */ -class SchemaParser { - constructor(options, cc) { - this.cc = cc; - this.encodingOptions = options.encoding; - this.selectTable = null; - this.selectColumns = null; - this.selectIndexes = null; - this.selectUdt = null; - this.selectAggregates = null; - this.selectFunctions = null; - this.supportsVirtual = false; - } - - /** - * @param name - * @param durableWrites - * @param strategy - * @param strategyOptions - * @param virtual - * @returns {{name, durableWrites, strategy, strategyOptions, tokenToReplica, udts, tables, functions, aggregates}} - * @protected - */ - _createKeyspace(name, durableWrites, strategy, strategyOptions, virtual) { - return { - name, - durableWrites, - strategy, - strategyOptions, - virtual: virtual === true, - udts: {}, - tables: {}, - functions: {}, - aggregates: {}, - views: {}, - tokenToReplica: getTokenToReplicaMapper(strategy, strategyOptions), - graphEngine: undefined, - }; - } - - /** - * @abstract - * @param {String} name - * @returns {Promise} - */ - getKeyspace(name) {} - - /** - * @abstract - * @param {Boolean} waitReconnect - * @returns {Promise>} - */ - getKeyspaces(waitReconnect) {} - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @param {Boolean} virtual - * @returns {Promise} - */ - async getTable(keyspaceName, name, cache, virtual) { - let tableInfo = cache && cache[name]; - if (!tableInfo) { - tableInfo = new TableMetadata(name); - if (cache) { - cache[name] = tableInfo; - } - } - if (tableInfo.loaded) { - return tableInfo; - } - if (tableInfo.loading) { - // Wait for it to emit - return promiseUtils.fromEvent(tableInfo, "load"); - } - try { - // its not cached and not being retrieved - tableInfo.loading = true; - let indexRows; - let virtualTable = virtual; - const selectTable = virtualTable - ? _selectVirtualTable - : this.selectTable; - const query = util.format(selectTable, keyspaceName, name); - let tableRow = await this._getFirstRow(query); - // if we weren't sure if table was virtual or not, query virtual schema. - if ( - !tableRow && - this.supportsVirtual && - virtualTable === undefined - ) { - const query = util.format( - _selectVirtualTable, - keyspaceName, - name, - ); - try { - tableRow = await this._getFirstRow(query); - } catch (err) { - // we can't error here as we can't be sure if the node - // supports virtual tables, in this case it is adequate - // to act as if there was no matching table. - } - if (tableRow) { - // We are fetching a virtual table - virtualTable = true; - } - } - if (!tableRow) { - tableInfo.loading = false; - tableInfo.emit("load", null, null); - return null; - } - const selectColumns = virtualTable - ? _selectVirtualColumns - : this.selectColumns; - const columnRows = await this._getRows( - util.format(selectColumns, keyspaceName, name), - ); - if (this.selectIndexes && !virtualTable) { - indexRows = await this._getRows( - util.format(this.selectIndexes, keyspaceName, name), - ); - } - await this._parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtualTable, - ); - tableInfo.loaded = true; - tableInfo.emit("load", null, tableInfo); - return tableInfo; - } catch (err) { - tableInfo.emit("load", err, null); - throw err; - } finally { - tableInfo.loading = false; - } - } - - async _getFirstRow(query) { - const rows = await this._getRows(query); - return rows[0]; - } - - async _getRows(query) { - const response = await this.cc.query(query); - return response.rows; - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @returns {Promise} - */ - async getUdt(keyspaceName, name, cache) { - let udtInfo = cache && cache[name]; - if (!udtInfo) { - udtInfo = new events.EventEmitter(); - if (cache) { - cache[name] = udtInfo; - } - udtInfo.setMaxListeners(0); - udtInfo.loading = false; - udtInfo.name = name; - udtInfo.keyspace = keyspaceName; - udtInfo.fields = null; - } - if (udtInfo.fields) { - return udtInfo; - } - if (udtInfo.loading) { - return promiseUtils.fromEvent(udtInfo, "load"); - } - udtInfo.loading = true; - const query = format(this.selectUdt, keyspaceName, name); - try { - const row = await this._getFirstRow(query); - if (!row) { - udtInfo.loading = false; - udtInfo.emit("load", null, null); - return null; - } - await this._parseUdt(udtInfo, row); - udtInfo.emit("load", null, udtInfo); - return udtInfo; - } catch (err) { - udtInfo.emit("load", err); - throw err; - } finally { - udtInfo.loading = false; - } - } - - /** - * Parses the udt information from the row - * @param udtInfo - * @param {Row} row - * @returns {Promise} - * @abstract - */ - _parseUdt(udtInfo, row) {} - - /** - * Builds the metadata based on the table and column rows - * @abstract - * @param {module:metadata~TableMetadata} tableInfo - * @param {Row} tableRow - * @param {Array.} columnRows - * @param {Array.} indexRows - * @param {Boolean} virtual - * @returns {Promise} - * @throws {Error} - */ - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) {} - - /** - * @abstract - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @returns {Promise} - */ - getMaterializedView(keyspaceName, name, cache) {} - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Boolean} aggregate - * @param {Object} cache - * @returns {Promise} - */ - async getFunctions(keyspaceName, name, aggregate, cache) { - /** @type {String} */ - let query = this.selectFunctions; - let parser = (row) => this._parseFunction(row); - if (aggregate) { - query = this.selectAggregates; - parser = (row) => this._parseAggregate(row); - } - // if it's not already loaded, get all functions with that name - // cache it by name and, within name, by signature - let functionsInfo = cache && cache[name]; - if (!functionsInfo) { - functionsInfo = new events.EventEmitter(); - if (cache) { - cache[name] = functionsInfo; - } - functionsInfo.setMaxListeners(0); - } - if (functionsInfo.values) { - return functionsInfo.values; - } - if (functionsInfo.loading) { - return promiseUtils.fromEvent(functionsInfo, "load"); - } - functionsInfo.loading = true; - try { - const rows = await this._getRows(format(query, keyspaceName, name)); - const funcs = await Promise.all(rows.map(parser)); - const result = new Map(); - if (rows.length > 0) { - // Cache positive hits - functionsInfo.values = result; - } - - funcs.forEach((f) => - functionsInfo.values.set(f.signature.join(","), f), - ); - functionsInfo.emit("load", null, result); - return result; - } catch (err) { - functionsInfo.emit("load", err); - throw err; - } finally { - functionsInfo.loading = false; - } - } - - /** - * @abstract - * @param {Row} row - * @returns {Promise} - */ - _parseAggregate(row) {} - - /** - * @abstract - * @param {Row} row - * @returns {Promise} - */ - _parseFunction(row) {} - - /** @returns {Map} */ - _asMap(obj) { - if (!obj) { - return new Map(); - } - if ( - this.encodingOptions.map && - obj instanceof this.encodingOptions.map - ) { - // Its already a Map or a polyfill of a Map - return obj; - } - return new Map(Object.keys(obj).map((k) => [k, obj[k]])); - } - - _mapAsObject(map) { - if (!map) { - return map; - } - if ( - this.encodingOptions.map && - map instanceof this.encodingOptions.map - ) { - const result = {}; - map.forEach((value, key) => (result[key] = value)); - return result; - } - return map; - } -} - -/** - * Used to parse schema information for Cassandra versions 1.2.x, and 2.x - * @ignore - */ -class SchemaParserV1 extends SchemaParser { - /** - * @param {ClientOptions} options - * @param {ControlConnection} cc - */ - constructor(options, cc) { - super(options, cc); - this.selectTable = _selectTableV1; - this.selectColumns = _selectColumnsV1; - this.selectUdt = _selectUdtV1; - this.selectAggregates = _selectAggregatesV1; - this.selectFunctions = _selectFunctionsV1; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const result = await this.cc.query( - _selectAllKeyspacesV1, - waitReconnect, - ); - for (let i = 0; i < result.rows.length; i++) { - const row = result.rows[i]; - const ksInfo = this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - row["strategy_class"], - JSON.parse(row["strategy_options"] || null), - ); - keyspaces[ksInfo.name] = ksInfo; - } - return keyspaces; - } - - async getKeyspace(name) { - const row = await this._getFirstRow( - format(_selectSingleKeyspaceV1, name), - ); - if (!row) { - return null; - } - return this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - row["strategy_class"], - JSON.parse(row["strategy_options"]), - ); - } - - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) { - // All the tableInfo parsing in V1 is sync, it uses a async function because the super class defines one - // to support other versions. - let c, name, types; - const encoder = this.cc.getEncoder(); - const columnsKeyed = {}; - let partitionKeys = []; - let clusteringKeys = []; - tableInfo.bloomFilterFalsePositiveChance = - tableRow["bloom_filter_fp_chance"]; - tableInfo.caching = tableRow["caching"]; - tableInfo.comment = tableRow["comment"]; - tableInfo.compactionClass = tableRow["compaction_strategy_class"]; - tableInfo.compactionOptions = JSON.parse( - tableRow["compaction_strategy_options"], - ); - tableInfo.compression = JSON.parse(tableRow["compression_parameters"]); - tableInfo.gcGraceSeconds = tableRow["gc_grace_seconds"]; - tableInfo.localReadRepairChance = tableRow["local_read_repair_chance"]; - tableInfo.readRepairChance = tableRow["read_repair_chance"]; - tableInfo.populateCacheOnFlush = - tableRow["populate_io_cache_on_flush"] || - tableInfo.populateCacheOnFlush; - tableInfo.memtableFlushPeriod = - tableRow["memtable_flush_period_in_ms"] || - tableInfo.memtableFlushPeriod; - tableInfo.defaultTtl = - tableRow["default_time_to_live"] || tableInfo.defaultTtl; - tableInfo.speculativeRetry = - tableRow["speculative_retry"] || tableInfo.speculativeRetry; - tableInfo.indexInterval = - tableRow["index_interval"] || tableInfo.indexInterval; - if (typeof tableRow["min_index_interval"] !== "undefined") { - // Cassandra 2.1+ - tableInfo.minIndexInterval = - tableRow["min_index_interval"] || tableInfo.minIndexInterval; - tableInfo.maxIndexInterval = - tableRow["max_index_interval"] || tableInfo.maxIndexInterval; - } else { - // set to null - tableInfo.minIndexInterval = null; - tableInfo.maxIndexInterval = null; - } - if (typeof tableRow["replicate_on_write"] !== "undefined") { - // leave the default otherwise - tableInfo.replicateOnWrite = tableRow["replicate_on_write"]; - } - tableInfo.columns = []; - for (let i = 0; i < columnRows.length; i++) { - const row = columnRows[i]; - const type = encoder.parseFqTypeName(row["validator"]); - c = { - name: row["column_name"], - type: type, - isStatic: false, - }; - tableInfo.columns.push(c); - columnsKeyed[c.name] = c; - switch (row["type"]) { - case "partition_key": - partitionKeys.push({ - c: c, - index: row["component_index"] || 0, - }); - break; - case "clustering_key": - clusteringKeys.push({ - c: c, - index: row["component_index"] || 0, - order: c.type.options.reversed ? "DESC" : "ASC", - }); - break; - case "static": - // C* 2.0.6+ supports static columns - c.isStatic = true; - break; - } - } - if (partitionKeys.length > 0) { - tableInfo.partitionKeys = partitionKeys - .sort(utils.propCompare("index")) - .map((item) => item.c); - clusteringKeys.sort(utils.propCompare("index")); - tableInfo.clusteringKeys = clusteringKeys.map((item) => item.c); - tableInfo.clusteringOrder = clusteringKeys.map( - (item) => item.order, - ); - } - // In C* 1.2, keys are not stored on the schema_columns table - const keysStoredInTableRow = tableInfo.partitionKeys.length === 0; - if (keysStoredInTableRow && tableRow["key_aliases"]) { - // In C* 1.2, keys are not stored on the schema_columns table - partitionKeys = JSON.parse(tableRow["key_aliases"]); - types = encoder.parseKeyTypes(tableRow["key_validator"]).types; - for (let i = 0; i < partitionKeys.length; i++) { - name = partitionKeys[i]; - c = columnsKeyed[name]; - if (!c) { - c = { - name: name, - type: types[i], - }; - tableInfo.columns.push(c); - } - tableInfo.partitionKeys.push(c); - } - } - const comparator = encoder.parseKeyTypes(tableRow["comparator"]); - if (keysStoredInTableRow && tableRow["column_aliases"]) { - clusteringKeys = JSON.parse(tableRow["column_aliases"]); - for (let i = 0; i < clusteringKeys.length; i++) { - name = clusteringKeys[i]; - c = columnsKeyed[name]; - if (!c) { - c = { - name: name, - type: comparator.types[i], - }; - tableInfo.columns.push(c); - } - tableInfo.clusteringKeys.push(c); - tableInfo.clusteringOrder.push( - c.type.options.reversed ? "DESC" : "ASC", - ); - } - } - tableInfo.isCompact = !!tableRow["is_dense"]; - if (!tableInfo.isCompact) { - // is_dense column does not exist in previous versions of Cassandra - // also, compact pk, ck and val appear as is_dense false - // clusteringKeys != comparator types - 1 - // or not composite (comparator) - tableInfo.isCompact = - // clustering keys are not marked as composite - !comparator.isComposite || - // only 1 column not part of the partition or clustering keys - (!comparator.hasCollections && - tableInfo.clusteringKeys.length !== - comparator.types.length - 1); - } - name = tableRow["value_alias"]; - if (tableInfo.isCompact && name && !columnsKeyed[name]) { - // additional column in C* 1.2 as value_alias - c = { - name: name, - type: encoder.parseFqTypeName(tableRow["default_validator"]), - }; - tableInfo.columns.push(c); - columnsKeyed[name] = c; - } - tableInfo.columnsByName = columnsKeyed; - tableInfo.indexes = Index.fromColumnRows( - columnRows, - tableInfo.columnsByName, - ); - } - - getMaterializedView(keyspaceName, name, cache) { - return Promise.reject( - new errors.NotSupportedError( - "Materialized views are not supported on Cassandra versions below 3.0", - ), - ); - } - - async _parseAggregate(row) { - const encoder = this.cc.getEncoder(); - const aggregate = new Aggregate(); - aggregate.name = row["aggregate_name"]; - aggregate.keyspaceName = row["keyspace_name"]; - aggregate.signature = row["signature"] || utils.emptyArray; - aggregate.stateFunction = row["state_func"]; - aggregate.finalFunction = row["final_func"]; - aggregate.initConditionRaw = row["initcond"]; - aggregate.argumentTypes = ( - row["argument_types"] || utils.emptyArray - ).map((name) => encoder.parseFqTypeName(name)); - aggregate.stateType = encoder.parseFqTypeName(row["state_type"]); - const initConditionValue = encoder.decode( - aggregate.initConditionRaw, - aggregate.stateType, - ); - if ( - initConditionValue !== null && - typeof initConditionValue !== "undefined" - ) { - aggregate.initCondition = initConditionValue.toString(); - } - aggregate.returnType = encoder.parseFqTypeName(row["return_type"]); - return aggregate; - } - - async _parseFunction(row) { - const encoder = this.cc.getEncoder(); - const func = new SchemaFunction(); - func.name = row["function_name"]; - func.keyspaceName = row["keyspace_name"]; - func.signature = row["signature"] || utils.emptyArray; - func.argumentNames = row["argument_names"] || utils.emptyArray; - func.body = row["body"]; - func.calledOnNullInput = row["called_on_null_input"]; - func.language = row["language"]; - func.argumentTypes = (row["argument_types"] || utils.emptyArray).map( - (name) => encoder.parseFqTypeName(name), - ); - func.returnType = encoder.parseFqTypeName(row["return_type"]); - return func; - } - - async _parseUdt(udtInfo, row) { - const encoder = this.cc.getEncoder(); - const fieldNames = row["field_names"]; - const fieldTypes = row["field_types"]; - const fields = new Array(fieldNames.length); - for (let i = 0; i < fieldNames.length; i++) { - fields[i] = { - name: fieldNames[i], - type: encoder.parseFqTypeName(fieldTypes[i]), - }; - } - udtInfo.fields = fields; - return udtInfo; - } -} - -/** - * Used to parse schema information for Cassandra versions 3.x and above - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - * @ignore - */ -class SchemaParserV2 extends SchemaParser { - /** - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - */ - constructor(options, cc, udtResolver) { - super(options, cc); - this.udtResolver = udtResolver; - this.selectTable = _selectTableV2; - this.selectColumns = _selectColumnsV2; - this.selectUdt = _selectUdtV2; - this.selectAggregates = _selectAggregatesV2; - this.selectFunctions = _selectFunctionsV2; - this.selectIndexes = _selectIndexesV2; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const result = await this.cc.query( - _selectAllKeyspacesV2, - waitReconnect, - ); - for (let i = 0; i < result.rows.length; i++) { - const ksInfo = this._parseKeyspace(result.rows[i]); - keyspaces[ksInfo.name] = ksInfo; - } - return keyspaces; - } - - async getKeyspace(name) { - const row = await this._getFirstRow( - format(_selectSingleKeyspaceV2, name), - ); - if (!row) { - return null; - } - return this._parseKeyspace(row); - } - - async getMaterializedView(keyspaceName, name, cache) { - let viewInfo = cache && cache[name]; - if (!viewInfo) { - viewInfo = new MaterializedView(name); - if (cache) { - cache[name] = viewInfo; - } - } - if (viewInfo.loaded) { - return viewInfo; - } - if (viewInfo.loading) { - return promiseUtils.fromEvent(viewInfo, "load"); - } - viewInfo.loading = true; - try { - const tableRow = await this._getFirstRow( - format(_selectMaterializedViewV2, keyspaceName, name), - ); - if (!tableRow) { - viewInfo.emit("load", null, null); - viewInfo.loading = false; - return null; - } - const columnRows = await this._getRows( - format(this.selectColumns, keyspaceName, name), - ); - await this._parseTableOrView( - viewInfo, - tableRow, - columnRows, - null, - false, - ); - viewInfo.loaded = true; - viewInfo.emit("load", null, viewInfo); - return viewInfo; - } catch (err) { - viewInfo.emit("load", err); - throw err; - } finally { - viewInfo.loading = false; - } - } - - _parseKeyspace(row, virtual) { - const replication = row["replication"]; - let strategy; - let strategyOptions; - if (replication) { - strategy = replication["class"]; - strategyOptions = {}; - for (const key in replication) { - if ( - !Object.prototype.hasOwnProperty.call(replication, key) || - key === "class" - ) { - continue; - } - strategyOptions[key] = replication[key]; - } - } - - const ks = this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - strategy, - strategyOptions, - virtual, - ); - ks.graphEngine = row["graph_engine"]; - return ks; - } - - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) { - const encoder = this.cc.getEncoder(); - const columnsKeyed = {}; - const partitionKeys = []; - const clusteringKeys = []; - tableInfo.columns = await Promise.all( - columnRows.map(async (row) => { - const type = await encoder.parseTypeName( - tableRow["keyspace_name"], - row["type"], - 0, - null, - this.udtResolver, - ); - const c = { - name: row["column_name"], - type: type, - isStatic: false, - }; - columnsKeyed[c.name] = c; - switch (row["kind"]) { - case "partition_key": - partitionKeys.push({ c, index: row["position"] || 0 }); - break; - case "clustering": - clusteringKeys.push({ - c, - index: row["position"] || 0, - order: - row["clustering_order"] === "desc" - ? "DESC" - : "ASC", - }); - break; - case "static": - c.isStatic = true; - break; - } - return c; - }), - ); - tableInfo.columnsByName = columnsKeyed; - tableInfo.partitionKeys = partitionKeys - .sort(utils.propCompare("index")) - .map((item) => item.c); - clusteringKeys.sort(utils.propCompare("index")); - tableInfo.clusteringKeys = clusteringKeys.map((item) => item.c); - tableInfo.clusteringOrder = clusteringKeys.map((item) => item.order); - if (virtual) { - // When table is virtual, the only relevant information to parse are the columns - // as the table itself has no configuration - tableInfo.virtual = true; - return; - } - const isView = tableInfo instanceof MaterializedView; - tableInfo.bloomFilterFalsePositiveChance = - tableRow["bloom_filter_fp_chance"]; - tableInfo.caching = JSON.stringify(tableRow["caching"]); - tableInfo.comment = tableRow["comment"]; - // Regardless of the encoding options, use always an Object to represent an associative Array - const compaction = this._asMap(tableRow["compaction"]); - if (compaction) { - // compactionOptions as an Object - tableInfo.compactionOptions = {}; - tableInfo.compactionClass = compaction.get("class"); - compaction.forEach((value, key) => { - if (key === "class") { - return; - } - tableInfo.compactionOptions[key] = compaction.get(key); - }); - } - // Convert compression to an Object - tableInfo.compression = this._mapAsObject(tableRow["compression"]); - tableInfo.gcGraceSeconds = tableRow["gc_grace_seconds"]; - tableInfo.localReadRepairChance = - tableRow["dclocal_read_repair_chance"]; - tableInfo.readRepairChance = tableRow["read_repair_chance"]; - tableInfo.extensions = this._mapAsObject(tableRow["extensions"]); - tableInfo.crcCheckChance = tableRow["crc_check_chance"]; - tableInfo.memtableFlushPeriod = - tableRow["memtable_flush_period_in_ms"] || - tableInfo.memtableFlushPeriod; - tableInfo.defaultTtl = - tableRow["default_time_to_live"] || tableInfo.defaultTtl; - tableInfo.speculativeRetry = - tableRow["speculative_retry"] || tableInfo.speculativeRetry; - tableInfo.minIndexInterval = - tableRow["min_index_interval"] || tableInfo.minIndexInterval; - tableInfo.maxIndexInterval = - tableRow["max_index_interval"] || tableInfo.maxIndexInterval; - tableInfo.nodesync = tableRow["nodesync"] || tableInfo.nodesync; - if (!isView) { - const cdc = tableRow["cdc"]; - if (cdc !== undefined) { - tableInfo.cdc = cdc; - } - } - if (isView) { - tableInfo.tableName = tableRow["base_table_name"]; - tableInfo.whereClause = tableRow["where_clause"]; - tableInfo.includeAllColumns = tableRow["include_all_columns"]; - return; - } - tableInfo.indexes = this._getIndexes(indexRows); - // flags can be an instance of Array or Set (real or polyfill) - let flags = tableRow["flags"]; - if (Array.isArray(flags)) { - flags = new Set(flags); - } - const isDense = flags.has("dense"); - const isSuper = flags.has("super"); - const isCompound = flags.has("compound"); - tableInfo.isCompact = isSuper || isDense || !isCompound; - // Remove the columns related to Thrift - const isStaticCompact = !isSuper && !isDense && !isCompound; - if (isStaticCompact) { - pruneStaticCompactTableColumns(tableInfo); - } else if (isDense) { - pruneDenseTableColumns(tableInfo); - } - } - - _getIndexes(indexRows) { - if (!indexRows || indexRows.length === 0) { - return utils.emptyArray; - } - return indexRows.map((row) => { - const options = this._mapAsObject(row["options"]); - return new Index( - row["index_name"], - options["target"], - row["kind"], - options, - ); - }); - } - - async _parseAggregate(row) { - const encoder = this.cc.getEncoder(); - const aggregate = new Aggregate(); - aggregate.name = row["aggregate_name"]; - aggregate.keyspaceName = row["keyspace_name"]; - aggregate.signature = row["argument_types"] || utils.emptyArray; - aggregate.stateFunction = row["state_func"]; - aggregate.finalFunction = row["final_func"]; - aggregate.initConditionRaw = row["initcond"]; - aggregate.initCondition = aggregate.initConditionRaw; - aggregate.deterministic = row["deterministic"] || false; - aggregate.argumentTypes = await Promise.all( - aggregate.signature.map((name) => - encoder.parseTypeName( - row["keyspace_name"], - name, - 0, - null, - this.udtResolver, - ), - ), - ); - aggregate.stateType = await encoder.parseTypeName( - row["keyspace_name"], - row["state_type"], - 0, - null, - this.udtResolver, - ); - aggregate.returnType = await encoder.parseTypeName( - row["keyspace_name"], - row["return_type"], - 0, - null, - this.udtResolver, - ); - return aggregate; - } - - async _parseFunction(row) { - const encoder = this.cc.getEncoder(); - const func = new SchemaFunction(); - func.name = row["function_name"]; - func.keyspaceName = row["keyspace_name"]; - func.signature = row["argument_types"] || utils.emptyArray; - func.argumentNames = row["argument_names"] || utils.emptyArray; - func.body = row["body"]; - func.calledOnNullInput = row["called_on_null_input"]; - func.language = row["language"]; - func.deterministic = row["deterministic"] || false; - func.monotonic = row["monotonic"] || false; - func.monotonicOn = row["monotonic_on"] || utils.emptyArray; - func.argumentTypes = await Promise.all( - func.signature.map((name) => - encoder.parseTypeName( - row["keyspace_name"], - name, - 0, - null, - this.udtResolver, - ), - ), - ); - func.returnType = await encoder.parseTypeName( - row["keyspace_name"], - row["return_type"], - 0, - null, - this.udtResolver, - ); - return func; - } - - async _parseUdt(udtInfo, row) { - const encoder = this.cc.getEncoder(); - const fieldTypes = row["field_types"]; - const keyspace = row["keyspace_name"]; - udtInfo.fields = await Promise.all( - row["field_names"].map(async (name, i) => { - const type = await encoder.parseTypeName( - keyspace, - fieldTypes[i], - 0, - null, - this.udtResolver, - ); - return { name, type }; - }), - ); - return udtInfo; - } -} - -/** - * Used to parse schema information for Cassandra versions 4.x and above. - * - * This parser similar to [SchemaParserV2] expect it also parses virtual - * keyspaces. - * @ignore - */ -class SchemaParserV3 extends SchemaParserV2 { - /** - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - */ - constructor(options, cc, udtResolver) { - super(options, cc, udtResolver); - this.supportsVirtual = true; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const queries = [ - { query: _selectAllKeyspacesV2, virtual: false }, - { query: _selectAllVirtualKeyspaces, virtual: true }, - ]; - - await Promise.all( - queries.map(async (q) => { - let result; - try { - result = await this.cc.query(q.query, waitReconnect); - } catch (err) { - if (q.virtual) { - // Only throw error for non-virtual query as - // server reporting C* 4.0 may not actually implement - // virtual tables. - return; - } - throw err; - } - for (let i = 0; i < result.rows.length; i++) { - const ksInfo = this._parseKeyspace( - result.rows[i], - q.virtual, - ); - keyspaces[ksInfo.name] = ksInfo; - } - }), - ); - return keyspaces; - } - - async getKeyspace(name) { - const ks = await this._getKeyspace( - _selectSingleKeyspaceV2, - name, - false, - ); - if (!ks) { - // if not found, attempt to retrieve as virtual keyspace. - return this._getKeyspace(_selectSingleVirtualKeyspace, name, true); - } - return ks; - } - - async _getKeyspace(query, name, virtual) { - try { - const row = await this._getFirstRow(format(query, name)); - - if (!row) { - return null; - } - - return this._parseKeyspace(row, virtual); - } catch (err) { - if (virtual) { - // only throw error for non-virtual query as - // server reporting C* 4.0 may not actually implement - // virtual tables. - return null; - } - throw err; - } - } -} - -/** - * Upon migration from thrift to CQL, we internally create a pair of surrogate clustering/regular columns - * for compact static tables. These columns shouldn't be exposed to the user but are currently returned by C*. - * We also need to remove the static keyword for all other columns in the table. - * @param {module:metadata~TableMetadata} tableInfo - */ -function pruneStaticCompactTableColumns(tableInfo) { - let i; - let c; - // remove "column1 text" clustering column - for (i = 0; i < tableInfo.clusteringKeys.length; i++) { - c = tableInfo.clusteringKeys[i]; - const index = tableInfo.columns.indexOf(c); - tableInfo.columns.splice(index, 1); - delete tableInfo.columnsByName[c.name]; - } - tableInfo.clusteringKeys = utils.emptyArray; - tableInfo.clusteringOrder = utils.emptyArray; - // remove regular columns and set the static columns to non-static - i = tableInfo.columns.length; - while (i--) { - c = tableInfo.columns[i]; - if (!c.isStatic && tableInfo.partitionKeys.indexOf(c) === -1) { - // remove "value blob" regular column - tableInfo.columns.splice(i, 1); - delete tableInfo.columnsByName[c.name]; - continue; - } - c.isStatic = false; - } -} - -/** - * Upon migration from thrift to CQL, we internally create a surrogate column "value" of type custom. - * This column shouldn't be exposed to the user but is currently returned by C*. - * @param {module:metadata~TableMetadata} tableInfo - */ -function pruneDenseTableColumns(tableInfo) { - let i = tableInfo.columns.length; - while (i--) { - const c = tableInfo.columns[i]; - if ( - !c.isStatic && - c.type.code === types.dataTypes.custom && - c.type.info === "empty" - ) { - // remove "value blob" regular column - tableInfo.columns.splice(i, 1); - delete tableInfo.columnsByName[c.name]; - continue; - } - c.isStatic = false; - } -} - -function getTokenToReplicaMapper(strategy, strategyOptions) { - if (/SimpleStrategy$/.test(strategy)) { - const rf = parseInt(strategyOptions["replication_factor"], 10); - if (rf > 1) { - return getTokenToReplicaSimpleMapper(rf); - } - } - if (/NetworkTopologyStrategy$/.test(strategy)) { - return getTokenToReplicaNetworkMapper(strategyOptions); - } - // default, wrap in an Array - return function noStrategy(tokenizer, ring, primaryReplicas) { - const replicas = {}; - for (const key in primaryReplicas) { - if (!Object.prototype.hasOwnProperty.call(primaryReplicas, key)) { - continue; - } - replicas[key] = [primaryReplicas[key]]; - } - return replicas; - }; -} - -/** - * @param {Number} replicationFactor - * @returns {function} - */ -function getTokenToReplicaSimpleMapper(replicationFactor) { - return function tokenSimpleStrategy( - tokenizer, - ringTokensAsStrings, - primaryReplicas, - ) { - const ringLength = ringTokensAsStrings.length; - const rf = Math.min(replicationFactor, ringLength); - const replicas = {}; - for (let i = 0; i < ringLength; i++) { - const key = ringTokensAsStrings[i]; - const tokenReplicas = [primaryReplicas[key]]; - for (let j = 1; j < ringLength && tokenReplicas.length < rf; j++) { - let nextReplicaIndex = i + j; - if (nextReplicaIndex >= ringLength) { - // circle back - nextReplicaIndex = nextReplicaIndex % ringLength; - } - const nextReplica = - primaryReplicas[ringTokensAsStrings[nextReplicaIndex]]; - // In the case of vnodes, consecutive sections of the ring can be assigned to the same host. - if (tokenReplicas.indexOf(nextReplica) === -1) { - tokenReplicas.push(nextReplica); - } - } - replicas[key] = tokenReplicas; - } - return replicas; - }; -} - -/** - * @param {Object} replicationFactors - * @returns {Function} - * @private - */ -function getTokenToReplicaNetworkMapper(replicationFactors) { - // A(DC1) - // - // H B(DC2) - // | - // G --+-- C(DC1) - // | - // F D(DC2) - // - // E(DC1) - return function tokenNetworkStrategy( - tokenizer, - ringTokensAsStrings, - primaryReplicas, - datacenters, - ) { - const replicas = {}; - const ringLength = ringTokensAsStrings.length; - - for (let i = 0; i < ringLength; i++) { - const key = ringTokensAsStrings[i]; - const tokenReplicas = []; - const replicasByDc = {}; - const racksPlaced = {}; - const skippedHosts = []; - for (let j = 0; j < ringLength; j++) { - let nextReplicaIndex = i + j; - if (nextReplicaIndex >= ringLength) { - // circle back - nextReplicaIndex = nextReplicaIndex % ringLength; - } - const h = - primaryReplicas[ringTokensAsStrings[nextReplicaIndex]]; - // In the case of vnodes, consecutive sections of the ring can be assigned to the same host. - if (tokenReplicas.indexOf(h) !== -1) { - continue; - } - const dc = h.datacenter; - // Check if the next replica belongs to one of the targeted dcs - let dcRf = parseInt(replicationFactors[dc], 10); - if (!dcRf) { - continue; - } - dcRf = Math.min(dcRf, datacenters[dc].hostLength); - let dcReplicas = replicasByDc[dc] || 0; - // Amount of replicas per dc is greater than rf or the amount of host in the datacenter - if (dcReplicas >= dcRf) { - continue; - } - let racksPlacedInDc = racksPlaced[dc]; - if (!racksPlacedInDc) { - racksPlacedInDc = racksPlaced[dc] = new utils.HashSet(); - } - if ( - h.rack && - racksPlacedInDc.contains(h.rack) && - racksPlacedInDc.length < datacenters[dc].racks.length - ) { - // We already selected a replica for this rack - // Skip until replicas in other racks are added - if (skippedHosts.length < dcRf - dcReplicas) { - skippedHosts.push(h); - } - continue; - } - replicasByDc[h.datacenter] = ++dcReplicas; - tokenReplicas.push(h); - if ( - h.rack && - racksPlacedInDc.add(h.rack) && - racksPlacedInDc.length === datacenters[dc].racks.length - ) { - // We finished placing all replicas for all racks in this dc - // Add the skipped hosts - replicasByDc[dc] += addSkippedHosts( - dcRf, - dcReplicas, - tokenReplicas, - skippedHosts, - ); - } - if ( - isDoneForToken( - replicationFactors, - datacenters, - replicasByDc, - ) - ) { - break; - } - } - replicas[key] = tokenReplicas; - } - return replicas; - }; -} - -/** - * @returns {Number} The number of skipped hosts added. - */ -function addSkippedHosts(dcRf, dcReplicas, tokenReplicas, skippedHosts) { - let i; - for (i = 0; i < dcRf - dcReplicas && i < skippedHosts.length; i++) { - tokenReplicas.push(skippedHosts[i]); - } - return i; -} - -function isDoneForToken(replicationFactors, datacenters, replicasByDc) { - const keys = Object.keys(replicationFactors); - for (let i = 0; i < keys.length; i++) { - const dcName = keys[i]; - const dc = datacenters[dcName]; - if (!dc) { - // A DC is included in the RF but the DC does not exist in the topology - continue; - } - const rf = Math.min( - parseInt(replicationFactors[dcName], 10), - dc.hostLength, - ); - if (rf > 0 && (!replicasByDc[dcName] || replicasByDc[dcName] < rf)) { - return false; - } - } - return true; -} - -/** - * Creates a new instance if the currentInstance is not valid for the - * provided Cassandra version - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - * @param {Array.} [version] The cassandra version - * @param {SchemaParser} [currentInstance] The current instance - * @returns {SchemaParser} - */ -function getByVersion(options, cc, udtResolver, version, currentInstance) { - let parserConstructor = SchemaParserV1; - if (version && version[0] === 3) { - parserConstructor = SchemaParserV2; - } else if (version && version[0] >= 4) { - parserConstructor = SchemaParserV3; - } - if (!currentInstance || !(currentInstance instanceof parserConstructor)) { - return new parserConstructor(options, cc, udtResolver); - } - return currentInstance; -} - -exports.getByVersion = getByVersion; -exports.isDoneForToken = isDoneForToken; diff --git a/test/unit-broken/metadata-tests.js b/test/unit-broken/metadata-tests.js index e390fd176..4f1030449 100644 --- a/test/unit-broken/metadata-tests.js +++ b/test/unit-broken/metadata-tests.js @@ -19,8 +19,6 @@ const dataTypes = types.dataTypes; const utils = require("../../lib/utils"); const errors = require("../../lib/errors"); const Encoder = require("../../lib/encoder"); -const isDoneForToken = - require("../../lib/metadata/schema-parser").isDoneForToken; describe("Metadata", function () { this.timeout(5000); @@ -4982,47 +4980,6 @@ describe("Metadata", function () { }); }); -describe("SchemaParser", function () { - describe("isDoneForToken()", function () { - it("should skip if dc not included in topology", function () { - const replicationFactors = { dc1: 3, dc2: 1 }; - // dc2 does not exist - const datacenters = { - dc1: { hostLength: 6 }, - }; - assert.strictEqual( - false, - isDoneForToken(replicationFactors, datacenters, {}), - ); - }); - - it("should skip if rf equals to 0", function () { - // rf 0 for dc2 - const replicationFactors = { dc1: 4, dc2: 0 }; - const datacenters = { - dc1: { hostLength: 6 }, - dc2: { hostLength: 6 }, - }; - assert.strictEqual( - true, - isDoneForToken(replicationFactors, datacenters, { dc1: 4 }), - ); - }); - - it("should return false for undefined replicasByDc[dcName]", function () { - const replicationFactors = { dc1: 3, dc2: 1 }; - // dc2 does not exist - const datacenters = { - dc1: { hostLength: 6 }, - }; - assert.strictEqual( - false, - isDoneForToken(replicationFactors, datacenters, {}), - ); - }); - }); -}); - function getControlConnectionForTable( tableRow, columnRows, diff --git a/test/unit-not-supported/event-debouncer-tests.js b/test/unit-not-supported/event-debouncer-tests.js deleted file mode 100644 index df31a9a0a..000000000 --- a/test/unit-not-supported/event-debouncer-tests.js +++ /dev/null @@ -1,347 +0,0 @@ -"use strict"; - -const { assert } = require("chai"); -const sinon = require("sinon"); - -const helper = require("../test-helper"); -const EventDebouncer = require("../../lib/metadata/event-debouncer"); - -describe("EventDebouncer", function () { - describe("timeoutElapsed()", function () { - it("should set the queue to null", function (done) { - const debouncer = newInstance(1); - debouncer._queue = { - mainEvent: { handler: () => Promise.resolve() }, - callbacks: [helper.noop], - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(debouncer._queue, null); - done(); - }, 40); - }); - - it("should process the main event and invoke all the callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - function increaseCounter() { - callbackCounter++; - } - debouncer._queue = { - mainEvent: { handler: () => Promise.resolve() }, - callbacks: helper.fillArray(10, increaseCounter), - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(callbackCounter, 10); - done(); - }, 40); - }); - - it("should process each keyspace the main event and invoke all child the callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - let ksMainEventCalled = 0; - - function increaseCounter() { - callbackCounter++; - return Promise.resolve(); - } - - debouncer._queue = { - callbacks: [assert.fail], - keyspaces: { - ks1: { - mainEvent: { - handler: () => { - ksMainEventCalled++; - return Promise.resolve(); - }, - callback: increaseCounter, - }, - events: [ - { callback: increaseCounter }, - { callback: increaseCounter }, - ], - }, - }, - }; - - debouncer._slideDelay(1); - - setTimeout(function () { - assert.strictEqual(callbackCounter, 2); - assert.strictEqual(ksMainEventCalled, 1); - done(); - }, 40); - }); - - it("should process each keyspace and invoke handlers and callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - function increaseCounter() { - callbackCounter++; - } - const handlersCalled = []; - function getHandler(name) { - return function () { - handlersCalled.push(name); - }; - } - debouncer._queue = { - callbacks: [assert.fail], - keyspaces: { - ks1: { - events: [ - { - callback: increaseCounter, - handler: getHandler("A"), - }, - { - callback: increaseCounter, - handler: getHandler("B"), - }, - ], - }, - }, - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(callbackCounter, 2); - assert.deepEqual(handlersCalled, ["A", "B"]); - done(); - }, 40); - }); - }); - - describe("#eventReceived()", function () { - it("should invoke 1 handler and all the callbacks when one event is flagged as `all`", async () => { - const debouncer = newInstance(20); - let mainEventHandlerCalled = 0; - - await Promise.all([ - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks1" }, - false, - ), - debouncer.eventReceived( - { - handler: helper.failop, - keyspace: "ks1", - cqlObject: "abc", - }, - false, - ), - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ), - - // Send an event with `all: true` - debouncer.eventReceived( - { - handler: () => { - mainEventHandlerCalled++; - return Promise.resolve(); - }, - all: true, - }, - false, - ), - - // Another one with `all: false` - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ), - ]); - - assert.strictEqual(mainEventHandlerCalled, 1); - assert.strictEqual(debouncer._timeout, null); - }); - - it("should invoke 1 keyspace handler and all the callbacks when cqlObject is undefined", async () => { - const debouncer = newInstance(30); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - const promise = Promise.all([ - debouncer.eventReceived( - { handler: getHandler("A"), keyspace: "ks1" }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("B"), - keyspace: "ks1", - cqlObject: "1a", - }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("C"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("D"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("E"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("F"), - keyspace: "ks3", - cqlObject: "3a", - }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("H"), - keyspace: "ks3", - cqlObject: "3b", - }, - false, - ), - ]); - - await helper.delayAsync(5); - - // Should not be called yet - assert.deepStrictEqual(handlersCalled, []); - - await promise; - - assert.deepEqual(handlersCalled, ["A", "E", "F", "H"]); - assert.strictEqual(debouncer._timeout, null); - }); - - it("should not invoke handlers before time elapses", async () => { - const debouncer = newInstance(200); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - debouncer.eventReceived( - { handler: getHandler("A"), keyspace: "ks1" }, - false, - ); - - await helper.delayAsync(20); - - // should not be called yet - assert.lengthOf(handlersCalled, 0); - debouncer.shutdown(); - }); - - it("should process queue immediately when processNow is true", async () => { - const debouncer = newInstance(40); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - const spy = sinon.spy(() => {}); - - const promise = Promise.all([ - debouncer - .eventReceived( - { handler: getHandler("A"), keyspace: "ks2" }, - false, - ) - .then(spy), - debouncer - .eventReceived( - { handler: getHandler("B"), keyspace: "ks1" }, - false, - ) - .then(spy), - // set with process now to true - debouncer - .eventReceived( - { handler: getHandler("C"), keyspace: "ks1" }, - true, - ) - .then(spy), - debouncer - .eventReceived( - { handler: getHandler("D"), keyspace: "ks1" }, - false, - ) - .then(spy), - ]); - - await helper.delayAsync(20); - - // The first three should be resolved by now - assert.strictEqual(spy.callCount, 3); - // Flattens the amount of requests - assert.deepEqual(handlersCalled, ["A", "C"]); - - await promise; - - assert.deepEqual(handlersCalled, ["A", "C", "D"]); - }); - }); - describe("#shutdown()", () => { - it("should invoke all callbacks", async () => { - const debouncer = newInstance(20); - const spy = sinon.spy(() => {}); - - debouncer - .eventReceived( - { handler: helper.failop, keyspace: "ks1" }, - false, - ) - .then(spy); - debouncer - .eventReceived( - { - handler: helper.failop, - keyspace: "ks1", - cqlObject: "1a", - }, - false, - ) - .then(spy); - debouncer - .eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ) - .then(spy); - debouncer - .eventReceived({ handler: helper.failop, all: true }, false) - .then(spy); - - debouncer.shutdown(); - - await helper.delayAsync(5); - assert.strictEqual(spy.callCount, 4); - - // Check that timer shouldn't elapse - await helper.delayAsync(30); - assert.strictEqual(spy.callCount, 4); - }); - }); -}); - -/** @returns {EventDebouncer} */ -function newInstance(delay) { - return new EventDebouncer(delay, helper.noop); -}