diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index 642997fff..8d6425a5e 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -15,15 +15,19 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::time::Duration; - -use crate::utils::rpc_utils::extract_tcp_port; +use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port}; use futures::stream::{SplitSink, SplitStream}; use futures_util::StreamExt; +use jsonrpsee::{core::RpcResult, types::error::CallError}; use ripple_sdk::{ + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats}, + extn::extn_client_message::ExtnResponse, log::{error, info}, tokio::{self, net::TcpStream}, + uuid::Uuid, }; +use serde_json::Value; +use std::time::Duration; use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; pub struct BrokerUtils; @@ -66,4 +70,45 @@ impl BrokerUtils { tokio::time::sleep(Duration::from_secs(1)).await; } } + + pub async fn process_internal_main_request<'a>( + state: &'a PlatformState, + method: &'a str, + ) -> RpcResult { + let ctx = CallContext::new( + Uuid::new_v4().to_string(), + Uuid::new_v4().to_string(), + "internal".into(), + 1, + ApiProtocol::Extn, + method.to_string(), + None, + false, + ); + let rpc_request = RpcRequest { + ctx: ctx.clone(), + method: method.to_string(), + params_json: RpcRequest::prepend_ctx(None, &ctx), + stats: RpcStats::default(), + }; + + let resp = state + .get_client() + .get_extn_client() + .main_internal_request(rpc_request.clone()) + .await; + + if let Ok(res) = resp { + if let Some(ExtnResponse::Value(val)) = res.payload.extract::() { + return Ok(val); + } + } + + // TODO: Update error handling + Err(jsonrpsee::core::Error::Call(CallError::Custom { + code: -32100, + message: format!("failed to get {}", method), + data: None, + })) + } } diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 17808fa6f..bc93c5978 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -44,6 +44,7 @@ use std::{ }; use crate::{ + broker::broker_utils::BrokerUtils, firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError}, service::extn::ripple_client::RippleClient, state::platform_state::PlatformState, @@ -741,6 +742,7 @@ impl BrokerOutputForwarder { if let Some(id) = id { if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { + let trigger_event_handling = broker_request.rule.event_handler.is_some(); let workflow_callback = broker_request.clone().workflow_callback; let sub_processed = broker_request.is_subscription_processed(); let rpc_request = broker_request.rpc.clone(); @@ -760,6 +762,24 @@ impl BrokerOutputForwarder { if !apply_filter(&broker_request, &result, &rpc_request) { continue; } + + // TODO: Refactor code in the future to apply rule-based filtering and transformations as required. + if trigger_event_handling { + if let Some(method) = broker_request.rule.event_handler.clone() + { + let platform_state_c = platform_state.clone(); + let rpc_request_c = rpc_request.clone(); + let response_c = response.clone(); + tokio::spawn(Self::handle_event( + platform_state_c, + method, + rpc_request_c, + response_c, + )); + continue; + } + } + // check if the request transform has event_decorator_method if let Some(decorator_method) = broker_request.rule.transform.event_decorator_method.clone() @@ -919,6 +939,38 @@ impl BrokerOutputForwarder { }); } + async fn handle_event( + platform_state: PlatformState, + method: String, + rpc_request: RpcRequest, + mut response: JsonRpcApiResponse, + ) { + let session_id = rpc_request.ctx.get_id(); + let request_id = rpc_request.ctx.call_id; + let protocol = rpc_request.ctx.protocol.clone(); + let platform_state_c = &platform_state; + + if let Ok(res) = + BrokerUtils::process_internal_main_request(platform_state_c, method.as_str()).await + { + response.result = Some(serde_json::to_value(res.clone()).unwrap()); + } + response.id = Some(request_id); + + let message = ApiMessage::new( + protocol, + serde_json::to_string(&response).unwrap(), + request_id.to_string(), + ); + + if let Some(session) = platform_state_c + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport(session, message, platform_state.clone()).await; + } + } + pub fn handle_non_jsonrpc_response( data: &[u8], callback: BrokerCallback, @@ -1098,6 +1150,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, subscription_processed: None, @@ -1168,6 +1221,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, None, @@ -1181,6 +1235,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, None, diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 0bc78c131..2c1626bf5 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -102,6 +102,8 @@ pub struct Rule { #[serde(skip_serializing_if = "Option::is_none")] pub filter: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub event_handler: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, #[serde(skip_serializing_if = "Option::is_none")] pub sources: Option>, diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 3af8594d5..8d2a57a10 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -478,6 +478,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, subscription_processed: None, @@ -627,6 +628,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, subscription_processed: Some(false), @@ -681,6 +683,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, subscription_processed: Some(true), diff --git a/core/main/src/broker/websocket_broker.rs b/core/main/src/broker/websocket_broker.rs index 683d15b2f..e70789225 100644 --- a/core/main/src/broker/websocket_broker.rs +++ b/core/main/src/broker/websocket_broker.rs @@ -254,6 +254,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, workflow_callback: None, @@ -295,6 +296,7 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + event_handler: None, sources: None, }, workflow_callback: None,