diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 29e7c53a0..eb5966732 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -75,6 +75,8 @@ #include "rmw_dds_common/qos.hpp" #include "rmw_dds_common/security.hpp" +#include "rosidl_runtime_c/type_hash.h" + #include "rosidl_typesupport_cpp/message_type_support.hpp" #include "tracetools/tracetools.h" @@ -938,10 +940,29 @@ static void handle_builtintopic_endpoint( rmw_gid_t ppgid; dds_qos_to_rmw_qos(s->qos, &qos_profile); convert_guid_to_gid(s->participant_key, ppgid); + + rosidl_type_hash_t type_hash = rosidl_get_zero_initialized_type_hash(); + void * userdata; + size_t userdata_size; + if (dds_qget_userdata(s->qos, &userdata, &userdata_size)) { + RCPPUTILS_SCOPE_EXIT(dds_free(userdata)); + if (RMW_RET_OK != rmw_dds_common::parse_type_hash_from_user_data( + reinterpret_cast(userdata), userdata_size, type_hash)) + { + RCUTILS_LOG_WARN_NAMED( + "rmw_cyclonedds_cpp", + "Failed to parse type hash for topic '%s' with type '%s' from USER_DATA '%*s'.", + s->topic_name, s->type_name, + static_cast(userdata_size), reinterpret_cast(userdata)); + type_hash = rosidl_get_zero_initialized_type_hash(); + } + } + impl->common.graph_cache.add_entity( gid, std::string(s->topic_name), std::string(s->type_name), + type_hash, ppgid, qos_profile, is_reader); @@ -2055,7 +2076,9 @@ static rmw_time_t dds_duration_to_rmw(dds_duration_t duration) static dds_qos_t * create_readwrite_qos( const rmw_qos_profile_t * qos_policies, - bool ignore_local_publications) + const rosidl_type_hash_t & type_hash, + bool ignore_local_publications, + const std::string & extra_user_data) { dds_duration_t ldur; dds_qos_t * qos = dds_create_qos(); @@ -2147,6 +2170,17 @@ static dds_qos_t * create_readwrite_qos( if (ignore_local_publications) { dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT); } + + std::string typehash_str; + if (RMW_RET_OK != rmw_dds_common::encode_type_hash_for_user_data_qos(type_hash, typehash_str)) { + RCUTILS_LOG_WARN_NAMED( + "rmw_cyclonedds_cpp", + "Failed to encode type hash for topic, will not distribute it in USER_DATA."); + typehash_str.clear(); + } + std::string user_data = extra_user_data + typehash_str; + dds_qset_userdata(qos, user_data.data(), user_data.size()); + return qos; } @@ -2343,7 +2377,7 @@ static CddsPublisher * create_cdds_publisher( set_error_message_from_create_topic(topic, fqtopic_name); goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { + if ((qos = create_readwrite_qos(qos_policies, *type_support->type_hash, false, "")) == nullptr) { goto fail_qos; } if ((pub->enth = dds_create_writer(dds_pub, topic, qos, listener)) < 0) { @@ -2875,7 +2909,9 @@ static CddsSubscription * create_cdds_subscription( set_error_message_from_create_topic(topic, fqtopic_name); goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { + if ((qos = create_readwrite_qos( + qos_policies, *type_support->type_hash, ignore_local_publications, "")) == nullptr) + { goto fail_qos; } if ((sub->enth = dds_create_reader(dds_sub, topic, qos, listener)) < 0) { @@ -4740,6 +4776,10 @@ static rmw_ret_t rmw_init_cs( auto sub = std::make_unique(); std::string subtopic_name, pubtopic_name; void * pub_type_support, * sub_type_support; + dds_qos_t * pub_qos, * sub_qos; + const rosidl_type_hash_t * pub_type_hash; + const rosidl_type_hash_t * sub_type_hash; + std::string user_data; std::unique_ptr pub_msg_ts, sub_msg_ts; @@ -4752,8 +4792,10 @@ static rmw_ret_t rmw_init_cs( sub_type_support = create_request_type_support( type_support->data, type_support->typesupport_identifier); + sub_type_hash = type_supports->request_typesupport->type_hash; pub_type_support = create_response_type_support( type_support->data, type_support->typesupport_identifier); + pub_type_hash = type_supports->response_typesupport->type_hash; subtopic_name = make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies); pubtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies); @@ -4763,8 +4805,10 @@ static rmw_ret_t rmw_init_cs( pub_type_support = create_request_type_support( type_support->data, type_support->typesupport_identifier); + pub_type_hash = type_supports->request_typesupport->type_hash; sub_type_support = create_response_type_support( type_support->data, type_support->typesupport_identifier); + sub_type_hash = type_supports->response_typesupport->type_hash; pubtopic_name = make_fqtopic(ROS_SERVICE_REQUESTER_PREFIX, service_name, "Request", qos_policies); subtopic_name = make_fqtopic(ROS_SERVICE_RESPONSE_PREFIX, service_name, "Reply", qos_policies); @@ -4798,28 +4842,32 @@ static rmw_ret_t rmw_init_cs( set_error_message_from_create_topic(subtopic, subtopic_name); goto fail_subtopic; } - // before proceeding to outright ignore given QoS policies, sanity check them - dds_qos_t * qos; - if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { - goto fail_qos; - } // store a unique identifier for this client/service in the user // data of the reader and writer so that we can always determine // which pairs belong together get_unique_csid(node, cs->id); - { - std::string user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string( - cs->id) + std::string(";"); - dds_qset_userdata(qos, user_data.c_str(), user_data.size()); + user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string( + cs->id) + std::string(";"); + + if ((pub_qos = create_readwrite_qos(qos_policies, *pub_type_hash, false, user_data)) == nullptr) { + goto fail_pub_qos; } - if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) { + if ((sub_qos = create_readwrite_qos(qos_policies, *sub_type_hash, false, user_data)) == nullptr) { + goto fail_sub_qos; + } + + if ((pub->enth = + dds_create_writer(node->context->impl->dds_pub, pubtopic, pub_qos, nullptr)) < 0) + { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } get_entity_gid(pub->enth, pub->gid); pub->sertype = pub_stact; - if ((sub->enth = dds_create_reader(node->context->impl->dds_sub, subtopic, qos, listener)) < 0) { + if ((sub->enth = + dds_create_reader(node->context->impl->dds_sub, subtopic, sub_qos, listener)) < 0) + { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } @@ -4833,7 +4881,8 @@ static rmw_ret_t rmw_init_cs( goto fail_instance_handle; } dds_delete_listener(listener); - dds_delete_qos(qos); + dds_delete_qos(pub_qos); + dds_delete_qos(sub_qos); dds_delete(subtopic); dds_delete(pubtopic); @@ -4848,8 +4897,10 @@ static rmw_ret_t rmw_init_cs( fail_reader: dds_delete(pub->enth); fail_writer: - dds_delete_qos(qos); -fail_qos: + dds_delete_qos(sub_qos); +fail_sub_qos: + dds_delete_qos(pub_qos); +fail_pub_qos: dds_delete(subtopic); fail_subtopic: dds_delete(pubtopic);