From 9c91b1561f97d50e04f23fc7a1a9bb343a1ea198 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 15 Jan 2026 19:22:21 +0500 Subject: [PATCH 1/2] fanout (publish only) --- include/libp2p/protocol/gossip/config.hpp | 2 + include/libp2p/protocol/gossip/gossip.hpp | 14 +++- include/qtils/retain_if.hpp | 31 ++++++++ src/protocol/gossip/gossip.cpp | 95 +++++++++++++++++++---- 4 files changed, 127 insertions(+), 15 deletions(-) create mode 100644 include/qtils/retain_if.hpp diff --git a/include/libp2p/protocol/gossip/config.hpp b/include/libp2p/protocol/gossip/config.hpp index f61e7f57..7925bc2a 100644 --- a/include/libp2p/protocol/gossip/config.hpp +++ b/include/libp2p/protocol/gossip/config.hpp @@ -168,6 +168,8 @@ namespace libp2p::protocol::gossip { Backoff graft_flood_threshold{10}; std::chrono::seconds heartbeat_interval{1}; + /// Time to live for fanout peers (default is 60 seconds). + std::chrono::seconds fanout_ttl{60}; size_t retain_scores = 4; diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index bb170214..08132566 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -150,14 +150,17 @@ namespace libp2p::protocol::gossip { /** Publish a payload to this topic (signed locally). */ void publish(BytesIn message); /** Count outbound peers currently in the mesh. */ - size_t meshOutCount(); + size_t meshOutCount() const; + std::weak_ptr weak_gossip_; TopicHash topic_hash_; + bool publish_only_ = false; CoroOutcomeChannel receive_channel_; History history_; TopicBackoff backoff_; std::unordered_set peers_; // all peers subscribed std::unordered_set mesh_peers_; // peers in mesh + time_cache::Time last_publish_; }; /** @@ -260,6 +263,12 @@ namespace libp2p::protocol::gossip { /** Subscribe locally to a topic by string name. */ std::shared_ptr subscribe(std::string_view topic_hash); + /** + * Publish message to topic. + * Creates publish only topic if `subscribe` was not called before. + */ + void publish(const TopicHash &topic_hash, BytesIn data); + /** Publish a payload to a topic (signed, deduped, broadcast). */ void publish(Topic &topic, BytesIn data); @@ -317,6 +326,9 @@ namespace libp2p::protocol::gossip { bool handle_ihave(const PeerPtr &peer, const gossipsub::pb::RPC &pb_message); + std::shared_ptr getOrCreateTopic(const TopicHash &topic_hash, + bool publish_only); + // Dependencies and state std::shared_ptr io_context_; std::shared_ptr host_; diff --git a/include/qtils/retain_if.hpp b/include/qtils/retain_if.hpp new file mode 100644 index 00000000..3f9864e0 --- /dev/null +++ b/include/qtils/retain_if.hpp @@ -0,0 +1,31 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +namespace qtils { + template + void retainIf(std::vector &v, auto &&predicate) { + v.erase(std::remove_if( + v.begin(), v.end(), [&](T &v) { return not predicate(v); }), + v.end()); + } + + template + requires requires { typename C::key_type; } + void retainIf(C &v, auto &&predicate) { + for (auto it = v.begin(); it != v.end();) { + if (not predicate(*it)) { + it = v.erase(it); + } else { + ++it; + } + } + } +} // namespace qtils diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 9ff8dfaf..85f72895 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace libp2p::protocol::gossip { // Signing context prefix for message signing/verification (libp2p-pubsub:). @@ -199,7 +200,7 @@ namespace libp2p::protocol::gossip { } // Count outbound connections present in the mesh for the topic. - size_t Topic::meshOutCount() { + size_t Topic::meshOutCount() const { size_t count = 0; for (auto &peer : mesh_peers_) { if (peer->out_) { @@ -340,7 +341,10 @@ namespace libp2p::protocol::gossip { peer->stream_out_ = stream; if (not self->topics_.empty()) { auto &message = self->getBatch(peer); - for (auto &topic_hash : self->topics_ | std::views::keys) { + for (auto &[topic_hash, topic] : self->topics_) { + if (topic->publish_only_) { + continue; + } message.subscribe(topic_hash, true); } } @@ -387,11 +391,17 @@ namespace libp2p::protocol::gossip { // Subscribe locally to a topic: create Topic, announce to peers, and seed // mesh. std::shared_ptr Gossip::subscribe(TopicHash topic_hash) { + return getOrCreateTopic(topic_hash, false); + } + + std::shared_ptr Gossip::getOrCreateTopic(const TopicHash &topic_hash, + bool publish_only) { auto topic_it = topics_.find(topic_hash); if (topic_it == topics_.end()) { auto topic = std::make_shared(Topic{ weak_from_this(), topic_hash, + publish_only, {*io_context_}, {config_.history_length}, {config_}, @@ -401,26 +411,49 @@ namespace libp2p::protocol::gossip { if (peer->topics_.contains(topic_hash)) { topic->peers_.emplace(peer); } + } + } + auto &topic = topic_it->second; + if (not publish_only and topic->publish_only_) { + topic->publish_only_ = false; + for (auto &peer : topic->peers_) { getBatch(peer).subscribe(topic_hash, true); } - for (auto &peer : choose_peers_.choose( - topic->peers_, - [&](const PeerPtr &peer) { - return not topic->mesh_peers_.contains(peer) - and not is_backoff_with_slack(*topic, peer) - and not score_.below(peer->peer_id_, config_.score.zero); - }, - config_.mesh_n_for_topic(topic_hash))) { + auto peers = std::exchange(topic->mesh_peers_, {}); + auto predicate = [&](const PeerPtr &peer) { + return not topic->mesh_peers_.contains(peer) + and not is_backoff_with_slack(*topic, peer) + and not score_.below(peer->peer_id_, config_.score.zero); + }; + auto mesh_n = config_.mesh_n_for_topic(topic_hash); + for (auto &peer : peers) { + if (topic->mesh_peers_.size() >= mesh_n) { + break; + } + if (not predicate(peer)) { + continue; + } graft(*topic, peer); } + if (auto more = saturating_sub(mesh_n, topic->mesh_peers_.size())) { + for (auto &peer : + choose_peers_.choose(topic->peers_, predicate, more)) { + graft(*topic, peer); + } + } } - return topic_it->second; + return topic; } std::shared_ptr Gossip::subscribe(std::string_view topic_hash) { return subscribe(qtils::ByteVec(qtils::str2byte(topic_hash))); } + void Gossip::publish(const TopicHash &topic_hash, BytesIn data) { + auto topic = getOrCreateTopic(topic_hash, true); + publish(*topic, data); + } + // Local publish path: create message (signed or anonymous), dedupe, and // broadcast to peers. void Gossip::publish(Topic &topic, BytesIn data) { @@ -483,7 +516,7 @@ namespace libp2p::protocol::gossip { auto &topic = topic_it->second; topic->peers_.emplace(peer); - if (peer->isGossipsub() + if (not topic->publish_only_ and peer->isGossipsub() and topic->mesh_peers_.size() < config_.mesh_n_low_for_topic(topic_hash) and not topic->mesh_peers_.contains(peer) @@ -592,6 +625,9 @@ namespace libp2p::protocol::gossip { auto &topic = topic_it->second; topic->peers_.emplace(peer); + if (topic->publish_only_) { + continue; + } if (topic->mesh_peers_.contains(peer)) { continue; } @@ -739,12 +775,18 @@ namespace libp2p::protocol::gossip { for (auto &peer : choose_peers_.choose( topic.peers_, [&](const PeerPtr &peer) { - return not topic.mesh_peers_.contains(peer); + return not topic.mesh_peers_.contains(peer) + and not score_.below(peer->peer_id_, + config_.score.publish_threshold); }, more)) { add_peer(peer); + if (topic.publish_only_) { + topic.mesh_peers_.emplace(peer); + } } } + topic.last_publish_ = time_cache::Clock::now(); } } } @@ -923,7 +965,33 @@ namespace libp2p::protocol::gossip { apply_iwant_penalties(); + qtils::retainIf(topics_, [&](const decltype(topics_)::value_type &p) { + auto &topic = p.second; + return not topic->publish_only_ + or time_cache::Clock::now() - topic->last_publish_ + < config_.fanout_ttl; + }); for (auto &[topic_hash, topic] : topics_) { + auto mesh_n = config_.mesh_n_for_topic(topic_hash); + if (topic->publish_only_) { + qtils::retainIf(topic->mesh_peers_, [&](const PeerPtr &peer) { + return not score_.below(peer->peer_id_, + config_.score.publish_threshold); + }); + if (auto more = saturating_sub(mesh_n, topic->mesh_peers_.size())) { + for (auto &peer : choose_peers_.choose( + topic->peers_, + [&](const PeerPtr &peer) { + return not topic->mesh_peers_.contains(peer) + and not score_.below(peer->peer_id_, + config_.score.publish_threshold); + }, + more)) { + topic->mesh_peers_.emplace(peer); + } + } + continue; + } for (auto peer_it = topic->mesh_peers_.begin(); peer_it != topic->mesh_peers_.end();) { auto &peer = *peer_it; @@ -935,7 +1003,6 @@ namespace libp2p::protocol::gossip { } } - auto mesh_n = config_.mesh_n_for_topic(topic_hash); auto mesh_outbound_min = config_.mesh_outbound_min_for_topic(topic_hash); if (topic->mesh_peers_.size() < config_.mesh_n_low_for_topic(topic_hash)) { From e28843dad920a83cf9e31871afd8d57610f6f1df Mon Sep 17 00:00:00 2001 From: turuslan Date: Fri, 16 Jan 2026 13:17:04 +0500 Subject: [PATCH 2/2] pr comment --- include/libp2p/protocol/gossip/gossip.hpp | 2 +- src/protocol/gossip/gossip.cpp | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 08132566..36e7e8f9 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -160,7 +160,7 @@ namespace libp2p::protocol::gossip { TopicBackoff backoff_; std::unordered_set peers_; // all peers subscribed std::unordered_set mesh_peers_; // peers in mesh - time_cache::Time last_publish_; + time_cache::Time last_publish_{}; }; /** diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 85f72895..78c6514f 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -414,6 +414,7 @@ namespace libp2p::protocol::gossip { } } auto &topic = topic_it->second; + // upgrade publish only topic to subscribed topic if (not publish_only and topic->publish_only_) { topic->publish_only_ = false; for (auto &peer : topic->peers_) {