Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport

explicit TypeSupport(const rosidl_message_type_support_t * type_supports);

bool get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const override;

protected:
void set_members(const message_type_support_callbacks_t * members);

private:
const message_type_support_callbacks_t * members_;
const message_type_support_key_callbacks_t * key_callbacks_;
bool has_data_;
};

Expand Down
9 changes: 9 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
info->topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
Expand Down Expand Up @@ -386,6 +395,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
info->request_topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
Expand Down Expand Up @@ -386,6 +395,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
info->response_topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ __create_dynamic_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down Expand Up @@ -654,6 +663,15 @@ __create_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->is_compute_key_provided &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down
64 changes: 64 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ TypeSupport::TypeSupport(
is_compute_key_provided = false;
max_size_bound_ = false;
is_plain_ = false;
key_is_unbounded_ = false;
key_max_serialized_size_ = 0;
members_ = nullptr;
key_callbacks_ = nullptr;
has_data_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
Expand Down Expand Up @@ -60,6 +65,16 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
max_serialized_type_size = 4 + data_size;
// Account for RTPS submessage alignment
max_serialized_type_size = (max_serialized_type_size + 3) & ~3;

if (nullptr != members->key_callbacks) {
key_callbacks_ = members->key_callbacks;
is_compute_key_provided = true;

key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
if (!key_is_unbounded_) {
key_buffer_.reserve(key_max_serialized_size_);
}
}
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
Expand Down Expand Up @@ -132,6 +147,55 @@ bool TypeSupport::deserializeROSmessage(
return true;
}

bool TypeSupport::get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * [[maybe_unused]] impl) const
{
assert(ros_message);
assert(ihandle);

// retrieve estimated serialized size in case key is unbounded
if (key_is_unbounded_) {
key_max_serialized_size_ = (std::max) (
key_max_serialized_size_,
key_callbacks_->get_serialized_size_key(ros_message));
key_buffer_.reserve(key_max_serialized_size_);
}

eprosima::fastcdr::FastBuffer fast_buffer(
reinterpret_cast<char *>(key_buffer_.data()),
key_max_serialized_size_);

eprosima::fastcdr::Cdr ser(
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);

key_callbacks_->cdr_serialize_key(ros_message, ser);

const size_t max_serialized_key_length = 16;

auto ser_length = ser.get_serialized_data_length();

// check for md5
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) {
md5_.init();
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
md5_.finalize();

for (uint8_t i = 0; i < max_serialized_key_length; ++i) {
ihandle->value[i] = md5_.digest[i];
}
} else {
memset(ihandle->value, 0, max_serialized_key_length);
for (uint8_t i = 0; i < ser_length; ++i) {
ihandle->value[i] = key_buffer_[i];
}
}

return true;
}

MessageTypeSupport::MessageTypeSupport(
const message_type_support_callbacks_t * members,
const rosidl_message_type_support_t * type_supports)
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
} else {
this->max_serialized_type_size++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(members);
this->is_compute_key_provided = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
}
Expand Down
14 changes: 14 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
} else {
this->max_serialized_type_size++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
this->is_compute_key_provided = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
}
Expand Down Expand Up @@ -98,6 +105,13 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
} else {
this->max_serialized_type_size++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
this->is_compute_key_provided = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
}
Expand Down
44 changes: 44 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport

bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message, eprosima::fastdds::rtps::InstanceHandle_t * ihandle, bool force_md5,
const void * impl) const override;
};

class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
Expand Down Expand Up @@ -185,6 +189,12 @@ class TypeSupport : public BaseTypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const override;

protected:
// Meant for messages typesupport
explicit TypeSupport(const void * ros_type_support);
Expand All @@ -195,24 +205,58 @@ class TypeSupport : public BaseTypeSupport
const void * ros_message_type_supports);

size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment);
size_t calculateMaxSerializedKeySize(const MembersType * members);

const MembersType * members_;

private:
size_t calculateMaxSerializedSize(
const MembersType * members,
size_t current_alignment,
bool compute_key,
bool & is_key_unbounded);

size_t getEstimatedSerializedSize(
const MembersType * members,
const void * ros_message,
size_t current_alignment) const;

size_t getEstimatedSerializedKeySize(
const MembersType * members,
const void * ros_message) const;

size_t getEstimatedSerializedSize(
const MembersType * members,
const void * ros_message,
size_t current_alignment,
bool compute_key) const;

bool serializeROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message) const;

bool serializeKeyROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message) const;

bool serializeROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message,
bool compute_key) const;

bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser,
const MembersType * members,
void * ros_message) const;

bool get_key_hash_from_ros_message(
const MembersType * members,
void * ros_message,
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
bool force_md5) const;
};

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Loading