From 553e6197e2325b78b8568c1aa24d03b8cfb7668e Mon Sep 17 00:00:00 2001 From: Niks988 Date: Fri, 25 Jul 2025 10:49:52 +0200 Subject: [PATCH 1/4] telemetry with batching, persistence, and improved logging --- ot-node.js | 12 + .../implementation/quest-telemetry.js | 450 +++++++++++++++++- 2 files changed, 444 insertions(+), 18 deletions(-) diff --git a/ot-node.js b/ot-node.js index 36d927aa04..26f21e3c6f 100644 --- a/ot-node.js +++ b/ot-node.js @@ -442,6 +442,18 @@ class OTNode { async handleExit() { this.logger.info('SIGINT or SIGTERM received. Shutting down...'); + + // Cleanup telemetry first to ensure all events are sent + try { + const telemetryModuleManager = this.container.resolve('telemetryModuleManager'); + if (telemetryModuleManager && telemetryModuleManager.getImplementation()) { + this.logger.info('Cleaning up telemetry...'); + await telemetryModuleManager.getImplementation().module.cleanup(); + } + } catch (error) { + this.logger.error(`Error during telemetry cleanup: ${error.message}`); + } + const commandExecutor = this.container.resolve('commandExecutor'); await commandExecutor.commandExecutorShutdown(); process.exit(0); diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js index 13faf4fab6..ae242ac8c0 100644 --- a/src/modules/telemetry/implementation/quest-telemetry.js +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -1,12 +1,321 @@ import { Sender } from '@questdb/nodejs-client'; class QuestTelemetry { + constructor() { + this.localSender = null; + this.lastErrorLogTime = 0; + this.errorLogInterval = 60000; // 1 minute between error logs + this.retryAttempts = 0; + this.maxRetryAttempts = 5; + this.baseRetryDelay = 1000; // 1 second + + // Health monitoring + this.healthCheckInterval = 3 * 60 * 1000; // 3 minutes + this.healthCheckTimer = null; + this.telemetryStats = { + totalEvents: 0, + successfulEvents: 0, + failedEvents: 0, + lastHealthCheck: Date.now(), + connectionStatus: 'disconnected' + }; + + // Event batching + this.batchSize = 50; // Send batch when 50 events collected + this.maxBatchSize = 200; // Maximum batch size to prevent memory leaks + this.batchTimeout = 5000; // Send batch after 5 seconds + this.eventBatch = []; + this.batchTimer = null; + + // Bulletproof features + this.isShuttingDown = false; + this.connectionHealthTimer = null; + this.connectionHealthInterval = 30 * 1000; // 30 seconds + this.lastSuccessfulSend = Date.now(); + this.maxTimeWithoutSuccess = 5 * 60 * 1000; // 5 minutes + + // Event persistence for when QuestDB is down + this.persistentEventQueue = []; + this.maxPersistentQueueSize = 10000; // Increased to 10,000 events + this.isConnectionDown = false; + this.retryQueueTimer = null; + this.retryQueueInterval = 10 * 1000; // 10 seconds + } + async initialize(config, logger) { this.config = config; this.logger = logger; - this.localSender = Sender.fromConfig(this.config.localEndpoint); - if (this.config.sendToSignalingService) { - this.signalingServiceSender = Sender.fromConfig(this.config.signalingServiceEndpoint); + await this.createLocalSender(); + this.startHealthMonitoring(); + this.startBatchTimer(); + this.startConnectionHealthCheck(); + this.startRetryQueueTimer(); + + // Graceful shutdown handling + process.on('SIGINT', () => this.gracefulShutdown()); + process.on('SIGTERM', () => this.gracefulShutdown()); + } + + async gracefulShutdown() { + if (this.isShuttingDown) return; + this.isShuttingDown = true; + + this.logger.info('QuestDB telemetry graceful shutdown initiated'); + await this.cleanup(); + } + + async createLocalSender() { + try { + this.localSender = Sender.fromConfig(this.config.localEndpoint); + this.retryAttempts = 0; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + this.logger.debug('QuestDB local sender created successfully'); + } catch (error) { + this.telemetryStats.connectionStatus = 'failed'; + this.logError(`Failed to create QuestDB local sender: ${error.message}`); + } + } + + startHealthMonitoring() { + this.healthCheckTimer = setInterval(() => { + this.logHealthStatus(); + }, this.healthCheckInterval); + + this.logger.info('QuestDB telemetry health monitoring started (every 3 minutes)'); + } + + startConnectionHealthCheck() { + this.connectionHealthTimer = setInterval(() => { + this.checkConnectionHealth(); + }, this.connectionHealthInterval); + + this.logger.info('QuestDB telemetry connection health check started (every 30 seconds)'); + } + + startRetryQueueTimer() { + this.retryQueueTimer = setInterval(() => { + this.retryPersistentEvents(); + }, this.retryQueueInterval); + + this.logger.info('QuestDB telemetry persistent event retry timer started (every 10 seconds)'); + } + + checkConnectionHealth() { + const timeSinceLastSuccess = Date.now() - this.lastSuccessfulSend; + + if (timeSinceLastSuccess > this.maxTimeWithoutSuccess) { + this.logger.warn(`No successful telemetry sends for ${Math.round(timeSinceLastSuccess / 1000)}s, recreating connection`); + this.recreateConnection(); + } + } + + async recreateConnection() { + try { + if (this.localSender) { + await this.localSender.flush(); + await this.localSender.close(); + } + await this.createLocalSender(); + } catch (error) { + this.logError(`Failed to recreate QuestDB connection: ${error.message}`); + } + } + + startBatchTimer() { + this.batchTimer = setInterval(() => { + this.flushBatch(); + }, this.batchTimeout); + + this.logger.info(`QuestDB telemetry batching started (batch size: ${this.batchSize}, max: ${this.maxBatchSize}, timeout: ${this.batchTimeout}ms)`); + } + + async flushBatch() { + if (this.eventBatch.length === 0) { + return; + } + + const batchToSend = [...this.eventBatch]; + this.eventBatch = []; + + try { + await this.retryWithBackoff(async () => { + for (const event of batchToSend) { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + } + + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents += batchToSend.length; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + + this.logger.debug(`Successfully sent batch of ${batchToSend.length} events to QuestDB`); + } catch (err) { + this.telemetryStats.failedEvents += batchToSend.length; + this.telemetryStats.connectionStatus = 'error'; + this.logError(`Error sending batch of ${batchToSend.length} events to QuestDB: ${err.message}`); + + // Fallback: try to send events individually + await this.fallbackToIndividualEvents(batchToSend); + } + } + + async fallbackToIndividualEvents(events) { + this.logger.warn(`Attempting fallback: sending ${events.length} events individually`); + + for (const event of events) { + try { + await this.retryWithBackoff(async () => { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents++; + this.lastSuccessfulSend = Date.now(); + } catch (error) { + this.telemetryStats.failedEvents++; + this.logError(`Fallback failed for event ${event.operationId}: ${error.message}`); + } + } + } + + logHealthStatus() { + const now = Date.now(); + const timeSinceLastCheck = now - this.telemetryStats.lastHealthCheck; + const successRate = this.telemetryStats.totalEvents > 0 + ? ((this.telemetryStats.successfulEvents / this.telemetryStats.totalEvents) * 100).toFixed(1) + : 0; + + // Calculate batch status + const batchStatus = this.eventBatch.length > 0 + ? `Pending for next batch: ${this.eventBatch.length}/${this.batchSize}` + : 'Batch: empty'; + + // Calculate queue status + const queueStatus = this.persistentEventQueue.length > 0 + ? `Queue: ${this.persistentEventQueue.length}/${this.maxPersistentQueueSize}` + : 'Queue: empty'; + + this.logger.info( + `[TELEMETRY HEALTH] Status: ${this.telemetryStats.connectionStatus}, ` + + `Success Rate: ${successRate}%, ` + + `Events: ${this.telemetryStats.successfulEvents}/${this.telemetryStats.totalEvents} successful, ` + + `Failed: ${this.telemetryStats.failedEvents}, ` + + `${batchStatus}, ` + + `${queueStatus}, ` + + `Last Success: ${Math.round((now - this.lastSuccessfulSend) / 1000)}s ago, ` + + `Period: ${Math.round(timeSinceLastCheck / 1000)}s` + ); + + // Reset stats for next period + this.telemetryStats.totalEvents = 0; + this.telemetryStats.successfulEvents = 0; + this.telemetryStats.failedEvents = 0; + this.telemetryStats.lastHealthCheck = now; + } + + logError(message) { + const now = Date.now(); + if (now - this.lastErrorLogTime > this.errorLogInterval) { + this.logger.error(message); + this.lastErrorLogTime = now; + } + } + + async retryWithBackoff(operation) { + for (let attempt = 0; attempt < this.maxRetryAttempts; attempt++) { + try { + return await operation(); + } catch (error) { + if (attempt === this.maxRetryAttempts - 1) { + throw error; + } + + const delay = this.baseRetryDelay * Math.pow(2, attempt); + this.logError(`QuestDB operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${this.maxRetryAttempts}): ${error.message}`); + await new Promise(resolve => setTimeout(resolve, delay)); + + // Recreate sender on connection errors + if (error.message.includes('ECONNRESET') || error.message.includes('connection')) { + this.telemetryStats.connectionStatus = 'reconnecting'; + await this.createLocalSender(); + } + } + } + } + + async retryPersistentEvents() { + if (this.persistentEventQueue.length === 0 || !this.localSender) { + return; + } + + this.logger.info(`Attempting to retry ${this.persistentEventQueue.length} persistent events`); + + const eventsToRetry = [...this.persistentEventQueue]; + this.persistentEventQueue = []; + + try { + await this.retryWithBackoff(async () => { + for (const event of eventsToRetry) { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + } + + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents += eventsToRetry.length; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + this.isConnectionDown = false; + + this.logger.info(`Successfully retried ${eventsToRetry.length} persistent events to QuestDB`); + } catch (err) { + // Put events back in queue for next retry + this.persistentEventQueue.unshift(...eventsToRetry); + + // If queue gets too large, drop oldest events + if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { + const droppedCount = this.persistentEventQueue.length - this.maxPersistentQueueSize; + this.persistentEventQueue = this.persistentEventQueue.slice(0, this.maxPersistentQueueSize); + this.telemetryStats.failedEvents += droppedCount; + this.logger.warn(`Dropped ${droppedCount} events due to queue size limit`); + } + + this.telemetryStats.connectionStatus = 'error'; + this.isConnectionDown = true; + this.logError(`Failed to retry persistent events: ${err.message}`); } } @@ -23,24 +332,129 @@ class QuestTelemetry { value2 = null, value3 = null, ) { - try { - const table = this.localSender.table('event'); + this.telemetryStats.totalEvents++; + + const event = { + operationId, + timestamp, + blockchainId, + name, + value1, + value2, + value3 + }; - table.symbol('operationId', operationId || 'NULL'); - table.symbol('blockchainId', blockchainId || 'NULL'); - table.symbol('name', name || 'NULL'); - if (value1 !== null) table.symbol('value1', value1); - if (value2 !== null) table.symbol('value2', value2); - if (value3 !== null) table.symbol('value3', value3); - table.timestampColumn('timestamp', timestamp * 1000); + // If shutting down, try to send immediately instead of dropping + if (this.isShuttingDown) { + if (this.localSender && !this.isConnectionDown) { + try { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + await this.localSender.flush(); + + this.telemetryStats.successfulEvents++; + this.logger.debug(`Sent event ${event.operationId} during shutdown`); + } catch (error) { + this.telemetryStats.failedEvents++; + this.logger.warn(`Failed to send event ${event.operationId} during shutdown: ${error.message}`); + } + } else { + this.telemetryStats.failedEvents++; + this.logger.warn(`Dropping event ${event.operationId} during shutdown - no connection available`); + } + return; + } - await table.at(Date.now(), 'ms'); - await this.localSender.flush(); - await this.localSender.close(); + // If QuestDB is down, queue event for later retry + if (!this.localSender || this.isConnectionDown) { + this.persistentEventQueue.push(event); + + // If memory queue is full, drop oldest events + if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { + const droppedEvent = this.persistentEventQueue.shift(); + this.telemetryStats.failedEvents++; + this.logger.warn(`Dropped event ${droppedEvent.operationId} due to queue size limit`); + } + + this.telemetryStats.connectionStatus = 'disconnected'; + this.isConnectionDown = true; + this.logError('QuestDB local sender not available, event queued for retry'); + return; + } - // this.logger.info('Event telemetry successfully sent to local QuestDB'); - } catch (err) { - this.logger.error(`Error sending telemetry to local QuestDB: ${err.message}`); + // Add event to batch + this.eventBatch.push(event); + + // Prevent memory leaks: force flush if batch gets too large + if (this.eventBatch.length >= this.maxBatchSize) { + this.logger.warn(`Batch size limit reached (${this.maxBatchSize}), forcing flush`); + await this.flushBatch(); + } + // Send batch if it's full + else if (this.eventBatch.length >= this.batchSize) { + await this.flushBatch(); + } + } + + async cleanup() { + try { + this.isShuttingDown = true; + + if (this.healthCheckTimer) { + clearInterval(this.healthCheckTimer); + this.healthCheckTimer = null; + } + + if (this.batchTimer) { + clearInterval(this.batchTimer); + this.batchTimer = null; + } + + if (this.connectionHealthTimer) { + clearInterval(this.connectionHealthTimer); + this.connectionHealthTimer = null; + } + + if (this.retryQueueTimer) { + clearInterval(this.retryQueueTimer); + this.retryQueueTimer = null; + } + + // Flush any remaining events in batch + if (this.eventBatch.length > 0) { + this.logger.info(`Flushing final batch of ${this.eventBatch.length} events before shutdown`); + await this.flushBatch(); + } + + // Flush any persistent events + if (this.persistentEventQueue.length > 0) { + this.logger.info(`Flushing final persistent queue of ${this.persistentEventQueue.length} events before shutdown`); + await this.retryPersistentEvents(); + + // Log any remaining events that couldn't be sent + if (this.persistentEventQueue.length > 0) { + this.logger.warn(`Could not send ${this.persistentEventQueue.length} events before shutdown`); + } + } + + if (this.localSender) { + await this.localSender.flush(); + await this.localSender.close(); + this.localSender = null; + } + + this.logger.info('QuestDB telemetry cleanup completed'); + } catch (error) { + this.logger.error(`Error during QuestDB telemetry cleanup: ${error.message}`); } } } From ed009784f671e91ab8972910ca88dedf5598db1f Mon Sep 17 00:00:00 2001 From: Niks988 Date: Mon, 28 Jul 2025 16:13:57 +0200 Subject: [PATCH 2/4] improved proof logging, other improvements --- .../implementation/quest-telemetry.js | 463 +----------------- 1 file changed, 1 insertion(+), 462 deletions(-) diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js index ae242ac8c0..f9a0c62a44 100644 --- a/src/modules/telemetry/implementation/quest-telemetry.js +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -1,462 +1 @@ -import { Sender } from '@questdb/nodejs-client'; - -class QuestTelemetry { - constructor() { - this.localSender = null; - this.lastErrorLogTime = 0; - this.errorLogInterval = 60000; // 1 minute between error logs - this.retryAttempts = 0; - this.maxRetryAttempts = 5; - this.baseRetryDelay = 1000; // 1 second - - // Health monitoring - this.healthCheckInterval = 3 * 60 * 1000; // 3 minutes - this.healthCheckTimer = null; - this.telemetryStats = { - totalEvents: 0, - successfulEvents: 0, - failedEvents: 0, - lastHealthCheck: Date.now(), - connectionStatus: 'disconnected' - }; - - // Event batching - this.batchSize = 50; // Send batch when 50 events collected - this.maxBatchSize = 200; // Maximum batch size to prevent memory leaks - this.batchTimeout = 5000; // Send batch after 5 seconds - this.eventBatch = []; - this.batchTimer = null; - - // Bulletproof features - this.isShuttingDown = false; - this.connectionHealthTimer = null; - this.connectionHealthInterval = 30 * 1000; // 30 seconds - this.lastSuccessfulSend = Date.now(); - this.maxTimeWithoutSuccess = 5 * 60 * 1000; // 5 minutes - - // Event persistence for when QuestDB is down - this.persistentEventQueue = []; - this.maxPersistentQueueSize = 10000; // Increased to 10,000 events - this.isConnectionDown = false; - this.retryQueueTimer = null; - this.retryQueueInterval = 10 * 1000; // 10 seconds - } - - async initialize(config, logger) { - this.config = config; - this.logger = logger; - await this.createLocalSender(); - this.startHealthMonitoring(); - this.startBatchTimer(); - this.startConnectionHealthCheck(); - this.startRetryQueueTimer(); - - // Graceful shutdown handling - process.on('SIGINT', () => this.gracefulShutdown()); - process.on('SIGTERM', () => this.gracefulShutdown()); - } - - async gracefulShutdown() { - if (this.isShuttingDown) return; - this.isShuttingDown = true; - - this.logger.info('QuestDB telemetry graceful shutdown initiated'); - await this.cleanup(); - } - - async createLocalSender() { - try { - this.localSender = Sender.fromConfig(this.config.localEndpoint); - this.retryAttempts = 0; - this.telemetryStats.connectionStatus = 'connected'; - this.lastSuccessfulSend = Date.now(); - this.logger.debug('QuestDB local sender created successfully'); - } catch (error) { - this.telemetryStats.connectionStatus = 'failed'; - this.logError(`Failed to create QuestDB local sender: ${error.message}`); - } - } - - startHealthMonitoring() { - this.healthCheckTimer = setInterval(() => { - this.logHealthStatus(); - }, this.healthCheckInterval); - - this.logger.info('QuestDB telemetry health monitoring started (every 3 minutes)'); - } - - startConnectionHealthCheck() { - this.connectionHealthTimer = setInterval(() => { - this.checkConnectionHealth(); - }, this.connectionHealthInterval); - - this.logger.info('QuestDB telemetry connection health check started (every 30 seconds)'); - } - - startRetryQueueTimer() { - this.retryQueueTimer = setInterval(() => { - this.retryPersistentEvents(); - }, this.retryQueueInterval); - - this.logger.info('QuestDB telemetry persistent event retry timer started (every 10 seconds)'); - } - - checkConnectionHealth() { - const timeSinceLastSuccess = Date.now() - this.lastSuccessfulSend; - - if (timeSinceLastSuccess > this.maxTimeWithoutSuccess) { - this.logger.warn(`No successful telemetry sends for ${Math.round(timeSinceLastSuccess / 1000)}s, recreating connection`); - this.recreateConnection(); - } - } - - async recreateConnection() { - try { - if (this.localSender) { - await this.localSender.flush(); - await this.localSender.close(); - } - await this.createLocalSender(); - } catch (error) { - this.logError(`Failed to recreate QuestDB connection: ${error.message}`); - } - } - - startBatchTimer() { - this.batchTimer = setInterval(() => { - this.flushBatch(); - }, this.batchTimeout); - - this.logger.info(`QuestDB telemetry batching started (batch size: ${this.batchSize}, max: ${this.maxBatchSize}, timeout: ${this.batchTimeout}ms)`); - } - - async flushBatch() { - if (this.eventBatch.length === 0) { - return; - } - - const batchToSend = [...this.eventBatch]; - this.eventBatch = []; - - try { - await this.retryWithBackoff(async () => { - for (const event of batchToSend) { - const table = this.localSender.table('event'); - - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); - if (event.value1 !== null) table.symbol('value1', event.value1); - if (event.value2 !== null) table.symbol('value2', event.value2); - if (event.value3 !== null) table.symbol('value3', event.value3); - table.timestampColumn('timestamp', event.timestamp * 1000); - - await table.at(Date.now(), 'ms'); - } - - await this.localSender.flush(); - }); - - this.telemetryStats.successfulEvents += batchToSend.length; - this.telemetryStats.connectionStatus = 'connected'; - this.lastSuccessfulSend = Date.now(); - - this.logger.debug(`Successfully sent batch of ${batchToSend.length} events to QuestDB`); - } catch (err) { - this.telemetryStats.failedEvents += batchToSend.length; - this.telemetryStats.connectionStatus = 'error'; - this.logError(`Error sending batch of ${batchToSend.length} events to QuestDB: ${err.message}`); - - // Fallback: try to send events individually - await this.fallbackToIndividualEvents(batchToSend); - } - } - - async fallbackToIndividualEvents(events) { - this.logger.warn(`Attempting fallback: sending ${events.length} events individually`); - - for (const event of events) { - try { - await this.retryWithBackoff(async () => { - const table = this.localSender.table('event'); - - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); - if (event.value1 !== null) table.symbol('value1', event.value1); - if (event.value2 !== null) table.symbol('value2', event.value2); - if (event.value3 !== null) table.symbol('value3', event.value3); - table.timestampColumn('timestamp', event.timestamp * 1000); - - await table.at(Date.now(), 'ms'); - await this.localSender.flush(); - }); - - this.telemetryStats.successfulEvents++; - this.lastSuccessfulSend = Date.now(); - } catch (error) { - this.telemetryStats.failedEvents++; - this.logError(`Fallback failed for event ${event.operationId}: ${error.message}`); - } - } - } - - logHealthStatus() { - const now = Date.now(); - const timeSinceLastCheck = now - this.telemetryStats.lastHealthCheck; - const successRate = this.telemetryStats.totalEvents > 0 - ? ((this.telemetryStats.successfulEvents / this.telemetryStats.totalEvents) * 100).toFixed(1) - : 0; - - // Calculate batch status - const batchStatus = this.eventBatch.length > 0 - ? `Pending for next batch: ${this.eventBatch.length}/${this.batchSize}` - : 'Batch: empty'; - - // Calculate queue status - const queueStatus = this.persistentEventQueue.length > 0 - ? `Queue: ${this.persistentEventQueue.length}/${this.maxPersistentQueueSize}` - : 'Queue: empty'; - - this.logger.info( - `[TELEMETRY HEALTH] Status: ${this.telemetryStats.connectionStatus}, ` + - `Success Rate: ${successRate}%, ` + - `Events: ${this.telemetryStats.successfulEvents}/${this.telemetryStats.totalEvents} successful, ` + - `Failed: ${this.telemetryStats.failedEvents}, ` + - `${batchStatus}, ` + - `${queueStatus}, ` + - `Last Success: ${Math.round((now - this.lastSuccessfulSend) / 1000)}s ago, ` + - `Period: ${Math.round(timeSinceLastCheck / 1000)}s` - ); - - // Reset stats for next period - this.telemetryStats.totalEvents = 0; - this.telemetryStats.successfulEvents = 0; - this.telemetryStats.failedEvents = 0; - this.telemetryStats.lastHealthCheck = now; - } - - logError(message) { - const now = Date.now(); - if (now - this.lastErrorLogTime > this.errorLogInterval) { - this.logger.error(message); - this.lastErrorLogTime = now; - } - } - - async retryWithBackoff(operation) { - for (let attempt = 0; attempt < this.maxRetryAttempts; attempt++) { - try { - return await operation(); - } catch (error) { - if (attempt === this.maxRetryAttempts - 1) { - throw error; - } - - const delay = this.baseRetryDelay * Math.pow(2, attempt); - this.logError(`QuestDB operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${this.maxRetryAttempts}): ${error.message}`); - await new Promise(resolve => setTimeout(resolve, delay)); - - // Recreate sender on connection errors - if (error.message.includes('ECONNRESET') || error.message.includes('connection')) { - this.telemetryStats.connectionStatus = 'reconnecting'; - await this.createLocalSender(); - } - } - } - } - - async retryPersistentEvents() { - if (this.persistentEventQueue.length === 0 || !this.localSender) { - return; - } - - this.logger.info(`Attempting to retry ${this.persistentEventQueue.length} persistent events`); - - const eventsToRetry = [...this.persistentEventQueue]; - this.persistentEventQueue = []; - - try { - await this.retryWithBackoff(async () => { - for (const event of eventsToRetry) { - const table = this.localSender.table('event'); - - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); - if (event.value1 !== null) table.symbol('value1', event.value1); - if (event.value2 !== null) table.symbol('value2', event.value2); - if (event.value3 !== null) table.symbol('value3', event.value3); - table.timestampColumn('timestamp', event.timestamp * 1000); - - await table.at(Date.now(), 'ms'); - } - - await this.localSender.flush(); - }); - - this.telemetryStats.successfulEvents += eventsToRetry.length; - this.telemetryStats.connectionStatus = 'connected'; - this.lastSuccessfulSend = Date.now(); - this.isConnectionDown = false; - - this.logger.info(`Successfully retried ${eventsToRetry.length} persistent events to QuestDB`); - } catch (err) { - // Put events back in queue for next retry - this.persistentEventQueue.unshift(...eventsToRetry); - - // If queue gets too large, drop oldest events - if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { - const droppedCount = this.persistentEventQueue.length - this.maxPersistentQueueSize; - this.persistentEventQueue = this.persistentEventQueue.slice(0, this.maxPersistentQueueSize); - this.telemetryStats.failedEvents += droppedCount; - this.logger.warn(`Dropped ${droppedCount} events due to queue size limit`); - } - - this.telemetryStats.connectionStatus = 'error'; - this.isConnectionDown = true; - this.logError(`Failed to retry persistent events: ${err.message}`); - } - } - - listenOnEvents(eventEmitter, onEventReceived) { - return eventEmitter.on('operation_status_changed', onEventReceived); - } - - async sendTelemetryData( - operationId, - timestamp, - blockchainId = '', - name = '', - value1 = null, - value2 = null, - value3 = null, - ) { - this.telemetryStats.totalEvents++; - - const event = { - operationId, - timestamp, - blockchainId, - name, - value1, - value2, - value3 - }; - - // If shutting down, try to send immediately instead of dropping - if (this.isShuttingDown) { - if (this.localSender && !this.isConnectionDown) { - try { - const table = this.localSender.table('event'); - - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); - if (event.value1 !== null) table.symbol('value1', event.value1); - if (event.value2 !== null) table.symbol('value2', event.value2); - if (event.value3 !== null) table.symbol('value3', event.value3); - table.timestampColumn('timestamp', event.timestamp * 1000); - - await table.at(Date.now(), 'ms'); - await this.localSender.flush(); - - this.telemetryStats.successfulEvents++; - this.logger.debug(`Sent event ${event.operationId} during shutdown`); - } catch (error) { - this.telemetryStats.failedEvents++; - this.logger.warn(`Failed to send event ${event.operationId} during shutdown: ${error.message}`); - } - } else { - this.telemetryStats.failedEvents++; - this.logger.warn(`Dropping event ${event.operationId} during shutdown - no connection available`); - } - return; - } - - // If QuestDB is down, queue event for later retry - if (!this.localSender || this.isConnectionDown) { - this.persistentEventQueue.push(event); - - // If memory queue is full, drop oldest events - if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { - const droppedEvent = this.persistentEventQueue.shift(); - this.telemetryStats.failedEvents++; - this.logger.warn(`Dropped event ${droppedEvent.operationId} due to queue size limit`); - } - - this.telemetryStats.connectionStatus = 'disconnected'; - this.isConnectionDown = true; - this.logError('QuestDB local sender not available, event queued for retry'); - return; - } - - // Add event to batch - this.eventBatch.push(event); - - // Prevent memory leaks: force flush if batch gets too large - if (this.eventBatch.length >= this.maxBatchSize) { - this.logger.warn(`Batch size limit reached (${this.maxBatchSize}), forcing flush`); - await this.flushBatch(); - } - // Send batch if it's full - else if (this.eventBatch.length >= this.batchSize) { - await this.flushBatch(); - } - } - - async cleanup() { - try { - this.isShuttingDown = true; - - if (this.healthCheckTimer) { - clearInterval(this.healthCheckTimer); - this.healthCheckTimer = null; - } - - if (this.batchTimer) { - clearInterval(this.batchTimer); - this.batchTimer = null; - } - - if (this.connectionHealthTimer) { - clearInterval(this.connectionHealthTimer); - this.connectionHealthTimer = null; - } - - if (this.retryQueueTimer) { - clearInterval(this.retryQueueTimer); - this.retryQueueTimer = null; - } - - // Flush any remaining events in batch - if (this.eventBatch.length > 0) { - this.logger.info(`Flushing final batch of ${this.eventBatch.length} events before shutdown`); - await this.flushBatch(); - } - - // Flush any persistent events - if (this.persistentEventQueue.length > 0) { - this.logger.info(`Flushing final persistent queue of ${this.persistentEventQueue.length} events before shutdown`); - await this.retryPersistentEvents(); - - // Log any remaining events that couldn't be sent - if (this.persistentEventQueue.length > 0) { - this.logger.warn(`Could not send ${this.persistentEventQueue.length} events before shutdown`); - } - } - - if (this.localSender) { - await this.localSender.flush(); - await this.localSender.close(); - this.localSender = null; - } - - this.logger.info('QuestDB telemetry cleanup completed'); - } catch (error) { - this.logger.error(`Error during QuestDB telemetry cleanup: ${error.message}`); - } - } -} - -export default QuestTelemetry; +git a \ No newline at end of file From 51d3ce41b93e34a1417e1d2aaba3d6c240c018fb Mon Sep 17 00:00:00 2001 From: Niks988 Date: Mon, 28 Jul 2025 16:14:08 +0200 Subject: [PATCH 3/4] improved proof logging, other improvements --- .../implementation/quest-telemetry.js | 859 +++++++++++++++++- 1 file changed, 858 insertions(+), 1 deletion(-) diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js index f9a0c62a44..ba88cb09b9 100644 --- a/src/modules/telemetry/implementation/quest-telemetry.js +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -1 +1,858 @@ -git a \ No newline at end of file +import { Sender } from '@questdb/nodejs-client'; + +class QuestTelemetry { + constructor() { + this.localSender = null; + this.lastErrorLogTime = 0; + this.errorLogInterval = 60000; // 1 minute between error logs + this.retryAttempts = 0; + this.maxRetryAttempts = 5; + this.baseRetryDelay = 1000; // 1 second + + // Health monitoring + this.healthCheckInterval = 3 * 60 * 1000; // 3 minutes + this.healthCheckTimer = null; + this.telemetryStats = { + totalEvents: 0, + successfulEvents: 0, + failedEvents: 0, + droppedEvents: 0, // Track permanently dropped events + proofEvents: 0, + successfulProofEvents: 0, + lastHealthCheck: Date.now(), + connectionStatus: 'disconnected', + startTime: Date.now() // Track when telemetry module started + }; + + // Detailed metrics (no logging spam) + this.metrics = { + eventsByType: new Map(), + proofEventsByBlockchain: new Map(), + finalizedProofsByBlockchain: new Map(), // Track PROOF_CHALANGE_FINALIZED per chain (lifetime) + submittedProofsByBlockchain: new Map(), // Track PROOF_SUBMITTED per chain (lifetime) + // Period tracking (resets every 3 minutes) + periodFinalizedProofsByBlockchain: new Map(), // Track PROOF_CHALANGE_FINALIZED per chain (current period) + periodSubmittedProofsByBlockchain: new Map(), // Track PROOF_SUBMITTED per chain (current period) + recentProofEvents: [], // Last 50 proof events + processedProofEvents: new Set(), // Track processed proof events to prevent duplicates + lastProofEventTime: null, + maxRecentEvents: 50, + totalDroppedEvents: 0, // Lifetime counter + erroredEvents: new Map(), // Track errored events by type + erroredEventsByBlockchain: new Map() // Track errored events by blockchain + }; + + // Event batching + this.batchSize = 50; // Send batch when 50 events collected + this.maxBatchSize = 200; // Maximum batch size to prevent memory leaks + this.batchTimeout = 5000; // Send batch after 5 seconds + this.eventBatch = []; + this.batchTimer = null; + + // Bulletproof features + this.isShuttingDown = false; + this.connectionHealthTimer = null; + this.connectionHealthInterval = 30 * 1000; // 30 seconds + this.lastSuccessfulSend = Date.now(); + this.maxTimeWithoutSuccess = 5 * 60 * 1000; // 5 minutes + + // Event persistence for when QuestDB is down + this.persistentEventQueue = []; + this.maxPersistentQueueSize = 10000; // Increased to 10,000 events + this.isConnectionDown = false; + this.retryQueueTimer = null; + this.retryQueueInterval = 10 * 1000; // 10 seconds + } + + async initialize(config, logger) { + this.config = config; + this.logger = logger; + await this.createLocalSender(); + this.startHealthMonitoring(); + this.startBatchTimer(); + this.startConnectionHealthCheck(); + this.startRetryQueueTimer(); + + // Graceful shutdown handling + process.on('SIGINT', () => this.gracefulShutdown()); + process.on('SIGTERM', () => this.gracefulShutdown()); + } + + async gracefulShutdown() { + if (this.isShuttingDown) return; + this.isShuttingDown = true; + + this.logger.info('QuestDB telemetry graceful shutdown initiated'); + await this.cleanup(); + } + + async createLocalSender() { + try { + this.localSender = Sender.fromConfig(this.config.localEndpoint); + this.retryAttempts = 0; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + this.logger.debug('QuestDB local sender created successfully'); + } catch (error) { + this.telemetryStats.connectionStatus = 'failed'; + this.logError(`Failed to create QuestDB local sender: ${error.message}`); + } + } + + startHealthMonitoring() { + this.healthCheckTimer = setInterval(() => { + this.logHealthStatus(); + }, this.healthCheckInterval); + + this.logger.info('QuestDB telemetry health monitoring started (every 3 minutes)'); + } + + startConnectionHealthCheck() { + this.connectionHealthTimer = setInterval(() => { + this.checkConnectionHealth(); + }, this.connectionHealthInterval); + + this.logger.info('QuestDB telemetry connection health check started (every 30 seconds)'); + } + + startRetryQueueTimer() { + this.retryQueueTimer = setInterval(() => { + this.retryPersistentEvents(); + }, this.retryQueueInterval); + + this.logger.info('QuestDB telemetry persistent event retry timer started (every 10 seconds)'); + } + + checkConnectionHealth() { + const timeSinceLastSuccess = Date.now() - this.lastSuccessfulSend; + + if (timeSinceLastSuccess > this.maxTimeWithoutSuccess) { + this.logger.warn(`No successful telemetry sends for ${Math.round(timeSinceLastSuccess / 1000)}s, recreating connection`); + this.recreateConnection(); + } + } + + async recreateConnection() { + try { + if (this.localSender) { + await this.localSender.flush(); + await this.localSender.close(); + } + await this.createLocalSender(); + } catch (error) { + this.logError(`Failed to recreate QuestDB connection: ${error.message}`); + } + } + + startBatchTimer() { + this.batchTimer = setInterval(() => { + this.flushBatch(); + }, this.batchTimeout); + + this.logger.info(`QuestDB telemetry batching started (batch size: ${this.batchSize}, max: ${this.maxBatchSize}, timeout: ${this.batchTimeout}ms)`); + } + + async flushBatch() { + if (this.eventBatch.length === 0) { + return; + } + + const batchToSend = [...this.eventBatch]; + this.eventBatch = []; + + // Count proof events in this batch + const proofEvents = batchToSend.filter(event => { + const name = event.name; + return name && ( + name === 'PROOF_CHALANGE_FINALIZED' || + name === 'PROOF_SUBMITTED' || + name.includes('PROOF_') || + name.includes('proof_') || + (name.includes('proof') && (name.includes('submit') || name.includes('final') || name.includes('complete'))) + ); + }); + + try { + await this.retryWithBackoff(async () => { + for (const event of batchToSend) { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + } + + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents += batchToSend.length; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + + // Track successful proof events + if (proofEvents.length > 0) { + this.telemetryStats.successfulProofEvents += proofEvents.length; + } + } catch (err) { + this.telemetryStats.failedEvents += batchToSend.length; + this.telemetryStats.connectionStatus = 'error'; + this.logError(`Error sending batch of ${batchToSend.length} events to QuestDB: ${err.message}`); + + // Track errored events by type and blockchain + batchToSend.forEach(event => { + // Track by event type + const typeCount = this.metrics.erroredEvents.get(event.name) || 0; + this.metrics.erroredEvents.set(event.name, typeCount + 1); + + // Track by blockchain + const blockchainKey = `${event.blockchainId}:${event.name}`; + const blockchainCount = this.metrics.erroredEventsByBlockchain.get(blockchainKey) || 0; + this.metrics.erroredEventsByBlockchain.set(blockchainKey, blockchainCount + 1); + }); + + // Log failed proof events (important to know) + if (proofEvents.length > 0) { + this.logger.error(`[TELEMETRY] Failed to send ${proofEvents.length} proof events: ${proofEvents.map(e => e.operationId).join(', ')}`); + } + + // Fallback: try to send events individually + await this.fallbackToIndividualEvents(batchToSend); + } + } + + async fallbackToIndividualEvents(events) { + this.logger.warn(`Attempting fallback: sending ${events.length} events individually`); + + for (const event of events) { + try { + await this.retryWithBackoff(async () => { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents++; + this.lastSuccessfulSend = Date.now(); + } catch (error) { + this.telemetryStats.failedEvents++; + + // Track errored events by type and blockchain + const typeCount = this.metrics.erroredEvents.get(event.name) || 0; + this.metrics.erroredEvents.set(event.name, typeCount + 1); + + const blockchainKey = `${event.blockchainId}:${event.name}`; + const blockchainCount = this.metrics.erroredEventsByBlockchain.get(blockchainKey) || 0; + this.metrics.erroredEventsByBlockchain.set(blockchainKey, blockchainCount + 1); + + this.logError(`Fallback failed for event ${event.operationId}: ${error.message}`); + } + } + } + + logHealthStatus() { + const now = Date.now(); + const timeSinceLastCheck = now - this.telemetryStats.lastHealthCheck; + + // Calculate proper failed events (events that weren't successful or dropped) + const actualFailedEvents = Math.max(0, this.telemetryStats.totalEvents - this.telemetryStats.successfulEvents - this.telemetryStats.droppedEvents); + + // Ensure success rate doesn't exceed 100% + const maxSuccessfulEvents = Math.min(this.telemetryStats.successfulEvents, this.telemetryStats.totalEvents); + const successRate = this.telemetryStats.totalEvents > 0 + ? ((maxSuccessfulEvents / this.telemetryStats.totalEvents) * 100).toFixed(1) + : 0; + + // Calculate batch status + const batchStatus = this.eventBatch.length > 0 + ? `Pending for next batch: ${this.eventBatch.length}/${this.batchSize}` + : 'Batch: empty'; + + // Calculate queue status + const queueStatus = this.persistentEventQueue.length > 0 + ? `Queue: ${this.persistentEventQueue.length}/${this.maxPersistentQueueSize}` + : 'Queue: empty'; + + // Explain what happened to non-successful events + let failureExplanation = ''; + const totalNonSuccessful = actualFailedEvents + this.telemetryStats.droppedEvents; + if (totalNonSuccessful > 0) { + const accountedEvents = this.eventBatch.length + this.persistentEventQueue.length; + const unexplainedEvents = Math.max(0, actualFailedEvents - accountedEvents); + + let explanations = []; + if (this.eventBatch.length > 0) { + explanations.push(`${this.eventBatch.length} moved to next batch`); + } + if (this.persistentEventQueue.length > 0) { + explanations.push(`${this.persistentEventQueue.length} queued for retry`); + } + if (this.telemetryStats.droppedEvents > 0) { + explanations.push(`${this.telemetryStats.droppedEvents} DROPPED`); + } + if (unexplainedEvents > 0) { + explanations.push(`${unexplainedEvents} in processing pipeline`); + } + + if (explanations.length > 0) { + failureExplanation = ` (${explanations.join(', ')})`; + } + } + + // Include lifetime dropped count if any + const droppedInfo = this.metrics.totalDroppedEvents > 0 + ? `, Lifetime dropped: ${this.metrics.totalDroppedEvents}` + : ''; + + // 1. TELEMETRY HEALTH - General system health without proofs + this.logger.info( + `[TELEMETRY HEALTH] Status: ${this.telemetryStats.connectionStatus}, ` + + `Success Rate: ${successRate}%, ` + + `Events: ${maxSuccessfulEvents}/${this.telemetryStats.totalEvents} successful, ` + + `Failed: ${totalNonSuccessful}${failureExplanation}, ` + + `${batchStatus}, ` + + `${queueStatus}, ` + + `Last Success: ${Math.round((now - this.lastSuccessfulSend) / 1000)}s ago, ` + + `Period: ${Math.round(timeSinceLastCheck / 1000)}s${droppedInfo}` + ); + + // 2. TELEMETRY PROOFS - Proof event statistics + this.logProofStatus(); + + // Log error details immediately if there are actually tracked errors + const totalTrackedErrors = Array.from(this.metrics.erroredEvents.values()).reduce((sum, count) => sum + count, 0); + if (totalTrackedErrors > 0) { + this.logErrorDetails(); + } + + // Log detailed metrics every 3rd health check (9 minutes) + if (this.telemetryStats.lastHealthCheck > 0 && (Date.now() - this.telemetryStats.lastHealthCheck) > 0) { + const healthCheckCount = Math.floor((now - this.telemetryStats.lastHealthCheck) / this.healthCheckInterval); + if (healthCheckCount % 3 === 0) { + this.logDetailedMetrics(); + + // Log error details if any (backup in case not logged above) + this.logErrorDetails(); + } + } + + // Reset stats for next period + this.telemetryStats.totalEvents = 0; + this.telemetryStats.successfulEvents = 0; + this.telemetryStats.failedEvents = 0; + this.telemetryStats.droppedEvents = 0; // Reset period counter + this.telemetryStats.proofEvents = 0; + this.telemetryStats.successfulProofEvents = 0; + this.telemetryStats.lastHealthCheck = now; + } + + logProofStatus() { + const finalizedTotal = Array.from(this.metrics.finalizedProofsByBlockchain.values()).reduce((sum, count) => sum + count, 0); + const submittedTotal = Array.from(this.metrics.submittedProofsByBlockchain.values()).reduce((sum, count) => sum + count, 0); + + if (finalizedTotal > 0 || submittedTotal > 0) { + // Finalized proofs per blockchain + const finalizedByChain = Array.from(this.metrics.finalizedProofsByBlockchain.entries()) + .map(([chain, count]) => `${chain}:${count}`) + .join(', '); + + // Submitted proofs per blockchain + const submittedByChain = Array.from(this.metrics.submittedProofsByBlockchain.entries()) + .map(([chain, count]) => `${chain}:${count}`) + .join(', '); + + const timeSinceLastProof = this.metrics.lastProofEventTime + ? Math.round((Date.now() - this.metrics.lastProofEventTime) / 1000) + : 'never'; + + // Calculate telemetry uptime + const uptimeMs = Date.now() - this.telemetryStats.startTime; + const uptimeHours = (uptimeMs / (1000 * 60 * 60)).toFixed(1); + const startTimeFormatted = new Date(this.telemetryStats.startTime).toLocaleString('en-US', { + month: '2-digit', + day: '2-digit', + hour12: false, + hour: '2-digit', + minute: '2-digit' + }); + + let proofLog = `[TELEMETRY PROOFS] Since telemetry/ot-node start ${startTimeFormatted} (${uptimeHours}h) - Events received and sent to QuestDB:`; + + if (finalizedByChain) { + proofLog += ` PROOF_CHALANGE_FINALIZED: ${finalizedByChain}`; + } + + if (submittedByChain) { + proofLog += ` PROOF_SUBMITTED: ${submittedByChain}`; + } + + proofLog += ` Last proof: ${timeSinceLastProof}s ago`; + + this.logger.info(proofLog); + } else { + // Calculate telemetry uptime for no events case + const uptimeMs = Date.now() - this.telemetryStats.startTime; + const uptimeHours = (uptimeMs / (1000 * 60 * 60)).toFixed(1); + const startTimeFormatted = new Date(this.telemetryStats.startTime).toLocaleString('en-US', { + month: '2-digit', + day: '2-digit', + hour12: false, + hour: '2-digit', + minute: '2-digit' + }); + + this.logger.info(`[TELEMETRY PROOFS] Since telemetry/ot-node start ${startTimeFormatted} (${uptimeHours}h) - No proof events received from ot-node (telemetry waiting for proof events)`); + } + + // Reset period counters for next 3-minute period + this.metrics.periodFinalizedProofsByBlockchain.clear(); + this.metrics.periodSubmittedProofsByBlockchain.clear(); + } + + logDetailedMetrics() { + const topEventTypes = Array.from(this.metrics.eventsByType.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, 5) + .map(([type, count]) => `${type}:${count}`) + .join(', '); + + this.logger.info(`[TELEMETRY METRICS] Top events: ${topEventTypes || 'none'}`); + } + + logErrorDetails() { + const totalErroredEvents = Array.from(this.metrics.erroredEvents.values()).reduce((sum, count) => sum + count, 0); + + if (totalErroredEvents > 0) { + // Top 5 errored event types + const topErroredTypes = Array.from(this.metrics.erroredEvents.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, 5) + .map(([type, count]) => `${type}:${count}`) + .join(', '); + + // Errored events by blockchain (proof events specifically) + const erroredByBlockchain = Array.from(this.metrics.erroredEventsByBlockchain.entries()) + .filter(([key]) => key.includes('PROOF_')) + .map(([key, count]) => `${key}:${count}`) + .join(', '); + + // All errored events by blockchain for complete visibility + const allErroredByBlockchain = Array.from(this.metrics.erroredEventsByBlockchain.entries()) + .slice(0, 10) // Limit to top 10 to avoid spam + .map(([key, count]) => `${key}:${count}`) + .join(', '); + + let errorLog = `[TELEMETRY ERRORS] Total errored: ${totalErroredEvents}, Top types: ${topErroredTypes}`; + + if (erroredByBlockchain) { + errorLog += `, Proof errors: ${erroredByBlockchain}`; + } + + if (allErroredByBlockchain) { + errorLog += `, All errors by chain: ${allErroredByBlockchain}`; + } + + this.logger.warn(errorLog); + } + } + + logLifetimeProofSummary() { + // This method is no longer needed as we moved proof logging to logProofStatus() + // Keep empty for now to avoid breaking existing timers + } + + getMetricsSummary() { + const topEventTypes = Array.from(this.metrics.eventsByType.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, 5) + .map(([type, count]) => `${type}:${count}`) + .join(', '); + + // Finalized proofs per blockchain + const finalizedByBlockchain = Array.from(this.metrics.finalizedProofsByBlockchain.entries()) + .map(([blockchain, count]) => `${blockchain}:${count}`) + .join(', '); + + // Submitted proofs per blockchain + const submittedByBlockchain = Array.from(this.metrics.submittedProofsByBlockchain.entries()) + .map(([blockchain, count]) => `${blockchain}:${count}`) + .join(', '); + + const timeSinceLastProof = this.metrics.lastProofEventTime + ? Math.round((Date.now() - this.metrics.lastProofEventTime) / 1000) + : 'never'; + + let summary = `Top events: ${topEventTypes || 'none'}`; + + if (finalizedByBlockchain) { + summary += `, Finalized proofs: ${finalizedByBlockchain}`; + } + + if (submittedByBlockchain) { + summary += `, Submitted proofs: ${submittedByBlockchain}`; + } + + summary += `, Last proof: ${timeSinceLastProof}s ago`; + + return summary; + } + + logError(message) { + const now = Date.now(); + if (now - this.lastErrorLogTime > this.errorLogInterval) { + this.logger.error(message); + this.lastErrorLogTime = now; + } + } + + async retryWithBackoff(operation) { + for (let attempt = 0; attempt < this.maxRetryAttempts; attempt++) { + try { + return await operation(); + } catch (error) { + if (attempt === this.maxRetryAttempts - 1) { + throw error; + } + + const delay = this.baseRetryDelay * Math.pow(2, attempt); + this.logError(`QuestDB operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${this.maxRetryAttempts}): ${error.message}`); + await new Promise(resolve => setTimeout(resolve, delay)); + + // Recreate sender on connection errors + if (error.message.includes('ECONNRESET') || error.message.includes('connection')) { + this.telemetryStats.connectionStatus = 'reconnecting'; + await this.createLocalSender(); + } + } + } + } + + async retryPersistentEvents() { + if (this.persistentEventQueue.length === 0 || !this.localSender) { + return; + } + + this.logger.info(`Attempting to retry ${this.persistentEventQueue.length} persistent events`); + + const eventsToRetry = [...this.persistentEventQueue]; + this.persistentEventQueue = []; + + try { + await this.retryWithBackoff(async () => { + for (const event of eventsToRetry) { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + } + + await this.localSender.flush(); + }); + + this.telemetryStats.successfulEvents += eventsToRetry.length; + this.telemetryStats.connectionStatus = 'connected'; + this.lastSuccessfulSend = Date.now(); + this.isConnectionDown = false; + + this.logger.info(`Successfully retried ${eventsToRetry.length} persistent events to QuestDB`); + } catch (err) { + // Put events back in queue for next retry + this.persistentEventQueue.unshift(...eventsToRetry); + + // If queue gets too large, drop oldest events + if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { + const droppedCount = this.persistentEventQueue.length - this.maxPersistentQueueSize; + this.persistentEventQueue = this.persistentEventQueue.slice(0, this.maxPersistentQueueSize); + + // Track dropped events + this.telemetryStats.droppedEvents += droppedCount; + this.metrics.totalDroppedEvents += droppedCount; + this.logger.warn(`PERMANENTLY DROPPED ${droppedCount} events due to queue size limit (total dropped: ${this.metrics.totalDroppedEvents})`); + } + + this.telemetryStats.connectionStatus = 'error'; + this.isConnectionDown = true; + this.logError(`Failed to retry persistent events: ${err.message}`); + } + } + + listenOnEvents(eventEmitter, onEventReceived) { + return eventEmitter.on('operation_status_changed', onEventReceived); + } + + async sendTelemetryData( + operationId, + timestamp, + blockchainId = '', + name = '', + value1 = null, + value2 = null, + value3 = null, + ) { + this.telemetryStats.totalEvents++; + + // Track metrics silently + this.trackEventMetrics(operationId, name, blockchainId, timestamp); + + const event = { + operationId, + timestamp, + blockchainId, + name, + value1, + value2, + value3 + }; + + // If shutting down, try to send immediately instead of dropping + if (this.isShuttingDown) { + if (this.localSender && !this.isConnectionDown) { + try { + const table = this.localSender.table('event'); + + table.symbol('operationId', event.operationId || 'NULL'); + table.symbol('blockchainId', event.blockchainId || 'NULL'); + table.symbol('name', event.name || 'NULL'); + if (event.value1 !== null) table.symbol('value1', event.value1); + if (event.value2 !== null) table.symbol('value2', event.value2); + if (event.value3 !== null) table.symbol('value3', event.value3); + table.timestampColumn('timestamp', event.timestamp * 1000); + + await table.at(Date.now(), 'ms'); + await this.localSender.flush(); + + this.telemetryStats.successfulEvents++; + } catch (error) { + this.telemetryStats.failedEvents++; + this.logger.warn(`Failed to send event during shutdown: ${error.message}`); + } + } else { + this.telemetryStats.failedEvents++; + this.logger.warn(`Dropping event during shutdown - no connection available`); + } + return; + } + + // If QuestDB is down, queue event for later retry + if (!this.localSender || this.isConnectionDown) { + this.persistentEventQueue.push(event); + + // If memory queue is full, drop oldest events + if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { + const droppedEvent = this.persistentEventQueue.shift(); + this.telemetryStats.droppedEvents++; + this.metrics.totalDroppedEvents++; + this.logger.warn(`PERMANENTLY DROPPED event ${droppedEvent.operationId} due to queue size limit (total dropped: ${this.metrics.totalDroppedEvents})`); + } + + this.telemetryStats.connectionStatus = 'disconnected'; + this.isConnectionDown = true; + this.logError('QuestDB local sender not available, event queued for retry'); + return; + } + + // Add event to batch + this.eventBatch.push(event); + + // Prevent memory leaks: force flush if batch gets too large + if (this.eventBatch.length >= this.maxBatchSize) { + this.logger.warn(`Batch size limit reached (${this.maxBatchSize}), forcing flush`); + await this.flushBatch(); + } + // Send batch if it's full + else if (this.eventBatch.length >= this.batchSize) { + await this.flushBatch(); + } + } + + trackEventMetrics(operationId, name, blockchainId, timestamp) { + // Track event types + const currentCount = this.metrics.eventsByType.get(name) || 0; + this.metrics.eventsByType.set(name, currentCount + 1); + + // Track specific proof events + const isProofEvent = name && ( + name === 'PROOF_CHALANGE_FINALIZED' || + name === 'PROOF_SUBMITTED' || + name.includes('PROOF_') || + name.includes('proof_') || + (name.includes('proof') && (name.includes('submit') || name.includes('final') || name.includes('complete'))) + ); + + if (isProofEvent) { + // Create unique identifier for this proof event + const proofEventId = `${operationId}:${name}:${blockchainId}`; + + // Check if we've already processed this exact proof event + if (this.metrics.processedProofEvents.has(proofEventId)) { + return; // Skip processing this duplicate silently + } + + // Mark this proof event as processed + this.metrics.processedProofEvents.add(proofEventId); + + // Clean up old processed events to prevent memory leaks (keep last 1000) + if (this.metrics.processedProofEvents.size > 1000) { + const processedArray = Array.from(this.metrics.processedProofEvents); + this.metrics.processedProofEvents.clear(); + // Keep the last 500 events + processedArray.slice(-500).forEach(id => this.metrics.processedProofEvents.add(id)); + } + + this.telemetryStats.proofEvents++; + this.metrics.lastProofEventTime = Date.now(); + + // Track by blockchain (general) + const blockchainCount = this.metrics.proofEventsByBlockchain.get(blockchainId) || 0; + this.metrics.proofEventsByBlockchain.set(blockchainId, blockchainCount + 1); + + // Track specific proof types per blockchain + if (name === 'PROOF_CHALANGE_FINALIZED') { + // Update lifetime counter + const finalizedCount = this.metrics.finalizedProofsByBlockchain.get(blockchainId) || 0; + this.metrics.finalizedProofsByBlockchain.set(blockchainId, finalizedCount + 1); + + // Update period counter + const periodFinalizedCount = this.metrics.periodFinalizedProofsByBlockchain.get(blockchainId) || 0; + this.metrics.periodFinalizedProofsByBlockchain.set(blockchainId, periodFinalizedCount + 1); + + // Parse operation ID for clearer display + const operationDetails = this.parseOperationId(operationId, blockchainId); + const periodMinutes = Math.round(this.healthCheckInterval / (1000 * 60)); // 3 minutes + const eventCount = periodFinalizedCount + 1; + const eventText = eventCount === 1 ? 'event' : 'events'; + this.logger.info(`[TELEMETRY PROOFS] PROOF_CHALANGE_FINALIZED received from ot-node and emitted to QuestDB for ${blockchainId} ${operationDetails} - ${eventCount} ${eventText} received in last ${periodMinutes} minutes`); + } else if (name === 'PROOF_SUBMITTED') { + // Update lifetime counter + const submittedCount = this.metrics.submittedProofsByBlockchain.get(blockchainId) || 0; + this.metrics.submittedProofsByBlockchain.set(blockchainId, submittedCount + 1); + + // Update period counter + const periodSubmittedCount = this.metrics.periodSubmittedProofsByBlockchain.get(blockchainId) || 0; + this.metrics.periodSubmittedProofsByBlockchain.set(blockchainId, periodSubmittedCount + 1); + + // Parse operation ID for clearer display + const operationDetails = this.parseOperationId(operationId, blockchainId); + const periodMinutes = Math.round(this.healthCheckInterval / (1000 * 60)); // 3 minutes + const eventCount = periodSubmittedCount + 1; + const eventText = eventCount === 1 ? 'event' : 'events'; + this.logger.info(`[TELEMETRY PROOFS] PROOF_SUBMITTED received from ot-node and emitted to QuestDB for ${blockchainId} ${operationDetails} - ${eventCount} ${eventText} received in last ${periodMinutes} minutes`); + } + + // Keep recent proof events (for debugging) + this.metrics.recentProofEvents.push({ + operationId, + name, + blockchainId, + timestamp: Date.now() + }); + + // Limit size + if (this.metrics.recentProofEvents.length > this.metrics.maxRecentEvents) { + this.metrics.recentProofEvents.shift(); + } + + // Log milestones for other proof events (every 10th) + if (name !== 'PROOF_CHALANGE_FINALIZED' && name !== 'PROOF_SUBMITTED' && this.telemetryStats.proofEvents % 10 === 0) { + this.logger.info(`[TELEMETRY PROOFS] Proof milestone: ${this.telemetryStats.proofEvents} proof events received (latest: ${operationId}:${name} on ${blockchainId})`); + } + } + } + + parseOperationId(operationId, blockchainId) { + // Operation ID format: ${blockchainId}-${epoch}-${activeProofPeriodStartBlock} + // Example: base:8453-7-33325200 + try { + // Remove the blockchain prefix to get the numbers + const withoutBlockchain = operationId.replace(`${blockchainId}-`, ''); + const parts = withoutBlockchain.split('-'); + + if (parts.length >= 2) { + const epoch = parts[0]; + const block = parts[1]; + return `(epoch: ${epoch}, block: ${block})`; + } + } catch (error) { + // Fallback if parsing fails + } + + // Fallback to original format if parsing fails + return `(operation: ${operationId})`; + } + + async cleanup() { + try { + this.isShuttingDown = true; + + if (this.healthCheckTimer) { + clearInterval(this.healthCheckTimer); + this.healthCheckTimer = null; + } + + if (this.batchTimer) { + clearInterval(this.batchTimer); + this.batchTimer = null; + } + + if (this.connectionHealthTimer) { + clearInterval(this.connectionHealthTimer); + this.connectionHealthTimer = null; + } + + if (this.retryQueueTimer) { + clearInterval(this.retryQueueTimer); + this.retryQueueTimer = null; + } + + // Flush any remaining events in batch + if (this.eventBatch.length > 0) { + this.logger.info(`Flushing final batch of ${this.eventBatch.length} events before shutdown`); + await this.flushBatch(); + } + + // Flush any persistent events + if (this.persistentEventQueue.length > 0) { + this.logger.info(`Flushing final persistent queue of ${this.persistentEventQueue.length} events before shutdown`); + await this.retryPersistentEvents(); + + // Log any remaining events that couldn't be sent + if (this.persistentEventQueue.length > 0) { + this.logger.warn(`Could not send ${this.persistentEventQueue.length} events before shutdown`); + } + } + + if (this.localSender) { + await this.localSender.flush(); + await this.localSender.close(); + this.localSender = null; + } + + this.logger.info('QuestDB telemetry cleanup completed'); + } catch (error) { + this.logger.error(`Error during QuestDB telemetry cleanup: ${error.message}`); + } + } +} + +export default QuestTelemetry; From fe97c68f14beb89c750282d871234b6daba054e9 Mon Sep 17 00:00:00 2001 From: Samuel Wamala Date: Wed, 29 Oct 2025 15:33:23 +0300 Subject: [PATCH 4/4] fix: address PR review comments - Remove signal handler duplication to prevent shutdown race conditions - Fix telemetry cleanup method access path - Extract magic values to constants for better maintainability - Limit proof event logging to prevent huge logs - Refactor sequential retry to parallel for better performance - Fix queue status log consistency - Remove dead code (logLifetimeProofSummary) Addresses review comments from PR #4014 --- ot-node.js | 2 +- src/constants/constants.js | 31 ++ .../implementation/quest-telemetry.js | 317 +++++++++++------- 3 files changed, 228 insertions(+), 122 deletions(-) diff --git a/ot-node.js b/ot-node.js index 26f21e3c6f..58e3bad75f 100644 --- a/ot-node.js +++ b/ot-node.js @@ -448,7 +448,7 @@ class OTNode { const telemetryModuleManager = this.container.resolve('telemetryModuleManager'); if (telemetryModuleManager && telemetryModuleManager.getImplementation()) { this.logger.info('Cleaning up telemetry...'); - await telemetryModuleManager.getImplementation().module.cleanup(); + await telemetryModuleManager.getImplementation().cleanup(); } } catch (error) { this.logger.error(`Error during telemetry cleanup: ${error.message}`); diff --git a/src/constants/constants.js b/src/constants/constants.js index 5900aec023..f15737cb59 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -1105,3 +1105,34 @@ export const MAX_TOKEN_ID_PER_GET_PAGE = 50; export const BLAZEGRAPH_HEALTH_INTERVAL = 60 * 1000; export const MAX_COMMAND_LIFETIME = 15 * 60 * 1000; + +// Telemetry constants +export const TELEMETRY_CONNECTION_STATUS = { + CONNECTED: 'connected', + FAILED: 'failed', + DISCONNECTED: 'disconnected', + ERROR: 'error', + RECONNECTING: 'reconnecting', +}; + +export const TELEMETRY_INTERVALS = { + HEALTH_CHECK: 3 * 60 * 1000, // 3 minutes + CONNECTION_HEALTH: 30 * 1000, // 30 seconds + RETRY_QUEUE: 10 * 1000, // 10 seconds + BATCH_TIMEOUT: 5000, // 5 seconds + ERROR_LOG: 60000, // 1 minute +}; + +export const TELEMETRY_BATCH_CONFIG = { + BATCH_SIZE: 50, + MAX_BATCH_SIZE: 200, + MAX_PERSISTENT_QUEUE_SIZE: 10000, + MAX_RECENT_EVENTS: 50, + MAX_RETRY_ATTEMPTS: 5, + BASE_RETRY_DELAY: 1000, // 1 second + MAX_TIME_WITHOUT_SUCCESS: 5 * 60 * 1000, // 5 minutes +}; + +export const TELEMETRY_NULL_VALUE = 'NULL'; + +export const TELEMETRY_MAX_LOG_OPERATION_IDS = 10; diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js index ba88cb09b9..1ed185e200 100644 --- a/src/modules/telemetry/implementation/quest-telemetry.js +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -1,16 +1,19 @@ import { Sender } from '@questdb/nodejs-client'; +import { + TELEMETRY_CONNECTION_STATUS, + TELEMETRY_INTERVALS, + TELEMETRY_BATCH_CONFIG, + TELEMETRY_NULL_VALUE, + TELEMETRY_MAX_LOG_OPERATION_IDS, +} from '../../../constants/constants.js'; class QuestTelemetry { constructor() { this.localSender = null; this.lastErrorLogTime = 0; - this.errorLogInterval = 60000; // 1 minute between error logs this.retryAttempts = 0; - this.maxRetryAttempts = 5; - this.baseRetryDelay = 1000; // 1 second // Health monitoring - this.healthCheckInterval = 3 * 60 * 1000; // 3 minutes this.healthCheckTimer = null; this.telemetryStats = { totalEvents: 0, @@ -20,7 +23,7 @@ class QuestTelemetry { proofEvents: 0, successfulProofEvents: 0, lastHealthCheck: Date.now(), - connectionStatus: 'disconnected', + connectionStatus: TELEMETRY_CONNECTION_STATUS.DISCONNECTED, startTime: Date.now() // Track when telemetry module started }; @@ -36,32 +39,32 @@ class QuestTelemetry { recentProofEvents: [], // Last 50 proof events processedProofEvents: new Set(), // Track processed proof events to prevent duplicates lastProofEventTime: null, - maxRecentEvents: 50, totalDroppedEvents: 0, // Lifetime counter erroredEvents: new Map(), // Track errored events by type erroredEventsByBlockchain: new Map() // Track errored events by blockchain }; // Event batching - this.batchSize = 50; // Send batch when 50 events collected - this.maxBatchSize = 200; // Maximum batch size to prevent memory leaks - this.batchTimeout = 5000; // Send batch after 5 seconds this.eventBatch = []; this.batchTimer = null; // Bulletproof features this.isShuttingDown = false; this.connectionHealthTimer = null; - this.connectionHealthInterval = 30 * 1000; // 30 seconds this.lastSuccessfulSend = Date.now(); - this.maxTimeWithoutSuccess = 5 * 60 * 1000; // 5 minutes // Event persistence for when QuestDB is down this.persistentEventQueue = []; - this.maxPersistentQueueSize = 10000; // Increased to 10,000 events this.isConnectionDown = false; this.retryQueueTimer = null; - this.retryQueueInterval = 10 * 1000; // 10 seconds + + // Concurrency guards + this.isFlushingBatch = false; + this.isRetryingEvents = false; + this.isRecreatingConnection = false; + + // Health check tracking + this.healthCheckCount = 0; } async initialize(config, logger) { @@ -72,29 +75,17 @@ class QuestTelemetry { this.startBatchTimer(); this.startConnectionHealthCheck(); this.startRetryQueueTimer(); - - // Graceful shutdown handling - process.on('SIGINT', () => this.gracefulShutdown()); - process.on('SIGTERM', () => this.gracefulShutdown()); - } - - async gracefulShutdown() { - if (this.isShuttingDown) return; - this.isShuttingDown = true; - - this.logger.info('QuestDB telemetry graceful shutdown initiated'); - await this.cleanup(); } async createLocalSender() { try { this.localSender = Sender.fromConfig(this.config.localEndpoint); this.retryAttempts = 0; - this.telemetryStats.connectionStatus = 'connected'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.CONNECTED; this.lastSuccessfulSend = Date.now(); this.logger.debug('QuestDB local sender created successfully'); } catch (error) { - this.telemetryStats.connectionStatus = 'failed'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.FAILED; this.logError(`Failed to create QuestDB local sender: ${error.message}`); } } @@ -102,37 +93,59 @@ class QuestTelemetry { startHealthMonitoring() { this.healthCheckTimer = setInterval(() => { this.logHealthStatus(); - }, this.healthCheckInterval); + }, TELEMETRY_INTERVALS.HEALTH_CHECK); this.logger.info('QuestDB telemetry health monitoring started (every 3 minutes)'); } startConnectionHealthCheck() { - this.connectionHealthTimer = setInterval(() => { - this.checkConnectionHealth(); - }, this.connectionHealthInterval); + this.connectionHealthTimer = setInterval(async () => { + try { + await this.checkConnectionHealth(); + } catch (error) { + this.logger.error(`Error in connection health check: ${error.message}`); + } + }, TELEMETRY_INTERVALS.CONNECTION_HEALTH); this.logger.info('QuestDB telemetry connection health check started (every 30 seconds)'); } startRetryQueueTimer() { - this.retryQueueTimer = setInterval(() => { - this.retryPersistentEvents(); - }, this.retryQueueInterval); + this.retryQueueTimer = setInterval(async () => { + // Skip if already retrying to prevent concurrent execution + if (this.isRetryingEvents) { + this.logger.debug('Skipping retry cycle - previous retry still in progress'); + return; + } + + try { + await this.retryPersistentEvents(); + } catch (error) { + this.logger.error(`Error in retry queue timer: ${error.message}`); + } + }, TELEMETRY_INTERVALS.RETRY_QUEUE); this.logger.info('QuestDB telemetry persistent event retry timer started (every 10 seconds)'); } - checkConnectionHealth() { + async checkConnectionHealth() { const timeSinceLastSuccess = Date.now() - this.lastSuccessfulSend; - if (timeSinceLastSuccess > this.maxTimeWithoutSuccess) { + if (timeSinceLastSuccess > TELEMETRY_BATCH_CONFIG.MAX_TIME_WITHOUT_SUCCESS) { this.logger.warn(`No successful telemetry sends for ${Math.round(timeSinceLastSuccess / 1000)}s, recreating connection`); - this.recreateConnection(); + await this.recreateConnection(); } } async recreateConnection() { + // Prevent concurrent recreation attempts + if (this.isRecreatingConnection) { + this.logger.debug('Connection recreation already in progress, skipping'); + return; + } + + this.isRecreatingConnection = true; + try { if (this.localSender) { await this.localSender.flush(); @@ -141,15 +154,27 @@ class QuestTelemetry { await this.createLocalSender(); } catch (error) { this.logError(`Failed to recreate QuestDB connection: ${error.message}`); + } finally { + this.isRecreatingConnection = false; } } startBatchTimer() { - this.batchTimer = setInterval(() => { - this.flushBatch(); - }, this.batchTimeout); + this.batchTimer = setInterval(async () => { + // Skip if already flushing to prevent concurrent execution + if (this.isFlushingBatch) { + this.logger.debug('Skipping batch flush - previous flush still in progress'); + return; + } + + try { + await this.flushBatch(); + } catch (error) { + this.logger.error(`Error in batch timer: ${error.message}`); + } + }, TELEMETRY_INTERVALS.BATCH_TIMEOUT); - this.logger.info(`QuestDB telemetry batching started (batch size: ${this.batchSize}, max: ${this.maxBatchSize}, timeout: ${this.batchTimeout}ms)`); + this.logger.info(`QuestDB telemetry batching started (batch size: ${TELEMETRY_BATCH_CONFIG.BATCH_SIZE}, max: ${TELEMETRY_BATCH_CONFIG.MAX_BATCH_SIZE}, timeout: ${TELEMETRY_INTERVALS.BATCH_TIMEOUT}ms)`); } async flushBatch() { @@ -157,8 +182,17 @@ class QuestTelemetry { return; } - const batchToSend = [...this.eventBatch]; - this.eventBatch = []; + // Set guard to prevent concurrent flushes + if (this.isFlushingBatch) { + this.logger.debug('Flush already in progress, skipping'); + return; + } + + this.isFlushingBatch = true; + + try { + const batchToSend = [...this.eventBatch]; + this.eventBatch = []; // Count proof events in this batch const proofEvents = batchToSend.filter(event => { @@ -177,9 +211,9 @@ class QuestTelemetry { for (const event of batchToSend) { const table = this.localSender.table('event'); - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); + table.symbol('operationId', event.operationId || TELEMETRY_NULL_VALUE); + table.symbol('blockchainId', event.blockchainId || TELEMETRY_NULL_VALUE); + table.symbol('name', event.name || TELEMETRY_NULL_VALUE); if (event.value1 !== null) table.symbol('value1', event.value1); if (event.value2 !== null) table.symbol('value2', event.value2); if (event.value3 !== null) table.symbol('value3', event.value3); @@ -192,7 +226,7 @@ class QuestTelemetry { }); this.telemetryStats.successfulEvents += batchToSend.length; - this.telemetryStats.connectionStatus = 'connected'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.CONNECTED; this.lastSuccessfulSend = Date.now(); // Track successful proof events @@ -201,7 +235,7 @@ class QuestTelemetry { } } catch (err) { this.telemetryStats.failedEvents += batchToSend.length; - this.telemetryStats.connectionStatus = 'error'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.ERROR; this.logError(`Error sending batch of ${batchToSend.length} events to QuestDB: ${err.message}`); // Track errored events by type and blockchain @@ -216,14 +250,20 @@ class QuestTelemetry { this.metrics.erroredEventsByBlockchain.set(blockchainKey, blockchainCount + 1); }); - // Log failed proof events (important to know) + // Log failed proof events (important to know) - limit to first 10 IDs to avoid huge logs if (proofEvents.length > 0) { - this.logger.error(`[TELEMETRY] Failed to send ${proofEvents.length} proof events: ${proofEvents.map(e => e.operationId).join(', ')}`); + const sampleIds = proofEvents.slice(0, TELEMETRY_MAX_LOG_OPERATION_IDS).map(e => e.operationId).join(', '); + const suffix = proofEvents.length > TELEMETRY_MAX_LOG_OPERATION_IDS ? ` (and ${proofEvents.length - TELEMETRY_MAX_LOG_OPERATION_IDS} more)` : ''; + this.logger.error(`[TELEMETRY] Failed to send ${proofEvents.length} proof events: ${sampleIds}${suffix}`); } // Fallback: try to send events individually await this.fallbackToIndividualEvents(batchToSend); } + } finally { + // Always release the guard + this.isFlushingBatch = false; + } } async fallbackToIndividualEvents(events) { @@ -234,9 +274,9 @@ class QuestTelemetry { await this.retryWithBackoff(async () => { const table = this.localSender.table('event'); - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); + table.symbol('operationId', event.operationId || TELEMETRY_NULL_VALUE); + table.symbol('blockchainId', event.blockchainId || TELEMETRY_NULL_VALUE); + table.symbol('name', event.name || TELEMETRY_NULL_VALUE); if (event.value1 !== null) table.symbol('value1', event.value1); if (event.value2 !== null) table.symbol('value2', event.value2); if (event.value3 !== null) table.symbol('value3', event.value3); @@ -279,13 +319,12 @@ class QuestTelemetry { // Calculate batch status const batchStatus = this.eventBatch.length > 0 - ? `Pending for next batch: ${this.eventBatch.length}/${this.batchSize}` + ? `Pending for next batch: ${this.eventBatch.length}/${TELEMETRY_BATCH_CONFIG.BATCH_SIZE}` : 'Batch: empty'; - // Calculate queue status - const queueStatus = this.persistentEventQueue.length > 0 - ? `Queue: ${this.persistentEventQueue.length}/${this.maxPersistentQueueSize}` - : 'Queue: empty'; + // Calculate queue status with consistent percentage format + const queuePercentage = Math.round((this.persistentEventQueue.length / TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE) * 100); + const queueStatus = `Queue: ${this.persistentEventQueue.length}/${TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE} (${queuePercentage}%)`; // Explain what happened to non-successful events let failureExplanation = ''; @@ -339,15 +378,15 @@ class QuestTelemetry { this.logErrorDetails(); } + // Increment health check counter + this.healthCheckCount++; + // Log detailed metrics every 3rd health check (9 minutes) - if (this.telemetryStats.lastHealthCheck > 0 && (Date.now() - this.telemetryStats.lastHealthCheck) > 0) { - const healthCheckCount = Math.floor((now - this.telemetryStats.lastHealthCheck) / this.healthCheckInterval); - if (healthCheckCount % 3 === 0) { - this.logDetailedMetrics(); - - // Log error details if any (backup in case not logged above) - this.logErrorDetails(); - } + if (this.healthCheckCount % 3 === 0) { + this.logDetailedMetrics(); + + // Log error details if any (backup in case not logged above) + this.logErrorDetails(); } // Reset stats for next period @@ -470,10 +509,6 @@ class QuestTelemetry { } } - logLifetimeProofSummary() { - // This method is no longer needed as we moved proof logging to logProofStatus() - // Keep empty for now to avoid breaking existing timers - } getMetricsSummary() { const topEventTypes = Array.from(this.metrics.eventsByType.entries()) @@ -513,28 +548,28 @@ class QuestTelemetry { logError(message) { const now = Date.now(); - if (now - this.lastErrorLogTime > this.errorLogInterval) { + if (now - this.lastErrorLogTime > TELEMETRY_INTERVALS.ERROR_LOG) { this.logger.error(message); this.lastErrorLogTime = now; } } async retryWithBackoff(operation) { - for (let attempt = 0; attempt < this.maxRetryAttempts; attempt++) { + for (let attempt = 0; attempt < TELEMETRY_BATCH_CONFIG.MAX_RETRY_ATTEMPTS; attempt++) { try { return await operation(); } catch (error) { - if (attempt === this.maxRetryAttempts - 1) { + if (attempt === TELEMETRY_BATCH_CONFIG.MAX_RETRY_ATTEMPTS - 1) { throw error; } - const delay = this.baseRetryDelay * Math.pow(2, attempt); - this.logError(`QuestDB operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${this.maxRetryAttempts}): ${error.message}`); + const delay = TELEMETRY_BATCH_CONFIG.BASE_RETRY_DELAY * Math.pow(2, attempt); + this.logError(`QuestDB operation failed, retrying in ${delay}ms (attempt ${attempt + 1}/${TELEMETRY_BATCH_CONFIG.MAX_RETRY_ATTEMPTS}): ${error.message}`); await new Promise(resolve => setTimeout(resolve, delay)); // Recreate sender on connection errors if (error.message.includes('ECONNRESET') || error.message.includes('connection')) { - this.telemetryStats.connectionStatus = 'reconnecting'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.RECONNECTING; await this.createLocalSender(); } } @@ -546,54 +581,94 @@ class QuestTelemetry { return; } - this.logger.info(`Attempting to retry ${this.persistentEventQueue.length} persistent events`); + // Set guard to prevent concurrent retries + if (this.isRetryingEvents) { + this.logger.debug('Retry already in progress, skipping'); + return; + } + + this.isRetryingEvents = true; - const eventsToRetry = [...this.persistentEventQueue]; - this.persistentEventQueue = []; - try { - await this.retryWithBackoff(async () => { - for (const event of eventsToRetry) { + this.logger.info(`Attempting to retry ${this.persistentEventQueue.length} persistent events`); + + const eventsToRetry = [...this.persistentEventQueue]; + this.persistentEventQueue = []; + + // Retry all events in parallel for better performance + const retryPromises = eventsToRetry.map((event) => + this.retryWithBackoff(async () => { const table = this.localSender.table('event'); - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); + table.symbol('operationId', event.operationId || TELEMETRY_NULL_VALUE); + table.symbol('blockchainId', event.blockchainId || TELEMETRY_NULL_VALUE); + table.symbol('name', event.name || TELEMETRY_NULL_VALUE); if (event.value1 !== null) table.symbol('value1', event.value1); if (event.value2 !== null) table.symbol('value2', event.value2); if (event.value3 !== null) table.symbol('value3', event.value3); table.timestampColumn('timestamp', event.timestamp * 1000); await table.at(Date.now(), 'ms'); - } - + }).then(() => ({ status: 'fulfilled', event })) + .catch((error) => ({ status: 'rejected', event, error })) + ); + + const results = await Promise.allSettled(retryPromises); + + // Flush all successful events at once + try { await this.localSender.flush(); + } catch (flushError) { + this.logError(`Failed to flush events during retry: ${flushError.message}`); + } + + // Count successes and failures + let successCount = 0; + let failedEvents = []; + + results.forEach((result) => { + if (result.status === 'fulfilled' && result.value.status === 'fulfilled') { + successCount++; + } else { + const eventData = result.status === 'fulfilled' ? result.value.event : result.reason?.event; + if (eventData) { + failedEvents.push(eventData); + } + } }); - - this.telemetryStats.successfulEvents += eventsToRetry.length; - this.telemetryStats.connectionStatus = 'connected'; - this.lastSuccessfulSend = Date.now(); - this.isConnectionDown = false; - - this.logger.info(`Successfully retried ${eventsToRetry.length} persistent events to QuestDB`); - } catch (err) { - // Put events back in queue for next retry - this.persistentEventQueue.unshift(...eventsToRetry); - - // If queue gets too large, drop oldest events - if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { - const droppedCount = this.persistentEventQueue.length - this.maxPersistentQueueSize; - this.persistentEventQueue = this.persistentEventQueue.slice(0, this.maxPersistentQueueSize); + + if (successCount > 0) { + this.telemetryStats.successfulEvents += successCount; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.CONNECTED; + this.lastSuccessfulSend = Date.now(); + this.isConnectionDown = false; + this.logger.info(`Successfully retried ${successCount}/${eventsToRetry.length} persistent events to QuestDB`); + } + + if (failedEvents.length > 0) { + // Put failed events back in queue for next retry + this.persistentEventQueue.unshift(...failedEvents); + + // If queue gets too large, drop oldest events + if (this.persistentEventQueue.length > TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE) { + const droppedCount = this.persistentEventQueue.length - TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE; + this.persistentEventQueue = this.persistentEventQueue.slice(0, TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE); + + // Track dropped events + this.telemetryStats.droppedEvents += droppedCount; + this.metrics.totalDroppedEvents += droppedCount; + this.logger.warn(`PERMANENTLY DROPPED ${droppedCount} events due to queue size limit (total dropped: ${this.metrics.totalDroppedEvents})`); + } - // Track dropped events - this.telemetryStats.droppedEvents += droppedCount; - this.metrics.totalDroppedEvents += droppedCount; - this.logger.warn(`PERMANENTLY DROPPED ${droppedCount} events due to queue size limit (total dropped: ${this.metrics.totalDroppedEvents})`); + if (successCount === 0) { + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.ERROR; + this.isConnectionDown = true; + } + this.logError(`Failed to retry ${failedEvents.length}/${eventsToRetry.length} persistent events`); } - - this.telemetryStats.connectionStatus = 'error'; - this.isConnectionDown = true; - this.logError(`Failed to retry persistent events: ${err.message}`); + } finally { + // Always release the guard + this.isRetryingEvents = false; } } @@ -631,9 +706,9 @@ class QuestTelemetry { try { const table = this.localSender.table('event'); - table.symbol('operationId', event.operationId || 'NULL'); - table.symbol('blockchainId', event.blockchainId || 'NULL'); - table.symbol('name', event.name || 'NULL'); + table.symbol('operationId', event.operationId || TELEMETRY_NULL_VALUE); + table.symbol('blockchainId', event.blockchainId || TELEMETRY_NULL_VALUE); + table.symbol('name', event.name || TELEMETRY_NULL_VALUE); if (event.value1 !== null) table.symbol('value1', event.value1); if (event.value2 !== null) table.symbol('value2', event.value2); if (event.value3 !== null) table.symbol('value3', event.value3); @@ -659,14 +734,14 @@ class QuestTelemetry { this.persistentEventQueue.push(event); // If memory queue is full, drop oldest events - if (this.persistentEventQueue.length > this.maxPersistentQueueSize) { + if (this.persistentEventQueue.length > TELEMETRY_BATCH_CONFIG.MAX_PERSISTENT_QUEUE_SIZE) { const droppedEvent = this.persistentEventQueue.shift(); this.telemetryStats.droppedEvents++; this.metrics.totalDroppedEvents++; this.logger.warn(`PERMANENTLY DROPPED event ${droppedEvent.operationId} due to queue size limit (total dropped: ${this.metrics.totalDroppedEvents})`); } - this.telemetryStats.connectionStatus = 'disconnected'; + this.telemetryStats.connectionStatus = TELEMETRY_CONNECTION_STATUS.DISCONNECTED; this.isConnectionDown = true; this.logError('QuestDB local sender not available, event queued for retry'); return; @@ -676,12 +751,12 @@ class QuestTelemetry { this.eventBatch.push(event); // Prevent memory leaks: force flush if batch gets too large - if (this.eventBatch.length >= this.maxBatchSize) { - this.logger.warn(`Batch size limit reached (${this.maxBatchSize}), forcing flush`); + if (this.eventBatch.length >= TELEMETRY_BATCH_CONFIG.MAX_BATCH_SIZE) { + this.logger.warn(`Batch size limit reached (${TELEMETRY_BATCH_CONFIG.MAX_BATCH_SIZE}), forcing flush`); await this.flushBatch(); } // Send batch if it's full - else if (this.eventBatch.length >= this.batchSize) { + else if (this.eventBatch.length >= TELEMETRY_BATCH_CONFIG.BATCH_SIZE) { await this.flushBatch(); } } @@ -739,7 +814,7 @@ class QuestTelemetry { // Parse operation ID for clearer display const operationDetails = this.parseOperationId(operationId, blockchainId); - const periodMinutes = Math.round(this.healthCheckInterval / (1000 * 60)); // 3 minutes + const periodMinutes = Math.round(TELEMETRY_INTERVALS.HEALTH_CHECK / (1000 * 60)); // 3 minutes const eventCount = periodFinalizedCount + 1; const eventText = eventCount === 1 ? 'event' : 'events'; this.logger.info(`[TELEMETRY PROOFS] PROOF_CHALANGE_FINALIZED received from ot-node and emitted to QuestDB for ${blockchainId} ${operationDetails} - ${eventCount} ${eventText} received in last ${periodMinutes} minutes`); @@ -754,7 +829,7 @@ class QuestTelemetry { // Parse operation ID for clearer display const operationDetails = this.parseOperationId(operationId, blockchainId); - const periodMinutes = Math.round(this.healthCheckInterval / (1000 * 60)); // 3 minutes + const periodMinutes = Math.round(TELEMETRY_INTERVALS.HEALTH_CHECK / (1000 * 60)); // 3 minutes const eventCount = periodSubmittedCount + 1; const eventText = eventCount === 1 ? 'event' : 'events'; this.logger.info(`[TELEMETRY PROOFS] PROOF_SUBMITTED received from ot-node and emitted to QuestDB for ${blockchainId} ${operationDetails} - ${eventCount} ${eventText} received in last ${periodMinutes} minutes`); @@ -769,7 +844,7 @@ class QuestTelemetry { }); // Limit size - if (this.metrics.recentProofEvents.length > this.metrics.maxRecentEvents) { + if (this.metrics.recentProofEvents.length > TELEMETRY_BATCH_CONFIG.MAX_RECENT_EVENTS) { this.metrics.recentProofEvents.shift(); }