Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 78 additions & 68 deletions pkg/relay/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"errors"
"fmt"
"math/big"
"math/rand/v2"
"strconv"
"sync"
"time"

"github.com/xssnick/tonutils-go/address"
"github.com/xssnick/tonutils-go/liteclient"
"github.com/xssnick/tonutils-go/ton"
"github.com/xssnick/tonutils-go/ton/wallet"
"github.com/xssnick/tonutils-go/tvm/cell"
Expand Down Expand Up @@ -67,6 +68,7 @@ var _ Chain = (*chain)(nil)

type cachedClient struct {
client ton.APIClientWrapped
pool *liteclient.ConnectionPool
timestamp time.Time
}

Expand All @@ -83,8 +85,8 @@ type chain struct {
lp logpoller.Service
bm services.Service

clientCache map[int]*cachedClient
cacheMu sync.RWMutex
sharedClient *cachedClient
cacheMu sync.RWMutex
}

func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) {
Expand All @@ -108,7 +110,6 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
ds: ds,
clientCache: make(map[int]*cachedClient),
}

// TODO(@jadepark-dev): TXM technically doesn't need SignedAPIClient, revisit to refactor
Expand Down Expand Up @@ -207,7 +208,15 @@ func (c *chain) Start(ctx context.Context) error {

func (c *chain) Close() error {
return c.starter.StopOnce("Chain", func() error {
c.lggr.Debug("Stopping txm, log poller, and balance monitor")
c.lggr.Debug("Stopping txm, log poller, balance monitor, and shared connection pool")

c.cacheMu.Lock()
if c.sharedClient != nil {
c.sharedClient.pool.Stop()
c.sharedClient = nil
}
c.cacheMu.Unlock()

return services.CloseAll(c.txm, c.lp, c.bm)
})
}
Expand Down Expand Up @@ -325,80 +334,88 @@ func (c *chain) ChainID() string {
return c.id
}

// GetClient returns a client, randomly selecting one from available and valid nodes
// GetClient returns a TON API client backed by a shared connection pool containing all
// configured liteserver nodes. The pool handles load balancing, failover, and health
// monitoring internally. The client is cached with a TTL to periodically refresh the
// trusted block and re-validate the chain ID.
func (c *chain) GetClient(ctx context.Context) (ton.APIClientWrapped, error) {
var lastErr error
nodes := c.cfg.Nodes
if len(nodes) == 0 {
return nil, errors.New("no nodes available")
}

indexes := rand.Perm(len(nodes))
// Fast path: return cached client if TTL is still valid
c.cacheMu.RLock()
if c.sharedClient != nil && time.Since(c.sharedClient.timestamp) < c.cfg.ClientTTL {
client := c.sharedClient.client
c.cacheMu.RUnlock()
return client, nil
}
c.cacheMu.RUnlock()

for _, i := range indexes {
node := nodes[i]
// Slow path: create or refresh the shared client
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

// Check cache
c.cacheMu.RLock()
entry, ok := c.clientCache[i]
c.cacheMu.RUnlock()
// Double-check after acquiring write lock
if c.sharedClient != nil && time.Since(c.sharedClient.timestamp) < c.cfg.ClientTTL {
return c.sharedClient.client, nil
}

if ok && time.Since(entry.timestamp) < c.cfg.ClientTTL {
c.lggr.Debugw("Using cached client", "name", node.Name)
return entry.client, nil
} else if ok {
// TTL expired — evict
c.lggr.Debugw("Evicting expired client", "name", node.Name)
c.cacheMu.Lock()
delete(c.clientCache, i)
c.cacheMu.Unlock()
Comment on lines -350 to -354
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leak: been creating liteserver pool on every client cache miss/expiry(every 10 minutes)

The cachedClient struct only held client and timestamp, no pool reference

}
// Stop the old pool if TTL expired
if c.sharedClient != nil {
c.lggr.Debugw("Stopping expired shared connection pool")
c.sharedClient.pool.Stop()
c.sharedClient = nil
}

// Build new client, expected URL format: liteserver://publickey@host:port
liteServerURL := node.URL.String()
connectionPool, cerr := tonchain.CreateLiteserverConnectionPool(ctx, liteServerURL)
if cerr != nil {
c.lggr.Warnw("failed to get connection pool", "name", node.Name, "ton-url", node.URL, "err", cerr)
continue
}
// Collect all node URLs
urls := make([]string, len(nodes))
for i, node := range nodes {
urls[i] = node.URL.String()
}

client := ton.NewAPIClient(connectionPool, ton.ProofCheckPolicyFast).WithRetry(defaultTONClientRetryCount)
// Create a single pool with all nodes
pool, err := tonchain.CreateMultiLiteserverConnectionPool(ctx, urls)
if err != nil {
return nil, fmt.Errorf("failed to create shared connection pool: %w", err)
}

blockID, err := client.CurrentMasterchainInfo(ctx)
if err != nil {
lastErr = err
c.evictClient(i, *node.Name, "CurrentMasterchainInfo failed")
continue
}
// set starting point to verify master block proofs chain
client.SetTrustedBlock(blockID)
client := ton.NewAPIClient(pool, ton.ProofCheckPolicyFast).WithRetry(defaultTONClientRetryCount)

block, err := client.GetBlockData(ctx, blockID)
if err != nil {
lastErr = err
c.evictClient(i, *node.Name, "GetBlockData failed")
continue
}
// Validate: get current masterchain info and set trusted block
blockID, err := client.CurrentMasterchainInfo(ctx)
if err != nil {
pool.Stop()
return nil, fmt.Errorf("failed to get masterchain info: %w", err)
}
if blockID.Workchain != address.MasterchainID {
pool.Stop()
return nil, fmt.Errorf("expected masterchain block (workchain %d), got workchain %d", address.MasterchainID, blockID.Workchain)
}
client.SetTrustedBlock(blockID)

chainID := block.GlobalID
if strconv.FormatInt(int64(chainID), 10) != c.id {
c.lggr.Errorw("unexpected chain id", "name", node.Name, "localChainId", c.id, "remoteChainId", chainID)
continue
}
// Validate chain ID
block, err := client.GetBlockData(ctx, blockID)
if err != nil {
pool.Stop()
return nil, fmt.Errorf("failed to get block data: %w", err)
}

// Cache the fresh client
c.cacheMu.Lock()
c.clientCache[i] = &cachedClient{
client: client,
timestamp: time.Now(),
}
c.cacheMu.Unlock()
chainID := block.GlobalID
if strconv.FormatInt(int64(chainID), 10) != c.id {
pool.Stop()
return nil, fmt.Errorf("unexpected chain id: expected %s, got %d", c.id, chainID)
}

c.lggr.Debugw("Created and cached client", "name", node.Name)
return client, nil
c.sharedClient = &cachedClient{
client: client,
pool: pool,
timestamp: time.Now(),
}

return nil, fmt.Errorf("no valid TON nodes available, last error: %w", lastErr)
c.lggr.Infow("Created shared connection pool", "nodeCount", len(nodes))
return client, nil
}

func (c *chain) GetSignerWallet(ctx context.Context, client ton.APIClientWrapped, loopKs loop.Keystore, accountIndex int) (*wallet.Wallet, error) {
Expand Down Expand Up @@ -440,13 +457,6 @@ func (c *chain) GetSignerWallet(ctx context.Context, client ton.APIClientWrapped
return w, nil
}

func (c *chain) evictClient(index int, name string, reason string) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
delete(c.clientCache, index)
c.lggr.Warnw("evicted client due to error", "name", name, "reason", reason)
}

func (c *chain) listNodeStatuses(start, end int) ([]commontypes.NodeStatus, int, error) {
stats := make([]commontypes.NodeStatus, 0)
total := len(c.cfg.Nodes)
Expand Down
54 changes: 54 additions & 0 deletions pkg/relay/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package relay

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/xssnick/tonutils-go/liteclient"
"github.com/xssnick/tonutils-go/ton"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-ton/pkg/config"
)

type mockAPIClient struct {
ton.APIClientWrapped
}

// TestGetClient_CacheHit verifies the fast path: when a cached client exists
// with a valid TTL, GetClient returns it without creating a new connection pool.
func TestGetClient_CacheHit(t *testing.T) {
t.Parallel()

mock := &mockAPIClient{}
c := &chain{
id: "-3",
cfg: &config.TOMLConfig{
Nodes: config.Nodes{makeNode("node1")},
Chain: config.Chain{ClientTTL: 10 * time.Minute},
},
lggr: logger.Sugared(logger.Nop()),
sharedClient: &cachedClient{
client: mock,
pool: liteclient.NewConnectionPool(),
timestamp: time.Now(),
},
}

client, err := c.GetClient(context.Background())
require.NoError(t, err)
// must return the exact same cached instance, not a new client
assert.Same(t, mock, client)
}

func makeNode(name string) *config.Node {
n := name
u := commonconfig.MustParseURL("liteserver://AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=@192.0.2.1:1")
return &config.Node{Name: &n, URL: u}
}
39 changes: 29 additions & 10 deletions pkg/ton/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,39 @@ import (
"github.com/xssnick/tonutils-go/liteclient"
)

// CreateLiteserverConnectionPool parses a liteserver:// URL and creates a connection pool
func CreateLiteserverConnectionPool(ctx context.Context, liteserverURL string) (*liteclient.ConnectionPool, error) {
publicKey, hostPort, err := parseLiteserverURL(liteserverURL)
if err != nil {
return nil, err
// CreateMultiLiteserverConnectionPool creates a single ConnectionPool containing all provided
// liteserver nodes. The pool natively handles load balancing, failover, and health monitoring.
// Partial connection failures are tolerated; an error is returned only if zero nodes connect.
func CreateMultiLiteserverConnectionPool(ctx context.Context, liteserverURLs []string) (*liteclient.ConnectionPool, error) {
if len(liteserverURLs) == 0 {
return nil, fmt.Errorf("no liteserver URLs provided")
}

pool := liteclient.NewConnectionPool()

var connected int
for _, u := range liteserverURLs {
publicKey, hostPort, err := parseLiteserverURL(u)
if err != nil {
return nil, fmt.Errorf("invalid liteserver URL %q: %w", u, err)
}
if err = pool.AddConnection(ctx, hostPort, publicKey); err != nil {
// tolerate partial failures — some nodes may be temporarily down
continue
}
connected++
}

connectionPool := liteclient.NewConnectionPool()
err = connectionPool.AddConnection(ctx, hostPort, publicKey)
if err != nil {
return nil, fmt.Errorf("failed to add liteserver connection: %w", err)
if connected == 0 {
return nil, fmt.Errorf("failed to connect to any of %d liteserver nodes", len(liteserverURLs))
}

return connectionPool, nil
return pool, nil
}

// CreateLiteserverConnectionPool parses a liteserver:// URL and creates a connection pool
func CreateLiteserverConnectionPool(ctx context.Context, liteserverURL string) (*liteclient.ConnectionPool, error) {
return CreateMultiLiteserverConnectionPool(ctx, []string{liteserverURL})
}

// parseLiteserverURL parses a liteserver:// URL and returns the public key and host:port
Expand Down
27 changes: 27 additions & 0 deletions pkg/ton/chain/chain_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package chain

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseLiteserverURL(t *testing.T) {
Expand Down Expand Up @@ -89,3 +93,26 @@ func TestParseLiteserverURL(t *testing.T) {
})
}
}

// TestCreateMultiLiteserverConnectionPool_Validation verifies that the function
// rejects invalid inputs before attempting any network connections.
func TestCreateMultiLiteserverConnectionPool_Validation(t *testing.T) {
t.Parallel()

tests := []struct {
name string
urls []string
}{
{name: "empty URL list", urls: []string{}},
{name: "nil URL list", urls: nil},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
pool, err := CreateMultiLiteserverConnectionPool(context.Background(), tt.urls)
require.Error(t, err)
assert.Nil(t, pool)
})
}
}
Loading