Skip to content

Commit 15c4955

Browse files
committed
feat: add delta sync, storage_list_prefix/count_prefix host functions
- Delta sync: track last_sync_block per challenge, only transfer entries updated since last known good block (no full download on divergence) - Reduce sync interval from 60 to 30 blocks (~3 min) - Add storage_list_prefix and storage_count_prefix host functions for WASM - Implement list_prefix/count_prefix in StorageBackend trait and ChallengeStorageBackend - Add SDK wrappers: host_storage_list_prefix, host_storage_count_prefix - Fix StorageVote consensus write to use put_options_with_block
1 parent 20aa8d2 commit 15c4955

File tree

5 files changed

+278
-24
lines changed

5 files changed

+278
-24
lines changed

bins/validator-node/src/challenge_storage.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,49 @@ impl StorageBackend for ChallengeStorageBackend {
166166
.map_err(|e| StorageHostError::StorageError(e.to_string()))?;
167167
Ok(result.map(|v| v.data))
168168
}
169+
170+
fn list_prefix(
171+
&self,
172+
challenge_id: &str,
173+
prefix: &[u8],
174+
limit: u32,
175+
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageHostError> {
176+
let prefix_bytes = if prefix.is_empty() {
177+
None
178+
} else {
179+
Some(prefix)
180+
};
181+
182+
let result = tokio::task::block_in_place(|| {
183+
tokio::runtime::Handle::current().block_on(self.storage.list_prefix(
184+
challenge_id,
185+
prefix_bytes,
186+
limit as usize,
187+
None,
188+
))
189+
})
190+
.map_err(|e| StorageHostError::StorageError(e.to_string()))?;
191+
192+
// Convert StorageKey back to the raw key bytes the WASM expects.
193+
// The key stored in DStorageKey is hex-encoded, so decode it back.
194+
let items: Vec<(Vec<u8>, Vec<u8>)> = result
195+
.items
196+
.into_iter()
197+
.map(|(storage_key, stored_value)| {
198+
let raw_key = hex::decode(storage_key.key_string().unwrap_or_default())
199+
.unwrap_or_else(|_| storage_key.key.clone());
200+
(raw_key, stored_value.data)
201+
})
202+
.collect();
203+
204+
Ok(items)
205+
}
206+
207+
fn count_prefix(&self, challenge_id: &str, _prefix: &[u8]) -> Result<u64, StorageHostError> {
208+
tokio::task::block_in_place(|| {
209+
tokio::runtime::Handle::current()
210+
.block_on(self.storage.count_by_namespace(challenge_id))
211+
})
212+
.map_err(|e| StorageHostError::StorageError(e.to_string()))
213+
}
169214
}

bins/validator-node/src/main.rs

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -933,10 +933,14 @@ async fn main() -> Result<()> {
933933
let mut stale_job_interval = tokio::time::interval(Duration::from_secs(120));
934934
let mut weight_check_interval = tokio::time::interval(Duration::from_secs(30));
935935
let mut last_weight_submission_epoch: u64 = 0; // Local tracking of weight submissions
936-
let mut challenge_sync_interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
937-
let mut last_sync_block: u64 = 0; // Last block where sync was triggered
938-
let sync_block_interval: u64 = 60; // Sync every 60 blocks
939-
let mut storage_stats_interval = tokio::time::interval(Duration::from_secs(300)); // Log stats every 5 min
936+
let mut challenge_sync_interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30s
937+
let mut last_sync_block: u64 = 0;
938+
let sync_block_interval: u64 = 30; // Sync every 30 blocks (~3 min)
939+
let mut storage_stats_interval = tokio::time::interval(Duration::from_secs(300));
940+
// Track last synced block per challenge for delta sync
941+
let challenge_last_sync: Arc<
942+
RwLock<std::collections::HashMap<platform_core::ChallengeId, u64>>,
943+
> = Arc::new(RwLock::new(std::collections::HashMap::new()));
940944

941945
// Clone p2p_cmd_tx for use in the loop
942946
let p2p_broadcast_tx = p2p_cmd_tx.clone();
@@ -955,6 +959,7 @@ async fn main() -> Result<()> {
955959
&keypair,
956960
&p2p_cmd_tx,
957961
&chain_state,
962+
&challenge_last_sync,
958963
).await;
959964
}
960965

