diff --git a/Cargo.lock b/Cargo.lock index 14b7af096..8421261a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -94,7 +94,7 @@ name = "alias" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -1275,7 +1275,7 @@ dependencies = [ "alloy-sol-types 0.8.15", "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "rand 0.8.5", "serde", @@ -1858,7 +1858,7 @@ name = "cat" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -1922,7 +1922,7 @@ dependencies = [ "alloy-sol-types 0.8.15", "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "rand 0.8.5", "serde", @@ -1941,7 +1941,7 @@ version = "0.2.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "pleco", "serde", "serde_json", @@ -2139,7 +2139,7 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" name = "contacts" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "serde", "serde_json", @@ -2735,7 +2735,7 @@ name = "download" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "serde", "serde_json", @@ -2747,7 +2747,7 @@ name = "downloads" version = "0.5.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "rand 0.8.5", "serde", @@ -2784,7 +2784,7 @@ dependencies = [ name = "echo" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "wit-bindgen 0.36.0", ] @@ -3052,7 +3052,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "rand 0.8.5", "serde", @@ -3206,7 +3206,7 @@ dependencies = [ name = "get_block" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0209da1)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3402,7 +3402,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" name = "help" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "wit-bindgen 0.36.0", ] @@ -3431,7 +3431,7 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" name = "hi" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3464,7 +3464,7 @@ version = "0.1.2" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -3907,7 +3907,7 @@ name = "install" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "serde", "serde_json", @@ -4085,7 +4085,7 @@ name = "kfetch" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rmp-serde", "serde", "serde_json", @@ -4097,7 +4097,7 @@ name = "kill" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -4188,6 +4188,28 @@ dependencies = [ "wit-bindgen 0.24.0", ] +[[package]] +name = "kinode_process_lib" +version = "0.10.0" +source = "git+https://github.com/kinode-dao/process_lib?rev=0209da1#0209da15340d9a2b92152916a8349d8f248775e5" +dependencies = [ + "alloy 0.1.4", + "alloy-primitives 0.7.7", + "alloy-sol-macro 0.7.7", + "alloy-sol-types 0.7.7", + "anyhow", + "bincode", + "http 1.2.0", + "mime_guess", + "rand 0.8.5", + "rmp-serde", + "serde", + "serde_json", + "thiserror 1.0.69", + "url", + "wit-bindgen 0.36.0", +] + [[package]] name = "kinode_process_lib" version = "0.10.0" @@ -4256,7 +4278,7 @@ dependencies = [ "alloy-sol-types 0.8.15", "anyhow", "hex", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0209da1)", "process_macros", "rmp-serde", "serde", @@ -4493,7 +4515,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "regex", "serde", "serde_json", @@ -4662,7 +4684,7 @@ dependencies = [ name = "net-diagnostics" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -5021,7 +5043,7 @@ dependencies = [ name = "peer" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -5031,7 +5053,7 @@ dependencies = [ name = "peers" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rmp-serde", "serde", "wit-bindgen 0.36.0", @@ -6147,7 +6169,7 @@ dependencies = [ "anyhow", "base64 0.22.1", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rmp-serde", "serde", "serde_json", @@ -6294,7 +6316,7 @@ dependencies = [ [[package]] name = "snow" version = "0.9.0" -source = "git+https://github.com/dr-frmr/snow?branch=dr/extract_cipherstates#1d4eb5f6747aa59aabb32bbbe698fb4bb7dfb9a4" +source = "git+https://github.com/dr-frmr/snow?branch=dr%2Fextract_cipherstates#1d4eb5f6747aa59aabb32bbbe698fb4bb7dfb9a4" dependencies = [ "aes-gcm", "blake2", @@ -6364,17 +6386,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "state" -version = "0.1.0" -dependencies = [ - "kinode_process_lib 0.10.0", - "process_macros", - "serde", - "serde_json", - "wit-bindgen 0.36.0", -] - [[package]] name = "static_assertions" version = "1.1.0" @@ -6581,7 +6592,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "rand 0.8.5", "regex", "serde", @@ -6595,7 +6606,7 @@ version = "0.1.1" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "serde", "serde_json", @@ -6886,7 +6897,7 @@ version = "0.2.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "serde", "serde_json", "wit-bindgen 0.36.0", @@ -7253,7 +7264,7 @@ name = "uninstall" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.10.0", + "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=0443ece)", "process_macros", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 7ca952cdd..65a72d73f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,7 @@ members = [ "kinode/packages/chess/chess", "kinode/packages/contacts/contacts", "kinode/packages/homepage/homepage", - "kinode/packages/kns-indexer/kns-indexer", "kinode/packages/kns-indexer/get-block", "kinode/packages/kns-indexer/state", - "kinode/packages/settings/settings", + "kinode/packages/kns-indexer/kns-indexer", "kinode/packages/kns-indexer/get-block", "kinode/packages/settings/settings", "kinode/packages/terminal/terminal", "kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/help", "kinode/packages/terminal/hi", "kinode/packages/terminal/kfetch", diff --git a/kinode/packages/app-store/app-store/src/lib.rs b/kinode/packages/app-store/app-store/src/lib.rs index ba385d662..15eff4bf1 100644 --- a/kinode/packages/app-store/app-store/src/lib.rs +++ b/kinode/packages/app-store/app-store/src/lib.rs @@ -78,8 +78,6 @@ pub enum Resp { call_init!(init); fn init(our: Address) { - println!("started"); - let mut http_server = http::server::HttpServer::new(5); http_api::init_frontend(&our, &mut http_server); diff --git a/kinode/packages/app-store/chain/src/lib.rs b/kinode/packages/app-store/chain/src/lib.rs index 81510da32..5e25916a4 100644 --- a/kinode/packages/app-store/chain/src/lib.rs +++ b/kinode/packages/app-store/chain/src/lib.rs @@ -33,14 +33,14 @@ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ - await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, - print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, + await_message, call_init, eth, get_blob, http, kernel_types as kt, kimap, print_to_terminal, + println, + sqlite::{self, Sqlite}, + timer, Address, Message, PackageId, Request, Response, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::collections::HashMap; +use std::str::FromStr; wit_bindgen::generate!({ path: "target/wit", @@ -63,7 +63,6 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C"; const DELAY_MS: u64 = 1_000; // 1s -#[derive(Debug, Serialize, Deserialize)] pub struct State { /// the kimap helper we are using pub kimap: kimap::Kimap, @@ -71,10 +70,8 @@ pub struct State { /// when we boot, we can read logs starting from this block and /// rebuild latest state. pub last_saved_block: u64, - /// onchain listings - pub listings: HashMap, - /// set of packages that we have published - pub published: HashSet, + /// tables: listings: , published: vec + pub db: DB, } /// listing information derived from metadata hash in listing event @@ -83,10 +80,9 @@ pub struct PackageListing { pub tba: eth::Address, pub metadata_uri: String, pub metadata_hash: String, - // should this even be optional? - // relegate to only valid apps maybe? pub metadata: Option, pub auto_update: bool, + pub block: u64, } #[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)] @@ -96,18 +92,281 @@ pub enum Req { Request(ChainRequests), } +pub struct DB { + inner: Sqlite, +} + +impl DB { + pub fn connect(our: &Address) -> anyhow::Result { + let inner = sqlite::open(our.package_id(), "app_store_chain.sqlite", Some(10))?; + // create tables + inner.write(CREATE_META_TABLE.into(), vec![], None)?; + inner.write(CREATE_LISTINGS_TABLE.into(), vec![], None)?; + inner.write(CREATE_PUBLISHED_TABLE.into(), vec![], None)?; + + Ok(Self { inner }) + } + + pub fn get_last_saved_block(&self) -> anyhow::Result { + let query = "SELECT value FROM meta WHERE key = 'last_saved_block'"; + let rows = self.inner.read(query.into(), vec![])?; + if let Some(row) = rows.get(0) { + if let Some(val_str) = row.get("value").and_then(|v| v.as_str()) { + if let Ok(block) = val_str.parse::() { + return Ok(block); + } + } + } + Ok(0) + } + + pub fn set_last_saved_block(&self, block: u64) -> anyhow::Result<()> { + let query = "INSERT INTO meta (key, value) VALUES ('last_saved_block', ?) + ON CONFLICT(key) DO UPDATE SET value=excluded.value"; + let params = vec![block.to_string().into()]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn insert_or_update_listing( + &self, + package_id: &PackageId, + listing: &PackageListing, + ) -> anyhow::Result<()> { + let metadata_json = if let Some(m) = &listing.metadata { + serde_json::to_string(m)? + } else { + "".to_string() + }; + + let query = "INSERT INTO listings (package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(package_name, publisher_node) + DO UPDATE SET + tba=excluded.tba, + metadata_uri=excluded.metadata_uri, + metadata_hash=excluded.metadata_hash, + metadata_json=excluded.metadata_json, + auto_update=excluded.auto_update, + block=excluded.block"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + listing.tba.to_string().into(), + listing.metadata_uri.clone().into(), + listing.metadata_hash.clone().into(), + metadata_json.into(), + (if listing.auto_update { 1 } else { 0 }).into(), + listing.block.into(), + ]; + + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn delete_listing(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_listing(&self, package_id: &PackageId) -> anyhow::Result> { + let query = "SELECT tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + if let Some(row) = rows.get(0) { + Ok(Some(self.row_to_listing(row)?)) + } else { + Ok(None) + } + } + + pub fn get_all_listings(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings"; + let rows = self.inner.read(query.into(), vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_batch( + &self, + limit: u64, + offset: u64, + ) -> anyhow::Result> { + let query = format!( + "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + ORDER BY package_name, publisher_node + LIMIT {} OFFSET {}", + limit, offset + ); + + let rows = self.inner.read(query, vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_since_block( + &self, + block_number: u64, + ) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + WHERE block > ?"; + let params = vec![block_number.into()]; + let rows = self.inner.read(query.into(), params)?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn row_to_listing( + &self, + row: &HashMap, + ) -> anyhow::Result { + let tba_str = row["tba"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Invalid tba"))?; + let tba = tba_str.parse::()?; + let metadata_uri = row["metadata_uri"].as_str().unwrap_or("").to_string(); + let metadata_hash = row["metadata_hash"].as_str().unwrap_or("").to_string(); + let metadata_json = row["metadata_json"].as_str().unwrap_or(""); + let metadata: Option = + if metadata_json.is_empty() { + None + } else { + serde_json::from_str(metadata_json)? + }; + let auto_update = row["auto_update"].as_i64().unwrap_or(0) == 1; + let block = row["block"].as_i64().unwrap_or(0) as u64; + + Ok(PackageListing { + tba, + metadata_uri, + metadata_hash, + metadata, + auto_update, + block, + }) + } + + pub fn get_published(&self, package_id: &PackageId) -> anyhow::Result { + let query = "SELECT 1 FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + Ok(!rows.is_empty()) + } + + pub fn insert_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "INSERT INTO published (package_name, publisher_node) VALUES (?, ?) ON CONFLICT DO NOTHING"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn delete_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_all_published(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node FROM published"; + let rows = self.inner.read(query.into(), vec![])?; + let mut result = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + result.push(pid); + } + Ok(result) + } +} + +const CREATE_META_TABLE: &str = " +CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT +);"; + +const CREATE_LISTINGS_TABLE: &str = " +CREATE TABLE IF NOT EXISTS listings ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + tba TEXT NOT NULL, + metadata_uri TEXT, + metadata_hash TEXT, + metadata_json TEXT, + auto_update INTEGER NOT NULL DEFAULT 0, + block INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (package_name, publisher_node) +);"; + +const CREATE_PUBLISHED_TABLE: &str = " +CREATE TABLE IF NOT EXISTS published ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + PRIMARY KEY (package_name, publisher_node) +);"; + call_init!(init); fn init(our: Address) { - println!( - "chain started, indexing on contract address {}", - KIMAP_ADDRESS - ); - // create new provider with request-timeout of 60s - // can change, log requests can take quite a long time. let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); - let mut state = fetch_state(eth_provider); - fetch_and_subscribe_logs(&our, &mut state); + let db = DB::connect(&our).expect("failed to open DB"); + let kimap_helper = + kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); + let last_saved_block = db.get_last_saved_block().unwrap_or(0); + + let mut state = State { + kimap: kimap_helper, + last_saved_block, + db, + }; + + fetch_and_subscribe_logs(&our, &mut state, last_saved_block); loop { match await_message() { @@ -126,17 +385,15 @@ fn init(our: Address) { fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { if !message.is_request() { if message.is_local(&our) && message.source().process == "timer:distro:sys" { - // handling of ETH RPC subscriptions delayed by DELAY_MS - // to allow kns to have a chance to process block: handle now let Some(context) = message.context() else { - return Err(anyhow::anyhow!("foo")); + return Err(anyhow::anyhow!("No context in timer message")); }; let log = serde_json::from_slice(context)?; handle_eth_log(our, state, log, false)?; return Ok(()); } } else { - match message.body().try_into()? { + match serde_json::from_slice::(message.body())? { Req::Eth(eth_result) => { if !message.is_local(our) || message.source().process != "eth:distro:sys" { return Err(anyhow::anyhow!( @@ -154,7 +411,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { - // attempt to resubscribe + // re-subscribe if error state .kimap .provider @@ -173,48 +430,37 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { - let onchain_app = state - .listings - .get(&package_id.clone().to_process_lib()) - .map(|app| OnchainApp { - package_id: package_id, - tba: app.tba.to_string(), - metadata_uri: app.metadata_uri.clone(), - metadata_hash: app.metadata_hash.clone(), - metadata: app.metadata.as_ref().map(|m| m.clone().into()), - auto_update: app.auto_update, - }); + let pid = package_id.clone().to_process_lib(); + let listing = state.db.get_listing(&pid)?; + let onchain_app = listing.map(|app| app.to_onchain_app(&pid)); let response = ChainResponses::GetApp(onchain_app); Response::new().body(&response).send()?; } ChainRequests::GetApps => { - let apps: Vec = state - .listings - .iter() - .map(|(id, listing)| listing.to_onchain_app(id)) + let listings = state.db.get_all_listings()?; + let apps: Vec = listings + .into_iter() + .map(|(pid, listing)| listing.to_onchain_app(&pid)) .collect(); - let response = ChainResponses::GetApps(apps); Response::new().body(&response).send()?; } ChainRequests::GetOurApps => { - let apps: Vec = state - .published - .iter() - .filter_map(|id| { - state - .listings - .get(id) - .map(|listing| listing.to_onchain_app(id)) - }) - .collect(); - + let published_list = state.db.get_all_published()?; + let mut apps = Vec::new(); + for pid in published_list { + if let Some(listing) = state.db.get_listing(&pid)? { + apps.push(listing.to_onchain_app(&pid)); + } + } let response = ChainResponses::GetOurApps(apps); Response::new().body(&response).send()?; } ChainRequests::StartAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = true; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStarted; Response::new().body(&response).send()?; } else { @@ -223,8 +469,10 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result } } ChainRequests::StopAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = false; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStopped; Response::new().body(&response).send()?; } else { @@ -258,7 +506,7 @@ fn handle_eth_log( if package.is_empty() || publisher.is_empty() { Err(anyhow::anyhow!("invalid publisher name")) } else { - Ok(PackageId::new(&package, &publisher)) + Ok(PackageId::new(package, publisher)) } })?; @@ -267,7 +515,7 @@ fn handle_eth_log( // at the URI. let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); - let is_our_package = &package_id.publisher() == &our.node(); + let is_our_package = package_id.publisher() == our.node(); let (tba, metadata_hash) = if !startup { // generate ~metadata-hash full-path @@ -292,10 +540,12 @@ fn handle_eth_log( match data { None => { - // if ~metadata-uri is also empty, this is an unpublish action! + // unpublish if metadata_uri empty if metadata_uri.is_empty() { - state.published.remove(&package_id); - state.listings.remove(&package_id); + state.db.delete_published(&package_id)?; + state.db.delete_listing(&package_id)?; + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; return Ok(()); } return Err(anyhow::anyhow!( @@ -309,7 +559,7 @@ fn handle_eth_log( }; if is_our_package { - state.published.insert(package_id.clone()); + state.db.insert_published(&package_id)?; } // if this is a startup event, we don't need to fetch metadata from the URI -- @@ -322,109 +572,158 @@ fn handle_eth_log( None }; - match state.listings.entry(package_id.clone()) { - std::collections::hash_map::Entry::Occupied(mut listing) => { - let listing = listing.get_mut(); - listing.metadata_uri = metadata_uri; - listing.tba = tba; - listing.metadata_hash = metadata_hash; - listing.metadata = metadata.clone(); - } - std::collections::hash_map::Entry::Vacant(listing) => { - listing.insert(PackageListing { - tba, - metadata_uri, - metadata_hash, - metadata: metadata.clone(), - auto_update: false, - }); - } + let mut listing = state + .db + .get_listing(&package_id)? + .unwrap_or(PackageListing { + tba, + metadata_uri: metadata_uri.clone(), + metadata_hash: metadata_hash.clone(), + metadata: metadata.clone(), + auto_update: false, + block: block_number, + }); + // update fields + listing.tba = tba; + listing.metadata_uri = metadata_uri; + listing.metadata_hash = metadata_hash; + listing.metadata = metadata.clone(); + + state.db.insert_or_update_listing(&package_id, &listing)?; + + if !startup && listing.auto_update { + println!("kicking off auto-update for: {}", package_id); + Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib( + package_id.clone(), + ), + metadata: metadata.unwrap().into(), + })) + .send() + .unwrap(); } if !startup { - // if auto_update is enabled, send a message to downloads to kick off the update. - if let Some(listing) = state.listings.get(&package_id) { - if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app-store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id, - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); - } - } + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; } - state.last_saved_block = block_number; - Ok(()) } /// after startup, fetch metadata for all listings /// we do this as a separate step to not repeatedly fetch outdated metadata /// as we process logs. -fn update_all_metadata(state: &mut State) { - state.listings.retain(|package_id, listing| { - let (tba, metadata_hash) = { - // generate ~metadata-hash full-path - let hash_note = format!( - "~metadata-hash.{}.{}", - package_id.package(), - package_id.publisher() +fn update_all_metadata(state: &mut State, last_saved_block: u64) { + let updated_listings = match state.db.get_listings_since_block(last_saved_block) { + Ok(listings) => listings, + Err(e) => { + print_to_terminal( + 1, + &format!("error fetching updated listings since block {last_saved_block}: {e}"), ); + return; + } + }; - // owner can change which we don't track (yet?) so don't save, need to get when desired - let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) { - Ok(gr) => Ok(gr), - Err(e) => match e { - eth::EthError::RpcError(_) => { - // retry on RpcError after DELAY_MS sleep - // sleep here rather than with, e.g., a message to - // `timer:distro:sys` so that events are processed in - // order of receipt - std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); - state.kimap.get(&hash_note) + for (pid, mut listing) in updated_listings { + let hash_note = format!("~metadata-hash.{}.{}", pid.package(), pid.publisher()); + let (tba, metadata_hash) = match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + match data { + None => { + // If metadata_uri empty, unpublish + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal(1, &format!("error deleting published: {e}")); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; } - _ => Err(e), - }, - }) else { - return false; - }; - - match data { - None => { - // if ~metadata-uri is also empty, this is an unpublish action! - if listing.metadata_uri.is_empty() { - state.published.remove(package_id); + Some(hash_note) => (t, String::from_utf8_lossy(&hash_note).to_string()), + } + } + Err(e) => { + // If RpcError, retry once after delay + if let eth::EthError::RpcError(_) = e { + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + if let Some(hash_note) = data { + (t, String::from_utf8_lossy(&hash_note).to_string()) + } else { + // no data again after retry + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal( + 1, + &format!("error deleting published: {e}"), + ); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; + } + } + Err(e2) => { + print_to_terminal( + 1, + &format!("error retrieving metadata-hash after retry: {e2:?}"), + ); + continue; + } } - return false; + } else { + print_to_terminal( + 1, + &format!("error retrieving metadata-hash: {e:?} for {pid}"), + ); + continue; } - Some(hash_note) => (tba, String::from_utf8_lossy(&hash_note).to_string()), } }; + + // Update listing fields listing.tba = tba; listing.metadata_hash = metadata_hash; + let metadata = - fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30).ok(); + match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) { + Ok(md) => Some(md), + Err(err) => { + print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid)); + None + } + }; listing.metadata = metadata.clone(); + + if let Err(e) = state.db.insert_or_update_listing(&pid, &listing) { + print_to_terminal(1, &format!("error updating listing {}: {e}", pid)); + } + if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app-store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id.clone(), - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); + if let Some(md) = metadata { + print_to_terminal(0, &format!("kicking off auto-update for: {}", pid)); + if let Err(e) = Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib( + pid.clone(), + ), + metadata: md.into(), + })) + .send() + { + print_to_terminal(1, &format!("error sending auto-update request: {e}")); + } + } } - true - }); + } } /// create the filter used for app store getLogs and subscription. @@ -443,21 +742,25 @@ pub fn app_store_filter(state: &State) -> eth::Filter { } /// create a filter to fetch app store event logs from chain and subscribe to new events -pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { +pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_block: u64) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs - println!("subscribing..."); state.kimap.provider.subscribe_loop(1, filter.clone()); - for log in fetch_logs( - &state.kimap.provider, - &filter.from_block(state.last_saved_block), - ) { + // println!("fetching old logs from block {last_saved_block}"); + for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) { if let Err(e) = handle_eth_log(our, state, log, true) { print_to_terminal(1, &format!("error ingesting log: {e}")); }; } - update_all_metadata(state); + + update_all_metadata(state, last_saved_block); + // save updated last_saved_block + if let Ok(block_number) = state.kimap.provider.get_block_number() { + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number).unwrap(); + } + // println!("up to date to block {}", state.last_saved_block); } /// fetch logs from the chain with a given filter @@ -506,32 +809,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String { format!("0x{:x}", hasher.finalize()) } -/// fetch state from disk or create a new one if that fails -pub fn fetch_state(provider: eth::Provider) -> State { - if let Some(state_bytes) = get_state() { - match serde_json::from_slice::(&state_bytes) { - Ok(state) => { - if state.kimap.address().to_string() == KIMAP_ADDRESS { - return state; - } else { - println!( - "state contract address mismatch. rebuilding state! expected {}, got {}", - KIMAP_ADDRESS, - state.kimap.address().to_string() - ); - } - } - Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"), - } - } - State { - kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()), - last_saved_block: 0, - listings: HashMap::new(), - published: HashSet::new(), - } -} - // quite annoyingly, we must convert from our gen'd version of PackageId // to the process_lib's gen'd version. this is in order to access custom // Impls that we want to use diff --git a/kinode/packages/app-store/pkg/manifest.json b/kinode/packages/app-store/pkg/manifest.json index 5198fa233..4a7e951e0 100644 --- a/kinode/packages/app-store/pkg/manifest.json +++ b/kinode/packages/app-store/pkg/manifest.json @@ -39,6 +39,7 @@ "eth:distro:sys", "http-server:distro:sys", "http-client:distro:sys", + "sqlite:distro:sys", { "process": "vfs:distro:sys", "params": { @@ -52,6 +53,7 @@ "vfs:distro:sys", "http-client:distro:sys", "eth:distro:sys", + "sqlite:distro:sys", "timer:distro:sys" ], "public": false diff --git a/kinode/packages/kns-indexer/Cargo.lock b/kinode/packages/kns-indexer/Cargo.lock index 71bdee99e..b2ee67bef 100644 --- a/kinode/packages/kns-indexer/Cargo.lock +++ b/kinode/packages/kns-indexer/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -24,7 +24,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -38,9 +37,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7e1758e5d759c0114140152ae72032eafcfdd7b599e995ebbc8eeafa2b4c977" +checksum = "0ba1c79677c9ce51c8d45e20845b05e6fb070ea2c863fba03ad6af2c778474bd" dependencies = [ "alloy-consensus", "alloy-core", @@ -60,65 +59,47 @@ version = "0.1.48" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0161082e0edd9013d23083465cc04b20e44b7a15646d36ba7b0cdb7cd6fe18f" dependencies = [ - "alloy-primitives", + "alloy-primitives 0.8.15", "num_enum", "strum", ] [[package]] name = "alloy-consensus" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a205d0cbb7bfdf9f4fd4b0ec842bc4c5f926e8c14ec3072d3fd75dd363baf1e0" +checksum = "da374e868f54c7f4ad2ad56829827badca388efd645f8cf5fccc61c2b5343504" dependencies = [ "alloy-eips", - "alloy-primitives", + "alloy-primitives 0.7.7", "alloy-rlp", "alloy-serde", - "alloy-trie", - "auto_impl", "c-kzg", - "derive_more", - "serde", -] - -[[package]] -name = "alloy-consensus-any" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993c34090a3f281cb746fd1604520cf21f8407ffbeb006aaa34c0556bffa718e" -dependencies = [ - "alloy-consensus", - "alloy-eips", - "alloy-primitives", - "alloy-rlp", - "alloy-serde", "serde", ] [[package]] name = "alloy-core" -version = "0.8.15" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c618bd382f0bc2ac26a7e4bfae01c9b015ca8f21b37ca40059ae35a7e62b3dc6" +checksum = "529fc6310dc1126c8de51c376cbc59c79c7f662bd742be7dc67055d5421a81b4" dependencies = [ "alloy-dyn-abi", - "alloy-json-abi", - "alloy-primitives", - "alloy-rlp", - "alloy-sol-types", + "alloy-json-abi 0.7.7", + "alloy-primitives 0.7.7", + "alloy-sol-types 0.7.7", ] [[package]] name = "alloy-dyn-abi" -version = "0.8.15" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41056bde53ae10ffbbf11618efbe1e0290859e5eab0fe9ef82ebdb62f12a866f" +checksum = "413902aa18a97569e60f679c23f46a18db1656d87ab4d4e49d0e1e52042f66df" dependencies = [ - "alloy-json-abi", - "alloy-primitives", - "alloy-sol-type-parser", - "alloy-sol-types", + "alloy-json-abi 0.7.7", + "alloy-primitives 0.7.7", + "alloy-sol-type-parser 0.7.7", + "alloy-sol-types 0.7.7", "const-hex", "itoa", "serde", @@ -126,42 +107,16 @@ dependencies = [ "winnow", ] -[[package]] -name = "alloy-eip2930" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0069cf0642457f87a01a014f6dc29d5d893cd4fd8fddf0c3cdfad1bb3ebafc41" -dependencies = [ - "alloy-primitives", - "alloy-rlp", - "serde", -] - -[[package]] -name = "alloy-eip7702" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c986539255fb839d1533c128e190e557e52ff652c9ef62939e233a81dd93f7e" -dependencies = [ - "alloy-primitives", - "alloy-rlp", - "derive_more", - "serde", -] - [[package]] name = "alloy-eips" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1d9907c29ce622946759bf4fd3418166bfeae76c1c544b8081c7be3acd9b4be" +checksum = "f76ecab54890cdea1e4808fc0891c7e6cfcf71fe1a9fe26810c7280ef768f4ed" dependencies = [ - "alloy-eip2930", - "alloy-eip7702", - "alloy-primitives", + "alloy-primitives 0.7.7", "alloy-rlp", "alloy-serde", "c-kzg", - "derive_more", "once_cell", "serde", "sha2", @@ -169,78 +124,92 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f13f7405a8eb8021258994ed1beab490c3e509ebbe2c18e1c24ae10749d56b" +checksum = "bca15afde1b6d15e3fc1c97421262b1bbb37aee45752e3c8b6d6f13f776554ff" dependencies = [ - "alloy-primitives", + "alloy-primitives 0.7.7", "alloy-serde", - "alloy-trie", "serde", ] +[[package]] +name = "alloy-json-abi" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc05b04ac331a9f07e3a4036ef7926e49a8bf84a99a1ccfc7e2ab55a5fcbb372" +dependencies = [ + "alloy-primitives 0.7.7", + "alloy-sol-type-parser 0.7.7", + "serde", + "serde_json", +] + [[package]] name = "alloy-json-abi" version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c357da577dfb56998d01f574d81ad7a1958d248740a7981b205d69d65a7da404" dependencies = [ - "alloy-primitives", - "alloy-sol-type-parser", + "alloy-primitives 0.8.15", + "alloy-sol-type-parser 0.8.15", "serde", "serde_json", ] [[package]] name = "alloy-json-rpc" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39a786ce6bc7539dc30cabac6b7875644247c9e7d780e71a9f254d42ebdc013c" +checksum = "6d6f34930b7e3e2744bcc79056c217f00cb2abb33bc5d4ff88da7623c5bb078b" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.7.7", "serde", "serde_json", - "thiserror 2.0.7", + "thiserror 1.0.69", "tracing", ] [[package]] name = "alloy-network" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99051f82f77159d5bee06108f33cffee02849e2861fc500bf74213aa2ae8a26e" +checksum = "25f6895fc31b48fa12306ef9b4f78b7764f8bd6d7d91cdb0a40e233704a0f23f" dependencies = [ "alloy-consensus", - "alloy-consensus-any", "alloy-eips", "alloy-json-rpc", - "alloy-network-primitives", - "alloy-primitives", - "alloy-rpc-types-any", + "alloy-primitives 0.7.7", "alloy-rpc-types-eth", "alloy-serde", "alloy-signer", - "alloy-sol-types", + "alloy-sol-types 0.7.7", "async-trait", "auto_impl", "futures-utils-wasm", - "serde", - "serde_json", - "thiserror 2.0.7", + "thiserror 1.0.69", ] [[package]] -name = "alloy-network-primitives" -version = "0.8.1" +name = "alloy-primitives" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2aff127863f8279921397be8af0ac3f05a8757d5c4c972b491c278518fa07c7" +checksum = "ccb3ead547f4532bc8af961649942f0b9c16ee9226e26caa3f38420651cc0bf4" dependencies = [ - "alloy-consensus", - "alloy-eips", - "alloy-primitives", - "alloy-serde", + "alloy-rlp", + "bytes", + "cfg-if", + "const-hex", + "derive_more 0.99.18", + "hex-literal", + "itoa", + "k256", + "keccak-asm", + "proptest", + "rand", + "ruint", "serde", + "tiny-keccak", ] [[package]] @@ -253,7 +222,7 @@ dependencies = [ "bytes", "cfg-if", "const-hex", - "derive_more", + "derive_more 1.0.0", "foldhash", "hashbrown 0.15.2", "hex-literal", @@ -273,17 +242,16 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0280a4f68e0cefde9449ee989a248230efbe3f95255299d2a7a92009e154629d" +checksum = "9c538bfa893d07e27cb4f3c1ab5f451592b7c526d511d62b576a2ce59e146e4a" dependencies = [ "alloy-chains", "alloy-consensus", "alloy-eips", "alloy-json-rpc", "alloy-network", - "alloy-network-primitives", - "alloy-primitives", + "alloy-primitives 0.7.7", "alloy-rpc-client", "alloy-rpc-types-eth", "alloy-transport", @@ -295,17 +263,13 @@ dependencies = [ "futures", "futures-utils-wasm", "lru", - "parking_lot", "pin-project", "reqwest", - "schnellru", "serde", "serde_json", - "thiserror 2.0.7", "tokio", "tracing", "url", - "wasmtimer", ] [[package]] @@ -332,12 +296,11 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6fc8b0f68619cfab3a2e15dca7b80ab266f78430bb4353dec546528e04b7449" +checksum = "5ba31bae67773fd5a60020bea900231f8396202b7feca4d0c70c6b59308ab4a8" dependencies = [ "alloy-json-rpc", - "alloy-primitives", "alloy-transport", "alloy-transport-http", "futures", @@ -350,75 +313,73 @@ dependencies = [ "tower", "tracing", "url", - "wasmtimer", ] [[package]] name = "alloy-rpc-types" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "986f23fe42ac95832901a24b93c20f7ed2b9644394c02b86222801230da60041" -dependencies = [ - "alloy-primitives", - "alloy-rpc-types-eth", - "alloy-serde", - "serde", -] - -[[package]] -name = "alloy-rpc-types-any" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57e3aa433d3657b42e98e257ee6fa201f5c853245648a33da8fbb7497a5008bf" +checksum = "184a7a42c7ba9141cc9e76368356168c282c3bc3d9e5d78f3556bdfe39343447" dependencies = [ - "alloy-consensus-any", "alloy-rpc-types-eth", "alloy-serde", ] [[package]] name = "alloy-rpc-types-eth" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0643cc497a71941f526454fe4fecb47e9307d3a7b6c05f70718a0341643bcc79" +checksum = "ab4123ee21f99ba4bd31bfa36ba89112a18a500f8b452f02b35708b1b951e2b9" dependencies = [ "alloy-consensus", - "alloy-consensus-any", "alloy-eips", - "alloy-network-primitives", - "alloy-primitives", + "alloy-primitives 0.7.7", "alloy-rlp", "alloy-serde", - "alloy-sol-types", - "derive_more", + "alloy-sol-types 0.7.7", "itertools 0.13.0", "serde", "serde_json", + "thiserror 1.0.69", ] [[package]] name = "alloy-serde" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea61b049d7ecc66a29f107970dae493d0908e366048f7484a1ca9b02c85f9b2b" +checksum = "9416c52959e66ead795a11f4a86c248410e9e368a0765710e57055b8a1774dd6" dependencies = [ - "alloy-primitives", + "alloy-primitives 0.7.7", "serde", "serde_json", ] [[package]] name = "alloy-signer" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93461b0e79c2ddd791fec5f369ab5c2686a33bbb03530144972edf5248f8a2c7" +checksum = "b33753c09fa1ad85e5b092b8dc2372f1e337a42e84b9b4cff9fede75ba4adb32" dependencies = [ - "alloy-primitives", + "alloy-primitives 0.7.7", "async-trait", "auto_impl", "elliptic-curve", "k256", - "thiserror 2.0.7", + "thiserror 1.0.69", +] + +[[package]] +name = "alloy-sol-macro" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b40397ddcdcc266f59f959770f601ce1280e699a91fc1862f29cef91707cd09" +dependencies = [ + "alloy-sol-macro-expander 0.7.7", + "alloy-sol-macro-input 0.7.7", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.90", ] [[package]] @@ -427,21 +388,39 @@ version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9d64f851d95619233f74b310f12bcf16e0cbc27ee3762b6115c14a84809280a" dependencies = [ - "alloy-sol-macro-expander", - "alloy-sol-macro-input", + "alloy-sol-macro-expander 0.8.15", + "alloy-sol-macro-input 0.8.15", "proc-macro-error2", "proc-macro2", "quote", "syn 2.0.90", ] +[[package]] +name = "alloy-sol-macro-expander" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "867a5469d61480fea08c7333ffeca52d5b621f5ca2e44f271b117ec1fc9a0525" +dependencies = [ + "alloy-sol-macro-input 0.7.7", + "const-hex", + "heck", + "indexmap", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.90", + "syn-solidity 0.7.7", + "tiny-keccak", +] + [[package]] name = "alloy-sol-macro-expander" version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bf7ed1574b699f48bf17caab4e6e54c6d12bc3c006ab33d58b1e227c1c3559f" dependencies = [ - "alloy-sol-macro-input", + "alloy-sol-macro-input 0.8.15", "const-hex", "heck", "indexmap", @@ -449,10 +428,25 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.90", - "syn-solidity", + "syn-solidity 0.8.15", "tiny-keccak", ] +[[package]] +name = "alloy-sol-macro-input" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e482dc33a32b6fadbc0f599adea520bd3aaa585c141a80b404d0a3e3fa72528" +dependencies = [ + "const-hex", + "dunce", + "heck", + "proc-macro2", + "quote", + "syn 2.0.90", + "syn-solidity 0.7.7", +] + [[package]] name = "alloy-sol-macro-input" version = "0.8.15" @@ -465,7 +459,17 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.90", - "syn-solidity", + "syn-solidity 0.8.15", +] + +[[package]] +name = "alloy-sol-type-parser" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbcba3ca07cf7975f15d871b721fb18031eec8bce51103907f6dcce00b255d98" +dependencies = [ + "serde", + "winnow", ] [[package]] @@ -478,24 +482,36 @@ dependencies = [ "winnow", ] +[[package]] +name = "alloy-sol-types" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a91ca40fa20793ae9c3841b83e74569d1cc9af29a2f5237314fd3452d51e38c7" +dependencies = [ + "alloy-primitives 0.7.7", + "alloy-sol-macro 0.7.7", + "const-hex", + "serde", +] + [[package]] name = "alloy-sol-types" version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1174cafd6c6d810711b4e00383037bdb458efc4fe3dbafafa16567e0320c54d8" dependencies = [ - "alloy-json-abi", - "alloy-primitives", - "alloy-sol-macro", + "alloy-json-abi 0.8.15", + "alloy-primitives 0.8.15", + "alloy-sol-macro 0.8.15", "const-hex", "serde", ] [[package]] name = "alloy-transport" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf656f983e14812df65b5aee37e7b37535f68a848295e6ed736b2054a405cb7" +checksum = "01b51a291f949f755e6165c3ed562883175c97423703703355f4faa4b7d0a57c" dependencies = [ "alloy-json-rpc", "base64", @@ -503,19 +519,18 @@ dependencies = [ "futures-utils-wasm", "serde", "serde_json", - "thiserror 2.0.7", + "thiserror 1.0.69", "tokio", "tower", "tracing", "url", - "wasmtimer", ] [[package]] name = "alloy-transport-http" -version = "0.8.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec938d51a47b7953b1c0fd8ddeb89a29eb113cd4908dfc4e01c7893b252d669f" +checksum = "86d65871f9f1cafe1ed25cde2f1303be83e6473e995a2d56c275ae4fcce6119c" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -526,22 +541,6 @@ dependencies = [ "url", ] -[[package]] -name = "alloy-trie" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a5fd8fea044cc9a8c8a50bb6f28e31f0385d820f116c5b98f6f4e55d6e5590b" -dependencies = [ - "alloy-primitives", - "alloy-rlp", - "arrayvec", - "derive_more", - "nybbles", - "serde", - "smallvec", - "tracing", -] - [[package]] name = "anyhow" version = "1.0.94" @@ -677,9 +676,6 @@ name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -dependencies = [ - "serde", -] [[package]] name = "async-stream" @@ -903,6 +899,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.4" @@ -928,12 +930,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crossbeam-utils" -version = "0.8.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" - [[package]] name = "crunchy" version = "0.2.2" @@ -964,12 +960,11 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "crossbeam-utils", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -997,6 +992,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "0.99.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version 0.4.1", + "syn 2.0.90", +] + [[package]] name = "derive_more" version = "1.0.0" @@ -1353,12 +1361,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "hashbrown" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" - [[package]] name = "hashbrown" version = "0.14.5" @@ -1761,12 +1763,12 @@ dependencies = [ [[package]] name = "kinode_process_lib" version = "0.10.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=0443ece#0443ece2a5dfdbdc1b40db454a1535e1b1c1a1b3" +source = "git+https://github.com/kinode-dao/process_lib?rev=0209da1#0209da15340d9a2b92152916a8349d8f248775e5" dependencies = [ "alloy", - "alloy-primitives", - "alloy-sol-macro", - "alloy-sol-types", + "alloy-primitives 0.7.7", + "alloy-sol-macro 0.7.7", + "alloy-sol-types 0.7.7", "anyhow", "bincode", "http", @@ -1784,8 +1786,8 @@ dependencies = [ name = "kns-indexer" version = "0.2.0" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.8.15", + "alloy-sol-types 0.8.15", "anyhow", "hex", "kinode_process_lib", @@ -1976,19 +1978,6 @@ dependencies = [ "syn 2.0.90", ] -[[package]] -name = "nybbles" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95f06be0417d97f81fe4e5c86d7d01b392655a9cac9c19a848aa033e18937b23" -dependencies = [ - "alloy-rlp", - "const-hex", - "proptest", - "serde", - "smallvec", -] - [[package]] name = "object" version = "0.36.5" @@ -2074,16 +2063,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "parking_lot" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" -dependencies = [ - "lock_api", - "parking_lot_core", -] - [[package]] name = "parking_lot_core" version = "0.9.10" @@ -2207,6 +2186,30 @@ dependencies = [ "toml_edit", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-error-attr2" version = "2.0.0" @@ -2554,17 +2557,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "schnellru" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367" -dependencies = [ - "ahash", - "cfg-if", - "hashbrown 0.13.2", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -2737,9 +2729,6 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -dependencies = [ - "serde", -] [[package]] name = "socket2" @@ -2776,17 +2765,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "state" -version = "0.1.0" -dependencies = [ - "kinode_process_lib", - "process_macros", - "serde", - "serde_json", - "wit-bindgen", -] - [[package]] name = "static_assertions" version = "1.1.0" @@ -2843,6 +2821,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn-solidity" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c837dc8852cb7074e46b444afb81783140dab12c58867b49fb3898fbafedf7ea" +dependencies = [ + "paste", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "syn-solidity" version = "0.8.15" @@ -3043,16 +3033,17 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.2" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "pin-project", "pin-project-lite", - "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3073,6 +3064,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3323,20 +3315,6 @@ dependencies = [ "semver 1.0.24", ] -[[package]] -name = "wasmtimer" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0048ad49a55b9deb3953841fa1fc5858f0efbcb7a18868c899a360269fac1b23" -dependencies = [ - "futures", - "js-sys", - "parking_lot", - "pin-utils", - "slab", - "wasm-bindgen", -] - [[package]] name = "web-sys" version = "0.3.76" diff --git a/kinode/packages/kns-indexer/Cargo.toml b/kinode/packages/kns-indexer/Cargo.toml index 662ed7a66..8e9024729 100644 --- a/kinode/packages/kns-indexer/Cargo.toml +++ b/kinode/packages/kns-indexer/Cargo.toml @@ -3,8 +3,7 @@ resolver = "2" members = [ "get-block", "kns-indexer", - "state", -] + ] [profile.release] panic = "abort" diff --git a/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit b/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit index fd3fb52fb..f49ba4ace 100644 --- a/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit +++ b/kinode/packages/kns-indexer/api/kns-indexer:sys-v0.wit @@ -14,16 +14,12 @@ interface kns-indexer { /// returns an Option /// set block to 0 if you just want to get the current state of the indexer node-info(node-info-request), - /// return the entire state of the indexer at the given block - /// set block to 0 if you just want to get the current state of the indexer - get-state(get-state-request), } variant indexer-response { name(option), node-info(option), - get-state(wit-state), - } + } record namehash-to-name-request { hash: string, @@ -35,10 +31,6 @@ interface kns-indexer { block: u64, } - record get-state-request { - block: u64, - } - record wit-kns-update { name: string, public-key: string, @@ -46,14 +38,6 @@ interface kns-indexer { ports: list>, // map, but wit doesn't support maps routers: list, } - - record wit-state { - chain-id: u64, - contract-address: list, // 20-byte ETH address - names: list>, // map, but wit doesn't support maps - nodes: list>, // map, but wit doesn't support maps - last-block: u64, - } } world kns-indexer-sys-v0 { diff --git a/kinode/packages/kns-indexer/get-block/Cargo.toml b/kinode/packages/kns-indexer/get-block/Cargo.toml index d3ff69fcd..f818b82e2 100644 --- a/kinode/packages/kns-indexer/get-block/Cargo.toml +++ b/kinode/packages/kns-indexer/get-block/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" simulation-mode = [] [dependencies] -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0443ece" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0209da1" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = "0.36.0" diff --git a/kinode/packages/kns-indexer/kns-indexer/Cargo.toml b/kinode/packages/kns-indexer/kns-indexer/Cargo.toml index cf4deba51..2ba73d917 100644 --- a/kinode/packages/kns-indexer/kns-indexer/Cargo.toml +++ b/kinode/packages/kns-indexer/kns-indexer/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" alloy-primitives = "0.8.15" alloy-sol-types = "0.8.15" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0443ece" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0209da1" } process_macros = "0.1" rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } diff --git a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs index e07dc060b..6dc74763a 100644 --- a/kinode/packages/kns-indexer/kns-indexer/src/lib.rs +++ b/kinode/packages/kns-indexer/kns-indexer/src/lib.rs @@ -1,17 +1,18 @@ use crate::kinode::process::kns_indexer::{ - GetStateRequest, IndexerRequest, IndexerResponse, NamehashToNameRequest, NodeInfoRequest, - WitKnsUpdate, WitState, + IndexerRequest, IndexerResponse, NamehashToNameRequest, NodeInfoRequest, WitKnsUpdate, }; use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, - Request, Response, + await_message, call_init, eth, kimap, + kv::{self, Kv}, + net, print_to_terminal, println, timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; use std::{ - collections::{hash_map::HashMap, BTreeMap}, + collections::BTreeMap, net::{IpAddr, Ipv4Addr, Ipv6Addr}, + str::FromStr, }; wit_bindgen::generate!({ @@ -36,58 +37,179 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; // local +const CURRENT_VERSION: u32 = 1; + const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; const DELAY_MS: u64 = 1_000; // 1s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { - chain_id: u64, - // what contract this state pertains to - contract_address: eth::Address, - // namehash to human readable name - names: HashMap, - // human readable name to most recent on-chain routing information as json - nodes: HashMap, + // version of the state in kv + version: u32, // last block we have an update from last_block: u64, + // kv handle + // includes keys and values for: + // "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address", + // "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}" + kv: Kv>, } -impl From for WitState { - fn from(s: State) -> Self { - let contract_address: [u8; 20] = s.contract_address.into(); - WitState { - chain_id: s.chain_id.clone(), - contract_address: contract_address.to_vec(), - names: s - .names - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect::>(), - nodes: s - .nodes - .iter() - .map(|(k, v)| (k.clone(), v.clone().into())) - .collect::>(), - last_block: s.last_block.clone(), +impl State { + fn load(our: &Address) -> Self { + let kv: Kv> = match kv::open(our.package_id(), "kns_indexer", Some(10)) { + Ok(kv) => kv, + Err(e) => panic!("fatal: error opening kns_indexer key_value database: {e:?}"), + }; + + let mut state = Self { + kv, + version: 0, + last_block: 0, + }; + + let version = state.get_version(); + let chain_id = state.get_chain_id(); + let contract_address = state.get_contract_address(); + let last_block = state.get_last_block(); + + if version != CURRENT_VERSION + || chain_id != CHAIN_ID + || contract_address != eth::Address::from_str(KIMAP_ADDRESS).unwrap() + { + // if version/contract/chain_id are new, run migrations here. } + + state.set_chain_id(chain_id); + state.set_contract_address(contract_address); + state.set_version(CURRENT_VERSION); + + // update state struct with final values + state.version = version; + state.last_block = last_block; + + println!( + "\n 🐦‍⬛ KNS Indexer State\n\ + ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔\n\ + Version {}\n\ + Last Block {}\n\ + Chain ID {}\n\ + KIMAP {}\n\ + ▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁\n", + state.version, + state.last_block, + chain_id, + contract_address.to_string(), + ); + + state + } + + fn meta_version_key() -> String { + "meta:version".to_string() } -} -impl From for State { - fn from(s: WitState) -> Self { - let contract_address: [u8; 20] = s - .contract_address - .try_into() - .expect("invalid contract addess: doesn't have 20 bytes"); - State { - chain_id: s.chain_id.clone(), - contract_address: contract_address.into(), - names: HashMap::from_iter(s.names), - nodes: HashMap::from_iter(s.nodes.iter().map(|(k, v)| (k.clone(), v.clone().into()))), - last_block: s.last_block.clone(), + fn meta_last_block_key() -> String { + "meta:last_block".to_string() + } + + fn meta_chain_id_key() -> String { + "meta:chain_id".to_string() + } + + fn meta_contract_address_key() -> String { + "meta:contract_address".to_string() + } + + fn name_key(namehash: &str) -> String { + format!("name:{}", namehash) + } + + fn node_key(name: &str) -> String { + format!("node:{}", name) + } + + fn get_last_block(&self) -> u64 { + self.kv + .get_as::(&Self::meta_last_block_key()) + .ok() + .unwrap_or(KIMAP_FIRST_BLOCK) + } + + fn set_last_block(&mut self, block: u64) { + self.kv + .set_as::(&Self::meta_last_block_key(), &block, None) + .unwrap(); + self.last_block = block; + } + + fn get_version(&self) -> u32 { + self.kv + .get_as::(&Self::meta_version_key()) + .ok() + .unwrap_or(CURRENT_VERSION) + } + + fn set_version(&mut self, version: u32) { + self.kv + .set_as::(&Self::meta_version_key(), &version, None) + .unwrap(); + self.version = version; + } + + fn get_name(&self, namehash: &str) -> Option { + self.kv + .get(&Self::name_key(namehash)) + .ok() + .and_then(|bytes| String::from_utf8(bytes).ok()) + } + + fn set_name(&mut self, namehash: &str, name: &str) { + self.kv + .set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None) + .unwrap(); + } + + fn get_node(&self, name: &str) -> Option { + self.kv.get_as::(&Self::node_key(name)).ok() + } + + fn set_node(&mut self, name: &str, node: &net::KnsUpdate) { + self.kv + .set_as::(&Self::node_key(name), &node, None) + .unwrap(); + } + + fn get_chain_id(&self) -> u64 { + self.kv + .get_as::(&Self::meta_chain_id_key()) + .ok() + .unwrap_or(CHAIN_ID) + } + + fn set_chain_id(&mut self, chain_id: u64) { + self.kv + .set_as::(&Self::meta_chain_id_key(), &chain_id, None) + .unwrap(); + } + + fn get_contract_address(&self) -> eth::Address { + match self + .kv + .get_as::(&Self::meta_contract_address_key()) + { + Ok(addr) => addr, + Err(_) => eth::Address::from_str(KIMAP_ADDRESS) + .expect("Failed to parse KIMAP_ADDRESS constant"), } } + + fn set_contract_address(&mut self, contract_address: eth::Address) { + self.kv + .set_as::(&Self::meta_contract_address_key(), &contract_address, None) + .expect("Failed to set contract address"); + } } impl From for WitKnsUpdate { @@ -126,20 +248,8 @@ enum KnsError { call_init!(init); fn init(our: Address) { - println!("indexing on contract address {KIMAP_ADDRESS}"); - - // we **can** persist PKI state between boots but with current size, it's - // more robust just to reload the whole thing. the new contracts will allow - // us to quickly verify we have the updated mapping with root hash, but right - // now it's tricky to recover from missed events. - - let state = State { - chain_id: CHAIN_ID, - contract_address: KIMAP_ADDRESS.parse::().unwrap(), - nodes: HashMap::new(), - names: HashMap::new(), - last_block: KIMAP_FIRST_BLOCK, - }; + // state is loaded from kv, and updated with the current block number and version. + let state = State::load(&our); if let Err(e) = main(our, state) { println!("fatal error: {e}"); @@ -150,9 +260,14 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { #[cfg(feature = "simulation-mode")] add_temp_hardcoded_tlzs(&mut state); + let chain_id = state.get_chain_id(); + let kimap_address = state.get_contract_address(); + let last_block = state.get_last_block(); + // sub_id: 1 let mints_filter = eth::Filter::new() - .address(state.contract_address) + .address(kimap_address) + .from_block(last_block) .to_block(eth::BlockNumberOrTag::Latest) .event("Mint(bytes32,bytes32,bytes,bytes)"); @@ -166,29 +281,21 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // sub_id: 2 let notes_filter = eth::Filter::new() - .address(state.contract_address) + .address(kimap_address) + .from_block(last_block) .to_block(eth::BlockNumberOrTag::Latest) .event("Note(bytes32,bytes32,bytes,bytes,bytes)") .topic3(notes); // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT); - - print_to_terminal( - 1, - &format!( - "subscribing, state.block: {}, chain_id: {}", - state.last_block - 1, - state.chain_id - ), - ); + let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT); + let _kimap_helper = kimap::Kimap::new(eth_provider.clone(), kimap_address); // subscribe to logs first, so no logs are missed - println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - + println!("done subscribing to new logs."); // if subscription results come back in the wrong order, we store them here // until the right block is reached. @@ -254,22 +361,19 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // TODO: make sure we've seen the whole block, while actually // sending a response to the proper place. Response::new() - .body(IndexerResponse::Name(state.names.get(hash).cloned())) + .body(IndexerResponse::Name(state.get_name(hash))) .send()?; } IndexerRequest::NodeInfo(NodeInfoRequest { ref name, .. }) => { Response::new() - .body(IndexerResponse::NodeInfo( - state.nodes.get(name).map(|n| n.clone().into()), + .body(&IndexerResponse::NodeInfo( + state + .get_node(name) + .map(|update| WitKnsUpdate::from(update)), )) .send()?; } - IndexerRequest::GetState(GetStateRequest { .. }) => { - Response::new() - .body(IndexerResponse::GetState(state.clone().into())) - .send()?; - } } } } @@ -308,7 +412,7 @@ fn handle_eth_message( let block_number = eth_provider.get_block_number(); if let Ok(block_number) = block_number { print_to_terminal(2, &format!("new block: {}", block_number)); - state.last_block = block_number; + state.set_last_block(block_number); } } handle_pending_notes(state, pending_notes)?; @@ -346,15 +450,9 @@ fn handle_pending_notes( None => { print_to_terminal(1, &format!("pending note handling error: {e:?}")) } - Some(ee) => match ee { - KnsError::NoParentError => { - // print_to_terminal( - // 1, - // &format!("note still awaiting mint; attempt {attempt}"), - // ); - keep_notes.push((note, attempt + 1)); - } - }, + Some(KnsError::NoParentError) => { + keep_notes.push((note, attempt + 1)); + } } } } @@ -382,68 +480,53 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul return Err(anyhow::anyhow!("skipping invalid note: {note_label}")); } - let Some(node_name) = get_parent_name(&state.names, &node_hash) else { + let Some(node_name) = get_parent_name(&state, &node_hash) else { return Err(KnsError::NoParentError.into()); }; - match note_label.as_str() { - "~ws-port" => { - let ws = bytes_to_port(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + if let Some(mut node) = state.get_node(&node_name) { + match note_label.as_str() { + "~ws-port" => { + let ws = bytes_to_port(¬e.data)?; node.ports.insert("ws".to_string(), ws); - // port defined, -> direct - node.routers = vec![]; + node.routers = vec![]; // port defined, -> direct } - } - "~tcp-port" => { - let tcp = bytes_to_port(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + "~tcp-port" => { + let tcp = bytes_to_port(¬e.data)?; node.ports.insert("tcp".to_string(), tcp); - // port defined, -> direct - node.routers = vec![]; + node.routers = vec![]; // port defined, -> direct } - } - "~net-key" => { - if note.data.len() != 32 { - return Err(anyhow::anyhow!("invalid net-key length")); - } - if let Some(node) = state.nodes.get_mut(&node_name) { + "~net-key" => { + if note.data.len() != 32 { + return Err(anyhow::anyhow!("invalid net-key length")); + } node.public_key = hex::encode(¬e.data); } - } - "~routers" => { - let routers = decode_routers(¬e.data, state); - if let Some(node) = state.nodes.get_mut(&node_name) { + "~routers" => { + let routers = decode_routers(¬e.data, state); node.routers = routers; - // -> indirect - node.ports = BTreeMap::new(); + node.ports = BTreeMap::new(); // -> indirect node.ips = vec![]; } - } - "~ip" => { - let ip = bytes_to_ip(¬e.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { + "~ip" => { + let ip = bytes_to_ip(¬e.data)?; node.ips = vec![ip.to_string()]; - // -> direct - node.routers = vec![]; + node.routers = vec![]; // -> direct + } + _other => { + // Ignore unknown notes } } - _other => { - // Ignore unknown notes - } - } - // only send an update if we have a *full* set of data for networking: - // a node name, plus either or - if let Some(node_info) = state.nodes.get(&node_name) { - if !node_info.public_key.is_empty() - && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) - || node_info.routers.len() > 0) + // Update the node in the state + state.set_node(&node_name, &node); + + // Only send an update if we have a *full* set of data for networking + if !node.public_key.is_empty() + && ((!node.ips.is_empty() && !node.ports.is_empty()) || !node.routers.is_empty()) { Request::to(("our", "net", "distro", "sys")) - .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( - node_info.clone(), - ))?) + .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node))?) .send()?; } } @@ -457,7 +540,7 @@ fn handle_log( log: ð::Log, ) -> anyhow::Result<()> { if let Some(block) = log.block_number { - state.last_block = block; + state.set_last_block(block); } match log.topics()[0] { @@ -471,15 +554,15 @@ fn handle_log( return Err(anyhow::anyhow!("skipping invalid name: {name}")); } - let full_name = match get_parent_name(&state.names, &parent_hash) { + let full_name = match get_parent_name(&state, &parent_hash) { Some(parent_name) => format!("{name}.{parent_name}"), None => name, }; - state.names.insert(child_hash.clone(), full_name.clone()); - state.nodes.insert( - full_name.clone(), - net::KnsUpdate { + state.set_name(&child_hash.clone(), &full_name.clone()); + state.set_node( + &full_name.clone(), + &net::KnsUpdate { name: full_name.clone(), public_key: String::new(), ips: Vec::new(), @@ -527,7 +610,7 @@ fn fetch_and_process_logs( filter: eth::Filter, pending_notes: &mut BTreeMap>, ) { - let filter = filter.from_block(KIMAP_FIRST_BLOCK); + let filter = filter.from_block(state.last_block); loop { match eth_provider.get_logs(&filter) { Ok(logs) => { @@ -546,13 +629,13 @@ fn fetch_and_process_logs( } } -fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option { - let mut current_hash = parent_hash; +fn get_parent_name(state: &State, parent_hash: &str) -> Option { + let mut current_hash = parent_hash.to_string(); let mut components = Vec::new(); // Collect components in a vector let mut visited_hashes = std::collections::HashSet::new(); - while let Some(parent_name) = names.get(current_hash) { - if !visited_hashes.insert(current_hash) { + while let Some(parent_name) = state.get_name(¤t_hash) { + if !visited_hashes.insert(current_hash.clone()) { break; } @@ -561,7 +644,7 @@ fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option } // Update current_hash to the parent's hash for the next iteration - if let Some(new_parent_hash) = names.get(parent_name) { + if let Some(new_parent_hash) = state.get_name(&parent_name) { current_hash = new_parent_hash; } else { break; @@ -581,13 +664,13 @@ fn get_parent_name(names: &HashMap, parent_hash: &str) -> Option #[cfg(feature = "simulation-mode")] fn add_temp_hardcoded_tlzs(state: &mut State) { // add some hardcoded top level zones - state.names.insert( - "0xdeeac81ae11b64e7cab86d089c306e5d223552a630f02633ce170d2786ff1bbd".to_string(), - "os".to_string(), + state.set_name( + &"0xdeeac81ae11b64e7cab86d089c306e5d223552a630f02633ce170d2786ff1bbd".to_string(), + &"os".to_string(), ); - state.names.insert( - "0x137d9e4cc0479164d40577620cb3b41b083c6e8dbf58f8523be76d207d6fd8ea".to_string(), - "dev".to_string(), + state.set_name( + &"0x137d9e4cc0479164d40577620cb3b41b083c6e8dbf58f8523be76d207d6fd8ea".to_string(), + &"dev".to_string(), ); } @@ -605,7 +688,7 @@ fn decode_routers(data: &[u8], state: &State) -> Vec { for chunk in data.chunks(32) { let hash_str = format!("0x{}", hex::encode(chunk)); - match state.names.get(&hash_str) { + match state.get_name(&hash_str) { Some(full_name) => routers.push(full_name.clone()), None => print_to_terminal( 1, diff --git a/kinode/packages/kns-indexer/pkg/manifest.json b/kinode/packages/kns-indexer/pkg/manifest.json index 3ed563dbb..fe7b4fd17 100644 --- a/kinode/packages/kns-indexer/pkg/manifest.json +++ b/kinode/packages/kns-indexer/pkg/manifest.json @@ -8,12 +8,14 @@ "eth:distro:sys", "http-server:distro:sys", "net:distro:sys", - "timer:distro:sys" + "timer:distro:sys", + "kv:distro:sys" ], "grant_capabilities": [ "eth:distro:sys", "http-server:distro:sys", - "timer:distro:sys" + "timer:distro:sys", + "kv:distro:sys" ], "public": false } diff --git a/kinode/packages/kns-indexer/pkg/scripts.json b/kinode/packages/kns-indexer/pkg/scripts.json index bee99cd16..25798ef73 100644 --- a/kinode/packages/kns-indexer/pkg/scripts.json +++ b/kinode/packages/kns-indexer/pkg/scripts.json @@ -10,17 +10,5 @@ "eth:distro:sys" ], "wit_version": 1 - }, - "state.wasm": { - "root": false, - "public": false, - "request_networking": false, - "request_capabilities": [ - "kns-indexer:kns-indexer:sys" - ], - "grant_capabilities": [ - "kns-indexer:kns-indexer:sys" - ], - "wit_version": 1 } } diff --git a/kinode/packages/kns-indexer/state/Cargo.toml b/kinode/packages/kns-indexer/state/Cargo.toml deleted file mode 100644 index 34d14ca93..000000000 --- a/kinode/packages/kns-indexer/state/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "state" -version = "0.1.0" -edition = "2021" - -[features] -simulation-mode = [] - -[dependencies] -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "0443ece" } -process_macros = "0.1" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -wit-bindgen = "0.36.0" - -[lib] -crate-type = ["cdylib"] - -[package.metadata.component] -package = "kinode:process" diff --git a/kinode/packages/kns-indexer/state/src/lib.rs b/kinode/packages/kns-indexer/state/src/lib.rs deleted file mode 100644 index 27df8a95d..000000000 --- a/kinode/packages/kns-indexer/state/src/lib.rs +++ /dev/null @@ -1,46 +0,0 @@ -use crate::kinode::process::kns_indexer::{GetStateRequest, IndexerRequest, IndexerResponse}; -use kinode_process_lib::{eth, script, Address, Message, Request}; - -wit_bindgen::generate!({ - path: "target/wit", - world: "kns-indexer-sys-v0", - generate_unused_types: true, - additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], -}); - -script!(init); -fn init(_our: Address, _args: String) -> String { - // we don't take any args - - let Ok(Message::Response { body, .. }) = - Request::to(("our", "kns-indexer", "kns-indexer", "sys")) - .body(IndexerRequest::GetState(GetStateRequest { block: 0 })) - .send_and_await_response(10) - .unwrap() - else { - return "failed to get state from kns-indexer".to_string(); - }; - let Ok(IndexerResponse::GetState(state)) = body.try_into() else { - return "failed to deserialize state".to_string(); - }; - // can change later, but for now, just print every known node name - let mut names = state - .names - .iter() - .map(|(_k, v)| v.clone()) - .collect::>(); - names.sort(); - let contract_address: [u8; 20] = state - .contract_address - .try_into() - .expect("invalid contract addess: doesn't have 20 bytes"); - let contract_address: eth::Address = contract_address.into(); - format!( - "\nrunning on chain id {}\nCA: {}\n{} known nodes as of block {}\n {}", - state.chain_id, - contract_address, - names.len(), - state.last_block, - names.join("\n ") - ) -} diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index 267c636ea..4ce58e060 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -83,10 +83,10 @@ impl SqliteState { fs::create_dir_all(&db_path).await?; - let db_file_path = format!("{}.db", db); + let db_file_path = db_path.join(format!("{}.db", db)); let db_conn = Connection::open(db_file_path)?; - let _ = db_conn.execute("PRAGMA journal_mode=WAL", []); + let _: String = db_conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))?; self.open_dbs.insert(key, Mutex::new(db_conn));