Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ impl MatchingEngineExecutor {
order_id: OrderId,
) -> Result<(), MatchingEngineError> {
info!("Running internal matching engine on order {order_id}");

// Skip matching if the account has tasks in flight
let queue_len = self.state.serial_tasks_queue_len(&account_id).await?;
if queue_len > 0 {
info!(
"account {account_id} has tasks in flight (queue_len={queue_len}), \
skipping internal matching engine for {order_id}"
);
return Ok(());
}

// Lookup the order, matchable amount, and matching pool
let (order, matchable_amount) = self.fetch_order_and_matchable_amount(&order_id).await?;
let matching_pool = self.fetch_matching_pool(&order_id).await?;
Expand Down Expand Up @@ -85,6 +96,16 @@ impl MatchingEngineExecutor {
let account_id = self.get_account_id_for_order(&user_order).await?;
let other_account_id = self.get_account_id_for_order(&match_result.other_order_id).await?;

// Skip settlement if the counterparty account has tasks in flight
let other_queue_len = self.state.serial_tasks_queue_len(&other_account_id).await?;
if other_queue_len > 0 {
info!(
"counterparty account {other_account_id} has tasks in flight \
(queue_len={other_queue_len}), skipping settlement for {user_order}"
);
return Ok(());
}

let order0_ring = self.get_order_ring(&user_order).await?;
let order1_ring = self.get_order_ring(&match_result.other_order_id).await?;
let use_private = PrivacyRing::supports_private_settlement(order0_ring, order1_ring);
Expand Down Expand Up @@ -240,6 +261,7 @@ mod tests {
order::{Order, mocks::mock_order},
order_auth::mocks::mock_order_auth,
};
use types_tasks::{RefreshAccountTaskDescriptor, TaskDescriptor};

async fn mock_executor(state: State) -> MatchingEngineExecutor {
let (_job_queue, job_receiver) = new_matching_engine_worker_queue();
Expand Down Expand Up @@ -288,6 +310,24 @@ mod tests {
(executor, state, account.id, order, matchable_amount)
}

#[tokio::test]
async fn test_pre_match_skips_when_own_queue_busy() {
let (executor, state, account_id, order, _matchable_amount) =
setup_executor_with_order().await;

// Enqueue a task so the serial queue is non-empty
let keychain = state.get_account_keychain(&account_id).await.unwrap().unwrap();
let descriptor =
TaskDescriptor::from(RefreshAccountTaskDescriptor::new(account_id, keychain));
let (_task_id, waiter) =
state.enqueue_preemptive_task(vec![account_id], descriptor, true).await.unwrap();
waiter.await.unwrap();

// Should return Ok(()) without attempting a match
let result = executor.run_internal_matching_engine(account_id, order.id).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn test_order_still_valid_false_when_order_removed() {
let (executor, state, account_id, order, attempted_matchable_amount) =
Expand Down
Loading