From 83d7b47c16670146666c5ac563dfce8598cf4fa8 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 1 Apr 2025 08:34:44 +1100 Subject: [PATCH 1/4] chore: first steps to generic backpressure logic --- .../examples/messenger_using_kafka.rs | 6 +- .../src/kafka/context.rs | 14 +- .../src/kafka/service.rs | 5 +- .../src/messenger_with_kafka.rs | 13 +- packages/talos_messenger_core/src/errors.rs | 2 +- .../src/services/inbound_service.rs | 331 +++++++++++++----- .../talos_messenger_core/src/services/mod.rs | 2 +- packages/talos_messenger_core/src/suffix.rs | 33 +- .../src/tests/messenger_inbound_service.rs | 279 ++++++++++++++- .../src/tests/test_utils.rs | 49 +-- 10 files changed, 614 insertions(+), 120 deletions(-) diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index ad3de29c..040456bc 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -1,6 +1,9 @@ use talos_common_utils::env_var; use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration}; -use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig}; +use talos_messenger_core::{ + services::TalosBackPressureConfig, + utlis::{create_whitelist_actions_from_str, ActionsParserConfig}, +}; use talos_rdkafka_utils::kafka_config::KafkaConfig; use talos_suffix::core::SuffixConfig; @@ -40,6 +43,7 @@ async fn main() { channel_buffers: None, commit_size: Some(2_000), commit_frequency: None, + back_pressure_config: Some(TalosBackPressureConfig::new(Some(40), Some(50))), }; messenger_with_kafka(config).await.unwrap(); diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs index 79795cc1..5f04ba3a 100644 --- a/packages/talos_messenger_actions/src/kafka/context.rs +++ b/packages/talos_messenger_actions/src/kafka/context.rs @@ -1,5 +1,5 @@ use futures_executor::block_on; -use log::error; +use log::{debug, error}; use rdkafka::{producer::ProducerContext, ClientContext}; use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError}; use tokio::sync::mpsc; @@ -31,6 +31,12 @@ impl ProducerContext for MessengerProducerContext { // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown. if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) { error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}"); + } else { + debug!( + "Finished sending success feedback from producer contextfor an action from version = {}. Total feedbacks remaining to process = {} ", + delivery_opaque.version, + self.tx_feedback_channel.max_capacity() - self.tx_feedback_channel.capacity() + ); }; } Err((publish_error, borrowed_message)) => { @@ -51,6 +57,12 @@ impl ProducerContext for MessengerProducerContext { Box::new(messenger_error), ))) { error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}"); + } else { + debug!( + "Finshed sending error feedback send from producer context for an action from version = {}. Total feedbacks remaining to process = {} ", + delivery_opaque.version, + self.tx_feedback_channel.max_capacity() - self.tx_feedback_channel.capacity() + ); }; } } diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index b823c974..32bfee8a 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -37,10 +37,13 @@ where commit_actions, headers, } = actions; - if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { match get_actions_deserialised::>(publish_actions_for_type) { Ok(actions) => { + debug!( + "Sending actions for version = {version} \n| Remaining messages in the channel = {}", + self.rx_actions_channel.max_capacity() - self.rx_actions_channel.capacity() + ); let total_len = actions.len() as u32; let headers_cloned = headers.clone(); diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index d4637e0a..b8080814 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer; use talos_messenger_core::{ core::{MessengerPublisher, PublishActionType}, errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult}, - services::{MessengerInboundService, MessengerInboundServiceConfig}, + services::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService, }; @@ -110,6 +110,8 @@ pub struct Configuration { pub commit_size: Option, /// Commit issuing frequency. pub commit_frequency: Option, + /// messenger can receive more messages. + pub back_pressure_config: Option, } pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult { @@ -135,7 +137,14 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu // START - Inbound service let suffix: Suffix = Suffix::with_config(config.suffix_config.unwrap_or_default()); let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency); - let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config); + let inbound_service = MessengerInboundService::new( + kafka_consumer, + tx_actions_channel, + rx_feedback_channel, + suffix, + inbound_service_config, + config.back_pressure_config.unwrap_or_default(), + ); // END - Inbound service // START - Publish service diff --git a/packages/talos_messenger_core/src/errors.rs b/packages/talos_messenger_core/src/errors.rs index 824acc84..ed35ef07 100644 --- a/packages/talos_messenger_core/src/errors.rs +++ b/packages/talos_messenger_core/src/errors.rs @@ -1,6 +1,6 @@ use thiserror::Error as ThisError; -pub type MessengerServiceResult = Result<(), MessengerServiceError>; +pub type MessengerServiceResult = Result; #[derive(Debug, PartialEq, Clone)] pub enum MessengerActionErrorKind { diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 7d00d6dd..71ab2804 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -4,7 +4,12 @@ use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use log::{debug, error, info, warn}; -use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; +use talos_certifier::{ + core::{CandidateChannelMessage, DecisionChannelMessage}, + model::DecisionMessageTrait, + ports::{errors::MessageReceiverError, MessageReciever}, + ChannelMessage, +}; use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::{ @@ -33,7 +38,6 @@ pub struct MessengerInboundServiceConfig { /// The allowed on_commit actions allowed_actions: HashMap>, } - impl MessengerInboundServiceConfig { pub fn new(allowed_actions: HashMap>, commit_size: Option, commit_frequency: Option) -> Self { Self { @@ -59,6 +63,89 @@ where last_committed_version: u64, /// The next version ready to be send for commit. next_version_to_commit: u64, + // Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. + // enable_back_pressure: bool, + /// Number of items currently in progress. Which denotes, the number of items in suffix in `SuffixItemState::Processing` or `SuffixItemState::PartiallyComplete`. + in_progress_count: u32, + /// Configs to check back pressure. + back_pressure_configs: TalosBackPressureConfig, +} + +#[derive(Debug, Default, Clone)] +pub struct TalosBackPressureConfig { + /// Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. + pub is_enabled: bool, + /// Max count before back pressue is enabled. + /// if `None`, back pressure logic will not apply. + max_threshold: Option, + /// `min_threshold` helps to prevent immediate toggle between switch on and off of the backpressure. + /// Batch of items to process, when back pressure is enabled before disable logic is checked? + /// if None, no minimum check is done, and as soon as the count is below the max_threshold, back pressure is disabled. + min_threshold: Option, +} + +impl TalosBackPressureConfig { + pub fn new(min_threshold: Option, max_threshold: Option) -> Self { + assert!( + min_threshold.le(&max_threshold), + "min_threshold ({min_threshold:?}) must be less than max_threshold ({max_threshold:?})" + ); + Self { + max_threshold, + min_threshold, + is_enabled: false, + } + } + + /// Get the remaining available count before hitting the max_threshold, and thereby enabling back pressure. + pub fn get_remaining_count(&self, current_count: u32) -> Option { + self.max_threshold.map(|max| max.saturating_sub(current_count)) + } + + /// Looks at the `max_threshold` and `min_threshold` to determine if back pressure should be enabled. `max_threshold` is used to determine when to enable the back-pressure + /// whereas, `min_threshold` is used to look at the lower bound + /// - `max_threshold` - Use to determine the upper bound of maximum items allowed. + /// If this is set to `None`, no back pressure will be applied. + /// `max_threshold` is + /// - `min_threshold` - Use to determine the lower bound of maximum items allowed. If this is set to `None`, no back pressure will be applied. + pub fn should_enable_back_pressure(&mut self, current_count: u32) -> bool { + match self.max_threshold { + Some(max_threshold) => { + // if not enabled, only check against the max_threshold. + if current_count >= max_threshold { + true + } else { + // if already enabled, check when is it safe to remove. + + // If there is Some(`min_threshold`), then return true if current_count > `min_threshold`. + // If there is Some(`min_threshold`), then return false if current_count <= `min_threshold`. + // If None, then return false. + match self.min_threshold { + Some(min_threshold) if self.is_enabled => current_count > min_threshold, + _ => false, + } + } + } + // if None, then we don't apply any back pressure. + None => false, + } + } +} + +pub async fn receive_new_message( + receiver: &mut M, + is_back_pressure_enabled: bool, +) -> Option>, MessageReceiverError>> +where + M: MessageReciever> + Send + Sync + 'static, +{ + if is_back_pressure_enabled { + // This sleep is fine, this just introduces a delay for fairness so that other async arms of tokio select can execute. + tokio::time::sleep(Duration::from_millis(2)).await; + None + } else { + Some(receiver.consume_message().await) + } } impl MessengerInboundService @@ -71,8 +158,10 @@ where rx_feedback_channel: mpsc::Receiver, suffix: Suffix, config: MessengerInboundServiceConfig, + back_pressure_configs: TalosBackPressureConfig, ) -> Self { let commit_interval = tokio::time::interval(Duration::from_millis(config.commit_frequency as u64)); + Self { message_receiver, tx_actions_channel, @@ -82,12 +171,19 @@ where commit_interval, last_committed_version: 0, next_version_to_commit: 0, + // enable_back_pressure: false, + in_progress_count: 0, + back_pressure_configs, } } /// Get next versions with their commit actions to process. /// - async fn process_next_actions(&mut self) -> MessengerServiceResult { - let items_to_process = self.suffix.get_suffix_items_to_process(); + async fn process_next_actions(&mut self) -> MessengerServiceResult { + let max_new_items_to_pick = self.back_pressure_configs.get_remaining_count(self.in_progress_count); + + let items_to_process = self.suffix.get_suffix_items_to_process(max_new_items_to_pick); + let new_in_progress_count = items_to_process.len() as u32; + for item in items_to_process { let ver = item.version; @@ -114,9 +210,11 @@ where // Mark item as in process self.suffix.set_item_state(ver, SuffixItemState::Processing); + // Increment the in_progress_count which is used to determine when back-pressure is enforced. + self.in_progress_count += 1; } - Ok(()) + Ok(new_in_progress_count) } fn update_commit_offset(&mut self, version: u64) { @@ -158,6 +256,9 @@ where // self.all_completed_versions.push(version); + // When any item moved to final state, we decrement this counter, which thereby relaxes the back-pressure if it was enforced. + self.in_progress_count -= 1; + if let Some((_, new_prune_version)) = self.suffix.update_prune_index_from_version(version) { self.update_commit_offset(new_prune_version); } @@ -220,98 +321,151 @@ where }; } - pub async fn run_once(&mut self) -> MessengerServiceResult { - tokio::select! { - // Receive feedback from publisher. - Some(feedback_result) = self.rx_feedback_channel.recv() => { - match feedback_result { - MessengerChannelFeedback::Error(version, key, message_error) => { - error!("Failed to process version={version} with error={message_error:?}"); - self.handle_action_failed(version, &key); - - }, - MessengerChannelFeedback::Success(version, key) => { - debug!("Successfully processed version={version} with action_key={key}"); - self.handle_action_success(version, &key); - }, - } - + /// Process the feedback received from another thread for some `on_commit` action. + pub(crate) fn process_feedbacks(&mut self, feedback_result: MessengerChannelFeedback) { + match feedback_result { + MessengerChannelFeedback::Error(version, key, message_error) => { + error!("Failed to process version={version} with error={message_error:?}"); + self.handle_action_failed(version, &key); } - // 1. Consume message. - // Ok(Some(msg)) = self.message_receiver.consume_message() => { - reciever_result = self.message_receiver.consume_message() => { - - match reciever_result { - // 2.1 For CM - Install messages on the version - Ok(Some(ChannelMessage::Candidate(candidate))) => { - let version = candidate.message.version; - debug!("Candidate version received is {version}"); - if version > 0 { - // insert item to suffix - if let Err(insert_error) = self.suffix.insert(version, candidate.message.into()) { - warn!("Failed to insert version {version} into suffix with error {insert_error:?}"); - } - - if let Some(item_to_update) = self.suffix.get_mut(version){ - if let Some(commit_actions) = &item_to_update.item.candidate.on_commit { - let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions); - if filter_actions.is_empty() { - // There are on_commit actions, but not the ones required by messenger - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions)); - } else { - item_to_update.item.set_commit_action(filter_actions); - } - } else { - // No on_commit actions - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions)); - - } - }; + MessengerChannelFeedback::Success(version, key) => { + debug!("Successfully processed version={version} with action_key={key}"); + self.handle_action_success(version, &key); + } + } + } + /// Process the incoming candidate message + /// - Writes the candidate to suffix. + /// - Update to an early `Complete` state based on the `on_commit` actions. + /// - Transform the `on_commit` action to format required internally. + pub(crate) fn process_candidate_message(&mut self, candidate: CandidateChannelMessage) { + let version = candidate.message.version; + debug!("Candidate version received is {version}"); + if version > 0 { + // insert item to suffix + if let Err(insert_error) = self.suffix.insert(version, candidate.message.into()) { + warn!("Failed to insert version {version} into suffix with error {insert_error:?}"); + } + if let Some(item_to_update) = self.suffix.get_mut(version) { + if let Some(commit_actions) = &item_to_update.item.candidate.on_commit { + let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions); + if filter_actions.is_empty() { + // There are on_commit actions, but not the ones required by messenger + item_to_update + .item + .set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions)); + } else { + item_to_update.item.set_commit_action(filter_actions); + } + } else { + // No on_commit actions + item_to_update + .item + .set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions)); + } + }; + } else { + warn!("Version 0 will not be inserted into suffix.") + } + } - } else { - warn!("Version 0 will not be inserted into suffix.") - } - }, - // 2.2 For DM - Update the decision with outcome + safepoint. - Ok(Some(ChannelMessage::Decision(decision))) => { - let version = decision.message.get_candidate_version(); - debug!("[Decision Message] Decision version received = {} for candidate version = {}", decision.decision_version, version); - - // TODO: GK - no hardcoded filters on headers - let headers: HashMap = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect(); - self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers); - - // Look for any early `Complete(..)` state, and update the `prune_index` and `commit_offset`. - if let Ok(Some(suffix_item)) = self.suffix.get(version){ - if matches!(suffix_item.item.get_state(), SuffixItemState::Complete(..)) { - if let Some((_, new_prune_version)) = self.suffix.update_prune_index_from_version(version) { - self.update_commit_offset(new_prune_version); - } - } + /// Process the incoming decision message + /// - Update the suffix candidate item with decision. + /// - If any type of early `Complete` state, update the `prune_index` and `commit_offset` if applicable. + pub(crate) fn process_decision_message(&mut self, decision: DecisionChannelMessage) { + let version = decision.message.get_candidate_version(); + debug!( + "[Decision Message] Decision version received = {} for candidate version = {}", + decision.decision_version, version + ); + + // TODO: GK - no hardcoded filters on headers + let headers: HashMap = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect(); + self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers); + + // Look for any early `Complete(..)` state, and update the `prune_index` and `commit_offset`. + if let Ok(Some(suffix_item)) = self.suffix.get(version) { + if matches!(suffix_item.item.get_state(), SuffixItemState::Complete(..)) { + if let Some((_, new_prune_version)) = self.suffix.update_prune_index_from_version(version) { + self.update_commit_offset(new_prune_version); + } + } + }; + } - }; + /// Compared the `max_in_progress_count` in config against `self.in_progress_count` to determine if back pressure should be enabled. + /// This helps to limit processing the incoming messages and therefore enable a bound on the suffix. + pub(crate) fn check_for_back_pressure(&mut self) { + // // Check to see if back pressure should be enabled. + // let should_enable = self + // .config + // .max_in_progress + // .map_or(false, |max_in_progress_count| self.in_progress_count >= max_in_progress_count); + + let should_enable = self.back_pressure_configs.should_enable_back_pressure(self.in_progress_count); + if should_enable { + // if back pressure was not enabled earlier, but will be enabled now, print a waning. + if !self.back_pressure_configs.is_enabled { + warn!( + "Applying back pressure as the total number of records currently being processed ({}) >= max in progress allowed ({:?})", + self.in_progress_count, self.back_pressure_configs.max_threshold + ); + } + // info!( + // "Back pressure - max_threshold = {:?} | min_threshold = {:?} | current in progress count = {} ", + // self.back_pressure_configs.max_threshold, self.back_pressure_configs.min_threshold, self.in_progress_count + // ); + } + self.back_pressure_configs.is_enabled = should_enable; + } - // Pick the next items from suffix whose actions are to be processed. - self.process_next_actions().await?; + pub async fn run_once(&mut self) -> MessengerServiceResult { + tokio::select! { + // feedback arm - Processes the feedbacks + Some(feedback_result) = self.rx_feedback_channel.recv() => { + self.process_feedbacks(feedback_result); + self.process_next_actions().await?; + } - }, - Ok(None) => { - debug!("No message to process.."); - }, - Err(error) => { - // Catch the error propogated, and if it has a version, mark the item as completed. - if let Some(version) = error.version { - if let Some(item_to_update) = self.suffix.get_mut(version){ - item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing)); + // Incoming message arm - + // - Processes the candidate or decision messages received. + // - Applies back pressure if too many records are being processed. + // - If back pressure is applied, will try to drain as many feedbacks and update the suffix as soon as possible to release the backpressure quickly. + result = receive_new_message(&mut self.message_receiver, self.back_pressure_configs.is_enabled) => { + if let Some(receiver_result) = result { + match receiver_result { + // 2.1 For CM - Install messages on the version + Ok(Some(ChannelMessage::Candidate(candidate))) => { + self.process_candidate_message(*candidate); + }, + // 2.2 For DM - Update the decision with outcome + safepoint. + Ok(Some(ChannelMessage::Decision(decision))) => { + + self.process_decision_message(*decision); + // Pick the next items from suffix whose actions are to be processed. + self.process_next_actions().await?; + + + }, + Ok(None) => { + debug!("No message to process.."); + }, + Err(error) => { + // Catch the error propogated, and if it has a version, mark the item as completed. + if let Some(version) = error.version { + if let Some(item_to_update) = self.suffix.get_mut(version){ + item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing)); + } } - } - error!("error consuming message....{:?}", error); - }, + error!("error consuming message....{:?}", error); + }, + } + } else { + debug!("Not consuming new messages due to backpressure enabled. Current in progress count = {} | Total feedbacks available in the feedback channel = {} | total actions send in the actions channel = {}", self.in_progress_count ,self.rx_feedback_channel.max_capacity() - self.rx_feedback_channel.capacity(), self.tx_actions_channel.max_capacity() - self.tx_actions_channel.capacity()); } - } // Periodically check and update the commit frequency and prune index. _ = self.commit_interval.tick() => { @@ -363,6 +517,9 @@ where } + // Check if back pressure should be enabled. + self.check_for_back_pressure(); + // NOTE: Pruning and committing offset adds to latency if done more frequently. if (self.next_version_to_commit - self.last_committed_version).ge(&(self.config.commit_size as u64)) { match self.message_receiver.commit() { diff --git a/packages/talos_messenger_core/src/services/mod.rs b/packages/talos_messenger_core/src/services/mod.rs index d3ae629a..dded7d0a 100644 --- a/packages/talos_messenger_core/src/services/mod.rs +++ b/packages/talos_messenger_core/src/services/mod.rs @@ -1,3 +1,3 @@ mod inbound_service; -pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig}; +pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}; diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index 8e50789d..796339e5 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -78,7 +78,7 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Get the state of an item by version. fn get_item_state(&self, version: u64) -> Option; /// Gets the suffix items eligible to process. - fn get_suffix_items_to_process(&self) -> Vec; + fn get_suffix_items_to_process(&self, limit: Option) -> Vec; /// Updates the decision for a version. fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap); /// Updates the action for a version using the action_key for lookup. @@ -88,6 +88,9 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// If the prune index was updated, returns the new prune_index, else returns None. fn update_prune_index_from_version(&mut self, version: u64) -> Option<(usize, u64)>; + /// Get all versions on suffix for a state. + fn get_versions_by_state(&self, state: &SuffixItemState, start_index: Option, end_index: Option) -> Vec; + /// Checks if all commit actions are completed for the version fn are_all_actions_complete_for_version(&self, version: u64) -> SuffixResult; } @@ -310,7 +313,9 @@ where } } - fn get_suffix_items_to_process(&self) -> Vec { + fn get_suffix_items_to_process(&self, limit: Option) -> Vec { + let max_to_take = if let Some(max_limit) = limit { max_limit as usize } else { u32::MAX as usize }; + let current_prune_index = self.get_meta().prune_index; let start_index = current_prune_index.unwrap_or(0); @@ -343,6 +348,7 @@ where _ => true, } }) + .take(max_to_take) // add timings related headers. .map(|x| { let mut headers = x.item.get_headers().clone(); @@ -436,6 +442,29 @@ where Some((index, safe_prune_version)) } + /// Get all versions on suffix for a state, between an index range. + fn get_versions_by_state(&self, state: &SuffixItemState, start_index: Option, end_index: Option) -> Vec { + // If suffix is empty, return empty vec + if self.messages.is_empty() { + return vec![]; + } + + let start_index = start_index.unwrap_or(0); + + let end_index = match end_index { + Some(index) if index > start_index => index, + _ => self.suffix_length() - 1, + }; + + let k = self + .messages + .range(start_index..=end_index) + .flatten() + .filter_map(|i| if i.item.get_state().eq(state) { Some(i.item_ver) } else { None }) + .collect(); + k + } + fn are_all_actions_complete_for_version(&self, version: u64) -> SuffixResult { if let Ok(Some(item)) = self.get(version) { Ok(item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed())) diff --git a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs index 821e661f..b36ec346 100644 --- a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs +++ b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs @@ -4,8 +4,8 @@ use talos_suffix::core::SuffixConfig; use crate::{ models::MessengerCandidateMessage, - services::MessengerInboundServiceConfig, - suffix::{MessengerSuffixAssertionTrait, SuffixItemState}, + services::{MessengerInboundServiceConfig, TalosBackPressureConfig}, + suffix::{self, MessengerSuffixAssertionTrait, MessengerSuffixTrait, SuffixItemState}, tests::{ payload::{ candidate::{CandidateTestPayload, MockChannelMessage}, @@ -29,6 +29,7 @@ async fn test_suffix_without_feedback() { allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -36,6 +37,7 @@ async fn test_suffix_without_feedback() { min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); // END - Prep before test @@ -137,6 +139,7 @@ async fn test_suffix_item_state_by_on_commit() { allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -144,6 +147,7 @@ async fn test_suffix_item_state_by_on_commit() { min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); // END - Prep before test @@ -234,6 +238,7 @@ async fn test_suffix_item_state_by_decision() { allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -241,6 +246,7 @@ async fn test_suffix_item_state_by_decision() { min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); // END - Prep before test @@ -290,6 +296,7 @@ async fn test_suffix_with_success_feedbacks_only() { allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -297,6 +304,7 @@ async fn test_suffix_with_success_feedbacks_only() { min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); @@ -460,6 +468,7 @@ async fn test_suffix_exhaustive_state_transitions_without_pruning() { allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -467,6 +476,7 @@ async fn test_suffix_exhaustive_state_transitions_without_pruning() { min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); @@ -778,6 +788,7 @@ async fn test_suffix_prune_index_update_all_candidates_with_commit_decision_earl allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, None); let configs = MessengerServicesTesterConfigs::new( SuffixConfig { capacity: 50, @@ -785,6 +796,7 @@ async fn test_suffix_prune_index_update_all_candidates_with_commit_decision_earl min_size_after_prune: None, }, inbound_service_configs, + backpressure_configs, ); let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); @@ -832,6 +844,269 @@ async fn test_suffix_prune_index_update_all_candidates_with_commit_decision_earl // End - Prepare basic candidates with various different types of on-commit actions } +// Test to see suffix write is paused and back pressure is applied. +#[tokio::test] +async fn test_back_pressure_build_up() { + let mut allowed_actions = AHashMap::new(); + allowed_actions.insert("publish".to_owned(), vec!["kafka".to_owned()]); + + // Max items allowed to be in progress before enforcing back pressure. + + let inbound_service_configs = MessengerInboundServiceConfig::new(allowed_actions.into(), Some(10), Some(60 * 60 * 1_000)); + let backpressure_configs = TalosBackPressureConfig::new(None, Some(2)); + let configs = MessengerServicesTesterConfigs::new( + SuffixConfig { + capacity: 50, + prune_start_threshold: Some(50), + min_size_after_prune: None, + }, + inbound_service_configs, + backpressure_configs, + ); + let mut service_tester = MessengerServiceTester::new_with_mock_action_service(configs); + + let headers = HashMap::new(); + // Candidate with on commit publish to kafka messages. + let mut on_commit = MockOnCommitMessage::new(); + + let on_commit_count_2 = 2; + for _ in 0..on_commit_count_2 { + on_commit.insert_kafka_message("some-topic".to_owned(), None, None, get_default_payload()); + } + let on_commit_value = on_commit.as_value(); + let candidate_with_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().add_on_commit(&on_commit_value).build(); + assert!(candidate_with_on_commit.on_commit.is_some()); + + // End - Prepare basic candidates with various different types of on-commit actions + + let commit_outcome = build_mock_outcome(None, Some(0)); + let abort_outcome = build_mock_outcome(None, None); + + // -------------------------------------------------------------------------------------------------------------- + // + // START - Prepare test candidates and decisions + // + // -------------------------------------------------------------------------------------------------------------- + + // ################ version 20 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_20 = 20; + let cm_20 = MockChannelMessage::new(&candidate_with_on_commit, vers_20); + let cm_channel_msg_20 = cm_20.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_20 = cm_20.build_decision_channel_message(vers_20 + 1, &commit_outcome, 0, &headers); + + // ################ version 22 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_22 = 22; + let cm_22 = MockChannelMessage::new(&candidate_with_on_commit, vers_22); + let cm_channel_msg_22 = cm_22.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_22 = cm_22.build_decision_channel_message(vers_22 + 1, &commit_outcome, 0, &headers); + + // ################ version 25 Candidate (with valid on_commit actions) and Decision (with abort outcome) #####################/ + let vers_25 = 25; + let cm_25 = MockChannelMessage::new(&candidate_with_on_commit, vers_25); + let cm_channel_msg_25 = cm_25.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_25 = cm_25.build_decision_channel_message(vers_25 + 2, &commit_outcome, 0, &headers); + + // ################ version 30 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_30 = 30; + let cm_30 = MockChannelMessage::new(&candidate_with_on_commit, vers_30); + let cm_channel_msg_30 = cm_30.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_30 = cm_30.build_decision_channel_message(vers_30 + 2, &commit_outcome, 0, &headers); + + // ################ version 33 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_33 = 33; + let cm_33 = MockChannelMessage::new(&candidate_with_on_commit, vers_33); + let cm_channel_msg_33 = cm_33.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_33 = cm_33.build_decision_channel_message(vers_33 + 2, &abort_outcome, 0, &headers); + + // ################ version 37 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_37 = 37; + let cm_37 = MockChannelMessage::new(&candidate_with_on_commit, vers_37); + let cm_channel_msg_37 = cm_37.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let dm_channel_msg_37 = cm_37.build_decision_channel_message(vers_37 + 2, &commit_outcome, 0, &headers); + + // ################ version 40 Candidate (with valid on_commit actions) and Decision (with commit outcome) #####################/ + let vers_40 = 40; + let cm_40 = MockChannelMessage::new(&candidate_with_on_commit, vers_40); + let cm_channel_msg_40 = cm_40.build_candidate_channel_message(&headers); + + // Produces a commit outcome with safepoint as version 3. + let _dm_channel_msg_40 = cm_40.build_decision_channel_message(vers_40 + 2, &commit_outcome, 0, &headers); + + // END - Prepare test candidates and decisions + + env_logger::init(); + + // -------------------------------------------------------------------------------------------------------------- + // + // START - Process the test candidates and decisions and assert + // + // -------------------------------------------------------------------------------------------------------------- + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - Processing candidates 20, 22 and 25 and assert suffix length + // ###### - Feedbacks are not processed now + // ###### - max items allowed to be processed = 2. + // + // ****** - 🧪 Expected result - Length of suffix after insert is 6. (Head = 20 | Tail = 25) + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + service_tester.process_message_journey(Some(cm_channel_msg_20), None, None).await; + let suffix = service_tester.get_suffix(); + assert_eq!(suffix.messages.len(), 1); + + // service_tester.process_message_journey(None, None, Some(JourneyConfig::new(true, 5))).await; + + service_tester.process_message_journey(Some(cm_channel_msg_22), None, None).await; + let suffix = service_tester.get_suffix(); + assert_eq!(suffix.messages.len(), 3); + + service_tester.process_message_journey(Some(cm_channel_msg_25), None, None).await; + let suffix = service_tester.get_suffix(); + assert_eq!(suffix.messages.len(), 6); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - Processing decisions for 20, 22 and 25. + // ###### - Decisions are published in the following order - 25, 20 and finally decision for 22. + // ###### - Feedbacks are not processed yet. + // ###### - max items allowed to be processed = 2. + // + // ****** - 🧪 Expected result: + // ****** - As version 25 and 20 decisions came in first, and since the decisions are commit and there is no + // ****** safepoint constraint, those candidates will move to `SuffixItemState::Processing`. + // ****** - As version 22 comes last, and the max allowed items to be processed at a time is `2`, the decision will not + // ****** received, and therefore the state would be `SuffixItemState::AwaitingDecision`. + // + // ****** - ✅ !!! Back pressure applied !!! Decision from version 22, is not processed + // + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + service_tester.process_message_journey(None, Some(dm_channel_msg_25), None).await; + // Scan whole suffix and find items currently Processing + let suffix = service_tester.get_suffix(); + let suffix_items_in_processing_state = suffix.get_versions_by_state(&SuffixItemState::Processing, None, None).len(); + assert_eq!(suffix_items_in_processing_state, 1); + + service_tester.process_message_journey(None, Some(dm_channel_msg_20), None).await; + // Scan whole suffix and find items currently Processing + let suffix = service_tester.get_suffix(); + let suffix_items_in_processing_state = suffix.get_versions_by_state(&SuffixItemState::Processing, None, None).len(); + assert_eq!(suffix_items_in_processing_state, 2); + + service_tester.process_message_journey(None, Some(dm_channel_msg_22), None).await; + // Scan whole suffix and find items currently Processing + let suffix = service_tester.get_suffix(); + let suffix_items_in_processing_state = suffix.get_versions_by_state(&SuffixItemState::Processing, None, None).len(); + + assert_eq!(suffix.get_item_state(vers_20).unwrap(), SuffixItemState::Processing); + assert_eq!(suffix.get_item_state(vers_25).unwrap(), SuffixItemState::Processing); + // Decision message was not consumed due to back pressure applied. + assert_eq!(suffix.get_item_state(vers_22).unwrap(), SuffixItemState::AwaitingDecision); + assert_eq!(suffix_items_in_processing_state, 2); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - action service processes for version 25. + // ###### - Because version 25's version was first received, it will be send for processing + // ###### - We know that version 20 and 25 are in `SuffixItemState::Processing`, and each have 2 on_commit actions. + // + // ****** - 🧪 Expected result: + // ****** - Version 25 state updated to `Complete`. + // ****** - Version 22 state updated to `Processing`. + // ****** - Version 20 state updated to `Processing`. + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + + // Action service will process version 25 `on_commit` actions + service_tester.process_message_journey(None, None, Some(JourneyConfig::new(true, 10))).await; + + let suffix = service_tester.get_suffix(); + assert_eq!(suffix.get_item_state(vers_20).unwrap(), SuffixItemState::Processing); + assert_eq!( + suffix.get_item_state(vers_25).unwrap(), + SuffixItemState::Complete(suffix::SuffixItemCompleteStateReason::Processed) + ); + assert_eq!(suffix.get_item_state(vers_22).unwrap(), SuffixItemState::Processing); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - Processing candidates 30, 33 and 37 + // ###### - Processing of decisions are not done at the moment. + // ###### - Feedbacks are not processed now + // ###### - max items allowed to be processed = 3. + // + // ****** - 🧪 Expected result: + // ****** - ✅ !!! Back pressure applied !!! + // - Version 30, 33 and 37 candidates will not be processed (inserted into suffix). + // - Suffix length remains at 6. + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + + service_tester.process_message_journey(Some(cm_channel_msg_30), None, None).await; + service_tester.process_message_journey(Some(cm_channel_msg_33), None, None).await; + service_tester.process_message_journey(Some(cm_channel_msg_37), None, None).await; + let suffix = service_tester.get_suffix(); + assert_eq!(suffix.messages.len(), 6); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - Processing `on_commit` actions for version 20 and 22 + // ###### - We process all the actions available for action service to pick + // + // ****** - 🧪 Expected result: + // - Version 20, 22 candidates move to final state `Complete(Processed)`. + // - Version 30, 33 and 37 candidates will not be `AwaitingDecision` state. + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + + service_tester.process_message_journey(None, None, Some(JourneyConfig::new(true, 2))).await; + service_tester.process_message_journey(None, None, Some(JourneyConfig::new(true, 5))).await; + let suffix = service_tester.get_suffix(); + assert_eq!( + suffix.get_item_state(vers_20).unwrap(), + SuffixItemState::Complete(suffix::SuffixItemCompleteStateReason::Processed) + ); + assert_eq!( + suffix.get_item_state(vers_22).unwrap(), + SuffixItemState::Complete(suffix::SuffixItemCompleteStateReason::Processed) + ); + assert_eq!(suffix.get_item_state(vers_30).unwrap(), SuffixItemState::AwaitingDecision); + assert_eq!(suffix.get_item_state(vers_33).unwrap(), SuffixItemState::AwaitingDecision); + assert_eq!(suffix.get_item_state(vers_37).unwrap(), SuffixItemState::AwaitingDecision); + + let total_in_flight = suffix.get_versions_by_state(&SuffixItemState::Processing, None, None).len() + + suffix.get_versions_by_state(&SuffixItemState::PartiallyComplete, None, None).len(); + + assert_eq!(total_in_flight, 0); + assert_eq!(suffix.messages.len(), 18); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // ###### - Processing decisions for version 33, 30 and 37 and then process candidate 40 + // + // ****** - 🧪 Expected result: + // - Version 30, 37 will be in `Processing` state. + // - Version 33 will be in `Complete(Aborted)` state, due to abort decision. + // - ✅ !!! Back pressure applied !!! Version 40 candidate will not be inserted and suffix length will remain + // same. + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + service_tester.process_message_journey(None, Some(dm_channel_msg_33), None).await; + service_tester.process_message_journey(None, Some(dm_channel_msg_30), None).await; + service_tester.process_message_journey(None, Some(dm_channel_msg_37), None).await; + service_tester.process_message_journey(Some(cm_channel_msg_40), None, None).await; + + let suffix = service_tester.get_suffix(); + assert_eq!( + suffix.get_item_state(vers_33).unwrap(), + SuffixItemState::Complete(suffix::SuffixItemCompleteStateReason::Aborted) + ); + assert_eq!(suffix.get_item_state(vers_30).unwrap(), SuffixItemState::Processing); + assert_eq!(suffix.get_item_state(vers_37).unwrap(), SuffixItemState::Processing); + assert_eq!(suffix.messages.len(), 18); +} + // TODO: GK - Run `test_suffix_exhaustive_state_transitions_without_pruning` with smaller prune_threshold to see how pruning behaves through all the above scenarios. // TODO: GK - Test commit offset logic diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index 72afb5a6..15c5ca6f 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -13,6 +13,7 @@ use talos_certifier::{ ChannelMessage, }; use talos_suffix::{core::SuffixConfig, Suffix}; + use tokio::{ sync::mpsc::{self, Sender}, task::JoinHandle, @@ -22,7 +23,7 @@ use crate::{ core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, PublishActionType}, errors::{MessengerActionError, MessengerServiceResult}, models::MessengerCandidateMessage, - services::{MessengerInboundService, MessengerInboundServiceConfig}, + services::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}, suffix::MessengerCandidate, utlis::get_actions_deserialised, }; @@ -171,6 +172,7 @@ pub struct MockActionService { impl ActionService for MockActionService { async fn process_action(&mut self) -> MessengerServiceResult { let actions_result = self.rx_actions_channel.try_recv(); + // error!("Action result on try_recv is {actions_result:?}"); match actions_result { Ok(actions) => { let MessengerCommitActions { @@ -179,7 +181,7 @@ impl ActionService for MockActionService { headers, } = actions; - error!("Headers... {headers:#?}"); + // error!("Headers... {headers:#?}"); if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { match get_actions_deserialised::>(publish_actions_for_type) { Ok(actions) => { @@ -261,13 +263,32 @@ pub fn build_mock_outcome(conflict_version: Option, safepoint: Option) pub struct MessengerServicesTesterConfigs { suffix_configs: SuffixConfig, inbound_service_configs: MessengerInboundServiceConfig, + backpressure_configs: TalosBackPressureConfig, } impl MessengerServicesTesterConfigs { - pub fn new(suffix_configs: SuffixConfig, inbound_service_configs: MessengerInboundServiceConfig) -> Self { + pub fn new(suffix_configs: SuffixConfig, inbound_service_configs: MessengerInboundServiceConfig, backpressure_configs: TalosBackPressureConfig) -> Self { MessengerServicesTesterConfigs { suffix_configs, inbound_service_configs, + backpressure_configs, + } + } +} + +pub struct JourneyConfig { + /// Should the journey process the actions in the actions service. + should_process_actions: bool, + /// There can be `n` number of actions under `on_commit` of a version. + /// Therefore this field helps to iterate through the feedbacks from actions_service for the actions. + run_inbound_service_count: u32, +} + +impl JourneyConfig { + pub fn new(process_actions: bool, run_inbound_service_count: u32) -> Self { + JourneyConfig { + run_inbound_service_count, + should_process_actions: process_actions, } } } @@ -305,6 +326,7 @@ impl MessengerServiceTester Inbound Service. - if configs.expected_feedbacks_in_journey > 0 { - for _ in 1..=configs.expected_feedbacks_in_journey { + if configs.run_inbound_service_count > 0 { + for _ in 1..=configs.run_inbound_service_count { self.inbound_service.run_once().await.unwrap(); } } @@ -366,23 +388,6 @@ impl MessengerServiceTester Self { - JourneyConfig { - expected_feedbacks_in_journey: expected_feedbacks, - should_process_actions: process_actions, - } - } -} - impl MessengerServiceTester where M: MessageReciever> + Send + Sync + 'static, From 15161b09cb52d310cae3083754cfe9fff07e2540 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Wed, 2 Apr 2025 08:55:25 +1100 Subject: [PATCH 2/4] chore: back pressure logic refactor --- .../src/services/inbound_service.rs | 44 ++++++++++++------- .../src/tests/messenger_inbound_service.rs | 5 +-- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 71ab2804..9bd8aae6 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -63,16 +63,14 @@ where last_committed_version: u64, /// The next version ready to be send for commit. next_version_to_commit: u64, - // Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. - // enable_back_pressure: bool, - /// Number of items currently in progress. Which denotes, the number of items in suffix in `SuffixItemState::Processing` or `SuffixItemState::PartiallyComplete`. - in_progress_count: u32, /// Configs to check back pressure. back_pressure_configs: TalosBackPressureConfig, } #[derive(Debug, Default, Clone)] pub struct TalosBackPressureConfig { + /// Current count which is used to check against max and min to determine if back-pressure should be applied. + current: u32, /// Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. pub is_enabled: bool, /// Max count before back pressue is enabled. @@ -88,18 +86,27 @@ impl TalosBackPressureConfig { pub fn new(min_threshold: Option, max_threshold: Option) -> Self { assert!( min_threshold.le(&max_threshold), - "min_threshold ({min_threshold:?}) must be less than max_threshold ({max_threshold:?})" + "min_threshold ({min_threshold:?}) must be less or equal to the max_threshold ({max_threshold:?})" ); Self { max_threshold, min_threshold, is_enabled: false, + current: 0, } } + pub fn increment_current(&mut self) { + self.current += 1; + } + + pub fn decrement_current(&mut self) { + self.current -= 1; + } + /// Get the remaining available count before hitting the max_threshold, and thereby enabling back pressure. - pub fn get_remaining_count(&self, current_count: u32) -> Option { - self.max_threshold.map(|max| max.saturating_sub(current_count)) + pub fn get_remaining_count(&self) -> Option { + self.max_threshold.map(|max| max.saturating_sub(self.current)) } /// Looks at the `max_threshold` and `min_threshold` to determine if back pressure should be enabled. `max_threshold` is used to determine when to enable the back-pressure @@ -108,7 +115,8 @@ impl TalosBackPressureConfig { /// If this is set to `None`, no back pressure will be applied. /// `max_threshold` is /// - `min_threshold` - Use to determine the lower bound of maximum items allowed. If this is set to `None`, no back pressure will be applied. - pub fn should_enable_back_pressure(&mut self, current_count: u32) -> bool { + pub fn should_enable_back_pressure(&mut self) -> bool { + let current_count = self.current; match self.max_threshold { Some(max_threshold) => { // if not enabled, only check against the max_threshold. @@ -172,14 +180,14 @@ where last_committed_version: 0, next_version_to_commit: 0, // enable_back_pressure: false, - in_progress_count: 0, + // in_progress_count: 0, back_pressure_configs, } } /// Get next versions with their commit actions to process. /// async fn process_next_actions(&mut self) -> MessengerServiceResult { - let max_new_items_to_pick = self.back_pressure_configs.get_remaining_count(self.in_progress_count); + let max_new_items_to_pick = self.back_pressure_configs.get_remaining_count(); let items_to_process = self.suffix.get_suffix_items_to_process(max_new_items_to_pick); let new_in_progress_count = items_to_process.len() as u32; @@ -211,7 +219,13 @@ where // Mark item as in process self.suffix.set_item_state(ver, SuffixItemState::Processing); // Increment the in_progress_count which is used to determine when back-pressure is enforced. - self.in_progress_count += 1; + self.back_pressure_configs.increment_current(); + + // info!( + // "Current items in channel = {} | from current count in backpressure configs = {}", + // self.tx_actions_channel.max_capacity() - self.tx_actions_channel.capacity(), + // self.back_pressure_configs.current + // ); } Ok(new_in_progress_count) @@ -257,7 +271,7 @@ where // self.all_completed_versions.push(version); // When any item moved to final state, we decrement this counter, which thereby relaxes the back-pressure if it was enforced. - self.in_progress_count -= 1; + self.back_pressure_configs.decrement_current(); if let Some((_, new_prune_version)) = self.suffix.update_prune_index_from_version(version) { self.update_commit_offset(new_prune_version); @@ -404,13 +418,13 @@ where // .max_in_progress // .map_or(false, |max_in_progress_count| self.in_progress_count >= max_in_progress_count); - let should_enable = self.back_pressure_configs.should_enable_back_pressure(self.in_progress_count); + let should_enable = self.back_pressure_configs.should_enable_back_pressure(); if should_enable { // if back pressure was not enabled earlier, but will be enabled now, print a waning. if !self.back_pressure_configs.is_enabled { warn!( "Applying back pressure as the total number of records currently being processed ({}) >= max in progress allowed ({:?})", - self.in_progress_count, self.back_pressure_configs.max_threshold + self.back_pressure_configs.current, self.back_pressure_configs.max_threshold ); } // info!( @@ -464,7 +478,7 @@ where }, } } else { - debug!("Not consuming new messages due to backpressure enabled. Current in progress count = {} | Total feedbacks available in the feedback channel = {} | total actions send in the actions channel = {}", self.in_progress_count ,self.rx_feedback_channel.max_capacity() - self.rx_feedback_channel.capacity(), self.tx_actions_channel.max_capacity() - self.tx_actions_channel.capacity()); + debug!("Not consuming new messages due to backpressure enabled. Current in progress count = {} | Total feedbacks available in the feedback channel = {} | total actions send in the actions channel = {}", self.back_pressure_configs.current ,self.rx_feedback_channel.max_capacity() - self.rx_feedback_channel.capacity(), self.tx_actions_channel.max_capacity() - self.tx_actions_channel.capacity()); } } // Periodically check and update the commit frequency and prune index. diff --git a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs index b36ec346..5c9e78d8 100644 --- a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs +++ b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs @@ -173,8 +173,7 @@ async fn test_suffix_item_state_by_on_commit() { let on_commit_value = on_commit.as_value(); let candidate_with_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().add_on_commit(&on_commit_value).build(); assert!(candidate_with_on_commit.on_commit.is_some()); - // env_logger::init(); - // error!("candidate_with_on_commit is \n {candidate_with_on_commit:#?}"); + // END - Prepare basic candidates with various different types of on-commit actions. let headers = HashMap::new(); @@ -946,8 +945,6 @@ async fn test_back_pressure_build_up() { // END - Prepare test candidates and decisions - env_logger::init(); - // -------------------------------------------------------------------------------------------------------------- // // START - Process the test candidates and decisions and assert From 78b7116707a1264d92adee4b82fefdd679f2b576 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Wed, 2 Apr 2025 13:11:37 +1100 Subject: [PATCH 3/4] chore: final refactor --- .../examples/messenger_using_kafka.rs | 9 +- .../talos_common_utils/src/back_pressure.rs | 76 ++++++++ packages/talos_common_utils/src/lib.rs | 1 + .../src/messenger_with_kafka.rs | 3 +- .../src/services/inbound_service.rs | 179 +++++------------- .../talos_messenger_core/src/services/mod.rs | 2 +- .../src/tests/messenger_inbound_service.rs | 3 +- .../src/tests/test_utils.rs | 9 +- 8 files changed, 140 insertions(+), 142 deletions(-) create mode 100644 packages/talos_common_utils/src/back_pressure.rs diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index 040456bc..b4e2d020 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -1,9 +1,6 @@ -use talos_common_utils::env_var; +use talos_common_utils::{back_pressure::TalosBackPressureConfig, env_var}; use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration}; -use talos_messenger_core::{ - services::TalosBackPressureConfig, - utlis::{create_whitelist_actions_from_str, ActionsParserConfig}, -}; +use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig}; use talos_rdkafka_utils::kafka_config::KafkaConfig; use talos_suffix::core::SuffixConfig; @@ -43,7 +40,7 @@ async fn main() { channel_buffers: None, commit_size: Some(2_000), commit_frequency: None, - back_pressure_config: Some(TalosBackPressureConfig::new(Some(40), Some(50))), + back_pressure_config: Some(TalosBackPressureConfig::new(None, Some(50_000))), }; messenger_with_kafka(config).await.unwrap(); diff --git a/packages/talos_common_utils/src/back_pressure.rs b/packages/talos_common_utils/src/back_pressure.rs new file mode 100644 index 00000000..1687543e --- /dev/null +++ b/packages/talos_common_utils/src/back_pressure.rs @@ -0,0 +1,76 @@ +#[derive(Debug, Default, Clone)] +pub struct TalosBackPressureConfig { + /// Current count which is used to check against max and min to determine if back-pressure should be applied. + pub current: u32, + /// Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. + pub is_enabled: bool, + /// Max count before back pressue is enabled. + /// if `None`, back pressure logic will not apply. + pub max_threshold: Option, + /// `min_threshold` helps to prevent immediate toggle between switch on and off of the backpressure. + /// Batch of items to process, when back pressure is enabled before disable logic is checked? + /// if None, no minimum check is done, and as soon as the count is below the max_threshold, back pressure is disabled. + pub min_threshold: Option, +} + +impl TalosBackPressureConfig { + pub fn new(min_threshold: Option, max_threshold: Option) -> Self { + assert!( + min_threshold.le(&max_threshold), + "min_threshold ({min_threshold:?}) must be less or equal to the max_threshold ({max_threshold:?})" + ); + Self { + max_threshold, + min_threshold, + is_enabled: false, + current: 0, + } + } + + pub fn increment_current(&mut self) { + self.current += 1; + } + + pub fn decrement_current(&mut self) { + self.current -= 1; + } + + /// Get the remaining available count before hitting the max_threshold, and thereby enabling back pressure. + pub fn get_remaining_count(&self) -> Option { + self.max_threshold.map(|max| max.saturating_sub(self.current)) + } + + pub fn update_back_pressure_flag(&mut self, is_enabled: bool) { + self.is_enabled = is_enabled + } + + /// Looks at the `max_threshold` and `min_threshold` to determine if back pressure should be enabled. `max_threshold` is used to determine when to enable the back-pressure + /// whereas, `min_threshold` is used to look at the lower bound + /// - `max_threshold` - Use to determine the upper bound of maximum items allowed. + /// If this is set to `None`, no back pressure will be applied. + /// `max_threshold` is + /// - `min_threshold` - Use to determine the lower bound of maximum items allowed. If this is set to `None`, no back pressure will be applied. + pub fn should_apply_back_pressure(&mut self) -> bool { + let current_count = self.current; + match self.max_threshold { + Some(max_threshold) => { + // if not enabled, only check against the max_threshold. + if current_count >= max_threshold { + true + } else { + // if already enabled, check when is it safe to remove. + + // If there is Some(`min_threshold`), then return true if current_count > `min_threshold`. + // If there is Some(`min_threshold`), then return false if current_count <= `min_threshold`. + // If None, then return false. + match self.min_threshold { + Some(min_threshold) if self.is_enabled => current_count > min_threshold, + _ => false, + } + } + } + // if None, then we don't apply any back pressure. + None => false, + } + } +} diff --git a/packages/talos_common_utils/src/lib.rs b/packages/talos_common_utils/src/lib.rs index 3d7924f6..91a7efbc 100644 --- a/packages/talos_common_utils/src/lib.rs +++ b/packages/talos_common_utils/src/lib.rs @@ -1 +1,2 @@ +pub mod back_pressure; pub mod env; diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index b8080814..30668f69 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -4,10 +4,11 @@ use log::debug; use rdkafka::producer::ProducerContext; use talos_certifier::ports::{errors::MessagePublishError, MessageReciever}; use talos_certifier_adapters::KafkaConsumer; +use talos_common_utils::back_pressure::TalosBackPressureConfig; use talos_messenger_core::{ core::{MessengerPublisher, PublishActionType}, errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult}, - services::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}, + services::{MessengerInboundService, MessengerInboundServiceConfig}, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService, }; diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 9bd8aae6..51b8326d 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -10,6 +10,7 @@ use talos_certifier::{ ports::{errors::MessageReceiverError, MessageReciever}, ChannelMessage, }; +use talos_common_utils::back_pressure::TalosBackPressureConfig; use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::{ @@ -67,79 +68,6 @@ where back_pressure_configs: TalosBackPressureConfig, } -#[derive(Debug, Default, Clone)] -pub struct TalosBackPressureConfig { - /// Current count which is used to check against max and min to determine if back-pressure should be applied. - current: u32, - /// Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process. - pub is_enabled: bool, - /// Max count before back pressue is enabled. - /// if `None`, back pressure logic will not apply. - max_threshold: Option, - /// `min_threshold` helps to prevent immediate toggle between switch on and off of the backpressure. - /// Batch of items to process, when back pressure is enabled before disable logic is checked? - /// if None, no minimum check is done, and as soon as the count is below the max_threshold, back pressure is disabled. - min_threshold: Option, -} - -impl TalosBackPressureConfig { - pub fn new(min_threshold: Option, max_threshold: Option) -> Self { - assert!( - min_threshold.le(&max_threshold), - "min_threshold ({min_threshold:?}) must be less or equal to the max_threshold ({max_threshold:?})" - ); - Self { - max_threshold, - min_threshold, - is_enabled: false, - current: 0, - } - } - - pub fn increment_current(&mut self) { - self.current += 1; - } - - pub fn decrement_current(&mut self) { - self.current -= 1; - } - - /// Get the remaining available count before hitting the max_threshold, and thereby enabling back pressure. - pub fn get_remaining_count(&self) -> Option { - self.max_threshold.map(|max| max.saturating_sub(self.current)) - } - - /// Looks at the `max_threshold` and `min_threshold` to determine if back pressure should be enabled. `max_threshold` is used to determine when to enable the back-pressure - /// whereas, `min_threshold` is used to look at the lower bound - /// - `max_threshold` - Use to determine the upper bound of maximum items allowed. - /// If this is set to `None`, no back pressure will be applied. - /// `max_threshold` is - /// - `min_threshold` - Use to determine the lower bound of maximum items allowed. If this is set to `None`, no back pressure will be applied. - pub fn should_enable_back_pressure(&mut self) -> bool { - let current_count = self.current; - match self.max_threshold { - Some(max_threshold) => { - // if not enabled, only check against the max_threshold. - if current_count >= max_threshold { - true - } else { - // if already enabled, check when is it safe to remove. - - // If there is Some(`min_threshold`), then return true if current_count > `min_threshold`. - // If there is Some(`min_threshold`), then return false if current_count <= `min_threshold`. - // If None, then return false. - match self.min_threshold { - Some(min_threshold) if self.is_enabled => current_count > min_threshold, - _ => false, - } - } - } - // if None, then we don't apply any back pressure. - None => false, - } - } -} - pub async fn receive_new_message( receiver: &mut M, is_back_pressure_enabled: bool, @@ -148,7 +76,8 @@ where M: MessageReciever> + Send + Sync + 'static, { if is_back_pressure_enabled { - // This sleep is fine, this just introduces a delay for fairness so that other async arms of tokio select can execute. + // This sleep with hardcoded value is fine, this just introduces a delay for fairness so that other async arms of tokio select can execute. + // We introduce this delay just so that feedback or any other arm gets a chance to execute when back-pressue is enabled. tokio::time::sleep(Duration::from_millis(2)).await; None } else { @@ -179,8 +108,6 @@ where commit_interval, last_committed_version: 0, next_version_to_commit: 0, - // enable_back_pressure: false, - // in_progress_count: 0, back_pressure_configs, } } @@ -220,12 +147,6 @@ where self.suffix.set_item_state(ver, SuffixItemState::Processing); // Increment the in_progress_count which is used to determine when back-pressure is enforced. self.back_pressure_configs.increment_current(); - - // info!( - // "Current items in channel = {} | from current count in backpressure configs = {}", - // self.tx_actions_channel.max_capacity() - self.tx_actions_channel.capacity(), - // self.back_pressure_configs.current - // ); } Ok(new_in_progress_count) @@ -409,30 +330,57 @@ where }; } - /// Compared the `max_in_progress_count` in config against `self.in_progress_count` to determine if back pressure should be enabled. - /// This helps to limit processing the incoming messages and therefore enable a bound on the suffix. - pub(crate) fn check_for_back_pressure(&mut self) { - // // Check to see if back pressure should be enabled. - // let should_enable = self - // .config - // .max_in_progress - // .map_or(false, |max_in_progress_count| self.in_progress_count >= max_in_progress_count); - - let should_enable = self.back_pressure_configs.should_enable_back_pressure(); - if should_enable { - // if back pressure was not enabled earlier, but will be enabled now, print a waning. - if !self.back_pressure_configs.is_enabled { - warn!( + /// Common house keeping checks and updates that run at the end of every iteration of `Self::run_once`. + /// - Check if backpressure must be enabled. + /// - Check if we have crossed the `commit_size` threshold and issue a commit. + /// - Check if suffix should be pruned. + pub(crate) fn run_once_housekeeping(&mut self) -> MessengerServiceResult { + // check back pressure + let enable_back_pressure = self.back_pressure_configs.should_apply_back_pressure(); + + // update back pressure if it has changed. + if self.back_pressure_configs.is_enabled != enable_back_pressure { + if enable_back_pressure { + info!( "Applying back pressure as the total number of records currently being processed ({}) >= max in progress allowed ({:?})", self.back_pressure_configs.current, self.back_pressure_configs.max_threshold ); + } else { + info!( + "Removing back pressure as the total number of records currently being processed ({}) < max in progress allowed ({:?})", + self.back_pressure_configs.current, self.back_pressure_configs.max_threshold + ); } - // info!( - // "Back pressure - max_threshold = {:?} | min_threshold = {:?} | current in progress count = {} ", - // self.back_pressure_configs.max_threshold, self.back_pressure_configs.min_threshold, self.in_progress_count - // ); + self.back_pressure_configs.update_back_pressure_flag(enable_back_pressure); + } + + // NOTE: Pruning and committing offset adds to latency if done more frequently. + + // issue commit, is applicable. + if (self.next_version_to_commit - self.last_committed_version).ge(&(self.config.commit_size as u64)) { + match self.message_receiver.commit() { + Err(err) => { + error!("[Commit] Error committing {err:?}"); + } + Ok(_) => { + self.last_committed_version = self.next_version_to_commit; + info!("Last commit at offset {}", self.last_committed_version); + } + }; } - self.back_pressure_configs.is_enabled = should_enable; + + // suffix prune, if applicable. + let SuffixMeta { + prune_index, + prune_start_threshold, + .. + } = self.suffix.get_meta(); + + if prune_index.gt(prune_start_threshold) { + self.suffix_pruning(); + }; + + Ok(()) } pub async fn run_once(&mut self) -> MessengerServiceResult { @@ -531,34 +479,7 @@ where } - // Check if back pressure should be enabled. - self.check_for_back_pressure(); - - // NOTE: Pruning and committing offset adds to latency if done more frequently. - if (self.next_version_to_commit - self.last_committed_version).ge(&(self.config.commit_size as u64)) { - match self.message_receiver.commit() { - Err(err) => { - error!("[Commit] Error committing {err:?}"); - } - Ok(_) => { - self.last_committed_version = self.next_version_to_commit; - info!("Last commit at offset {}", self.last_committed_version); - } - }; - // else { - // } - } - - // Update the prune index and commit - let SuffixMeta { - prune_index, - prune_start_threshold, - .. - } = self.suffix.get_meta(); - - if prune_index.gt(prune_start_threshold) { - self.suffix_pruning(); - }; + self.run_once_housekeeping()?; Ok(()) } diff --git a/packages/talos_messenger_core/src/services/mod.rs b/packages/talos_messenger_core/src/services/mod.rs index dded7d0a..d3ae629a 100644 --- a/packages/talos_messenger_core/src/services/mod.rs +++ b/packages/talos_messenger_core/src/services/mod.rs @@ -1,3 +1,3 @@ mod inbound_service; -pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}; +pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig}; diff --git a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs index 5c9e78d8..bacd027e 100644 --- a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs +++ b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs @@ -1,10 +1,11 @@ use ahash::{AHashMap, HashMap, HashMapExt}; use talos_certifier::test_helpers::mocks::payload::{build_kafka_on_commit_message, build_on_commit_publish_kafka_payload, get_default_payload}; +use talos_common_utils::back_pressure::TalosBackPressureConfig; use talos_suffix::core::SuffixConfig; use crate::{ models::MessengerCandidateMessage, - services::{MessengerInboundServiceConfig, TalosBackPressureConfig}, + services::MessengerInboundServiceConfig, suffix::{self, MessengerSuffixAssertionTrait, MessengerSuffixTrait, SuffixItemState}, tests::{ payload::{ diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index 15c5ca6f..e183f953 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -12,6 +12,7 @@ use talos_certifier::{ }, ChannelMessage, }; +use talos_common_utils::back_pressure::TalosBackPressureConfig; use talos_suffix::{core::SuffixConfig, Suffix}; use tokio::{ @@ -23,7 +24,7 @@ use crate::{ core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, PublishActionType}, errors::{MessengerActionError, MessengerServiceResult}, models::MessengerCandidateMessage, - services::{MessengerInboundService, MessengerInboundServiceConfig, TalosBackPressureConfig}, + services::{MessengerInboundService, MessengerInboundServiceConfig}, suffix::MessengerCandidate, utlis::get_actions_deserialised, }; @@ -263,7 +264,7 @@ pub fn build_mock_outcome(conflict_version: Option, safepoint: Option) pub struct MessengerServicesTesterConfigs { suffix_configs: SuffixConfig, inbound_service_configs: MessengerInboundServiceConfig, - backpressure_configs: TalosBackPressureConfig, + back_pressure_configs: TalosBackPressureConfig, } impl MessengerServicesTesterConfigs { @@ -271,7 +272,7 @@ impl MessengerServicesTesterConfigs { MessengerServicesTesterConfigs { suffix_configs, inbound_service_configs, - backpressure_configs, + back_pressure_configs: backpressure_configs, } } } @@ -326,7 +327,7 @@ impl MessengerServiceTester Date: Wed, 2 Apr 2025 13:56:15 +1100 Subject: [PATCH 4/4] chore: add unit tests for backpressure --- .../talos_common_utils/src/back_pressure.rs | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/packages/talos_common_utils/src/back_pressure.rs b/packages/talos_common_utils/src/back_pressure.rs index 1687543e..71dea779 100644 --- a/packages/talos_common_utils/src/back_pressure.rs +++ b/packages/talos_common_utils/src/back_pressure.rs @@ -74,3 +74,157 @@ impl TalosBackPressureConfig { } } } + +#[cfg(test)] +mod test { + use super::TalosBackPressureConfig; + + #[test] + #[should_panic(expected = "min_threshold (Some(50)) must be less or equal to the max_threshold (Some(10))")] + fn test_panic_min_greater_than_max_threshold() { + let _ = TalosBackPressureConfig::new(Some(50), Some(10)); + } + + #[test] + fn test_back_pressure_when_min_equal_max_threshold() { + let mut bp = TalosBackPressureConfig::new(Some(10), Some(10)); + + // initially current is at 0. + assert_eq!(bp.current, 0); + for _ in 0..10 { + bp.increment_current() + } + // current set to 10. + assert_eq!(bp.current, 10); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is >= to max_threshold, we should enable backpressure. + assert!(bp.is_enabled); + + // Decrement the counter two times. + bp.decrement_current(); + bp.decrement_current(); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is < max_threshold and since max and min are same, we could disable backpressure. + assert!(!bp.is_enabled); + } + + #[test] + fn test_back_pressure_when_threshold_min_some_max_some() { + let mut bp = TalosBackPressureConfig::new(Some(10), Some(20)); + + // initially current is at 0. + assert_eq!(bp.current, 0); + for _ in 0..30 { + bp.increment_current() + } + // current == 30. + assert_eq!(bp.current, 30); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is >= to max_threshold, we should enable backpressure. + assert!(bp.is_enabled); + + // Decrement the counter 10 times. + for _ in 0..10 { + bp.decrement_current(); + } + // current == 20. + assert_eq!(bp.current, 20); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is == max_threshold, backpressure will remain on. + assert!(bp.is_enabled); + + // Decrement the counter 5 times. + for _ in 0..5 { + bp.decrement_current(); + } + // current == 15. + assert_eq!(bp.current, 15); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Although the current value is < max_threshold, it is still greater than min_threshold, backpressure will remain on. + assert!(bp.is_enabled); + + // Decrement the counter 5 times. + for _ in 0..5 { + bp.decrement_current(); + } + // current == 10. + assert_eq!(bp.current, 10); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Although the current value is < max_threshold and == min_threshold, backpressure can be disabled. + assert!(!bp.is_enabled); + } + + #[test] + fn test_back_pressure_when_threshold_min_none_max_some() { + let mut bp = TalosBackPressureConfig::new(None, Some(20)); + + // initially current is at 0. + assert_eq!(bp.current, 0); + for _ in 0..30 { + bp.increment_current() + } + // current == 30. + assert_eq!(bp.current, 30); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is >= to max_threshold, we should enable backpressure. + assert!(bp.is_enabled); + + // Decrement the counter 11 times. + for _ in 0..11 { + bp.decrement_current(); + } + // current == 19. + assert_eq!(bp.current, 19); + + let should_enable = bp.should_apply_back_pressure(); + bp.update_back_pressure_flag(should_enable); + + // Because the current value is < max_threshold, and min_threshold is None, backpressure will be disabled. + assert!(!bp.is_enabled); + } + + #[test] + fn test_back_pressure_when_threshold_min_none_max_none() { + let mut bp = TalosBackPressureConfig::new(None, None); + + // initially current is at 0. + assert_eq!(bp.current, 0); + for _ in 0..1_000 { + bp.increment_current() + } + // current == 1_000. + assert_eq!(bp.current, 1_000); + + let should_enable = bp.should_apply_back_pressure(); + + // Because the max_threshold is None, no back pressure. + assert!(!should_enable); + } + + #[test] + #[should_panic(expected = "min_threshold (Some(10)) must be less or equal to the max_threshold (None)")] + fn test_back_pressure_when_threshold_min_some_max_none() { + let _ = TalosBackPressureConfig::new(Some(10), None); + } +}