diff --git a/migrations/2025-07-12-121135_create_servers_table/up.sql b/migrations/2025-07-12-121135_create_servers_table/up.sql index cc063d7..b518388 100644 --- a/migrations/2025-07-12-121135_create_servers_table/up.sql +++ b/migrations/2025-07-12-121135_create_servers_table/up.sql @@ -29,4 +29,14 @@ CREATE TABLE mempool_inputs ( input_vout INTEGER NOT NULL, FOREIGN KEY(txid) REFERENCES mempool_tx(txid), FOREIGN KEY(input_txid, input_vout) REFERENCES utxos(txid, vout) +); + +CREATE TABLE utxo_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + txid TEXT NOT NULL, + vout INTEGER NOT NULL, + client_id TEXT NOT NULL, + subscribed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + active BOOLEAN NOT NULL DEFAULT true, + FOREIGN KEY(txid, vout) REFERENCES utxos(txid, vout) ); \ No newline at end of file diff --git a/src/db/db_manager.rs b/src/db/db_manager.rs index f69c5ec..4553fb7 100644 --- a/src/db/db_manager.rs +++ b/src/db/db_manager.rs @@ -1,4 +1,5 @@ use crate::db::model::MempoolTx; +use crate::subscription::SubscriptionManager; use diesel::RunQueryDsl; use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SqliteConnection, r2d2::ConnectionManager}; use r2d2::Pool; @@ -20,6 +21,7 @@ pub async fn run( ) { let mut servers: HashMap = HashMap::new(); let mut conn = pool.get().unwrap(); + let mut subscription_manager = SubscriptionManager::new(); info!("DB manager started"); while let Some(request) = rx.recv().await { match request { @@ -64,6 +66,26 @@ pub async fn run( let _ = resp_tx.send(mempool_tx).await; } + DbRequest::AddSubscription(outpoint, client_id, notification_tx) => { + info!("adding new subscription {client_id} for UTXO {outpoint:?} "); + subscription_manager.add_subscription(outpoint, client_id, notification_tx); + } + DbRequest::RemoveSubscription(outpoint, client_id) => { + info!("removing subscription (client_id: {client_id}) from UTXO {outpoint:?}"); + subscription_manager.remove_subscription(outpoint, client_id); + } + DbRequest::GetSubscriptions(outpoint, resp_tx) => { + let count = subscription_manager.get_subscription_count(&outpoint); + info!("Queried subscriptions for {:?}: {} active", outpoint, count); + + let _ = resp_tx + .send(subscription_manager.get_subscriptions(outpoint)) + .await; + } + DbRequest::NotifyUtxoSpent(outpoint, notification) => { + info!("Processing UTXO spent notification for {:?}", outpoint); + subscription_manager.notify(outpoint, notification); + } } } diff --git a/src/db/model.rs b/src/db/model.rs index db357f3..a9cbbaa 100644 --- a/src/db/model.rs +++ b/src/db/model.rs @@ -36,3 +36,16 @@ pub struct MempoolInput { pub input_txid: String, pub input_vout: i32, } + +#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)] +#[diesel(table_name = crate::db::schema::utxo_subscriptions)] +pub struct UtxoSubscription { + pub id: i32, + pub txid: String, + pub vout: i32, + pub client_id: String, + pub subscribed_at: chrono::NaiveDateTime, + pub active: bool, +} + + diff --git a/src/db/schema.rs b/src/db/schema.rs index 8aab1d9..d7ec032 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -24,6 +24,17 @@ diesel::table! { } } +diesel::table! { + utxo_subscriptions (id) { + id -> Nullable, + txid -> Text, + vout -> Integer, + client_id -> Text, + subscribed_at -> Timestamp, + active -> Bool, + } +} + diesel::table! { utxos (txid, vout) { txid -> Text, @@ -39,4 +50,10 @@ diesel::table! { diesel::joinable!(mempool_inputs -> mempool_tx (txid)); -diesel::allow_tables_to_appear_in_same_query!(mempool_inputs, mempool_tx, servers, utxos,); +diesel::allow_tables_to_appear_in_same_query!( + mempool_inputs, + mempool_tx, + servers, + utxo_subscriptions, + utxos, +); diff --git a/src/indexer/tracker_indexer.rs b/src/indexer/tracker_indexer.rs index 7874018..f306995 100644 --- a/src/indexer/tracker_indexer.rs +++ b/src/indexer/tracker_indexer.rs @@ -28,10 +28,10 @@ pub async fn run( loop { let blockchain_info = handle_result!(status_tx, client.get_blockchain_info()); let tip_height = blockchain_info.blocks + 1; - utxo_indexer.process_mempool(); + utxo_indexer.process_mempool(&db_tx); for height in last_tip..tip_height { - utxo_indexer.process_block(height); + utxo_indexer.process_block(height, &db_tx); let block_hash = handle_result!(status_tx, client.get_block_hash(height)); let block = handle_result!(status_tx, client.get_block(block_hash)); for tx in block.txdata { diff --git a/src/indexer/utxo_indexer.rs b/src/indexer/utxo_indexer.rs index c28beb5..322e96e 100644 --- a/src/indexer/utxo_indexer.rs +++ b/src/indexer/utxo_indexer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::db::model::{MempoolInput, MempoolTx, Utxo}; use crate::db::schema::{mempool_inputs, mempool_tx, utxos}; use crate::indexer::rpc::BitcoinRpc; +use crate::types::{DbRequest, UtxoSpentNotification}; use chrono::NaiveDateTime; use diesel::SqliteConnection; use diesel::prelude::*; @@ -19,7 +20,7 @@ impl<'a> Indexer<'a> { Self { conn, rpc } } - pub fn process_mempool(&mut self) { + pub fn process_mempool(&mut self, db_tx: &tokio::sync::mpsc::Sender) { let txids = self.rpc.get_raw_mempool().unwrap(); let mut conn = self.conn.get().expect("Failed to get DB connection"); @@ -28,7 +29,7 @@ impl<'a> Indexer<'a> { self.insert_mempool_tx(&txid.to_string()); - for input in &tx.input { + for (input_idx, input) in tx.input.iter().enumerate() { let prevout = &input.previous_output; diesel::insert_into(mempool_inputs::table) .values(&MempoolInput { @@ -39,6 +40,19 @@ impl<'a> Indexer<'a> { .execute(&mut conn) .unwrap(); + let notification = UtxoSpentNotification { + watched_outpoint: *prevout, + spending_txid: txid.to_string(), + spending_input_index: input_idx as u32, + block_height: None, + confirmed: false, + timestamp: chrono::Utc::now().naive_utc(), + }; + + if let Err(e) = db_tx.try_send(DbRequest::NotifyUtxoSpent(*prevout, notification)) { + tracing::warn!("Failed to send UTXO spent notification: {:?}", e); + } + self.mark_utxo_spent( &txid.to_string(), prevout.vout as i32, @@ -66,14 +80,28 @@ impl<'a> Indexer<'a> { } } - pub fn process_block(&mut self, height: u64) { + pub fn process_block(&mut self, height: u64, db_tx: &tokio::sync::mpsc::Sender) { let block_hash = self.rpc.get_block_hash(height).unwrap(); let block = self.rpc.get_block(block_hash).unwrap(); let mut conn = self.conn.get().expect("Failed to get DB connection"); for tx in block.txdata.iter() { - for input in &tx.input { + for (input_idx, input) in tx.input.iter().enumerate() { let prevout = &input.previous_output; + + let notification = UtxoSpentNotification { + watched_outpoint: *prevout, + spending_txid: tx.compute_txid().to_string(), + spending_input_index: input_idx as u32, + block_height: Some(height as u32), + confirmed: true, + timestamp: chrono::Utc::now().naive_utc(), + }; + + if let Err(e) = db_tx.try_send(DbRequest::NotifyUtxoSpent(*prevout, notification)) { + tracing::warn!("Failed to send UTXO spent notification: {:?}", e); + } + self.mark_utxo_spent( &prevout.txid.to_string(), prevout.vout as i32, diff --git a/src/lib.rs b/src/lib.rs index b201f7e..1f19d89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ mod status; mod tor; mod types; mod utils; +mod subscription; use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); diff --git a/src/server/tracker_server.rs b/src/server/tracker_server.rs index 51e92eb..ab4d27d 100644 --- a/src/server/tracker_server.rs +++ b/src/server/tracker_server.rs @@ -1,15 +1,19 @@ +use std::sync::Arc; + use crate::db::model::MempoolTx; use crate::server::tracker_monitor::monitor_systems; use crate::status; use crate::types::DbRequest; use crate::types::TrackerClientToServer; use crate::types::TrackerServerToClient; +use crate::types::UtxoSpentNotification; use crate::utils::read_message; use crate::utils::send_message; use tokio::io::BufReader; use tokio::io::BufWriter; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::sync::Mutex; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tracing::error; @@ -47,12 +51,31 @@ pub async fn run( Ok(()) } -async fn handle_client(mut stream: TcpStream, db_tx: Sender) { - let (read_half, write_half) = stream.split(); - let mut reader = BufReader::new(read_half); - let mut writer = BufWriter::new(write_half); + +async fn handle_client(stream: TcpStream, db_tx: Sender) { + let (notification_tx, mut notification_rx) = mpsc::channel::(100); + + let stream = Arc::new(Mutex::new(stream)); + let stream_clone = stream.clone(); + + tokio::spawn(async move { + while let Some(notification) = notification_rx.recv().await { + let message = TrackerServerToClient::UtxoSpent(notification); + let mut stream_guard = stream_clone.lock().await; + let (_, write_half) = (*stream_guard).split(); + let mut writer = BufWriter::new(write_half); + if let Err(e) = send_message(&mut writer, &message).await { + error!("Failed to send notification to client: {}", e); + break; + } + } + }); loop { + let mut stream_guard = stream.lock().await; + let (read_half, write_half) = stream_guard.split(); + let mut reader = BufReader::new(read_half); + let mut writer = BufWriter::new(write_half); let buffer = match read_message(&mut reader).await { Ok(buf) => buf, Err(e) if e.io_error_kind() == Some(std::io::ErrorKind::UnexpectedEof) => { @@ -126,6 +149,62 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender) { } } } + + TrackerClientToServer::Subscribe { + outpoint, + client_id, + } => { + info!("Client {} subscribing to UTXO {:?}", client_id, outpoint); + + let db_request = DbRequest::AddSubscription( + outpoint, + client_id.clone(), + notification_tx.clone(), + ); + + if let Err(e) = db_tx.send(db_request).await { + error!("Failed to send subscription request to DB: {}", e); + break; + } + + let confirmation = TrackerServerToClient::SubscriptionConfirmed { outpoint }; + if let Err(e) = send_message(&mut writer, &confirmation).await { + error!("Failed to send subscription confirmation: {}", e); + break; + } + } + + TrackerClientToServer::Unsubscribe { + outpoint, + client_id, + } => { + info!( + "Client {} unsubscribing from UTXO {:?}", + client_id, outpoint + ); + + let db_request = DbRequest::RemoveSubscription(outpoint, client_id.clone()); + + if let Err(e) = db_tx.send(db_request).await { + error!("Failed to send unsubscribe request to DB: {}", e); + break; + } + + // Send confirmation + let confirmation = TrackerServerToClient::SubscriptionRemoved { outpoint }; + if let Err(e) = send_message(&mut writer, &confirmation).await { + error!("Failed to send unsubscription confirmation: {}", e); + break; + } + } + + TrackerClientToServer::Heartbeat => { + let heartbeat = TrackerServerToClient::HeartbeatAck; + if let Err(e) = send_message(&mut writer, &heartbeat).await { + error!("Failed to send heartbeat ack: {}", e); + break; + } + } } } diff --git a/src/subscription/mod.rs b/src/subscription/mod.rs new file mode 100644 index 0000000..09e0486 --- /dev/null +++ b/src/subscription/mod.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; + +use bitcoincore_rpc::bitcoin::OutPoint; +use tokio::sync::mpsc; +use tracing::{info, warn}; + +use crate::types::{SubscriptionInfo, UtxoSpentNotification}; + +pub struct SubscriptionManager { + subscriptions: HashMap>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + } + } + + pub fn add_subscription( + &mut self, + outpoint: OutPoint, + client_id: String, + notification_tx: mpsc::Sender, + ) { + let sub = SubscriptionInfo { + client_id: client_id.clone(), + outpoint, + subscribed_at: tokio::time::Instant::now(), + connection_tx: notification_tx, + }; + + self.subscriptions + .entry(outpoint) + .or_insert_with(Default::default) + .push(sub); + + info!( + "add subscription for client {} to UTXO {:?}", + client_id, outpoint + ); + } + + pub fn remove_subscription(&mut self, outpoint: OutPoint, client_id: String) -> bool { + if let Some(subscribers) = self.subscriptions.clone().get_mut(&outpoint) { + let init_len = subscribers.len(); + subscribers.retain(|sub| sub.client_id != client_id); + + if subscribers.is_empty() { + self.subscriptions.remove(&outpoint); + } + + let is_removed = init_len != subscribers.len(); + if is_removed { + info!( + "removed subscription for client {} from UTXO {:?}", + client_id, outpoint + ); + } + return is_removed; + } + false + } + + pub async fn notify(&mut self, outpoint: OutPoint, notification: UtxoSpentNotification) { + if let Some(subscribers) = self.subscriptions.get_mut(&outpoint) { + info!( + "Notifying {} subscribers about UTXO {:?} being spent...", + subscribers.len(), + outpoint + ); + + let mut failed_clients = vec![]; + + for sub in subscribers.iter() { + match sub.connection_tx.try_send(notification.clone()) { + Ok(_) => { + info!("notified client {} about UTXO spending", sub.client_id); + } + + Err(e) => { + warn!("Failed to notify client {}: {:?}", sub.client_id, e); + failed_clients.push(sub.client_id.clone()); + } + } + } + + for failed_client in failed_clients { + self.remove_subscription(outpoint, failed_client); + } + } + } + + pub fn get_subscription_count(&self, outpoint: &OutPoint) -> usize { + self.subscriptions + .get(outpoint) + .map_or(0, |subscriptions| subscriptions.len()) + } + + pub fn get_total_subscriptions(&self) -> usize { + self.subscriptions + .values() + .map(|subscriptions| subscriptions.len()) + .sum() + } + + pub fn get_subscriptions(&self, outpoint: OutPoint) -> Vec { + self.subscriptions + .get(&outpoint) + .unwrap_or(&Vec::new()) + .to_vec() + } +} diff --git a/src/types.rs b/src/types.rs index 13fbd26..bb2bbd5 100644 --- a/src/types.rs +++ b/src/types.rs @@ -21,6 +21,14 @@ pub enum DbRequest { QueryAll(Sender>), QueryActive(Sender>), WatchUtxo(OutPoint, Sender>), + AddSubscription( + OutPoint, + String, + tokio::sync::mpsc::Sender, + ), + RemoveSubscription(OutPoint, String), + GetSubscriptions(OutPoint, tokio::sync::mpsc::Sender>), + NotifyUtxoSpent(OutPoint, UtxoSpentNotification), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Hash)] @@ -55,6 +63,24 @@ pub struct DnsMetadata { pub proof: FidelityProof, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct UtxoSpentNotification { + pub watched_outpoint: OutPoint, + pub spending_txid: String, + pub spending_input_index: u32, + pub block_height: Option, + pub confirmed: bool, + pub timestamp: chrono::NaiveDateTime, +} + +#[derive(Clone, Debug)] +pub struct SubscriptionInfo { + pub client_id: String, + pub outpoint: OutPoint, + pub subscribed_at: Instant, + pub connection_tx: tokio::sync::mpsc::Sender, +} + #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] pub enum TrackerClientToServer { @@ -72,6 +98,20 @@ pub enum TrackerClientToServer { Watch { outpoint: OutPoint, }, + /// Subscribe to UTXO spending notifications + Subscribe { + outpoint: OutPoint, + client_id: String, + }, + + /// Unsubscribe from UTXO not/ifications + Unsubscribe { + outpoint: OutPoint, + client_id: String, + }, + + /// Keep connection alive + Heartbeat, } #[derive(Serialize, Deserialize, Debug)] @@ -79,4 +119,11 @@ pub enum TrackerServerToClient { Address { addresses: Vec }, Ping { address: String, port: u16 }, WatchResponse { mempool_tx: Vec }, + UtxoSpent(UtxoSpentNotification), + + SubscriptionConfirmed { outpoint: OutPoint }, + + SubscriptionRemoved { outpoint: OutPoint }, + + HeartbeatAck, }