diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index caad13721..44109f42b 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -42,6 +42,7 @@ use tracing::error; use tracing::info; use tracing::trace; +use crate::error::Error; use crate::get_arg; use crate::json_rpc_res; use crate::request::Request; @@ -92,22 +93,23 @@ impl TcpActor { result = lines.next_line() => { match result { Ok(Some(line)) => { - self.message_transmitter - .send(Message::Message((self.client_id, line))) - .expect("Main loop is broken"); + if let Err(e) = self.message_transmitter.send(Message::Message((self.client_id, line))) { + error!("main loop receiver dropped: {e:?}"); + break; + } } Ok(None) => { info!("Client closed connection: {}", self.client_id); - self.message_transmitter - .send(Message::Disconnect(self.client_id)) - .expect("Main loop is broken"); + if let Err(e) = self.message_transmitter.send(Message::Disconnect(self.client_id)) { + error!("main loop receiver dropped: {e:?}"); + } break; } Err(e) => { error!("Error reading from client: {e:?}"); - self.message_transmitter - .send(Message::Disconnect(self.client_id)) - .expect("Main loop is broken"); + if let Err(e) = self.message_transmitter.send(Message::Disconnect(self.client_id)) { + error!("main loop receiver dropped: {e:?}"); + } break; } } @@ -250,7 +252,7 @@ impl ElectrumServer { &mut self, client: Arc, request: Request, - ) -> Result { + ) -> Result { // Methods are in alphabetical order match request.method.as_str() { "blockchain.block.header" => { @@ -258,11 +260,11 @@ impl ElectrumServer { let hash = self .chain .get_block_hash(height as u32) - .map_err(|_| super::error::Error::InvalidParams)?; + .map_err(|_| Error::InvalidParams)?; let header = self .chain .get_block_header(&hash) - .map_err(|e| super::error::Error::Blockchain(Box::new(e)))?; + .map_err(|e| Error::Blockchain(Box::new(e)))?; let header = serialize_hex(&header); json_rpc_res!(request, header) } @@ -275,12 +277,12 @@ impl ElectrumServer { let hash = self .chain .get_block_hash(height as u32) - .map_err(|_| super::error::Error::InvalidParams)?; + .map_err(|_| Error::InvalidParams)?; let header = self .chain .get_block_header(&hash) - .map_err(|e| super::error::Error::Blockchain(Box::new(e)))?; + .map_err(|e| Error::Blockchain(Box::new(e)))?; let header = serialize_hex(&header); headers.push_str(&header); } @@ -295,11 +297,11 @@ impl ElectrumServer { let (height, hash) = self .chain .get_best_block() - .map_err(|e| super::error::Error::Blockchain(Box::new(e)))?; + .map_err(|e| Error::Blockchain(Box::new(e)))?; let header = self .chain .get_block_header(&hash) - .map_err(|e| super::error::Error::Blockchain(Box::new(e)))?; + .map_err(|e| Error::Blockchain(Box::new(e)))?; let result = json!({ "height": height, "hex": serialize_hex(&header) @@ -336,15 +338,17 @@ impl ElectrumServer { "blockchain.scripthash.get_mempool" => json_rpc_res!(request, []), "blockchain.scripthash.listunspent" => { let hash = get_arg!(request, sha256::Hash, 0); - let utxos = self.address_cache.get_address_utxos(&hash); - if utxos.is_none() { + let Some(utxos) = self.address_cache.get_address_utxos(&hash) else { return json_rpc_res!(request, []); - } + }; let mut final_utxos = Vec::new(); - for (utxo, prevout) in utxos.unwrap().into_iter() { - let height = self.address_cache.get_height(&prevout.txid).unwrap(); - - let position = self.address_cache.get_position(&prevout.txid).unwrap(); + for (utxo, prevout) in utxos.into_iter() { + let Some(height) = self.address_cache.get_height(&prevout.txid) else { + return json_rpc_res!(request, []); + }; + let Some(position) = self.address_cache.get_position(&prevout.txid) else { + return json_rpc_res!(request, []); + }; final_utxos.push(json!({ "height": height, @@ -453,10 +457,8 @@ impl ElectrumServer { // end of experimental endpoints "blockchain.transaction.broadcast" => { let tx = get_arg!(request, String, 0); - let hex: Vec<_> = - Vec::from_hex(&tx).map_err(|_| super::error::Error::InvalidParams)?; - let tx: Transaction = - deserialize(&hex).map_err(|_| super::error::Error::InvalidParams)?; + let hex = Vec::from_hex(&tx).map_err(|_| Error::InvalidParams)?; + let tx: Transaction = deserialize(&hex).map_err(|_| Error::InvalidParams)?; let txid = tx.compute_txid(); if let Err(e) = self @@ -465,7 +467,7 @@ impl ElectrumServer { .await? { error!("Could not broadcast transaction {txid} due to {e}"); - return Err(super::error::Error::Mempool(Box::new(e))); + return Err(Error::Mempool(Box::new(e))); }; let updated = self @@ -485,22 +487,22 @@ impl ElectrumServer { return json_rpc_res!(request, tx); } - Err(super::error::Error::InvalidParams) + Err(Error::InvalidParams) } "blockchain.transaction.get_merkle" => { let tx_id = get_arg!(request, Txid, 0); - let proof = self.address_cache.get_merkle_proof(&tx_id); - let height = self.address_cache.get_height(&tx_id); - if let Some(proof) = proof { - let result = json!({ - "merkle": proof.hashes, - "block_height": height.unwrap_or(0), - "pos": proof.pos - }); - return json_rpc_res!(request, result); - } - - Err(super::error::Error::InvalidParams) + let Some(proof) = self.address_cache.get_merkle_proof(&tx_id) else { + return Err(Error::InvalidParams); + }; + let Some(height) = self.address_cache.get_height(&tx_id) else { + return Err(Error::InvalidParams); + }; + let result = json!({ + "merkle": proof.hashes, + "block_height": height, + "pos": proof.pos + }); + json_rpc_res!(request, result) } //blockchain.transaction.id_from_pos // TODO: Create an actual histogram @@ -514,7 +516,7 @@ impl ElectrumServer { let genesis_hash = self .chain .get_block_hash(0) - .expect("Genesis block should be present"); + .expect("Genesis block is always in the chain store"); let res = json!( { "genesis_hash": genesis_hash, @@ -535,12 +537,15 @@ impl ElectrumServer { [format!("Floresta {}", env!("CARGO_PKG_VERSION")), "1.4"] ), - _ => Err(super::error::Error::InvalidParams), + _ => Err(Error::InvalidParams), } } - pub async fn rebroadcast_mempool_transactions(&self) { - let unconfirmed = self.address_cache.find_unconfirmed().unwrap(); + pub async fn rebroadcast_mempool_transactions(&self) -> Result<(), Error> { + let unconfirmed = self + .address_cache + .find_unconfirmed() + .map_err(|e| Error::WatchOnly(Box::new(e)))?; for tx in unconfirmed { let txid = tx.compute_txid(); if let Ok(Err(e)) = self.node_interface.broadcast_transaction(tx.clone()).await { @@ -549,6 +554,7 @@ impl ElectrumServer { debug!("Rebroadcasted transaction {txid}"); } } + Ok(()) } pub async fn main_loop(mut self) -> Result<(), crate::error::Error> { @@ -584,7 +590,7 @@ impl ElectrumServer { .unwrap_or(true); if should_rebroadcast { - self.rebroadcast_mempool_transactions().await; + self.rebroadcast_mempool_transactions().await?; self.last_rebroadcast = Some(Instant::now()); } @@ -611,10 +617,7 @@ impl ElectrumServer { /// Usually, we'll rely on compact block filters to speed things up. If /// we don't have compact block filters, we may rescan using the older, /// more bandwidth-intensive method of actually downloading blocks. - async fn rescan_for_addresses( - &mut self, - addresses: Vec, - ) -> Result<(), super::error::Error> { + async fn rescan_for_addresses(&mut self, addresses: Vec) -> Result<(), Error> { // If compact block filters are enabled, use them. Otherwise, fallback // to the "old-school" rescaning. if let Some(cfilters) = &self.block_filters { @@ -634,7 +637,7 @@ impl ElectrumServer { start_height: Option, stop_height: Option, addresses: Vec, - ) -> Result<(), super::error::Error> { + ) -> Result<(), Error> { // By default, we look from 1..tip let mut _addresses = addresses .iter() @@ -644,6 +647,7 @@ impl ElectrumServer { let Ok(blocks) = cfilters.match_any(_addresses, start_height, stop_height, self.chain.clone()) else { + error!("Could not find matching block filters"); self.addresses_to_scan.extend(addresses); // push them back to get a retry return Ok(()); }; @@ -654,16 +658,16 @@ impl ElectrumServer { for block in blocks { let block = self.node_interface.get_block(block).await; let Ok(Some(block)) = block else { + error!("Could not get block from node"); self.addresses_to_scan.extend(addresses); // push them back to get a retry return Ok(()); }; - let height = self - .chain - .get_block_height(&block.block_hash()) - .ok() - .flatten() - .unwrap(); + let Ok(Some(height)) = self.chain.get_block_height(&block.block_hash()) else { + error!("Could not get block height: {}", block.block_hash()); + self.addresses_to_scan.extend(addresses); // push them back to get a retry + return Ok(()); + }; self.handle_block(block, height); } @@ -708,10 +712,17 @@ impl ElectrumServer { self.address_cache.bump_height(height); } - if self.chain.get_height().unwrap() == height { - for client in &mut self.clients.values() { - let res = client.write(serde_json::to_string(&result).unwrap().as_bytes()); - if res.is_err() { + let height_matches = self + .chain + .get_height() + .inspect_err(|e| error!("Could not get chain height: {e:?}")) + .is_ok_and(|h| h == height); + + if height_matches { + let serialized = + serde_json::to_string(&result).expect("serde_json::Value is always serializable"); + for client in self.clients.values() { + if client.write(serialized.as_bytes()).is_err() { info!("Could not write to client {client:?}"); } } @@ -732,17 +743,17 @@ impl ElectrumServer { Message::Message((client, msg)) => { trace!("Message: {msg}"); if let Ok(req) = serde_json::from_str::(msg.as_str()) { - let client = self.clients.get(&client); - if client.is_none() { + let Some(client) = self.clients.get(&client).cloned() else { error!("Client sent a message but is not listed as client"); return Ok(()); - } - let client = client.unwrap().to_owned(); + }; let id = req.id.to_owned(); let res = self.handle_client_request(client.clone(), req).await; if let Ok(res) = res { - client.write(serde_json::to_string(&res).unwrap().as_bytes())?; + let res = serde_json::to_string(&res) + .expect("serde_json::Value is always serializable"); + client.write(res.as_bytes())?; } else { let res = json!({ "jsonrpc": "2.0", @@ -753,17 +764,17 @@ impl ElectrumServer { }, "id": id }); - client.write(serde_json::to_string(&res).unwrap().as_bytes())?; + let res = serde_json::to_string(&res) + .expect("serde_json::Value is always serializable"); + client.write(res.as_bytes())?; } } else if let Ok(requests) = serde_json::from_str::>(&msg) { let mut results = Vec::new(); for req in requests { - let client = self.clients.get(&client); - if client.is_none() { + let Some(client) = self.clients.get(&client).cloned() else { error!("Client sent a message but is not listed as client"); return Ok(()); - } - let client = client.unwrap().to_owned(); + }; let id = req.id.to_owned(); let res = self.handle_client_request(client.clone(), req).await; @@ -783,7 +794,9 @@ impl ElectrumServer { } } if let Some(client) = self.clients.get(&client) { - client.write(serde_json::to_string(&results).unwrap().as_bytes())?; + let results = serde_json::to_string(&results) + .expect("serde_json::Value is always serializable"); + client.write(results.as_bytes())?; } } else { let res = json!({ @@ -796,7 +809,9 @@ impl ElectrumServer { "id": null }); if let Some(client) = self.clients.get(&client) { - client.write(serde_json::to_string(&res).unwrap().as_bytes())?; + let res = serde_json::to_string(&res) + .expect("serde_json::Value is always serializable"); + client.write(res.as_bytes())?; } } } @@ -813,15 +828,20 @@ impl ElectrumServer { for (_, out) in transactions { let hash = get_spk_hash(&out.script_pubkey); if let Some(client) = self.client_addresses.get(&hash) { - let history = self.address_cache.get_address_history(&hash); + let Some(history) = self.address_cache.get_address_history(&hash) else { + info!("Could not get address history for {hash}"); + continue; + }; - let status_hash = get_status(history.unwrap()); + let status_hash = get_status(history); let notify = json!({ "jsonrpc": "2.0", "method": "blockchain.scripthash.subscribe", "params": [hash, status_hash] }); - if let Err(err) = client.write(serde_json::to_string(¬ify).unwrap().as_bytes()) { + let notify = serde_json::to_string(¬ify) + .expect("serde_json::Value is always serializable"); + if let Err(err) = client.write(notify.as_bytes()) { error!("{err}"); } } @@ -848,9 +868,12 @@ pub async fn client_accept_loop( tls_stream, message_transmitter.clone(), )); - message_transmitter - .send(Message::NewClient((client.client_id, client))) - .expect("Main loop is broken"); + if let Err(e) = + message_transmitter.send(Message::NewClient((client.client_id, client))) + { + error!("main loop receiver dropped: {e:?}"); + break; + } id_count += 1; } Err(e) => { @@ -859,9 +882,12 @@ pub async fn client_accept_loop( } } else { let client = Arc::new(Client::new(id_count, stream, message_transmitter.clone())); - message_transmitter - .send(Message::NewClient((client.client_id, client))) - .expect("Main loop is broken"); + if let Err(e) = + message_transmitter.send(Message::NewClient((client.client_id, client))) + { + error!("main loop receiver dropped: {e:?}"); + break; + } id_count += 1; } } @@ -922,13 +948,13 @@ macro_rules! json_rpc_res { } #[macro_export] -/// Returns and parses a value from the request json or fails with [super::error::Error::InvalidParams]. +/// Returns and parses a value from the request json or fails with [Error::InvalidParams]. macro_rules! get_arg { ($request:ident, $arg_type:ty, $idx:literal) => { if let Some(arg) = $request.params.get($idx) { serde_json::from_value::<$arg_type>(arg.clone())? } else { - return Err(super::error::Error::InvalidParams); + return Err(Error::InvalidParams); } }; } diff --git a/crates/floresta-electrum/src/error.rs b/crates/floresta-electrum/src/error.rs index 8b3276edb..c04f00107 100644 --- a/crates/floresta-electrum/src/error.rs +++ b/crates/floresta-electrum/src/error.rs @@ -20,6 +20,9 @@ pub enum Error { #[error("Mempool accept error")] Mempool(Box), + #[error("Watch-only cache error")] + WatchOnly(Box), + #[error("Node isn't working")] NodeInterface(#[from] oneshot::error::RecvError), } diff --git a/crates/floresta-electrum/src/lib.rs b/crates/floresta-electrum/src/lib.rs index 108012201..eea2c5323 100644 --- a/crates/floresta-electrum/src/lib.rs +++ b/crates/floresta-electrum/src/lib.rs @@ -7,6 +7,7 @@ html_favicon_url = "https://raw.githubusercontent.com/getfloresta/floresta-media/master/logo_png/Icon-Green(main).png" )] #![allow(clippy::manual_is_multiple_of)] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] use serde::Deserialize; use serde::Serialize;