-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrepeater.js
More file actions
149 lines (128 loc) · 3.73 KB
/
repeater.js
File metadata and controls
149 lines (128 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
const ReconnectingWebSocket = require('reconnecting-websocket');
const WS = require('ws');
//////
// WebSocket server setup
function noop() {}
function heartbeat() {
this.isAlive = true;
}
function parse_message(message) {
let action = undefined;
let topic = undefined;
try {
message = JSON.parse(message);
action = message.action;
topic = message.topic;
} catch (e) {
console.error('Error parsing message:');
console.error(message);
console.error(e);
return;
}
if (action == 'subscribe' && topic == 'confirmation') this.subscribe_confirmations = true;
else if (action == 'unsubscribe' && topic == 'confirmation') this.subscribe_confirmations = false;
else if (action == 'subscribe' && topic == 'cps') this.subscribe_cps = true;
else if (action == 'unsubscribe' && topic == 'cps') this.subscribe_cps = false;
}
const wss = new WS.Server({ port: process.env.PORT });
wss.on('connection', function connection(ws, req) {
ws.isAlive = true;
ws.on('pong', heartbeat);
ws.on('message', parse_message);
});
const interval = setInterval(function ping() {
wss.clients.forEach(function each(ws) {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping(noop);
});
}, 30000);
wss.on('close', function close() {
clearInterval(interval);
});
// Block bucket; fills with blocks to be dumped to emit all
let block_bucket = [];
function block_dump() {
if (block_bucket.length == 0) return;
let data = {
dtg: new Date(),
topic: 'confirmation',
cps: get_cps(),
blocks: block_bucket,
duration: Number(process.env.BLOCK_DUMP_PERIODICITY)
}
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WS.OPEN && client.subscribe_confirmations == true) {
client.send(JSON.stringify(data));
}
});
block_bucket = [];
}
setInterval(block_dump, Number(process.env.BLOCK_DUMP_PERIODICITY));
// Confirmations per second
let blocks = new Array(30).fill(0);
setInterval(update_cps, 1*1000);
function update_cps() {
// Every second update the array
blocks = blocks.slice(1,);
blocks.push(0);
}
function get_cps() {
let cps = blocks.reduce(function(a, b) { return a + b; }, 0) / blocks.length;
return cps;
}
var cps_emit = setInterval(function() {
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WS.OPEN && client.subscribe_cps == true) {
client.send(JSON.stringify({
dtg: new Date(),
topic: 'cps',
cps: get_cps(),
seconds: blocks.length,
}));
}
});
}, process.env.CPS_PERIODICITY);
//////
// Messaging
// Connect to this host
let ws_host = 'ws://localhost:7078'
// Create a websocket and reconnect if broken
ws = new ReconnectingWebSocket(ws_host, [], {
WebSocket: WS,
connectionTimeout: 1000,
maxRetries: Infinity,
maxReconnectionDelay: 8000,
minReconnectionDelay: 3000
});
// A tracked account was detected
ws.onmessage = msg => {
if (typeof msg.data === 'string') {
let data = JSON.parse(msg.data);
if (data.topic == 'confirmation') {
block_bucket.push(data.message);
blocks[blocks.length-1] += 1;
}
// console.log(msg.data)
}
}
// As soon as we connect, subscribe to confirmations
ws.onopen = () => {
console.log('WebSocket Client Connected')
if (ws.readyState === ws.OPEN) {
let msg = {
"action": "subscribe",
"topic": "confirmation",
"options": {
"confirmation_type": "all"
}
}
ws.send(JSON.stringify(msg))
}
}
ws.onclose = () => {
console.log("WebSocket Client Closed")
}
ws.onerror = (e) => {
console.error("Websocket: " + e.error)
}