Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions rmw_cyclonedds_cpp/src/serdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// limitations under the License.
#include "serdata.hpp"

#include <atomic>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <regex>
Expand Down Expand Up @@ -143,15 +147,58 @@ static void serialize_into_serdata_rmw(serdata_rmw * d, const void * sample)
static void serialize_into_serdata_rmw_on_demand(serdata_rmw * d)
{
#ifdef DDS_HAS_SHM
static std::atomic<uint64_t> shm_serialized_view_count {0};
static std::atomic<uint64_t> shm_serialized_view_bytes {0};
static std::atomic<uint64_t> shm_serialized_copy_count {0};
static std::atomic<uint64_t> shm_serialized_copy_bytes {0};

static bool receipts_enabled = []() {
const char * v = std::getenv("RMW_CYCLONEDDS_SHM_RECEIPTS");
const bool enabled = v && v[0] && v[0] != '0';
if (enabled) {
std::atexit([]() {
const auto v_cnt = shm_serialized_view_count.load(std::memory_order_relaxed);
const auto v_b = shm_serialized_view_bytes.load(std::memory_order_relaxed);
const auto c_cnt = shm_serialized_copy_count.load(std::memory_order_relaxed);
const auto c_b = shm_serialized_copy_bytes.load(std::memory_order_relaxed);
std::fprintf(
stderr,
"rmw_cyclonedds shm receipts: serialized_view_count=%" PRIu64
" serialized_view_bytes=%" PRIu64
" serialized_copy_count=%" PRIu64
" serialized_copy_bytes=%" PRIu64
"\n",
v_cnt, v_b, c_cnt, c_b);
});
}
return enabled;
}();

auto type = const_cast<sertype_rmw *>(static_cast<const sertype_rmw *>(d->type));
{
std::lock_guard<std::mutex> lock(type->serialize_lock);
if (d->iox_chunk && d->data() == nullptr) {
if (d->iox_chunk && !d->has_data_ro()) {
auto iox_header = iceoryx_header_from_chunk(d->iox_chunk);
// if the iox chunk has the data in serialized form
if (iox_header->shm_data_state == IOX_CHUNK_CONTAINS_SERIALIZED_DATA) {
d->resize(iox_header->data_size);
memcpy(d->data(), d->iox_chunk, iox_header->data_size);
// Fail-closed: only take a zero-copy view when we don't need the extra
// padding that `resize()` adds for DDSI/CDR alignment.
if ((iox_header->data_size % 4) == 0) {
d->set_external_view(d->iox_chunk, iox_header->data_size);
if (receipts_enabled) {
shm_serialized_view_count.fetch_add(1, std::memory_order_relaxed);
shm_serialized_view_bytes.fetch_add(
static_cast<uint64_t>(iox_header->data_size), std::memory_order_relaxed);
}
} else {
d->resize(iox_header->data_size);
memcpy(d->data(), d->iox_chunk, iox_header->data_size);
if (receipts_enabled) {
shm_serialized_copy_count.fetch_add(1, std::memory_order_relaxed);
shm_serialized_copy_bytes.fetch_add(
static_cast<uint64_t>(iox_header->data_size), std::memory_order_relaxed);
}
}
} else if (iox_header->shm_data_state == IOX_CHUNK_CONTAINS_RAW_DATA) {
serialize_into_serdata_rmw(const_cast<serdata_rmw *>(d), d->iox_chunk);
} else {
Expand Down Expand Up @@ -315,7 +362,7 @@ static void serdata_rmw_to_ser(const struct ddsi_serdata * dcmn, size_t off, siz
{
auto d = static_cast<const serdata_rmw *>(dcmn);
serialize_into_serdata_rmw_on_demand(const_cast<serdata_rmw *>(d));
memcpy(buf, byte_offset(d->data(), off), sz);
memcpy(buf, byte_offset(const_cast<void *>(d->data_ro()), off), sz);
}

static struct ddsi_serdata * serdata_rmw_to_ser_ref(
Expand All @@ -324,7 +371,7 @@ static struct ddsi_serdata * serdata_rmw_to_ser_ref(
{
auto d = static_cast<const serdata_rmw *>(dcmn);
serialize_into_serdata_rmw_on_demand(const_cast<serdata_rmw *>(d));
ref->iov_base = byte_offset(d->data(), off);
ref->iov_base = byte_offset(const_cast<void *>(d->data_ro()), off);
ref->iov_len = (ddsrt_iov_len_t) sz;
return ddsi_serdata_ref(d);
}
Expand Down Expand Up @@ -354,7 +401,7 @@ static bool serdata_rmw_to_sample(
/* ROS 2 doesn't do keys in a meaningful way yet */
} else if (!type->is_request_header) {
serialize_into_serdata_rmw_on_demand(const_cast<serdata_rmw *>(d));
cycdeser sd(d->data(), d->size());
cycdeser sd(d->data_ro(), d->size());
if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) {
auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(type->type_support.type_support_);
Expand All @@ -373,7 +420,7 @@ static bool serdata_rmw_to_sample(
cdds_request_wrapper_t * const wrap = static_cast<cdds_request_wrapper_t *>(sample);
auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;};
serialize_into_serdata_rmw_on_demand(const_cast<serdata_rmw *>(d));
cycdeser sd(d->data(), d->size());
cycdeser sd(d->data_ro(), d->size());
if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) {
auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(type->type_support.type_support_);
Expand Down Expand Up @@ -428,7 +475,7 @@ static size_t serdata_rmw_print(
return static_cast<size_t>(snprintf(buf, bufsize, ":k:{}"));
} else if (!type->is_request_header) {
serialize_into_serdata_rmw_on_demand(const_cast<serdata_rmw *>(d));
cycprint sd(buf, bufsize, d->data(), d->size());
cycprint sd(buf, bufsize, d->data_ro(), d->size());
if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) {
auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(type->type_support.type_support_);
Expand All @@ -446,7 +493,7 @@ static size_t serdata_rmw_print(
auto prefix = [&wrap](cycprint & ser) {
ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq;
};
cycprint sd(buf, bufsize, d->data(), d->size());
cycprint sd(buf, bufsize, d->data_ro(), d->size());
if (using_introspection_c_typesupport(type->type_support.typesupport_identifier_)) {
auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(type->type_support.type_support_);
Expand Down Expand Up @@ -721,6 +768,7 @@ struct sertype_rmw * create_sertype(

void serdata_rmw::resize(size_t requested_size)
{
clear_external_view();
if (!requested_size) {
m_size = 0;
m_data.reset();
Expand Down
20 changes: 20 additions & 0 deletions rmw_cyclonedds_cpp/src/serdata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,32 @@ class serdata_rmw : public ddsi_serdata
/* first two bytes of data is CDR encoding
second two bytes are encoding options */
std::unique_ptr<byte[]> m_data {nullptr};
// Optional: view into externally owned serialized bytes (e.g., iceoryx SHM chunk).
// When set, `data_ro()` points to this memory and `data()` stays null (read-only).
const byte * m_external_data {nullptr};
size_t m_external_size {0};

public:
serdata_rmw(const ddsi_sertype * type, ddsi_serdata_kind kind);
void resize(size_t requested_size);
size_t size() const {return m_size;}
// Writable data pointer (owned). Returns null when holding an external view.
void * data() const {return m_data.get();}
// Read-only pointer to serialized bytes. May point into externally owned memory.
const void * data_ro() const {return m_external_data ? m_external_data : m_data.get();}
bool has_data_ro() const {return m_data != nullptr || m_external_data != nullptr;}
void set_external_view(const void * data, size_t size)
{
m_data.reset();
m_external_data = static_cast<const byte *>(data);
m_external_size = size;
m_size = size;
}
void clear_external_view()
{
m_external_data = nullptr;
m_external_size = 0;
}
};

typedef struct cdds_request_header
Expand Down
8 changes: 8 additions & 0 deletions shared_memory_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ It is currently not possible to accurately check whether Shared Memory transfer

Another way to check whether Shared Memory is used is running the iceoryx introspection client, which allows tracking of various statistics of the iceoryx communication, Shared Memory utilization being one of them.

### Optional receipts (copy vs zero-copy view)

For debugging / profiling you can enable a lightweight receipt that reports, at process exit, how many times and how many bytes the RMW had to copy from a received shared-memory chunk into a heap buffer (vs. taking a zero-copy view).

```console
export RMW_CYCLONEDDS_SHM_RECEIPTS=1
```

### Building the iceoryx introspection

The introspection client is not build by default, so we need to do so manually.
Expand Down