Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 48 additions & 24 deletions core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ use ripple_sdk::{
tokio::{self, sync::mpsc},
utils::error::RippleError,
};
use serde_json::Value;

use super::endpoint_broker::{
BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest,
BrokerSender, EndpointBroker, EndpointBrokerState, BROKER_CHANNEL_BUFFER_SIZE,
};
use crate::state::platform_state::PlatformState;
use crate::{
broker::rules_engine::{jq_compile, RuleTransformType},
state::platform_state::PlatformState,
};
use tokio_tungstenite::tungstenite::http::uri::InvalidUri;

pub struct HttpBroker {
Expand All @@ -42,39 +46,59 @@ pub struct HttpBroker {

async fn send_http_request(
client: &Client<HttpConnector>,
method: Method,
uri: &Uri,
path: &str,
broker_request: BrokerRequest,
) -> Result<Response<Body>, RippleError> {
/*
TODO? we may need to support body for POST request in the future
*/
let http_request = Request::new(Body::empty());
let (mut parts, _) = http_request.into_parts();
//TODO, need to refactor to support other methods
parts.method = method.clone();
/*
mix endpoint url with method
*/

let uri: Uri = format!("{}{}", uri, path)
let mut method = Method::GET;
let mut body = Body::empty();

// A rule with a request transform defined indicates that the request is a POST, where
// the request transform is the body of the request. Otherwise, it is a GET request.

if let Some(request_transform) = broker_request
.rule
.transform
.get_transform_data(RuleTransformType::Request)
{
method = Method::POST;

let transform_params =
match serde_json::from_str::<Vec<Value>>(&broker_request.rpc.params_json) {
Ok(mut params) => params.pop().unwrap_or(Value::Null),
Err(e) => {
error!(
"send_http_request: Error in http broker parsing request params: e={:?}",
e
);
Value::Null
}
};

let body_val = jq_compile(
transform_params,
&request_transform,
format!("{}_http_post", broker_request.rpc.ctx.method),
)?;

body = Body::from(body_val.to_string());
}

let uri: Uri = format!("{}{}", uri, broker_request.rule.alias)
.parse()
.map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?;
let new_request = Request::builder()
.uri(uri)
.body(Body::empty())
.map_err(|e| RippleError::BrokerError(e.to_string()))?;
let (uri_parts, _) = new_request.into_parts();

parts.uri = uri_parts.uri;
debug!("http_broker sending {} request={}", method, uri,);

let http_request = Request::from_parts(parts, Body::empty());
let http_request = Request::builder()
.uri(uri)
.method(method)
.body(body)
.map_err(|e| RippleError::BrokerError(e.to_string()))?;

debug!(
"http_broker sending {} request={}",
method,
http_request.uri(),
);
match client.request(http_request).await {
Ok(v) => Ok(v),
Err(e) => {
Expand Down Expand Up @@ -129,7 +153,7 @@ impl EndpointBroker for HttpBroker {
while let Some(request) = tr.recv().await {
LogSignal::new("http_broker".to_string(), format!("received request - start processing request={:?}", request), request.rpc.ctx.clone())
.with_diagnostic_context_item("rule_alias", request.rule.alias.as_str()).emit_debug();
match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias)
match send_http_request(&client, &uri, request.clone())
.await
{
Ok(response) => {
Expand Down
196 changes: 87 additions & 109 deletions core/main/src/firebolt/handlers/discovery_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::{collections::HashMap, time::Duration};

use crate::{
firebolt::handlers::privacy_rpc::PrivacyImpl,
firebolt::rpc::RippleRPCProvider,
broker::broker_utils::BrokerUtils,
firebolt::{handlers::privacy_rpc::PrivacyImpl, rpc::RippleRPCProvider},
service::apps::{
app_events::{AppEventDecorationError, AppEventDecorator, AppEvents},
provider_broker::{self, ProviderBroker},
Expand All @@ -37,7 +37,7 @@ use jsonrpsee::{
use ripple_sdk::{
api::{
account_link::AccountLinkRequest,
apps::{AppError, AppManagerResponse, AppMethod, AppRequest, AppResponse},
apps::{AppError, AppManagerResponse, AppMethod, AppRequest},
config::Config,
firebolt::{
fb_capabilities::FireboltCap,
Expand All @@ -51,7 +51,7 @@ use ripple_sdk::{
},
},
extn::extn_client_message::ExtnResponse,
log::{error, info},
log::{debug, error, info},
tokio::{sync::oneshot, time::timeout},
};
use ripple_sdk::{
Expand Down Expand Up @@ -527,7 +527,7 @@ impl DiscoveryServer for DiscoveryImpl {
DiscoveryImpl::get_content_policy(&ctx, &self.state, &ctx.app_id).await
}

async fn launch(&self, ctx: CallContext, request: LaunchRequest) -> RpcResult<bool> {
async fn launch(&self, _ctx: CallContext, request: LaunchRequest) -> RpcResult<bool> {
let app_defaults_configuration = self.state.get_device_manifest().applications.defaults;

let intent_validation_config = self
Expand All @@ -537,7 +537,7 @@ impl DiscoveryServer for DiscoveryImpl {
.intent_validation;
validate_navigation_intent(intent_validation_config, request.intent.clone()).await?;

let req_updated_source = update_intent_source(ctx.app_id.clone(), request.clone());
let t_state = &mut self.state.clone();

if let Some(reserved_app_id) =
app_defaults_configuration.get_reserved_application_id(&request.app_id)
Expand All @@ -552,47 +552,26 @@ impl DiscoveryServer for DiscoveryImpl {
));
}

// Not validating the intent, pass-through to app as is.
if !AppEvents::is_app_registered_for_event(
&self.state,
reserved_app_id.to_string(),
DISCOVERY_EVENT_ON_NAVIGATE_TO,
) {
return Err(rpc_navigate_reserved_app_err(
format!("Discovery.launch: reserved app id {} is not registered for discovery.onNavigateTo event",
reserved_app_id).as_str(),
));
}
// emit EVENT_ON_NAVIGATE_TO to the reserved app.
AppEvents::emit_to_app(
&self.state,
reserved_app_id.to_string(),
DISCOVERY_EVENT_ON_NAVIGATE_TO,
&serde_json::to_value(req_updated_source.intent).unwrap(),
match BrokerUtils::process_internal_main_request(
t_state,
"discovery.launch.internal",
Some(serde_json::to_value(request).map_err(|e| {
error!("Serialization error: {:?}", e);
rpc_err("Failed to serialize LaunchIntent")
})?),
)
.await;
info!(
"emit_to_app called for app {} event {}",
reserved_app_id.to_string(),
DISCOVERY_EVENT_ON_NAVIGATE_TO
);
return Ok(true);
}
let (app_resp_tx, app_resp_rx) = oneshot::channel::<AppResponse>();

let app_request =
AppRequest::new(AppMethod::Launch(req_updated_source.clone()), app_resp_tx);

if self
.state
.get_client()
.send_app_request(app_request)
.is_ok()
&& app_resp_rx.await.is_ok()
{
return Ok(true);
.await
{
Ok(val) => {
debug!("Internal subscription launch successful");
return Ok(val.as_bool().unwrap_or(false));
}
Err(e) => {
error!("Internal subscription launch failed: {:?}", e);
return Err(rpc_err("Internal subscription launch failed"));
}
}
}

Err(jsonrpsee::core::Error::Custom(String::from(
"Discovery.launch: some failure",
)))
Expand Down Expand Up @@ -788,70 +767,69 @@ impl DiscoveryServer for DiscoveryImpl {
}
}
}
fn update_intent_source(source_app_id: String, request: LaunchRequest) -> LaunchRequest {
let source = format!("xrn:firebolt:application:{}", source_app_id);
match request.intent.clone() {
Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => {
let updated_navigation_intent = match navigation_intent {
NavigationIntentStrict::Home(mut home_intent) => {
home_intent.context.source = source;
NavigationIntentStrict::Home(home_intent)
}
NavigationIntentStrict::Launch(mut launch_intent) => {
launch_intent.context.source = source;
NavigationIntentStrict::Launch(launch_intent)
}
NavigationIntentStrict::Entity(mut entity_intent) => {
entity_intent.context.source = source;
NavigationIntentStrict::Entity(entity_intent)
}
NavigationIntentStrict::Playback(mut playback_intent) => {
playback_intent.context.source = source;
NavigationIntentStrict::Playback(playback_intent)
}
NavigationIntentStrict::Search(mut search_intent) => {
search_intent.context.source = source;
NavigationIntentStrict::Search(search_intent)
}
NavigationIntentStrict::Section(mut section_intent) => {
section_intent.context.source = source;
NavigationIntentStrict::Section(section_intent)
}
NavigationIntentStrict::Tune(mut tune_intent) => {
tune_intent.context.source = source;
NavigationIntentStrict::Tune(tune_intent)
}
NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => {
provider_request_intent.context.source = source;
NavigationIntentStrict::ProviderRequest(provider_request_intent)
}
NavigationIntentStrict::PlayEntity(mut p) => {
p.context.source = source;
NavigationIntentStrict::PlayEntity(p)
}
NavigationIntentStrict::PlayQuery(mut p) => {
p.context.source = source;
NavigationIntentStrict::PlayQuery(p)
}
};

LaunchRequest {
app_id: request.app_id,
intent: Some(NavigationIntent::NavigationIntentStrict(
updated_navigation_intent,
)),
}
}
Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => {
loose_intent.context.source = source;
LaunchRequest {
app_id: request.app_id,
intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)),
}
}
_ => request,
}
}
// fn update_intent_source(source: String, request: LaunchRequest) -> LaunchRequest {
// match request.intent.clone() {
// Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => {
// let updated_navigation_intent = match navigation_intent {
// NavigationIntentStrict::Home(mut home_intent) => {
// home_intent.context.source = source;
// NavigationIntentStrict::Home(home_intent)
// }
// NavigationIntentStrict::Launch(mut launch_intent) => {
// launch_intent.context.source = source;
// NavigationIntentStrict::Launch(launch_intent)
// }
// NavigationIntentStrict::Entity(mut entity_intent) => {
// entity_intent.context.source = source;
// NavigationIntentStrict::Entity(entity_intent)
// }
// NavigationIntentStrict::Playback(mut playback_intent) => {
// playback_intent.context.source = source;
// NavigationIntentStrict::Playback(playback_intent)
// }
// NavigationIntentStrict::Search(mut search_intent) => {
// search_intent.context.source = source;
// NavigationIntentStrict::Search(search_intent)
// }
// NavigationIntentStrict::Section(mut section_intent) => {
// section_intent.context.source = source;
// NavigationIntentStrict::Section(section_intent)
// }
// NavigationIntentStrict::Tune(mut tune_intent) => {
// tune_intent.context.source = source;
// NavigationIntentStrict::Tune(tune_intent)
// }
// NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => {
// provider_request_intent.context.source = source;
// NavigationIntentStrict::ProviderRequest(provider_request_intent)
// }
// NavigationIntentStrict::PlayEntity(mut p) => {
// p.context.source = source;
// NavigationIntentStrict::PlayEntity(p)
// }
// NavigationIntentStrict::PlayQuery(mut p) => {
// p.context.source = source;
// NavigationIntentStrict::PlayQuery(p)
// }
// };

// LaunchRequest {
// app_id: request.app_id,
// intent: Some(NavigationIntent::NavigationIntentStrict(
// updated_navigation_intent,
// )),
// }
// }
// Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => {
// loose_intent.context.source = source;
// LaunchRequest {
// app_id: request.app_id,
// intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)),
// }
// }
// _ => request,
// }
// }

pub async fn validate_navigation_intent(
intent_validation_config: IntentValidation,
Expand Down
Loading