Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use talos_common_utils::env_var;
use talos_common_utils::{back_pressure::TalosBackPressureConfig, env_var};
use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration};
use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
Expand Down Expand Up @@ -40,6 +40,7 @@ async fn main() {
channel_buffers: None,
commit_size: Some(2_000),
commit_frequency: None,
back_pressure_config: Some(TalosBackPressureConfig::new(None, Some(50_000))),
};

messenger_with_kafka(config).await.unwrap();
Expand Down
230 changes: 230 additions & 0 deletions packages/talos_common_utils/src/back_pressure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
#[derive(Debug, Default, Clone)]
pub struct TalosBackPressureConfig {
/// Current count which is used to check against max and min to determine if back-pressure should be applied.
pub current: u32,
/// Flag denotes if back pressure is enabled, and therefore the thread cannot receive more messages (candidate/decisions) to process.
pub is_enabled: bool,
/// Max count before back pressue is enabled.
/// if `None`, back pressure logic will not apply.
pub max_threshold: Option<u32>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is "max count" then its better if we named the attribute with workd "count" in it.

Please change the order.
is enabled
thencount and max count.

/// `min_threshold` helps to prevent immediate toggle between switch on and off of the backpressure.
/// Batch of items to process, when back pressure is enabled before disable logic is checked?
/// if None, no minimum check is done, and as soon as the count is below the max_threshold, back pressure is disabled.
Comment on lines +11 to +12
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please have another look at the description. I am not able to follow the purpose of this parameter.

pub min_threshold: Option<u32>,
}

impl TalosBackPressureConfig {
pub fn new(min_threshold: Option<u32>, max_threshold: Option<u32>) -> Self {
assert!(
min_threshold.le(&max_threshold),
"min_threshold ({min_threshold:?}) must be less or equal to the max_threshold ({max_threshold:?})"
);
Self {
max_threshold,
min_threshold,
is_enabled: false,
current: 0,
}
}

pub fn increment_current(&mut self) {
self.current += 1;
}

pub fn decrement_current(&mut self) {
self.current -= 1;
}

/// Get the remaining available count before hitting the max_threshold, and thereby enabling back pressure.
pub fn get_remaining_count(&self) -> Option<u32> {
self.max_threshold.map(|max| max.saturating_sub(self.current))
}

pub fn update_back_pressure_flag(&mut self, is_enabled: bool) {
self.is_enabled = is_enabled
}

/// Looks at the `max_threshold` and `min_threshold` to determine if back pressure should be enabled. `max_threshold` is used to determine when to enable the back-pressure
/// whereas, `min_threshold` is used to look at the lower bound
/// - `max_threshold` - Use to determine the upper bound of maximum items allowed.
/// If this is set to `None`, no back pressure will be applied.
/// `max_threshold` is
/// - `min_threshold` - Use to determine the lower bound of maximum items allowed. If this is set to `None`, no back pressure will be applied.
pub fn should_apply_back_pressure(&mut self) -> bool {
let current_count = self.current;
match self.max_threshold {
Some(max_threshold) => {
// if not enabled, only check against the max_threshold.
if current_count >= max_threshold {
true
} else {
// if already enabled, check when is it safe to remove.

// If there is Some(`min_threshold`), then return true if current_count > `min_threshold`.
// If there is Some(`min_threshold`), then return false if current_count <= `min_threshold`.
// If None, then return false.
match self.min_threshold {
Some(min_threshold) if self.is_enabled => current_count > min_threshold,
_ => false,
}
}
}
// if None, then we don't apply any back pressure.
None => false,
}
}
}

