Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c552f6a
Refs #20164: rmw_fastrtps_shared: prepare api for keys support
Mario-DL Jan 11, 2024
c2f9d1f
Refs #20164: rmw_fastrtps_shared: getKey() implementation
Mario-DL Jan 11, 2024
cc8688a
Refs #20164: rmw_fastrtps* interface methods update for keys support
Mario-DL Jan 11, 2024
9c92a54
Refs #20164: rmw_fastrtps_cpp empty implementation
Mario-DL Jan 11, 2024
1f8c374
Refs #20164: rmw_fastrtps_dynamic_cpp Message & Service Typesupport c…
Mario-DL Jan 11, 2024
81967a1
Refs #20164: rmw_fastrtps_dynamic_cpp type_support_proxy updates impl…
Mario-DL Jan 11, 2024
b4eb1d4
Refs #20164: rmw_fastrtps_dynamic_cpp type_support implementation
Mario-DL Jan 11, 2024
5ca0a1d
Refs #20164: Move typesupport key vars to rmw_fastrtps_shared_cpp
Mario-DL Jan 30, 2024
c469246
Refs #20164: Initialize Typesupport key members in rmw_fastrtps_cpp
Mario-DL Jan 30, 2024
492580e
Refs #20310: rmw_fastrtps_shared_cpp: add abi_version enum and type s…
Mario-DL Feb 8, 2024
490641c
Refs #20310: rmw_fastrtps_cpp: adopt abi version v2
Mario-DL Feb 8, 2024
0bb4c5e
Refs #20310: rmw_fastrtps_dynamic_cpp: adopt abi version v2
Mario-DL Feb 8, 2024
14ff344
Refs #20310: rmw_fastrtps_shared_cpp: apply_qos_resource_limits_for_k…
Mario-DL Feb 8, 2024
e981e4f
Refs #20310: rmw_fastrtps_cpp: apply qos for keys
Mario-DL Feb 8, 2024
6c43312
Refs #20310: rmw_fastrtps_dynamic_cpp: apply qos for keys
Mario-DL Feb 8, 2024
bffe8d3
Refs #20310: Review suggestions
Mario-DL Feb 23, 2024
183e7de
Refs #20310: fix: include checking for the v2 identifier when trying …
Mario-DL Feb 23, 2024
0f13106
Refs #20310: rmw_fastrtps_cpp review 2 suggestion
Mario-DL Feb 27, 2024
0080627
Refs #20310: Updates according latest message_type_support structure …
Mario-DL Feb 28, 2024
31817df
Refs #20310: Compile for Fast CDR 1 only
Mario-DL Feb 29, 2024
d05909a
Refs #20310: Comment typo
Mario-DL Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rmw_fastrtps_cpp
class MessageTypeSupport : public TypeSupport
{
public:
explicit MessageTypeSupport(const message_type_support_callbacks_t * members);
explicit MessageTypeSupport(const message_type_support_callbacks_t * members, uint8_t abi_version);
};

} // namespace rmw_fastrtps_cpp
Expand Down
8 changes: 8 additions & 0 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier);

class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
{
public:
Expand All @@ -42,13 +44,19 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5, const void * impl) const override;

TypeSupport();

protected:
void set_members(const message_type_support_callbacks_t * members);

void set_members_v2(const message_type_support_callbacks_t * members);

private:
const message_type_support_callbacks_t * members_;
const message_type_support_key_callbacks_t* key_callbacks_;
bool has_data_;
};

