Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 139 additions & 65 deletions kinode/packages/app_store/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ use alloy_sol_types::SolEvent;
use kinode::process::chain::ChainResponses;
use kinode_process_lib::{
await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap,
print_to_terminal, println, timer, Address, Message, PackageId, Request, Response,
print_to_terminal, println, set_state, timer, Address, Message, PackageId, Request, Response,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::collections::{HashMap, HashSet};

wit_bindgen::generate!({
path: "target/wit",
Expand All @@ -63,20 +60,109 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C";

const DELAY_MS: u64 = 1_000; // 1s

#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the kimap helper we are using
pub kimap: kimap::Kimap,
pub const CURRENT_VERSION: u32 = 1;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedState {
version: u32,
#[serde(flatten)]
state: StateEnum,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StateEnum {
V1(AppStoreState),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppStoreState {
/// the last block at which we saved the state of the listings to disk.
/// when we boot, we can read logs starting from this block and
/// rebuild latest state.
pub last_saved_block: u64,
pub last_block: u64,
/// onchain listings
pub listings: HashMap<PackageId, PackageListing>,
/// set of packages that we have published
pub published: HashSet<PackageId>,
}

impl AppStoreState {
pub fn save(&self) -> anyhow::Result<()> {
let versioned_state = VersionedState {
version: CURRENT_VERSION,
state: StateEnum::V1(self.clone()),
};
set_state(&serde_json::to_vec(&versioned_state)?);
Ok(())
}
}

impl VersionedState {
pub fn new() -> Self {
Self {
version: CURRENT_VERSION,
state: StateEnum::V1(AppStoreState {
last_block: 0,
listings: HashMap::new(),
published: HashSet::new(),
}),
}
}

pub fn state(&self) -> &AppStoreState {
match &self.state {
StateEnum::V1(state) => state,
}
}

pub fn state_mut(&mut self) -> &mut AppStoreState {
match &mut self.state {
StateEnum::V1(state) => state,
}
}

pub fn update_block(&mut self, block: u64) {
if block > 2 {
self.state_mut().last_block = block - 2;
}
}

pub fn load_or_create() -> Self {
match get_state() {
Some(bytes) => match serde_json::from_slice(&bytes) {
Ok(state) => {
let state: VersionedState = state;
if state.version != CURRENT_VERSION {
println!(
"migrating state from version {} to {}",
state.version, CURRENT_VERSION
);
state.migrate()
} else {
state
}
}
Err(e) => {
println!("failed to deserialize state: {e}, creating new"); // note, dangerous but we can always re-index.
Self::new()
}
},
None => Self::new(),
}
}

// potential future migrations can be added here.
fn migrate(self) -> Self {
match self.version {
1 => self, // current version
// 2 => migrate_to_v3(self),
// 3 => migrate_to_v4(self),
_ => Self::new(), // unknown version, start fresh
}
}
}

/// listing information derived from metadata hash in listing event
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PackageListing {
Expand Down Expand Up @@ -106,24 +192,34 @@ fn init(our: Address) {
// can change, log requests can take quite a long time.
let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT);

let mut state = fetch_state(eth_provider);
fetch_and_subscribe_logs(&our, &mut state);
let mut versioned_state = VersionedState::load_or_create();
let state = versioned_state.state_mut();

let contract_address = KIMAP_ADDRESS.parse::<eth::Address>().unwrap();
let kimap = kimap::Kimap::new(eth_provider, contract_address);

fetch_and_subscribe_logs(&our, state, &kimap);

loop {
match await_message() {
Err(send_error) => {
print_to_terminal(1, &format!("chain: got network error: {send_error}"));
}
Ok(message) => {
if let Err(e) = handle_message(&our, &mut state, &message) {
if let Err(e) = handle_message(&our, state, &message, &kimap) {
print_to_terminal(1, &format!("chain: error handling message: {:?}", e));
}
}
}
}
}

fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
fn handle_message(
our: &Address,
state: &mut AppStoreState,
message: &Message,
kimap: &kimap::Kimap,
) -> anyhow::Result<()> {
if !message.is_request() {
if message.is_local(&our) && message.source().process == "timer:distro:sys" {
// handling of ETH RPC subscriptions delayed by DELAY_MS
Expand All @@ -132,7 +228,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
return Err(anyhow::anyhow!("foo"));
};
let log = serde_json::from_slice(context)?;
handle_eth_log(our, state, log, false)?;
handle_eth_log(our, state, log, kimap, false)?;
return Ok(());
}
} else {
Expand All @@ -153,10 +249,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
}
} else {
// attempt to resubscribe
state
.kimap
.provider
.subscribe_loop(1, app_store_filter(state));
kimap.provider.subscribe_loop(1, app_store_filter());
}
}
Req::Request(chains) => {
Expand All @@ -168,7 +261,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
Ok(())
}

fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> {
fn handle_local_request(state: &mut AppStoreState, req: ChainRequests) -> anyhow::Result<()> {
match req {
ChainRequests::GetApp(package_id) => {
let onchain_app = state
Expand Down Expand Up @@ -213,6 +306,7 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result
ChainRequests::StartAutoUpdate(package_id) => {
if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) {
listing.auto_update = true;
state.save()?;
let response = ChainResponses::AutoUpdateStarted;
Response::new().body(&response).send()?;
} else {
Expand All @@ -223,6 +317,7 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result
ChainRequests::StopAutoUpdate(package_id) => {
if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) {
listing.auto_update = false;
state.save()?;
let response = ChainResponses::AutoUpdateStopped;
Response::new().body(&response).send()?;
} else {
Expand All @@ -236,8 +331,9 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result

fn handle_eth_log(
our: &Address,
state: &mut State,
state: &mut AppStoreState,
log: eth::Log,
kimap: &kimap::Kimap,
startup: bool,
) -> anyhow::Result<()> {
let block_number: u64 = log
Expand Down Expand Up @@ -272,7 +368,7 @@ fn handle_eth_log(
let hash_note = format!("~metadata-hash.{}", note.parent_path);

// owner can change which we don't track (yet?) so don't save, need to get when desired
let (tba, _owner, data) = match state.kimap.get(&hash_note) {
let (tba, _owner, data) = match kimap.get(&hash_note) {
Ok(gr) => Ok(gr),
Err(e) => match e {
eth::EthError::RpcError(_) => {
Expand All @@ -281,7 +377,7 @@ fn handle_eth_log(
// `timer:distro:sys` so that events are processed in
// order of receipt
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
state.kimap.get(&hash_note)
kimap.get(&hash_note)
}
_ => Err(e),
},
Expand Down Expand Up @@ -339,7 +435,12 @@ fn handle_eth_log(
}
}

state.last_block = block_number;

if !startup {
// save logs immediately if not startup.
state.save()?;

// if auto_update is enabled, send a message to downloads to kick off the update.
if let Some(listing) = state.listings.get(&package_id) {
if listing.auto_update {
Expand All @@ -357,15 +458,13 @@ fn handle_eth_log(
}
}

state.last_saved_block = block_number;

Ok(())
}

/// after startup, fetch metadata for all listings
/// we do this as a separate step to not repeatedly fetch outdated metadata
/// as we process logs.
fn update_all_metadata(state: &mut State) {
fn update_all_metadata(state: &mut AppStoreState, kimap: &kimap::Kimap) {
state.listings.retain(|package_id, listing| {
let (tba, metadata_hash) = {
// generate ~metadata-hash full-path
Expand All @@ -376,7 +475,7 @@ fn update_all_metadata(state: &mut State) {
);

// owner can change which we don't track (yet?) so don't save, need to get when desired
let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) {
let Ok((tba, _owner, data)) = (match kimap.get(&hash_note) {
Ok(gr) => Ok(gr),
Err(e) => match e {
eth::EthError::RpcError(_) => {
Expand All @@ -385,7 +484,7 @@ fn update_all_metadata(state: &mut State) {
// `timer:distro:sys` so that events are processed in
// order of receipt
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
state.kimap.get(&hash_note)
kimap.get(&hash_note)
}
_ => Err(e),
},
Expand Down Expand Up @@ -423,6 +522,9 @@ fn update_all_metadata(state: &mut State) {
}
true
});
if let Err(e) = state.save() {
print_to_terminal(0, &format!("error saving state: {e}"));
}
}

/// create the filter used for app store getLogs and subscription.
Expand All @@ -431,31 +533,29 @@ fn update_all_metadata(state: &mut State) {
/// at the URI.
///
/// this means that ~metadata-hash should be *posted before or at the same time* as ~metadata-uri!
pub fn app_store_filter(state: &State) -> eth::Filter {
pub fn app_store_filter() -> eth::Filter {
let notes = vec![keccak256("~metadata-uri")];
let contract_address = KIMAP_ADDRESS.parse::<eth::Address>().unwrap();

eth::Filter::new()
.address(*state.kimap.address())
.address(contract_address)
.events([kimap::contract::Note::SIGNATURE])
.topic3(notes)
}

/// create a filter to fetch app store event logs from chain and subscribe to new events
pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) {
let filter = app_store_filter(state);
pub fn fetch_and_subscribe_logs(our: &Address, state: &mut AppStoreState, kimap: &kimap::Kimap) {
let filter = app_store_filter();
// get past logs, subscribe to new ones.
// subscribe first so we don't miss any logs
println!("subscribing...");
state.kimap.provider.subscribe_loop(1, filter.clone());
for log in fetch_logs(
&state.kimap.provider,
&filter.from_block(state.last_saved_block),
) {
if let Err(e) = handle_eth_log(our, state, log, true) {
kimap.provider.subscribe_loop(1, filter.clone());
for log in fetch_logs(&kimap.provider, &filter.from_block(state.last_block)) {
if let Err(e) = handle_eth_log(our, state, log, kimap, true) {
print_to_terminal(1, &format!("error ingesting log: {e}"));
};
}
update_all_metadata(state);
update_all_metadata(state, kimap);
}

/// fetch logs from the chain with a given filter
Expand Down Expand Up @@ -504,32 +604,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String {
format!("0x{:x}", hasher.finalize())
}

/// fetch state from disk or create a new one if that fails
pub fn fetch_state(provider: eth::Provider) -> State {
if let Some(state_bytes) = get_state() {
match serde_json::from_slice::<State>(&state_bytes) {
Ok(state) => {
if state.kimap.address().to_string() == KIMAP_ADDRESS {
return state;
} else {
println!(
"state contract address mismatch. rebuilding state! expected {}, got {}",
KIMAP_ADDRESS,
state.kimap.address().to_string()
);
}
}
Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"),
}
}
State {
kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()),
last_saved_block: 0,
listings: HashMap::new(),
published: HashSet::new(),
}
}

// quite annoyingly, we must convert from our gen'd version of PackageId
// to the process_lib's gen'd version. this is in order to access custom
// Impls that we want to use
Expand Down
Loading
Loading