Skip to content

Commit 5fd0d1a

Browse files
committed
fix: support insecure ws:// URLs + auto-reconnect Bittensor client
- Vendor bittensor-rs and patch BittensorClient::new to use from_insecure_url, allowing both ws:// and wss:// endpoints - BlockSync now recreates the BittensorClient from scratch when the broadcast channel closes (dead websocket), with exponential backoff - SubtensorClient gains reconnect() and with_reconnect() methods for automatic transport-error recovery on metagraph sync and weight ops
1 parent acb0a05 commit 5fd0d1a

76 files changed

Lines changed: 12350 additions & 16 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 6 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ authors = ["Platform Network"]
3535
license = "Apache-2.0"
3636

3737
[workspace.dependencies]
38-
# Bittensor (with CRv4 timelock encryption support and new Subtensor API)
39-
# Updated to d2ca7c2: fix epoch calculation to match subtensor formula
40-
bittensor-rs = { git = "https://github.com/CortexLM/bittensor-rs", rev = "99b6325" }
38+
# Bittensor (vendored with insecure-URL fix: ws:// support + auto-reconnect)
39+
bittensor-rs = { path = "vendor/bittensor-rs" }
4140

4241
# Async runtime
4342
tokio = { version = "1.40", features = ["full", "sync", "macros", "rt-multi-thread"] }

