From f99f12802bbc00877b288e3deab0d2ed00ff8060 Mon Sep 17 00:00:00 2001 From: Soumya Kundu Date: Sun, 10 Dec 2017 03:12:23 +0530 Subject: [PATCH 1/5] Added ParticipantBuiltInTopic listener class --- src/NodePBITListener.cpp | 133 +++++++++++++++++++++++++++++++++++++++ src/NodePBITListener.h | 69 ++++++++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 src/NodePBITListener.cpp create mode 100644 src/NodePBITListener.h diff --git a/src/NodePBITListener.cpp b/src/NodePBITListener.cpp new file mode 100644 index 0000000..28c9190 --- /dev/null +++ b/src/NodePBITListener.cpp @@ -0,0 +1,133 @@ +#include "NodePBITListener.h" + +#include +#include +#include + +#include +#include + +namespace NodeOpenDDS { +using namespace v8; + +Local copytoV8(const DDS::Time_t& src) +{ + Local stru = Nan::New(); + stru->Set(Nan::New("sec").ToLocalChecked(), Nan::New(src.sec)); + stru->Set(Nan::New("nanosec").ToLocalChecked(), + Nan::New(src.nanosec)); + return stru; +} + +Local copytoV8(const DDS::SampleInfo& src) +{ + Local stru = Nan::New(); +#define INT(X) stru->Set(Nan::New(#X).ToLocalChecked(), Nan::New(src.X)) + INT(sample_state); + INT(view_state); + INT(instance_state); + stru->Set(Nan::New("source_timestamp").ToLocalChecked(), + copytoV8(src.source_timestamp)); + INT(instance_handle); + INT(publication_handle); + INT(disposed_generation_count); + INT(no_writers_generation_count); + INT(sample_rank); + INT(generation_rank); + INT(absolute_generation_rank); +#undef INT + stru->Set(Nan::New("valid_data").ToLocalChecked(), + Nan::New(src.valid_data)); + return stru; +} + +Local toV8(const DDS::ParticipantBuiltinTopicData& src) +{ + ACE_UNUSED_ARG(src); + Local stru = Nan::New(); + + std::string str; + for (CORBA::ULong i = 0; i < src.user_data.value.length(); ++i) { + str += src.user_data.value[i]; + } + stru->Set(Nan::New("user_data").ToLocalChecked(), Nan::New(str).ToLocalChecked()); + + const v8::Local tgt(Nan::New(3)); + for (CORBA::Long i = 0; i < 3; ++i) { + tgt->Set(Nan::New(i), Nan::New(src.key.value[i])); + } + stru->Set(Nan::New("key").ToLocalChecked(), tgt); + + return stru; +} + +NodePBITListener::NodePBITListener(const Local& callback, + const DDS::ParticipantBuiltinTopicDataSeq part_data, + const DDS::SampleInfoSeq infos, + const DDS::DataReader_var& dr) + : callback_(callback) + , part_data_(part_data) + , infos_(infos) + , dr_(dr) + , async_uv_pbit_(this) +{ + uv_async_init(uv_default_loop(), &async_uv_pbit_, async_cb); +} + +NodePBITListener::~NodePBITListener() +{ +} + +void NodePBITListener::async_cb(uv_async_t* async_uv) +{ + static_cast(async_uv)->outer_->async(); +} + +void NodePBITListener::close_cb(uv_handle_t* handle_uv) +{ + static_cast((uv_async_t*)handle_uv)->outer_->_remove_ref(); +} + +void NodePBITListener::shutdown() +{ + _add_ref(); + uv_close((uv_handle_t*)&async_uv_pbit_, close_cb); +} + +void NodePBITListener::on_data_available(DDS::DataReader* dr) +{ + + DDS::ParticipantBuiltinTopicDataDataReader_var part_dr = + DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr); + + part_dr->take(part_data_, infos_, 1, DDS::NOT_READ_SAMPLE_STATE, DDS::ANY_VIEW_STATE, + DDS::ANY_INSTANCE_STATE); + + uv_async_send(&async_uv_pbit_); +} + +void NodePBITListener::async() // called from libuv event loop +{ + Nan::HandleScope scope; + + try { + const v8::Local stru = Nan::New(); + + Local argv[] = { copytoV8(infos_[0]), toV8(part_data_[0]) }; + + Local callback = Nan::New(callback_); + Nan::Callback cb(callback); + cb.Call(sizeof(argv) / sizeof(argv[0]), argv); + } catch (...) { + } +} + +void NodePBITListener::reserve(CORBA::ULong) +{ +} + +void NodePBITListener::push_back(const DDS::SampleInfo& src, const void* sample) +{ +} + +} diff --git a/src/NodePBITListener.h b/src/NodePBITListener.h new file mode 100644 index 0000000..9db9385 --- /dev/null +++ b/src/NodePBITListener.h @@ -0,0 +1,69 @@ +#ifndef OPENDDS_NODEPBITLISTENER_H +#define OPENDDS_NODEPBITLISTENER_H + +#include + +#include + +#include +#include +#include + +#include +#include +#include + +namespace NodeOpenDDS { + + class NodePBITListener + : public virtual OpenDDS::DCPS::LocalObject + , private OpenDDS::DCPS::AbstractSamples { + public: + NodePBITListener(const v8::Local& callback, + const DDS::ParticipantBuiltinTopicDataSeq part_data, + const DDS::SampleInfoSeq infos, const DDS::DataReader_var& dr); + ~NodePBITListener(); + void shutdown(); + + private: + static void async_cb(uv_async_t* async_uv); + static void close_cb(uv_handle_t* handle_uv); + + typedef DDS::RequestedDeadlineMissedStatus RDMStatus; + void on_requested_deadline_missed(DDS::DataReader*, const RDMStatus&) {} + typedef DDS::RequestedIncompatibleQosStatus RIQStatus; + void on_requested_incompatible_qos(DDS::DataReader*, const RIQStatus&) {} + void on_sample_rejected(DDS::DataReader*, + const DDS::SampleRejectedStatus&) {} + void on_liveliness_changed(DDS::DataReader*, + const DDS::LivelinessChangedStatus&) {} + void on_subscription_matched(DDS::DataReader*, + const DDS::SubscriptionMatchedStatus&) {} + void on_sample_lost(DDS::DataReader*, const DDS::SampleLostStatus&) {} + + void on_data_available(DDS::DataReader*); + + void async(); // called from libuv event loop + + Nan::Persistent callback_; + const DDS::DataReader_var& dr_; + + DDS::ParticipantBuiltinTopicDataSeq part_data_; + DDS::SampleInfoSeq infos_; + + struct AsyncUvN : uv_async_t { + explicit AsyncUvN(NodePBITListener* outer) : outer_(outer) {} + NodePBITListener* outer_; + } async_uv_pbit_; + + NodePBITListener(const NodePBITListener&); + NodePBITListener& operator=(const NodePBITListener&); + + void reserve(CORBA::ULong); + void push_back(const DDS::SampleInfo& src, const void* sample); + + }; + +} + +#endif From 557962ecf3de69c686669f9b88da0fb406967f2d Mon Sep 17 00:00:00 2001 From: Soumya Kundu Date: Sun, 10 Dec 2017 03:13:49 +0530 Subject: [PATCH 2/5] Added function for participant JS object for ParticipantBuiltInTopic reader --- src/node-opendds.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/node-opendds.cpp b/src/node-opendds.cpp index bc55f77..0eb800b 100644 --- a/src/node-opendds.cpp +++ b/src/node-opendds.cpp @@ -43,6 +43,7 @@ namespace { void delete_participant(const Nan::FunctionCallbackInfo& fci); void subscribe(const Nan::FunctionCallbackInfo& fci); void unsubscribe(const Nan::FunctionCallbackInfo& fci); + void subscribe_participant_topic(const Nan::FunctionCallbackInfo& fci); void initialize(const Nan::FunctionCallbackInfo& fci) { @@ -96,6 +97,7 @@ namespace { ot->SetInternalFieldCount(1); Nan::SetMethod(ot, "subscribe", subscribe); Nan::SetMethod(ot, "unsubscribe", unsubscribe); + Nan::SetMethod(ot, "subscribe_participant_topic", subscribe_participant_topic); const Local obj = ot->NewInstance(); Nan::SetInternalFieldPointer(obj, 0, dp._retn()); fci.GetReturnValue().Set(obj); @@ -248,6 +250,37 @@ namespace { fci.GetReturnValue().Set(obj); } + void subscribe_participant_topic(const Nan::FunctionCallbackInfo& fci) { + if (fci.Length() < 1) { + Nan::ThrowTypeError("At least 1 argument required"); + fci.GetReturnValue().SetUndefined(); + return; + } + if (!fci[fci.Length() - 1]->IsFunction()) { + Nan::ThrowTypeError("Last argument must be a function"); + fci.GetReturnValue().SetUndefined(); + return; + } + void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0); + DDS::DomainParticipant* const dp = + static_cast(internal); + + DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ; + DDS::DataReader_var dr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + + DDS::ParticipantBuiltinTopicDataSeq part_data; + DDS::SampleInfoSeq infos; + + Local cb = fci[fci.Length() - 1]; + NodePBITListener* const npbitl = new NodePBITListener(cb.As(), part_data, infos, dr); + const DDS::DataReaderListener_var listen(npbitl); + + dr->set_listener(listen.in(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); + + fci.GetReturnValue().SetUndefined(); + } + void unsubscribe(const Nan::FunctionCallbackInfo& fci) { if (fci.Length() < 1 || !fci[0]->IsObject()) { From 707f2bb9ade525b80bcd0618bfcd363bac39106a Mon Sep 17 00:00:00 2001 From: Soumya Kundu Date: Sun, 10 Dec 2017 03:14:44 +0530 Subject: [PATCH 3/5] Added files to binding.gyp and usage function in test.js --- binding.gyp | 1 + test/test.js | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/binding.gyp b/binding.gyp index cb3b6e6..c989bdb 100644 --- a/binding.gyp +++ b/binding.gyp @@ -38,6 +38,7 @@ 'target_name': 'node_opendds', 'sources': [ 'src/node-opendds.cpp', 'src/NodeDRListener.cpp', + 'src/NodePBITListener.cpp', 'src/NodeQosConversion.cpp' ], 'include_dirs': [ " Date: Sun, 10 Dec 2017 12:21:57 +0530 Subject: [PATCH 4/5] Missed include --- src/node-opendds.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/node-opendds.cpp b/src/node-opendds.cpp index 0eb800b..9df1d3b 100644 --- a/src/node-opendds.cpp +++ b/src/node-opendds.cpp @@ -1,4 +1,5 @@ #include "NodeDRListener.h" +#include "NodePBITListener.h" #include "NodeQosConversion.h" #include @@ -18,6 +19,7 @@ using namespace v8; using OpenDDS::DCPS::Data_Types_Register; using NodeOpenDDS::NodeDRListener; +using NodeOpenDDS::NodePBITListener; using NodeOpenDDS::convertQos; namespace { From 10ae0c4b4aaa931797ca1fcbaa1077b8a758c761 Mon Sep 17 00:00:00 2001 From: Soumya Kundu Date: Wed, 20 Dec 2017 00:59:33 +0530 Subject: [PATCH 5/5] Added function to unsubscribe function for ParticipantBuiltInTopic --- src/node-opendds.cpp | 23 +++++++++++++++++++++++ test/test.js | 1 + 2 files changed, 24 insertions(+) diff --git a/src/node-opendds.cpp b/src/node-opendds.cpp index 9df1d3b..affbf85 100644 --- a/src/node-opendds.cpp +++ b/src/node-opendds.cpp @@ -46,6 +46,7 @@ namespace { void subscribe(const Nan::FunctionCallbackInfo& fci); void unsubscribe(const Nan::FunctionCallbackInfo& fci); void subscribe_participant_topic(const Nan::FunctionCallbackInfo& fci); + void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo& fci); void initialize(const Nan::FunctionCallbackInfo& fci) { @@ -100,6 +101,7 @@ namespace { Nan::SetMethod(ot, "subscribe", subscribe); Nan::SetMethod(ot, "unsubscribe", unsubscribe); Nan::SetMethod(ot, "subscribe_participant_topic", subscribe_participant_topic); + Nan::SetMethod(ot, "unsubscribe_participant_topic", unsubscribe_participant_topic); const Local obj = ot->NewInstance(); Nan::SetInternalFieldPointer(obj, 0, dp._retn()); fci.GetReturnValue().Set(obj); @@ -322,6 +324,27 @@ namespace { fci.GetReturnValue().SetUndefined(); } + void unsubscribe_participant_topic(const Nan::FunctionCallbackInfo& fci) + { + void* const internal = Nan::GetInternalFieldPointer(fci.This(), 0); + DDS::DomainParticipant* const dp = + static_cast(internal); + + DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber() ; + DDS::DataReader_var dr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + + const DDS::DataReaderListener_var drl = dr->get_listener(); + NodePBITListener* const ndrl = dynamic_cast(drl.in()); + ndrl->shutdown(); + + dr = 0; + bit_subscriber->delete_contained_entities(); + dp->delete_subscriber(bit_subscriber); + + fci.GetReturnValue().SetUndefined(); + } + void finalize(const Nan::FunctionCallbackInfo& fci) { if (fci.Length() < 1) { diff --git a/test/test.js b/test/test.js index e98063e..e4b74b8 100644 --- a/test/test.js +++ b/test/test.js @@ -68,6 +68,7 @@ try { log('Sample Info', sinfo); if (sinfo.valid_data && sample.id === last_sample_id) { participant.unsubscribe(reader); + participant.unsubscribe_participant_topic(); } } catch (e) { console.log("Error in callback: " + e);