Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
node_modules
coverage
coverage
data
22 changes: 20 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.7'
version: "3.7"
services:
rabbitmq:
image: "rabbitmq:3-management"
Expand All @@ -16,4 +16,22 @@ services:
labels:
NAME: "rabbitmq"
volumes:
- "./etc/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json"
- "./etc/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json"

redis:
image: "bitnami/redis:5.0"
container_name: "redis"
hostname: "redis"
restart: always
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
ports:
- "6379:6379"
volumes:
- "./data/redis:/bitnami/redis/data"

rebrow:
image: "marian/rebrow"
ports:
- "8082:5001"
15 changes: 10 additions & 5 deletions etc/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module.exports = {
pubsub : {
endpoint : process.env.PUBSUB_ENDPOINT || 'localhost:5672',
login : process.env.PUBSUB_LOGIN || 'test',
password : process.env.PUBSUB_PASSWORD || 'test'
}
rabbitPubsub: {
endpoint: process.env.PUBSUB_RABBIT_ENDPOINT || "localhost:5672",
login: process.env.PUBSUB_RABBIT_LOGIN || "test",
password: process.env.PUBSUB_RABBIT_PASSWORD || "test",
},
redisPubsub: {
port: process.env.PUBSUB_REDIS_PORT || 6379,
host: process.env.PUBSUB_REDIS_HOST || "localhost",
database: process.env.PUBSUB_REDIS_DATABASE || 1,
},
};
14 changes: 0 additions & 14 deletions examples/notificator/notify.js

This file was deleted.

14 changes: 14 additions & 0 deletions examples/notificator/notifyRabbit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRabbit");

async function main() {
let iterator = 0;
await notificator.init();

setInterval(() => {
notificator.notify({ text: `iteration ${iterator}` });
++iterator;
}, 5000);
}

main();
14 changes: 14 additions & 0 deletions examples/notificator/notifyRedis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRedis");

async function main() {
let iterator = 0;
await notificator.init();

setInterval(() => {
notificator.notify({ text: `iteration ${iterator}` });
++iterator;
}, 5000);
}

main();
14 changes: 0 additions & 14 deletions examples/notificator/receive.js

This file was deleted.

14 changes: 14 additions & 0 deletions examples/notificator/receiveRabbit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRabbit");

async function main() {
await notificator.init();

notificator.receive(customMessageHandler);
}

function customMessageHandler(message) {
console.log(`Via notificator received ${JSON.stringify(message)}`);
}

main();
14 changes: 14 additions & 0 deletions examples/notificator/receiveRedis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env node
const notificator = require("../../lib/notificatorSingletonRedis");

async function main() {
await notificator.init();

notificator.receive(customMessageHandler);
}

function customMessageHandler(message) {
console.log(`Via notificator received ${JSON.stringify(message)}`);
}

main();
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
const notificator = require('./notificatorSingletone');
const notificator = require("./notificatorSingletonRabbit");

