diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp index 93dd19f9..c6f591b8 100644 --- a/rmw_cyclonedds_cpp/src/serdata.cpp +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -13,6 +13,10 @@ // limitations under the License. #include "serdata.hpp" +#include +#include +#include +#include #include #include #include @@ -143,15 +147,58 @@ static void serialize_into_serdata_rmw(serdata_rmw * d, const void * sample) static void serialize_into_serdata_rmw_on_demand(serdata_rmw * d) { #ifdef DDS_HAS_SHM + static std::atomic shm_serialized_view_count {0}; + static std::atomic shm_serialized_view_bytes {0}; + static std::atomic shm_serialized_copy_count {0}; + static std::atomic shm_serialized_copy_bytes {0}; + + static bool receipts_enabled = []() { + const char * v = std::getenv("RMW_CYCLONEDDS_SHM_RECEIPTS"); + const bool enabled = v && v[0] && v[0] != '0'; + if (enabled) { + std::atexit([]() { + const auto v_cnt = shm_serialized_view_count.load(std::memory_order_relaxed); + const auto v_b = shm_serialized_view_bytes.load(std::memory_order_relaxed); + const auto c_cnt = shm_serialized_copy_count.load(std::memory_order_relaxed); + const auto c_b = shm_serialized_copy_bytes.load(std::memory_order_relaxed); + std::fprintf( + stderr, + "rmw_cyclonedds shm receipts: serialized_view_count=%" PRIu64 + " serialized_view_bytes=%" PRIu64 + " serialized_copy_count=%" PRIu64 + " serialized_copy_bytes=%" PRIu64 + "\n", + v_cnt, v_b, c_cnt, c_b); + }); + } + return enabled; + }(); + auto type = const_cast(static_cast(d->type)); { std::lock_guard lock(type->serialize_lock); - if (d->iox_chunk && d->data() == nullptr) { + if (d->iox_chunk && !d->has_data_ro()) { auto iox_header = iceoryx_header_from_chunk(d->iox_chunk); // if the iox chunk has the data in serialized form if (iox_header->shm_data_state == IOX_CHUNK_CONTAINS_SERIALIZED_DATA) { - d->resize(iox_header->data_size); - memcpy(d->data(), d->iox_chunk, iox_header->data_size); + // Fail-closed: only take a zero-copy view when we don't need the extra + // padding that `resize()` adds for DDSI/CDR alignment. + if ((iox_header->data_size % 4) == 0) { + d->set_external_view(d->iox_chunk, iox_header->data_size); + if (receipts_enabled) { + shm_serialized_view_count.fetch_add(1, std::memory_order_relaxed); + shm_serialized_view_bytes.fetch_add( + static_cast(iox_header->data_size), std::memory_order_relaxed); + } + } else { + d->resize(iox_header->data_size); + memcpy(d->data(), d->iox_chunk, iox_header->data_size); + if (receipts_enabled) { + shm_serialized_copy_count.fetch_add(1, std::memory_order_relaxed); + shm_serialized_copy_bytes.fetch_add( + static_cast(iox_header->data_size), std::memory_order_relaxed); + } + } } else if (iox_header->shm_data_state == IOX_CHUNK_CONTAINS_RAW_DATA) { serialize_into_serdata_rmw(const_cast(d), d->iox_chunk); } else { @@ -315,7 +362,7 @@ static void serdata_rmw_to_ser(const struct ddsi_serdata * dcmn, size_t off, siz { auto d = static_cast(dcmn); serialize_into_serdata_rmw_on_demand(const_cast(d)); - memcpy(buf, byte_offset(d->data(), off), sz); + memcpy(buf, byte_offset(const_cast(d->data_ro()), off), sz); } static struct ddsi_serdata * serdata_rmw_to_ser_ref( @@ -324,7 +371,7 @@ static struct ddsi_serdata * serdata_rmw_to_ser_ref( { auto d = static_cast(dcmn); serialize_into_serdata_rmw_on_demand(const_cast(d)); - ref->iov_base = byte_offset(d->data(), off); + ref->iov_base = byte_offset(const_cast(d->data_ro()), off); ref->iov_len = (ddsrt_iov_len_t) sz; return ddsi_serdata_ref(d); } @@ -354,7 +401,7 @@ static bool serdata_rmw_to_sample( /* ROS 2 doesn't do keys in a meaningful way yet */ } else if (!type->is_request_header) { serialize_into_serdata_rmw_on_demand(const_cast(d)); - cycdeser sd(d->data(), d->size()); + cycdeser sd(d->data_ro(), d->size()); if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(type->type_support.type_support_); @@ -373,7 +420,7 @@ static bool serdata_rmw_to_sample( cdds_request_wrapper_t * const wrap = static_cast(sample); auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;}; serialize_into_serdata_rmw_on_demand(const_cast(d)); - cycdeser sd(d->data(), d->size()); + cycdeser sd(d->data_ro(), d->size()); if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(type->type_support.type_support_); @@ -428,7 +475,7 @@ static size_t serdata_rmw_print( return static_cast(snprintf(buf, bufsize, ":k:{}")); } else if (!type->is_request_header) { serialize_into_serdata_rmw_on_demand(const_cast(d)); - cycprint sd(buf, bufsize, d->data(), d->size()); + cycprint sd(buf, bufsize, d->data_ro(), d->size()); if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(type->type_support.type_support_); @@ -446,7 +493,7 @@ static size_t serdata_rmw_print( auto prefix = [&wrap](cycprint & ser) { ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq; }; - cycprint sd(buf, bufsize, d->data(), d->size()); + cycprint sd(buf, bufsize, d->data_ro(), d->size()); if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(type->type_support.type_support_); @@ -721,6 +768,7 @@ struct sertype_rmw * create_sertype( void serdata_rmw::resize(size_t requested_size) { + clear_external_view(); if (!requested_size) { m_size = 0; m_data.reset(); diff --git a/rmw_cyclonedds_cpp/src/serdata.hpp b/rmw_cyclonedds_cpp/src/serdata.hpp index 243a9cdc..e2370e8c 100644 --- a/rmw_cyclonedds_cpp/src/serdata.hpp +++ b/rmw_cyclonedds_cpp/src/serdata.hpp @@ -62,12 +62,32 @@ class serdata_rmw : public ddsi_serdata /* first two bytes of data is CDR encoding second two bytes are encoding options */ std::unique_ptr m_data {nullptr}; + // Optional: view into externally owned serialized bytes (e.g., iceoryx SHM chunk). + // When set, `data_ro()` points to this memory and `data()` stays null (read-only). + const byte * m_external_data {nullptr}; + size_t m_external_size {0}; public: serdata_rmw(const ddsi_sertype * type, ddsi_serdata_kind kind); void resize(size_t requested_size); size_t size() const {return m_size;} + // Writable data pointer (owned). Returns null when holding an external view. void * data() const {return m_data.get();} + // Read-only pointer to serialized bytes. May point into externally owned memory. + const void * data_ro() const {return m_external_data ? m_external_data : m_data.get();} + bool has_data_ro() const {return m_data != nullptr || m_external_data != nullptr;} + void set_external_view(const void * data, size_t size) + { + m_data.reset(); + m_external_data = static_cast(data); + m_external_size = size; + m_size = size; + } + void clear_external_view() + { + m_external_data = nullptr; + m_external_size = 0; + } }; typedef struct cdds_request_header diff --git a/shared_memory_support.md b/shared_memory_support.md index e90976a0..7e71abe6 100644 --- a/shared_memory_support.md +++ b/shared_memory_support.md @@ -232,6 +232,14 @@ It is currently not possible to accurately check whether Shared Memory transfer Another way to check whether Shared Memory is used is running the iceoryx introspection client, which allows tracking of various statistics of the iceoryx communication, Shared Memory utilization being one of them. +### Optional receipts (copy vs zero-copy view) + +For debugging / profiling you can enable a lightweight receipt that reports, at process exit, how many times and how many bytes the RMW had to copy from a received shared-memory chunk into a heap buffer (vs. taking a zero-copy view). + +```console +export RMW_CYCLONEDDS_SHM_RECEIPTS=1 +``` + ### Building the iceoryx introspection The introspection client is not build by default, so we need to do so manually.