From 2420be79fb7060fa460778047779fbe07b45cf8e Mon Sep 17 00:00:00 2001 From: 0xkr8os Date: Tue, 26 Aug 2025 13:12:36 -0500 Subject: [PATCH 1/5] feat: expose add registration function for rendezvous behavior --- protocols/rendezvous/src/lib.rs | 2 +- protocols/rendezvous/src/server.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/protocols/rendezvous/src/lib.rs b/protocols/rendezvous/src/lib.rs index 221178728af..a34a0cbe3eb 100644 --- a/protocols/rendezvous/src/lib.rs +++ b/protocols/rendezvous/src/lib.rs @@ -24,7 +24,7 @@ use libp2p_swarm::StreamProtocol; -pub use self::codec::{Cookie, ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl}; +pub use self::codec::{Cookie, ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl, NewRegistration}; mod codec; diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 572f82b057f..2c22744e39b 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -85,6 +85,10 @@ impl Behaviour { registrations: Registrations::with_config(config), } } + + pub fn add_registration(&mut self, registration: NewRegistration) -> Result { + self.registrations.add(registration) + } } #[derive(Debug)] From e412b4454106884ed3af53c30f73117cc1094e0d Mon Sep 17 00:00:00 2001 From: 0xkr8os Date: Thu, 4 Sep 2025 17:30:37 -0500 Subject: [PATCH 2/5] feat: auth rendezvous --- .gitignore | 3 + Cargo.lock | 24 + Cargo.toml | 2 + protocols/auth-rendezvous/CHANGELOG.md | 147 +++ protocols/auth-rendezvous/Cargo.toml | 43 + protocols/auth-rendezvous/src/client.rs | 462 ++++++++++ protocols/auth-rendezvous/src/codec.rs | 671 ++++++++++++++ .../auth-rendezvous/src/generated/mod.rs | 2 + .../src/generated/rendezvous/mod.rs | 2 + .../src/generated/rendezvous/pb.rs | 372 ++++++++ .../auth-rendezvous/src/generated/rpc.proto | 63 ++ protocols/auth-rendezvous/src/lib.rs | 54 ++ protocols/auth-rendezvous/src/server.rs | 860 ++++++++++++++++++ protocols/auth-rendezvous/tests/rendezvous.rs | 526 +++++++++++ 14 files changed, 3231 insertions(+) create mode 100644 protocols/auth-rendezvous/CHANGELOG.md create mode 100644 protocols/auth-rendezvous/Cargo.toml create mode 100644 protocols/auth-rendezvous/src/client.rs create mode 100644 protocols/auth-rendezvous/src/codec.rs create mode 100644 protocols/auth-rendezvous/src/generated/mod.rs create mode 100644 protocols/auth-rendezvous/src/generated/rendezvous/mod.rs create mode 100644 protocols/auth-rendezvous/src/generated/rendezvous/pb.rs create mode 100644 protocols/auth-rendezvous/src/generated/rpc.proto create mode 100644 protocols/auth-rendezvous/src/lib.rs create mode 100644 protocols/auth-rendezvous/src/server.rs create mode 100644 protocols/auth-rendezvous/tests/rendezvous.rs diff --git a/.gitignore b/.gitignore index eb5a316cbd1..83665e26114 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ target + +.DS_Store +.vscode/ diff --git a/Cargo.lock b/Cargo.lock index b088d0da44f..3f922795028 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2474,6 +2474,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "libp2p-auth-rendezvous" +version = "0.17.0" +dependencies = [ + "async-trait", + "asynchronous-codec", + "bimap", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-request-response", + "libp2p-swarm", + "libp2p-swarm-test", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", + "web-time 1.1.0", +] + [[package]] name = "libp2p-autonat" version = "0.15.0" diff --git a/Cargo.toml b/Cargo.toml index 527d20c27e4..c60fc389dd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "protocols/perf", "protocols/ping", "protocols/relay", + "protocols/auth-rendezvous", "protocols/rendezvous", "protocols/request-response", "protocols/stream", @@ -98,6 +99,7 @@ libp2p-plaintext = { version = "0.43.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.26.0", path = "transports/pnet" } libp2p-quic = { version = "0.13.0", path = "transports/quic" } libp2p-relay = { version = "0.21.0", path = "protocols/relay" } +libp2p-auth-rendezvous = { version = "0.1.0", path = "protocols/auth-rendezvous" } libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.7", path = "misc/server" } diff --git a/protocols/auth-rendezvous/CHANGELOG.md b/protocols/auth-rendezvous/CHANGELOG.md new file mode 100644 index 00000000000..77727945347 --- /dev/null +++ b/protocols/auth-rendezvous/CHANGELOG.md @@ -0,0 +1,147 @@ +## 0.17.0 + +- Emit `ToSwarm::NewExternalAddrOfPeer` for newly discovered peers. + See [PR 5138](https://github.com/libp2p/rust-libp2p/pull/5138). +- Log error instead of panicking when sending response to channel fails + See [PR 6002](https://github.com/libp2p/rust-libp2p/pull/6002). + + + +## 0.16.0 + +- Update to `libp2p-request-response` `v0.28.0`. + + + +## 0.15.0 + + + +## 0.14.1 +- Use `web-time` instead of `instant`. + See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). + +## 0.14.0 + + +## 0.13.1 +- Refresh registration upon a change in external addresses. + See [PR 4629]. + +[PR 4629]: https://github.com/libp2p/rust-libp2p/pull/4629 + +## 0.13.0 + +- Changed the signature of the function `client::Behavior::register()`, + it returns `Result<(), RegisterError>` now. + Remove the `Remote` variant from `RegisterError` and instead put the information from `Remote` + directly into the variant from the `Event` enum. + See [PR 4073]. + +- Raise MSRV to 1.65. + See [PR 3715]. + +[PR 4073]: https://github.com/libp2p/rust-libp2p/pull/4073 +[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 + +## 0.12.1 + +- Migrate from `prost` to `quick-protobuf`. This removes `protoc` dependency. See [PR 3312]. + +[PR 3312]: https://github.com/libp2p/rust-libp2p/pull/3312 + +## 0.12.0 + +- Update to `libp2p-core` `v0.39.0`. + +- Update to `libp2p-swarm` `v0.42.0`. + +## 0.11.0 + +- De- and encode protobuf messages using `prost-codec`. See [PR 3058]. + +- Update to `libp2p-core` `v0.38.0`. + +- Update to `libp2p-swarm` `v0.41.0`. + +- Replace `Client` and `Server`'s `NetworkBehaviour` implementation `inject_*` methods with the new `on_*` methods. + See [PR 3011]. + +- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. + +[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 +[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 +[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 + +## 0.10.0 + +- Update to `libp2p-core` `v0.37.0`. + +- Update to `libp2p-swarm` `v0.40.0`. + +## 0.9.0 + +- Update to `libp2p-swarm` `v0.39.0`. + +- Update to `libp2p-core` `v0.36.0`. + +## 0.8.0 + +- Update prost requirement from 0.10 to 0.11 which no longer installs the protoc Protobuf compiler. + Thus you will need protoc installed locally. See [PR 2788]. + +- Update to `libp2p-swarm` `v0.38.0`. + +- Update to `libp2p-core` `v0.35.0`. + +[PR 2788]: https://github.com/libp2p/rust-libp2p/pull/2788 + +## 0.7.0 + +- Update to `libp2p-core` `v0.34.0`. + +- Update to `libp2p-swarm` `v0.37.0`. + +## 0.6.0 + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + +- Renamed `Error::ConversionError` to `Error::Conversion` in the `codec` module. See [PR 2620]. + +[PR 2620]: https://github.com/libp2p/rust-libp2p/pull/2620 + +## 0.5.0 + +- Update to `libp2p-swarm` `v0.35.0`. + +## 0.4.0 [2022-02-22] + +- Update to `libp2p-core` `v0.32.0`. + +- Update to `libp2p-swarm` `v0.34.0`. + +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + +## 0.3.0 [2022-01-27] + +- Update dependencies. + +- Migrate to Rust edition 2021 (see [PR 2339]). + +[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 + +## 0.2.0 [2021-11-16] + +- Use `instant` and `futures-timer` instead of `wasm-timer` (see [PR 2245]). + +- Update dependencies. + +[PR 2245]: https://github.com/libp2p/rust-libp2p/pull/2245 + +## 0.1.0 [2021-11-01] + +- Initial release. diff --git a/protocols/auth-rendezvous/Cargo.toml b/protocols/auth-rendezvous/Cargo.toml new file mode 100644 index 00000000000..1c160be023d --- /dev/null +++ b/protocols/auth-rendezvous/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "libp2p-auth-rendezvous" +edition.workspace = true +rust-version = { workspace = true } +description = "Rendezvous protocol for libp2p" +version = "0.17.0" +authors = ["The COMIT guys "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +asynchronous-codec = { workspace = true } +async-trait = "0.1" +bimap = "0.6.3" +futures = { workspace = true, features = ["std"] } +futures-timer = "3.0.3" +web-time = { workspace = true } +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } +libp2p-identity = { workspace = true } +libp2p-request-response = { workspace = true } +quick-protobuf = "0.8" +quick-protobuf-codec = { workspace = true } +rand = "0.8" +thiserror = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } +libp2p-swarm-test = { path = "../../swarm-test" } +rand = "0.8" +tokio = { workspace = true, features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true + +[lints] +workspace = true diff --git a/protocols/auth-rendezvous/src/client.rs b/protocols/auth-rendezvous/src/client.rs new file mode 100644 index 00000000000..ecf92577001 --- /dev/null +++ b/protocols/auth-rendezvous/src/client.rs @@ -0,0 +1,462 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + collections::{HashMap, VecDeque}, + iter, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{ + future::{BoxFuture, FutureExt}, + stream::{FuturesUnordered, StreamExt}, +}; +use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord}; +use libp2p_identity::{Keypair, PeerId, SigningError}; +use libp2p_request_response::{OutboundRequestId, ProtocolSupport}; +use libp2p_swarm::{ + ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, +}; + +use crate::codec::{ + Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl, +}; + +pub struct Behaviour { + events: VecDeque::ToSwarm, THandlerInEvent>>, + + inner: libp2p_request_response::Behaviour, + + keypair: Keypair, + + waiting_for_register: HashMap>)>, + waiting_for_discovery: HashMap)>, + + /// Hold addresses of all peers that we have discovered so far. + /// + /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by + /// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`]. + discovered_peers: HashMap>>, + + registered_namespaces: HashMap<(PeerId, Namespace), (Ttl, Option>)>, + + /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers` + /// otherwise we have a memory leak. + expiring_registrations: FuturesUnordered>, + + external_addresses: ExternalAddresses, +} + +impl Behaviour { + /// Create a new instance of the rendezvous [`NetworkBehaviour`]. + pub fn new(keypair: Keypair) -> Self { + Self { + events: Default::default(), + inner: libp2p_request_response::Behaviour::with_codec( + crate::codec::Codec::default(), + iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)), + libp2p_request_response::Config::default(), + ), + keypair, + waiting_for_register: Default::default(), + waiting_for_discovery: Default::default(), + discovered_peers: Default::default(), + registered_namespaces: Default::default(), + expiring_registrations: FuturesUnordered::from_iter(vec![ + futures::future::pending().boxed() + ]), + external_addresses: Default::default(), + } + } + + /// Register our external addresses in the given namespace with the given rendezvous peer. + /// + /// External addresses are either manually added via + /// [`libp2p_swarm::Swarm::add_external_address`] or reported by other [`NetworkBehaviour`]s + /// via [`ToSwarm::ExternalAddrConfirmed`]. + pub fn register( + &mut self, + namespace: Namespace, + rendezvous_node: PeerId, + ttl: Option, + auth_token: Option>, + ) -> Result<(), RegisterError> { + let external_addresses = self.external_addresses.iter().cloned().collect::>(); + if external_addresses.is_empty() { + return Err(RegisterError::NoExternalAddresses); + } + + let peer_record = PeerRecord::new(&self.keypair, external_addresses)?; + let req_id = self.inner.send_request( + &rendezvous_node, + Register { + new_registration: NewRegistration::new(namespace.clone(), peer_record, ttl), + auth_token: auth_token.clone(), + }, + ); + self.waiting_for_register + .insert(req_id, (rendezvous_node, namespace, auth_token)); + + Ok(()) + } + + /// Unregister ourselves from the given namespace with the given rendezvous peer. + pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { + self.registered_namespaces + .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace)); + + self.inner + .send_request(&rendezvous_node, Unregister(namespace)); + } + + /// Discover other peers at a given rendezvous peer. + /// + /// If desired, the registrations can be filtered by a namespace. + /// If no namespace is given, peers from all namespaces will be returned. + /// A successfully discovery returns a cookie within [`Event::Discovered`]. + /// Such a cookie can be used to only fetch the _delta_ of registrations since + /// the cookie was acquired. + pub fn discover( + &mut self, + namespace: Option, + cookie: Option, + limit: Option, + rendezvous_node: PeerId, + auth_token: Option>, + ) { + let req_id = self.inner.send_request( + &rendezvous_node, + Discover { + namespace: namespace.clone(), + cookie, + limit, + auth_token: auth_token, + }, + ); + + self.waiting_for_discovery + .insert(req_id, (rendezvous_node, namespace)); + } +} + +#[derive(Debug, thiserror::Error)] +pub enum RegisterError { + #[error("We don't know about any externally reachable addresses of ours")] + NoExternalAddresses, + #[error("Failed to make a new PeerRecord")] + FailedToMakeRecord(#[from] SigningError), +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + /// We successfully discovered other nodes with using the contained rendezvous node. + Discovered { + rendezvous_node: PeerId, + registrations: Vec, + cookie: Cookie, + }, + /// We failed to discover other nodes on the contained rendezvous node. + DiscoverFailed { + rendezvous_node: PeerId, + namespace: Option, + error: ErrorCode, + }, + /// We successfully registered with the contained rendezvous node. + Registered { + rendezvous_node: PeerId, + ttl: Ttl, + namespace: Namespace, + }, + /// We failed to register with the contained rendezvous node. + RegisterFailed { + rendezvous_node: PeerId, + namespace: Namespace, + error: ErrorCode, + }, + /// The connection details we learned from this node expired. + Expired { peer: PeerId }, +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + port_use: PortUse, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event); + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + let changed = self.external_addresses.on_swarm_event(&event); + + self.inner.on_swarm_event(event); + + if changed && self.external_addresses.iter().count() > 0 { + let registered = self.registered_namespaces.clone(); + for ((rz_node, ns), (ttl, auth_token)) in registered { + if let Err(e) = self.register(ns, rz_node, Some(ttl), auth_token) { + tracing::warn!("refreshing registration failed: {e}") + } + } + } + } + + #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))] + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + use libp2p_request_response as req_res; + loop { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + match self.inner.poll(cx) { + Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message { + message: + req_res::Message::Response { + request_id, + response, + }, + .. + })) => { + if let Some(event) = self.handle_response(&request_id, response) { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; // not a request we care about + } + Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure { + request_id, + .. + })) => { + if let Some(event) = self.event_for_outbound_failure(&request_id) { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; // not a request we care about + } + Poll::Ready(ToSwarm::GenerateEvent( + req_res::Event::InboundFailure { .. } + | req_res::Event::ResponseSent { .. } + | req_res::Event::Message { + message: req_res::Message::Request { .. }, + .. + }, + )) => { + unreachable!("rendezvous clients never receive requests") + } + Poll::Ready(other) => { + let new_to_swarm = + other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants")); + + return Poll::Ready(new_to_swarm); + } + Poll::Pending => {} + } + + if let Poll::Ready(Some((peer, expired_registration))) = + self.expiring_registrations.poll_next_unpin(cx) + { + let Some(registrations) = self.discovered_peers.get_mut(&peer) else { + continue; + }; + registrations.remove(&expired_registration); + if registrations.is_empty() { + self.discovered_peers.remove(&peer); + } + return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer })); + } + + return Poll::Pending; + } + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let addrs = maybe_peer + .map(|peer| self.discovered_peer_addrs(&peer).cloned().collect()) + .unwrap_or_default(); + Ok(addrs) + } +} + +impl Behaviour { + fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option { + if let Some((rendezvous_node, namespace, _)) = self.waiting_for_register.remove(req_id) { + return Some(Event::RegisterFailed { + rendezvous_node, + namespace, + error: ErrorCode::Unavailable, + }); + }; + + if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) { + return Some(Event::DiscoverFailed { + rendezvous_node, + namespace, + error: ErrorCode::Unavailable, + }); + }; + + None + } + + fn handle_response( + &mut self, + request_id: &OutboundRequestId, + response: Message, + ) -> Option { + match response { + RegisterResponse(Ok(ttl)) => { + let (rendezvous_node, namespace, auth_token) = + self.waiting_for_register.remove(request_id)?; + self.registered_namespaces + .insert((rendezvous_node, namespace.clone()), (ttl, auth_token)); + + Some(Event::Registered { + rendezvous_node, + ttl, + namespace, + }) + } + RegisterResponse(Err(error_code)) => { + let (rendezvous_node, namespace, _) = + self.waiting_for_register.remove(request_id)?; + Some(Event::RegisterFailed { + rendezvous_node, + namespace, + error: error_code, + }) + } + DiscoverResponse(Ok((registrations, cookie))) => { + let (rendezvous_node, _ns) = self.waiting_for_discovery.remove(request_id)?; + registrations.iter().for_each(|registration| { + let peer_id = registration.record.peer_id(); + let addresses = registration.record.addresses(); + let namespace = registration.namespace.clone(); + let ttl = registration.ttl; + + // Emit events for all newly discovered addresses. + let new_addr_events = addresses + .iter() + .filter_map(|address| { + if self.discovered_peer_addrs(&peer_id).any(|a| a == address) { + return None; + } + Some(ToSwarm::NewExternalAddrOfPeer { + peer_id, + address: address.clone(), + }) + }) + .collect::>(); + self.events.extend(new_addr_events); + + // Update list of discovered peers. + self.discovered_peers + .entry(peer_id) + .or_default() + .insert(namespace.clone(), addresses.to_owned()); + + // Push registration expiry future. + self.expiring_registrations.push( + async move { + // if the timer errors we consider it expired + futures_timer::Delay::new(Duration::from_secs(ttl)).await; + (peer_id, namespace) + } + .boxed(), + ); + }); + + Some(Event::Discovered { + rendezvous_node, + registrations, + cookie, + }) + } + DiscoverResponse(Err(error_code)) => { + let (rendezvous_node, ns) = self.waiting_for_discovery.remove(request_id)?; + Some(Event::DiscoverFailed { + rendezvous_node, + namespace: ns, + error: error_code, + }) + } + _ => unreachable!("rendezvous clients never receive requests"), + } + } + + fn discovered_peer_addrs(&self, peer: &PeerId) -> impl Iterator { + self.discovered_peers + .get(peer) + .map(|addrs| addrs.values().flatten()) + .unwrap_or_default() + } +} diff --git a/protocols/auth-rendezvous/src/codec.rs b/protocols/auth-rendezvous/src/codec.rs new file mode 100644 index 00000000000..a939f1c2218 --- /dev/null +++ b/protocols/auth-rendezvous/src/codec.rs @@ -0,0 +1,671 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{fmt, io}; + +use async_trait::async_trait; +use asynchronous_codec::{BytesMut, Decoder, Encoder, FramedRead, FramedWrite}; +use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; +use libp2p_core::{peer_record, signed_envelope, PeerRecord, SignedEnvelope}; +use libp2p_swarm::StreamProtocol; +use quick_protobuf_codec::Codec as ProtobufCodec; +use rand::RngCore; + +use crate::DEFAULT_TTL; + +pub type Ttl = u64; +pub(crate) type Limit = u64; + +const MAX_MESSAGE_LEN_BYTES: usize = 1024 * 1024; + +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Clone, PartialEq)] +pub enum Message { + Register{ + new_registration: NewRegistration, + auth_token: Option>, + }, + RegisterResponse(Result), + Unregister(Namespace), + Discover { + namespace: Option, + cookie: Option, + limit: Option, + auth_token: Option>, + }, + DiscoverResponse(Result<(Vec, Cookie), ErrorCode>), +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct Namespace(String); + +impl Namespace { + /// Creates a new [`Namespace`] from a static string. + /// + /// This will panic if the namespace is too long. We accepting panicking in this case because we + /// are enforcing a `static lifetime which means this value can only be a constant in the + /// program and hence we hope the developer checked that it is of an acceptable length. + pub fn from_static(value: &'static str) -> Self { + if value.len() > crate::MAX_NAMESPACE { + panic!("Namespace '{value}' is too long!") + } + + Namespace(value.to_owned()) + } + + pub fn new(value: String) -> Result { + if value.len() > crate::MAX_NAMESPACE { + return Err(NamespaceTooLong); + } + + Ok(Namespace(value)) + } +} + +impl From for String { + fn from(namespace: Namespace) -> Self { + namespace.0 + } +} + +impl fmt::Display for Namespace { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl PartialEq for Namespace { + fn eq(&self, other: &str) -> bool { + self.0.eq(other) + } +} + +impl PartialEq for str { + fn eq(&self, other: &Namespace) -> bool { + other.0.eq(self) + } +} + +#[derive(Debug, thiserror::Error)] +#[error("Namespace is too long")] +pub struct NamespaceTooLong; + +#[derive(Debug, Eq, PartialEq, Hash, Clone)] +pub struct Cookie { + id: u64, + namespace: Option, +} + +impl Cookie { + /// Construct a new [`Cookie`] for a given namespace. + /// + /// This cookie will only be valid for subsequent DISCOVER requests targeting the same + /// namespace. + pub fn for_namespace(namespace: Namespace) -> Self { + Self { + id: rand::thread_rng().next_u64(), + namespace: Some(namespace), + } + } + + /// Construct a new [`Cookie`] for a DISCOVER request that inquires about all namespaces. + pub fn for_all_namespaces() -> Self { + Self { + id: rand::random(), + namespace: None, + } + } + + pub fn into_wire_encoding(self) -> Vec { + let id_bytes = self.id.to_be_bytes(); + let namespace = self.namespace.map(|ns| ns.0).unwrap_or_default(); + + let mut buffer = Vec::with_capacity(id_bytes.len() + namespace.len()); + buffer.extend_from_slice(&id_bytes); + buffer.extend_from_slice(namespace.as_bytes()); + + buffer + } + + pub fn from_wire_encoding(mut bytes: Vec) -> Result { + // check length early to avoid panic during slicing + if bytes.len() < 8 { + return Err(InvalidCookie); + } + + let namespace = bytes.split_off(8); + let namespace = if namespace.is_empty() { + None + } else { + Some( + Namespace::new(String::from_utf8(namespace).map_err(|_| InvalidCookie)?) + .map_err(|_| InvalidCookie)?, + ) + }; + + let bytes = <[u8; 8]>::try_from(bytes).map_err(|_| InvalidCookie)?; + let id = u64::from_be_bytes(bytes); + + Ok(Self { id, namespace }) + } + + pub fn namespace(&self) -> Option<&Namespace> { + self.namespace.as_ref() + } +} + +#[derive(Debug, thiserror::Error)] +#[error("The cookie was malformed")] +pub struct InvalidCookie; + +#[derive(Debug, Clone, PartialEq)] +pub struct NewRegistration { + pub namespace: Namespace, + pub record: PeerRecord, + pub ttl: Option, +} + +impl NewRegistration { + pub fn new(namespace: Namespace, record: PeerRecord, ttl: Option) -> Self { + Self { + namespace, + record, + ttl, + } + } + + pub fn effective_ttl(&self) -> Ttl { + self.ttl.unwrap_or(DEFAULT_TTL) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Registration { + pub namespace: Namespace, + pub record: PeerRecord, + pub ttl: Ttl, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ErrorCode { + InvalidNamespace, + InvalidSignedPeerRecord, + InvalidTtl, + InvalidCookie, + NotAuthorized, + InternalError, + Unavailable, +} + +impl Encoder for Codec { + type Item<'a> = Message; + type Error = Error; + + fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut pb: ProtobufCodec = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES); + + pb.encode(proto::Message::from(item), dst)?; + + Ok(()) + } +} + +impl Decoder for Codec { + type Item = Message; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let mut pb: ProtobufCodec = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES); + + let Some(message) = pb.decode(src)? else { + return Ok(None); + }; + + Ok(Some(message.try_into()?)) + } +} + +#[derive(Clone, Default)] +pub struct Codec {} + +#[async_trait] +impl libp2p_request_response::Codec for Codec { + type Protocol = StreamProtocol; + type Request = Message; + type Response = Message; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = FramedRead::new(io, self.clone()) + .next() + .await + .ok_or(io::ErrorKind::UnexpectedEof)??; + + Ok(message) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = FramedRead::new(io, self.clone()) + .next() + .await + .ok_or(io::ErrorKind::UnexpectedEof)??; + + Ok(message) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + FramedWrite::new(io, self.clone()).send(req).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + FramedWrite::new(io, self.clone()).send(res).await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Codec(#[from] quick_protobuf_codec::Error), + #[error("Failed to read/write")] + Io(#[from] std::io::Error), + #[error("Failed to convert wire message to internal data model")] + Conversion(#[from] ConversionError), +} + +impl From for std::io::Error { + fn from(value: Error) -> Self { + match value { + Error::Io(e) => e, + Error::Codec(e) => io::Error::from(e), + Error::Conversion(e) => io::Error::new(io::ErrorKind::InvalidInput, e), + } + } +} + +impl From for proto::Message { + fn from(message: Message) -> Self { + match message { + Message::Register{new_registration: NewRegistration{namespace, record, ttl}, auth_token} => proto::Message { + type_pb: Some(proto::MessageType::REGISTER), + register: Some(proto::Register { + ns: Some(namespace.into()), + ttl, + signedPeerRecord: Some(record.into_signed_envelope().into_protobuf_encoding()), + authToken: auth_token, + }), + registerResponse: None, + unregister: None, + discover: None, + discoverResponse: None, + }, + Message::RegisterResponse(Ok(ttl)) => proto::Message { + type_pb: Some(proto::MessageType::REGISTER_RESPONSE), + registerResponse: Some(proto::RegisterResponse { + status: Some(proto::ResponseStatus::OK), + statusText: None, + ttl: Some(ttl), + }), + register: None, + discover: None, + unregister: None, + discoverResponse: None, + }, + Message::RegisterResponse(Err(error)) => proto::Message { + type_pb: Some(proto::MessageType::REGISTER_RESPONSE), + registerResponse: Some(proto::RegisterResponse { + status: Some(proto::ResponseStatus::from(error)), + statusText: None, + ttl: None, + }), + register: None, + discover: None, + unregister: None, + discoverResponse: None, + }, + Message::Unregister(namespace) => proto::Message { + type_pb: Some(proto::MessageType::UNREGISTER), + unregister: Some(proto::Unregister { + ns: Some(namespace.into()), + id: None, + }), + register: None, + registerResponse: None, + discover: None, + discoverResponse: None, + }, + Message::Discover { + namespace, + cookie, + limit, + auth_token, + } => proto::Message { + type_pb: Some(proto::MessageType::DISCOVER), + discover: Some(proto::Discover { + ns: namespace.map(|ns| ns.into()), + cookie: cookie.map(|cookie| cookie.into_wire_encoding()), + limit, + authToken: auth_token, + }), + register: None, + registerResponse: None, + unregister: None, + discoverResponse: None, + }, + Message::DiscoverResponse(Ok((registrations, cookie))) => proto::Message { + type_pb: Some(proto::MessageType::DISCOVER_RESPONSE), + discoverResponse: Some(proto::DiscoverResponse { + registrations: registrations + .into_iter() + .map(|reggo| proto::Register { + ns: Some(reggo.namespace.into()), + ttl: Some(reggo.ttl), + signedPeerRecord: Some( + reggo.record.into_signed_envelope().into_protobuf_encoding(), + ), + authToken: None, + }) + .collect(), + status: Some(proto::ResponseStatus::OK), + statusText: None, + cookie: Some(cookie.into_wire_encoding()), + }), + register: None, + discover: None, + unregister: None, + registerResponse: None, + }, + Message::DiscoverResponse(Err(error)) => proto::Message { + type_pb: Some(proto::MessageType::DISCOVER_RESPONSE), + discoverResponse: Some(proto::DiscoverResponse { + registrations: Vec::new(), + status: Some(proto::ResponseStatus::from(error)), + statusText: None, + cookie: None, + }), + register: None, + discover: None, + unregister: None, + registerResponse: None, + }, + } + } +} + +impl TryFrom for Message { + type Error = ConversionError; + + fn try_from(message: proto::Message) -> Result { + let message = match message { + proto::Message { + type_pb: Some(proto::MessageType::REGISTER), + register: + Some(proto::Register { + ns, + ttl, + signedPeerRecord: Some(signed_peer_record), + authToken, + }), + .. + } => Message::Register{new_registration: NewRegistration { + namespace: ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ttl, + record: PeerRecord::from_signed_envelope(SignedEnvelope::from_protobuf_encoding(&signed_peer_record)?)?, + }, auth_token: authToken}, + proto::Message { + type_pb: Some(proto::MessageType::REGISTER_RESPONSE), + registerResponse: + Some(proto::RegisterResponse { + status: Some(proto::ResponseStatus::OK), + ttl, + .. + }), + .. + } => Message::RegisterResponse(Ok(ttl.ok_or(ConversionError::MissingTtl)?)), + proto::Message { + type_pb: Some(proto::MessageType::DISCOVER), + discover: Some(proto::Discover { ns, limit, cookie, authToken }), + .. + } => Message::Discover { + namespace: ns.map(Namespace::new).transpose()?, + cookie: cookie.map(Cookie::from_wire_encoding).transpose()?, + limit, + auth_token: authToken, + }, + proto::Message { + type_pb: Some(proto::MessageType::DISCOVER_RESPONSE), + discoverResponse: + Some(proto::DiscoverResponse { + registrations, + status: Some(proto::ResponseStatus::OK), + cookie: Some(cookie), + .. + }), + .. + } => { + let registrations = registrations + .into_iter() + .map(|reggo| { + Ok(Registration { + namespace: reggo + .ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + record: PeerRecord::from_signed_envelope( + SignedEnvelope::from_protobuf_encoding( + ®go + .signedPeerRecord + .ok_or(ConversionError::MissingSignedPeerRecord)?, + )?, + )?, + ttl: reggo.ttl.ok_or(ConversionError::MissingTtl)?, + }) + }) + .collect::, ConversionError>>()?; + let cookie = Cookie::from_wire_encoding(cookie)?; + + Message::DiscoverResponse(Ok((registrations, cookie))) + } + proto::Message { + type_pb: Some(proto::MessageType::REGISTER_RESPONSE), + registerResponse: + Some(proto::RegisterResponse { + status: Some(response_status), + .. + }), + .. + } => Message::RegisterResponse(Err(response_status.try_into()?)), + proto::Message { + type_pb: Some(proto::MessageType::UNREGISTER), + unregister: Some(proto::Unregister { ns, .. }), + .. + } => Message::Unregister( + ns.map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ), + proto::Message { + type_pb: Some(proto::MessageType::DISCOVER_RESPONSE), + discoverResponse: + Some(proto::DiscoverResponse { + status: Some(response_status), + .. + }), + .. + } => Message::DiscoverResponse(Err(response_status.try_into()?)), + _ => return Err(ConversionError::InconsistentWireMessage), + }; + + Ok(message) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConversionError { + #[error("The wire message is consistent")] + InconsistentWireMessage, + #[error("Missing namespace field")] + MissingNamespace, + #[error("Invalid namespace")] + InvalidNamespace(#[from] NamespaceTooLong), + #[error("Missing signed peer record field")] + MissingSignedPeerRecord, + #[error("Missing TTL field")] + MissingTtl, + #[error("Bad status code")] + BadStatusCode, + #[error("Failed to decode signed envelope")] + BadSignedEnvelope(#[from] signed_envelope::DecodingError), + #[error("Failed to decode envelope as signed peer record")] + BadSignedPeerRecord(#[from] peer_record::FromEnvelopeError), + #[error(transparent)] + BadCookie(#[from] InvalidCookie), + #[error("The requested PoW difficulty is out of range")] + PoWDifficultyOutOfRange, + #[error("The provided PoW hash is not 32 bytes long")] + BadPoWHash, +} + +impl ConversionError { + pub fn to_error_code(&self) -> ErrorCode { + match self { + ConversionError::MissingNamespace => ErrorCode::InvalidNamespace, + ConversionError::MissingSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadSignedEnvelope(_) => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadSignedPeerRecord(_) => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadCookie(_) => ErrorCode::InvalidCookie, + ConversionError::MissingTtl => ErrorCode::InvalidTtl, + ConversionError::InconsistentWireMessage => ErrorCode::InternalError, + ConversionError::BadStatusCode => ErrorCode::InternalError, + ConversionError::PoWDifficultyOutOfRange => ErrorCode::InternalError, + ConversionError::BadPoWHash => ErrorCode::InternalError, + ConversionError::InvalidNamespace(_) => ErrorCode::InvalidNamespace, + } + } +} + +impl TryFrom for ErrorCode { + type Error = UnmappableStatusCode; + + fn try_from(value: proto::ResponseStatus) -> Result { + use proto::ResponseStatus::*; + + let code = match value { + OK => return Err(UnmappableStatusCode(value)), + E_INVALID_NAMESPACE => ErrorCode::InvalidNamespace, + E_INVALID_SIGNED_PEER_RECORD => ErrorCode::InvalidSignedPeerRecord, + E_INVALID_TTL => ErrorCode::InvalidTtl, + E_INVALID_COOKIE => ErrorCode::InvalidCookie, + E_NOT_AUTHORIZED => ErrorCode::NotAuthorized, + E_INTERNAL_ERROR => ErrorCode::InternalError, + E_UNAVAILABLE => ErrorCode::Unavailable, + }; + + Ok(code) + } +} + +impl From for proto::ResponseStatus { + fn from(error_code: ErrorCode) -> Self { + use proto::ResponseStatus::*; + + match error_code { + ErrorCode::InvalidNamespace => E_INVALID_NAMESPACE, + ErrorCode::InvalidSignedPeerRecord => E_INVALID_SIGNED_PEER_RECORD, + ErrorCode::InvalidTtl => E_INVALID_TTL, + ErrorCode::InvalidCookie => E_INVALID_COOKIE, + ErrorCode::NotAuthorized => E_NOT_AUTHORIZED, + ErrorCode::InternalError => E_INTERNAL_ERROR, + ErrorCode::Unavailable => E_UNAVAILABLE, + } + } +} + +impl From for ConversionError { + fn from(_: UnmappableStatusCode) -> Self { + ConversionError::InconsistentWireMessage + } +} + +#[derive(Debug, thiserror::Error)] +#[error("The response code ({0:?}) cannot be mapped to our ErrorCode enum")] +pub struct UnmappableStatusCode(proto::ResponseStatus); + +mod proto { + #![allow(unreachable_pub)] + include!("generated/mod.rs"); + pub(crate) use self::rendezvous::pb::{mod_Message::*, Message}; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cookie_wire_encoding_roundtrip() { + let cookie = Cookie::for_namespace(Namespace::from_static("foo")); + + let bytes = cookie.clone().into_wire_encoding(); + let parsed = Cookie::from_wire_encoding(bytes).unwrap(); + + assert_eq!(parsed, cookie); + } + + #[test] + fn cookie_wire_encoding_length() { + let cookie = Cookie::for_namespace(Namespace::from_static("foo")); + + let bytes = cookie.into_wire_encoding(); + + assert_eq!(bytes.len(), 8 + 3) + } +} diff --git a/protocols/auth-rendezvous/src/generated/mod.rs b/protocols/auth-rendezvous/src/generated/mod.rs new file mode 100644 index 00000000000..448a0207b2d --- /dev/null +++ b/protocols/auth-rendezvous/src/generated/mod.rs @@ -0,0 +1,2 @@ +// Automatically generated mod.rs +pub mod rendezvous; diff --git a/protocols/auth-rendezvous/src/generated/rendezvous/mod.rs b/protocols/auth-rendezvous/src/generated/rendezvous/mod.rs new file mode 100644 index 00000000000..aec6164c7ef --- /dev/null +++ b/protocols/auth-rendezvous/src/generated/rendezvous/mod.rs @@ -0,0 +1,2 @@ +// Automatically generated mod.rs +pub mod pb; diff --git a/protocols/auth-rendezvous/src/generated/rendezvous/pb.rs b/protocols/auth-rendezvous/src/generated/rendezvous/pb.rs new file mode 100644 index 00000000000..914ff3722f1 --- /dev/null +++ b/protocols/auth-rendezvous/src/generated/rendezvous/pb.rs @@ -0,0 +1,372 @@ +// Automatically generated rust module for 'rpc.proto' file + +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(unused_imports)] +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt_skip)] + + +use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; +use quick_protobuf::sizeofs::*; +use super::super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Message { + pub type_pb: Option, + pub register: Option, + pub registerResponse: Option, + pub unregister: Option, + pub discover: Option, + pub discoverResponse: Option, +} + +impl<'a> MessageRead<'a> for Message { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.type_pb = Some(r.read_enum(bytes)?), + Ok(18) => msg.register = Some(r.read_message::(bytes)?), + Ok(26) => msg.registerResponse = Some(r.read_message::(bytes)?), + Ok(34) => msg.unregister = Some(r.read_message::(bytes)?), + Ok(42) => msg.discover = Some(r.read_message::(bytes)?), + Ok(50) => msg.discoverResponse = Some(r.read_message::(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for Message { + fn get_size(&self) -> usize { + 0 + + self.type_pb.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.register.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.registerResponse.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.unregister.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.discover.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.discoverResponse.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.type_pb { w.write_with_tag(8, |w| w.write_enum(*s as i32))?; } + if let Some(ref s) = self.register { w.write_with_tag(18, |w| w.write_message(s))?; } + if let Some(ref s) = self.registerResponse { w.write_with_tag(26, |w| w.write_message(s))?; } + if let Some(ref s) = self.unregister { w.write_with_tag(34, |w| w.write_message(s))?; } + if let Some(ref s) = self.discover { w.write_with_tag(42, |w| w.write_message(s))?; } + if let Some(ref s) = self.discoverResponse { w.write_with_tag(50, |w| w.write_message(s))?; } + Ok(()) + } +} + +pub mod mod_Message { + +use super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Register { + pub ns: Option, + pub signedPeerRecord: Option>, + pub ttl: Option, + pub authToken: Option>, +} + +impl<'a> MessageRead<'a> for Register { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.ns = Some(r.read_string(bytes)?.to_owned()), + Ok(18) => msg.signedPeerRecord = Some(r.read_bytes(bytes)?.to_owned()), + Ok(24) => msg.ttl = Some(r.read_uint64(bytes)?), + Ok(34) => msg.authToken = Some(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for Register { + fn get_size(&self) -> usize { + 0 + + self.ns.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.signedPeerRecord.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.ttl.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.authToken.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.ns { w.write_with_tag(10, |w| w.write_string(&**s))?; } + if let Some(ref s) = self.signedPeerRecord { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.ttl { w.write_with_tag(24, |w| w.write_uint64(*s))?; } + if let Some(ref s) = self.authToken { w.write_with_tag(34, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct RegisterResponse { + pub status: Option, + pub statusText: Option, + pub ttl: Option, +} + +impl<'a> MessageRead<'a> for RegisterResponse { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.status = Some(r.read_enum(bytes)?), + Ok(18) => msg.statusText = Some(r.read_string(bytes)?.to_owned()), + Ok(24) => msg.ttl = Some(r.read_uint64(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for RegisterResponse { + fn get_size(&self) -> usize { + 0 + + self.status.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.statusText.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.ttl.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.status { w.write_with_tag(8, |w| w.write_enum(*s as i32))?; } + if let Some(ref s) = self.statusText { w.write_with_tag(18, |w| w.write_string(&**s))?; } + if let Some(ref s) = self.ttl { w.write_with_tag(24, |w| w.write_uint64(*s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Unregister { + pub ns: Option, + pub id: Option>, +} + +impl<'a> MessageRead<'a> for Unregister { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.ns = Some(r.read_string(bytes)?.to_owned()), + Ok(18) => msg.id = Some(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for Unregister { + fn get_size(&self) -> usize { + 0 + + self.ns.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.ns { w.write_with_tag(10, |w| w.write_string(&**s))?; } + if let Some(ref s) = self.id { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Discover { + pub ns: Option, + pub limit: Option, + pub cookie: Option>, + pub authToken: Option>, +} + +impl<'a> MessageRead<'a> for Discover { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.ns = Some(r.read_string(bytes)?.to_owned()), + Ok(16) => msg.limit = Some(r.read_uint64(bytes)?), + Ok(26) => msg.cookie = Some(r.read_bytes(bytes)?.to_owned()), + Ok(34) => msg.authToken = Some(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for Discover { + fn get_size(&self) -> usize { + 0 + + self.ns.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.limit.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.cookie.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.authToken.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.ns { w.write_with_tag(10, |w| w.write_string(&**s))?; } + if let Some(ref s) = self.limit { w.write_with_tag(16, |w| w.write_uint64(*s))?; } + if let Some(ref s) = self.cookie { w.write_with_tag(26, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.authToken { w.write_with_tag(34, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DiscoverResponse { + pub registrations: Vec, + pub cookie: Option>, + pub status: Option, + pub statusText: Option, +} + +impl<'a> MessageRead<'a> for DiscoverResponse { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.registrations.push(r.read_message::(bytes)?), + Ok(18) => msg.cookie = Some(r.read_bytes(bytes)?.to_owned()), + Ok(24) => msg.status = Some(r.read_enum(bytes)?), + Ok(34) => msg.statusText = Some(r.read_string(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DiscoverResponse { + fn get_size(&self) -> usize { + 0 + + self.registrations.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.cookie.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.status.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.statusText.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + for s in &self.registrations { w.write_with_tag(10, |w| w.write_message(s))?; } + if let Some(ref s) = self.cookie { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.status { w.write_with_tag(24, |w| w.write_enum(*s as i32))?; } + if let Some(ref s) = self.statusText { w.write_with_tag(34, |w| w.write_string(&**s))?; } + Ok(()) + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum MessageType { + REGISTER = 0, + REGISTER_RESPONSE = 1, + UNREGISTER = 2, + DISCOVER = 3, + DISCOVER_RESPONSE = 4, +} + +impl Default for MessageType { + fn default() -> Self { + MessageType::REGISTER + } +} + +impl From for MessageType { + fn from(i: i32) -> Self { + match i { + 0 => MessageType::REGISTER, + 1 => MessageType::REGISTER_RESPONSE, + 2 => MessageType::UNREGISTER, + 3 => MessageType::DISCOVER, + 4 => MessageType::DISCOVER_RESPONSE, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for MessageType { + fn from(s: &'a str) -> Self { + match s { + "REGISTER" => MessageType::REGISTER, + "REGISTER_RESPONSE" => MessageType::REGISTER_RESPONSE, + "UNREGISTER" => MessageType::UNREGISTER, + "DISCOVER" => MessageType::DISCOVER, + "DISCOVER_RESPONSE" => MessageType::DISCOVER_RESPONSE, + _ => Self::default(), + } + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum ResponseStatus { + OK = 0, + E_INVALID_NAMESPACE = 100, + E_INVALID_SIGNED_PEER_RECORD = 101, + E_INVALID_TTL = 102, + E_INVALID_COOKIE = 103, + E_NOT_AUTHORIZED = 200, + E_INTERNAL_ERROR = 300, + E_UNAVAILABLE = 400, +} + +impl Default for ResponseStatus { + fn default() -> Self { + ResponseStatus::OK + } +} + +impl From for ResponseStatus { + fn from(i: i32) -> Self { + match i { + 0 => ResponseStatus::OK, + 100 => ResponseStatus::E_INVALID_NAMESPACE, + 101 => ResponseStatus::E_INVALID_SIGNED_PEER_RECORD, + 102 => ResponseStatus::E_INVALID_TTL, + 103 => ResponseStatus::E_INVALID_COOKIE, + 200 => ResponseStatus::E_NOT_AUTHORIZED, + 300 => ResponseStatus::E_INTERNAL_ERROR, + 400 => ResponseStatus::E_UNAVAILABLE, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for ResponseStatus { + fn from(s: &'a str) -> Self { + match s { + "OK" => ResponseStatus::OK, + "E_INVALID_NAMESPACE" => ResponseStatus::E_INVALID_NAMESPACE, + "E_INVALID_SIGNED_PEER_RECORD" => ResponseStatus::E_INVALID_SIGNED_PEER_RECORD, + "E_INVALID_TTL" => ResponseStatus::E_INVALID_TTL, + "E_INVALID_COOKIE" => ResponseStatus::E_INVALID_COOKIE, + "E_NOT_AUTHORIZED" => ResponseStatus::E_NOT_AUTHORIZED, + "E_INTERNAL_ERROR" => ResponseStatus::E_INTERNAL_ERROR, + "E_UNAVAILABLE" => ResponseStatus::E_UNAVAILABLE, + _ => Self::default(), + } + } +} + +} + diff --git a/protocols/auth-rendezvous/src/generated/rpc.proto b/protocols/auth-rendezvous/src/generated/rpc.proto new file mode 100644 index 00000000000..d46d5641466 --- /dev/null +++ b/protocols/auth-rendezvous/src/generated/rpc.proto @@ -0,0 +1,63 @@ +syntax = "proto2"; + +package rendezvous.pb; + +message Message { + enum MessageType { + REGISTER = 0; + REGISTER_RESPONSE = 1; + UNREGISTER = 2; + DISCOVER = 3; + DISCOVER_RESPONSE = 4; + } + + enum ResponseStatus { + OK = 0; + E_INVALID_NAMESPACE = 100; + E_INVALID_SIGNED_PEER_RECORD = 101; + E_INVALID_TTL = 102; + E_INVALID_COOKIE = 103; + E_NOT_AUTHORIZED = 200; + E_INTERNAL_ERROR = 300; + E_UNAVAILABLE = 400; + } + + message Register { + optional string ns = 1; + optional bytes signedPeerRecord = 2; + optional uint64 ttl = 3; // in seconds + optional bytes authToken = 4; + } + + message RegisterResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional uint64 ttl = 3; // in seconds + } + + message Unregister { + optional string ns = 1; + optional bytes id = 2; + } + + message Discover { + optional string ns = 1; + optional uint64 limit = 2; + optional bytes cookie = 3; + optional bytes authToken = 4; + } + + message DiscoverResponse { + repeated Register registrations = 1; + optional bytes cookie = 2; + optional ResponseStatus status = 3; + optional string statusText = 4; + } + + optional MessageType type = 1; + optional Register register = 2; + optional RegisterResponse registerResponse = 3; + optional Unregister unregister = 4; + optional Discover discover = 5; + optional DiscoverResponse discoverResponse = 6; +} diff --git a/protocols/auth-rendezvous/src/lib.rs b/protocols/auth-rendezvous/src/lib.rs new file mode 100644 index 00000000000..a34a0cbe3eb --- /dev/null +++ b/protocols/auth-rendezvous/src/lib.rs @@ -0,0 +1,54 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [Rendezvous](https://github.com/libp2p/specs/blob/master/rendezvous/README.md) protocol. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +use libp2p_swarm::StreamProtocol; + +pub use self::codec::{Cookie, ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl, NewRegistration}; + +mod codec; + +/// If unspecified, rendezvous nodes should assume a TTL of 2h. +/// +/// See . +pub const DEFAULT_TTL: Ttl = 60 * 60 * 2; + +/// By default, nodes should require a minimum TTL of 2h +/// +/// . +pub const MIN_TTL: Ttl = 60 * 60 * 2; + +/// By default, nodes should allow a maximum TTL of 72h +/// +/// . +pub const MAX_TTL: Ttl = 60 * 60 * 72; + +/// The maximum namespace length. +/// +/// . +pub const MAX_NAMESPACE: usize = 255; + +pub(crate) const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); + +pub mod client; +pub mod server; diff --git a/protocols/auth-rendezvous/src/server.rs b/protocols/auth-rendezvous/src/server.rs new file mode 100644 index 00000000000..35e692e10cd --- /dev/null +++ b/protocols/auth-rendezvous/src/server.rs @@ -0,0 +1,860 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + collections::{HashMap, HashSet}, + iter, + task::{ready, Context, Poll}, + time::Duration, +}; + +use bimap::BiMap; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use libp2p_core::{transport::PortUse, Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::{ + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, +}; + +use crate::{ + codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl}, + MAX_TTL, MIN_TTL, +}; + +pub type AuthorizeFn = + Box, &Option>) -> bool + Send + Sync>; + +pub struct Behaviour { + inner: libp2p_request_response::Behaviour, + + registrations: Registrations, + authorized_peers: HashMap, HashSet>>, + authorize_fn: AuthorizeFn, +} + +pub struct Config { + min_ttl: Ttl, + max_ttl: Ttl, +} + +impl Config { + pub fn with_min_ttl(mut self, min_ttl: Ttl) -> Self { + self.min_ttl = min_ttl; + self + } + + pub fn with_max_ttl(mut self, max_ttl: Ttl) -> Self { + self.max_ttl = max_ttl; + self + } +} + +impl Default for Config { + fn default() -> Self { + Self { + min_ttl: MIN_TTL, + max_ttl: MAX_TTL, + } + } +} + +impl Behaviour { + /// Create a new instance of the rendezvous [`NetworkBehaviour`]. + pub fn new(config: Config, authorize_fn: AuthorizeFn) -> Self { + Self { + inner: libp2p_request_response::Behaviour::with_codec( + crate::codec::Codec::default(), + iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Inbound)), + libp2p_request_response::Config::default(), + ), + + registrations: Registrations::with_config(config), + authorized_peers: HashMap::new(), + authorize_fn: authorize_fn, + } + } + + pub fn add_registration( + &mut self, + registration: NewRegistration, + ) -> Result { + self.registrations.add(registration) + } + + pub fn add_authorized_peer_set(&mut self, namespace: Vec, set: HashSet>) { + self.authorized_peers.insert(namespace, set); + } +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + /// We successfully served a discover request from a peer. + DiscoverServed { + enquirer: PeerId, + registrations: Vec, + }, + /// We failed to serve a discover request for a peer. + DiscoverNotServed { enquirer: PeerId, error: ErrorCode }, + /// A peer successfully registered with us. + PeerRegistered { + peer: PeerId, + registration: Registration, + }, + /// We declined a registration from a peer. + PeerNotRegistered { + peer: PeerId, + namespace: Namespace, + error: ErrorCode, + }, + /// A peer successfully unregistered with us. + PeerUnregistered { peer: PeerId, namespace: Namespace }, + /// A registration from a peer expired. + RegistrationExpired(Registration), +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + port_use: PortUse, + ) -> Result, ConnectionDenied> { + self.inner.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection, event); + } + + #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))] + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { + return Poll::Ready(ToSwarm::GenerateEvent(Event::RegistrationExpired( + registration, + ))); + } + + loop { + if let Poll::Ready(to_swarm) = self.inner.poll(cx) { + match to_swarm { + ToSwarm::GenerateEvent(libp2p_request_response::Event::Message { + peer: peer_id, + message: + libp2p_request_response::Message::Request { + request, channel, .. + }, + .. + }) => { + if let Some((event, response)) = handle_request( + peer_id, + request, + &mut self.registrations, + &self.authorize_fn, + ) { + if let Some(resp) = response { + if let Err(resp) = self.inner.send_response(channel, resp) { + tracing::debug!( + %peer_id, + "Failed to send response, peer disconnected {resp:?}" + ); + } + } + + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; + } + ToSwarm::GenerateEvent(libp2p_request_response::Event::InboundFailure { + peer, + request_id, + error, + .. + }) => { + tracing::warn!( + %peer, + request=%request_id, + "Inbound request with peer failed: {error}" + ); + + continue; + } + ToSwarm::GenerateEvent(libp2p_request_response::Event::ResponseSent { + .. + }) + | ToSwarm::GenerateEvent(libp2p_request_response::Event::Message { + peer: _, + message: libp2p_request_response::Message::Response { .. }, + .. + }) + | ToSwarm::GenerateEvent(libp2p_request_response::Event::OutboundFailure { + .. + }) => { + continue; + } + other => { + let new_to_swarm = other + .map_out(|_| unreachable!("we manually map `GenerateEvent` variants")); + + return Poll::Ready(new_to_swarm); + } + }; + } + + return Poll::Pending; + } + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.inner.on_swarm_event(event); + } +} + +fn handle_request( + peer_id: PeerId, + message: Message, + registrations: &mut Registrations, + authorize_fn: &AuthorizeFn, +) -> Option<(Event, Option)> { + match message { + Message::Register { + new_registration: registration, + auth_token, + } => { + if registration.record.peer_id() != peer_id { + let error = ErrorCode::NotAuthorized; + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace: registration.namespace, + error, + }; + + return Some((event, Some(Message::RegisterResponse(Err(error))))); + } + + if !(*authorize_fn)(&peer_id, Some(®istration.namespace), &auth_token) { + let error = ErrorCode::NotAuthorized; + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace: registration.namespace, + error, + }; + + return Some((event, Some(Message::RegisterResponse(Err(error))))); + } + + let namespace = registration.namespace.clone(); + + match registrations.add(registration) { + Ok(registration) => { + let response = Message::RegisterResponse(Ok(registration.ttl)); + + let event = Event::PeerRegistered { + peer: peer_id, + registration, + }; + + Some((event, Some(response))) + } + Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => { + let error = ErrorCode::InvalidTtl; + + let response = Message::RegisterResponse(Err(error)); + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace, + error, + }; + + Some((event, Some(response))) + } + } + } + Message::Unregister(namespace) => { + registrations.remove(namespace.clone(), peer_id); + + let event = Event::PeerUnregistered { + peer: peer_id, + namespace, + }; + + Some((event, None)) + } + Message::Discover { + namespace, + cookie, + limit, + auth_token, + } => { + if !(*authorize_fn)(&peer_id, namespace.as_ref(), &auth_token) { + let error = ErrorCode::NotAuthorized; + let response = Message::DiscoverResponse(Err(error)); + let event = Event::DiscoverNotServed { + enquirer: peer_id, + error, + }; + return Some((event, Some(response))); + } + + match registrations.get(namespace, cookie, limit) { + Ok((registrations, cookie)) => { + let discovered = registrations.cloned().collect::>(); + + let response = Message::DiscoverResponse(Ok((discovered.clone(), cookie))); + + let event = Event::DiscoverServed { + enquirer: peer_id, + registrations: discovered, + }; + + Some((event, Some(response))) + } + Err(_) => { + let error = ErrorCode::InvalidCookie; + + let response = Message::DiscoverResponse(Err(error)); + + let event = Event::DiscoverNotServed { + enquirer: peer_id, + error, + }; + + Some((event, Some(response))) + } + } + } + Message::RegisterResponse(_) => None, + Message::DiscoverResponse(_) => None, + } +} + +#[derive(Debug, Eq, PartialEq, Hash, Copy, Clone)] +struct RegistrationId(u64); + +impl RegistrationId { + fn new() -> Self { + Self(rand::random()) + } +} + +#[derive(Debug, PartialEq)] +struct ExpiredRegistration(Registration); + +pub struct Registrations { + registrations_for_peer: BiMap<(PeerId, Namespace), RegistrationId>, + registrations: HashMap, + cookies: HashMap>, + min_ttl: Ttl, + max_ttl: Ttl, + next_expiry: FuturesUnordered>, +} + +#[derive(Debug, thiserror::Error)] +pub enum TtlOutOfRange { + #[error("Requested TTL ({requested}s) is too long; max {bound}s")] + TooLong { bound: Ttl, requested: Ttl }, + #[error("Requested TTL ({requested}s) is too short; min {bound}s")] + TooShort { bound: Ttl, requested: Ttl }, +} + +impl Default for Registrations { + fn default() -> Self { + Registrations::with_config(Config::default()) + } +} + +impl Registrations { + pub fn with_config(config: Config) -> Self { + Self { + registrations_for_peer: Default::default(), + registrations: Default::default(), + min_ttl: config.min_ttl, + max_ttl: config.max_ttl, + cookies: Default::default(), + next_expiry: FuturesUnordered::from_iter(vec![futures::future::pending().boxed()]), + } + } + + pub fn add( + &mut self, + new_registration: NewRegistration, + ) -> Result { + let ttl = new_registration.effective_ttl(); + if ttl > self.max_ttl { + return Err(TtlOutOfRange::TooLong { + bound: self.max_ttl, + requested: ttl, + }); + } + if ttl < self.min_ttl { + return Err(TtlOutOfRange::TooShort { + bound: self.min_ttl, + requested: ttl, + }); + } + + let namespace = new_registration.namespace; + let registration_id = RegistrationId::new(); + + if let Some(old_registration) = self + .registrations_for_peer + .get_by_left(&(new_registration.record.peer_id(), namespace.clone())) + { + self.registrations.remove(old_registration); + } + + self.registrations_for_peer.insert( + (new_registration.record.peer_id(), namespace.clone()), + registration_id, + ); + + let registration = Registration { + namespace, + record: new_registration.record, + ttl, + }; + self.registrations + .insert(registration_id, registration.clone()); + + let next_expiry = futures_timer::Delay::new(Duration::from_secs(ttl)) + .map(move |_| registration_id) + .boxed(); + + self.next_expiry.push(next_expiry); + + Ok(registration) + } + + pub fn remove(&mut self, namespace: Namespace, peer_id: PeerId) { + let reggo_to_remove = self + .registrations_for_peer + .remove_by_left(&(peer_id, namespace)); + + if let Some((_, reggo_to_remove)) = reggo_to_remove { + self.registrations.remove(®go_to_remove); + } + } + + pub fn get( + &mut self, + discover_namespace: Option, + cookie: Option, + limit: Option, + ) -> Result<(impl Iterator + '_, Cookie), CookieNamespaceMismatch> { + let cookie_namespace = cookie.as_ref().and_then(|cookie| cookie.namespace()); + + match (discover_namespace.as_ref(), cookie_namespace) { + // discover all namespace but cookie is specific to a namespace? => bad + (None, Some(_)) => return Err(CookieNamespaceMismatch), + // discover for a namespace but cookie is for a different namespace? => bad + (Some(namespace), Some(cookie_namespace)) if namespace != cookie_namespace => { + return Err(CookieNamespaceMismatch) + } + // every other combination is fine + _ => {} + } + + let mut reggos_of_last_discover = cookie + .and_then(|cookie| self.cookies.get(&cookie)) + .cloned() + .unwrap_or_default(); + + let ids = self + .registrations_for_peer + .iter() + .filter_map({ + |((_, namespace), registration_id)| { + if reggos_of_last_discover.contains(registration_id) { + return None; + } + + match discover_namespace.as_ref() { + Some(discover_namespace) if discover_namespace == namespace => { + Some(registration_id) + } + Some(_) => None, + None => Some(registration_id), + } + } + }) + .take(limit.unwrap_or(u64::MAX) as usize) + .cloned() + .collect::>(); + + reggos_of_last_discover.extend(&ids); + + let new_cookie = discover_namespace + .map(Cookie::for_namespace) + .unwrap_or_else(Cookie::for_all_namespaces); + self.cookies + .insert(new_cookie.clone(), reggos_of_last_discover); + + let regs = &self.registrations; + let registrations = ids + .into_iter() + .map(move |id| regs.get(&id).expect("bad internal data structure")); + + Ok((registrations, new_cookie)) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect( + "This stream should never finish because it is initialised with a pending future", + ); + + // clean up our cookies + self.cookies.retain(|_, registrations| { + registrations.remove(&expired_registration); + + // retain all cookies where there are still registrations left + !registrations.is_empty() + }); + + self.registrations_for_peer + .remove_by_right(&expired_registration); + match self.registrations.remove(&expired_registration) { + None => { + continue; + } + Some(registration) => { + return Poll::Ready(ExpiredRegistration(registration)); + } + } + } + } +} + +#[derive(Debug, thiserror::Error, Eq, PartialEq)] +#[error("The provided cookie is not valid for a DISCOVER request for the given namespace")] +pub struct CookieNamespaceMismatch; + +#[cfg(test)] +mod tests { + use libp2p_core::PeerRecord; + use libp2p_identity as identity; + use web_time::SystemTime; + + use super::*; + + #[test] + fn given_cookie_from_discover_when_discover_again_then_only_get_diff() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (initial_discover, cookie) = registrations.get(None, None, None).unwrap(); + assert_eq!(initial_discover.count(), 2); + + let (subsequent_discover, _) = registrations.get(None, Some(cookie), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + } + + #[test] + fn given_registrations_when_discover_all_then_all_are_returned() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (discover, _) = registrations.get(None, None, None).unwrap(); + + assert_eq!(discover.count(), 2); + } + + #[test] + fn given_registrations_when_discover_only_for_specific_namespace_then_only_those_are_returned() + { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("bar")).unwrap(); + + let (discover, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + + assert_eq!( + discover.map(|r| &r.namespace).collect::>(), + vec!["foo"] + ); + } + + #[test] + fn given_reregistration_old_registration_is_discarded() { + let alice = identity::Keypair::generate_ed25519(); + let mut registrations = Registrations::default(); + registrations + .add(new_registration("foo", alice.clone(), None)) + .unwrap(); + registrations + .add(new_registration("foo", alice, None)) + .unwrap(); + + let (discover, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + + assert_eq!( + discover.map(|r| &r.namespace).collect::>(), + vec!["foo"] + ); + } + + #[test] + fn given_cookie_from_2nd_discover_does_not_return_nodes_from_first_discover() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (initial_discover, cookie1) = registrations.get(None, None, None).unwrap(); + assert_eq!(initial_discover.count(), 2); + + let (subsequent_discover, cookie2) = registrations.get(None, Some(cookie1), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + + let (subsequent_discover, _) = registrations.get(None, Some(cookie2), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + } + + #[test] + fn cookie_from_different_discover_request_is_not_valid() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("bar")).unwrap(); + + let (_, foo_discover_cookie) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + let result = registrations.get( + Some(Namespace::from_static("bar")), + Some(foo_discover_cookie), + None, + ); + + assert!(matches!(result, Err(CookieNamespaceMismatch))) + } + + #[tokio::test] + async fn given_two_registration_ttls_one_expires_one_lives() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 0, + max_ttl: 4, + }); + + let start_time = SystemTime::now(); + + registrations + .add(new_dummy_registration_with_ttl("foo", 1)) + .unwrap(); + registrations + .add(new_dummy_registration_with_ttl("bar", 4)) + .unwrap(); + + let event = registrations.next_event().await; + + let elapsed = start_time.elapsed().unwrap(); + assert!(elapsed.as_secs() >= 1); + assert!(elapsed.as_secs() < 2); + + assert_eq!(event.0.namespace, Namespace::from_static("foo")); + + { + let (mut discovered_foo, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + assert!(discovered_foo.next().is_none()); + } + let (mut discovered_bar, _) = registrations + .get(Some(Namespace::from_static("bar")), None, None) + .unwrap(); + assert!(discovered_bar.next().is_some()); + } + + #[tokio::test] + async fn given_peer_unregisters_before_expiry_do_not_emit_registration_expired() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 1, + max_ttl: 10, + }); + let dummy_registration = new_dummy_registration_with_ttl("foo", 2); + let namespace = dummy_registration.namespace.clone(); + let peer_id = dummy_registration.record.peer_id(); + + registrations.add(dummy_registration).unwrap(); + registrations.no_event_for(1).await; + registrations.remove(namespace, peer_id); + + registrations.no_event_for(3).await + } + + /// FuturesUnordered stop polling for ready futures when poll_next() is called until a None + /// value is returned. To prevent the next_expiry future from going to "sleep", next_expiry + /// is initialised with a future that always returns pending. This test ensures that + /// FuturesUnordered does not stop polling for ready futures. + #[tokio::test] + async fn given_all_registrations_expired_then_successfully_handle_new_registration_and_expiry() + { + let mut registrations = Registrations::with_config(Config { + min_ttl: 0, + max_ttl: 10, + }); + let dummy_registration = new_dummy_registration_with_ttl("foo", 1); + + registrations.add(dummy_registration.clone()).unwrap(); + let _ = registrations.next_event_in_at_most(2).await; + + registrations.no_event_for(1).await; + + registrations.add(dummy_registration).unwrap(); + let _ = registrations.next_event_in_at_most(2).await; + } + + #[tokio::test] + async fn cookies_are_cleaned_up_if_registrations_expire() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 1, + max_ttl: 10, + }); + + registrations + .add(new_dummy_registration_with_ttl("foo", 2)) + .unwrap(); + let (_, _) = registrations.get(None, None, None).unwrap(); + + assert_eq!(registrations.cookies.len(), 1); + + let _ = registrations.next_event_in_at_most(3).await; + + assert_eq!(registrations.cookies.len(), 0); + } + + #[test] + fn given_limit_discover_only_returns_n_results() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (registrations, _) = registrations.get(None, None, Some(1)).unwrap(); + + assert_eq!(registrations.count(), 1); + } + + #[test] + fn given_limit_cookie_can_be_used_for_pagination() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (discover1, cookie) = registrations.get(None, None, Some(1)).unwrap(); + assert_eq!(discover1.count(), 1); + + let (discover2, _) = registrations.get(None, Some(cookie), None).unwrap(); + assert_eq!(discover2.count(), 1); + } + + fn new_dummy_registration(namespace: &'static str) -> NewRegistration { + let identity = identity::Keypair::generate_ed25519(); + + new_registration(namespace, identity, None) + } + + fn new_dummy_registration_with_ttl(namespace: &'static str, ttl: Ttl) -> NewRegistration { + let identity = identity::Keypair::generate_ed25519(); + + new_registration(namespace, identity, Some(ttl)) + } + + fn new_registration( + namespace: &'static str, + identity: identity::Keypair, + ttl: Option, + ) -> NewRegistration { + NewRegistration::new( + Namespace::from_static(namespace), + PeerRecord::new(&identity, vec!["/ip4/127.0.0.1/tcp/1234".parse().unwrap()]).unwrap(), + ttl, + ) + } + + /// Defines utility functions that make the tests more readable. + impl Registrations { + async fn next_event(&mut self) -> ExpiredRegistration { + futures::future::poll_fn(|cx| self.poll(cx)).await + } + + /// Polls [`Registrations`] for `seconds` and panics if it returns a event during this time. + async fn no_event_for(&mut self, seconds: u64) { + tokio::time::timeout(Duration::from_secs(seconds), self.next_event()) + .await + .unwrap_err(); + } + + /// Polls [`Registrations`] for at most `seconds` and panics if doesn't + /// return an event within that time. + async fn next_event_in_at_most(&mut self, seconds: u64) -> ExpiredRegistration { + tokio::time::timeout(Duration::from_secs(seconds), self.next_event()) + .await + .unwrap() + } + } +} diff --git a/protocols/auth-rendezvous/tests/rendezvous.rs b/protocols/auth-rendezvous/tests/rendezvous.rs new file mode 100644 index 00000000000..f7a18d97d76 --- /dev/null +++ b/protocols/auth-rendezvous/tests/rendezvous.rs @@ -0,0 +1,526 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::time::Duration; + +use futures::{stream::FuturesUnordered, StreamExt}; +use libp2p_auth_rendezvous as rendezvous; +use libp2p_auth_rendezvous::client::RegisterError; +use libp2p_auth_rendezvous::Namespace; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_identity as identity; +use libp2p_identity::PeerId; +use libp2p_swarm::{DialError, Swarm, SwarmEvent}; +use libp2p_swarm_test::SwarmExt; +use tracing_subscriber::EnvFilter; + +#[tokio::test] +async fn given_successful_registration_then_successful_discovery() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + alice + .behaviour_mut() + .register(namespace.clone(), *robert.local_peer_id(), None, None) + .unwrap(); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { + rendezvous_node, + ttl, + namespace: register_node_namespace, + }], + [rendezvous::server::Event::PeerRegistered { peer, registration }], + ) => { + assert_eq!(&peer, alice.local_peer_id()); + assert_eq!(&rendezvous_node, robert.local_peer_id()); + assert_eq!(registration.namespace, namespace); + assert_eq!(register_node_namespace, namespace); + assert_eq!(ttl, rendezvous::DEFAULT_TTL); + } + events => panic!("Unexpected events: {events:?}"), + } + + bob.behaviour_mut().discover( + Some(namespace.clone()), + None, + None, + *robert.local_peer_id(), + None, + ); + + match libp2p_swarm_test::drive(&mut bob, &mut robert).await { + ( + [rendezvous::client::Event::Discovered { registrations, .. }], + [rendezvous::server::Event::DiscoverServed { .. }], + ) => match registrations.as_slice() { + [rendezvous::Registration { + namespace: registered_namespace, + record, + ttl, + }] => { + assert_eq!(*ttl, rendezvous::DEFAULT_TTL); + assert_eq!(record.peer_id(), *alice.local_peer_id()); + assert_eq!(*registered_namespace, namespace); + } + _ => panic!("Expected exactly one registration to be returned from discover"), + }, + events => panic!("Unexpected events: {events:?}"), + } +} + +#[tokio::test] +async fn should_return_error_when_no_external_addresses() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let server = new_server(rendezvous::server::Config::default()).await; + let mut client = Swarm::new_ephemeral_tokio(rendezvous::client::Behaviour::new); + + let actual = client + .behaviour_mut() + .register(namespace.clone(), *server.local_peer_id(), None, None) + .unwrap_err(); + + assert!(matches!(actual, RegisterError::NoExternalAddresses)) +} + +#[tokio::test] +async fn given_successful_registration_then_refresh_ttl() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let roberts_peer_id = *robert.local_peer_id(); + let refresh_ttl = 10_000; + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, None, None) + .unwrap(); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { .. }], + [rendezvous::server::Event::PeerRegistered { .. }], + ) => {} + events => panic!("Unexpected events: {events:?}"), + } + + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, roberts_peer_id, None); + + match libp2p_swarm_test::drive(&mut bob, &mut robert).await { + ( + [rendezvous::client::Event::Discovered { .. }], + [rendezvous::server::Event::DiscoverServed { .. }], + ) => {} + events => panic!("Unexpected events: {events:?}"), + } + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, Some(refresh_ttl), None) + .unwrap(); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { ttl, .. }], + [rendezvous::server::Event::PeerRegistered { .. }], + ) => { + assert_eq!(ttl, refresh_ttl); + } + events => panic!("Unexpected events: {events:?}"), + } + + bob.behaviour_mut().discover( + Some(namespace.clone()), + None, + None, + *robert.local_peer_id(), + None, + ); + + match libp2p_swarm_test::drive(&mut bob, &mut robert).await { + ( + [rendezvous::client::Event::Discovered { registrations, .. }], + [rendezvous::server::Event::DiscoverServed { .. }], + ) => match registrations.as_slice() { + [rendezvous::Registration { ttl, .. }] => { + assert_eq!(*ttl, refresh_ttl); + } + _ => panic!("Expected exactly one registration to be returned from discover"), + }, + events => panic!("Unexpected events: {events:?}"), + } +} + +#[tokio::test] +async fn given_successful_registration_then_refresh_external_addrs() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let roberts_peer_id = *robert.local_peer_id(); + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, None, None) + .unwrap(); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { .. }], + [rendezvous::server::Event::PeerRegistered { .. }], + ) => {} + events => panic!("Unexpected events: {events:?}"), + } + + let external_addr = Multiaddr::empty().with(Protocol::Memory(0)); + + alice.add_external_address(external_addr.clone()); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { .. }], + [rendezvous::server::Event::PeerRegistered { registration, .. }], + ) => { + let record = registration.record; + assert!(record.addresses().contains(&external_addr)); + } + events => panic!("Unexpected events: {events:?}"), + } + + alice.remove_external_address(&external_addr); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::Registered { .. }], + [rendezvous::server::Event::PeerRegistered { registration, .. }], + ) => { + let record = registration.record; + assert!(!record.addresses().contains(&external_addr)); + } + events => panic!("Unexpected events: {events:?}"), + } +} + +#[tokio::test] +async fn given_invalid_ttl_then_unsuccessful_registration() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + alice + .behaviour_mut() + .register( + namespace.clone(), + *robert.local_peer_id(), + Some(100_000_000), + None, + ) + .unwrap(); + + match libp2p_swarm_test::drive(&mut alice, &mut robert).await { + ( + [rendezvous::client::Event::RegisterFailed { error, .. }], + [rendezvous::server::Event::PeerNotRegistered { .. }], + ) => { + assert_eq!(error, rendezvous::ErrorCode::InvalidTtl); + } + events => panic!("Unexpected events: {events:?}"), + } +} + +#[tokio::test] +async fn discover_allows_for_dial_by_peer_id() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let roberts_peer_id = *robert.local_peer_id(); + tokio::spawn(robert.loop_on_next()); + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, None, None) + .unwrap(); + match alice.next_behaviour_event().await { + rendezvous::client::Event::Registered { .. } => {} + event => panic!("Unexpected event: {event:?}"), + } + + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, roberts_peer_id, None); + match bob.next_behaviour_event().await { + rendezvous::client::Event::Discovered { registrations, .. } => { + assert!(!registrations.is_empty()); + } + event => panic!("Unexpected event: {event:?}"), + } + + let alices_peer_id = *alice.local_peer_id(); + let bobs_peer_id = *bob.local_peer_id(); + + bob.dial(alices_peer_id).unwrap(); + + let alice_connected_to = tokio::spawn(async move { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = + alice.select_next_some().await + { + break peer_id; + } + } + }); + let bob_connected_to = tokio::spawn(async move { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = bob.select_next_some().await + { + break peer_id; + } + } + }); + + assert_eq!(alice_connected_to.await.unwrap(), bobs_peer_id); + assert_eq!(bob_connected_to.await.unwrap(), alices_peer_id); +} + +#[tokio::test] +async fn eve_cannot_register() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let mut robert = new_server(rendezvous::server::Config::default()).await; + let mut eve = new_impersonating_client().await; + eve.connect(&mut robert).await; + + eve.behaviour_mut() + .register(namespace.clone(), *robert.local_peer_id(), None, None) + .unwrap(); + + match libp2p_swarm_test::drive(&mut eve, &mut robert).await { + ( + [rendezvous::client::Event::RegisterFailed { + error: err_code, .. + }], + [rendezvous::server::Event::PeerNotRegistered { .. }], + ) => { + assert_eq!(err_code, rendezvous::ErrorCode::NotAuthorized); + } + events => panic!("Unexpected events: {events:?}"), + } +} + +// test if charlie can operate as client and server simultaneously +#[tokio::test] +async fn can_combine_client_and_server() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + let mut charlie = new_combined_node().await; + charlie.connect(&mut robert).await; + alice.connect(&mut charlie).await; + + charlie + .behaviour_mut() + .client + .register(namespace.clone(), *robert.local_peer_id(), None, None) + .unwrap(); + match libp2p_swarm_test::drive(&mut charlie, &mut robert).await { + ( + [CombinedEvent::Client(rendezvous::client::Event::Registered { .. })], + [rendezvous::server::Event::PeerRegistered { .. }], + ) => {} + events => panic!("Unexpected events: {events:?}"), + } + + alice + .behaviour_mut() + .register(namespace, *charlie.local_peer_id(), None, None) + .unwrap(); + match libp2p_swarm_test::drive(&mut charlie, &mut alice).await { + ( + [CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. })], + [rendezvous::client::Event::Registered { .. }], + ) => {} + events => panic!("Unexpected events: {events:?}"), + } +} + +#[tokio::test] +async fn registration_on_clients_expire() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], robert) = + new_server_with_connected_clients(rendezvous::server::Config::default().with_min_ttl(1)) + .await; + + let alice_peer_id = *alice.local_peer_id(); + let roberts_peer_id = *robert.local_peer_id(); + tokio::spawn(robert.loop_on_next()); + + let registration_ttl = 1; + + alice + .behaviour_mut() + .register( + namespace.clone(), + roberts_peer_id, + Some(registration_ttl), + None, + ) + .unwrap(); + match alice.next_behaviour_event().await { + rendezvous::client::Event::Registered { .. } => {} + event => panic!("Unexpected event: {event:?}"), + } + bob.behaviour_mut() + .discover(Some(namespace), None, None, roberts_peer_id, None); + match bob.next_behaviour_event().await { + rendezvous::client::Event::Discovered { registrations, .. } => { + assert!(!registrations.is_empty()); + } + event => panic!("Unexpected event: {event:?}"), + } + + match bob.next_swarm_event().await { + SwarmEvent::NewExternalAddrOfPeer { + peer_id: discovered_peer_id, + .. + } => { + assert_eq!(discovered_peer_id, alice_peer_id); + } + event => panic!("Unexpected event: {event:?}"), + } + + tokio::time::sleep(Duration::from_secs(registration_ttl + 1)).await; + + let event = bob.select_next_some().await; + let error = bob.dial(*alice.local_peer_id()).unwrap_err(); + + assert!(matches!( + event, + SwarmEvent::Behaviour(rendezvous::client::Event::Expired { .. }) + )); + assert!(matches!(error, DialError::NoAddresses)); +} + +async fn new_server_with_connected_clients( + config: rendezvous::server::Config, +) -> ( + [Swarm; N], + Swarm, +) { + let mut server = new_server(config).await; + + let mut clients: [Swarm<_>; N] = match (0usize..N) + .map(|_| new_client()) + .collect::>() + .collect::>() + .await + .try_into() + { + Ok(clients) => clients, + Err(_) => panic!("Vec is of size N"), + }; + + for client in &mut clients { + client.connect(&mut server).await; + } + + (clients, server) +} + +async fn new_client() -> Swarm { + let mut client = Swarm::new_ephemeral_tokio(rendezvous::client::Behaviour::new); + client.listen().with_memory_addr_external().await; // we need to listen otherwise we don't have addresses to register + + client +} + +async fn new_server(config: rendezvous::server::Config) -> Swarm { + let authorize_fn = Box::new(|_: &PeerId, _: Option<&Namespace>, _: &Option>| true); + let mut server = + Swarm::new_ephemeral_tokio(|_| rendezvous::server::Behaviour::new(config, authorize_fn)); + + server.listen().with_memory_addr_external().await; + + server +} + +async fn new_combined_node() -> Swarm { + let authorize_fn = Box::new(|_: &PeerId, _: Option<&Namespace>, _: &Option>| true); + let mut node = Swarm::new_ephemeral_tokio(|identity| Combined { + client: rendezvous::client::Behaviour::new(identity), + server: rendezvous::server::Behaviour::new( + rendezvous::server::Config::default(), + authorize_fn, + ), + }); + node.listen().with_memory_addr_external().await; + + node +} + +async fn new_impersonating_client() -> Swarm { + // In reality, if Eve were to try and fake someones identity, she would obviously only know the + // public key. Due to the type-safe API of the `Rendezvous` behaviour and `PeerRecord`, we + // actually cannot construct a bad `PeerRecord` (i.e. one that is claims to be someone else). + // As such, the best we can do is hand eve a completely different keypair from what she is using + // to authenticate her connection. + let someone_else = identity::Keypair::generate_ed25519(); + let mut eve = + Swarm::new_ephemeral_tokio(move |_| rendezvous::client::Behaviour::new(someone_else)); + eve.listen().with_memory_addr_external().await; + + eve +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct Combined { + client: rendezvous::client::Behaviour, + server: rendezvous::server::Behaviour, +} From bc559068d1db18fb1d043cc6e277148201fa26c0 Mon Sep 17 00:00:00 2001 From: 0xkr8os Date: Thu, 4 Sep 2025 17:50:57 -0500 Subject: [PATCH 3/5] rexport from libp2p --- Cargo.toml | 2 +- libp2p/Cargo.toml | 2 ++ libp2p/src/lib.rs | 3 +++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c60fc389dd7..6405a606d52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,7 +105,7 @@ libp2p-request-response = { version = "0.29.0", path = "protocols/request-respon libp2p-server = { version = "0.12.7", path = "misc/server" } libp2p-stream = { version = "0.4.0-alpha", path = "protocols/stream" } libp2p-swarm = { version = "0.47.0", path = "swarm" } -libp2p-swarm-derive = { version = "=0.35.1", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. +libp2p-swarm-derive = { version = "=0.35.1", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. libp2p-swarm-test = { version = "0.6.0", path = "swarm-test" } libp2p-tcp = { version = "0.44.0", path = "transports/tcp" } libp2p-tls = { version = "0.6.2", path = "transports/tls" } diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 48f4c9477bd..8213b9fbfb1 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -72,6 +72,7 @@ plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] quic = ["dep:libp2p-quic"] relay = ["dep:libp2p-relay", "libp2p-metrics?/relay"] +auth-rendezvous = ["dep:libp2p-auth-rendezvous"] rendezvous = ["dep:libp2p-rendezvous"] request-response = ["dep:libp2p-request-response"] rsa = ["libp2p-identity/rsa"] @@ -113,6 +114,7 @@ libp2p-ping = { workspace = true, optional = true } libp2p-plaintext = { workspace = true, optional = true } libp2p-pnet = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true } +libp2p-auth-rendezvous = { workspace = true, optional = true } libp2p-rendezvous = { workspace = true, optional = true } libp2p-request-response = { workspace = true, optional = true } libp2p-swarm = { workspace = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 42461f8ef8e..29b3b9a677e 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -36,6 +36,9 @@ pub use bytes; pub use futures; #[doc(inline)] pub use libp2p_allow_block_list as allow_block_list; +#[cfg(feature = "auth-rendezvous")] +#[doc(inline)] +pub use libp2p_auth_rendezvous as auth_rendezvous; #[cfg(feature = "autonat")] #[doc(inline)] pub use libp2p_autonat as autonat; From 687eaf1537159e8804c65edba6eee632e97e0b0f Mon Sep 17 00:00:00 2001 From: 0xkr8os Date: Thu, 4 Sep 2025 17:55:26 -0500 Subject: [PATCH 4/5] changed version --- Cargo.lock | 3 ++- protocols/auth-rendezvous/Cargo.toml | 12 ++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f922795028..7b4a23472b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2422,6 +2422,7 @@ dependencies = [ "futures-timer", "getrandom 0.2.15", "libp2p-allow-block-list", + "libp2p-auth-rendezvous", "libp2p-autonat", "libp2p-connection-limits", "libp2p-core", @@ -2476,7 +2477,7 @@ dependencies = [ [[package]] name = "libp2p-auth-rendezvous" -version = "0.17.0" +version = "0.1.0" dependencies = [ "async-trait", "asynchronous-codec", diff --git a/protocols/auth-rendezvous/Cargo.toml b/protocols/auth-rendezvous/Cargo.toml index 1c160be023d..b477f45a365 100644 --- a/protocols/auth-rendezvous/Cargo.toml +++ b/protocols/auth-rendezvous/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-auth-rendezvous" edition.workspace = true rust-version = { workspace = true } description = "Rendezvous protocol for libp2p" -version = "0.17.0" +version = "0.1.0" authors = ["The COMIT guys "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -31,7 +31,15 @@ tracing = { workspace = true } libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } libp2p-swarm-test = { path = "../../swarm-test" } rand = "0.8" -tokio = { workspace = true, features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "time", + "macros", + "sync", + "process", + "fs", + "net", +] } tracing-subscriber = { workspace = true, features = ["env-filter"] } # Passing arguments to the docsrs builder in order to properly document cfg's. From c9ce2c1b48b038c193e0758cd8afe18167b35d44 Mon Sep 17 00:00:00 2001 From: 0xkr8os Date: Thu, 4 Sep 2025 19:30:29 -0500 Subject: [PATCH 5/5] use authorizer trait --- protocols/auth-rendezvous/src/server.rs | 279 +++++++++--------- protocols/auth-rendezvous/tests/rendezvous.rs | 24 +- 2 files changed, 157 insertions(+), 146 deletions(-) diff --git a/protocols/auth-rendezvous/src/server.rs b/protocols/auth-rendezvous/src/server.rs index 35e692e10cd..a1b2620b526 100644 --- a/protocols/auth-rendezvous/src/server.rs +++ b/protocols/auth-rendezvous/src/server.rs @@ -40,15 +40,20 @@ use crate::{ MAX_TTL, MIN_TTL, }; -pub type AuthorizeFn = - Box, &Option>) -> bool + Send + Sync>; +pub trait Authorizer: Send + Sync + 'static { + fn is_authorized( + &self, + peer_id: &PeerId, + namespace: Option<&Namespace>, + auth_token: Option<&Vec>, + ) -> bool; +} -pub struct Behaviour { +pub struct Behaviour { inner: libp2p_request_response::Behaviour, registrations: Registrations, - authorized_peers: HashMap, HashSet>>, - authorize_fn: AuthorizeFn, + authorizer: A, } pub struct Config { @@ -77,9 +82,9 @@ impl Default for Config { } } -impl Behaviour { +impl Behaviour { /// Create a new instance of the rendezvous [`NetworkBehaviour`]. - pub fn new(config: Config, authorize_fn: AuthorizeFn) -> Self { + pub fn new(config: Config, authorizer: A) -> Self { Self { inner: libp2p_request_response::Behaviour::with_codec( crate::codec::Codec::default(), @@ -88,8 +93,7 @@ impl Behaviour { ), registrations: Registrations::with_config(config), - authorized_peers: HashMap::new(), - authorize_fn: authorize_fn, + authorizer, } } @@ -100,8 +104,131 @@ impl Behaviour { self.registrations.add(registration) } - pub fn add_authorized_peer_set(&mut self, namespace: Vec, set: HashSet>) { - self.authorized_peers.insert(namespace, set); + fn handle_request( + &mut self, + peer_id: PeerId, + message: Message, + ) -> Option<(Event, Option)> { + match message { + Message::Register { + new_registration: registration, + auth_token, + } => { + if registration.record.peer_id() != peer_id { + let error = ErrorCode::NotAuthorized; + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace: registration.namespace, + error, + }; + + return Some((event, Some(Message::RegisterResponse(Err(error))))); + } + + if !self.authorizer.is_authorized( + &peer_id, + Some(®istration.namespace), + auth_token.as_ref(), + ) { + let error = ErrorCode::NotAuthorized; + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace: registration.namespace, + error, + }; + + return Some((event, Some(Message::RegisterResponse(Err(error))))); + } + + let namespace = registration.namespace.clone(); + + match self.registrations.add(registration) { + Ok(registration) => { + let response = Message::RegisterResponse(Ok(registration.ttl)); + + let event = Event::PeerRegistered { + peer: peer_id, + registration, + }; + + Some((event, Some(response))) + } + Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => { + let error = ErrorCode::InvalidTtl; + + let response = Message::RegisterResponse(Err(error)); + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace, + error, + }; + + Some((event, Some(response))) + } + } + } + Message::Unregister(namespace) => { + self.registrations.remove(namespace.clone(), peer_id); + + let event = Event::PeerUnregistered { + peer: peer_id, + namespace, + }; + + Some((event, None)) + } + Message::Discover { + namespace, + cookie, + limit, + auth_token, + } => { + if !self + .authorizer + .is_authorized(&peer_id, namespace.as_ref(), auth_token.as_ref()) + { + let error = ErrorCode::NotAuthorized; + let response = Message::DiscoverResponse(Err(error)); + let event = Event::DiscoverNotServed { + enquirer: peer_id, + error, + }; + return Some((event, Some(response))); + } + + match self.registrations.get(namespace, cookie, limit) { + Ok((registrations, cookie)) => { + let discovered = registrations.cloned().collect::>(); + + let response = Message::DiscoverResponse(Ok((discovered.clone(), cookie))); + + let event = Event::DiscoverServed { + enquirer: peer_id, + registrations: discovered, + }; + + Some((event, Some(response))) + } + Err(_) => { + let error = ErrorCode::InvalidCookie; + + let response = Message::DiscoverResponse(Err(error)); + + let event = Event::DiscoverNotServed { + enquirer: peer_id, + error, + }; + + Some((event, Some(response))) + } + } + } + Message::RegisterResponse(_) => None, + Message::DiscoverResponse(_) => None, + } } } @@ -132,7 +259,7 @@ pub enum Event { RegistrationExpired(Registration), } -impl NetworkBehaviour for Behaviour { +impl NetworkBehaviour for Behaviour { type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; @@ -203,12 +330,7 @@ impl NetworkBehaviour for Behaviour { }, .. }) => { - if let Some((event, response)) = handle_request( - peer_id, - request, - &mut self.registrations, - &self.authorize_fn, - ) { + if let Some((event, response)) = self.handle_request(peer_id, request) { if let Some(resp) = response { if let Err(resp) = self.inner.send_response(channel, resp) { tracing::debug!( @@ -268,127 +390,6 @@ impl NetworkBehaviour for Behaviour { } } -fn handle_request( - peer_id: PeerId, - message: Message, - registrations: &mut Registrations, - authorize_fn: &AuthorizeFn, -) -> Option<(Event, Option)> { - match message { - Message::Register { - new_registration: registration, - auth_token, - } => { - if registration.record.peer_id() != peer_id { - let error = ErrorCode::NotAuthorized; - - let event = Event::PeerNotRegistered { - peer: peer_id, - namespace: registration.namespace, - error, - }; - - return Some((event, Some(Message::RegisterResponse(Err(error))))); - } - - if !(*authorize_fn)(&peer_id, Some(®istration.namespace), &auth_token) { - let error = ErrorCode::NotAuthorized; - - let event = Event::PeerNotRegistered { - peer: peer_id, - namespace: registration.namespace, - error, - }; - - return Some((event, Some(Message::RegisterResponse(Err(error))))); - } - - let namespace = registration.namespace.clone(); - - match registrations.add(registration) { - Ok(registration) => { - let response = Message::RegisterResponse(Ok(registration.ttl)); - - let event = Event::PeerRegistered { - peer: peer_id, - registration, - }; - - Some((event, Some(response))) - } - Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => { - let error = ErrorCode::InvalidTtl; - - let response = Message::RegisterResponse(Err(error)); - - let event = Event::PeerNotRegistered { - peer: peer_id, - namespace, - error, - }; - - Some((event, Some(response))) - } - } - } - Message::Unregister(namespace) => { - registrations.remove(namespace.clone(), peer_id); - - let event = Event::PeerUnregistered { - peer: peer_id, - namespace, - }; - - Some((event, None)) - } - Message::Discover { - namespace, - cookie, - limit, - auth_token, - } => { - if !(*authorize_fn)(&peer_id, namespace.as_ref(), &auth_token) { - let error = ErrorCode::NotAuthorized; - let response = Message::DiscoverResponse(Err(error)); - let event = Event::DiscoverNotServed { - enquirer: peer_id, - error, - }; - return Some((event, Some(response))); - } - - match registrations.get(namespace, cookie, limit) { - Ok((registrations, cookie)) => { - let discovered = registrations.cloned().collect::>(); - - let response = Message::DiscoverResponse(Ok((discovered.clone(), cookie))); - - let event = Event::DiscoverServed { - enquirer: peer_id, - registrations: discovered, - }; - - Some((event, Some(response))) - } - Err(_) => { - let error = ErrorCode::InvalidCookie; - - let response = Message::DiscoverResponse(Err(error)); - - let event = Event::DiscoverNotServed { - enquirer: peer_id, - error, - }; - - Some((event, Some(response))) - } - } - } - Message::RegisterResponse(_) => None, - Message::DiscoverResponse(_) => None, - } -} - #[derive(Debug, Eq, PartialEq, Hash, Copy, Clone)] struct RegistrationId(u64); diff --git a/protocols/auth-rendezvous/tests/rendezvous.rs b/protocols/auth-rendezvous/tests/rendezvous.rs index f7a18d97d76..21e767a4c29 100644 --- a/protocols/auth-rendezvous/tests/rendezvous.rs +++ b/protocols/auth-rendezvous/tests/rendezvous.rs @@ -451,7 +451,7 @@ async fn new_server_with_connected_clients( config: rendezvous::server::Config, ) -> ( [Swarm; N], - Swarm, + Swarm>, ) { let mut server = new_server(config).await; @@ -473,6 +473,14 @@ async fn new_server_with_connected_clients( (clients, server) } +struct MockAuthorizer; + +impl rendezvous::server::Authorizer for MockAuthorizer { + fn is_authorized(&self, _: &PeerId, _: Option<&Namespace>, _: Option<&Vec>) -> bool { + true + } +} + async fn new_client() -> Swarm { let mut client = Swarm::new_ephemeral_tokio(rendezvous::client::Behaviour::new); client.listen().with_memory_addr_external().await; // we need to listen otherwise we don't have addresses to register @@ -480,10 +488,12 @@ async fn new_client() -> Swarm { client } -async fn new_server(config: rendezvous::server::Config) -> Swarm { - let authorize_fn = Box::new(|_: &PeerId, _: Option<&Namespace>, _: &Option>| true); +async fn new_server( + config: rendezvous::server::Config, +) -> Swarm> { + let authorizer = MockAuthorizer; let mut server = - Swarm::new_ephemeral_tokio(|_| rendezvous::server::Behaviour::new(config, authorize_fn)); + Swarm::new_ephemeral_tokio(|_| rendezvous::server::Behaviour::new(config, authorizer)); server.listen().with_memory_addr_external().await; @@ -491,12 +501,12 @@ async fn new_server(config: rendezvous::server::Config) -> Swarm Swarm { - let authorize_fn = Box::new(|_: &PeerId, _: Option<&Namespace>, _: &Option>| true); + let authorizer = MockAuthorizer; let mut node = Swarm::new_ephemeral_tokio(|identity| Combined { client: rendezvous::client::Behaviour::new(identity), server: rendezvous::server::Behaviour::new( rendezvous::server::Config::default(), - authorize_fn, + authorizer, ), }); node.listen().with_memory_addr_external().await; @@ -522,5 +532,5 @@ async fn new_impersonating_client() -> Swarm { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Combined { client: rendezvous::client::Behaviour, - server: rendezvous::server::Behaviour, + server: rendezvous::server::Behaviour, }