Skip to content

Commit 25c784f

Browse files
committed
feat(validator): add standalone hourly weight submitter
Add independent weight submission thread completely decoupled from P2P rate limiting and block sync: - Create standalone_weight_submitter.rs module - Timer checks every 60s for minute() == 0 (heure pile) - Fetches weights from chain.platform.zip/rpc - Submits directly via Subtensor::set_weights - Retry logic with backoff [5s, 10s, 30s] - Graceful shutdown handling Integration in main.rs: - Spawn after subtensor/signer creation - Shutdown in Ctrl+C handler This ensures weights are submitted hourly even when P2P is rate limited or experiencing issues.
1 parent 2783361 commit 25c784f

File tree

2 files changed

+289
-0
lines changed

2 files changed

+289
-0
lines changed

bins/validator-node/src/main.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)]
1111

1212
mod challenge_storage;
13+
mod standalone_weight_submitter;
1314
mod wasm_executor;
1415

1516
use anyhow::Result;
@@ -21,6 +22,7 @@ use platform_bittensor::{
2122
BlockSync, BlockSyncConfig, BlockSyncEvent, BackgroundWeightHandler, Metagraph, Subtensor,
2223
SubtensorClient, WeightTaskHandle,
2324
};
25+
use standalone_weight_submitter::{StandaloneWeightSubmitter, spawn_standalone_weight_submitter};
2426
use platform_core::{
2527
checkpoint::{
2628
CheckpointData, CheckpointManager, CompletedEvaluationState, PendingEvaluationState,
@@ -543,6 +545,7 @@ async fn main() -> Result<()> {
543545
let mut block_rx: Option<tokio::sync::mpsc::Receiver<BlockSyncEvent>> = None;
544546
let mut weight_task_handle: Option<WeightTaskHandle> = None;
545547
let mut background_weight_handler: Option<BackgroundWeightHandler> = None;
548+
let mut standalone_weight_submitter: Option<Arc<StandaloneWeightSubmitter>> = None;
546549

547550
if !args.no_bittensor {
548551
info!("Connecting to Bittensor: {}", args.subtensor_endpoint);
@@ -735,6 +738,14 @@ async fn main() -> Result<()> {
735738
));
736739
info!("Background hourly weight handler spawned");
737740

741+
// Spawn standalone hourly weight submitter (decoupled from P2P)
742+
standalone_weight_submitter = Some(spawn_standalone_weight_submitter(
743+
subtensor.as_ref().unwrap().clone(),
744+
(**subtensor_signer.as_ref().unwrap()).clone(),
745+
args.netuid,
746+
));
747+
info!("Standalone hourly weight submitter spawned");
748+
738749
if endpoint != &args.subtensor_endpoint {
739750
warn!(
740751
"Connected to fallback Finney endpoint {} (primary {} failed)",
@@ -2355,6 +2366,9 @@ async fn main() -> Result<()> {
23552366
if let Some(ref handler) = background_weight_handler {
23562367
handler.shutdown().await;
23572368
}
2369+
if let Some(ref submitter) = standalone_weight_submitter {
2370+
submitter.shutdown();
2371+
}
23582372
}
23592373
).await;
23602374

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
//! Standalone weight submission thread.
2+
//! Completely decoupled from P2P rate limiting and block sync.
3+
//! Fetches weights from chain.platform.zip and submits directly to Bittensor.
4+
5+
use std::sync::atomic::{AtomicBool, Ordering};
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
use anyhow::Result;
10+
use chrono::{Timelike, Utc};
11+
use reqwest::Client;
12+
use serde::Deserialize;
13+
use tracing::{debug, error, info, warn};
14+
15+
use platform_bittensor::{BittensorSigner, ExtrinsicWait, Subtensor};
16+
17+
/// URL for fetching pre-computed weights
18+
const WEIGHT_RPC_URL: &str = "https://chain.platform.zip/rpc";
19+
20+
/// How often to check the time (60 seconds)
21+
const CHECK_INTERVAL_SECS: u64 = 60;
22+
23+
/// Retry delays for RPC fetch (seconds)
24+
const RETRY_DELAYS: [u64; 3] = [5, 10, 30];
25+
26+
/// Response from subnet_getWeights RPC
27+
#[derive(Debug, Deserialize)]
28+
struct WeightResponse {
29+
result: WeightResult,
30+
}
31+
32+
#[derive(Debug, Deserialize)]
33+
struct WeightResult {
34+
weights: Vec<MechanismWeights>,
35+
}
36+
37+
#[derive(Debug, Deserialize)]
38+
struct MechanismWeights {
39+
#[serde(rename = "mechanismId")]
40+
mechanism_id: u8,
41+
entries: Vec<WeightEntry>,
42+
}
43+
44+
#[derive(Debug, Deserialize)]
45+
struct WeightEntry {
46+
uid: u16,
47+
weight: u16,
48+
}
49+
50+
/// Standalone weight submitter - no P2P dependencies
51+
pub struct StandaloneWeightSubmitter {
52+
subtensor: Arc<Subtensor>,
53+
signer: BittensorSigner,
54+
netuid: u16,
55+
http_client: Client,
56+
last_submission_hour: Arc<tokio::sync::Mutex<Option<i64>>>,
57+
running: Arc<AtomicBool>,
58+
}
59+
60+
impl StandaloneWeightSubmitter {
61+
/// Create a new standalone weight submitter
62+
pub fn new(
63+
subtensor: Arc<Subtensor>,
64+
signer: BittensorSigner,
65+
netuid: u16,
66+
) -> Self {
67+
let http_client = Client::builder()
68+
.timeout(Duration::from_secs(30))
69+
.build()
70+
.expect("Failed to create HTTP client");
71+
72+
Self {
73+
subtensor,
74+
signer,
75+
netuid,
76+
http_client,
77+
last_submission_hour: Arc::new(tokio::sync::Mutex::new(None)),
78+
running: Arc::new(AtomicBool::new(true)),
79+
}
80+
}
81+
82+
/// Shutdown gracefully
83+
pub fn shutdown(&self) {
84+
self.running.store(false, Ordering::SeqCst);
85+
info!("Standalone weight submitter shutdown requested");
86+
}
87+
88+
/// Run the submission loop (call in a spawned task)
89+
pub async fn run(&self) {
90+
let mut interval = tokio::time::interval(Duration::from_secs(CHECK_INTERVAL_SECS));
91+
92+
info!(
93+
url = WEIGHT_RPC_URL,
94+
netuid = self.netuid,
95+
"Standalone weight submitter started (hourly at :00)"
96+
);
97+
98+
while self.running.load(Ordering::SeqCst) {
99+
interval.tick().await;
100+
101+
let now = Utc::now();
102+
103+
// Check if we're at the top of the hour (minute 0)
104+
if now.minute() == 0 {
105+
let hour_epoch = now.timestamp() / 3600;
106+
107+
// Dedup: only submit once per hour
108+
let mut last = self.last_submission_hour.lock().await;
109+
if let Some(last_hour) = *last {
110+
if last_hour == hour_epoch {
111+
debug!("Already submitted this hour, skipping");
112+
continue;
113+
}
114+
}
115+
116+
// Try to submit with retries
117+
match self.submit_with_retry().await {
118+
Ok(()) => {
119+
*last = Some(hour_epoch);
120+
info!(hour = hour_epoch, "Weights submitted successfully");
121+
}
122+
Err(e) => {
123+
error!(error = %e, "Failed to submit weights after all retries");
124+
}
125+
}
126+
}
127+
}
128+
129+
info!("Standalone weight submitter stopped");
130+
}
131+
132+
/// Submit weights with retry logic
133+
async fn submit_with_retry(&self) -> Result<()> {
134+
let mut last_error = None;
135+
136+
for (attempt, delay) in RETRY_DELAYS.iter().enumerate() {
137+
match self.fetch_and_submit().await {
138+
Ok(()) => return Ok(()),
139+
Err(e) => {
140+
warn!(
141+
attempt = attempt + 1,
142+
max_attempts = RETRY_DELAYS.len(),
143+
delay_secs = delay,
144+
error = %e,
145+
"Weight submission failed, retrying..."
146+
);
147+
last_error = Some(e);
148+
if attempt < RETRY_DELAYS.len() - 1 {
149+
tokio::time::sleep(Duration::from_secs(*delay)).await;
150+
}
151+
}
152+
}
153+
}
154+
155+
Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error")))
156+
}
157+
158+
/// Fetch weights from RPC and submit to chain
159+
async fn fetch_and_submit(&self) -> Result<()> {
160+
// 1. Fetch from chain.platform.zip
161+
let weights = self.fetch_weights().await?;
162+
163+
if weights.is_empty() {
164+
warn!("Empty weights from RPC");
165+
return Ok(());
166+
}
167+
168+
// 2. Submit each mechanism
169+
for mw in weights {
170+
let uids: Vec<u16> = mw.entries.iter().map(|e| e.uid).collect();
171+
let weights: Vec<u16> = mw.entries.iter().map(|e| e.weight).collect();
172+
173+
info!(
174+
mechanism_id = mw.mechanism_id,
175+
uid_count = uids.len(),
176+
total_weight = weights.iter().map(|w| *w as u64).sum::<u64>(),
177+
"Submitting weights to Bittensor"
178+
);
179+
180+
self.subtensor
181+
.set_weights(
182+
&self.signer,
183+
self.netuid,
184+
&uids,
185+
&weights,
186+
mw.mechanism_id as u64,
187+
ExtrinsicWait::Finalized,
188+
)
189+
.await
190+
.map_err(|e| anyhow::anyhow!("set_weights failed: {}", e))?;
191+
}
192+
193+
Ok(())
194+
}
195+
196+
/// Fetch weights from chain.platform.zip/rpc
197+
async fn fetch_weights(&self) -> Result<Vec<MechanismWeights>> {
198+
let body = serde_json::json!({
199+
"jsonrpc": "2.0",
200+
"method": "subnet_getWeights",
201+
"params": {},
202+
"id": 1
203+
});
204+
205+
let response = self.http_client
206+
.post(WEIGHT_RPC_URL)
207+
.header("Content-Type", "application/json")
208+
.json(&body)
209+
.send()
210+
.await
211+
.map_err(|e| anyhow::anyhow!("HTTP request failed: {}", e))?;
212+
213+
let status = response.status();
214+
if !status.is_success() {
215+
anyhow::bail!("RPC HTTP error: {} {}", status.as_u16(), status.canonical_reason().unwrap_or(""));
216+
}
217+
218+
let parsed: WeightResponse = response
219+
.json()
220+
.await
221+
.map_err(|e| anyhow::anyhow!("JSON parse failed: {}", e))?;
222+
223+
Ok(parsed.result.weights)
224+
}
225+
}
226+
227+
/// Spawn the standalone weight submitter in a dedicated task.
228+
pub fn spawn_standalone_weight_submitter(
229+
subtensor: Arc<Subtensor>,
230+
signer: BittensorSigner,
231+
netuid: u16,
232+
) -> Arc<StandaloneWeightSubmitter> {
233+
let submitter = Arc::new(StandaloneWeightSubmitter::new(subtensor, signer, netuid));
234+
let submitter_clone = submitter.clone();
235+
236+
tokio::spawn(async move {
237+
submitter_clone.run().await;
238+
});
239+
240+
submitter
241+
}
242+
243+
#[cfg(test)]
244+
mod tests {
245+
use super::*;
246+
247+
#[test]
248+
fn test_constants() {
249+
assert!(!WEIGHT_RPC_URL.is_empty());
250+
assert_eq!(CHECK_INTERVAL_SECS, 60);
251+
assert_eq!(RETRY_DELAYS, [5, 10, 30]);
252+
}
253+
254+
#[test]
255+
fn test_json_parsing() {
256+
let json = serde_json::json!({
257+
"result": {
258+
"weights": [
259+
{
260+
"mechanismId": 0,
261+
"entries": [
262+
{"uid": 1, "weight": 100},
263+
{"uid": 2, "weight": 200}
264+
]
265+
}
266+
]
267+
}
268+
});
269+
270+
let parsed: WeightResponse = serde_json::from_value(json).unwrap();
271+
assert_eq!(parsed.result.weights.len(), 1);
272+
assert_eq!(parsed.result.weights[0].mechanism_id, 0);
273+
assert_eq!(parsed.result.weights[0].entries.len(), 2);
274+
}
275+
}

0 commit comments

Comments
 (0)