@@ -193,6 +193,7 @@ impl BlockSync {
193193 let mut first_block_seen = false ;
194194 let mut current_block_rx = block_rx;
195195 let mut consecutive_reconnect_failures: u32 = 0 ;
196+ let mut consecutive_connection_errors: u32 = 0 ;
196197
197198 loop {
198199 if !* running. read ( ) . await {
@@ -202,6 +203,17 @@ impl BlockSync {
202203 match current_block_rx. recv ( ) . await {
203204 Ok ( event) => {
204205 consecutive_reconnect_failures = 0 ;
206+
207+ // Track consecutive ConnectionError events.
208+ // When the WebSocket dies, BlockListener keeps
209+ // retrying on the same dead client and emits
210+ // ConnectionError in a loop without ever closing
211+ // the broadcast channel. After a threshold of
212+ // consecutive errors we proactively recreate the
213+ // listener with a fresh client.
214+ let is_connection_error =
215+ matches ! ( & event, BlockEvent :: ConnectionError ( _) ) ;
216+
205217 let should_break = BlockSync :: handle_block_event (
206218 event,
207219 & event_tx,
@@ -216,6 +228,81 @@ impl BlockSync {
216228 if should_break {
217229 break ;
218230 }
231+
232+ if is_connection_error {
233+ consecutive_connection_errors += 1 ;
234+ } else {
235+ consecutive_connection_errors = 0 ;
236+ }
237+
238+ // Too many consecutive connection errors — the
239+ // underlying client is dead. Recreate everything.
240+ if consecutive_connection_errors >= 5 {
241+ warn ! (
242+ errors = consecutive_connection_errors,
243+ "Too many consecutive connection errors — \
244+ recreating Bittensor client"
245+ ) ;
246+ consecutive_connection_errors = 0 ;
247+
248+ if !* running. read ( ) . await {
249+ break ;
250+ }
251+
252+ let delay_secs = 5u64 ;
253+ tokio:: time:: sleep (
254+ std:: time:: Duration :: from_secs ( delay_secs) ,
255+ )
256+ . await ;
257+
258+ match BlockSync :: recreate_listener ( & rpc_url, & config)
259+ . await
260+ {
261+ Ok ( (
262+ new_client,
263+ new_listener,
264+ new_rx,
265+ epoch_info,
266+ ) ) => {
267+ info ! (
268+ block = epoch_info. current_block,
269+ epoch = epoch_info. epoch_number,
270+ "Bittensor client recreated after \
271+ consecutive connection errors"
272+ ) ;
273+ * current_block. write ( ) . await =
274+ epoch_info. current_block ;
275+ * current_epoch. write ( ) . await =
276+ epoch_info. epoch_number ;
277+ * current_phase. write ( ) . await =
278+ epoch_info. phase ;
279+ current_block_rx = new_rx;
280+ was_disconnected = true ;
281+
282+ if let Err ( e) =
283+ new_listener. start ( new_client) . await
284+ {
285+ warn ! (
286+ "Failed to start recreated \
287+ listener: {}",
288+ e
289+ ) ;
290+ } else {
291+ let _ = event_tx
292+ . send ( BlockSyncEvent :: Reconnected )
293+ . await ;
294+ }
295+ }
296+ Err ( e) => {
297+ warn ! (
298+ error = %e,
299+ "Failed to recreate Bittensor client \
300+ after connection errors, will keep \
301+ retrying"
302+ ) ;
303+ }
304+ }
305+ }
219306 }
220307 Err ( broadcast:: error:: RecvError :: Lagged ( n) ) => {
221308 warn ! ( "Block sync lagged by {} events" , n) ;
0 commit comments