Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
#include "rmw_dds_common/qos.hpp"
#include "rmw_dds_common/security.hpp"

#include "rosidl_runtime_c/type_hash.h"

#include "rosidl_typesupport_cpp/message_type_support.hpp"

#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -938,10 +940,29 @@ static void handle_builtintopic_endpoint(
rmw_gid_t ppgid;
dds_qos_to_rmw_qos(s->qos, &qos_profile);
convert_guid_to_gid(s->participant_key, ppgid);

rosidl_type_hash_t type_hash = rosidl_get_zero_initialized_type_hash();
void * userdata;
size_t userdata_size;
if (dds_qget_userdata(s->qos, &userdata, &userdata_size)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure we are leaking the userdata here. At least, the implementation in https://github.com/eclipse-cyclonedds/cyclonedds/blob/11b923aceb2faa506459fee6cfd708345569ac43/src/core/ddsc/src/dds_qos.c#L31 (which is what dds_qget_userdata calls) seems to allocate memory, and we don't seem to be freeing it anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Rebased (all 4 prs) and added RCPPUTILS_SCOPE_EXIT(dds_free(userdata)); in the if block.

RCPPUTILS_SCOPE_EXIT(dds_free(userdata));
if (RMW_RET_OK != rmw_dds_common::parse_type_hash_from_user_data(
reinterpret_cast<const uint8_t *>(userdata), userdata_size, type_hash))
{
RCUTILS_LOG_WARN_NAMED(
"rmw_cyclonedds_cpp",
"Failed to parse type hash for topic '%s' with type '%s' from USER_DATA '%*s'.",
s->topic_name, s->type_name,
static_cast<int>(userdata_size), reinterpret_cast<char *>(userdata));
type_hash = rosidl_get_zero_initialized_type_hash();
}
}

impl->common.graph_cache.add_entity(
gid,
std::string(s->topic_name),
std::string(s->type_name),
type_hash,
ppgid,
qos_profile,
is_reader);
Expand Down Expand Up @@ -2055,7 +2076,9 @@ static rmw_time_t dds_duration_to_rmw(dds_duration_t duration)

static dds_qos_t * create_readwrite_qos(
const rmw_qos_profile_t * qos_policies,
bool ignore_local_publications)
const rosidl_type_hash_t & type_hash,
bool ignore_local_publications,
const std::string & extra_user_data)
{
dds_duration_t ldur;
dds_qos_t * qos = dds_create_qos();
Expand Down Expand Up @@ -2147,6 +2170,17 @@ static dds_qos_t * create_readwrite_qos(
if (ignore_local_publications) {
dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT);
}

std::string typehash_str;
if (RMW_RET_OK != rmw_dds_common::encode_type_hash_for_user_data_qos(type_hash, typehash_str)) {
RCUTILS_LOG_WARN_NAMED(
"rmw_cyclonedds_cpp",
"Failed to encode type hash for topic, will not distribute it in USER_DATA.");
typehash_str.clear();
}
std::string user_data = extra_user_data + typehash_str;
dds_qset_userdata(qos, user_data.data(), user_data.size());

return qos;
}

