Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions rpc/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/diodechain/diode_client/config"
"github.com/diodechain/diode_client/edge"
"github.com/diodechain/diode_client/rlp"
)

var (
Expand Down Expand Up @@ -265,6 +266,16 @@ func (client *Client) isAllowlisted(port *config.Port, addr Address) bool {

// handleInboundMessage handle inbound message
func (client *Client) handleInboundMessage(msg edge.Message) {
// If message framing is broken, the payload will not be valid RLP.
// This is non-recoverable because we won't be able to find the beginning of
// the next frame again, so close the connection to prevent a loop.
if err := validateRLPMessage(msg.Buffer); err != nil {
client.Log().Error("Invalid RLP frame received, closing connection: %v", err)
client.Log().Debug("Invalid RLP data: %x", msg.Buffer)
client.Close()
return
}

if msg.IsResponse() {
defer client.timer.profile(time.Now(), "handleResponse")

Expand Down Expand Up @@ -320,6 +331,34 @@ func (client *Client) handleInboundMessage(msg edge.Message) {
client.handleInboundRequest(inboundRequest)
}

func validateRLPMessage(b []byte) error {
k, content, rest, err := rlp.Split(b)
if err != nil {
return err
}
if len(rest) != 0 {
return rlp.ErrMoreThanOneValue
}
return validateRLPValue(k, content)
}

func validateRLPValue(k rlp.Kind, content []byte) error {
if k != rlp.List {
return nil
}
for len(content) > 0 {
innerKind, innerContent, rest, err := rlp.Split(content)
if err != nil {
return err
}
if err := validateRLPValue(innerKind, innerContent); err != nil {
return err
}
content = rest
}
return nil
}

// recvMessageLoop infinite loop to read message from server
func (client *Client) recvMessageLoop() {
msgBuffer := make(chan edge.Message, 20)
Expand Down
94 changes: 94 additions & 0 deletions rpc/bridge_inbound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Diode Network Client
// Copyright 2021 Diode
// Licensed under the Diode License, Version 1.1
package rpc

import (
"testing"

"github.com/diodechain/diode_client/config"
"github.com/diodechain/diode_client/edge"
"github.com/diodechain/diode_client/rlp"
)

func TestHandleInboundMessage_ClosesOnInvalidRLP(t *testing.T) {
cfg := testConfig()
config.AppConfig = cfg
pool := NewPool()
client := NewClient("localhost:0", nil, cfg, pool)
Comment on lines +15 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This client setup logic is repeated across TestHandleInboundMessage_ClosesOnInvalidRLP, TestHandleInboundMessage_DoesNotCloseOnValidUnsupportedRequest, and TestHandleInboundMessage_ClosesOnBrokenFrameFromLog. To improve maintainability and reduce code duplication, consider extracting this into a test helper function.

For example:

func newTestClient(t *testing.T) *Client {
	t.Helper()
	cfg := testConfig()
	config.AppConfig = cfg
	pool := NewPool()
	return NewClient("localhost:0", nil, cfg, pool)
}

Each test could then be simplified to start with client := newTestClient(t).


// 0xb8 indicates a long string with 1 byte length-of-length, but the length byte is missing.
msg := edge.Message{Len: 1, Buffer: []byte{0xb8}}
client.handleInboundMessage(msg)

if !client.Closed() {
t.Fatalf("expected client to be closed on invalid RLP frame")
}
}

func TestHandleInboundMessage_DoesNotCloseOnValidUnsupportedRequest(t *testing.T) {
cfg := testConfig()
config.AppConfig = cfg
pool := NewPool()
client := NewClient("localhost:0", nil, cfg, pool)

buf, err := rlp.EncodeToBytes([]interface{}{[]byte("not-a-known-rpc")})
if err != nil {
t.Fatalf("failed to encode rlp: %v", err)
}

msg := edge.Message{Len: len(buf), Buffer: buf}
client.handleInboundMessage(msg)

if client.Closed() {
t.Fatalf("did not expect client to be closed on a valid but unsupported request")
}
}

func TestHandleInboundMessage_ClosesOnBrokenFrameFromLog(t *testing.T) {
cfg := testConfig()
config.AppConfig = cfg
pool := NewPool()
client := NewClient("localhost:0", nil, cfg, pool)

// Reported by customer as a doom-loop trigger; parsing indicates framing is broken.
// The message contains extra/truncated data and must be treated as non-recoverable.
buf := []byte{
0xf9, 0x01, 0x89, 0x84, 0x01, 0x00, 0x00, 0x01, 0xf9, 0x01, 0x81, 0x88, 0x72, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0xf9, 0x01, 0x53, 0xec, 0x8a, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68,
0x61, 0x73, 0x68, 0xa0, 0x00, 0x00, 0x04, 0xaf, 0x7c, 0xc9, 0x8c, 0x99, 0xd0, 0xa0, 0xa8, 0x6d,
0xaf, 0x6a, 0xbd, 0x9b, 0x16, 0xc7, 0x2d, 0xdc, 0xed, 0x51, 0x56, 0x5a, 0x86, 0x23, 0x50, 0x62,
0x24, 0xd5, 0x08, 0x57, 0xf8, 0x53, 0x8f, 0x6d, 0x69, 0x6e, 0x65, 0x72, 0x5f, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0xb8, 0x41, 0x01, 0x4e, 0x5e, 0x3d, 0x1a, 0x8b, 0xd8, 0x3b,
0xf5, 0x90, 0x4d, 0xe9, 0xf1, 0x53, 0x58, 0x94, 0xc5, 0x40, 0xf6, 0x9c, 0x26, 0x95, 0x36, 0x07,
0x48, 0x62, 0x6d, 0xc6, 0x81, 0x0b, 0x03, 0x8b, 0x06, 0x1a, 0x6d, 0x29, 0x5b, 0xe9, 0xd9, 0x62,
0x4d, 0x0b, 0x8c, 0x6d, 0x4b, 0xba, 0x3d, 0x4e, 0xdc, 0x46, 0x4e, 0x79, 0x2f, 0xec, 0x14, 0xc7,
0xbd, 0x85, 0x77, 0xe7, 0x11, 0x6b, 0x2f, 0xc9, 0x36, 0x56, 0xe3, 0x85, 0x6e, 0x6f, 0x6e, 0x63,
0x65, 0x9c, 0x22, 0x49, 0xc2, 0xa7, 0x73, 0x26, 0x59, 0x7b, 0x2e, 0x01, 0x2e, 0x8e, 0x5f, 0xd9,
0x1a, 0x9f, 0xcf, 0x4a, 0xd0, 0x93, 0x91, 0xf4, 0xd3, 0x8b, 0x27, 0xfd, 0x32, 0x34, 0xcb, 0x86,
0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x83, 0x9a, 0x28, 0x73, 0xf0, 0x8e, 0x70, 0x72, 0x65, 0x76,
0x69, 0x6f, 0x75, 0x73, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0xa0, 0x00, 0x00, 0x04, 0xad, 0xb4,
0x33, 0xa9, 0x67, 0x07, 0x8e, 0x8f, 0xd4, 0x1d, 0x5b, 0x20, 0x81, 0x31, 0x50, 0x70, 0xfb, 0x18,
0xf9, 0x6b, 0x1d, 0x9c, 0xd4, 0xa3, 0xce, 0xbf, 0x59, 0x0f, 0x8c, 0xec, 0x8a, 0x73, 0x74, 0x61,
0x74, 0x65, 0x5f, 0x68, 0x61, 0x73, 0x68, 0xa0, 0x2e, 0x49, 0x56, 0x87, 0x0c, 0xa5, 0x96, 0xa4,
0x92, 0x6d, 0x57, 0xfe, 0xa5, 0xee, 0x06, 0x8a, 0x0f, 0x6d, 0xfd, 0x5e, 0x8d, 0x62, 0xbc, 0x5b,
0x35, 0x6b, 0x80, 0x0a, 0xe4, 0x06, 0x18, 0x06, 0xcf, 0x89, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x84, 0x69, 0x35, 0x70, 0xd4, 0xf2, 0x90, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x68, 0x61, 0x73, 0x68, 0xa0, 0xe7, 0xaf, 0x85, 0xcf, 0x72,
0x40, 0xe4, 0x4d, 0x5d, 0xb3, 0x3a, 0xfa, 0x4e, 0x4e, 0x0a, 0xed, 0x10, 0x23, 0x71, 0xab, 0x7b,
0x23, 0x8c, 0xc0, 0x3f, 0xec, 0x75, 0xe0, 0x0a, 0xf9, 0x8a, 0xc2, 0xa1, 0x02, 0x10, 0x07, 0xad,
0xc7, 0xe9, 0xd3, 0x3f, 0xec, 0x5a, 0xab, 0x31, 0xf8, 0x83, 0x29, 0x30, 0xc1, 0x60, 0x1b, 0x64,
0x9a, 0x6c, 0xa0, 0x86, 0x48, 0x9a, 0x45, 0xc6, 0xe7, 0x6b, 0x3c, 0xb8, 0x32,
}

if err := validateRLPMessage(buf); err == nil {
t.Fatalf("expected invalid RLP for reported frame")
}

msg := edge.Message{Len: 398, Buffer: buf}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The message length is hardcoded as 398. It's more robust to use len(buf) to dynamically set the length. This ensures the test remains correct even if the test data in buf is modified in the future.

Suggested change
msg := edge.Message{Len: 398, Buffer: buf}
msg := edge.Message{Len: len(buf), Buffer: buf}

client.handleInboundMessage(msg)

if !client.Closed() {
t.Fatalf("expected client to be closed for reported broken frame")
}
}
Loading