@@ -1837,6 +1842,7 @@ async fn handle_network_event(
18371842
keypair: &Keypair,
18381843
p2p_cmd_tx: &tokio::sync::mpsc::Sender<platform_p2p_consensus::P2PCommand>,
18391844
chain_state: &Arc<RwLock<platform_core::ChainState>>,
1845+
challenge_last_sync: &Arc<RwLock<std::collections::HashMap<platform_core::ChallengeId, u64>>>,
18401846
) {
18411847
match event {
18421848
NetworkEvent::Message { source, message } => match message {
@@ -2692,7 +2698,7 @@ async fn handle_network_event(
26922698
.put(
26932699
storage_key.clone(),
26942700
proposal.value.clone(),
2695-
PutOptions::default(),
2701+
put_options_with_block(state_manager),
26962702
)
26972703
.await
26982704
.map(|_| true)
@@ -3142,20 +3148,31 @@ async fn handle_network_event(
31423148
};
31433149

31443150
if our_hash != [0u8; 32] && our_hash == proposal.sync_result_hash {
3151+
// In sync -- update last_sync_block
3152+
challenge_last_sync
3153+
.write()
3154+
.insert(proposal.challenge_id, proposal.block_number);
31453155
debug!(
31463156
challenge_id = %proposal.challenge_id,
31473157
"Sync hash matches peer, storage is in sync"
31483158
);
31493159
} else if proposal.sync_result_hash != [0u8; 32] {
3160+
// Delta sync: only request entries since our last known good block
3161+
let since_block = challenge_last_sync
3162+
.read()
3163+
.get(&proposal.challenge_id)
3164+
.copied()
3165+
.unwrap_or(0);
3166+
31503167
info!(
31513168
challenge_id = %proposal.challenge_id,
31523169
proposer = %proposal.proposer.to_ss58(),
31533170
our_hash = %hex::encode(&our_hash[..8]),
31543171
peer_hash = %hex::encode(&proposal.sync_result_hash[..8]),
3155-
"Sync hash divergence detected, requesting storage sync"
3172+
since_block = since_block,
3173+
"Sync hash divergence, requesting delta sync"
31563174
);
31573175

3158-
// Request storage data from the proposer
31593176
let timestamp = chrono::Utc::now().timestamp_millis();
31603177
let sign_data =
31613178
bincode::serialize(&(&proposal.challenge_id, &our_hash, timestamp))
@@ -3167,6 +3184,7 @@ async fn handle_network_event(
31673184
challenge_id: proposal.challenge_id,
31683185
requester: keypair.hotkey(),
31693186
current_hash: our_hash,
3187+
since_block,
31703188
timestamp,
31713189
signature,
31723190
},
@@ -3192,26 +3210,45 @@ async fn handle_network_event(
31923210
warn!(requester = %req.requester.to_ss58(), "Storage sync request from non-validator");
31933211
} else {
31943212
let challenge_ns = req.challenge_id.to_string();
3213+
let since = req.since_block;
31953214
info!(
31963215
challenge_id = %req.challenge_id,
31973216
requester = %req.requester.to_ss58(),
3198-
"Responding to storage sync request"
3217+
since_block = since,
3218+
"Responding to storage sync request (delta)"
31993219
);
32003220

3201-
// Read all keys in this challenge namespace
3202-
match storage.list_prefix(&challenge_ns, None, 10_000, None).await {
3203-
Ok(list_result) => {
3204-
let entries: Vec<platform_p2p_consensus::StorageSyncEntry> =
3205-
list_result
3206-
.items
3207-
.iter()
3208-
.map(|(key, value)| platform_p2p_consensus::StorageSyncEntry {
3209-
namespace: key.namespace.clone(),
3210-
key: key.key.clone(),
3211-
value: value.data.clone(),
3212-
version: value.metadata.version,
3213-
})
3214-
.collect();
3221+
// Collect items: if since_block > 0, delta only
3222+
let items_result: Result<
3223+
Vec<(
3224+
platform_distributed_storage::StorageKey,
3225+
platform_distributed_storage::StoredValue,
3226+
)>,
3227+
_,
3228+
> = if since > 0 {
3229+
storage
3230+
.list_after_block(&challenge_ns, since, 10_000)
3231+
.await
3232+
.map(|qr| qr.items)
3233+
} else {
3234+
storage
3235+
.list_prefix(&challenge_ns, None, 10_000, None)
3236+
.await
3237+
.map(|lr| lr.items)
3238+
};
3239+
3240+
match items_result {
3241+
Ok(items) => {
3242+
let entries: Vec<platform_p2p_consensus::StorageSyncEntry> = items
3243+
.iter()
3244+
.map(|(key, value)| platform_p2p_consensus::StorageSyncEntry {
3245+
namespace: key.namespace.clone(),
3246+
key: key.key.clone(),
3247+
value: value.data.clone(),
3248+
version: value.metadata.version,
3249+
updated_block: value.metadata.updated_block,
3250+
})
3251+
.collect();
32153252

32163253
let total = entries.len() as u64;
32173254

@@ -3254,7 +3291,8 @@ async fn handle_network_event(
32543291
info!(
32553292
challenge_id = %req.challenge_id,
32563293
entries = total,
3257-
"Storage sync response sent"
3294+
since_block = since,
3295+
"Storage sync delta response sent"
32583296
);
32593297
}
32603298
}
@@ -3286,6 +3324,7 @@ async fn handle_network_event(
32863324

32873325
let mut applied = 0u64;
32883326
let mut skipped = 0u64;
3327+
let mut max_block = 0u64;
32893328
let opts = put_options_with_block(state_manager);
32903329

32913330
for entry in &resp.entries {
@@ -3306,16 +3345,27 @@ async fn handle_network_event(
33063345
warn!(error = %e, "Failed to apply synced entry");
33073346
} else {
33083347
applied += 1;
3348+
if entry.updated_block > max_block {
3349+
max_block = entry.updated_block;
3350+
}
33093351
}
33103352
}
33113353
}
33123354
}
33133355

3356+
// Update last_sync_block for this challenge
3357+
if max_block > 0 {
3358+
challenge_last_sync
3359+
.write()
3360+
.insert(resp.challenge_id, max_block);
3361+
}
3362+
33143363
info!(
33153364
challenge_id = %resp.challenge_id,
33163365
applied = applied,
33173366
skipped = skipped,
3318-
"Storage sync complete"
3367+
max_block = max_block,
3368+
"Storage delta sync complete"
33193369
);
33203370
}
33213371
}

crates/challenge-sdk-wasm/src/host_functions.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ extern "C" {
2323
key_len: i32,
2424
value_ptr: i32,
2525
) -> i32;
26+
fn storage_list_prefix(prefix_ptr: i32, prefix_len: i32, result_ptr: i32, limit: i32) -> i32;
27+
fn storage_count_prefix(prefix_ptr: i32, prefix_len: i32) -> i64;
2628
}
2729

2830
#[link(wasm_import_module = "platform_terminal")]
@@ -135,6 +137,33 @@ pub fn host_storage_get_cross(challenge_id: &[u8], key: &[u8]) -> Result<Vec<u8>
135137
Ok(value_buf)
136138
}
137139

140+
/// List all key-value pairs matching a prefix. Returns bincode-encoded Vec<(Vec<u8>, Vec<u8>)>.
141+
pub fn host_storage_list_prefix(prefix: &[u8], limit: i32) -> Result<Vec<u8>, i32> {
142+
let mut result_buf = vec![0u8; RESPONSE_BUF_LARGE];
143+
let status = unsafe {
144+
storage_list_prefix(
145+
prefix.as_ptr() as i32,
146+
prefix.len() as i32,
147+
result_buf.as_mut_ptr() as i32,
148+
limit,
149+
)
150+
};
151+
if status < 0 {
152+
return Err(status);
153+
}
154+
result_buf.truncate(status as usize);
155+
Ok(result_buf)
156+
}
157+
158+
/// Count keys matching a prefix.
159+
pub fn host_storage_count_prefix(prefix: &[u8]) -> Result<u64, i32> {
160+
let count = unsafe { storage_count_prefix(prefix.as_ptr() as i32, prefix.len() as i32) };
161+
if count < 0 {
162+
return Err(count as i32);
163+
}
164+
Ok(count as u64)
165+
}
166+
138167
pub fn host_terminal_exec(request: &[u8]) -> Result<Vec<u8>, i32> {
139168
let mut result_buf = vec![0u8; RESPONSE_BUF_LARGE];
140169
let status = unsafe {

crates/p2p-consensus/src/messages.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,9 @@ pub struct StorageSyncRequestMessage {
10421042
pub requester: Hotkey,
10431043
/// Our current hash for this namespace (so responder can skip if identical)
10441044
pub current_hash: [u8; 32],
1045+
/// Only send entries updated after this block (0 = full sync)
1046+
#[serde(default)]
1047+
pub since_block: u64,
10451048
/// Timestamp
10461049
pub timestamp: i64,
10471050
/// Signature
@@ -1058,6 +1061,9 @@ pub struct StorageSyncEntry {
10581061
pub value: Vec<u8>,
10591062
/// Metadata version
10601063
pub version: u64,
1064+
/// Block at which this entry was last updated
1065+
#[serde(default)]
1066+
pub updated_block: u64,
10611067
}
10621068

10631069
/// Response with storage data for a challenge namespace

0 commit comments

Comments
 (0)