Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions migrations/2025-07-12-121135_create_servers_table/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ CREATE TABLE mempool_inputs (
input_vout INTEGER NOT NULL,
FOREIGN KEY(txid) REFERENCES mempool_tx(txid),
FOREIGN KEY(input_txid, input_vout) REFERENCES utxos(txid, vout)
);

CREATE TABLE utxo_subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
txid TEXT NOT NULL,
vout INTEGER NOT NULL,
client_id TEXT NOT NULL,
subscribed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
active BOOLEAN NOT NULL DEFAULT true,
FOREIGN KEY(txid, vout) REFERENCES utxos(txid, vout)
);
22 changes: 22 additions & 0 deletions src/db/db_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::model::MempoolTx;
use crate::subscription::SubscriptionManager;
use diesel::RunQueryDsl;
use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SqliteConnection, r2d2::ConnectionManager};
use r2d2::Pool;
Expand All @@ -20,6 +21,7 @@ pub async fn run(
) {
let mut servers: HashMap<String, ServerInfo> = HashMap::new();
let mut conn = pool.get().unwrap();
let mut subscription_manager = SubscriptionManager::new();
info!("DB manager started");
while let Some(request) = rx.recv().await {
match request {
Expand Down Expand Up @@ -64,6 +66,26 @@ pub async fn run(

let _ = resp_tx.send(mempool_tx).await;
}
DbRequest::AddSubscription(outpoint, client_id, notification_tx) => {
info!("adding new subscription {client_id} for UTXO {outpoint:?} ");
subscription_manager.add_subscription(outpoint, client_id, notification_tx);
}
DbRequest::RemoveSubscription(outpoint, client_id) => {
info!("removing subscription (client_id: {client_id}) from UTXO {outpoint:?}");
subscription_manager.remove_subscription(outpoint, client_id);
}
DbRequest::GetSubscriptions(outpoint, resp_tx) => {
let count = subscription_manager.get_subscription_count(&outpoint);
info!("Queried subscriptions for {:?}: {} active", outpoint, count);

let _ = resp_tx
.send(subscription_manager.get_subscriptions(outpoint))
.await;
}
DbRequest::NotifyUtxoSpent(outpoint, notification) => {
info!("Processing UTXO spent notification for {:?}", outpoint);
subscription_manager.notify(outpoint, notification);
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/db/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ pub struct MempoolInput {
pub input_txid: String,
pub input_vout: i32,
}

#[derive(Queryable, Insertable, Debug, Serialize, Deserialize)]
#[diesel(table_name = crate::db::schema::utxo_subscriptions)]
pub struct UtxoSubscription {
pub id: i32,
pub txid: String,
pub vout: i32,
pub client_id: String,
pub subscribed_at: chrono::NaiveDateTime,
pub active: bool,
}


19 changes: 18 additions & 1 deletion src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ diesel::table! {
}
}

diesel::table! {
utxo_subscriptions (id) {
id -> Nullable<Integer>,
txid -> Text,
vout -> Integer,
client_id -> Text,
subscribed_at -> Timestamp,
active -> Bool,
}
}

diesel::table! {
utxos (txid, vout) {
txid -> Text,
Expand All @@ -39,4 +50,10 @@ diesel::table! {

diesel::joinable!(mempool_inputs -> mempool_tx (txid));

diesel::allow_tables_to_appear_in_same_query!(mempool_inputs, mempool_tx, servers, utxos,);
diesel::allow_tables_to_appear_in_same_query!(
mempool_inputs,
mempool_tx,
servers,
utxo_subscriptions,
utxos,
);
4 changes: 2 additions & 2 deletions src/indexer/tracker_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ pub async fn run(
loop {
let blockchain_info = handle_result!(status_tx, client.get_blockchain_info());
let tip_height = blockchain_info.blocks + 1;
utxo_indexer.process_mempool();
utxo_indexer.process_mempool(&db_tx);

for height in last_tip..tip_height {
utxo_indexer.process_block(height);
utxo_indexer.process_block(height, &db_tx);
let block_hash = handle_result!(status_tx, client.get_block_hash(height));
let block = handle_result!(status_tx, client.get_block(block_hash));
for tx in block.txdata {
Expand Down
36 changes: 32 additions & 4 deletions src/indexer/utxo_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::db::model::{MempoolInput, MempoolTx, Utxo};
use crate::db::schema::{mempool_inputs, mempool_tx, utxos};
use crate::indexer::rpc::BitcoinRpc;
use crate::types::{DbRequest, UtxoSpentNotification};
use chrono::NaiveDateTime;
use diesel::SqliteConnection;
use diesel::prelude::*;
Expand All @@ -19,7 +20,7 @@ impl<'a> Indexer<'a> {
Self { conn, rpc }
}

pub fn process_mempool(&mut self) {
pub fn process_mempool(&mut self, db_tx: &tokio::sync::mpsc::Sender<DbRequest>) {
let txids = self.rpc.get_raw_mempool().unwrap();
let mut conn = self.conn.get().expect("Failed to get DB connection");

Expand All @@ -28,7 +29,7 @@ impl<'a> Indexer<'a> {

self.insert_mempool_tx(&txid.to_string());

for input in &tx.input {
for (input_idx, input) in tx.input.iter().enumerate() {
let prevout = &input.previous_output;
diesel::insert_into(mempool_inputs::table)
.values(&MempoolInput {
Expand All @@ -39,6 +40,19 @@ impl<'a> Indexer<'a> {
.execute(&mut conn)
.unwrap();

let notification = UtxoSpentNotification {
watched_outpoint: *prevout,
spending_txid: txid.to_string(),
spending_input_index: input_idx as u32,
block_height: None,
confirmed: false,
timestamp: chrono::Utc::now().naive_utc(),
};

if let Err(e) = db_tx.try_send(DbRequest::NotifyUtxoSpent(*prevout, notification)) {
tracing::warn!("Failed to send UTXO spent notification: {:?}", e);
}

self.mark_utxo_spent(
&txid.to_string(),
prevout.vout as i32,
Expand Down Expand Up @@ -66,14 +80,28 @@ impl<'a> Indexer<'a> {
}
}

pub fn process_block(&mut self, height: u64) {
pub fn process_block(&mut self, height: u64, db_tx: &tokio::sync::mpsc::Sender<DbRequest>) {
let block_hash = self.rpc.get_block_hash(height).unwrap();
let block = self.rpc.get_block(block_hash).unwrap();
let mut conn = self.conn.get().expect("Failed to get DB connection");

for tx in block.txdata.iter() {
for input in &tx.input {
for (input_idx, input) in tx.input.iter().enumerate() {
let prevout = &input.previous_output;

let notification = UtxoSpentNotification {
watched_outpoint: *prevout,
spending_txid: tx.compute_txid().to_string(),
spending_input_index: input_idx as u32,
block_height: Some(height as u32),
confirmed: true,
timestamp: chrono::Utc::now().naive_utc(),
};

if let Err(e) = db_tx.try_send(DbRequest::NotifyUtxoSpent(*prevout, notification)) {
tracing::warn!("Failed to send UTXO spent notification: {:?}", e);
}

self.mark_utxo_spent(
&prevout.txid.to_string(),
prevout.vout as i32,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod status;
mod tor;
mod types;
mod utils;
mod subscription;

use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
Expand Down
87 changes: 83 additions & 4 deletions src/server/tracker_server.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::sync::Arc;

use crate::db::model::MempoolTx;
use crate::server::tracker_monitor::monitor_systems;
use crate::status;
use crate::types::DbRequest;
use crate::types::TrackerClientToServer;
use crate::types::TrackerServerToClient;
use crate::types::UtxoSpentNotification;
use crate::utils::read_message;
use crate::utils::send_message;
use tokio::io::BufReader;
use tokio::io::BufWriter;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::error;
Expand Down Expand Up @@ -47,12 +51,31 @@ pub async fn run(

Ok(())
}
async fn handle_client(mut stream: TcpStream, db_tx: Sender<DbRequest>) {
let (read_half, write_half) = stream.split();
let mut reader = BufReader::new(read_half);
let mut writer = BufWriter::new(write_half);

async fn handle_client(stream: TcpStream, db_tx: Sender<DbRequest>) {
let (notification_tx, mut notification_rx) = mpsc::channel::<UtxoSpentNotification>(100);

let stream = Arc::new(Mutex::new(stream));
let stream_clone = stream.clone();

tokio::spawn(async move {
while let Some(notification) = notification_rx.recv().await {
let message = TrackerServerToClient::UtxoSpent(notification);
let mut stream_guard = stream_clone.lock().await;
let (_, write_half) = (*stream_guard).split();
let mut writer = BufWriter::new(write_half);
if let Err(e) = send_message(&mut writer, &message).await {
error!("Failed to send notification to client: {}", e);
break;
}
}
});

loop {
let mut stream_guard = stream.lock().await;
let (read_half, write_half) = stream_guard.split();
let mut reader = BufReader::new(read_half);
let mut writer = BufWriter::new(write_half);
let buffer = match read_message(&mut reader).await {
Ok(buf) => buf,
Err(e) if e.io_error_kind() == Some(std::io::ErrorKind::UnexpectedEof) => {
Expand Down Expand Up @@ -126,6 +149,62 @@ async fn handle_client(mut stream: TcpStream, db_tx: Sender<DbRequest>) {
}
}
}

TrackerClientToServer::Subscribe {
outpoint,
client_id,
} => {
info!("Client {} subscribing to UTXO {:?}", client_id, outpoint);

let db_request = DbRequest::AddSubscription(
outpoint,
client_id.clone(),
notification_tx.clone(),
);

if let Err(e) = db_tx.send(db_request).await {
error!("Failed to send subscription request to DB: {}", e);
break;
}

let confirmation = TrackerServerToClient::SubscriptionConfirmed { outpoint };
if let Err(e) = send_message(&mut writer, &confirmation).await {
error!("Failed to send subscription confirmation: {}", e);
break;
}
}

TrackerClientToServer::Unsubscribe {
outpoint,
client_id,
} => {
info!(
"Client {} unsubscribing from UTXO {:?}",
client_id, outpoint
);

let db_request = DbRequest::RemoveSubscription(outpoint, client_id.clone());

if let Err(e) = db_tx.send(db_request).await {
error!("Failed to send unsubscribe request to DB: {}", e);
break;
}

// Send confirmation
let confirmation = TrackerServerToClient::SubscriptionRemoved { outpoint };
if let Err(e) = send_message(&mut writer, &confirmation).await {
error!("Failed to send unsubscription confirmation: {}", e);
break;
}
}

TrackerClientToServer::Heartbeat => {
let heartbeat = TrackerServerToClient::HeartbeatAck;
if let Err(e) = send_message(&mut writer, &heartbeat).await {
error!("Failed to send heartbeat ack: {}", e);
break;
}
}
}
}

Expand Down
Loading