Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libp2p/protocol/gossip/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ namespace libp2p::protocol::gossip {
Backoff graft_flood_threshold{10};

std::chrono::seconds heartbeat_interval{1};
/// Time to live for fanout peers (default is 60 seconds).
std::chrono::seconds fanout_ttl{60};

size_t retain_scores = 4;

Expand Down
14 changes: 13 additions & 1 deletion include/libp2p/protocol/gossip/gossip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,17 @@ namespace libp2p::protocol::gossip {
/** Publish a payload to this topic (signed locally). */
void publish(BytesIn message);
/** Count outbound peers currently in the mesh. */
size_t meshOutCount();
size_t meshOutCount() const;

std::weak_ptr<Gossip> weak_gossip_;
TopicHash topic_hash_;
bool publish_only_ = false;
CoroOutcomeChannel<Message> receive_channel_;
History history_;
TopicBackoff backoff_;
std::unordered_set<PeerPtr> peers_; // all peers subscribed
std::unordered_set<PeerPtr> mesh_peers_; // peers in mesh
time_cache::Time last_publish_{};
};

/**
Expand Down Expand Up @@ -260,6 +263,12 @@ namespace libp2p::protocol::gossip {
/** Subscribe locally to a topic by string name. */
std::shared_ptr<Topic> subscribe(std::string_view topic_hash);

/**
* Publish message to topic.
* Creates publish only topic if `subscribe` was not called before.
*/
void publish(const TopicHash &topic_hash, BytesIn data);

/** Publish a payload to a topic (signed, deduped, broadcast). */
void publish(Topic &topic, BytesIn data);

Expand Down Expand Up @@ -317,6 +326,9 @@ namespace libp2p::protocol::gossip {
bool handle_ihave(const PeerPtr &peer,
const gossipsub::pb::RPC &pb_message);

std::shared_ptr<Topic> getOrCreateTopic(const TopicHash &topic_hash,
bool publish_only);

// Dependencies and state
std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<host::BasicHost> host_;
Expand Down
31 changes: 31 additions & 0 deletions include/qtils/retain_if.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <algorithm>
#include <vector>

namespace qtils {
template <typename T>
void retainIf(std::vector<T> &v, auto &&predicate) {
v.erase(std::remove_if(
v.begin(), v.end(), [&](T &v) { return not predicate(v); }),
v.end());
}

template <typename C>
requires requires { typename C::key_type; }
void retainIf(C &v, auto &&predicate) {
for (auto it = v.begin(); it != v.end();) {
if (not predicate(*it)) {
it = v.erase(it);
} else {
++it;
}
}
}
} // namespace qtils
96 changes: 82 additions & 14 deletions src/protocol/gossip/gossip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <qtils/bytes.hpp>
#include <qtils/bytestr.hpp>
#include <qtils/option_take.hpp>
#include <qtils/retain_if.hpp>

namespace libp2p::protocol::gossip {
// Signing context prefix for message signing/verification (libp2p-pubsub:).
Expand Down Expand Up @@ -199,7 +200,7 @@ namespace libp2p::protocol::gossip {
}

// Count outbound connections present in the mesh for the topic.
size_t Topic::meshOutCount() {
size_t Topic::meshOutCount() const {
size_t count = 0;
for (auto &peer : mesh_peers_) {
if (peer->out_) {
Expand Down Expand Up @@ -340,7 +341,10 @@ namespace libp2p::protocol::gossip {
peer->stream_out_ = stream;
if (not self->topics_.empty()) {
auto &message = self->getBatch(peer);
for (auto &topic_hash : self->topics_ | std::views::keys) {
for (auto &[topic_hash, topic] : self->topics_) {
if (topic->publish_only_) {
continue;
}
message.subscribe(topic_hash, true);
}
}
Expand Down Expand Up @@ -387,11 +391,17 @@ namespace libp2p::protocol::gossip {
// Subscribe locally to a topic: create Topic, announce to peers, and seed
// mesh.
std::shared_ptr<Topic> Gossip::subscribe(TopicHash topic_hash) {
return getOrCreateTopic(topic_hash, false);
}

std::shared_ptr<Topic> Gossip::getOrCreateTopic(const TopicHash &topic_hash,
bool publish_only) {
auto topic_it = topics_.find(topic_hash);
if (topic_it == topics_.end()) {
auto topic = std::make_shared<Topic>(Topic{
weak_from_this(),
topic_hash,
publish_only,
{*io_context_},
{config_.history_length},
{config_},
Expand All @@ -401,26 +411,50 @@ namespace libp2p::protocol::gossip {
if (peer->topics_.contains(topic_hash)) {
topic->peers_.emplace(peer);
}
}
}
auto &topic = topic_it->second;
// upgrade publish only topic to subscribed topic
if (not publish_only and topic->publish_only_) {
topic->publish_only_ = false;
for (auto &peer : topic->peers_) {
getBatch(peer).subscribe(topic_hash, true);
}
for (auto &peer : choose_peers_.choose(
topic->peers_,
[&](const PeerPtr &peer) {
return not topic->mesh_peers_.contains(peer)
and not is_backoff_with_slack(*topic, peer)
and not score_.below(peer->peer_id_, config_.score.zero);
},
config_.mesh_n_for_topic(topic_hash))) {
auto peers = std::exchange(topic->mesh_peers_, {});
auto predicate = [&](const PeerPtr &peer) {
return not topic->mesh_peers_.contains(peer)
and not is_backoff_with_slack(*topic, peer)
and not score_.below(peer->peer_id_, config_.score.zero);
};
auto mesh_n = config_.mesh_n_for_topic(topic_hash);
for (auto &peer : peers) {
if (topic->mesh_peers_.size() >= mesh_n) {
break;
}
if (not predicate(peer)) {
continue;
}
graft(*topic, peer);
}
if (auto more = saturating_sub(mesh_n, topic->mesh_peers_.size())) {
for (auto &peer :
choose_peers_.choose(topic->peers_, predicate, more)) {
graft(*topic, peer);
}
}
}
return topic_it->second;
return topic;
}

std::shared_ptr<Topic> Gossip::subscribe(std::string_view topic_hash) {
return subscribe(qtils::ByteVec(qtils::str2byte(topic_hash)));
}

void Gossip::publish(const TopicHash &topic_hash, BytesIn data) {
auto topic = getOrCreateTopic(topic_hash, true);
publish(*topic, data);
}

// Local publish path: create message (signed or anonymous), dedupe, and
// broadcast to peers.
void Gossip::publish(Topic &topic, BytesIn data) {
Expand Down Expand Up @@ -483,7 +517,7 @@ namespace libp2p::protocol::gossip {
auto &topic = topic_it->second;
topic->peers_.emplace(peer);

if (peer->isGossipsub()
if (not topic->publish_only_ and peer->isGossipsub()
and topic->mesh_peers_.size()
< config_.mesh_n_low_for_topic(topic_hash)
and not topic->mesh_peers_.contains(peer)
Expand Down Expand Up @@ -592,6 +626,9 @@ namespace libp2p::protocol::gossip {
auto &topic = topic_it->second;
topic->peers_.emplace(peer);

if (topic->publish_only_) {
continue;
}
if (topic->mesh_peers_.contains(peer)) {
continue;
}
Expand Down Expand Up @@ -739,12 +776,18 @@ namespace libp2p::protocol::gossip {
for (auto &peer : choose_peers_.choose(
topic.peers_,
[&](const PeerPtr &peer) {
return not topic.mesh_peers_.contains(peer);
return not topic.mesh_peers_.contains(peer)
and not score_.below(peer->peer_id_,
config_.score.publish_threshold);
},
more)) {
add_peer(peer);
if (topic.publish_only_) {
topic.mesh_peers_.emplace(peer);
}
}
}
topic.last_publish_ = time_cache::Clock::now();
}
}
}
Expand Down Expand Up @@ -923,7 +966,33 @@ namespace libp2p::protocol::gossip {

apply_iwant_penalties();

qtils::retainIf(topics_, [&](const decltype(topics_)::value_type &p) {
auto &topic = p.second;
return not topic->publish_only_
or time_cache::Clock::now() - topic->last_publish_
< config_.fanout_ttl;
});
for (auto &[topic_hash, topic] : topics_) {
auto mesh_n = config_.mesh_n_for_topic(topic_hash);
if (topic->publish_only_) {
qtils::retainIf(topic->mesh_peers_, [&](const PeerPtr &peer) {
return not score_.below(peer->peer_id_,
config_.score.publish_threshold);
});
if (auto more = saturating_sub(mesh_n, topic->mesh_peers_.size())) {
for (auto &peer : choose_peers_.choose(
topic->peers_,
[&](const PeerPtr &peer) {
return not topic->mesh_peers_.contains(peer)
and not score_.below(peer->peer_id_,
config_.score.publish_threshold);
},
more)) {
topic->mesh_peers_.emplace(peer);
}
}
continue;
}
for (auto peer_it = topic->mesh_peers_.begin();
peer_it != topic->mesh_peers_.end();) {
auto &peer = *peer_it;
Expand All @@ -935,7 +1004,6 @@ namespace libp2p::protocol::gossip {
}
}

auto mesh_n = config_.mesh_n_for_topic(topic_hash);
auto mesh_outbound_min = config_.mesh_outbound_min_for_topic(topic_hash);
if (topic->mesh_peers_.size()
< config_.mesh_n_low_for_topic(topic_hash)) {
Expand Down
Loading