From 989f41d880ac7bd18a9d72f0430437593cc8094d Mon Sep 17 00:00:00 2001 From: Brindha Senthil Date: Wed, 30 Oct 2024 13:33:52 -0700 Subject: [PATCH 1/3] feat: Updated code for jq rule event handling --- core/main/src/broker/broker_utils.rs | 52 ++++++++++++++++++++++-- core/main/src/broker/endpoint_broker.rs | 52 +++++++++++++++++++++++- core/main/src/broker/rules_engine.rs | 2 + core/main/src/broker/thunder_broker.rs | 3 ++ core/main/src/broker/websocket_broker.rs | 2 + 5 files changed, 107 insertions(+), 4 deletions(-) diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index 642997fff..5fd508d22 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -15,15 +15,20 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::time::Duration; - -use crate::utils::rpc_utils::extract_tcp_port; +use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port}; use futures::stream::{SplitSink, SplitStream}; use futures_util::StreamExt; +use jsonrpsee::{core::RpcResult, types::error::CallError}; +use ripple_sdk::api::firebolt::fb_capabilities::CAPABILITY_NOT_AVAILABLE; use ripple_sdk::{ + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + extn::extn_client_message::ExtnResponse, log::{error, info}, tokio::{self, net::TcpStream}, + uuid::Uuid, }; +use serde_json::{from_value, Value}; +use std::time::Duration; use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; pub struct BrokerUtils; @@ -66,4 +71,45 @@ impl BrokerUtils { tokio::time::sleep(Duration::from_secs(1)).await; } } + + pub async fn handle_main_internal_request<'a>( + state: &'a PlatformState, + method: &'a str, + ) -> RpcResult { + let ctx = CallContext::new( + Uuid::new_v4().to_string(), + Uuid::new_v4().to_string(), + "internal".into(), + 1, + ApiProtocol::Extn, + method.to_string(), + None, + false, + ); + let rpc_request = RpcRequest { + ctx: ctx.clone(), + method: method.to_string(), + params_json: RpcRequest::prepend_ctx(None, &ctx), + stats: RpcStats::default(), + }; + + let resp = state + .get_client() + .get_extn_client() + .main_internal_request(rpc_request.clone()) + .await; + + if let Ok(res) = resp { + if let Some(ExtnResponse::Value(val)) = res.payload.extract::() { + return Ok(Value::from(val)); + } + } + + // TODO: Update error handling + Err(jsonrpsee::core::Error::Call(CallError::Custom { + code: CAPABILITY_NOT_AVAILABLE, + message: format!("{} is not available", method.to_string()), + data: None, + })) + } } diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 9598a6fd1..596bfb9e5 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -43,9 +43,10 @@ use std::{ }; use crate::{ + broker::broker_utils::BrokerUtils, firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError}, service::extn::ripple_client::RippleClient, - state::platform_state::PlatformState, + state::{self, platform_state::PlatformState}, utils::router_utils::{ add_telemetry_status_code, get_rpc_header, return_api_message_for_transport, return_extn_response, @@ -601,6 +602,7 @@ impl BrokerOutputForwarder { if let Some(id) = id { if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { + let requires_event_handling = broker_request.rule.event_handler.is_some(); let sub_processed = broker_request.is_subscription_processed(); let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); @@ -619,6 +621,51 @@ impl BrokerOutputForwarder { if !apply_filter(&broker_request, &result, &rpc_request) { continue; } + + // handle events with internal request if required + if requires_event_handling { + if let Some(method) = broker_request.rule.event_handler.clone() + { + let session_id = rpc_request.ctx.get_id(); + let request_id = rpc_request.ctx.call_id; + let protocol = rpc_request.ctx.protocol.clone(); + let platform_state_c = platform_state.clone(); + tokio::spawn(async move { + if let Ok(res) = + BrokerUtils::handle_main_internal_request( + &platform_state_c, + method.as_str(), + ) + .await + { + response.result = Some( + serde_json::to_value(res.clone()).unwrap(), + ); + } + response.id = Some(request_id); + + let message = ApiMessage::new( + protocol, + serde_json::to_string(&response).unwrap(), + request_id.to_string(), + ); + + if let Some(session) = platform_state_c + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport( + session, + message, + platform_state_c, + ) + .await + } + }); + continue; + } + } + // check if the request transform has event_decorator_method if let Some(decorator_method) = broker_request.rule.transform.event_decorator_method.clone() @@ -911,6 +958,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: None, }, @@ -979,6 +1027,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, None, ); @@ -990,6 +1039,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, None, ); diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 6b4222a27..c24c54702 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -93,6 +93,8 @@ pub struct Rule { #[serde(skip_serializing_if = "Option::is_none")] pub filter: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub event_handler: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, } diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index fa51d7f18..35b362059 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -474,6 +474,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: None, } @@ -621,6 +622,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: Some(false), }; @@ -673,6 +675,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: Some(true), }; diff --git a/core/main/src/broker/websocket_broker.rs b/core/main/src/broker/websocket_broker.rs index b0d02b3bc..12858e867 100644 --- a/core/main/src/broker/websocket_broker.rs +++ b/core/main/src/broker/websocket_broker.rs @@ -250,6 +250,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: None, }; @@ -289,6 +290,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, }, subscription_processed: None, }; From c713666ae7b6b9e5768fd16c19690d18226acd73 Mon Sep 17 00:00:00 2001 From: Brindha Senthil Date: Wed, 30 Oct 2024 14:03:31 -0700 Subject: [PATCH 2/3] feat: Fixed clippy errors --- core/main/src/broker/broker_utils.rs | 6 +++--- core/main/src/broker/endpoint_broker.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index 5fd508d22..fec4ca893 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -27,7 +27,7 @@ use ripple_sdk::{ tokio::{self, net::TcpStream}, uuid::Uuid, }; -use serde_json::{from_value, Value}; +use serde_json::Value; use std::time::Duration; use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; @@ -101,14 +101,14 @@ impl BrokerUtils { if let Ok(res) = resp { if let Some(ExtnResponse::Value(val)) = res.payload.extract::() { - return Ok(Value::from(val)); + return Ok(val); } } // TODO: Update error handling Err(jsonrpsee::core::Error::Call(CallError::Custom { code: CAPABILITY_NOT_AVAILABLE, - message: format!("{} is not available", method.to_string()), + message: format!("{} is not available", method), data: None, })) } diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 596bfb9e5..ae91f31f2 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -46,7 +46,7 @@ use crate::{ broker::broker_utils::BrokerUtils, firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError}, service::extn::ripple_client::RippleClient, - state::{self, platform_state::PlatformState}, + state::platform_state::PlatformState, utils::router_utils::{ add_telemetry_status_code, get_rpc_header, return_api_message_for_transport, return_extn_response, From a90b0a2199fb5f839e3b79e253dc21e3fb496e78 Mon Sep 17 00:00:00 2001 From: Brindha Senthil Date: Wed, 30 Oct 2024 15:15:53 -0700 Subject: [PATCH 3/3] feat: refactored code --- core/main/src/broker/broker_utils.rs | 7 +-- core/main/src/broker/endpoint_broker.rs | 82 +++++++++++++------------ 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index fec4ca893..8d6425a5e 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -19,7 +19,6 @@ use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_ use futures::stream::{SplitSink, SplitStream}; use futures_util::StreamExt; use jsonrpsee::{core::RpcResult, types::error::CallError}; -use ripple_sdk::api::firebolt::fb_capabilities::CAPABILITY_NOT_AVAILABLE; use ripple_sdk::{ api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, extn::extn_client_message::ExtnResponse, @@ -72,7 +71,7 @@ impl BrokerUtils { } } - pub async fn handle_main_internal_request<'a>( + pub async fn process_internal_main_request<'a>( state: &'a PlatformState, method: &'a str, ) -> RpcResult { @@ -107,8 +106,8 @@ impl BrokerUtils { // TODO: Update error handling Err(jsonrpsee::core::Error::Call(CallError::Custom { - code: CAPABILITY_NOT_AVAILABLE, - message: format!("{} is not available", method), + code: -32100, + message: format!("failed to get {}", method), data: None, })) } diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index ae91f31f2..74e22e022 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -602,7 +602,7 @@ impl BrokerOutputForwarder { if let Some(id) = id { if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { - let requires_event_handling = broker_request.rule.event_handler.is_some(); + let trigger_event_handling = broker_request.rule.event_handler.is_some(); let sub_processed = broker_request.is_subscription_processed(); let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); @@ -622,46 +622,19 @@ impl BrokerOutputForwarder { continue; } - // handle events with internal request if required - if requires_event_handling { + // TODO: Refactor code in the future to apply rule-based filtering and transformations as required. + if trigger_event_handling { if let Some(method) = broker_request.rule.event_handler.clone() { - let session_id = rpc_request.ctx.get_id(); - let request_id = rpc_request.ctx.call_id; - let protocol = rpc_request.ctx.protocol.clone(); let platform_state_c = platform_state.clone(); - tokio::spawn(async move { - if let Ok(res) = - BrokerUtils::handle_main_internal_request( - &platform_state_c, - method.as_str(), - ) - .await - { - response.result = Some( - serde_json::to_value(res.clone()).unwrap(), - ); - } - response.id = Some(request_id); - - let message = ApiMessage::new( - protocol, - serde_json::to_string(&response).unwrap(), - request_id.to_string(), - ); - - if let Some(session) = platform_state_c - .session_state - .get_session_for_connection_id(&session_id) - { - return_api_message_for_transport( - session, - message, - platform_state_c, - ) - .await - } - }); + let rpc_request_c = rpc_request.clone(); + let response_c = response.clone(); + tokio::spawn(Self::handle_event( + platform_state_c, + method, + rpc_request_c, + response_c, + )); continue; } } @@ -746,6 +719,7 @@ impl BrokerOutputForwarder { let request_id = rpc_request.ctx.call_id; response.id = Some(request_id); let tm_str = get_rpc_header(&rpc_request); + // Step 2: Create the message let mut message = ApiMessage::new( rpc_request.ctx.protocol.clone(), @@ -806,6 +780,38 @@ impl BrokerOutputForwarder { }); } + async fn handle_event( + platform_state: PlatformState, + method: String, + rpc_request: RpcRequest, + mut response: JsonRpcApiResponse, + ) { + let session_id = rpc_request.ctx.get_id(); + let request_id = rpc_request.ctx.call_id; + let protocol = rpc_request.ctx.protocol.clone(); + let platform_state_c = &platform_state; + + if let Ok(res) = + BrokerUtils::process_internal_main_request(platform_state_c, method.as_str()).await + { + response.result = Some(serde_json::to_value(res.clone()).unwrap()); + } + response.id = Some(request_id); + + let message = ApiMessage::new( + protocol, + serde_json::to_string(&response).unwrap(), + request_id.to_string(), + ); + + if let Some(session) = platform_state_c + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport(session, message, platform_state.clone()).await; + } + } + pub fn handle_non_jsonrpc_response( data: &[u8], callback: BrokerCallback,