From daae14c68406da81e2cc6a8b1fc75fe14a8cb5a7 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Mon, 9 Dec 2024 17:23:02 +0200 Subject: [PATCH 1/8] kns_indexer: add kv as state backend --- Cargo.lock | 68 ++-- .../kns_indexer/kns_indexer/Cargo.toml | 2 +- .../kns_indexer/kns_indexer/src/lib.rs | 319 +++++++++++++----- kinode/packages/kns_indexer/pkg/manifest.json | 6 +- 4 files changed, 268 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86152e012..0e8c1d912 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,7 +78,7 @@ name = "alias" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -1012,7 +1012,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "rand 0.8.5", "serde", @@ -1383,7 +1383,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "url", @@ -1592,7 +1592,7 @@ name = "cat" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -1656,7 +1656,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "rand 0.8.5", "serde", @@ -1675,7 +1675,7 @@ version = "0.2.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "pleco", "serde", "serde_json", @@ -1867,7 +1867,7 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" name = "contacts" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (git+https://github.com/kinode-dao/process_lib?rev=088a549)", + "kinode_process_lib 0.9.4", "process_macros", "serde", "serde_json", @@ -2444,7 +2444,7 @@ name = "download" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "serde", "serde_json", @@ -2456,7 +2456,7 @@ name = "downloads" version = "0.5.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "rand 0.8.5", "serde", @@ -2493,7 +2493,7 @@ dependencies = [ name = "echo" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "wit-bindgen", ] @@ -2732,7 +2732,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "rand 0.8.5", "serde", @@ -2886,7 +2886,7 @@ dependencies = [ name = "get_block" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -2951,7 +2951,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" name = "globe" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "url", @@ -3078,7 +3078,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" name = "help" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "wit-bindgen", ] @@ -3107,7 +3107,7 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" name = "hi" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -3140,7 +3140,7 @@ version = "0.1.2" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -3455,7 +3455,7 @@ name = "install" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "serde", "serde_json", @@ -3632,7 +3632,7 @@ name = "kfetch" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "serde_json", @@ -3644,7 +3644,7 @@ name = "kill" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -3740,8 +3740,7 @@ dependencies = [ [[package]] name = "kinode_process_lib" version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c257733fdc158b8223e43d92baeac02fe3d6a06b62953dbaea36e989f861b138" +source = "git+https://github.com/kinode-dao/process_lib?rev=088a549#088a5497257eada697e0869d6a8d7e9ef5e620f6" dependencies = [ "alloy 0.1.4", "alloy-primitives", @@ -3762,8 +3761,9 @@ dependencies = [ [[package]] name = "kinode_process_lib" -version = "0.9.4" -source = "git+https://github.com/kinode-dao/process_lib?rev=088a549#088a5497257eada697e0869d6a8d7e9ef5e620f6" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613dee198b9f19c72b55324ceb70a55da0e63c0b17ee5d528c6bfb6705267f7f" dependencies = [ "alloy 0.1.4", "alloy-primitives", @@ -3827,7 +3827,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "hex", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "serde_json", @@ -4056,7 +4056,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "regex", "serde", "serde_json", @@ -4226,7 +4226,7 @@ dependencies = [ name = "net_diagnostics" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "wit-bindgen", @@ -4552,7 +4552,7 @@ dependencies = [ name = "peer" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "wit-bindgen", @@ -4562,7 +4562,7 @@ dependencies = [ name = "peers" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "wit-bindgen", @@ -5603,7 +5603,7 @@ dependencies = [ "anyhow", "base64 0.22.1", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rmp-serde", "serde", "serde_json", @@ -5821,7 +5821,7 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" name = "state" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -5998,7 +5998,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "rand 0.8.5", "regex", "serde", @@ -6012,7 +6012,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "serde", "serde_json", @@ -6269,7 +6269,7 @@ version = "0.2.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "serde", "serde_json", "wit-bindgen", @@ -6600,7 +6600,7 @@ name = "uninstall" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kinode_process_lib 0.9.7", "process_macros", "serde", "serde_json", diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 4a6b3cbe7..84b4963f0 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" hex = "0.4.3" -kinode_process_lib = "0.9.4" +kinode_process_lib = "0.9.6" rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 765e36e14..ae7997580 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,13 +4,15 @@ use crate::kinode::process::kns_indexer::{ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, - Request, Response, + await_message, call_init, eth, kimap, + kv::{self, Kv}, + net, print_to_terminal, println, timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; use std::{ - collections::{hash_map::HashMap, BTreeMap}, + collections::BTreeMap, net::{IpAddr, Ipv4Addr, Ipv6Addr}, + str::FromStr, }; wit_bindgen::generate!({ @@ -41,15 +43,172 @@ const DELAY_MS: u64 = 1_000; // 1s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { - chain_id: u64, - // what contract this state pertains to - contract_address: eth::Address, - // namehash to human readable name - names: HashMap, - // human readable name to most recent on-chain routing information as json - nodes: HashMap, + // TODO: maybe these shouldn't even be stored in-mem. + // version of the state in kv + version: u32, // last block we have an update from last_block: u64, + // kv handle + // includes keys and values for: + // "chain_id", "version", "last_block", "contract_address", + // "{namehash}" -> "{name}", "{name}" -> "{node_info}" + kv: Kv>, // todo: maybe serialize directly into known enum of possible types? +} + +impl State { + fn load(our: &Address) -> Self { + let kv: Kv> = match kv::open(our.package_id(), "kns_indexer", Some(10)) { + Ok(kv) => kv, + Err(e) => panic!("fatal: error opening kns_indexer key_value database: {e:?}"), + }; + + let mut state = Self { + kv, + version: 0, + last_block: 0, + }; + + // Load or initialize chain_id + let chain_id = state.get_chain_id(); + if chain_id == 0 { + state.set_chain_id(CHAIN_ID); + } + + // Load or initialize contract_address + let contract_address = state.get_contract_address(); + if contract_address + == eth::Address::from_str(KIMAP_ADDRESS) + .expect("Failed to parse KIMAP_ADDRESS constant") + { + state.set_contract_address(contract_address); + } + + // Load or initialize last_block + let last_block = state.get_last_block(); + if last_block == 0 { + state.set_last_block(KIMAP_FIRST_BLOCK); + } + + // Load or initialize version + let version = state.get_version(); + if version == 0 { + state.set_version(1); // Start at version 1 + } + + // Update state struct with final values + state.version = state.get_version(); + state.last_block = state.get_last_block(); + + println!( + "kns_indexer: loaded state: version: {}, last_block: {}, chain_id: {}, kimap_address: {}", + state.version, + state.last_block, + state.get_chain_id(), + state.get_contract_address() + ); + + state + } + + fn get_last_block(&self) -> u64 { + self.kv + .get(&"last_block".to_string()) + .ok() + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .unwrap_or(0) + } + + fn set_last_block(&mut self, block: u64) { + self.kv + .set( + &"last_block".to_string(), + &serde_json::to_vec(&block).unwrap(), + None, + ) + .unwrap(); + } + + fn get_version(&self) -> u32 { + self.kv + .get(&"version".to_string()) + .ok() + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .unwrap_or(0) + } + + fn set_version(&mut self, version: u32) { + self.kv + .set( + &"version".to_string(), + &serde_json::to_vec(&version).unwrap(), + None, + ) + .unwrap(); + } + + fn get_name(&self, namehash: &str) -> Option { + self.kv + .get(&namehash.to_string()) + .ok() + .and_then(|bytes| String::from_utf8(bytes).ok()) + } + + fn set_name(&mut self, namehash: &str, name: &str) { + self.kv + .set(&namehash.to_string(), &name.as_bytes().to_vec(), None) + .unwrap(); + } + + fn get_node(&self, name: &str) -> Option { + self.kv + .get(&name.to_string()) + .ok() + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + } + + fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { + self.kv + .set(&name.to_string(), &serde_json::to_vec(&node).unwrap(), None) + .unwrap(); + } + + fn get_chain_id(&self) -> u64 { + self.kv + .get(&"chain_id".to_string()) + .ok() + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .unwrap_or(CHAIN_ID) + } + + fn set_chain_id(&mut self, chain_id: u64) { + self.kv + .set( + &"chain_id".to_string(), + &serde_json::to_vec(&chain_id).unwrap(), + None, + ) + .unwrap(); + } + + fn get_contract_address(&self) -> eth::Address { + match self.kv.get(&"contract_address".to_string()) { + Ok(bytes) => match serde_json::from_slice(&bytes) { + Ok(addr) => addr, + Err(_) => eth::Address::from_str(KIMAP_ADDRESS) + .expect("Failed to parse KIMAP_ADDRESS constant"), + }, + Err(_) => eth::Address::from_str(KIMAP_ADDRESS) + .expect("Failed to parse KIMAP_ADDRESS constant"), + } + } + + fn set_contract_address(&mut self, contract_address: eth::Address) { + if let Ok(bytes) = serde_json::to_vec(&contract_address) { + self.kv + .set(&"contract_address".to_string(), &bytes, None) + .expect("Failed to set contract address"); + } + } } // note: not defined in wit api right now like IndexerRequests. @@ -70,18 +229,8 @@ call_init!(init); fn init(our: Address) { println!("indexing on contract address {KIMAP_ADDRESS}"); - // we **can** persist PKI state between boots but with current size, it's - // more robust just to reload the whole thing. the new contracts will allow - // us to quickly verify we have the updated mapping with root hash, but right - // now it's tricky to recover from missed events. - - let state = State { - chain_id: CHAIN_ID, - contract_address: KIMAP_ADDRESS.parse::().unwrap(), - nodes: HashMap::new(), - names: HashMap::new(), - last_block: KIMAP_FIRST_BLOCK, - }; + // state is loaded from kv, and updated with the current block number and version. + let state = State::load(&our); if let Err(e) = main(our, state) { println!("fatal error: {e}"); @@ -94,7 +243,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sub_id: 1 let mints_filter = eth::Filter::new() - .address(state.contract_address) + .address(state.get_contract_address()) + .from_block(state.get_last_block()) .to_block(eth::BlockNumberOrTag::Latest) .event("Mint(bytes32,bytes32,bytes,bytes)"); @@ -108,21 +258,23 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sub_id: 2 let notes_filter = eth::Filter::new() - .address(state.contract_address) + .address(state.get_contract_address()) + .from_block(state.get_last_block()) .to_block(eth::BlockNumberOrTag::Latest) .event("Note(bytes32,bytes32,bytes,bytes,bytes)") .topic3(notes); // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT); + let eth_provider: eth::Provider = + eth::Provider::new(state.get_chain_id(), SUBSCRIPTION_TIMEOUT); print_to_terminal( 1, &format!( "subscribing, state.block: {}, chain_id: {}", - state.last_block - 1, - state.chain_id + state.get_last_block() - 1, + state.get_chain_id() ), ); @@ -197,7 +349,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sending a response to the proper place. Response::new() .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), + state.get_name(hash), ))?) .send()?; } @@ -205,10 +357,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { IndexerRequests::NodeInfo(NodeInfoRequest { ref name, .. }) => { Response::new() .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), + state.get_node(name), ))?) .send()?; } + // note no longer relevant. + // TODO: redo with iterator once available. IndexerRequests::GetState(GetStateRequest { .. }) => { Response::new().body(serde_json::to_vec(&state)?).send()?; } @@ -322,68 +476,53 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul return Err(anyhow::anyhow!("skipping invalid note: {note_label}")); } - let Some(node_name) = get_parent_name(&state.names, &node_hash) else { + let Some(node_name) = get_parent_name(&state, &node_hash) else { return Err(KnsError::NoParentError.into()); }; - match note_label.as_str() { - "~ws-port" => { - let ws = bytes_to_port(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + if let Some(mut node) = state.get_node(&node_name) { + match note_label.as_str() { + "~ws-port" => { + let ws = bytes_to_port(¬e.data)?; node.ports.insert("ws".to_string(), ws); - // port defined, -> direct - node.routers = vec![]; + node.routers = vec![]; // port defined, -> direct } - } - "~tcp-port" => { - let tcp = bytes_to_port(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + "~tcp-port" => { + let tcp = bytes_to_port(¬e.data)?; node.ports.insert("tcp".to_string(), tcp); - // port defined, -> direct - node.routers = vec![]; - } - } - "~net-key" => { - if note.data.len() != 32 { - return Err(anyhow::anyhow!("invalid net-key length")); + node.routers = vec![]; // port defined, -> direct } - if let Some(node) = state.nodes.get_mut(&node_name) { + "~net-key" => { + if note.data.len() != 32 { + return Err(anyhow::anyhow!("invalid net-key length")); + } node.public_key = hex::encode(¬e.data); } - } - "~routers" => { - let routers = decode_routers(¬e.data, state); - if let Some(node) = state.nodes.get_mut(&node_name) { + "~routers" => { + let routers = decode_routers(¬e.data, state); node.routers = routers; - // -> indirect - node.ports = BTreeMap::new(); + node.ports = BTreeMap::new(); // -> indirect node.ips = vec![]; } - } - "~ip" => { - let ip = bytes_to_ip(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + "~ip" => { + let ip = bytes_to_ip(¬e.data)?; node.ips = vec![ip.to_string()]; - // -> direct - node.routers = vec![]; + node.routers = vec![]; // -> direct + } + _other => { + // Ignore unknown notes } } - _other => { - // Ignore unknown notes - } - } - // only send an update if we have a *full* set of data for networking: - // a node name, plus either or - if let Some(node_info) = state.nodes.get(&node_name) { - if !node_info.public_key.is_empty() - && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) - || node_info.routers.len() > 0) + // Update the node in the state + state.set_node(&node_name, &node); + + // Only send an update if we have a *full* set of data for networking + if !node.public_key.is_empty() + && ((!node.ips.is_empty() && !node.ports.is_empty()) || !node.routers.is_empty()) { Request::to(("our", "net", "distro", "sys")) - .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( - node_info.clone(), - ))?) + .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node))?) .send()?; } } @@ -411,15 +550,15 @@ fn handle_log( return Err(anyhow::anyhow!("skipping invalid name: {name}")); } - let full_name = match get_parent_name(&state.names, &parent_hash) { + let full_name = match get_parent_name(&state, &parent_hash) { Some(parent_name) => format!("{name}.{parent_name}"), None => name, }; - state.names.insert(child_hash.clone(), full_name.clone()); - state.nodes.insert( - full_name.clone(), - net::KnsUpdate { + state.set_name(&child_hash.clone(), &full_name.clone()); + state.set_node( + &full_name.clone(), + &net::KnsUpdate { name: full_name.clone(), public_key: String::new(), ips: Vec::new(), @@ -486,13 +625,13 @@ fn fetch_and_process_logs( } } -fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option { - let mut current_hash = parent_hash; +fn get_parent_name(state: &State, parent_hash: &str) -> Option { + let mut current_hash = parent_hash.to_string(); let mut components = Vec::new(); // Collect components in a vector let mut visited_hashes = std::collections::HashSet::new(); - while let Some(parent_name) = names.get(current_hash) { - if !visited_hashes.insert(current_hash) { + while let Some(parent_name) = state.get_name(¤t_hash) { + if !visited_hashes.insert(current_hash.clone()) { break; } @@ -501,7 +640,7 @@ fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option } // Update current_hash to the parent's hash for the next iteration - if let Some(new_parent_hash) = names.get(parent_name) { + if let Some(new_parent_hash) = state.get_name(&parent_name) { current_hash = new_parent_hash; } else { break; @@ -521,13 +660,13 @@ fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option #[cfg(feature = "simulation-mode")] fn add_temp_hardcoded_tlzs(state: &mut State) { // add some hardcoded top level zones - state.names.insert( - "0xdeeac81ae11b64e7cab86d089c306e5d223552a630f02633ce170d2786ff1bbd".to_string(), - "os".to_string(), + state.set_name( + &"0xdeeac81ae11b64e7cab86d089c306e5d223552a630f02633ce170d2786ff1bbd".to_string(), + &"os".to_string(), ); - state.names.insert( - "0x137d9e4cc0479164d40577620cb3b41b083c6e8dbf58f8523be76d207d6fd8ea".to_string(), - "dev".to_string(), + state.set_name( + &"0x137d9e4cc0479164d40577620cb3b41b083c6e8dbf58f8523be76d207d6fd8ea".to_string(), + &"dev".to_string(), ); } @@ -545,7 +684,7 @@ fn decode_routers(data: &[u8], state: &State) -> Vec { for chunk in data.chunks(32) { let hash_str = format!("0x{}", hex::encode(chunk)); - match state.names.get(&hash_str) { + match state.get_name(&hash_str) { Some(full_name) => routers.push(full_name.clone()), None => print_to_terminal( 1, diff --git a/kinode/packages/kns_indexer/pkg/manifest.json b/kinode/packages/kns_indexer/pkg/manifest.json index 863653260..e791e57ae 100644 --- a/kinode/packages/kns_indexer/pkg/manifest.json +++ b/kinode/packages/kns_indexer/pkg/manifest.json @@ -8,12 +8,14 @@ "eth:distro:sys", "http_server:distro:sys", "net:distro:sys", - "timer:distro:sys" + "timer:distro:sys", + "kv:distro:sys" ], "grant_capabilities": [ "eth:distro:sys", "http_server:distro:sys", - "timer:distro:sys" + "timer:distro:sys", + "kv:distro:sys" ], "public": false } From f633b078b7c3bea69bd2730df06cf340a17baeb2 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Mon, 9 Dec 2024 17:30:12 +0200 Subject: [PATCH 2/8] kns: add meta keys --- kinode/packages/kns_indexer/Cargo.lock | 6 +- .../packages/kns_indexer/get_block/Cargo.toml | 2 +- .../kns_indexer/kns_indexer/src/lib.rs | 82 ++++++++++++------- kinode/packages/kns_indexer/state/Cargo.toml | 2 +- 4 files changed, 56 insertions(+), 36 deletions(-) diff --git a/kinode/packages/kns_indexer/Cargo.lock b/kinode/packages/kns_indexer/Cargo.lock index ac88fe790..721ea9444 100644 --- a/kinode/packages/kns_indexer/Cargo.lock +++ b/kinode/packages/kns_indexer/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1462,9 +1462,9 @@ dependencies = [ [[package]] name = "kinode_process_lib" -version = "0.9.4" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c257733fdc158b8223e43d92baeac02fe3d6a06b62953dbaea36e989f861b138" +checksum = "613dee198b9f19c72b55324ceb70a55da0e63c0b17ee5d528c6bfb6705267f7f" dependencies = [ "alloy", "alloy-primitives", diff --git a/kinode/packages/kns_indexer/get_block/Cargo.toml b/kinode/packages/kns_indexer/get_block/Cargo.toml index 351d48f6a..33658dd19 100644 --- a/kinode/packages/kns_indexer/get_block/Cargo.toml +++ b/kinode/packages/kns_indexer/get_block/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" simulation-mode = [] [dependencies] -kinode_process_lib = "0.9.4" +kinode_process_lib = "0.9.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = "0.24.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index ae7997580..78a6606c6 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -43,15 +43,14 @@ const DELAY_MS: u64 = 1_000; // 1s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { - // TODO: maybe these shouldn't even be stored in-mem. // version of the state in kv version: u32, // last block we have an update from last_block: u64, // kv handle // includes keys and values for: - // "chain_id", "version", "last_block", "contract_address", - // "{namehash}" -> "{name}", "{name}" -> "{node_info}" + // "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address", + // "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}" kv: Kv>, // todo: maybe serialize directly into known enum of possible types? } @@ -68,13 +67,13 @@ impl State { last_block: 0, }; - // Load or initialize chain_id + // load or initialize chain_id let chain_id = state.get_chain_id(); if chain_id == 0 { state.set_chain_id(CHAIN_ID); } - // Load or initialize contract_address + // load or initialize contract_address let contract_address = state.get_contract_address(); if contract_address == eth::Address::from_str(KIMAP_ADDRESS) @@ -83,19 +82,19 @@ impl State { state.set_contract_address(contract_address); } - // Load or initialize last_block + // load or initialize last_block let last_block = state.get_last_block(); if last_block == 0 { state.set_last_block(KIMAP_FIRST_BLOCK); } - // Load or initialize version + // load or initialize version let version = state.get_version(); if version == 0 { state.set_version(1); // Start at version 1 } - // Update state struct with final values + // update state struct with final values state.version = state.get_version(); state.last_block = state.get_last_block(); @@ -110,9 +109,30 @@ impl State { state } + fn meta_version_key() -> &'static str { + "meta:version" + } + fn meta_last_block_key() -> &'static str { + "meta:last_block" + } + fn meta_chain_id_key() -> &'static str { + "meta:chain_id" + } + fn meta_contract_address_key() -> &'static str { + "meta:contract_address" + } + + fn name_key(namehash: &str) -> String { + format!("names:{namehash}") + } + + fn node_key(name: &str) -> String { + format!("nodes:{name}") + } + fn get_last_block(&self) -> u64 { self.kv - .get(&"last_block".to_string()) + .get(&Self::meta_last_block_key().to_string()) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()) .unwrap_or(0) @@ -121,16 +141,17 @@ impl State { fn set_last_block(&mut self, block: u64) { self.kv .set( - &"last_block".to_string(), + &Self::meta_last_block_key().to_string(), &serde_json::to_vec(&block).unwrap(), None, ) .unwrap(); + self.last_block = block; } fn get_version(&self) -> u32 { self.kv - .get(&"version".to_string()) + .get(&Self::meta_version_key().to_string()) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()) .unwrap_or(0) @@ -139,42 +160,47 @@ impl State { fn set_version(&mut self, version: u32) { self.kv .set( - &"version".to_string(), + &Self::meta_version_key().to_string(), &serde_json::to_vec(&version).unwrap(), None, ) .unwrap(); + self.version = version; } fn get_name(&self, namehash: &str) -> Option { self.kv - .get(&namehash.to_string()) + .get(&Self::name_key(namehash)) .ok() .and_then(|bytes| String::from_utf8(bytes).ok()) } fn set_name(&mut self, namehash: &str, name: &str) { self.kv - .set(&namehash.to_string(), &name.as_bytes().to_vec(), None) + .set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None) .unwrap(); } fn get_node(&self, name: &str) -> Option { self.kv - .get(&name.to_string()) + .get(&Self::node_key(name)) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()) } fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { self.kv - .set(&name.to_string(), &serde_json::to_vec(&node).unwrap(), None) + .set( + &Self::node_key(name), + &serde_json::to_vec(&node).unwrap(), + None, + ) .unwrap(); } fn get_chain_id(&self) -> u64 { self.kv - .get(&"chain_id".to_string()) + .get(&Self::meta_chain_id_key().to_string()) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()) .unwrap_or(CHAIN_ID) @@ -183,7 +209,7 @@ impl State { fn set_chain_id(&mut self, chain_id: u64) { self.kv .set( - &"chain_id".to_string(), + &Self::meta_chain_id_key().to_string(), &serde_json::to_vec(&chain_id).unwrap(), None, ) @@ -191,7 +217,7 @@ impl State { } fn get_contract_address(&self) -> eth::Address { - match self.kv.get(&"contract_address".to_string()) { + match self.kv.get(&Self::meta_contract_address_key().to_string()) { Ok(bytes) => match serde_json::from_slice(&bytes) { Ok(addr) => addr, Err(_) => eth::Address::from_str(KIMAP_ADDRESS) @@ -205,7 +231,7 @@ impl State { fn set_contract_address(&mut self, contract_address: eth::Address) { if let Ok(bytes) = serde_json::to_vec(&contract_address) { self.kv - .set(&"contract_address".to_string(), &bytes, None) + .set(&Self::meta_contract_address_key().to_string(), &bytes, None) .expect("Failed to set contract address"); } } @@ -402,7 +428,7 @@ fn handle_eth_message( let block_number = eth_provider.get_block_number(); if let Ok(block_number) = block_number { print_to_terminal(2, &format!("new block: {}", block_number)); - state.last_block = block_number; + state.set_last_block(block_number); } } handle_pending_notes(state, pending_notes)?; @@ -440,15 +466,9 @@ fn handle_pending_notes( None => { print_to_terminal(1, &format!("pending note handling error: {e:?}")) } - Some(ee) => match ee { - KnsError::NoParentError => { - // print_to_terminal( - // 1, - // &format!("note still awaiting mint; attempt {attempt}"), - // ); - keep_notes.push((note, attempt + 1)); - } - }, + Some(KnsError::NoParentError) => { + keep_notes.push((note, attempt + 1)); + } } } } @@ -536,7 +556,7 @@ fn handle_log( log: ð::Log, ) -> anyhow::Result<()> { if let Some(block) = log.block_number { - state.last_block = block; + state.set_last_block(block); } match log.topics()[0] { diff --git a/kinode/packages/kns_indexer/state/Cargo.toml b/kinode/packages/kns_indexer/state/Cargo.toml index 3299252dd..7308b80f1 100644 --- a/kinode/packages/kns_indexer/state/Cargo.toml +++ b/kinode/packages/kns_indexer/state/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" simulation-mode = [] [dependencies] -kinode_process_lib = "0.9.4" +kinode_process_lib = "0.9.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = "0.24.0" From 8a5ac51e7ba1ea0b9e92a3eae7f3cc5f6e761d55 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 10 Dec 2024 21:54:14 +0200 Subject: [PATCH 3/8] kns: block number hotfix --- .../kns_indexer/kns_indexer/src/lib.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 78a6606c6..74cc32269 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -176,26 +176,30 @@ impl State { } fn set_name(&mut self, namehash: &str, name: &str) { + println!("set_name({namehash}, {name})"); self.kv .set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None) .unwrap(); } fn get_node(&self, name: &str) -> Option { - self.kv + let x = self.kv .get(&Self::node_key(name)) .ok() - .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .and_then(|bytes| serde_json::from_slice(&bytes).ok()); + println!("get_node({name}) -> {x:?}"); + x } fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { - self.kv + let x = self.kv .set( &Self::node_key(name), &serde_json::to_vec(&node).unwrap(), None, - ) - .unwrap(); + ); + println!("set_node({name}, {:?})", node); + x.unwrap(); } fn get_chain_id(&self) -> u64 { @@ -258,6 +262,8 @@ fn init(our: Address) { // state is loaded from kv, and updated with the current block number and version. let state = State::load(&our); + println!("got last block: {}", state.get_last_block()); + if let Err(e) = main(our, state) { println!("fatal error: {e}"); } @@ -308,7 +314,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - + println!("done subscribing to new logs."); // if subscription results come back in the wrong order, we store them here // until the right block is reached. @@ -626,7 +632,7 @@ fn fetch_and_process_logs( filter: eth::Filter, pending_notes: &mut BTreeMap>, ) { - let filter = filter.from_block(KIMAP_FIRST_BLOCK); + let filter = filter.from_block(state.last_block); loop { match eth_provider.get_logs(&filter) { Ok(logs) => { From 3259f279ea1f2a0342b53240d66739ad361014d4 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 10 Dec 2024 21:54:33 +0200 Subject: [PATCH 4/8] sqlite: fix faulty db_path --- kinode/src/sqlite.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index ecc08be3a..84e778b12 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -83,10 +83,14 @@ impl SqliteState { fs::create_dir_all(&db_path).await?; - let db_file_path = format!("{}.db", db); + let db_file_path = db_path.join(format!("{}.db", db)); let db_conn = Connection::open(db_file_path)?; - let _ = db_conn.execute("PRAGMA journal_mode=WAL", []); + let _: String = db_conn.query_row( + "PRAGMA journal_mode=WAL", + [], + |row| row.get(0) + )?; self.open_dbs.insert(key, Mutex::new(db_conn)); From 9fddb2cedd9796ebb87ca34770fed5cae4c0e952 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 10 Dec 2024 21:54:52 +0200 Subject: [PATCH 5/8] app_store: sqlite storage backend --- kinode/packages/app_store/chain/src/lib.rs | 612 ++++++++++++++------ kinode/packages/app_store/pkg/manifest.json | 2 + 2 files changed, 431 insertions(+), 183 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 66db5995a..44c29e9bd 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -33,14 +33,14 @@ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ - await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, + await_message, call_init, eth, http, kimap, get_blob, print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, + sqlite::{self, Sqlite}, + kernel_types as kt, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::str::FromStr; +use std::collections::HashMap; wit_bindgen::generate!({ path: "target/wit", @@ -63,7 +63,6 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C"; const DELAY_MS: u64 = 1_000; // 1s -#[derive(Debug, Serialize, Deserialize)] pub struct State { /// the kimap helper we are using pub kimap: kimap::Kimap, @@ -71,10 +70,8 @@ pub struct State { /// when we boot, we can read logs starting from this block and /// rebuild latest state. pub last_saved_block: u64, - /// onchain listings - pub listings: HashMap, - /// set of packages that we have published - pub published: HashSet, + /// tables: listings: , published: vec + pub db: DB, } /// listing information derived from metadata hash in listing event @@ -83,10 +80,9 @@ pub struct PackageListing { pub tba: eth::Address, pub metadata_uri: String, pub metadata_hash: String, - // should this even be optional? - // relegate to only valid apps maybe? pub metadata: Option, pub auto_update: bool, + pub block: u64 } #[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)] @@ -96,18 +92,270 @@ pub enum Req { Request(ChainRequests), } +pub struct DB { + inner: Sqlite, +} + +impl DB { + pub fn connect(our: &Address) -> anyhow::Result { + let inner = sqlite::open(our.package_id(), "app_store_chain.sqlite", Some(10))?; + // create tables + inner.write(CREATE_META_TABLE.into(), vec![], None)?; + inner.write(CREATE_LISTINGS_TABLE.into(), vec![], None)?; + inner.write(CREATE_PUBLISHED_TABLE.into(), vec![], None)?; + + Ok(Self { inner }) + } + + pub fn get_last_saved_block(&self) -> anyhow::Result { + let query = "SELECT value FROM meta WHERE key = 'last_saved_block'"; + let rows = self.inner.read(query.into(), vec![])?; + if let Some(row) = rows.get(0) { + if let Some(val_str) = row.get("value").and_then(|v| v.as_str()) { + if let Ok(block) = val_str.parse::() { + return Ok(block); + } + } + } + Ok(0) + } + + pub fn set_last_saved_block(&self, block: u64) -> anyhow::Result<()> { + let query = "INSERT INTO meta (key, value) VALUES ('last_saved_block', ?) + ON CONFLICT(key) DO UPDATE SET value=excluded.value"; + let params = vec![block.to_string().into()]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn insert_or_update_listing(&self, package_id: &PackageId, listing: &PackageListing) -> anyhow::Result<()> { + let metadata_json = if let Some(m) = &listing.metadata { + serde_json::to_string(m)? + } else { + "".to_string() + }; + + let query = "INSERT INTO listings (package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(package_name, publisher_node) + DO UPDATE SET + tba=excluded.tba, + metadata_uri=excluded.metadata_uri, + metadata_hash=excluded.metadata_hash, + metadata_json=excluded.metadata_json, + auto_update=excluded.auto_update, + block=excluded.block"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + listing.tba.to_string().into(), + listing.metadata_uri.clone().into(), + listing.metadata_hash.clone().into(), + metadata_json.into(), + (if listing.auto_update {1} else {0}).into(), + listing.block.into() + ]; + + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + + pub fn delete_listing(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_listing(&self, package_id: &PackageId) -> anyhow::Result> { + let query = "SELECT tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + if let Some(row) = rows.get(0) { + Ok(Some(self.row_to_listing(row)?)) + } else { + Ok(None) + } + } + + pub fn get_all_listings(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings"; + let rows = self.inner.read(query.into(), vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_batch(&self, limit: u64, offset: u64) -> anyhow::Result> { + let query = format!( + "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + ORDER BY package_name, publisher_node + LIMIT {} OFFSET {}", + limit, offset + ); + + let rows = self.inner.read(query, vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_since_block(&self, block_number: u64) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + WHERE block > ?"; + let params = vec![block_number.into()]; + let rows = self.inner.read(query.into(), params)?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + + pub fn row_to_listing(&self, row: &HashMap) -> anyhow::Result { + let tba_str = row["tba"].as_str().ok_or_else(|| anyhow::anyhow!("Invalid tba"))?; + let tba = tba_str.parse::()?; + let metadata_uri = row["metadata_uri"].as_str().unwrap_or("").to_string(); + let metadata_hash = row["metadata_hash"].as_str().unwrap_or("").to_string(); + let metadata_json = row["metadata_json"].as_str().unwrap_or(""); + let metadata: Option = + if metadata_json.is_empty() { + None + } else { + serde_json::from_str(metadata_json)? + }; + let auto_update = row["auto_update"].as_i64().unwrap_or(0) == 1; + let block = row["block"].as_i64().unwrap_or(0) as u64; + + Ok(PackageListing { + tba, + metadata_uri, + metadata_hash, + metadata, + auto_update, + block, + }) + } + + pub fn get_published(&self, package_id: &PackageId) -> anyhow::Result { + let query = "SELECT 1 FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + Ok(!rows.is_empty()) + } + + pub fn insert_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "INSERT INTO published (package_name, publisher_node) VALUES (?, ?) ON CONFLICT DO NOTHING"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn delete_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_all_published(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node FROM published"; + let rows = self.inner.read(query.into(), vec![])?; + let mut result = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + result.push(pid); + } + Ok(result) + } +} + +const CREATE_META_TABLE: &str = " +CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT +);"; + +const CREATE_LISTINGS_TABLE: &str = " +CREATE TABLE IF NOT EXISTS listings ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + tba TEXT NOT NULL, + metadata_uri TEXT, + metadata_hash TEXT, + metadata_json TEXT, + auto_update INTEGER NOT NULL DEFAULT 0, + block INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (package_name, publisher_node) +);"; + +const CREATE_PUBLISHED_TABLE: &str = " +CREATE TABLE IF NOT EXISTS published ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + PRIMARY KEY (package_name, publisher_node) +);"; + call_init!(init); fn init(our: Address) { + let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); + + let db = DB::connect(&our).expect("failed to open DB"); + let kimap_helper = kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); + let last_saved_block = db.get_last_saved_block().unwrap_or(0); + println!( - "chain started, indexing on contract address {}", - KIMAP_ADDRESS + "chain started, indexing on kimap address {} at block {}", + KIMAP_ADDRESS, last_saved_block ); - // create new provider with request-timeout of 60s - // can change, log requests can take quite a long time. - let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); + let mut state = State { + kimap: kimap_helper, + last_saved_block, + db, + }; - let mut state = fetch_state(eth_provider); - fetch_and_subscribe_logs(&our, &mut state); + fetch_and_subscribe_logs(&our, &mut state, last_saved_block); loop { match await_message() { @@ -126,17 +374,15 @@ fn init(our: Address) { fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { if !message.is_request() { if message.is_local(&our) && message.source().process == "timer:distro:sys" { - // handling of ETH RPC subscriptions delayed by DELAY_MS - // to allow kns to have a chance to process block: handle now let Some(context) = message.context() else { - return Err(anyhow::anyhow!("foo")); + return Err(anyhow::anyhow!("No context in timer message")); }; let log = serde_json::from_slice(context)?; handle_eth_log(our, state, log, false)?; return Ok(()); } } else { - match message.body().try_into()? { + match serde_json::from_slice::(message.body())? { Req::Eth(eth_result) => { if !message.is_local(our) || message.source().process != "eth:distro:sys" { return Err(anyhow::anyhow!( @@ -147,16 +393,11 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow if let Ok(eth::EthSub { result, .. }) = eth_result { if let eth::SubscriptionResult::Log(ref log) = result { - // delay handling of ETH RPC subscriptions by DELAY_MS - // to allow kns to have a chance to process block timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { - // attempt to resubscribe - state - .kimap - .provider - .subscribe_loop(1, app_store_filter(state)); + // re-subscribe if error + state.kimap.provider.subscribe_loop(1, app_store_filter(state)); } } Req::Request(chains) => { @@ -171,48 +412,37 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { - let onchain_app = state - .listings - .get(&package_id.clone().to_process_lib()) - .map(|app| OnchainApp { - package_id: package_id, - tba: app.tba.to_string(), - metadata_uri: app.metadata_uri.clone(), - metadata_hash: app.metadata_hash.clone(), - metadata: app.metadata.as_ref().map(|m| m.clone().into()), - auto_update: app.auto_update, - }); + let pid = package_id.clone().to_process_lib(); + let listing = state.db.get_listing(&pid)?; + let onchain_app = listing.map(|app| app.to_onchain_app(&pid)); let response = ChainResponses::GetApp(onchain_app); Response::new().body(&response).send()?; } ChainRequests::GetApps => { - let apps: Vec = state - .listings - .iter() - .map(|(id, listing)| listing.to_onchain_app(id)) + let listings = state.db.get_all_listings()?; + let apps: Vec = listings + .into_iter() + .map(|(pid, listing)| listing.to_onchain_app(&pid)) .collect(); - let response = ChainResponses::GetApps(apps); Response::new().body(&response).send()?; } ChainRequests::GetOurApps => { - let apps: Vec = state - .published - .iter() - .filter_map(|id| { - state - .listings - .get(id) - .map(|listing| listing.to_onchain_app(id)) - }) - .collect(); - + let published_list = state.db.get_all_published()?; + let mut apps = Vec::new(); + for pid in published_list { + if let Some(listing) = state.db.get_listing(&pid)? { + apps.push(listing.to_onchain_app(&pid)); + } + } let response = ChainResponses::GetOurApps(apps); Response::new().body(&response).send()?; } ChainRequests::StartAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = true; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStarted; Response::new().body(&response).send()?; } else { @@ -221,8 +451,10 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result } } ChainRequests::StopAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = false; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStopped; Response::new().body(&response).send()?; } else { @@ -256,7 +488,7 @@ fn handle_eth_log( if package.is_empty() || publisher.is_empty() { Err(anyhow::anyhow!("invalid publisher name")) } else { - Ok(PackageId::new(&package, &publisher)) + Ok(PackageId::new(package, publisher)) } })?; @@ -265,8 +497,8 @@ fn handle_eth_log( // at the URI. let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); - let is_our_package = &package_id.publisher() == &our.node(); - + let is_our_package = package_id.publisher() == our.node(); + println!("we got a log: {block_number} {package_id} {metadata_uri}"); let (tba, metadata_hash) = if !startup { // generate ~metadata-hash full-path let hash_note = format!("~metadata-hash.{}", note.parent_path); @@ -290,10 +522,12 @@ fn handle_eth_log( match data { None => { - // if ~metadata-uri is also empty, this is an unpublish action! + // unpublish if metadata_uri empty if metadata_uri.is_empty() { - state.published.remove(&package_id); - state.listings.remove(&package_id); + state.db.delete_published(&package_id)?; + state.db.delete_listing(&package_id)?; + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; return Ok(()); } return Err(anyhow::anyhow!( @@ -307,7 +541,7 @@ fn handle_eth_log( }; if is_our_package { - state.published.insert(package_id.clone()); + state.db.insert_published(&package_id)?; } // if this is a startup event, we don't need to fetch metadata from the URI -- @@ -320,109 +554,143 @@ fn handle_eth_log( None }; - match state.listings.entry(package_id.clone()) { - std::collections::hash_map::Entry::Occupied(mut listing) => { - let listing = listing.get_mut(); - listing.metadata_uri = metadata_uri; - listing.tba = tba; - listing.metadata_hash = metadata_hash; - listing.metadata = metadata.clone(); - } - std::collections::hash_map::Entry::Vacant(listing) => { - listing.insert(PackageListing { - tba, - metadata_uri, - metadata_hash, - metadata: metadata.clone(), - auto_update: false, - }); - } + let mut listing = state + .db + .get_listing(&package_id)? + .unwrap_or(PackageListing { + tba, + metadata_uri: metadata_uri.clone(), + metadata_hash: metadata_hash.clone(), + metadata: metadata.clone(), + auto_update: false, + block: block_number, + }); + // update fields + listing.tba = tba; + listing.metadata_uri = metadata_uri; + listing.metadata_hash = metadata_hash; + listing.metadata = metadata.clone(); + + state.db.insert_or_update_listing(&package_id, &listing)?; + + if !startup && listing.auto_update { + println!("kicking off auto-update for: {}", package_id); + Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib( + package_id.clone(), + ), + metadata: metadata.unwrap().into(), + })) + .send() + .unwrap(); } if !startup { - // if auto_update is enabled, send a message to downloads to kick off the update. - if let Some(listing) = state.listings.get(&package_id) { - if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app_store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id, - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); - } - } + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; } - state.last_saved_block = block_number; - Ok(()) } /// after startup, fetch metadata for all listings /// we do this as a separate step to not repeatedly fetch outdated metadata /// as we process logs. -fn update_all_metadata(state: &mut State) { - state.listings.retain(|package_id, listing| { - let (tba, metadata_hash) = { - // generate ~metadata-hash full-path - let hash_note = format!( - "~metadata-hash.{}.{}", - package_id.package(), - package_id.publisher() - ); - - // owner can change which we don't track (yet?) so don't save, need to get when desired - let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) { - Ok(gr) => Ok(gr), - Err(e) => match e { - eth::EthError::RpcError(_) => { - // retry on RpcError after DELAY_MS sleep - // sleep here rather than with, e.g., a message to - // `timer:distro:sys` so that events are processed in - // order of receipt - std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); - state.kimap.get(&hash_note) - } - _ => Err(e), - }, - }) else { - return false; - }; +fn update_all_metadata(state: &mut State, last_saved_block: u64) { + let updated_listings = match state.db.get_listings_since_block(last_saved_block) { + Ok(listings) => listings, + Err(e) => { + print_to_terminal(1, &format!("error fetching updated listings since block {last_saved_block}: {e}")); + return; + } + }; - match data { - None => { - // if ~metadata-uri is also empty, this is an unpublish action! - if listing.metadata_uri.is_empty() { - state.published.remove(package_id); + for (pid, mut listing) in updated_listings { + let hash_note = format!("~metadata-hash.{}.{}", pid.package(), pid.publisher()); + let (tba, metadata_hash) = match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + match data { + None => { + // If metadata_uri empty, unpublish + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal(1, &format!("error deleting published: {e}")); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; + } + Some(hash_note) => (t, String::from_utf8_lossy(&hash_note).to_string()), + } + } + Err(e) => { + // If RpcError, retry once after delay + if let eth::EthError::RpcError(_) = e { + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + if let Some(hash_note) = data { + (t, String::from_utf8_lossy(&hash_note).to_string()) + } else { + // no data again after retry + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal(1, &format!("error deleting published: {e}")); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; + } + } + Err(e2) => { + print_to_terminal(1, &format!("error retrieving metadata-hash after retry: {e2:?}")); + continue; + } } - return false; + } else { + print_to_terminal(1, &format!("error retrieving metadata-hash: {e:?} for {pid}")); + continue; } - Some(hash_note) => (tba, String::from_utf8_lossy(&hash_note).to_string()), } }; + + // Update listing fields listing.tba = tba; listing.metadata_hash = metadata_hash; - let metadata = - fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30).ok(); + + let metadata = match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) { + Ok(md) => Some(md), + Err(err) => { + print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid)); + None + } + }; listing.metadata = metadata.clone(); + + if let Err(e) = state.db.insert_or_update_listing(&pid, &listing) { + print_to_terminal(1, &format!("error updating listing {}: {e}", pid)); + } + if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app_store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id.clone(), - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); + if let Some(md) = metadata { + print_to_terminal(0, &format!("kicking off auto-update for: {}", pid)); + if let Err(e) = Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib(pid.clone()), + metadata: md.into(), + })) + .send() + { + print_to_terminal(1, &format!("error sending auto-update request: {e}")); + } + } } - true - }); + } } /// create the filter used for app store getLogs and subscription. @@ -441,21 +709,25 @@ pub fn app_store_filter(state: &State) -> eth::Filter { } /// create a filter to fetch app store event logs from chain and subscribe to new events -pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { +pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_block: u64) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs - println!("subscribing..."); - state.kimap.provider.subscribe_loop(1, filter.clone()); - for log in fetch_logs( - &state.kimap.provider, - &filter.from_block(state.last_saved_block), - ) { + state.kimap.provider.subscribe_loop(1, filter.clone()); + println!("fetching old logs from block {last_saved_block}"); + for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) { if let Err(e) = handle_eth_log(our, state, log, true) { print_to_terminal(1, &format!("error ingesting log: {e}")); }; } - update_all_metadata(state); + + update_all_metadata(state, last_saved_block); + // save updated last_saved_block + if let Ok(block_number) = state.kimap.provider.get_block_number() { + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number).unwrap(); + } + println!("up to date to block {}", state.last_saved_block); } /// fetch logs from the chain with a given filter @@ -504,32 +776,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String { format!("0x{:x}", hasher.finalize()) } -/// fetch state from disk or create a new one if that fails -pub fn fetch_state(provider: eth::Provider) -> State { - if let Some(state_bytes) = get_state() { - match serde_json::from_slice::(&state_bytes) { - Ok(state) => { - if state.kimap.address().to_string() == KIMAP_ADDRESS { - return state; - } else { - println!( - "state contract address mismatch. rebuilding state! expected {}, got {}", - KIMAP_ADDRESS, - state.kimap.address().to_string() - ); - } - } - Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"), - } - } - State { - kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()), - last_saved_block: 0, - listings: HashMap::new(), - published: HashSet::new(), - } -} - // quite annoyingly, we must convert from our gen'd version of PackageId // to the process_lib's gen'd version. this is in order to access custom // Impls that we want to use @@ -584,4 +830,4 @@ impl From for OnchainMetadata { }, } } -} +} \ No newline at end of file diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index d409e2656..b01c2aa10 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -39,6 +39,7 @@ "eth:distro:sys", "http_server:distro:sys", "http_client:distro:sys", + "sqlite:distro:sys", { "process": "vfs:distro:sys", "params": { @@ -52,6 +53,7 @@ "vfs:distro:sys", "http_client:distro:sys", "eth:distro:sys", + "sqlite:distro:sys", "timer:distro:sys" ], "public": false From cb320416d93eb8623c2b23e2f5614ffe3aea3291 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 10 Dec 2024 19:55:22 +0000 Subject: [PATCH 6/8] Format Rust code using rustfmt --- kinode/packages/app_store/chain/src/lib.rs | 99 ++++++++++++------- .../kns_indexer/kns_indexer/src/lib.rs | 14 +-- kinode/src/sqlite.rs | 6 +- 3 files changed, 74 insertions(+), 45 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 44c29e9bd..29bc1c488 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -33,14 +33,14 @@ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ - await_message, call_init, eth, http, kimap, get_blob, - print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, + await_message, call_init, eth, get_blob, http, kernel_types as kt, kimap, print_to_terminal, + println, sqlite::{self, Sqlite}, - kernel_types as kt, + timer, Address, Message, PackageId, Request, Response, }; use serde::{Deserialize, Serialize}; -use std::str::FromStr; use std::collections::HashMap; +use std::str::FromStr; wit_bindgen::generate!({ path: "target/wit", @@ -82,7 +82,7 @@ pub struct PackageListing { pub metadata_hash: String, pub metadata: Option, pub auto_update: bool, - pub block: u64 + pub block: u64, } #[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)] @@ -128,13 +128,17 @@ impl DB { Ok(()) } - pub fn insert_or_update_listing(&self, package_id: &PackageId, listing: &PackageListing) -> anyhow::Result<()> { + pub fn insert_or_update_listing( + &self, + package_id: &PackageId, + listing: &PackageListing, + ) -> anyhow::Result<()> { let metadata_json = if let Some(m) = &listing.metadata { serde_json::to_string(m)? } else { "".to_string() }; - + let query = "INSERT INTO listings (package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(package_name, publisher_node) @@ -152,14 +156,13 @@ impl DB { listing.metadata_uri.clone().into(), listing.metadata_hash.clone().into(), metadata_json.into(), - (if listing.auto_update {1} else {0}).into(), - listing.block.into() + (if listing.auto_update { 1 } else { 0 }).into(), + listing.block.into(), ]; self.inner.write(query.into(), params, None)?; Ok(()) } - pub fn delete_listing(&self, package_id: &PackageId) -> anyhow::Result<()> { let query = "DELETE FROM listings WHERE package_name = ? AND publisher_node = ?"; @@ -200,7 +203,11 @@ impl DB { Ok(listings) } - pub fn get_listings_batch(&self, limit: u64, offset: u64) -> anyhow::Result> { + pub fn get_listings_batch( + &self, + limit: u64, + offset: u64, + ) -> anyhow::Result> { let query = format!( "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings @@ -222,7 +229,10 @@ impl DB { Ok(listings) } - pub fn get_listings_since_block(&self, block_number: u64) -> anyhow::Result> { + pub fn get_listings_since_block( + &self, + block_number: u64, + ) -> anyhow::Result> { let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings WHERE block > ?"; @@ -240,14 +250,18 @@ impl DB { Ok(listings) } - - pub fn row_to_listing(&self, row: &HashMap) -> anyhow::Result { - let tba_str = row["tba"].as_str().ok_or_else(|| anyhow::anyhow!("Invalid tba"))?; + pub fn row_to_listing( + &self, + row: &HashMap, + ) -> anyhow::Result { + let tba_str = row["tba"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Invalid tba"))?; let tba = tba_str.parse::()?; let metadata_uri = row["metadata_uri"].as_str().unwrap_or("").to_string(); let metadata_hash = row["metadata_hash"].as_str().unwrap_or("").to_string(); let metadata_json = row["metadata_json"].as_str().unwrap_or(""); - let metadata: Option = + let metadata: Option = if metadata_json.is_empty() { None } else { @@ -342,7 +356,8 @@ fn init(our: Address) { let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); let db = DB::connect(&our).expect("failed to open DB"); - let kimap_helper = kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); + let kimap_helper = + kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); let last_saved_block = db.get_last_saved_block().unwrap_or(0); println!( @@ -397,7 +412,10 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } else { // re-subscribe if error - state.kimap.provider.subscribe_loop(1, app_store_filter(state)); + state + .kimap + .provider + .subscribe_loop(1, app_store_filter(state)); } } Req::Request(chains) => { @@ -601,7 +619,10 @@ fn update_all_metadata(state: &mut State, last_saved_block: u64) { let updated_listings = match state.db.get_listings_since_block(last_saved_block) { Ok(listings) => listings, Err(e) => { - print_to_terminal(1, &format!("error fetching updated listings since block {last_saved_block}: {e}")); + print_to_terminal( + 1, + &format!("error fetching updated listings since block {last_saved_block}: {e}"), + ); return; } }; @@ -638,7 +659,10 @@ fn update_all_metadata(state: &mut State, last_saved_block: u64) { // no data again after retry if listing.metadata_uri.is_empty() { if let Err(e) = state.db.delete_published(&pid) { - print_to_terminal(1, &format!("error deleting published: {e}")); + print_to_terminal( + 1, + &format!("error deleting published: {e}"), + ); } } if let Err(e) = state.db.delete_listing(&pid) { @@ -648,12 +672,18 @@ fn update_all_metadata(state: &mut State, last_saved_block: u64) { } } Err(e2) => { - print_to_terminal(1, &format!("error retrieving metadata-hash after retry: {e2:?}")); + print_to_terminal( + 1, + &format!("error retrieving metadata-hash after retry: {e2:?}"), + ); continue; } } } else { - print_to_terminal(1, &format!("error retrieving metadata-hash: {e:?} for {pid}")); + print_to_terminal( + 1, + &format!("error retrieving metadata-hash: {e:?} for {pid}"), + ); continue; } } @@ -663,15 +693,16 @@ fn update_all_metadata(state: &mut State, last_saved_block: u64) { listing.tba = tba; listing.metadata_hash = metadata_hash; - let metadata = match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) { - Ok(md) => Some(md), - Err(err) => { - print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid)); - None - } - }; + let metadata = + match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) { + Ok(md) => Some(md), + Err(err) => { + print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid)); + None + } + }; listing.metadata = metadata.clone(); - + if let Err(e) = state.db.insert_or_update_listing(&pid, &listing) { print_to_terminal(1, &format!("error updating listing {}: {e}", pid)); } @@ -681,7 +712,9 @@ fn update_all_metadata(state: &mut State, last_saved_block: u64) { print_to_terminal(0, &format!("kicking off auto-update for: {}", pid)); if let Err(e) = Request::to(("our", "downloads", "app_store", "sys")) .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib(pid.clone()), + package_id: crate::kinode::process::main::PackageId::from_process_lib( + pid.clone(), + ), metadata: md.into(), })) .send() @@ -713,7 +746,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_blo let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs - state.kimap.provider.subscribe_loop(1, filter.clone()); + state.kimap.provider.subscribe_loop(1, filter.clone()); println!("fetching old logs from block {last_saved_block}"); for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) { if let Err(e) = handle_eth_log(our, state, log, true) { @@ -830,4 +863,4 @@ impl From for OnchainMetadata { }, } } -} \ No newline at end of file +} diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 74cc32269..01dcf5267 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -183,7 +183,8 @@ impl State { } fn get_node(&self, name: &str) -> Option { - let x = self.kv + let x = self + .kv .get(&Self::node_key(name)) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()); @@ -192,12 +193,11 @@ impl State { } fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { - let x = self.kv - .set( - &Self::node_key(name), - &serde_json::to_vec(&node).unwrap(), - None, - ); + let x = self.kv.set( + &Self::node_key(name), + &serde_json::to_vec(&node).unwrap(), + None, + ); println!("set_node({name}, {:?})", node); x.unwrap(); } diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index 84e778b12..d8646be7d 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -86,11 +86,7 @@ impl SqliteState { let db_file_path = db_path.join(format!("{}.db", db)); let db_conn = Connection::open(db_file_path)?; - let _: String = db_conn.query_row( - "PRAGMA journal_mode=WAL", - [], - |row| row.get(0) - )?; + let _: String = db_conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))?; self.open_dbs.insert(key, Mutex::new(db_conn)); From e0771725b752c25b0eff1724100886da313ee37c Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 17 Dec 2024 17:23:57 +0200 Subject: [PATCH 7/8] kns & app_store: review fixes --- .../packages/app-store/app-store/src/lib.rs | 2 - kinode/packages/app-store/chain/src/lib.rs | 10 ++--- kinode/packages/app-store/pkg/manifest.json | 4 +- .../kns-indexer/kns-indexer/src/lib.rs | 42 ++++++++----------- kinode/packages/kns-indexer/pkg/manifest.json | 2 +- 5 files changed, 23 insertions(+), 37 deletions(-) diff --git a/kinode/packages/app-store/app-store/src/lib.rs b/kinode/packages/app-store/app-store/src/lib.rs index ba385d662..15eff4bf1 100644 --- a/kinode/packages/app-store/app-store/src/lib.rs +++ b/kinode/packages/app-store/app-store/src/lib.rs @@ -78,8 +78,6 @@ pub enum Resp { call_init!(init); fn init(our: Address) { - println!("started"); - let mut http_server = http::server::HttpServer::new(5); http_api::init_frontend(&our, &mut http_server); diff --git a/kinode/packages/app-store/chain/src/lib.rs b/kinode/packages/app-store/chain/src/lib.rs index 1e641ccda..7e262b2b3 100644 --- a/kinode/packages/app-store/chain/src/lib.rs +++ b/kinode/packages/app-store/chain/src/lib.rs @@ -360,10 +360,6 @@ fn init(our: Address) { kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); let last_saved_block = db.get_last_saved_block().unwrap_or(0); - println!( - "chain started, indexing on kimap address {} at block {}", - KIMAP_ADDRESS, last_saved_block - ); let mut state = State { kimap: kimap_helper, last_saved_block, @@ -520,7 +516,7 @@ fn handle_eth_log( let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); let is_our_package = package_id.publisher() == our.node(); - println!("we got a log: {block_number} {package_id} {metadata_uri}"); + let (tba, metadata_hash) = if !startup { // generate ~metadata-hash full-path let hash_note = format!("~metadata-hash.{}", note.parent_path); @@ -751,7 +747,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_blo // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs state.kimap.provider.subscribe_loop(1, filter.clone()); - println!("fetching old logs from block {last_saved_block}"); + // println!("fetching old logs from block {last_saved_block}"); for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) { if let Err(e) = handle_eth_log(our, state, log, true) { print_to_terminal(1, &format!("error ingesting log: {e}")); @@ -764,7 +760,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_blo state.last_saved_block = block_number; state.db.set_last_saved_block(block_number).unwrap(); } - println!("up to date to block {}", state.last_saved_block); + // println!("up to date to block {}", state.last_saved_block); } /// fetch logs from the chain with a given filter diff --git a/kinode/packages/app-store/pkg/manifest.json b/kinode/packages/app-store/pkg/manifest.json index ea3d3a43e..4a7e951e0 100644 --- a/kinode/packages/app-store/pkg/manifest.json +++ b/kinode/packages/app-store/pkg/manifest.json @@ -37,8 +37,8 @@ "vfs:distro:sys", "kns-indexer:kns-indexer:sys", "eth:distro:sys", - "http_server:distro:sys", - "http_client:distro:sys", + "http-server:distro:sys", + "http-client:distro:sys", "sqlite:distro:sys", { "process": "vfs:distro:sys", diff --git a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs index 2d450d946..c7dcc2ac9 100644 --- a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs +++ b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs @@ -100,11 +100,17 @@ impl State { state.last_block = state.get_last_block(); println!( - "kns_indexer: loaded state: version: {}, last_block: {}, chain_id: {}, kimap_address: {}", + "\n 🐦‍⬛ KNS Indexer State\n\ + ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔\n\ + Version {:>6}\n\ + Last Block {:>6}\n\ + Chain ID {:>6}\n\ + KIMAP {}\n\ + ▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁\n", state.version, state.last_block, state.get_chain_id(), - state.get_contract_address() + state.get_contract_address().to_string() ); state @@ -177,7 +183,6 @@ impl State { } fn set_name(&mut self, namehash: &str, name: &str) { - println!("set_name({namehash}, {name})"); self.kv .set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None) .unwrap(); @@ -189,7 +194,6 @@ impl State { .get(&Self::node_key(name)) .ok() .and_then(|bytes| serde_json::from_slice(&bytes).ok()); - println!("get_node({name}) -> {x:?}"); x } @@ -199,7 +203,6 @@ impl State { &serde_json::to_vec(&node).unwrap(), None, ); - println!("set_node({name}, {:?})", node); x.unwrap(); } @@ -299,13 +302,9 @@ enum KnsError { call_init!(init); fn init(our: Address) { - println!("indexing on contract address {KIMAP_ADDRESS}"); - // state is loaded from kv, and updated with the current block number and version. let state = State::load(&our); - println!("got last block: {}", state.get_last_block()); - if let Err(e) = main(our, state) { println!("fatal error: {e}"); } @@ -315,10 +314,14 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { #[cfg(feature = "simulation-mode")] add_temp_hardcoded_tlzs(&mut state); + let chain_id = state.get_chain_id(); + let kimap_address = state.get_contract_address(); + let last_block = state.get_last_block(); + // sub_id: 1 let mints_filter = eth::Filter::new() - .address(state.get_contract_address()) - .from_block(state.get_last_block()) + .address(kimap_address) + .from_block(last_block) .to_block(eth::BlockNumberOrTag::Latest) .event("Mint(bytes32,bytes32,bytes,bytes)"); @@ -332,28 +335,17 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sub_id: 2 let notes_filter = eth::Filter::new() - .address(state.get_contract_address()) - .from_block(state.get_last_block()) + .address(kimap_address) + .from_block(last_block) .to_block(eth::BlockNumberOrTag::Latest) .event("Note(bytes32,bytes32,bytes,bytes,bytes)") .topic3(notes); // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = - eth::Provider::new(state.get_chain_id(), SUBSCRIPTION_TIMEOUT); - - print_to_terminal( - 1, - &format!( - "subscribing, state.block: {}, chain_id: {}", - state.get_last_block() - 1, - state.get_chain_id() - ), - ); + let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT); // subscribe to logs first, so no logs are missed - println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); println!("done subscribing to new logs."); diff --git a/kinode/packages/kns-indexer/pkg/manifest.json b/kinode/packages/kns-indexer/pkg/manifest.json index ca86d702e..fe7b4fd17 100644 --- a/kinode/packages/kns-indexer/pkg/manifest.json +++ b/kinode/packages/kns-indexer/pkg/manifest.json @@ -13,7 +13,7 @@ ], "grant_capabilities": [ "eth:distro:sys", - "http_server:distro:sys", + "http-server:distro:sys", "timer:distro:sys", "kv:distro:sys" ], From d448966ef66775de28fb91a64185f8a2b352cff4 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 17 Dec 2024 22:00:51 +0200 Subject: [PATCH 8/8] kns: cleanup --- Cargo.lock | 87 +++++---- Cargo.toml | 3 +- kinode/packages/kns-indexer/Cargo.lock | 13 +- kinode/packages/kns-indexer/Cargo.toml | 3 +- .../kns-indexer/api/kns-indexer:sys-v0.wit | 18 +- .../packages/kns-indexer/get-block/Cargo.toml | 2 +- .../kns-indexer/kns-indexer/Cargo.toml | 2 +- .../kns-indexer/kns-indexer/src/lib.rs | 170 ++++++------------ kinode/packages/kns-indexer/pkg/scripts.json | 12 -- kinode/packages/kns-indexer/state/Cargo.toml | 20 --- kinode/packages/kns-indexer/state/src/lib.rs | 46 ----- 11 files changed, 111 insertions(+), 265 deletions(-) delete mode 100644 kinode/packages/kns-indexer/state/Cargo.toml delete mode 100644 kinode/packages/kns-indexer/state/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index c10e77731..5ceaadea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ name = "alias" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -1045,7 +1045,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "rand 0.8.5", "serde", @@ -1625,7 +1625,7 @@ name = "cat" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -1689,7 +1689,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "rand 0.8.5", "serde", @@ -1708,7 +1708,7 @@ version = "0.2.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "pleco", "serde", "serde_json", @@ -1906,7 +1906,7 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" name = "contacts" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "serde", "serde_json", @@ -2502,7 +2502,7 @@ name = "download" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "serde", "serde_json", @@ -2514,7 +2514,7 @@ name = "downloads" version = "0.5.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "rand 0.8.5", "serde", @@ -2551,7 +2551,7 @@ dependencies = [ name = "echo" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "wit-bindgen 0.36.0", ] @@ -2808,7 +2808,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "rand 0.8.5", "serde", @@ -2962,7 +2962,7 @@ dependencies = [ name = "get_block" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0209da1)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3151,7 +3151,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" name = "help" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "wit-bindgen 0.36.0", ] @@ -3180,7 +3180,7 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" name = "hi" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3213,7 +3213,7 @@ version = "0.1.2" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3656,7 +3656,7 @@ name = "install" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "serde", "serde_json", @@ -3834,7 +3834,7 @@ name = "kfetch" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rmp-serde", "serde", "serde_json", @@ -3846,7 +3846,7 @@ name = "kill" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3939,6 +3939,28 @@ dependencies = [ "wit-bindgen 0.24.0", ] +[[package]] +name = "kinode_process_lib" +version = "0.10.0" +source = "git+https://github.com/kinode-dao/process_lib?rev=0209da1#0209da15340d9a2b92152916a8349d8f248775e5" +dependencies = [ + "alloy 0.1.4", + "alloy-primitives 0.7.7", + "alloy-sol-macro", + "alloy-sol-types", + "anyhow", + "bincode", + "http 1.2.0", + "mime_guess", + "rand 0.8.5", + "rmp-serde", + "serde", + "serde_json", + "thiserror 1.0.69", + "url", + "wit-bindgen 0.36.0", +] + [[package]] name = "kinode_process_lib" version = "0.10.0" @@ -4007,7 +4029,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "hex", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0209da1)", "process_macros", "rmp-serde", "serde", @@ -4244,7 +4266,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "regex", "serde", "serde_json", @@ -4413,7 +4435,7 @@ dependencies = [ name = "net-diagnostics" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -4759,7 +4781,7 @@ dependencies = [ name = "peer" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -4769,7 +4791,7 @@ dependencies = [ name = "peers" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -5849,7 +5871,7 @@ dependencies = [ "anyhow", "base64 0.22.1", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rmp-serde", "serde", "serde_json", @@ -6066,17 +6088,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "state" -version = "0.1.0" -dependencies = [ - "kinode_process_lib 0.10.0", - "process_macros", - "serde", - "serde_json", - "wit-bindgen 0.36.0", -] - [[package]] name = "static_assertions" version = "1.1.0" @@ -6271,7 +6282,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "rand 0.8.5", "regex", "serde", @@ -6285,7 +6296,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "serde", "serde_json", @@ -6560,7 +6571,7 @@ version = "0.2.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -6893,7 +6904,7 @@ name = "uninstall" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=9c441fe)", "process_macros", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 7ca952cdd..65a72d73f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,7 @@ members = [ "kinode/packages/chess/chess", "kinode/packages/contacts/contacts", "kinode/packages/homepage/homepage", - "kinode/packages/kns-indexer/kns-indexer", "kinode/packages/kns-indexer/get-block", "kinode/packages/kns-indexer/state", - "kinode/packages/settings/settings", + "kinode/packages/kns-indexer/kns-indexer", "kinode/packages/kns-indexer/get-block", "kinode/packages/settings/settings", "kinode/packages/terminal/terminal", "kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/help", "kinode/packages/terminal/hi", "kinode/packages/terminal/kfetch", diff --git a/kinode/packages/kns-indexer/Cargo.lock b/kinode/packages/kns-indexer/Cargo.lock index e42a487cb..f90ab4f9d 100644 --- a/kinode/packages/kns-indexer/Cargo.lock +++ b/kinode/packages/kns-indexer/Cargo.lock @@ -1508,7 +1508,7 @@ dependencies = [ [[package]] name = "kinode_process_lib" version = "0.10.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=9c441fe#9c441fe19d534d640980b0495ef8dc00fcbabc03" +source = "git+https://github.com/kinode-dao/process_lib?rev=0209da1#0209da15340d9a2b92152916a8349d8f248775e5" dependencies = [ "alloy", "alloy-primitives 0.7.7", @@ -2458,17 +2458,6 @@ dependencies = [ "der", ] -[[package]] -name = "state" -version = "0.1.0" -dependencies = [ - "kinode_process_lib", - "process_macros", - "serde", - "serde_json", - "wit-bindgen", -] - [[package]] name = "static_assertions" version = "1.1.0" diff --git a/kinode/packages/kns-indexer/Cargo.toml b/kinode/packages/kns-indexer/Cargo.toml index 662ed7a66..8e9024729 100644 --- a/kinode/packages/kns-indexer/Cargo.toml +++ b/kinode/packages/kns-indexer/Cargo.toml @@ -3,8 +3,7 @@ resolver = "2" members = [ "get-block", "kns-indexer", - "state", -] + ] [profile.release] panic = "abort" diff --git a/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit b/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit index fd3fb52fb..f49ba4ace 100644 --- a/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit +++ b/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit @@ -14,16 +14,12 @@ interface kns-indexer { /// returns an Option /// set block to 0 if you just want to get the current state of the indexer node-info(node-info-request), - /// return the entire state of the indexer at the given block - /// set block to 0 if you just want to get the current state of the indexer - get-state(get-state-request), } variant indexer-response { name(option), node-info(option), - get-state(wit-state), - } + } record namehash-to-name-request { hash: string, @@ -35,10 +31,6 @@ interface kns-indexer { block: u64, } - record get-state-request { - block: u64, - } - record wit-kns-update { name: string, public-key: string, @@ -46,14 +38,6 @@ interface kns-indexer { ports: list>, // map, but wit doesn't support maps routers: list, } - - record wit-state { - chain-id: u64, - contract-address: list, // 20-byte ETH address - names: list>, // map, but wit doesn't support maps - nodes: list>, // map, but wit doesn't support maps - last-block: u64, - } } world kns-indexer-sys-v0 { diff --git a/kinode/packages/kns-indexer/get-block/Cargo.toml b/kinode/packages/kns-indexer/get-block/Cargo.toml index c90afe6c0..f818b82e2 100644 --- a/kinode/packages/kns-indexer/get-block/Cargo.toml +++ b/kinode/packages/kns-indexer/get-block/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" simulation-mode = [] [dependencies] -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9c441fe" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0209da1" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = "0.36.0" diff --git a/kinode/packages/kns-indexer/kns-indexer/Cargo.toml b/kinode/packages/kns-indexer/kns-indexer/Cargo.toml index bdd7ce1e3..35dc65496 100644 --- a/kinode/packages/kns-indexer/kns-indexer/Cargo.toml +++ b/kinode/packages/kns-indexer/kns-indexer/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9c441fe" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0209da1" } process_macros = "0.1" rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } diff --git a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs index c7dcc2ac9..aa4b48693 100644 --- a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs +++ b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs @@ -1,6 +1,5 @@ use crate::kinode::process::kns_indexer::{ - GetStateRequest, IndexerRequest, IndexerResponse, NamehashToNameRequest, NodeInfoRequest, - WitKnsUpdate, WitState, + IndexerRequest, IndexerResponse, NamehashToNameRequest, NodeInfoRequest, WitKnsUpdate, }; use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; @@ -38,6 +37,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; // local +const CURRENT_VERSION: u32 = 1; + const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; const DELAY_MS: u64 = 1_000; // 1s @@ -52,7 +53,7 @@ struct State { // includes keys and values for: // "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address", // "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}" - kv: Kv>, // todo: maybe serialize directly into known enum of possible types? + kv: Kv>, } impl State { @@ -68,109 +69,91 @@ impl State { last_block: 0, }; - // load or initialize chain_id + let version = state.get_version(); let chain_id = state.get_chain_id(); - if chain_id == 0 { - state.set_chain_id(CHAIN_ID); - } - - // load or initialize contract_address let contract_address = state.get_contract_address(); - if contract_address - == eth::Address::from_str(KIMAP_ADDRESS) - .expect("Failed to parse KIMAP_ADDRESS constant") - { - state.set_contract_address(contract_address); - } - - // load or initialize last_block let last_block = state.get_last_block(); - if last_block == 0 { - state.set_last_block(KIMAP_FIRST_BLOCK); - } - // load or initialize version - let version = state.get_version(); - if version == 0 { - state.set_version(1); // Start at version 1 + if version != CURRENT_VERSION + || chain_id != CHAIN_ID + || contract_address != eth::Address::from_str(KIMAP_ADDRESS).unwrap() + { + // if version/contract/chain_id are new, run migrations here. } + state.set_chain_id(chain_id); + state.set_contract_address(contract_address); + state.set_version(CURRENT_VERSION); + // update state struct with final values - state.version = state.get_version(); - state.last_block = state.get_last_block(); + state.version = version; + state.last_block = last_block; println!( "\n 🐦‍⬛ KNS Indexer State\n\ ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔\n\ - Version {:>6}\n\ - Last Block {:>6}\n\ - Chain ID {:>6}\n\ - KIMAP {}\n\ + Version {}\n\ + Last Block {}\n\ + Chain ID {}\n\ + KIMAP {}\n\ ▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁\n", state.version, state.last_block, - state.get_chain_id(), - state.get_contract_address().to_string() + chain_id, + contract_address.to_string(), ); state } - fn meta_version_key() -> &'static str { - "meta:version" + fn meta_version_key() -> String { + "meta:version".to_string() } - fn meta_last_block_key() -> &'static str { - "meta:last_block" + + fn meta_last_block_key() -> String { + "meta:last_block".to_string() } - fn meta_chain_id_key() -> &'static str { - "meta:chain_id" + + fn meta_chain_id_key() -> String { + "meta:chain_id".to_string() } - fn meta_contract_address_key() -> &'static str { - "meta:contract_address" + + fn meta_contract_address_key() -> String { + "meta:contract_address".to_string() } fn name_key(namehash: &str) -> String { - format!("names:{namehash}") + format!("name:{}", namehash) } fn node_key(name: &str) -> String { - format!("nodes:{name}") + format!("node:{}", name) } fn get_last_block(&self) -> u64 { self.kv - .get(&Self::meta_last_block_key().to_string()) + .get_as::(&Self::meta_last_block_key()) .ok() - .and_then(|bytes| serde_json::from_slice(&bytes).ok()) - .unwrap_or(0) + .unwrap_or(KIMAP_FIRST_BLOCK) } fn set_last_block(&mut self, block: u64) { self.kv - .set( - &Self::meta_last_block_key().to_string(), - &serde_json::to_vec(&block).unwrap(), - None, - ) + .set_as::(&Self::meta_last_block_key(), &block, None) .unwrap(); self.last_block = block; } fn get_version(&self) -> u32 { self.kv - .get(&Self::meta_version_key().to_string()) + .get_as::(&Self::meta_version_key()) .ok() - .and_then(|bytes| serde_json::from_slice(&bytes).ok()) - .unwrap_or(0) + .unwrap_or(CURRENT_VERSION) } fn set_version(&mut self, version: u32) { self.kv - .set( - &Self::meta_version_key().to_string(), - &serde_json::to_vec(&version).unwrap(), - None, - ) + .set_as::(&Self::meta_version_key(), &version, None) .unwrap(); self.version = version; } @@ -189,83 +172,46 @@ impl State { } fn get_node(&self, name: &str) -> Option { - let x = self - .kv - .get(&Self::node_key(name)) - .ok() - .and_then(|bytes| serde_json::from_slice(&bytes).ok()); - x + self.kv.get_as::(&Self::node_key(name)).ok() } fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { - let x = self.kv.set( - &Self::node_key(name), - &serde_json::to_vec(&node).unwrap(), - None, - ); - x.unwrap(); + self.kv + .set_as::(&Self::node_key(name), &node, None) + .unwrap(); } fn get_chain_id(&self) -> u64 { self.kv - .get(&Self::meta_chain_id_key().to_string()) + .get_as::(&Self::meta_chain_id_key()) .ok() - .and_then(|bytes| serde_json::from_slice(&bytes).ok()) .unwrap_or(CHAIN_ID) } fn set_chain_id(&mut self, chain_id: u64) { self.kv - .set( - &Self::meta_chain_id_key().to_string(), - &serde_json::to_vec(&chain_id).unwrap(), - None, - ) + .set_as::(&Self::meta_chain_id_key(), &chain_id, None) .unwrap(); } fn get_contract_address(&self) -> eth::Address { - match self.kv.get(&Self::meta_contract_address_key().to_string()) { - Ok(bytes) => match serde_json::from_slice(&bytes) { - Ok(addr) => addr, - Err(_) => eth::Address::from_str(KIMAP_ADDRESS) - .expect("Failed to parse KIMAP_ADDRESS constant"), - }, + match self + .kv + .get_as::(&Self::meta_contract_address_key()) + { + Ok(addr) => addr, Err(_) => eth::Address::from_str(KIMAP_ADDRESS) .expect("Failed to parse KIMAP_ADDRESS constant"), } } fn set_contract_address(&mut self, contract_address: eth::Address) { - if let Ok(bytes) = serde_json::to_vec(&contract_address) { - self.kv - .set(&Self::meta_contract_address_key().to_string(), &bytes, None) - .expect("Failed to set contract address"); - } + self.kv + .set_as::(&Self::meta_contract_address_key(), &contract_address, None) + .expect("Failed to set contract address"); } } -// impl From for WitState { -// fn from(s: State) -> Self { -// let contract_address: [u8; 20] = s.contract_address.into(); -// WitState { -// chain_id: s.chain_id.clone(), -// contract_address: contract_address.to_vec(), -// names: s -// .names -// .iter() -// .map(|(k, v)| (k.clone(), v.clone())) -// .collect::>(), -// nodes: s -// .nodes -// .iter() -// .map(|(k, v)| (k.clone(), v.clone().into())) -// .collect::>(), -// last_block: s.last_block.clone(), -// } -// } -// } - impl From for WitKnsUpdate { fn from(k: net::KnsUpdate) -> Self { WitKnsUpdate { @@ -344,6 +290,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // 60s timeout -- these calls can take a long time // if they do time out, we try them again let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT); + let _kimap_helper = kimap::Kimap::new(eth_provider.clone(), kimap_address); // subscribe to logs first, so no logs are missed eth_provider.subscribe_loop(1, mints_filter.clone()); @@ -427,11 +374,6 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { )) .send()?; } - // note no longer relevant. - // TODO: redo with iterator once available. - IndexerRequest::GetState(GetStateRequest { .. }) => { - Response::new().body(serde_json::to_vec(&state)?).send()?; - } } } } diff --git a/kinode/packages/kns-indexer/pkg/scripts.json b/kinode/packages/kns-indexer/pkg/scripts.json index bee99cd16..25798ef73 100644 --- a/kinode/packages/kns-indexer/pkg/scripts.json +++ b/kinode/packages/kns-indexer/pkg/scripts.json @@ -10,17 +10,5 @@ "eth:distro:sys" ], "wit_version": 1 - }, - "state.wasm": { - "root": false, - "public": false, - "request_networking": false, - "request_capabilities": [ - "kns-indexer:kns-indexer:sys" - ], - "grant_capabilities": [ - "kns-indexer:kns-indexer:sys" - ], - "wit_version": 1 } } diff --git a/kinode/packages/kns-indexer/state/Cargo.toml b/kinode/packages/kns-indexer/state/Cargo.toml deleted file mode 100644 index 4bd0387d4..000000000 --- a/kinode/packages/kns-indexer/state/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "state" -version = "0.1.0" -edition = "2021" - -[features] -simulation-mode = [] - -[dependencies] -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9c441fe" } -process_macros = "0.1" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -wit-bindgen = "0.36.0" - -[lib] -crate-type = ["cdylib"] - -[package.metadata.component] -package = "kinode:process" diff --git a/kinode/packages/kns-indexer/state/src/lib.rs b/kinode/packages/kns-indexer/state/src/lib.rs deleted file mode 100644 index 27df8a95d..000000000 --- a/kinode/packages/kns-indexer/state/src/lib.rs +++ /dev/null @@ -1,46 +0,0 @@ -use crate::kinode::process::kns_indexer::{GetStateRequest, IndexerRequest, IndexerResponse}; -use kinode_process_lib::{eth, script, Address, Message, Request}; - -wit_bindgen::generate!({ - path: "target/wit", - world: "kns-indexer-sys-v0", - generate_unused_types: true, - additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], -}); - -script!(init); -fn init(_our: Address, _args: String) -> String { - // we don't take any args - - let Ok(Message::Response { body, .. }) = - Request::to(("our", "kns-indexer", "kns-indexer", "sys")) - .body(IndexerRequest::GetState(GetStateRequest { block: 0 })) - .send_and_await_response(10) - .unwrap() - else { - return "failed to get state from kns-indexer".to_string(); - }; - let Ok(IndexerResponse::GetState(state)) = body.try_into() else { - return "failed to deserialize state".to_string(); - }; - // can change later, but for now, just print every known node name - let mut names = state - .names - .iter() - .map(|(_k, v)| v.clone()) - .collect::>(); - names.sort(); - let contract_address: [u8; 20] = state - .contract_address - .try_into() - .expect("invalid contract addess: doesn't have 20 bytes"); - let contract_address: eth::Address = contract_address.into(); - format!( - "\nrunning on chain id {}\nCA: {}\n{} known nodes as of block {}\n {}", - state.chain_id, - contract_address, - names.len(), - state.last_block, - names.join("\n ") - ) -}