diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index c30caa0e15..fdf14d266f 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -39,8 +39,8 @@ find_package(tracetools REQUIRED) find_package(fastrtps_cmake_module REQUIRED) find_package(fastcdr REQUIRED CONFIG) -find_package(fastrtps 2.6.2 REQUIRED CONFIG) -find_package(FastRTPS 2.6.2 REQUIRED MODULE) +find_package(fastrtps 2.14.2 REQUIRED CONFIG) +find_package(FastRTPS 2.14.2 REQUIRED MODULE) find_package(rmw REQUIRED) find_package(rosidl_runtime_c REQUIRED) diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport.hpp index 5f431bc46c..7414c90744 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport.hpp @@ -25,7 +25,7 @@ namespace rmw_fastrtps_cpp class MessageTypeSupport : public TypeSupport { public: - explicit MessageTypeSupport(const message_type_support_callbacks_t * members); + explicit MessageTypeSupport(const message_type_support_callbacks_t * members, uint8_t abi_version); }; } // namespace rmw_fastrtps_cpp diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp index 1a9a52bf73..34adbefe3f 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp @@ -31,6 +31,8 @@ namespace rmw_fastrtps_cpp { +uint8_t get_type_support_abi_version(const char * identifier); + class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport { public: @@ -42,13 +44,19 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; -protected: + bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override; + TypeSupport(); +protected: void set_members(const message_type_support_callbacks_t * members); + void set_members_v2(const message_type_support_callbacks_t * members); + private: const message_type_support_callbacks_t * members_; + const message_type_support_key_callbacks_t* key_callbacks_; bool has_data_; }; diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index d6cce9c402..e8d1a3b1ad 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -126,6 +126,8 @@ rmw_fastrtps_cpp::create_publisher( } } + uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier); + std::lock_guard lck(participant_info->entity_creation_mutex_); ///// @@ -180,7 +182,7 @@ rmw_fastrtps_cpp::create_publisher( ///// // Create the Type Support struct if (!fastdds_type) { - auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks); + auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version); if (!tsupport) { RMW_SET_ERROR_MSG("create_publisher() failed to allocate MessageTypeSupport"); return nullptr; @@ -269,6 +271,15 @@ rmw_fastrtps_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->data_writer_ = publisher->create_datawriter( topic.topic, diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 57fef8d013..296cec7101 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -320,6 +320,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -374,6 +383,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->request_writer_ = publisher->create_datawriter( request_topic.topic, diff --git a/rmw_fastrtps_cpp/src/rmw_serialize.cpp b/rmw_fastrtps_cpp/src/rmw_serialize.cpp index 8689e69c50..b0697b7d1c 100644 --- a/rmw_fastrtps_cpp/src/rmw_serialize.cpp +++ b/rmw_fastrtps_cpp/src/rmw_serialize.cpp @@ -40,7 +40,9 @@ rmw_serialize( } auto callbacks = static_cast(ts->data); - auto tss = MessageTypeSupport_cpp(callbacks); + + uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier); + auto tss = MessageTypeSupport_cpp(callbacks, abi_version); auto data_length = tss.getEstimatedSerializedSize(ros_message, callbacks); if (serialized_message->buffer_capacity < data_length) { if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) { @@ -78,7 +80,9 @@ rmw_deserialize( } auto callbacks = static_cast(ts->data); - auto tss = MessageTypeSupport_cpp(callbacks); + + uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier); + auto tss = MessageTypeSupport_cpp(callbacks, abi_version); eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 88b32262b5..9238f088eb 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -319,6 +319,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -377,6 +386,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->response_writer_ = publisher->create_datawriter( response_topic.topic, diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 737104fc78..1b736f2002 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -123,6 +123,8 @@ create_subscription( } } + uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier); + std::lock_guard lck(participant_info->entity_creation_mutex_); ///// @@ -178,7 +180,7 @@ create_subscription( ///// // Create the Type Support struct if (!fastdds_type) { - auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks); + auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version); if (!tsupport) { RMW_SET_ERROR_MSG("create_subscription() failed to allocate MessageTypeSupport"); return nullptr; @@ -280,6 +282,15 @@ create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + info->datareader_qos_ = reader_qos; // create_datareader diff --git a/rmw_fastrtps_cpp/src/type_support_common.cpp b/rmw_fastrtps_cpp/src/type_support_common.cpp index eb3d38232a..b2e958ae56 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_cpp/src/type_support_common.cpp @@ -23,11 +23,30 @@ namespace rmw_fastrtps_cpp { +uint8_t get_type_support_abi_version(const char * identifier) +{ + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + + if (strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2) == 0 || + strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2) == 0) + { + abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2; + } + + return abi_version; +} + TypeSupport::TypeSupport() { + abi_version_ = AbiVersion::ABI_V1; m_isGetKeyDefined = false; max_size_bound_ = false; is_plain_ = false; + key_is_unbounded_ = false; + key_max_serialized_size_ = 0; + members_ = nullptr; + key_callbacks_ = nullptr; + has_data_ = false; } void TypeSupport::set_members(const message_type_support_callbacks_t * members) @@ -59,6 +78,24 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members) m_typeSize = (m_typeSize + 3) & ~3; } +void TypeSupport::set_members_v2(const message_type_support_callbacks_t * members) +{ + + set_members(members); + + if (nullptr != members->key_callbacks) + { + key_callbacks_ = members->key_callbacks; + m_isGetKeyDefined = true; + + key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_); + if (!key_is_unbounded_) + { + key_buffer_.reserve(key_max_serialized_size_); + } + } +} + size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const { if (is_plain_) { @@ -129,14 +166,90 @@ bool TypeSupport::deserializeROSmessage( return true; } -MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members) +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const +{ + assert(ros_message); + (void)impl; + + // retrieve estimated serialized size in case key is unbounded + if (key_is_unbounded_) + { + key_max_serialized_size_ = (std::max) ( + key_max_serialized_size_, + key_callbacks_->get_serialized_size_key(ros_message)); + key_buffer_.reserve(key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer fast_buffer( + reinterpret_cast(key_buffer_.data()), + key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + + key_callbacks_->cdr_serialize_key(ros_message, ser); + + const size_t max_serialized_key_length = 16; + + auto ser_length = ser.getSerializedDataLength(); + + // check for md5 + if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) + { + md5_.init(); + md5_.update(key_buffer_.data(), static_cast(ser_length)); + md5_.finalize(); + + for (uint8_t i = 0; i < max_serialized_key_length; ++i) + { + ihandle->value[i] = md5_.digest[i]; + } + } + else + { + memset(ihandle->value, 0, max_serialized_key_length); + for (uint8_t i = 0; i < ser_length; ++i) + { + ihandle->value[i] = key_buffer_[i]; + } + } + + return true; +} + +MessageTypeSupport::MessageTypeSupport( + const message_type_support_callbacks_t * members, + uint8_t abi_version) { assert(members); + abi_version_ = abi_version; + std::string name = _create_type_name(members); this->setName(name.c_str()); - set_members(members); + switch (abi_version) + { + case TypeSupport::AbiVersion::ABI_V1: + { + set_members(members); + break; + } + case TypeSupport::AbiVersion::ABI_V2: + { + set_members_v2(members); + break; + } + default: + { + set_members(members); + break; + } + } } ServiceTypeSupport::ServiceTypeSupport() diff --git a/rmw_fastrtps_cpp/src/type_support_common.hpp b/rmw_fastrtps_cpp/src/type_support_common.hpp index a86d45e1d4..13874c8fed 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.hpp +++ b/rmw_fastrtps_cpp/src/type_support_common.hpp @@ -18,6 +18,8 @@ #include #include +#include "rcpputils/find_and_replace.hpp" + #include "rmw/error_handling.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -33,6 +35,8 @@ #include "rosidl_typesupport_fastrtps_cpp/service_type_support.h" #define RMW_FASTRTPS_CPP_TYPESUPPORT_C rosidl_typesupport_fastrtps_c__identifier #define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP rosidl_typesupport_fastrtps_cpp::typesupport_identifier +#define RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2 rosidl_typesupport_fastrtps_c__identifier_v2 +#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2 rosidl_typesupport_fastrtps_cpp::typesupport_identifier_v2 using MessageTypeSupport_cpp = rmw_fastrtps_cpp::MessageTypeSupport; using TypeSupport_cpp = rmw_fastrtps_cpp::TypeSupport; @@ -52,7 +56,9 @@ _create_type_name( std::string message_namespace(members->message_namespace_); std::string message_name(members->message_name_); if (!message_namespace.empty()) { - ss << message_namespace << "::"; + // Find and replace C namespace separator with C++, in case this is using C typesupport + std::string message_namespace_new = rcpputils::find_and_replace(message_namespace, "__", "::"); + ss << message_namespace_new << "::"; } ss << "dds_::" << message_name << "_"; return ss.str(); diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index 802e4058ed..f1e11efb6c 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -38,8 +38,8 @@ find_package(rmw_fastrtps_shared_cpp REQUIRED) find_package(fastrtps_cmake_module REQUIRED) find_package(fastcdr REQUIRED CONFIG) -find_package(fastrtps 2.6.2 REQUIRED CONFIG) -find_package(FastRTPS 2.6.2 REQUIRED MODULE) +find_package(fastrtps 2.14.2 REQUIRED CONFIG) +find_package(FastRTPS 2.14.2 REQUIRED MODULE) find_package(rmw REQUIRED) find_package(rosidl_runtime_c REQUIRED) diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport.hpp index 67d8a72b89..6aa2b8123a 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport.hpp @@ -32,7 +32,7 @@ template class MessageTypeSupport : public TypeSupport { public: - MessageTypeSupport(const MembersType * members, const void * ros_type_support); + MessageTypeSupport(const MembersType * members, const void * ros_type_support, uint8_t abi_version); }; } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp index 95f48e996f..16f7540350 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp @@ -33,11 +33,12 @@ namespace rmw_fastrtps_dynamic_cpp template MessageTypeSupport::MessageTypeSupport( - const MembersType * members, const void * ros_type_support) + const MembersType * members, const void * ros_type_support, uint8_t abi_version) : TypeSupport(ros_type_support) { assert(members); this->members_ = members; + this->abi_version_ = abi_version; std::ostringstream ss; std::string message_namespace(this->members_->message_namespace_); @@ -56,10 +57,17 @@ MessageTypeSupport::MessageTypeSupport( // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(members, 0)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(members, 0, this->key_max_serialized_size_)); } else { this->m_typeSize++; } + + if (this->key_max_serialized_size_ != 0) + { + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport.hpp index c4ac036d7a..47b2cf4f46 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport.hpp @@ -30,14 +30,14 @@ template class RequestTypeSupport : public TypeSupport { public: - RequestTypeSupport(const ServiceMembersType * members, const void * ros_type_support); + RequestTypeSupport(const ServiceMembersType * members, const void * ros_type_support, uint8_t abi_version); }; template class ResponseTypeSupport : public TypeSupport { public: - ResponseTypeSupport(const ServiceMembersType * members, const void * ros_type_support); + ResponseTypeSupport(const ServiceMembersType * members, const void * ros_type_support, uint8_t abi_version); }; } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp index 45e3fc6028..e6002fa4e2 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp @@ -32,11 +32,12 @@ namespace rmw_fastrtps_dynamic_cpp template RequestTypeSupport::RequestTypeSupport( - const ServiceMembersType * members, const void * ros_type_support) + const ServiceMembersType * members, const void * ros_type_support, uint8_t abi_version) : TypeSupport(ros_type_support) { assert(members); this->members_ = members->request_members_; + this->abi_version_ = abi_version; std::ostringstream ss; std::string service_namespace(members->service_namespace_); @@ -55,21 +56,29 @@ RequestTypeSupport::RequestTypeSupport( // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0, this->key_max_serialized_size_)); } else { this->m_typeSize++; } + + if (this->key_max_serialized_size_ != 0) + { + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } template ResponseTypeSupport::ResponseTypeSupport( - const ServiceMembersType * members, const void * ros_type_support) + const ServiceMembersType * members, const void * ros_type_support, uint8_t abi_version) : TypeSupport(ros_type_support) { assert(members); this->members_ = members->response_members_; + this->abi_version_ = abi_version; std::ostringstream ss; std::string service_namespace(members->service_namespace_); @@ -88,10 +97,17 @@ ResponseTypeSupport::ResponseTypeSupport // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0, this->key_max_serialized_size_)); } else { this->m_typeSize++; } + + if (this->key_max_serialized_size_ != 0) + { + this->m_isGetKeyDefined = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->m_typeSize = (this->m_typeSize + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport.hpp index 270839275e..eb575a3e97 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport.hpp @@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + + bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override; + }; class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport @@ -170,10 +174,13 @@ class TypeSupport : public BaseTypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override; + protected: explicit TypeSupport(const void * ros_type_support); - size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment); + size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment, size_t& max_key_size); const MembersType * members_; @@ -181,7 +188,8 @@ class TypeSupport : public BaseTypeSupport size_t getEstimatedSerializedSize( const MembersType * members, const void * ros_message, - size_t current_alignment) const; + size_t current_alignment, + bool compute_key_members_only = false) const; bool serializeROSmessage( eprosima::fastcdr::Cdr & ser, @@ -192,6 +200,18 @@ class TypeSupport : public BaseTypeSupport eprosima::fastcdr::Cdr & deser, const MembersType * members, void * ros_message) const; + + bool serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + void * ros_message, + bool check_if_member_is_key) const; + + bool get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) const; }; } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp index 899119f887..b56f44e7cf 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/TypeSupport_impl.hpp @@ -67,6 +67,8 @@ TypeSupport::TypeSupport(const void * ros_type_support) m_isGetKeyDefined = false; max_size_bound_ = false; is_plain_ = false; + key_max_serialized_size_ = 0; + key_is_unbounded_ = false; } // C++ specialization @@ -299,6 +301,163 @@ bool TypeSupport::serializeROSmessage( return true; } +template +bool TypeSupport::serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + void * ros_message, + bool check_if_member_is_key) const +{ + for (uint32_t i = 0; i < members->member_count_; ++i) { + const auto member = members->members_ + i; + + if (check_if_member_is_key && + this->abi_version_ != AbiVersion::ABI_V1 && + !*(members->key_members_array_+i)) + { + continue; + } + + void * field = const_cast(static_cast(ros_message)) + member->offset_; + switch (member->type_id_) { + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: + if (!member->is_array_) { + // don't cast to bool here because if the bool is + // uninitialized the random value can't be deserialized + ser << (*static_cast(field) ? true : false); + } else { + serialize_field(member, field, ser); + } + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE: + { + auto sub_members = static_cast(member->members_->data); + if (!member->is_array_) { + serializeKeyROSmessage(ser, sub_members, field, false); + } else { + size_t array_size = 0; + + if (member->array_size_ && !member->is_upper_bound_) { + array_size = member->array_size_; + } else { + if (!member->size_function) { + RMW_SET_ERROR_MSG("unexpected error: size function is null"); + return false; + } + array_size = member->size_function(field); + + // Serialize length + ser << (uint32_t)array_size; + } + + if (array_size != 0 && !member->get_function) { + RMW_SET_ERROR_MSG("unexpected error: get_function function is null"); + return false; + } + for (size_t index = 0; index < array_size; ++index) { + serializeKeyROSmessage(ser, sub_members, member->get_function(field, index), false); + } + } + } + break; + default: + throw std::runtime_error("unknown type"); + } + } + + return true; +} + +template +bool TypeSupport::get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) const +{ + assert(members); + assert(ros_message); + + // get estimated serialized size in case key is unbounded + if (this->key_is_unbounded_) + { + this->key_max_serialized_size_ = this->getEstimatedSerializedSize(members, ros_message, 0, true); + key_buffer_.reserve(this->key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(this->key_buffer_.data()), + this->key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + + // serialize + serializeKeyROSmessage(ser, members_, ros_message, true); + + // check for md5 + if (force_md5 || this->key_max_serialized_size_ > 16) + { + md5_.init(); + + md5_.update(this->key_buffer_.data(), static_cast(ser.getSerializedDataLength())); + + md5_.finalize(); + + for (uint8_t i = 0; i < 16; ++i) + { + ihandle->value[i] = md5_.digest[i]; + } + } + else + { + for (uint8_t i = 0; i < 16; ++i) + { + ihandle->value[i] = this->key_buffer_[i]; + } + } + + return true; +} + // C++ specialization template size_t next_field_align( @@ -465,7 +624,8 @@ template size_t TypeSupport::getEstimatedSerializedSize( const MembersType * members, const void * ros_message, - size_t current_alignment) const + size_t current_alignment, + bool compute_key_members_only) const { assert(members); assert(ros_message); @@ -475,6 +635,14 @@ size_t TypeSupport::getEstimatedSerializedSize( for (uint32_t i = 0; i < members->member_count_; ++i) { const auto member = members->members_ + i; void * field = const_cast(static_cast(ros_message)) + member->offset_; + + if (compute_key_members_only && + this->abi_version_ != AbiVersion::ABI_V1 && + !*(members->key_members_array_+i)) + { + continue; + } + switch (member->type_id_) { case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: current_alignment = next_field_align(member, field, current_alignment); @@ -521,7 +689,7 @@ size_t TypeSupport::getEstimatedSerializedSize( { auto sub_members = static_cast(member->members_->data); if (!member->is_array_) { - current_alignment += getEstimatedSerializedSize(sub_members, field, current_alignment); + current_alignment += getEstimatedSerializedSize(sub_members, field, current_alignment, compute_key_members_only); } else { size_t array_size = 0; @@ -546,7 +714,8 @@ size_t TypeSupport::getEstimatedSerializedSize( current_alignment += getEstimatedSerializedSize( sub_members, member->get_function(field, index), - current_alignment); + current_alignment, + compute_key_members_only); } } } @@ -816,7 +985,7 @@ bool TypeSupport::deserializeROSmessage( template size_t TypeSupport::calculateMaxSerializedSize( - const MembersType * members, size_t current_alignment) + const MembersType * members, size_t current_alignment, size_t& max_serialized_key_size) { assert(members); @@ -829,6 +998,14 @@ size_t TypeSupport::calculateMaxSerializedSize( const auto * member = members->members_ + i; size_t array_size = 1; + bool member_is_key = false; + + if (this->abi_version_ != AbiVersion::ABI_V1 && + *(members->key_members_array_+i)) + { + member_is_key = true; + } + if (member->is_array_) { array_size = member->array_size_; @@ -842,6 +1019,12 @@ size_t TypeSupport::calculateMaxSerializedSize( this->is_plain_ = false; current_alignment += padding + eprosima::fastcdr::Cdr::alignment(current_alignment, padding); + + if (member_is_key) + { + max_serialized_key_size += padding + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, padding); + } } } @@ -854,12 +1037,22 @@ size_t TypeSupport::calculateMaxSerializedSize( case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: last_member_size = array_size * sizeof(int8_t); current_alignment += array_size * sizeof(int8_t); + if (member_is_key) + { + max_serialized_key_size += array_size * sizeof(uint8_t) + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, sizeof(uint8_t)); + } break; case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: last_member_size = array_size * sizeof(uint16_t); current_alignment += array_size * sizeof(uint16_t) + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint16_t)); + if (member_is_key) + { + max_serialized_key_size += array_size * sizeof(uint16_t) + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, sizeof(uint16_t)); + } break; case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: @@ -867,6 +1060,11 @@ size_t TypeSupport::calculateMaxSerializedSize( last_member_size = array_size * sizeof(uint32_t); current_alignment += array_size * sizeof(uint32_t) + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint32_t)); + if (member_is_key) + { + max_serialized_key_size += array_size * sizeof(uint32_t) + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, sizeof(uint32_t)); + } break; case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: @@ -874,6 +1072,11 @@ size_t TypeSupport::calculateMaxSerializedSize( last_member_size = array_size * sizeof(uint64_t); current_alignment += array_size * sizeof(uint64_t) + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint64_t)); + if (member_is_key) + { + max_serialized_key_size += array_size * sizeof(uint64_t) + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, sizeof(uint64_t)); + } break; case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING: @@ -886,6 +1089,13 @@ size_t TypeSupport::calculateMaxSerializedSize( current_alignment += padding + eprosima::fastcdr::Cdr::alignment(current_alignment, padding) + character_size * (member->string_upper_bound_ + 1); + if (member_is_key) + { + key_is_unbounded_ = true; + max_serialized_key_size += padding + + eprosima::fastcdr::Cdr::alignment(max_serialized_key_size, padding) + + character_size * (member->string_upper_bound_ + 1); + } } } break; @@ -893,9 +1103,16 @@ size_t TypeSupport::calculateMaxSerializedSize( { auto sub_members = static_cast(member->members_->data); for (size_t index = 0; index < array_size; ++index) { - size_t curr = calculateMaxSerializedSize(sub_members, current_alignment); + // We set the type as keyed + // if any of the parent struct members are keyed + size_t dummy_max_serialized_key_size; + size_t curr = calculateMaxSerializedSize(sub_members, current_alignment, dummy_max_serialized_key_size); current_alignment += curr; last_member_size += curr; + if (member_is_key) + { + max_serialized_key_size += curr; + } } } break; @@ -993,6 +1210,25 @@ bool TypeSupport::deserializeROSmessage( return true; } +template +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const +{ + + assert(ros_message); + assert(members_); + + bool ret = false; + + (void)impl; + if (members_->member_count_ != 0) + { + ret = TypeSupport::get_key_hash_from_ros_message(members_, ros_message, ihandle, force_md5); + } + + return ret; +} + } // namespace rmw_fastrtps_dynamic_cpp #endif // RMW_FASTRTPS_DYNAMIC_CPP__TYPESUPPORT_IMPL_HPP_ diff --git a/rmw_fastrtps_dynamic_cpp/src/client_service_common.cpp b/rmw_fastrtps_dynamic_cpp/src/client_service_common.cpp index 7677079d34..e77383ed38 100644 --- a/rmw_fastrtps_dynamic_cpp/src/client_service_common.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/client_service_common.cpp @@ -20,10 +20,12 @@ const void * get_request_ptr(const void * untyped_service_members, const char * typesupport) { - if (using_introspection_c_typesupport(typesupport)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + (void)abi_version; + if (using_introspection_c_typesupport(typesupport, abi_version)) { return get_request_ptr( untyped_service_members); - } else if (using_introspection_cpp_typesupport(typesupport)) { + } else if (using_introspection_cpp_typesupport(typesupport, abi_version)) { return get_request_ptr( untyped_service_members); } @@ -34,10 +36,12 @@ get_request_ptr(const void * untyped_service_members, const char * typesupport) const void * get_response_ptr(const void * untyped_service_members, const char * typesupport) { - if (using_introspection_c_typesupport(typesupport)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + (void)abi_version; + if (using_introspection_c_typesupport(typesupport, abi_version)) { return get_response_ptr( untyped_service_members); - } else if (using_introspection_cpp_typesupport(typesupport)) { + } else if (using_introspection_cpp_typesupport(typesupport, abi_version)) { return get_response_ptr( untyped_service_members); } diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index 83c656404b..523c56686f 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -274,6 +274,15 @@ rmw_fastrtps_dynamic_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter (with publisher name to not change name policy) info->data_writer_ = publisher->create_datawriter( topic.topic, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 2b67a26f11..fb3269c00e 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -351,6 +351,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -405,6 +414,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->request_writer_ = publisher->create_datawriter( request_topic.topic, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 3f3a51f036..f11565f0fb 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -350,6 +350,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -408,6 +417,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->response_writer_ = publisher->create_datawriter( response_topic.topic, diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 3a64262733..d6cb9d70af 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -271,6 +271,15 @@ create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->m_isGetKeyDefined && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos; switch (subscription_options->require_unique_network_flow_endpoints) { default: diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_common.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_common.cpp index d60206970a..c5fb722800 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_common.cpp @@ -23,14 +23,35 @@ #include "type_support_common.hpp" bool -using_introspection_c_typesupport(const char * typesupport_identifier) +using_introspection_c_typesupport(const char * typesupport_identifier, uint8_t &abi_version) { - return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; + bool ret = false; + if (strcmp(typesupport_identifier, rosidl_typesupport_introspection_c__identifier) == 0) + { + abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + ret = true; + + } else if (strcmp(typesupport_identifier, rosidl_typesupport_introspection_c__identifier_v2) == 0) + { + abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2; + ret = true; + } + return ret; } bool -using_introspection_cpp_typesupport(const char * typesupport_identifier) +using_introspection_cpp_typesupport(const char * typesupport_identifier, uint8_t &abi_version) { - return typesupport_identifier == - rosidl_typesupport_introspection_cpp::typesupport_identifier; + bool ret = false; + if (strcmp(typesupport_identifier, rosidl_typesupport_introspection_cpp::typesupport_identifier) == 0) + { + abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + ret = true; + + } else if (strcmp(typesupport_identifier, rosidl_typesupport_introspection_cpp::typesupport_identifier_v2) == 0) + { + abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2; + ret = true; + } + return ret; } diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_common.hpp b/rmw_fastrtps_dynamic_cpp/src/type_support_common.hpp index 1846b7ba5d..4dc6e05ea3 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_common.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_common.hpp @@ -63,10 +63,12 @@ using ResponseTypeSupport_cpp = rmw_fastrtps_dynamic_cpp::ResponseTypeSupport< >; bool -using_introspection_c_typesupport(const char * typesupport_identifier); +using_introspection_c_typesupport(const char * typesupport_identifier, + uint8_t& abi_version); bool -using_introspection_cpp_typesupport(const char * typesupport_identifier); +using_introspection_cpp_typesupport(const char * typesupport_identifier, + uint8_t& abi_version); template ROSIDL_TYPESUPPORT_INTROSPECTION_CPP_LOCAL @@ -98,10 +100,12 @@ _create_type_name( const void * untyped_members, const char * typesupport) { - if (using_introspection_c_typesupport(typesupport)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + (void)abi_version; + if (using_introspection_c_typesupport(typesupport, abi_version)) { return _create_type_name( untyped_members); - } else if (using_introspection_cpp_typesupport(typesupport)) { + } else if (using_introspection_cpp_typesupport(typesupport, abi_version)) { return _create_type_name( untyped_members); } diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp index 43c3b0ac42..86312756c7 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp @@ -23,6 +23,7 @@ TypeSupportProxy::TypeSupportProxy(rmw_fastrtps_shared_cpp::TypeSupport * inner_ m_typeSize = inner_type->m_typeSize; is_plain_ = inner_type->is_plain(); max_size_bound_ = inner_type->is_bounded(); + m_isGetKeyDefined = inner_type->m_isGetKeyDefined; } size_t TypeSupportProxy::getEstimatedSerializedSize( @@ -46,4 +47,11 @@ bool TypeSupportProxy::deserializeROSmessage( return type_impl->deserializeROSmessage(deser, ros_message, impl); } +bool TypeSupportProxy::get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const +{ + auto type_impl = static_cast(impl); + return type_impl->get_key_hash_from_ros_message(ros_message, ihandle, force_md5, impl); +} + } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_registry.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_registry.cpp index d7f635b847..5d2092a956 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_registry.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_registry.cpp @@ -81,14 +81,18 @@ type_support_ptr TypeSupportRegistry::get_message_type_support( { auto creator_fun = [&ros_type_support]() -> type_support_ptr { - if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new MessageTypeSupport_c(members, ros_type_support); - } else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier)) { + return new MessageTypeSupport_c(members, ros_type_support, abi_version); + } + else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new MessageTypeSupport_cpp(members, ros_type_support); + return new MessageTypeSupport_cpp(members, ros_type_support, abi_version); } RMW_SET_ERROR_MSG("Unknown typesupport identifier"); return nullptr; @@ -102,14 +106,18 @@ type_support_ptr TypeSupportRegistry::get_request_type_support( { auto creator_fun = [&ros_type_support]() -> type_support_ptr { - if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new RequestTypeSupport_c(members, ros_type_support); - } else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier)) { + return new RequestTypeSupport_c(members, ros_type_support, abi_version); + } + else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new RequestTypeSupport_cpp(members, ros_type_support); + return new RequestTypeSupport_cpp(members, ros_type_support, abi_version); } RMW_SET_ERROR_MSG("Unknown typesupport identifier"); return nullptr; @@ -123,14 +131,18 @@ type_support_ptr TypeSupportRegistry::get_response_type_support( { auto creator_fun = [&ros_type_support]() -> type_support_ptr { - if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier)) { + uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1; + if (using_introspection_c_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new ResponseTypeSupport_c(members, ros_type_support); - } else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier)) { + return new ResponseTypeSupport_c(members, ros_type_support, abi_version); + } + else if (using_introspection_cpp_typesupport(ros_type_support->typesupport_identifier, abi_version)) + { auto members = static_cast( ros_type_support->data); - return new ResponseTypeSupport_cpp(members, ros_type_support); + return new ResponseTypeSupport_cpp(members, ros_type_support, abi_version); } RMW_SET_ERROR_MSG("Unknown typesupport identifier"); return nullptr; diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index 2790d6b93f..02b2c248e2 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -44,8 +44,8 @@ find_package(tracetools REQUIRED) find_package(fastrtps_cmake_module REQUIRED) find_package(fastcdr REQUIRED CONFIG) -find_package(fastrtps 2.6.2 REQUIRED CONFIG) -find_package(FastRTPS 2.6.2 REQUIRED MODULE) +find_package(fastrtps 2.14.2 REQUIRED CONFIG) +find_package(FastRTPS 2.14.2 REQUIRED MODULE) find_package(rmw REQUIRED) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index 0a23d195ee..2d73f17b25 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -25,6 +25,7 @@ #include "fastcdr/FastBuffer.h" #include "fastcdr/Cdr.h" +#include "fastrtps/utils/md5.h" #include "rcutils/logging_macros.h" @@ -46,6 +47,13 @@ struct SerializedData class TypeSupport : public eprosima::fastdds::dds::TopicDataType { public: + + enum AbiVersion + { + ABI_V1 = 1, + ABI_V2 + }; + virtual size_t getEstimatedSerializedSize(const void * ros_message, const void * impl) const = 0; virtual bool serializeROSmessage( @@ -54,15 +62,14 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType virtual bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const = 0; + virtual bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const = 0; + RMW_FASTRTPS_SHARED_CPP_PUBLIC bool getKey( void * data, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, - bool force_md5 = false) override - { - (void)data; (void)ihandle; (void)force_md5; - return false; - } + bool force_md5 = false) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) override; @@ -104,8 +111,13 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType RMW_FASTRTPS_SHARED_CPP_PUBLIC TypeSupport(); + mutable uint8_t abi_version_; bool max_size_bound_; bool is_plain_; + bool key_is_unbounded_; + mutable size_t key_max_serialized_size_; + mutable MD5 md5_; + mutable std::vector key_buffer_; }; RMW_FASTRTPS_SHARED_CPP_PUBLIC diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp index 88b8e183a9..e80969242a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp @@ -184,6 +184,22 @@ create_datareader( SubListener * listener, eprosima::fastdds::dds::DataReader ** data_reader); + +/** +* Apply specific resource limits when using keys. +* Max samples per instance is set to history depth if KEEP_LAST +* else UNLIMITED. +* +* \param[in] hitory_qos History entitiy QoS. +* \param[in, out] res_limits_qos Resource limits entitiy QoS. +* +*/ +RMW_FASTRTPS_SHARED_CPP_PUBLIC +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__UTILS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp index bad85cbd07..0198506c6d 100644 --- a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp +++ b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp @@ -30,6 +30,8 @@ #include "fastrtps/types/TypeNamesGenerator.h" #include "fastrtps/types/AnnotationParameterValue.h" +#include "rcpputils/find_and_replace.hpp" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw/error_handling.h" @@ -62,6 +64,35 @@ void * TypeSupport::createData() return new eprosima::fastcdr::FastBuffer(); } +bool TypeSupport::getKey( + void * data, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5) +{ + assert(data); + + bool ret = false; + + if (!m_isGetKeyDefined) + { + return ret; + } + + auto ser_data = static_cast(data); + + if (ser_data->is_cdr_buffer) + { + // TODO + // We would need a get_key_hash_from_payload method + } + else + { + ret = this->get_key_hash_from_ros_message(ser_data->data, ihandle, force_md5, ser_data->impl); + } + + return ret; +} + bool TypeSupport::serialize( void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) { @@ -168,15 +199,27 @@ get_type_support_introspection( type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier); if (nullptr == type_support) { - rcutils_error_string_t error_string = rcutils_get_error_string(); - rcutils_reset_error(); - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Type support not from this implementation. Got:\n" - " %s\n" - " %s\n" - "while fetching it", - prev_error_string.str, error_string.str); - return nullptr; + type_support = + get_message_typesupport_handle( + type_supports, + rosidl_typesupport_introspection_c__identifier_v2); + if (nullptr == type_support) { + type_support = + get_message_typesupport_handle( + type_supports, + rosidl_typesupport_introspection_cpp::typesupport_identifier_v2); + if (nullptr == type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Type support not from this implementation. Got:\n" + " %s\n" + " %s\n" + "while fetching it", + prev_error_string.str, error_string.str); + return nullptr; + } + } } } @@ -196,7 +239,8 @@ _create_type_name( std::string message_namespace(members->message_namespace_); std::string message_name(members->message_name_); if (!message_namespace.empty()) { - ss << message_namespace << "::"; + // Find and replace C namespace separator with C++, in case this is using C typesupport + message_namespace = rcpputils::find_and_replace(message_namespace, "__", "::"); } ss << "dds_::" << message_name << "_"; return ss.str(); diff --git a/rmw_fastrtps_shared_cpp/src/participant.cpp b/rmw_fastrtps_shared_cpp/src/participant.cpp index 1c9ee267a4..19f20dbe52 100644 --- a/rmw_fastrtps_shared_cpp/src/participant.cpp +++ b/rmw_fastrtps_shared_cpp/src/participant.cpp @@ -17,6 +17,9 @@ #include #include #include +#include +#include +#include #include "fastdds/dds/core/status/StatusMask.hpp" #include "fastdds/dds/domain/DomainParticipantFactory.hpp" @@ -33,6 +36,7 @@ #include "fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h" #include "rcpputils/scope_exit.hpp" +#include "rcpputils/filesystem_helper.hpp" #include "rcutils/env.h" #include "rcutils/filesystem.h" @@ -46,6 +50,124 @@ #include "rmw_dds_common/security.hpp" +#if HAVE_SECURITY +// Processor for security attributes with FILE URI +bool +process_file_uri_security_file( + const std::string & prefix, const rcpputils::fs::path & full_path, + std::string & result) +{ + if (!full_path.is_regular_file()) { + return false; + } + result = prefix + full_path.string(); + return true; +} + +// Processor for security attributes with PKCS#11 URI +bool +process_pkcs_uri_security_file( + const std::string & /*prefix*/, const rcpputils::fs::path & full_path, + std::string & result) +{ + const std::string p11_prefix("pkcs11:"); + + std::ifstream ifs(full_path.string()); + if (!ifs.is_open()) { + return false; + } + if (!(ifs >> result)) { + return false; + } + if (result.find(p11_prefix) != 0) { + return false; + } + + return true; +} + +static +bool +get_security_files( + const std::string & prefix, const std::string & secure_root, + std::unordered_map & result) +{ + using std::placeholders::_1; + using std::placeholders::_2; + using std::placeholders::_3; + using security_file_processor = + std::function; + using processor_vector = + std::vector>; + + // Key: the security attribute + // Value: ordered sequence of pairs. Each pair contains one possible file name + // for the attribute and the corresponding processor method + // Pairs are ordered by priority: the first one matching is used. + const std::unordered_map required_files{ + {"IDENTITY_CA", { + {"identity_ca.cert.pem", std::bind(process_file_uri_security_file, _1, _2, _3)}, + {"identity_ca.cert.p11", std::bind(process_pkcs_uri_security_file, _1, _2, _3)}}}, + {"CERTIFICATE", { + {"cert.pem", std::bind(process_file_uri_security_file, _1, _2, _3)}, + {"cert.p11", std::bind(process_pkcs_uri_security_file, _1, _2, _3)}}}, + {"PRIVATE_KEY", { + {"key.pem", std::bind(process_file_uri_security_file, _1, _2, _3)}, + {"key.p11", std::bind(process_pkcs_uri_security_file, _1, _2, _3)}}}, + {"PERMISSIONS_CA", { + {"permissions_ca.cert.pem", std::bind(process_file_uri_security_file, _1, _2, _3)}, + {"permissions_ca.cert.p11", std::bind(process_pkcs_uri_security_file, _1, _2, _3)}}}, + {"GOVERNANCE", { + {"governance.p7s", std::bind(process_file_uri_security_file, _1, _2, _3)}}}, + {"PERMISSIONS", { + {"permissions.p7s", std::bind(process_file_uri_security_file, _1, _2, _3)}}}, + }; + + const std::unordered_map optional_files{ + {"CRL", { + {"crl.pem", std::bind(process_file_uri_security_file, _1, _2, _3)}}} + }; + + for (const std::pair>> & el : required_files) + { + std::string attribute_value; + bool processed = false; + for (auto & proc : el.second) { + rcpputils::fs::path full_path(secure_root); + full_path /= proc.first; + if (proc.second(prefix, full_path, attribute_value)) { + processed = true; + break; + } + } + if (!processed) { + result.clear(); + return false; + } + result[el.first] = attribute_value; + } + + for (const std::pair & el : optional_files) { + std::string attribute_value; + bool processed = false; + for (auto & proc : el.second) { + rcpputils::fs::path full_path(secure_root); + full_path /= proc.first; + if (proc.second(prefix, full_path, attribute_value)) { + processed = true; + break; + } + } + if (processed) { + result[el.first] = attribute_value; + } + } + + return true; +} +#endif + // Private function to create Participant with QoS static CustomParticipantInfo * __create_participant( @@ -232,7 +354,7 @@ rmw_fastrtps_shared_cpp::create_participant( // if security_root_path provided, try to find the key and certificate files #if HAVE_SECURITY std::unordered_map security_files_paths; - if (rmw_dds_common::get_security_files( + if (get_security_files( "file://", security_options->security_root_path, security_files_paths)) { eprosima::fastrtps::rtps::PropertyPolicy property_policy; diff --git a/rmw_fastrtps_shared_cpp/src/utils.cpp b/rmw_fastrtps_shared_cpp/src/utils.cpp index f52b2b202e..d33838cb8f 100644 --- a/rmw_fastrtps_shared_cpp/src/utils.cpp +++ b/rmw_fastrtps_shared_cpp/src/utils.cpp @@ -214,5 +214,21 @@ create_datareader( return true; } +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos) +{ + res_limits_qos.max_instances = 0; + res_limits_qos.max_samples = 0; + if (history_qos.kind == eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS) + { + res_limits_qos.max_samples_per_instance = history_qos.depth; + } + else + { + res_limits_qos.max_samples_per_instance = 0; + } +} } // namespace rmw_fastrtps_shared_cpp