From c0a16802aa5b74e3a4b57bed4c45d662e0418641 Mon Sep 17 00:00:00 2001 From: DaveVodrazka Date: Tue, 17 Feb 2026 11:18:55 +0100 Subject: [PATCH] fix: match indexer handlers to deepbookv3 --- crates/indexer/src/handlers/mod.rs | 154 --------- .../src/handlers/order_update_handler.rs | 302 ++++++++++++------ .../handlers/trade_params_update_handler.rs | 121 +++++-- crates/orderbook/src/handlers/mod.rs | 57 ---- 4 files changed, 292 insertions(+), 342 deletions(-) diff --git a/crates/indexer/src/handlers/mod.rs b/crates/indexer/src/handlers/mod.rs index d92d732..a6e90b2 100644 --- a/crates/indexer/src/handlers/mod.rs +++ b/crates/indexer/src/handlers/mod.rs @@ -139,80 +139,6 @@ macro_rules! define_handler { } } - #[async_trait::async_trait] - impl sui_indexer_alt_framework::postgres::handler::Handler for $handler { - async fn commit<'a>( - values: &[Self::Value], - conn: &mut sui_pg_db::Connection<'a>, - ) -> anyhow::Result { - use diesel_async::RunQueryDsl; - Ok(diesel::insert_into(deeplook_schema::schema::$table::table) - .values(values) - .on_conflict_do_nothing() - .execute(conn) - .await?) - } - } - }; - { - name: $handler:ident, - processor_name: $proc_name:literal, - event_type: $event:ty, - db_model: $model:ty, - table: $table:ident, - tx_context: |$tx:ident, $checkpoint:ident, $env:ident| $ctx:expr, - map_event: |$ev:ident, $meta:ident, $ctx_var:ident| $body:expr - } => { - pub struct $handler { - env: $crate::DeepbookEnv, - } - - impl $handler { - pub fn new(env: $crate::DeepbookEnv) -> Self { - Self { env } - } - } - - #[async_trait::async_trait] - impl sui_indexer_alt_framework::pipeline::Processor for $handler { - const NAME: &'static str = $proc_name; - type Value = $model; - - async fn process( - &self, - checkpoint: &std::sync::Arc, - ) -> anyhow::Result> { - use $crate::handlers::{is_deepbook_tx, EventMeta}; - use $crate::traits::MoveStruct; - - let mut results = vec![]; - for tx in &checkpoint.transactions { - if !is_deepbook_tx(tx, &checkpoint.object_set, self.env) { - continue; - } - let Some(events) = &tx.events else { continue }; - - let $tx = tx; - let $checkpoint = checkpoint; - let $env = self.env; - let ctx = { $ctx }; - - let base_meta = EventMeta::from_checkpoint_tx(checkpoint, tx); - - for (index, ev) in events.data.iter().enumerate() { - if <$event>::matches_event_type(&ev.type_, self.env) { - let $ev: $event = bcs::from_bytes(&ev.contents)?; - let $meta = base_meta.with_index(index); - let $ctx_var = &ctx; - results.push($body); - tracing::debug!("Observed {} event", $proc_name); - } - } - } - Ok(results) - } - } - #[async_trait::async_trait] impl sui_indexer_alt_framework::postgres::handler::Handler for $handler { async fn commit<'a>( @@ -230,86 +156,6 @@ macro_rules! define_handler { }; } -#[macro_export] -macro_rules! define_multi_handler { - { - name: $handler:ident, - processor_name: $proc_name:literal, - db_model: $model:ty, - table: $table:ident, - events: [ - $( - { - event_type: $event:ty, - map_event: |$ev:ident, $meta:ident| $body:expr - } - ),+ $(,)? - ] - } => { - pub struct $handler { - env: $crate::DeepbookEnv, - } - - impl $handler { - pub fn new(env: $crate::DeepbookEnv) -> Self { - Self { env } - } - } - - #[async_trait::async_trait] - impl sui_indexer_alt_framework::pipeline::Processor for $handler { - const NAME: &'static str = $proc_name; - type Value = $model; - - async fn process( - &self, - checkpoint: &std::sync::Arc, - ) -> anyhow::Result> { - use $crate::handlers::{is_deepbook_tx, EventMeta}; - use $crate::traits::MoveStruct; - - let mut results = vec![]; - for tx in &checkpoint.transactions { - if !is_deepbook_tx(tx, &checkpoint.object_set, self.env) { - continue; - } - let Some(events) = &tx.events else { continue }; - - let base_meta = EventMeta::from_checkpoint_tx(checkpoint, tx); - - for (index, ev) in events.data.iter().enumerate() { - let mut handled = false; - $( - if !handled && <$event>::matches_event_type(&ev.type_, self.env) { - let $ev: $event = bcs::from_bytes(&ev.contents)?; - let $meta = base_meta.with_index(index); - results.push($body); - tracing::debug!("Observed {} event", $proc_name); - handled = true; - } - )+ - } - } - Ok(results) - } - } - - #[async_trait::async_trait] - impl sui_indexer_alt_framework::postgres::handler::Handler for $handler { - async fn commit<'a>( - values: &[Self::Value], - conn: &mut sui_pg_db::Connection<'a>, - ) -> anyhow::Result { - use diesel_async::RunQueryDsl; - Ok(diesel::insert_into(deeplook_schema::schema::$table::table) - .values(values) - .on_conflict_do_nothing() - .execute(conn) - .await?) - } - } - }; -} pub mod balances_handler; pub mod flash_loan_handler; pub mod order_fill_handler; diff --git a/crates/indexer/src/handlers/order_update_handler.rs b/crates/indexer/src/handlers/order_update_handler.rs index 034ad6e..9124077 100644 --- a/crates/indexer/src/handlers/order_update_handler.rs +++ b/crates/indexer/src/handlers/order_update_handler.rs @@ -1,110 +1,210 @@ -use crate::define_multi_handler; +use crate::handlers::{is_deepbook_tx, try_extract_move_call_package}; use crate::models::deepbook::order::{OrderCanceled, OrderModified}; use crate::models::deepbook::order_info::{OrderExpired, OrderPlaced}; +use crate::traits::MoveStruct; use crate::utils::ms_to_secs; +use crate::DeepbookEnv; +use async_trait::async_trait; use deeplook_schema::models::{OrderUpdate, OrderUpdateStatus}; +use deeplook_schema::schema::order_updates; +use diesel_async::RunQueryDsl; +use std::sync::Arc; +use sui_indexer_alt_framework::pipeline::Processor; +use sui_indexer_alt_framework::postgres::handler::Handler; +use sui_indexer_alt_framework::postgres::Connection; +use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint; +use sui_types::transaction::TransactionDataAPI; +use tracing::debug; -define_multi_handler! { - name: OrderUpdateHandler, - processor_name: "order_update", - db_model: OrderUpdate, - table: order_updates, - events: [ - { - event_type: OrderPlaced, - map_event: |event, meta| OrderUpdate { - event_digest: meta.event_digest(), - digest: meta.digest(), - sender: meta.sender(), - checkpoint: meta.checkpoint(), - checkpoint_timestamp_ms: meta.checkpoint_timestamp_ms(), - timestamp: ms_to_secs(meta.checkpoint_timestamp_ms()), - package: meta.package(), - status: OrderUpdateStatus::Placed, - pool_id: event.pool_id.to_string(), - order_id: event.order_id.to_string(), - client_order_id: event.client_order_id as i64, - price: event.price as i64, - is_bid: event.is_bid, - onchain_timestamp: event.timestamp as i64, - original_quantity: event.placed_quantity as i64, - quantity: event.placed_quantity as i64, - filled_quantity: 0, - trader: event.trader.to_string(), - balance_manager_id: event.balance_manager_id.to_string(), - } - }, - { - event_type: OrderModified, - map_event: |event, meta| OrderUpdate { - event_digest: meta.event_digest(), - digest: meta.digest(), - sender: meta.sender(), - checkpoint: meta.checkpoint(), - checkpoint_timestamp_ms: meta.checkpoint_timestamp_ms(), - timestamp: ms_to_secs(meta.checkpoint_timestamp_ms()), - package: meta.package(), - status: OrderUpdateStatus::Modified, - pool_id: event.pool_id.to_string(), - order_id: event.order_id.to_string(), - client_order_id: event.client_order_id as i64, - price: event.price as i64, - is_bid: event.is_bid, - onchain_timestamp: event.timestamp as i64, - original_quantity: event.previous_quantity as i64, - quantity: event.new_quantity as i64, - filled_quantity: event.filled_quantity as i64, - trader: event.trader.to_string(), - balance_manager_id: event.balance_manager_id.to_string(), - } - }, - { - event_type: OrderCanceled, - map_event: |event, meta| OrderUpdate { - event_digest: meta.event_digest(), - digest: meta.digest(), - sender: meta.sender(), - checkpoint: meta.checkpoint(), - checkpoint_timestamp_ms: meta.checkpoint_timestamp_ms(), - timestamp: ms_to_secs(meta.checkpoint_timestamp_ms()), - package: meta.package(), - status: OrderUpdateStatus::Canceled, - pool_id: event.pool_id.to_string(), - order_id: event.order_id.to_string(), - client_order_id: event.client_order_id as i64, - price: event.price as i64, - is_bid: event.is_bid, - onchain_timestamp: event.timestamp as i64, - original_quantity: event.original_quantity as i64, - quantity: event.base_asset_quantity_canceled as i64, - filled_quantity: (event.original_quantity - event.base_asset_quantity_canceled) as i64, - trader: event.trader.to_string(), - balance_manager_id: event.balance_manager_id.to_string(), +type TransactionMetadata = (String, u64, u64, String, String); + +pub struct OrderUpdateHandler { + env: DeepbookEnv, +} + +impl OrderUpdateHandler { + pub fn new(env: DeepbookEnv) -> Self { + Self { env } + } +} + +#[async_trait] +impl Processor for OrderUpdateHandler { + const NAME: &'static str = "order_update"; + type Value = OrderUpdate; + + async fn process(&self, checkpoint: &Arc) -> anyhow::Result> { + let mut results = vec![]; + + for tx in &checkpoint.transactions { + if !is_deepbook_tx(tx, &checkpoint.object_set, self.env) { + continue; } - }, - { - event_type: OrderExpired, - map_event: |event, meta| OrderUpdate { - event_digest: meta.event_digest(), - digest: meta.digest(), - sender: meta.sender(), - checkpoint: meta.checkpoint(), - checkpoint_timestamp_ms: meta.checkpoint_timestamp_ms(), - timestamp: ms_to_secs(meta.checkpoint_timestamp_ms()), - package: meta.package(), - status: OrderUpdateStatus::Expired, - pool_id: event.pool_id.to_string(), - order_id: event.order_id.to_string(), - client_order_id: event.client_order_id as i64, - price: event.price as i64, - is_bid: event.is_bid, - onchain_timestamp: event.timestamp as i64, - original_quantity: event.original_quantity as i64, - quantity: event.base_asset_quantity_canceled as i64, - filled_quantity: (event.original_quantity - event.base_asset_quantity_canceled) as i64, - trader: event.trader.to_string(), - balance_manager_id: event.balance_manager_id.to_string(), + let Some(events) = &tx.events else { + continue; + }; + + let package = try_extract_move_call_package(tx).unwrap_or_default(); + let metadata = ( + tx.transaction.sender().to_string(), + checkpoint.summary.sequence_number, + checkpoint.summary.timestamp_ms, + tx.transaction.digest().to_string(), + package.clone(), + ); + + for (index, ev) in events.data.iter().enumerate() { + if OrderPlaced::matches_event_type(&ev.type_, self.env) { + let event = bcs::from_bytes(&ev.contents)?; + results.push(process_order_placed(event, metadata.clone(), index)); + debug!("Observed Deepbook Order Placed {:?}", tx); + } else if OrderModified::matches_event_type(&ev.type_, self.env) { + let event = bcs::from_bytes(&ev.contents)?; + results.push(process_order_modified(event, metadata.clone(), index)); + debug!("Observed Deepbook Order Modified {:?}", tx); + } else if OrderCanceled::matches_event_type(&ev.type_, self.env) { + let event = bcs::from_bytes(&ev.contents)?; + results.push(process_order_canceled(event, metadata.clone(), index)); + debug!("Observed Deepbook Order Canceled {:?}", tx); + } else if OrderExpired::matches_event_type(&ev.type_, self.env) { + let event = bcs::from_bytes(&ev.contents)?; + results.push(process_order_expired(event, metadata.clone(), index)); + debug!("Observed Deepbook Order Expired {:?}", tx); + } } } - ] + Ok(results) + } +} + +#[async_trait] +impl Handler for OrderUpdateHandler { + async fn commit<'a>( + values: &[Self::Value], + conn: &mut Connection<'a>, + ) -> anyhow::Result { + Ok(diesel::insert_into(order_updates::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) + } +} + +fn process_order_placed( + order_placed: OrderPlaced, + (sender, checkpoint, checkpoint_timestamp_ms, digest, package): TransactionMetadata, + event_index: usize, +) -> OrderUpdate { + let event_digest = format!("{digest}{event_index}"); + OrderUpdate { + event_digest, + digest, + sender, + checkpoint: checkpoint as i64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as i64, + timestamp: ms_to_secs(checkpoint_timestamp_ms as i64), + package, + status: OrderUpdateStatus::Placed, + pool_id: order_placed.pool_id.to_string(), + order_id: order_placed.order_id.to_string(), + client_order_id: order_placed.client_order_id as i64, + price: order_placed.price as i64, + is_bid: order_placed.is_bid, + onchain_timestamp: order_placed.timestamp as i64, + original_quantity: order_placed.placed_quantity as i64, + quantity: order_placed.placed_quantity as i64, + filled_quantity: 0, + trader: order_placed.trader.to_string(), + balance_manager_id: order_placed.balance_manager_id.to_string(), + } +} + +fn process_order_modified( + order_modified: OrderModified, + (sender, checkpoint, checkpoint_timestamp_ms, digest, package): TransactionMetadata, + event_index: usize, +) -> OrderUpdate { + let event_digest = format!("{digest}{event_index}"); + OrderUpdate { + digest, + event_digest, + sender, + checkpoint: checkpoint as i64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as i64, + timestamp: ms_to_secs(checkpoint_timestamp_ms as i64), + package, + status: OrderUpdateStatus::Modified, + pool_id: order_modified.pool_id.to_string(), + order_id: order_modified.order_id.to_string(), + client_order_id: order_modified.client_order_id as i64, + price: order_modified.price as i64, + is_bid: order_modified.is_bid, + onchain_timestamp: order_modified.timestamp as i64, + original_quantity: order_modified.previous_quantity as i64, + quantity: order_modified.new_quantity as i64, + filled_quantity: order_modified.filled_quantity as i64, + trader: order_modified.trader.to_string(), + balance_manager_id: order_modified.balance_manager_id.to_string(), + } +} + +fn process_order_canceled( + order_canceled: OrderCanceled, + (sender, checkpoint, checkpoint_timestamp_ms, digest, package): TransactionMetadata, + event_index: usize, +) -> OrderUpdate { + let event_digest = format!("{digest}{event_index}"); + OrderUpdate { + digest, + event_digest, + sender, + checkpoint: checkpoint as i64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as i64, + timestamp: ms_to_secs(checkpoint_timestamp_ms as i64), + package, + status: OrderUpdateStatus::Canceled, + pool_id: order_canceled.pool_id.to_string(), + order_id: order_canceled.order_id.to_string(), + client_order_id: order_canceled.client_order_id as i64, + price: order_canceled.price as i64, + is_bid: order_canceled.is_bid, + onchain_timestamp: order_canceled.timestamp as i64, + original_quantity: order_canceled.original_quantity as i64, + quantity: order_canceled.base_asset_quantity_canceled as i64, + filled_quantity: (order_canceled.original_quantity + - order_canceled.base_asset_quantity_canceled) as i64, + trader: order_canceled.trader.to_string(), + balance_manager_id: order_canceled.balance_manager_id.to_string(), + } +} + +fn process_order_expired( + order_expired: OrderExpired, + (sender, checkpoint, checkpoint_timestamp_ms, digest, package): TransactionMetadata, + event_index: usize, +) -> OrderUpdate { + let event_digest = format!("{digest}{event_index}"); + OrderUpdate { + digest, + event_digest, + sender, + checkpoint: checkpoint as i64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as i64, + timestamp: ms_to_secs(checkpoint_timestamp_ms as i64), + package, + status: OrderUpdateStatus::Expired, + pool_id: order_expired.pool_id.to_string(), + order_id: order_expired.order_id.to_string(), + client_order_id: order_expired.client_order_id as i64, + price: order_expired.price as i64, + is_bid: order_expired.is_bid, + onchain_timestamp: order_expired.timestamp as i64, + original_quantity: order_expired.original_quantity as i64, + quantity: order_expired.base_asset_quantity_canceled as i64, + filled_quantity: (order_expired.original_quantity + - order_expired.base_asset_quantity_canceled) as i64, + trader: order_expired.trader.to_string(), + balance_manager_id: order_expired.balance_manager_id.to_string(), + } } diff --git a/crates/indexer/src/handlers/trade_params_update_handler.rs b/crates/indexer/src/handlers/trade_params_update_handler.rs index a532cb2..2d9d664 100644 --- a/crates/indexer/src/handlers/trade_params_update_handler.rs +++ b/crates/indexer/src/handlers/trade_params_update_handler.rs @@ -1,36 +1,97 @@ -use crate::define_handler; +use crate::handlers::{is_deepbook_tx, try_extract_move_call_package}; use crate::models::deepbook::governance::TradeParamsUpdateEvent; +use crate::traits::MoveStruct; use crate::utils::ms_to_secs; +use crate::DeepbookEnv; +use async_trait::async_trait; use deeplook_schema::models::TradeParamsUpdate; +use deeplook_schema::schema::trade_params_update; +use diesel_async::RunQueryDsl; +use std::sync::Arc; +use sui_indexer_alt_framework::pipeline::Processor; +use sui_indexer_alt_framework::postgres::handler::Handler; +use sui_indexer_alt_framework::postgres::Connection; +use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint; +use sui_types::transaction::TransactionDataAPI; +use tracing::debug; -define_handler! { - name: TradeParamsUpdateHandler, - processor_name: "trade_params_update", - event_type: TradeParamsUpdateEvent, - db_model: TradeParamsUpdate, - table: trade_params_update, - tx_context: |tx, checkpoint, env| { - let deepbook_addresses = env.package_addresses(); - let pool = tx.input_objects(&checkpoint.object_set).find(|o| { - matches!(o.data.struct_tag(), Some(struct_tag) - if deepbook_addresses.iter().any(|addr| struct_tag.address == *addr) - && struct_tag.module.as_str() == "pool" - && struct_tag.name.as_str() == "Pool") - }); - pool.map(|o| o.id().to_hex_uncompressed()) - .unwrap_or_else(|| "0x0".to_string()) - }, - map_event: |event, meta, pool_id| TradeParamsUpdate { - event_digest: meta.event_digest(), - digest: meta.digest(), - sender: meta.sender(), - checkpoint: meta.checkpoint(), - checkpoint_timestamp_ms: meta.checkpoint_timestamp_ms(), - timestamp: ms_to_secs(meta.checkpoint_timestamp_ms()), - package: meta.package(), - pool_id: pool_id.clone(), - taker_fee: event.taker_fee as i64, - maker_fee: event.maker_fee as i64, - stake_required: event.stake_required as i64, +pub struct TradeParamsUpdateHandler { + env: DeepbookEnv, +} + +impl TradeParamsUpdateHandler { + pub fn new(env: DeepbookEnv) -> Self { + Self { env } + } +} + +#[async_trait] +impl Processor for TradeParamsUpdateHandler { + const NAME: &'static str = "trade_params_update"; + type Value = TradeParamsUpdate; + + async fn process(&self, checkpoint: &Arc) -> anyhow::Result> { + let mut results = vec![]; + for tx in &checkpoint.transactions { + if !is_deepbook_tx(tx, &checkpoint.object_set, self.env) { + continue; + } + let Some(events) = &tx.events else { + continue; + }; + + let package = try_extract_move_call_package(tx).unwrap_or_default(); + let checkpoint_timestamp_ms = checkpoint.summary.timestamp_ms as i64; + let checkpoint_seq = checkpoint.summary.sequence_number as i64; + let digest = tx.transaction.digest(); + + // Get package addresses for deepbook + let deepbook_addresses = self.env.package_addresses(); + + let pool = tx + .input_objects(&checkpoint.object_set) + .find(|o| matches!(o.data.struct_tag(), Some(struct_tag) + if deepbook_addresses.iter().any(|addr| struct_tag.address == *addr) && struct_tag.name.as_str() == "Pool")); + let pool_id = pool + .map(|o| o.id().to_hex_uncompressed()) + .unwrap_or("0x0".to_string()); + + for (index, ev) in events.data.iter().enumerate() { + if !TradeParamsUpdateEvent::matches_event_type(&ev.type_, self.env) { + continue; + } + let event: TradeParamsUpdateEvent = bcs::from_bytes(&ev.contents)?; + let data = TradeParamsUpdate { + digest: digest.to_string(), + event_digest: format!("{digest}{index}"), + sender: tx.transaction.sender().to_string(), + checkpoint: checkpoint_seq, + checkpoint_timestamp_ms, + timestamp: ms_to_secs(checkpoint_timestamp_ms), + package: package.clone(), + pool_id: pool_id.clone(), + taker_fee: event.taker_fee as i64, + maker_fee: event.maker_fee as i64, + stake_required: event.stake_required as i64, + }; + debug!("Observed Deepbook Trade Params Update Event {:?}", data); + results.push(data); + } + } + Ok(results) + } +} + +#[async_trait] +impl Handler for TradeParamsUpdateHandler { + async fn commit<'a>( + values: &[Self::Value], + conn: &mut Connection<'a>, + ) -> anyhow::Result { + Ok(diesel::insert_into(trade_params_update::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) } } diff --git a/crates/orderbook/src/handlers/mod.rs b/crates/orderbook/src/handlers/mod.rs index 4f719ac..6781972 100644 --- a/crates/orderbook/src/handlers/mod.rs +++ b/crates/orderbook/src/handlers/mod.rs @@ -1,58 +1 @@ -use deeplook_indexer::DeepbookEnv; -use sui_types::full_checkpoint_content::CheckpointTransaction; -use sui_types::transaction::{Command, TransactionDataAPI}; - pub mod orderbook_order_update_handler; - -pub(crate) fn is_deepbook_tx(tx: &CheckpointTransaction, env: DeepbookEnv) -> bool { - let deepbook_addresses = env.package_addresses(); - let deepbook_packages = env.package_ids(); - - // Check input objects against all known package versions - let has_deepbook_input = tx.input_objects.iter().any(|obj| { - obj.data - .type_() - .map(|t| deepbook_addresses.iter().any(|addr| t.address() == *addr)) - .unwrap_or_default() - }); - - if has_deepbook_input { - return true; - } - - // Check if transaction has deepbook events from any version - if let Some(events) = &tx.events { - let has_deepbook_event = events.data.iter().any(|event| { - deepbook_addresses - .iter() - .any(|addr| event.type_.address == *addr) - }); - if has_deepbook_event { - return true; - } - } - - // Check if transaction calls a deepbook function from any version - let txn_kind = tx.transaction.transaction_data().kind(); - let has_deepbook_call = txn_kind.iter_commands().any(|cmd| { - if let Command::MoveCall(move_call) = cmd { - deepbook_packages - .iter() - .any(|pkg| *pkg == move_call.package) - } else { - false - } - }); - - has_deepbook_call -} - -pub(crate) fn try_extract_move_call_package(tx: &CheckpointTransaction) -> Option { - let txn_kind = tx.transaction.transaction_data().kind(); - let first_command = txn_kind.iter_commands().next()?; - if let Command::MoveCall(move_call) = first_command { - Some(move_call.package.to_string()) - } else { - None - } -}