diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index f358aa1..a6925ba 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -26,7 +26,8 @@ tendermint-rpc = { version = "0.35.0", features = [ ] } futures = "0.3.30" base64 = "0.22.0" -clap ={ version = "4.5.4", features = [ "derive", "env" ]} +clap = { version = "4.5.4", features = ["derive", "env"] } +thiserror = "1.0.59" [dependencies.ethers] version = '2.0.8' diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index 0f36eab..909b37d 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -1,10 +1,10 @@ use super::CosmosFeed; use super::EthFeed; +// use crate::error::IOError; use crate::IconFeed; use crate::PolkadotFeed; -use anyhow::Result; use subxt::PolkadotConfig; - +use anyhow::Result; /// Represents the main context of the event feed. This is used to configure the event feed with the /// appropriate chain. pub enum Context { @@ -15,7 +15,6 @@ pub enum Context { } impl Context { - /// Starts the event feed. pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<()> { match self { diff --git a/event_feed/src/common/kuska_client.rs b/event_feed/src/common/kuska_client.rs index 58e8e12..ab0917c 100644 --- a/event_feed/src/common/kuska_client.rs +++ b/event_feed/src/common/kuska_client.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error::IOError; use anyhow::anyhow; use anyhow::Result; use runtime::api::dto::content::Mention; @@ -61,7 +62,8 @@ impl Producer { .publish(&message.to_string(), Some(vec![mention])) .await; - result.unwrap(); + // result.map_err(|e| IOError::Anyhow(e))?; + result.map_err(|e| IOError::Anyhow(anyhow!("{e}")))?; Ok(()) } diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index d7d85ae..5869c32 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,7 +1,8 @@ use super::*; use crate::common; +use crate::error::IOError; use anyhow::*; -use futures::StreamExt; +use core::result::Result::Ok; use runtime::{logger::CoreLogger, Logger}; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; @@ -41,36 +42,57 @@ impl CosmosFeed { pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await - .unwrap(); + .map_err(|err| IOError::Other(format!("Client not created {}", err)))?; self.logger.info(&format!( "Following the chain at {}", self.chain_config.node_url )); let driver_handle = tokio::spawn(async move { driver.run().await }); - let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + let mut subs = client + .subscribe(EventType::NewBlock.into()) + .await + .map_err(|err| IOError::Other(format!("Error in subscribing the client {}", err)))?; let mut events: Vec = Vec::new(); while let Some(res) = subs.next().await { - let ev = res.unwrap(); - events.push(ev.clone()); - - let filter_events = events - .iter() - .filter(|tendermint_event| { - Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) - .is_some() - }) - .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) - .unwrap() - }) - .collect::>(); - - // Call the callback with all or filtered events - cb(filter_events); - - events.clear(); + match res { + Ok(ev) => { + events.push(ev.clone()); + let filter_events = events + .iter() + .filter(|tendermint_event| { + Self::convert_to_feeder_event( + self, + tendermint_event, + &self.chain_config, + ) + .is_some() + }) + .flat_map(|tendermint_event| { + match Self::convert_to_feeder_event( + self, + tendermint_event, + &self.chain_config, + ) { + Some(value) => value, + None => { + let _ = Err::(IOError::Other( + "Error converting to FeederEvent".to_string(), + )); + vec![serde_json::Value::Null] + } + } + }) + .collect::>(); + cb(filter_events); + events.clear(); + } + Err(err) => { + // Consider retry logic or other actions here + return Err(IOError::Other(format!("Error receving event {} ", err)).into()); + } + } } drop(subs); @@ -102,7 +124,7 @@ impl CosmosFeed { result_begin_block, result_end_block: _, } => { - let block = block.as_ref().unwrap(); + let block = block.as_ref()?; let block_number = block.header.version.block as usize; let hash_string = block.header.last_commit_hash.map(|h| h.to_string()); self.logger.info(&format!( @@ -127,7 +149,12 @@ impl CosmosFeed { tx_hash: hash_string.clone(), log_index: 0, }) - .map(|e| serde_json::to_value(e).unwrap()) + .map(|e| { + serde_json::to_value(e).unwrap_or_else(|err| { + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + }) + }) .collect() } else { self.logger.info("Filtering events based on the event name"); @@ -146,7 +173,12 @@ impl CosmosFeed { }) .collect::>() .into_iter() - .map(|e| serde_json::to_value(e).unwrap()) + .map(|e| { + serde_json::to_value(e).unwrap_or_else(|err| { + eprintln!("Error converting FeederEvent to Json: {}", err); + serde_json::Value::Null + }) + }) .collect() }; diff --git a/event_feed/src/error/io.rs b/event_feed/src/error/io.rs new file mode 100644 index 0000000..a7a21e7 --- /dev/null +++ b/event_feed/src/error/io.rs @@ -0,0 +1,20 @@ +use super::*; + +#[derive(Error, Debug)] +pub enum IOError { + Anyhow(Error), + Other(String), + Std(Box), + Subxt(subxt::Error), +} + +impl Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::Anyhow(error) => write!(f, "{}", error), + IOError::Other(error) => write!(f, "{}", error), + IOError::Std(error) => write!(f, "{}", error), + IOError::Subxt(error) => write!(f, "{}", error), + } + } +} diff --git a/event_feed/src/error/mod.rs b/event_feed/src/error/mod.rs new file mode 100644 index 0000000..f4e0efc --- /dev/null +++ b/event_feed/src/error/mod.rs @@ -0,0 +1,6 @@ +mod io; +pub use io::*; + +use anyhow::Error; +use std::fmt::Display; +use thiserror::Error; diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 30ede70..ef5f52e 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,4 +1,7 @@ use super::*; +use crate::error::IOError; +use core::result::Result::Ok; + use runtime::{logger::CoreLogger, Logger}; /// Represents an Ethereum blockchain event feed. pub struct EthFeed { @@ -49,12 +52,15 @@ impl EthFeed { pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); - let last_block = client - .get_block(BlockNumber::Latest) - .await? - .unwrap() - .number - .unwrap(); + let last_block = match client.get_block(BlockNumber::Latest).await { + Ok(Some(block)) => block.number.unwrap(), + Ok(None) => { + return Err( + IOError::Other("Failed to fetch the latest block number".to_string()).into(), + ) + } + Err(err) => return Err(IOError::Other(err.to_string()).into()), + }; let events = self.events.iter().map(|e| e.0.clone()); diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 2a5f346..db8a5a2 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error::IOError; use runtime::{logger::CoreLogger, Logger}; /// Represents the icon blockchain event feed which contains the endpoint and the filters. @@ -60,8 +61,12 @@ impl IconFeed { let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { - let tx_hash: String = - serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); + let tx_hash: String = serde_json::from_value( + transaction + .get("txHash") + .ok_or_else(|| anyhow::anyhow!("Transaction hash not found in transaction"))? + .clone(), + )?; self.logger.info(&format!( "Filtering the events with the tx_hash : {:?}", tx_hash @@ -139,18 +144,30 @@ impl IconFeed { } }; - let transactions: Vec = serde_json::from_value( - block - .get("result") - .and_then(|val| val.get("confirmed_transaction_list")) - .unwrap() - .clone(), - )?; + let transactions = match block + .get("result") + .and_then(|val| val.get("confirmed_transaction_list")) + .cloned() + { + Some(transactions) => transactions, + None => { + return Err(IOError::Other(format!( + "No taransactions found in block {}", + latest_height + )) + .into()); + } + }; + + let transactions: Vec = serde_json::from_value(transactions) + .map_err(|err| IOError::Other(format!("Error in transacation: {}", err)))?; let mut filtered_tx = Vec::::new(); for transaction in transactions { - if self.filter(&transaction).await? { + if self.filter(&transaction).await.map_err(|err| { + IOError::Other(format!("Error filtering transaction: {}", err)) + })? { filtered_tx.push(transaction); } } @@ -171,7 +188,16 @@ impl IconFeed { sleep(Duration::from_secs(1)); } - latest_height = get_icon_block_height(&self.icon_service).await?; + latest_height = match get_icon_block_height(&self.icon_service).await { + Ok(height) => height, + Err(err) => { + return Err(IOError::Other(format!( + "Error getting latest block height: {}", + err + )) + .into()); + } + } } } } diff --git a/event_feed/src/lib.rs b/event_feed/src/lib.rs index ae2915c..34031e2 100644 --- a/event_feed/src/lib.rs +++ b/event_feed/src/lib.rs @@ -1,18 +1,18 @@ - pub mod common; pub mod cosmos; +pub mod error; pub mod eth; pub mod icon; pub mod substrate; -/// can use this library to fetch events from multiple chain like +/// can use this library to fetch events from multiple chain like /// - substrate /// - icon /// - eth /// - cosmos -/// +/// /// use event_feed::*; -/// +/// /// #[tokio::main] /// async fn main() { /// dotenv::dotenv().ok(); @@ -59,12 +59,10 @@ pub mod substrate; /// }) /// .await; /// } -/// +/// pub use common::*; pub use cosmos::*; +pub use error::*; pub use eth::*; pub use icon::*; pub use substrate::*; - - - diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index c357e34..11c6761 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,10 +1,11 @@ +use clap::{Parser, Subcommand}; use event_feed::{ - ChainConfig, Context, CosmosFeed, EthFeed, IconFeed, PolkadotFeed, Producer, ProducerConfig, + ChainConfig, Context, CosmosFeed, EthFeed, IOError, IconFeed, PolkadotFeed, Producer, + ProducerConfig, }; +use runtime::{logger::CoreLogger, Logger}; use std::{fs, path}; use subxt::PolkadotConfig; -use runtime::{logger::CoreLogger, Logger}; -use clap::{Parser, Subcommand}; #[derive(Parser, Debug, Clone)] #[command(author, version, about = "Async CLI Example")] @@ -36,30 +37,42 @@ async fn main() -> Result<(), Box> { let (chain_config, producer_config) = match args.clone().config { Some(path) => { let read_data = fs::read(path::Path::new(&path))?; - let chain_config: ChainConfig = serde_json::from_slice(&read_data).unwrap(); - let producer_config: ProducerConfig = - serde_json::from_slice(&read_data.clone()).unwrap(); + let chain_config: ChainConfig = + serde_json::from_slice(&read_data).map_err(|e| IOError::Anyhow(e.into()))?; + let producer_config: ProducerConfig = serde_json::from_slice(&read_data.clone()) + .map_err(|e| IOError::Anyhow(e.into()))?; (chain_config, producer_config) } None => (ChainConfig::default(), ProducerConfig::default()), }; - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()) + .await + .map_err(|e| IOError::Anyhow(e))?; logger.info("SSB client created"); - ssb_client.accept_invite().await.unwrap(); + ssb_client + .accept_invite() + .await + .map_err(|e| IOError::Anyhow(e))?; let feed = match args.command { Commands::Substrate => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) - .await - .unwrap(); + let polkadot_client = + PolkadotFeed::::new(chain_config.clone(), logger.clone()) + .await + .map_err(|e| { + IOError::Other(format!("Error initializing POLKADOT client : {}", e)) + })?; Context::PolkadotFeed(polkadot_client) } Commands::Icon => { - let icon_client = IconFeed::new(chain_config.clone(), logger.clone()).unwrap(); + let icon_client = IconFeed::new(chain_config.clone(), logger.clone()) + .map_err(|e| IOError::Other(format!("Error initializing ICON client: {}", e)))?; Context::IconFeed(icon_client) } Commands::Eth => { - let eth_client = EthFeed::new(chain_config.clone(), logger.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone(), logger.clone()) + .await + .map_err(|e| IOError::Other(format!("Error initializing ETH client: {}", e)))?; Context::EthFeed(eth_client) } Commands::Cosmos => { @@ -69,7 +82,7 @@ async fn main() -> Result<(), Box> { }; let (tx, rx) = std::sync::mpsc::channel::>(); - + tokio::spawn(async move { while let Ok(logs) = rx.recv() { for log in logs { @@ -77,7 +90,7 @@ async fn main() -> Result<(), Box> { } } }); - + let _ = feed .feed_events(&|e| { tx.send(e).unwrap(); diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index 9d4d0af..d257411 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -1,5 +1,8 @@ use super::*; use crate::common::ChainConfig; +use crate::error::IOError; +use anyhow::*; +use core::result::Result::Ok; use runtime::{logger::CoreLogger, Logger}; #[derive(Debug, Clone)] @@ -48,23 +51,57 @@ impl PolkadotFeed { /// /// * `cb`: A callback function to return the events to the caller. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { - let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; + let mut blocks_sub = self + .client + .blocks() + .subscribe_finalized() + .await + .map_err(|err| IOError::Subxt(err))?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { - let block = block.unwrap(); + let block = block.map_err(IOError::Subxt)?; let mut fetched_events = Vec::new(); - let extrinsics = block.extrinsics().await.unwrap(); + let extrinsics = block + .extrinsics() + .await + .map_err(|err| IOError::Subxt(err))?; for ext in extrinsics.iter() { - let ext = ext.unwrap(); - let events = ext.events().await.unwrap(); - let event_details = events.iter().collect::, _>>().unwrap(); + let ext = match ext { + Ok(ext) => ext, + Err(err) => return Err(IOError::Subxt(err).into()), + }; + let events = match ext.events().await { + Ok(events) => events, + Err(err) => { + return Err( + IOError::Other(format!("Error fetching events: {}", err)).into() + ) + } + }; + let event_details = match events.iter().collect::, _>>() { + Ok(event_details) => event_details, + Err(err) => { + return Err( + IOError::Other(format!("Error collecting events: {}", err)).into() + ) + } + }; let filter = self.split_filter(); for event in event_details.iter() { - let s = event.field_values().unwrap(); + let s = match event.field_values() { + Ok(data) => data, + Err(err) => { + return Err(IOError::Other(format!( + "Error getting event field values: {}", + err + )) + .into()) + } + }; let data = format!("{}", s).replace("((", "[").replace("))", "]"); let pallet_name = event.pallet_name().to_lowercase(); @@ -88,7 +125,10 @@ impl PolkadotFeed { method: event.variant_name().to_string(), field_value: data, }; - let serialize_event = serde_json::to_value(&decode_event)?; + let serialize_event = + serde_json::to_value(&decode_event).map_err(|err| { + IOError::Other(format!("Serialising error {}", err)) + })?; fetched_events.push(serialize_event); } } diff --git a/event_feed/src/substrate/mod.rs b/event_feed/src/substrate/mod.rs index 696b61b..fb109ba 100644 --- a/event_feed/src/substrate/mod.rs +++ b/event_feed/src/substrate/mod.rs @@ -4,7 +4,6 @@ pub mod feeder; pub use feeder::*; use serde::{Deserialize, Serialize}; use subxt::{OnlineClient, PolkadotConfig}; -use anyhow::Result; #[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")] pub mod polkadot {}