From 03b7559e7fe2ba599af4a3409c6849eb815e3c72 Mon Sep 17 00:00:00 2001 From: Jade Park Date: Thu, 5 Feb 2026 20:12:17 +0000 Subject: [PATCH 1/2] fix: better node pool management --- pkg/relay/chain.go | 146 ++++++++++++++++++++++------------------- pkg/ton/chain/chain.go | 39 ++++++++--- 2 files changed, 107 insertions(+), 78 deletions(-) diff --git a/pkg/relay/chain.go b/pkg/relay/chain.go index f9e68a5d5..3c4e7da2f 100644 --- a/pkg/relay/chain.go +++ b/pkg/relay/chain.go @@ -7,11 +7,12 @@ import ( "errors" "fmt" "math/big" - "math/rand/v2" "strconv" "sync" "time" + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/liteclient" "github.com/xssnick/tonutils-go/ton" "github.com/xssnick/tonutils-go/ton/wallet" "github.com/xssnick/tonutils-go/tvm/cell" @@ -67,6 +68,7 @@ var _ Chain = (*chain)(nil) type cachedClient struct { client ton.APIClientWrapped + pool *liteclient.ConnectionPool timestamp time.Time } @@ -83,8 +85,8 @@ type chain struct { lp logpoller.Service bm services.Service - clientCache map[int]*cachedClient - cacheMu sync.RWMutex + sharedClient *cachedClient + cacheMu sync.RWMutex } func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) { @@ -108,7 +110,6 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, cfg: cfg, lggr: logger.Named(lggr, "Chain"), ds: ds, - clientCache: make(map[int]*cachedClient), } // TODO(@jadepark-dev): TXM technically doesn't need SignedAPIClient, revisit to refactor @@ -207,7 +208,15 @@ func (c *chain) Start(ctx context.Context) error { func (c *chain) Close() error { return c.starter.StopOnce("Chain", func() error { - c.lggr.Debug("Stopping txm, log poller, and balance monitor") + c.lggr.Debug("Stopping txm, log poller, balance monitor, and shared connection pool") + + c.cacheMu.Lock() + if c.sharedClient != nil { + c.sharedClient.pool.Stop() + c.sharedClient = nil + } + c.cacheMu.Unlock() + return services.CloseAll(c.txm, c.lp, c.bm) }) } @@ -325,80 +334,88 @@ func (c *chain) ChainID() string { return c.id } -// GetClient returns a client, randomly selecting one from available and valid nodes +// GetClient returns a TON API client backed by a shared connection pool containing all +// configured liteserver nodes. The pool handles load balancing, failover, and health +// monitoring internally. The client is cached with a TTL to periodically refresh the +// trusted block and re-validate the chain ID. func (c *chain) GetClient(ctx context.Context) (ton.APIClientWrapped, error) { - var lastErr error nodes := c.cfg.Nodes if len(nodes) == 0 { return nil, errors.New("no nodes available") } - indexes := rand.Perm(len(nodes)) + // Fast path: return cached client if TTL is still valid + c.cacheMu.RLock() + if c.sharedClient != nil && time.Since(c.sharedClient.timestamp) < c.cfg.ClientTTL { + client := c.sharedClient.client + c.cacheMu.RUnlock() + return client, nil + } + c.cacheMu.RUnlock() - for _, i := range indexes { - node := nodes[i] + // Slow path: create or refresh the shared client + c.cacheMu.Lock() + defer c.cacheMu.Unlock() - // Check cache - c.cacheMu.RLock() - entry, ok := c.clientCache[i] - c.cacheMu.RUnlock() + // Double-check after acquiring write lock + if c.sharedClient != nil && time.Since(c.sharedClient.timestamp) < c.cfg.ClientTTL { + return c.sharedClient.client, nil + } - if ok && time.Since(entry.timestamp) < c.cfg.ClientTTL { - c.lggr.Debugw("Using cached client", "name", node.Name) - return entry.client, nil - } else if ok { - // TTL expired — evict - c.lggr.Debugw("Evicting expired client", "name", node.Name) - c.cacheMu.Lock() - delete(c.clientCache, i) - c.cacheMu.Unlock() - } + // Stop the old pool if TTL expired + if c.sharedClient != nil { + c.lggr.Debugw("Stopping expired shared connection pool") + c.sharedClient.pool.Stop() + c.sharedClient = nil + } - // Build new client, expected URL format: liteserver://publickey@host:port - liteServerURL := node.URL.String() - connectionPool, cerr := tonchain.CreateLiteserverConnectionPool(ctx, liteServerURL) - if cerr != nil { - c.lggr.Warnw("failed to get connection pool", "name", node.Name, "ton-url", node.URL, "err", cerr) - continue - } + // Collect all node URLs + urls := make([]string, len(nodes)) + for i, node := range nodes { + urls[i] = node.URL.String() + } - client := ton.NewAPIClient(connectionPool, ton.ProofCheckPolicyFast).WithRetry(defaultTONClientRetryCount) + // Create a single pool with all nodes + pool, err := tonchain.CreateMultiLiteserverConnectionPool(ctx, urls) + if err != nil { + return nil, fmt.Errorf("failed to create shared connection pool: %w", err) + } - blockID, err := client.CurrentMasterchainInfo(ctx) - if err != nil { - lastErr = err - c.evictClient(i, *node.Name, "CurrentMasterchainInfo failed") - continue - } - // set starting point to verify master block proofs chain - client.SetTrustedBlock(blockID) + client := ton.NewAPIClient(pool, ton.ProofCheckPolicyFast).WithRetry(defaultTONClientRetryCount) - block, err := client.GetBlockData(ctx, blockID) - if err != nil { - lastErr = err - c.evictClient(i, *node.Name, "GetBlockData failed") - continue - } + // Validate: get current masterchain info and set trusted block + blockID, err := client.CurrentMasterchainInfo(ctx) + if err != nil { + pool.Stop() + return nil, fmt.Errorf("failed to get masterchain info: %w", err) + } + if blockID.Workchain != address.MasterchainID { + pool.Stop() + return nil, fmt.Errorf("expected masterchain block (workchain %d), got workchain %d", address.MasterchainID, blockID.Workchain) + } + client.SetTrustedBlock(blockID) - chainID := block.GlobalID - if strconv.FormatInt(int64(chainID), 10) != c.id { - c.lggr.Errorw("unexpected chain id", "name", node.Name, "localChainId", c.id, "remoteChainId", chainID) - continue - } + // Validate chain ID + block, err := client.GetBlockData(ctx, blockID) + if err != nil { + pool.Stop() + return nil, fmt.Errorf("failed to get block data: %w", err) + } - // Cache the fresh client - c.cacheMu.Lock() - c.clientCache[i] = &cachedClient{ - client: client, - timestamp: time.Now(), - } - c.cacheMu.Unlock() + chainID := block.GlobalID + if strconv.FormatInt(int64(chainID), 10) != c.id { + pool.Stop() + return nil, fmt.Errorf("unexpected chain id: expected %s, got %d", c.id, chainID) + } - c.lggr.Debugw("Created and cached client", "name", node.Name) - return client, nil + c.sharedClient = &cachedClient{ + client: client, + pool: pool, + timestamp: time.Now(), } - return nil, fmt.Errorf("no valid TON nodes available, last error: %w", lastErr) + c.lggr.Infow("Created shared connection pool", "nodeCount", len(nodes)) + return client, nil } func (c *chain) GetSignerWallet(ctx context.Context, client ton.APIClientWrapped, loopKs loop.Keystore, accountIndex int) (*wallet.Wallet, error) { @@ -440,13 +457,6 @@ func (c *chain) GetSignerWallet(ctx context.Context, client ton.APIClientWrapped return w, nil } -func (c *chain) evictClient(index int, name string, reason string) { - c.cacheMu.Lock() - defer c.cacheMu.Unlock() - delete(c.clientCache, index) - c.lggr.Warnw("evicted client due to error", "name", name, "reason", reason) -} - func (c *chain) listNodeStatuses(start, end int) ([]commontypes.NodeStatus, int, error) { stats := make([]commontypes.NodeStatus, 0) total := len(c.cfg.Nodes) diff --git a/pkg/ton/chain/chain.go b/pkg/ton/chain/chain.go index 8d04d582b..f0f2caf62 100644 --- a/pkg/ton/chain/chain.go +++ b/pkg/ton/chain/chain.go @@ -8,20 +8,39 @@ import ( "github.com/xssnick/tonutils-go/liteclient" ) -// CreateLiteserverConnectionPool parses a liteserver:// URL and creates a connection pool -func CreateLiteserverConnectionPool(ctx context.Context, liteserverURL string) (*liteclient.ConnectionPool, error) { - publicKey, hostPort, err := parseLiteserverURL(liteserverURL) - if err != nil { - return nil, err +// CreateMultiLiteserverConnectionPool creates a single ConnectionPool containing all provided +// liteserver nodes. The pool natively handles load balancing, failover, and health monitoring. +// Partial connection failures are tolerated; an error is returned only if zero nodes connect. +func CreateMultiLiteserverConnectionPool(ctx context.Context, liteserverURLs []string) (*liteclient.ConnectionPool, error) { + if len(liteserverURLs) == 0 { + return nil, fmt.Errorf("no liteserver URLs provided") + } + + pool := liteclient.NewConnectionPool() + + var connected int + for _, u := range liteserverURLs { + publicKey, hostPort, err := parseLiteserverURL(u) + if err != nil { + return nil, fmt.Errorf("invalid liteserver URL %q: %w", u, err) + } + if err = pool.AddConnection(ctx, hostPort, publicKey); err != nil { + // tolerate partial failures — some nodes may be temporarily down + continue + } + connected++ } - connectionPool := liteclient.NewConnectionPool() - err = connectionPool.AddConnection(ctx, hostPort, publicKey) - if err != nil { - return nil, fmt.Errorf("failed to add liteserver connection: %w", err) + if connected == 0 { + return nil, fmt.Errorf("failed to connect to any of %d liteserver nodes", len(liteserverURLs)) } - return connectionPool, nil + return pool, nil +} + +// CreateLiteserverConnectionPool parses a liteserver:// URL and creates a connection pool +func CreateLiteserverConnectionPool(ctx context.Context, liteserverURL string) (*liteclient.ConnectionPool, error) { + return CreateMultiLiteserverConnectionPool(ctx, []string{liteserverURL}) } // parseLiteserverURL parses a liteserver:// URL and returns the public key and host:port From 6982c034873846b2e6d74fc5c61692e89d5258b9 Mon Sep 17 00:00:00 2001 From: Jade Park Date: Thu, 5 Feb 2026 20:25:09 +0000 Subject: [PATCH 2/2] chore: add tests --- pkg/relay/chain_test.go | 54 +++++++++++++++++++++++++++++++++++++ pkg/ton/chain/chain_test.go | 27 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 pkg/relay/chain_test.go diff --git a/pkg/relay/chain_test.go b/pkg/relay/chain_test.go new file mode 100644 index 000000000..15189f378 --- /dev/null +++ b/pkg/relay/chain_test.go @@ -0,0 +1,54 @@ +package relay + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/xssnick/tonutils-go/liteclient" + "github.com/xssnick/tonutils-go/ton" + + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + "github.com/smartcontractkit/chainlink-ton/pkg/config" +) + +type mockAPIClient struct { + ton.APIClientWrapped +} + +// TestGetClient_CacheHit verifies the fast path: when a cached client exists +// with a valid TTL, GetClient returns it without creating a new connection pool. +func TestGetClient_CacheHit(t *testing.T) { + t.Parallel() + + mock := &mockAPIClient{} + c := &chain{ + id: "-3", + cfg: &config.TOMLConfig{ + Nodes: config.Nodes{makeNode("node1")}, + Chain: config.Chain{ClientTTL: 10 * time.Minute}, + }, + lggr: logger.Sugared(logger.Nop()), + sharedClient: &cachedClient{ + client: mock, + pool: liteclient.NewConnectionPool(), + timestamp: time.Now(), + }, + } + + client, err := c.GetClient(context.Background()) + require.NoError(t, err) + // must return the exact same cached instance, not a new client + assert.Same(t, mock, client) +} + +func makeNode(name string) *config.Node { + n := name + u := commonconfig.MustParseURL("liteserver://AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=@192.0.2.1:1") + return &config.Node{Name: &n, URL: u} +} diff --git a/pkg/ton/chain/chain_test.go b/pkg/ton/chain/chain_test.go index 9dc0530f4..349c984e0 100644 --- a/pkg/ton/chain/chain_test.go +++ b/pkg/ton/chain/chain_test.go @@ -1,7 +1,11 @@ package chain import ( + "context" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestParseLiteserverURL(t *testing.T) { @@ -89,3 +93,26 @@ func TestParseLiteserverURL(t *testing.T) { }) } } + +// TestCreateMultiLiteserverConnectionPool_Validation verifies that the function +// rejects invalid inputs before attempting any network connections. +func TestCreateMultiLiteserverConnectionPool_Validation(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + urls []string + }{ + {name: "empty URL list", urls: []string{}}, + {name: "nil URL list", urls: nil}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + pool, err := CreateMultiLiteserverConnectionPool(context.Background(), tt.urls) + require.Error(t, err) + assert.Nil(t, pool) + }) + } +}