diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e36d147c3240..28d7d2c13ab6 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -379,7 +379,7 @@ struct Peer { * permit self-announcement. */ double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0}; /** When m_addr_token_bucket was last updated */ - std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime()}; + NodeClock::time_point m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){NodeClock::now()}; /** Total number of addresses that were dropped due to rate limiting. */ std::atomic m_addr_rate_limited{0}; /** Total number of addresses that were processed (excludes rate-limited ones). */ @@ -1083,6 +1083,9 @@ class PeerManagerImpl final : public PeerManager */ bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + void ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector&& vAddr, const std::atomic& interruptMsgProc) + EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_peer_mutex); + void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); @@ -4026,90 +4029,8 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string }; std::vector vAddr; - vRecv >> ser_params(vAddr); - - if (!SetupAddressRelay(pfrom, peer)) { - LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId()); - return; - } - - if (vAddr.size() > MAX_ADDR_TO_SEND) - { - Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size())); - return; - } - - // Store the new addresses - std::vector vAddrOk; - const auto current_a_time{Now()}; - - // Update/increment addr rate limiting bucket. - const auto current_time{GetTime()}; - if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { - // Don't increment bucket if it's already full - const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us); - const double increment = Ticks(time_diff) * MAX_ADDR_RATE_PER_SECOND; - peer.m_addr_token_bucket = std::min(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET); - } - peer.m_addr_token_timestamp = current_time; - - const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr); - uint64_t num_proc = 0; - uint64_t num_rate_limit = 0; - std::shuffle(vAddr.begin(), vAddr.end(), m_rng); - for (CAddress& addr : vAddr) - { - if (interruptMsgProc) - return; - - // Apply rate limiting. - if (peer.m_addr_token_bucket < 1.0) { - if (rate_limited) { - ++num_rate_limit; - continue; - } - } else { - peer.m_addr_token_bucket -= 1.0; - } - // We only bother storing full nodes, though this may include - // things which we would not make an outbound connection to, in - // part because we may make feeler connections to them. - if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices)) - continue; - - if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) { - addr.nTime = current_a_time - 5 * 24h; - } - AddAddressKnown(peer, addr); - if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) { - // Do not process banned/discouraged addresses beyond remembering we received them - continue; - } - ++num_proc; - const bool reachable{g_reachable_nets.Contains(addr)}; - if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) { - // Relay to a limited number of other nodes - RelayAddress(pfrom.GetId(), addr, reachable); - } - // Do not store addresses outside our network - if (reachable) { - vAddrOk.push_back(addr); - } - } - peer.m_addr_processed += num_proc; - peer.m_addr_rate_limited += num_rate_limit; - LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n", - vAddr.size(), num_proc, num_rate_limit, pfrom.GetId()); - - m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h); - if (vAddr.size() < 1000) peer.m_getaddr_sent = false; - - // AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements - if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) { - LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg()); - pfrom.fDisconnect = true; - } + ProcessAddrs(msg_type, pfrom, peer, std::move(vAddr), interruptMsgProc); return; } @@ -5701,6 +5622,93 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) return true; } +void PeerManagerImpl::ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector&& vAddr, const std::atomic& interruptMsgProc) +{ + AssertLockNotHeld(m_peer_mutex); + AssertLockHeld(g_msgproc_mutex); + + if (!SetupAddressRelay(pfrom, peer)) { + LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId()); + return; + } + + if (vAddr.size() > MAX_ADDR_TO_SEND) + { + Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size())); + return; + } + + // Store the new addresses + std::vector vAddrOk; + + // Update/increment addr rate limiting bucket. + const auto current_time{NodeClock::now()}; + if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { + // Don't increment bucket if it's already full + const auto time_diff{current_time - peer.m_addr_token_timestamp}; + const double increment{std::max(Ticks(time_diff), 0.0) * MAX_ADDR_RATE_PER_SECOND}; + peer.m_addr_token_bucket = std::min(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET); + } + peer.m_addr_token_timestamp = current_time; + + const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr); + uint64_t num_proc = 0; + uint64_t num_rate_limit = 0; + std::shuffle(vAddr.begin(), vAddr.end(), m_rng); + for (CAddress& addr : vAddr) + { + if (interruptMsgProc) + return; + + // Apply rate limiting. + if (peer.m_addr_token_bucket < 1.0) { + if (rate_limited) { + ++num_rate_limit; + continue; + } + } else { + peer.m_addr_token_bucket -= 1.0; + } + // We only bother storing full nodes, though this may include + // things which we would not make an outbound connection to, in + // part because we may make feeler connections to them. + if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices)) + continue; + + if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_time + 10min) { + addr.nTime = std::chrono::time_point_cast(current_time - 5 * 24h); + } + AddAddressKnown(peer, addr); + if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) { + // Do not process banned/discouraged addresses beyond remembering we received them + continue; + } + ++num_proc; + const bool reachable{g_reachable_nets.Contains(addr)}; + if (addr.nTime > current_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) { + // Relay to a limited number of other nodes + RelayAddress(pfrom.GetId(), addr, reachable); + } + // Do not store addresses outside our network + if (reachable) { + vAddrOk.push_back(addr); + } + } + peer.m_addr_processed += num_proc; + peer.m_addr_rate_limited += num_rate_limit; + LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n", + vAddr.size(), num_proc, num_rate_limit, pfrom.GetId()); + + m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h); + if (vAddr.size() < 1000) peer.m_getaddr_sent = false; + + // AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements + if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) { + LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg()); + pfrom.fDisconnect = true; + } +} + bool PeerManagerImpl::SendMessages(CNode& node) { AssertLockNotHeld(m_tx_download_mutex); diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index a4052f910243..cef49a3845e0 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -584,6 +584,11 @@ def remove_tests(exclude_list): def run_tests(*, test_list, build_dir, tmpdir, jobs=1, enable_coverage=False, args=None, combined_logs_len=0, failfast=False, use_term_control, results_filepath=None): args = args or [] + # Some optional Python dependencies (e.g. pycapnp) may emit warnings or fail under + # CPython free-threaded builds when the GIL is disabled. Force it on for all + # functional tests so every child process inherits PYTHON_GIL=1. + os.environ["PYTHON_GIL"] = "1" + # Warn if bitcoind is already running try: # pgrep exits with code zero when one or more matching processes found