crates/bittensor-integration/src/block_sync.rs

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub struct BlockSync {
6565
config: BlockSyncConfig,
6666
listener: Option<BlockListener>,
6767
client: Option<Arc<BittensorClient>>,
68+
/// Stored so we can recreate the client on disconnect.
69+
rpc_url: Option<String>,
6870
running: Arc<RwLock<bool>>,
6971
event_tx: mpsc::Sender<BlockSyncEvent>,
7072
event_rx: Option<mpsc::Receiver<BlockSyncEvent>>,
@@ -83,6 +85,7 @@ impl BlockSync {
8385
config,
8486
listener: None,
8587
client: None,
88+
rpc_url: None,
8689
running: Arc::new(RwLock::new(false)),
8790
event_tx,
8891
event_rx: Some(event_rx),
@@ -129,6 +132,7 @@ impl BlockSync {
129132
epoch_info.blocks_remaining, mins, secs
130133
);
131134

135+
self.rpc_url = Some(client.rpc_url.clone());
132136
self.listener = Some(listener);
133137
self.client = Some(client);
134138

@@ -140,7 +144,12 @@ impl BlockSync {
140144
*self.tempo.read().await
141145
}
142146

143-
/// Start the block sync loop
147+
/// Start the block sync loop.
148+
///
149+
/// The spawned background task monitors the block listener and, if the
150+
/// underlying Bittensor RPC connection dies permanently (broadcast channel
151+
/// closed), it will **recreate** the `BittensorClient` from scratch,
152+
/// re-initialise the `BlockListener`, and resume processing blocks.
144153
pub async fn start(&self) -> anyhow::Result<()> {
145154
let listener = self
146155
.listener
@@ -152,6 +161,11 @@ impl BlockSync {
152161
.as_ref()
153162
.ok_or_else(|| anyhow::anyhow!("Not connected"))?;
154163

164+
let rpc_url = self
165+
.rpc_url
166+
.clone()
167+
.ok_or_else(|| anyhow::anyhow!("RPC URL not set"))?;
168+
155169
// Check if already running
156170
{
157171
let mut running = self.running.write().await;
@@ -162,28 +176,32 @@ impl BlockSync {
162176
}
163177

164178
// Subscribe to block events
165-
let mut block_rx = listener.subscribe();
179+
let block_rx = listener.subscribe();
166180
let event_tx = self.event_tx.clone();
167181
let running = self.running.clone();
168182
let current_block = self.current_block.clone();
169183
let current_epoch = self.current_epoch.clone();
170184
let current_phase = self.current_phase.clone();
185+
let config = self.config.clone();
171186

172187
// Start the listener
173188
listener.start(client.clone()).await?;
174189

175-
// Process events in background
190+
// Process events in background with automatic client recreation
176191
tokio::spawn(async move {
177192
let mut was_disconnected = false;
178193
let mut first_block_seen = false;
194+
let mut current_block_rx = block_rx;
195+
let mut consecutive_reconnect_failures: u32 = 0;
179196

180197
loop {
181198
if !*running.read().await {
182199
break;
183200
}
184201

185-
match block_rx.recv().await {
202+
match current_block_rx.recv().await {
186203
Ok(event) => {
204+
consecutive_reconnect_failures = 0;
187205
let should_break = BlockSync::handle_block_event(
188206
event,
189207
&event_tx,
@@ -203,8 +221,62 @@ impl BlockSync {
203221
warn!("Block sync lagged by {} events", n);
204222
}
205223
Err(broadcast::error::RecvError::Closed) => {
206-
info!("Block event channel closed");
207-
break;
224+
// The BlockListener's internal task has died and the
225+
// broadcast channel is gone. The old BittensorClient's
226+
// websocket is likely dead too, so retrying with it
227+
// will not help. Recreate everything from scratch.
228+
229+
if !*running.read().await {
230+
break;
231+
}
232+
233+
consecutive_reconnect_failures += 1;
234+
let delay_secs = std::cmp::min(5 * consecutive_reconnect_failures, 60);
235+
warn!(
236+
attempt = consecutive_reconnect_failures,
237+
delay_secs,
238+
"Block listener channel closed — recreating Bittensor client in {}s",
239+
delay_secs,
240+
);
241+
242+
let _ = event_tx
243+
.send(BlockSyncEvent::Disconnected(
244+
"Block listener channel closed, recreating client".into(),
245+
))
246+
.await;
247+
248+
tokio::time::sleep(std::time::Duration::from_secs(delay_secs as u64))
249+
.await;
250+
251+
match BlockSync::recreate_listener(&rpc_url, &config).await {
252+
Ok((new_client, new_listener, new_rx, epoch_info)) => {
253+
info!(
254+
block = epoch_info.current_block,
255+
epoch = epoch_info.epoch_number,
256+
"Bittensor client recreated successfully"
257+
);
258+
*current_block.write().await = epoch_info.current_block;
259+
*current_epoch.write().await = epoch_info.epoch_number;
260+
*current_phase.write().await = epoch_info.phase;
261+
current_block_rx = new_rx;
262+
was_disconnected = true;
263+
consecutive_reconnect_failures = 0;
264+
265+
if let Err(e) = new_listener.start(new_client).await {
266+
warn!("Failed to start recreated listener: {}", e);
267+
} else {
268+
let _ =
269+
event_tx.send(BlockSyncEvent::Reconnected).await;
270+
}
271+
}
272+
Err(e) => {
273+
warn!(
274+
attempt = consecutive_reconnect_failures,
275+
error = %e,
276+
"Failed to recreate Bittensor client, will retry"
277+
);
278+
}
279+
}
208280
}
209281
}
210282
}
@@ -213,6 +285,34 @@ impl BlockSync {
213285
Ok(())
214286
}
215287

288+
/// Create a fresh `BittensorClient` + `BlockListener` from the RPC URL.
289+
async fn recreate_listener(
290+
rpc_url: &str,
291+
config: &BlockSyncConfig,
292+
) -> anyhow::Result<(
293+
Arc<BittensorClient>,
294+
BlockListener,
295+
broadcast::Receiver<BlockEvent>,
296+
EpochInfo,
297+
)> {
298+
let client = Arc::new(BittensorClient::new(rpc_url).await?);
299+
300+
let listener_config = BlockListenerConfig {
301+
netuid: config.netuid,
302+
channel_capacity: config.channel_capacity,
303+
auto_reconnect: true,
304+
reconnect_delay_ms: 5000,
305+
};
306+
307+
let listener = BlockListener::new(listener_config);
308+
listener.init(&client).await?;
309+
310+
let epoch_info = listener.current_epoch_info(&client).await?;
311+
let rx = listener.subscribe();
312+
313+
Ok((client, listener, rx, epoch_info))
314+
}
315+
216316
async fn handle_block_event(
217317
event: BlockEvent,
218318
event_tx: &mpsc::Sender<BlockSyncEvent>,

crates/bittensor-integration/src/client.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,61 @@ impl SubtensorClient {
3939
Ok(())
4040
}
4141

42+
/// Reconnect to Subtensor by creating a fresh client.
43+
/// Useful when the underlying websocket has died.
44+
pub async fn reconnect(&mut self) -> Result<()> {
45+
info!(
46+
"Reconnecting to Subtensor: {}",
47+
self.config.endpoint
48+
);
49+
self.client = None;
50+
let client = BittensorClient::new(&self.config.endpoint).await?;
51+
self.client = Some(client);
52+
info!("Reconnected to Subtensor");
53+
Ok(())
54+
}
55+
56+
/// Execute a fallible operation, reconnecting once if the error looks
57+
/// like a dead websocket / RPC transport failure.
58+
pub async fn with_reconnect<F, Fut, T>(&mut self, op: F) -> Result<T>
59+
where
60+
F: Fn(&BittensorClient) -> Fut,
61+
Fut: std::future::Future<Output = Result<T>>,
62+
{
63+
let client = self
64+
.client
65+
.as_ref()
66+
.ok_or_else(|| anyhow::anyhow!("Not connected to Subtensor"))?;
67+
68+
match op(client).await {
69+
Ok(val) => Ok(val),
70+
Err(first_err) => {
71+
let msg = first_err.to_string().to_lowercase();
72+
let is_transport = msg.contains("websocket")
73+
|| msg.contains("connection reset")
74+
|| msg.contains("connection refused")
75+
|| msg.contains("broken pipe")
76+
|| msg.contains("eof")
77+
|| msg.contains("channel closed")
78+
|| msg.contains("timed out")
79+
|| msg.contains("timeout");
80+
81+
if !is_transport {
82+
return Err(first_err);
83+
}
84+
85+
info!("Transport error detected, attempting reconnect: {}", first_err);
86+
self.reconnect().await?;
87+
88+
let client = self
89+
.client
90+
.as_ref()
91+
.ok_or_else(|| anyhow::anyhow!("Not connected after reconnect"))?;
92+
op(client).await
93+
}
94+
}
95+
}
96+
4297
/// Set the signer from a seed phrase or key
4398
pub fn set_signer(&mut self, seed: &str) -> Result<()> {
4499
let signer = signer_from_seed(seed)?;

vendor/bittensor-rs/Cargo.toml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[package]
2+
name = "bittensor-rs"
3+
version = "0.1.0"
4+
edition = "2021"
5+
authors = ["Cortex Foundation"]
6+
description = "A Rust SDK for interacting with the Bittensor network"
7+
repository = "https://github.com/CortexLM/bittensor-rs"
8+
readme = "README.md"
9+
license = "MIT"
10+
keywords = ["bittensor", "blockchain", "substrate", "sdk"]
11+
categories = ["cryptography::cryptocurrencies", "network-programming"]
12+
13+
[dependencies]
14+
anyhow = "1.0.100"
15+
async-trait = "0.1.89"
16+
futures = "0.3.31"
17+
hex = "0.4.3"
18+
num-traits = "0.2.19"
19+
parity-scale-codec = { version = "3.7.5", features = ["derive"] }
20+
scale-decode = "0.16.0"
21+
scale-encode = "0.10.0"
22+
scale-info = "2.11"
23+
serde = { version = "1.0.228", features = ["derive"] }
24+
serde_json = "1.0"
25+
sp-core = "38.1.0"
26+
sp-runtime = "44.0.0"
27+
subxt = "0.44"
28+
thiserror = "2.0.17"
29+
tokio = { version = "1.48.0", features = ["full"] }
30+
rand = "0.9"
31+
regex = "1.10"
32+
tracing = "0.1"
33+
chrono = { version = "0.4", features = ["serde"] }
34+
35+
# CRv4 / Timelock encryption dependencies (same as subtensor)
36+
tle = { git = "https://github.com/ideal-lab5/timelock", rev = "5416406cfd32799e31e1795393d4916894de4468", default-features = false, features = ["std"] }
37+
w3f-bls = { git = "https://github.com/opentensor/bls", branch = "fix-no-std", default-features = false, features = ["std"] }
38+
ark-serialize = { version = "0.4.0", features = ["derive"] }
39+
ark-std = "0.4.0"
40+
sha2 = "0.10"
41+
rand_chacha = "0.3"
42+
43+
[patch.crates-io]
44+
w3f-bls = { git = "https://github.com/opentensor/bls", branch = "fix-no-std" }

0 commit comments

Comments
 (0)