From f3f027e7658ac3cb5d2194f1fe1ce40106afd6ca Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 23 Jan 2026 19:40:45 +0100 Subject: [PATCH 1/3] fix(p2p): give each underlay address fresh 15s connection timeout --- pkg/p2p/libp2p/libp2p.go | 17 ++++++++++++++++- pkg/topology/kademlia/kademlia.go | 6 ------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 6bab9b8214c..2aa83e60a19 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -997,7 +997,18 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b // addresses for the same peer is passed to the host.Host.Connect function // and reachabiltiy Private is emitted on libp2p EventBus(), which results // in weaker connectivity and failures in some integration tests. + + baseCtx := context.WithoutCancel(ctx) + for _, addr := range addrs { + // Check if parent context is cancelled (shutdown, etc.) + if ctx.Err() != nil { + if connectErr == nil { + connectErr = ctx.Err() + } + break + } + // Extract the peer ID from the multiaddr. ai, err := libp2ppeer.AddrInfoFromP2pAddr(addr) if err != nil { @@ -1029,7 +1040,11 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return address, p2p.ErrAlreadyConnected } - if err := s.connectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil { + connectCtx, cancel := context.WithTimeout(baseCtx, 15*time.Second) + err = s.connectionBreaker.Execute(func() error { return s.host.Connect(connectCtx, *info) }) + cancel() + + if err != nil { if errors.Is(err, breaker.ErrClosed) { s.metrics.ConnectBreakerCount.Inc() return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil()) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index b31dd6d6c6d..425c83f8129 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -807,9 +807,6 @@ func (k *Kad) connectBootNodes(ctx context.Context) { var attempts, connected int totalAttempts := maxBootNodeAttempts * len(k.opt.Bootnodes) - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - for _, addr := range k.opt.Bootnodes { if attempts >= totalAttempts || connected >= 3 { return @@ -960,9 +957,6 @@ func (k *Kad) recalcDepth() { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma []ma.Multiaddr) error { k.logger.Debug("attempting connect to peer", "peer_address", peer) - ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) - defer cancel() - k.metrics.TotalOutboundConnectionAttempts.Inc() switch i, err := k.p2p.Connect(ctx, ma); { From 59f250173efef5e7fc6498b410ad12295c10eaeb Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 6 Feb 2026 18:44:49 +0100 Subject: [PATCH 2/3] fix(p2p): remove WithoutCancel and add per-peer connection budget --- pkg/p2p/libp2p/libp2p.go | 12 +----------- pkg/topology/kademlia/kademlia.go | 14 ++++++++++---- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 2aa83e60a19..2009a676dd2 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -998,17 +998,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b // and reachabiltiy Private is emitted on libp2p EventBus(), which results // in weaker connectivity and failures in some integration tests. - baseCtx := context.WithoutCancel(ctx) - for _, addr := range addrs { - // Check if parent context is cancelled (shutdown, etc.) - if ctx.Err() != nil { - if connectErr == nil { - connectErr = ctx.Err() - } - break - } - // Extract the peer ID from the multiaddr. ai, err := libp2ppeer.AddrInfoFromP2pAddr(addr) if err != nil { @@ -1040,7 +1030,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return address, p2p.ErrAlreadyConnected } - connectCtx, cancel := context.WithTimeout(baseCtx, 15*time.Second) + connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second) err = s.connectionBreaker.Execute(func() error { return s.host.Connect(connectCtx, *info) }) cancel() diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 425c83f8129..c19e79c0e2f 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -42,10 +42,9 @@ const ( addPeerBatchSize = 500 - // To avoid context.Timeout errors during network failure, the value of - // the peerConnectionAttemptTimeout constant must be equal to or greater - // than 5 seconds (empirically verified). - peerConnectionAttemptTimeout = 15 * time.Second // timeout for establishing a new connection with peer. + // Each underlay address gets up to 10s for connection (in libp2p.Connect). + // This budget allows multiple addresses to be tried sequentially per peer. + peerConnectionAttemptTimeout = 45 * time.Second // timeout for establishing a new connection with peer. ) // Default option values @@ -817,6 +816,10 @@ func (k *Kad) connectBootNodes(ctx context.Context) { if attempts >= maxBootNodeAttempts { return true, nil } + + ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) + defer cancel() + bzzAddress, err := k.p2p.Connect(ctx, []ma.Multiaddr{addr}) attempts++ @@ -957,6 +960,9 @@ func (k *Kad) recalcDepth() { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma []ma.Multiaddr) error { k.logger.Debug("attempting connect to peer", "peer_address", peer) + ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) + defer cancel() + k.metrics.TotalOutboundConnectionAttempts.Inc() switch i, err := k.p2p.Connect(ctx, ma); { From 80dfa1d89e6872cd3aea0cf69870ccb528bb4481 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 10 Feb 2026 15:16:17 +0100 Subject: [PATCH 3/3] fix(libp2p): use 15s timeout per address to connect --- pkg/p2p/libp2p/libp2p.go | 2 +- pkg/topology/kademlia/kademlia.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 2009a676dd2..cfc2f83d15f 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -1030,7 +1030,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return address, p2p.ErrAlreadyConnected } - connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) err = s.connectionBreaker.Execute(func() error { return s.host.Connect(connectCtx, *info) }) cancel() diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index c19e79c0e2f..d4b28e8eb1b 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -42,7 +42,7 @@ const ( addPeerBatchSize = 500 - // Each underlay address gets up to 10s for connection (in libp2p.Connect). + // Each underlay address gets up to 15s for connection (in libp2p.Connect). // This budget allows multiple addresses to be tried sequentially per peer. peerConnectionAttemptTimeout = 45 * time.Second // timeout for establishing a new connection with peer. )