module.exports = notificator;
module.exports = notificator;
66 changes: 33 additions & 33 deletions lib/Notificator.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
class Notificator {
constructor(args) {
this.pubsub = args.pubsub;
this.isInited = false;
}
constructor(args) {
this.pubsub = args.pubsub;
this.isInitialized = false;
}

async init() {
if (this.isInited) return;
try {
console.info('Notificator initialization started...');
await this.pubsub.connect();
await this.pubsub.createChannel('notifications');
this.isInited = true;
console.info('Notificator initialization completed.');
} catch (error) {
console.error('Notificator initialization failend.');
console.error(error.message);
}
async init() {
if (this.isInitialized) return;
try {
console.info("Notificator initialization started...");
await this.pubsub.connect();
await this.pubsub.createChannel("notifications");
this.isInitialized = true;
console.info("Notificator initialization completed.");
} catch (error) {
console.error("Notificator initialization failend.");
console.error(error.message);
}
}

notify(message) {
if (!this.isInited) {
console.warn('Can not notify. Notificator not inited');
notify(message) {
if (!this.isInitialized) {
console.warn("Can not notify. Notificator not inited");

return;
}
try {
this.pubsub.publish('notifications', message);
} catch (error) {
console.error('Failed to notify');
console.error(error.message);
}
return;
}
try {
this.pubsub.publish("notifications", message);
} catch (error) {
console.error("Failed to notify");
console.error(error.message);
}
}

receive(messageHandler) {
if (!this.isInited) {
console.warn('Can not receive. Notificator not inited');
receive(messageHandler) {
if (!this.isInitialized) {
console.warn("Can not receive. Notificator not inited");

return;
}
this.pubsub.subscribe('notifications', messageHandler);
return;
}
this.pubsub.subscribe("notifications", messageHandler);
}
}

module.exports = Notificator;
157 changes: 157 additions & 0 deletions lib/drivers/Redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
const redis = require("redis");
const { formatMessage, parseMessage } = require("../utils");
const PubSubDriverInterface = require("./PubSubDriverInterface");

class Redis extends PubSubDriverInterface {
constructor(args) {
super(args);
if (!args.port) throw new Error('"port" is required');
if (!args.host) throw new Error('"host" is required');
if (!args.database) throw new Error('"database" is required');
this.isReconnecting = false;
this.port = args.port;
this.host = args.host;
this.database = args.database;
}

async connect() {
return new Promise(async (resolve) => {
try {
this.connection = await new Promise((res) => {
const subscriber = redis.createClient(this.port, this.host, {
db: this.database,
});
const publisher = redis.createClient(this.port, this.host, {
db: this.database,
});
res({ subscriber, publisher });
});
} catch (error) {
console.error(`Failed to connect to ${this.host}:${this.port}`);
await new Promise((res) => setTimeout(() => res(), 5000));
console.info("Trying to reconnect...");

return this.connect();
}

this.connection.subscriber.on("error", (error) => {
console.error(error);
});

this.connection.publisher.on("error", (error) => {
console.error(error);
});

this.connection.subscriber.on("ready", () => {
console.info(
`Subscriber connected to Redis on ${this.host}:${this.port}`
);
});

this.connection.publisher.on("ready", () => {
console.info(
`Publisher connected to Redis on ${this.host}:${this.port}`
);
});

if (this.isReconnecting) {
await this._recreateChannels();
await this._reassignHandlers();
console.info("Reconnected successfully.");
this.isReconnecting = false;
}

resolve(this.connection);
});
}

async _recreateChannels() {
console.info("Recreating channels...");
for (const channelName in this.channels) {
if (!this.channels[channelName]) continue;
await this.createChannel(channelName);
}
console.info("Recreating channels completed.");
}

_reassignHandlers() {
console.info("Reassigning handlers...");
for (const channelName in this.handlers) {
if (!this.handlers[channelName]) continue;
console.info(`For channel: "${channelName}"`);
for (const handler of this.handlers[channelName]) {
console.info(`Subscribing for handler: "${handler.name}"`);
this.subscribe(channelName, handler, true);
}
}
console.info("Reassign handlers completed.");
}

async createChannel(channelName) {
this.channels[channelName] = await new Promise((res, rej) => {
this.connection.subscriber.on("subscribe", function (channel, count) {
console.info(`Created channel "${channelName}"`);
res(channel);
});

this.connection.subscriber.subscribe(channelName);
});

if (!this.handlers[channelName]) this.handlers[channelName] = [];

return this.channels[channelName];
}

publish(exchange, message) {
try {
const formattedMessage = formatMessage(message);

console.info(
`Publishing message '${formattedMessage.slice(
0,
40
)}...' to channel "${exchange}"`
);
if (!this.channels[exchange])
throw Error(`Channel for exchange ${exchange} not exists`);
this.connection.publisher.publish(
exchange,
Buffer.from(formattedMessage)
);
} catch (error) {
console.log(error);
throw error;
}
}

subscribe(exchange, messageHandler, isReconnecting = false) {
console.log("subscribe()");
if (!this.channels[exchange])
throw Error(`Channel for queue ${exchange} not exists`);

this.connection.subscriber.on("message", (channel, message) => {
this._messageHandler({ channel, message }, messageHandler);
});

if (!isReconnecting) this.handlers[exchange].push(messageHandler);
}

close() {
console.log("close()");
this.connection.subscriber.end(true);
this.connection.publisher.end(true);
console.info("Closed connection.");
}

_messageHandler({ channel, message }, messageHandler) {
const messageString = message.toString();

console.info(
` [x] Received on channel ${channel}: "${messageString.slice(0, 40)}...`
);
if (typeof messageHandler === "function")
messageHandler(parseMessage(messageString));
}
}

module.exports = Redis;
Loading