diff --git a/Cargo.toml b/Cargo.toml index e5aa3dd7..4a9747e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ env_logger = "0.10" # Json Serialize / Deserialize serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json = { version = "1.0", features = ["raw_value"] } # Async tokio = { version = "1", features = ["full", "test-util"] } diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index b823c974..c13c2b77 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -10,7 +10,7 @@ use talos_messenger_core::{ core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService}, errors::MessengerServiceResult, suffix::MessengerStateTransitionTimestamps, - utlis::get_actions_deserialised, + utlis::get_action_deserialised, }; use super::models::KafkaAction; @@ -38,36 +38,36 @@ where headers, } = actions; - if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { - match get_actions_deserialised::>(publish_actions_for_type) { - Ok(actions) => { - let total_len = actions.len() as u32; + if let Some(publish_actions) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { + let total_len = publish_actions.len() as u32; + let mut publish_vec = vec![]; + for publish_action in publish_actions { + match get_action_deserialised::(publish_action.clone()) { + Ok(action) => { + let headers_cloned = headers.clone(); - let headers_cloned = headers.clone(); - - let publish_vec = actions.into_iter().map(|action| { let publisher = self.publisher.clone(); let mut headers = headers_cloned.clone(); let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap(); headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp); - async move { + publish_vec.push(async move { if let Err(publish_error) = publisher.send(version, action, headers, total_len).await { error!("Failed to publish message for version={version} with error {publish_error}") } - } - }); - join_all(publish_vec).await; - } - Err(err) => { - error!( - "Failed to deserialise for version={version} key={} for data={:?} with error={:?}", - &self.publisher.get_publish_type(), - err.data, - err.reason - ); + }); + } + Err(err) => { + error!( + "Failed to deserialise for version={version} key={} for data={:?} with error={:?}", + &self.publisher.get_publish_type(), + err.data, + err.reason + ); + } } } + join_all(publish_vec).await; } } None => { diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs index 162a9fc1..034acf0e 100644 --- a/packages/talos_messenger_core/src/core.rs +++ b/packages/talos_messenger_core/src/core.rs @@ -1,7 +1,7 @@ use ahash::HashMap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::value::RawValue; use strum::{Display, EnumIter, EnumString}; use talos_certifier::ports::errors::MessagePublishError; @@ -50,7 +50,7 @@ pub trait MessengerSystemService { #[derive(Debug, Clone)] pub struct MessengerCommitActions { pub version: u64, - pub commit_actions: HashMap, + pub commit_actions: HashMap>>, pub headers: HashMap, } diff --git a/packages/talos_messenger_core/src/models/candidate_message.rs b/packages/talos_messenger_core/src/models/candidate_message.rs index 47e53e18..de599c86 100644 --- a/packages/talos_messenger_core/src/models/candidate_message.rs +++ b/packages/talos_messenger_core/src/models/candidate_message.rs @@ -1,9 +1,12 @@ -use serde::{Deserialize, Serialize}; -use serde_json::Value; +use log::{error, warn}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::{value::RawValue, Value}; use std::collections::HashMap; use talos_certifier::core::CandidateMessageBaseTrait; -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub type OnCommitActions = HashMap>>>; + +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase", tag = "_typ")] pub struct MessengerCandidateMessage { // UNIQUENESS FIELD @@ -18,7 +21,8 @@ pub struct MessengerCandidateMessage { #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option>, #[serde(skip_serializing_if = "Option::is_none")] - pub on_commit: Option>, + #[serde(default, deserialize_with = "deserialize_oncommit_actions")] + pub on_commit: Option, /// Cohort started certification #[serde(default)] pub certification_started_at: i128, @@ -62,6 +66,43 @@ impl CandidateMessageBaseTrait for MessengerCandidateMessage { } } +fn deserialize_oncommit_actions<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { + let val = Value::deserialize(deserializer)?; + + let final_deserialized_output = match val { + Value::Null => Ok(None), + Value::Object(map) => { + let mut result = HashMap::new(); + for (k1, v1) in map { + if let Value::Object(inner_map) = v1 { + let mut inner_result = HashMap::new(); + for (k2, v2) in inner_map { + if let Ok(raw_value) = serde_json::from_value(v2) { + inner_result.insert(k2, raw_value); + } else { + warn!("Failed to deserialise on_commit action. Inner value is not deserializable to Vec>"); + return Ok(None); // If any inner value fails, return None. + } + } + result.insert(k1, inner_result); + } else { + warn!("Failed to deserialise as on_commit action. Outer value is not Object type."); + return Ok(None); // If any outer value is not an object, return None. + } + } + Ok(Some(result)) + } + // Return None for any other type + _ => { + warn!("Failed to deserialise as on_commit action as it is not of the expected structure. Received on_commit_actions = {val:?}"); + Ok(None) + } + }; + error!("Result of deserialization is {final_deserialized_output:?}"); + + final_deserialized_output +} + // // $coverage:ignore-start #[cfg(test)] mod tests { @@ -111,4 +152,68 @@ mod tests { // error!("deserialised candidate message: {deserialised:#?}"); assert!(deserialised.on_commit.is_some()); } + #[test] + fn deserialise_invalid_on_commit_action() { + let on_commit = json!("Test"); + + let json = json!({ + "xid": "1a5793f8-4ca3-420f-8e81-219e7b039e2a", + "agent": "test-agent", + "cohort": "test-cohort", + "readset": [ + "c5893d97-30f6-446d-a349-1741f7eff599" + ], + "readvers": [], + "snapshot": 2, + "writeset": [ + "c5893d97-30f6-446d-a349-1741f7eff599" + ], + "statemap": [ + { + "SomeStateMap": { + "initialVersion": 0, + "newVersion": 20 + } + } + ], + "onCommit": on_commit, + "certificationStartedAt": 0, + "requestCreatedAt": 0, + "publishedAt": 0, + "receivedAt": 0, + }); + + let deserialised: MessengerCandidateMessage = serde_json::from_value(json).unwrap(); + assert!(deserialised.on_commit.is_none()); + } + #[test] + fn deserialise_no_on_commit_action() { + let json = json!({ + "xid": "1a5793f8-4ca3-420f-8e81-219e7b039e2a", + "agent": "test-agent", + "cohort": "test-cohort", + "readset": [ + "c5893d97-30f6-446d-a349-1741f7eff599" + ], + "readvers": [], + "snapshot": 2, + "writeset": [ + "c5893d97-30f6-446d-a349-1741f7eff599" + ], + "statemap": [ + { + "SomeStateMap": { + "initialVersion": 0, + "newVersion": 20 + } + } + ], + "certificationStartedAt": 0, + "requestCreatedAt": 0, + "publishedAt": 0, + "receivedAt": 0, + }); + let deserialised: MessengerCandidateMessage = serde_json::from_value(json).unwrap(); + assert!(deserialised.on_commit.is_none()); + } } diff --git a/packages/talos_messenger_core/src/models/mod.rs b/packages/talos_messenger_core/src/models/mod.rs index a871d017..6796adf5 100644 --- a/packages/talos_messenger_core/src/models/mod.rs +++ b/packages/talos_messenger_core/src/models/mod.rs @@ -1,3 +1,3 @@ mod candidate_message; -pub use candidate_message::MessengerCandidateMessage; +pub use candidate_message::{MessengerCandidateMessage, OnCommitActions}; diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 7d00d6dd..70ce03ea 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use ahash::{HashMap, HashMapExt}; +use ahash::HashMap; use async_trait::async_trait; use log::{debug, error, info, warn}; @@ -88,24 +88,19 @@ where /// async fn process_next_actions(&mut self) -> MessengerServiceResult { let items_to_process = self.suffix.get_suffix_items_to_process(); - for item in items_to_process { - let ver = item.version; - let mut headers = item.headers; + for mut payload in items_to_process { + let ver = payload.version; + + // let mut headers = payload.headers; let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap(); - headers.insert(MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), timestamp); - - let payload_to_send = MessengerCommitActions { - version: ver, - commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| { - acc.insert(key.to_string(), value.get_payload().clone()); - acc - }), - headers, - }; + payload + .headers + .insert(MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), timestamp); + // send for publishing - self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError { + self.tx_actions_channel.send(payload).await.map_err(|e| MessengerServiceError { kind: crate::errors::MessengerServiceErrorKind::Channel, reason: e.to_string(), data: Some(ver.to_string()), @@ -141,7 +136,9 @@ where if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { // error!("Pruning till index {index_to_prune}"); // Call prune method on suffix. - let _ = self.suffix.prune_till_index(index_to_prune); + if let Ok(pruned_items) = self.suffix.prune_till_index(index_to_prune) { + drop(pruned_items); + } debug!( "[Suffix Pruning] suffix after prune printed as tuple of (index, ver, decsision_ver) \n\n {:?}", self.suffix.retrieve_all_some_vec_items() @@ -331,6 +328,9 @@ where let new_offset = if new_prune_version.eq(&last_version) { if let Some(decision_vers) = last_version_decision_ver { debug!("Updating to decision offset ({:?})", decision_vers); + + // error!("Resetting suffix to initial state"); + // self.suffix.reset(); decision_vers } else { new_prune_version @@ -346,6 +346,7 @@ where error!("[Commit] Error committing {err:?}"); } Ok(_) => { + info!("Committed to new offset {new_offset}"); self.last_committed_version = new_offset; }, } diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index 8e50789d..57f5f4b2 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -1,14 +1,14 @@ use ahash::{HashMap, HashMapExt}; use log::{debug, warn}; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::value::RawValue; use std::fmt::Debug; use strum::{Display, EnumString}; use talos_certifier::model::{Decision, DecisionMessageTrait}; use talos_suffix::{core::SuffixResult, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; -use crate::models::MessengerCandidateMessage; +use crate::{core::MessengerCommitActions, models::MessengerCandidateMessage}; pub trait MessengerSuffixItemTrait: Debug + Clone { fn set_state(&mut self, state: SuffixItemState); @@ -78,7 +78,7 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Get the state of an item by version. fn get_item_state(&self, version: u64) -> Option; /// Gets the suffix items eligible to process. - fn get_suffix_items_to_process(&self) -> Vec; + fn get_suffix_items_to_process(&self) -> Vec; /// Updates the decision for a version. fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap); /// Updates the action for a version using the action_key for lookup. @@ -151,15 +151,16 @@ pub enum SuffixItemCompleteStateReason { ErrorProcessing, } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct AllowedActionsMapItem { - payload: Value, + payload: Vec>, count: u32, total_count: u32, } impl AllowedActionsMapItem { - pub fn new(payload: Value, total_count: u32) -> Self { + pub fn new(payload: Vec>) -> Self { + let total_count = payload.len() as u32; AllowedActionsMapItem { payload, count: 0, @@ -171,7 +172,7 @@ impl AllowedActionsMapItem { self.count += 1; } - pub fn get_payload(&self) -> &Value { + pub fn get_payload(&self) -> &Vec> { &self.payload } @@ -184,14 +185,14 @@ impl AllowedActionsMapItem { } } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] -pub struct ActionsMapWithVersion { - pub actions: HashMap, - pub version: u64, - pub headers: HashMap, -} +// #[derive(Debug, Serialize, Deserialize, Clone)] +// pub struct ActionsMapWithVersion { +// pub actions: HashMap, +// pub version: u64, +// pub headers: HashMap, +// } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct MessengerCandidate { pub candidate: MessengerCandidateMessage, /// Safepoint received for committed outcomes from certifier. @@ -310,13 +311,13 @@ where } } - fn get_suffix_items_to_process(&self) -> Vec { + fn get_suffix_items_to_process(&self) -> Vec { let current_prune_index = self.get_meta().prune_index; let start_index = current_prune_index.unwrap_or(0); // let start_ms = Instant::now(); - let items: Vec = self + let items: Vec = self .messages // we know from start_index = prune_index, everything prioir to this is already completed. // This helps in taking a smaller slice out of the suffix to iterate over. @@ -357,9 +358,12 @@ where headers.extend(state_timestamps_headers); - ActionsMapWithVersion { + MessengerCommitActions { version: x.item_ver, - actions: x.item.get_commit_actions().clone(), + commit_actions: x.item.get_commit_actions().iter().fold(HashMap::new(), |mut acc, (key, value)| { + acc.insert(key.to_string(), value.get_payload().clone()); + acc + }), headers, } }) @@ -431,7 +435,7 @@ where let index = self.index_from_head(safe_prune_version)?; self.update_prune_index(index.into()); - debug!("[Update prune index] Prune version updated to {index} (version={safe_prune_version}"); + debug!("[Update prune index] Prune version updated to {index} (version={safe_prune_version})"); Some((index, safe_prune_version)) } diff --git a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs index 821e661f..b0a02101 100644 --- a/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs +++ b/packages/talos_messenger_core/src/tests/messenger_inbound_service.rs @@ -1,4 +1,5 @@ use ahash::{AHashMap, HashMap, HashMapExt}; +use serde_json::json; use talos_certifier::test_helpers::mocks::payload::{build_kafka_on_commit_message, build_on_commit_publish_kafka_payload, get_default_payload}; use talos_suffix::core::SuffixConfig; @@ -154,8 +155,7 @@ async fn test_suffix_item_state_by_on_commit() { assert!(candidate_with_no_on_commit.on_commit.is_none()); // Candidate with no supported on-commit action - let on_commit = MockOnCommitMessage::build_from_str(r#"{"notSuportedAction": {"name": "Kindred"}}"#); - let on_commit_value = on_commit.as_value(); + let on_commit_value = json!({"notSuportedAction": {"subAction": [{"name": "Kindred"}]}}); let candidate_with_irrelevant_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().add_on_commit(&on_commit_value).build(); assert!(candidate_with_irrelevant_on_commit.on_commit.is_some()); @@ -489,8 +489,7 @@ async fn test_suffix_exhaustive_state_transitions_without_pruning() { let candidate_with_no_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().build(); assert!(candidate_with_no_on_commit.on_commit.is_none()); // Candidate with no supported on-commit action - let on_commit = MockOnCommitMessage::build_from_str(r#"{"notSuportedAction": {"name": "Kindred"}}"#); - let on_commit_value = on_commit.as_value(); + let on_commit_value = json!({"notSuportedAction": {"subAction": [{"name": "Kindred"}]}}); let candidate_with_irrelevant_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().add_on_commit(&on_commit_value).build(); assert!(candidate_with_irrelevant_on_commit.on_commit.is_some()); @@ -796,8 +795,7 @@ async fn test_suffix_prune_index_update_all_candidates_with_commit_decision_earl let candidate_with_no_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().build(); assert!(candidate_with_no_on_commit.on_commit.is_none()); // Candidate with no supported on-commit action - let on_commit = MockOnCommitMessage::build_from_str(r#"{"notSuportedAction": {"name": "Kindred"}}"#); - let on_commit_value = on_commit.as_value(); + let on_commit_value = json!({"notSuportedAction": {"subAction": [{"name": "Kindred"}]}}); let candidate_with_irrelevant_on_commit: MessengerCandidateMessage = CandidateTestPayload::new().add_on_commit(&on_commit_value).build(); assert!(candidate_with_irrelevant_on_commit.on_commit.is_some()); diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index 72afb5a6..2434932e 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -24,7 +24,7 @@ use crate::{ models::MessengerCandidateMessage, services::{MessengerInboundService, MessengerInboundServiceConfig}, suffix::MessengerCandidate, - utlis::get_actions_deserialised, + utlis::get_action_deserialised, }; use super::payload::on_commit::MockOnCommitKafkaMessage; @@ -180,10 +180,10 @@ impl ActionService for MockActionService { } = actions; error!("Headers... {headers:#?}"); - if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { - match get_actions_deserialised::>(publish_actions_for_type) { - Ok(actions) => { - for _ in actions { + if let Some(publish_actions) = commit_actions.get(&self.publisher.get_publish_type().to_string()) { + for publish_action in publish_actions { + match get_action_deserialised::(publish_action.clone()) { + Ok(_) => { match headers.get("feedbackType").map(|f| f.to_owned()) { Some(feedback) if feedback == FeedbackTypeHeader::Error.to_string() => { self.publisher.send_error_feedback(version).await.unwrap(); @@ -201,14 +201,14 @@ impl ActionService for MockActionService { } }; } - } - Err(err) => { - error!( - "Failed to deserialise for version={version} key={} for data={:?} with error={:?}", - &self.publisher.get_publish_type(), - err.data, - err.reason - ); + Err(err) => { + error!( + "Failed to deserialise for version={version} key={} for data={:?} with error={:?}", + &self.publisher.get_publish_type(), + err.data, + err.reason + ); + } } } } diff --git a/packages/talos_messenger_core/src/tests/utils.rs b/packages/talos_messenger_core/src/tests/utils.rs index aa160ba3..1ca8c4a7 100644 --- a/packages/talos_messenger_core/src/tests/utils.rs +++ b/packages/talos_messenger_core/src/tests/utils.rs @@ -1,7 +1,10 @@ use ahash::{HashMap, HashMapExt}; -use serde_json::{json, Value}; +use serde_json::{json, value::RawValue}; -use crate::utlis::{create_whitelist_actions_from_str, get_actions_deserialised, get_allowed_commit_actions, ActionsParserConfig}; +use crate::{ + models::OnCommitActions, + utlis::{create_whitelist_actions_from_str, get_action_deserialised, get_allowed_commit_actions, ActionsParserConfig}, +}; // Start - testing create_whitelist_actions_from_str function #[test] @@ -50,26 +53,28 @@ fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() { } }); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); + // When allowed action map is empty. - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When allowed action is supported type by the messenger, but the sub actions are not provided allowed_actions.clear(); allowed_actions.insert("publish".to_string(), vec![]); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When allowed action is supported type by the messenger, but the sub actions are not supported allowed_actions.clear(); allowed_actions.insert("publish".to_string(), vec!["sqs".to_string(), "sns".to_string()]); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When allowed action is non supported type by the messenger, with empty sub type allowed_actions.clear(); allowed_actions.insert("random".to_string(), vec![]); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When allowed action is non supported type by the messenger, but has valid sub actions @@ -78,7 +83,7 @@ fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() { "random".to_string(), vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()], ); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); } @@ -92,39 +97,36 @@ fn test_fn_get_allowed_commit_actions_on_commit_action_negative_scenarios() { // When on_commit_actions are not present let on_commit_actions = serde_json::json!({}); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); - assert!(result.is_empty()); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); - // When on_commit_actions is of type not supported by messenger - // 1. When actions is an array - let on_commit_actions = serde_json::json!([1, 2, 3, 4]); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); - // 2. When actions is some other object type + // When actions is some other object type let on_commit_actions = serde_json::json!({ "test": { - "a": "foo", - "kafka": "bar" + "a": vec!["foo"], + "kafka": vec!["bar"] } }); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When on_commit_actions is valid json supported by messenger, but not the action required by messenger let on_commit_actions = serde_json::json!({ - "random": { - "kafka": [ - { - "_typ": "KafkaMessage", - "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider - "value": "test" - }, - { - "_typ": "KafkaMessage", - "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider - "value": "test" - } + "random": { + "kafka": [ + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + }, + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } ], "mqtt": [ { @@ -132,20 +134,22 @@ fn test_fn_get_allowed_commit_actions_on_commit_action_negative_scenarios() { "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider "value": "test" } - ] - } - }); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + ] + } + }); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); // When on_commit_actions is valid json supported by messenger, with valid action, but the sub-actions are not supported by messenger let on_commit_actions = serde_json::json!({ "publish": { - "foo": "Lorem", - "bar": "Ipsum" + "foo": vec!["Lorem"], + "bar": vec!["Ipsum"] } }); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert!(result.is_empty()); } @@ -154,18 +158,18 @@ fn test_fn_get_allowed_commit_actions_positive_scenario() { let mut allowed_actions = HashMap::new(); let on_commit_actions = serde_json::json!({ - "publish": { - "kafka": [ - { - "_typ": "KafkaMessage", - "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider - "value": "test" - }, - { - "_typ": "KafkaMessage", - "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider - "value": "test" - } + "publish": { + "kafka": [ + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + }, + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } ], "mqtt": [ { @@ -173,15 +177,16 @@ fn test_fn_get_allowed_commit_actions_positive_scenario() { "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider "value": "test" } - ] - } - }); + ] + } + }); + let on_commit_actions_deserialised: OnCommitActions = serde_json::from_value(on_commit_actions).unwrap(); allowed_actions.insert( "publish".to_string(), vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()], ); - let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + let result = get_allowed_commit_actions(&on_commit_actions_deserialised, &allowed_actions); assert_eq!(result.len(), 2); } @@ -198,30 +203,40 @@ fn test_fn_get_allowed_commit_actions_positive_scenario() { // 2. Key #[test] fn test_fn_get_actions_deserialised_actions_incorrect_arguments() { - let mut actions_map: HashMap = HashMap::new(); + let mut actions_map: HashMap> = HashMap::new(); - // When value is empty string. - actions_map.insert("kafka".to_string(), "".into()); - let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + // When value is string. + let json = json!(""); + let json_raw_value = RawValue::from_string(json.to_string()).unwrap(); + actions_map.insert("kafka".to_string(), *Box::new(json_raw_value)); + let result = get_action_deserialised::(actions_map.get("kafka").unwrap().clone()); assert!(result.is_err()); // When value is Array of string, but we want to parse it to array of u32. - actions_map.insert("kafka".to_string(), vec!["foo", "bar"].into()); - let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + let json = json!(vec!["foo", "bar"]); + let json_raw_value = RawValue::from_string(json.to_string()).unwrap(); + + actions_map.insert("kafka".to_string(), json_raw_value); + let result = get_action_deserialised::>(actions_map.get("kafka").unwrap().clone()); assert!(result.is_err()); } #[test] fn test_fn_get_actions_deserialised_actions_correct_arguments_passed() { - let mut actions_map: HashMap = HashMap::new(); + let mut actions_map: HashMap> = HashMap::new(); // When value is empty string. - actions_map.insert("kafka".to_string(), "".into()); - let result = get_actions_deserialised::(actions_map.get("kafka").unwrap()); + let json = json!(""); + let json_raw_value = RawValue::from_string(json.to_string()).unwrap(); + actions_map.insert("kafka".to_string(), json_raw_value); + let result = get_action_deserialised::(actions_map.get("kafka").unwrap().clone()); assert!(result.is_ok()); // When value is Array of string. - actions_map.insert("kafka".to_string(), vec!["foo", "bar"].into()); - let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + + let json = json!(vec!["foo", "bar"]); + let json_raw_value = RawValue::from_string(json.to_string()).unwrap(); + actions_map.insert("kafka".to_string(), json_raw_value); + let result = get_action_deserialised::>(actions_map.get("kafka").unwrap().clone()); assert!(result.is_ok()); // More complex type @@ -233,18 +248,14 @@ fn test_fn_get_actions_deserialised_actions_correct_arguments_passed() { state: String, } - actions_map.insert( - "address".to_string(), - json!( - - { - "street_number": 47, - "street": "Wallaby Way".to_string(), - "city": "Sydney".to_string(), - "state": "New South Wales".to_string(), - } - ), - ); - let result = get_actions_deserialised::
(actions_map.get("address").unwrap()); + let json = json!({ + "street_number": 47, + "street": "Wallaby Way".to_string(), + "city": "Sydney".to_string(), + "state": "New South Wales".to_string(), + }); + let json_raw_value = RawValue::from_string(json.to_string()).unwrap(); + actions_map.insert("address".to_string(), json_raw_value); + let result = get_action_deserialised::
(actions_map.get("address").unwrap().clone()); assert!(result.is_ok()); } diff --git a/packages/talos_messenger_core/src/utlis.rs b/packages/talos_messenger_core/src/utlis.rs index 88715c1b..970e56a6 100644 --- a/packages/talos_messenger_core/src/utlis.rs +++ b/packages/talos_messenger_core/src/utlis.rs @@ -2,9 +2,9 @@ use std::any::type_name; use ahash::{HashMap, HashMapExt}; use serde::de::DeserializeOwned; -use serde_json::Value; +use serde_json::{value::RawValue, Value}; -use crate::{errors::MessengerActionError, suffix::AllowedActionsMapItem}; +use crate::{errors::MessengerActionError, models::OnCommitActions, suffix::AllowedActionsMapItem}; #[derive(Debug, Clone, Copy)] pub struct ActionsParserConfig<'a> { @@ -73,17 +73,30 @@ pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> { /// Create a Hashmap of all the actions that require to be actioned by the messenger. /// Key for the map is the Action type. eg: "kafka", "mqtt" ..etc /// Value for the map contains the payload and some meta information like items actioned, and is completed flag -pub fn get_allowed_commit_actions(on_commit_actions: &Value, allowed_actions: &HashMap>) -> HashMap { +pub fn get_allowed_commit_actions( + on_commit_actions: &OnCommitActions, + allowed_actions: &HashMap>, +) -> HashMap { let mut filtered_actions = HashMap::new(); allowed_actions.iter().for_each(|(action_key, sub_action_keys)| { - if let Some(action) = get_value_by_key(on_commit_actions, action_key) { + // if let Some(action) = get_value_by_key(on_commit_actions, action_key) { + // for sub_action_key in sub_action_keys { + // if let Some(sub_action) = get_value_by_key(action, sub_action_key) { + // filtered_actions.insert( + // sub_action_key.to_string(), + // AllowedActionsMapItem::new(sub_action.clone(), get_total_action_count(sub_action)), + // ); + // } + // } + // } + + // - look at first level + if let Some(first_level) = on_commit_actions.get(action_key) { + // - look at second level for sub_action_key in sub_action_keys { - if let Some(sub_action) = get_value_by_key(action, sub_action_key) { - filtered_actions.insert( - sub_action_key.to_string(), - AllowedActionsMapItem::new(sub_action.clone(), get_total_action_count(sub_action)), - ); + if let Some(second_level) = first_level.get(sub_action_key) { + filtered_actions.insert(sub_action_key.to_string(), AllowedActionsMapItem::new(second_level.clone())); } } } @@ -101,15 +114,23 @@ pub fn get_total_action_count(action: &Value) -> u32 { } /// Retrieves sub actions under publish by using a look key. -pub fn get_actions_deserialised(actions: &Value) -> Result { - match serde_json::from_value(actions.clone()) { - Ok(res) => Ok(res), - Err(err) => Err(MessengerActionError { - kind: crate::errors::MessengerActionErrorKind::Deserialisation, - reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), - data: actions.to_string(), - }), - } +pub fn get_action_deserialised(action: Box) -> Result { + // let action = action.get(); + let deserialised_action = serde_json::from_str(action.get()).map_err(|err| MessengerActionError { + kind: crate::errors::MessengerActionErrorKind::Deserialisation, + reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), + data: action.to_string(), + })?; + + Ok(deserialised_action) + // match serde_json::from_value(actions.clone()) { + // Ok(res) => Ok(res), + // Err(err) => Err(MessengerActionError { + // kind: crate::errors::MessengerActionErrorKind::Deserialisation, + // reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), + // data: actions.to_string(), + // }), + // } } ///// Retrieves the oncommit actions that are supported by the system. diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index 572f0aab..393d272d 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -58,6 +58,20 @@ where Suffix { meta, messages } } + pub fn reset(&mut self) { + let meta = SuffixMeta { + head: 0, + last_insert_vers: 0, + prune_index: None, + ..self.meta + }; + + let messages = VecDeque::with_capacity(1_000); + + self.meta = meta; + self.messages = messages; + } + pub fn index_from_head(&self, version: u64) -> Option { let head = self.meta.head; if version < head { @@ -373,9 +387,10 @@ where /// This enables to move the head to the appropiate location. fn prune_till_index(&mut self, index: usize) -> SuffixResult>>> { info!( - "Suffix message length before pruning={} and current suffix head={}", + "Suffix message length before pruning={} and current suffix head={} | suffix capacity = {}", self.messages.len(), - self.meta.head + self.meta.head, + self.messages.capacity(), ); let start_ms = Instant::now(); @@ -390,13 +405,17 @@ where } else { self.update_head(0) } + // let new_suffix_capacity = self.messages.len() + 1_000; + // let final_capacity = if new_suffix_capacity > 400_000 { new_suffix_capacity } else { 400_000 }; + // self.messages.shrink_to(final_capacity); info!( - "Suffix message length after pruning={} and new suffix head={} | Prune took {} microseconds and update head took {} microseconds", + "Suffix message length after pruning={} and new suffix head={} | Prune took {} microseconds and update head took {} microseconds | suffix capacity = {}", self.messages.len(), self.meta.head, drain_end_ms, - start_ms_2.elapsed().as_micros() + start_ms_2.elapsed().as_micros(), + self.messages.capacity() ); Ok(drained_entries)