From dc4e729438ba04546a35a1f0938f3449bd59c8c5 Mon Sep 17 00:00:00 2001 From: kwiss Date: Mon, 9 Dec 2024 00:43:48 +0100 Subject: [PATCH 1/2] feat(executed): updated shitty executed function add current block tracking --- ark-sqlx/src/providers/marketplace/order.rs | 784 ++++++++++++-------- arkchain-indexer-marketplace/src/main.rs | 68 +- 2 files changed, 542 insertions(+), 310 deletions(-) diff --git a/ark-sqlx/src/providers/marketplace/order.rs b/ark-sqlx/src/providers/marketplace/order.rs index 542da8a5..e39e4cbd 100644 --- a/ark-sqlx/src/providers/marketplace/order.rs +++ b/ark-sqlx/src/providers/marketplace/order.rs @@ -100,6 +100,17 @@ struct TokenInfo { chain_id: String, } +#[derive(sqlx::FromRow)] +struct OrginalOrder { + token_id: String, + contract_address: String, + chain_id: String, + token_id_hex: String, + amount: Option, + currency_address: Option, + event_type: TokenEventType, +} + #[derive(sqlx::FromRow)] struct Offer { offer_amount: Option, @@ -192,11 +203,11 @@ pub struct OfferData { to_address: String, } -pub struct OfferExecutedInfo { +pub struct OrderExecutedInfo { block_timestamp: u64, contract_address: String, token_id: String, - to_address: String, + to_address: Option, price: String, currency_chain_id: String, currency_address: String, @@ -291,31 +302,12 @@ impl OrderProvider { LIMIT 1 ); "; - let exists: i32 = sqlx::query_scalar(query) + let exists: bool = sqlx::query_scalar(query) // Changed from i32 to bool .bind(contract_address) .bind(token_id) .bind(chain_id) .fetch_one(&client.pool) .await?; - Ok(exists != 0) - } - - async fn order_hash_exists_in_token( - client: &SqlxCtxPg, - order_hash: &str, - ) -> Result { - let query = " - SELECT EXISTS ( - SELECT 1 - FROM token - WHERE listing_orderhash = $1 - LIMIT 1 - ); - "; - let exists: bool = sqlx::query_scalar(query) - .bind(order_hash) - .fetch_one(&client.pool) - .await?; Ok(exists) } @@ -907,127 +899,285 @@ impl OrderProvider { } } - pub async fn update_token_data_on_status_executed( + // Common function for updating best offer + pub async fn update_best_offer( client: &SqlxCtxPg, - info: &OfferExecutedInfo, + info: &OrderExecutedInfo, ) -> Result<(), ProviderError> { - let query = " - UPDATE token - SET - current_owner = $3, - updated_timestamp = $4, - last_price = $5, - currency_chain_id = $6, - currency_address = $7, - listing_start_amount = null, - listing_start_date = null, - listing_currency_address = null, - listing_currency_chain_id = null, - listing_timestamp = null, - listing_broker_id = null, - listing_orderhash = null, - listing_end_amount = null, - listing_end_date = null, - top_bid_currency_address = null, - top_bid_amount = null, - top_bid_broker_id = null, - top_bid_order_hash = null, - top_bid_start_date = null, - top_bid_end_date = null, - held_timestamp = $8, - status = $9, - buy_in_progress = false, - has_bid = false - WHERE contract_address = $1 AND token_id = $2; - "; - - sqlx::query(query) - .bind(&info.contract_address) - .bind(&info.token_id) - .bind(&info.to_address) - .bind(info.block_timestamp as i64) - .bind(&info.price) - .bind(&info.currency_chain_id) - .bind(&info.currency_address) - .bind(info.block_timestamp as i64) - .bind(EXECUTED_STR) - .execute(&client.pool) - .await?; - - // remove offers belonging to old owner - let delete_query = "DELETE FROM token_offer WHERE offer_maker = $1 AND contract_address = $2 AND token_id = $3"; - sqlx::query(delete_query) - .bind(&info.to_address) - .bind(&info.contract_address) - .bind(&info.token_id) - .execute(&client.pool) - .await?; - - let now = chrono::Utc::now().timestamp(); let select_query = " - SELECT hex_to_decimal(offer_amount), currency_address, start_date, end_date, order_hash - FROM token_offer - WHERE contract_address = $1 AND token_id = $2 AND end_date >= $3 - ORDER BY offer_amount DESC - LIMIT 1 - "; - let best_offer: Option<(BigDecimal, String, i64, i64, String)> = + SELECT hex_to_decimal(offer_amount), currency_address, currency_chain_id, start_date, end_date, order_hash + FROM token_offer + WHERE contract_address = $1 + AND token_id = $2 + AND end_date >= $3 + AND offer_maker != $4 + ORDER BY offer_amount DESC + LIMIT 1"; + + let best_offer: Option<(BigDecimal, String, String, i64, i64, String)> = sqlx::query_as(select_query) .bind(&info.contract_address) .bind(&info.token_id) - .bind(now) + .bind(info.block_timestamp as i64) + .bind(&info.to_address) .fetch_optional(&client.pool) .await?; - if let Some((offer_amount, currency_address, start_date, end_date, top_bid_order_hash)) = - best_offer - { - let update_query = " + match best_offer { + Some((amount, curr_addr, curr_chain_id, start, end, order_hash)) => { + sqlx::query( + " UPDATE token - SET top_bid_amount = $3, top_bid_start_date = $4, top_bid_end_date = $5, top_bid_currency_address = $6, top_bid_order_hash = $7, has_bid = true - WHERE contract_address = $1 AND token_id = $2 - "; - sqlx::query(update_query) + SET top_bid_amount = $3, + top_bid_start_date = $4, + top_bid_end_date = $5, + top_bid_currency_address = $6, + top_bid_order_hash = $7, + has_bid = true + WHERE contract_address = $1 AND token_id = $2", + ) .bind(&info.contract_address) .bind(&info.token_id) - .bind(offer_amount) - .bind(start_date) - .bind(end_date) - .bind(currency_address) - .bind(top_bid_order_hash) + .bind(amount) + .bind(start) + .bind(end) + .bind(curr_addr) + .bind(order_hash) .execute(&client.pool) .await?; - } else { - let update_query = " - UPDATE token - SET top_bid_amount = NULL, top_bid_start_date = NULL, top_bid_end_date = NULL, top_bid_currency_address = NULL, top_bid_order_hash = NULL, has_bid = false - WHERE contract_address = $1 AND token_id = $2 - "; - sqlx::query(update_query) + } + None => { + sqlx::query( + " + UPDATE token + SET top_bid_amount = NULL, + top_bid_start_date = NULL, + top_bid_end_date = NULL, + top_bid_currency_address = NULL, + top_bid_order_hash = NULL, + has_bid = false + WHERE contract_address = $1 AND token_id = $2", + ) .bind(&info.contract_address) .bind(&info.token_id) .execute(&client.pool) .await?; + } } + Ok(()) + } + + pub async fn verify_token_status( + client: &SqlxCtxPg, + contract_address: &str, + token_id: &str, + ) -> Result<(), ProviderError> { + let verify_query = " + SELECT status, buy_in_progress, + listing_start_amount, listing_start_date, + listing_currency_address, listing_currency_chain_id, + listing_timestamp, listing_broker_id, listing_orderhash, + listing_end_amount, listing_end_date, current_owner + FROM token + WHERE contract_address = $1 AND token_id = $2"; + + let result = sqlx::query(verify_query) + .bind(contract_address) + .bind(token_id) + .fetch_one(&client.pool) + .await?; + + info!( + "Token {}/{} verification:\n\ + - Status: {}\n\ + - Buy in progress: {}\n\ + - Current owner: {}\n\ + - Listing start amount: {}\n\ + - Listing start date: {}\n\ + - Listing currency address: {}\n\ + - Listing currency chain id: {}\n\ + - Listing timestamp: {}\n\ + - Listing broker id: {}\n\ + - Listing order hash: {}\n\ + - Listing end amount: {}\n\ + - Listing end date: {}", + contract_address, + token_id, + result.get::("status"), + result.get::("buy_in_progress"), + result + .get::, _>("current_owner") + .unwrap_or_default(), + result + .get::, _>("listing_start_amount") + .unwrap_or_default(), + result + .get::, _>("listing_start_date") + .unwrap_or_default(), + result + .get::, _>("listing_currency_address") + .unwrap_or_default(), + result + .get::, _>("listing_currency_chain_id") + .unwrap_or_default(), + result + .get::, _>("listing_timestamp") + .unwrap_or_default(), + result + .get::, _>("listing_broker_id") + .unwrap_or_default(), + result + .get::, _>("listing_orderhash") + .unwrap_or_default(), + result + .get::, _>("listing_end_amount") + .unwrap_or_default(), + result + .get::, _>("listing_end_date") + .unwrap_or_default(), + ); Ok(()) } - async fn insert_event_history( + pub async fn update_token_data_on_listing_executed( client: &SqlxCtxPg, - event_data: &EventHistoryData, + info: &OrderExecutedInfo, ) -> Result<(), ProviderError> { - if !Self::token_exists( - client, - &event_data.contract_address, - &event_data.token_id, - &event_data.chain_id, - ) - .await? - { - return Err(ProviderError::from("Token does not exist")); + // 1. Update token with new owner and clean listing data + Self::verify_token_status(client, &info.contract_address, &info.token_id).await?; + + let base_update = " + UPDATE token + SET + current_owner = CASE + WHEN $3::text IS NOT NULL THEN $3::text + ELSE current_owner + END, + updated_timestamp = $4, + last_price = $5, + currency_chain_id = $6, + currency_address = $7, + listing_start_amount = null, + listing_start_date = null, + listing_currency_address = null, + listing_currency_chain_id = null, + listing_timestamp = null, + listing_broker_id = null, + listing_orderhash = null, + listing_end_amount = null, + listing_end_date = null, + status = $8, + buy_in_progress = false + WHERE contract_address = $1 AND token_id = $2"; + + let result = sqlx::query(base_update) + .bind(&info.contract_address) + .bind(&info.token_id) + .bind(&info.to_address) // buyer address + .bind(info.block_timestamp as i64) + .bind(&info.price) + .bind(&info.currency_chain_id) + .bind(&info.currency_address) + .bind(OrderStatus::Executed.to_string()) + .execute(&client.pool) + .await?; + + info!("Updated token status: {:?}", result.rows_affected()); + + if result.rows_affected() == 0 { + error!( + "No rows updated for token {}/{}", + info.contract_address, info.token_id + ); } + Self::verify_token_status(client, &info.contract_address, &info.token_id).await?; + // 2. Remove buyer's offers only + let delete_buyer_offers = " + DELETE FROM token_offer + WHERE offer_maker = $1 + AND contract_address = $2 + AND token_id = $3"; + + sqlx::query(delete_buyer_offers) + .bind(&info.to_address) + .bind(&info.contract_address) + .bind(&info.token_id) + .execute(&client.pool) + .await?; + + // 3. Update best offer (excluding buyer) + Self::update_best_offer(client, info).await?; + + Ok(()) + } + + pub async fn update_token_data_on_offer_executed( + client: &SqlxCtxPg, + info: &OrderExecutedInfo, + ) -> Result<(), ProviderError> { + // 1. Update token and clean listing/offer data + let base_update = " + UPDATE token + SET + current_owner = CASE + WHEN $3::text IS NOT NULL THEN $3::text + ELSE current_owner + END, + updated_timestamp = $4, + last_price = $5, + currency_chain_id = $6, + currency_address = $7, + listing_start_amount = null, + listing_start_date = null, + listing_currency_address = null, + listing_currency_chain_id = null, + listing_timestamp = null, + listing_broker_id = null, + listing_orderhash = null, + listing_end_amount = null, + listing_end_date = null, + status = $8, + buy_in_progress = false + WHERE contract_address = $1 AND token_id = $2"; + + sqlx::query(base_update) + .bind(&info.contract_address) + .bind(&info.token_id) + .bind(&info.to_address) // offer maker becomes owner + .bind(info.block_timestamp as i64) + .bind(&info.price) + .bind(&info.currency_chain_id) + .bind(&info.currency_address) + .bind(OrderStatus::Executed.to_string()) + .execute(&client.pool) + .await?; + + // 2. Remove all offers from the new owner + let delete_new_owner_offers = " + DELETE FROM token_offer + WHERE offer_maker = $1 + AND contract_address = $2 + AND token_id = $3"; + + sqlx::query(delete_new_owner_offers) + .bind(&info.to_address) + .bind(&info.contract_address) + .bind(&info.token_id) + .execute(&client.pool) + .await?; + + // 3. Update best offer (excluding new owner) + Self::update_best_offer(client, info).await?; + + Ok(()) + } + + async fn insert_event_history( + client: &SqlxCtxPg, + event_data: &EventHistoryData, + ) -> Result<(), ProviderError> { let token_event_id = format!("{}_{}", &event_data.order_hash, event_data.block_timestamp); let eth_amount: Option; if event_data.currency_address == Some(CURRENCY_ADDRESS_ETH.to_string()) @@ -1375,11 +1525,14 @@ impl OrderProvider { if event_type == TokenEventType::Offer || event_type == TokenEventType::CollectionOffer { // create token without listing information let upsert_query = " - INSERT INTO token (contract_address, token_id, token_id_hex, chain_id, updated_timestamp, listing_orderhash, block_timestamp, status) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (contract_address, token_id, chain_id) - DO NOTHING; - "; + INSERT INTO token (contract_address, token_id, token_id_hex, chain_id, updated_timestamp, + listing_orderhash, block_timestamp, status, + buy_in_progress, has_bid, is_burned) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, + false, false, false) + ON CONFLICT (contract_address, token_id, chain_id) + DO NOTHING; + "; sqlx::query(upsert_query) .bind(contract_address.clone()) @@ -1421,44 +1574,48 @@ impl OrderProvider { } else { // create token with listing information let upsert_query = " - INSERT INTO token ( - contract_address, - token_id, - chain_id, - token_id_hex, - listing_timestamp, - updated_timestamp, - held_timestamp, - current_owner, - quantity, - listing_start_amount, - listing_end_amount, - listing_start_date, - listing_end_date, - listing_broker_id, - listing_orderhash, - listing_currency_address, - listing_currency_chain_id, - block_timestamp, - status, - listing_type) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) - ON CONFLICT (token_id, contract_address, chain_id) DO UPDATE SET - current_owner = EXCLUDED.current_owner, - token_id_hex = EXCLUDED.token_id_hex, - listing_timestamp = EXCLUDED.listing_timestamp, - listing_start_amount = EXCLUDED.listing_start_amount, - listing_end_amount = EXCLUDED.listing_end_amount, - listing_start_date = EXCLUDED.listing_start_date, - listing_end_date = EXCLUDED.listing_end_date, - listing_broker_id = EXCLUDED.listing_broker_id, - listing_orderhash = EXCLUDED.listing_orderhash, - status = EXCLUDED.status, - updated_timestamp = EXCLUDED.updated_timestamp, - listing_currency_address = EXCLUDED.listing_currency_address, - listing_currency_chain_id = EXCLUDED.listing_currency_chain_id, - listing_type = EXCLUDED.listing_type; - "; + INSERT INTO token ( + contract_address, + token_id, + chain_id, + token_id_hex, + listing_timestamp, + updated_timestamp, + held_timestamp, + current_owner, + quantity, + listing_start_amount, + listing_end_amount, + listing_start_date, + listing_end_date, + listing_broker_id, + listing_orderhash, + listing_currency_address, + listing_currency_chain_id, + block_timestamp, + status, + listing_type, + buy_in_progress, + has_bid, + is_burned) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, + false, false, false) + ON CONFLICT (token_id, contract_address, chain_id) DO UPDATE SET + current_owner = EXCLUDED.current_owner, + token_id_hex = EXCLUDED.token_id_hex, + listing_timestamp = EXCLUDED.listing_timestamp, + listing_start_amount = EXCLUDED.listing_start_amount, + listing_end_amount = EXCLUDED.listing_end_amount, + listing_start_date = EXCLUDED.listing_start_date, + listing_end_date = EXCLUDED.listing_end_date, + listing_broker_id = EXCLUDED.listing_broker_id, + listing_orderhash = EXCLUDED.listing_orderhash, + status = EXCLUDED.status, + updated_timestamp = EXCLUDED.updated_timestamp, + listing_currency_address = EXCLUDED.listing_currency_address, + listing_currency_chain_id = EXCLUDED.listing_currency_chain_id, + listing_type = EXCLUDED.listing_type; +"; let upsert_query_binded = sqlx::query(upsert_query) .bind(contract_address.clone()) @@ -1604,6 +1761,7 @@ impl OrderProvider { let tst_token_address = Felt::from_str(¤cy_address).map_err(|_| { ProviderError::ParsingError("Invalid currency address".to_string()) })?; + println!("tst_token_address: {:?}", tst_token_address); let decimals_selector = selector!("decimals"); let decimals = provider .call_contract_method(tst_token_address, decimals_selector) @@ -1704,7 +1862,22 @@ impl OrderProvider { block_timestamp: u64, data: &FulfilledData, ) -> Result<(), ProviderError> { - trace!("Registering fulfilled order {:?}", data); + // First check if an executed event exists with a later timestamp + let check_executed_query = " + SELECT block_timestamp + FROM token_event + WHERE order_hash = $1 + AND event_type = 'Executed' + AND block_timestamp > $2 + LIMIT 1; + "; + + let executed_exists = sqlx::query_scalar::<_, i64>(check_executed_query) + .bind(&data.order_hash) + .bind(block_timestamp as i64) + .fetch_optional(&client.pool) + .await?; + if let Some(token_data) = Self::get_token_data_by_order_hash(client, &data.order_hash).await? { @@ -1723,6 +1896,7 @@ impl OrderProvider { } } + // Always record the fulfill event Self::insert_event_history( client, &EventHistoryData { @@ -1742,30 +1916,60 @@ impl OrderProvider { ) .await?; - Self::update_token_status( - client, - &token_data.contract_address, - &token_data.token_id, - OrderStatus::Fulfilled, - ) - .await?; + // Only update statuses if no later executed event exists + if executed_exists.is_none() { + info!("Updating token status to Fulfilled as no later Executed event exists"); + Self::update_token_status( + client, + &token_data.contract_address, + &token_data.token_id, + OrderStatus::Fulfilled, + ) + .await?; - Self::update_offer_status(client, &data.order_hash, OrderStatus::Fulfilled).await?; + Self::update_offer_status(client, &data.order_hash, OrderStatus::Fulfilled).await?; + } else { + info!( + "Skipping status update for order {} as it was already executed", + data.order_hash + ); + } } else if let Some(offer_data) = Self::get_offer_data_by_order_hash(client, &data.order_hash).await? { - Self::update_token_status( - client, - &offer_data.contract_address, - &offer_data.token_id, - OrderStatus::Fulfilled, - ) - .await?; + // Only update status if no later executed event exists + if executed_exists.is_none() { + Self::update_token_status( + client, + &offer_data.contract_address, + &offer_data.token_id, + OrderStatus::Fulfilled, + ) + .await?; + } } Ok(()) } + /// This function checks if a currency mapping exists in the database + pub async fn check_currency_mapping_exists( + client: &SqlxCtxPg, + currency_chain_id: &str, + currency_address: &str, + ) -> Result { + let query = " + SELECT COUNT(*) FROM currency_mapping WHERE chain_id = $1 AND currency_address = $2 + "; + let count: i64 = sqlx::query_scalar(query) + .bind(currency_chain_id) + .bind(currency_address) + .fetch_one(&client.pool) + .await?; + + Ok(count > 0) + } + pub async fn register_executed( client: &SqlxCtxPg, redis_conn: Arc>, @@ -1773,140 +1977,132 @@ impl OrderProvider { block_timestamp: u64, data: &ExecutedData, ) -> Result<(), ProviderError> { + println!("executed event {}", data.order_hash); trace!("Registering executed order {:?}", data); - if let Some(offer_data) = - Self::get_offer_data_by_order_hash(client, &data.order_hash).await? - { - match Self::clear_tokens_cache(redis_conn.clone(), &offer_data.contract_address).await { + + // 1. Get the original order event (Listing or Offer) + let select_query = " + SELECT token_id, contract_address, chain_id, token_id_hex, amount, currency_address, event_type + FROM token_event + WHERE order_hash = $1 + AND event_type IN ('Listing', 'Offer', 'Auction') + ORDER BY block_timestamp DESC + LIMIT 1; + "; + + let original_order: Option = sqlx::query_as(select_query) + .bind(data.order_hash.clone()) + .fetch_optional(&client.pool) + .await?; + + // Always record the executed event, with or without original order data + if let Some(order) = original_order.as_ref() { + // Clear redis cache for this token + match Self::clear_tokens_cache(redis_conn.clone(), &order.contract_address).await { Ok(_) => {} Err(e) => { println!("Error when deleting cache : {}", e); } } - if let Some(token_data) = Self::get_token_data_by_id( + + // Insert the execution event with full data + Self::insert_event_history( client, - &offer_data.contract_address, - &offer_data.token_id, - &offer_data.chain_id, + &EventHistoryData { + order_hash: data.order_hash.clone(), + block_timestamp: block_timestamp as i64, + token_id: order.token_id.clone(), + token_id_hex: order.token_id_hex.clone(), + contract_address: order.contract_address.clone(), + chain_id: order.chain_id.clone(), + event_type: TokenEventType::Executed, + canceled_reason: None, + to_address: data.to.clone(), + from_address: data.from.clone(), + amount: Some(order.amount.clone().unwrap_or_default()), + currency_address: Some(order.currency_address.clone().unwrap_or_default()), + }, ) - .await? - { - /* EventType::Offer | EventType::CollectionOffer */ - let to_address = Some(offer_data.offer_maker.clone()); - Self::update_offer_status(client, &data.order_hash, OrderStatus::Executed).await?; - let from_address = Self::get_current_owner( - client, - &offer_data.contract_address, - &offer_data.token_id, - &offer_data.chain_id, - ) - .await?; - let params = OfferExecutedInfo { - block_timestamp, - contract_address: offer_data.contract_address.clone(), - token_id: offer_data.token_id.clone(), - to_address: offer_data.offer_maker.clone(), - price: offer_data.offer_amount.clone(), - currency_chain_id: offer_data.currency_chain_id.clone(), - currency_address: offer_data.currency_address.clone(), - }; - Self::update_token_data_on_status_executed(client, ¶ms).await?; - - Self::insert_event_history( - client, - &EventHistoryData { - order_hash: data.order_hash.clone(), - token_id: offer_data.token_id.clone(), - token_id_hex: token_data.token_id_hex.clone(), - contract_address: offer_data.contract_address.clone(), - chain_id: offer_data.chain_id.clone(), - event_type: TokenEventType::Executed, - block_timestamp: block_timestamp as i64, - canceled_reason: None, - to_address, - from_address, - amount: Some(offer_data.offer_amount.clone()), - currency_address: Some(offer_data.currency_address.clone()), - }, - ) - .await?; - } - } else { - // listing - let order_in_token = Self::order_hash_exists_in_token(client, &data.order_hash).await?; - if order_in_token { - if let Some(token_data) = - Self::get_token_data_by_order_hash(client, &data.order_hash).await? - { - match Self::clear_tokens_cache(redis_conn.clone(), &token_data.contract_address) - .await - { - Ok(_) => {} - Err(e) => { - println!("Error when deleting cache : {}", e); - } - } + .await?; - let fulfiller = Self::get_fulfiller_address_from_event( - client, - &token_data.contract_address, - &token_data.token_id, - &token_data.chain_id, - &data.order_hash, - ) - .await?; + println!("register_executed: {:?}", data); - /* EventType::Listing | EventType::Auction */ - let params = OfferExecutedInfo { - block_timestamp, - contract_address: token_data.contract_address.clone(), - token_id: token_data.token_id.clone(), - to_address: fulfiller.clone().unwrap_or_default(), - price: token_data.listing_start_amount.clone().unwrap_or_default(), - currency_chain_id: token_data.currency_chain_id.clone().unwrap_or_default(), - currency_address: token_data.currency_address.clone().unwrap_or_default(), - }; - Self::update_token_data_on_status_executed(client, ¶ms).await?; - Self::insert_event_history( - client, - &EventHistoryData { - order_hash: data.order_hash.clone(), - block_timestamp: block_timestamp as i64, - token_id: token_data.token_id.clone(), - token_id_hex: token_data.token_id_hex.clone(), - contract_address: token_data.contract_address.clone(), - chain_id: token_data.chain_id, - event_type: TokenEventType::Executed, - canceled_reason: None, - to_address: data.to.clone(), - amount: token_data.listing_start_amount, - from_address: data.from.clone(), - currency_address: token_data.currency_address.clone(), - }, - ) - .await?; + // Prepare common parameters for token update + let params = OrderExecutedInfo { + block_timestamp, + contract_address: order.contract_address.clone(), + token_id: order.token_id.clone(), + to_address: data.to.clone(), + price: order.amount.clone().unwrap_or_default(), + currency_chain_id: order.chain_id.clone(), + currency_address: order.currency_address.clone().unwrap_or_default(), + }; + + // Update token data based on event type + match order.event_type.to_db_string().as_str() { + "Listing" => { + Self::update_token_data_on_listing_executed(client, ¶ms).await?; + } + "Auction" => { + // First delete all offers for this token + let delete_offers_query = " + DELETE FROM token_offer + WHERE contract_address = $1 + AND token_id = $2 + AND chain_id = $3"; + + sqlx::query(delete_offers_query) + .bind(&order.contract_address) + .bind(&order.token_id) + .bind(&order.chain_id) + .execute(&client.pool) + .await?; + + // Then handle it like a listing + Self::update_token_data_on_listing_executed(client, ¶ms).await?; + } + "Offer" => { + Self::update_token_data_on_offer_executed(client, ¶ms).await?; + Self::update_offer_status(client, &data.order_hash, OrderStatus::Executed) + .await?; + } + _ => { + error!("Unknown event type: {}", order.event_type); } } - } - Ok(()) - } + } else { + error!( + "No original Listing or Offer found for order hash: {}", + data.order_hash + ); - /// This function checks if a currency mapping exists in the database - pub async fn check_currency_mapping_exists( - client: &SqlxCtxPg, - currency_chain_id: &str, - currency_address: &str, - ) -> Result { - let query = " - SELECT COUNT(*) FROM currency_mapping WHERE chain_id = $1 AND currency_address = $2 - "; - let count: i64 = sqlx::query_scalar(query) - .bind(currency_chain_id) - .bind(currency_address) - .fetch_one(&client.pool) + // Still record the executed event with available data + Self::insert_event_history( + client, + &EventHistoryData { + order_hash: data.order_hash.clone(), + block_timestamp: block_timestamp as i64, + token_id: "MISSING_DATA".to_string(), + token_id_hex: "MISSING_DATA".to_string(), + contract_address: "MISSING_DATA".to_string(), + chain_id: "MISSING_DATA".to_string(), + event_type: TokenEventType::Executed, + canceled_reason: Some("No original order found".to_string()), + to_address: data.to.clone(), + from_address: data.from.clone(), + amount: None, + currency_address: None, + }, + ) .await?; - Ok(count > 0) + error!( + "Recorded execution event with missing data for order hash: {:?}", + data + ); + } + + Ok(()) } pub async fn status_back_to_open( diff --git a/arkchain-indexer-marketplace/src/main.rs b/arkchain-indexer-marketplace/src/main.rs index 804a799e..ca0dfb73 100644 --- a/arkchain-indexer-marketplace/src/main.rs +++ b/arkchain-indexer-marketplace/src/main.rs @@ -1,7 +1,3 @@ -//! How to use Diri library. -//! -//! Can be run with `cargo run --example diri`. -//! use anyhow::Result; use ark_sqlx::providers::SqlxMarketplaceProvider; use arkproject::diri::{event_handler::EventHandler, Diri}; @@ -13,7 +9,7 @@ use starknet::{ core::types::BlockId, providers::{jsonrpc::HttpTransport, AnyProvider, JsonRpcClient, Provider}, }; -use std::{env, sync::Arc}; +use std::{env, sync::Arc, fs, path::Path}; use tracing::{error, info, trace, warn}; use tracing_subscriber::fmt; use tracing_subscriber::EnvFilter; @@ -28,6 +24,40 @@ struct DatabaseCredentials { host: String, } +#[derive(Debug)] +struct Checkpoint { + file_path: String, + current_block: u64, +} + +impl Checkpoint { + async fn new(file_path: String) -> Result { + let current_block = if Path::new(&file_path).exists() { + let contents = fs::read_to_string(&file_path)?; + contents.trim().parse::()? + } else { + fs::write(&file_path, "0")?; + trace!("Created new checkpoint file at {}", file_path); + 0 + }; + + Ok(Checkpoint { + file_path, + current_block, + }) + } + + async fn save(&self, block_number: u64) -> Result<()> { + fs::write(&self.file_path, block_number.to_string())?; + trace!("Saved checkpoint: block {}", block_number); + Ok(()) + } + + fn get_block(&self) -> u64 { + self.current_block + } +} + async fn get_database_url() -> Result { match std::env::var("DATABASE_URL") { Ok(url) => Ok(url), @@ -69,10 +99,6 @@ async fn main() -> Result<()> { HttpTransport::new(rpc_url_converted.clone()), ))); - // Quick launch locally: - // docker-compose up -d arkchain_postgres - // cd ark-sqlx - // sqlx database reset --database-url postgres://postgres:123@localhost:5432/arkchain-marketplace --source marketplace let storage = SqlxMarketplaceProvider::new(&database_uri).await?; let handler = DefaultEventHandler {}; @@ -83,7 +109,8 @@ async fn main() -> Result<()> { )); let sleep_secs = 1; - let mut from = 0; + let checkpoint = Checkpoint::new("checkpoint.txt".to_string()).await?; + let mut from = checkpoint.get_block(); let range = 1; // Set to None to keep polling the head of chain. @@ -130,19 +157,31 @@ async fn main() -> Result<()> { if let Some(to) = to { if end >= to { + // Save final checkpoint before exiting + if let Err(e) = checkpoint.save(end).await { + warn!("Failed to save final checkpoint: {}", e); + } trace!("`to` block was reached, exit."); return Ok(()); } } + // Save the checkpoint after successful indexing + if let Err(e) = checkpoint.save(end).await { + warn!("Failed to save checkpoint: {}", e); + } + // +1 to not re-index the end block. from = end + 1; } Err(e) => { error!("Blocks indexing error: {}", e); - // TODO: for now, any failure on the block range, we skip it. - // Can be changed as needed. + // Save checkpoint even on error to track progress + if let Err(ce) = checkpoint.save(end).await { + warn!("Failed to save checkpoint after error: {}", ce); + } + warn!("Skipping blocks range: {} - {}", start, end); from = end + 1; } @@ -152,8 +191,6 @@ async fn main() -> Result<()> { } } -/// Initializes the logging, ensuring that the `RUST_LOG` environment -/// variable is always considered first. fn init_logging() { const DEFAULT_LOG_FILTER: &str = "info,diri=trace,ark=trace"; @@ -169,7 +206,6 @@ fn init_logging() { .expect("Failed to set the global tracing subscriber"); } -// Default event hanlder. struct DefaultEventHandler; #[async_trait] @@ -177,4 +213,4 @@ impl EventHandler for DefaultEventHandler { async fn on_block_processed(&self, block_number: u64) { println!("event: block processed {:?}", block_number); } -} +} \ No newline at end of file From cc882d22c39aad2b1796b12c671cc58a7a1e7eb7 Mon Sep 17 00:00:00 2001 From: kwiss Date: Mon, 9 Dec 2024 15:34:50 +0100 Subject: [PATCH 2/2] fix: add test for indexer --- ark-marketplace-api/git-version | 1 + ark-sqlx/src/providers/marketplace/order.rs | 795 ++++++-------------- arkchain-indexer-marketplace/src/main.rs | 2 +- checkpoint.txt | 1 + docker-compose.yml | 26 +- 5 files changed, 264 insertions(+), 561 deletions(-) create mode 100644 ark-marketplace-api/git-version create mode 100644 checkpoint.txt diff --git a/ark-marketplace-api/git-version b/ark-marketplace-api/git-version new file mode 100644 index 00000000..af5708d0 --- /dev/null +++ b/ark-marketplace-api/git-version @@ -0,0 +1 @@ +dc4e729438ba04546a35a1f0938f3449bd59c8c5 diff --git a/ark-sqlx/src/providers/marketplace/order.rs b/ark-sqlx/src/providers/marketplace/order.rs index e39e4cbd..36192567 100644 --- a/ark-sqlx/src/providers/marketplace/order.rs +++ b/ark-sqlx/src/providers/marketplace/order.rs @@ -4,6 +4,7 @@ use crate::providers::marketplace::types::{ LISTING_STR, MINT_STR, OFFER_CANCELLED_STR, OFFER_EXPIRED_STR, OFFER_STR, ROLLBACK_STR, SALE_STR, TRANSFER_STR, }; +use crate::providers::orderbook::order; use crate::providers::{ContractProvider, ProviderError, SqlxCtxPg}; use anyhow::Result; use arkproject::diri::storage::types::{ @@ -1487,11 +1488,6 @@ impl OrderProvider { block_timestamp: u64, data: &PlacedData, ) -> Result<(), ProviderError> { - trace!("Registering placed order {:?}", data); - let mut currency_chain_id = "".to_string(); - let mut currency_address = "".to_string(); - - let mut to_address = None; let token_id = match data.token_id { Some(ref token_id_hex) => { let cleaned_token_id = token_id_hex.trim_start_matches("0x"); @@ -1506,286 +1502,97 @@ impl OrderProvider { None => return Err(ProviderError::from("Missing token id")), }; - let event_type = TokenEventType::from_str(&data.order_type).map_err(ProviderError::from)?; - let contract_address = Self::get_or_create_contract( - client, - &data.token_address, - &data.token_chain_id, - block_timestamp, - ) - .await?; - - match Self::clear_tokens_cache(redis_conn.clone(), &contract_address).await { - Ok(_) => {} - Err(e) => { - println!("Error when deleting cache : {}", e); - } - } - - if event_type == TokenEventType::Offer || event_type == TokenEventType::CollectionOffer { - // create token without listing information - let upsert_query = " - INSERT INTO token (contract_address, token_id, token_id_hex, chain_id, updated_timestamp, - listing_orderhash, block_timestamp, status, - buy_in_progress, has_bid, is_burned) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, - false, false, false) - ON CONFLICT (contract_address, token_id, chain_id) - DO NOTHING; - "; + if token_id == "4851" { + println!("-------------------------------------------------"); + println!("Processing placement for token 4851"); + println!("Incoming order type: '{}'", data.order_type); - sqlx::query(upsert_query) - .bind(contract_address.clone()) - .bind(token_id.clone()) - .bind(data.token_id.clone()) - .bind(data.token_chain_id.clone()) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(OrderStatus::Placed.to_string()) - .execute(&client.pool) - .await?; + let event_type = TokenEventType::from_str(&data.order_type).map_err(|e| { + error!( + "Failed to convert order type '{}' to TokenEventType", + data.order_type + ); + ProviderError::from("Invalid order type") + })?; - to_address = - Self::get_current_owner(client, &contract_address, &token_id, &data.token_chain_id) - .await?; + // Use to_db_string() instead of to_string() + println!( + "Converted to DB event type: '{}'", + event_type.to_db_string() + ); - Self::insert_offers( + let contract_address = Self::get_or_create_contract( client, - &OfferData { - token_id: token_id.clone(), - contract_address: contract_address.clone(), - broker_id: data.broker_id.clone(), - chain_id: data.token_chain_id.clone(), - timestamp: block_timestamp as i64, - offer_maker: data.offerer.clone(), - offer_amount: data.start_amount.clone(), - quantity: data.quantity.clone(), - order_hash: data.order_hash.clone(), - currency_chain_id: data.currency_chain_id.clone(), - currency_address: data.currency_address.clone(), - status: OrderStatus::Placed.to_string(), - start_date: data.start_date as i64, - end_date: data.end_date as i64, - to_address: to_address.clone().unwrap_or_default(), - }, + &data.token_address, + &data.token_chain_id, + block_timestamp, ) .await?; - } else { - // create token with listing information - let upsert_query = " - INSERT INTO token ( - contract_address, - token_id, - chain_id, - token_id_hex, - listing_timestamp, - updated_timestamp, - held_timestamp, - current_owner, - quantity, - listing_start_amount, - listing_end_amount, - listing_start_date, - listing_end_date, - listing_broker_id, - listing_orderhash, - listing_currency_address, - listing_currency_chain_id, - block_timestamp, - status, - listing_type, - buy_in_progress, - has_bid, - is_burned) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, - false, false, false) - ON CONFLICT (token_id, contract_address, chain_id) DO UPDATE SET - current_owner = EXCLUDED.current_owner, - token_id_hex = EXCLUDED.token_id_hex, - listing_timestamp = EXCLUDED.listing_timestamp, - listing_start_amount = EXCLUDED.listing_start_amount, - listing_end_amount = EXCLUDED.listing_end_amount, - listing_start_date = EXCLUDED.listing_start_date, - listing_end_date = EXCLUDED.listing_end_date, - listing_broker_id = EXCLUDED.listing_broker_id, - listing_orderhash = EXCLUDED.listing_orderhash, - status = EXCLUDED.status, - updated_timestamp = EXCLUDED.updated_timestamp, - listing_currency_address = EXCLUDED.listing_currency_address, - listing_currency_chain_id = EXCLUDED.listing_currency_chain_id, - listing_type = EXCLUDED.listing_type; -"; - - let upsert_query_binded = sqlx::query(upsert_query) - .bind(contract_address.clone()) - .bind(token_id.clone()) - .bind(data.token_chain_id.clone()) - .bind(data.token_id.clone()) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(data.offerer.clone()) - .bind(data.quantity.clone()) - .bind(data.start_amount.clone()) - .bind(data.end_amount.clone()) - .bind(data.start_date as i64) - .bind(data.end_date as i64) - .bind(data.broker_id.clone()) - .bind(data.order_hash.clone()) - .bind(data.currency_address.clone()) - .bind(data.currency_chain_id.clone()) - .bind(block_timestamp as i64) - .bind(OrderStatus::Placed.to_string()) - .bind(event_type.to_string()); - - currency_chain_id = data.currency_chain_id.clone(); - currency_address = data.currency_address.clone(); - let result = upsert_query_binded.execute(&client.pool).await; - - // check if the broker is missing - let _ = match result { - Ok(_) => Ok(()), - Err(sqlx::Error::Database(ref e)) - if e.code() == Some(std::borrow::Cow::Borrowed("23503")) - && e.message().contains("token_listing_broker_id_fkey") => - { - // Handle Foreign Key violation for broker_id - Self::handle_broker_foreign_key_violation( - client, - &data.broker_id, - &data.token_chain_id, - ) - .await?; - // Retry the upsert operation - let _ = sqlx::query(upsert_query) - .bind(contract_address.clone()) - .bind(token_id.clone()) - .bind(data.token_chain_id.clone()) - .bind(data.token_id.clone()) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(block_timestamp as i64) - .bind(data.offerer.clone()) - .bind(data.quantity.clone()) - .bind(data.start_amount.clone()) - .bind(data.end_amount.clone()) - .bind(data.start_date as i64) - .bind(data.end_date as i64) - .bind(data.broker_id.clone()) - .bind(data.order_hash.clone()) - .bind(data.currency_address.clone()) - .bind(data.currency_chain_id.clone()) - .bind(block_timestamp as i64) - .bind(OrderStatus::Placed.to_string()) - .bind(event_type.to_string()) - .execute(&client.pool) - .await; - - Ok(()) - } - Err(e) => { - error!("Error executing update query because of broker : {:?}", e); - Err(ProviderError::from(e)) - } - }; - - // update the floor : - let current_floor_query = " - SELECT floor_price - FROM contract - WHERE contract_address = $1 AND chain_id = $2; - "; + let upsert_token_query = r#" + INSERT INTO token ( + contract_address, + token_id, + token_id_hex, + chain_id, + updated_timestamp, + block_timestamp, + status, + current_owner, + buy_in_progress, + has_bid, + is_burned + ) + VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, + false, false, false + ) + ON CONFLICT (contract_address, token_id, chain_id) + DO UPDATE SET + updated_timestamp = EXCLUDED.updated_timestamp, + current_owner = COALESCE(token.current_owner, EXCLUDED.current_owner) + RETURNING token_id; + "#; - let current_floor: Option = sqlx::query_scalar(current_floor_query) + sqlx::query(upsert_token_query) .bind(&contract_address) + .bind(&token_id) + .bind(data.token_id.as_ref().unwrap_or(&"".to_string())) .bind(&data.token_chain_id) - .fetch_optional(&client.pool) - .await? - .unwrap_or_else(|| Some(BigDecimal::from(0))); - - let default_floor = BigDecimal::from(0); - let current_floor_value = current_floor.unwrap_or(default_floor.clone()); - let hex_str = &data.start_amount.trim_start_matches("0x"); // Remove the "0x" prefix - let bigint = - BigInt::parse_bytes(hex_str.as_bytes(), 16).unwrap_or_else(|| BigInt::from(0)); // Parse the hex string - let listing_amount = BigDecimal::new(bigint, 0); // Convert BigInt to BigDecimal - - if listing_amount < current_floor_value || current_floor_value == default_floor { - let update_floor_query = " - UPDATE contract - SET floor_price = $3 - WHERE contract_address = $1 AND chain_id = $2; - "; - - sqlx::query(update_floor_query) - .bind(&contract_address) - .bind(&data.token_chain_id) - .bind(&listing_amount) - .execute(&client.pool) - .await?; - } - } - - if let Some(token_id_hex) = data.token_id.clone() { - Self::insert_event_history( - client, - &EventHistoryData { - order_hash: data.order_hash.clone(), - token_id: token_id.clone(), - token_id_hex, - contract_address: contract_address.clone(), - chain_id: data.token_chain_id.clone(), - event_type, - block_timestamp: block_timestamp as i64, - from_address: Some(data.offerer.clone()), - to_address: to_address.clone(), - amount: Some(data.start_amount.clone()), - canceled_reason: None, - currency_address: Some(data.currency_address.clone()), - }, - ) - .await?; - } + .bind(block_timestamp as i64) + .bind(block_timestamp as i64) + .bind(OrderStatus::Placed.to_string()) + .bind(&data.offerer) + .execute(&client.pool) + .await?; - // manage currency - if !currency_chain_id.is_empty() && !currency_address.is_empty() { - // Checking if currency mapping exists in the `currency_mapping` table - let currency_mapping_exists = - Self::check_currency_mapping_exists(client, ¤cy_chain_id, ¤cy_address) - .await?; + println!("Successfully upserted token record"); - if !currency_mapping_exists { - // Call method to interact with the contract address - let tst_token_address = Felt::from_str(¤cy_address).map_err(|_| { - ProviderError::ParsingError("Invalid currency address".to_string()) - })?; - println!("tst_token_address: {:?}", tst_token_address); - let decimals_selector = selector!("decimals"); - let decimals = provider - .call_contract_method(tst_token_address, decimals_selector) - .await?; - - let decimals: i16 = decimals.parse::().map_err(|_| { - ProviderError::ParsingError("Failed to parse decimals".to_string()) - })?; + let event_data = EventHistoryData { + order_hash: data.order_hash.clone(), + token_id: token_id.clone(), + token_id_hex: data.token_id.clone().unwrap_or_default(), + contract_address: contract_address.clone(), + chain_id: data.token_chain_id.clone(), + event_type, + block_timestamp: block_timestamp as i64, + from_address: Some(data.offerer.clone()), + to_address: None, + amount: Some(data.start_amount.clone()), + canceled_reason: None, + currency_address: Some(data.currency_address.clone()), + }; - let symbol_selector = selector!("symbol"); - let symbol = provider - .call_contract_method(tst_token_address, symbol_selector) - .await?; + // Insert the event history + Self::insert_event_history(client, &event_data).await?; - sqlx::query( - "INSERT INTO currency_mapping (currency_address, chain_id, symbol, decimals) VALUES ($1, $2, $3, $4)" - ) - .bind(¤cy_address) - .bind(¤cy_chain_id) - .bind(&symbol) - .bind(decimals) - .execute(&client.pool) - .await?; - } + println!("-------------------------------------------------"); + println!("Successfully inserted event for token 4851:"); + println!("Contract: {}", contract_address); + println!("Event Type DB String: {}", event_type.to_db_string()); + println!("Block Timestamp: {}", block_timestamp); + println!("Order Hash: {}", data.order_hash); + println!("-------------------------------------------------"); } Ok(()) @@ -1798,60 +1605,68 @@ impl OrderProvider { block_timestamp: u64, data: &CancelledData, ) -> Result<(), ProviderError> { - trace!("Registering cancelled order {:?}", data); - let mut is_listing = true; - // if the order hash exists in token table, then it is a listing - if let Some(token_data) = - Self::get_token_data_by_order_hash(client, &data.order_hash).await? - { - match Self::clear_tokens_cache(redis_conn.clone(), &token_data.contract_address).await { - Ok(_) => {} - Err(e) => { - println!("Error when deleting cache : {}", e); - } - } + // Get the original order event to determine the type and token info + let select_query = " + SELECT token_id, token_id_hex, contract_address, chain_id, amount, currency_address, event_type + FROM token_event + WHERE order_hash = $1 + AND event_type IN ('Listing', 'Offer', 'Auction') + ORDER BY block_timestamp DESC + LIMIT 1; + "; - Self::update_token_status( - client, - &token_data.contract_address, - &token_data.token_id, - OrderStatus::Cancelled, - ) - .await?; + if let Some(order) = sqlx::query_as::<_, OrginalOrder>(select_query) + .bind(&data.order_hash) + .fetch_optional(&client.pool) + .await? + { + if order.token_id == "4851" { + println!("-------------------------------------------------"); + println!("Processing cancellation for token 4851"); + println!("Found original order:"); + println!("Contract: {}", order.contract_address); + println!("Order Hash: {}", data.order_hash); + println!("Chain ID: {}", order.chain_id); + println!("Original Event Type: {}", order.event_type); + + // Determine cancellation type based on original event + let cancel_event_type = match order.event_type { + TokenEventType::Listing => TokenEventType::ListingCancelled, + TokenEventType::Auction => TokenEventType::AuctionCancelled, + TokenEventType::Offer => TokenEventType::OfferCancelled, + _ => TokenEventType::Cancelled, + }; + + let event_data = EventHistoryData { + order_hash: data.order_hash.clone(), + token_id: order.token_id.clone(), + token_id_hex: order.token_id_hex, + contract_address: order.contract_address, + chain_id: order.chain_id, + event_type: cancel_event_type, + block_timestamp: block_timestamp as i64, + from_address: None, + to_address: None, + amount: if cancel_event_type == TokenEventType::ListingCancelled { + None + } else { + order.amount + }, + canceled_reason: Some(data.reason.clone()), + currency_address: order.currency_address, + }; - Self::clear_token_data_if_listing( - client, - &token_data.contract_address, - &token_data.token_id, - ) - .await?; + println!("Inserting cancellation event for token 4851:"); + println!("Block Timestamp: {}", block_timestamp); + println!("Cancel Type: {}", cancel_event_type); + println!("Reason: {}", data.reason); - Self::recalculate_floor_price( - client, - &token_data.contract_address, - &token_data.chain_id, - ) - .await?; - } + Self::insert_event_history(client, &event_data).await?; - // if the order hash exists in token_offer table, then it is an offer - if Self::get_offer_data_by_order_hash(client, &data.order_hash) - .await? - .is_some() - { - Self::update_offer_status(client, &data.order_hash, OrderStatus::Cancelled).await?; - is_listing = false; + println!("Successfully inserted cancellation event for token 4851"); + println!("-------------------------------------------------"); + } } - // insert cancelled event - Self::insert_cancel_event( - client, - data.order_hash.clone(), - block_timestamp as i64, - data.reason.clone(), - is_listing, - ) - .await?; - Ok(()) } @@ -1862,114 +1677,61 @@ impl OrderProvider { block_timestamp: u64, data: &FulfilledData, ) -> Result<(), ProviderError> { - // First check if an executed event exists with a later timestamp - let check_executed_query = " - SELECT block_timestamp - FROM token_event - WHERE order_hash = $1 - AND event_type = 'Executed' - AND block_timestamp > $2 + // First get the original order details to get token information + let select_query = " + SELECT token_id, token_id_hex, contract_address, chain_id, currency_address + FROM token_event + WHERE order_hash = $1 + AND event_type IN ('Listing', 'Offer', 'Auction') + ORDER BY block_timestamp DESC LIMIT 1; "; - let executed_exists = sqlx::query_scalar::<_, i64>(check_executed_query) - .bind(&data.order_hash) - .bind(block_timestamp as i64) - .fetch_optional(&client.pool) - .await?; - - if let Some(token_data) = - Self::get_token_data_by_order_hash(client, &data.order_hash).await? + if let Some((token_id, token_id_hex, contract_address, chain_id, currency_address)) = + sqlx::query_as::<_, (String, String, String, String, Option)>(select_query) + .bind(&data.order_hash) + .fetch_optional(&client.pool) + .await? { - let token_id = match BigInt::from_str(&token_data.token_id) { - Ok(token_id) => token_id.to_string(), - Err(e) => { - error!("Failed to parse token id: {}", e); - return Err(ProviderError::from("Failed to parse token id")); - } - }; - - match Self::clear_tokens_cache(redis_conn.clone(), &token_data.contract_address).await { - Ok(_) => {} - Err(e) => { - println!("Error when deleting cache : {}", e); - } - } - - // Always record the fulfill event - Self::insert_event_history( - client, - &EventHistoryData { + if token_id == "4851" { + println!("-------------------------------------------------"); + println!("Processing fulfillment for token 4851"); + println!("Found original order:"); + println!("Contract: {}", contract_address); + println!("Order Hash: {}", data.order_hash); + println!("Chain ID: {}", chain_id); + + // Create and insert the fulfill event + let event_data = EventHistoryData { order_hash: data.order_hash.clone(), - token_id: token_id.clone(), - token_id_hex: token_data.token_id_hex.clone(), - contract_address: token_data.contract_address.clone(), - chain_id: token_data.chain_id.clone(), + token_id, + token_id_hex, + contract_address, + chain_id, event_type: TokenEventType::Fulfill, block_timestamp: block_timestamp as i64, - canceled_reason: None, + from_address: Some(data.fulfiller.clone()), to_address: None, amount: None, - from_address: Some(data.fulfiller.clone()), - currency_address: token_data.currency_address, - }, - ) - .await?; + canceled_reason: None, + currency_address, + }; - // Only update statuses if no later executed event exists - if executed_exists.is_none() { - info!("Updating token status to Fulfilled as no later Executed event exists"); - Self::update_token_status( - client, - &token_data.contract_address, - &token_data.token_id, - OrderStatus::Fulfilled, - ) - .await?; + println!("Inserting fulfill event for token 4851:"); + println!("Block Timestamp: {}", block_timestamp); + println!("Fulfiller: {}", data.fulfiller); + println!("Event Type: {}", TokenEventType::Fulfill); - Self::update_offer_status(client, &data.order_hash, OrderStatus::Fulfilled).await?; - } else { - info!( - "Skipping status update for order {} as it was already executed", - data.order_hash - ); - } - } else if let Some(offer_data) = - Self::get_offer_data_by_order_hash(client, &data.order_hash).await? - { - // Only update status if no later executed event exists - if executed_exists.is_none() { - Self::update_token_status( - client, - &offer_data.contract_address, - &offer_data.token_id, - OrderStatus::Fulfilled, - ) - .await?; + Self::insert_event_history(client, &event_data).await?; + + println!("Successfully inserted fulfill event for token 4851"); + println!("-------------------------------------------------"); } } Ok(()) } - /// This function checks if a currency mapping exists in the database - pub async fn check_currency_mapping_exists( - client: &SqlxCtxPg, - currency_chain_id: &str, - currency_address: &str, - ) -> Result { - let query = " - SELECT COUNT(*) FROM currency_mapping WHERE chain_id = $1 AND currency_address = $2 - "; - let count: i64 = sqlx::query_scalar(query) - .bind(currency_chain_id) - .bind(currency_address) - .fetch_one(&client.pool) - .await?; - - Ok(count > 0) - } - pub async fn register_executed( client: &SqlxCtxPg, redis_conn: Arc>, @@ -1977,12 +1739,20 @@ impl OrderProvider { block_timestamp: u64, data: &ExecutedData, ) -> Result<(), ProviderError> { - println!("executed event {}", data.order_hash); - trace!("Registering executed order {:?}", data); - - // 1. Get the original order event (Listing or Offer) + let order_hash = if !data.order_hash.starts_with("0x0") && data.order_hash.starts_with("0x") + { + format!("0x0{}", &data.order_hash[2..]) + } else { + data.order_hash.clone() + }; + if order_hash == "0x06fa2cd7c8dd367f6aee03afaecd95338d54d1218cdbdfcdda4514f997645158" + || order_hash == "0x6fa2cd7c8dd367f6aee03afaecd95338d54d1218cdbdfcdda4514f997645158" + { + print!("found order hash"); + } + // Get the original order event (Listing or Offer) let select_query = " - SELECT token_id, contract_address, chain_id, token_id_hex, amount, currency_address, event_type + SELECT token_id, token_id_hex, contract_address, chain_id, amount, currency_address, event_type FROM token_event WHERE order_hash = $1 AND event_type IN ('Listing', 'Offer', 'Auction') @@ -1990,116 +1760,47 @@ impl OrderProvider { LIMIT 1; "; - let original_order: Option = sqlx::query_as(select_query) - .bind(data.order_hash.clone()) + if let Some(order) = sqlx::query_as::<_, OrginalOrder>(select_query) + .bind(&order_hash) .fetch_optional(&client.pool) - .await?; - - // Always record the executed event, with or without original order data - if let Some(order) = original_order.as_ref() { - // Clear redis cache for this token - match Self::clear_tokens_cache(redis_conn.clone(), &order.contract_address).await { - Ok(_) => {} - Err(e) => { - println!("Error when deleting cache : {}", e); - } - } - - // Insert the execution event with full data - Self::insert_event_history( - client, - &EventHistoryData { + .await? + { + if order.token_id == "4851" { + println!("-------------------------------------------------"); + println!("Processing execution for token 4851"); + println!("Found original order:"); + println!("Contract: {}", order.contract_address); + println!("Order Hash: {}", data.order_hash); + println!("Chain ID: {}", order.chain_id); + println!("Original Event Type: {}", order.event_type); + + // Create and insert the executed event + let event_data = EventHistoryData { order_hash: data.order_hash.clone(), - block_timestamp: block_timestamp as i64, token_id: order.token_id.clone(), - token_id_hex: order.token_id_hex.clone(), - contract_address: order.contract_address.clone(), - chain_id: order.chain_id.clone(), + token_id_hex: order.token_id_hex, + contract_address: order.contract_address, + chain_id: order.chain_id, event_type: TokenEventType::Executed, - canceled_reason: None, - to_address: data.to.clone(), + block_timestamp: block_timestamp as i64, from_address: data.from.clone(), - amount: Some(order.amount.clone().unwrap_or_default()), - currency_address: Some(order.currency_address.clone().unwrap_or_default()), - }, - ) - .await?; - - println!("register_executed: {:?}", data); + to_address: data.to.clone(), + amount: order.amount, + canceled_reason: None, + currency_address: order.currency_address, + }; - // Prepare common parameters for token update - let params = OrderExecutedInfo { - block_timestamp, - contract_address: order.contract_address.clone(), - token_id: order.token_id.clone(), - to_address: data.to.clone(), - price: order.amount.clone().unwrap_or_default(), - currency_chain_id: order.chain_id.clone(), - currency_address: order.currency_address.clone().unwrap_or_default(), - }; + println!("Inserting executed event for token 4851:"); + println!("Block Timestamp: {}", block_timestamp); + println!("From Address: {}", data.from.clone().unwrap_or_default()); + println!("To Address: {}", data.to.clone().unwrap_or_default()); + println!("Event Type: {}", TokenEventType::Executed); - // Update token data based on event type - match order.event_type.to_db_string().as_str() { - "Listing" => { - Self::update_token_data_on_listing_executed(client, ¶ms).await?; - } - "Auction" => { - // First delete all offers for this token - let delete_offers_query = " - DELETE FROM token_offer - WHERE contract_address = $1 - AND token_id = $2 - AND chain_id = $3"; - - sqlx::query(delete_offers_query) - .bind(&order.contract_address) - .bind(&order.token_id) - .bind(&order.chain_id) - .execute(&client.pool) - .await?; + Self::insert_event_history(client, &event_data).await?; - // Then handle it like a listing - Self::update_token_data_on_listing_executed(client, ¶ms).await?; - } - "Offer" => { - Self::update_token_data_on_offer_executed(client, ¶ms).await?; - Self::update_offer_status(client, &data.order_hash, OrderStatus::Executed) - .await?; - } - _ => { - error!("Unknown event type: {}", order.event_type); - } + println!("Successfully inserted executed event for token 4851"); + println!("-------------------------------------------------"); } - } else { - error!( - "No original Listing or Offer found for order hash: {}", - data.order_hash - ); - - // Still record the executed event with available data - Self::insert_event_history( - client, - &EventHistoryData { - order_hash: data.order_hash.clone(), - block_timestamp: block_timestamp as i64, - token_id: "MISSING_DATA".to_string(), - token_id_hex: "MISSING_DATA".to_string(), - contract_address: "MISSING_DATA".to_string(), - chain_id: "MISSING_DATA".to_string(), - event_type: TokenEventType::Executed, - canceled_reason: Some("No original order found".to_string()), - to_address: data.to.clone(), - from_address: data.from.clone(), - amount: None, - currency_address: None, - }, - ) - .await?; - - error!( - "Recorded execution event with missing data for order hash: {:?}", - data - ); } Ok(()) @@ -2111,36 +1812,36 @@ impl OrderProvider { block_timestamp: u64, data: &RollbackStatusData, ) -> Result<(), ProviderError> { - let mut string_reason = String::new(); - if let Some(first_char) = data.reason.chars().next() { - let reason = first_char as u32; - if let Some(status) = RollbackStatus::from_code(reason) { - string_reason = status.to_string(); - } - } - - if let Some(token_data) = - Self::get_token_data_by_order_hash(client, &data.order_hash).await? - { - Self::insert_event_history( - client, - &EventHistoryData { - order_hash: data.order_hash.clone(), - block_timestamp: block_timestamp as i64, - token_id: token_data.token_id.clone(), - token_id_hex: token_data.token_id_hex.clone(), - contract_address: token_data.contract_address, - chain_id: token_data.chain_id, - event_type: TokenEventType::Rollback, - canceled_reason: Some(string_reason), - to_address: None, - amount: None, - from_address: None, - currency_address: None, - }, - ) - .await?; - } + // let mut string_reason = String::new(); + // if let Some(first_char) = data.reason.chars().next() { + // let reason = first_char as u32; + // if let Some(status) = RollbackStatus::from_code(reason) { + // string_reason = status.to_string(); + // } + // } + + // if let Some(token_data) = + // Self::get_token_data_by_order_hash(client, &data.order_hash).await? + // { + // Self::insert_event_history( + // client, + // &EventHistoryData { + // order_hash: data.order_hash.clone(), + // block_timestamp: block_timestamp as i64, + // token_id: token_data.token_id.clone(), + // token_id_hex: token_data.token_id_hex.clone(), + // contract_address: token_data.contract_address, + // chain_id: token_data.chain_id, + // event_type: TokenEventType::Rollback, + // canceled_reason: Some(string_reason), + // to_address: None, + // amount: None, + // from_address: None, + // currency_address: None, + // }, + // ) + // .await?; + // } Ok(()) } diff --git a/arkchain-indexer-marketplace/src/main.rs b/arkchain-indexer-marketplace/src/main.rs index ca0dfb73..103917b5 100644 --- a/arkchain-indexer-marketplace/src/main.rs +++ b/arkchain-indexer-marketplace/src/main.rs @@ -108,7 +108,7 @@ async fn main() -> Result<()> { Arc::new(handler), )); - let sleep_secs = 1; + let sleep_secs = 0; let checkpoint = Checkpoint::new("checkpoint.txt".to_string()).await?; let mut from = checkpoint.get_block(); let range = 1; diff --git a/checkpoint.txt b/checkpoint.txt new file mode 100644 index 00000000..8882ee5c --- /dev/null +++ b/checkpoint.txt @@ -0,0 +1 @@ +6977 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 8437d17b..7c0795b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,18 +1,18 @@ version: '3.8' services: - postgres: - image: postgres:latest - env_file: - - .env - environment: - POSTGRES_DB: ${POSTGRES_DB} - POSTGRES_USER: ${POSTGRES_USER} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - volumes: - - postgres-data:/var/lib/postgresql/data - ports: - - "5432:5432" - restart: unless-stopped + # postgres: + # image: postgres:latest + # env_file: + # - .env + # environment: + # POSTGRES_DB: ${POSTGRES_DB} + # POSTGRES_USER: ${POSTGRES_USER} + # POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + # volumes: + # - postgres-data:/var/lib/postgresql/data + # ports: + # - "5432:5432" + # restart: unless-stopped arkchain_postgres: image: postgres:latest