diff --git a/core/main/src/broker/http_broker.rs b/core/main/src/broker/http_broker.rs index 5779c2534..c05117dba 100644 --- a/core/main/src/broker/http_broker.rs +++ b/core/main/src/broker/http_broker.rs @@ -24,12 +24,16 @@ use ripple_sdk::{ tokio::{self, sync::mpsc}, utils::error::RippleError, }; +use serde_json::Value; use super::endpoint_broker::{ BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest, BrokerSender, EndpointBroker, EndpointBrokerState, BROKER_CHANNEL_BUFFER_SIZE, }; -use crate::state::platform_state::PlatformState; +use crate::{ + broker::rules_engine::{jq_compile, RuleTransformType}, + state::platform_state::PlatformState, +}; use tokio_tungstenite::tungstenite::http::uri::InvalidUri; pub struct HttpBroker { @@ -42,39 +46,59 @@ pub struct HttpBroker { async fn send_http_request( client: &Client, - method: Method, uri: &Uri, - path: &str, + broker_request: BrokerRequest, ) -> Result, RippleError> { /* TODO? we may need to support body for POST request in the future */ - let http_request = Request::new(Body::empty()); - let (mut parts, _) = http_request.into_parts(); - //TODO, need to refactor to support other methods - parts.method = method.clone(); - /* - mix endpoint url with method - */ - let uri: Uri = format!("{}{}", uri, path) + let mut method = Method::GET; + let mut body = Body::empty(); + + // A rule with a request transform defined indicates that the request is a POST, where + // the request transform is the body of the request. Otherwise, it is a GET request. + + if let Some(request_transform) = broker_request + .rule + .transform + .get_transform_data(RuleTransformType::Request) + { + method = Method::POST; + + let transform_params = + match serde_json::from_str::>(&broker_request.rpc.params_json) { + Ok(mut params) => params.pop().unwrap_or(Value::Null), + Err(e) => { + error!( + "send_http_request: Error in http broker parsing request params: e={:?}", + e + ); + Value::Null + } + }; + + let body_val = jq_compile( + transform_params, + &request_transform, + format!("{}_http_post", broker_request.rpc.ctx.method), + )?; + + body = Body::from(body_val.to_string()); + } + + let uri: Uri = format!("{}{}", uri, broker_request.rule.alias) .parse() .map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?; - let new_request = Request::builder() - .uri(uri) - .body(Body::empty()) - .map_err(|e| RippleError::BrokerError(e.to_string()))?; - let (uri_parts, _) = new_request.into_parts(); - parts.uri = uri_parts.uri; + debug!("http_broker sending {} request={}", method, uri,); - let http_request = Request::from_parts(parts, Body::empty()); + let http_request = Request::builder() + .uri(uri) + .method(method) + .body(body) + .map_err(|e| RippleError::BrokerError(e.to_string()))?; - debug!( - "http_broker sending {} request={}", - method, - http_request.uri(), - ); match client.request(http_request).await { Ok(v) => Ok(v), Err(e) => { @@ -129,7 +153,7 @@ impl EndpointBroker for HttpBroker { while let Some(request) = tr.recv().await { LogSignal::new("http_broker".to_string(), format!("received request - start processing request={:?}", request), request.rpc.ctx.clone()) .with_diagnostic_context_item("rule_alias", request.rule.alias.as_str()).emit_debug(); - match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias) + match send_http_request(&client, &uri, request.clone()) .await { Ok(response) => { diff --git a/core/main/src/firebolt/handlers/discovery_rpc.rs b/core/main/src/firebolt/handlers/discovery_rpc.rs index 9a813acda..a077a2cf1 100644 --- a/core/main/src/firebolt/handlers/discovery_rpc.rs +++ b/core/main/src/firebolt/handlers/discovery_rpc.rs @@ -18,8 +18,8 @@ use std::{collections::HashMap, time::Duration}; use crate::{ - firebolt::handlers::privacy_rpc::PrivacyImpl, - firebolt::rpc::RippleRPCProvider, + broker::broker_utils::BrokerUtils, + firebolt::{handlers::privacy_rpc::PrivacyImpl, rpc::RippleRPCProvider}, service::apps::{ app_events::{AppEventDecorationError, AppEventDecorator, AppEvents}, provider_broker::{self, ProviderBroker}, @@ -37,7 +37,7 @@ use jsonrpsee::{ use ripple_sdk::{ api::{ account_link::AccountLinkRequest, - apps::{AppError, AppManagerResponse, AppMethod, AppRequest, AppResponse}, + apps::{AppError, AppManagerResponse, AppMethod, AppRequest}, config::Config, firebolt::{ fb_capabilities::FireboltCap, @@ -51,7 +51,7 @@ use ripple_sdk::{ }, }, extn::extn_client_message::ExtnResponse, - log::{error, info}, + log::{debug, error, info}, tokio::{sync::oneshot, time::timeout}, }; use ripple_sdk::{ @@ -527,7 +527,7 @@ impl DiscoveryServer for DiscoveryImpl { DiscoveryImpl::get_content_policy(&ctx, &self.state, &ctx.app_id).await } - async fn launch(&self, ctx: CallContext, request: LaunchRequest) -> RpcResult { + async fn launch(&self, _ctx: CallContext, request: LaunchRequest) -> RpcResult { let app_defaults_configuration = self.state.get_device_manifest().applications.defaults; let intent_validation_config = self @@ -537,7 +537,7 @@ impl DiscoveryServer for DiscoveryImpl { .intent_validation; validate_navigation_intent(intent_validation_config, request.intent.clone()).await?; - let req_updated_source = update_intent_source(ctx.app_id.clone(), request.clone()); + let t_state = &mut self.state.clone(); if let Some(reserved_app_id) = app_defaults_configuration.get_reserved_application_id(&request.app_id) @@ -552,47 +552,26 @@ impl DiscoveryServer for DiscoveryImpl { )); } - // Not validating the intent, pass-through to app as is. - if !AppEvents::is_app_registered_for_event( - &self.state, - reserved_app_id.to_string(), - DISCOVERY_EVENT_ON_NAVIGATE_TO, - ) { - return Err(rpc_navigate_reserved_app_err( - format!("Discovery.launch: reserved app id {} is not registered for discovery.onNavigateTo event", - reserved_app_id).as_str(), - )); - } - // emit EVENT_ON_NAVIGATE_TO to the reserved app. - AppEvents::emit_to_app( - &self.state, - reserved_app_id.to_string(), - DISCOVERY_EVENT_ON_NAVIGATE_TO, - &serde_json::to_value(req_updated_source.intent).unwrap(), + match BrokerUtils::process_internal_main_request( + t_state, + "discovery.launch.internal", + Some(serde_json::to_value(request).map_err(|e| { + error!("Serialization error: {:?}", e); + rpc_err("Failed to serialize LaunchIntent") + })?), ) - .await; - info!( - "emit_to_app called for app {} event {}", - reserved_app_id.to_string(), - DISCOVERY_EVENT_ON_NAVIGATE_TO - ); - return Ok(true); - } - let (app_resp_tx, app_resp_rx) = oneshot::channel::(); - - let app_request = - AppRequest::new(AppMethod::Launch(req_updated_source.clone()), app_resp_tx); - - if self - .state - .get_client() - .send_app_request(app_request) - .is_ok() - && app_resp_rx.await.is_ok() - { - return Ok(true); + .await + { + Ok(val) => { + debug!("Internal subscription launch successful"); + return Ok(val.as_bool().unwrap_or(false)); + } + Err(e) => { + error!("Internal subscription launch failed: {:?}", e); + return Err(rpc_err("Internal subscription launch failed")); + } + } } - Err(jsonrpsee::core::Error::Custom(String::from( "Discovery.launch: some failure", ))) @@ -788,70 +767,69 @@ impl DiscoveryServer for DiscoveryImpl { } } } -fn update_intent_source(source_app_id: String, request: LaunchRequest) -> LaunchRequest { - let source = format!("xrn:firebolt:application:{}", source_app_id); - match request.intent.clone() { - Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => { - let updated_navigation_intent = match navigation_intent { - NavigationIntentStrict::Home(mut home_intent) => { - home_intent.context.source = source; - NavigationIntentStrict::Home(home_intent) - } - NavigationIntentStrict::Launch(mut launch_intent) => { - launch_intent.context.source = source; - NavigationIntentStrict::Launch(launch_intent) - } - NavigationIntentStrict::Entity(mut entity_intent) => { - entity_intent.context.source = source; - NavigationIntentStrict::Entity(entity_intent) - } - NavigationIntentStrict::Playback(mut playback_intent) => { - playback_intent.context.source = source; - NavigationIntentStrict::Playback(playback_intent) - } - NavigationIntentStrict::Search(mut search_intent) => { - search_intent.context.source = source; - NavigationIntentStrict::Search(search_intent) - } - NavigationIntentStrict::Section(mut section_intent) => { - section_intent.context.source = source; - NavigationIntentStrict::Section(section_intent) - } - NavigationIntentStrict::Tune(mut tune_intent) => { - tune_intent.context.source = source; - NavigationIntentStrict::Tune(tune_intent) - } - NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => { - provider_request_intent.context.source = source; - NavigationIntentStrict::ProviderRequest(provider_request_intent) - } - NavigationIntentStrict::PlayEntity(mut p) => { - p.context.source = source; - NavigationIntentStrict::PlayEntity(p) - } - NavigationIntentStrict::PlayQuery(mut p) => { - p.context.source = source; - NavigationIntentStrict::PlayQuery(p) - } - }; - - LaunchRequest { - app_id: request.app_id, - intent: Some(NavigationIntent::NavigationIntentStrict( - updated_navigation_intent, - )), - } - } - Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => { - loose_intent.context.source = source; - LaunchRequest { - app_id: request.app_id, - intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)), - } - } - _ => request, - } -} +// fn update_intent_source(source: String, request: LaunchRequest) -> LaunchRequest { +// match request.intent.clone() { +// Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => { +// let updated_navigation_intent = match navigation_intent { +// NavigationIntentStrict::Home(mut home_intent) => { +// home_intent.context.source = source; +// NavigationIntentStrict::Home(home_intent) +// } +// NavigationIntentStrict::Launch(mut launch_intent) => { +// launch_intent.context.source = source; +// NavigationIntentStrict::Launch(launch_intent) +// } +// NavigationIntentStrict::Entity(mut entity_intent) => { +// entity_intent.context.source = source; +// NavigationIntentStrict::Entity(entity_intent) +// } +// NavigationIntentStrict::Playback(mut playback_intent) => { +// playback_intent.context.source = source; +// NavigationIntentStrict::Playback(playback_intent) +// } +// NavigationIntentStrict::Search(mut search_intent) => { +// search_intent.context.source = source; +// NavigationIntentStrict::Search(search_intent) +// } +// NavigationIntentStrict::Section(mut section_intent) => { +// section_intent.context.source = source; +// NavigationIntentStrict::Section(section_intent) +// } +// NavigationIntentStrict::Tune(mut tune_intent) => { +// tune_intent.context.source = source; +// NavigationIntentStrict::Tune(tune_intent) +// } +// NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => { +// provider_request_intent.context.source = source; +// NavigationIntentStrict::ProviderRequest(provider_request_intent) +// } +// NavigationIntentStrict::PlayEntity(mut p) => { +// p.context.source = source; +// NavigationIntentStrict::PlayEntity(p) +// } +// NavigationIntentStrict::PlayQuery(mut p) => { +// p.context.source = source; +// NavigationIntentStrict::PlayQuery(p) +// } +// }; + +// LaunchRequest { +// app_id: request.app_id, +// intent: Some(NavigationIntent::NavigationIntentStrict( +// updated_navigation_intent, +// )), +// } +// } +// Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => { +// loose_intent.context.source = source; +// LaunchRequest { +// app_id: request.app_id, +// intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)), +// } +// } +// _ => request, +// } +// } pub async fn validate_navigation_intent( intent_validation_config: IntentValidation, diff --git a/core/main/src/firebolt/handlers/privacy_rpc.rs b/core/main/src/firebolt/handlers/privacy_rpc.rs index 5a376123a..c9bc59106 100644 --- a/core/main/src/firebolt/handlers/privacy_rpc.rs +++ b/core/main/src/firebolt/handlers/privacy_rpc.rs @@ -26,6 +26,13 @@ use jsonrpsee::{ proc_macros::rpc, RpcModule, }; +use ripple_sdk::api::device::device_user_grants_data::GrantStatus; +use ripple_sdk::api::distributor::distributor_privacy::ShareWatchHistoryRequest; +use ripple_sdk::api::distributor::distributor_usergrants::{ + UserGrantsCloudSetParams, UserGrantsCloudStoreRequest, +}; +use ripple_sdk::api::usergrant_entry::UserGrantInfo; +use ripple_sdk::log::{error, info}; use ripple_sdk::utils::rpc_utils::rpc_error_with_code_result; use ripple_sdk::{ api::{ @@ -61,7 +68,6 @@ use std::collections::HashMap; pub const US_PRIVACY_KEY: &str = "us_privacy"; pub const LMT_KEY: &str = "lmt"; - #[derive(Debug, Clone)] struct AllowAppContentAdTargetingSettings { lmt: String, @@ -318,6 +324,13 @@ pub trait Privacy { ) -> RpcResult; #[method(name = "privacy.settings")] async fn get_settings(&self, ctx: CallContext) -> RpcResult; + + #[method(name = "privacysettings.shareWatchHistory")] + async fn set_share_watch_history( + &self, + ctx: CallContext, + share: ShareWatchHistoryRequest, + ) -> RpcResult<()>; } pub async fn get_allow_app_content_ad_targeting_settings( @@ -1095,6 +1108,62 @@ impl PrivacyServer for PrivacyImpl { } } } + async fn set_share_watch_history( + &self, + ctx: CallContext, + request: ShareWatchHistoryRequest, + ) -> RpcResult<()> { + info!( + "RPC call: privacySettings.shareWatchHistory with share={}", + request.share + ); + let session = if let Some(session) = self.state.session_state.get_account_session() { + session + } else { + return Err(jsonrpsee::core::Error::Custom(String::from( + "Account session is not available", + ))); + }; + + // Create a UserGrantInfo object for watch history sharing + + let user_grant_info = UserGrantInfo { + status: if request.share { + Some(GrantStatus::Allowed) + } else { + Some(GrantStatus::Denied) + }, + app_name: Some(ctx.app_id), + ..Default::default() + }; + + // Remove optOutContext field for now as it is supported only by xcal:primaryContentAdTargeting privacy setting + // let opt_out_context = "xcal:watchHistory".to_string(); + + let params = UserGrantsCloudSetParams { + account_session: session, + user_grant_info, + }; + + let payload = UserGrantsCloudStoreRequest::SetShareWatchHistoryCloud(params); + + match self.state.get_client().send_extn_request(payload).await { + Ok(_) => { + info!( + "Successfully set watch history sharing to {}", + request.share + ); + Ok(()) + } + Err(e) => { + error!("Failed to set watch history sharing: {:?}", e); + Err(jsonrpsee::core::Error::Custom(format!( + "Failed to set watch history sharing: {:?}", + e + ))) + } + } + } } pub struct PrivacyProvider; diff --git a/core/sdk/src/api/distributor/distributor_privacy.rs b/core/sdk/src/api/distributor/distributor_privacy.rs index 6b1ffb26b..b397a2808 100644 --- a/core/sdk/src/api/distributor/distributor_privacy.rs +++ b/core/sdk/src/api/distributor/distributor_privacy.rs @@ -338,6 +338,12 @@ pub enum PrivacyResponse { Grants(UserGrants), } +#[derive(Debug, Deserialize, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ShareWatchHistoryRequest { + pub share: bool, +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/sdk/src/api/distributor/distributor_usergrants.rs b/core/sdk/src/api/distributor/distributor_usergrants.rs index 00208425f..fe75ea46f 100644 --- a/core/sdk/src/api/distributor/distributor_usergrants.rs +++ b/core/sdk/src/api/distributor/distributor_usergrants.rs @@ -37,10 +37,12 @@ pub struct UserGrantsCloudSetParams { pub account_session: AccountSession, pub user_grant_info: UserGrantInfo, } + #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub enum UserGrantsCloudStoreRequest { GetCloudUserGrants(UserGrantsCloudGetParams), SetCloudUserGrants(UserGrantsCloudSetParams), + SetShareWatchHistoryCloud(UserGrantsCloudSetParams), } impl ExtnPayloadProvider for UserGrantsCloudStoreRequest {