Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ env_logger = "0.10"

# Json Serialize / Deserialize
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = { version = "1.0", features = ["raw_value"] }

# Async
tokio = { version = "1", features = ["full", "test-util"] }
Expand Down
40 changes: 20 additions & 20 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use talos_messenger_core::{
core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService},
errors::MessengerServiceResult,
suffix::MessengerStateTransitionTimestamps,
utlis::get_actions_deserialised,
utlis::get_action_deserialised,
};

use super::models::KafkaAction;
Expand Down Expand Up @@ -38,36 +38,36 @@ where
headers,
} = actions;

if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) {
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {
let total_len = actions.len() as u32;
if let Some(publish_actions) = commit_actions.get(&self.publisher.get_publish_type().to_string()) {
let total_len = publish_actions.len() as u32;
let mut publish_vec = vec![];
for publish_action in publish_actions {
match get_action_deserialised::<KafkaAction>(publish_action.clone()) {
Ok(action) => {
let headers_cloned = headers.clone();

let headers_cloned = headers.clone();

let publish_vec = actions.into_iter().map(|action| {
let publisher = self.publisher.clone();
let mut headers = headers_cloned.clone();
let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();

headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
async move {
publish_vec.push(async move {
if let Err(publish_error) = publisher.send(version, action, headers, total_len).await {
error!("Failed to publish message for version={version} with error {publish_error}")
}
}
});
join_all(publish_vec).await;
}
Err(err) => {
error!(
"Failed to deserialise for version={version} key={} for data={:?} with error={:?}",
&self.publisher.get_publish_type(),
err.data,
err.reason
);
});
}
Err(err) => {
error!(
"Failed to deserialise for version={version} key={} for data={:?} with error={:?}",
&self.publisher.get_publish_type(),
err.data,
err.reason
);
}
}
}
join_all(publish_vec).await;
}
}
None => {
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_messenger_core/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ahash::HashMap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::value::RawValue;
use strum::{Display, EnumIter, EnumString};
use talos_certifier::ports::errors::MessagePublishError;

Expand Down Expand Up @@ -50,7 +50,7 @@ pub trait MessengerSystemService {
#[derive(Debug, Clone)]
pub struct MessengerCommitActions {
pub version: u64,
pub commit_actions: HashMap<String, Value>,
pub commit_actions: HashMap<String, Vec<Box<RawValue>>>,
pub headers: HashMap<String, String>,
}

Expand Down
113 changes: 109 additions & 4 deletions packages/talos_messenger_core/src/models/candidate_message.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use log::{error, warn};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::{value::RawValue, Value};
use std::collections::HashMap;
use talos_certifier::core::CandidateMessageBaseTrait;

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub type OnCommitActions = HashMap<String, HashMap<String, Vec<Box<RawValue>>>>;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase", tag = "_typ")]
pub struct MessengerCandidateMessage {
// UNIQUENESS FIELD
Expand All @@ -18,7 +21,8 @@ pub struct MessengerCandidateMessage {
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub on_commit: Option<Box<Value>>,
#[serde(default, deserialize_with = "deserialize_oncommit_actions")]
pub on_commit: Option<OnCommitActions>,
/// Cohort started certification
#[serde(default)]
pub certification_started_at: i128,
Expand Down Expand Up @@ -62,6 +66,43 @@ impl CandidateMessageBaseTrait for MessengerCandidateMessage {
}
}

fn deserialize_oncommit_actions<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<OnCommitActions>, D::Error> {
let val = Value::deserialize(deserializer)?;

let final_deserialized_output = match val {
Value::Null => Ok(None),
Value::Object(map) => {
let mut result = HashMap::new();
for (k1, v1) in map {
if let Value::Object(inner_map) = v1 {
let mut inner_result = HashMap::new();
for (k2, v2) in inner_map {
if let Ok(raw_value) = serde_json::from_value(v2) {
inner_result.insert(k2, raw_value);
} else {
warn!("Failed to deserialise on_commit action. Inner value is not deserializable to Vec<Box<RawValue>>");
return Ok(None); // If any inner value fails, return None.
}
}
result.insert(k1, inner_result);
} else {
warn!("Failed to deserialise as on_commit action. Outer value is not Object type.");
return Ok(None); // If any outer value is not an object, return None.
}
}
Ok(Some(result))
}
// Return None for any other type
_ => {
warn!("Failed to deserialise as on_commit action as it is not of the expected structure. Received on_commit_actions = {val:?}");
Ok(None)
}
};
error!("Result of deserialization is {final_deserialized_output:?}");

final_deserialized_output
}

// // $coverage:ignore-start
#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -111,4 +152,68 @@ mod tests {
// error!("deserialised candidate message: {deserialised:#?}");
assert!(deserialised.on_commit.is_some());
}
#[test]
fn deserialise_invalid_on_commit_action() {
let on_commit = json!("Test");

let json = json!({
"xid": "1a5793f8-4ca3-420f-8e81-219e7b039e2a",
"agent": "test-agent",
"cohort": "test-cohort",
"readset": [
"c5893d97-30f6-446d-a349-1741f7eff599"
],
"readvers": [],
"snapshot": 2,
"writeset": [
"c5893d97-30f6-446d-a349-1741f7eff599"
],
"statemap": [
{
"SomeStateMap": {
"initialVersion": 0,
"newVersion": 20
}
}
],
"onCommit": on_commit,
"certificationStartedAt": 0,
"requestCreatedAt": 0,
"publishedAt": 0,
"receivedAt": 0,
});

let deserialised: MessengerCandidateMessage = serde_json::from_value(json).unwrap();
assert!(deserialised.on_commit.is_none());
}
#[test]
fn deserialise_no_on_commit_action() {
let json = json!({
"xid": "1a5793f8-4ca3-420f-8e81-219e7b039e2a",
"agent": "test-agent",
"cohort": "test-cohort",
"readset": [
"c5893d97-30f6-446d-a349-1741f7eff599"
],
"readvers": [],
"snapshot": 2,
"writeset": [
"c5893d97-30f6-446d-a349-1741f7eff599"
],
"statemap": [
{
"SomeStateMap": {
"initialVersion": 0,
"newVersion": 20
}
}
],
"certificationStartedAt": 0,
"requestCreatedAt": 0,
"publishedAt": 0,
"receivedAt": 0,
});
let deserialised: MessengerCandidateMessage = serde_json::from_value(json).unwrap();
assert!(deserialised.on_commit.is_none());
}
}
2 changes: 1 addition & 1 deletion packages/talos_messenger_core/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod candidate_message;

pub use candidate_message::MessengerCandidateMessage;
pub use candidate_message::{MessengerCandidateMessage, OnCommitActions};
33 changes: 17 additions & 16 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use ahash::{HashMap, HashMapExt};
use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, error, info, warn};

Expand Down Expand Up @@ -88,24 +88,19 @@ where
///
async fn process_next_actions(&mut self) -> MessengerServiceResult {
let items_to_process = self.suffix.get_suffix_items_to_process();
for item in items_to_process {
let ver = item.version;

let mut headers = item.headers;
for mut payload in items_to_process {
let ver = payload.version;

// let mut headers = payload.headers;
let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
headers.insert(MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), timestamp);

let payload_to_send = MessengerCommitActions {
version: ver,
commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| {
acc.insert(key.to_string(), value.get_payload().clone());
acc
}),
headers,
};
payload
.headers
.insert(MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), timestamp);

// send for publishing

self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError {
self.tx_actions_channel.send(payload).await.map_err(|e| MessengerServiceError {
kind: crate::errors::MessengerServiceErrorKind::Channel,
reason: e.to_string(),
data: Some(ver.to_string()),
Expand Down Expand Up @@ -141,7 +136,9 @@ where
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
// error!("Pruning till index {index_to_prune}");
// Call prune method on suffix.
let _ = self.suffix.prune_till_index(index_to_prune);
if let Ok(pruned_items) = self.suffix.prune_till_index(index_to_prune) {
drop(pruned_items);
}
debug!(
"[Suffix Pruning] suffix after prune printed as tuple of (index, ver, decsision_ver) \n\n {:?}",
self.suffix.retrieve_all_some_vec_items()
Expand Down Expand Up @@ -331,6 +328,9 @@ where
let new_offset = if new_prune_version.eq(&last_version) {
if let Some(decision_vers) = last_version_decision_ver {
debug!("Updating to decision offset ({:?})", decision_vers);

// error!("Resetting suffix to initial state");
// self.suffix.reset();
decision_vers
} else {
new_prune_version
Expand All @@ -346,6 +346,7 @@ where
error!("[Commit] Error committing {err:?}");
}
Ok(_) => {
info!("Committed to new offset {new_offset}");
self.last_committed_version = new_offset;
},
}
Expand Down
Loading
Loading