Skip to content

Commit 161ac60

Browse files
authored
Merge pull request #69 from RSquad/simplex_upgrade
Updates for Simplex implementation
2 parents 2f9529d + 04cd7a0 commit 161ac60

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+9234
-6261
lines changed

src/adnl/src/overlay/broadcast.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1946,16 +1946,16 @@ impl BroadcastParsed for BroadcastTwostepSimple {
19461946
}
19471947

19481948
pub(crate) struct BroadcastTwostepSimpleProtocol {
1949-
big_data: bool,
19501949
extra: Option<Vec<u8>>,
1950+
reliable: bool,
19511951
}
19521952

19531953
impl BroadcastTwostepSimpleProtocol {
1954-
pub(crate) fn for_recv(big_data: bool) -> Self {
1955-
Self { big_data, extra: None }
1954+
pub(crate) fn for_recv(reliable: bool) -> Self {
1955+
Self { reliable, extra: None }
19561956
}
1957-
pub(crate) fn for_send(big_data: bool, extra: Vec<u8>) -> Self {
1958-
Self { big_data, extra: Some(extra) }
1957+
pub(crate) fn for_send(reliable: bool, extra: Vec<u8>) -> Self {
1958+
Self { reliable, extra: Some(extra) }
19591959
}
19601960
fn calc_to_sign(bcast_id: BroadcastId, data: &[u8]) -> Result<Vec<u8>> {
19611961
let to_sign =
@@ -1992,7 +1992,7 @@ impl BroadcastProtocol<BroadcastTwostepSimple> for BroadcastTwostepSimpleProtoco
19921992
}
19931993

19941994
fn send_method(&self) -> BroadcastSendMethod {
1995-
if self.big_data {
1995+
if self.reliable {
19961996
BroadcastSendMethod::QuicOrRldp
19971997
} else {
19981998
BroadcastSendMethod::Fast

src/adnl/src/overlay/mod.rs

Lines changed: 124 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,10 @@ impl OverlayType {
377377
}
378378
}
379379

380+
fn quic_requested(&self) -> bool {
381+
matches!(self, OverlayType::Private { use_quic: true, .. })
382+
}
383+
380384
fn calc_message_prefix(&self, overlay_id: &OverlayShortId) -> Result<Vec<u8>> {
381385
match self {
382386
Self::CertifiedMembers { certificate, .. } => serialize_boxed(
@@ -485,6 +489,8 @@ declare_counted!(
485489
nodes: lockfree::map::Map<Arc<KeyId>, NodeObject>,
486490
overlay_id: Arc<OverlayShortId>,
487491
owned_broadcasts: lockfree::map::Map<BroadcastId, OwnedBroadcast>,
492+
// Peers waiting for ADNL address resolution before being added to known_peers
493+
pending_peers: lockfree::queue::Queue<Arc<KeyId>>,
488494
purge_broadcasts: lockfree::queue::Queue<BroadcastId>,
489495
purge_broadcasts_count: AtomicU32,
490496
queue_one_time_broadcasts: tokio::sync::mpsc::UnboundedSender<(BroadcastId, Instant)>,
@@ -998,6 +1004,15 @@ impl Overlay {
9981004
.map_err(|e| error!("Error putting one time broadcast into monitoring queue: {e}"))
9991005
}
10001006

1007+
fn try_add_peer(&self, our_key: &Arc<KeyId>, peer: &Arc<KeyId>) -> Result<bool> {
1008+
if self.adnl.have_peer(our_key, peer)? {
1009+
self.known_peers.add(peer)?;
1010+
Ok(true)
1011+
} else {
1012+
Ok(false)
1013+
}
1014+
}
1015+
10011016
fn update_neighbours(&self, n: u32) -> Result<()> {
10021017
if self.overlay_type.is_private() {
10031018
let n = min(self.known_peers.all().count(), n);
@@ -1428,31 +1443,8 @@ impl OverlayNode {
14281443
self.add_typed_private_overlay(overlay_type, params, peers)
14291444
}
14301445

1431-
/// Add semiprivate overlay
1432-
pub fn add_semiprivate_overlay(
1433-
&self,
1434-
params: OverlayParams,
1435-
overlay_key: Option<&Arc<dyn KeyOption>>,
1436-
root_members: &[Arc<KeyId>],
1437-
certificate: Option<MemberCertificate>,
1438-
max_slaves: usize,
1439-
) -> Result<bool> {
1440-
let mut root_members_full = HashMap::with_capacity(root_members.len());
1441-
for member in root_members {
1442-
root_members_full.insert(member.clone(), lockfree::map::Map::new());
1443-
}
1444-
let overlay_type = OverlayType::CertifiedMembers {
1445-
root_members: root_members_full,
1446-
max_slaves,
1447-
bcast_prefix: OverlayUtils::calc_message_prefix(params.overlay_id)?,
1448-
certificate,
1449-
key: overlay_key.cloned(),
1450-
};
1451-
self.add_typed_private_overlay(overlay_type, params, root_members)
1452-
}
1453-
1454-
/// Add private overlay peers
1455-
pub fn add_private_peers(
1446+
/// Add private peers to ADNL layer
1447+
pub fn add_private_peers_to_adnl(
14561448
&self,
14571449
local_adnl_key: &Arc<KeyId>,
14581450
peers: Vec<(IpAddress, Option<IpAddress>, Arc<dyn KeyOption>)>,
@@ -1466,6 +1458,15 @@ impl OverlayNode {
14661458
Ok(ret)
14671459
}
14681460

1461+
/// Add private peers to the overlay
1462+
pub fn add_private_peers_to_overlay(
1463+
&self,
1464+
overlay_id: &Arc<OverlayShortId>,
1465+
peers: &[Arc<KeyId>],
1466+
) -> Result<usize> {
1467+
self.add_peers_to_overlay(overlay_id, peers, "Cannot get overlay to add peers")
1468+
}
1469+
14691470
/// Add public overlay peer
14701471
pub fn add_public_peer<N1: Borrow<NodeV1>, N2: Borrow<NodeV2>>(
14711472
&self,
@@ -1551,18 +1552,27 @@ impl OverlayNode {
15511552
Ok(Some(ret))
15521553
}
15531554

1554-
fn calc_src_key_for_broadcast<'a>(
1555-
&'a self,
1556-
overlay: &'a Overlay,
1557-
src_key: Option<&'a Arc<dyn KeyOption>>,
1558-
) -> &'a Arc<dyn KeyOption> {
1559-
if let Some(source) = src_key {
1560-
source
1561-
} else if let Some(key) = overlay.overlay_key() {
1562-
key
1563-
} else {
1564-
&self.node_key
1555+
/// Add semiprivate overlay
1556+
pub fn add_semiprivate_overlay(
1557+
&self,
1558+
params: OverlayParams,
1559+
overlay_key: Option<&Arc<dyn KeyOption>>,
1560+
root_members: &[Arc<KeyId>],
1561+
certificate: Option<MemberCertificate>,
1562+
max_slaves: usize,
1563+
) -> Result<bool> {
1564+
let mut root_members_full = HashMap::with_capacity(root_members.len());
1565+
for member in root_members {
1566+
root_members_full.insert(member.clone(), lockfree::map::Map::new());
15651567
}
1568+
let overlay_type = OverlayType::CertifiedMembers {
1569+
root_members: root_members_full,
1570+
max_slaves,
1571+
bcast_prefix: OverlayUtils::calc_message_prefix(params.overlay_id)?,
1572+
certificate,
1573+
key: overlay_key.cloned(),
1574+
};
1575+
self.add_typed_private_overlay(overlay_type, params, root_members)
15661576
}
15671577

15681578
/// Broadcast message
@@ -1617,10 +1627,11 @@ impl OverlayNode {
16171627
};
16181628
let neighbours = overlay.calc_broadcast_twostep_neighbours();
16191629
let big_data = data.object.len() >= Self::MIN_BYTES_FEC_TWO_STEPS_BROADCAST;
1630+
let reliable = big_data || overlay.overlay_type.quic_requested();
16201631
if big_data && (neighbours >= Self::MIN_NODES_FEC_TWO_STEPS_BROADCAST) {
16211632
BroadcastTwostepFecProtocol::for_send(data.object, neighbours, extra)?.send(ctx).await
16221633
} else {
1623-
BroadcastTwostepSimpleProtocol::for_send(big_data, extra).send(ctx).await
1634+
BroadcastTwostepSimpleProtocol::for_send(reliable, extra).send(ctx).await
16241635
}
16251636
}
16261637

@@ -1986,6 +1997,7 @@ impl OverlayNode {
19861997
overlay_id: params.overlay_id.clone(),
19871998
overlay_type,
19881999
owned_broadcasts: lockfree::map::Map::new(),
2000+
pending_peers: lockfree::queue::Queue::new(),
19892001
purge_broadcasts: lockfree::queue::Queue::new(),
19902002
purge_broadcasts_count: AtomicU32::new(0),
19912003
queue_one_time_broadcasts: sender_one_time,
@@ -2028,6 +2040,11 @@ impl OverlayNode {
20282040
let overlay = self.get_overlay(params.overlay_id, "Cannot add overlay")?;
20292041
let handle = params.runtime.unwrap_or_else(tokio::runtime::Handle::current);
20302042
handle.spawn(async move {
2043+
let local_adnl_key = if overlay.overlay_type.is_private() {
2044+
Some(overlay.overlay_key().unwrap_or(&default_key).id())
2045+
} else {
2046+
None
2047+
};
20312048
let mut timeout_peers = 0;
20322049
let mut last_one_time_broadcast = None;
20332050
let mut next_ping = None;
@@ -2055,6 +2072,31 @@ impl OverlayNode {
20552072
if let Err(e) = overlay.update_neighbours(1) {
20562073
log::error!(target: TARGET, "Error: {}", e)
20572074
}
2075+
if let Some(key) = &local_adnl_key {
2076+
let mut pending = Vec::new();
2077+
while let Some(peer) = overlay.pending_peers.pop() {
2078+
pending.push(peer);
2079+
}
2080+
for peer in pending {
2081+
match overlay.try_add_peer(key, &peer) {
2082+
Ok(true) => {
2083+
log::info!(
2084+
target: TARGET,
2085+
"Resolved pending peer {peer} in overlay {}",
2086+
overlay.overlay_id
2087+
);
2088+
continue;
2089+
}
2090+
Err(e) => log::warn!(
2091+
target: TARGET,
2092+
"Error resolving pending peer {peer} in overlay {}: {e}",
2093+
overlay.overlay_id
2094+
),
2095+
_ => (),
2096+
}
2097+
overlay.pending_peers.push(peer);
2098+
}
2099+
}
20582100
timeout_peers = 0;
20592101
}
20602102
let peer = if let Some(iter) = next_ping.as_mut() {
@@ -2101,6 +2143,34 @@ impl OverlayNode {
21012143
Ok(added)
21022144
}
21032145

2146+
fn add_peers_to_overlay(
2147+
&self,
2148+
overlay_id: &Arc<OverlayShortId>,
2149+
peers: &[Arc<KeyId>],
2150+
msg: &str,
2151+
) -> Result<usize> {
2152+
let overlay = self.get_overlay(overlay_id, msg)?;
2153+
let our_key = overlay.overlay_key().unwrap_or(&self.node_key).id();
2154+
let mut ret = 0;
2155+
for peer in peers {
2156+
if peer == our_key {
2157+
continue;
2158+
}
2159+
if overlay.try_add_peer(our_key, peer)? {
2160+
ret += 1;
2161+
} else {
2162+
log::info!(
2163+
target: TARGET,
2164+
"Peer {peer} has no ADNL address yet in overlay {}, queued for later",
2165+
overlay.overlay_id
2166+
);
2167+
overlay.pending_peers.push(peer.clone());
2168+
}
2169+
}
2170+
overlay.update_neighbours(Self::MAX_OVERLAY_NEIGHBOURS)?;
2171+
Ok(ret)
2172+
}
2173+
21042174
fn add_typed_private_overlay(
21052175
&self,
21062176
overlay_type: OverlayType,
@@ -2109,20 +2179,27 @@ impl OverlayNode {
21092179
) -> Result<bool> {
21102180
let overlay_id = params.overlay_id;
21112181
if self.add_overlay(overlay_type, params)? {
2112-
let overlay = self.get_overlay(&overlay_id, "Cannot add the private overlay")?;
2113-
let our_key = overlay.overlay_key().unwrap_or(&self.node_key).id();
2114-
for peer in peers {
2115-
if peer != our_key {
2116-
overlay.known_peers.add(peer)?;
2117-
}
2118-
}
2119-
overlay.update_neighbours(Self::MAX_OVERLAY_NEIGHBOURS)?;
2182+
self.add_peers_to_overlay(overlay_id, peers, "Cannot add the private overlay")?;
21202183
Ok(true)
21212184
} else {
21222185
Ok(false)
21232186
}
21242187
}
21252188

2189+
fn calc_src_key_for_broadcast<'a>(
2190+
&'a self,
2191+
overlay: &'a Overlay,
2192+
src_key: Option<&'a Arc<dyn KeyOption>>,
2193+
) -> &'a Arc<dyn KeyOption> {
2194+
if let Some(source) = src_key {
2195+
source
2196+
} else if let Some(key) = overlay.overlay_key() {
2197+
key
2198+
} else {
2199+
&self.node_key
2200+
}
2201+
}
2202+
21262203
fn check_overlay_adnl_address(&self, overlay: &Arc<Overlay>, adnl: &Arc<KeyId>) -> bool {
21272204
let local_adnl = overlay.overlay_key().unwrap_or(&self.node_key).id();
21282205
if local_adnl != adnl {
@@ -2399,7 +2476,8 @@ impl Subscriber for OverlayNode {
23992476
}
24002477
Ok(Broadcast::Overlay_BroadcastTwostepSimple(bcast)) => {
24012478
let big_data = bcast.data.len() >= Self::MIN_BYTES_FEC_TWO_STEPS_BROADCAST;
2402-
BroadcastTwostepSimpleProtocol::for_recv(big_data).recv(bcast, ctx).await?;
2479+
let reliable = big_data || ctx.overlay.overlay_type.quic_requested();
2480+
BroadcastTwostepSimpleProtocol::for_recv(reliable).recv(bcast, ctx).await?;
24032481
return Ok(true);
24042482
}
24052483
Ok(bcast) => fail!("Unsupported overlay broadcast message {:?}", bcast),

0 commit comments

Comments
 (0)