From 63fb49621eda8a91141972c4d32908c366ac7d48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Mon, 9 Mar 2026 14:10:10 +0100 Subject: [PATCH 1/4] Remove all metadata class internals This removes all internals of metadata classes, leaving only the public API without any implementation. This simplification is done, to better understand changes to the public API that will be done in the following commits. --- lib/metadata/index.js | 767 +----------------------------------------- 1 file changed, 18 insertions(+), 749 deletions(-) diff --git a/lib/metadata/index.js b/lib/metadata/index.js index 2cfb4a990..a1996ac6f 100644 --- a/lib/metadata/index.js +++ b/lib/metadata/index.js @@ -1,20 +1,10 @@ "use strict"; -const events = require("events"); -const util = require("util"); - /** * Module containing classes and fields related to metadata. * @module metadata */ -const utils = require("../utils"); -const errors = require("../errors"); -const types = require("../types"); -const schemaParserFactory = require("./schema-parser"); -const promiseUtils = require("../promise-utils"); -const { TokenRange } = require("../token"); - /** * @const * @private @@ -59,49 +49,6 @@ class Metadata { * @param {ControlConnection} controlConnection Control connection used to retrieve information. */ constructor(options, controlConnection) { - if (!options) { - throw new errors.ArgumentError("Options are not defined"); - } - - Object.defineProperty(this, "options", { - value: options, - enumerable: false, - writable: false, - }); - Object.defineProperty(this, "controlConnection", { - value: controlConnection, - enumerable: false, - writable: false, - }); - this.keyspaces = {}; - this.initialized = false; - this._isDbaas = false; - this._schemaParser = schemaParserFactory.getByVersion( - options, - controlConnection, - this.getUdt.bind(this), - ); - this.log = utils.log; - this._preparedQueries = new PreparedQueries( - options.maxPrepared, - (...args) => this.log(...args), - ); - } - - /** - * Sets the cassandra version - * @internal - * @ignore - * @param {Array.} version - */ - setCassandraVersion(version) { - this._schemaParser = schemaParserFactory.getByVersion( - this.options, - this.controlConnection, - this.getUdt.bind(this), - version, - this._schemaParser, - ); } /** @@ -110,81 +57,7 @@ class Metadata { * different deployment (on-prem). */ isDbaas() { - return this._isDbaas; - } - - /** - * Sets the product type as DBaaS. - * @internal - * @ignore - */ - setProductTypeAsDbaas() { - this._isDbaas = true; - } - - /** - * Populates the information regarding primary replica per token, datacenters (+ racks) and sorted token ring. - * @ignore - * @param {HostMap} hosts - */ - buildTokens(hosts) { - if (!this.tokenizer) { - return this.log("error", "Tokenizer could not be determined"); - } - // Get a sorted array of tokens - const allSorted = []; - // Get a map of - const primaryReplicas = {}; - // Depending on the amount of tokens, this could be an expensive operation - const hostArray = hosts.values(); - const stringify = this.tokenizer.stringify; - const datacenters = {}; - hostArray.forEach((h) => { - if (!h.tokens) { - return; - } - h.tokens.forEach((tokenString) => { - const token = this.tokenizer.parse(tokenString); - utils.insertSorted(allSorted, token, (t1, t2) => - t1.compare(t2), - ); - primaryReplicas[stringify(token)] = h; - }); - let dc = datacenters[h.datacenter]; - if (!dc) { - dc = datacenters[h.datacenter] = { - hostLength: 0, - racks: new utils.HashSet(), - }; - } - dc.hostLength++; - dc.racks.add(h.rack); - }); - // Primary replica for given token - this.primaryReplicas = primaryReplicas; - // All the tokens in ring order - this.ring = allSorted; - // Build TokenRanges. - const tokenRanges = new Set(); - if (this.ring.length === 1) { - // If there is only one token, return the range ]minToken, minToken] - const min = this.tokenizer.minToken(); - tokenRanges.add(new TokenRange(min, min, this.tokenizer)); - } else { - for (let i = 0; i < this.ring.length; i++) { - const start = this.ring[i]; - const end = this.ring[(i + 1) % this.ring.length]; - tokenRanges.add(new TokenRange(start, end, this.tokenizer)); - } - } - this.tokenRanges = tokenRanges; - // Compute string versions as it's potentially expensive and frequently reused later - this.ringTokensAsStrings = new Array(allSorted.length); - for (let i = 0; i < allSorted.length; i++) { - this.ringTokensAsStrings[i] = stringify(allSorted[i]); - } - // Datacenter metadata (host length and racks) - this.datacenters = datacenters; + throw new Error("TODO: Not implemented"); } /** @@ -196,39 +69,7 @@ class Metadata { * @param {Function} [callback] Optional callback. */ refreshKeyspace(name, callback) { - return promiseUtils.optionalCallback( - this._refreshKeyspace(name), - callback, - ); - } - - /** - * @param {String} name - * @private - */ - async _refreshKeyspace(name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - this.log("info", util.format("Retrieving keyspace %s metadata", name)); - try { - const ksInfo = await this._schemaParser.getKeyspace(name); - if (!ksInfo) { - // the keyspace was dropped - delete this.keyspaces[name]; - return null; - } - // Tokens are lazily init on the keyspace, once a replica from that keyspace is retrieved. - this.keyspaces[ksInfo.name] = ksInfo; - return ksInfo; - } catch (err) { - this.log( - "error", - "There was an error while trying to retrieve keyspace information", - err, - ); - throw err; - } + throw new Error("TODO: Not implemented"); } /** @@ -241,59 +82,7 @@ class Metadata { * @param {Function} [callback] Optional callback. */ refreshKeyspaces(waitReconnect, callback) { - if ( - typeof waitReconnect === "function" || - typeof waitReconnect === "undefined" - ) { - callback = waitReconnect; - waitReconnect = true; - } - if (!this.initialized) { - const err = this._uninitializedError(); - if (callback) { - return callback(err); - } - return Promise.reject(err); - } - return promiseUtils.optionalCallback( - this.refreshKeyspacesInternal(waitReconnect), - callback, - ); - } - - /** - * @param {Boolean} waitReconnect - * @returns {Promise>} - * @ignore - * @internal - */ - async refreshKeyspacesInternal(waitReconnect) { - this.log("info", "Retrieving keyspaces metadata"); - try { - this.keyspaces = - await this._schemaParser.getKeyspaces(waitReconnect); - return this.keyspaces; - } catch (err) { - this.log( - "error", - "There was an error while trying to retrieve keyspaces information", - err, - ); - throw err; - } - } - - _getKeyspaceReplicas(keyspace) { - if (!keyspace.replicas) { - // Calculate replicas the first time for the keyspace - keyspace.replicas = keyspace.tokenToReplica( - this.tokenizer, - this.ringTokensAsStrings, - this.primaryReplicas, - this.datacenters, - ); - } - return keyspace.replicas; + throw new Error("TODO: Not implemented"); } /** @@ -306,39 +95,7 @@ class Metadata { * @returns {Array} */ getReplicas(keyspaceName, token) { - if (!this.ring) { - return null; - } - if (Buffer.isBuffer(token)) { - token = this.tokenizer.hash(token); - } - if (token instanceof TokenRange) { - token = token.end; - } - let keyspace; - if (keyspaceName) { - keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - // the keyspace was not found, the metadata should be loaded beforehand - return null; - } - } - let i = utils.binarySearch(this.ring, token, (t1, t2) => - t1.compare(t2), - ); - if (i < 0) { - i = ~i; - } - if (i >= this.ring.length) { - // it circled back - i = i % this.ring.length; - } - const closestToken = this.ringTokensAsStrings[i]; - if (!keyspaceName) { - return [this.primaryReplicas[closestToken]]; - } - const replicas = this._getKeyspaceReplicas(keyspace); - return replicas[closestToken]; + throw new Error("TODO: Not implemented"); } /** @@ -346,7 +103,7 @@ class Metadata { * @returns {Set} The ranges of the ring or empty set if schema metadata is not enabled. */ getTokenRanges() { - return this.tokenRanges; + throw new Error("TODO: Not implemented"); } /** @@ -357,32 +114,7 @@ class Metadata { * @returns {Set|null} Ranges for the keyspace on this host or null if keyspace isn't found or hasn't been loaded. */ getTokenRangesForHost(keyspaceName, host) { - if (!this.ring) { - return null; - } - let keyspace; - if (keyspaceName) { - keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - // the keyspace was not found, the metadata should be loaded beforehand - return null; - } - } - // If the ring has only 1 token, just return the ranges as we should only have a single node cluster. - if (this.ring.length === 1) { - return this.getTokenRanges(); - } - const replicas = this._getKeyspaceReplicas(keyspace); - const ranges = new Set(); - // for each range, find replicas for end token, if replicas include host, add range. - this.tokenRanges.forEach((tokenRange) => { - const replicasForToken = - replicas[this.tokenizer.stringify(tokenRange.end)]; - if (replicasForToken.indexOf(host) !== -1) { - ranges.add(tokenRange); - } - }); - return ranges; + throw new Error("TODO: Not implemented"); } /** @@ -392,17 +124,7 @@ class Metadata { * @returns {Token} constructed token from the input buffer. */ newToken(components) { - if (!this.tokenizer) { - throw new Error( - "Partitioner not established. This should only happen if metadata was disabled or you have not connected yet.", - ); - } - if (Array.isArray(components)) { - return this.tokenizer.hash(Buffer.concat(components)); - } else if (util.isString(components)) { - return this.tokenizer.parse(components); - } - return this.tokenizer.hash(components); + throw new Error("TODO: Not implemented"); } /** @@ -412,23 +134,7 @@ class Metadata { * @returns TokenRange build range spanning from start (exclusive) to end (inclusive). */ newTokenRange(start, end) { - if (!this.tokenizer) { - throw new Error( - "Partitioner not established. This should only happen if metadata was disabled or you have not connected yet.", - ); - } - return new TokenRange(start, end, this.tokenizer); - } - - /** - * Gets the metadata information already stored associated to a prepared statement - * @param {String} keyspaceName - * @param {String} query - * @internal - * @ignore - */ - getPreparedInfo(keyspaceName, query) { - return this._preparedQueries.getOrAdd(keyspaceName, query); + throw new Error("TODO: Not implemented"); } /** @@ -436,29 +142,7 @@ class Metadata { * Following calls to the Client using the prepare flag will re-prepare the statements. */ clearPrepared() { - this._preparedQueries.clear(); - } - - /** @ignore */ - getPreparedById(id) { - return this._preparedQueries.getById(id); - } - - /** @ignore */ - setPreparedById(info) { - return this._preparedQueries.setById(info); - } - - /** @ignore */ - getAllPrepared() { - return this._preparedQueries.getAll(); - } - - /** @ignore */ - _uninitializedError() { - return new Error( - "Metadata has not been initialized. This could only happen if you have not connected yet.", - ); + throw new Error("TODO: Not implemented"); } /** @@ -474,31 +158,7 @@ class Metadata { * @param {Function} [callback] The callback to invoke when retrieval completes. */ getUdt(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getUdt(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @returns {Promise} - * @private - */ - async _getUdt(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.udts; - } - return await this._schemaParser.getUdt(keyspaceName, name, cache); + throw new Error("TODO: Not implemented"); } /** @@ -515,37 +175,7 @@ class Metadata { * second parameter. */ getTable(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getTable(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private - */ - async _getTable(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - let virtual; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.tables; - virtual = keyspace.virtual; - } - return await this._schemaParser.getTable( - keyspaceName, - name, - cache, - virtual, - ); + throw new Error("TODO: Not implemented"); } /** @@ -562,29 +192,7 @@ class Metadata { * as second parameter. */ getFunctions(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getFunctionsWrapper(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private - */ - async _getFunctionsWrapper(keyspaceName, name) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql function name to retrieve the metadata", - ); - } - const functionsMap = await this._getFunctions( - keyspaceName, - name, - false, - ); - return Array.from(functionsMap.values()); + throw new Error("TODO: Not implemented"); } /** @@ -602,10 +210,7 @@ class Metadata { * parameter. */ getFunction(keyspaceName, name, signature, callback) { - return promiseUtils.optionalCallback( - this._getSingleFunction(keyspaceName, name, signature, false), - callback, - ); + throw new Error("TODO: Not implemented"); } /** @@ -622,25 +227,7 @@ class Metadata { * second parameter. */ getAggregates(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getAggregates(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @private - */ - async _getAggregates(keyspaceName, name) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql aggregate name to retrieve the metadata", - ); - } - const functionsMap = await this._getFunctions(keyspaceName, name, true); - return Array.from(functionsMap.values()); + throw new Error("TODO: Not implemented"); } /** @@ -657,10 +244,7 @@ class Metadata { * @param {Function} [callback] The callback with the err as a first parameter and the {@link Aggregate} as second parameter. */ getAggregate(keyspaceName, name, signature, callback) { - return promiseUtils.optionalCallback( - this._getSingleFunction(keyspaceName, name, signature, true), - callback, - ); + throw new Error("TODO: Not implemented"); } /** @@ -678,97 +262,7 @@ class Metadata { * second parameter. */ getMaterializedView(keyspaceName, name, callback) { - return promiseUtils.optionalCallback( - this._getMaterializedView(keyspaceName, name), - callback, - ); - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @returns {Promise} - * @private - */ - async _getMaterializedView(keyspaceName, name) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return null; - } - cache = keyspace.views; - } - return await this._schemaParser.getMaterializedView( - keyspaceName, - name, - cache, - ); - } - - /** - * Gets a map of cql function definitions or aggregates based on signature. - * @param {String} keyspaceName - * @param {String} name Name of the function or aggregate - * @param {Boolean} aggregate - * @returns {Promise} - * @private - */ - async _getFunctions(keyspaceName, name, aggregate) { - if (!this.initialized) { - throw this._uninitializedError(); - } - let cache; - if (this.options.isMetadataSyncEnabled) { - const keyspace = this.keyspaces[keyspaceName]; - if (!keyspace) { - return new Map(); - } - cache = aggregate ? keyspace.aggregates : keyspace.functions; - } - return await this._schemaParser.getFunctions( - keyspaceName, - name, - aggregate, - cache, - ); - } - - /** - * Gets a single cql function or aggregate definition - * @param {String} keyspaceName - * @param {String} name - * @param {Array} signature - * @param {Boolean} aggregate - * @returns {Promise} - * @private - */ - async _getSingleFunction(keyspaceName, name, signature, aggregate) { - if (!keyspaceName || !name) { - throw new errors.ArgumentError( - "You must provide the keyspace name and cql function name to retrieve the metadata", - ); - } - if (!Array.isArray(signature)) { - throw new errors.ArgumentError( - "Signature must be an array of types", - ); - } - signature = signature.map((item) => { - if (typeof item === "string") { - return item; - } - return types.getDataTypeNameByCode(item); - }); - const functionsMap = await this._getFunctions( - keyspaceName, - name, - aggregate, - ); - return functionsMap.get(signature.join(",")) || null; + throw new Error("TODO: Not implemented"); } /** @@ -784,27 +278,7 @@ class Metadata { * @param {Function} [callback] The callback with the err as first parameter and the query trace as second parameter. */ getTrace(traceId, consistency, callback) { - if (!callback && typeof consistency === "function") { - // Both callback and consistency are optional parameters - // In this case, the second parameter is the callback - callback = consistency; - consistency = null; - } - - return promiseUtils.optionalCallback( - this._getTrace(traceId, consistency), - callback, - ); - } - - /** - * @param {Uuid} traceId - * @param {Number} consistency - * @returns {Promise} - * @private - */ - async _getTrace() { - throw new Error(`TODO: Not implemented`); + throw new Error("TODO: Not implemented"); } /** @@ -820,212 +294,7 @@ class Metadata { * the check could not be performed (for example, if the control connection is down). */ checkSchemaAgreement(callback) { - return promiseUtils.optionalCallback( - this._checkSchemaAgreement(), - callback, - ); - } - - /** - * Async-only version of check schema agreement. - * @private - */ - async _checkSchemaAgreement() { - const connection = this.controlConnection.connection; - if (!connection) { - return false; - } - try { - return await this.compareSchemaVersions(connection); - } catch (err) { - return false; - } - } - - /** - * Uses the metadata to fill the user provided parameter hints - * @param {String} keyspace - * @param {Array} hints - * @internal - * @ignore - */ - async adaptUserHints(keyspace, hints) { - if (!Array.isArray(hints)) { - return; - } - const udts = []; - // Check for udts and get the metadata - for (let i = 0; i < hints.length; i++) { - const hint = hints[i]; - if (typeof hint !== "string") { - continue; - } - - const type = types.dataTypes.getByName(hint); - this._checkUdtTypes(udts, type, keyspace); - hints[i] = type; - } - - for (const type of udts) { - const udtInfo = await this.getUdt( - type.info.keyspace, - type.info.name, - ); - if (!udtInfo) { - throw new TypeError( - "User defined type not found: " + - type.info.keyspace + - "." + - type.info.name, - ); - } - type.info = udtInfo; - } - } - - /** - * @param {Array} udts - * @param {{code, info}} type - * @param {string} keyspace - * @private - */ - _checkUdtTypes(udts, type, keyspace) { - if (type.code === types.dataTypes.udt) { - const udtName = type.info.split("."); - type.info = { - keyspace: udtName[0], - name: udtName[1], - }; - if (!type.info.name) { - if (!keyspace) { - throw new TypeError( - "No keyspace specified for udt: " + udtName.join("."), - ); - } - // use the provided keyspace - type.info.name = type.info.keyspace; - type.info.keyspace = keyspace; - } - udts.push(type); - return; - } - - if (!type.info) { - return; - } - if ( - type.code === types.dataTypes.list || - type.code === types.dataTypes.set - ) { - return this._checkUdtTypes(udts, type.info, keyspace); - } - if (type.code === types.dataTypes.map) { - this._checkUdtTypes(udts, type.info[0], keyspace); - this._checkUdtTypes(udts, type.info[1], keyspace); - } - } - - /** - * Uses the provided connection to query the schema versions and compare them. - * @param {Connection} connection - * @internal - * @ignore - */ - async compareSchemaVersions() { - throw new Error(`TODO: Not implemented`); - } -} - -/** - * Allows to store prepared queries and retrieval by query or query id. - * @ignore - */ -class PreparedQueries { - /** - * @param {Number} maxPrepared - * @param {Function} logger - */ - constructor(maxPrepared, logger) { - this.length = 0; - this._maxPrepared = maxPrepared; - this._mapByKey = new Map(); - this._mapById = new Map(); - this._logger = logger; - } - - _getKey(keyspace, query) { - return (keyspace || "") + query; - } - - getOrAdd(keyspace, query) { - const key = this._getKey(keyspace, query); - let info = this._mapByKey.get(key); - if (info) { - return info; - } - - this._validateOverflow(); - - info = new events.EventEmitter(); - info.setMaxListeners(0); - info.query = query; - // The keyspace in which it was prepared - info.keyspace = keyspace; - this._mapByKey.set(key, info); - this.length++; - return info; - } - - _validateOverflow() { - if (this.length < this._maxPrepared) { - return; - } - - const toRemove = []; - this._logger( - "warning", - "Prepared statements exceeded maximum. This could be caused by preparing queries that contain parameters", - ); - - const toRemoveLength = this.length - this._maxPrepared + 1; - - for (const [key, info] of this._mapByKey) { - if (!info.queryId) { - // Only remove queries that contain queryId - continue; - } - - const length = toRemove.push([key, info]); - if (length >= toRemoveLength) { - break; - } - } - - for (const [key, info] of toRemove) { - this._mapByKey.delete(key); - this._mapById.delete(info.queryId.toString("hex")); - this.length--; - } - } - - setById(info) { - this._mapById.set(info.queryId.toString("hex"), info); - } - - getById(id) { - return this._mapById.get(id.toString("hex")); - } - - clear() { - this._mapByKey = new Map(); - this._mapById = new Map(); - this.length = 0; - } - - getAll() { - return Array.from(this._mapByKey.values()).filter( - (info) => !!info.queryId, - ); + throw new Error("TODO: Not implemented"); } } From a54db3f3ed1ba69aa16caf8664895dafa5aa039b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Mon, 9 Mar 2026 14:22:32 +0100 Subject: [PATCH 2/4] Remove isDbaas from metadata API This was a metadata specific to DSx service --- lib/metadata/index.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/lib/metadata/index.js b/lib/metadata/index.js index a1996ac6f..38c7fad08 100644 --- a/lib/metadata/index.js +++ b/lib/metadata/index.js @@ -51,15 +51,6 @@ class Metadata { constructor(options, controlConnection) { } - /** - * Determines whether the cluster is provided as a service. - * @returns {boolean} true when the cluster is provided as a service (DataStax Astra), false when it's a - * different deployment (on-prem). - */ - isDbaas() { - throw new Error("TODO: Not implemented"); - } - /** * Gets the keyspace metadata information and updates the internal state of the driver. * From 289e5a912205b0e6c3c1232da71b0ed80df8e486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Mon, 9 Mar 2026 14:42:31 +0100 Subject: [PATCH 3/4] Update the documentation to sync version. The plan is to let the rust driver handle all metadata fetching. This means we will be able to retrieve all the metadata in a sync way. This commit updates the metadata class API to expose all endpoints in a sync way. This is meant to split the API changes into smaller parts, to better understand the differences between the old and new API. --- lib/metadata/index.js | 95 +++++++------------------------------------ 1 file changed, 14 insertions(+), 81 deletions(-) diff --git a/lib/metadata/index.js b/lib/metadata/index.js index 38c7fad08..c75904845 100644 --- a/lib/metadata/index.js +++ b/lib/metadata/index.js @@ -53,26 +53,18 @@ class Metadata { /** * Gets the keyspace metadata information and updates the internal state of the driver. - * - * If a `callback` is provided, the callback is invoked when the keyspaces metadata refresh completes. - * Otherwise, it returns a `Promise`. * @param {String} name Name of the keyspace. - * @param {Function} [callback] Optional callback. */ - refreshKeyspace(name, callback) { + refreshKeyspace(name) { throw new Error("TODO: Not implemented"); } /** * Gets the metadata information of all the keyspaces and updates the internal state of the driver. - * - * If a `callback` is provided, the callback is invoked when the keyspace metadata refresh completes. - * Otherwise, it returns a `Promise`. - * @param {Boolean|Function} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not + * @param {Boolean} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not * connected at the moment. Default: true. - * @param {Function} [callback] Optional callback. */ - refreshKeyspaces(waitReconnect, callback) { + refreshKeyspaces(waitReconnect) { throw new Error("TODO: Not implemented"); } @@ -138,121 +130,70 @@ class Metadata { /** * Gets the definition of an user-defined type. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same UDT definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the UDT. - * @param {Function} [callback] The callback to invoke when retrieval completes. */ - getUdt(keyspaceName, name, callback) { + getUdt(keyspaceName, name) { throw new Error("TODO: Not implemented"); } /** * Gets the definition of a table. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same table definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the Table. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link TableMetadata} as - * second parameter. */ - getTable(keyspaceName, name, callback) { + getTable(keyspaceName, name) { throw new Error("TODO: Not implemented"); } /** * Gets the definition of CQL functions for a given name. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace. * @param {String} name Name of the Function. - * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link SchemaFunction} - * as second parameter. */ - getFunctions(keyspaceName, name, callback) { + getFunctions(keyspaceName, name) { throw new Error("TODO: Not implemented"); } /** * Gets a definition of CQL function for a given name and signature. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the Function * @param {Array.|Array.<{code, info}>} signature Array of types of the parameters. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link SchemaFunction} as second - * parameter. */ - getFunction(keyspaceName, name, signature, callback) { + getFunction(keyspaceName, name, signature) { throw new Error("TODO: Not implemented"); } /** * Gets the definition of CQL aggregate for a given name. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same aggregates definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace - * @param {String} name Name of the Function - * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link Aggregate} as - * second parameter. + * @param {String} name Name of the aggregate */ - getAggregates(keyspaceName, name, callback) { + getAggregates(keyspaceName, name) { throw new Error("TODO: Not implemented"); } /** * Gets a definition of CQL aggregate for a given name and signature. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * - * When trying to retrieve the same aggregate definition concurrently, it will query once and invoke all callbacks - * with the retrieved information. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the aggregate * @param {Array.|Array.<{code, info}>} signature Array of types of the parameters. - * @param {Function} [callback] The callback with the err as a first parameter and the {@link Aggregate} as second parameter. */ - getAggregate(keyspaceName, name, signature, callback) { + getAggregate(keyspaceName, name, signature) { throw new Error("TODO: Not implemented"); } /** * Gets the definition of a CQL materialized view for a given name. * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. - * * Note that, unlike the rest of the {@link Metadata} methods, this method does not cache the result for following * calls, as the current version of the Cassandra native protocol does not support schema change events for * materialized views. Each call to this method will produce one or more queries to the cluster. * @param {String} keyspaceName Name of the keyspace * @param {String} name Name of the materialized view - * @param {Function} [callback] The callback with the err as a first parameter and the {@link MaterializedView} as - * second parameter. */ - getMaterializedView(keyspaceName, name, callback) { + getMaterializedView(keyspaceName, name) { throw new Error("TODO: Not implemented"); } @@ -261,14 +202,10 @@ class Metadata { * query. The trace itself is stored in Cassandra in the `sessions` and * `events` table in the `system_traces` keyspace and can be * retrieve manually using the trace identifier. - * - * If a `callback` is provided, the callback is invoked when the metadata retrieval completes. - * Otherwise, it returns a `Promise`. * @param {Uuid} traceId Identifier of the trace session. * @param {Number} [consistency] The consistency level to obtain the trace. - * @param {Function} [callback] The callback with the err as first parameter and the query trace as second parameter. */ - getTrace(traceId, consistency, callback) { + getTrace(traceId, consistency) { throw new Error("TODO: Not implemented"); } @@ -277,14 +214,10 @@ class Metadata { * * This method performs a one-time check only, without any form of retry; therefore * `protocolOptions.maxSchemaAgreementWaitSeconds` setting does not apply in this case. - * @param {Function} [callback] A function that is invoked with a value - * `true` when all hosts agree on the schema and `false` when there is no agreement or when - * the check could not be performed (for example, if the control connection is down). - * @returns {Promise} Returns a `Promise` when a callback is not provided. The promise resolves to - * `true` when all hosts agree on the schema and `false` when there is no agreement or when + * @returns {Boolean} `true` when all hosts agree on the schema and `false` when there is no agreement or when * the check could not be performed (for example, if the control connection is down). */ - checkSchemaAgreement(callback) { + checkSchemaAgreement() { throw new Error("TODO: Not implemented"); } } From a5151e47e67b8946d0d0542f4d23ea7b074c1a1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Czech?= Date: Mon, 9 Mar 2026 15:22:55 +0100 Subject: [PATCH 4/4] Remove event debouncer This is a no longer used part of the code that wasn't exposed in the public API --- lib/metadata/event-debouncer.js | 159 -- lib/metadata/schema-parser.js | 1396 ----------------- test/unit-broken/metadata-tests.js | 43 - .../event-debouncer-tests.js | 347 ---- 4 files changed, 1945 deletions(-) delete mode 100644 lib/metadata/event-debouncer.js delete mode 100644 lib/metadata/schema-parser.js delete mode 100644 test/unit-not-supported/event-debouncer-tests.js diff --git a/lib/metadata/event-debouncer.js b/lib/metadata/event-debouncer.js deleted file mode 100644 index fa4411080..000000000 --- a/lib/metadata/event-debouncer.js +++ /dev/null @@ -1,159 +0,0 @@ -"use strict"; - -const util = require("util"); -const utils = require("../utils"); -const promiseUtils = require("../promise-utils"); - -const _queueOverflowThreshold = 1000; - -/** - * Debounce protocol events by acting on those events with a sliding delay. - * @ignore - * @constructor - */ -class EventDebouncer { - /** - * Creates a new instance of the event debouncer. - * @param {Number} delay - * @param {Function} logger - */ - constructor(delay, logger) { - this._delay = delay; - this._logger = logger; - this._queue = null; - this._timeout = null; - } - - /** - * Adds a new event to the queue and moves the delay. - * @param {{ handler: Function, all: boolean|undefined, keyspace: String|undefined, - * cqlObject: String|null|undefined }} event - * @param {Boolean} processNow - * @returns {Promise} - */ - eventReceived(event, processNow) { - return new Promise((resolve, reject) => { - event.callback = promiseUtils.getCallback(resolve, reject); - this._queue = this._queue || { callbacks: [], keyspaces: {} }; - const delay = !processNow ? this._delay : 0; - if (event.all) { - // when an event marked with all is received, it supersedes all the rest of events - // a full update (hosts + keyspaces + tokens) is going to be made - this._queue.mainEvent = event; - } - if (this._queue.callbacks.length === _queueOverflowThreshold) { - // warn once - this._logger( - "warn", - util.format( - "Event debouncer queue exceeded %d events", - _queueOverflowThreshold, - ), - ); - } - this._queue.callbacks.push(event.callback); - if (this._queue.mainEvent) { - // a full refresh is scheduled and the callback was added, nothing else to do. - return this._slideDelay(delay); - } - // Insert at keyspace level - let keyspaceEvents = this._queue.keyspaces[event.keyspace]; - if (!keyspaceEvents) { - keyspaceEvents = this._queue.keyspaces[event.keyspace] = { - events: [], - }; - } - if (event.cqlObject === undefined) { - // a full refresh of the keyspace, supersedes all child keyspace events - keyspaceEvents.mainEvent = event; - } - keyspaceEvents.events.push(event); - this._slideDelay(delay); - }); - } - - /** - * @param {Number} delay - * @private - * */ - _slideDelay(delay) { - const self = this; - function process() { - const q = self._queue; - self._queue = null; - self._timeout = null; - processQueue(q); - } - if (delay === 0) { - // no delay, process immediately - if (this._timeout) { - clearTimeout(this._timeout); - } - return process(); - } - const previousTimeout = this._timeout; - // Add the new timeout before removing the previous one performs better - this._timeout = setTimeout(process, delay); - if (previousTimeout) { - clearTimeout(previousTimeout); - } - } - - /** - * Clears the timeout and invokes all pending callback. - */ - shutdown() { - if (!this._queue) { - return; - } - this._queue.callbacks.forEach(function (cb) { - cb(); - }); - this._queue = null; - clearTimeout(this._timeout); - this._timeout = null; - } -} - -/** - * @param {{callbacks: Array, keyspaces: Object, mainEvent: Object}} q - * @private - */ -function processQueue(q) { - if (q.mainEvent) { - // refresh all by invoking 1 handler and invoke all pending callbacks - return promiseUtils.toCallback(q.mainEvent.handler(), (err) => { - for (let i = 0; i < q.callbacks.length; i++) { - q.callbacks[i](err); - } - }); - } - - utils.each(Object.keys(q.keyspaces), function eachKeyspace(name, next) { - const keyspaceEvents = q.keyspaces[name]; - if (keyspaceEvents.mainEvent) { - // refresh a keyspace - return promiseUtils.toCallback( - keyspaceEvents.mainEvent.handler(), - function mainEventCallback(err) { - for (let i = 0; i < keyspaceEvents.events.length; i++) { - keyspaceEvents.events[i].callback(err); - } - - next(); - }, - ); - } - - // deal with individual handlers and callbacks - keyspaceEvents.events.forEach((event) => { - // sync handlers - event.handler(); - event.callback(); - }); - - next(); - }); -} - -module.exports = EventDebouncer; diff --git a/lib/metadata/schema-parser.js b/lib/metadata/schema-parser.js deleted file mode 100644 index cabd675c7..000000000 --- a/lib/metadata/schema-parser.js +++ /dev/null @@ -1,1396 +0,0 @@ -"use strict"; -const util = require("util"); -const events = require("events"); -const types = require("../types"); -const utils = require("../utils"); -const errors = require("../errors"); -const promiseUtils = require("../promise-utils"); -const TableMetadata = require("./table-metadata"); -const Aggregate = require("./aggregate"); -const SchemaFunction = require("./schema-function"); -const Index = require("./schema-index"); -const MaterializedView = require("./materialized-view"); -const { format } = util; - -/** - * @module metadata/schemaParser - * @ignore - */ - -const _selectAllKeyspacesV1 = "SELECT * FROM system.schema_keyspaces"; -const _selectSingleKeyspaceV1 = - "SELECT * FROM system.schema_keyspaces where keyspace_name = '%s'"; -const _selectAllKeyspacesV2 = "SELECT * FROM system_schema.keyspaces"; -const _selectSingleKeyspaceV2 = - "SELECT * FROM system_schema.keyspaces where keyspace_name = '%s'"; -const _selectTableV1 = - "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name='%s' AND columnfamily_name='%s'"; -const _selectTableV2 = - "SELECT * FROM system_schema.tables WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectColumnsV1 = - "SELECT * FROM system.schema_columns WHERE keyspace_name='%s' AND columnfamily_name='%s'"; -const _selectColumnsV2 = - "SELECT * FROM system_schema.columns WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectIndexesV2 = - "SELECT * FROM system_schema.indexes WHERE keyspace_name='%s' AND table_name='%s'"; -const _selectUdtV1 = - "SELECT * FROM system.schema_usertypes WHERE keyspace_name='%s' AND type_name='%s'"; -const _selectUdtV2 = - "SELECT * FROM system_schema.types WHERE keyspace_name='%s' AND type_name='%s'"; -const _selectFunctionsV1 = - "SELECT * FROM system.schema_functions WHERE keyspace_name = '%s' AND function_name = '%s'"; -const _selectFunctionsV2 = - "SELECT * FROM system_schema.functions WHERE keyspace_name = '%s' AND function_name = '%s'"; -const _selectAggregatesV1 = - "SELECT * FROM system.schema_aggregates WHERE keyspace_name = '%s' AND aggregate_name = '%s'"; -const _selectAggregatesV2 = - "SELECT * FROM system_schema.aggregates WHERE keyspace_name = '%s' AND aggregate_name = '%s'"; -const _selectMaterializedViewV2 = - "SELECT * FROM system_schema.views WHERE keyspace_name = '%s' AND view_name = '%s'"; - -const _selectAllVirtualKeyspaces = - "SELECT * FROM system_virtual_schema.keyspaces"; -const _selectSingleVirtualKeyspace = - "SELECT * FROM system_virtual_schema.keyspaces where keyspace_name = '%s'"; -const _selectVirtualTable = - "SELECT * FROM system_virtual_schema.tables where keyspace_name = '%s' and table_name='%s'"; -const _selectVirtualColumns = - "SELECT * FROM system_virtual_schema.columns where keyspace_name = '%s' and table_name='%s'"; - -/** - * @abstract - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc - * @constructor - * @ignore - */ -class SchemaParser { - constructor(options, cc) { - this.cc = cc; - this.encodingOptions = options.encoding; - this.selectTable = null; - this.selectColumns = null; - this.selectIndexes = null; - this.selectUdt = null; - this.selectAggregates = null; - this.selectFunctions = null; - this.supportsVirtual = false; - } - - /** - * @param name - * @param durableWrites - * @param strategy - * @param strategyOptions - * @param virtual - * @returns {{name, durableWrites, strategy, strategyOptions, tokenToReplica, udts, tables, functions, aggregates}} - * @protected - */ - _createKeyspace(name, durableWrites, strategy, strategyOptions, virtual) { - return { - name, - durableWrites, - strategy, - strategyOptions, - virtual: virtual === true, - udts: {}, - tables: {}, - functions: {}, - aggregates: {}, - views: {}, - tokenToReplica: getTokenToReplicaMapper(strategy, strategyOptions), - graphEngine: undefined, - }; - } - - /** - * @abstract - * @param {String} name - * @returns {Promise} - */ - getKeyspace(name) {} - - /** - * @abstract - * @param {Boolean} waitReconnect - * @returns {Promise>} - */ - getKeyspaces(waitReconnect) {} - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @param {Boolean} virtual - * @returns {Promise} - */ - async getTable(keyspaceName, name, cache, virtual) { - let tableInfo = cache && cache[name]; - if (!tableInfo) { - tableInfo = new TableMetadata(name); - if (cache) { - cache[name] = tableInfo; - } - } - if (tableInfo.loaded) { - return tableInfo; - } - if (tableInfo.loading) { - // Wait for it to emit - return promiseUtils.fromEvent(tableInfo, "load"); - } - try { - // its not cached and not being retrieved - tableInfo.loading = true; - let indexRows; - let virtualTable = virtual; - const selectTable = virtualTable - ? _selectVirtualTable - : this.selectTable; - const query = util.format(selectTable, keyspaceName, name); - let tableRow = await this._getFirstRow(query); - // if we weren't sure if table was virtual or not, query virtual schema. - if ( - !tableRow && - this.supportsVirtual && - virtualTable === undefined - ) { - const query = util.format( - _selectVirtualTable, - keyspaceName, - name, - ); - try { - tableRow = await this._getFirstRow(query); - } catch (err) { - // we can't error here as we can't be sure if the node - // supports virtual tables, in this case it is adequate - // to act as if there was no matching table. - } - if (tableRow) { - // We are fetching a virtual table - virtualTable = true; - } - } - if (!tableRow) { - tableInfo.loading = false; - tableInfo.emit("load", null, null); - return null; - } - const selectColumns = virtualTable - ? _selectVirtualColumns - : this.selectColumns; - const columnRows = await this._getRows( - util.format(selectColumns, keyspaceName, name), - ); - if (this.selectIndexes && !virtualTable) { - indexRows = await this._getRows( - util.format(this.selectIndexes, keyspaceName, name), - ); - } - await this._parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtualTable, - ); - tableInfo.loaded = true; - tableInfo.emit("load", null, tableInfo); - return tableInfo; - } catch (err) { - tableInfo.emit("load", err, null); - throw err; - } finally { - tableInfo.loading = false; - } - } - - async _getFirstRow(query) { - const rows = await this._getRows(query); - return rows[0]; - } - - async _getRows(query) { - const response = await this.cc.query(query); - return response.rows; - } - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @returns {Promise} - */ - async getUdt(keyspaceName, name, cache) { - let udtInfo = cache && cache[name]; - if (!udtInfo) { - udtInfo = new events.EventEmitter(); - if (cache) { - cache[name] = udtInfo; - } - udtInfo.setMaxListeners(0); - udtInfo.loading = false; - udtInfo.name = name; - udtInfo.keyspace = keyspaceName; - udtInfo.fields = null; - } - if (udtInfo.fields) { - return udtInfo; - } - if (udtInfo.loading) { - return promiseUtils.fromEvent(udtInfo, "load"); - } - udtInfo.loading = true; - const query = format(this.selectUdt, keyspaceName, name); - try { - const row = await this._getFirstRow(query); - if (!row) { - udtInfo.loading = false; - udtInfo.emit("load", null, null); - return null; - } - await this._parseUdt(udtInfo, row); - udtInfo.emit("load", null, udtInfo); - return udtInfo; - } catch (err) { - udtInfo.emit("load", err); - throw err; - } finally { - udtInfo.loading = false; - } - } - - /** - * Parses the udt information from the row - * @param udtInfo - * @param {Row} row - * @returns {Promise} - * @abstract - */ - _parseUdt(udtInfo, row) {} - - /** - * Builds the metadata based on the table and column rows - * @abstract - * @param {module:metadata~TableMetadata} tableInfo - * @param {Row} tableRow - * @param {Array.} columnRows - * @param {Array.} indexRows - * @param {Boolean} virtual - * @returns {Promise} - * @throws {Error} - */ - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) {} - - /** - * @abstract - * @param {String} keyspaceName - * @param {String} name - * @param {Object} cache - * @returns {Promise} - */ - getMaterializedView(keyspaceName, name, cache) {} - - /** - * @param {String} keyspaceName - * @param {String} name - * @param {Boolean} aggregate - * @param {Object} cache - * @returns {Promise} - */ - async getFunctions(keyspaceName, name, aggregate, cache) { - /** @type {String} */ - let query = this.selectFunctions; - let parser = (row) => this._parseFunction(row); - if (aggregate) { - query = this.selectAggregates; - parser = (row) => this._parseAggregate(row); - } - // if it's not already loaded, get all functions with that name - // cache it by name and, within name, by signature - let functionsInfo = cache && cache[name]; - if (!functionsInfo) { - functionsInfo = new events.EventEmitter(); - if (cache) { - cache[name] = functionsInfo; - } - functionsInfo.setMaxListeners(0); - } - if (functionsInfo.values) { - return functionsInfo.values; - } - if (functionsInfo.loading) { - return promiseUtils.fromEvent(functionsInfo, "load"); - } - functionsInfo.loading = true; - try { - const rows = await this._getRows(format(query, keyspaceName, name)); - const funcs = await Promise.all(rows.map(parser)); - const result = new Map(); - if (rows.length > 0) { - // Cache positive hits - functionsInfo.values = result; - } - - funcs.forEach((f) => - functionsInfo.values.set(f.signature.join(","), f), - ); - functionsInfo.emit("load", null, result); - return result; - } catch (err) { - functionsInfo.emit("load", err); - throw err; - } finally { - functionsInfo.loading = false; - } - } - - /** - * @abstract - * @param {Row} row - * @returns {Promise} - */ - _parseAggregate(row) {} - - /** - * @abstract - * @param {Row} row - * @returns {Promise} - */ - _parseFunction(row) {} - - /** @returns {Map} */ - _asMap(obj) { - if (!obj) { - return new Map(); - } - if ( - this.encodingOptions.map && - obj instanceof this.encodingOptions.map - ) { - // Its already a Map or a polyfill of a Map - return obj; - } - return new Map(Object.keys(obj).map((k) => [k, obj[k]])); - } - - _mapAsObject(map) { - if (!map) { - return map; - } - if ( - this.encodingOptions.map && - map instanceof this.encodingOptions.map - ) { - const result = {}; - map.forEach((value, key) => (result[key] = value)); - return result; - } - return map; - } -} - -/** - * Used to parse schema information for Cassandra versions 1.2.x, and 2.x - * @ignore - */ -class SchemaParserV1 extends SchemaParser { - /** - * @param {ClientOptions} options - * @param {ControlConnection} cc - */ - constructor(options, cc) { - super(options, cc); - this.selectTable = _selectTableV1; - this.selectColumns = _selectColumnsV1; - this.selectUdt = _selectUdtV1; - this.selectAggregates = _selectAggregatesV1; - this.selectFunctions = _selectFunctionsV1; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const result = await this.cc.query( - _selectAllKeyspacesV1, - waitReconnect, - ); - for (let i = 0; i < result.rows.length; i++) { - const row = result.rows[i]; - const ksInfo = this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - row["strategy_class"], - JSON.parse(row["strategy_options"] || null), - ); - keyspaces[ksInfo.name] = ksInfo; - } - return keyspaces; - } - - async getKeyspace(name) { - const row = await this._getFirstRow( - format(_selectSingleKeyspaceV1, name), - ); - if (!row) { - return null; - } - return this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - row["strategy_class"], - JSON.parse(row["strategy_options"]), - ); - } - - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) { - // All the tableInfo parsing in V1 is sync, it uses a async function because the super class defines one - // to support other versions. - let c, name, types; - const encoder = this.cc.getEncoder(); - const columnsKeyed = {}; - let partitionKeys = []; - let clusteringKeys = []; - tableInfo.bloomFilterFalsePositiveChance = - tableRow["bloom_filter_fp_chance"]; - tableInfo.caching = tableRow["caching"]; - tableInfo.comment = tableRow["comment"]; - tableInfo.compactionClass = tableRow["compaction_strategy_class"]; - tableInfo.compactionOptions = JSON.parse( - tableRow["compaction_strategy_options"], - ); - tableInfo.compression = JSON.parse(tableRow["compression_parameters"]); - tableInfo.gcGraceSeconds = tableRow["gc_grace_seconds"]; - tableInfo.localReadRepairChance = tableRow["local_read_repair_chance"]; - tableInfo.readRepairChance = tableRow["read_repair_chance"]; - tableInfo.populateCacheOnFlush = - tableRow["populate_io_cache_on_flush"] || - tableInfo.populateCacheOnFlush; - tableInfo.memtableFlushPeriod = - tableRow["memtable_flush_period_in_ms"] || - tableInfo.memtableFlushPeriod; - tableInfo.defaultTtl = - tableRow["default_time_to_live"] || tableInfo.defaultTtl; - tableInfo.speculativeRetry = - tableRow["speculative_retry"] || tableInfo.speculativeRetry; - tableInfo.indexInterval = - tableRow["index_interval"] || tableInfo.indexInterval; - if (typeof tableRow["min_index_interval"] !== "undefined") { - // Cassandra 2.1+ - tableInfo.minIndexInterval = - tableRow["min_index_interval"] || tableInfo.minIndexInterval; - tableInfo.maxIndexInterval = - tableRow["max_index_interval"] || tableInfo.maxIndexInterval; - } else { - // set to null - tableInfo.minIndexInterval = null; - tableInfo.maxIndexInterval = null; - } - if (typeof tableRow["replicate_on_write"] !== "undefined") { - // leave the default otherwise - tableInfo.replicateOnWrite = tableRow["replicate_on_write"]; - } - tableInfo.columns = []; - for (let i = 0; i < columnRows.length; i++) { - const row = columnRows[i]; - const type = encoder.parseFqTypeName(row["validator"]); - c = { - name: row["column_name"], - type: type, - isStatic: false, - }; - tableInfo.columns.push(c); - columnsKeyed[c.name] = c; - switch (row["type"]) { - case "partition_key": - partitionKeys.push({ - c: c, - index: row["component_index"] || 0, - }); - break; - case "clustering_key": - clusteringKeys.push({ - c: c, - index: row["component_index"] || 0, - order: c.type.options.reversed ? "DESC" : "ASC", - }); - break; - case "static": - // C* 2.0.6+ supports static columns - c.isStatic = true; - break; - } - } - if (partitionKeys.length > 0) { - tableInfo.partitionKeys = partitionKeys - .sort(utils.propCompare("index")) - .map((item) => item.c); - clusteringKeys.sort(utils.propCompare("index")); - tableInfo.clusteringKeys = clusteringKeys.map((item) => item.c); - tableInfo.clusteringOrder = clusteringKeys.map( - (item) => item.order, - ); - } - // In C* 1.2, keys are not stored on the schema_columns table - const keysStoredInTableRow = tableInfo.partitionKeys.length === 0; - if (keysStoredInTableRow && tableRow["key_aliases"]) { - // In C* 1.2, keys are not stored on the schema_columns table - partitionKeys = JSON.parse(tableRow["key_aliases"]); - types = encoder.parseKeyTypes(tableRow["key_validator"]).types; - for (let i = 0; i < partitionKeys.length; i++) { - name = partitionKeys[i]; - c = columnsKeyed[name]; - if (!c) { - c = { - name: name, - type: types[i], - }; - tableInfo.columns.push(c); - } - tableInfo.partitionKeys.push(c); - } - } - const comparator = encoder.parseKeyTypes(tableRow["comparator"]); - if (keysStoredInTableRow && tableRow["column_aliases"]) { - clusteringKeys = JSON.parse(tableRow["column_aliases"]); - for (let i = 0; i < clusteringKeys.length; i++) { - name = clusteringKeys[i]; - c = columnsKeyed[name]; - if (!c) { - c = { - name: name, - type: comparator.types[i], - }; - tableInfo.columns.push(c); - } - tableInfo.clusteringKeys.push(c); - tableInfo.clusteringOrder.push( - c.type.options.reversed ? "DESC" : "ASC", - ); - } - } - tableInfo.isCompact = !!tableRow["is_dense"]; - if (!tableInfo.isCompact) { - // is_dense column does not exist in previous versions of Cassandra - // also, compact pk, ck and val appear as is_dense false - // clusteringKeys != comparator types - 1 - // or not composite (comparator) - tableInfo.isCompact = - // clustering keys are not marked as composite - !comparator.isComposite || - // only 1 column not part of the partition or clustering keys - (!comparator.hasCollections && - tableInfo.clusteringKeys.length !== - comparator.types.length - 1); - } - name = tableRow["value_alias"]; - if (tableInfo.isCompact && name && !columnsKeyed[name]) { - // additional column in C* 1.2 as value_alias - c = { - name: name, - type: encoder.parseFqTypeName(tableRow["default_validator"]), - }; - tableInfo.columns.push(c); - columnsKeyed[name] = c; - } - tableInfo.columnsByName = columnsKeyed; - tableInfo.indexes = Index.fromColumnRows( - columnRows, - tableInfo.columnsByName, - ); - } - - getMaterializedView(keyspaceName, name, cache) { - return Promise.reject( - new errors.NotSupportedError( - "Materialized views are not supported on Cassandra versions below 3.0", - ), - ); - } - - async _parseAggregate(row) { - const encoder = this.cc.getEncoder(); - const aggregate = new Aggregate(); - aggregate.name = row["aggregate_name"]; - aggregate.keyspaceName = row["keyspace_name"]; - aggregate.signature = row["signature"] || utils.emptyArray; - aggregate.stateFunction = row["state_func"]; - aggregate.finalFunction = row["final_func"]; - aggregate.initConditionRaw = row["initcond"]; - aggregate.argumentTypes = ( - row["argument_types"] || utils.emptyArray - ).map((name) => encoder.parseFqTypeName(name)); - aggregate.stateType = encoder.parseFqTypeName(row["state_type"]); - const initConditionValue = encoder.decode( - aggregate.initConditionRaw, - aggregate.stateType, - ); - if ( - initConditionValue !== null && - typeof initConditionValue !== "undefined" - ) { - aggregate.initCondition = initConditionValue.toString(); - } - aggregate.returnType = encoder.parseFqTypeName(row["return_type"]); - return aggregate; - } - - async _parseFunction(row) { - const encoder = this.cc.getEncoder(); - const func = new SchemaFunction(); - func.name = row["function_name"]; - func.keyspaceName = row["keyspace_name"]; - func.signature = row["signature"] || utils.emptyArray; - func.argumentNames = row["argument_names"] || utils.emptyArray; - func.body = row["body"]; - func.calledOnNullInput = row["called_on_null_input"]; - func.language = row["language"]; - func.argumentTypes = (row["argument_types"] || utils.emptyArray).map( - (name) => encoder.parseFqTypeName(name), - ); - func.returnType = encoder.parseFqTypeName(row["return_type"]); - return func; - } - - async _parseUdt(udtInfo, row) { - const encoder = this.cc.getEncoder(); - const fieldNames = row["field_names"]; - const fieldTypes = row["field_types"]; - const fields = new Array(fieldNames.length); - for (let i = 0; i < fieldNames.length; i++) { - fields[i] = { - name: fieldNames[i], - type: encoder.parseFqTypeName(fieldTypes[i]), - }; - } - udtInfo.fields = fields; - return udtInfo; - } -} - -/** - * Used to parse schema information for Cassandra versions 3.x and above - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - * @ignore - */ -class SchemaParserV2 extends SchemaParser { - /** - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - */ - constructor(options, cc, udtResolver) { - super(options, cc); - this.udtResolver = udtResolver; - this.selectTable = _selectTableV2; - this.selectColumns = _selectColumnsV2; - this.selectUdt = _selectUdtV2; - this.selectAggregates = _selectAggregatesV2; - this.selectFunctions = _selectFunctionsV2; - this.selectIndexes = _selectIndexesV2; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const result = await this.cc.query( - _selectAllKeyspacesV2, - waitReconnect, - ); - for (let i = 0; i < result.rows.length; i++) { - const ksInfo = this._parseKeyspace(result.rows[i]); - keyspaces[ksInfo.name] = ksInfo; - } - return keyspaces; - } - - async getKeyspace(name) { - const row = await this._getFirstRow( - format(_selectSingleKeyspaceV2, name), - ); - if (!row) { - return null; - } - return this._parseKeyspace(row); - } - - async getMaterializedView(keyspaceName, name, cache) { - let viewInfo = cache && cache[name]; - if (!viewInfo) { - viewInfo = new MaterializedView(name); - if (cache) { - cache[name] = viewInfo; - } - } - if (viewInfo.loaded) { - return viewInfo; - } - if (viewInfo.loading) { - return promiseUtils.fromEvent(viewInfo, "load"); - } - viewInfo.loading = true; - try { - const tableRow = await this._getFirstRow( - format(_selectMaterializedViewV2, keyspaceName, name), - ); - if (!tableRow) { - viewInfo.emit("load", null, null); - viewInfo.loading = false; - return null; - } - const columnRows = await this._getRows( - format(this.selectColumns, keyspaceName, name), - ); - await this._parseTableOrView( - viewInfo, - tableRow, - columnRows, - null, - false, - ); - viewInfo.loaded = true; - viewInfo.emit("load", null, viewInfo); - return viewInfo; - } catch (err) { - viewInfo.emit("load", err); - throw err; - } finally { - viewInfo.loading = false; - } - } - - _parseKeyspace(row, virtual) { - const replication = row["replication"]; - let strategy; - let strategyOptions; - if (replication) { - strategy = replication["class"]; - strategyOptions = {}; - for (const key in replication) { - if ( - !Object.prototype.hasOwnProperty.call(replication, key) || - key === "class" - ) { - continue; - } - strategyOptions[key] = replication[key]; - } - } - - const ks = this._createKeyspace( - row["keyspace_name"], - row["durable_writes"], - strategy, - strategyOptions, - virtual, - ); - ks.graphEngine = row["graph_engine"]; - return ks; - } - - async _parseTableOrView( - tableInfo, - tableRow, - columnRows, - indexRows, - virtual, - ) { - const encoder = this.cc.getEncoder(); - const columnsKeyed = {}; - const partitionKeys = []; - const clusteringKeys = []; - tableInfo.columns = await Promise.all( - columnRows.map(async (row) => { - const type = await encoder.parseTypeName( - tableRow["keyspace_name"], - row["type"], - 0, - null, - this.udtResolver, - ); - const c = { - name: row["column_name"], - type: type, - isStatic: false, - }; - columnsKeyed[c.name] = c; - switch (row["kind"]) { - case "partition_key": - partitionKeys.push({ c, index: row["position"] || 0 }); - break; - case "clustering": - clusteringKeys.push({ - c, - index: row["position"] || 0, - order: - row["clustering_order"] === "desc" - ? "DESC" - : "ASC", - }); - break; - case "static": - c.isStatic = true; - break; - } - return c; - }), - ); - tableInfo.columnsByName = columnsKeyed; - tableInfo.partitionKeys = partitionKeys - .sort(utils.propCompare("index")) - .map((item) => item.c); - clusteringKeys.sort(utils.propCompare("index")); - tableInfo.clusteringKeys = clusteringKeys.map((item) => item.c); - tableInfo.clusteringOrder = clusteringKeys.map((item) => item.order); - if (virtual) { - // When table is virtual, the only relevant information to parse are the columns - // as the table itself has no configuration - tableInfo.virtual = true; - return; - } - const isView = tableInfo instanceof MaterializedView; - tableInfo.bloomFilterFalsePositiveChance = - tableRow["bloom_filter_fp_chance"]; - tableInfo.caching = JSON.stringify(tableRow["caching"]); - tableInfo.comment = tableRow["comment"]; - // Regardless of the encoding options, use always an Object to represent an associative Array - const compaction = this._asMap(tableRow["compaction"]); - if (compaction) { - // compactionOptions as an Object - tableInfo.compactionOptions = {}; - tableInfo.compactionClass = compaction.get("class"); - compaction.forEach((value, key) => { - if (key === "class") { - return; - } - tableInfo.compactionOptions[key] = compaction.get(key); - }); - } - // Convert compression to an Object - tableInfo.compression = this._mapAsObject(tableRow["compression"]); - tableInfo.gcGraceSeconds = tableRow["gc_grace_seconds"]; - tableInfo.localReadRepairChance = - tableRow["dclocal_read_repair_chance"]; - tableInfo.readRepairChance = tableRow["read_repair_chance"]; - tableInfo.extensions = this._mapAsObject(tableRow["extensions"]); - tableInfo.crcCheckChance = tableRow["crc_check_chance"]; - tableInfo.memtableFlushPeriod = - tableRow["memtable_flush_period_in_ms"] || - tableInfo.memtableFlushPeriod; - tableInfo.defaultTtl = - tableRow["default_time_to_live"] || tableInfo.defaultTtl; - tableInfo.speculativeRetry = - tableRow["speculative_retry"] || tableInfo.speculativeRetry; - tableInfo.minIndexInterval = - tableRow["min_index_interval"] || tableInfo.minIndexInterval; - tableInfo.maxIndexInterval = - tableRow["max_index_interval"] || tableInfo.maxIndexInterval; - tableInfo.nodesync = tableRow["nodesync"] || tableInfo.nodesync; - if (!isView) { - const cdc = tableRow["cdc"]; - if (cdc !== undefined) { - tableInfo.cdc = cdc; - } - } - if (isView) { - tableInfo.tableName = tableRow["base_table_name"]; - tableInfo.whereClause = tableRow["where_clause"]; - tableInfo.includeAllColumns = tableRow["include_all_columns"]; - return; - } - tableInfo.indexes = this._getIndexes(indexRows); - // flags can be an instance of Array or Set (real or polyfill) - let flags = tableRow["flags"]; - if (Array.isArray(flags)) { - flags = new Set(flags); - } - const isDense = flags.has("dense"); - const isSuper = flags.has("super"); - const isCompound = flags.has("compound"); - tableInfo.isCompact = isSuper || isDense || !isCompound; - // Remove the columns related to Thrift - const isStaticCompact = !isSuper && !isDense && !isCompound; - if (isStaticCompact) { - pruneStaticCompactTableColumns(tableInfo); - } else if (isDense) { - pruneDenseTableColumns(tableInfo); - } - } - - _getIndexes(indexRows) { - if (!indexRows || indexRows.length === 0) { - return utils.emptyArray; - } - return indexRows.map((row) => { - const options = this._mapAsObject(row["options"]); - return new Index( - row["index_name"], - options["target"], - row["kind"], - options, - ); - }); - } - - async _parseAggregate(row) { - const encoder = this.cc.getEncoder(); - const aggregate = new Aggregate(); - aggregate.name = row["aggregate_name"]; - aggregate.keyspaceName = row["keyspace_name"]; - aggregate.signature = row["argument_types"] || utils.emptyArray; - aggregate.stateFunction = row["state_func"]; - aggregate.finalFunction = row["final_func"]; - aggregate.initConditionRaw = row["initcond"]; - aggregate.initCondition = aggregate.initConditionRaw; - aggregate.deterministic = row["deterministic"] || false; - aggregate.argumentTypes = await Promise.all( - aggregate.signature.map((name) => - encoder.parseTypeName( - row["keyspace_name"], - name, - 0, - null, - this.udtResolver, - ), - ), - ); - aggregate.stateType = await encoder.parseTypeName( - row["keyspace_name"], - row["state_type"], - 0, - null, - this.udtResolver, - ); - aggregate.returnType = await encoder.parseTypeName( - row["keyspace_name"], - row["return_type"], - 0, - null, - this.udtResolver, - ); - return aggregate; - } - - async _parseFunction(row) { - const encoder = this.cc.getEncoder(); - const func = new SchemaFunction(); - func.name = row["function_name"]; - func.keyspaceName = row["keyspace_name"]; - func.signature = row["argument_types"] || utils.emptyArray; - func.argumentNames = row["argument_names"] || utils.emptyArray; - func.body = row["body"]; - func.calledOnNullInput = row["called_on_null_input"]; - func.language = row["language"]; - func.deterministic = row["deterministic"] || false; - func.monotonic = row["monotonic"] || false; - func.monotonicOn = row["monotonic_on"] || utils.emptyArray; - func.argumentTypes = await Promise.all( - func.signature.map((name) => - encoder.parseTypeName( - row["keyspace_name"], - name, - 0, - null, - this.udtResolver, - ), - ), - ); - func.returnType = await encoder.parseTypeName( - row["keyspace_name"], - row["return_type"], - 0, - null, - this.udtResolver, - ); - return func; - } - - async _parseUdt(udtInfo, row) { - const encoder = this.cc.getEncoder(); - const fieldTypes = row["field_types"]; - const keyspace = row["keyspace_name"]; - udtInfo.fields = await Promise.all( - row["field_names"].map(async (name, i) => { - const type = await encoder.parseTypeName( - keyspace, - fieldTypes[i], - 0, - null, - this.udtResolver, - ); - return { name, type }; - }), - ); - return udtInfo; - } -} - -/** - * Used to parse schema information for Cassandra versions 4.x and above. - * - * This parser similar to [SchemaParserV2] expect it also parses virtual - * keyspaces. - * @ignore - */ -class SchemaParserV3 extends SchemaParserV2 { - /** - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - */ - constructor(options, cc, udtResolver) { - super(options, cc, udtResolver); - this.supportsVirtual = true; - } - - async getKeyspaces(waitReconnect) { - const keyspaces = {}; - const queries = [ - { query: _selectAllKeyspacesV2, virtual: false }, - { query: _selectAllVirtualKeyspaces, virtual: true }, - ]; - - await Promise.all( - queries.map(async (q) => { - let result; - try { - result = await this.cc.query(q.query, waitReconnect); - } catch (err) { - if (q.virtual) { - // Only throw error for non-virtual query as - // server reporting C* 4.0 may not actually implement - // virtual tables. - return; - } - throw err; - } - for (let i = 0; i < result.rows.length; i++) { - const ksInfo = this._parseKeyspace( - result.rows[i], - q.virtual, - ); - keyspaces[ksInfo.name] = ksInfo; - } - }), - ); - return keyspaces; - } - - async getKeyspace(name) { - const ks = await this._getKeyspace( - _selectSingleKeyspaceV2, - name, - false, - ); - if (!ks) { - // if not found, attempt to retrieve as virtual keyspace. - return this._getKeyspace(_selectSingleVirtualKeyspace, name, true); - } - return ks; - } - - async _getKeyspace(query, name, virtual) { - try { - const row = await this._getFirstRow(format(query, name)); - - if (!row) { - return null; - } - - return this._parseKeyspace(row, virtual); - } catch (err) { - if (virtual) { - // only throw error for non-virtual query as - // server reporting C* 4.0 may not actually implement - // virtual tables. - return null; - } - throw err; - } - } -} - -/** - * Upon migration from thrift to CQL, we internally create a pair of surrogate clustering/regular columns - * for compact static tables. These columns shouldn't be exposed to the user but are currently returned by C*. - * We also need to remove the static keyword for all other columns in the table. - * @param {module:metadata~TableMetadata} tableInfo - */ -function pruneStaticCompactTableColumns(tableInfo) { - let i; - let c; - // remove "column1 text" clustering column - for (i = 0; i < tableInfo.clusteringKeys.length; i++) { - c = tableInfo.clusteringKeys[i]; - const index = tableInfo.columns.indexOf(c); - tableInfo.columns.splice(index, 1); - delete tableInfo.columnsByName[c.name]; - } - tableInfo.clusteringKeys = utils.emptyArray; - tableInfo.clusteringOrder = utils.emptyArray; - // remove regular columns and set the static columns to non-static - i = tableInfo.columns.length; - while (i--) { - c = tableInfo.columns[i]; - if (!c.isStatic && tableInfo.partitionKeys.indexOf(c) === -1) { - // remove "value blob" regular column - tableInfo.columns.splice(i, 1); - delete tableInfo.columnsByName[c.name]; - continue; - } - c.isStatic = false; - } -} - -/** - * Upon migration from thrift to CQL, we internally create a surrogate column "value" of type custom. - * This column shouldn't be exposed to the user but is currently returned by C*. - * @param {module:metadata~TableMetadata} tableInfo - */ -function pruneDenseTableColumns(tableInfo) { - let i = tableInfo.columns.length; - while (i--) { - const c = tableInfo.columns[i]; - if ( - !c.isStatic && - c.type.code === types.dataTypes.custom && - c.type.info === "empty" - ) { - // remove "value blob" regular column - tableInfo.columns.splice(i, 1); - delete tableInfo.columnsByName[c.name]; - continue; - } - c.isStatic = false; - } -} - -function getTokenToReplicaMapper(strategy, strategyOptions) { - if (/SimpleStrategy$/.test(strategy)) { - const rf = parseInt(strategyOptions["replication_factor"], 10); - if (rf > 1) { - return getTokenToReplicaSimpleMapper(rf); - } - } - if (/NetworkTopologyStrategy$/.test(strategy)) { - return getTokenToReplicaNetworkMapper(strategyOptions); - } - // default, wrap in an Array - return function noStrategy(tokenizer, ring, primaryReplicas) { - const replicas = {}; - for (const key in primaryReplicas) { - if (!Object.prototype.hasOwnProperty.call(primaryReplicas, key)) { - continue; - } - replicas[key] = [primaryReplicas[key]]; - } - return replicas; - }; -} - -/** - * @param {Number} replicationFactor - * @returns {function} - */ -function getTokenToReplicaSimpleMapper(replicationFactor) { - return function tokenSimpleStrategy( - tokenizer, - ringTokensAsStrings, - primaryReplicas, - ) { - const ringLength = ringTokensAsStrings.length; - const rf = Math.min(replicationFactor, ringLength); - const replicas = {}; - for (let i = 0; i < ringLength; i++) { - const key = ringTokensAsStrings[i]; - const tokenReplicas = [primaryReplicas[key]]; - for (let j = 1; j < ringLength && tokenReplicas.length < rf; j++) { - let nextReplicaIndex = i + j; - if (nextReplicaIndex >= ringLength) { - // circle back - nextReplicaIndex = nextReplicaIndex % ringLength; - } - const nextReplica = - primaryReplicas[ringTokensAsStrings[nextReplicaIndex]]; - // In the case of vnodes, consecutive sections of the ring can be assigned to the same host. - if (tokenReplicas.indexOf(nextReplica) === -1) { - tokenReplicas.push(nextReplica); - } - } - replicas[key] = tokenReplicas; - } - return replicas; - }; -} - -/** - * @param {Object} replicationFactors - * @returns {Function} - * @private - */ -function getTokenToReplicaNetworkMapper(replicationFactors) { - // A(DC1) - // - // H B(DC2) - // | - // G --+-- C(DC1) - // | - // F D(DC2) - // - // E(DC1) - return function tokenNetworkStrategy( - tokenizer, - ringTokensAsStrings, - primaryReplicas, - datacenters, - ) { - const replicas = {}; - const ringLength = ringTokensAsStrings.length; - - for (let i = 0; i < ringLength; i++) { - const key = ringTokensAsStrings[i]; - const tokenReplicas = []; - const replicasByDc = {}; - const racksPlaced = {}; - const skippedHosts = []; - for (let j = 0; j < ringLength; j++) { - let nextReplicaIndex = i + j; - if (nextReplicaIndex >= ringLength) { - // circle back - nextReplicaIndex = nextReplicaIndex % ringLength; - } - const h = - primaryReplicas[ringTokensAsStrings[nextReplicaIndex]]; - // In the case of vnodes, consecutive sections of the ring can be assigned to the same host. - if (tokenReplicas.indexOf(h) !== -1) { - continue; - } - const dc = h.datacenter; - // Check if the next replica belongs to one of the targeted dcs - let dcRf = parseInt(replicationFactors[dc], 10); - if (!dcRf) { - continue; - } - dcRf = Math.min(dcRf, datacenters[dc].hostLength); - let dcReplicas = replicasByDc[dc] || 0; - // Amount of replicas per dc is greater than rf or the amount of host in the datacenter - if (dcReplicas >= dcRf) { - continue; - } - let racksPlacedInDc = racksPlaced[dc]; - if (!racksPlacedInDc) { - racksPlacedInDc = racksPlaced[dc] = new utils.HashSet(); - } - if ( - h.rack && - racksPlacedInDc.contains(h.rack) && - racksPlacedInDc.length < datacenters[dc].racks.length - ) { - // We already selected a replica for this rack - // Skip until replicas in other racks are added - if (skippedHosts.length < dcRf - dcReplicas) { - skippedHosts.push(h); - } - continue; - } - replicasByDc[h.datacenter] = ++dcReplicas; - tokenReplicas.push(h); - if ( - h.rack && - racksPlacedInDc.add(h.rack) && - racksPlacedInDc.length === datacenters[dc].racks.length - ) { - // We finished placing all replicas for all racks in this dc - // Add the skipped hosts - replicasByDc[dc] += addSkippedHosts( - dcRf, - dcReplicas, - tokenReplicas, - skippedHosts, - ); - } - if ( - isDoneForToken( - replicationFactors, - datacenters, - replicasByDc, - ) - ) { - break; - } - } - replicas[key] = tokenReplicas; - } - return replicas; - }; -} - -/** - * @returns {Number} The number of skipped hosts added. - */ -function addSkippedHosts(dcRf, dcReplicas, tokenReplicas, skippedHosts) { - let i; - for (i = 0; i < dcRf - dcReplicas && i < skippedHosts.length; i++) { - tokenReplicas.push(skippedHosts[i]); - } - return i; -} - -function isDoneForToken(replicationFactors, datacenters, replicasByDc) { - const keys = Object.keys(replicationFactors); - for (let i = 0; i < keys.length; i++) { - const dcName = keys[i]; - const dc = datacenters[dcName]; - if (!dc) { - // A DC is included in the RF but the DC does not exist in the topology - continue; - } - const rf = Math.min( - parseInt(replicationFactors[dcName], 10), - dc.hostLength, - ); - if (rf > 0 && (!replicasByDc[dcName] || replicasByDc[dcName] < rf)) { - return false; - } - } - return true; -} - -/** - * Creates a new instance if the currentInstance is not valid for the - * provided Cassandra version - * @param {ClientOptions} options The client options - * @param {ControlConnection} cc The control connection to be used - * @param {Function} udtResolver The function to be used to retrieve the udts. - * @param {Array.} [version] The cassandra version - * @param {SchemaParser} [currentInstance] The current instance - * @returns {SchemaParser} - */ -function getByVersion(options, cc, udtResolver, version, currentInstance) { - let parserConstructor = SchemaParserV1; - if (version && version[0] === 3) { - parserConstructor = SchemaParserV2; - } else if (version && version[0] >= 4) { - parserConstructor = SchemaParserV3; - } - if (!currentInstance || !(currentInstance instanceof parserConstructor)) { - return new parserConstructor(options, cc, udtResolver); - } - return currentInstance; -} - -exports.getByVersion = getByVersion; -exports.isDoneForToken = isDoneForToken; diff --git a/test/unit-broken/metadata-tests.js b/test/unit-broken/metadata-tests.js index e390fd176..4f1030449 100644 --- a/test/unit-broken/metadata-tests.js +++ b/test/unit-broken/metadata-tests.js @@ -19,8 +19,6 @@ const dataTypes = types.dataTypes; const utils = require("../../lib/utils"); const errors = require("../../lib/errors"); const Encoder = require("../../lib/encoder"); -const isDoneForToken = - require("../../lib/metadata/schema-parser").isDoneForToken; describe("Metadata", function () { this.timeout(5000); @@ -4982,47 +4980,6 @@ describe("Metadata", function () { }); }); -describe("SchemaParser", function () { - describe("isDoneForToken()", function () { - it("should skip if dc not included in topology", function () { - const replicationFactors = { dc1: 3, dc2: 1 }; - // dc2 does not exist - const datacenters = { - dc1: { hostLength: 6 }, - }; - assert.strictEqual( - false, - isDoneForToken(replicationFactors, datacenters, {}), - ); - }); - - it("should skip if rf equals to 0", function () { - // rf 0 for dc2 - const replicationFactors = { dc1: 4, dc2: 0 }; - const datacenters = { - dc1: { hostLength: 6 }, - dc2: { hostLength: 6 }, - }; - assert.strictEqual( - true, - isDoneForToken(replicationFactors, datacenters, { dc1: 4 }), - ); - }); - - it("should return false for undefined replicasByDc[dcName]", function () { - const replicationFactors = { dc1: 3, dc2: 1 }; - // dc2 does not exist - const datacenters = { - dc1: { hostLength: 6 }, - }; - assert.strictEqual( - false, - isDoneForToken(replicationFactors, datacenters, {}), - ); - }); - }); -}); - function getControlConnectionForTable( tableRow, columnRows, diff --git a/test/unit-not-supported/event-debouncer-tests.js b/test/unit-not-supported/event-debouncer-tests.js deleted file mode 100644 index df31a9a0a..000000000 --- a/test/unit-not-supported/event-debouncer-tests.js +++ /dev/null @@ -1,347 +0,0 @@ -"use strict"; - -const { assert } = require("chai"); -const sinon = require("sinon"); - -const helper = require("../test-helper"); -const EventDebouncer = require("../../lib/metadata/event-debouncer"); - -describe("EventDebouncer", function () { - describe("timeoutElapsed()", function () { - it("should set the queue to null", function (done) { - const debouncer = newInstance(1); - debouncer._queue = { - mainEvent: { handler: () => Promise.resolve() }, - callbacks: [helper.noop], - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(debouncer._queue, null); - done(); - }, 40); - }); - - it("should process the main event and invoke all the callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - function increaseCounter() { - callbackCounter++; - } - debouncer._queue = { - mainEvent: { handler: () => Promise.resolve() }, - callbacks: helper.fillArray(10, increaseCounter), - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(callbackCounter, 10); - done(); - }, 40); - }); - - it("should process each keyspace the main event and invoke all child the callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - let ksMainEventCalled = 0; - - function increaseCounter() { - callbackCounter++; - return Promise.resolve(); - } - - debouncer._queue = { - callbacks: [assert.fail], - keyspaces: { - ks1: { - mainEvent: { - handler: () => { - ksMainEventCalled++; - return Promise.resolve(); - }, - callback: increaseCounter, - }, - events: [ - { callback: increaseCounter }, - { callback: increaseCounter }, - ], - }, - }, - }; - - debouncer._slideDelay(1); - - setTimeout(function () { - assert.strictEqual(callbackCounter, 2); - assert.strictEqual(ksMainEventCalled, 1); - done(); - }, 40); - }); - - it("should process each keyspace and invoke handlers and callbacks", function (done) { - const debouncer = newInstance(1); - let callbackCounter = 0; - function increaseCounter() { - callbackCounter++; - } - const handlersCalled = []; - function getHandler(name) { - return function () { - handlersCalled.push(name); - }; - } - debouncer._queue = { - callbacks: [assert.fail], - keyspaces: { - ks1: { - events: [ - { - callback: increaseCounter, - handler: getHandler("A"), - }, - { - callback: increaseCounter, - handler: getHandler("B"), - }, - ], - }, - }, - }; - debouncer._slideDelay(1); - setTimeout(function () { - assert.strictEqual(callbackCounter, 2); - assert.deepEqual(handlersCalled, ["A", "B"]); - done(); - }, 40); - }); - }); - - describe("#eventReceived()", function () { - it("should invoke 1 handler and all the callbacks when one event is flagged as `all`", async () => { - const debouncer = newInstance(20); - let mainEventHandlerCalled = 0; - - await Promise.all([ - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks1" }, - false, - ), - debouncer.eventReceived( - { - handler: helper.failop, - keyspace: "ks1", - cqlObject: "abc", - }, - false, - ), - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ), - - // Send an event with `all: true` - debouncer.eventReceived( - { - handler: () => { - mainEventHandlerCalled++; - return Promise.resolve(); - }, - all: true, - }, - false, - ), - - // Another one with `all: false` - debouncer.eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ), - ]); - - assert.strictEqual(mainEventHandlerCalled, 1); - assert.strictEqual(debouncer._timeout, null); - }); - - it("should invoke 1 keyspace handler and all the callbacks when cqlObject is undefined", async () => { - const debouncer = newInstance(30); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - const promise = Promise.all([ - debouncer.eventReceived( - { handler: getHandler("A"), keyspace: "ks1" }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("B"), - keyspace: "ks1", - cqlObject: "1a", - }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("C"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("D"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { handler: getHandler("E"), keyspace: "ks2" }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("F"), - keyspace: "ks3", - cqlObject: "3a", - }, - false, - ), - debouncer.eventReceived( - { - handler: getHandler("H"), - keyspace: "ks3", - cqlObject: "3b", - }, - false, - ), - ]); - - await helper.delayAsync(5); - - // Should not be called yet - assert.deepStrictEqual(handlersCalled, []); - - await promise; - - assert.deepEqual(handlersCalled, ["A", "E", "F", "H"]); - assert.strictEqual(debouncer._timeout, null); - }); - - it("should not invoke handlers before time elapses", async () => { - const debouncer = newInstance(200); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - debouncer.eventReceived( - { handler: getHandler("A"), keyspace: "ks1" }, - false, - ); - - await helper.delayAsync(20); - - // should not be called yet - assert.lengthOf(handlersCalled, 0); - debouncer.shutdown(); - }); - - it("should process queue immediately when processNow is true", async () => { - const debouncer = newInstance(40); - const handlersCalled = []; - function getHandler(name) { - return () => { - handlersCalled.push(name); - return Promise.resolve(); - }; - } - - const spy = sinon.spy(() => {}); - - const promise = Promise.all([ - debouncer - .eventReceived( - { handler: getHandler("A"), keyspace: "ks2" }, - false, - ) - .then(spy), - debouncer - .eventReceived( - { handler: getHandler("B"), keyspace: "ks1" }, - false, - ) - .then(spy), - // set with process now to true - debouncer - .eventReceived( - { handler: getHandler("C"), keyspace: "ks1" }, - true, - ) - .then(spy), - debouncer - .eventReceived( - { handler: getHandler("D"), keyspace: "ks1" }, - false, - ) - .then(spy), - ]); - - await helper.delayAsync(20); - - // The first three should be resolved by now - assert.strictEqual(spy.callCount, 3); - // Flattens the amount of requests - assert.deepEqual(handlersCalled, ["A", "C"]); - - await promise; - - assert.deepEqual(handlersCalled, ["A", "C", "D"]); - }); - }); - describe("#shutdown()", () => { - it("should invoke all callbacks", async () => { - const debouncer = newInstance(20); - const spy = sinon.spy(() => {}); - - debouncer - .eventReceived( - { handler: helper.failop, keyspace: "ks1" }, - false, - ) - .then(spy); - debouncer - .eventReceived( - { - handler: helper.failop, - keyspace: "ks1", - cqlObject: "1a", - }, - false, - ) - .then(spy); - debouncer - .eventReceived( - { handler: helper.failop, keyspace: "ks2" }, - false, - ) - .then(spy); - debouncer - .eventReceived({ handler: helper.failop, all: true }, false) - .then(spy); - - debouncer.shutdown(); - - await helper.delayAsync(5); - assert.strictEqual(spy.callCount, 4); - - // Check that timer shouldn't elapse - await helper.delayAsync(30); - assert.strictEqual(spy.callCount, 4); - }); - }); -}); - -/** @returns {EventDebouncer} */ -function newInstance(delay) { - return new EventDebouncer(delay, helper.noop); -}