diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 66db5995a..6f31dc920 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -34,13 +34,10 @@ use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, - print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, + print_to_terminal, println, set_state, timer, Address, Message, PackageId, Request, Response, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::collections::{HashMap, HashSet}; wit_bindgen::generate!({ path: "target/wit", @@ -63,20 +60,109 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C"; const DELAY_MS: u64 = 1_000; // 1s -#[derive(Debug, Serialize, Deserialize)] -pub struct State { - /// the kimap helper we are using - pub kimap: kimap::Kimap, +pub const CURRENT_VERSION: u32 = 1; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VersionedState { + version: u32, + #[serde(flatten)] + state: StateEnum, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum StateEnum { + V1(AppStoreState), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AppStoreState { /// the last block at which we saved the state of the listings to disk. /// when we boot, we can read logs starting from this block and /// rebuild latest state. - pub last_saved_block: u64, + pub last_block: u64, /// onchain listings pub listings: HashMap, /// set of packages that we have published pub published: HashSet, } +impl AppStoreState { + pub fn save(&self) -> anyhow::Result<()> { + let versioned_state = VersionedState { + version: CURRENT_VERSION, + state: StateEnum::V1(self.clone()), + }; + set_state(&serde_json::to_vec(&versioned_state)?); + Ok(()) + } +} + +impl VersionedState { + pub fn new() -> Self { + Self { + version: CURRENT_VERSION, + state: StateEnum::V1(AppStoreState { + last_block: 0, + listings: HashMap::new(), + published: HashSet::new(), + }), + } + } + + pub fn state(&self) -> &AppStoreState { + match &self.state { + StateEnum::V1(state) => state, + } + } + + pub fn state_mut(&mut self) -> &mut AppStoreState { + match &mut self.state { + StateEnum::V1(state) => state, + } + } + + pub fn update_block(&mut self, block: u64) { + if block > 2 { + self.state_mut().last_block = block - 2; + } + } + + pub fn load_or_create() -> Self { + match get_state() { + Some(bytes) => match serde_json::from_slice(&bytes) { + Ok(state) => { + let state: VersionedState = state; + if state.version != CURRENT_VERSION { + println!( + "migrating state from version {} to {}", + state.version, CURRENT_VERSION + ); + state.migrate() + } else { + state + } + } + Err(e) => { + println!("failed to deserialize state: {e}, creating new"); // note, dangerous but we can always re-index. + Self::new() + } + }, + None => Self::new(), + } + } + + // potential future migrations can be added here. + fn migrate(self) -> Self { + match self.version { + 1 => self, // current version + // 2 => migrate_to_v3(self), + // 3 => migrate_to_v4(self), + _ => Self::new(), // unknown version, start fresh + } + } +} + /// listing information derived from metadata hash in listing event #[derive(Clone, Debug, Deserialize, Serialize)] pub struct PackageListing { @@ -106,8 +192,13 @@ fn init(our: Address) { // can change, log requests can take quite a long time. let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); - let mut state = fetch_state(eth_provider); - fetch_and_subscribe_logs(&our, &mut state); + let mut versioned_state = VersionedState::load_or_create(); + let state = versioned_state.state_mut(); + + let contract_address = KIMAP_ADDRESS.parse::().unwrap(); + let kimap = kimap::Kimap::new(eth_provider, contract_address); + + fetch_and_subscribe_logs(&our, state, &kimap); loop { match await_message() { @@ -115,7 +206,7 @@ fn init(our: Address) { print_to_terminal(1, &format!("chain: got network error: {send_error}")); } Ok(message) => { - if let Err(e) = handle_message(&our, &mut state, &message) { + if let Err(e) = handle_message(&our, state, &message, &kimap) { print_to_terminal(1, &format!("chain: error handling message: {:?}", e)); } } @@ -123,7 +214,12 @@ fn init(our: Address) { } } -fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { +fn handle_message( + our: &Address, + state: &mut AppStoreState, + message: &Message, + kimap: &kimap::Kimap, +) -> anyhow::Result<()> { if !message.is_request() { if message.is_local(&our) && message.source().process == "timer:distro:sys" { // handling of ETH RPC subscriptions delayed by DELAY_MS @@ -132,7 +228,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow return Err(anyhow::anyhow!("foo")); }; let log = serde_json::from_slice(context)?; - handle_eth_log(our, state, log, false)?; + handle_eth_log(our, state, log, kimap, false)?; return Ok(()); } } else { @@ -153,10 +249,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } else { // attempt to resubscribe - state - .kimap - .provider - .subscribe_loop(1, app_store_filter(state)); + kimap.provider.subscribe_loop(1, app_store_filter()); } } Req::Request(chains) => { @@ -168,7 +261,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow Ok(()) } -fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { +fn handle_local_request(state: &mut AppStoreState, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { let onchain_app = state @@ -213,6 +306,7 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result ChainRequests::StartAutoUpdate(package_id) => { if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { listing.auto_update = true; + state.save()?; let response = ChainResponses::AutoUpdateStarted; Response::new().body(&response).send()?; } else { @@ -223,6 +317,7 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result ChainRequests::StopAutoUpdate(package_id) => { if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { listing.auto_update = false; + state.save()?; let response = ChainResponses::AutoUpdateStopped; Response::new().body(&response).send()?; } else { @@ -236,8 +331,9 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result fn handle_eth_log( our: &Address, - state: &mut State, + state: &mut AppStoreState, log: eth::Log, + kimap: &kimap::Kimap, startup: bool, ) -> anyhow::Result<()> { let block_number: u64 = log @@ -272,7 +368,7 @@ fn handle_eth_log( let hash_note = format!("~metadata-hash.{}", note.parent_path); // owner can change which we don't track (yet?) so don't save, need to get when desired - let (tba, _owner, data) = match state.kimap.get(&hash_note) { + let (tba, _owner, data) = match kimap.get(&hash_note) { Ok(gr) => Ok(gr), Err(e) => match e { eth::EthError::RpcError(_) => { @@ -281,7 +377,7 @@ fn handle_eth_log( // `timer:distro:sys` so that events are processed in // order of receipt std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); - state.kimap.get(&hash_note) + kimap.get(&hash_note) } _ => Err(e), }, @@ -339,7 +435,12 @@ fn handle_eth_log( } } + state.last_block = block_number; + if !startup { + // save logs immediately if not startup. + state.save()?; + // if auto_update is enabled, send a message to downloads to kick off the update. if let Some(listing) = state.listings.get(&package_id) { if listing.auto_update { @@ -357,15 +458,13 @@ fn handle_eth_log( } } - state.last_saved_block = block_number; - Ok(()) } /// after startup, fetch metadata for all listings /// we do this as a separate step to not repeatedly fetch outdated metadata /// as we process logs. -fn update_all_metadata(state: &mut State) { +fn update_all_metadata(state: &mut AppStoreState, kimap: &kimap::Kimap) { state.listings.retain(|package_id, listing| { let (tba, metadata_hash) = { // generate ~metadata-hash full-path @@ -376,7 +475,7 @@ fn update_all_metadata(state: &mut State) { ); // owner can change which we don't track (yet?) so don't save, need to get when desired - let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) { + let Ok((tba, _owner, data)) = (match kimap.get(&hash_note) { Ok(gr) => Ok(gr), Err(e) => match e { eth::EthError::RpcError(_) => { @@ -385,7 +484,7 @@ fn update_all_metadata(state: &mut State) { // `timer:distro:sys` so that events are processed in // order of receipt std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); - state.kimap.get(&hash_note) + kimap.get(&hash_note) } _ => Err(e), }, @@ -423,6 +522,9 @@ fn update_all_metadata(state: &mut State) { } true }); + if let Err(e) = state.save() { + print_to_terminal(0, &format!("error saving state: {e}")); + } } /// create the filter used for app store getLogs and subscription. @@ -431,31 +533,29 @@ fn update_all_metadata(state: &mut State) { /// at the URI. /// /// this means that ~metadata-hash should be *posted before or at the same time* as ~metadata-uri! -pub fn app_store_filter(state: &State) -> eth::Filter { +pub fn app_store_filter() -> eth::Filter { let notes = vec![keccak256("~metadata-uri")]; + let contract_address = KIMAP_ADDRESS.parse::().unwrap(); eth::Filter::new() - .address(*state.kimap.address()) + .address(contract_address) .events([kimap::contract::Note::SIGNATURE]) .topic3(notes) } /// create a filter to fetch app store event logs from chain and subscribe to new events -pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { - let filter = app_store_filter(state); +pub fn fetch_and_subscribe_logs(our: &Address, state: &mut AppStoreState, kimap: &kimap::Kimap) { + let filter = app_store_filter(); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs println!("subscribing..."); - state.kimap.provider.subscribe_loop(1, filter.clone()); - for log in fetch_logs( - &state.kimap.provider, - &filter.from_block(state.last_saved_block), - ) { - if let Err(e) = handle_eth_log(our, state, log, true) { + kimap.provider.subscribe_loop(1, filter.clone()); + for log in fetch_logs(&kimap.provider, &filter.from_block(state.last_block)) { + if let Err(e) = handle_eth_log(our, state, log, kimap, true) { print_to_terminal(1, &format!("error ingesting log: {e}")); }; } - update_all_metadata(state); + update_all_metadata(state, kimap); } /// fetch logs from the chain with a given filter @@ -504,32 +604,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String { format!("0x{:x}", hasher.finalize()) } -/// fetch state from disk or create a new one if that fails -pub fn fetch_state(provider: eth::Provider) -> State { - if let Some(state_bytes) = get_state() { - match serde_json::from_slice::(&state_bytes) { - Ok(state) => { - if state.kimap.address().to_string() == KIMAP_ADDRESS { - return state; - } else { - println!( - "state contract address mismatch. rebuilding state! expected {}, got {}", - KIMAP_ADDRESS, - state.kimap.address().to_string() - ); - } - } - Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"), - } - } - State { - kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()), - last_saved_block: 0, - listings: HashMap::new(), - published: HashSet::new(), - } -} - // quite annoyingly, we must convert from our gen'd version of PackageId // to the process_lib's gen'd version. this is in order to access custom // Impls that we want to use diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 765e36e14..0b01fabf3 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,8 +4,8 @@ use crate::kinode::process::kns_indexer::{ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, - Request, Response, + await_message, call_init, eth, get_state, kimap, net, print_to_terminal, println, set_state, + timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; use std::{ @@ -39,11 +39,23 @@ const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; const DELAY_MS: u64 = 1_000; // 1s -#[derive(Clone, Debug, Serialize, Deserialize)] -struct State { - chain_id: u64, - // what contract this state pertains to - contract_address: eth::Address, +pub const CURRENT_VERSION: u32 = 1; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VersionedState { + version: u32, + #[serde(flatten)] + state: StateEnum, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum StateEnum { + V1(KnsState), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KnsState { // namehash to human readable name names: HashMap, // human readable name to most recent on-chain routing information as json @@ -52,12 +64,93 @@ struct State { last_block: u64, } +impl KnsState { + pub fn save(&self) -> anyhow::Result<()> { + let versioned_state = VersionedState { + version: CURRENT_VERSION, + state: StateEnum::V1(self.clone()), + }; + set_state(&serde_json::to_vec(&versioned_state)?); + Ok(()) + } +} + +impl VersionedState { + pub fn new() -> Self { + Self { + version: CURRENT_VERSION, + state: StateEnum::V1(KnsState { + last_block: 0, + names: HashMap::new(), + nodes: HashMap::new(), + }), + } + } + + pub fn state(&self) -> &KnsState { + match &self.state { + StateEnum::V1(state) => state, + } + } + + pub fn state_mut(&mut self) -> &mut KnsState { + match &mut self.state { + StateEnum::V1(state) => state, + } + } + + pub fn update_block(&mut self, block: u64) { + if block > 2 { + self.state_mut().last_block = block - 2; + } + } + + pub fn save(&self) -> anyhow::Result<()> { + set_state(&serde_json::to_vec(self).unwrap()); + Ok(()) + } + + pub fn load_or_create() -> Self { + match get_state() { + Some(bytes) => match serde_json::from_slice(&bytes) { + Ok(state) => { + let state: VersionedState = state; + if state.version != CURRENT_VERSION { + println!( + "migrating state from version {} to {}", + state.version, CURRENT_VERSION + ); + state.migrate() + } else { + state + } + } + Err(e) => { + println!("failed to deserialize state: {e}, creating new"); // note, dangerous but we can always re-index. + Self::new() + } + }, + None => Self::new(), + } + } + + // potential future migrations can be added here. + fn migrate(self) -> Self { + match self.version { + 1 => self, // current version + // 2 => migrate_to_v3(self), + // 3 => migrate_to_v4(self), + _ => Self::new(), // unknown version, start fresh + } + } +} + // note: not defined in wit api right now like IndexerRequests. #[derive(Clone, Debug, Serialize, Deserialize)] enum IndexerResponses { Name(Option), NodeInfo(Option), - GetState(State), + GetState(KnsState), } #[derive(Debug, thiserror::Error)] @@ -75,26 +168,27 @@ fn init(our: Address) { // us to quickly verify we have the updated mapping with root hash, but right // now it's tricky to recover from missed events. - let state = State { - chain_id: CHAIN_ID, - contract_address: KIMAP_ADDRESS.parse::().unwrap(), - nodes: HashMap::new(), - names: HashMap::new(), - last_block: KIMAP_FIRST_BLOCK, - }; + // Create or load versioned state - if let Err(e) = main(our, state) { + let versioned_state = VersionedState::load_or_create(); + + if let Err(e) = main(our, versioned_state) { println!("fatal error: {e}"); } } -fn main(our: Address, mut state: State) -> anyhow::Result<()> { +fn main(our: Address, mut versioned_state: VersionedState) -> anyhow::Result<()> { + let mut state = versioned_state.state_mut(); + #[cfg(feature = "simulation-mode")] add_temp_hardcoded_tlzs(&mut state); + let contract_address = KIMAP_ADDRESS.parse::().unwrap(); + // sub_id: 1 let mints_filter = eth::Filter::new() - .address(state.contract_address) + .address(contract_address) + .from_block(state.last_block - 2) .to_block(eth::BlockNumberOrTag::Latest) .event("Mint(bytes32,bytes32,bytes,bytes)"); @@ -108,21 +202,22 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sub_id: 2 let notes_filter = eth::Filter::new() - .address(state.contract_address) + .address(contract_address) + .from_block(state.last_block - 2) .to_block(eth::BlockNumberOrTag::Latest) .event("Note(bytes32,bytes32,bytes,bytes,bytes)") .topic3(notes); // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT); + let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, SUBSCRIPTION_TIMEOUT); print_to_terminal( 1, &format!( "subscribing, state.block: {}, chain_id: {}", state.last_block - 1, - state.chain_id + CHAIN_ID ), ); @@ -218,7 +313,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } fn handle_eth_message( - state: &mut State, + state: &mut KnsState, eth_provider: ð::Provider, tick: bool, pending_notes: &mut BTreeMap>, @@ -261,7 +356,7 @@ fn handle_eth_message( } fn handle_pending_notes( - state: &mut State, + state: &mut KnsState, pending_notes: &mut BTreeMap>, ) -> anyhow::Result<()> { if pending_notes.is_empty() { @@ -314,7 +409,7 @@ fn handle_pending_notes( Ok(()) } -fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Result<()> { +fn handle_note(state: &mut KnsState, note: &kimap::contract::Note) -> anyhow::Result<()> { let note_label = String::from_utf8(note.label.to_vec())?; let node_hash = note.parenthash.to_string(); @@ -392,7 +487,7 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul } fn handle_log( - state: &mut State, + state: &mut KnsState, pending_notes: &mut BTreeMap>, log: ð::Log, ) -> anyhow::Result<()> { @@ -456,6 +551,9 @@ fn handle_log( } }; + if let Err(e) = state.save() { + print_to_terminal(1, &format!("state-saving error! {e:?}")); + } Ok(()) } @@ -463,7 +561,7 @@ fn handle_log( fn fetch_and_process_logs( eth_provider: ð::Provider, - state: &mut State, + state: &mut KnsState, filter: eth::Filter, pending_notes: &mut BTreeMap>, ) { @@ -519,7 +617,7 @@ fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option // TEMP. Either remove when event reimitting working with anvil, // or refactor into better structure(!) #[cfg(feature = "simulation-mode")] -fn add_temp_hardcoded_tlzs(state: &mut State) { +fn add_temp_hardcoded_tlzs(state: &mut KnsState) { // add some hardcoded top level zones state.names.insert( "0xdeeac81ae11b64e7cab86d089c306e5d223552a630f02633ce170d2786ff1bbd".to_string(), @@ -532,7 +630,7 @@ fn add_temp_hardcoded_tlzs(state: &mut State) { } /// Decodes bytes into an array of keccak256 hashes (32 bytes each) and returns their full names. -fn decode_routers(data: &[u8], state: &State) -> Vec { +fn decode_routers(data: &[u8], state: &KnsState) -> Vec { if data.len() % 32 != 0 { print_to_terminal( 1,