Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/adnl/src/overlay/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,16 +1946,16 @@ impl BroadcastParsed for BroadcastTwostepSimple {
}

pub(crate) struct BroadcastTwostepSimpleProtocol {
big_data: bool,
extra: Option<Vec<u8>>,
reliable: bool,
}

impl BroadcastTwostepSimpleProtocol {
pub(crate) fn for_recv(big_data: bool) -> Self {
Self { big_data, extra: None }
pub(crate) fn for_recv(reliable: bool) -> Self {
Self { reliable, extra: None }
}
pub(crate) fn for_send(big_data: bool, extra: Vec<u8>) -> Self {
Self { big_data, extra: Some(extra) }
pub(crate) fn for_send(reliable: bool, extra: Vec<u8>) -> Self {
Self { reliable, extra: Some(extra) }
}
fn calc_to_sign(bcast_id: BroadcastId, data: &[u8]) -> Result<Vec<u8>> {
let to_sign =
Expand Down Expand Up @@ -1992,7 +1992,7 @@ impl BroadcastProtocol<BroadcastTwostepSimple> for BroadcastTwostepSimpleProtoco
}

fn send_method(&self) -> BroadcastSendMethod {
if self.big_data {
if self.reliable {
BroadcastSendMethod::QuicOrRldp
} else {
BroadcastSendMethod::Fast
Expand Down
170 changes: 124 additions & 46 deletions src/adnl/src/overlay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ impl OverlayType {
}
}

fn quic_requested(&self) -> bool {
matches!(self, OverlayType::Private { use_quic: true, .. })
}

fn calc_message_prefix(&self, overlay_id: &OverlayShortId) -> Result<Vec<u8>> {
match self {
Self::CertifiedMembers { certificate, .. } => serialize_boxed(
Expand Down Expand Up @@ -485,6 +489,8 @@ declare_counted!(
nodes: lockfree::map::Map<Arc<KeyId>, NodeObject>,
overlay_id: Arc<OverlayShortId>,
owned_broadcasts: lockfree::map::Map<BroadcastId, OwnedBroadcast>,
// Peers waiting for ADNL address resolution before being added to known_peers
pending_peers: lockfree::queue::Queue<Arc<KeyId>>,
purge_broadcasts: lockfree::queue::Queue<BroadcastId>,
purge_broadcasts_count: AtomicU32,
queue_one_time_broadcasts: tokio::sync::mpsc::UnboundedSender<(BroadcastId, Instant)>,
Expand Down Expand Up @@ -998,6 +1004,15 @@ impl Overlay {
.map_err(|e| error!("Error putting one time broadcast into monitoring queue: {e}"))
}

fn try_add_peer(&self, our_key: &Arc<KeyId>, peer: &Arc<KeyId>) -> Result<bool> {
if self.adnl.have_peer(our_key, peer)? {
self.known_peers.add(peer)?;
Ok(true)
} else {
Ok(false)
}
}

fn update_neighbours(&self, n: u32) -> Result<()> {
if self.overlay_type.is_private() {
let n = min(self.known_peers.all().count(), n);
Expand Down Expand Up @@ -1428,31 +1443,8 @@ impl OverlayNode {
self.add_typed_private_overlay(overlay_type, params, peers)
}

/// Add semiprivate overlay
pub fn add_semiprivate_overlay(
&self,
params: OverlayParams,
overlay_key: Option<&Arc<dyn KeyOption>>,
root_members: &[Arc<KeyId>],
certificate: Option<MemberCertificate>,
max_slaves: usize,
) -> Result<bool> {
let mut root_members_full = HashMap::with_capacity(root_members.len());
for member in root_members {
root_members_full.insert(member.clone(), lockfree::map::Map::new());
}
let overlay_type = OverlayType::CertifiedMembers {
root_members: root_members_full,
max_slaves,
bcast_prefix: OverlayUtils::calc_message_prefix(params.overlay_id)?,
certificate,
key: overlay_key.cloned(),
};
self.add_typed_private_overlay(overlay_type, params, root_members)
}

/// Add private overlay peers
pub fn add_private_peers(
/// Add private peers to ADNL layer
pub fn add_private_peers_to_adnl(
&self,
local_adnl_key: &Arc<KeyId>,
peers: Vec<(IpAddress, Option<IpAddress>, Arc<dyn KeyOption>)>,
Expand All @@ -1466,6 +1458,15 @@ impl OverlayNode {
Ok(ret)
}

/// Add private peers to the overlay
pub fn add_private_peers_to_overlay(
&self,
overlay_id: &Arc<OverlayShortId>,
peers: &[Arc<KeyId>],
) -> Result<usize> {
self.add_peers_to_overlay(overlay_id, peers, "Cannot get overlay to add peers")
}

/// Add public overlay peer
pub fn add_public_peer<N1: Borrow<NodeV1>, N2: Borrow<NodeV2>>(
&self,
Expand Down Expand Up @@ -1551,18 +1552,27 @@ impl OverlayNode {
Ok(Some(ret))
}

fn calc_src_key_for_broadcast<'a>(
&'a self,
overlay: &'a Overlay,
src_key: Option<&'a Arc<dyn KeyOption>>,
) -> &'a Arc<dyn KeyOption> {
if let Some(source) = src_key {
source
} else if let Some(key) = overlay.overlay_key() {
key
} else {
&self.node_key
/// Add semiprivate overlay
pub fn add_semiprivate_overlay(
&self,
params: OverlayParams,
overlay_key: Option<&Arc<dyn KeyOption>>,
root_members: &[Arc<KeyId>],
certificate: Option<MemberCertificate>,
max_slaves: usize,
) -> Result<bool> {
let mut root_members_full = HashMap::with_capacity(root_members.len());
for member in root_members {
root_members_full.insert(member.clone(), lockfree::map::Map::new());
}
let overlay_type = OverlayType::CertifiedMembers {
root_members: root_members_full,
max_slaves,
bcast_prefix: OverlayUtils::calc_message_prefix(params.overlay_id)?,
certificate,
key: overlay_key.cloned(),
};
self.add_typed_private_overlay(overlay_type, params, root_members)
}

/// Broadcast message
Expand Down Expand Up @@ -1617,10 +1627,11 @@ impl OverlayNode {
};
let neighbours = overlay.calc_broadcast_twostep_neighbours();
let big_data = data.object.len() >= Self::MIN_BYTES_FEC_TWO_STEPS_BROADCAST;
let reliable = big_data || overlay.overlay_type.quic_requested();
if big_data && (neighbours >= Self::MIN_NODES_FEC_TWO_STEPS_BROADCAST) {
BroadcastTwostepFecProtocol::for_send(data.object, neighbours, extra)?.send(ctx).await
} else {
BroadcastTwostepSimpleProtocol::for_send(big_data, extra).send(ctx).await
BroadcastTwostepSimpleProtocol::for_send(reliable, extra).send(ctx).await
}
}

Expand Down Expand Up @@ -1986,6 +1997,7 @@ impl OverlayNode {
overlay_id: params.overlay_id.clone(),
overlay_type,
owned_broadcasts: lockfree::map::Map::new(),
pending_peers: lockfree::queue::Queue::new(),
purge_broadcasts: lockfree::queue::Queue::new(),
purge_broadcasts_count: AtomicU32::new(0),
queue_one_time_broadcasts: sender_one_time,
Expand Down Expand Up @@ -2028,6 +2040,11 @@ impl OverlayNode {
let overlay = self.get_overlay(params.overlay_id, "Cannot add overlay")?;
let handle = params.runtime.unwrap_or_else(tokio::runtime::Handle::current);
handle.spawn(async move {
let local_adnl_key = if overlay.overlay_type.is_private() {
Some(overlay.overlay_key().unwrap_or(&default_key).id())
} else {
None
};
let mut timeout_peers = 0;
let mut last_one_time_broadcast = None;
let mut next_ping = None;
Expand Down Expand Up @@ -2055,6 +2072,31 @@ impl OverlayNode {
if let Err(e) = overlay.update_neighbours(1) {
log::error!(target: TARGET, "Error: {}", e)
}
if let Some(key) = &local_adnl_key {
let mut pending = Vec::new();
while let Some(peer) = overlay.pending_peers.pop() {
pending.push(peer);
}
for peer in pending {
match overlay.try_add_peer(key, &peer) {
Ok(true) => {
log::info!(
target: TARGET,
"Resolved pending peer {peer} in overlay {}",
overlay.overlay_id
);
continue;
}
Err(e) => log::warn!(
target: TARGET,
"Error resolving pending peer {peer} in overlay {}: {e}",
overlay.overlay_id
),
_ => (),
}
overlay.pending_peers.push(peer);
}
}
timeout_peers = 0;
}
let peer = if let Some(iter) = next_ping.as_mut() {
Expand Down Expand Up @@ -2101,6 +2143,34 @@ impl OverlayNode {
Ok(added)
}

fn add_peers_to_overlay(
&self,
overlay_id: &Arc<OverlayShortId>,
peers: &[Arc<KeyId>],
msg: &str,
) -> Result<usize> {
let overlay = self.get_overlay(overlay_id, msg)?;
let our_key = overlay.overlay_key().unwrap_or(&self.node_key).id();
let mut ret = 0;
for peer in peers {
if peer == our_key {
continue;
}
if overlay.try_add_peer(our_key, peer)? {
ret += 1;
} else {
log::info!(
target: TARGET,
"Peer {peer} has no ADNL address yet in overlay {}, queued for later",
overlay.overlay_id
);
overlay.pending_peers.push(peer.clone());
}
}
overlay.update_neighbours(Self::MAX_OVERLAY_NEIGHBOURS)?;
Ok(ret)
}

fn add_typed_private_overlay(
&self,
overlay_type: OverlayType,
Expand All @@ -2109,20 +2179,27 @@ impl OverlayNode {
) -> Result<bool> {
let overlay_id = params.overlay_id;
if self.add_overlay(overlay_type, params)? {
let overlay = self.get_overlay(&overlay_id, "Cannot add the private overlay")?;
let our_key = overlay.overlay_key().unwrap_or(&self.node_key).id();
for peer in peers {
if peer != our_key {
overlay.known_peers.add(peer)?;
}
}
overlay.update_neighbours(Self::MAX_OVERLAY_NEIGHBOURS)?;
self.add_peers_to_overlay(overlay_id, peers, "Cannot add the private overlay")?;
Ok(true)
} else {
Ok(false)
}
}

fn calc_src_key_for_broadcast<'a>(
&'a self,
overlay: &'a Overlay,
src_key: Option<&'a Arc<dyn KeyOption>>,
) -> &'a Arc<dyn KeyOption> {
if let Some(source) = src_key {
source
} else if let Some(key) = overlay.overlay_key() {
key
} else {
&self.node_key
}
}

fn check_overlay_adnl_address(&self, overlay: &Arc<Overlay>, adnl: &Arc<KeyId>) -> bool {
let local_adnl = overlay.overlay_key().unwrap_or(&self.node_key).id();
if local_adnl != adnl {
Expand Down Expand Up @@ -2399,7 +2476,8 @@ impl Subscriber for OverlayNode {
}
Ok(Broadcast::Overlay_BroadcastTwostepSimple(bcast)) => {
let big_data = bcast.data.len() >= Self::MIN_BYTES_FEC_TWO_STEPS_BROADCAST;
BroadcastTwostepSimpleProtocol::for_recv(big_data).recv(bcast, ctx).await?;
let reliable = big_data || ctx.overlay.overlay_type.quic_requested();
BroadcastTwostepSimpleProtocol::for_recv(reliable).recv(bcast, ctx).await?;
return Ok(true);
}
Ok(bcast) => fail!("Unsupported overlay broadcast message {:?}", bcast),
Expand Down
Loading