From 40111696bb92c71affce9109f4786b387590b48a Mon Sep 17 00:00:00 2001 From: boxhock Date: Thu, 29 Apr 2021 00:10:08 +0200 Subject: [PATCH 1/2] Refactor Agoric integration --- blockchain/agoric.go | 144 -------------------- blockchain/agoric/agoric.go | 30 +++++ blockchain/agoric/runlog.go | 130 ++++++++++++++++++ blockchain/agoric/runlog_test.go | 80 +++++++++++ blockchain/agoric_test.go | 221 ------------------------------- blockchain/blockchain.go | 7 + subscriber/core_ws.go | 112 ++++++++++++++++ subscriber/subscriber.go | 3 + subscriber/ws.go | 36 ++--- 9 files changed, 376 insertions(+), 387 deletions(-) delete mode 100644 blockchain/agoric.go create mode 100644 blockchain/agoric/agoric.go create mode 100644 blockchain/agoric/runlog.go create mode 100644 blockchain/agoric/runlog_test.go delete mode 100644 blockchain/agoric_test.go create mode 100644 subscriber/core_ws.go diff --git a/blockchain/agoric.go b/blockchain/agoric.go deleted file mode 100644 index 8b616d3a..00000000 --- a/blockchain/agoric.go +++ /dev/null @@ -1,144 +0,0 @@ -package blockchain - -import ( - "encoding/json" - "errors" - "fmt" - "strings" - - "github.com/prometheus/client_golang/prometheus" - "github.com/smartcontractkit/chainlink/core/logger" - "github.com/smartcontractkit/external-initiator/store" - "github.com/smartcontractkit/external-initiator/subscriber" -) - -// Agoric is the identifier of this -// blockchain integration. -const Agoric = "agoric" - -// linkDecimals is the number of decimal places in $LINK -const linkDecimals = 18 - -// linkAgoricDecimals is the number of decimal places in a uaglink token -// FIXME: Ideally the same as linkDecimals. -const linkAgoricDecimals = 6 - -type agoricFilter struct { - JobID string -} - -type agoricManager struct { - endpointName string - filter agoricFilter -} - -func init() { - if linkAgoricDecimals > linkDecimals { - panic(fmt.Errorf("linkAgoricDecimals %d must be less than or equal to linkDecimals %d", linkAgoricDecimals, linkDecimals)) - } -} - -func createAgoricManager(t subscriber.Type, conf store.Subscription) (*agoricManager, error) { - if t != subscriber.WS { - return nil, errors.New("only WS connections are allowed for Agoric") - } - - return &agoricManager{ - endpointName: conf.EndpointName, - filter: agoricFilter{ - JobID: conf.Job, - }, - }, nil -} - -func (sm agoricManager) GetTriggerJson() []byte { - return nil -} - -type agoricEvent struct { - Type string `json:"type"` - Data json.RawMessage `json:"data"` -} - -type agoricOnQueryData struct { - QueryID string `json:"queryId"` - Query json.RawMessage `json:"query"` - Fee int64 `json:"fee"` -} - -type chainlinkQuery struct { - JobID string `json:"jobId"` - Params map[string]interface{} `json:"params"` -} - -func (sm *agoricManager) ParseResponse(data []byte) ([]subscriber.Event, bool) { - promLastSourcePing.With(prometheus.Labels{"endpoint": sm.endpointName, "jobid": string(sm.filter.JobID)}).SetToCurrentTime() - - var agEvent agoricEvent - err := json.Unmarshal(data, &agEvent) - if err != nil { - logger.Error("Failed parsing agoricEvent:", err) - return nil, false - } - - var subEvents []subscriber.Event - - switch agEvent.Type { - case "oracleServer/onQuery": - // Do this below. - break - case "oracleServer/onError": - case "oracleServer/onReply": - return nil, false - default: - // We don't need something so noisy. - // logger.Error("Unimplemented message type:", agEvent.Type) - return nil, false - } - - var onQueryData agoricOnQueryData - err = json.Unmarshal(agEvent.Data, &onQueryData) - if err != nil { - logger.Error("Failed parsing queryData:", err) - return nil, false - } - - var query chainlinkQuery - err = json.Unmarshal(onQueryData.Query, &query) - if err != nil { - logger.Error("Failed parsing chainlink query:", err) - return nil, false - } - - // Check that the job ID matches. - if query.JobID != sm.filter.JobID { - return subEvents, true - } - - var requestParams map[string]interface{} - if query.Params == nil { - requestParams = make(map[string]interface{}) - } else { - requestParams = query.Params - } - requestParams["request_id"] = onQueryData.QueryID - requestParams["payment"] = fmt.Sprint(onQueryData.Fee) + - strings.Repeat("0", linkDecimals-linkAgoricDecimals) - - _, err = json.Marshal(requestParams) - if err != nil { - logger.Error(err) - return nil, false - } - // subEvents = append(subEvents, event) - - return subEvents, true -} - -func (sm *agoricManager) GetTestJson() []byte { - return nil -} - -func (sm *agoricManager) ParseTestResponse(data []byte) error { - return nil -} diff --git a/blockchain/agoric/agoric.go b/blockchain/agoric/agoric.go new file mode 100644 index 00000000..0abaf6fd --- /dev/null +++ b/blockchain/agoric/agoric.go @@ -0,0 +1,30 @@ +package agoric + +import ( + "github.com/smartcontractkit/external-initiator/store" + "github.com/smartcontractkit/external-initiator/subscriber" +) + +const Name = "agoric" + +type manager struct { + jobid string + + conn *subscriber.WebsocketConnection +} + +func createManager(sub store.Subscription) (*manager, error) { + conn, err := subscriber.NewCoreWebsocketConnection(sub.Endpoint) + if err != nil { + return nil, err + } + + return &manager{ + jobid: sub.Job, + conn: conn, + }, nil +} + +func (m manager) Stop() { + m.conn.Stop() +} diff --git a/blockchain/agoric/runlog.go b/blockchain/agoric/runlog.go new file mode 100644 index 00000000..6ed435cd --- /dev/null +++ b/blockchain/agoric/runlog.go @@ -0,0 +1,130 @@ +package agoric + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/smartcontractkit/external-initiator/blockchain/common" + "github.com/smartcontractkit/external-initiator/store" + + "github.com/smartcontractkit/chainlink/core/logger" +) + +// linkDecimals is the number of decimal places in $LINK +// This value must be greater than linkAgoricDecimals +const linkDecimals = 18 + +// linkAgoricDecimals is the number of decimal places in a uaglink token +// FIXME: Ideally the same as linkDecimals. +// This value must be lower than linkDecimals +const linkAgoricDecimals = 6 + +var ( + errNoJobMatch = errors.New("event did not match a job") +) + +type runlogManager struct { + *manager +} + +func CreateRunlogManager(sub store.Subscription) (*runlogManager, error) { + manager, err := createManager(sub) + if err != nil { + return nil, err + } + + return &runlogManager{ + manager: manager, + }, nil +} + +func (r runlogManager) SubscribeEvents(ctx context.Context, ch chan<- common.RunlogRequest) error { + msgs := make(chan []byte) + go r.conn.Read(msgs) + + go func() { + for { + select { + case msg := <-msgs: + req, err := r.parseRequests(msg) + if err == errNoJobMatch { + continue + } else if err != nil { + logger.Error(err) + continue + } + ch <- req + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +func (r runlogManager) CreateJobRun(request common.RunlogRequest) map[string]interface{} { + // This implementation does not need to make any changes + // to the request payload. + return request +} + +type agoricEvent struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +type agoricOnQueryData struct { + QueryID string `json:"queryId"` + Query json.RawMessage `json:"query"` + Fee int64 `json:"fee"` +} + +type chainlinkQuery struct { + JobID string `json:"jobid"` + Params map[string]interface{} `json:"params"` +} + +func (r runlogManager) parseRequests(data []byte) (common.RunlogRequest, error) { + var agEvent agoricEvent + err := json.Unmarshal(data, &agEvent) + if err != nil { + return nil, err + } + + if agEvent.Type != "oracleServer/onQuery" { + return nil, errNoJobMatch + } + + var onQueryData agoricOnQueryData + err = json.Unmarshal(agEvent.Data, &onQueryData) + if err != nil { + return nil, err + } + + var query chainlinkQuery + err = json.Unmarshal(onQueryData.Query, &query) + if err != nil { + return nil, err + } + + // Check that the job ID matches. + if query.JobID != r.jobid { + return nil, errNoJobMatch + } + + var requestParams map[string]interface{} + if query.Params == nil { + requestParams = make(map[string]interface{}) + } else { + requestParams = query.Params + } + requestParams["request_id"] = onQueryData.QueryID + requestParams["payment"] = fmt.Sprint(onQueryData.Fee) + + strings.Repeat("0", linkDecimals-linkAgoricDecimals) + + return requestParams, nil +} diff --git a/blockchain/agoric/runlog_test.go b/blockchain/agoric/runlog_test.go new file mode 100644 index 00000000..96221785 --- /dev/null +++ b/blockchain/agoric/runlog_test.go @@ -0,0 +1,80 @@ +package agoric + +import ( + "github.com/smartcontractkit/external-initiator/blockchain/common" + "reflect" + "testing" +) + +func TestRunlogManager_ParseRequests(t *testing.T) { + type fields struct { + jobid string + } + type args struct { + data []byte + } + tests := []struct { + name string + fields fields + args args + want common.RunlogRequest + wantErr bool + }{ + { + "fails parsing invalid payload", + fields{}, + args{data: []byte(`invalid`)}, + nil, + true, + }, + { + "fails parsing invalid WS body", + fields{}, + args{data: []byte(`{}`)}, + nil, + true, + }, + { + "fails parsing invalid WS type", + fields{}, + args{data: []byte(`{"type":"oracleServer/wrongType"}`)}, + nil, + true, + }, + { + "successfully parses WS Oracle request", + fields{jobid: "9999"}, + args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)}, + common.RunlogRequest{ + "path": "foo", + "payment": "191919000000000000", + "request_id": "123", + }, + false, + }, + { + "skips unfiltered WS Oracle request", + fields{jobid: "Z9999"}, + args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)}, + nil, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := runlogManager{ + manager: &manager{ + jobid: tt.fields.jobid, + }, + } + got, err := e.parseRequests(tt.args.data) + if (err != nil) != tt.wantErr { + t.Errorf("parseRequests() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseRequests() got = %s, want %s", got, tt.want) + } + }) + } +} diff --git a/blockchain/agoric_test.go b/blockchain/agoric_test.go deleted file mode 100644 index 04f81a21..00000000 --- a/blockchain/agoric_test.go +++ /dev/null @@ -1,221 +0,0 @@ -package blockchain - -import ( - "errors" - "reflect" - "testing" - - "github.com/smartcontractkit/external-initiator/store" - "github.com/smartcontractkit/external-initiator/subscriber" -) - -func TestCreateAgoricFilterMessage(t *testing.T) { - tests := []struct { - name string - args store.AgoricSubscription - p subscriber.Type - want []byte - err error - }{ - { - "empty", - store.AgoricSubscription{}, - subscriber.WS, - nil, - nil, - }, - { - "address only", - store.AgoricSubscription{}, - subscriber.WS, - nil, - nil, - }, - { - "empty RPC", - store.AgoricSubscription{}, - subscriber.RPC, - nil, - errors.New("only WS connections are allowed for Agoric"), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mgr, err := createAgoricManager(tt.p, store.Subscription{Agoric: tt.args}) - if !reflect.DeepEqual(err, tt.err) { - t.Errorf("createAgoricManager.err = %s, want %s", err, tt.err) - } - if err == nil { - if got := mgr.GetTriggerJson(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetTriggerJson() = %s, want %s", got, tt.want) - } - } - }) - } - - t.Run("has invalid filter query", func(t *testing.T) { - got := agoricManager{filter: agoricFilter{JobID: "1919"}}.GetTriggerJson() - if got != nil { - t.Errorf("GetTriggerJson() = %s, want nil", got) - } - }) -} - -func TestAgoricManager_GetTestJson(t *testing.T) { - type fields struct { - filter agoricFilter - p subscriber.Type - } - tests := []struct { - name string - fields fields - want []byte - }{ - { - "returns empty when using RPC", - fields{ - p: subscriber.RPC, - }, - nil, - }, - { - "returns empty when using WS", - fields{ - p: subscriber.WS, - }, - nil, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := agoricManager{ - filter: tt.fields.filter, - } - if got := e.GetTestJson(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetTestJson() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestAgoricManager_ParseTestResponse(t *testing.T) { - type fields struct { - f agoricFilter - p subscriber.Type - } - type args struct { - data []byte - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - "does nothing for WS", - fields{f: agoricFilter{}, p: subscriber.WS}, - args{}, - false, - }, - { - "parses RPC responses", - fields{f: agoricFilter{}, p: subscriber.RPC}, - args{[]byte(`{"jsonrpc":"2.0","id":1,"result":"0x1"}`)}, - false, - }, - { - "fails unmarshal payload", - fields{f: agoricFilter{}, p: subscriber.RPC}, - args{[]byte(`error`)}, - false, - }, - { - "fails unmarshal result", - fields{f: agoricFilter{}, p: subscriber.RPC}, - args{[]byte(`{"jsonrpc":"2.0","id":1,"result":["0x1"]}`)}, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := agoricManager{ - filter: tt.fields.f, - } - if err := e.ParseTestResponse(tt.args.data); (err != nil) != tt.wantErr { - t.Errorf("ParseTestResponse() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestAgoricManager_ParseResponse(t *testing.T) { - type fields struct { - filter agoricFilter - p subscriber.Type - } - type args struct { - data []byte - } - tests := []struct { - name string - fields fields - args args - want []subscriber.Event - want1 bool - }{ - { - "fails parsing invalid payload", - fields{filter: agoricFilter{}, p: subscriber.WS}, - args{data: []byte(`invalid`)}, - nil, - false, - }, - { - "fails parsing invalid WS body", - fields{filter: agoricFilter{}, p: subscriber.WS}, - args{data: []byte(`{}`)}, - nil, - false, - }, - { - "fails parsing invalid WS type", - fields{filter: agoricFilter{}, p: subscriber.WS}, - args{data: []byte(`{"type":"oracleServer/wrongType"}`)}, - nil, - false, - }, - /*{ - "successfully parses WS Oracle request", - fields{filter: agoricFilter{JobID: "9999"}, p: subscriber.WS}, - args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)}, - []subscriber.Event{map[string]interface{}{ - "path": "foo", - "patment": "191919000000000000", - "request_id": "123", - }}, - true, - },*/ - { - "skips unfiltered WS Oracle request", - fields{filter: agoricFilter{JobID: "Z9999"}, p: subscriber.WS}, - args{data: []byte(`{"type":"oracleServer/onQuery","data":{"query":{"jobID":"9999","params":{"path":"foo"}},"queryId":"123","fee":191919}}`)}, - nil, - true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := agoricManager{ - filter: tt.fields.filter, - } - got, got1 := e.ParseResponse(tt.args.data) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParseResponse() got = %s, want %s", got, tt.want) - } - if got1 != tt.want1 { - t.Errorf("ParseResponse() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 0f8dd63f..3512a069 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" + "github.com/smartcontractkit/external-initiator/blockchain/agoric" "github.com/smartcontractkit/external-initiator/blockchain/common" "github.com/smartcontractkit/external-initiator/blockchain/conflux" "github.com/smartcontractkit/external-initiator/blockchain/ethereum" @@ -53,6 +54,8 @@ func CreateRunlogManager(sub store.Subscription) (common.RunlogManager, error) { return conflux.CreateRunlogManager(sub) case harmony.Name: return harmony.CreateRunlogManager(sub) + case agoric.Name: + return agoric.CreateRunlogManager(sub) } return nil, fmt.Errorf("unknown endpoint type: %s", sub.Endpoint.Type) } @@ -62,6 +65,7 @@ var blockchains = []string{ substrate.Name, conflux.Name, harmony.Name, + agoric.Name, } func ValidBlockchain(name string) bool { @@ -115,6 +119,9 @@ func CreateSubscription(sub store.Subscription, params Params) store.Subscriptio sub.Conflux = store.CfxSubscription{ Addresses: addresses, } + case agoric.Name: + // TODO: No data stored here? + sub.Agoric = store.AgoricSubscription{} } return sub diff --git a/subscriber/core_ws.go b/subscriber/core_ws.go new file mode 100644 index 00000000..74866f66 --- /dev/null +++ b/subscriber/core_ws.go @@ -0,0 +1,112 @@ +package subscriber + +import ( + "github.com/gorilla/websocket" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/smartcontractkit/external-initiator/store" + "sync" + "time" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Maximum message size allowed from peer. + maxMessageSize = 15 * 1024 * 1024 +) + +type WebsocketConnection struct { + endpoint string + + conn *websocket.Conn + + quitOnce sync.Once + writeMutex sync.Mutex + + chClose chan struct{} + stopped bool +} + +func NewCoreWebsocketConnection(endpoint store.Endpoint) (*WebsocketConnection, error) { + conn, _, err := websocket.DefaultDialer.Dial(endpoint.Url, nil) + if err != nil { + return nil, err + } + + wsc := &WebsocketConnection{ + endpoint: endpoint.Url, + conn: conn, + chClose: make(chan struct{}), + } + + return wsc, nil +} + +func (wsc *WebsocketConnection) Type() Type { + return CoreWS +} + +func (wsc *WebsocketConnection) Stop() { + wsc.quitOnce.Do(func() { + wsc.stopped = true + close(wsc.chClose) + }) +} + +func (wsc *WebsocketConnection) resetConnection() { + if wsc.stopped { + return + } + + attempts := 0 + for { + if wsc.stopped { + return + } + + attempts++ + + conn, _, err := websocket.DefaultDialer.Dial(wsc.endpoint, nil) + if err != nil { + logger.Error(err) + var fac time.Duration + if attempts < 5 { + fac = time.Duration(attempts * 2) + } else { + fac = 10 + } + time.Sleep(fac * time.Second) + continue + } + + wsc.conn = conn + break + } +} + +func (wsc *WebsocketConnection) Read(ch chan<- []byte) { + defer wsc.resetConnection() + + wsc.conn.SetReadLimit(maxMessageSize) + for { + _, message, err := wsc.conn.ReadMessage() + if err != nil { + // TODO: Reconnect + return + } + + ch <- message + } +} + +func (wsc *WebsocketConnection) SendMessage(payload []byte) error { + wsc.writeMutex.Lock() + defer wsc.writeMutex.Unlock() + + err := wsc.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err != nil { + return err + } + return wsc.conn.WriteMessage(websocket.TextMessage, payload) +} diff --git a/subscriber/subscriber.go b/subscriber/subscriber.go index 3adc2ad6..cd316bff 100644 --- a/subscriber/subscriber.go +++ b/subscriber/subscriber.go @@ -24,6 +24,9 @@ const ( // Client are connections encapsulated in its // entirety by the blockchain implementation. Client + // CoreWS are pure WS connections without + // any JSON-RPC support + CoreWS ) // Event is the individual event that occurs during diff --git a/subscriber/ws.go b/subscriber/ws.go index 99b8f1f6..2f1da61e 100644 --- a/subscriber/ws.go +++ b/subscriber/ws.go @@ -15,19 +15,11 @@ import ( "go.uber.org/atomic" ) -const ( - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Maximum message size allowed from peer. - maxMessageSize = 15 * 1024 * 1024 -) - var ( errorRequestTimeout = errors.New("request timed out") ) -type websocketConnection struct { +type jsonRpcWebsocketConnection struct { endpoint string requests []*subscribeRequest @@ -46,13 +38,13 @@ type websocketConnection struct { stopped bool } -func NewWebsocketConnection(endpoint store.Endpoint) (*websocketConnection, error) { +func NewWebsocketConnection(endpoint store.Endpoint) (*jsonRpcWebsocketConnection, error) { conn, _, err := websocket.DefaultDialer.Dial(endpoint.Url, nil) if err != nil { return nil, err } - wsc := &websocketConnection{ + wsc := &jsonRpcWebsocketConnection{ endpoint: endpoint.Url, conn: conn, subscriptionListeners: make(map[string]chan<- json.RawMessage), @@ -66,18 +58,18 @@ func NewWebsocketConnection(endpoint store.Endpoint) (*websocketConnection, erro return wsc, nil } -func (wsc *websocketConnection) Type() Type { +func (wsc *jsonRpcWebsocketConnection) Type() Type { return WS } -func (wsc *websocketConnection) Stop() { +func (wsc *jsonRpcWebsocketConnection) Stop() { wsc.quitOnce.Do(func() { wsc.stopped = true close(wsc.chClose) }) } -func (wsc *websocketConnection) Subscribe(ctx context.Context, method, unsubscribeMethod string, params json.RawMessage, ch chan<- json.RawMessage) error { +func (wsc *jsonRpcWebsocketConnection) Subscribe(ctx context.Context, method, unsubscribeMethod string, params json.RawMessage, ch chan<- json.RawMessage) error { req := wsc.newSubscribeRequest(ctx, method, unsubscribeMethod, params, ch) err := wsc.subscribe(req) if err != nil { @@ -87,7 +79,7 @@ func (wsc *websocketConnection) Subscribe(ctx context.Context, method, unsubscri return nil } -func (wsc *websocketConnection) Request(ctx context.Context, method string, params json.RawMessage) (result json.RawMessage, err error) { +func (wsc *jsonRpcWebsocketConnection) Request(ctx context.Context, method string, params json.RawMessage) (result json.RawMessage, err error) { listener := make(chan json.RawMessage, 1) nonce := wsc.nonce.Inc() wsc.nonceListeners[nonce] = listener @@ -114,7 +106,7 @@ func (wsc *websocketConnection) Request(ctx context.Context, method string, para } } -func (wsc *websocketConnection) resetConnection() { +func (wsc *jsonRpcWebsocketConnection) resetConnection() { if wsc.stopped { return } @@ -156,7 +148,7 @@ func (wsc *websocketConnection) resetConnection() { } } -func (wsc *websocketConnection) read() { +func (wsc *jsonRpcWebsocketConnection) read() { defer wsc.resetConnection() wsc.conn.SetReadLimit(maxMessageSize) @@ -171,7 +163,7 @@ func (wsc *websocketConnection) read() { } } -func (wsc *websocketConnection) processIncomingMessage(payload json.RawMessage) { +func (wsc *jsonRpcWebsocketConnection) processIncomingMessage(payload json.RawMessage) { var msg JsonrpcMessage err := json.Unmarshal(payload, &msg) if err != nil { @@ -212,7 +204,7 @@ func (wsc *websocketConnection) processIncomingMessage(payload json.RawMessage) ch <- params.Result } -func (wsc *websocketConnection) subscribe(req *subscribeRequest) error { +func (wsc *jsonRpcWebsocketConnection) subscribe(req *subscribeRequest) error { subscriptionId, err := wsc.getSubscriptionId(req) if err != nil { return err @@ -249,7 +241,7 @@ func (wsc *websocketConnection) subscribe(req *subscribeRequest) error { return nil } -func (wsc *websocketConnection) getSubscriptionId(req *subscribeRequest) (string, error) { +func (wsc *jsonRpcWebsocketConnection) getSubscriptionId(req *subscribeRequest) (string, error) { nonce := wsc.nonce.Inc() payload, err := NewJsonrpcMessage(nonce, req.method, req.params) if err != nil { @@ -281,7 +273,7 @@ func (wsc *websocketConnection) getSubscriptionId(req *subscribeRequest) (string } } -func (wsc *websocketConnection) sendMessage(payload json.RawMessage) error { +func (wsc *jsonRpcWebsocketConnection) sendMessage(payload json.RawMessage) error { wsc.writeMutex.Lock() defer wsc.writeMutex.Unlock() @@ -302,7 +294,7 @@ type subscribeRequest struct { stopped bool } -func (wsc *websocketConnection) newSubscribeRequest(ctx context.Context, method, unsubscribeMethod string, params json.RawMessage, ch chan<- json.RawMessage) *subscribeRequest { +func (wsc *jsonRpcWebsocketConnection) newSubscribeRequest(ctx context.Context, method, unsubscribeMethod string, params json.RawMessage, ch chan<- json.RawMessage) *subscribeRequest { req := &subscribeRequest{ ctx: ctx, method: method, From f10388cb52195defd0922180c6004d21b1b17b7f Mon Sep 17 00:00:00 2001 From: aalu1418 Date: Wed, 5 May 2021 15:01:09 -0400 Subject: [PATCH 2/2] goimports formatting fixes --- blockchain/agoric/runlog_test.go | 3 ++- subscriber/core_ws.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/blockchain/agoric/runlog_test.go b/blockchain/agoric/runlog_test.go index 96221785..a642f7d3 100644 --- a/blockchain/agoric/runlog_test.go +++ b/blockchain/agoric/runlog_test.go @@ -1,9 +1,10 @@ package agoric import ( - "github.com/smartcontractkit/external-initiator/blockchain/common" "reflect" "testing" + + "github.com/smartcontractkit/external-initiator/blockchain/common" ) func TestRunlogManager_ParseRequests(t *testing.T) { diff --git a/subscriber/core_ws.go b/subscriber/core_ws.go index 74866f66..8598b247 100644 --- a/subscriber/core_ws.go +++ b/subscriber/core_ws.go @@ -1,11 +1,12 @@ package subscriber import ( + "sync" + "time" + "github.com/gorilla/websocket" "github.com/smartcontractkit/chainlink/core/logger" "github.com/smartcontractkit/external-initiator/store" - "sync" - "time" ) const (