diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp index d0121eff0f..620f31c40d 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp @@ -44,11 +44,18 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport explicit TypeSupport(const rosidl_message_type_support_t * type_supports); + bool get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const override; + protected: void set_members(const message_type_support_callbacks_t * members); private: const message_type_support_callbacks_t * members_; + const message_type_support_key_callbacks_t * key_callbacks_; bool has_data_; }; diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index f690916ddf..3ad3139280 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -259,6 +259,15 @@ rmw_fastrtps_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->data_writer_ = publisher->create_datawriter( info->topic_, diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index ccb4fa3ec7..4f052b73d6 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -328,6 +328,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -386,6 +395,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->request_writer_ = publisher->create_datawriter( info->request_topic_, diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index ed50828f04..0dc457abdf 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -324,6 +324,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -386,6 +395,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->response_writer_ = publisher->create_datawriter( info->response_topic_, diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 5443ea1cc7..f4b4242ae5 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -392,6 +392,15 @@ __create_dynamic_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + info->datareader_qos_ = reader_qos; // create_datareader @@ -654,6 +663,15 @@ __create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + info->datareader_qos_ = reader_qos; // create_datareader diff --git a/rmw_fastrtps_cpp/src/type_support_common.cpp b/rmw_fastrtps_cpp/src/type_support_common.cpp index 7c31a950e1..7b7f5ff2c8 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_cpp/src/type_support_common.cpp @@ -31,6 +31,11 @@ TypeSupport::TypeSupport( is_compute_key_provided = false; max_size_bound_ = false; is_plain_ = false; + key_is_unbounded_ = false; + key_max_serialized_size_ = 0; + members_ = nullptr; + key_callbacks_ = nullptr; + has_data_ = false; } void TypeSupport::set_members(const message_type_support_callbacks_t * members) @@ -60,6 +65,16 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members) max_serialized_type_size = 4 + data_size; // Account for RTPS submessage alignment max_serialized_type_size = (max_serialized_type_size + 3) & ~3; + + if (nullptr != members->key_callbacks) { + key_callbacks_ = members->key_callbacks; + is_compute_key_provided = true; + + key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_); + if (!key_is_unbounded_) { + key_buffer_.reserve(key_max_serialized_size_); + } + } } size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const @@ -132,6 +147,55 @@ bool TypeSupport::deserializeROSmessage( return true; } +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * [[maybe_unused]] impl) const +{ + assert(ros_message); + assert(ihandle); + + // retrieve estimated serialized size in case key is unbounded + if (key_is_unbounded_) { + key_max_serialized_size_ = (std::max) ( + key_max_serialized_size_, + key_callbacks_->get_serialized_size_key(ros_message)); + key_buffer_.reserve(key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer fast_buffer( + reinterpret_cast(key_buffer_.data()), + key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1); + + key_callbacks_->cdr_serialize_key(ros_message, ser); + + const size_t max_serialized_key_length = 16; + + auto ser_length = ser.get_serialized_data_length(); + + // check for md5 + if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) { + md5_.init(); + md5_.update(key_buffer_.data(), static_cast(ser_length)); + md5_.finalize(); + + for (uint8_t i = 0; i < max_serialized_key_length; ++i) { + ihandle->value[i] = md5_.digest[i]; + } + } else { + memset(ihandle->value, 0, max_serialized_key_length); + for (uint8_t i = 0; i < ser_length; ++i) { + ihandle->value[i] = key_buffer_[i]; + } + } + + return true; +} + MessageTypeSupport::MessageTypeSupport( const message_type_support_callbacks_t * members, const rosidl_message_type_support_t * type_supports) diff --git a/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp index 9646f2fff2..84405be569 100644 --- a/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp @@ -61,6 +61,13 @@ MessageTypeSupport::MessageTypeSupport( } else { this->max_serialized_type_size++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(members); + this->is_compute_key_provided = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp index f397174cb1..bed1661af5 100644 --- a/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp @@ -62,6 +62,13 @@ RequestTypeSupport::RequestTypeSupport( } else { this->max_serialized_type_size++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_); + this->is_compute_key_provided = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3; } @@ -98,6 +105,13 @@ ResponseTypeSupport::ResponseTypeSupport } else { this->max_serialized_type_size++; } + + if (this->members_->has_any_key_member_) { + this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_); + this->is_compute_key_provided = true; + this->key_buffer_.reserve(this->key_max_serialized_size_); + } + // Account for RTPS submessage alignment this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3; } diff --git a/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp b/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp index ae6b6f72c1..ffc7e5aa15 100644 --- a/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp @@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + + bool get_key_hash_from_ros_message( + void * ros_message, eprosima::fastdds::rtps::InstanceHandle_t * ihandle, bool force_md5, + const void * impl) const override; }; class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport @@ -185,6 +189,12 @@ class TypeSupport : public BaseTypeSupport bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override; + bool get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const override; + protected: // Meant for messages typesupport explicit TypeSupport(const void * ros_type_support); @@ -195,24 +205,58 @@ class TypeSupport : public BaseTypeSupport const void * ros_message_type_supports); size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment); + size_t calculateMaxSerializedKeySize(const MembersType * members); const MembersType * members_; private: + size_t calculateMaxSerializedSize( + const MembersType * members, + size_t current_alignment, + bool compute_key, + bool & is_key_unbounded); + size_t getEstimatedSerializedSize( const MembersType * members, const void * ros_message, size_t current_alignment) const; + size_t getEstimatedSerializedKeySize( + const MembersType * members, + const void * ros_message) const; + + size_t getEstimatedSerializedSize( + const MembersType * members, + const void * ros_message, + size_t current_alignment, + bool compute_key) const; + bool serializeROSmessage( eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message) const; + bool serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message) const; + + bool serializeROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message, + bool compute_key) const; + bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, const MembersType * members, void * ros_message) const; + + bool get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5) const; }; } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp index 37a4141104..c27ebb386d 100644 --- a/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp @@ -76,6 +76,7 @@ TypeSupport::TypeSupport( is_compute_key_provided = false; max_size_bound_ = false; is_plain_ = false; + key_is_unbounded_ = false; } // C++ specialization @@ -207,12 +208,36 @@ bool TypeSupport::serializeROSmessage( eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message) const +{ + return serializeROSmessage(ser, members, ros_message, false); +} + +template +bool TypeSupport::serializeKeyROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message) const +{ + return serializeROSmessage(ser, members, ros_message, true); +} + +template +bool TypeSupport::serializeROSmessage( + eprosima::fastcdr::Cdr & ser, + const MembersType * members, + const void * ros_message, + bool compute_key) const { assert(members); assert(ros_message); for (uint32_t i = 0; i < members->member_count_; ++i) { const auto member = members->members_ + i; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + void * field = const_cast(static_cast(ros_message)) + member->offset_; switch (member->type_id_) { case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: @@ -266,7 +291,7 @@ bool TypeSupport::serializeROSmessage( { auto sub_members = static_cast(member->members_->data); if (!member->is_array_) { - serializeROSmessage(ser, sub_members, field); + serializeROSmessage(ser, sub_members, field, compute_key); } else { size_t array_size = 0; @@ -288,7 +313,9 @@ bool TypeSupport::serializeROSmessage( return false; } for (size_t index = 0; index < array_size; ++index) { - serializeROSmessage(ser, sub_members, member->get_function(field, index)); + serializeROSmessage( + ser, sub_members, member->get_function(field, index), + compute_key); } } } @@ -301,6 +328,57 @@ bool TypeSupport::serializeROSmessage( return true; } +template +bool TypeSupport::get_key_hash_from_ros_message( + const MembersType * members, + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5) const +{ + assert(members); + assert(ros_message); + assert(ihandle); + + // get estimated serialized size in case key is unbounded + if (this->key_is_unbounded_) { + this->key_max_serialized_size_ = + (std::max) (this->key_max_serialized_size_, + this->getEstimatedSerializedKeySize(members, ros_message)); + key_buffer_.reserve(this->key_max_serialized_size_); + } + + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(this->key_buffer_.data()), + this->key_max_serialized_size_); + + eprosima::fastcdr::Cdr ser( + buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1); + + // serialize + serializeKeyROSmessage(ser, members_, ros_message); + + // check for md5 + if (force_md5 || this->key_max_serialized_size_ > 16) { + md5_.init(); + + md5_.update( + this->key_buffer_.data(), + static_cast(ser.get_serialized_data_length())); + + md5_.finalize(); + + for (uint8_t i = 0; i < 16; ++i) { + ihandle->value[i] = md5_.digest[i]; + } + } else { + for (uint8_t i = 0; i < 16; ++i) { + ihandle->value[i] = this->key_buffer_[i]; + } + } + + return true; +} + // C++ specialization template size_t next_field_align( @@ -470,6 +548,24 @@ size_t TypeSupport::getEstimatedSerializedSize( const MembersType * members, const void * ros_message, size_t current_alignment) const +{ + return getEstimatedSerializedSize(members, ros_message, current_alignment, false); +} + +template +size_t TypeSupport::getEstimatedSerializedKeySize( + const MembersType * members, + const void * ros_message) const +{ + return getEstimatedSerializedSize(members, ros_message, 0, true); +} + +template +size_t TypeSupport::getEstimatedSerializedSize( + const MembersType * members, + const void * ros_message, + size_t current_alignment, + bool compute_key) const { assert(members); assert(ros_message); @@ -479,6 +575,11 @@ size_t TypeSupport::getEstimatedSerializedSize( for (uint32_t i = 0; i < members->member_count_; ++i) { const auto member = members->members_ + i; void * field = const_cast(static_cast(ros_message)) + member->offset_; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + switch (member->type_id_) { case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: current_alignment = next_field_align(member, field, current_alignment); @@ -525,7 +626,9 @@ size_t TypeSupport::getEstimatedSerializedSize( { auto sub_members = static_cast(member->members_->data); if (!member->is_array_) { - current_alignment += getEstimatedSerializedSize(sub_members, field, current_alignment); + current_alignment += getEstimatedSerializedSize( + sub_members, field, current_alignment, + compute_key); } else { size_t array_size = 0; @@ -550,7 +653,8 @@ size_t TypeSupport::getEstimatedSerializedSize( current_alignment += getEstimatedSerializedSize( sub_members, member->get_function(field, index), - current_alignment); + current_alignment, + compute_key); } } } @@ -812,6 +916,24 @@ bool TypeSupport::deserializeROSmessage( template size_t TypeSupport::calculateMaxSerializedSize( const MembersType * members, size_t current_alignment) +{ + bool is_key_unbounded{false}; // unused + return calculateMaxSerializedSize(members, current_alignment, false, is_key_unbounded); +} + +template +size_t TypeSupport::calculateMaxSerializedKeySize( + const MembersType * members) +{ + return calculateMaxSerializedSize(members, 0, true, this->key_is_unbounded_); +} + +template +size_t TypeSupport::calculateMaxSerializedSize( + const MembersType * members, + size_t current_alignment, + bool compute_key, + bool & is_key_unbounded) { assert(members); @@ -824,6 +946,11 @@ size_t TypeSupport::calculateMaxSerializedSize( const auto * member = members->members_ + i; size_t array_size = 1; + + if (compute_key && !member->is_key_ && members->has_any_key_member_) { + continue; + } + if (member->is_array_) { array_size = member->array_size_; @@ -875,6 +1002,11 @@ size_t TypeSupport::calculateMaxSerializedSize( { this->max_size_bound_ = false; this->is_plain_ = false; + + if (compute_key) { + is_key_unbounded = true; + } + size_t character_size = (member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) ? 4 : 1; size_t extra_char = @@ -890,7 +1022,9 @@ size_t TypeSupport::calculateMaxSerializedSize( { auto sub_members = static_cast(member->members_->data); for (size_t index = 0; index < array_size; ++index) { - size_t curr = calculateMaxSerializedSize(sub_members, current_alignment); + size_t curr = calculateMaxSerializedSize( + sub_members, current_alignment, compute_key, + is_key_unbounded); current_alignment += curr; last_member_size += curr; } @@ -990,6 +1124,26 @@ bool TypeSupport::deserializeROSmessage( return true; } +template +bool TypeSupport::get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * [[maybe_unused]] impl) const +{ + assert(ros_message); + assert(ihandle); + assert(members_); + + bool ret = false; + + if (members_->member_count_ != 0) { + ret = TypeSupport::get_key_hash_from_ros_message(members_, ros_message, ihandle, force_md5); + } + + return ret; +} + } // namespace rmw_fastrtps_dynamic_cpp #endif // TYPESUPPORT_IMPL_HPP_ diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index 68ce03a0b3..5f3168e6cc 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -271,6 +271,15 @@ rmw_fastrtps_dynamic_cpp::create_publisher( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter (with publisher name to not change name policy) info->data_writer_ = publisher->create_datawriter( info->topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 32e9e26616..483c03cd42 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -357,6 +357,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->response_reader_ = subscriber->create_datareader( response_topic_desc, @@ -415,6 +424,15 @@ rmw_create_client( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->request_writer_ = publisher->create_datawriter( info->request_topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index a93c14d345..3c6201b182 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -353,6 +353,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (request_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + // Creates DataReader info->request_reader_ = subscriber->create_datareader( request_topic_desc, @@ -415,6 +424,15 @@ rmw_create_service( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (response_fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + writer_qos.history(), + writer_qos.resource_limits()); + } + // Creates DataWriter info->response_writer_ = publisher->create_datawriter( info->response_topic_, diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 64afb9229b..4c7f2acd9f 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -272,6 +272,15 @@ create_subscription( return nullptr; } + // Apply resource limits QoS if the type is keyed + if (fastdds_type->is_compute_key_provided && + !participant_info->leave_middleware_default_qos) + { + rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys( + reader_qos.history(), + reader_qos.resource_limits()); + } + eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos; switch (subscription_options->require_unique_network_flow_endpoints) { default: diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp index 3f7c8123da..335f15dd06 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp @@ -24,6 +24,8 @@ TypeSupportProxy::TypeSupportProxy(rmw_fastrtps_shared_cpp::TypeSupport * inner_ max_serialized_type_size = inner_type->max_serialized_type_size; is_plain_ = inner_type->is_plain(eprosima::fastdds::dds::XCDR_DATA_REPRESENTATION); max_size_bound_ = inner_type->is_bounded(); + is_compute_key_provided = inner_type->is_compute_key_provided; + key_is_unbounded_ = inner_type->is_key_unbounded(); } size_t TypeSupportProxy::getEstimatedSerializedSize( @@ -47,4 +49,14 @@ bool TypeSupportProxy::deserializeROSmessage( return type_impl->deserializeROSmessage(deser, ros_message, impl); } +bool TypeSupportProxy::get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const +{ + auto type_impl = static_cast(impl); + return type_impl->get_key_hash_from_ros_message(ros_message, ihandle, force_md5, impl); +} + } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index 476bf160db..20726828c6 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "fastdds/dds/topic/TopicDataType.hpp" #include "fastdds/rtps/common/InstanceHandle.hpp" @@ -24,6 +25,7 @@ #include "fastcdr/FastBuffer.h" #include "fastcdr/Cdr.h" +#include "fastdds/utils/md5.hpp" #include "rcutils/logging_macros.h" @@ -60,15 +62,17 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType virtual bool deserializeROSmessage( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const = 0; + virtual bool get_key_hash_from_ros_message( + void * ros_message, + eprosima::fastdds::rtps::InstanceHandle_t * ihandle, + bool force_md5, + const void * impl) const = 0; + RMW_FASTRTPS_SHARED_CPP_PUBLIC bool compute_key( const void * const data, eprosima::fastdds::rtps::InstanceHandle_t & ihandle, - bool force_md5 = false) override - { - (void)data; (void)ihandle; (void)force_md5; - return false; - } + bool force_md5 = false) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool compute_key( @@ -124,6 +128,12 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType return type_supports_; } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_key_unbounded() const + { + return key_is_unbounded_; + } + RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} @@ -136,6 +146,11 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType bool max_size_bound_ {false}; bool is_plain_ {false}; const rosidl_message_type_support_t * type_supports_ {nullptr}; + bool key_is_unbounded_ {false}; + mutable size_t key_max_serialized_size_ {0}; + mutable eprosima::fastdds::MD5 md5_; + mutable std::vector key_buffer_; + mutable std::mutex mtx_; }; } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp index 9fd4bc9258..6a501c71da 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp @@ -126,6 +126,21 @@ create_datareader( CustomDataReaderListener * listener, eprosima::fastdds::dds::DataReader ** data_reader); +/** +* Apply specific resource limits when using keys. +* Max samples per instance is set to history depth if KEEP_LAST +* else UNLIMITED. +* +* \param[in] history_qos History entitiy QoS. +* \param[in, out] res_limits_qos Resource limits entitiy QoS. +* +*/ +RMW_FASTRTPS_SHARED_CPP_PUBLIC +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__UTILS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp index 04539c3dbd..206faf3d08 100644 --- a/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp +++ b/rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp @@ -66,6 +66,58 @@ void * TypeSupport::create_data() return new eprosima::fastcdr::FastBuffer(); } +bool TypeSupport::compute_key( + const void * const data, + eprosima::fastdds::rtps::InstanceHandle_t & ihandle, + bool force_md5) +{ + assert(data); + + bool ret = false; + + if (!is_compute_key_provided) { + return ret; + } + + auto ser_data = static_cast(data); + + switch (ser_data->type) { + case FASTDDS_SERIALIZED_DATA_TYPE_ROS_MESSAGE: + { + std::lock_guard lock(this->mtx_); + ret = + this->get_key_hash_from_ros_message(ser_data->data, &ihandle, force_md5, ser_data->impl); + break; + } + + case FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER: + { + // TODO(MiguelCompany): In order to support keys in rmw_publish_serialized_message, + // we would need a get_key_hash_from_payload method + break; + } + + case FASTDDS_SERIALIZED_DATA_TYPE_DYNAMIC_MESSAGE: + { + auto m_type = std::make_shared(); + + // Retrieves the key (ihandle) from the dynamic data stored in data->data + return m_type->compute_key( + static_cast(ser_data->data), + ihandle, + force_md5); + + break; + } + default: + { + break; + } + } + + return ret; +} + bool TypeSupport::serialize( const void * const data, eprosima::fastdds::rtps::SerializedPayload_t & payload, eprosima::fastdds::dds::DataRepresentationId_t data_representation) diff --git a/rmw_fastrtps_shared_cpp/src/utils.cpp b/rmw_fastrtps_shared_cpp/src/utils.cpp index 53ee6b3812..a781020107 100644 --- a/rmw_fastrtps_shared_cpp/src/utils.cpp +++ b/rmw_fastrtps_shared_cpp/src/utils.cpp @@ -181,5 +181,18 @@ create_datareader( return true; } +void +apply_qos_resource_limits_for_keys( + const eprosima::fastdds::dds::HistoryQosPolicy & history_qos, + eprosima::fastdds::dds::ResourceLimitsQosPolicy & res_limits_qos) +{ + res_limits_qos.max_instances = 0; + res_limits_qos.max_samples = 0; + if (history_qos.kind == eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS) { + res_limits_qos.max_samples_per_instance = history_qos.depth; + } else { + res_limits_qos.max_samples_per_instance = 0; + } +} } // namespace rmw_fastrtps_shared_cpp