From 550b17775e6d0fc8bf510e9bcf432e01d2e96402 Mon Sep 17 00:00:00 2001 From: Stefan Bossbaly Date: Thu, 4 Jan 2024 22:14:31 -0500 Subject: [PATCH] Migrate to parking_lot Migrate from std implementations of Mutex and RwLock to the parking_lot implementations. They are faster and more flexible than the those found in the Rust standard library. --- core/launcher/src/manager/app_launcher.rs | 22 ++-- .../launcher/src/manager/container_manager.rs | 26 ++-- .../launcher/src/manager/container_message.rs | 3 +- core/launcher/src/manager/view_manager.rs | 45 +++---- .../bootstrap/extn/load_extn_metadata_step.rs | 2 +- .../main/src/bootstrap/extn/load_extn_step.rs | 6 +- .../bootstrap/extn/start_extn_channel_step.rs | 4 +- core/main/src/firebolt/rpc_router.rs | 8 +- .../src/processor/main_context_processor.rs | 5 +- core/main/src/service/apps/app_events.rs | 24 ++-- core/main/src/service/apps/apps_updater.rs | 22 ++-- .../apps/delegated_launcher_handler.rs | 37 +++--- core/main/src/service/apps/provider_broker.rs | 28 ++--- core/main/src/service/data_governance.rs | 7 +- core/main/src/service/extn/ripple_client.rs | 5 +- core/main/src/service/user_grants.rs | 69 ++++------ core/main/src/state/cap/cap_state.rs | 8 +- core/main/src/state/cap/generic_cap_state.rs | 13 +- core/main/src/state/cap/permitted_state.rs | 11 +- core/main/src/state/extn_state.rs | 20 ++- core/main/src/state/metrics_state.rs | 23 ++-- core/main/src/state/openrpc_state.rs | 38 +++--- core/main/src/state/session_state.rs | 32 +++-- core/sdk/Cargo.toml | 2 +- core/sdk/src/api/apps.rs | 18 +-- core/sdk/src/extn/client/extn_client.rs | 119 ++++++++---------- core/sdk/src/framework/bootstrap.rs | 12 +- core/sdk/src/lib.rs | 1 + core/sdk/src/utils/time_utils.rs | 10 +- .../mock_device/src/mock_web_socket_server.rs | 14 +-- .../src/events/thunder_event_processor.rs | 17 ++- .../src/processors/thunder_device_info.rs | 39 +++--- .../thunder_ripple_sdk/src/thunder_state.rs | 5 +- .../general/src/general_privacy_processor.rs | 12 +- .../general/src/general_session_processor.rs | 9 +- 35 files changed, 323 insertions(+), 393 deletions(-) diff --git a/core/launcher/src/manager/app_launcher.rs b/core/launcher/src/manager/app_launcher.rs index 0cacaa645..13c612c7f 100644 --- a/core/launcher/src/manager/app_launcher.rs +++ b/core/launcher/src/manager/app_launcher.rs @@ -17,7 +17,7 @@ use std::{ collections::HashMap, - sync::{Arc, RwLock}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -45,6 +45,7 @@ use ripple_sdk::{ extn::extn_client_message::ExtnResponse, framework::ripple_contract::RippleContract, log::{debug, error, info, warn}, + parking_lot::RwLock, tokio::{ self, time::{sleep, Duration}, @@ -117,7 +118,6 @@ impl AppLauncherState { Some(t) => self .apps .read() - .unwrap() .iter() .filter(|(_app_id, app)| app.launch_params._type.eq(&t)) .count(), @@ -129,11 +129,11 @@ impl AppLauncherState { } fn get_app_len(&self) -> usize { - self.apps.read().unwrap().len() + self.apps.read().len() } fn get_app_by_id(&self, app_id: &str) -> Option { - let v = self.apps.read().unwrap(); + let v = self.apps.read(); let r = v.get(app_id); if let Some(r) = r { let v = r.clone(); @@ -143,12 +143,12 @@ impl AppLauncherState { } fn get_apps(&self) -> Vec { - let r = self.apps.read().unwrap(); + let r = self.apps.read(); r.iter().map(|(_s, app)| app.clone()).collect() } fn set_app_state(&self, container_id: &str, lifecycle_state: LifecycleState) { - let mut v = self.apps.write().unwrap(); + let mut v = self.apps.write(); let r = v.get_mut(container_id); if let Some(r) = r { r.state = lifecycle_state @@ -156,7 +156,7 @@ impl AppLauncherState { } fn set_app_ready(&self, app_id: &str) { - let mut v = self.apps.write().unwrap(); + let mut v = self.apps.write(); let r = v.get_mut(app_id); if let Some(r) = r { r.ready = true; @@ -164,7 +164,7 @@ impl AppLauncherState { } fn set_app_viewid(&self, container_id: &str, view_id: Uuid) { - let mut v = self.apps.write().unwrap(); + let mut v = self.apps.write(); let r = v.get_mut(container_id); if let Some(r) = r { r.container_props.view_id = view_id @@ -172,18 +172,18 @@ impl AppLauncherState { } fn add_app(&self, key: String, app: App) { - let mut r = self.apps.write().unwrap(); + let mut r = self.apps.write(); r.insert(key, app); } fn remove_app(&self, key: &str) -> Option { - let mut r = self.apps.write().unwrap(); + let mut r = self.apps.write(); r.remove(key) } fn always_retained_apps(&self, policy: RetentionPolicy) -> Vec { let mut candidates = Vec::new(); - for (_id, app) in self.apps.read().unwrap().iter() { + for (_id, app) in self.apps.read().iter() { if policy.always_retained.contains(&app.app_id) { continue; } diff --git a/core/launcher/src/manager/container_manager.rs b/core/launcher/src/manager/container_manager.rs index 2bec785a7..b4cae0921 100644 --- a/core/launcher/src/manager/container_manager.rs +++ b/core/launcher/src/manager/container_manager.rs @@ -15,10 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use ripple_sdk::{ api::{ @@ -26,6 +23,7 @@ use ripple_sdk::{ firebolt::fb_lifecycle::LifecycleState, }, log::{debug, error}, + parking_lot::RwLock, }; use serde::{Deserialize, Serialize}; @@ -63,7 +61,7 @@ pub struct ContainerState { impl ContainerState { fn get_prev_stack(&self) -> Option { let prev_container = { - let stack = self.stack.read().unwrap(); + let stack = self.stack.read(); stack.peek().cloned() }; prev_container @@ -71,48 +69,48 @@ impl ContainerState { fn get_container_by_name(&self, id: &String) -> Option { { - let container = self.containers.read().unwrap(); + let container = self.containers.read(); container.get(id).cloned() } } fn add_container(&self, k: String, v: ContainerProperties) { - let mut containers = self.containers.write().unwrap(); + let mut containers = self.containers.write(); containers.insert(k, v); } fn remove_container(&self, k: String) { - let mut containers = self.containers.write().unwrap(); + let mut containers = self.containers.write(); containers.remove(&k); } fn contains_stack_by_name(&self, id: &String) -> bool { - let mut stack = self.stack.write().unwrap(); + let mut stack = self.stack.write(); stack.contains(id) } fn stack_len(&self) -> usize { - let stack = self.stack.write().unwrap(); + let stack = self.stack.write(); stack.len() } fn add_stack(&self, id: String) { - let mut stack = self.stack.write().unwrap(); + let mut stack = self.stack.write(); stack.push(id); } fn pop_stack_by_name(&self, name: &str) { - let mut stack = self.stack.write().unwrap(); + let mut stack = self.stack.write(); stack.pop_item(name); } fn bring_stack_to_front(&self, name: &str) { - let mut stack = self.stack.write().unwrap(); + let mut stack = self.stack.write(); stack.bring_to_front(name); } fn send_stack_to_back(&self, name: &str) { - let mut stack = self.stack.write().unwrap(); + let mut stack = self.stack.write(); stack.send_to_back(name); } } diff --git a/core/launcher/src/manager/container_message.rs b/core/launcher/src/manager/container_message.rs index 92a21ff3d..998508081 100644 --- a/core/launcher/src/manager/container_message.rs +++ b/core/launcher/src/manager/container_message.rs @@ -15,9 +15,10 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use ripple_sdk::{ + parking_lot::RwLock, tokio::sync::{mpsc, oneshot}, uuid::Uuid, }; diff --git a/core/launcher/src/manager/view_manager.rs b/core/launcher/src/manager/view_manager.rs index b08a96eb2..74ebcfb76 100644 --- a/core/launcher/src/manager/view_manager.rs +++ b/core/launcher/src/manager/view_manager.rs @@ -15,10 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use ripple_sdk::{ api::{ @@ -30,6 +27,7 @@ use ripple_sdk::{ manifest::apps::AppProperties, }, log::error, + parking_lot::RwLock, tokio::sync::oneshot, utils::{channel_utils::oneshot_send_and_log, error::RippleError}, uuid::Uuid, @@ -87,12 +85,13 @@ impl ViewRequest { } pub fn send_response(&self, response: ViewResponse) -> Result<(), RippleError> { - let mut sender = self.resp_tx.write().unwrap(); - if sender.is_some() { - oneshot_send_and_log(sender.take().unwrap(), response, "ViewManager response"); - Ok(()) - } else { - Err(RippleError::SenderMissing) + let mut sender = self.resp_tx.write(); + match sender.take() { + Some(tx) => { + oneshot_send_and_log(tx, response, "ViewManager response"); + Ok(()) + } + None => Err(RippleError::SenderMissing), } } } @@ -142,27 +141,23 @@ pub struct ViewState { impl ViewState { fn insert_view(&self, key: String, view: ViewId) { - let _ = self.view_pool.write().unwrap().insert(key, view); + let _ = self.view_pool.write().insert(key, view); } fn get_name(&self, key: ViewId) -> Option { - self.view_pool - .read() - .unwrap() - .iter() - .find_map( - |(name, &id)| { - if id == key { - Some(name.clone()) - } else { - None - } - }, - ) + self.view_pool.read().iter().find_map( + |(name, &id)| { + if id == key { + Some(name.clone()) + } else { + None + } + }, + ) } fn remove(&self, key: &str) { - let _ = self.view_pool.write().unwrap().remove(key); + let _ = self.view_pool.write().remove(key); } } diff --git a/core/main/src/bootstrap/extn/load_extn_metadata_step.rs b/core/main/src/bootstrap/extn/load_extn_metadata_step.rs index 6770c2583..d7c1e5930 100644 --- a/core/main/src/bootstrap/extn/load_extn_metadata_step.rs +++ b/core/main/src/bootstrap/extn/load_extn_metadata_step.rs @@ -69,7 +69,7 @@ impl Bootstep for LoadExtensionMetadataStep { }) .collect(); unsafe { - let mut loaded_extns = state.extn_state.loaded_libraries.write().unwrap(); + let mut loaded_extns = state.extn_state.loaded_libraries.write(); for (extn_path, entry) in extn_paths { debug!(""); debug!(""); diff --git a/core/main/src/bootstrap/extn/load_extn_step.rs b/core/main/src/bootstrap/extn/load_extn_step.rs index df9051a38..8507457b8 100644 --- a/core/main/src/bootstrap/extn/load_extn_step.rs +++ b/core/main/src/bootstrap/extn/load_extn_step.rs @@ -46,7 +46,7 @@ impl Bootstep for LoadExtensionsStep { "LoadExtensionsStep".into() } async fn setup(&self, state: BootstrapState) -> Result<(), RippleError> { - let loaded_extensions = state.extn_state.loaded_libraries.read().unwrap(); + let loaded_extensions = state.extn_state.loaded_libraries.read(); let mut deferred_channels: Vec = Vec::new(); let mut device_channels: Vec = Vec::new(); let mut jsonrpsee_extns: Methods = Methods::new(); @@ -122,13 +122,13 @@ impl Bootstep for LoadExtensionsStep { } { - let mut device_channel_state = state.extn_state.device_channels.write().unwrap(); + let mut device_channel_state = state.extn_state.device_channels.write(); info!("{} Device channels extension loaded", device_channels.len()); let _ = device_channel_state.extend(device_channels); } { - let mut deferred_channel_state = state.extn_state.deferred_channels.write().unwrap(); + let mut deferred_channel_state = state.extn_state.deferred_channels.write(); info!( "{} Deferred channels extension loaded", deferred_channels.len() diff --git a/core/main/src/bootstrap/extn/start_extn_channel_step.rs b/core/main/src/bootstrap/extn/start_extn_channel_step.rs index 1391c7085..7c15ba96d 100644 --- a/core/main/src/bootstrap/extn/start_extn_channel_step.rs +++ b/core/main/src/bootstrap/extn/start_extn_channel_step.rs @@ -53,7 +53,7 @@ impl Bootstep for StartExtnChannelsStep { async fn setup(&self, state: BootstrapState) -> Result<(), RippleError> { let mut extn_ids = Vec::new(); { - let mut device_channels = state.extn_state.device_channels.write().unwrap(); + let mut device_channels = state.extn_state.device_channels.write(); while let Some(device_channel) = device_channels.pop() { let id = device_channel.extn_id.clone(); extn_ids.push(id); @@ -65,7 +65,7 @@ impl Bootstep for StartExtnChannelsStep { } { - let mut deferred_channels = state.extn_state.deferred_channels.write().unwrap(); + let mut deferred_channels = state.extn_state.deferred_channels.write(); while let Some(deferred_channel) = deferred_channels.pop() { let id = deferred_channel.extn_id.clone(); extn_ids.push(id); diff --git a/core/main/src/firebolt/rpc_router.rs b/core/main/src/firebolt/rpc_router.rs index 4c6855661..fb355af14 100644 --- a/core/main/src/firebolt/rpc_router.rs +++ b/core/main/src/firebolt/rpc_router.rs @@ -27,6 +27,7 @@ use jsonrpsee::{ }, types::{error::ErrorCode, Id, Params}, }; + use ripple_sdk::{ api::{ apps::EffectiveTransport, @@ -36,11 +37,12 @@ use ripple_sdk::{ chrono::Utc, extn::extn_client_message::{ExtnMessage, ExtnResponse}, log::{error, info}, + parking_lot::RwLock, serde_json::{self, Result as SResult}, tokio::{self}, utils::error::RippleError, }; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use crate::{ service::telemetry_builder::TelemetryBuilder, @@ -64,12 +66,12 @@ impl RouterState { } pub fn update_methods(&self, methods: Methods) { - let mut methods_state = self.methods.write().unwrap(); + let mut methods_state = self.methods.write(); let _ = methods_state.merge(methods.initialize_resources(&self.resources).unwrap()); } fn get_methods(&self) -> Methods { - self.methods.read().unwrap().clone() + self.methods.read().clone() } } diff --git a/core/main/src/processor/main_context_processor.rs b/core/main/src/processor/main_context_processor.rs index fca6a8648..67f6a7542 100644 --- a/core/main/src/processor/main_context_processor.rs +++ b/core/main/src/processor/main_context_processor.rs @@ -16,7 +16,7 @@ // use std::{ - sync::{Arc, Once, RwLock}, + sync::{Arc, Once}, time::Duration, }; @@ -41,6 +41,7 @@ use ripple_sdk::{ extn_client_message::{ExtnMessage, ExtnResponse}, }, log::{debug, error, info}, + parking_lot::RwLock, tokio::{ self, sync::{mpsc::Receiver as MReceiver, mpsc::Sender as MSender}, @@ -332,7 +333,7 @@ impl ExtnEventProcessor for MainContextProcessor { _ => {} } { - let mut context = state.current_context.write().unwrap(); + let mut context = state.current_context.write(); context.deep_copy(extracted_message); } } diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index d43c2113f..b8feac9d9 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -27,15 +27,13 @@ use ripple_sdk::{ protocol::BridgeProtocolRequest, }, log::error, + parking_lot::RwLock, serde_json::{json, Value}, tokio::sync::mpsc, utils::channel_utils::mpsc_send_and_log, }; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use crate::state::platform_state::PlatformState; @@ -85,7 +83,7 @@ impl std::fmt::Debug for AppEventsState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut listeners_debug = HashMap::::default(); - for (event_name, context_map) in self.listeners.read().unwrap().iter() { + for (event_name, context_map) in self.listeners.read().iter() { for (context, listeners) in context_map.iter() { listeners .iter() @@ -240,7 +238,7 @@ impl AppEvents { } let session = session.unwrap(); let app_events_state = &state.app_events_state; - let mut listeners = app_events_state.listeners.write().unwrap(); + let mut listeners = app_events_state.listeners.write(); let event_ctx_string = event_context.map(|x| x.to_string()); if listen_request.listen { @@ -301,7 +299,7 @@ impl AppEvents { event_name: &str, context: Option, ) -> Vec { - let listeners = state.listeners.read().unwrap(); + let listeners = state.listeners.read(); let mut vec = Vec::new(); if let Some(entry) = listeners.get(event_name) { @@ -416,7 +414,7 @@ impl AppEvents { pub fn remove_session(state: &PlatformState, session_id: String) { state.session_state.clear_session(&session_id); - let mut listeners = state.app_events_state.listeners.write().unwrap(); + let mut listeners = state.app_events_state.listeners.write(); let all_events = listeners.keys().cloned().collect::>(); for event_name in all_events { if let Some(ctx_map) = listeners.get_mut(&event_name) { @@ -462,15 +460,7 @@ pub mod tests { call_context, listen_request, ); - assert!( - platform_state - .app_events_state - .listeners - .read() - .unwrap() - .len() - == 1 - ); + assert!(platform_state.app_events_state.listeners.read().len() == 1); let listeners = AppEvents::get_listeners(&platform_state.app_events_state, "test_event", None); assert!(listeners.len() == 1); diff --git a/core/main/src/service/apps/apps_updater.rs b/core/main/src/service/apps/apps_updater.rs index 0172aba9f..f035a03e3 100644 --- a/core/main/src/service/apps/apps_updater.rs +++ b/core/main/src/service/apps/apps_updater.rs @@ -16,6 +16,7 @@ use ripple_sdk::{ extn_client_message::{ExtnMessage, ExtnPayload, ExtnResponse}, }, log::info, + parking_lot::RwLock, tokio::sync::mpsc::{Receiver, Sender}, }; @@ -26,7 +27,7 @@ use serde::{Deserialize, Serialize}; use std::{ fs, path::{Path, PathBuf}, - sync::{Arc, RwLock}, + sync::Arc, }; #[cfg(test)] use std::{println as debug, println as error}; @@ -75,7 +76,7 @@ impl AppsUpdaterState { } pub fn persist_failed_installs(&self) -> bool { - let failed_installs = self.failed_app_installs.read().unwrap(); + let failed_installs = self.failed_app_installs.read(); let path = std::path::Path::new(&self.failed_app_installs_persist_path) .join(FAILED_INSTALLS_FILE_NAME); match fs::OpenOptions::new() @@ -101,7 +102,7 @@ impl AppsUpdaterState { pub fn success_install(&self, app_id: String) { let did_change = { - let mut failed = self.failed_app_installs.write().unwrap(); + let mut failed = self.failed_app_installs.write(); let len_before = failed.len(); failed.retain(|a| a.app_id != app_id); failed.len() != len_before @@ -113,7 +114,7 @@ impl AppsUpdaterState { pub fn fail_install(&self, install: FailedAppInstall) { { - let mut failed = self.failed_app_installs.write().unwrap(); + let mut failed = self.failed_app_installs.write(); if let Some(exists_pos) = failed.iter().position(|a| a.app_id == install.app_id) { if let Some(exists) = failed.get_mut(exists_pos) { exists.failed_install_version = install.failed_install_version.clone(); @@ -126,7 +127,7 @@ impl AppsUpdaterState { } pub fn remove_pending(&mut self, app_id: &String, version: &String) -> Option { - let mut pi = self.pending_installs.write().unwrap(); + let mut pi = self.pending_installs.write(); pi.iter() .position(|a| a.app_id == *app_id && a.version == *version) @@ -255,7 +256,7 @@ fn is_expected_install( return in_old_catalog; } // If not in the old catalog, check if it failed to install - let fis = failed_installs.read().unwrap(); + let fis = failed_installs.read(); let failed_install = fis.iter().find(|a| a.app_id == installed_app.id); match failed_install { Some(fi) => match &fi.last_good_version { @@ -393,7 +394,7 @@ pub async fn update(mut state: AppsUpdaterState, apps_catalog_update: AppsCatalo Some(c) => c.iter().find(|a| a.id == app.id).map(|a| a.version.clone()), None => None, }; - state.pending_installs.write().unwrap().push(AppInstall { + state.pending_installs.write().push(AppInstall { app_id: app.id.clone(), version: app.version.clone(), previous_version: previous_version.clone(), @@ -434,11 +435,7 @@ fn install_complete(mut state: AppsUpdaterState, op: AppOperationComplete) { #[cfg(test)] pub mod tests { - use std::{ - path::Path, - sync::{Arc, RwLock}, - time::Duration, - }; + use std::{path::Path, sync::Arc, time::Duration}; use super::{AppsUpdater, AppsUpdaterState}; use crate::{service::extn::ripple_client::RippleClient, state::platform_state::PlatformState}; @@ -456,6 +453,7 @@ pub mod tests { extn_client_message::{ExtnMessage, ExtnResponse}, mock_extension_client::{MockExtnClient, MockExtnRequest}, }, + parking_lot::RwLock, tokio::{self, spawn, sync::mpsc, task::JoinHandle, time::sleep}, }; use ripple_sdk::{ diff --git a/core/main/src/service/apps/delegated_launcher_handler.rs b/core/main/src/service/apps/delegated_launcher_handler.rs index a0f7cfac7..313ca100d 100644 --- a/core/main/src/service/apps/delegated_launcher_handler.rs +++ b/core/main/src/service/apps/delegated_launcher_handler.rs @@ -15,11 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - env, fs, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, env, fs, sync::Arc}; use ripple_sdk::{ api::{ @@ -45,6 +41,7 @@ use ripple_sdk::{ gateway::rpc_gateway_api::{AppIdentification, CallerSession}, }, log::{debug, error, trace, warn}, + parking_lot::RwLock, serde_json::{self}, tokio::sync::oneshot, utils::{error::RippleError, time_utils::Timer}, @@ -175,17 +172,16 @@ impl AppManagerState { path.display().to_string() } pub fn get_persisted_app_title_for_app_id(&self, app_id: &str) -> Option { - self.app_title.read().unwrap().get(app_id).cloned() + self.app_title.read().get(app_id).cloned() } pub fn persist_app_title(&self, app_id: &str, title: &str) -> bool { { let _ = self .app_title .write() - .unwrap() .insert(app_id.to_owned(), title.to_owned()); } - let map = { self.app_title.read().unwrap().clone() }; + let map = { self.app_title.read().clone() }; let path = std::path::Path::new(&self.app_title_persist_path).join(APP_ID_TITLE_FILE_NAME); if let Ok(file) = fs::OpenOptions::new() .create(true) @@ -201,17 +197,16 @@ impl AppManagerState { false } pub fn exists(&self, app_id: &str) -> bool { - self.apps.read().unwrap().contains_key(app_id) + self.apps.read().contains_key(app_id) } pub fn get_app_id_from_session_id(&self, session_id: &str) -> Option { { - debug!("apps and sessions {:?}", self.apps.read().unwrap()); + debug!("apps and sessions {:?}", self.apps.read()); } if let Some((_, app)) = self .apps .read() - .unwrap() .iter() .find(|(_, app)| app.session_id.eq(session_id)) { @@ -222,14 +217,14 @@ impl AppManagerState { } fn set_session(&self, app_id: &str, session: AppSession) { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); if let Some(app) = apps.get_mut(app_id) { app.current_session = session } } fn update_active_session(&self, app_id: &str, session: Option) { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); if let Some(app) = apps.get_mut(app_id) { debug!( "Setting session : {{ appId:{} , session:{:?} }}", @@ -240,33 +235,33 @@ impl AppManagerState { } fn set_state(&self, app_id: &str, state: LifecycleState) { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); if let Some(app) = apps.get_mut(app_id) { app.state = state; } } fn insert(&self, app_id: String, app: App) { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); let _ = apps.insert(app_id, app); } pub fn get(&self, app_id: &str) -> Option { - self.apps.read().unwrap().get(app_id).cloned() + self.apps.read().get(app_id).cloned() } fn remove(&self, app_id: &str) -> Option { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); apps.remove(app_id) } fn set_internal_state(&mut self, app_id: &str, method: AppMethod) { - let mut apps = self.apps.write().unwrap(); + let mut apps = self.apps.write(); if let Some(app) = apps.get_mut(app_id) { app.internal_state = Some(method); } } fn get_internal_state(&mut self, app_id: &str) -> Option { - let apps = self.apps.read().unwrap(); + let apps = self.apps.read(); if let Some(app) = apps.get(app_id) { app.internal_state.clone() } else { @@ -274,12 +269,12 @@ impl AppManagerState { } } fn store_intent(&self, app_id: &str, intent: NavigationIntent) { - let mut intents = self.intents.write().unwrap(); + let mut intents = self.intents.write(); let _ = intents.insert(app_id.to_owned(), intent); } fn take_intent(&self, app_id: &str) -> Option { - let mut intents = self.intents.write().unwrap(); + let mut intents = self.intents.write(); intents.remove(app_id) } } diff --git a/core/main/src/service/apps/provider_broker.rs b/core/main/src/service/apps/provider_broker.rs index e6270156c..e1d891ee4 100644 --- a/core/main/src/service/apps/provider_broker.rs +++ b/core/main/src/service/apps/provider_broker.rs @@ -32,6 +32,7 @@ use ripple_sdk::{ gateway::rpc_gateway_api::{CallContext, CallerSession}, }, log::{debug, error, info, warn}, + parking_lot::RwLock, serde_json, tokio::sync::oneshot, utils::channel_utils::oneshot_send_and_log, @@ -39,10 +40,7 @@ use ripple_sdk::{ }; use serde::{Deserialize, Serialize}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use crate::{ service::apps::app_events::AppEvents, @@ -146,7 +144,7 @@ impl ProviderBroker { method: String, provider: CallContext, ) { - let mut provider_methods = pst.provider_broker_state.provider_methods.write().unwrap(); + let mut provider_methods = pst.provider_broker_state.provider_methods.write(); let cap_method = format!("{}:{}", capability, method); if let Some(method) = provider_methods.get(&cap_method) { // unregister the capability if it is provided by the session @@ -180,7 +178,7 @@ impl ProviderBroker { listen_request, ); { - let mut provider_methods = pst.provider_broker_state.provider_methods.write().unwrap(); + let mut provider_methods = pst.provider_broker_state.provider_methods.write(); provider_methods.insert( cap_method, ProviderMethod { @@ -205,7 +203,7 @@ impl ProviderBroker { } pub fn get_provider_methods(pst: &PlatformState) -> ProviderResult { - let provider_methods = pst.provider_broker_state.provider_methods.read().unwrap(); + let provider_methods = pst.provider_broker_state.provider_methods.read(); let mut result: HashMap> = HashMap::new(); let caps_keys = provider_methods.keys(); let all_caps = caps_keys.cloned().collect::>(); @@ -229,7 +227,7 @@ impl ProviderBroker { debug!("invoking provider for {}", cap_method); let provider_opt = { - let provider_methods = pst.provider_broker_state.provider_methods.read().unwrap(); + let provider_methods = pst.provider_broker_state.provider_methods.read(); provider_methods.get(&cap_method).cloned() }; if let Some(provider) = provider_opt { @@ -275,7 +273,7 @@ impl ProviderBroker { provider: ProviderMethod, ) -> String { let c_id = Uuid::new_v4().to_string(); - let mut active_sessions = pst.provider_broker_state.active_sessions.write().unwrap(); + let mut active_sessions = pst.provider_broker_state.active_sessions.write(); debug!("started provider session {} {}", c_id, request.capability); active_sessions.insert( c_id.clone(), @@ -296,7 +294,7 @@ impl ProviderBroker { // Remove any duplicate requests. ProviderBroker::remove_request(pst, &request.capability); - let mut request_queue = pst.provider_broker_state.request_queue.write().unwrap(); + let mut request_queue = pst.provider_broker_state.request_queue.write(); if request_queue.is_full() { warn!("invoke_method: Request queue full, removing oldest request"); request_queue.remove(0); @@ -309,7 +307,7 @@ impl ProviderBroker { "provider_response, {}, {:?}", resp.correlation_id, resp.result ); - let mut active_sessions = pst.provider_broker_state.active_sessions.write().unwrap(); + let mut active_sessions = pst.provider_broker_state.active_sessions.write(); match active_sessions.remove(&resp.correlation_id) { Some(session) => { oneshot_send_and_log(session.caller.tx, resp.result, "ProviderResponse"); @@ -331,7 +329,7 @@ impl ProviderBroker { } fn cleanup_caps_for_unregister(pst: &PlatformState, session_id: String) -> Vec { - let mut active_sessions = pst.provider_broker_state.active_sessions.write().unwrap(); + let mut active_sessions = pst.provider_broker_state.active_sessions.write(); let cid_keys = active_sessions.keys(); let all_cids = cid_keys.cloned().collect::>(); let mut clear_cids = Vec::::new(); @@ -353,7 +351,7 @@ impl ProviderBroker { for cid in clear_cids { active_sessions.remove(&cid); } - let mut provider_methods = pst.provider_broker_state.provider_methods.write().unwrap(); + let mut provider_methods = pst.provider_broker_state.provider_methods.write(); // find all providers for the session being unregistered // remove the provided capability let mut clear_caps = Vec::new(); @@ -384,7 +382,7 @@ impl ProviderBroker { } fn remove_request(pst: &PlatformState, capability: &String) -> Option { - let mut request_queue = pst.provider_broker_state.request_queue.write().unwrap(); + let mut request_queue = pst.provider_broker_state.request_queue.write(); let mut iter = request_queue.iter(); let cap = iter.position(|request| request.capability.eq(capability)); if let Some(index) = cap { @@ -400,7 +398,7 @@ impl ProviderBroker { _capability: String, request: FocusRequest, ) { - let mut active_sessions = pst.provider_broker_state.active_sessions.write().unwrap(); + let mut active_sessions = pst.provider_broker_state.active_sessions.write(); if let Some(session) = active_sessions.get_mut(&request.correlation_id) { session.focused = true; if pst.has_internal_launcher() { diff --git a/core/main/src/service/data_governance.rs b/core/main/src/service/data_governance.rs index aa473a4df..221809db7 100644 --- a/core/main/src/service/data_governance.rs +++ b/core/main/src/service/data_governance.rs @@ -25,10 +25,11 @@ use ripple_sdk::{ storage_property::StorageProperty, }, log::{debug, info}, + parking_lot::RwLock, utils::error::RippleError, }; use std::collections::HashSet; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use crate::{ processor::storage::storage_manager::StorageManager, state::platform_state::PlatformState, @@ -65,12 +66,12 @@ impl std::fmt::Debug for DataGovernanceState { impl DataGovernance { fn update_local_exclusion_policy(state: &DataGovernanceState, excl: ExclusionPolicy) { - let mut dg = state.exclusions.write().unwrap(); + let mut dg = state.exclusions.write(); *dg = Some(excl) } fn get_local_exclusion_policy(state: &DataGovernanceState) -> Option { - let dg = state.exclusions.read().unwrap(); + let dg = state.exclusions.read(); (*dg).clone() } diff --git a/core/main/src/service/extn/ripple_client.rs b/core/main/src/service/extn/ripple_client.rs index 3db983efd..3e9025b0c 100644 --- a/core/main/src/service/extn/ripple_client.rs +++ b/core/main/src/service/extn/ripple_client.rs @@ -15,7 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use ripple_sdk::{ api::{ @@ -35,6 +35,7 @@ use ripple_sdk::{ }, framework::RippleResponse, log::error, + parking_lot::RwLock, tokio::{ self, sync::{mpsc::Sender, oneshot}, @@ -129,7 +130,7 @@ impl RippleClient { } pub fn get_extn_client(&self) -> ExtnClient { - self.client.read().unwrap().clone() + self.client.read().clone() } pub async fn init(&self) { diff --git a/core/main/src/service/user_grants.rs b/core/main/src/service/user_grants.rs index 1b03d49c6..37e0f5de6 100644 --- a/core/main/src/service/user_grants.rs +++ b/core/main/src/service/user_grants.rs @@ -18,7 +18,7 @@ use std::{ collections::{HashMap, HashSet}, path::Path, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -54,6 +54,7 @@ use ripple_sdk::{ }, framework::file_store::FileStore, log::{debug, error, warn}, + parking_lot::RwLock, serde_json::Value, tokio::sync::oneshot, utils::error::RippleError, @@ -110,7 +111,7 @@ impl GrantState { } fn check_device_grants(&self, grant_entry: &GrantEntry) -> Option { - let device_grants = self.device_grants.read().unwrap(); + let device_grants = self.device_grants.read(); if let Some(v) = device_grants.value.get(grant_entry) { return v.status.clone(); } @@ -118,7 +119,7 @@ impl GrantState { } fn check_app_grants(&self, grant_entry: &GrantEntry, app_id: &str) -> Option { - let grant_app_map = self.grant_app_map.read().unwrap(); + let grant_app_map = self.grant_app_map.read(); if let Some(app_map) = grant_app_map.value.get(app_id) { if let Some(v) = app_map.get(grant_entry) { return v.status.clone(); @@ -159,12 +160,7 @@ impl GrantState { } else { HashMap::default() }; - let grant_entries = platform_state - .cap_state - .grant_state - .grant_app_map - .read() - .unwrap(); + let grant_entries = platform_state.cap_state.grant_state.grant_app_map.read(); for (app_id, app_entries) in grant_entries.value.iter() { let mut grant_entry_set_to_remove: HashSet = HashSet::new(); for grant_entry in app_entries.iter() { @@ -191,12 +187,7 @@ impl GrantState { } else { HashMap::default() }; - let grant_entries = platform_state - .cap_state - .grant_state - .device_grants - .read() - .unwrap(); + let grant_entries = platform_state.cap_state.grant_state.device_grants.read(); let mut grant_entry_set_to_remove: HashSet = HashSet::new(); for grant_entry in grant_entries.value.iter() { if !grant_policies_map.contains_key(&grant_entry.capability) { @@ -229,12 +220,8 @@ impl GrantState { app_id_opt = Some(id.to_string()); } else { // Delete device grant in local storage and grant state - let mut device_grant_map_write = platform_state - .cap_state - .grant_state - .device_grants - .write() - .unwrap(); + let mut device_grant_map_write = + platform_state.cap_state.grant_state.device_grants.write(); device_grant_map_write .value @@ -260,12 +247,8 @@ impl GrantState { ) -> Option { let mut gc_opt: Option = None; { - let mut grant_app_map_write = platform_state - .cap_state - .grant_state - .grant_app_map - .write() - .unwrap(); + let mut grant_app_map_write = + platform_state.cap_state.grant_state.grant_app_map.write(); let entries = grant_app_map_write.value.entry(app_id).or_default(); if entries.contains(entry) { gc_opt = Some(entry.clone()); @@ -282,7 +265,7 @@ impl GrantState { new_entry: GrantEntry, ) { if let Some(app_id) = app_id { - let mut grant_state = self.grant_app_map.write().unwrap(); + let mut grant_state = self.grant_app_map.write(); //Get a mutable reference to the value associated with a key, create it if it doesn't exist, let entries = grant_state.value.entry(app_id).or_default(); if entries.contains(&new_entry) { @@ -298,7 +281,7 @@ impl GrantState { } pub fn clear_local_entries(&self, ps: &PlatformState, persistence_type: PolicyPersistenceType) { - let mut app_grant_state = self.grant_app_map.write().unwrap(); + let mut app_grant_state = self.grant_app_map.write(); for (_, entries) in app_grant_state.value.iter_mut() { entries.retain(|entry| { !self.check_grant_policy_persistence( @@ -311,7 +294,7 @@ impl GrantState { } app_grant_state.sync(); - let mut device_grant_state = self.device_grants.write().unwrap(); + let mut device_grant_state = self.device_grants.write(); device_grant_state.value.retain(|entry: &GrantEntry| { !self.check_grant_policy_persistence( ps, @@ -353,7 +336,7 @@ impl GrantState { F: FnMut(&GrantEntry) -> bool, { let mut deleted = false; - let mut grant_state = self.grant_app_map.write().unwrap(); + let mut grant_state = self.grant_app_map.write(); let entries = match grant_state.value.get_mut(&app_id) { Some(entries) => entries, None => return false, @@ -373,7 +356,7 @@ impl GrantState { pub fn delete_all_entries_for_lifespan(&self, lifespan: &GrantLifespan) -> bool { let mut deleted = false; { - let mut grant_state = self.grant_app_map.write().unwrap(); + let mut grant_state = self.grant_app_map.write(); for set in grant_state.value.values_mut() { let prev_len = set.len(); @@ -387,7 +370,7 @@ impl GrantState { } } { - let mut grant_state = self.device_grants.write().unwrap(); + let mut grant_state = self.device_grants.write(); let prev_len = grant_state.value.len(); grant_state .value @@ -406,7 +389,7 @@ impl GrantState { pub fn delete_expired_entries_for_app(&self, app_id: String) -> bool { let mut deleted = false; - let mut grant_state = self.grant_app_map.write().unwrap(); + let mut grant_state = self.grant_app_map.write(); let entries = match grant_state.value.get_mut(&app_id) { Some(entries) => entries, None => return false, @@ -422,7 +405,7 @@ impl GrantState { pub fn delete_expired_entries_for_device(&self) -> bool { let mut deleted = false; - let mut grant_state = self.device_grants.write().unwrap(); + let mut grant_state = self.device_grants.write(); let prev_len = grant_state.value.len(); grant_state.value.retain(|entry| !entry.has_expired()); if grant_state.value.len() < prev_len { @@ -434,7 +417,7 @@ impl GrantState { pub fn delete_all_expired_entries(&self) -> bool { // delete expired entries for app - let mut grant_state = self.grant_app_map.write().unwrap(); + let mut grant_state = self.grant_app_map.write(); for (_, entries) in grant_state.value.iter_mut() { entries.retain(|entry| !entry.has_expired()); } @@ -446,7 +429,7 @@ impl GrantState { } fn add_device_entry(&self, entry: GrantEntry) { - let mut device_grants = self.device_grants.write().unwrap(); + let mut device_grants = self.device_grants.write(); if entry.status.is_none() { device_grants.value.remove(&entry); } else { @@ -467,7 +450,7 @@ impl GrantState { return result; } - let grant_state = self.grant_app_map.read().unwrap(); + let grant_state = self.grant_app_map.read(); debug!("grant state: {:?}", grant_state); let entries = grant_state.value.get(app_id)?; @@ -490,7 +473,7 @@ impl GrantState { role: CapabilityRole, capability: &str, ) -> Option { - let grant_state = self.device_grants.read().unwrap(); + let grant_state = self.device_grants.read(); for entry in grant_state.value.iter() { if !entry.has_expired() && (entry.role == role) && (entry.capability == capability) { @@ -688,7 +671,7 @@ impl GrantState { // Returns all active and denied user grant entries for the given `app_id`. pub fn get_grant_entries_for_app_id(&self, app_id: String) -> HashSet { self.delete_expired_entries_for_app(app_id.clone()); - match self.grant_app_map.read().unwrap().value.get(&app_id) { + match self.grant_app_map.read().value.get(&app_id) { Some(x) => x.iter().cloned().collect(), None => HashSet::new(), } @@ -698,7 +681,7 @@ impl GrantState { // Pass None for device scope pub fn get_device_entries(&self) -> HashSet { self.delete_expired_entries_for_device(); - self.device_grants.read().unwrap().value.clone() + self.device_grants.read().value.clone() } // Returns all active and denied user grant entries for the given `capability` @@ -707,7 +690,7 @@ impl GrantState { capability: &str, ) -> HashMap> { self.delete_all_expired_entries(); - let grant_state = self.grant_app_map.read().unwrap(); + let grant_state = self.grant_app_map.read(); let mut grant_entry_map: HashMap> = HashMap::new(); for (app_id, app_entries) in grant_state.value.iter() { for item in app_entries { @@ -719,7 +702,7 @@ impl GrantState { } } } - let device_grants = self.device_grants.read().unwrap(); + let device_grants = self.device_grants.read(); let grant_sets = device_grants .value .iter() diff --git a/core/main/src/state/cap/cap_state.rs b/core/main/src/state/cap/cap_state.rs index 3e808dfe9..8a66fe181 100644 --- a/core/main/src/state/cap/cap_state.rs +++ b/core/main/src/state/cap/cap_state.rs @@ -18,14 +18,14 @@ use std::{ collections::HashSet, hash::{Hash, Hasher}, - sync::{Arc, RwLock}, + sync::Arc, }; use crate::{ service::{apps::app_events::AppEvents, user_grants::GrantState}, state::platform_state::PlatformState, }; -use ripple_sdk::{api::firebolt::fb_capabilities::RolePermission, serde_json}; +use ripple_sdk::{api::firebolt::fb_capabilities::RolePermission, parking_lot::RwLock, serde_json}; use ripple_sdk::{ api::{ firebolt::{ @@ -72,7 +72,7 @@ impl CapState { event: CapEvent, request: CapListenRPCRequest, ) { - let mut r = ps.cap_state.primed_listeners.write().unwrap(); + let mut r = ps.cap_state.primed_listeners.write(); if let Some(cap) = FireboltCap::parse(request.capability) { let check = CapEventEntry { app_id: call_context.app_id.clone(), @@ -114,7 +114,7 @@ impl CapState { cap: &FireboltCap, app_id: Option, ) -> bool { - let r = ps.cap_state.primed_listeners.read().unwrap(); + let r = ps.cap_state.primed_listeners.read(); debug!("primed entries {:?}", r); if r.iter().any(|x| { if matches!(&x.event, _event) && &x.cap == cap { diff --git a/core/main/src/state/cap/generic_cap_state.rs b/core/main/src/state/cap/generic_cap_state.rs index 9db75cf9b..29b84d5c5 100644 --- a/core/main/src/state/cap/generic_cap_state.rs +++ b/core/main/src/state/cap/generic_cap_state.rs @@ -17,7 +17,7 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, + sync::Arc, }; use ripple_sdk::{ @@ -28,6 +28,7 @@ use ripple_sdk::{ manifest::device_manifest::DeviceManifest, }, log::debug, + parking_lot::RwLock, }; use crate::state::platform_state::PlatformState; @@ -55,7 +56,7 @@ impl GenericCapState { } pub fn ingest_supported(&self, request: Vec) { - let mut supported = self.supported.write().unwrap(); + let mut supported = self.supported.write(); supported.extend( request .iter() @@ -65,7 +66,7 @@ impl GenericCapState { } pub fn ingest_availability(&self, request: Vec, is_available: bool) { - let mut not_available = self.not_available.write().unwrap(); + let mut not_available = self.not_available.write(); for cap in request { if is_available { not_available.remove(&cap.as_str()); @@ -77,7 +78,7 @@ impl GenericCapState { } pub fn check_for_processor(&self, request: Vec) -> HashMap { - let supported = self.supported.read().unwrap(); + let supported = self.supported.read(); let mut result = HashMap::new(); for cap in request { result.insert(cap.clone(), supported.contains(&cap)); @@ -86,7 +87,7 @@ impl GenericCapState { } pub fn check_supported(&self, request: &[FireboltPermission]) -> Result<(), DenyReasonWithCap> { - let supported = self.supported.read().unwrap(); + let supported = self.supported.read(); let not_supported: Vec = request .iter() .filter(|fb_perm| !supported.contains(&fb_perm.cap.as_str())) @@ -111,7 +112,7 @@ impl GenericCapState { &self, request: &Vec, ) -> Result<(), DenyReasonWithCap> { - let not_available = self.not_available.read().unwrap(); + let not_available = self.not_available.read(); let mut result: Vec = Vec::new(); for fb_perm in request { if fb_perm.role == CapabilityRole::Use && not_available.contains(&fb_perm.cap.as_str()) diff --git a/core/main/src/state/cap/permitted_state.rs b/core/main/src/state/cap/permitted_state.rs index 2f59d0a2d..4450d2256 100644 --- a/core/main/src/state/cap/permitted_state.rs +++ b/core/main/src/state/cap/permitted_state.rs @@ -18,7 +18,7 @@ use std::{ collections::{HashMap, HashSet}, path::Path, - sync::{Arc, RwLock}, + sync::Arc, }; use ripple_sdk::{ @@ -38,6 +38,7 @@ use ripple_sdk::{ extn::extn_client_message::{ExtnPayload, ExtnResponse}, framework::{file_store::FileStore, RippleResponse}, log::{debug, error, info}, + parking_lot::RwLock, tokio, utils::error::RippleError, }; @@ -66,24 +67,24 @@ impl PermittedState { } fn ingest(&mut self, extend_perms: HashMap>) { - let mut perms = self.permitted.write().unwrap(); + let mut perms = self.permitted.write(); perms.value.extend(extend_perms); perms.sync(); } #[cfg(test)] pub fn set_permissions(&mut self, permissions: HashMap>) { - let mut perms = self.permitted.write().unwrap(); + let mut perms = self.permitted.write(); perms.value = permissions; perms.sync(); } fn get_all_permissions(&self) -> HashMap> { - self.permitted.read().unwrap().value.clone() + self.permitted.read().value.clone() } fn has_cached_permissions(&self, app_id: &String) -> bool { // check if the app has permissions cached - self.permitted.read().unwrap().value.contains_key(app_id) + self.permitted.read().value.contains_key(app_id) } pub fn check_cap_role(&self, app_id: &str, role_info: &RoleInfo) -> Result { let role = role_info diff --git a/core/main/src/state/extn_state.rs b/core/main/src/state/extn_state.rs index 085589d1f..9a01b6773 100644 --- a/core/main/src/state/extn_state.rs +++ b/core/main/src/state/extn_state.rs @@ -15,11 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - thread, -}; +use std::{collections::HashMap, sync::Arc, thread}; use jsonrpsee::core::server::rpc_module::Methods; use ripple_sdk::{ @@ -35,6 +31,7 @@ use ripple_sdk::{ }, libloading::Library, log::info, + parking_lot::RwLock, tokio::sync::mpsc, utils::error::RippleError, }; @@ -144,7 +141,7 @@ impl ExtnState { } pub fn update_extn_status(&self, id: ExtnId, status: ExtnStatus) { - let mut extn_status_map = self.extn_status_map.write().unwrap(); + let mut extn_status_map = self.extn_status_map.write(); let _ = extn_status_map.insert(id.to_string(), status); } @@ -152,7 +149,6 @@ impl ExtnState { if let Some(ExtnStatus::Ready) = self .extn_status_map .read() - .unwrap() .get(extn_id.to_string().as_str()) { return true; @@ -166,18 +162,18 @@ impl ExtnState { return true; } } - let mut extn_status_listeners = self.extn_status_listeners.write().unwrap(); + let mut extn_status_listeners = self.extn_status_listeners.write(); let _ = extn_status_listeners.insert(id.to_string(), sender); false } pub fn get_extn_status_listener(&self, id: ExtnId) -> Option> { - let extn_status_listeners = self.extn_status_listeners.read().unwrap(); + let extn_status_listeners = self.extn_status_listeners.read(); extn_status_listeners.get(id.to_string().as_str()).cloned() } pub fn clear_status_listener(&self, extn_id: ExtnId) { - let mut extn_status_listeners = self.extn_status_listeners.write().unwrap(); + let mut extn_status_listeners = self.extn_status_listeners.write(); let _ = extn_status_listeners.remove(extn_id.to_string().as_str()); } @@ -210,11 +206,11 @@ impl ExtnState { } pub fn extend_methods(&self, methods: Methods) { - let mut methods_state = self.extn_methods.write().unwrap(); + let mut methods_state = self.extn_methods.write(); let _ = methods_state.merge(methods); } pub fn get_extn_methods(&self) -> Methods { - self.extn_methods.read().unwrap().clone() + self.extn_methods.read().clone() } } diff --git a/core/main/src/state/metrics_state.rs b/core/main/src/state/metrics_state.rs index 83770830b..e9ca1d3c7 100644 --- a/core/main/src/state/metrics_state.rs +++ b/core/main/src/state/metrics_state.rs @@ -15,10 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashSet, - sync::{Arc, RwLock}, -}; +use std::{collections::HashSet, sync::Arc}; use jsonrpsee::tracing::debug; use ripple_sdk::{ @@ -32,6 +29,7 @@ use ripple_sdk::{ chrono::{DateTime, Utc}, extn::extn_client_message::ExtnResponse, log::error, + parking_lot::RwLock, utils::error::RippleError, }; @@ -54,7 +52,7 @@ pub struct MetricsState { impl MetricsState { fn send_context_update_request(platform_state: &PlatformState) { let extn_client = platform_state.get_client().get_extn_client(); - let metrics_context = platform_state.metrics.context.read().unwrap().clone(); + let metrics_context = platform_state.metrics.context.read().clone(); if let Err(e) = extn_client .request_transient(RippleContextUpdateRequest::MetricsContext(metrics_context)) @@ -67,13 +65,13 @@ impl MetricsState { } pub fn get_context(&self) -> MetricsContext { - self.context.read().unwrap().clone() + self.context.read().clone() } pub fn get_privacy_settings_cache(&self) -> PrivacySettingsData { - self.privacy_settings_cache.read().unwrap().clone() + self.privacy_settings_cache.read().clone() } pub fn update_privacy_settings_cache(&self, value: &PrivacySettingsData) { - let mut cache = self.privacy_settings_cache.write().unwrap(); + let mut cache = self.privacy_settings_cache.write(); *cache = value.clone(); } pub async fn initialize(state: &PlatformState) { @@ -171,7 +169,7 @@ impl MetricsState { { // Time to set them - let mut context = state.metrics.context.write().unwrap(); + let mut context = state.metrics.context.write(); context.enabled = metrics_enabled; @@ -230,7 +228,7 @@ impl MetricsState { pub async fn update_account_session(state: &PlatformState) { { - let mut context = state.metrics.context.write().unwrap(); + let mut context = state.metrics.context.write(); let account_session = state.session_state.get_account_session(); if let Some(session) = account_session { context.account_id = session.account_id; @@ -246,7 +244,7 @@ impl MetricsState { } pub fn operational_telemetry_listener(&self, target: &str, listen: bool) { - let mut listeners = self.operational_telemetry_listeners.write().unwrap(); + let mut listeners = self.operational_telemetry_listeners.write(); if listen { listeners.insert(target.to_string()); } else { @@ -257,7 +255,6 @@ impl MetricsState { pub fn get_listeners(&self) -> Vec { self.operational_telemetry_listeners .read() - .unwrap() .iter() .map(|x| x.to_owned()) .collect() @@ -266,7 +263,7 @@ impl MetricsState { pub fn update_session_id(&self, platform_state: PlatformState, value: Option) { let value = value.unwrap_or_default(); { - let mut context = self.context.write().unwrap(); + let mut context = self.context.write(); context.device_session_id = value; } Self::send_context_update_request(&platform_state); diff --git a/core/main/src/state/openrpc_state.rs b/core/main/src/state/openrpc_state.rs index 9908dc457..26cb1eab3 100644 --- a/core/main/src/state/openrpc_state.rs +++ b/core/main/src/state/openrpc_state.rs @@ -15,22 +15,22 @@ // SPDX-License-Identifier: Apache-2.0 // -use ripple_sdk::api::{ - firebolt::{ - fb_capabilities::FireboltPermission, - fb_openrpc::{ - CapabilitySet, FireboltOpenRpc, FireboltOpenRpcMethod, FireboltVersionManifest, - OpenRPCParser, +use ripple_sdk::{ + api::{ + firebolt::{ + fb_capabilities::FireboltPermission, + fb_openrpc::{ + CapabilityPolicy, CapabilitySet, FireboltOpenRpc, FireboltOpenRpcMethod, + FireboltVersionManifest, OpenRPCParser, + }, }, + manifest::exclusory::{Exclusory, ExclusoryImpl}, }, - manifest::exclusory::{Exclusory, ExclusoryImpl}, -}; -use ripple_sdk::log::error; -use ripple_sdk::{api::firebolt::fb_openrpc::CapabilityPolicy, serde_json}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, + log::error, + parking_lot::RwLock, + serde_json, }; +use std::{collections::HashMap, sync::Arc}; #[derive(Debug, Clone)] pub enum ApiSurface { @@ -115,7 +115,7 @@ impl OpenRpcState { ApiSurface::Firebolt => self.firebolt_cap_map.clone(), ApiSurface::Ripple => self.ripple_cap_map.clone(), }; - let cap_set_opt = { cap_map.read().unwrap().get(method).cloned() }; + let cap_set_opt = { cap_map.read().get(method).cloned() }; if let Some(cap_set) = cap_set_opt { perm_list = cap_set.into_firebolt_permissions_vec(); result = Some(perm_list); @@ -125,16 +125,16 @@ impl OpenRpcState { } pub fn get_capability_policy(&self, cap: String) -> Option { - self.cap_policies.read().unwrap().get(&cap).cloned() + self.cap_policies.read().get(&cap).cloned() } pub fn extend_caps(&self, caps: HashMap) { - let mut cap_map = self.firebolt_cap_map.write().unwrap(); + let mut cap_map = self.firebolt_cap_map.write(); cap_map.extend(caps); } pub fn extend_policies(&self, policies: HashMap) { - let mut cap_policies = self.cap_policies.write().unwrap(); + let mut cap_policies = self.cap_policies.write(); cap_policies.extend(policies); } @@ -152,7 +152,7 @@ impl OpenRpcState { } } { - let ext_rpcs = self.extended_rpc.read().unwrap(); + let ext_rpcs = self.extended_rpc.read(); for ext_rpc in ext_rpcs.iter() { if let Some(method) = ext_rpc.methods.iter().find(|x| x.is_named(property)) { // Checking if the property tag is havin x-allow-value extension. @@ -193,7 +193,7 @@ impl OpenRpcState { return Some(v); } { - let ext_rpcs = self.extended_rpc.read().unwrap(); + let ext_rpcs = self.extended_rpc.read(); for ext_rpc in ext_rpcs.iter() { if let Some(v) = ext_rpc .methods diff --git a/core/main/src/state/session_state.rs b/core/main/src/state/session_state.rs index a9bf28d95..79eea7bdc 100644 --- a/core/main/src/state/session_state.rs +++ b/core/main/src/state/session_state.rs @@ -15,10 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use ripple_sdk::{ api::{ @@ -26,6 +23,7 @@ use ripple_sdk::{ gateway::rpc_gateway_api::{ApiMessage, CallContext}, session::{AccountSession, ProvisionRequest}, }, + parking_lot::RwLock, tokio::sync::mpsc::Sender, utils::error::RippleError, }; @@ -104,7 +102,7 @@ pub struct PendingSessionInfo { impl SessionState { pub fn insert_session_token(&self, token: String) { - let mut session_state = self.account_session.write().unwrap(); + let mut session_state = self.account_session.write(); let account_session = session_state.take(); if let Some(mut session) = account_session { session.token = token; @@ -113,12 +111,12 @@ impl SessionState { } pub fn insert_account_session(&self, account_session: AccountSession) { - let mut session_state = self.account_session.write().unwrap(); + let mut session_state = self.account_session.write(); let _ = session_state.insert(account_session); } pub fn get_account_session(&self) -> Option { - let session_state = self.account_session.read().unwrap(); + let session_state = self.account_session.read(); if let Some(session) = session_state.clone() { return Some(session); } @@ -127,7 +125,7 @@ impl SessionState { } pub fn get_app_id(&self, session_id: String) -> Option { - let session_map = self.session_map.read().unwrap(); + let session_map = self.session_map.read(); if let Some(session) = session_map.get(&session_id) { return Some(session.get_app_id()); } @@ -135,21 +133,21 @@ impl SessionState { } pub fn has_session(&self, ctx: &CallContext) -> bool { - self.session_map.read().unwrap().contains_key(&ctx.get_id()) + self.session_map.read().contains_key(&ctx.get_id()) } pub fn add_session(&self, id: String, session: Session) { - let mut session_state = self.session_map.write().unwrap(); + let mut session_state = self.session_map.write(); session_state.insert(id, session); } pub fn clear_session(&self, id: &str) { - let mut session_state = self.session_map.write().unwrap(); + let mut session_state = self.session_map.write(); session_state.remove(id); } pub fn update_account_session(&self, provision: ProvisionRequest) { - let mut session_state = self.account_session.write().unwrap(); + let mut session_state = self.account_session.write(); let account_session = session_state.take(); if let Some(mut session) = account_session { session.device_id = provision.device_id; @@ -162,7 +160,7 @@ impl SessionState { } pub fn get_session(&self, ctx: &CallContext) -> Option { - let session_state = self.session_map.read().unwrap(); + let session_state = self.session_map.read(); if let Some(cid) = &ctx.cid { session_state.get(cid).cloned() } else { @@ -171,12 +169,12 @@ impl SessionState { } pub fn get_session_for_connection_id(&self, cid: &str) -> Option { - let session_state = self.session_map.read().unwrap(); + let session_state = self.session_map.read(); session_state.get(cid).cloned() } pub fn add_pending_session(&self, app_id: String, info: Option) { - let mut pending_sessions = self.pending_sessions.write().unwrap(); + let mut pending_sessions = self.pending_sessions.write(); if info.is_none() && pending_sessions.get(&app_id).is_some() { return; } @@ -184,10 +182,10 @@ impl SessionState { } pub fn clear_pending_session(&self, app_id: &String) { - self.pending_sessions.write().unwrap().remove(app_id); + self.pending_sessions.write().remove(app_id); } pub fn get_pending_session_info(&self, app_id: &String) -> Option> { - self.pending_sessions.read().unwrap().get(app_id).cloned() + self.pending_sessions.read().get(app_id).cloned() } } diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index be45f5f06..d5aa554ec 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -54,7 +54,7 @@ jsonrpsee-core = { version = "0.9.0", features = ["server"] } regex = "=1.7.3" async-channel = "=2.1.0" tree_magic_mini = { version = "=3.0.3", optional = true} - +parking_lot = "0.12.1" [dev-dependencies] ripple_sdk = { path = ".", features=["tdk"]} diff --git a/core/sdk/src/api/apps.rs b/core/sdk/src/api/apps.rs index 55d1b8ab7..22ec40bf7 100644 --- a/core/sdk/src/api/apps.rs +++ b/core/sdk/src/api/apps.rs @@ -15,8 +15,9 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::oneshot; @@ -177,12 +178,13 @@ impl AppRequest { } pub fn send_response(&self, response: AppResponse) -> Result<(), RippleError> { - let mut sender = self.resp_tx.write().unwrap(); - if sender.is_some() { - oneshot_send_and_log(sender.take().unwrap(), response, "AppManager response"); - Ok(()) - } else { - Err(RippleError::SenderMissing) + let mut sender = self.resp_tx.write(); + match sender.take() { + Some(tx) => { + oneshot_send_and_log(tx, response, "AppManager response"); + Ok(()) + } + None => Err(RippleError::SenderMissing), } } } @@ -452,7 +454,7 @@ mod tests { // Drop the lock explicitly { - let sender_lock = app_request.resp_tx.read().unwrap(); + let sender_lock = app_request.resp_tx.read(); assert!(sender_lock.is_none()); } // Lock is released here diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index d5e406e39..c6aa428fd 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -15,11 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_channel::{bounded, Receiver as CReceiver, Sender as CSender}; use chrono::Utc; @@ -29,6 +25,7 @@ use log::{debug, error, info, trace}; #[cfg(test)] use {println as info, println as trace, println as debug, println as error}; +use parking_lot::RwLock; use tokio::sync::{ mpsc::Sender as MSender, oneshot::{self, Sender as OSender}, @@ -86,12 +83,12 @@ pub struct ExtnClient { } fn add_stream_processor

(id: String, context: P, map: Arc>>) { - let mut processor_state = map.write().unwrap(); + let mut processor_state = map.write(); processor_state.insert(id, context); } fn add_vec_stream_processor

(id: String, context: P, map: Arc>>>) { - let mut processor_state = map.write().unwrap(); + let mut processor_state = map.write(); if let std::collections::hash_map::Entry::Vacant(e) = processor_state.entry(id.clone()) { e.insert(vec![context]); } else { @@ -101,13 +98,13 @@ fn add_vec_stream_processor

(id: String, context: P, map: Arc(id: String, processor: Option

, map: Arc>>) { if let Some(processor) = processor { - let mut processor_state = map.write().unwrap(); + let mut processor_state = map.write(); processor_state.insert(id, processor); } } pub fn remove_processor

(id: String, map: Arc>>) { - let mut processor_state = map.write().unwrap(); + let mut processor_state = map.write(); let sender = processor_state.remove(&id); drop(sender); } @@ -203,7 +200,7 @@ impl ExtnClient { let id = id.to_string(); { // creating a map - extnId & sender used for requestor mapping to add to callback in extnMessage - let mut sender_map = self.extn_sender_map.write().unwrap(); + let mut sender_map = self.extn_sender_map.write(); sender_map.insert(id.clone(), sender); } { @@ -219,7 +216,7 @@ impl ExtnClient { None => error!("Unknown contract {}", contract), } } - let mut contract_map = self.contract_map.write().unwrap(); + let mut contract_map = self.contract_map.write(); contract_map.extend(map); } } @@ -227,7 +224,6 @@ impl ExtnClient { pub fn get_other_senders(&self) -> Vec> { self.extn_sender_map .read() - .unwrap() .iter() .inspect(|item| debug!("other sender: {:?}", item.0)) .map(|(_, v)| v) @@ -281,7 +277,7 @@ impl ExtnClient { message ); { - let mut ripple_context = self.ripple_context.write().unwrap(); + let mut ripple_context = self.ripple_context.write(); ripple_context.deep_copy(context); } } @@ -367,14 +363,14 @@ impl ExtnClient { // context members then it propagates the event to other extension's extn client. // Propagating 'known information' to other clients increases processing but no meaningful task is performed. let propagate = { - let mut ripple_context = self.ripple_context.write().unwrap(); + let mut ripple_context = self.ripple_context.write(); debug!( "Received context request: {:?} current ripple_context: {:?}", request, ripple_context ); ripple_context.update(request) }; - let new_context = { self.ripple_context.read().unwrap().clone() }; + let new_context = { self.ripple_context.read().clone() }; let message = new_context.get_event_message(); if propagate { debug!("Formed Context update event: {:?}", message); @@ -408,7 +404,7 @@ impl ExtnClient { ) { let id_c = msg.id.clone(); let processor_result = { - let mut processors = processor.write().unwrap(); + let mut processors = processor.write(); processors.remove(&id_c) }; @@ -430,7 +426,7 @@ impl ExtnClient { let id_c: String = msg.target.as_clear_string(); let v = { - let processors = processor.read().unwrap(); + let processors = processor.read(); processors.get(&id_c).cloned() }; if let Some(sender) = v { @@ -454,7 +450,7 @@ impl ExtnClient { let mut gc_sender_indexes: Vec = Vec::new(); let read_processor = processor.clone(); { - let processors = read_processor.read().unwrap(); + let processors = read_processor.read(); let v = processors.get(&id_c).cloned(); if let Some(v) = v { for (index, s) in v.iter().enumerate() { @@ -492,7 +488,7 @@ impl ExtnClient { ) { let indices = match gc_sender_indexes { Some(i) => Some(i), - None => processor.read().unwrap().get(&id_c).map(|v| { + None => processor.read().get(&id_c).map(|v| { v.iter() .filter(|x| x.is_closed()) .enumerate() @@ -502,7 +498,7 @@ impl ExtnClient { }; if let Some(indices) = indices { if !indices.is_empty() { - let mut gc_cleanup = processor.write().unwrap(); + let mut gc_cleanup = processor.write(); if let Some(sender_list) = gc_cleanup.get_mut(&id_c) { for index in indices { let r = sender_list.remove(index); @@ -521,13 +517,7 @@ impl ExtnClient { contract: RippleContract, ) -> Option> { let contract_str: String = contract.as_clear_string(); - let id = { - self.contract_map - .read() - .unwrap() - .get(&contract_str) - .cloned() - }; + let id = { self.contract_map.read().get(&contract_str).cloned() }; if let Some(extn_id) = id { return self.get_extn_sender_with_extn_id(&extn_id); } @@ -536,7 +526,7 @@ impl ExtnClient { } fn get_extn_sender_with_extn_id(&self, id: &str) -> Option> { - return self.extn_sender_map.read().unwrap().get(id).cloned(); + return self.extn_sender_map.read().get(id).cloned(); } /// Critical method used by request processors to send response message back to the requestor @@ -708,7 +698,7 @@ impl ExtnClient { } pub fn has_token(&self) -> bool { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); // matches!( // ripple_context.activation_status.clone(), // ActivationStatus::AccountToken(_) @@ -721,35 +711,35 @@ impl ExtnClient { pub fn get_activation_status(&self) -> Option { // pub fn get_activation_status(&self) -> ActivationStatus { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); ripple_context.activation_status.clone() } pub fn has_internet(&self) -> bool { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); matches!( ripple_context.internet_connectivity.as_ref(), Some(internet_connectivity) if matches!(internet_connectivity, InternetConnectionStatus::FullyConnected | InternetConnectionStatus::LimitedInternet) ) } pub fn internet_status(&self) -> Option { - let ripple_contract = self.ripple_context.read().unwrap(); + let ripple_contract = self.ripple_context.read(); ripple_contract.internet_connectivity.clone() } pub fn get_timezone(&self) -> Option { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); // Some(ripple_context.time_zone.clone()) ripple_context.time_zone.clone() } pub fn get_features(&self) -> Vec { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); ripple_context.features.clone() } pub fn get_metrics_context(&self) -> Option { - let ripple_context = self.ripple_context.read().unwrap(); + let ripple_context = self.ripple_context.read(); ripple_context.metrics_context.clone() } } @@ -847,7 +837,7 @@ pub mod tests { extn_client.request_processors.clone(), ); - assert!(extn_client.request_processors.read().unwrap().len() == 1); + assert!(extn_client.request_processors.read().len() == 1); } #[test] @@ -863,12 +853,11 @@ pub mod tests { extn_client.event_processors.clone(), ); - assert!(extn_client.event_processors.read().unwrap().len() == 1); + assert!(extn_client.event_processors.read().len() == 1); assert_eq!( extn_client .event_processors .read() - .unwrap() .get(&id) .map(|v| v.len()), Some(1), @@ -883,7 +872,7 @@ pub mod tests { let (tx, _rx) = oneshot::channel(); add_single_processor(id, Some(tx), extn_client.response_processors.clone()); - assert!(extn_client.response_processors.read().unwrap().len() == 1); + assert!(extn_client.response_processors.read().len() == 1); } #[tokio::test(flavor = "multi_thread")] @@ -895,7 +884,7 @@ pub mod tests { extn_client.add_request_processor(processor); tokio::time::sleep(Duration::from_millis(10)).await; - assert!(extn_client.request_processors.read().unwrap().len() == 1); + assert!(extn_client.request_processors.read().len() == 1); validate(|captured_logs| { for log in captured_logs { assert!(log @@ -914,12 +903,11 @@ pub mod tests { let id = processor.contract().as_clear_string(); extn_client.add_event_processor(processor); - assert!(extn_client.event_processors.read().unwrap().len() == 1); + assert!(extn_client.event_processors.read().len() == 1); assert_eq!( extn_client .event_processors .read() - .unwrap() .get(&id) .map(|v| v.len()), Some(1), @@ -1069,7 +1057,7 @@ pub mod tests { drop(rx); assert_eq!( - extn_client.event_processors.read().unwrap().len(), + extn_client.event_processors.read().len(), 1, "Assertion failed: event_processors map should be empty before cleanup" ); @@ -1077,7 +1065,7 @@ pub mod tests { ExtnClient::cleanup_vec_stream(id_clone, None, processor_clone); assert_eq!( - extn_client.event_processors.read().unwrap().len(), + extn_client.event_processors.read().len(), 0, "Assertion failed: event_processors map should be empty after cleanup" ); @@ -1459,7 +1447,7 @@ pub mod tests { tokio::time::sleep(Duration::from_millis(100)).await; assert!(result.is_ok()); - let ripple_context = main_client.ripple_context.read().unwrap(); + let ripple_context = main_client.ripple_context.read(); assert_eq!( ripple_context.time_zone.as_ref().unwrap().time_zone, time_zone @@ -1503,7 +1491,7 @@ pub mod tests { // how to verify the event response in other sender? tokio::time::sleep(Duration::from_millis(100)).await; assert_eq!( - extn_client.event_processors.read().unwrap().len(), + extn_client.event_processors.read().len(), 0, "Assertion failed: event_processors map should be empty after cleanup" ); @@ -1544,7 +1532,7 @@ pub mod tests { }); extn_client.context_update(request); - let ripple_context = extn_client.ripple_context.read().unwrap(); + let ripple_context = extn_client.ripple_context.read(); assert!( matches!(&ripple_context.time_zone, Some(time_zone) if time_zone.time_zone == test_string && time_zone.offset == 1) @@ -1625,10 +1613,10 @@ pub mod tests { Some(tx), extn_client.response_processors.clone(), ); - assert!(extn_client.response_processors.read().unwrap().len() == 1); + assert!(extn_client.response_processors.read().len() == 1); } else { - extn_client.response_processors.write().unwrap().clear(); - assert!(extn_client.response_processors.read().unwrap().len() == 0); + extn_client.response_processors.write().clear(); + assert!(extn_client.response_processors.read().len() == 0); } let msg = ExtnMessage { @@ -1674,15 +1662,15 @@ pub mod tests { MockRequestProcessor::new_v1(extn_client.clone(), vec![RippleContract::Internal]); if tc.contains("req processor err") { - extn_client.request_processors.write().unwrap().clear(); - assert!(extn_client.request_processors.read().unwrap().len() == 0); + extn_client.request_processors.write().clear(); + assert!(extn_client.request_processors.read().len() == 0); } else { add_stream_processor( processor.contract().as_clear_string(), processor.sender(), extn_client.request_processors.clone(), ); - assert!(extn_client.request_processors.read().unwrap().len() == 1); + assert!(extn_client.request_processors.read().len() == 1); } add_stream_processor( @@ -1691,7 +1679,7 @@ pub mod tests { extn_client.request_processors.clone(), ); - assert!(extn_client.request_processors.read().unwrap().len() == 1); + assert!(extn_client.request_processors.read().len() == 1); let msg = ExtnMessage { id: "some-id".to_string(), @@ -1743,12 +1731,11 @@ pub mod tests { extn_client.event_processors.clone(), ); - assert!(extn_client.event_processors.read().unwrap().len() == 1); + assert!(extn_client.event_processors.read().len() == 1); assert_eq!( extn_client .event_processors .read() - .unwrap() .get(&id) .map(|v| v.len()), Some(1), @@ -1788,7 +1775,7 @@ pub mod tests { Some(tx), extn_client.response_processors.clone(), ); - assert!(extn_client.response_processors.read().unwrap().len() == 1); + assert!(extn_client.response_processors.read().len() == 1); let req = ExtnMessage { id: id.clone(), @@ -1833,7 +1820,7 @@ pub mod tests { Some(tx), extn_client.response_processors.clone(), ); - assert!(extn_client.response_processors.read().unwrap().len() == 1); + assert!(extn_client.response_processors.read().len() == 1); let msg = ExtnMessage { id: id.clone(), @@ -2094,7 +2081,7 @@ pub mod tests { // Set activation status to AccountToken { - let mut ripple_context = extn_client.ripple_context.write().unwrap(); + let mut ripple_context = extn_client.ripple_context.write(); ripple_context.activation_status = Some(ActivationStatus::AccountToken(AccountToken { token: "some_token".to_string(), expires: 123, @@ -2107,7 +2094,7 @@ pub mod tests { // Reset activation status to None { - let mut ripple_context = extn_client.ripple_context.write().unwrap(); + let mut ripple_context = extn_client.ripple_context.write(); ripple_context.activation_status = None; } @@ -2122,7 +2109,7 @@ pub mod tests { // Set activation status to AccountToken { - let mut ripple_context = extn_client.ripple_context.write().unwrap(); + let mut ripple_context = extn_client.ripple_context.write(); ripple_context.activation_status = Some(ActivationStatus::AccountToken(AccountToken { token: "some_token".to_string(), expires: 123, @@ -2141,7 +2128,7 @@ pub mod tests { // Reset activation status to None { - let mut ripple_context = extn_client.ripple_context.write().unwrap(); + let mut ripple_context = extn_client.ripple_context.write(); ripple_context.activation_status = None; } @@ -2161,11 +2148,7 @@ pub mod tests { #[tokio::test] async fn test_has_internet(connectivity: InternetConnectionStatus, expected_result: bool) { let extn_client = ExtnClient::mock(); - extn_client - .ripple_context - .write() - .unwrap() - .internet_connectivity = Some(connectivity); + extn_client.ripple_context.write().internet_connectivity = Some(connectivity); let has_internet = extn_client.has_internet(); assert_eq!(has_internet, expected_result); @@ -2179,7 +2162,7 @@ pub mod tests { offset: -5, }; - extn_client.ripple_context.write().unwrap().time_zone = Some(test_timezone.clone()); + extn_client.ripple_context.write().time_zone = Some(test_timezone.clone()); let result = extn_client.get_timezone(); assert_eq!(result, Some(test_timezone)); } diff --git a/core/sdk/src/framework/bootstrap.rs b/core/sdk/src/framework/bootstrap.rs index e81c478d2..3b63b1ebb 100644 --- a/core/sdk/src/framework/bootstrap.rs +++ b/core/sdk/src/framework/bootstrap.rs @@ -15,10 +15,11 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use async_trait::async_trait; use log::debug; +use parking_lot::RwLock; use tokio::sync::mpsc::{Receiver, Sender}; use crate::utils::error::RippleError; @@ -83,11 +84,10 @@ impl TransientChannel { } pub fn get_receiver(&self) -> Result, RippleError> { - let mut tr = self.tr.write().unwrap(); - if tr.is_some() { - return Ok(tr.take().unwrap()); + let mut tr = self.tr.write(); + match tr.take() { + Some(receiver) => Ok(receiver), + None => Err(RippleError::InvalidAccess), } - - Err(RippleError::InvalidAccess) } } diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 38576d75b..43a99c8ee 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -29,6 +29,7 @@ pub extern crate chrono; pub extern crate futures; pub extern crate libloading; pub extern crate log; +pub extern crate parking_lot; pub extern crate semver; pub extern crate serde; pub extern crate serde_json; diff --git a/core/sdk/src/utils/time_utils.rs b/core/sdk/src/utils/time_utils.rs index 03b3494ce..fd530bea7 100644 --- a/core/sdk/src/utils/time_utils.rs +++ b/core/sdk/src/utils/time_utils.rs @@ -16,10 +16,8 @@ // use chrono::{LocalResult, TimeZone, Utc}; -use std::{ - sync::{Arc, Mutex}, - time::Duration, -}; +use parking_lot::Mutex; +use std::{sync::Arc, time::Duration}; use futures::Future; use tokio::task::JoinHandle; @@ -71,7 +69,7 @@ impl Timer { let cancelled = cancelled_flag_mutex.clone(); let handle = tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(delay_ms)).await; - let cancelled = { *cancelled_flag_mutex.lock().unwrap() }; + let cancelled = { *cancelled_flag_mutex.lock() }; if !cancelled { callback.await; } @@ -80,7 +78,7 @@ impl Timer { } pub fn cancel(self) { - let mut cancelled_flag = self.cancelled.lock().unwrap(); + let mut cancelled_flag = self.cancelled.lock(); *cancelled_flag = true; self.handle.abort(); } diff --git a/device/mock_device/src/mock_web_socket_server.rs b/device/mock_device/src/mock_web_socket_server.rs index fc4569794..f47fbe568 100644 --- a/device/mock_device/src/mock_web_socket_server.rs +++ b/device/mock_device/src/mock_web_socket_server.rs @@ -14,18 +14,14 @@ // // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use http::{HeaderMap, StatusCode}; use ripple_sdk::{ api::gateway::rpc_gateway_api::JsonRpcApiRequest, futures::{stream::SplitSink, SinkExt, StreamExt}, log::{debug, error, warn}, + parking_lot::RwLock, tokio::{ self, net::{TcpListener, TcpStream}, @@ -354,7 +350,7 @@ impl MockWebSocketServer { } fn responses_for_key_v2(&self, req: &JsonRpcApiRequest) -> Option { - let mock_data = self.mock_data_v2.read().unwrap(); + let mock_data = self.mock_data_v2.read(); if let Some(v) = mock_data.get(&req.method.to_lowercase()).cloned() { if v.len() == 1 { return v.first().cloned(); @@ -384,7 +380,7 @@ impl MockWebSocketServer { } pub async fn add_request_response_v2(&self, request: MockData) -> Result<(), MockDataError> { - let mut mock_data = self.mock_data_v2.write().unwrap(); + let mut mock_data = self.mock_data_v2.write(); let lower_key_mock_data: MockData = request .into_iter() .map(|(k, v)| (k.to_lowercase(), v)) @@ -394,7 +390,7 @@ impl MockWebSocketServer { } pub async fn remove_request_response_v2(&self, request: MockData) -> Result<(), MockDataError> { - let mut mock_data = self.mock_data_v2.write().unwrap(); + let mut mock_data = self.mock_data_v2.write(); for (cleanup_key, cleanup_params) in request { if let Some(v) = mock_data.remove(&cleanup_key.to_lowercase()) { let mut new_param_response = Vec::new(); diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index c58ec68d5..7afadf726 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -15,11 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - str::FromStr, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, str::FromStr, sync::Arc}; use ripple_sdk::{ api::{ @@ -36,6 +32,7 @@ use ripple_sdk::{ }, extn::extn_client_message::ExtnEvent, log::{debug, error, trace}, + parking_lot::RwLock, serde_json::{self, Value}, utils::error::RippleError, }; @@ -279,7 +276,7 @@ impl ThunderEventProcessor { } pub fn get_handler(&self, event: &str) -> Option { - let event_map = self.event_map.read().unwrap(); + let event_map = self.event_map.read(); event_map.get(event).cloned() } @@ -298,7 +295,7 @@ impl ThunderEventProcessor { pub fn add_event_listener(&self, app_id: String, handler: ThunderEventHandler) -> bool { let event_name = handler.get_id(); - let mut event_map = self.event_map.write().unwrap(); + let mut event_map = self.event_map.write(); if let Some(entry) = event_map.get_mut(&event_name) { entry.add_listener(app_id); return false; @@ -309,7 +306,7 @@ impl ThunderEventProcessor { } pub fn remove_event_listener(&self, event_name: String, app_id: String) -> bool { - let mut event_map = self.event_map.write().unwrap(); + let mut event_map = self.event_map.write(); if let Some(entry) = event_map.get_mut(&event_name) { if !entry.remove_listener(app_id) { return false; @@ -320,7 +317,7 @@ impl ThunderEventProcessor { } pub fn add_last_event(&self, event_name: &str, value: &ExtnEvent) { - let mut last_event_map = self.last_event.write().unwrap(); + let mut last_event_map = self.last_event.write(); last_event_map.insert( event_name.to_string(), serde_json::to_value(value.clone()).unwrap(), @@ -329,7 +326,7 @@ impl ThunderEventProcessor { pub fn check_last_event(&self, event_name: &str, value: &ExtnEvent) -> bool { let ref_value = serde_json::to_value(value.clone()).unwrap(); - let last_event_map = self.last_event.read().unwrap(); + let last_event_map = self.last_event.read(); if let Some(last_event) = last_event_map.get(event_name) { return last_event.eq(&ref_value); } diff --git a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs index bdd550903..8c6aec14e 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs @@ -15,11 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::{ client::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin}, @@ -77,6 +73,7 @@ use ripple_sdk::{ manifest::device_manifest::DefaultValues, }, log::trace, + parking_lot::RwLock, serde_json::{Map, Value}, tokio::join, }; @@ -175,74 +172,74 @@ impl CachedState { } fn get_hdcp_support(&self) -> Option> { - self.cached.read().unwrap().hdcp_support.clone() + self.cached.read().hdcp_support.clone() } fn update_hdcp_support(&self, value: HashMap) { - let mut hdcp = self.cached.write().unwrap(); + let mut hdcp = self.cached.write(); let _ = hdcp.hdcp_support.insert(value); } fn get_hdcp_status(&self) -> Option { - self.cached.read().unwrap().hdcp_status.clone() + self.cached.read().hdcp_status.clone() } fn update_hdcp_status(&self, value: HDCPStatus) { - let mut hdcp = self.cached.write().unwrap(); + let mut hdcp = self.cached.write(); let _ = hdcp.hdcp_status.insert(value); } fn get_hdr(&self) -> Option> { - self.cached.read().unwrap().hdr_profile.clone() + self.cached.read().hdr_profile.clone() } fn update_hdr_support(&self, value: HashMap) { - let mut hdr = self.cached.write().unwrap(); + let mut hdr = self.cached.write(); let _ = hdr.hdr_profile.insert(value); } fn get_mac_address(&self) -> Option { - self.cached.read().unwrap().mac_address.clone() + self.cached.read().mac_address.clone() } fn get_serial_number(&self) -> Option { - self.cached.read().unwrap().serial_number.clone() + self.cached.read().serial_number.clone() } fn update_serial_number(&self, serial_number: String) { - let mut cached = self.cached.write().unwrap(); + let mut cached = self.cached.write(); let _ = cached.serial_number.insert(serial_number); } fn update_mac_address(&self, mac: String) { - let mut cached = self.cached.write().unwrap(); + let mut cached = self.cached.write(); let _ = cached.mac_address.insert(mac); } fn get_model(&self) -> Option { - self.cached.read().unwrap().model.clone() + self.cached.read().model.clone() } fn update_model(&self, model: String) { - let mut cached = self.cached.write().unwrap(); + let mut cached = self.cached.write(); let _ = cached.model.insert(model); } fn get_make(&self) -> Option { - self.cached.read().unwrap().make.clone() + self.cached.read().make.clone() } fn update_make(&self, make: String) { - let mut cached = self.cached.write().unwrap(); + let mut cached = self.cached.write(); let _ = cached.make.insert(make); } fn get_version(&self) -> Option { - self.cached.read().unwrap().version.clone() + self.cached.read().version.clone() } fn update_version(&self, version: FireboltSemanticVersion) { - let mut cached = self.cached.write().unwrap(); + let mut cached = self.cached.write(); let _ = cached.version.insert(version); } } diff --git a/device/thunder_ripple_sdk/src/thunder_state.rs b/device/thunder_ripple_sdk/src/thunder_state.rs index 700b01d64..7fdbd29af 100644 --- a/device/thunder_ripple_sdk/src/thunder_state.rs +++ b/device/thunder_ripple_sdk/src/thunder_state.rs @@ -15,7 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use ripple_sdk::{ api::device::device_operator::{ @@ -25,6 +25,7 @@ use ripple_sdk::{ client::extn_client::ExtnClient, extn_client_message::{ExtnMessage, ExtnPayloadProvider}, }, + parking_lot::RwLock, tokio, tokio::sync::mpsc, tokio::sync::{Mutex, Notify}, @@ -142,7 +143,7 @@ impl ThunderState { } pub fn start_event_thread(&self) { - let mut rx = self.receiver.write().unwrap(); + let mut rx = self.receiver.write(); let rx = rx.take(); if let Some(mut r) = rx { let state_c = self.clone(); diff --git a/distributor/general/src/general_privacy_processor.rs b/distributor/general/src/general_privacy_processor.rs index 0cff2c198..bc435bc79 100644 --- a/distributor/general/src/general_privacy_processor.rs +++ b/distributor/general/src/general_privacy_processor.rs @@ -15,10 +15,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use ripple_sdk::{ api::distributor::distributor_privacy::{ @@ -36,6 +33,7 @@ use ripple_sdk::{ extn_client_message::{ExtnPayloadProvider, ExtnResponse}, }, framework::file_store::FileStore, + parking_lot::RwLock, }; use serde::{Deserialize, Serialize}; @@ -73,7 +71,7 @@ impl PrivacyState { } fn get_property(&self, params: GetPropertyParams) -> bool { - let data = self.privacy_data.read().unwrap(); + let data = self.privacy_data.read(); match params.setting { PrivacySetting::AppDataCollection(a) => data.value.get_data_collections(a), PrivacySetting::AppEntitlementCollection(e) => data.value.get_ent_collections(e), @@ -104,7 +102,7 @@ impl PrivacyState { } fn set_property(&self, params: SetPropertyParams) -> bool { - let mut data = self.privacy_data.write().unwrap(); + let mut data = self.privacy_data.write(); match params.setting.clone() { PrivacySetting::AppDataCollection(a) => { data.value.set_data_collections(a, params.value) @@ -118,7 +116,7 @@ impl PrivacyState { } fn get_settings(&self) -> PrivacySettings { - let data = self.privacy_data.read().unwrap(); + let data = self.privacy_data.read(); data.value.settings.clone() } } diff --git a/distributor/general/src/general_session_processor.rs b/distributor/general/src/general_session_processor.rs index 12120faae..e5342bd7b 100644 --- a/distributor/general/src/general_session_processor.rs +++ b/distributor/general/src/general_session_processor.rs @@ -34,9 +34,10 @@ use ripple_sdk::{ }, framework::file_store::FileStore, log::error, + parking_lot::RwLock, utils::error::RippleError, }; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; #[derive(Debug, Clone)] pub struct DistSessionState { @@ -86,7 +87,7 @@ impl DistributorSessionProcessor { } async fn get_token(mut state: DistSessionState, msg: ExtnMessage) -> bool { - let session = state.session.read().unwrap().value.clone(); + let session = state.session.read().value.clone(); if let Err(e) = state .client .respond( @@ -110,7 +111,7 @@ impl DistributorSessionProcessor { token: AccountSessionTokenRequest, ) -> bool { { - let mut session = state.session.write().unwrap(); + let mut session = state.session.write(); session.value.token = token.token; session.sync(); } @@ -123,7 +124,7 @@ impl DistributorSessionProcessor { provision: ProvisionRequest, ) -> bool { { - let mut session = state.session.write().unwrap(); + let mut session = state.session.write(); session.value.account_id = provision.account_id; session.value.device_id = provision.device_id; if let Some(distributor) = provision.distributor_id {