diff --git a/commons/zenoh-protocol-core/src/encoding.rs b/commons/zenoh-protocol-core/src/encoding.rs index 7d5308d4e3..702e0af89e 100644 --- a/commons/zenoh-protocol-core/src/encoding.rs +++ b/commons/zenoh-protocol-core/src/encoding.rs @@ -172,7 +172,7 @@ impl From<&'static str> for Encoding { } } -impl<'a> From for Encoding { +impl From for Encoding { fn from(s: String) -> Self { for (i, v) in consts::MIMES.iter().enumerate() { if i != 0 && s.starts_with(v) { diff --git a/commons/zenoh-protocol-core/src/lib.rs b/commons/zenoh-protocol-core/src/lib.rs index 077c405c81..51b8cab32e 100644 --- a/commons/zenoh-protocol-core/src/lib.rs +++ b/commons/zenoh-protocol-core/src/lib.rs @@ -33,7 +33,7 @@ pub type NonZeroZInt = NonZeroU64; pub const ZINT_MAX_BYTES: usize = 10; // WhatAmI values -pub type WhatAmI = whatami::WhatAmI; +pub use whatami::WhatAmI; /// Constants and helpers for zenoh `whatami` flags. pub mod whatami; @@ -54,14 +54,14 @@ pub use locators::Locator; pub mod endpoints; pub use endpoints::EndPoint; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Property { pub key: ZInt, pub value: Vec, } /// The global unique id of a zenoh peer. -#[derive(Clone, Copy, Eq)] +#[derive(Clone, Copy)] pub struct PeerId { size: usize, id: [u8; PeerId::MAX_SIZE], @@ -119,10 +119,12 @@ impl FromStr for PeerId { impl PartialEq for PeerId { #[inline] fn eq(&self, other: &Self) -> bool { - self.size == other.size && self.as_slice() == other.as_slice() + self.as_slice() == other.as_slice() } } +impl Eq for PeerId {} + impl Hash for PeerId { fn hash(&self, state: &mut H) { self.as_slice().hash(state); @@ -148,7 +150,7 @@ impl From<&PeerId> for uhlc::ID { } } -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[repr(u8)] pub enum Priority { Control = 0, @@ -170,6 +172,7 @@ impl Default for Priority { Priority::Data } } + impl TryFrom for Priority { type Error = zenoh_core::Error; @@ -191,7 +194,7 @@ impl TryFrom for Priority { } } -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[repr(u8)] pub enum Reliability { BestEffort, @@ -204,13 +207,13 @@ impl Default for Reliability { } } -#[derive(Debug, Copy, Clone, PartialEq, Default)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] pub struct Channel { pub priority: Priority, pub reliability: Reliability, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ConduitSnList { Plain(ConduitSn), QoS(Box<[ConduitSn; Priority::NUM]>), @@ -248,14 +251,14 @@ impl fmt::Display for ConduitSnList { } /// The kind of reliability. -#[derive(Debug, Copy, Clone, PartialEq, Default)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] pub struct ConduitSn { pub reliable: ZInt, pub best_effort: ZInt, } /// The kind of congestion control. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[repr(u8)] pub enum CongestionControl { Block, @@ -269,7 +272,7 @@ impl Default for CongestionControl { } /// The subscription mode. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[repr(u8)] pub enum SubMode { Push, @@ -284,21 +287,21 @@ impl Default for SubMode { } /// A time period. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct Period { pub origin: ZInt, pub period: ZInt, pub duration: ZInt, } -#[derive(Debug, Clone, PartialEq, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] pub struct SubInfo { pub reliability: Reliability, pub mode: SubMode, pub period: Option, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct QueryableInfo { pub complete: ZInt, pub distance: ZInt, @@ -320,7 +323,7 @@ pub mod queryable { } /// The kind of consolidation. -#[derive(Debug, Clone, PartialEq, Copy)] +#[derive(Debug, Clone, PartialEq, Copy, Eq, Hash)] #[repr(u8)] pub enum ConsolidationMode { None, @@ -330,7 +333,7 @@ pub enum ConsolidationMode { /// The kind of consolidation that should be applied on replies to a [`get`](zenoh::Session::get) /// at different stages of the reply process. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConsolidationStrategy { pub first_routers: ConsolidationMode, pub last_router: ConsolidationMode, @@ -415,7 +418,7 @@ impl Default for ConsolidationStrategy { } /// The [`Queryable`](zenoh::queryable::Queryable)s that should be target of a [`get`](zenoh::Session::get). -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Target { BestMatching, All, @@ -432,7 +435,7 @@ impl Default for Target { } /// The [`Queryable`](zenoh::queryable::Queryable)s that should be target of a [`get`](zenoh::Session::get). -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct QueryTarget { pub kind: ZInt, pub target: Target, diff --git a/io/zenoh-transport/src/primitives/mod.rs b/io/zenoh-transport/src/primitives/mod.rs index e39ffef072..b90c19224b 100644 --- a/io/zenoh-transport/src/primitives/mod.rs +++ b/io/zenoh-transport/src/primitives/mod.rs @@ -23,6 +23,7 @@ use super::protocol::io::ZBuf; use super::protocol::proto::{DataInfo, RoutingContext}; pub use demux::*; pub use mux::*; +use std::ops::Deref; pub trait Primitives: Send + Sync { fn decl_resource(&self, expr_id: ZInt, key_expr: &KeyExpr); @@ -96,6 +97,133 @@ pub trait Primitives: Send + Sync { fn send_close(&self); } +impl

Primitives for P +where + P: Deref + Send + Sync, + P::Target: Primitives + Send + Sync, +{ + fn decl_resource(&self, expr_id: ZInt, key_expr: &KeyExpr) { + self.deref().decl_resource(expr_id, key_expr) + } + + fn forget_resource(&self, expr_id: ZInt) { + self.deref().forget_resource(expr_id) + } + + fn decl_publisher(&self, key_expr: &KeyExpr, routing_context: Option) { + self.deref().decl_publisher(key_expr, routing_context) + } + + fn forget_publisher(&self, key_expr: &KeyExpr, routing_context: Option) { + self.deref().forget_publisher(key_expr, routing_context) + } + + fn decl_subscriber( + &self, + key_expr: &KeyExpr, + sub_info: &SubInfo, + routing_context: Option, + ) { + self.deref() + .decl_subscriber(key_expr, sub_info, routing_context) + } + + fn forget_subscriber(&self, key_expr: &KeyExpr, routing_context: Option) { + self.deref().forget_subscriber(key_expr, routing_context) + } + + fn decl_queryable( + &self, + key_expr: &KeyExpr, + kind: ZInt, + qabl_info: &QueryableInfo, + routing_context: Option, + ) { + self.deref() + .decl_queryable(key_expr, kind, qabl_info, routing_context) + } + + fn forget_queryable( + &self, + key_expr: &KeyExpr, + kind: ZInt, + routing_context: Option, + ) { + self.deref() + .forget_queryable(key_expr, kind, routing_context) + } + + fn send_data( + &self, + key_expr: &KeyExpr, + payload: ZBuf, + channel: Channel, + cogestion_control: CongestionControl, + data_info: Option, + routing_context: Option, + ) { + self.deref().send_data( + key_expr, + payload, + channel, + cogestion_control, + data_info, + routing_context, + ) + } + + fn send_query( + &self, + key_expr: &KeyExpr, + value_selector: &str, + qid: ZInt, + target: QueryTarget, + consolidation: ConsolidationStrategy, + routing_context: Option, + ) { + self.deref().send_query( + key_expr, + value_selector, + qid, + target, + consolidation, + routing_context, + ) + } + + fn send_reply_data( + &self, + qid: ZInt, + replier_kind: ZInt, + replier_id: PeerId, + key_expr: KeyExpr, + info: Option, + payload: ZBuf, + ) { + self.deref() + .send_reply_data(qid, replier_kind, replier_id, key_expr, info, payload) + } + + fn send_reply_final(&self, qid: ZInt) { + self.deref().send_reply_final(qid) + } + + fn send_pull( + &self, + is_final: bool, + key_expr: &KeyExpr, + pull_id: ZInt, + max_samples: &Option, + ) { + self.deref() + .send_pull(is_final, key_expr, pull_id, max_samples) + } + + fn send_close(&self) { + self.deref().send_close() + } +} + #[derive(Default)] pub struct DummyPrimitives; diff --git a/zenoh/src/net/routing/face.rs b/zenoh/src/net/routing/face.rs index 11d7af5593..dedf172392 100644 --- a/zenoh/src/net/routing/face.rs +++ b/zenoh/src/net/routing/face.rs @@ -11,6 +11,7 @@ // Contributors: // ADLINK zenoh team, // +use super::network::LinkId; use super::router::*; use async_std::sync::Arc; use std::collections::{HashMap, HashSet}; @@ -24,12 +25,28 @@ use zenoh_protocol_core::{ }; use zenoh_transport::Primitives; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(transparent)] +pub struct FaceId(usize); + +impl FaceId { + pub fn new(id: usize) -> Self { + Self(id) + } +} + +impl fmt::Display for FaceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + pub struct FaceState { - pub(super) id: usize, + pub(super) id: FaceId, pub(super) pid: PeerId, pub(super) whatami: WhatAmI, pub(super) primitives: Arc, - pub(super) link_id: usize, + pub(super) link_id: LinkId, pub(super) local_mappings: HashMap, pub(super) remote_mappings: HashMap, pub(super) local_subs: HashSet, @@ -42,11 +59,11 @@ pub struct FaceState { impl FaceState { pub(super) fn new( - id: usize, + id: FaceId, pid: PeerId, whatami: WhatAmI, primitives: Arc, - link_id: usize, + link_id: LinkId, ) -> Arc { Arc::new(FaceState { id, @@ -157,7 +174,6 @@ impl fmt::Display for FaceState { } } -#[derive(Clone)] pub struct Face { pub(crate) tables: Arc>, pub(crate) state: Arc, diff --git a/zenoh/src/net/routing/network.rs b/zenoh/src/net/routing/network.rs index fb6567b99c..28ec4b2b59 100644 --- a/zenoh/src/net/routing/network.rs +++ b/zenoh/src/net/routing/network.rs @@ -13,24 +13,42 @@ // use super::runtime::Runtime; use petgraph::graph::NodeIndex; -use petgraph::visit::{IntoNodeReferences, VisitMap, Visitable}; +use petgraph::visit::{VisitMap, Visitable}; +use std::collections::HashSet; use std::convert::TryInto; +use std::fmt; use vec_map::VecMap; use zenoh_link::Locator; use zenoh_protocol::core::{PeerId, WhatAmI, ZInt}; use zenoh_protocol::proto::{LinkState, ZenohMessage}; use zenoh_transport::TransportUnicast; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(transparent)] +pub(crate) struct LinkId(usize); + +impl LinkId { + pub fn new(id: usize) -> Self { + Self(id) + } +} + +impl fmt::Display for LinkId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + pub(crate) struct Node { pub(crate) pid: PeerId, pub(crate) whatami: Option, pub(crate) locators: Option>, pub(crate) sn: ZInt, - pub(crate) links: Vec, + pub(crate) links: HashSet, } -impl std::fmt::Debug for Node { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.pid) } } @@ -75,7 +93,6 @@ impl Link { } } -#[derive(Clone)] pub(crate) struct Tree { pub(crate) parent: Option, pub(crate) childs: Vec, @@ -109,7 +126,7 @@ impl Network { whatami: Some(runtime.whatami), locators: None, sn: 1, - links: vec![], + links: HashSet::new(), }); Network { name, @@ -129,7 +146,7 @@ impl Network { } pub(crate) fn dot(&self) -> String { - std::format!( + format!( "{:?}", petgraph::dot::Dot::with_config(&self.graph, &[petgraph::dot::Config::EdgeNoLabel]) ) @@ -143,8 +160,8 @@ impl Network { } #[inline] - pub(crate) fn get_link(&self, id: usize) -> Option<&Link> { - self.links.get(id) + pub(crate) fn get_link(&self, id: LinkId) -> Option<&Link> { + self.links.get(id.0) } #[inline] @@ -153,23 +170,23 @@ impl Network { } #[inline] - pub(crate) fn get_local_context(&self, context: Option, link_id: usize) -> usize { + pub(crate) fn get_local_context(&self, context: Option, link_id: LinkId) -> NodeIndex { let context = context.unwrap_or(0); match self.get_link(link_id) { Some(link) => match link.get_local_psid(&context) { - Some(psid) => (*psid).try_into().unwrap_or(0), + Some(&psid) => NodeIndex::new(psid as usize), None => { log::error!( "Cannot find local psid for context {} on link {}", context, link_id ); - 0 + NodeIndex::new(0) } }, None => { log::error!("Cannot find link {}", link_id); - 0 + NodeIndex::new(0) } } } @@ -381,7 +398,7 @@ impl Network { let oldsn = node.sn; if oldsn < sn { node.sn = sn; - node.links = links.clone(); + node.links = links.iter().cloned().collect(); if locators.is_some() { node.locators = locators; } @@ -400,7 +417,7 @@ impl Network { whatami: Some(whatami), locators, sn, - links: links.clone(), + links: links.iter().cloned().collect(), }; log::debug!("{} Add node (state) {}", self.name, pid); let idx = self.add_node(node); @@ -430,7 +447,7 @@ impl Network { whatami: None, locators: None, sn: 0, - links: vec![], + links: HashSet::new(), }; log::debug!("{} Add node (reintroduced) {}", self.name, link.clone()); let idx = self.add_node(node); @@ -527,7 +544,7 @@ impl Network { removed } - pub(crate) fn add_link(&mut self, transport: TransportUnicast) -> usize { + pub(crate) fn add_link(&mut self, transport: TransportUnicast) -> LinkId { let free_index = { let mut i = 0; while self.links.contains_key(i) { @@ -549,7 +566,7 @@ impl Network { whatami: Some(whatami), locators: None, sn: 0, - links: vec![], + links: HashSet::new(), }), true, ) @@ -559,7 +576,7 @@ impl Network { log::trace!("Update edge (link) {} {}", self.graph[self.idx].pid, pid); self.update_edge(self.idx, idx); } - self.graph[self.idx].links.push(pid); + self.graph[self.idx].links.insert(pid); self.graph[self.idx].sn += 1; if new { @@ -570,7 +587,7 @@ impl Network { let idxs = self.graph.node_indices().map(|i| (i, true)).collect(); self.send_on_link(idxs, &transport); - free_index + LinkId::new(free_index) } pub(crate) fn remove_link(&mut self, pid: &PeerId) -> Vec<(NodeIndex, Node)> { @@ -739,17 +756,12 @@ impl Network { new_childs } -} -#[inline] -pub(super) fn shared_nodes(net1: &Network, net2: &Network) -> Vec { - net1.graph - .node_references() - .filter_map(|(_, node1)| { - net2.graph - .node_references() - .any(|(_, node2)| node1.pid == node2.pid) - .then(|| node1.pid) - }) - .collect() + #[inline] + pub(super) fn shared_nodes(&self, other: &Network) -> Vec { + let pid_set1: HashSet<_> = self.graph.node_weights().map(|node| node.pid).collect(); + let pid_set2: HashSet<_> = other.graph.node_weights().map(|node| node.pid).collect(); + let common_pids: Vec<_> = pid_set1.intersection(&pid_set2).cloned().collect(); + common_pids + } } diff --git a/zenoh/src/net/routing/pubsub.rs b/zenoh/src/net/routing/pubsub.rs index d854998e17..75d4dab7de 100644 --- a/zenoh/src/net/routing/pubsub.rs +++ b/zenoh/src/net/routing/pubsub.rs @@ -28,7 +28,7 @@ use zenoh_protocol_core::{ use crate::net::routing::router::Matches; -use super::face::FaceState; +use super::face::{FaceId, FaceState}; use super::network::Network; use super::restree::Strengthen; use super::router::{ @@ -39,7 +39,7 @@ use super::router::{ #[inline] fn send_sourced_subscription_to_net_childs( restree: &mut ResourceTree, - faces: &HashMap>, + faces: &HashMap>, net: &Network, childs: &[NodeIndex], res: &ResourceTreeIndex, @@ -365,7 +365,7 @@ fn client_subs(tables: &Tables, res: &ResourceTreeIndex) -> Vec> #[inline] fn send_forget_sourced_subscription_to_net_childs( restree: &mut ResourceTree, - faces: &HashMap>, + faces: &HashMap>, net: &Network, childs: &[NodeIndex], res: &ResourceTreeIndex, @@ -766,14 +766,14 @@ fn insert_faces_for_subs( suffix: &str, tables: &Tables, net: &Network, - source: usize, + source: NodeIndex, subs: &VecSet, ) { - if net.trees.len() > source { + if net.trees.len() > source.index() { for sub in subs { if let Some(sub_idx) = net.get_idx(sub) { - if net.trees[source].directions.len() > sub_idx.index() { - if let Some(direction) = net.trees[source].directions[sub_idx.index()] { + if net.trees[source.index()].directions.len() > sub_idx.index() { + if let Some(direction) = net.trees[source.index()].directions[sub_idx.index()] { if net.graph.contains_node(direction) { if let Some(face) = tables.get_face(&net.graph[direction].pid) { route.entry(face.id).or_insert_with(|| { @@ -786,8 +786,8 @@ fn insert_faces_for_subs( ( face.clone(), key_expr.to_owned(), - if source != 0 { - Some(RoutingContext::new(source as ZInt)) + if source.index() != 0 { + Some(RoutingContext::new(source.index() as ZInt)) } else { None }, @@ -800,7 +800,7 @@ fn insert_faces_for_subs( } } } else { - log::trace!("Tree for node sid:{} not yet ready", source); + log::trace!("Tree for node sid:{} not yet ready", source.index()); } } @@ -808,7 +808,7 @@ fn compute_data_route( tables: &Tables, prefix: &ResourceTreeIndex, suffix: &str, - source: Option, + source: Option, source_type: WhatAmI, ) -> Arc { let mut route = HashMap::new(); @@ -828,7 +828,7 @@ fn compute_data_route( let net = tables.routers_net.as_ref().unwrap(); let router_source = match source_type { WhatAmI::Router => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_faces_for_subs( &mut route, @@ -845,7 +845,7 @@ fn compute_data_route( let net = tables.peers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_faces_for_subs( &mut route, @@ -863,7 +863,7 @@ fn compute_data_route( let net = tables.peers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Router | WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_faces_for_subs( &mut route, @@ -934,8 +934,8 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &ResourceTreeIndex) .routers_data_routes .resize_with(max_idx.index() + 1, || Arc::new(HashMap::new())); - for idx in &indexes { - let route = compute_data_route(tables, res, "", Some(idx.index()), WhatAmI::Peer); + for &idx in &indexes { + let route = compute_data_route(tables, res, "", Some(idx), WhatAmI::Peer); tables.restree.weight_mut(res).routers_data_routes[idx.index()] = route; } } @@ -955,8 +955,8 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &ResourceTreeIndex) .peers_data_routes .resize_with(max_idx.index() + 1, || Arc::new(HashMap::new())); - for idx in &indexes { - let route = compute_data_route(tables, res, "", Some(idx.index()), WhatAmI::Peer); + for &idx in &indexes { + let route = compute_data_route(tables, res, "", Some(idx), WhatAmI::Peer); tables.restree.weight_mut(res).peers_data_routes[idx.index()] = route; } } @@ -1024,20 +1024,22 @@ macro_rules! treat_timestamp { fn routers_data_route( tables: &Tables, res: &ResourceTreeIndex, - context: usize, + context: NodeIndex, ) -> Option> { let ctx = tables.restree.weight(res); - (ctx.routers_data_routes.len() > context).then(|| ctx.routers_data_routes[context].clone()) + (ctx.routers_data_routes.len() > context.index()) + .then(|| ctx.routers_data_routes[context.index()].clone()) } #[inline(always)] fn peers_data_route( tables: &Tables, res: &ResourceTreeIndex, - context: usize, + context: NodeIndex, ) -> Option> { let ctx = tables.restree.weight(res); - (ctx.peers_data_routes.len() > context).then(|| ctx.peers_data_routes[context].clone()) + (ctx.peers_data_routes.len() > context.index()) + .then(|| ctx.peers_data_routes[context.index()].clone()) } #[inline(always)] @@ -1090,7 +1092,7 @@ fn get_data_route( } _ => res .as_ref() - .and_then(|res| routers_data_route(tables, res, 0)) + .and_then(|res| routers_data_route(tables, res, NodeIndex::new(0))) .unwrap_or_else(|| { compute_data_route(tables, prefix, suffix, None, WhatAmI::Client) }), @@ -1114,7 +1116,7 @@ fn get_data_route( } _ => res .as_ref() - .and_then(|res| peers_data_route(tables, res, 0)) + .and_then(|res| peers_data_route(tables, res, NodeIndex::new(0))) .unwrap_or_else(|| { compute_data_route(tables, prefix, suffix, None, WhatAmI::Client) }), @@ -1138,59 +1140,60 @@ fn get_matching_pulls( .unwrap_or_else(|| compute_matching_pulls(tables, prefix, suffix)) } -macro_rules! send_to_first { - ($route:expr, $srcface:expr, $payload:expr, $channel:expr, $cong_ctrl:expr, $data_info:expr) => { - let (outface, key_expr, context) = $route.values().next().unwrap(); - if $srcface.id != outface.id { - outface - .primitives - .send_data( - &key_expr, - $payload, - $channel, // @TODO: Need to check the active subscriptions to determine the right reliability value - $cong_ctrl, - $data_info, - *context, - ) - } +fn send_to_first( + route: &Route, + srcface: &FaceState, + payload: ZBuf, + channel: Channel, + cong_ctrl: CongestionControl, + data_info: Option, +) { + let (outface, key_expr, context) = route.values().next().unwrap(); + if srcface.id != outface.id { + outface.primitives.send_data( + &key_expr, payload, + channel, // @TODO: Need to check the active subscriptions to determine the right reliability value + cong_ctrl, data_info, *context, + ) } } -macro_rules! send_to_all { - ($route:expr, $srcface:expr, $payload:expr, $channel:expr, $cong_ctrl:expr, $data_info:expr) => { - for (outface, key_expr, context) in $route.values() { - if $srcface.id != outface.id { - outface - .primitives - .send_data( - &key_expr, - $payload.clone(), - $channel, // @TODO: Need to check the active subscriptions to determine the right reliability value - $cong_ctrl, - $data_info.clone(), - *context, - ) - } +fn send_to_all( + route: &Route, + srcface: &FaceState, + payload: ZBuf, + channel: Channel, + cong_ctrl: CongestionControl, + data_info: Option, +) { + for (outface, key_expr, context) in route.values() { + if srcface.id != outface.id { + outface.primitives.send_data( + key_expr, + payload.clone(), + channel, // @TODO: Need to check the active subscriptions to determine the right reliability value + cong_ctrl, + data_info.clone(), + *context, + ) } } } -macro_rules! cache_data { - ( - $tables:expr, - $matching_pulls:expr, - $prefix:expr, - $suffix:expr, - $payload:expr, - $info:expr - ) => { - for context in $matching_pulls.iter() { - get_mut_unchecked(context).last_values.insert( - [&$tables.restree.expr(&$prefix), $suffix].concat(), - ($info.clone(), $payload.clone()), - ); - } - }; +fn cache_data( + tables: &Tables, + matching_pulls: Arc, + prefix: ResourceTreeIndex, + suffix: &str, + payload: &ZBuf, + info: Option<&DataInfo>, +) { + for context in matching_pulls.iter() { + get_mut_unchecked(context).last_values.insert( + [&tables.restree.expr(&prefix), suffix].concat(), + (info.cloned(), payload.clone()), + ); + } } #[inline] @@ -1228,21 +1231,35 @@ pub fn route_data( let data_info = treat_timestamp!(&tables.hlc, info); if route.len() == 1 && matching_pulls.len() == 0 { - send_to_first!(route, face, payload, channel, congestion_control, data_info); + send_to_first( + &*route, + face, + payload, + channel, + congestion_control, + data_info, + ); } else { if !matching_pulls.is_empty() { let lock = zlock!(tables.pull_caches_lock); - cache_data!( - tables, + cache_data( + &*tables, matching_pulls, prefix, expr.suffix.as_ref(), - payload, - data_info + &payload, + data_info.as_ref(), ); drop(lock); } - send_to_all!(route, face, payload, channel, congestion_control, data_info); + send_to_all( + &*route, + face, + payload, + channel, + congestion_control, + data_info, + ); } } } @@ -1289,22 +1306,36 @@ pub fn full_reentrant_route_data( if route.len() == 1 && matching_pulls.len() == 0 { drop(tables); - send_to_first!(route, face, payload, channel, congestion_control, data_info); + send_to_first( + &*route, + face, + payload, + channel, + congestion_control, + data_info, + ); } else { if !matching_pulls.is_empty() { let lock = zlock!(tables.pull_caches_lock); - cache_data!( - tables, + cache_data( + &*tables, matching_pulls, prefix, expr.suffix.as_ref(), - payload, - data_info + &payload, + data_info.as_ref(), ); drop(lock); } drop(tables); - send_to_all!(route, face, payload, channel, congestion_control, data_info); + send_to_all( + &*route, + face, + payload, + channel, + congestion_control, + data_info, + ); } } } diff --git a/zenoh/src/net/routing/queries.rs b/zenoh/src/net/routing/queries.rs index 01deb6fd97..0b8c501d4e 100644 --- a/zenoh/src/net/routing/queries.rs +++ b/zenoh/src/net/routing/queries.rs @@ -30,7 +30,7 @@ use zenoh_protocol_core::{ Target, WhatAmI, ZInt, }; -use super::face::FaceState; +use super::face::{FaceId, FaceState}; use super::network::Network; use super::restree::Strengthen; use super::router::Tables; @@ -208,26 +208,26 @@ fn local_qabl_info( #[allow(clippy::too_many_arguments)] #[inline] -fn send_sourced_queryable_to_net_childs>>( +fn send_sourced_queryable_to_net_childs( restree: &mut ResourceTree, - faces: &HashMap>, + faces: &HashMap>, net: &Network, childs: &[NodeIndex], res: &ResourceTreeIndex, kind: ZInt, qabl_info: &QueryableInfo, - src_face: Option, + src_face: Option<&Arc>, routing_context: Option, ) { - for child in childs { - if net.graph.contains_node(*child) { + for &child in childs { + if let Some(child_node) = net.graph.node_weight(child) { match faces .values() - .find(|face| face.pid == net.graph[*child].pid) + .find(|face| face.pid == child_node.pid) .cloned() { Some(someface) => { - if src_face.is_none() || someface.id != src_face.as_ref().unwrap().borrow().id { + if src_face.is_none() || someface.id != src_face.as_ref().unwrap().id { let key_expr = Tables::decl_key(restree, res, &someface); log::debug!( @@ -245,7 +245,7 @@ fn send_sourced_queryable_to_net_childs ); } } - None => log::trace!("Unable to find face for pid {}", net.graph[*child].pid), + None => log::trace!("Unable to find face for pid {}", child_node.pid), } } } @@ -283,12 +283,12 @@ fn propagate_simple_queryable( } } -fn propagate_sourced_queryable>>( +fn propagate_sourced_queryable( tables: &mut Tables, res: &ResourceTreeIndex, kind: ZInt, qabl_info: &QueryableInfo, - src_face: Option, + src_face: Option<&Arc>, source: &PeerId, net_type: WhatAmI, ) { @@ -385,9 +385,9 @@ pub fn declare_router_queryable( } } -fn register_peer_queryable>>( +fn register_peer_queryable( tables: &mut Tables, - face: Option, + face: Option<&Arc>, res: &ResourceTreeIndex, kind: ZInt, qabl_info: &QueryableInfo, @@ -570,7 +570,7 @@ fn client_qabls(tables: &Tables, res: &ResourceTreeIndex, kind: ZInt) -> Vec>, + faces: &HashMap>, net: &Network, childs: &[NodeIndex], res: &ResourceTreeIndex, @@ -854,14 +854,7 @@ pub(crate) fn undeclare_client_queryable( undeclare_peer_queryable(tables, None, &res, kind, &tables.pid.clone()); } else { let local_info = local_peer_qabl_info(tables, &res, kind); - register_peer_queryable::<&Arc>( - tables, - None, - &res, - kind, - &local_info, - tables.pid, - ); + register_peer_queryable(tables, None, &res, kind, &local_info, tables.pid); } } _ => { @@ -1028,7 +1021,7 @@ pub(crate) fn queries_tree_change( .map(|((qabl, kind), qabl_info)| ((*qabl, *kind), qabl_info.clone())) { if qabl == tree_id { - send_sourced_queryable_to_net_childs::<&Arc>( + send_sourced_queryable_to_net_childs( restree, &tables.faces, net, @@ -1066,15 +1059,16 @@ fn insert_target_for_qabls( suffix: &str, tables: &Tables, net: &Network, - source: usize, + source: NodeIndex, qabls: &VecMap<(PeerId, ZInt), QueryableInfo>, complete: bool, ) { - if net.trees.len() > source { + if net.trees.len() > source.index() { for ((qabl, qabl_kind), qabl_info) in qabls { if let Some(qabl_idx) = net.get_idx(qabl) { - if net.trees[source].directions.len() > qabl_idx.index() { - if let Some(direction) = net.trees[source].directions[qabl_idx.index()] { + if net.trees[source.index()].directions.len() > qabl_idx.index() { + if let Some(direction) = net.trees[source.index()].directions[qabl_idx.index()] + { if net.graph.contains_node(direction) { if let Some(face) = tables.get_face(&net.graph[direction].pid) { if net.distances.len() > qabl_idx.index() { @@ -1088,8 +1082,8 @@ fn insert_target_for_qabls( direction: ( face.clone(), key_expr.to_owned(), - if source != 0 { - Some(RoutingContext::new(source as ZInt)) + if source.index() != 0 { + Some(RoutingContext::new(source.index() as ZInt)) } else { None }, @@ -1106,7 +1100,7 @@ fn insert_target_for_qabls( } } } else { - log::trace!("Tree for node sid:{} not yet ready", source); + log::trace!("Tree for node sid:{} not yet ready", source.index()); } } @@ -1114,7 +1108,7 @@ fn compute_query_route( tables: &mut Tables, prefix: &ResourceTreeIndex, suffix: &str, - source: Option, + source: Option, source_type: WhatAmI, ) -> Arc { let mut route = TargetQablSet::new(); @@ -1135,7 +1129,7 @@ fn compute_query_route( let net = tables.routers_net.as_ref().unwrap(); let router_source = match source_type { WhatAmI::Router => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_target_for_qabls( &mut route, @@ -1153,7 +1147,7 @@ fn compute_query_route( let net = tables.peers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_target_for_qabls( &mut route, @@ -1172,7 +1166,7 @@ fn compute_query_route( let net = tables.peers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Router | WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), + _ => net.idx, }; insert_target_for_qabls( &mut route, @@ -1225,9 +1219,9 @@ pub(crate) fn compute_query_routes(tables: &mut Tables, res: &ResourceTreeIndex) .routers_query_routes .resize_with(max_idx.index() + 1, || Arc::new(TargetQablSet::new())); - for idx in &indexes { + for &idx in &indexes { tables.restree.weight_mut(res).routers_query_routes[idx.index()] = - compute_query_route(tables, res, "", Some(idx.index()), WhatAmI::Router); + compute_query_route(tables, res, "", Some(idx), WhatAmI::Router); } } if tables.whatami == WhatAmI::Router || tables.whatami == WhatAmI::Peer { @@ -1246,9 +1240,9 @@ pub(crate) fn compute_query_routes(tables: &mut Tables, res: &ResourceTreeIndex) .peers_query_routes .resize_with(max_idx.index() + 1, || Arc::new(TargetQablSet::new())); - for idx in &indexes { + for &idx in &indexes { tables.restree.weight_mut(res).peers_query_routes[idx.index()] = - compute_query_route(tables, res, "", Some(idx.index()), WhatAmI::Peer); + compute_query_route(tables, res, "", Some(idx), WhatAmI::Peer); } } if tables.whatami == WhatAmI::Client { @@ -1378,7 +1372,6 @@ fn compute_final_route( } } -#[derive(Clone)] struct QueryCleanup { tables: Arc>, face: Weak, @@ -1407,20 +1400,22 @@ impl Timed for QueryCleanup { pub(super) fn routers_query_route( tables: &Tables, res: &ResourceTreeIndex, - context: usize, + context: NodeIndex, ) -> Option> { let ctx = tables.restree.weight(res); - (ctx.routers_query_routes.len() > context).then(|| ctx.routers_query_routes[context].clone()) + (ctx.routers_query_routes.len() > context.index()) + .then(|| ctx.routers_query_routes[context.index()].clone()) } #[inline(always)] pub(super) fn peers_query_route( tables: &Tables, res: &ResourceTreeIndex, - context: usize, + context: NodeIndex, ) -> Option> { let ctx = tables.restree.weight(res); - (ctx.peers_query_routes.len() > context).then(|| ctx.peers_query_routes[context].clone()) + (ctx.peers_query_routes.len() > context.index()) + .then(|| ctx.peers_query_routes[context.index()].clone()) } #[inline(always)] @@ -1494,7 +1489,7 @@ pub fn route_query( _ => tables .restree .get(&prefix, expr.suffix.as_ref()) - .and_then(|res| routers_query_route(&tables, &res, 0)) + .and_then(|res| routers_query_route(&tables, &res, NodeIndex::new(0))) .unwrap_or_else(|| { compute_query_route( &mut tables, @@ -1527,7 +1522,7 @@ pub fn route_query( _ => tables .restree .get(&prefix, expr.suffix.as_ref()) - .and_then(|res| peers_query_route(&tables, &res, 0)) + .and_then(|res| peers_query_route(&tables, &res, NodeIndex::new(0))) .unwrap_or_else(|| { compute_query_route( &mut tables, diff --git a/zenoh/src/net/routing/restree/arctree.rs b/zenoh/src/net/routing/restree/arctree.rs index b67d6e6ff4..dac815b1cd 100644 --- a/zenoh/src/net/routing/restree/arctree.rs +++ b/zenoh/src/net/routing/restree/arctree.rs @@ -1,3 +1,8 @@ +use std::{ + borrow::Cow, + hash::{Hash, Hasher}, +}; + // // Copyright (c) 2017, 2020 ADLINK Technology Inc. // @@ -34,7 +39,7 @@ impl Node { } } - fn expr(&self) -> std::borrow::Cow<'static, str> { + fn expr(&self) -> Cow<'static, str> { match &self.parent { Some(parent) => [&parent.expr() as &str, &self.suffix].concat().into(), None => "".into(), @@ -49,8 +54,8 @@ impl PartialEq for Node { } impl Eq for Node {} -impl std::hash::Hash for Node { - fn hash(&self, state: &mut H) { +impl Hash for Node { + fn hash(&self, state: &mut H) { self.expr().hash(state); } } @@ -355,7 +360,7 @@ impl<'a, Weight: 'a> ResourceTreeContainer<'a, Weight> for ArcTree { } #[inline] - fn expr<'b>(&'b self, index: &'b Self::Index) -> std::borrow::Cow<'b, str> { + fn expr<'b>(&'b self, index: &'b Self::Index) -> Cow<'b, str> { index.expr() } diff --git a/zenoh/src/net/routing/restree/mod.rs b/zenoh/src/net/routing/restree/mod.rs index 2fa9af6c5c..c77fd5d546 100644 --- a/zenoh/src/net/routing/restree/mod.rs +++ b/zenoh/src/net/routing/restree/mod.rs @@ -12,6 +12,8 @@ // ADLINK zenoh team, // +use std::borrow::Cow; + use petgraph::visit::Walker; pub mod arctree; @@ -55,7 +57,7 @@ pub trait ResourceTreeContainer<'a, Weight>: 'a { from: &Self::Index, ) -> Self::Matches; - fn expr<'b>(&'b self, index: &'b Self::Index) -> std::borrow::Cow<'b, str>; + fn expr<'b>(&'b self, index: &'b Self::Index) -> Cow<'b, str>; fn weight<'b>(&'b self, index: &'b Self::Index) -> &'b Weight; fn weight_mut<'b>(&'b mut self, index: &'b Self::Index) -> &'b mut Weight; } @@ -186,7 +188,7 @@ impl ResourceTreeContainer<'a, Weight, Index = Index>, Index: Clone, } #[inline] - pub fn expr<'b>(&'b self, index: &'b Index) -> std::borrow::Cow<'b, str> { + pub fn expr<'b>(&'b self, index: &'b Index) -> Cow<'b, str> { self.container.expr(index) } diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index d563cf34d6..dfd5319600 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -11,8 +11,8 @@ // Contributors: // ADLINK zenoh team, // -use super::face::{Face, FaceState}; -use super::network::{shared_nodes, Network}; +use super::face::{Face, FaceId, FaceState}; +use super::network::{LinkId, Network}; pub use super::pubsub::*; pub use super::queries::*; use super::restree::ResourceTreeContainer; @@ -68,9 +68,9 @@ pub(crate) type ResourceTree = pub(crate) type ResourceTreeIndex = Index; pub(crate) type ResourceTreeWeakIndex = WeakIndex; pub(super) type Direction = (Arc, KeyExpr<'static>, Option); -pub(super) type Route = HashMap; +pub(super) type Route = HashMap; #[cfg(feature = "complete_n")] -pub(super) type QueryRoute = HashMap; +pub(super) type QueryRoute = HashMap; #[cfg(not(feature = "complete_n"))] pub(super) type QueryRoute = Route; pub(super) struct TargetQabl { @@ -93,7 +93,7 @@ pub(crate) struct SessionContext { } pub(crate) struct ResourceContext { pub(super) matches: Vec, - pub(super) session_ctxs: VecMap>, + pub(super) session_ctxs: VecMap>, pub(super) router_subs: VecSet, pub(super) peer_subs: VecSet, pub(super) router_qabls: VecMap<(PeerId, ZInt), QueryableInfo>, @@ -136,7 +136,7 @@ pub struct Tables { // pub(crate) timer: Timer, // pub(crate) queries_default_timeout: Duration, pub(crate) restree: ResourceTree, - pub(crate) faces: HashMap>, + pub(crate) faces: HashMap>, pub(crate) pull_caches_lock: Mutex<()>, pub(crate) router_subs: HashSet, pub(crate) peer_subs: HashSet, @@ -231,9 +231,9 @@ impl Tables { pid: PeerId, whatami: WhatAmI, primitives: Arc, - link_id: usize, + link_id: LinkId, ) -> Weak { - let fid = self.face_counter; + let fid = FaceId::new(self.face_counter); self.face_counter += 1; let newface = self .faces @@ -254,7 +254,7 @@ impl Tables { whatami: WhatAmI, primitives: Arc, ) -> Weak { - self.open_net_face(pid, whatami, primitives, 0) + self.open_net_face(pid, whatami, primitives, LinkId::new(0)) } pub fn close_face(&mut self, face: &Weak) { @@ -453,7 +453,7 @@ impl Tables { restree: &ResourceTree, prefix: &ResourceTreeIndex, suffix: &'a str, - sid: usize, + sid: FaceId, ) -> KeyExpr<'a> { let mut path = restree.reverse_path(prefix, suffix); while let Some(res) = path.walk_next(restree.container()) { @@ -618,10 +618,11 @@ impl Router { peers_autoconnect, routers_autoconnect_gossip, )); - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); } } @@ -659,32 +660,37 @@ impl Router { .as_mut() .unwrap() .add_link(transport.clone()), - _ => 0, + _ => LinkId::new(0), }; if tables.whatami == WhatAmI::Router { - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); } - let handler = Arc::new(LinkStateInterceptor::new( - transport.clone(), - self.tables.clone(), - Face { + let handler = { + let face = Face { tables: self.tables.clone(), state: tables .open_net_face( transport.get_pid().unwrap(), whatami, - Arc::new(Mux::new(transport)), + Arc::new(Mux::new(transport.clone())), link_id, ) .upgrade() .unwrap(), - }, - )); + }; + + Arc::new(LinkStateInterceptor::new( + transport, + self.tables.clone(), + face, + )) + }; match (self.whatami, whatami) { (WhatAmI::Router, WhatAmI::Router) => { @@ -704,12 +710,14 @@ impl Router { pub struct LinkStateInterceptor { pub(crate) transport: TransportUnicast, pub(crate) tables: Arc>, - pub(crate) face: Face, - pub(crate) demux: DeMux, + pub(crate) face: Arc, + pub(crate) demux: DeMux>, } impl LinkStateInterceptor { fn new(transport: TransportUnicast, tables: Arc>, face: Face) -> Self { + let face = Arc::new(face); + LinkStateInterceptor { transport, tables, @@ -743,10 +751,11 @@ impl TransportPeerEventHandler for LinkStateInterceptor { ); } - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); tables.schedule_compute_trees(self.tables.clone(), WhatAmI::Router); } @@ -764,10 +773,11 @@ impl TransportPeerEventHandler for LinkStateInterceptor { } if tables.whatami == WhatAmI::Router { - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); } tables.schedule_compute_trees(self.tables.clone(), WhatAmI::Peer); @@ -801,10 +811,11 @@ impl TransportPeerEventHandler for LinkStateInterceptor { queries_remove_node(&mut tables, &removed_node.pid, WhatAmI::Router); } - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); tables.schedule_compute_trees(tables_ref.clone(), WhatAmI::Router); } @@ -819,10 +830,11 @@ impl TransportPeerEventHandler for LinkStateInterceptor { } if tables.whatami == WhatAmI::Router { - tables.shared_nodes = shared_nodes( - tables.routers_net.as_ref().unwrap(), - tables.peers_net.as_ref().unwrap(), - ); + tables.shared_nodes = tables + .routers_net + .as_ref() + .unwrap() + .shared_nodes(tables.peers_net.as_ref().unwrap()); } tables.schedule_compute_trees(tables_ref.clone(), WhatAmI::Peer); diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 53ba31b99b..4da3acc6d4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -19,8 +19,8 @@ use async_std::task; use futures::future::{BoxFuture, FutureExt}; use log::{error, trace}; use serde_json::json; -use std::collections::HashMap; use std::sync::Mutex; +use std::{collections::HashMap, panic::AssertUnwindSafe}; use zenoh_buffers::{SplitBuffer, ZBuf}; use zenoh_protocol::proto::{data_kind, DataInfo, RoutingContext}; use zenoh_protocol_core::{ @@ -515,8 +515,7 @@ pub async fn router_data( use std::convert::TryFrom; let stats = crate::prelude::ValueSelector::try_from(selector) .ok() - .map(|s| s.properties.get("stats").map(|v| v == "true")) - .flatten() + .and_then(|s| s.properties.get("stats").map(|v| v == "true")) .unwrap_or(false); if stats { json.as_object_mut().unwrap().insert( @@ -618,7 +617,7 @@ pub async fn plugins_status( if !with_extended_string(plugin_key, &["/**"], matches_plugin) { return; } - match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + match std::panic::catch_unwind(AssertUnwindSafe(|| { plugin.adminspace_getter(&selector, plugin_key) })) { Ok(Ok(response)) => responses.extend(response), diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index e4424c596e..f5e9fe03cb 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -24,6 +24,8 @@ use async_std::sync::Arc; use async_std::task::JoinHandle; use futures::Future; use std::any::Any; +use std::ops::Deref; +use std::sync::RwLock; use std::time::Duration; use stop_token::future::FutureExt; use stop_token::{StopSource, TimedOutError}; @@ -48,7 +50,7 @@ pub struct RuntimeState { pub config: Notifier, pub manager: TransportManager, pub hlc: Option>, - pub(crate) stop_source: std::sync::RwLock>, + pub(crate) stop_source: RwLock>, } #[derive(Clone)] @@ -56,7 +58,7 @@ pub struct Runtime { state: Arc, } -impl std::ops::Deref for Runtime { +impl Deref for Runtime { type Target = RuntimeState; fn deref(&self) -> &RuntimeState { @@ -112,7 +114,7 @@ impl Runtime { )); let handler = Arc::new(RuntimeTransportEventHandler { - runtime: std::sync::RwLock::new(None), + runtime: RwLock::new(None), }); let transport_manager = TransportManager::builder() @@ -133,7 +135,7 @@ impl Runtime { config: config.clone(), manager: transport_manager, hlc, - stop_source: std::sync::RwLock::new(Some(StopSource::new())), + stop_source: RwLock::new(Some(StopSource::new())), }), }; *handler.runtime.write().unwrap() = Some(runtime.clone()); @@ -200,7 +202,7 @@ impl Runtime { } struct RuntimeTransportEventHandler { - runtime: std::sync::RwLock>, + runtime: RwLock>, } impl TransportEventHandler for RuntimeTransportEventHandler { @@ -212,7 +214,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { match zread!(self.runtime).as_ref() { Some(runtime) => Ok(Arc::new(RuntimeSession { runtime: runtime.clone(), - locator: std::sync::RwLock::new(None), + locator: RwLock::new(None), sub_event_handler: runtime.router.new_transport_unicast(transport).unwrap(), })), None => bail!("Runtime not yet ready!"), @@ -230,7 +232,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { pub(super) struct RuntimeSession { pub(super) runtime: Runtime, - pub(super) locator: std::sync::RwLock>, + pub(super) locator: RwLock>, pub(super) sub_event_handler: Arc, } diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 8547a719c6..f4c369cc00 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -73,7 +73,7 @@ impl Runtime { .map(AsRef::as_ref) .unwrap_or("auto") .to_owned(), - std::time::Duration::from_secs_f64(guard.scouting().timeout().unwrap_or(3.)), + Duration::from_secs_f64(guard.scouting().timeout().unwrap_or(3.)), ) }; match peers.len() { @@ -145,7 +145,7 @@ impl Runtime { .map(AsRef::as_ref) .unwrap_or_else(|| ZN_MULTICAST_INTERFACE_DEFAULT) .to_string(), - std::time::Duration::from_secs_f64(guard.scouting().delay().unwrap_or(0.2)), + Duration::from_secs_f64(guard.scouting().delay().unwrap_or(0.2)), ) }; @@ -271,7 +271,7 @@ impl Runtime { .unwrap() .unwrap() .as_any() - .downcast_ref::() + .downcast_ref::() { if let Some(locator) = &*zread!(orch_transport.locator) { !peers.contains(locator) @@ -293,7 +293,7 @@ impl Runtime { .unwrap() .unwrap() .as_any() - .downcast_ref::() + .downcast_ref::() { if let Some(locator) = &*zread!(orch_transport.locator) { return *locator == peer; @@ -449,11 +449,9 @@ impl Runtime { #[allow(clippy::or_fun_call)] let local_addr = socket .local_addr() - .or::(Ok(SocketAddr::new(addr, 0).into())) - .unwrap() + .unwrap_or(SocketAddr::new(addr, 0).into()) .as_socket() - .or(Some(SocketAddr::new(addr, 0))) - .unwrap(); + .unwrap_or(SocketAddr::new(addr, 0)); log::debug!("UDP port bound to {}", local_addr); } Err(err) => { @@ -476,7 +474,7 @@ impl Runtime { .unwrap() .unwrap() .as_any() - .downcast_ref::() + .downcast_ref::() { *zwrite!(orch_transport.locator) = Some(peer); } @@ -501,8 +499,8 @@ impl Runtime { mcast_addr: &SocketAddr, mut f: F, ) where - F: FnMut(Hello) -> Fut + std::marker::Send + Copy, - Fut: Future + std::marker::Send, + F: FnMut(Hello) -> Fut + Send + Copy, + Fut: Future + Send, Self: Sized, { let send = async { @@ -549,7 +547,7 @@ impl Runtime { if let Some(msg) = zbuf.reader().read_transport_message() { log::trace!("Received {:?} from {}", msg.body, peer); if let TransportBody::Hello(hello) = &msg.body { - let whatami = hello.whatami.or(Some(WhatAmI::Router)).unwrap(); + let whatami = hello.whatami.unwrap_or(WhatAmI::Router); if matcher.matches(whatami) { if let Loop::Break = f(hello.clone()).await { break; @@ -607,7 +605,7 @@ impl Runtime { sockets: &[UdpSocket], what: I, addr: &SocketAddr, - timeout: std::time::Duration, + timeout: Duration, ) -> ZResult<()> { let scout = async { Runtime::scout(sockets, what.into(), addr, move |hello| async move { @@ -749,7 +747,7 @@ impl Runtime { session.runtime.spawn(async move { let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; while runtime.start_client().await.is_err() { - async_std::task::sleep(std::time::Duration::from_millis(delay)).await; + async_std::task::sleep(Duration::from_millis(delay)).await; delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; if delay > CONNECTION_RETRY_MAX_PERIOD { delay = CONNECTION_RETRY_MAX_PERIOD;