Skip to content

Commit 8b84dde

Browse files
committed
feat: persistent WASM instances with background_tick support
- Add background_tick() to the Challenge trait (default no-op)
- Cache WASM instances between sync/background_tick calls (persistent)
- Reuse the persistent instance in execute_sync_with_block instead of recreating it
- Add execute_background_tick(), called every 12s from the main loop
- Update block_height/epoch/timestamp on the persistent instance on each call
- Invalidate the persistent instance on WASM re-upload (version bump)
- Add TimeState::set_fixed_timestamp() for updating deterministic time
1 parent 58d734d commit 8b84dde

File tree

4 files changed

+222
-41
lines changed

4 files changed

+222
-41
lines changed

bins/validator-node/src/main.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,7 @@ async fn main() -> Result<()> {
11821182
let sync_block_interval: u64 = 1; // Call WASM sync every block, WASM decides frequency internally
11831183
let mut storage_stats_interval = tokio::time::interval(Duration::from_secs(300));
11841184
let mut storage_flush_interval = tokio::time::interval(Duration::from_secs(5));
1185+
let mut background_tick_interval = tokio::time::interval(Duration::from_secs(12));
11851186
// Track last synced block per challenge for delta sync
11861187
let challenge_last_sync: Arc<
11871188
RwLock<std::collections::HashMap<platform_core::ChallengeId, u64>>,
@@ -2046,6 +2047,36 @@ async fn main() -> Result<()> {
20462047
}
20472048
}
20482049

2050+
// Background tick - call background_tick() on each active challenge's persistent WASM instance every 12 s (timer-driven via background_tick_interval, not triggered by block events)
2051+
_ = background_tick_interval.tick() => {
2052+
if !is_bootnode {
2053+
let current_block = state_manager.apply(|state| state.bittensor_block);
2054+
let current_epoch = current_block / 360;
2055+
2056+
let challenges: Vec<_> = {
2057+
let cs = chain_state.read();
2058+
cs.wasm_challenge_configs
2059+
.iter()
2060+
.filter(|(_, cfg)| cfg.is_active)
2061+
.map(|(id, _)| id.clone())
2062+
.collect()
2063+
};
2064+
2065+
for challenge_id in challenges {
2066+
let module_path = challenge_id.to_string();
2067+
if let Some(ref executor) = wasm_executor {
2068+
if let Err(e) = executor.execute_background_tick(&module_path, current_block, current_epoch) {
2069+
debug!(
2070+
challenge_id = %challenge_id,
2071+
error = %e,
2072+
"background_tick failed"
2073+
);
2074+
}
2075+
}
2076+
}
2077+
}
2078+
}
2079+
20492080
// Periodic storage stats logging
20502081
_ = storage_stats_interval.tick() => {
20512082
let stats = storage.tracked_stats().await;

bins/validator-node/src/wasm_executor.rs

Lines changed: 174 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use anyhow::{Context, Result};
2-
use parking_lot::RwLock;
2+
use parking_lot::{Mutex, RwLock};
33
use platform_challenge_sdk_wasm::{DedupFlags, EvaluationInput, EvaluationOutput, WeightEntry};
44
use std::collections::HashMap;
55
use std::path::PathBuf;
6-
use std::sync::atomic::{AtomicBool, Ordering};
6+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
77
use std::sync::Arc;
88
use std::time::Instant;
9-
use tracing::{debug, info};
9+
use tracing::{debug, info, warn};
1010
use wasm_runtime_interface::{
11-
ConsensusPolicy, ExecPolicy, InMemoryStorageBackend, InstanceConfig, LlmPolicy, NetworkPolicy,
12-
RuntimeConfig, SandboxPolicy, StorageBackend, StorageHostConfig, TerminalPolicy, TimePolicy,
13-
WasmModule, WasmRuntime, WasmRuntimeError,
11+
ChallengeInstance, ConsensusPolicy, ExecPolicy, InMemoryStorageBackend, InstanceConfig,
12+
LlmPolicy, NetworkPolicy, RuntimeConfig, SandboxPolicy, StorageBackend, StorageHostConfig,
13+
TerminalPolicy, TimePolicy, WasmModule, WasmRuntime, WasmRuntimeError,
1414
};
1515

1616
const MAX_EVALUATION_OUTPUT_SIZE: usize = 64 * 1024 * 1024;
@@ -125,10 +125,25 @@ impl Drop for DedupGuard<'_> {
125125
}
126126
}
127127

128+
/// A persistent WASM instance that stays alive between calls.
129+
/// The module_version tracks which compiled module this instance was created
130+
/// from; when the module is re-uploaded the instance is recreated.
131+
struct PersistentInstance {
132+
instance: ChallengeInstance,
133+
module_version: u64,
134+
created_at: Instant,
135+
}
136+
137+
// ChallengeInstance contains wasmtime Store which is Send but not Sync.
138+
// We protect access with a Mutex so only one call at a time.
139+
unsafe impl Send for PersistentInstance {}
140+
128141
pub struct WasmChallengeExecutor {
129142
runtime: WasmRuntime,
130143
config: WasmExecutorConfig,
131144
module_cache: RwLock<HashMap<String, Arc<WasmModule>>>,
145+
module_versions: RwLock<HashMap<String, u64>>,
146+
persistent_instances: RwLock<HashMap<String, Arc<Mutex<PersistentInstance>>>>,
132147
dedup_state: RwLock<HashMap<String, Arc<DedupState>>>,
133148
}
134149

@@ -155,6 +170,8 @@ impl WasmChallengeExecutor {
155170
runtime,
156171
config,
157172
module_cache: RwLock::new(HashMap::new()),
173+
module_versions: RwLock::new(HashMap::new()),
174+
persistent_instances: RwLock::new(HashMap::new()),
158175
dedup_state: RwLock::new(HashMap::new()),
159176
})
160177
}
@@ -272,7 +289,7 @@ impl WasmChallengeExecutor {
272289
restart_id: String::new(),
273290
config_version: 0,
274291
storage_host_config: StorageHostConfig {
275-
allow_direct_writes: false,
292+
allow_direct_writes: true,
276293
require_consensus: true,
277294
..self.config.storage_host_config.clone()
278295
},
@@ -404,7 +421,7 @@ impl WasmChallengeExecutor {
404421
restart_id: String::new(),
405422
config_version: 0,
406423
storage_host_config: StorageHostConfig {
407-
allow_direct_writes: false,
424+
allow_direct_writes: true,
408425
require_consensus: true,
409426
..self.config.storage_host_config.clone()
410427
},
@@ -893,7 +910,7 @@ impl WasmChallengeExecutor {
893910
restart_id: String::new(),
894911
config_version: 0,
895912
storage_host_config: StorageHostConfig {
896-
allow_direct_writes: false,
913+
allow_direct_writes: true,
897914
require_consensus: true,
898915
..self.config.storage_host_config.clone()
899916
},
@@ -1078,7 +1095,7 @@ impl WasmChallengeExecutor {
10781095
challenge_id: module_path.to_string(),
10791096
validator_id: "validator".to_string(),
10801097
storage_host_config: StorageHostConfig {
1081-
allow_direct_writes: false,
1098+
allow_direct_writes: true,
10821099
require_consensus: true,
10831100
..self.config.storage_host_config.clone()
10841101
},
@@ -1150,7 +1167,7 @@ impl WasmChallengeExecutor {
11501167
}
11511168

11521169
/// Execute sync on a WASM challenge module with block context.
1153-
/// Returns WasmSyncResult with leaderboard hash and stats for consensus.
1170+
/// Reuses the persistent WASM instance across calls for in-memory state.
11541171
pub fn execute_sync_with_block(
11551172
&self,
11561173
module_path: &str,
@@ -1179,40 +1196,31 @@ impl WasmChallengeExecutor {
11791196
}
11801197
};
11811198

1182-
// Pass real wall-clock time so WASM can compute a correct 24 h window
1183-
// for GitHub API &since= queries.
1199+
let pi = self.get_or_create_persistent(module_path, block_height, epoch)?;
1200+
let mut pi_guard = pi.lock();
1201+
11841202
let real_now_ms = std::time::SystemTime::now()
11851203
.duration_since(std::time::UNIX_EPOCH)
11861204
.map(|d| d.as_millis() as u64)
11871205
.unwrap_or(0);
11881206

1189-
let instance_config = InstanceConfig {
1190-
challenge_id: module_path.to_string(),
1191-
validator_id: "validator".to_string(),
1192-
storage_host_config: StorageHostConfig {
1193-
allow_direct_writes: false,
1194-
require_consensus: true,
1195-
..self.config.storage_host_config.clone()
1196-
},
1197-
storage_backend: Arc::clone(&self.config.storage_backend),
1198-
consensus_policy: ConsensusPolicy::default(),
1199-
network_policy: NetworkPolicy::development(),
1200-
time_policy: TimePolicy::deterministic(real_now_ms),
1201-
llm_policy: match &self.config.chutes_api_key {
1202-
Some(key) => LlmPolicy::with_api_key(key.clone()),
1203-
None => LlmPolicy::default(),
1204-
},
1205-
block_height,
1206-
epoch,
1207-
..Default::default()
1208-
};
1207+
// Update block/epoch/timestamp on the persistent instance
1208+
{
1209+
let state = pi_guard.instance.store_mut().data_mut();
1210+
state.consensus_state.block_height = block_height;
1211+
state.consensus_state.epoch = epoch;
1212+
state.fixed_timestamp_ms = Some(real_now_ms as i64);
1213+
state.time_state.set_fixed_timestamp(real_now_ms);
1214+
}
12091215

1210-
let mut instance = self
1211-
.runtime
1212-
.instantiate(&module, instance_config, None)
1213-
.map_err(|e| anyhow::anyhow!("WASM instantiation failed: {}", e))?;
1216+
// Reset fuel before each sync call
1217+
if self.config.enable_fuel {
1218+
if let Some(limit) = self.config.fuel_limit {
1219+
let _ = pi_guard.instance.store_mut().set_fuel(limit);
1220+
}
1221+
}
12141222

1215-
let result = instance
1223+
let result = pi_guard.instance
12161224
.call_return_i64("sync")
12171225
.map_err(|e| anyhow::anyhow!("WASM sync call failed: {}", e))?;
12181226

@@ -1230,7 +1238,7 @@ impl WasmChallengeExecutor {
12301238
});
12311239
}
12321240

1233-
let result_data = instance
1241+
let result_data = pi_guard.instance
12341242
.read_memory(out_ptr as usize, out_len as usize)
12351243
.map_err(|e| anyhow::anyhow!("failed to read WASM memory for sync output: {}", e))?;
12361244

@@ -1242,7 +1250,7 @@ impl WasmChallengeExecutor {
12421250
module = module_path,
12431251
total_users = sync_result.total_users,
12441252
execution_time_ms = start.elapsed().as_millis() as u64,
1245-
"WASM sync completed"
1253+
"WASM sync completed (persistent instance)"
12461254
);
12471255

12481256
// Clear the pending writes cache for this challenge so subsequent reads
@@ -1275,7 +1283,7 @@ impl WasmChallengeExecutor {
12751283
challenge_id: module_path.to_string(),
12761284
validator_id: "validator".to_string(),
12771285
storage_host_config: StorageHostConfig {
1278-
allow_direct_writes: false,
1286+
allow_direct_writes: true,
12791287
require_consensus: true,
12801288
..self.config.storage_host_config.clone()
12811289
},
@@ -1428,20 +1436,145 @@ impl WasmChallengeExecutor {
14281436
Ok(module)
14291437
}
14301438

1439+
/// Get or create a persistent WASM instance for a challenge.
1440+
/// The instance is reused across sync/background_tick calls.
1441+
/// It is recreated when the module is re-uploaded (version bump).
1442+
fn get_or_create_persistent(
1443+
&self,
1444+
module_path: &str,
1445+
block_height: u64,
1446+
epoch: u64,
1447+
) -> Result<Arc<Mutex<PersistentInstance>>> {
1448+
let current_version = self.module_versions.read().get(module_path).copied().unwrap_or(0);
1449+
1450+
// Check if we already have a valid persistent instance
1451+
{
1452+
let cache = self.persistent_instances.read();
1453+
if let Some(pi) = cache.get(module_path) {
1454+
let guard = pi.lock();
1455+
if guard.module_version == current_version {
1456+
drop(guard);
1457+
return Ok(Arc::clone(pi));
1458+
}
1459+
// Version mismatch, will recreate below
1460+
}
1461+
}
1462+
1463+
// Create a new persistent instance
1464+
let module = self.load_module(module_path)
1465+
.context("Failed to load WASM module for persistent instance")?;
1466+
1467+
let real_now_ms = std::time::SystemTime::now()
1468+
.duration_since(std::time::UNIX_EPOCH)
1469+
.map(|d| d.as_millis() as u64)
1470+
.unwrap_or(0);
1471+
1472+
let instance_config = InstanceConfig {
1473+
challenge_id: module_path.to_string(),
1474+
validator_id: "validator".to_string(),
1475+
storage_host_config: StorageHostConfig {
1476+
allow_direct_writes: true,
1477+
require_consensus: true,
1478+
..self.config.storage_host_config.clone()
1479+
},
1480+
storage_backend: Arc::clone(&self.config.storage_backend),
1481+
consensus_policy: ConsensusPolicy::default(),
1482+
network_policy: NetworkPolicy::development(),
1483+
time_policy: TimePolicy::deterministic(real_now_ms),
1484+
llm_policy: match &self.config.chutes_api_key {
1485+
Some(key) => LlmPolicy::with_api_key(key.clone()),
1486+
None => LlmPolicy::default(),
1487+
},
1488+
block_height,
1489+
epoch,
1490+
..Default::default()
1491+
};
1492+
1493+
let instance = self.runtime
1494+
.instantiate(&module, instance_config, None)
1495+
.map_err(|e| anyhow::anyhow!("Failed to create persistent WASM instance: {}", e))?;
1496+
1497+
let pi = Arc::new(Mutex::new(PersistentInstance {
1498+
instance,
1499+
module_version: current_version,
1500+
created_at: Instant::now(),
1501+
}));
1502+
1503+
self.persistent_instances.write().insert(module_path.to_string(), Arc::clone(&pi));
1504+
info!(module = module_path, version = current_version, "persistent WASM instance created");
1505+
Ok(pi)
1506+
}
1507+
1508+
/// Execute background_tick() on the persistent WASM instance.
1509+
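/// Intended for lightweight background work; the validator main loop calls this on a 12 s timer. A missing `background_tick` export is ignored, and any other call failure is logged with `warn!` while this function still returns `Ok(())`.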
1510+
pub fn execute_background_tick(
1511+
&self,
1512+
module_path: &str,
1513+
block_height: u64,
1514+
epoch: u64,
1515+
) -> Result<()> {
1516+
let pi = self.get_or_create_persistent(module_path, block_height, epoch)?;
1517+
let mut guard = pi.lock();
1518+
1519+
let real_now_ms = std::time::SystemTime::now()
1520+
.duration_since(std::time::UNIX_EPOCH)
1521+
.map(|d| d.as_millis() as u64)
1522+
.unwrap_or(0);
1523+
1524+
// Update block/epoch/timestamp context on the persistent instance
1525+
{
1526+
let state = guard.instance.store_mut().data_mut();
1527+
state.consensus_state.block_height = block_height;
1528+
state.consensus_state.epoch = epoch;
1529+
state.fixed_timestamp_ms = Some(real_now_ms as i64);
1530+
state.time_state.set_fixed_timestamp(real_now_ms);
1531+
}
1532+
1533+
// Reset fuel if enabled
1534+
if self.config.enable_fuel {
1535+
if let Some(limit) = self.config.fuel_limit {
1536+
let _ = guard.instance.store_mut().set_fuel(limit);
1537+
}
1538+
}
1539+
1540+
// Call background_tick - void function, no return value
1541+
match guard.instance.call("background_tick", &[]) {
1542+
Ok(_) => {}
1543+
Err(WasmRuntimeError::MissingExport(_)) => {
1544+
// WASM doesn't export background_tick, that's fine
1545+
}
1546+
Err(e) => {
1547+
warn!(module = module_path, error = %e, "background_tick failed");
1548+
}
1549+
}
1550+
1551+
Ok(())
1552+
}
1553+
14311554
#[allow(dead_code)]
14321555
pub fn invalidate_cache(&self, module_path: &str) {
14331556
let mut cache = self.module_cache.write();
14341557
if cache.remove(module_path).is_some() {
14351558
info!(module = module_path, "WASM module cache entry invalidated");
14361559
}
14371560
self.dedup_state.write().remove(module_path);
1561+
// Bump module version so persistent instance gets recreated
1562+
let mut versions = self.module_versions.write();
1563+
let v = versions.entry(module_path.to_string()).or_insert(0);
1564+
*v += 1;
1565+
info!(module = module_path, version = *v, "module version bumped");
1566+
// Drop old persistent instance
1567+
if self.persistent_instances.write().remove(module_path).is_some() {
1568+
info!(module = module_path, "persistent instance dropped");
1569+
}
14381570
}
14391571

14401572
#[allow(dead_code)]
14411573
pub fn clear_cache(&self) {
14421574
let mut cache = self.module_cache.write();
14431575
let count = cache.len();
14441576
cache.clear();
1577+
self.persistent_instances.write().clear();
14451578
info!(cleared = count, "WASM module cache cleared");
14461579
}
14471580

0 commit comments

Comments
 (0)