From 3b628784bcf2c998737e1cc981f36ad91d8e4fb7 Mon Sep 17 00:00:00 2001 From: "Simon.Liao" Date: Fri, 23 May 2025 18:40:38 +0800 Subject: [PATCH 1/2] update UpdateMsg --- go/channel.go | 14 ++++++++++---- go/model.go | 20 +++++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/go/channel.go b/go/channel.go index 2585f99..476fa4b 100644 --- a/go/channel.go +++ b/go/channel.go @@ -153,7 +153,7 @@ func (ws *WsService) readMsg() { ws.Logger.Println("reconnect success, continue read message") continue } - + ws.Logger.Println("received message:", string(rawMsg)) var msg UpdateMsg if err := json.Unmarshal(rawMsg, &msg); err != nil { continue @@ -212,9 +212,7 @@ func (ws *WsService) receiveCallMsg(channel string, msgCh chan *UpdateMsg) { func (ws *WsService) APIRequest(channel string, payload any, keyVals map[string]any) error { var err error - ws.loginOnce.Do(func() { - err = ws.login() - }) + err = ws.Login() if err != nil { return err @@ -239,6 +237,14 @@ func (ws *WsService) APIRequest(channel string, payload any, keyVals map[string] return ws.apiRequest(channel, payload, keyVals) } +func (ws *WsService) Login() error { + var err error + ws.loginOnce.Do(func() { + err = ws.login() + }) + return err +} + func (ws *WsService) login() error { if ws.conf.Key == "" || ws.conf.Secret == "" { return newAuthEmptyErr() diff --git a/go/model.go b/go/model.go index 2bfa2fe..8edd490 100644 --- a/go/model.go +++ b/go/model.go @@ -21,14 +21,24 @@ type UpdateMsg struct { Message string `json:"message"` } `json:"errs"` } `json:"data"` + RequestID string `json:"request_id"` + Ack bool `json:"ack"` } type ResponseHeader struct { - ResponseTime string `json:"response_time"` - Status string `json:"status"` - Channel string `json:"channel"` - Event string `json:"event"` - ClientID string `json:"client_id"` + ResponseTime string `json:"response_time"` + Status string `json:"status"` + Channel string `json:"channel"` + Event string `json:"event"` + ClientID string `json:"client_id"` + ConnID string `json:"conn_id"` + ConnTraceID string `json:"conn_trace_id"` + TraceID string `json:"trace_id"` + XInTime int64 `json:"x_in_time"` + XOutTime int64 `json:"x_out_time"` + XGateRatelimitRequestsRemain int `json:"x_gate_ratelimit_requests_remain"` + XGateRatelimitLimit int `json:"x_gate_ratelimit_limit"` + XGateRatelimitResetTimestamp int64 `json:"x_gate_ratelimit_reset_timestamp"` } func (u *UpdateMsg) GetChannel() string { From a1f4c3a0c27248ae84bd9ca53dd1eac9b28cbfb1 Mon Sep 17 00:00:00 2001 From: "Simon.Liao" Date: Wed, 28 May 2025 18:32:40 +0800 Subject: [PATCH 2/2] fix ping & login bug --- go/channel.go | 6 ++--- go/client.go | 65 +++++++++++++++++++++------------------------------ 2 files changed, 30 insertions(+), 41 deletions(-) diff --git a/go/channel.go b/go/channel.go index 476fa4b..3ca941c 100644 --- a/go/channel.go +++ b/go/channel.go @@ -145,15 +145,15 @@ func (ws *WsService) readMsg() { default: _, rawMsg, err := ws.Client.ReadMessage() if err != nil { - ws.Logger.Printf("websocket err: %s", err.Error()) + ws.Logger.Printf("websocket err: %s %s", err.Error(), ws.conf.Key) if e := ws.reconnect(); e != nil { ws.Logger.Printf("reconnect err:%s", err.Error()) return } - ws.Logger.Println("reconnect success, continue read message") + ws.Logger.Println("reconnect success, continue read message ", ws.conf.Key) continue } - ws.Logger.Println("received message:", string(rawMsg)) + ws.Logger.Println("received message:", string(rawMsg), ws.conf.Key) var msg UpdateMsg if err := json.Unmarshal(rawMsg, &msg); err != nil { continue diff --git a/go/client.go b/go/client.go index 9581f23..fa7825e 100644 --- a/go/client.go +++ b/go/client.go @@ -38,7 +38,7 @@ type WsService struct { // ConnConf default URL is spot websocket type ConnConf struct { App string - subscribeMsg *sync.Map + subscribeMsg sync.Map URL string Key string Secret string @@ -116,7 +116,7 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS clientMu: new(sync.Mutex), } - go ws.activePing() + ws.keepAlive() return ws, nil } @@ -124,7 +124,6 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS func getInitConnConf() *ConnConf { return &ConnConf{ App: "spot", - subscribeMsg: new(sync.Map), MaxRetryConn: MaxRetryConn, Key: "", Secret: "", @@ -165,7 +164,6 @@ func NewConnConfFromOption(op *ConfOptions) *ConnConf { } return &ConnConf{ App: op.App, - subscribeMsg: new(sync.Map), MaxRetryConn: op.MaxRetryConn, Key: op.Key, Secret: op.Secret, @@ -216,6 +214,11 @@ func (ws *WsService) reconnect() error { ws.status = connected + // should login when reconnect + if err := ws.login(); err != nil { + ws.Logger.Println("reconnect login err:%s", err.Error()) + } + // resubscribe after reconnect ws.conf.subscribeMsg.Range(func(key, value interface{}) bool { // key is channel, value is []requestHistory @@ -313,45 +316,31 @@ func (ws *WsService) GetConnection() *websocket.Conn { return ws.Client } -func (ws *WsService) activePing() { - du, err := time.ParseDuration(ws.conf.PingInterval) - if err != nil { - ws.Logger.Printf("failed to parse ping interval: %s, use default ping interval 10s instead", ws.conf.PingInterval) - du, err = time.ParseDuration(DefaultPingInterval) - if err != nil { - du = time.Second * 10 - } - } +func (ws *WsService) keepAlive() { + var timeout = 10 * time.Second + ticker := time.NewTicker(timeout) - ticker := time.NewTicker(du) - defer ticker.Stop() - - for { - select { - case <-ws.Ctx.Done(): - return - case <-ticker.C: - subscribeMap := map[string]int{} - ws.conf.subscribeMsg.Range(func(key, value interface{}) bool { - splits := strings.Split(key.(string), ".") - if len(splits) == 2 { - subscribeMap[splits[0]] = 1 - } - return true - }) + lastResponse := time.Now() + ws.Client.SetPongHandler(func(msg string) error { + lastResponse = time.Now() + return nil + }) - if ws.status != connected { - continue + go func() { + defer ticker.Stop() + for { + ws.mu.Lock() + err := ws.Client.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)) + ws.mu.Unlock() + if err != nil { + ws.Logger.Printf("send ping err:%s", err.Error()) } - - for app := range subscribeMap { - channel := app + ".ping" - if err := ws.Subscribe(channel, nil); err != nil { - ws.Logger.Printf("subscribe channel[%s] failed: %v", channel, err) - } + <-ticker.C + if time.Since(lastResponse) > 30*time.Second { + ws.Logger.Printf("ping timeout, should reconnect") } } - } + }() } var statusString = map[status]string{