Expand Down Expand Up @@ -2343,7 +2377,7 @@ static CddsPublisher * create_cdds_publisher(
set_error_message_from_create_topic(topic, fqtopic_name);
goto fail_topic;
}
if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) {
if ((qos = create_readwrite_qos(qos_policies, *type_support->type_hash, false, "")) == nullptr) {
goto fail_qos;
}
if ((pub->enth = dds_create_writer(dds_pub, topic, qos, listener)) < 0) {
Expand Down Expand Up @@ -2875,7 +2909,9 @@ static CddsSubscription * create_cdds_subscription(
set_error_message_from_create_topic(topic, fqtopic_name);
goto fail_topic;
}
if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) {
if ((qos = create_readwrite_qos(
qos_policies, *type_support->type_hash, ignore_local_publications, "")) == nullptr)
{
goto fail_qos;
}
if ((sub->enth = dds_create_reader(dds_sub, topic, qos, listener)) < 0) {
Expand Down Expand Up @@ -4740,6 +4776,10 @@ static rmw_ret_t rmw_init_cs(
auto sub = std::make_unique<CddsSubscription>();
std::string subtopic_name, pubtopic_name;
void * pub_type_support, * sub_type_support;
dds_qos_t * pub_qos, * sub_qos;
const rosidl_type_hash_t * pub_type_hash;
const rosidl_type_hash_t * sub_type_hash;
std::string user_data;

std::unique_ptr<rmw_cyclonedds_cpp::StructValueType> pub_msg_ts, sub_msg_ts;

Expand All @@ -4752,8 +4792,10 @@ static rmw_ret_t rmw_init_cs(

sub_type_support = create_request_type_support(
type_support->data, type_support->typesupport_identifier);
sub_type_hash = type_supports->request_typesupport->type_hash;
pub_type_support = create_response_type_support(
type_support->data, type_support->typesupport_identifier);
pub_type_hash = type_supports->response_typesupport->type_hash;
subtopic_name =
make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies);
pubtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies);
Expand All @@ -4763,8 +4805,10 @@ static rmw_ret_t rmw_init_cs(

pub_type_support = create_request_type_support(
type_support->data, type_support->typesupport_identifier);
pub_type_hash = type_supports->request_typesupport->type_hash;
sub_type_support = create_response_type_support(
type_support->data, type_support->typesupport_identifier);
sub_type_hash = type_supports->response_typesupport->type_hash;
pubtopic_name =
make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies);
subtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies);
Expand Down Expand Up @@ -4798,28 +4842,32 @@ static rmw_ret_t rmw_init_cs(
set_error_message_from_create_topic(subtopic, subtopic_name);
goto fail_subtopic;
}
// before proceeding to outright ignore given QoS policies, sanity check them
dds_qos_t * qos;
if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) {
goto fail_qos;
}

// store a unique identifier for this client/service in the user
// data of the reader and writer so that we can always determine
// which pairs belong together
get_unique_csid(node, cs->id);
{
std::string user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string(
cs->id) + std::string(";");
dds_qset_userdata(qos, user_data.c_str(), user_data.size());
user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string(
cs->id) + std::string(";");

if ((pub_qos = create_readwrite_qos(qos_policies, *pub_type_hash, false, user_data)) == nullptr) {
goto fail_pub_qos;
}
if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) {
if ((sub_qos = create_readwrite_qos(qos_policies, *sub_type_hash, false, user_data)) == nullptr) {
goto fail_sub_qos;
}

if ((pub->enth =
dds_create_writer(node->context->impl->dds_pub, pubtopic, pub_qos, nullptr)) < 0)
{
RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer;
}
get_entity_gid(pub->enth, pub->gid);
pub->sertype = pub_stact;
if ((sub->enth = dds_create_reader(node->context->impl->dds_sub, subtopic, qos, listener)) < 0) {
if ((sub->enth =
dds_create_reader(node->context->impl->dds_sub, subtopic, sub_qos, listener)) < 0)
{
RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader;
}
Expand All @@ -4833,7 +4881,8 @@ static rmw_ret_t rmw_init_cs(
goto fail_instance_handle;
}
dds_delete_listener(listener);
dds_delete_qos(qos);
dds_delete_qos(pub_qos);
dds_delete_qos(sub_qos);
dds_delete(subtopic);
dds_delete(pubtopic);

Expand All @@ -4848,8 +4897,10 @@ static rmw_ret_t rmw_init_cs(
fail_reader:
dds_delete(pub->enth);
fail_writer:
dds_delete_qos(qos);
fail_qos:
dds_delete_qos(sub_qos);
fail_sub_qos:
dds_delete_qos(pub_qos);
fail_pub_qos:
dds_delete(subtopic);
fail_subtopic:
dds_delete(pubtopic);
Expand Down