From 35d22094ae95e3bc18cdd5fb0fad0889294eb0a1 Mon Sep 17 00:00:00 2001 From: linyihang <1091106900@qq.com> Date: Tue, 8 Mar 2022 15:10:19 +0800 Subject: [PATCH 1/2] fix bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix bug when sever failed,proxy will unregister the unused proxy of server now --- discovery.ts | 1 + proxy.ts | 18 ++++++++-- socket.ts | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 9 +++-- 4 files changed, 113 insertions(+), 6 deletions(-) create mode 100644 socket.ts diff --git a/discovery.ts b/discovery.ts index fa96b11..dcddebe 100644 --- a/discovery.ts +++ b/discovery.ts @@ -25,6 +25,7 @@ export async function getNodeList(): Promise { return nodes.map(data => parseNode(data)); } + export function listen(cb: (action: Action, node: Node) => void) { sub.subscribe(DISCOVERY_CHANNEL); sub.on("message", (_: string, message: any) => { diff --git a/proxy.ts b/proxy.ts index f52c60e..d07d2bb 100644 --- a/proxy.ts +++ b/proxy.ts @@ -3,17 +3,20 @@ import http from "http"; import https from "https"; import httpProxy from "http-proxy"; import { getNodeList, listen, Node, Action, cleanUpNode } from "./discovery"; +import { TcpClient } from "./socket"; +import { Console } from "console"; const HTTPS_PORT = 443; const HTTP_PORT = Number(process.env.PORT || 80); const HTTP_IP = process.env.IP || '0.0.0.0'; const SOCKET_TIMEOUT = Number(process.env.SOCKET_TIMEOUT || 30000); // 30 seconds default socket timeout -const processIds: { [id: string]: httpProxy } = {} +const processIds: { [id: string]: httpProxy} = {} let currProxy: number = 0; -const proxies: httpProxy[] = []; +const proxies: httpProxy[] = []; +const clients: TcpClient[] = []; http.globalAgent = new http.Agent({ keepAlive: true }); https.globalAgent = new https.Agent({ keepAlive: true }); @@ -85,6 +88,16 @@ function register(node: Node) { proxies.push(proxy); currProxy = proxies.length - 1; + const client = new TcpClient(+port, host); + clients.push(client) + client.on("close",()=>{ + console.warn(`node ${node.processId}/${node.address} failed, unregistering`); + console.debug("close", node,processIds) + unregister(node) + cleanUpNode(node) + clients.splice(clients.indexOf(client), 1); + }) + } function unregister(node: Node) { @@ -94,7 +107,6 @@ function unregister(node: Node) { if(idx > -1) { proxies.splice(proxies.indexOf(proxy), 1); delete processIds[node.processId]; - currProxy = proxies.length - 1; } } diff --git a/socket.ts b/socket.ts new file mode 100644 index 0000000..a341d54 --- /dev/null +++ b/socket.ts @@ -0,0 +1,91 @@ +import * as net from "net"; +import { EventEmitter } from "events"; + +function connectCb1(){ return } + +export class TcpClient extends EventEmitter { + die: boolean = false; + remoteAddress: string = ""; + socket: net.Socket; + maxLen: number; + len: number = 0; + buffer: Buffer = Buffer.allocUnsafe(0); + + constructor(port: number, host: string, maxLen: number=5, noDelay: boolean = false, connectCb: () => void = connectCb1) { + super(); + this.socket = net.connect(port, host, () => { + this.remoteAddress = this.socket.remoteAddress as string; + connectCb(); + }); + this.socket.setNoDelay(noDelay); + this.maxLen = maxLen; + this.socket.on("close", (err) => { + if (!this.die) { + this.die = true; + this.emit("close", err); + } + }); + this.socket.on("error", (err) => { + if (!this.die) { + this.die = true; + this.emit("close", err); + } + }); + this.socket.on("data", (data) => { + if (!this.die) { + } else { + this.close(); + } + }); + } + + send(data: Buffer) { + this.socket.write(data); + } + + close() { + this.socket.destroy(); + this.socket.emit("close"); + } +} + + +/** + * Unpack + */ + export function decode(socket: TcpClient, msg: Buffer) { + let readLen = 0; + while (readLen < msg.length) { + if (socket.len === 0) //data length is unknown + { + socket.buffer = Buffer.concat([socket.buffer, Buffer.from([msg[readLen]])]); + if (socket.buffer.length === 4) { + socket.len = socket.buffer.readUInt32BE(0); + if (socket.len > socket.maxLen || socket.len === 0) { + socket.close(); + throw new Error("socket data length is longer then " + socket.maxLen + ", close it, " + socket.remoteAddress); + return; + } + socket.buffer = Buffer.allocUnsafe(socket.len); + } + readLen++; + } + else if (msg.length - readLen < socket.len) // data not coming all + { + msg.copy(socket.buffer, socket.buffer.length - socket.len, readLen); + socket.len -= (msg.length - readLen); + readLen = msg.length; + } + else { + msg.copy(socket.buffer, socket.buffer.length - socket.len, readLen, readLen + socket.len); + + readLen += socket.len; + socket.len = 0; + let data = socket.buffer; + socket.buffer = Buffer.allocUnsafe(0); + + //data coming all + socket.emit("data", data); + } + } +} \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index eb95277..c9f9df7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,9 +1,12 @@ { "compilerOptions": { - "target": "es5", + "outDir": "lib", + "target": "es6", "module": "commonjs", - "strict": true, - "esModuleInterop": true + "strict": false, + "esModuleInterop": true, + "sourceMap": true, + "experimentalDecorators": true }, "include": [ "**.ts" From 05e8f942c6d8e005f713599d1fc6b6f51aab5dc4 Mon Sep 17 00:00:00 2001 From: linyihang <1091106900@qq.com> Date: Tue, 8 Mar 2022 15:27:04 +0800 Subject: [PATCH 2/2] fix bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix bug: when sever failed,proxy will unregister the unused proxy of server now --- discovery.ts | 1 - proxy.ts | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/discovery.ts b/discovery.ts index dcddebe..fa96b11 100644 --- a/discovery.ts +++ b/discovery.ts @@ -25,7 +25,6 @@ export async function getNodeList(): Promise { return nodes.map(data => parseNode(data)); } - export function listen(cb: (action: Action, node: Node) => void) { sub.subscribe(DISCOVERY_CHANNEL); sub.on("message", (_: string, message: any) => { diff --git a/proxy.ts b/proxy.ts index d07d2bb..74d6e2a 100644 --- a/proxy.ts +++ b/proxy.ts @@ -11,7 +11,7 @@ const HTTP_PORT = Number(process.env.PORT || 80); const HTTP_IP = process.env.IP || '0.0.0.0'; const SOCKET_TIMEOUT = Number(process.env.SOCKET_TIMEOUT || 30000); // 30 seconds default socket timeout -const processIds: { [id: string]: httpProxy} = {} +const processIds: { [id: string]: httpProxy } = {} let currProxy: number = 0; @@ -91,8 +91,7 @@ function register(node: Node) { const client = new TcpClient(+port, host); clients.push(client) client.on("close",()=>{ - console.warn(`node ${node.processId}/${node.address} failed, unregistering`); - console.debug("close", node,processIds) + console.warn(`node ${node.processId}/${node.address} close, unregistering`); unregister(node) cleanUpNode(node) clients.splice(clients.indexOf(client), 1);