#[cfg(test)]
mod test {
use super::TalosBackPressureConfig;

#[test]
#[should_panic(expected = "min_threshold (Some(50)) must be less or equal to the max_threshold (Some(10))")]
fn test_panic_min_greater_than_max_threshold() {
let _ = TalosBackPressureConfig::new(Some(50), Some(10));
}

#[test]
fn test_back_pressure_when_min_equal_max_threshold() {
let mut bp = TalosBackPressureConfig::new(Some(10), Some(10));

// initially current is at 0.
assert_eq!(bp.current, 0);
for _ in 0..10 {
bp.increment_current()
}
// current set to 10.
assert_eq!(bp.current, 10);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is >= to max_threshold, we should enable backpressure.
assert!(bp.is_enabled);

// Decrement the counter two times.
bp.decrement_current();
bp.decrement_current();

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is < max_threshold and since max and min are same, we could disable backpressure.
assert!(!bp.is_enabled);
}

#[test]
fn test_back_pressure_when_threshold_min_some_max_some() {
let mut bp = TalosBackPressureConfig::new(Some(10), Some(20));

// initially current is at 0.
assert_eq!(bp.current, 0);
for _ in 0..30 {
bp.increment_current()
}
// current == 30.
assert_eq!(bp.current, 30);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is >= to max_threshold, we should enable backpressure.
assert!(bp.is_enabled);

// Decrement the counter 10 times.
for _ in 0..10 {
bp.decrement_current();
}
// current == 20.
assert_eq!(bp.current, 20);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is == max_threshold, backpressure will remain on.
assert!(bp.is_enabled);

// Decrement the counter 5 times.
for _ in 0..5 {
bp.decrement_current();
}
// current == 15.
assert_eq!(bp.current, 15);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Although the current value is < max_threshold, it is still greater than min_threshold, backpressure will remain on.
assert!(bp.is_enabled);

// Decrement the counter 5 times.
for _ in 0..5 {
bp.decrement_current();
}
// current == 10.
assert_eq!(bp.current, 10);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Although the current value is < max_threshold and == min_threshold, backpressure can be disabled.
assert!(!bp.is_enabled);
}

#[test]
fn test_back_pressure_when_threshold_min_none_max_some() {
let mut bp = TalosBackPressureConfig::new(None, Some(20));

// initially current is at 0.
assert_eq!(bp.current, 0);
for _ in 0..30 {
bp.increment_current()
}
// current == 30.
assert_eq!(bp.current, 30);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is >= to max_threshold, we should enable backpressure.
assert!(bp.is_enabled);

// Decrement the counter 11 times.
for _ in 0..11 {
bp.decrement_current();
}
// current == 19.
assert_eq!(bp.current, 19);

let should_enable = bp.should_apply_back_pressure();
bp.update_back_pressure_flag(should_enable);

// Because the current value is < max_threshold, and min_threshold is None, backpressure will be disabled.
assert!(!bp.is_enabled);
}

#[test]
fn test_back_pressure_when_threshold_min_none_max_none() {
let mut bp = TalosBackPressureConfig::new(None, None);

// initially current is at 0.
assert_eq!(bp.current, 0);
for _ in 0..1_000 {
bp.increment_current()
}
// current == 1_000.
assert_eq!(bp.current, 1_000);

let should_enable = bp.should_apply_back_pressure();

// Because the max_threshold is None, no back pressure.
assert!(!should_enable);
}

#[test]
#[should_panic(expected = "min_threshold (Some(10)) must be less or equal to the max_threshold (None)")]
fn test_back_pressure_when_threshold_min_some_max_none() {
let _ = TalosBackPressureConfig::new(Some(10), None);
}
}
1 change: 1 addition & 0 deletions packages/talos_common_utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod back_pressure;
pub mod env;
14 changes: 13 additions & 1 deletion packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures_executor::block_on;
use log::error;
use log::{debug, error};
use rdkafka::{producer::ProducerContext, ClientContext};
use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -31,6 +31,12 @@ impl ProducerContext for MessengerProducerContext {
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
} else {
debug!(
"Finished sending success feedback from producer contextfor an action from version = {}. Total feedbacks remaining to process = {} ",
delivery_opaque.version,
self.tx_feedback_channel.max_capacity() - self.tx_feedback_channel.capacity()
);
};
}
Err((publish_error, borrowed_message)) => {
Expand All @@ -51,6 +57,12 @@ impl ProducerContext for MessengerProducerContext {
Box::new(messenger_error),
))) {
error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}");
} else {
debug!(
"Finshed sending error feedback send from producer context for an action from version = {}. Total feedbacks remaining to process = {} ",
delivery_opaque.version,
self.tx_feedback_channel.max_capacity() - self.tx_feedback_channel.capacity()
);
};
}
}
Expand Down
5 changes: 4 additions & 1 deletion packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ where
commit_actions,
headers,
} = actions;

if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) {
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {
debug!(
"Sending actions for version = {version} \n| Remaining messages in the channel = {}",
self.rx_actions_channel.max_capacity() - self.rx_actions_channel.capacity()
);
let total_len = actions.len() as u32;

let headers_cloned = headers.clone();
Expand Down
12 changes: 11 additions & 1 deletion packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use log::debug;
use rdkafka::producer::ProducerContext;
use talos_certifier::ports::{errors::MessagePublishError, MessageReciever};
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::back_pressure::TalosBackPressureConfig;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult},
Expand Down Expand Up @@ -110,6 +111,8 @@ pub struct Configuration {
pub commit_size: Option<u32>,
/// Commit issuing frequency.
pub commit_frequency: Option<u32>,
/// messenger can receive more messages.
pub back_pressure_config: Option<TalosBackPressureConfig>,
}

pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
Expand All @@ -135,7 +138,14 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu
// START - Inbound service
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());
let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency);
let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
let inbound_service = MessengerInboundService::new(
kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
inbound_service_config,
config.back_pressure_config.unwrap_or_default(),
);
// END - Inbound service

// START - Publish service
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_messenger_core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error as ThisError;

pub type MessengerServiceResult = Result<(), MessengerServiceError>;
pub type MessengerServiceResult<T = ()> = Result<T, MessengerServiceError>;

#[derive(Debug, PartialEq, Clone)]
pub enum MessengerActionErrorKind {
Expand Down
Loading
Loading