Skip to content

Commit 75b311b

Browse files
authored
Merge pull request #54 from RSquad/quic_dedup
QUIC connection deduplication fix
2 parents 14d7a71 + 39d6b56 commit 75b311b

3 files changed

Lines changed: 319 additions & 32 deletions

File tree

src/adnl/src/quic/mod.rs

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ use ton_block::{
3939

4040
const TARGET: &str = "quic";
4141

42+
/// Key for the QUIC inbound connection map: (local_key_id, peer_key_id).
43+
/// Matches the C++ `AdnlPath{local_id, peer_id}` semantics so that two
44+
/// connections from the same peer address but different key pairs (e.g.
45+
/// current + next validator keys) coexist instead of evicting each other.
46+
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
47+
struct QuicInboundKey(Arc<KeyId>, Arc<KeyId>);
48+
49+
type QuicInboundMap = lockfree::map::Map<QuicInboundKey, quinn::Connection>;
4250
type QuicSendQueue = SendQueue<Vec<u8>>;
4351

4452
/// Extract a `KeyId` from an Ed25519 SubjectPublicKeyInfo (SPKI) DER blob.
@@ -347,7 +355,7 @@ pub struct QuicNode {
347355
/// Max concurrent in-flight streams per inbound connection.
348356
max_streams_per_connection: usize,
349357
/// Inbound connection maps, one per endpoint/accept-loop. Used by the stats dumper.
350-
inbound_pools: Mutex<Vec<Arc<Connections<quinn::Connection>>>>,
358+
inbound_pools: Mutex<Vec<Arc<QuicInboundMap>>>,
351359
/// Per-TL-tag message counters for the stats dumper.
352360
msg_stats: Arc<MsgStats>,
353361
}
@@ -762,7 +770,7 @@ impl QuicNode {
762770
let local_key_names: Arc<lockfree::map::Map<String, Arc<KeyId>>> =
763771
Arc::new(lockfree::map::Map::new());
764772

765-
let inbound: Arc<Connections<quinn::Connection>> = Connections::new();
773+
let inbound: Arc<QuicInboundMap> = Arc::new(lockfree::map::Map::new());
766774
match self.inbound_pools.lock() {
767775
Ok(mut pools) => pools.push(inbound.clone()),
768776
Err(e) => log::warn!(
@@ -840,7 +848,7 @@ impl QuicNode {
840848
incoming: quinn::Incoming,
841849
local_key_names: Arc<lockfree::map::Map<String, Arc<KeyId>>>,
842850
server_cert_resolver: Arc<QuicServerCertResolver>,
843-
inbound: Arc<Connections<quinn::Connection>>,
851+
inbound: Arc<QuicInboundMap>,
844852
subscribers: Arc<Vec<Arc<dyn Subscriber>>>,
845853
bind_addr: SocketAddr,
846854
max_streams_per_connection: usize,
@@ -900,25 +908,32 @@ impl QuicNode {
900908
}
901909
};
902910

911+
let inbound_key = QuicInboundKey(local_key_id.clone(), peer_key_id.clone());
903912
let had_existing = {
904913
let mut found_existing = false;
905-
let result = add_unbound_object_to_map_with_update(inbound.map(), addr, |existing| {
906-
if existing.is_some() {
907-
found_existing = true;
908-
// Keep existing entry; resolver task will handle replacement
909-
Ok(None)
910-
} else {
911-
Ok(Some(conn.clone()))
912-
}
913-
});
914+
let result =
915+
add_unbound_object_to_map_with_update(&inbound, inbound_key.clone(), |existing| {
916+
if existing.is_some() {
917+
found_existing = true;
918+
// Keep existing entry; resolver task will handle replacement
919+
Ok(None)
920+
} else {
921+
Ok(Some(conn.clone()))
922+
}
923+
});
914924
if let Err(e) = result {
915925
log::warn!(target: TARGET, "Store QUIC inbound for {addr}: {e}");
916926
return;
917927
}
918928
found_existing
919929
};
920930
if had_existing {
921-
tokio::spawn(Self::resolve_duplicate_connection(inbound.clone(), conn.clone(), addr));
931+
tokio::spawn(Self::resolve_duplicate_connection(
932+
inbound.clone(),
933+
conn.clone(),
934+
inbound_key.clone(),
935+
addr,
936+
));
922937
}
923938

924939
let peers = AdnlPeers::with_keys(local_key_id, peer_key_id);
@@ -1008,9 +1023,9 @@ impl QuicNode {
10081023
() = uni_loop => {}
10091024
}
10101025
let is_current =
1011-
inbound.map().get(&addr).map(|e| e.val().stable_id() == conn_id).unwrap_or(false);
1026+
inbound.get(&inbound_key).map(|e| e.val().stable_id() == conn_id).unwrap_or(false);
10121027
if is_current {
1013-
inbound.map().remove(&addr);
1028+
inbound.remove(&inbound_key);
10141029
}
10151030
log::info!(
10161031
target: TARGET,
@@ -1231,36 +1246,33 @@ impl QuicNode {
12311246
}
12321247

12331248
async fn resolve_duplicate_connection(
1234-
inbound: Arc<Connections<quinn::Connection>>,
1249+
inbound: Arc<QuicInboundMap>,
12351250
new_conn: quinn::Connection,
1251+
key: QuicInboundKey,
12361252
addr: SocketAddr,
12371253
) {
12381254
use rand::Rng;
12391255
let delay_ms = rand::thread_rng().gen_range(500..=2500);
12401256
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
12411257

12421258
let old_alive =
1243-
inbound.map().get(&addr).map(|e| e.val().close_reason().is_none()).unwrap_or(false);
1259+
inbound.get(&key).map(|e| e.val().close_reason().is_none()).unwrap_or(false);
12441260
let new_alive = new_conn.close_reason().is_none();
12451261

12461262
if old_alive && new_alive {
1247-
if let Some(old) = inbound.map().remove(&addr) {
1263+
if let Some(old) = inbound.remove(&key) {
12481264
log::info!(
12491265
target: TARGET,
12501266
"Closing old duplicate inbound from {addr} (both alive after {delay_ms}ms)"
12511267
);
12521268
old.val().close(0u32.into(), b"Replaced by new inbound");
12531269
}
12541270
let nc = new_conn.clone();
1255-
let _ = add_unbound_object_to_map_with_update(inbound.map(), addr, |_| {
1256-
Ok(Some(nc.clone()))
1257-
});
1271+
let _ = add_unbound_object_to_map_with_update(&inbound, key, |_| Ok(Some(nc.clone())));
12581272
} else if new_alive {
1259-
inbound.map().remove(&addr);
1273+
inbound.remove(&key);
12601274
let nc = new_conn.clone();
1261-
let _ = add_unbound_object_to_map_with_update(inbound.map(), addr, |_| {
1262-
Ok(Some(nc.clone()))
1263-
});
1275+
let _ = add_unbound_object_to_map_with_update(&inbound, key, |_| Ok(Some(nc.clone())));
12641276
log::debug!(
12651277
target: TARGET,
12661278
"Old inbound from {addr} already closed, keeping new"
@@ -1440,7 +1452,7 @@ impl QuicNode {
14401452
bind_addr: SocketAddr,
14411453
max_streams_per_connection: usize,
14421454
cancellation_token: tokio_util::sync::CancellationToken,
1443-
inbound: Arc<Connections<quinn::Connection>>,
1455+
inbound: Arc<QuicInboundMap>,
14441456
msg_stats: Arc<MsgStats>,
14451457
) {
14461458
tokio::spawn(async move {
@@ -1613,8 +1625,9 @@ impl QuicNode {
16131625
}
16141626
};
16151627
for pool in &pools {
1616-
for conn_entry in pool.map().iter() {
1617-
let addr = *conn_entry.key();
1628+
for conn_entry in pool.iter() {
1629+
let QuicInboundKey(ref local_id, ref peer_id) = *conn_entry.key();
1630+
let addr = conn_entry.val().remote_address();
16181631
let conn = conn_entry.val();
16191632
let s = conn.stats();
16201633
let id = (conn.stable_id(), false);
@@ -1626,9 +1639,9 @@ impl QuicNode {
16261639
fmt::Write::write_fmt(
16271640
&mut dump,
16281641
format_args!(
1629-
" inbound peer={addr} \
1630-
dtx={} bytes/{} dgrams drx={} bytes/{} dgrams \
1631-
dlost={} pkts rtt={:?} cwnd={} mtu={}\n",
1642+
" inbound peer={addr} local={local_id} remote={peer_id} \
1643+
dtx={} bytes/{} dgrams drx={} bytes/{} dgrams \
1644+
dlost={} pkts rtt={:?} cwnd={} mtu={}\n",
16321645
delta.tx_bytes,
16331646
delta.tx_dgrams,
16341647
delta.rx_bytes,

0 commit comments

Comments
 (0)