Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"description": "A reference implementation of the NABHAS message broker, implemented in Node.js. Designed for demonstration, testing, and as an example for production-ready systems.",
"author": "DeX Group, LLC",
"license": "Apache-2.0",
"type": "module",
"homepage": "https://github.com/DeX-Group-LLC/message-broker-node#readme",
"repository": {
"type": "git",
Expand All @@ -21,8 +20,8 @@
"typescript"
],
"scripts": {
"start": "ts-node src/server.ts",
"start:client": "ts-node examples/websocket-client.ts",
"start": "ts-node -r tsconfig-paths/register src/server.ts",
"start:client": "ts-node -r tsconfig-paths/register examples/websocket-client.ts",
"test": "jest --coverage --maxWorkers=4",
"coveralls": "jest --coverage && coveralls < coverage/lcov.info"
},
Expand Down
16 changes: 9 additions & 7 deletions src/core/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { MonitoringManager } from '@core/monitoring';
import { ServiceRegistry } from '@core/registry';
import { MessageRouter } from '@core/router';
import { SubscriptionManager } from '@core/subscription';
import logger from '@utils/logger';
import { SetupLogger } from '@utils/logger';

const logger = SetupLogger('MessageBroker');

export class MessageBroker {
private wss: WebSocketServer;
Expand All @@ -31,15 +33,15 @@ export class MessageBroker {
this.messageRouter.assignServiceRegistry(this.serviceRegistry);
this.wss = createWebSocketServer(this.connectionManager);
this.createdAt = new Date();
logger.info(`[MessageBroker] Created at ${this.createdAt.toISOString()}`);
logger.info(`[MessageBroker] Listening on ${config.host}:${config.port} (WebSocket)`);
logger.info(`Created at ${this.createdAt.toISOString()}`);
logger.info(`Listening on ${config.host}:${config.port} (WebSocket)`);
}

/**
* Shuts down the Message Broker.
*/
async shutdown(): Promise<void> {
logger.info('[MessageBroker] Shutting down...');
logger.info('Shutting down...');

// Clear all requests
await this.messageRouter.dispose();
Expand All @@ -57,15 +59,15 @@ export class MessageBroker {
await new Promise<void>((resolve, reject) => {
this.wss.close((err) => {
if (err) {
logger.error('[MessageBroker] Error closing WebSocket server:', err);
logger.error('Error closing WebSocket server', { error: err });
reject(err);
} else {
logger.info('[MessageBroker] WebSocket server closed');
logger.info('WebSocket server closed');
resolve();
}
});
});

logger.info('[MessageBroker] Shutdown complete.');
logger.info('Shutdown complete.');
}
}
28 changes: 15 additions & 13 deletions src/core/connection/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import { ServiceRegistry } from '@core/registry';
import { MessageRouter } from '@core/router';
import { MessageUtils, Header, Payload } from '@core/utils';
import { ActionType } from '@core/types';
import logger from '@utils/logger';
import { SetupLogger } from '@utils/logger';
import { ConnectionMetrics } from './metrics';

const logger = SetupLogger('ConnectionManager');

/**
* The header for an error message when the message header are malformed.
*/
Expand Down Expand Up @@ -48,7 +50,7 @@ export class ConnectionManager {
this.serviceRegistry.registerService(connection.serviceId);
// Update metrics
this.metrics.onConnectionEstablished();
logger.info(`[ConnectionManager] Added connection for service ${connection.serviceId} (IP ${connection.ip})`);
logger.info(`Added connection for service ${connection.serviceId} (IP ${connection.ip})`);
} catch (error) {
// If registration fails, remove the connection
this.connections.delete(connection.serviceId);
Expand All @@ -71,7 +73,7 @@ export class ConnectionManager {
this.connections.delete(serviceId);
this.serviceRegistry.unregisterService(serviceId);
this.metrics.onConnectionClosed();
logger.info(`[ConnectionManager] Removed connection for service ${serviceId} (IP ${connection.ip})`);
logger.info(`Removed connection for service ${serviceId} (IP ${connection.ip})`);
}
}

Expand All @@ -86,19 +88,19 @@ export class ConnectionManager {
const connection = this.connections.get(serviceId);

if (!connection) {
logger.warn(`[ConnectionManager] Unable to send message to service ${serviceId}: connection not found`);
logger.warn(`Unable to send message to service ${serviceId}: connection not found`);
return;
}

if (connection.state !== ConnectionState.OPEN) {
logger.warn(`[ConnectionManager] Unable to send message to service ${serviceId}: Connection is not open`);
logger.warn(`Unable to send message to service ${serviceId}: Connection is not open`);
connection.close();
this.removeConnection(serviceId);
throw new InternalError('Desired service connection is not open');
}

connection.send(MessageUtils.serialize(header, payload));
logger.info(`[ConnectionManager] Sent message to service ${serviceId}`);
//logger.debug(`Sent message ${header.action}:${header.topic}:${header.version}:${header.requestid ? ':' +header.requestid : ''} to ${serviceId}`, { header, payload, serviceId });
}

/**
Expand All @@ -111,7 +113,7 @@ export class ConnectionManager {
let header: Header | null = null;
let payload: Payload | null = null;

logger.info(`[ConnectionManager] Received message from service ${connection.serviceId} (IP ${connection.ip})`);
//logger.debug(`Received message from service ${connection.serviceId} (IP ${connection.ip})`);

try {
// Create the parser and parse the message
Expand All @@ -126,24 +128,24 @@ export class ConnectionManager {
header = header !== null ? { ...header, action: ActionType.RESPONSE } : ERROR_HEADER;
// If the error is a MessageError, send an error message to the client
if (error instanceof MessageError) {
logger.error(`[ConnectionManager] [${error.code}] ${error.message}`, error.details);
logger.error(`[${error.code}] ${error.message}`, error.details);
connection.send(MessageUtils.serialize(header, { error: error.toJSON() }));
return;
} else if (error instanceof Error) {
if (header == ERROR_HEADER) {
// If the header is null, the message is malformed
logger.error('[ConnectionManager] Unexpected error while parsing message header:', error);
logger.error('Unexpected error while parsing message header:', error);
connection.send(MessageUtils.serialize(header, { error: new MalformedMessageError('Unexpected error while parsing message header').toJSON() }));
return;
} else if (payload == null) {
// If the payload is null, the message is malformed
logger.error('[ConnectionManager] Unexpected error while parsing message payload:', error);
logger.error('Unexpected error while parsing message payload:', error);
connection.send(MessageUtils.serialize(header, { error: new MalformedMessageError('Unexpected error while parsing message payload').toJSON() }));
return;
} else {
// If the error is not a MessageError and the header and payload are not null, then the error is during routing
// Send an internal error message to the client
logger.error('[ConnectionManager] An unexpected error while routing the message:', error);
logger.error('An unexpected error while routing the message:', error);
connection.send(MessageUtils.serialize(header, { error: new InternalError('An unexpected error while routing the message').toJSON() }));
return;
}
Expand Down Expand Up @@ -188,8 +190,8 @@ export class ConnectionManager {
connection.close();
}
this.connections.clear();
logger.info('[ConnectionManager] Closed all connections');
logger.info('Closed all connections');
this.metrics.dispose();
logger.info('[ConnectionManager] Disposed of all metrics');
logger.info('Disposed of all metrics');
}
}
4 changes: 3 additions & 1 deletion src/core/connection/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { IncomingMessage } from 'http';
import { WebSocketServer, WebSocket } from 'ws';
import { config } from '@config';
import { InternalError } from '@core/errors';
import logger from '@utils/logger';
import { SetupLogger} from '@utils/logger';
import { ConnectionManager } from './manager';
import { Connection, ConnectionState } from './types';

const logger = SetupLogger('WebSocketConnection');

export class WebSocketConnection implements Connection {
serviceId!: string; // This will be set by the ConnectionManager

Expand Down
60 changes: 47 additions & 13 deletions src/core/monitoring/manager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import { InternalError } from '@core/errors';
import logger from '@utils/logger';
import { SetupLogger } from '@utils/logger';
import { Metric, ParameterizedMetric } from './metrics';
import { BaseSlot, IReadOnlySlot } from './metrics/slots';

const logger = SetupLogger('MonitoringManager');

export interface MetricInfo {
name: string;
type: string;
timestamp: string;
value: number;
}

/**
* Manages all metrics in the application, providing registration and lookup functionality.
* Metric names follow topic naming conventions (e.g., 'system.cpu.usage').
Expand Down Expand Up @@ -110,25 +119,50 @@ export class MonitoringManager {
this.metrics.clear();
this.parameterizedMetrics.clear();

logger.info('[MonitoringManager] Cleared all metrics');
logger.info('Cleared all metrics');
}

/**
* Serializes all metrics to a JSON object.
* @returns A JSON object containing all metrics.
*/
serializeMetrics(): Record<string, any> {
const metrics: Record<string, any> = {};
// Serialize all metrics:
for (const metric of this.metrics.values()) {
metrics[metric.name] = metric.slot.value;
}
// Serialize all parameterized metrics:
for (const metric of this.parameterizedMetrics.values()) {
for (const metricInstance of metric.allMetrics) {
metrics[metricInstance.name] = metricInstance.slot.value;
serializeMetrics(showAll: boolean): Record<string, number> | Record<string, MetricInfo> {
if (showAll) {
const metrics: Record<string, MetricInfo> = {};
// Serialize all metrics:
for (const metric of this.metrics.values()) {
metrics[metric.name] = {
name: metric.name,
type: metric.slot.constructor.name.toLowerCase().replace('slot', ''),
timestamp: metric.slot.lastModified.toISOString(),
value: metric.slot.value
};
}
// Serialize all parameterized metrics:
for (const metric of this.parameterizedMetrics.values()) {
for (const metricInstance of metric.allMetrics) {
metrics[metricInstance.name] = {
name: metricInstance.name,
type: metricInstance.slot.constructor.name.toLowerCase().replace('slot', ''),
timestamp: metricInstance.slot.lastModified.toISOString(),
value: metricInstance.slot.value
};
}
}
return metrics;
} else {
const metrics: Record<string, number> = {};
// Serialize all metrics:
for (const metric of this.metrics.values()) {
metrics[metric.name] = metric.slot.value;
}
// Serialize all parameterized metrics:
for (const metric of this.parameterizedMetrics.values()) {
for (const metricInstance of metric.allMetrics) {
metrics[metricInstance.name] = metricInstance.slot.value;
}
}
return metrics;
}
return metrics;
}
}
2 changes: 1 addition & 1 deletion src/core/monitoring/metrics/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const METRIC_NAME_REGEX = /^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*|\.\{[a-z]+:[^:}]+\})
* This is the primary metric type used for monitoring values.
* TSlot is the type of slot that handles value storage and behavior
*/
export class Metric<TSlot extends BaseSlot = IManageableSlot> extends EventEmitter {
export class Metric<TSlot extends BaseSlot = BaseSlot> extends EventEmitter {
/** The slot that handles value storage and behavior */
private _slot: TSlot;

Expand Down
2 changes: 1 addition & 1 deletion src/core/monitoring/metrics/parameterized.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface ExtractResult {
* metric.getMetric({ topic: 'events.europe' }).set(1); // Creates metric 'router.message.rate.{topic:events.europe}'
* ```
*/
export class ParameterizedMetric<TSlot extends BaseSlot = IManageableSlot> extends EventEmitter {
export class ParameterizedMetric<TSlot extends BaseSlot = BaseSlot> extends EventEmitter {
private pattern: RegExp;
private metrics = new Map<string, Metric<TSlot>>();

Expand Down
9 changes: 9 additions & 0 deletions src/core/monitoring/metrics/slots/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,20 @@ import { IManageableSlot } from './interface';
* Handles simple value storage and retrieval.
*/
export abstract class BaseSlot implements IManageableSlot {
protected _lastModified: Date = new Date();

/**
* Gets the current value.
*/
abstract get value(): number;

/**
* Gets the last modified time.
*/
get lastModified(): Date {
return this._lastModified;
}

/**
* Resets the value.
*/
Expand Down
3 changes: 3 additions & 0 deletions src/core/monitoring/metrics/slots/gauge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ export class GaugeSlot extends BaseSlot implements ISlotAddable, ISlotSettable {
*/
set(value: number): void {
this._value = value;
this._lastModified = new Date();
}

/**
* Adds a value to the current value.
*/
add(value: number): void {
this._value += value;
this._lastModified = new Date();
}

/**
Expand All @@ -34,6 +36,7 @@ export class GaugeSlot extends BaseSlot implements ISlotAddable, ISlotSettable {
*/
reset(): void {
this._value = 0;
this._lastModified = new Date();
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/core/monitoring/metrics/slots/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ export interface IReadOnlySlot extends IBaseSlot {
* Gets the current value.
*/
get value(): number;

/**
* Gets the last modified time.
*/
get lastModified(): Date;
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/core/monitoring/metrics/slots/rate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable {
// Store the current interval's value before reset
slot._lastIntervalValue = slot._currentIntervalValue;
slot._currentIntervalValue = 0;
slot._lastModified = new Date();
}
}, 1000);
// Ensure cleanup on exit
Expand All @@ -45,6 +46,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable {
*/
add(value: number): void {
this._currentIntervalValue += value;
// We don't need to update the last modified time here because get value() uses the last interval value
}

/**
Expand All @@ -67,6 +69,7 @@ export class RateSlot extends BaseSlot implements ISlotAddable {
reset(): void {
this._lastIntervalValue = 0;
this._currentIntervalValue = 0;
this._lastModified = new Date();
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/core/monitoring/metrics/slots/uptime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class UptimeSlot extends BaseSlot implements ISlotSettable<Date> {
*/
set(value: Date): void {
this._startTime = value;
this._lastModified = new Date();
}

/**
Expand All @@ -37,6 +38,7 @@ export class UptimeSlot extends BaseSlot implements ISlotSettable<Date> {
*/
reset(): void {
this._startTime = new Date();
this._lastModified = new Date();
}

/**
Expand Down
Loading
Loading