diff --git a/package.json b/package.json index adf5427..cbfdff0 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,6 @@ "description": "A reference implementation of the NABHAS message broker, implemented in Node.js. Designed for demonstration, testing, and as an example for production-ready systems.", "author": "DeX Group, LLC", "license": "Apache-2.0", - "type": "module", "homepage": "https://github.com/DeX-Group-LLC/message-broker-node#readme", "repository": { "type": "git", @@ -21,8 +20,8 @@ "typescript" ], "scripts": { - "start": "ts-node src/server.ts", - "start:client": "ts-node examples/websocket-client.ts", + "start": "ts-node -r tsconfig-paths/register src/server.ts", + "start:client": "ts-node -r tsconfig-paths/register examples/websocket-client.ts", "test": "jest --coverage --maxWorkers=4", "coveralls": "jest --coverage && coveralls < coverage/lcov.info" }, diff --git a/src/core/broker.ts b/src/core/broker.ts index a6b0646..caf42f5 100644 --- a/src/core/broker.ts +++ b/src/core/broker.ts @@ -6,7 +6,9 @@ import { MonitoringManager } from '@core/monitoring'; import { ServiceRegistry } from '@core/registry'; import { MessageRouter } from '@core/router'; import { SubscriptionManager } from '@core/subscription'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; + +const logger = SetupLogger('MessageBroker'); export class MessageBroker { private wss: WebSocketServer; @@ -31,15 +33,15 @@ export class MessageBroker { this.messageRouter.assignServiceRegistry(this.serviceRegistry); this.wss = createWebSocketServer(this.connectionManager); this.createdAt = new Date(); - logger.info(`[MessageBroker] Created at ${this.createdAt.toISOString()}`); - logger.info(`[MessageBroker] Listening on ${config.host}:${config.port} (WebSocket)`); + logger.info(`Created at ${this.createdAt.toISOString()}`); + logger.info(`Listening on ${config.host}:${config.port} (WebSocket)`); } /** * Shuts down the Message Broker. */ async shutdown(): Promise { - logger.info('[MessageBroker] Shutting down...'); + logger.info('Shutting down...'); // Clear all requests await this.messageRouter.dispose(); @@ -57,15 +59,15 @@ export class MessageBroker { await new Promise((resolve, reject) => { this.wss.close((err) => { if (err) { - logger.error('[MessageBroker] Error closing WebSocket server:', err); + logger.error('Error closing WebSocket server', { error: err }); reject(err); } else { - logger.info('[MessageBroker] WebSocket server closed'); + logger.info('WebSocket server closed'); resolve(); } }); }); - logger.info('[MessageBroker] Shutdown complete.'); + logger.info('Shutdown complete.'); } } diff --git a/src/core/connection/manager.ts b/src/core/connection/manager.ts index 171d29f..d80b29d 100644 --- a/src/core/connection/manager.ts +++ b/src/core/connection/manager.ts @@ -6,9 +6,11 @@ import { ServiceRegistry } from '@core/registry'; import { MessageRouter } from '@core/router'; import { MessageUtils, Header, Payload } from '@core/utils'; import { ActionType } from '@core/types'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; import { ConnectionMetrics } from './metrics'; +const logger = SetupLogger('ConnectionManager'); + /** * The header for an error message when the message header are malformed. */ @@ -48,7 +50,7 @@ export class ConnectionManager { this.serviceRegistry.registerService(connection.serviceId); // Update metrics this.metrics.onConnectionEstablished(); - logger.info(`[ConnectionManager] Added connection for service ${connection.serviceId} (IP ${connection.ip})`); + logger.info(`Added connection for service ${connection.serviceId} (IP ${connection.ip})`); } catch (error) { // If registration fails, remove the connection this.connections.delete(connection.serviceId); @@ -71,7 +73,7 @@ export class ConnectionManager { this.connections.delete(serviceId); this.serviceRegistry.unregisterService(serviceId); this.metrics.onConnectionClosed(); - logger.info(`[ConnectionManager] Removed connection for service ${serviceId} (IP ${connection.ip})`); + logger.info(`Removed connection for service ${serviceId} (IP ${connection.ip})`); } } @@ -86,19 +88,19 @@ export class ConnectionManager { const connection = this.connections.get(serviceId); if (!connection) { - logger.warn(`[ConnectionManager] Unable to send message to service ${serviceId}: connection not found`); + logger.warn(`Unable to send message to service ${serviceId}: connection not found`); return; } if (connection.state !== ConnectionState.OPEN) { - logger.warn(`[ConnectionManager] Unable to send message to service ${serviceId}: Connection is not open`); + logger.warn(`Unable to send message to service ${serviceId}: Connection is not open`); connection.close(); this.removeConnection(serviceId); throw new InternalError('Desired service connection is not open'); } connection.send(MessageUtils.serialize(header, payload)); - logger.info(`[ConnectionManager] Sent message to service ${serviceId}`); + //logger.debug(`Sent message ${header.action}:${header.topic}:${header.version}:${header.requestid ? ':' +header.requestid : ''} to ${serviceId}`, { header, payload, serviceId }); } /** @@ -111,7 +113,7 @@ export class ConnectionManager { let header: Header | null = null; let payload: Payload | null = null; - logger.info(`[ConnectionManager] Received message from service ${connection.serviceId} (IP ${connection.ip})`); + //logger.debug(`Received message from service ${connection.serviceId} (IP ${connection.ip})`); try { // Create the parser and parse the message @@ -126,24 +128,24 @@ export class ConnectionManager { header = header !== null ? { ...header, action: ActionType.RESPONSE } : ERROR_HEADER; // If the error is a MessageError, send an error message to the client if (error instanceof MessageError) { - logger.error(`[ConnectionManager] [${error.code}] ${error.message}`, error.details); + logger.error(`[${error.code}] ${error.message}`, error.details); connection.send(MessageUtils.serialize(header, { error: error.toJSON() })); return; } else if (error instanceof Error) { if (header == ERROR_HEADER) { // If the header is null, the message is malformed - logger.error('[ConnectionManager] Unexpected error while parsing message header:', error); + logger.error('Unexpected error while parsing message header:', error); connection.send(MessageUtils.serialize(header, { error: new MalformedMessageError('Unexpected error while parsing message header').toJSON() })); return; } else if (payload == null) { // If the payload is null, the message is malformed - logger.error('[ConnectionManager] Unexpected error while parsing message payload:', error); + logger.error('Unexpected error while parsing message payload:', error); connection.send(MessageUtils.serialize(header, { error: new MalformedMessageError('Unexpected error while parsing message payload').toJSON() })); return; } else { // If the error is not a MessageError and the header and payload are not null, then the error is during routing // Send an internal error message to the client - logger.error('[ConnectionManager] An unexpected error while routing the message:', error); + logger.error('An unexpected error while routing the message:', error); connection.send(MessageUtils.serialize(header, { error: new InternalError('An unexpected error while routing the message').toJSON() })); return; } @@ -188,8 +190,8 @@ export class ConnectionManager { connection.close(); } this.connections.clear(); - logger.info('[ConnectionManager] Closed all connections'); + logger.info('Closed all connections'); this.metrics.dispose(); - logger.info('[ConnectionManager] Disposed of all metrics'); + logger.info('Disposed of all metrics'); } } \ No newline at end of file diff --git a/src/core/connection/websocket.ts b/src/core/connection/websocket.ts index 047fe22..ebeaa5e 100644 --- a/src/core/connection/websocket.ts +++ b/src/core/connection/websocket.ts @@ -2,10 +2,12 @@ import { IncomingMessage } from 'http'; import { WebSocketServer, WebSocket } from 'ws'; import { config } from '@config'; import { InternalError } from '@core/errors'; -import logger from '@utils/logger'; +import { SetupLogger} from '@utils/logger'; import { ConnectionManager } from './manager'; import { Connection, ConnectionState } from './types'; +const logger = SetupLogger('WebSocketConnection'); + export class WebSocketConnection implements Connection { serviceId!: string; // This will be set by the ConnectionManager diff --git a/src/core/monitoring/manager.ts b/src/core/monitoring/manager.ts index c651c7c..7d529db 100644 --- a/src/core/monitoring/manager.ts +++ b/src/core/monitoring/manager.ts @@ -1,8 +1,17 @@ import { InternalError } from '@core/errors'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; import { Metric, ParameterizedMetric } from './metrics'; import { BaseSlot, IReadOnlySlot } from './metrics/slots'; +const logger = SetupLogger('MonitoringManager'); + +export interface MetricInfo { + name: string; + type: string; + timestamp: string; + value: number; +} + /** * Manages all metrics in the application, providing registration and lookup functionality. * Metric names follow topic naming conventions (e.g., 'system.cpu.usage'). @@ -110,25 +119,50 @@ export class MonitoringManager { this.metrics.clear(); this.parameterizedMetrics.clear(); - logger.info('[MonitoringManager] Cleared all metrics'); + logger.info('Cleared all metrics'); } /** * Serializes all metrics to a JSON object. * @returns A JSON object containing all metrics. */ - serializeMetrics(): Record { - const metrics: Record = {}; - // Serialize all metrics: - for (const metric of this.metrics.values()) { - metrics[metric.name] = metric.slot.value; - } - // Serialize all parameterized metrics: - for (const metric of this.parameterizedMetrics.values()) { - for (const metricInstance of metric.allMetrics) { - metrics[metricInstance.name] = metricInstance.slot.value; + serializeMetrics(showAll: boolean): Record | Record { + if (showAll) { + const metrics: Record = {}; + // Serialize all metrics: + for (const metric of this.metrics.values()) { + metrics[metric.name] = { + name: metric.name, + type: metric.slot.constructor.name.toLowerCase().replace('slot', ''), + timestamp: metric.slot.lastModified.toISOString(), + value: metric.slot.value + }; + } + // Serialize all parameterized metrics: + for (const metric of this.parameterizedMetrics.values()) { + for (const metricInstance of metric.allMetrics) { + metrics[metricInstance.name] = { + name: metricInstance.name, + type: metricInstance.slot.constructor.name.toLowerCase().replace('slot', ''), + timestamp: metricInstance.slot.lastModified.toISOString(), + value: metricInstance.slot.value + }; + } + } + return metrics; + } else { + const metrics: Record = {}; + // Serialize all metrics: + for (const metric of this.metrics.values()) { + metrics[metric.name] = metric.slot.value; + } + // Serialize all parameterized metrics: + for (const metric of this.parameterizedMetrics.values()) { + for (const metricInstance of metric.allMetrics) { + metrics[metricInstance.name] = metricInstance.slot.value; + } } + return metrics; } - return metrics; } } diff --git a/src/core/monitoring/metrics/metric.ts b/src/core/monitoring/metrics/metric.ts index 80917b0..5ff9d66 100644 --- a/src/core/monitoring/metrics/metric.ts +++ b/src/core/monitoring/metrics/metric.ts @@ -10,7 +10,7 @@ const METRIC_NAME_REGEX = /^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*|\.\{[a-z]+:[^:}]+\}) * This is the primary metric type used for monitoring values. * TSlot is the type of slot that handles value storage and behavior */ -export class Metric extends EventEmitter { +export class Metric extends EventEmitter { /** The slot that handles value storage and behavior */ private _slot: TSlot; diff --git a/src/core/monitoring/metrics/parameterized.ts b/src/core/monitoring/metrics/parameterized.ts index 3a7b5d1..1255bcd 100644 --- a/src/core/monitoring/metrics/parameterized.ts +++ b/src/core/monitoring/metrics/parameterized.ts @@ -28,7 +28,7 @@ export interface ExtractResult { * metric.getMetric({ topic: 'events.europe' }).set(1); // Creates metric 'router.message.rate.{topic:events.europe}' * ``` */ -export class ParameterizedMetric extends EventEmitter { +export class ParameterizedMetric extends EventEmitter { private pattern: RegExp; private metrics = new Map>(); diff --git a/src/core/monitoring/metrics/slots/base.ts b/src/core/monitoring/metrics/slots/base.ts index 56eb742..07e1a28 100644 --- a/src/core/monitoring/metrics/slots/base.ts +++ b/src/core/monitoring/metrics/slots/base.ts @@ -5,11 +5,20 @@ import { IManageableSlot } from './interface'; * Handles simple value storage and retrieval. */ export abstract class BaseSlot implements IManageableSlot { + protected _lastModified: Date = new Date(); + /** * Gets the current value. */ abstract get value(): number; + /** + * Gets the last modified time. + */ + get lastModified(): Date { + return this._lastModified; + } + /** * Resets the value. */ diff --git a/src/core/monitoring/metrics/slots/gauge.ts b/src/core/monitoring/metrics/slots/gauge.ts index 834961f..2810834 100644 --- a/src/core/monitoring/metrics/slots/gauge.ts +++ b/src/core/monitoring/metrics/slots/gauge.ts @@ -13,6 +13,7 @@ export class GaugeSlot extends BaseSlot implements ISlotAddable, ISlotSettable { */ set(value: number): void { this._value = value; + this._lastModified = new Date(); } /** @@ -20,6 +21,7 @@ export class GaugeSlot extends BaseSlot implements ISlotAddable, ISlotSettable { */ add(value: number): void { this._value += value; + this._lastModified = new Date(); } /** @@ -34,6 +36,7 @@ export class GaugeSlot extends BaseSlot implements ISlotAddable, ISlotSettable { */ reset(): void { this._value = 0; + this._lastModified = new Date(); } /** diff --git a/src/core/monitoring/metrics/slots/interface.ts b/src/core/monitoring/metrics/slots/interface.ts index 69bf4ca..4e52b7d 100644 --- a/src/core/monitoring/metrics/slots/interface.ts +++ b/src/core/monitoring/metrics/slots/interface.ts @@ -9,6 +9,11 @@ export interface IReadOnlySlot extends IBaseSlot { * Gets the current value. */ get value(): number; + + /** + * Gets the last modified time. + */ + get lastModified(): Date; } /** diff --git a/src/core/monitoring/metrics/slots/rate.ts b/src/core/monitoring/metrics/slots/rate.ts index 9bd04f3..660473a 100644 --- a/src/core/monitoring/metrics/slots/rate.ts +++ b/src/core/monitoring/metrics/slots/rate.ts @@ -27,6 +27,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable { // Store the current interval's value before reset slot._lastIntervalValue = slot._currentIntervalValue; slot._currentIntervalValue = 0; + slot._lastModified = new Date(); } }, 1000); // Ensure cleanup on exit @@ -45,6 +46,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable { */ add(value: number): void { this._currentIntervalValue += value; + // We don't need to update the last modified time here because get value() uses the last interval value } /** @@ -67,6 +69,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable { reset(): void { this._lastIntervalValue = 0; this._currentIntervalValue = 0; + this._lastModified = new Date(); } /** diff --git a/src/core/monitoring/metrics/slots/uptime.ts b/src/core/monitoring/metrics/slots/uptime.ts index 59c4d49..f0b98e8 100644 --- a/src/core/monitoring/metrics/slots/uptime.ts +++ b/src/core/monitoring/metrics/slots/uptime.ts @@ -15,6 +15,7 @@ export class UptimeSlot extends BaseSlot implements ISlotSettable { */ set(value: Date): void { this._startTime = value; + this._lastModified = new Date(); } /** @@ -37,6 +38,7 @@ export class UptimeSlot extends BaseSlot implements ISlotSettable { */ reset(): void { this._startTime = new Date(); + this._lastModified = new Date(); } /** diff --git a/src/core/registry/index.ts b/src/core/registry/index.ts index bc69408..41fa7e5 100644 --- a/src/core/registry/index.ts +++ b/src/core/registry/index.ts @@ -1,4 +1,4 @@ -import { LogEntry } from 'winston'; +import { LogEntry, transports } from 'winston'; import { config } from '@config'; import { ConnectionManager } from '@core/connection'; import { @@ -10,9 +10,11 @@ import { MonitoringManager } from '@core/monitoring'; import { SubscriptionManager } from '@core/subscription'; import { ActionType } from '@core/types'; import { Header, Message, TopicUtils } from '@core/utils'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; import { RegistryMetrics } from './metrics'; +const logger = SetupLogger('ServiceRegistry'); + interface ServiceRegistration { id: string; name: string; @@ -22,7 +24,7 @@ interface ServiceRegistration { heartbeatRetryTimeout: NodeJS.Timeout; heartbeatDeregisterTimeout: NodeJS.Timeout; logSubscriptions: { - level: string; + levels: string[]; regex?: RegExp; }; metricSubscriptions: { @@ -44,8 +46,9 @@ export class ServiceRegistry { this.metrics.count.slot.set(0); // Hook into the logger: - logger.on('logged', this._handleLogBind); - logger.info('[ServiceRegistry] Hooked into the logger'); + logger.stream({ start: -1 }).on('log', this._handleLogBind); + logger.on('data', this._handleLogBind); + logger.info('Hooked into the logger'); } /** @@ -54,20 +57,25 @@ export class ServiceRegistry { * @param message The log message. */ private handleLog(message: LogEntry): void { + //logger.off('data', this._handleLogBind); const header: Header = { action: ActionType.RESPONSE, topic: 'system.log', version: '1.0.0', }; - const payload = { level: message.level, message: message.message }; // Loop through all services and check if the log message matches the service's log subscriptions for (const service of this.services.values()) { // Check if the service has subscribed to the log level and code (if codes are empty, subscribe to all codes) - if (service.logSubscriptions.level === message.level && (service.logSubscriptions.regex === undefined || service.logSubscriptions.regex.test(message.message))) { + if (service.logSubscriptions.levels.includes(message.level) && (service.logSubscriptions.regex === undefined || service.logSubscriptions.regex.test(message.message))) { // Send the log message to the service - this.connectionManager.sendMessage(service.id, header, payload); + try { + this.connectionManager.sendMessage(service.id, header, message); + } catch (error) { + logger.error(`Error sending log message to service ${service.id}:`, error); + } } } + //logger.on('data', this._handleLogBind); } private _handleLogBind = this.handleLog.bind(this); @@ -88,14 +96,14 @@ export class ServiceRegistry { this.unregisterService(service.id); } this.services.clear(); - logger.info('[ServiceRegistry] Unregistered all services'); + logger.info('Unregistered all services'); this.metrics.dispose(); - logger.info('[ServiceRegistry] Disposed of all metrics'); + logger.info('Disposed of all metrics'); // Unhook from the logger: - logger.off('log', this._handleLogBind); - logger.info('[ServiceRegistry] Unhooked from the logger'); + logger.off('data', this._handleLogBind); + logger.info('Unhooked from the logger'); } /** @@ -117,7 +125,7 @@ export class ServiceRegistry { existingService.lastHeartbeat = now; this.resetHeartbeat(existingService); this.services.set(serviceId, existingService); - logger.info(`[ServiceRegistry] Service ${serviceId} updated.`); + logger.info(`Service ${serviceId} updated.`, { name, description }); } else { // Add the new service this.services.set(serviceId, { @@ -126,12 +134,12 @@ export class ServiceRegistry { description, connectedAt: now, lastHeartbeat: now, - logSubscriptions: { level: '', regex: undefined }, // Default log subscription level + logSubscriptions: { levels: [], regex: undefined }, // Default log subscription level metricSubscriptions: { metrics: [], frequency: 0 }, heartbeatRetryTimeout: setTimeout(this.sendHeartbeat.bind(this, serviceId), config.connection.heartbeatRetryTimeout), heartbeatDeregisterTimeout: setTimeout(this.unregisterService.bind(this, serviceId), config.connection.heartbeatDeregisterTimeout), }); - logger.info(`[ServiceRegistry] Service ${serviceId} registered.`); + logger.info(`Service ${serviceId} registered.`, { name, description }); // Register parameterized metrics for the new service this.metrics.serviceUptime.registerMetric({ serviceId }); @@ -158,7 +166,7 @@ export class ServiceRegistry { this.services.delete(serviceId); this.subscriptionManager.unsubscribe(serviceId); this.connectionManager.removeConnection(serviceId); - logger.info(`[ServiceRegistry] Service ${serviceId} unregistered.`); + logger.info(`Service ${serviceId} unregistered.`, { name: service.name, description: service.description }); } this.metrics.count.slot.set(this.services.size); @@ -182,7 +190,7 @@ export class ServiceRegistry { * @param message The received message. */ handleSystemMessage(serviceId: string, message: Message): void { - logger.info(`[ServiceRegistry] Received system message from ${serviceId}:`, message); + logger.info(`Received system message ${message.header.action}:${message.header.topic}:${message.header.version}:${message.header.requestid ? ':' +message.header.requestid : ''}`, { header: message.header, serviceId }); // Check the actions are valid if (message.header.topic === 'system.heartbeat') { @@ -198,7 +206,7 @@ export class ServiceRegistry { try { switch (message.header.topic) { case 'system.heartbeat': - this.handleHeartbeatRequest(serviceId, message); + this.handleHeartbeatMessage(serviceId, message); break; case 'system.log.subscribe': this.handleLogSubscribe(serviceId, message); @@ -206,8 +214,8 @@ export class ServiceRegistry { case 'system.log.unsubscribe': this.handleLogUnsubscribe(serviceId, message); break; - case 'system.metric': - this.handleMetricRequest(serviceId, message); + case 'system.metrics': + this.handleMetricsRequest(serviceId, message); break; case 'system.service.list': this.handleServiceList(serviceId, message); @@ -228,7 +236,7 @@ export class ServiceRegistry { throw new TopicNotSupportedError(`Unknown system message topic: ${message.header.topic}`); } } catch (error) { - logger.error(`[ServiceRegistry] Error handling system message from ${serviceId}:`, error); + logger.error(`Error handling system message ${message.header.action}:${message.header.topic}:${message.header.version}:${message.header.requestid ? ':' +message.header.requestid : ''}:`, { error, serviceId }); this.metrics.serviceErrorRate.getMetric({ serviceId })?.slot.add(1); throw error; } @@ -257,7 +265,7 @@ export class ServiceRegistry { * @param serviceId The ID of the service requesting the metrics. * @param message The message to handle. */ - private handleHeartbeatRequest(serviceId: string, message: Message): void { + private handleHeartbeatMessage(serviceId: string, message: Message): void { // Check if the service exists const service = this.services.get(serviceId); if (!service) { @@ -288,12 +296,20 @@ export class ServiceRegistry { throw new ServiceUnavailableError(`Service ${serviceId} not found`); } - let { level = 'error', regex = undefined } = message.payload; // Default to error if no level is specified + let { levels = ['error'], regex = undefined } = message.payload; // Default to error if no level is specified // Validate the level const validLevels = ['debug', 'info', 'warn', 'error']; - if (typeof level !== 'string' || !validLevels.includes(level)) { - throw new InvalidRequestError('Invalid log level', { level }); + if (levels) { + if (Array.isArray(levels)) { + for (const level of levels) { + if (typeof level !== 'string' || !validLevels.includes(level)) { + throw new InvalidRequestError('Invalid log level', { level }); + } + } + } else { + throw new InvalidRequestError('Invalid log levels', { levels }); + } } // Validate the regex (if provided) @@ -309,7 +325,7 @@ export class ServiceRegistry { } // Update the service's log subscriptions - service.logSubscriptions = { level, regex }; + service.logSubscriptions = { levels, regex }; // Subscribe to the log level and regex this.subscriptionManager.subscribe(serviceId, `system.log`); @@ -334,7 +350,7 @@ export class ServiceRegistry { } // Reset log subscriptions to default (no level, no codes) - service.logSubscriptions = { level: '', regex: undefined }; + service.logSubscriptions = { levels: [], regex: undefined }; // Unsubscribe from the log level and regex this.subscriptionManager.unsubscribe(serviceId, `system.log`); @@ -346,12 +362,12 @@ export class ServiceRegistry { } /** - * Handles a metric request. + * Handles a metrics request. * * @param serviceId The ID of the service requesting the metrics. * @param message The message to handle. */ - private handleMetricRequest(serviceId: string, message: Message): void { + private handleMetricsRequest(serviceId: string, message: Message): void { // Check if the service exists const service = this.services.get(serviceId); if (!service) { @@ -360,8 +376,7 @@ export class ServiceRegistry { } // TODO: Implement logic to fetch the latest metrics - // const metrics = this.monitoring.getLatestMetrics(); - const metrics = this.monitoringManager.serializeMetrics(); + const metrics = this.monitoringManager.serializeMetrics(message.payload.showAll); const responseHeader = { ...message.header, action: ActionType.RESPONSE }; const responsePayload = { metrics }; @@ -396,7 +411,6 @@ export class ServiceRegistry { * @param message The message to handle. */ private handleServiceRegister(serviceId: string, message: Message): void { - logger.info(`Service ${serviceId} registered with header:`, message.header, `and payload:`, message.payload); const { name, description } = message.payload; // Validate the payload diff --git a/src/core/router/index.ts b/src/core/router/index.ts index 4aafd66..b3adc15 100644 --- a/src/core/router/index.ts +++ b/src/core/router/index.ts @@ -6,7 +6,9 @@ import { ServiceRegistry } from '@core/registry'; import { SubscriptionManager } from '@core/subscription'; import { ActionType } from '@core/types'; import { Message, Header } from '@core/utils'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; + +const logger = SetupLogger('MessageRouter'); export interface Request { originServiceId: string; @@ -309,6 +311,6 @@ export class MessageRouter { } } this.requests.clear(); - logger.info('[MessageRouter] Cleared all outstanding requests'); + logger.info('Cleared all outstanding requests'); } } \ No newline at end of file diff --git a/src/core/subscription/index.ts b/src/core/subscription/index.ts index 1a7dafc..1b98768 100644 --- a/src/core/subscription/index.ts +++ b/src/core/subscription/index.ts @@ -1,5 +1,7 @@ import { TopicUtils } from '@core/utils'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; + +const logger = SetupLogger('SubscriptionManager'); interface Subscriber { serviceId: string; @@ -153,6 +155,6 @@ export class SubscriptionManager { */ async dispose(): Promise { this.subscriptions.clear(); - logger.info('[SubscriptionManager] Cleared all subscriptions'); + logger.info('Cleared all subscriptions'); } } diff --git a/src/server.ts b/src/server.ts index eefed02..467823e 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,5 +1,7 @@ import { MessageBroker } from '@core/broker'; -import logger from '@utils/logger'; +import { SetupLogger } from '@utils/logger'; + +const logger = SetupLogger('Server'); /** * Starts the Message Broker server. diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 9b3bf70..b961dbd 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -17,4 +17,8 @@ const logger = winston.createLogger({ ], }); +export function SetupLogger(module: string) { + return logger.child({ module }); +} + export default logger; \ No newline at end of file