Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 92 additions & 84 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ struct Peer {
* permit self-announcement. */
double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()};
NodeClock::time_point m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){NodeClock::now()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate-limited ones). */
Expand Down Expand Up @@ -1083,6 +1083,9 @@ class PeerManagerImpl final : public PeerManager
*/
bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);

void ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_peer_mutex);

void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);

Expand Down Expand Up @@ -4026,90 +4029,8 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string
};

std::vector<CAddress> vAddr;

vRecv >> ser_params(vAddr);

if (!SetupAddressRelay(pfrom, peer)) {
LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}

if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}

// Store the new addresses
std::vector<CAddress> vAddrOk;
const auto current_a_time{Now<NodeSeconds>()};

// Update/increment addr rate limiting bucket.
const auto current_time{GetTime<std::chrono::microseconds>()};
if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us);
const double increment = Ticks<SecondsDouble>(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer.m_addr_token_timestamp = current_time;

const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;

// Apply rate limiting.
if (peer.m_addr_token_bucket < 1.0) {
if (rate_limited) {
++num_rate_limit;
continue;
}
} else {
peer.m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
continue;

if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) {
addr.nTime = current_a_time - 5 * 24h;
}
AddAddressKnown(peer, addr);
if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
const bool reachable{g_reachable_nets.Contains(addr)};
if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
RelayAddress(pfrom.GetId(), addr, reachable);
}
// Do not store addresses outside our network
if (reachable) {
vAddrOk.push_back(addr);
}
}
peer.m_addr_processed += num_proc;
peer.m_addr_rate_limited += num_rate_limit;
LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());

m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
if (vAddr.size() < 1000) peer.m_getaddr_sent = false;

// AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
pfrom.fDisconnect = true;
}
ProcessAddrs(msg_type, pfrom, peer, std::move(vAddr), interruptMsgProc);
return;
}

Expand Down Expand Up @@ -5701,6 +5622,93 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
return true;
}

/**
 * Process a vector of addresses received from a peer (presumably via an
 * "addr"/"addrv2" message — msg_type is only used for logging/Misbehaving
 * text; TODO confirm against callers).
 *
 * Applies the per-peer token-bucket rate limit, timestamp sanitization,
 * banned/discouraged filtering, limited relay to other peers, and finally
 * stores reachable addresses in addrman. May mark the peer for disconnect
 * or Misbehaving.
 *
 * @param msg_type          Message type string, used in log/misbehavior text.
 * @param pfrom             The sending node (permissions, connection type, id).
 * @param peer              Peer state: token bucket, addr filters, counters.
 * @param vAddr             The received addresses (consumed; shuffled in place).
 * @param interruptMsgProc  Cooperative interrupt flag; checked each iteration.
 */
void PeerManagerImpl::ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
{
    AssertLockNotHeld(m_peer_mutex);
    AssertLockHeld(g_msgproc_mutex);

    // Peers that shouldn't be participating in addr relay (e.g. certain
    // connection types) have their addr messages ignored entirely.
    if (!SetupAddressRelay(pfrom, peer)) {
        LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
        return;
    }

    // Oversized addr messages are a protocol violation.
    if (vAddr.size() > MAX_ADDR_TO_SEND)
    {
        Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
        return;
    }

    // Store the new addresses
    std::vector<CAddress> vAddrOk;

    // Update/increment addr rate limiting bucket.
    // The bucket refills at MAX_ADDR_RATE_PER_SECOND, proportional to the
    // time elapsed since the last update, capped at
    // MAX_ADDR_PROCESSING_TOKEN_BUCKET.
    const auto current_time{NodeClock::now()};
    if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
        // Don't increment bucket if it's already full
        const auto time_diff{current_time - peer.m_addr_token_timestamp};
        // Clamp at 0 to guard against a non-monotonic clock stepping backwards.
        const double increment{std::max(Ticks<SecondsDouble>(time_diff), 0.0) * MAX_ADDR_RATE_PER_SECOND};
        peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
    }
    peer.m_addr_token_timestamp = current_time;

    // Peers with the Addr permission bypass the rate limit.
    const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
    uint64_t num_proc = 0;
    uint64_t num_rate_limit = 0;
    // Randomize processing order so that the addresses dropped by rate
    // limiting are not biased toward the front of the sender's vector.
    std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
    for (CAddress& addr : vAddr)
    {
        // NOTE: returning here skips the counter/addrman updates below;
        // presumably acceptable on shutdown/interrupt — confirm with callers.
        if (interruptMsgProc)
            return;

        // Apply rate limiting.
        if (peer.m_addr_token_bucket < 1.0) {
            if (rate_limited) {
                ++num_rate_limit;
                continue;
            }
        } else {
            peer.m_addr_token_bucket -= 1.0;
        }
        // We only bother storing full nodes, though this may include
        // things which we would not make an outbound connection to, in
        // part because we may make feeler connections to them.
        if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
            continue;

        // Sanitize obviously-bogus timestamps (too old to be plausible, or
        // more than 10 minutes in the future) by backdating 5 days.
        if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_time + 10min) {
            addr.nTime = std::chrono::time_point_cast<std::chrono::seconds>(current_time - 5 * 24h);
        }
        // Remember we heard this address from this peer, even if we end up
        // not storing or relaying it.
        AddAddressKnown(peer, addr);
        if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
            // Do not process banned/discouraged addresses beyond remembering we received them
            continue;
        }
        ++num_proc;
        const bool reachable{g_reachable_nets.Contains(addr)};
        // Relay only fresh addresses from small (likely self-announcement)
        // messages, and only when we didn't solicit them via getaddr.
        if (addr.nTime > current_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
            // Relay to a limited number of other nodes
            RelayAddress(pfrom.GetId(), addr, reachable);
        }
        // Do not store addresses outside our network
        if (reachable) {
            vAddrOk.push_back(addr);
        }
    }
    // Flush per-message tallies into the peer's lifetime counters.
    peer.m_addr_processed += num_proc;
    peer.m_addr_rate_limited += num_rate_limit;
    LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
             vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());

    m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
    // A full-sized (1000-address) message suggests a getaddr reply may still
    // be in flight; otherwise consider our getaddr answered.
    if (vAddr.size() < 1000) peer.m_getaddr_sent = false;

    // AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
    if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
        LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
        pfrom.fDisconnect = true;
    }
}

bool PeerManagerImpl::SendMessages(CNode& node)
{
AssertLockNotHeld(m_tx_download_mutex);
Expand Down
5 changes: 5 additions & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,11 @@ def remove_tests(exclude_list):
def run_tests(*, test_list, build_dir, tmpdir, jobs=1, enable_coverage=False, args=None, combined_logs_len=0, failfast=False, use_term_control, results_filepath=None):
args = args or []

# Some optional Python dependencies (e.g. pycapnp) may emit warnings or fail under
# CPython free-threaded builds when the GIL is disabled. Force it on for all
# functional tests so every child process inherits PYTHON_GIL=1.
os.environ["PYTHON_GIL"] = "1"

# Warn if bitcoind is already running
try:
# pgrep exits with code zero when one or more matching processes found
Expand Down
Loading