Expand Down
13 changes: 12 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ rmw_fastrtps_cpp::create_publisher(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -176,7 +178,7 @@ rmw_fastrtps_cpp::create_publisher(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_publisher() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -264,6 +266,15 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
info->topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
Expand Down Expand Up @@ -381,6 +390,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
info->request_topic_,
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ rmw_serialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
auto data_length = tss.getEstimatedSerializedSize(ros_message, callbacks);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
Expand Down Expand Up @@ -78,7 +80,9 @@ rmw_deserialize(
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks);

uint8_t abi_version = rmw_fastrtps_cpp::get_type_support_abi_version(type_support->typesupport_identifier);
auto tss = MessageTypeSupport_cpp(callbacks, abi_version);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
Expand Down Expand Up @@ -384,6 +393,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
info->response_topic_,
Expand Down
22 changes: 21 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ __create_dynamic_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
info->datareader_qos_.history(),
info->datareader_qos_.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down Expand Up @@ -494,6 +503,8 @@ __create_subscription(
}
}

uint8_t abi_version = get_type_support_abi_version(type_support->typesupport_identifier);

std::lock_guard<std::mutex> lck(participant_info->entity_creation_mutex_);

/////
Expand Down Expand Up @@ -549,7 +560,7 @@ __create_subscription(
/////
// Create the Type Support struct
if (!fastdds_type) {
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
auto tsupport = new (std::nothrow) MessageTypeSupport_cpp(callbacks, abi_version);
if (!tsupport) {
RMW_SET_ERROR_MSG("create_subscription() failed to allocate MessageTypeSupport");
return nullptr;
Expand Down Expand Up @@ -659,6 +670,15 @@ __create_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down
117 changes: 115 additions & 2 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,30 @@
namespace rmw_fastrtps_cpp
{

uint8_t get_type_support_abi_version(const char * identifier)
{
uint8_t abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V1;

if (strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2) == 0 ||
strcmp(identifier, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2) == 0)
{
abi_version = rmw_fastrtps_shared_cpp::TypeSupport::AbiVersion::ABI_V2;
}

return abi_version;
}

TypeSupport::TypeSupport()
{
abi_version_ = AbiVersion::ABI_V1;
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
key_is_unbounded_ = false;
key_max_serialized_size_ = 0;
members_ = nullptr;
key_callbacks_ = nullptr;
has_data_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
Expand Down Expand Up @@ -59,6 +78,24 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
m_typeSize = (m_typeSize + 3) & ~3;
}

void TypeSupport::set_members_v2(const message_type_support_callbacks_t * members)
{

set_members(members);

if (nullptr != members->key_callbacks)
{
key_callbacks_ = members->key_callbacks;
m_isGetKeyDefined = true;

key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
if (!key_is_unbounded_)
{
key_buffer_.reserve(key_max_serialized_size_);
}
}
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
{
if (is_plain_) {
Expand Down Expand Up @@ -129,14 +166,90 @@ bool TypeSupport::deserializeROSmessage(
return true;
}

MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members)
bool TypeSupport::get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const
{
assert(ros_message);
(void)impl;

// retrieve estimated serialized size in case key is unbounded
if (key_is_unbounded_)
{
key_max_serialized_size_ = (std::max) (
key_max_serialized_size_,
key_callbacks_->get_serialized_size_key(ros_message));
key_buffer_.reserve(key_max_serialized_size_);
}

eprosima::fastcdr::FastBuffer fast_buffer(
reinterpret_cast<char *>(key_buffer_.data()),
key_max_serialized_size_);

eprosima::fastcdr::Cdr ser(
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

key_callbacks_->cdr_serialize_key(ros_message, ser);

const size_t max_serialized_key_length = 16;

auto ser_length = ser.getSerializedDataLength();

// check for md5
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length)
{
md5_.init();
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
md5_.finalize();

for (uint8_t i = 0; i < max_serialized_key_length; ++i)
{
ihandle->value[i] = md5_.digest[i];
}
}
else
{
memset(ihandle->value, 0, max_serialized_key_length);
for (uint8_t i = 0; i < ser_length; ++i)
{
ihandle->value[i] = key_buffer_[i];
}
}

return true;
}

MessageTypeSupport::MessageTypeSupport(
const message_type_support_callbacks_t * members,
uint8_t abi_version)
{
assert(members);

abi_version_ = abi_version;

std::string name = _create_type_name(members);
this->setName(name.c_str());

set_members(members);
switch (abi_version)
{
case TypeSupport::AbiVersion::ABI_V1:
{
set_members(members);
break;
}
case TypeSupport::AbiVersion::ABI_V2:
{
set_members_v2(members);
break;
}
default:
{
set_members(members);
break;
}
}
}

ServiceTypeSupport::ServiceTypeSupport()
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rosidl_typesupport_fastrtps_cpp/service_type_support.h"
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C rosidl_typesupport_fastrtps_c__identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP rosidl_typesupport_fastrtps_cpp::typesupport_identifier
#define RMW_FASTRTPS_CPP_TYPESUPPORT_C_V2 rosidl_typesupport_fastrtps_c__identifier_v2
#define RMW_FASTRTPS_CPP_TYPESUPPORT_CPP_V2 rosidl_typesupport_fastrtps_cpp::typesupport_identifier_v2

using MessageTypeSupport_cpp = rmw_fastrtps_cpp::MessageTypeSupport;
using TypeSupport_cpp = rmw_fastrtps_cpp::TypeSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ template<typename MembersType>
class MessageTypeSupport : public TypeSupport<MembersType>
{
public:
MessageTypeSupport(const MembersType * members, const void * ros_type_support);
MessageTypeSupport(const MembersType * members, const void * ros_type_support, uint8_t abi_version);
};

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Loading