From 273174a6a594d3744af6fa57ad1bb84a9d3e1e85 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 00:10:13 +0330 Subject: [PATCH 01/11] QUIC implementation --- README.md | 59 ++- config/config.go | 1 + go.mod | 7 + go.sum | 14 + internal/client/client.go | 15 + internal/client/transport/quic.go | 552 +++++++++++++++++++ internal/server/server.go | 17 + internal/server/transport/quic.go | 675 ++++++++++++++++++++++++ internal/utils/network/quic_dialer.go | 58 ++ internal/utils/network/quic_listener.go | 83 +++ 10 files changed, 1478 insertions(+), 3 deletions(-) create mode 100644 internal/client/transport/quic.go create mode 100644 internal/server/transport/quic.go create mode 100644 internal/utils/network/quic_dialer.go create mode 100644 internal/utils/network/quic_listener.go diff --git a/README.md b/README.md index d451d01..08edb13 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Welcome to the **`Backhaul`** project! This project provides a high-performance - [TCP Configuration](#tcp-configuration) - [TCP Multiplexing Configuration](#tcp-multiplexing-configuration) - [UDP Configuration](#udp-configuration) + - [QUIC Configuration](#quic-configuration) - [WebSocket Configuration](#websocket-configuration) - [Secure WebSocket Configuration](#secure-websocket-configuration) - [WS Multiplexing Configuration](#ws-multiplexing-configuration) @@ -36,7 +37,7 @@ This project offers a robust reverse tunneling solution to overcome NAT and fire ## Features * **High Performance**: Optimized for handling massive concurrent connections efficiently. -* **Protocol Flexibility**: Supports TCP, WebSocket (WS), and Secure WebSocket (WSS) transports. +* **Protocol Flexibility**: Supports TCP, UDP, QUIC, WebSocket (WS), and Secure WebSocket (WSS) transports. * **UDP over TCP**: Implements UDP traffic encapsulation and forwarding over a TCP connection for reliable delivery with built-in congestion control. * **Multiplexing**: Enables multiple connections over a single transport with SMUX. * **NAT & Firewall Bypass**: Overcomes restrictions with reverse tunneling. @@ -84,7 +85,7 @@ To start using the solution, you'll need to configure both server and client com ```toml [server]# Local, IRAN bind_addr = "0.0.0.0:3080" # Address and port for the server to listen on (mandatory). - transport = "tcp" # Protocol to use ("tcp", "tcpmux", "ws", "wss", "wsmux", "wssmux". mandatory). + transport = "tcp" # Protocol to use ("tcp", "tcpmux", "ws", "wss", "wsmux", "wssmux", "udp", "quic". mandatory). accept_udp = false # Enable transferring UDP connections over TCP transport. (optional, default: false) token = "your_token" # Authentication token for secure communication (optional). keepalive_period = 75 # Interval in seconds to send keep-alive packets.(optional, default: 75s) @@ -134,7 +135,7 @@ To start using the solution, you'll need to configure both server and client com [client] # Behind NAT, firewall-blocked remote_addr = "0.0.0.0:3080" # Server address and port (mandatory). edge_ip = "188.114.96.0" # Edge IP used for CDN connection, specifically for WebSocket-based transports.(Optional, default none) - transport = "tcp" # Protocol to use ("tcp", "tcpmux", "ws", "wss", "wsmux", "wssmux". mandatory). + transport = "tcp" # Protocol to use ("tcp", "tcpmux", "ws", "wss", "wsmux", "wssmux", "udp", "quic". mandatory). token = "your_token" # Authentication token for secure communication (optional). connection_pool = 8 # Number of pre-established connections.(optional, default: 8). aggressive_pool = false # Enables aggressive connection pool management.(optional, default: false). @@ -299,6 +300,58 @@ To start using the solution, you'll need to configure both server and client com ``` +#### QUIC Configuration +* **Server**: + + ```toml + [server] + bind_addr = "0.0.0.0:443" + transport = "quic" + token = "your_token" + heartbeat = 40 + channel_size = 2048 + sniffer = false + web_port = 2060 + sniffer_log = "/root/backhaul.json" + log_level = "info" + ports = ["80", "8080", "443=127.0.0.1:8443"] + tls_cert = "/path/to/cert.pem" # Optional: TLS certificate file + tls_key = "/path/to/key.pem" # Optional: TLS private key file + ``` + +* **Client**: + + ```toml + [client] + remote_addr = "your-server.com:443" + transport = "quic" + token = "your_token" + connection_pool = 8 + aggressive_pool = false + retry_interval = 3 + dial_timeout = 10 + sniffer = false + web_port = 2060 + sniffer_log = "/root/backhaul.json" + log_level = "info" + ``` + + **QUIC Features:** + - Built on QUIC protocol for improved performance and reliability + - Automatic connection multiplexing over a single QUIC connection + - TLS 1.3 encryption by default + - Low latency and fast connection establishment (0-RTT support) + - Better handling of network changes and mobility + - Efficient use of network resources + - If no TLS certificates are provided, self-signed certificates are generated automatically + + **Notes:** + - QUIC requires UDP connectivity between client and server + - Default port is 443 (HTTPS) but you can use any port + - QUIC provides better performance than TCP in high-latency or lossy networks + - Ideal for scenarios with frequent network changes or mobile connections + + ``` #### WebSocket Configuration * **Server**: diff --git a/config/config.go b/config/config.go index 197b00a..3586391 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ const ( WSMUX TransportType = "wsmux" WSSMUX TransportType = "wssmux" UDP TransportType = "udp" + QUIC TransportType = "quic" ) // ServerConfig represents the configuration for the server. diff --git a/go.mod b/go.mod index c678e54..4c1b0f3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.1 require ( github.com/BurntSushi/toml v1.4.0 github.com/gorilla/websocket v1.5.3 + github.com/quic-go/quic-go v0.54.0 github.com/shirou/gopsutil/v4 v4.24.8 github.com/sirupsen/logrus v1.9.3 github.com/xtaci/smux v1.5.27 @@ -18,5 +19,11 @@ require ( github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.uber.org/mock v0.5.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/mod v0.18.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect + golang.org/x/tools v0.22.0 // indirect ) diff --git a/go.sum b/go.sum index 20b1cee..8aff8db 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= +github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= github.com/shirou/gopsutil/v4 v4.24.8 h1:pVQjIenQkIhqO81mwTaXjTzOMT7d3TZkf43PlVFHENI= github.com/shirou/gopsutil/v4 v4.24.8/go.mod h1:wE0OrJtj4dG+hYkxqDH3QiBICdKSf04/npcvLLc/oRg= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= @@ -36,11 +38,23 @@ github.com/xtaci/smux v1.5.27 h1:uIU1dpJQQWUCmGxXBgajLfc8cMMb13hCitj+HC5yC/Q= github.com/xtaci/smux v1.5.27/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= +golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= +golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/client/client.go b/internal/client/client.go index ae67ee4..0caa5b9 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -148,6 +148,21 @@ func (c *Client) Start() { udpClient := transport.NewUDPClient(c.ctx, udpConfig, c.logger) go udpClient.Start() + case config.QUIC: + quicConfig := &transport.QuicConfig{ + RemoteAddr: c.config.RemoteAddr, + RetryInterval: time.Duration(c.config.RetryInterval) * time.Second, + DialTimeOut: time.Duration(c.config.DialTimeout) * time.Second, + ConnPoolSize: c.config.ConnectionPool, + Token: c.config.Token, + Sniffer: c.config.Sniffer, + WebPort: c.config.WebPort, + SnifferLog: c.config.SnifferLog, + AggressivePool: c.config.AggressivePool, + } + quicClient := transport.NewQUICClient(c.ctx, quicConfig, c.logger) + go quicClient.Start() + default: c.logger.Fatal("invalid transport type: ", c.config.Transport) } diff --git a/internal/client/transport/quic.go b/internal/client/transport/quic.go new file mode 100644 index 0000000..db645b6 --- /dev/null +++ b/internal/client/transport/quic.go @@ -0,0 +1,552 @@ +package transport + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "net" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/musix/backhaul/internal/utils" + "github.com/musix/backhaul/internal/utils/network" + "github.com/musix/backhaul/internal/web" + "github.com/quic-go/quic-go" + + "github.com/sirupsen/logrus" +) + +type QuicTransport struct { + config *QuicConfig + parentctx context.Context + ctx context.Context + cancel context.CancelFunc + logger *logrus.Logger + controlChannel *quic.Stream + quicConn *quic.Conn + usageMonitor *web.Usage + restartMutex sync.Mutex + poolConnections int32 + loadConnections int32 + controlFlow chan struct{} +} + +type QuicConfig struct { + RemoteAddr string + Token string + SnifferLog string + TunnelStatus string + RetryInterval time.Duration + DialTimeOut time.Duration + ConnPoolSize int + WebPort int + Sniffer bool + AggressivePool bool +} + +func NewQUICClient(parentCtx context.Context, config *QuicConfig, logger *logrus.Logger) *QuicTransport { + + ctx, cancel := context.WithCancel(parentCtx) + + + client := &QuicTransport{ + config: config, + parentctx: parentCtx, + ctx: ctx, + cancel: cancel, + logger: logger, + controlChannel: nil, + quicConn: nil, + usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), + poolConnections: 0, + loadConnections: 0, + controlFlow: make(chan struct{}, 100), + } + + return client +} + +func (c *QuicTransport) Start() { + if c.config.WebPort > 0 { + go c.usageMonitor.Monitor() + } + + c.config.TunnelStatus = "Disconnected (QUIC)" + + go c.channelDialer() +} + +func (c *QuicTransport) Restart() { + if !c.restartMutex.TryLock() { + c.logger.Warn("client is already restarting") + return + } + defer c.restartMutex.Unlock() + + c.logger.Info("restarting client...") + + + level := c.logger.Level + c.logger.SetLevel(logrus.FatalLevel) + + if c.cancel != nil { + c.cancel() + } + + + if c.quicConn != nil { + c.quicConn.CloseWithError(0, "restart") + } + + time.Sleep(2 * time.Second) + + ctx, cancel := context.WithCancel(c.parentctx) + c.ctx = ctx + c.cancel = cancel + + + c.controlChannel = nil + c.quicConn = nil + c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) + c.config.TunnelStatus = "" + c.poolConnections = 0 + c.loadConnections = 0 + c.controlFlow = make(chan struct{}, 100) + + + c.logger.SetLevel(level) + + go c.Start() +} + +func (c *QuicTransport) channelDialer() { + c.logger.Info("attempting to establish a new QUIC control channel connection...") + + for { + select { + case <-c.ctx.Done(): + return + default: + + quicConn, err := network.QUICDialer(c.ctx, c.config.RemoteAddr, c.config.DialTimeOut, 3) + if err != nil { + c.logger.Errorf("QUIC dialer: %v", err) + time.Sleep(c.config.RetryInterval) + continue + } + + + stream, err := quicConn.OpenStreamSync(c.ctx) + if err != nil { + c.logger.Errorf("failed to open control stream: %v", err) + quicConn.CloseWithError(0, "stream open failed") + time.Sleep(c.config.RetryInterval) + continue + } + + + err = sendBinaryTransportString(stream, c.config.Token, utils.SG_Chan) + if err != nil { + c.logger.Errorf("failed to send security token: %v", err) + stream.Close() + quicConn.CloseWithError(0, "token send failed") + continue + } + + + stream.SetReadDeadline(time.Now().Add(2 * time.Second)) + + + message, _, err := receiveBinaryTransportString(stream) + if err != nil { + c.logger.Errorf("failed to receive control channel response: %v", err) + stream.Close() + quicConn.CloseWithError(0, "token response failed") + time.Sleep(c.config.RetryInterval) + continue + } + + + stream.SetReadDeadline(time.Time{}) + + if message == c.config.Token { + c.quicConn = quicConn + c.controlChannel = stream + c.logger.Info("QUIC control channel established successfully") + + c.config.TunnelStatus = "Connected (QUIC)" + go c.poolMaintainer() + go c.channelHandler() + + return + + } else { + c.logger.Errorf("invalid token received. Expected: %s, Received: %s. Retrying...", c.config.Token, message) + stream.Close() + quicConn.CloseWithError(0, "invalid token") + time.Sleep(c.config.RetryInterval) + continue + } + } + } +} + +func (c *QuicTransport) poolMaintainer() { + for i := 0; i < c.config.ConnPoolSize; i++ { + go c.streamDialer() + } + + + a := 4 + b := 5 + x := 3 + y := 4.0 + + if c.config.AggressivePool { + c.logger.Info("aggressive pool management enabled") + a = 1 + b = 2 + x = 0 + y = 0.75 + } + + tickerPool := time.NewTicker(time.Second * 1) + defer tickerPool.Stop() + + tickerLoad := time.NewTicker(time.Second * 10) + defer tickerLoad.Stop() + + newPoolSize := c.config.ConnPoolSize + var poolConnectionsSum int32 = 0 + + for { + select { + case <-c.ctx.Done(): + return + + case <-tickerPool.C: + + atomic.AddInt32(&poolConnectionsSum, atomic.LoadInt32(&c.poolConnections)) + + case <-tickerLoad.C: + + loadConnections := (int(atomic.LoadInt32(&c.loadConnections)) + 9) / 10 + atomic.StoreInt32(&c.loadConnections, 0) + + + poolConnectionsAvg := (int(atomic.LoadInt32(&poolConnectionsSum)) + 9) / 10 + atomic.StoreInt32(&poolConnectionsSum, 0) + + + if (loadConnections + a) > poolConnectionsAvg*b { + c.logger.Debugf("increasing pool size: %d -> %d, avg pool conn: %d, avg load conn: %d", newPoolSize, newPoolSize+1, poolConnectionsAvg, loadConnections) + newPoolSize++ + + + go c.streamDialer() + } else if float64(loadConnections+x) < float64(poolConnectionsAvg)*y && newPoolSize > c.config.ConnPoolSize { + c.logger.Debugf("decreasing pool size: %d -> %d, avg pool conn: %d, avg load conn: %d", newPoolSize, newPoolSize-1, poolConnectionsAvg, loadConnections) + newPoolSize-- + + + c.controlFlow <- struct{}{} + } + } + } +} + +func (c *QuicTransport) channelHandler() { + msgChan := make(chan byte, 1000) + + + go func() { + for { + select { + case <-c.ctx.Done(): + return + default: + msg, err := receiveBinaryByte(c.controlChannel) + if err != nil { + if c.cancel != nil { + c.logger.Error("failed to read from control channel. ", err) + go c.Restart() + } + return + } + msgChan <- msg + } + } + }() + + + for { + select { + case <-c.ctx.Done(): + _ = sendBinaryByte(c.controlChannel, utils.SG_Closed) + return + + case msg := <-msgChan: + switch msg { + case utils.SG_Chan: + atomic.AddInt32(&c.loadConnections, 1) + + select { + case <-c.controlFlow: + + default: + c.logger.Debug("channel signal received, initiating stream dialer") + go c.streamDialer() + } + + case utils.SG_HB: + c.logger.Debug("heartbeat signal received successfully") + + case utils.SG_Closed: + c.logger.Warn("control channel has been closed by the server") + go c.Restart() + return + + case utils.SG_RTT: + err := sendBinaryByte(c.controlChannel, utils.SG_RTT) + if err != nil { + c.logger.Error("failed to send RTT signal, restarting client: ", err) + go c.Restart() + return + } + + default: + c.logger.Errorf("unexpected response from channel: %v.", msg) + go c.Restart() + return + } + } + } +} + + +func (c *QuicTransport) streamDialer() { + c.logger.Debugf("initiating new QUIC stream to tunnel server") + + + stream, err := c.quicConn.OpenStreamSync(c.ctx) + if err != nil { + c.logger.Error("failed to open QUIC stream: ", err) + return + } + + + atomic.AddInt32(&c.poolConnections, 1) + + + remoteAddr, transport, err := receiveBinaryTransportString(stream) + + + atomic.AddInt32(&c.poolConnections, -1) + + if err != nil { + c.logger.Debugf("failed to receive port from tunnel stream: %v", err) + stream.Close() + return + } + + + port, resolvedAddr, err := network.ResolveRemoteAddr(remoteAddr) + if err != nil { + c.logger.Infof("failed to resolve remote port: %v", err) + stream.Close() + return + } + + switch transport { + case utils.SG_TCP: + + c.localDialer(stream, resolvedAddr, port) + + case utils.SG_UDP: + + c.logger.Debug("UDP over QUIC not yet implemented") + stream.Close() + + default: + c.logger.Error("undefined transport. close the connection.") + stream.Close() + } +} + +func (c *QuicTransport) localDialer(stream *quic.Stream, resolvedAddr string, port int) { + var sendBuf, recvBuf int + + if strings.Contains(resolvedAddr, "127.0.0.1") { + + sendBuf = 32 * 1024 + recvBuf = 32 * 1024 + } else { + + sendBuf = 0 + recvBuf = 0 + } + + localConnection, err := network.TcpDialer(c.ctx, resolvedAddr, "", c.config.DialTimeOut, 30*time.Second, true, 1, recvBuf, sendBuf, 0) + if err != nil { + c.logger.Errorf("local dialer: %v", err) + stream.Close() + return + } + + c.logger.Debugf("connected to local address %s successfully", resolvedAddr) + + + c.handleQUICStream(stream, localConnection, port) +} + + +func (c *QuicTransport) handleQUICStream(stream *quic.Stream, localConn net.Conn, port int) { + defer stream.Close() + defer localConn.Close() + + done := make(chan struct{}, 2) + + + go func() { + defer func() { done <- struct{}{} }() + defer stream.Close() + + buf := make([]byte, 32*1024) + for { + select { + case <-c.ctx.Done(): + return + default: + localConn.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := localConn.Read(buf) + if err != nil { + if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() { + c.logger.Debugf("failed to read from local connection: %v", err) + } + return + } + localConn.SetReadDeadline(time.Time{}) + + if c.config.Sniffer { + c.usageMonitor.AddOrUpdatePort(port, uint64(n)) + } + + _, err = stream.Write(buf[:n]) + if err != nil { + c.logger.Debugf("failed to write to QUIC stream: %v", err) + return + } + } + } + }() + + + go func() { + defer func() { done <- struct{}{} }() + defer localConn.Close() + + buf := make([]byte, 32*1024) + for { + select { + case <-c.ctx.Done(): + return + default: + stream.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := stream.Read(buf) + if err != nil { + c.logger.Debugf("failed to read from QUIC stream: %v", err) + return + } + stream.SetReadDeadline(time.Time{}) + + if c.config.Sniffer { + c.usageMonitor.AddOrUpdatePort(port, uint64(n)) + } + + _, err = localConn.Write(buf[:n]) + if err != nil { + c.logger.Debugf("failed to write to local connection: %v", err) + return + } + } + } + }() + + + <-done +} + + +func receiveBinaryByte(stream *quic.Stream) (byte, error) { + buf := make([]byte, 1) + _, err := io.ReadFull(stream, buf) + if err != nil { + return 0, err + } + return buf[0], nil +} + + +func sendBinaryByte(stream *quic.Stream, data byte) error { + buf := []byte{data} + _, err := stream.Write(buf) + return err +} + + +func receiveBinaryTransportString(stream *quic.Stream) (string, byte, error) { + + header := make([]byte, 3) + _, err := io.ReadFull(stream, header) + if err != nil { + return "", 0, err + } + + + strLen := binary.BigEndian.Uint16(header[:2]) + transport := header[2] + + + if strLen > 0 { + strData := make([]byte, strLen) + _, err = io.ReadFull(stream, strData) + if err != nil { + return "", 0, err + } + return string(strData), transport, nil + } + + return "", transport, nil +} + + +func sendBinaryTransportString(stream *quic.Stream, data string, transport byte) error { + strLen := len(data) + if strLen > 65535 { + return fmt.Errorf("string too long: %d", strLen) + } + + + header := make([]byte, 3) + binary.BigEndian.PutUint16(header[:2], uint16(strLen)) + header[2] = transport + + + _, err := stream.Write(header) + if err != nil { + return err + } + + + if strLen > 0 { + _, err = stream.Write([]byte(data)) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/server/server.go b/internal/server/server.go index c477cef..ce1f176 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -149,6 +149,23 @@ func (s *Server) Start() { udpServer := transport.NewUDPServer(s.ctx, udpConfig, s.logger) go udpServer.Start() + case config.QUIC: + quicConfig := &transport.QuicConfig{ + BindAddr: s.config.BindAddr, + Heartbeat: time.Duration(s.config.Heartbeat) * time.Second, + Token: s.config.Token, + ChannelSize: s.config.ChannelSize, + Ports: s.config.Ports, + Sniffer: s.config.Sniffer, + WebPort: s.config.WebPort, + SnifferLog: s.config.SnifferLog, + TLSCertFile: s.config.TLSCertFile, + TLSKeyFile: s.config.TLSKeyFile, + } + + quicServer := transport.NewQUICServer(s.ctx, quicConfig, s.logger) + go quicServer.Start() + default: s.logger.Fatal("invalid transport type: ", s.config.Transport) } diff --git a/internal/server/transport/quic.go b/internal/server/transport/quic.go new file mode 100644 index 0000000..ffea180 --- /dev/null +++ b/internal/server/transport/quic.go @@ -0,0 +1,675 @@ +package transport + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "net" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/musix/backhaul/internal/utils" + "github.com/musix/backhaul/internal/utils/network" + "github.com/musix/backhaul/internal/web" + "github.com/quic-go/quic-go" + + "github.com/sirupsen/logrus" +) + +type QuicTransport struct { + config *QuicConfig + parentctx context.Context + ctx context.Context + cancel context.CancelFunc + logger *logrus.Logger + streamChannel chan *quic.Stream + localChannel chan LocalTCPConn + reqNewConnChan chan struct{} + controlChannel *quic.Stream + quicListener *quic.Listener + restartMutex sync.Mutex + usageMonitor *web.Usage + rtt int64 +} + +type QuicConfig struct { + BindAddr string + Token string + SnifferLog string + TunnelStatus string + Ports []string + Sniffer bool + Heartbeat time.Duration + ChannelSize int + WebPort int + TLSCertFile string + TLSKeyFile string +} + +func NewQUICServer(parentCtx context.Context, config *QuicConfig, logger *logrus.Logger) *QuicTransport { + + ctx, cancel := context.WithCancel(parentCtx) + + + server := &QuicTransport{ + config: config, + parentctx: parentCtx, + ctx: ctx, + cancel: cancel, + logger: logger, + streamChannel: make(chan *quic.Stream, config.ChannelSize), + localChannel: make(chan LocalTCPConn, config.ChannelSize), + reqNewConnChan: make(chan struct{}, config.ChannelSize), + controlChannel: nil, + quicListener: nil, + usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), + rtt: 0, + } + + return server +} + +func (s *QuicTransport) Start() { + s.config.TunnelStatus = "Disconnected (QUIC)" + + if s.config.WebPort > 0 { + go s.usageMonitor.Monitor() + } + + go s.streamListener() + + s.channelHandshake() + + if s.controlChannel != nil { + s.config.TunnelStatus = "Connected (QUIC)" + + numCPU := runtime.NumCPU() + if numCPU > 4 { + numCPU = 4 + } + + go s.parsePortMappings() + go s.channelHandler() + + s.logger.Infof("starting %d handle loops on each CPU thread", numCPU) + + for i := 0; i < numCPU; i++ { + go s.handleLoop() + } + } +} + +func (s *QuicTransport) Restart() { + if !s.restartMutex.TryLock() { + s.logger.Warn("server restart already in progress, skipping restart attempt") + return + } + defer s.restartMutex.Unlock() + + s.logger.Info("restarting server...") + + + level := s.logger.Level + s.logger.SetLevel(logrus.FatalLevel) + + if s.cancel != nil { + s.cancel() + } + + + if s.quicListener != nil { + s.quicListener.Close() + } + + time.Sleep(2 * time.Second) + + ctx, cancel := context.WithCancel(s.parentctx) + s.ctx = ctx + s.cancel = cancel + + + s.streamChannel = make(chan *quic.Stream, s.config.ChannelSize) + s.localChannel = make(chan LocalTCPConn, s.config.ChannelSize) + s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) + s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) + s.config.TunnelStatus = "" + s.controlChannel = nil + s.quicListener = nil + + + s.logger.SetLevel(level) + + go s.Start() +} + +func (s *QuicTransport) channelHandshake() { + for { + select { + case <-s.ctx.Done(): + return + case stream := <-s.streamChannel: + + if err := stream.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + s.logger.Errorf("failed to set read deadline: %v", err) + stream.Close() + continue + } + + msg, transport, err := receiveBinaryTransportString(stream) + if transport != utils.SG_Chan { + s.logger.Errorf("invalid signal received for channel, Discarding connection") + stream.Close() + continue + } else if err != nil { + if _, ok := err.(net.Error); ok { + s.logger.Warn("timeout while waiting for control channel signal") + } else { + s.logger.Errorf("failed to receive control channel signal: %v", err) + } + stream.Close() + continue + } + + + stream.SetReadDeadline(time.Time{}) + + if msg != s.config.Token { + s.logger.Warnf("invalid security token received: %s", msg) + stream.Close() + continue + } + + err = sendBinaryTransportString(stream, s.config.Token, utils.SG_Chan) + if err != nil { + s.logger.Errorf("failed to send security token: %v", err) + stream.Close() + continue + } + + s.controlChannel = stream + + s.logger.Info("QUIC control channel successfully established.") + return + } + } +} + +func (s *QuicTransport) channelHandler() { + ticker := time.NewTicker(s.config.Heartbeat) + defer ticker.Stop() + + + messageChan := make(chan byte, 1) + + go func() { + for { + select { + case <-s.ctx.Done(): + return + default: + message, err := receiveBinaryByte(s.controlChannel) + if err != nil { + if s.cancel != nil { + s.logger.Error("failed to read from channel connection. ", err) + go s.Restart() + } + return + } + messageChan <- message + } + } + }() + + + rtt := time.Now() + err := sendBinaryByte(s.controlChannel, utils.SG_RTT) + if err != nil { + s.logger.Error("failed to send RTT signal, attempting to restart server...") + go s.Restart() + return + } + + for { + select { + case <-s.ctx.Done(): + _ = sendBinaryByte(s.controlChannel, utils.SG_Closed) + return + + case <-s.reqNewConnChan: + err := sendBinaryByte(s.controlChannel, utils.SG_Chan) + if err != nil { + s.logger.Error("failed to send request new connection signal. ", err) + go s.Restart() + return + } + + case <-ticker.C: + err := sendBinaryByte(s.controlChannel, utils.SG_HB) + if err != nil { + s.logger.Error("failed to send heartbeat signal") + go s.Restart() + return + } + s.logger.Trace("heartbeat signal sent successfully") + + case message, ok := <-messageChan: + if !ok { + s.logger.Error("channel closed, likely due to an error in QUIC read") + return + } + + if message == utils.SG_Closed { + s.logger.Warn("control channel has been closed by the client") + go s.Restart() + return + + } else if message == utils.SG_RTT { + measureRTT := time.Since(rtt) + s.rtt = measureRTT.Milliseconds() + s.logger.Infof("Round Trip Time (RTT): %d ms", s.rtt) + } + } + } +} + +func (s *QuicTransport) streamListener() { + listener, err := network.QUICListen(s.config.BindAddr, s.config.TLSCertFile, s.config.TLSKeyFile) + if err != nil { + s.logger.Fatalf("failed to start QUIC listener on %s: %v", s.config.BindAddr, err) + return + } + + defer listener.Close() + s.quicListener = listener + + s.logger.Infof("QUIC server started successfully, listening on address: %s", listener.Addr().String()) + + go s.acceptQUICConn(listener) + + <-s.ctx.Done() +} + +func (s *QuicTransport) acceptQUICConn(listener *quic.Listener) { + for { + select { + case <-s.ctx.Done(): + return + default: + s.logger.Debugf("waiting for incoming QUIC connection on %s", listener.Addr().String()) + conn, err := listener.Accept(s.ctx) + if err != nil { + s.logger.Debugf("failed to accept QUIC connection on %s: %v", listener.Addr().String(), err) + continue + } + + + go s.handleQUICConnection(conn) + } + } +} + +func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { + defer conn.CloseWithError(0, "connection handler finished") + + for { + select { + case <-s.ctx.Done(): + return + default: + + stream, err := conn.AcceptStream(s.ctx) + if err != nil { + s.logger.Debugf("failed to accept stream from %s: %v", conn.RemoteAddr().String(), err) + return + } + + select { + case s.streamChannel <- stream: + s.logger.Debugf("accepted new QUIC stream from %s", conn.RemoteAddr().String()) + default: + s.logger.Warnf("stream channel is full, discarding QUIC stream from %s", conn.RemoteAddr().String()) + stream.Close() + } + } + } +} + +func (s *QuicTransport) parsePortMappings() { + for _, portMapping := range s.config.Ports { + parts := strings.Split(portMapping, "=") + + var localAddr, remoteAddr string + + + if len(parts) == 1 { + localPortOrRange := strings.TrimSpace(parts[0]) + remoteAddr = localPortOrRange + + + if strings.Contains(localPortOrRange, "-") { + rangeParts := strings.Split(localPortOrRange, "-") + if len(rangeParts) != 2 { + s.logger.Fatalf("invalid port range format: %s", localPortOrRange) + } + + + startPort, err := strconv.Atoi(strings.TrimSpace(rangeParts[0])) + if err != nil || startPort < 1 || startPort > 65535 { + s.logger.Fatalf("invalid start port in range: %s", rangeParts[0]) + } + + endPort, err := strconv.Atoi(strings.TrimSpace(rangeParts[1])) + if err != nil || endPort < 1 || endPort > 65535 || endPort < startPort { + s.logger.Fatalf("invalid end port in range: %s", rangeParts[1]) + } + + + for port := startPort; port <= endPort; port++ { + localAddr = fmt.Sprintf(":%d", port) + go s.localListener(localAddr, strconv.Itoa(port)) + time.Sleep(1 * time.Millisecond) + } + continue + } else { + + port, err := strconv.Atoi(localPortOrRange) + if err != nil || port < 1 || port > 65535 { + s.logger.Fatalf("invalid port format: %s", localPortOrRange) + } + localAddr = fmt.Sprintf(":%d", port) + } + } else if len(parts) == 2 { + + localPortOrRange := strings.TrimSpace(parts[0]) + remoteAddr = strings.TrimSpace(parts[1]) + + + if strings.Contains(localPortOrRange, "-") { + rangeParts := strings.Split(localPortOrRange, "-") + if len(rangeParts) != 2 { + s.logger.Fatalf("invalid port range format: %s", localPortOrRange) + } + + + startPort, err := strconv.Atoi(strings.TrimSpace(rangeParts[0])) + if err != nil || startPort < 1 || startPort > 65535 { + s.logger.Fatalf("invalid start port in range: %s", rangeParts[0]) + } + + endPort, err := strconv.Atoi(strings.TrimSpace(rangeParts[1])) + if err != nil || endPort < 1 || endPort > 65535 || endPort < startPort { + s.logger.Fatalf("invalid end port in range: %s", rangeParts[1]) + } + + + for port := startPort; port <= endPort; port++ { + localAddr = fmt.Sprintf(":%d", port) + go s.localListener(localAddr, remoteAddr) + time.Sleep(1 * time.Millisecond) + } + continue + } else { + + port, err := strconv.Atoi(localPortOrRange) + if err == nil && port > 1 && port < 65535 { + localAddr = fmt.Sprintf(":%d", port) + } else { + localAddr = localPortOrRange + } + } + } else { + s.logger.Fatalf("invalid port mapping format: %s", portMapping) + } + + go s.localListener(localAddr, remoteAddr) + } +} + +func (s *QuicTransport) localListener(localAddr string, remoteAddr string) { + listener, err := net.Listen("tcp", localAddr) + if err != nil { + s.logger.Fatalf("failed to listen on %s: %v", localAddr, err) + return + } + + defer listener.Close() + + s.logger.Infof("TCP listener started successfully, listening on address: %s", listener.Addr().String()) + + go s.acceptLocalConn(listener, remoteAddr) + + <-s.ctx.Done() +} + +func (s *QuicTransport) acceptLocalConn(listener net.Listener, remoteAddr string) { + for { + select { + case <-s.ctx.Done(): + return + + default: + s.logger.Debugf("waiting for accept incoming connection on %s", listener.Addr().String()) + conn, err := listener.Accept() + if err != nil { + s.logger.Debugf("failed to accept connection on %s: %v", listener.Addr().String(), err) + continue + } + + + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + s.logger.Warnf("disarded non-TCP connection from %s", conn.RemoteAddr().String()) + conn.Close() + continue + } + + select { + case s.localChannel <- LocalTCPConn{conn: conn, remoteAddr: remoteAddr, timeCreated: time.Now().UnixMilli()}: + + select { + case s.reqNewConnChan <- struct{}{}: + + default: + + s.logger.Warn("channel is full, cannot request a new connection") + } + + s.logger.Debugf("accepted incoming TCP connection from %s", tcpConn.RemoteAddr().String()) + + default: + s.logger.Warnf("channel with listener %s is full, discarding TCP connection from %s", listener.Addr().String(), tcpConn.LocalAddr().String()) + conn.Close() + } + } + } +} + +func (s *QuicTransport) handleLoop() { + for { + select { + case <-s.ctx.Done(): + return + case localConn := <-s.localChannel: + loop: + for { + if time.Now().UnixMilli()-localConn.timeCreated > 3000 { + s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated) + localConn.conn.Close() + break loop + } + + select { + case <-s.ctx.Done(): + return + + case stream := <-s.streamChannel: + + if err := sendBinaryTransportString(stream, localConn.remoteAddr, utils.SG_TCP); err != nil { + s.logger.Errorf("%v", err) + stream.Close() + continue loop + } + + + go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) + break loop + } + } + } + } +} + + +func (s *QuicTransport) handleQUICStream(localConn net.Conn, stream *quic.Stream, port int) { + defer stream.Close() + defer localConn.Close() + + done := make(chan struct{}, 2) + + + go func() { + defer func() { done <- struct{}{} }() + defer stream.Close() + + buf := make([]byte, 32*1024) + for { + select { + case <-s.ctx.Done(): + return + default: + localConn.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := localConn.Read(buf) + if err != nil { + if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() { + s.logger.Debugf("failed to read from local connection: %v", err) + } + return + } + localConn.SetReadDeadline(time.Time{}) + + if s.config.Sniffer { + s.usageMonitor.AddOrUpdatePort(port, uint64(n)) + } + + _, err = stream.Write(buf[:n]) + if err != nil { + s.logger.Debugf("failed to write to QUIC stream: %v", err) + return + } + } + } + }() + + + go func() { + defer func() { done <- struct{}{} }() + defer localConn.Close() + + buf := make([]byte, 32*1024) + for { + select { + case <-s.ctx.Done(): + return + default: + stream.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := stream.Read(buf) + if err != nil { + s.logger.Debugf("failed to read from QUIC stream: %v", err) + return + } + stream.SetReadDeadline(time.Time{}) + + if s.config.Sniffer { + s.usageMonitor.AddOrUpdatePort(port, uint64(n)) + } + + _, err = localConn.Write(buf[:n]) + if err != nil { + s.logger.Debugf("failed to write to local connection: %v", err) + return + } + } + } + }() + + + <-done +} + + + + +func receiveBinaryByte(stream *quic.Stream) (byte, error) { + buf := make([]byte, 1) + _, err := io.ReadFull(stream, buf) + if err != nil { + return 0, err + } + return buf[0], nil +} + + +func sendBinaryByte(stream *quic.Stream, data byte) error { + buf := []byte{data} + _, err := stream.Write(buf) + return err +} + + +func receiveBinaryTransportString(stream *quic.Stream) (string, byte, error) { + + header := make([]byte, 3) + _, err := io.ReadFull(stream, header) + if err != nil { + return "", 0, err + } + + + strLen := binary.BigEndian.Uint16(header[:2]) + transport := header[2] + + + if strLen > 0 { + strData := make([]byte, strLen) + _, err = io.ReadFull(stream, strData) + if err != nil { + return "", 0, err + } + return string(strData), transport, nil + } + + return "", transport, nil +} + + +func sendBinaryTransportString(stream *quic.Stream, data string, transport byte) error { + strLen := len(data) + if strLen > 65535 { + return fmt.Errorf("string too long: %d", strLen) + } + + + header := make([]byte, 3) + binary.BigEndian.PutUint16(header[:2], uint16(strLen)) + header[2] = transport + + + _, err := stream.Write(header) + if err != nil { + return err + } + + + if strLen > 0 { + _, err = stream.Write([]byte(data)) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/utils/network/quic_dialer.go b/internal/utils/network/quic_dialer.go new file mode 100644 index 0000000..65e3dec --- /dev/null +++ b/internal/utils/network/quic_dialer.go @@ -0,0 +1,58 @@ +package network + +import ( + "context" + "crypto/tls" + "fmt" + "time" + + "github.com/quic-go/quic-go" +) + +func QUICDialer(ctx context.Context, remoteAddress string, timeout time.Duration, retry int) (*quic.Conn, error) { + var conn *quic.Conn + var err error + + retries := retry + backoff := 1 * time.Second + + for i := 0; i < retries; i++ { + conn, err = attemptQUICDialer(ctx, remoteAddress, timeout) + if err == nil { + return conn, nil + } + + if i == retries-1 { + break + } + + time.Sleep(backoff) + backoff *= 2 + } + + return nil, err +} + +func attemptQUICDialer(ctx context.Context, remoteAddress string, timeout time.Duration) (*quic.Conn, error) { + dialCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{"backhaul-quic"}, + } + + quicConfig := &quic.Config{ + MaxIdleTimeout: 30 * time.Second, + MaxIncomingStreams: 1000, + MaxIncomingUniStreams: 1000, + KeepAlivePeriod: 15 * time.Second, + } + + conn, err := quic.DialAddr(dialCtx, remoteAddress, tlsConfig, quicConfig) + if err != nil { + return nil, fmt.Errorf("QUIC dial failed: %v", err) + } + + return conn, nil +} diff --git a/internal/utils/network/quic_listener.go b/internal/utils/network/quic_listener.go new file mode 100644 index 0000000..854409c --- /dev/null +++ b/internal/utils/network/quic_listener.go @@ -0,0 +1,83 @@ +package network + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "fmt" + "math/big" + "net" + "time" + + "github.com/quic-go/quic-go" +) + +func QUICListen(address string, tlsCertFile, tlsKeyFile string) (*quic.Listener, error) { + var tlsConfig *tls.Config + if tlsCertFile != "" && tlsKeyFile != "" { + cert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile) + if err != nil { + return nil, fmt.Errorf("failed to load TLS certificate: %v", err) + } + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"backhaul-quic"}, + } + } else { + tlsConfig = generateSelfSignedCert() + } + + quicConfig := &quic.Config{ + MaxIdleTimeout: 30 * time.Second, + MaxIncomingStreams: 1000, + MaxIncomingUniStreams: 1000, + KeepAlivePeriod: 15 * time.Second, + } + + listener, err := quic.ListenAddr(address, tlsConfig, quicConfig) + if err != nil { + return nil, fmt.Errorf("failed to start QUIC listener: %v", err) + } + + return listener, nil +} + +func generateSelfSignedCert() *tls.Config { + return &tls.Config{ + Certificates: []tls.Certificate{generateCert()}, + NextProtos: []string{"backhaul-quic"}, + } +} + +func generateCert() tls.Certificate { + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return tls.Certificate{} + } + + template := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"Backhaul QUIC"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(365 * 24 * time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback}, + DNSNames: []string{"localhost"}, + } + + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{} + } + + return tls.Certificate{ + Certificate: [][]byte{certDER}, + PrivateKey: priv, + } +} From 62430624283d3df6a43c62104262a75ed73549c1 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 00:20:12 +0330 Subject: [PATCH 02/11] Fix goreleaser config for fork repository --- .goreleaser.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 0597f20..013f55d 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -23,8 +23,8 @@ archives: release: github: - owner: Musixal - name: backhaul + owner: EbadiDev + name: Backhaul checksum: name_template: "checksums.txt" changelog: From 08fb3335cf758c67941b3833b2b15b78c8de5818 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 00:20:25 +0330 Subject: [PATCH 03/11] Fix goreleaser config for fork repository --- .goreleaser.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 013f55d..fb34f27 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -24,7 +24,7 @@ archives: release: github: owner: EbadiDev - name: Backhaul + name: backhaul checksum: name_template: "checksums.txt" changelog: From 97d9917f4a679ed4ce174cee886eee5b48a7e811 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 00:21:24 +0330 Subject: [PATCH 04/11] chore(release): v0.7.3 --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 71546bd..dff3b41 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,7 @@ var ( ) // Define the version of the application -const version = "v0.7.2" +const version = "v0.7.3" func main() { configPath = flag.String("c", "", "path to the configuration file (TOML format)") From 91524009f566f1b0edd0ba1324bc0ca01ac57a93 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 00:32:51 +0330 Subject: [PATCH 05/11] update dial_timeout so initial connection have more time --- internal/utils/network/quic_dialer.go | 5 +++-- internal/utils/network/quic_listener.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/utils/network/quic_dialer.go b/internal/utils/network/quic_dialer.go index 65e3dec..f1cfc18 100644 --- a/internal/utils/network/quic_dialer.go +++ b/internal/utils/network/quic_dialer.go @@ -43,10 +43,11 @@ func attemptQUICDialer(ctx context.Context, remoteAddress string, timeout time.D } quicConfig := &quic.Config{ - MaxIdleTimeout: 30 * time.Second, + MaxIdleTimeout: 60 * time.Second, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, - KeepAlivePeriod: 15 * time.Second, + KeepAlivePeriod: 10 * time.Second, + HandshakeIdleTimeout: 10 * time.Second, } conn, err := quic.DialAddr(dialCtx, remoteAddress, tlsConfig, quicConfig) diff --git a/internal/utils/network/quic_listener.go b/internal/utils/network/quic_listener.go index 854409c..cf06097 100644 --- a/internal/utils/network/quic_listener.go +++ b/internal/utils/network/quic_listener.go @@ -30,10 +30,11 @@ func QUICListen(address string, tlsCertFile, tlsKeyFile string) (*quic.Listener, } quicConfig := &quic.Config{ - MaxIdleTimeout: 30 * time.Second, + MaxIdleTimeout: 60 * time.Second, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, - KeepAlivePeriod: 15 * time.Second, + KeepAlivePeriod: 10 * time.Second, + HandshakeIdleTimeout: 10 * time.Second, } listener, err := quic.ListenAddr(address, tlsConfig, quicConfig) From 041873a55aebf4e655dd40d0e29adaa202f40279 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 01:01:06 +0330 Subject: [PATCH 06/11] fix channelhandsahke -> first quic stream then direct data forwarding --- internal/server/transport/quic.go | 108 +++++++++++++++--------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/internal/server/transport/quic.go b/internal/server/transport/quic.go index ffea180..99bfff5 100644 --- a/internal/server/transport/quic.go +++ b/internal/server/transport/quic.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net" - "runtime" "strconv" "strings" "sync" @@ -87,19 +86,10 @@ func (s *QuicTransport) Start() { if s.controlChannel != nil { s.config.TunnelStatus = "Connected (QUIC)" - numCPU := runtime.NumCPU() - if numCPU > 4 { - numCPU = 4 - } - go s.parsePortMappings() go s.channelHandler() - s.logger.Infof("starting %d handle loops on each CPU thread", numCPU) - - for i := 0; i < numCPU; i++ { - go s.handleLoop() - } + s.logger.Info("QUIC server ready - data streams will be handled directly") } } @@ -315,6 +305,9 @@ func (s *QuicTransport) acceptQUICConn(listener *quic.Listener) { func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { defer conn.CloseWithError(0, "connection handler finished") + + isFirstStream := true + for { select { case <-s.ctx.Done(): @@ -327,17 +320,62 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { return } - select { - case s.streamChannel <- stream: - s.logger.Debugf("accepted new QUIC stream from %s", conn.RemoteAddr().String()) - default: - s.logger.Warnf("stream channel is full, discarding QUIC stream from %s", conn.RemoteAddr().String()) - stream.Close() + if isFirstStream && s.controlChannel == nil { + + isFirstStream = false + select { + case s.streamChannel <- stream: + s.logger.Debugf("accepted control channel stream from %s", conn.RemoteAddr().String()) + default: + s.logger.Warnf("stream channel is full, discarding control stream from %s", conn.RemoteAddr().String()) + stream.Close() + } + } else { + + s.logger.Debugf("accepted data stream from %s", conn.RemoteAddr().String()) + go s.handleDataStream(stream) } } } } +func (s *QuicTransport) handleDataStream(stream *quic.Stream) { + defer stream.Close() + + + _, transport, err := receiveBinaryTransportString(stream) + if err != nil { + s.logger.Debugf("failed to receive remote address from data stream: %v", err) + return + } + + if transport != utils.SG_TCP { + s.logger.Debugf("unsupported transport type received: %v", transport) + return + } + + + select { + case localConn := <-s.localChannel: + + if time.Now().UnixMilli()-localConn.timeCreated > 3000 { + s.logger.Debugf("local connection timed out: %d ms", time.Now().UnixMilli()-localConn.timeCreated) + localConn.conn.Close() + return + } + + s.logger.Debugf("matching data stream with local connection for %s", localConn.remoteAddr) + + + go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) + + default: + + s.logger.Debug("no waiting local connection for data stream") + return + } +} + func (s *QuicTransport) parsePortMappings() { for _, portMapping := range s.config.Ports { parts := strings.Split(portMapping, "=") @@ -488,42 +526,6 @@ func (s *QuicTransport) acceptLocalConn(listener net.Listener, remoteAddr string } } -func (s *QuicTransport) handleLoop() { - for { - select { - case <-s.ctx.Done(): - return - case localConn := <-s.localChannel: - loop: - for { - if time.Now().UnixMilli()-localConn.timeCreated > 3000 { - s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated) - localConn.conn.Close() - break loop - } - - select { - case <-s.ctx.Done(): - return - - case stream := <-s.streamChannel: - - if err := sendBinaryTransportString(stream, localConn.remoteAddr, utils.SG_TCP); err != nil { - s.logger.Errorf("%v", err) - stream.Close() - continue loop - } - - - go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) - break loop - } - } - } - } -} - - func (s *QuicTransport) handleQUICStream(localConn net.Conn, stream *quic.Stream, port int) { defer stream.Close() defer localConn.Close() From e58d07112df6b0dc4a7fd5f18e8b1cac3ef5646e Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 01:15:50 +0330 Subject: [PATCH 07/11] fixed stream routing, fixed deadlock, fixed stream cleanup --- internal/server/transport/quic.go | 36 +++++++++++++++++-------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/internal/server/transport/quic.go b/internal/server/transport/quic.go index 99bfff5..f6140ae 100644 --- a/internal/server/transport/quic.go +++ b/internal/server/transport/quic.go @@ -320,7 +320,9 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { return } - if isFirstStream && s.controlChannel == nil { + s.logger.Debugf(" STREAM RECEIVED: isFirstStream=%v, controlChannel=%v", isFirstStream, s.controlChannel != nil) + + if isFirstStream { isFirstStream = false select { @@ -332,7 +334,7 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { } } else { - s.logger.Debugf("accepted data stream from %s", conn.RemoteAddr().String()) + s.logger.Debugf("RECEIVED DATA STREAM from %s", conn.RemoteAddr().String()) go s.handleDataStream(stream) } } @@ -340,19 +342,8 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { } func (s *QuicTransport) handleDataStream(stream *quic.Stream) { - defer stream.Close() - + s.logger.Debugf("handleDataStream called!") - _, transport, err := receiveBinaryTransportString(stream) - if err != nil { - s.logger.Debugf("failed to receive remote address from data stream: %v", err) - return - } - - if transport != utils.SG_TCP { - s.logger.Debugf("unsupported transport type received: %v", transport) - return - } select { @@ -361,17 +352,30 @@ func (s *QuicTransport) handleDataStream(stream *quic.Stream) { if time.Now().UnixMilli()-localConn.timeCreated > 3000 { s.logger.Debugf("local connection timed out: %d ms", time.Now().UnixMilli()-localConn.timeCreated) localConn.conn.Close() + stream.Close() + return + } + + s.logger.Debugf(" matching data stream with local connection for %s", localConn.remoteAddr) + + + err := sendBinaryTransportString(stream, localConn.remoteAddr, utils.SG_TCP) + if err != nil { + s.logger.Errorf("failed to send remote address to client: %v", err) + localConn.conn.Close() + stream.Close() return } - s.logger.Debugf("matching data stream with local connection for %s", localConn.remoteAddr) + s.logger.Debugf(" sent remote address %s to client, starting data forwarding", localConn.remoteAddr) go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) default: - s.logger.Debug("no waiting local connection for data stream") + s.logger.Debug(" no waiting local connection for data stream") + stream.Close() return } } From 2d433285b7d8f883398d69857ecce06cb26379ff Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 01:39:47 +0330 Subject: [PATCH 08/11] fix client streams were opened and blcoked immediately --- internal/client/transport/quic.go | 9 ++- internal/server/transport/quic.go | 112 +++++++++++++++++++++++++++--- 2 files changed, 110 insertions(+), 11 deletions(-) diff --git a/internal/client/transport/quic.go b/internal/client/transport/quic.go index db645b6..b2937db 100644 --- a/internal/client/transport/quic.go +++ b/internal/client/transport/quic.go @@ -330,13 +330,20 @@ func (c *QuicTransport) channelHandler() { func (c *QuicTransport) streamDialer() { c.logger.Debugf("initiating new QUIC stream to tunnel server") - stream, err := c.quicConn.OpenStreamSync(c.ctx) if err != nil { c.logger.Error("failed to open QUIC stream: ", err) return } + // Send "ready for work" signal to server + err = sendBinaryByte(stream, utils.SG_Chan) + if err != nil { + c.logger.Error("failed to send ready signal: ", err) + stream.Close() + return + } + atomic.AddInt32(&c.poolConnections, 1) diff --git a/internal/server/transport/quic.go b/internal/server/transport/quic.go index f6140ae..d3968ee 100644 --- a/internal/server/transport/quic.go +++ b/internal/server/transport/quic.go @@ -26,6 +26,7 @@ type QuicTransport struct { cancel context.CancelFunc logger *logrus.Logger streamChannel chan *quic.Stream + tunnelChannel chan *quic.Stream localChannel chan LocalTCPConn reqNewConnChan chan struct{} controlChannel *quic.Stream @@ -61,6 +62,7 @@ func NewQUICServer(parentCtx context.Context, config *QuicConfig, logger *logrus cancel: cancel, logger: logger, streamChannel: make(chan *quic.Stream, config.ChannelSize), + tunnelChannel: make(chan *quic.Stream, config.ChannelSize), localChannel: make(chan LocalTCPConn, config.ChannelSize), reqNewConnChan: make(chan struct{}, config.ChannelSize), controlChannel: nil, @@ -123,6 +125,7 @@ func (s *QuicTransport) Restart() { s.streamChannel = make(chan *quic.Stream, s.config.ChannelSize) + s.tunnelChannel = make(chan *quic.Stream, s.config.ChannelSize) s.localChannel = make(chan LocalTCPConn, s.config.ChannelSize) s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) @@ -313,14 +316,14 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { case <-s.ctx.Done(): return default: - + s.logger.Debugf("waiting for stream from %s...", conn.RemoteAddr().String()) stream, err := conn.AcceptStream(s.ctx) if err != nil { s.logger.Debugf("failed to accept stream from %s: %v", conn.RemoteAddr().String(), err) return } - s.logger.Debugf(" STREAM RECEIVED: isFirstStream=%v, controlChannel=%v", isFirstStream, s.controlChannel != nil) + s.logger.Debugf("stream received: isFirstStream=%v, controlChannel=%v", isFirstStream, s.controlChannel != nil) if isFirstStream { @@ -333,30 +336,74 @@ func (s *QuicTransport) handleQUICConnection(conn *quic.Conn) { stream.Close() } } else { + s.logger.Debugf("received data stream from %s", conn.RemoteAddr().String()) + - s.logger.Debugf("RECEIVED DATA STREAM from %s", conn.RemoteAddr().String()) - go s.handleDataStream(stream) + go func(stream *quic.Stream) { + + signal, err := receiveBinaryByte(stream) + if err != nil { + s.logger.Debugf("failed to receive ready signal from client: %v", err) + stream.Close() + return + } + + if signal != utils.SG_Chan { + s.logger.Debugf("unexpected signal from client: %d", signal) + stream.Close() + return + } + + + timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() + + select { + case localConn := <-s.localChannel: + + err := sendBinaryTransportString(stream, localConn.remoteAddr, utils.SG_TCP) + if err != nil { + s.logger.Errorf("failed to send remote address to client: %v", err) + localConn.conn.Close() + stream.Close() + return + } + + + go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) + + case <-timeout.C: + s.logger.Debugf("timeout waiting for local connection, closing stream") + stream.Close() + + case <-s.ctx.Done(): + stream.Close() + } + }(stream) } } } } func (s *QuicTransport) handleDataStream(stream *quic.Stream) { - s.logger.Debugf("handleDataStream called!") + s.logger.Debugf("� handleDataStream called for stream from %s!", stream.Context().Value("remoteAddr")) + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + select { case localConn := <-s.localChannel: - if time.Now().UnixMilli()-localConn.timeCreated > 3000 { + if time.Now().UnixMilli()-localConn.timeCreated > 5000 { s.logger.Debugf("local connection timed out: %d ms", time.Now().UnixMilli()-localConn.timeCreated) localConn.conn.Close() stream.Close() return } - s.logger.Debugf(" matching data stream with local connection for %s", localConn.remoteAddr) + s.logger.Debugf("🎯 matching data stream with local connection for %s", localConn.remoteAddr) err := sendBinaryTransportString(stream, localConn.remoteAddr, utils.SG_TCP) @@ -367,19 +414,64 @@ func (s *QuicTransport) handleDataStream(stream *quic.Stream) { return } - s.logger.Debugf(" sent remote address %s to client, starting data forwarding", localConn.remoteAddr) + s.logger.Debugf("✅ sent remote address %s to client, starting data forwarding", localConn.remoteAddr) go s.handleQUICStream(localConn.conn, stream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) - default: + case <-timeout.C: + + s.logger.Debug("⏰ timeout waiting for local connection, closing data stream") + stream.Close() + return + + case <-s.ctx.Done(): - s.logger.Debug(" no waiting local connection for data stream") stream.Close() return } } +func (s *QuicTransport) handleLoop() { + for { + select { + case <-s.ctx.Done(): + return + case localConn := <-s.localChannel: + loop: + for { + if time.Now().UnixMilli()-localConn.timeCreated > 3000 { + s.logger.Debugf("timeouted local connection: %d ms", time.Now().UnixMilli()-localConn.timeCreated) + localConn.conn.Close() + break loop + } + + select { + case <-s.ctx.Done(): + return + + case tunnelStream := <-s.tunnelChannel: + + if err := sendBinaryTransportString(tunnelStream, localConn.remoteAddr, utils.SG_TCP); err != nil { + s.logger.Errorf("failed to send remote address to client: %v", err) + tunnelStream.Close() + continue loop + } + + s.logger.Debugf("✅ matched local connection %s with tunnel stream, starting data forwarding", localConn.remoteAddr) + + + go s.handleQUICStream(localConn.conn, tunnelStream, localConn.conn.LocalAddr().(*net.TCPAddr).Port) + break loop + + default: + time.Sleep(50 * time.Millisecond) + } + } + } + } +} + func (s *QuicTransport) parsePortMappings() { for _, portMapping := range s.config.Ports { parts := strings.Split(portMapping, "=") From 0aa6c07347f73d9092074f1ecfb103dfe942ad1a Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 02:02:01 +0330 Subject: [PATCH 09/11] stream count limiting, rate limit for stream creation only for quic --- internal/client/transport/quic.go | 48 +++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/internal/client/transport/quic.go b/internal/client/transport/quic.go index b2937db..d84432f 100644 --- a/internal/client/transport/quic.go +++ b/internal/client/transport/quic.go @@ -32,6 +32,7 @@ type QuicTransport struct { poolConnections int32 loadConnections int32 controlFlow chan struct{} + streamRateLimit chan struct{} } type QuicConfig struct { @@ -64,6 +65,7 @@ func NewQUICClient(parentCtx context.Context, config *QuicConfig, logger *logrus poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), + streamRateLimit: make(chan struct{}, 5), } return client @@ -195,8 +197,19 @@ func (c *QuicTransport) channelDialer() { } func (c *QuicTransport) poolMaintainer() { + for i := 0; i < c.config.ConnPoolSize; i++ { - go c.streamDialer() + select { + case c.streamRateLimit <- struct{}{}: + go func() { + defer func() { <-c.streamRateLimit }() + c.streamDialer() + }() + default: + c.logger.Debugf("rate limit reached during initial pool creation") + time.Sleep(100 * time.Millisecond) + i-- + } } @@ -246,7 +259,15 @@ func (c *QuicTransport) poolMaintainer() { newPoolSize++ - go c.streamDialer() + select { + case c.streamRateLimit <- struct{}{}: + go func() { + defer func() { <-c.streamRateLimit }() + c.streamDialer() + }() + default: + c.logger.Debugf("rate limit reached in pool optimization, skipping stream creation") + } } else if float64(loadConnections+x) < float64(poolConnectionsAvg)*y && newPoolSize > c.config.ConnPoolSize { c.logger.Debugf("decreasing pool size: %d -> %d, avg pool conn: %d, avg load conn: %d", newPoolSize, newPoolSize-1, poolConnectionsAvg, loadConnections) newPoolSize-- @@ -293,12 +314,27 @@ func (c *QuicTransport) channelHandler() { case utils.SG_Chan: atomic.AddInt32(&c.loadConnections, 1) + + activeStreams := atomic.LoadInt32(&c.poolConnections) + if activeStreams >= int32(c.config.ConnPoolSize*2) { + c.logger.Debugf("too many active streams (%d), skipping new stream creation", activeStreams) + atomic.AddInt32(&c.loadConnections, -1) + continue + } + select { case <-c.controlFlow: - - default: + + case c.streamRateLimit <- struct{}{}: + c.logger.Debug("channel signal received, initiating stream dialer") - go c.streamDialer() + go func() { + defer func() { <-c.streamRateLimit }() + c.streamDialer() + }() + default: + c.logger.Debugf("rate limit reached, skipping stream creation") + atomic.AddInt32(&c.loadConnections, -1) } case utils.SG_HB: @@ -336,7 +372,7 @@ func (c *QuicTransport) streamDialer() { return } - // Send "ready for work" signal to server + err = sendBinaryByte(stream, utils.SG_Chan) if err != nil { c.logger.Error("failed to send ready signal: ", err) From 77b49555b7eeca08541f7a53e0fea0f3f76ecb0a Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 02:13:28 +0330 Subject: [PATCH 10/11] remove rete limit and create stream in right away --- internal/client/transport/quic.go | 46 ++++--------------------------- 1 file changed, 6 insertions(+), 40 deletions(-) diff --git a/internal/client/transport/quic.go b/internal/client/transport/quic.go index d84432f..28e9567 100644 --- a/internal/client/transport/quic.go +++ b/internal/client/transport/quic.go @@ -32,7 +32,6 @@ type QuicTransport struct { poolConnections int32 loadConnections int32 controlFlow chan struct{} - streamRateLimit chan struct{} } type QuicConfig struct { @@ -65,7 +64,6 @@ func NewQUICClient(parentCtx context.Context, config *QuicConfig, logger *logrus poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), - streamRateLimit: make(chan struct{}, 5), } return client @@ -198,18 +196,8 @@ func (c *QuicTransport) channelDialer() { func (c *QuicTransport) poolMaintainer() { - for i := 0; i < c.config.ConnPoolSize; i++ { - select { - case c.streamRateLimit <- struct{}{}: - go func() { - defer func() { <-c.streamRateLimit }() - c.streamDialer() - }() - default: - c.logger.Debugf("rate limit reached during initial pool creation") - time.Sleep(100 * time.Millisecond) - i-- - } + for i := 0; i < c.config.ConnPoolSize; i++ { + go c.streamDialer() } @@ -259,15 +247,7 @@ func (c *QuicTransport) poolMaintainer() { newPoolSize++ - select { - case c.streamRateLimit <- struct{}{}: - go func() { - defer func() { <-c.streamRateLimit }() - c.streamDialer() - }() - default: - c.logger.Debugf("rate limit reached in pool optimization, skipping stream creation") - } + go c.streamDialer() } else if float64(loadConnections+x) < float64(poolConnectionsAvg)*y && newPoolSize > c.config.ConnPoolSize { c.logger.Debugf("decreasing pool size: %d -> %d, avg pool conn: %d, avg load conn: %d", newPoolSize, newPoolSize-1, poolConnectionsAvg, loadConnections) newPoolSize-- @@ -314,27 +294,13 @@ func (c *QuicTransport) channelHandler() { case utils.SG_Chan: atomic.AddInt32(&c.loadConnections, 1) - - activeStreams := atomic.LoadInt32(&c.poolConnections) - if activeStreams >= int32(c.config.ConnPoolSize*2) { - c.logger.Debugf("too many active streams (%d), skipping new stream creation", activeStreams) - atomic.AddInt32(&c.loadConnections, -1) - continue - } - select { - case <-c.controlFlow: + case <-c.controlFlow: - case c.streamRateLimit <- struct{}{}: + default: c.logger.Debug("channel signal received, initiating stream dialer") - go func() { - defer func() { <-c.streamRateLimit }() - c.streamDialer() - }() - default: - c.logger.Debugf("rate limit reached, skipping stream creation") - atomic.AddInt32(&c.loadConnections, -1) + go c.streamDialer() } case utils.SG_HB: From fe9be23060248204d738c936aa3483c339d484a1 Mon Sep 17 00:00:00 2001 From: EbadiDev Date: Tue, 9 Sep 2025 02:24:10 +0330 Subject: [PATCH 11/11] revert .goreleaser.yaml for PR --- .goreleaser.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index fb34f27..0597f20 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -23,7 +23,7 @@ archives: release: github: - owner: EbadiDev + owner: Musixal name: backhaul checksum: name_template: "checksums.txt"