diff --git a/.github/release.yml b/.github/release.yml index c4022026eb..23d56cfb1a 100644 --- a/.github/release.yml +++ b/.github/release.yml @@ -14,12 +14,22 @@ changelog: categories: + - title: Breaking changes 💥 + labels: + - breaking-change - title: New features 🎉 labels: - enhancement + - new feature - title: Bug fixes 🐞 labels: - bug + - title: Documentation 📝 + labels: + - documentation + - title: Dependencies 👷 + labels: + - dependencies - title: Other changes labels: - - "*" + - "*" \ No newline at end of file diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 336f952f3d..ed23b0337a 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -251,13 +251,13 @@ pub mod ext { pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false); - pub const DECLARE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const DECLARE: Self = Self::new(Priority::Control, CongestionControl::Block, false); pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::Drop, false); pub const REQUEST: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); pub const RESPONSE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); pub const RESPONSE_FINAL: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); - pub const OAM: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const OAM: Self = Self::new(Priority::Control, CongestionControl::Block, false); pub const fn new( priority: Priority, diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs new file mode 100644 index 0000000000..7178af29c8 --- /dev/null +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -0,0 +1,31 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the 
Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::posix_shm::cleanup::cleanup_orphaned_segments; + +/// Linux: Trigger cleanup for orphaned SHM segments +/// If process that created named SHM segment crashes or exits by a signal, the segment persists in the system +/// disregarding if it is used by other Zenoh processes or not. This is the detail of POSIX specification for +/// shared memory that is hard to bypass. To deal with this we developed a cleanup routine that enumerates all +/// segments and tries to find processes that are using it. If no such process found, segment will be removed. +/// There is no ideal signal to trigger this cleanup, so by default, zenoh triggers it in the following moments: +/// - first POSIX SHM segment creation +/// - process exit via exit() call or return from main function +/// It is OK to additionally trigger this function at any time, but be aware that this can be costly. 
+/// +/// For non-linux platforms this function currently does nothing +#[zenoh_macros::unstable_doc] +pub fn cleanup_orphaned_shm_segments() { + cleanup_orphaned_segments(); +} diff --git a/commons/zenoh-shm/src/api/mod.rs b/commons/zenoh-shm/src/api/mod.rs index a87188da29..8802b1eb58 100644 --- a/commons/zenoh-shm/src/api/mod.rs +++ b/commons/zenoh-shm/src/api/mod.rs @@ -13,6 +13,7 @@ // pub mod buffer; +pub mod cleanup; pub mod client; pub mod client_storage; pub mod common; diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 5649732bf6..a5c3aacc4f 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -14,6 +14,8 @@ use static_init::dynamic; +use crate::posix_shm::cleanup::cleanup_orphaned_segments; + /// A global cleanup, that is guaranteed to be dropped at normal program exit and that will /// execute all registered cleanup routines at this moment #[dynamic(lazy, drop)] @@ -26,6 +28,8 @@ pub(crate) struct Cleanup { impl Cleanup { fn new() -> Self { + // on first cleanup subsystem touch we perform zenoh segment cleanup + cleanup_orphaned_segments(); Self { cleanups: Default::default(), } @@ -34,10 +38,8 @@ impl Cleanup { pub(crate) fn register_cleanup(&self, cleanup_fn: Box) { self.cleanups.push(Some(cleanup_fn)); } -} -impl Drop for Cleanup { - fn drop(&mut self) { + fn cleanup(&self) { while let Some(cleanup) = self.cleanups.pop() { if let Some(f) = cleanup { f(); @@ -45,3 +47,11 @@ impl Drop for Cleanup { } } } + +impl Drop for Cleanup { + fn drop(&mut self) { + // on finalization stage we perform zenoh segment cleanup + cleanup_orphaned_segments(); + self.cleanup(); + } +} diff --git a/commons/zenoh-shm/src/posix_shm/cleanup.rs b/commons/zenoh-shm/src/posix_shm/cleanup.rs new file mode 100644 index 0000000000..9b74f24a22 --- /dev/null +++ b/commons/zenoh-shm/src/posix_shm/cleanup.rs @@ -0,0 +1,134 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the 
accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) use platform::cleanup_orphaned_segments; + +#[cfg(not(all(unix, not(target_os = "macos"))))] +mod platform { + pub(crate) fn cleanup_orphaned_segments() {} +} + +#[cfg(all(unix, not(target_os = "macos")))] +mod platform { + use std::{borrow::Borrow, collections::HashSet, fs, path::PathBuf}; + + use zenoh_result::ZResult; + + #[derive(PartialEq, Eq, Hash)] + struct ProcFdDir(PathBuf); + + impl ProcFdDir { + fn enumerate_fds(&self) -> ZResult> { + let fds = self.0.read_dir()?; + let fd_map: HashSet = fds + .filter_map(Result::ok) + .map(|f| std::convert::Into::::into(f.path())) + .collect(); + Ok(fd_map) + } + } + + impl From for ProcFdDir { + fn from(value: PathBuf) -> Self { + Self(value) + } + } + + #[derive(PartialEq, Eq, Hash)] + struct FdFile(PathBuf); + + impl From for FdFile { + fn from(value: PathBuf) -> Self { + Self(value) + } + } + + #[derive(PartialEq, Eq, Hash)] + struct ShmFile(PathBuf); + + impl ShmFile { + fn cleanup_file(self) { + let _ = std::fs::remove_file(self.0); + } + } + + impl Borrow for ShmFile { + fn borrow(&self) -> &PathBuf { + &self.0 + } + } + + impl From for ShmFile { + fn from(value: PathBuf) -> Self { + Self(value) + } + } + + pub(crate) fn cleanup_orphaned_segments() { + if let Err(e) = cleanup_orphaned_segments_inner() { + tracing::error!("Error performing orphaned SHM segments cleanup: {e}") + } + } + + fn enumerate_shm_files() -> ZResult> { + let shm_files = fs::read_dir("/dev/shm")?; + Ok(shm_files + .filter_map(Result::ok) + .filter_map(|f| { + if let Some(ext) = f.path().extension() { + if ext == "zenoh" { + return 
Some(std::convert::Into::::into(f.path())); + } + } + None + }) + .collect()) + } + + fn enumerate_proc_dirs() -> ZResult> { + let proc_dirs = fs::read_dir("/proc")?; + Ok(proc_dirs + .filter_map(Result::ok) + .map(|f| std::convert::Into::::into(f.path().join("fd"))) + .collect()) + } + + fn enumerate_proc_fds() -> ZResult> { + let mut fds = HashSet::default(); + let dirs = enumerate_proc_dirs()?; + for dir in dirs { + if let Ok(dir_fds) = dir.enumerate_fds() { + fds.extend(dir_fds); + } + } + Ok(fds) + } + + fn cleanup_orphaned_segments_inner() -> ZResult<()> { + let fd_map = enumerate_proc_fds()?; + let mut shm_map = enumerate_shm_files()?; + + for fd_file in fd_map { + if let Ok(resolved_link) = fd_file.0.read_link() { + shm_map.remove(&resolved_link); + } + } + + for shm_file_to_cleanup in shm_map { + shm_file_to_cleanup.cleanup_file(); + } + + Ok(()) + } +} diff --git a/commons/zenoh-shm/src/posix_shm/mod.rs b/commons/zenoh-shm/src/posix_shm/mod.rs index a63b1c9e6d..495077f9f0 100644 --- a/commons/zenoh-shm/src/posix_shm/mod.rs +++ b/commons/zenoh-shm/src/posix_shm/mod.rs @@ -14,3 +14,4 @@ pub mod array; tested_crate_module!(segment); +pub(crate) mod cleanup; diff --git a/commons/zenoh-shm/src/posix_shm/segment.rs b/commons/zenoh-shm/src/posix_shm/segment.rs index 6a34506029..0f82be5beb 100644 --- a/commons/zenoh-shm/src/posix_shm/segment.rs +++ b/commons/zenoh-shm/src/posix_shm/segment.rs @@ -106,7 +106,7 @@ where fn os_id(id: ID, id_prefix: &str) -> String { let os_id_str = format!("{id_prefix}_{id}"); let crc_os_id_str = ECMA.checksum(os_id_str.as_bytes()); - format!("{:x}", crc_os_id_str) + format!("{:x}.zenoh", crc_os_id_str) } pub fn as_ptr(&self) -> *mut u8 { diff --git a/examples/examples/z_bytes.rs b/examples/examples/z_bytes.rs index c75932ef36..808a8a2ec2 100644 --- a/examples/examples/z_bytes.rs +++ b/examples/examples/z_bytes.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - // // Copyright (c) 2024 ZettaScale Technology // @@ -13,7 +11,16 @@ 
use std::collections::HashMap; // Contributors: // ZettaScale Zenoh Team, // -use zenoh::bytes::ZBytes; +use std::{ + collections::HashMap, + str::FromStr, + time::{SystemTime, UNIX_EPOCH}, +}; + +use zenoh::{ + bytes::{Encoding, ZBytes}, + time::{Timestamp, TimestampId}, +}; fn main() { // Raw bytes @@ -100,6 +107,23 @@ fn main() { let output: (f64, String) = z_deserialize(&payload).unwrap(); assert_eq!(input, output); + // Zenoh types + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); + let input = Timestamp::new(now, TimestampId::rand()); + let payload = z_serialize(&input); + let output: Timestamp = z_deserialize(&payload).unwrap(); + assert_eq!(input, output); + + let input = Encoding::TEXT_JSON; + let payload = z_serialize(&input); + let output: Encoding = z_deserialize(&payload).unwrap(); + assert_eq!(input, output); + + let input = Encoding::from_str("text/plain;foobar").unwrap(); + let payload = z_serialize(&input); + let output: Encoding = z_deserialize(&payload).unwrap(); + assert_eq!(input, output); + // Look at Serialize/Deserialize documentation for the exhaustive // list of provided implementations } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 8a712d56db..16ed92bca3 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -141,7 +141,7 @@ impl StageIn { &mut self, msg: &mut NetworkMessage, priority: Priority, - deadline_before_drop: Option, + deadline_before_drop: Option>, ) -> bool { // Lock the current serialization batch. let mut c_guard = self.mutex.current(); @@ -163,7 +163,7 @@ impl StageIn { Some(deadline) if !$fragment => { // We are in the congestion scenario and message is droppable // Wait for an available batch until deadline - if !self.s_ref.wait_deadline(deadline) { + if !deadline.map_or(false, |deadline| self.s_ref.wait_deadline(deadline)) { // Still no available batch. 
// Restore the sequence number and drop the message $restore_sn; @@ -628,7 +628,11 @@ impl TransmissionPipelineProducer { }; // If message is droppable, compute a deadline after which the sample could be dropped let deadline_before_drop = if msg.is_droppable() { - Some(Instant::now() + self.wait_before_drop) + if self.wait_before_drop.is_zero() { + Some(None) + } else { + Some(Some(Instant::now() + self.wait_before_drop)) + } } else { None }; diff --git a/io/zenoh-transport/src/lib.rs b/io/zenoh-transport/src/lib.rs index e603563b6e..f004b4d511 100644 --- a/io/zenoh-transport/src/lib.rs +++ b/io/zenoh-transport/src/lib.rs @@ -82,7 +82,6 @@ impl TransportEventHandler for DummyTransportEventHandler { /*************************************/ pub trait TransportMulticastEventHandler: Send + Sync { fn new_peer(&self, peer: TransportPeer) -> ZResult>; - fn closing(&self); fn closed(&self); fn as_any(&self) -> &dyn Any; } @@ -95,7 +94,6 @@ impl TransportMulticastEventHandler for DummyTransportMulticastEventHandler { fn new_peer(&self, _peer: TransportPeer) -> ZResult> { Ok(Arc::new(DummyTransportPeerEventHandler)) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { self @@ -121,7 +119,6 @@ pub trait TransportPeerEventHandler: Send + Sync { fn handle_message(&self, msg: NetworkMessage) -> ZResult<()>; fn new_link(&self, src: Link); fn del_link(&self, link: Link); - fn closing(&self); fn closed(&self); fn as_any(&self) -> &dyn Any; } @@ -137,7 +134,6 @@ impl TransportPeerEventHandler for DummyTransportPeerEventHandler { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 36b9dbbea0..bcccaa9a85 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -178,11 +178,7 @@ impl 
TransportMulticastInner { pub(super) async fn delete(&self) -> ZResult<()> { tracing::debug!("Closing multicast transport on {:?}", self.locator); - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the transport on the manager let _ = self.manager.del_transport_multicast(&self.locator).await; @@ -441,7 +437,6 @@ impl TransportMulticastInner { // TODO(yuyuan): Unify the termination peer.token.cancel(); - peer.handler.closing(); drop(guard); peer.handler.closed(); } diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index c602dcf806..69d88af636 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -117,12 +117,7 @@ impl TransportUnicastLowlatency { // to avoid concurrent new_transport and closing/closed notifications let mut a_guard = self.get_alive().await; *a_guard = false; - - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the transport on the manager let _ = self.manager.del_transport_unicast(&self.config.zid).await; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index f01a4a8f18..fdaadaea66 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -129,12 +129,7 @@ impl TransportUnicastUniversal { // to avoid concurrent new_transport and closing/closed notifications let mut a_guard = self.get_alive().await; *a_guard = false; - - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the 
transport on the manager let _ = self.manager.del_transport_unicast(&self.config.zid).await; diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index 7fe2f949ef..3ebb015981 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -62,7 +62,6 @@ impl TransportPeerEventHandler for SC { } fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index 3b8715c0df..5e31aa6514 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -111,7 +111,6 @@ mod tests { count: self.count.clone(), })) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -127,7 +126,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index 664de47ffb..18c8468ecc 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -110,7 +110,6 @@ mod tests { count: self.count.clone(), })) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -126,7 +125,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index f8428b457d..66f5b58e3b 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -58,7 +58,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, 
_link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_authenticator.rs b/io/zenoh-transport/tests/unicast_authenticator.rs index a9b22ad5bb..77f025717d 100644 --- a/io/zenoh-transport/tests/unicast_authenticator.rs +++ b/io/zenoh-transport/tests/unicast_authenticator.rs @@ -70,7 +70,6 @@ impl TransportPeerEventHandler for MHRouterAuthenticator { } fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index 7b18983b4b..b7f34fbf7f 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -110,7 +110,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -150,7 +149,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index ea124e1c05..410ac33955 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -98,7 +98,6 @@ impl TransportPeerEventHandler for MHPeer { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 9564eeb865..3e2f8196f4 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -138,7 +138,6 @@ impl TransportPeerEventHandler for SCClient { 
fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_priorities.rs b/io/zenoh-transport/tests/unicast_priorities.rs index d71cc845cc..87cf5b5e9e 100644 --- a/io/zenoh-transport/tests/unicast_priorities.rs +++ b/io/zenoh-transport/tests/unicast_priorities.rs @@ -133,7 +133,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -183,7 +182,6 @@ impl TransportPeerEventHandler for SCClient { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index db5f719665..b03771a164 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -141,7 +141,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 248ff2ef53..97d43fc672 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -126,7 +126,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 5a414d664d..e8f473b754 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -296,7 +296,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, _link: Link) {} fn 
del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -336,7 +335,6 @@ impl TransportPeerEventHandler for SCClient { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index d708dc73fb..a254252bf3 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -34,7 +34,7 @@ use futures::StreamExt; use http_types::Method; use serde::{Deserialize, Serialize}; use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode}; -use tokio::time::timeout; +use tokio::{task::JoinHandle, time::timeout}; use zenoh::{ bytes::{Encoding, ZBytes}, internal::{ @@ -72,6 +72,7 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } + #[inline(always)] pub(crate) fn blockon_runtime(task: F) -> F::Output { // Check whether able to get the current runtime @@ -87,6 +88,24 @@ pub(crate) fn blockon_runtime(task: F) -> F::Output { } } +pub(crate) fn spawn_runtime(task: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + rt.spawn(task) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + TOKIO_RUNTIME.spawn(task) + } + } +} + #[derive(Serialize, Deserialize)] struct JSONSample { key: String, @@ -274,15 +293,15 @@ impl Plugin for RestPlugin { MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); let task = run(runtime.clone(), conf.clone()); - let task = blockon_runtime(async { - timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await - }); + let task = + blockon_runtime(async { 
timeout(Duration::from_millis(1), spawn_runtime(task)).await }); - // The spawn task (TOKIO_RUNTIME.spawn(task)) should not return immediately. The server should block inside. + // The spawn task (spawn_runtime(task)).await) should not return immediately. The server should block inside. // If it returns immediately (for example, address already in use), we can get the error inside Ok if let Ok(Ok(Err(e))) = task { bail!("REST server failed within 1ms: {e}") } + Ok(Box::new(RunningPlugin(conf))) } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index c30f7b89c4..c4f49ee154 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -15,18 +15,20 @@ use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; use zenoh::{ bytes::ZBytes, internal::Value, - key_expr::OwnedKeyExpr, + key_expr::{format::keformat, OwnedKeyExpr}, query::{ConsolidationMode, Query, Selector}, sample::{Sample, SampleKind}, + session::ZenohId, }; use zenoh_backend_traits::StorageInsertionResult; use super::{ classification::{IntervalIdx, SubIntervalIdx}, - core::Replication, + core::{aligner_key_expr_formatter, Replication}, digest::{DigestDiff, Fingerprint}, log::EventMetadata, }; @@ -48,6 +50,8 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + Discovery, + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -64,6 +68,7 @@ pub(crate) enum AlignmentQuery { /// Not all replies are made, it depends on the Era when a misalignment was detected. 
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentReply { + Discovery(ZenohId), Intervals(HashMap), SubIntervals(HashMap>), Events(Vec), @@ -97,6 +102,47 @@ impl Replication { }; match alignment_query { + AlignmentQuery::Discovery => { + tracing::trace!("Processing `AlignmentQuery::Discovery`"); + reply_to_query( + &query, + AlignmentReply::Discovery(self.zenoh_session.zid()), + None, + ) + .await; + } + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -258,6 +304,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). 
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -319,7 +370,7 @@ impl Replication { &self, replica_aligner_ke: OwnedKeyExpr, alignment_query: AlignmentQuery, - ) { + ) -> JoinHandle<()> { let replication = self.clone(); tokio::task::spawn(async move { let attachment = match bincode::serialize(&alignment_query) { @@ -330,17 +381,29 @@ impl Replication { } }; + // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are + // sent, they will be "consolidated" and only one of them will make it through. + // + // When we retrieve Samples from a Replica, each Sample is sent in a separate + // reply. Hence the need to have no consolidation. + let mut consolidation = ConsolidationMode::None; + + if matches!(alignment_query, AlignmentQuery::Discovery) { + // NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and + // ensure that later answers are with a higher timestamp — we do not care + // about that last aspect). + // + // By setting the consolidation to this value when performing the initial + // alignment, we select the most reactive Replica (hopefully the closest as + // well). + consolidation = ConsolidationMode::Monotonic; + } + match replication .zenoh_session .get(Into::::into(replica_aligner_ke.clone())) .attachment(attachment) - // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies - // are sent, they will be "consolidated" and only one of them will make it - // through. - // - // When we retrieve Samples from a Replica, each Sample is sent in a separate - // reply. Hence the need to have no consolidation. 
- .consolidation(ConsolidationMode::None) + .consolidation(consolidation) .await { Err(e) => { @@ -384,10 +447,17 @@ impl Replication { sample, ) .await; + + // The consolidation mode `Monotonic`, used for sending out an + // `AlignmentQuery::Discovery`, will keep on sending replies. We only want + // to discover / align with a single Replica so we break here. + if matches!(alignment_query, AlignmentQuery::Discovery) { + return; + } } } } - }); + }) } /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is @@ -435,6 +505,39 @@ impl Replication { sample: Sample, ) { match alignment_reply { + AlignmentReply::Discovery(replica_zid) => { + let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to parse < {replica_aligner_ke} > as a valid Aligner key \ + expression: {e:?}" + ); + return; + } + }; + + let replica_aligner_ke = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = parsed_ke.hash_configuration(), + zid = replica_zid, + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!("Failed to generate a valid Aligner key expression: {e:?}"); + return; + } + }; + + tracing::debug!("Performing initial alignment with Replica < {replica_zid} >"); + + if let Err(e) = self + .spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await + { + tracing::error!("Error returned while performing the initial alignment: {e:?}"); + } + } AlignmentReply::Intervals(replica_intervals) => { tracing::trace!("Processing `AlignmentReply::Intervals`"); let intervals_diff = { @@ -613,18 +716,26 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in 
which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::Events`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 12aba06521..58559427f6 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -34,16 +34,12 @@ use zenoh::{ }; use zenoh_backend_traits::Storage; -use super::{ - digest::Digest, - log::LogLatest, - service::{MAX_RETRY, WAIT_PERIOD_SECS}, -}; +use super::{digest::Digest, log::LogLatest}; use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates}; kedefine!( - pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}", - pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner", + pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}", + pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner", ); #[derive(Clone)] @@ -56,6 +52,63 @@ pub(crate) struct Replication { } impl Replication { + /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first + /// discovered Replica for all its entries. + /// + /// # ⚠️ Assumption: empty Storage + /// + /// We assume that this method will only be called if the underlying Storage is empty. This has + /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will + /// not attempt to delete anything from the Storage. 
+ /// + /// # Replica discovery + /// + /// To discover a Replica, this method will create a Digest subscriber, wait to receive a + /// *valid* Digest and, upon reception, ask that Replica for all its entries. + /// + /// To avoid waiting indefinitely (in case there are no other Replica on the network), the + /// subscriber will wait for, at most, the duration of two Intervals. + pub(crate) async fn initial_alignment(&self) { + let ke_all_replicas = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = *self + .replication_log + .read() + .await + .configuration + .fingerprint(), + zid = "*", + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to generate key expression to query all Replicas: {e:?}. Skipping \ + initial alignment." + ); + return; + } + }; + + // NOTE: As discussed with @OlivierHecart, the plugins do not wait for the duration of the + // "scouting delay" before performing any Zenoh operation. Hence, we manually enforce this + // delay when performing the initial alignment. + let delay = self + .zenoh_session + .config() + .lock() + .scouting + .delay() + .unwrap_or(500); + tokio::time::sleep(Duration::from_millis(delay)).await; + + if let Err(e) = self + .spawn_query_replica_aligner(ke_all_replicas, AlignmentQuery::Discovery) + .await + { + tracing::error!("Initial alignment failed with: {e:?}"); + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. 
/// /// This task will perform the following steps: @@ -68,16 +121,20 @@ impl Replication { /// /// [Log]: crate::replication::log::LogLatest pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); - let latest_updates = self.latest_updates.clone(); + let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_put = match keformat!( digest_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(key) => key, Err(e) => { @@ -88,25 +145,14 @@ impl Replication { } }; - // Scope to not forget to release the lock. - let (publication_interval, propagation_delay, last_elapsed_interval) = { - let replication_log_guard = replication_log.read().await; - let configuration = replication_log_guard.configuration(); - let last_elapsed_interval = match configuration.last_elapsed_interval() { - Ok(idx) => idx, - Err(e) => { - tracing::error!( - "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" - ); - return; - } - }; - - ( - configuration.interval, - configuration.propagation_delay, - last_elapsed_interval, - ) + let last_elapsed_interval = match configuration.last_elapsed_interval() { + Ok(idx) => idx, + Err(e) => { + tracing::error!( + "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" + ); + return; + } }; // We have no control over when a replica is going to be started. The purpose is here @@ -114,7 +160,7 @@ impl Replication { // at every interval (+ δ). 
let duration_until_next_interval = { let millis_last_elapsed = - *last_elapsed_interval as u128 * publication_interval.as_millis(); + *last_elapsed_interval as u128 * configuration.interval.as_millis(); if millis_last_elapsed > u64::MAX as u128 { tracing::error!( @@ -137,7 +183,7 @@ impl Replication { }; Duration::from_millis( - (publication_interval.as_millis() - (millis_since_now - millis_last_elapsed)) + (configuration.interval.as_millis() - (millis_since_now - millis_last_elapsed)) as u64, ) }; @@ -147,7 +193,7 @@ impl Replication { let mut events = HashMap::default(); // Internal delay to avoid an "update storm". - let max_publication_delay = (publication_interval.as_millis() / 3) as u64; + let max_publication_delay = (configuration.interval.as_millis() / 3) as u64; let mut digest_update_start: Instant; let mut digest: Digest; @@ -159,15 +205,15 @@ impl Replication { // Except that we want to take into account the time it takes for a publication to // reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully, // catch the publications that are in transit. - tokio::time::sleep(propagation_delay).await; + tokio::time::sleep(configuration.propagation_delay).await; { - let mut latest_updates_guard = latest_updates.write().await; + let mut latest_updates_guard = replication.latest_updates.write().await; std::mem::swap(&mut events, &mut latest_updates_guard); } { - let mut replication_guard = replication_log.write().await; + let mut replication_guard = replication.replication_log.write().await; replication_guard.update(events.drain().map(|(_, event)| event)); digest = match replication_guard.digest() { Ok(digest) => digest, @@ -193,7 +239,8 @@ impl Replication { // buffer that, hopefully, has enough memory. 
let buffer_capacity = serialization_buffer.capacity(); - match zenoh_session + match replication + .zenoh_session .put( &digest_key_put, std::mem::replace( @@ -208,17 +255,17 @@ impl Replication { } let digest_update_duration = digest_update_start.elapsed(); - if digest_update_duration > publication_interval { + if digest_update_duration > configuration.interval { tracing::warn!( "The duration it took to update and publish the Digest is superior to the \ duration of an Interval ({} ms), we recommend increasing the duration of \ the latter. Digest update: {} ms (incl. delay: {} ms)", - publication_interval.as_millis(), + configuration.interval.as_millis(), digest_update_duration.as_millis(), - publication_delay + propagation_delay.as_millis() as u64 + publication_delay + configuration.propagation_delay.as_millis() as u64 ); } else { - tokio::time::sleep(publication_interval - digest_update_duration).await; + tokio::time::sleep(configuration.interval - digest_update_duration).await; } } }) @@ -232,16 +279,20 @@ impl Replication { /// /// [DigestDiff]: super::digest::DigestDiff pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_sub = match keformat!( digest_key_expr_formatter::formatter(), zid = "*", - storage_ke = &storage_key_expr + hash_configuration = *configuration.fingerprint() ) { Ok(key) => key, Err(e) => { @@ -254,33 +305,22 @@ impl Replication { } }; - let mut retry = 0; - let subscriber = loop { - match zenoh_session + let subscriber = match replication + .zenoh_session .declare_subscriber(&digest_key_sub) // NOTE: We need to explicitly set the locality to `Remote` as otherwise the // Digest subscriber will 
also receive the Digest published by its own // Digest publisher. .allowed_origin(Locality::Remote) .await - { - Ok(subscriber) => break subscriber, - Err(e) => { - if retry < MAX_RETRY { - retry += 1; - tracing::warn!( - "Failed to declare Digest subscriber: {e:?}. Attempt \ - {retry}/{MAX_RETRY}." - ); - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - } else { - tracing::error!( - "Could not declare Digest subscriber. The storage will not \ - receive the Replication Digest of other replicas." - ); - return; - } - } + { + Ok(subscriber) => subscriber, + Err(e) => { + tracing::error!( + "Could not declare Digest subscriber: {e:?}. The storage will not receive \ + the Replication Digest of other replicas." + ); + return; } }; @@ -323,7 +363,7 @@ impl Replication { tracing::debug!("Replication digest received"); - let digest = match replication_log.read().await.digest() { + let digest = match replication.replication_log.read().await.digest() { Ok(digest) => digest, Err(e) => { tracing::error!( @@ -338,7 +378,7 @@ impl Replication { let replica_aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - storage_ke = &storage_key_expr, + hash_configuration = *configuration.fingerprint(), zid = source_zid, ) { Ok(key) => key, @@ -371,15 +411,20 @@ impl Replication { /// responsible for fetching in the Replication Log or in the Storage the relevant information /// to send to the Replica such that it can align its own Storage. 
pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr, + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(ke) => ke, Err(e) => { @@ -391,30 +436,19 @@ impl Replication { } }; - let mut retry = 0; - let queryable = loop { - match zenoh_session - .declare_queryable(&aligner_ke) - .allowed_origin(Locality::Remote) - .await - { - Ok(queryable) => break queryable, - Err(e) => { - if retry < MAX_RETRY { - retry += 1; - tracing::warn!( - "Failed to declare the Aligner queryable: {e:?}. Attempt \ - {retry}/{MAX_RETRY}." - ); - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - } else { - tracing::error!( - "Could not declare the Aligner queryable. This storage will NOT \ - align with other replicas." - ); - return; - } - } + let queryable = match replication + .zenoh_session + .declare_queryable(&aligner_ke) + .allowed_origin(Locality::Remote) + .await + { + Ok(queryable) => queryable, + Err(e) => { + tracing::error!( + "Could not declare the Aligner queryable: {e:?}. This storage will NOT \ + align with other replicas." 
+ ); + return; } }; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index 06ec31d9a2..e48fb4fecd 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -29,84 +29,53 @@ pub(crate) struct ReplicationService { aligner_queryable_handle: JoinHandle<()>, } -pub(crate) const MAX_RETRY: usize = 2; -pub(crate) const WAIT_PERIOD_SECS: u64 = 4; - impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// To optimise network resources, if the Storage is empty an "initial alignment" will be + /// performed: if a Replica is detected, a query will be made to retrieve the entire content of + /// its Storage. + /// /// # Tasks spawned /// - /// This function will spawn two tasks: + /// This function will spawn four long-lived tasks: /// 1. One to publish the [Digest]. - /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// 2. One to receive the [Digest] of other Replica. + /// 3. One to receive alignment queries of other Replica. + /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service, /// attempting to abort all the tasks that were spawned, once a Stop message has been /// received. 
pub async fn spawn_start( zenoh_session: Arc, - storage_service: StorageService, + storage_service: &StorageService, storage_key_expr: OwnedKeyExpr, replication_log: Arc>, latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } + let storage = storage_service.storage.clone(); - if received_reply { - break; - } + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." 
- ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage: storage_service.storage.clone(), - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(), @@ -122,7 +91,7 @@ impl ReplicationService { }); } - /// Stops all the tasks spawned by the `ReplicationService`. + /// Stops all the long-lived tasks spawned by the `ReplicationService`. pub fn stop(self) { self.digest_publisher_handle.abort(); self.digest_subscriber_handle.abort(); diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 7dcc60bf32..3e5a2e1f09 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -68,9 +68,8 @@ pub(crate) async fn create_and_start_storage( let storage_name = parts[7]; let name = format!("{uuid}/{storage_name}"); - tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr); - let (tx, rx_storage) = tokio::sync::broadcast::channel(1); + let rx_replication = tx.subscribe(); let mut entries = match storage.get_all_entries().await { Ok(entries) => entries @@ -111,50 +110,52 @@ pub(crate) async fn create_and_start_storage( let latest_updates = Arc::new(RwLock::new(latest_updates)); let storage = Arc::new(Mutex::new(storage)); - let storage_service = StorageService::start( - zenoh_session.clone(), - config.clone(), - &name, - storage, - capability, - rx_storage, - CacheLatest::new(latest_updates.clone(), replication_log.clone()), - ) - .await; - - // Testing if the `replication_log` is set is equivalent to testing if the `replication` is - // set: the `replication_log` 
is only set when the latter is. - if let Some(replication_log) = replication_log { - let rx_replication = tx.subscribe(); - - // NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we - // still need to call it within a dedicated task because the Zenoh routing tables are - // populated only after the plugins have been loaded. - // - // If we don't wait for the routing tables to be populated the initial alignment - // (i.e. querying any Storage on the network handling the same key expression), will - // never work. - // - // TODO Do we really want to perform such an initial alignment? Because this query will - // target any Storage that matches the same key expression, regardless of if they have - // been configured to be replicated. - tokio::task::spawn(async move { + + // NOTE The StorageService method `start_storage_queryable_subscriber` does not spawn its own + // task to loop/wait on the Subscriber and Queryable it creates. Thus we spawn the task + // here. + // + // Doing so also allows us to return early from the creation of the Storage, creation which + // blocks populating the routing tables. + // + // TODO Do we really want to perform such an initial alignment? Because this query will + // target any Storage that matches the same key expression, regardless of if they have + // been configured to be replicated. + tokio::task::spawn(async move { + let storage_service = StorageService::new( + zenoh_session.clone(), + config.clone(), + &name, + storage, + capability, + CacheLatest::new(latest_updates.clone(), replication_log.clone()), + ) + .await; + + // Testing if the `replication_log` is set is equivalent to testing if the `replication` is + // set: the `replication_log` is only set when the latter is. 
+ if let Some(replication_log) = replication_log { tracing::debug!( "Starting replication of storage '{}' on keyexpr '{}'", name, config.key_expr, ); + ReplicationService::spawn_start( zenoh_session, - storage_service, + &storage_service, config.key_expr, replication_log, latest_updates, rx_replication, ) .await; - }); - } + } + + storage_service + .start_storage_queryable_subscriber(rx_storage) + .await; + }); Ok(tx) } diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index d55f574d7f..8ae3d77634 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -61,10 +61,8 @@ struct Update { #[derive(Clone)] pub struct StorageService { session: Arc, - key_expr: OwnedKeyExpr, - complete: bool, + configuration: StorageConfig, name: String, - strip_prefix: Option, pub(crate) storage: Arc>>, capability: Capability, tombstones: Arc>>, @@ -73,21 +71,18 @@ pub struct StorageService { } impl StorageService { - pub async fn start( + pub async fn new( session: Arc, config: StorageConfig, name: &str, storage: Arc>>, capability: Capability, - rx: Receiver, cache_latest: CacheLatest, ) -> Self { let storage_service = StorageService { session, - key_expr: config.key_expr, - complete: config.complete, + configuration: config, name: name.to_string(), - strip_prefix: config.strip_prefix, storage, capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), @@ -121,22 +116,16 @@ impl StorageService { } } } - storage_service - .clone() - .start_storage_queryable_subscriber(rx, config.garbage_collection_config) - .await; storage_service } - async fn start_storage_queryable_subscriber( - self, - mut rx: Receiver, - gc_config: GarbageCollectionConfig, - ) { + pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver) { // start periodic GC event let t = Timer::default(); + let 
gc_config = self.configuration.garbage_collection_config.clone(); + let latest_updates = if self.cache_latest.replication_log.is_none() { Some(self.cache_latest.latest_updates.clone()) } else { @@ -154,8 +143,10 @@ impl StorageService { ); t.add_async(gc).await; + let storage_key_expr = &self.configuration.key_expr; + // subscribe on key_expr - let storage_sub = match self.session.declare_subscriber(&self.key_expr).await { + let storage_sub = match self.session.declare_subscriber(storage_key_expr).await { Ok(storage_sub) => storage_sub, Err(e) => { tracing::error!("Error starting storage '{}': {}", self.name, e); @@ -166,8 +157,8 @@ impl StorageService { // answer to queries on key_expr let storage_queryable = match self .session - .declare_queryable(&self.key_expr) - .complete(self.complete) + .declare_queryable(storage_key_expr) + .complete(self.configuration.complete) .await { Ok(storage_queryable) => storage_queryable, @@ -177,6 +168,12 @@ impl StorageService { } }; + tracing::debug!( + "Starting storage '{}' on keyexpr '{}'", + self.name, + storage_key_expr + ); + tokio::task::spawn(async move { loop { tokio::select!( @@ -249,6 +246,8 @@ impl StorageService { matching_keys ); + let prefix = self.configuration.strip_prefix.as_ref(); + for k in matching_keys { if self.is_deleted(&k, sample_timestamp).await { tracing::trace!("Skipping Sample < {} > deleted later on", k); @@ -298,13 +297,12 @@ impl StorageService { } }; - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) { - Ok(stripped) => stripped, - Err(e) => { - bail!("{e:?}"); - } - }; + let stripped_key = match crate::strip_prefix(prefix, sample_to_store.key_expr()) { + Ok(stripped) => stripped, + Err(e) => { + bail!("{e:?}"); + } + }; // If the Storage was declared as only keeping the Latest value, we ensure that, for // each received Sample, it is indeed the Latest value that is processed. 
@@ -437,19 +435,21 @@ impl StorageService { let wildcards = self.wildcard_updates.read().await; let mut ts = timestamp; let mut update = None; + + let prefix = self.configuration.strip_prefix.as_ref(); + for node in wildcards.intersecting_keys(key_expr) { let weight = wildcards.weight_at(&node); if weight.is_some() && weight.unwrap().data.timestamp > *ts { // if the key matches a wild card update, check whether it was saved in storage // remember that wild card updates change only existing keys - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - break; - } - }; + let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) { + Ok(stripped) => stripped, + Err(e) => { + tracing::error!("{}", e); + break; + } + }; let mut storage = self.storage.lock().await; match storage.get(stripped_key, "").await { Ok(stored_data) => { @@ -532,20 +532,22 @@ impl StorageService { } }; tracing::trace!("[STORAGE] Processing query on key_expr: {}", q.key_expr()); + + let prefix = self.configuration.strip_prefix.as_ref(); + if q.key_expr().is_wild() { // resolve key expr into individual keys let matching_keys = self.get_matching_keys(q.key_expr()).await; let mut storage = self.storage.lock().await; for key in matching_keys { - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) { - Ok(k) => k, - Err(e) => { - tracing::error!("{}", e); - // @TODO: return error when it is supported - return; - } - }; + let stripped_key = match crate::strip_prefix(prefix, &key.clone().into()) { + Ok(k) => k, + Err(e) => { + tracing::error!("{}", e); + // @TODO: return error when it is supported + return; + } + }; match storage.get(stripped_key, q.parameters().as_str()).await { Ok(stored_data) => { for entry in stored_data { @@ -570,7 +572,7 @@ impl StorageService { } drop(storage); } else { - let stripped_key = match 
crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) { + let stripped_key = match crate::strip_prefix(prefix, q.key_expr()) { Ok(k) => k, Err(e) => { tracing::error!("{}", e); @@ -607,13 +609,26 @@ impl StorageService { let mut result = Vec::new(); // @TODO: if cache exists, use that to get the list let storage = self.storage.lock().await; + + let prefix = self.configuration.strip_prefix.as_ref(); + match storage.get_all_entries().await { Ok(entries) => { for (k, _ts) in entries { // @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79) let full_key = match k { - Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key), - None => self.strip_prefix.clone().unwrap(), + Some(key) => crate::prefix(prefix, &key), + None => { + let Some(prefix) = prefix else { + // TODO Check if we have anything in place that would prevent such + // an error from happening. + tracing::error!( + "Internal bug: empty key with no `strip_prefix` configured" + ); + continue; + }; + prefix.clone() + } }; if key_expr.intersects(&full_key.clone()) { result.push(full_key); diff --git a/zenoh-ext/src/serialization.rs b/zenoh-ext/src/serialization.rs index a368bc1684..04b30a5976 100644 --- a/zenoh-ext/src/serialization.rs +++ b/zenoh-ext/src/serialization.rs @@ -7,9 +7,13 @@ use std::{ marker::PhantomData, ops::{Deref, DerefMut}, ptr, + str::FromStr, }; -use zenoh::bytes::{ZBytes, ZBytesReader, ZBytesWriter}; +use zenoh::{ + bytes::{Encoding, ZBytes, ZBytesReader, ZBytesWriter}, + time::{Timestamp, TimestampId, NTP64}, +}; #[derive(Debug)] pub struct ZDeserializeError; @@ -89,6 +93,12 @@ impl ZSerializer { } } +impl Default for ZSerializer { + fn default() -> Self { + Self::new() + } +} + impl From for ZBytes { fn from(value: ZSerializer) -> Self { value.finish() @@ -460,3 +470,63 @@ macro_rules! 
impl_varint { )*}; } impl_varint!(u8: i8, u16: i16, u32: i32, u64: i64, usize: isize); + +// +// Serialization/deserialization for zenoh types +// + +impl Serialize for NTP64 { + fn serialize(&self, serializer: &mut ZSerializer) { + let time = self.as_u64(); + time.serialize(serializer); + } +} + +impl Deserialize for NTP64 { + fn deserialize(deserializer: &mut ZDeserializer) -> Result { + let time = u64::deserialize(deserializer)?; + Ok(NTP64(time)) + } +} + +impl Serialize for TimestampId { + fn serialize(&self, serializer: &mut ZSerializer) { + self.to_le_bytes().serialize(serializer); + } +} + +impl Deserialize for TimestampId { + fn deserialize(deserializer: &mut ZDeserializer) -> Result { + let id = Vec::::deserialize(deserializer)?; + let id = id.as_slice().try_into().map_err(|_| ZDeserializeError)?; + Ok(id) + } +} + +impl Serialize for Timestamp { + fn serialize(&self, serializer: &mut ZSerializer) { + self.get_time().serialize(serializer); + self.get_id().serialize(serializer); + } +} + +impl Deserialize for Timestamp { + fn deserialize(deserializer: &mut ZDeserializer) -> Result { + let time = NTP64::deserialize(deserializer)?; + let id = TimestampId::deserialize(deserializer)?; + Ok(Timestamp::new(time, id)) + } +} + +impl Serialize for Encoding { + fn serialize(&self, serializer: &mut ZSerializer) { + self.to_string().serialize(serializer); + } +} + +impl Deserialize for zenoh::bytes::Encoding { + fn deserialize(deserializer: &mut ZDeserializer) -> Result { + let encoding = String::deserialize(deserializer)?; + Encoding::from_str(&encoding).map_err(|_| ZDeserializeError) + } +} diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index 6a686e2637..b6071f97f8 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -182,8 +182,6 @@ impl TransportMulticastEventHandler for Handler { } } - fn closing(&self) {} - fn closed(&self) {} fn as_any(&self) -> &dyn std::any::Any { @@ -245,8 +243,6 @@ impl TransportPeerEventHandler for PeerHandler 
{ ); } - fn closing(&self) {} - fn closed(&self) { let info = DataInfo { kind: SampleKind::Delete, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 85ee915a03..e0abc0c186 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -422,6 +422,7 @@ pub mod shm { zshm::{zshm, ZShm}, zshmmut::{zshmmut, ZShmMut}, }, + cleanup::cleanup_orphaned_shm_segments, client::{shm_client::ShmClient, shm_segment::ShmSegment}, client_storage::{ShmClientStorage, GLOBAL_CLIENT_STORAGE}, common::types::{ChunkID, ProtocolID, SegmentID}, diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index e4774aab4a..8735ed0f32 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -100,25 +100,10 @@ impl TransportPeerEventHandler for DeMux { fn del_link(&self, _link: Link) {} - fn closing(&self) { + fn closed(&self) { self.face.send_close(); - if let Some(transport) = self.transport.as_ref() { - let mut declares = vec![]; - let ctrl_lock = zlock!(self.face.tables.ctrl_lock); - let mut tables = zwrite!(self.face.tables.tables); - let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| { - declares.push((p.clone(), m)) - }); - drop(tables); - drop(ctrl_lock); - for (p, m) in declares { - p.send_declare(m); - } - } } - fn closed(&self) {} - fn as_any(&self) -> &dyn Any { self } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index 47067f231e..63376a6d63 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -200,9 +200,7 @@ impl Primitives for Mux { } } - fn send_close(&self) { - // self.handler.closing().await; - } + fn send_close(&self) {} } impl EPrimitives for Mux { @@ -530,9 +528,7 @@ impl Primitives for McastMux { } } - fn send_close(&self) { - // self.handler.closing().await; - } + fn send_close(&self) {} } impl EPrimitives for McastMux { diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 
654f5cf9c3..57d3dbb041 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -16,6 +16,7 @@ use std::{ collections::HashMap, fmt, sync::{Arc, Weak}, + time::Duration, }; use tokio_util::sync::CancellationToken; @@ -37,13 +38,16 @@ use super::{ super::router::*, interests::{declare_final, declare_interest, undeclare_interest, CurrentInterest}, resource::*, - tables::{self, TablesLock}, + tables::TablesLock, }; use crate::{ api::key_expr::KeyExpr, net::{ primitives::{McastMux, Mux, Primitives}, - routing::interceptor::{InterceptorTrait, InterceptorsChain}, + routing::{ + dispatcher::interests::finalize_pending_interests, + interceptor::{InterceptorTrait, InterceptorsChain}, + }, }, }; @@ -421,7 +425,25 @@ impl Primitives for Face { } fn send_close(&self) { - tables::close_face(&self.tables, &Arc::downgrade(&self.state)); + tracing::debug!("Close {}", self.state); + let mut state = self.state.clone(); + state.task_controller.terminate_all(Duration::from_secs(10)); + finalize_pending_queries(&self.tables, &mut state); + let mut declares = vec![]; + let ctrl_lock = zlock!(self.tables.ctrl_lock); + finalize_pending_interests(&self.tables, &mut state, &mut |p, m| { + declares.push((p.clone(), m)) + }); + ctrl_lock.close_face( + &self.tables, + &self.tables.clone(), + &mut state, + &mut |p, m| declares.push((p.clone(), m)), + ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 2c5cfffffb..4dd447360e 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -14,7 +14,7 @@ use std::{ any::Any, collections::HashMap, - sync::{Arc, Mutex, RwLock, Weak}, + sync::{Arc, Mutex, RwLock}, time::Duration, }; @@ -30,7 +30,6 @@ use zenoh_sync::get_mut_unchecked; use super::face::FaceState; pub use super::{pubsub::*, queries::*, resource::*}; use 
crate::net::routing::{ - dispatcher::interests::finalize_pending_interests, hat::{self, HatTrait}, interceptor::{interceptor_factories, InterceptorFactory}, }; @@ -169,27 +168,6 @@ impl Tables { } } -pub fn close_face(tables: &TablesLock, face: &Weak) { - match face.upgrade() { - Some(mut face) => { - tracing::debug!("Close {}", face); - face.task_controller.terminate_all(Duration::from_secs(10)); - finalize_pending_queries(tables, &mut face); - let mut declares = vec![]; - let ctrl_lock = zlock!(tables.ctrl_lock); - finalize_pending_interests(tables, &mut face, &mut |p, m| { - declares.push((p.clone(), m)) - }); - ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m))); - drop(ctrl_lock); - for (p, m) in declares { - p.send_declare(m); - } - } - None => tracing::error!("Face already closed!"), - } -} - pub struct TablesLock { pub tables: RwLock, pub(crate) ctrl_lock: Mutex>, diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 04ab653d9f..169b6ccbf1 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -130,6 +130,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + _tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -260,16 +261,6 @@ impl HatBaseTrait for HatCode { 0 } - fn closing( - &self, - _tables: &mut Tables, - _tables_ref: &Arc, - _transport: &TransportUnicast, - _send_declare: &mut SendDeclare, - ) -> ZResult<()> { - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 167d9ae58b..a5d1608274 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -261,6 +261,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + 
tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -367,6 +368,21 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + if face.whatami != WhatAmI::Client { + for (_, removed_node) in hat_mut!(wtables) + .linkstatepeers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node(&mut wtables, &removed_node.zid, send_declare); + queries_remove_node(&mut wtables, &removed_node.zid, send_declare); + token_remove_node(&mut wtables, &removed_node.zid, send_declare); + } + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone()); + }; drop(wtables); } @@ -422,35 +438,6 @@ impl HatBaseTrait for HatCode { .get_local_context(routing_context, face_hat!(face).link_id) } - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - if whatami != WhatAmI::Client { - for (_, removed_node) in hat_mut!(tables) - .linkstatepeers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node(tables, &removed_node.zid, send_declare); - queries_remove_node(tables, &removed_node.zid, send_declare); - token_remove_node(tables, &removed_node.zid, send_declare); - } - - hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); - }; - } - (_, _) => tracing::error!("Closed transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 3e5a81d259..74850ea4c1 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -131,17 +131,10 @@ pub(crate) trait HatBaseTrait { fn info(&self, tables: &Tables, kind: WhatAmI) -> String; - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - 
transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()>; - fn close_face( &self, tables: &TablesLock, + tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ); diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index b3216c6b8c..d69eb74cc7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -427,8 +427,8 @@ impl Network { .get_transport_unicast(&zid) .await .is_none() + && runtime.connect_peer(&zid, &locators).await { - runtime.connect_peer(&zid, &locators).await; runtime .start_conditions() .terminate_peer_connector_zid(zid) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 4d20204c19..b4da9a241d 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -205,6 +205,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + _tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -311,6 +312,14 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + if face.whatami != WhatAmI::Client { + hat_mut!(wtables) + .gossip + .as_mut() + .unwrap() + .remove_link(&face.zid); + }; drop(wtables); } @@ -354,24 +363,6 @@ impl HatBaseTrait for HatCode { 0 } - fn closing( - &self, - tables: &mut Tables, - _tables_ref: &Arc, - transport: &TransportUnicast, - _send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - if whatami != WhatAmI::Client { - hat_mut!(tables).gossip.as_mut().unwrap().remove_link(&zid); - }; - } - (_, _) => tracing::error!("Closed transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git 
a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index c7ea567fb4..5e1906920f 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -430,6 +430,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -536,6 +537,84 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + match face.whatami { + WhatAmI::Router => { + for (_, removed_node) in hat_mut!(wtables) + .routers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + queries_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + token_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + } + + if hat!(wtables).full_net(WhatAmI::Peer) { + hat_mut!(wtables).shared_nodes = shared_nodes( + hat!(wtables).routers_net.as_ref().unwrap(), + hat!(wtables).linkstatepeers_net.as_ref().unwrap(), + ); + } + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone(), WhatAmI::Router); + } + WhatAmI::Peer => { + if hat!(wtables).full_net(WhatAmI::Peer) { + for (_, removed_node) in hat_mut!(wtables) + .linkstatepeers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + queries_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + token_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + } + + hat_mut!(wtables).shared_nodes = shared_nodes( + hat!(wtables).routers_net.as_ref().unwrap(), + hat!(wtables).linkstatepeers_net.as_ref().unwrap(), + ); + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone(), WhatAmI::Peer); + } else if let Some(net) = 
hat_mut!(wtables).linkstatepeers_net.as_mut() { + net.remove_link(&face.zid); + } + } + _ => (), + }; drop(wtables); } @@ -689,100 +768,6 @@ impl HatBaseTrait for HatCode { } } - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - match whatami { - WhatAmI::Router => { - for (_, removed_node) in hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - queries_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - token_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - } - - if hat!(tables).full_net(WhatAmI::Peer) { - hat_mut!(tables).shared_nodes = shared_nodes( - hat!(tables).routers_net.as_ref().unwrap(), - hat!(tables).linkstatepeers_net.as_ref().unwrap(), - ); - } - - hat_mut!(tables) - .schedule_compute_trees(tables_ref.clone(), WhatAmI::Router); - } - WhatAmI::Peer => { - if hat!(tables).full_net(WhatAmI::Peer) { - for (_, removed_node) in hat_mut!(tables) - .linkstatepeers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - queries_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - token_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - } - - hat_mut!(tables).shared_nodes = shared_nodes( - hat!(tables).routers_net.as_ref().unwrap(), - hat!(tables).linkstatepeers_net.as_ref().unwrap(), - ); - - hat_mut!(tables) - .schedule_compute_trees(tables_ref.clone(), WhatAmI::Peer); - } else if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() { - net.remove_link(&zid); - } - } - _ => (), - }; - } - (_, _) => tracing::error!("Closed 
transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, tables: &Tables, face: &FaceState, expr: &mut RoutingExpr) -> bool { face.whatami != WhatAmI::Peer diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index aca6734d4d..301698eea6 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -457,16 +457,9 @@ impl TransportPeerEventHandler for RuntimeSession { } } - fn closing(&self) { - self.main_handler.closing(); - Runtime::closing_session(self); - for handler in &self.slave_handlers { - handler.closing(); - } - } - fn closed(&self) { self.main_handler.closed(); + Runtime::closed_session(self); for handler in &self.slave_handlers { handler.closed(); } @@ -500,12 +493,6 @@ impl TransportMulticastEventHandler for RuntimeMulticastGroup { })) } - fn closing(&self) { - for handler in &self.slave_handlers { - handler.closed(); - } - } - fn closed(&self) { for handler in &self.slave_handlers { handler.closed(); @@ -541,13 +528,6 @@ impl TransportPeerEventHandler for RuntimeMulticastSession { } } - fn closing(&self) { - self.main_handler.closing(); - for handler in &self.slave_handlers { - handler.closing(); - } - } - fn closed(&self) { self.main_handler.closed(); for handler in &self.slave_handlers { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index faadf5eb5c..e09a4e812b 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -911,7 +911,7 @@ impl Runtime { } } - /// Returns `true` if a new Transport instance has been opened with `zid`. + /// Returns `true` if a new Transport instance is established with `zid` or has already been established. 
#[must_use] async fn connect(&self, zid: &ZenohIdProto, scouted_locators: &[Locator]) -> bool { if !self.insert_pending_connection(*zid).await { @@ -1024,7 +1024,8 @@ } } - pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) { + /// Returns `true` if a new Transport instance is established with `zid` or has already been established. + pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { let manager = self.manager(); if zid != &manager.zid() { let has_unicast = manager.get_transport_unicast(zid).await.is_some(); @@ -1042,10 +1043,13 @@ if !has_unicast && !has_multicast { tracing::debug!("Try to connect to peer {} via any of {:?}", zid, locators); - let _ = self.connect(zid, locators).await; + self.connect(zid, locators).await } else { tracing::trace!("Already connected scouted peer: {}", zid); + true } + } else { + true } } @@ -1089,7 +1093,7 @@ ) { Runtime::scout(ucast_sockets, what, addr, move |hello| async move { if !hello.locators.is_empty() { - self.connect_peer(&hello.zid, &hello.locators).await + self.connect_peer(&hello.zid, &hello.locators).await; } else { tracing::warn!("Received Hello with no locators: {:?}", hello); } @@ -1180,7 +1184,7 @@ } } - pub(super) fn closing_session(session: &RuntimeSession) { + pub(super) fn closed_session(session: &RuntimeSession) { if session.runtime.is_closed() { return; } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 23c0d9c053..8953397b8d 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -32,10 +32,7 @@ use zenoh_protocol::{ use crate::net::{ primitives::{DummyPrimitives, EPrimitives, Primitives}, routing::{ - dispatcher::{ - pubsub::SubscriberInfo, - tables::{self, Tables}, - }, + dispatcher::{pubsub::SubscriberInfo, tables::Tables}, router::*, RoutingContext, }, @@ -189,15 +186,14 @@ fn multisub_test() { let tables = router.tables.clone(); 
let primitives = Arc::new(DummyPrimitives {}); - let face0 = Arc::downgrade(&router.new_primitives(primitives).state); - assert!(face0.upgrade().is_some()); + let face0 = &router.new_primitives(primitives); // -------------- let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &"sub".into(), &sub_info, @@ -213,7 +209,7 @@ fn multisub_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &"sub".into(), &sub_info, @@ -225,7 +221,7 @@ fn multisub_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &WireExpr::empty(), NodeId::default(), @@ -236,7 +232,7 @@ fn multisub_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::empty(), NodeId::default(), @@ -244,7 +240,7 @@ fn multisub_test() { ); assert!(res.upgrade().is_none()); - tables::close_face(&tables, &face0); + face0.send_close(); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -260,11 +256,10 @@ async fn clean_test() { let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); - let face0 = Arc::downgrade(&router.new_primitives(primitives).state); - assert!(face0.upgrade().is_some()); + let face0 = &router.new_primitives(primitives); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 1, &"todrop1".into()); + register_expr(&tables, &mut face0.state.clone(), 1, &"todrop1".into()); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1") .map(|res| Arc::downgrade(&res)); assert!(optres1.is_some()); @@ -273,7 +268,7 @@ async fn clean_test() { register_expr( &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &"todrop1/todrop11".into(), 
); @@ -283,30 +278,30 @@ async fn clean_test() { let res2 = optres2.unwrap(); assert!(res2.upgrade().is_some()); - register_expr(&tables, &mut face0.upgrade().unwrap(), 3, &"**".into()); + register_expr(&tables, &mut face0.state.clone(), 3, &"**".into()); let optres3 = Resource::get_resource(zread!(tables.tables)._get_root(), "**") .map(|res| Arc::downgrade(&res)); assert!(optres3.is_some()); let res3 = optres3.unwrap(); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 1); + unregister_expr(&tables, &mut face0.state.clone(), 1); assert!(res1.upgrade().is_some()); assert!(res2.upgrade().is_some()); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 2); + unregister_expr(&tables, &mut face0.state.clone(), 2); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 3); + unregister_expr(&tables, &mut face0.state.clone(), 3); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 1, &"todrop1".into()); + register_expr(&tables, &mut face0.state.clone(), 1, &"todrop1".into()); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1") .map(|res| Arc::downgrade(&res)); assert!(optres1.is_some()); @@ -318,7 +313,7 @@ async fn clean_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &"todrop1/todrop11".into(), &sub_info, @@ -334,7 +329,7 @@ async fn clean_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::from(1).with_suffix("/todrop12"), &sub_info, @@ -351,7 +346,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), 
&tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::empty(), NodeId::default(), @@ -367,7 +362,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &WireExpr::empty(), NodeId::default(), @@ -377,17 +372,17 @@ async fn clean_test() { assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 1); + unregister_expr(&tables, &mut face0.state.clone(), 1); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 2, &"todrop3".into()); + register_expr(&tables, &mut face0.state.clone(), 2, &"todrop3".into()); declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &"todrop3".into(), &sub_info, @@ -403,7 +398,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &WireExpr::empty(), NodeId::default(), @@ -411,16 +406,16 @@ async fn clean_test() { ); assert!(res1.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 2); + unregister_expr(&tables, &mut face0.state.clone(), 2); assert!(res1.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 3, &"todrop4".into()); - register_expr(&tables, &mut face0.upgrade().unwrap(), 4, &"todrop5".into()); + register_expr(&tables, &mut face0.state.clone(), 3, &"todrop4".into()); + register_expr(&tables, &mut face0.state.clone(), 4, &"todrop5".into()); declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 3, &"todrop5".into(), &sub_info, @@ -430,7 +425,7 @@ async fn clean_test() { 
declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 4, &"todrop6".into(), &sub_info, @@ -455,8 +450,7 @@ async fn clean_test() { assert!(res2.upgrade().is_some()); assert!(res3.upgrade().is_some()); - tables::close_face(&tables, &face0); - assert!(face0.upgrade().is_none()); + face0.send_close(); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index e73963a98d..cd9b3848d2 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -41,10 +41,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_usrpswd(37447).await; - test_pub_sub_allow_then_deny_usrpswd(37447).await; - test_get_qbl_allow_then_deny_usrpswd(37447).await; - test_get_qbl_deny_then_allow_usrpswd(37447).await; + test_pub_sub_deny_then_allow_usrpswd(29447).await; + test_pub_sub_allow_then_deny_usrpswd(29447).await; + test_get_qbl_allow_then_deny_usrpswd(29447).await; + test_get_qbl_deny_then_allow_usrpswd(29447).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -53,10 +53,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37448, false).await; - test_pub_sub_allow_then_deny_tls(37449).await; - test_get_qbl_allow_then_deny_tls(37450).await; - test_get_qbl_deny_then_allow_tls(37451).await; + test_pub_sub_deny_then_allow_tls(29448, false).await; + test_pub_sub_allow_then_deny_tls(29449).await; + test_get_qbl_allow_then_deny_tls(29450).await; + test_get_qbl_deny_then_allow_tls(29451).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -65,10 +65,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_quic(37452).await; - 
test_pub_sub_allow_then_deny_quic(37453).await; - test_get_qbl_deny_then_allow_quic(37454).await; - test_get_qbl_allow_then_deny_quic(37455).await; + test_pub_sub_deny_then_allow_quic(29452).await; + test_pub_sub_allow_then_deny_quic(29453).await; + test_get_qbl_deny_then_allow_quic(29454).await; + test_get_qbl_allow_then_deny_quic(29455).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -78,7 +78,7 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37456, true).await; + test_pub_sub_deny_then_allow_tls(29456, true).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -87,8 +87,8 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_deny_allow_combination(37457).await; - test_allow_deny_combination(37458).await; + test_deny_allow_combination(29457).await; + test_allow_deny_combination(29458).await; } #[allow(clippy::all)] diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 76c0ccff41..25dac7ddb5 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -145,7 +145,7 @@ fn downsampling_test( fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let ke_prefix = "test/downsamples_by_keyexp"; - let locator = "tcp/127.0.0.1:38446"; + let locator = "tcp/127.0.0.1:31446"; let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); let ke_20hz: KeyExpr = format!("{ke_prefix}/20hz").try_into().unwrap(); @@ -198,7 +198,7 @@ fn downsampling_by_keyexpr() { #[cfg(unix)] fn downsampling_by_interface_impl(flow: InterceptorFlow) { let ke_prefix = "test/downsamples_by_interface"; - let locator = "tcp/127.0.0.1:38447"; + let locator = "tcp/127.0.0.1:31447"; let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); let ke_no_effect: KeyExpr = format!("{ke_prefix}/no_effect").try_into().unwrap(); diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index 
d6a8c8f621..eefead014e 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -82,7 +82,7 @@ async fn open_router_session() -> Session { config .listen .endpoints - .set(vec!["tcp/127.0.0.1:37447".parse().unwrap()]) + .set(vec!["tcp/127.0.0.1:30447".parse().unwrap()]) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][00a] Opening router session"); @@ -100,7 +100,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap(); @@ -111,7 +111,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap(); @@ -122,7 +122,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap();