Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace cyber {
namespace service_discovery {

class ParticipantListener
: public eprosima::fastdds::dds::DomainParticipantImpl {
: public eprosima::fastdds::rtps::RTPSParticipantListener {
public:
using ChangeFunc = std::function<void(
const eprosima::fastdds::rtps::ParticipantDiscoveryInfo& info)>;
Expand Down
17 changes: 5 additions & 12 deletions cyber/service_discovery/communication/subscriber_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,18 @@ SubscriberListener::~SubscriberListener() {
callback_ = nullptr;
}

void SubscriberListener::onNewDataMessage(eprosima::fastdds::dds::Subscriber* sub) {
void SubscriberListener::on_data_available(eprosima::fastdds::dds::DataReader* reader) {
RETURN_IF_NULL(callback_);

std::lock_guard<std::mutex> lock(mutex_);
eprosima::fastrtps::SampleInfo_t m_info;
eprosima::fastdds::dds::SampleInfo m_info;
cyber::transport::UnderlayMessage m;
RETURN_IF(!sub->takeNextData(reinterpret_cast<void*>(&m), &m_info));
RETURN_IF(m_info.sampleKind != eprosima::fastdds::rtps::ALIVE);
RETURN_IF(
reader->take_next_sample(reinterpret_cast<void*>(&m), &m_info) != eprosima::fastdds::dds::RETCODE_OK);
RETURN_IF(m_info.instance_state != eprosima::fastdds::dds::ALIVE_INSTANCE_STATE);

callback_(m.data());
}

void SubscriberListener::onSubscriptionMatched(
eprosima::fastdds::dds::Subscriber* sub,
eprosima::fastdds::rtps::MatchingInfo& info) {
(void)sub;
(void)info;
}

} // namespace service_discovery
} // namespace cyber
} // namespace apollo
10 changes: 5 additions & 5 deletions cyber/service_discovery/communication/subscriber_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <mutex>
#include <string>

#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/Subscriber.hpp"
#include "fastdds/dds/subscriber/SubscriberListener.hpp"
#include "fastdds/rtps/common/MatchingInfo.hpp"
Expand All @@ -29,17 +32,14 @@ namespace apollo {
namespace cyber {
namespace service_discovery {

class SubscriberListener : public eprosima::fastdds::dds::SubscriberListener {
class SubscriberListener : public eprosima::fastdds::dds::DataReaderListener {
public:
using NewMsgCallback = std::function<void(const std::string&)>;

explicit SubscriberListener(const NewMsgCallback& callback);
virtual ~SubscriberListener();

void onNewDataMessage(eprosima::fastdds::dds::Subscriber* sub);
void onSubscriptionMatched(
eprosima::fastdds::dds::Subscriber* sub,
eprosima::fastdds::rtps::MatchingInfo& info); // NOLINT
virtual void on_data_available(eprosima::fastdds::dds::DataReader* reader) override;

private:
NewMsgCallback callback_;
Expand Down
15 changes: 10 additions & 5 deletions cyber/service_discovery/specific_manager/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ Manager::Manager()

Manager::~Manager() { Shutdown(); }

bool Manager::StartDiscovery(RtpsParticipant* participant) {
bool Manager::StartDiscovery(eprosima::fastdds::dds::DomainParticipant* participant) {
if (participant == nullptr) {
return false;
}
participant_ = participant;
if (is_discovery_started_.exchange(true)) {
return true;
}
Expand All @@ -70,13 +71,17 @@ void Manager::StopDiscovery() {
{
std::lock_guard<std::mutex> lg(lock_);
if (publisher_ != nullptr) {
eprosima::fastrtps::Domain::removePublisher(publisher_);
if (participant_) {
participant_->delete_publisher(publisher_);
}
publisher_ = nullptr;
}
}

if (subscriber_ != nullptr) {
eprosima::fastrtps::Domain::removeSubscriber(subscriber_);
if (participant_) {
participant_->delete_subscriber(subscriber_);
}
subscriber_ = nullptr;
}

Expand Down Expand Up @@ -137,7 +142,7 @@ void Manager::RemoveChangeListener(const ChangeConnection& conn) {
local_conn.Disconnect();
}

bool Manager::CreatePublisher(RtpsParticipant* participant) {
bool Manager::CreatePublisher(eprosima::fastdds::dds::DomainParticipant* participant) {
RtpsPublisherAttr pub_attr;
RETURN_VAL_IF(
!AttributesFiller::FillInPubAttr(
Expand All @@ -148,7 +153,7 @@ bool Manager::CreatePublisher(RtpsParticipant* participant) {
return publisher_ != nullptr;
}

bool Manager::CreateSubscriber(RtpsParticipant* participant) {
bool Manager::CreateSubscriber(eprosima::fastdds::dds::DomainParticipant* participant) {
RtpsSubscriberAttr sub_attr;
RETURN_VAL_IF(
!AttributesFiller::FillInSubAttr(
Expand Down
9 changes: 5 additions & 4 deletions cyber/service_discovery/specific_manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "fastdds/dds/publisher/Publisher.hpp"
#include "fastdds/dds/subscriber/Subscriber.hpp"
#include "fastdds/dds/domain/DomainParticipant.hpp"
#include "xmlparser/attributes/PublisherAttributes.hpp"
#include "xmlparser/attributes/SubscriberAttributes.hpp"

Expand Down Expand Up @@ -53,7 +54,6 @@ class Manager {
using ChangeFunc = std::function<void(const ChangeMsg&)>;
using ChangeConnection = base::Connection<const ChangeMsg&>;

using RtpsParticipant = eprosima::fastdds::rtps::RTPSParticipant;
using RtpsPublisherAttr = eprosima::fastdds::PublisherAttributes;
using RtpsSubscriberAttr = eprosima::fastdds::SubscriberAttributes;

Expand All @@ -74,7 +74,7 @@ class Manager {
* @return true if start successfully
* @return false if start fail
*/
bool StartDiscovery(RtpsParticipant* participant);
bool StartDiscovery(eprosima::fastdds::dds::DomainParticipant* participant);

/**
* @brief Stop topology discovery
Expand Down Expand Up @@ -135,8 +135,8 @@ class Manager {
int process_id) = 0;

protected:
bool CreatePublisher(RtpsParticipant* participant);
bool CreateSubscriber(RtpsParticipant* participant);
bool CreatePublisher(eprosima::fastdds::dds::DomainParticipant* participant);
bool CreateSubscriber(eprosima::fastdds::dds::DomainParticipant* participant);

virtual bool Check(const RoleAttributes& attr) = 0;
virtual void Dispose(const ChangeMsg& msg) = 0;
Expand All @@ -157,6 +157,7 @@ class Manager {
std::string host_name_;
int process_id_;
std::string channel_name_;
eprosima::fastdds::dds::DomainParticipant* participant_;
eprosima::fastdds::dds::Publisher* publisher_;
std::mutex lock_;
eprosima::fastdds::dds::Subscriber* subscriber_;
Expand Down
16 changes: 8 additions & 8 deletions cyber/transport/rtps/attributes_filler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool AttributesFiller::FillInPubAttr(

pub_attr->topic.topicName = channel_name;
pub_attr->topic.topicDataType = "UnderlayMessage";
pub_attr->topic.topicKind = eprosima::fastdds::dds::NO_KEY;
pub_attr->topic.topicKind = eprosima::fastdds::rtps::NO_KEY;

switch (qos.history()) {
case QosHistoryPolicy::HISTORY_KEEP_LAST:
Expand Down Expand Up @@ -87,8 +87,8 @@ bool AttributesFiller::FillInPubAttr(

// transform messages per second to rtps heartbeat
// set default heartbeat period
pub_attr->times.heartbeatPeriod.seconds = 1;
pub_attr->times.heartbeatPeriod.fraction = 0;
pub_attr->times.heartbeat_period.seconds = 1;
pub_attr->times.heartbeat_period.fraction(0);
if (qos.mps() != 0) {
uint64_t mps = qos.mps();

Expand All @@ -103,14 +103,14 @@ bool AttributesFiller::FillInPubAttr(
uint32_t fraction = fractions & 0xffffffff;
int32_t seconds = static_cast<int32_t>(fractions >> 32);

pub_attr->times.heartbeatPeriod.seconds = seconds;
pub_attr->times.heartbeatPeriod.fraction = fraction;
pub_attr->times.heartbeat_period.seconds = seconds;
pub_attr->times.heartbeat_period.fraction(fraction);
}

pub_attr->qos.m_publishMode.kind =
eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
pub_attr->historyMemoryPolicy =
eprosima::fastdds::dds::DYNAMIC_RESERVE_MEMORY_MODE;
eprosima::fastdds::rtps::DYNAMIC_RESERVE_MEMORY_MODE;
pub_attr->topic.resourceLimitsQos.max_samples = 10000;

return true;
Expand All @@ -122,7 +122,7 @@ bool AttributesFiller::FillInSubAttr(
RETURN_VAL_IF_NULL(sub_attr, false);
sub_attr->topic.topicName = channel_name;
sub_attr->topic.topicDataType = "UnderlayMessage";
sub_attr->topic.topicKind = eprosima::fastdds::dds::NO_KEY;
sub_attr->topic.topicKind = eprosima::fastdds::rtps::NO_KEY;

switch (qos.history()) {
case QosHistoryPolicy::HISTORY_KEEP_LAST:
Expand Down Expand Up @@ -173,7 +173,7 @@ bool AttributesFiller::FillInSubAttr(
}

sub_attr->historyMemoryPolicy =
eprosima::fastdds::dds::DYNAMIC_RESERVE_MEMORY_MODE;
eprosima::fastdds::rtps::DYNAMIC_RESERVE_MEMORY_MODE;
sub_attr->topic.resourceLimitsQos.max_samples = 10000;

return true;
Expand Down
46 changes: 26 additions & 20 deletions cyber/transport/rtps/participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "cyber/transport/rtps/participant.h"

#include "fastdds/dds/domain/DomainParticipantFactory.hpp"
#include "fastdds/rtps/common/Locator.hpp"
#include "fastdds/utils/QosConverters.hpp"
#include "xmlparser/attributes/ParticipantAttributes.hpp"

#include "cyber/proto/transport_conf.pb.h"
Expand All @@ -28,14 +30,17 @@ namespace apollo {
namespace cyber {
namespace transport {

using eprosima::fastdds::dds::DomainParticipantFactory;

Participant::Participant(
const std::string& name, int send_port,
eprosima::fastdds::rtps::RTPSParticipantListener* listener)
eprosima::fastdds::dds::DomainParticipantListener* listener)
: shutdown_(false),
name_(name),
send_port_(send_port),
listener_(listener),
fastrtps_participant_(nullptr) {}
fastrtps_participant_(nullptr),
type_(new UnderlayMessageType()) {}

Participant::~Participant() {}

Expand All @@ -46,13 +51,13 @@ void Participant::Shutdown() {

std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
eprosima::fastrtps::Domain::removeParticipant(fastrtps_participant_);
DomainParticipantFactory::get_instance()->delete_participant(fastrtps_participant_);
fastrtps_participant_ = nullptr;
listener_ = nullptr;
}
}

eprosima::fastdds::rtps::RTPSParticipant* Participant::fastrtps_participant() {
eprosima::fastdds::dds::DomainParticipant* Participant::fastrtps_participant() {
if (shutdown_.load()) {
return nullptr;
}
Expand All @@ -68,7 +73,7 @@ eprosima::fastdds::rtps::RTPSParticipant* Participant::fastrtps_participant() {

void Participant::CreateFastRtpsParticipant(
const std::string& name, int send_port,
eprosima::fastdds::rtps::RTPSParticipantListener* listener) {
eprosima::fastdds::dds::DomainParticipantListener* listener) {
uint32_t domain_id = 80;

const char* val = ::getenv("CYBER_DOMAIN_ID");
Expand All @@ -89,27 +94,25 @@ void Participant::CreateFastRtpsParticipant(
}

eprosima::fastdds::ParticipantAttributes attr;
attr.rtps.defaultSendPort = send_port;
attr.rtps.port.domainIDGain =
static_cast<uint16_t>(part_attr_conf->domain_id_gain());
attr.rtps.port.portBase = static_cast<uint16_t>(part_attr_conf->port_base());
attr.rtps.use_IP6_to_send = false;
attr.rtps.builtin.use_SIMPLE_RTPSParticipantDiscoveryProtocol = true;
attr.rtps.builtin.use_SIMPLE_EndpointDiscoveryProtocol = true;
attr.rtps.builtin.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter =
attr.rtps.builtin.discovery_config.discoveryProtocol = eprosima::fastdds::rtps::DiscoveryProtocol::SIMPLE;
attr.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
attr.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter =
true;
attr.rtps.builtin.m_simpleEDP.use_PublicationWriterANDSubscriptionReader =
attr.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader =
true;
attr.rtps.builtin.domainId = domain_id;
attr.domainId = domain_id;

/**
* The user should set the lease_duration and the announcement_period with
* values that differ in at least 30%. Values too close to each other may
* cause the failure of the writer liveliness assertion in networks with high
* latency or with lots of communication errors.
*/
attr.rtps.builtin.leaseDuration.seconds = part_attr_conf->lease_duration();
attr.rtps.builtin.leaseDuration_announcementperiod.seconds =
attr.rtps.builtin.discovery_config.leaseDuration.seconds = part_attr_conf->lease_duration();
attr.rtps.builtin.discovery_config.leaseDuration_announcementperiod.seconds =
part_attr_conf->announcement_period();

attr.rtps.setName(name.c_str());
Expand All @@ -126,22 +129,25 @@ void Participant::CreateFastRtpsParticipant(
ADEBUG << "cyber ip: " << ip_env;

eprosima::fastdds::rtps::Locator_t locator;
locator.port = 0;
RETURN_IF(!locator.set_IP4_address(ip_env));
locator.port = send_port;
RETURN_IF(!eprosima::fastdds::rtps::IPLocator::setIPv4(locator, ip_env));

locator.kind = LOCATOR_KIND_UDPv4;

attr.rtps.defaultUnicastLocatorList.push_back(locator);
attr.rtps.defaultOutLocatorList.push_back(locator);
attr.rtps.builtin.metatrafficUnicastLocatorList.push_back(locator);

locator.set_IP4_address(239, 255, 0, 1);
eprosima::fastdds::rtps::IPLocator::setIPv4(locator, std::string("239.255.0.1"));
attr.rtps.builtin.metatrafficMulticastLocatorList.push_back(locator);

eprosima::fastdds::dds::DomainParticipantExtendedQos extended_qos;
eprosima::fastdds::dds::utils::set_attributes_from_extended_qos(
attr, extended_qos
);
fastrtps_participant_ =
eprosima::fastrtps::Domain::createParticipant(attr, listener);
DomainParticipantFactory::get_instance()->create_participant(extended_qos, listener_);
RETURN_IF_NULL(fastrtps_participant_);
eprosima::fastrtps::Domain::registerType(fastrtps_participant_, &type_);
type_.register_type(fastrtps_participant_);
}

} // namespace transport
Expand Down
16 changes: 10 additions & 6 deletions cyber/transport/rtps/participant.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
#include <mutex>
#include <string>

#include "fastdds/dds/domain/DomainParticipant.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"
#include "fastdds/rtps/common/Locator.hpp"
#include "fastdds/rtps/participant/RTPSParticipant.hpp"
#include "fastdds/rtps/participant/RTPSParticipantListener.hpp"

#include "cyber/transport/rtps/underlay_message_type.h"

Expand All @@ -36,25 +40,25 @@ using ParticipantPtr = std::shared_ptr<Participant>;
class Participant {
public:
Participant(const std::string& name, int send_port,
eprosima::fastdds::rtps::RTPSParticipantListener* listener = nullptr);
eprosima::fastdds::dds::DomainParticipantListener* listener = nullptr);
virtual ~Participant();

void Shutdown();

eprosima::fastdds::rtps::RTPSParticipant* fastrtps_participant();
eprosima::fastdds::dds::DomainParticipant* fastrtps_participant();
bool is_shutdown() const { return shutdown_.load(); }

private:
void CreateFastRtpsParticipant(
const std::string& name, int send_port,
eprosima::fastdds::rtps::RTPSParticipantListener* listener);
eprosima::fastdds::dds::DomainParticipantListener* listener);

std::atomic<bool> shutdown_;
std::string name_;
int send_port_;
eprosima::fastdds::rtps::RTPSParticipantListener* listener_;
UnderlayMessageType type_;
eprosima::fastdds::rtps::RTPSParticipant* fastrtps_participant_;
eprosima::fastdds::dds::DomainParticipantListener* listener_;
eprosima::fastdds::dds::TypeSupport type_;
eprosima::fastdds::dds::DomainParticipant* fastrtps_participant_;
std::mutex mutex_;
};

Expand Down
Loading