diff --git a/.gitignore b/.gitignore index 826e2b4..eb85c24 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ # This file is used to ignore files which are generated # ---------------------------------------------------------------------------- - +.vscode *~ *.autosave *.a diff --git a/data_tamer_cpp/include/data_tamer/channel.hpp b/data_tamer_cpp/include/data_tamer/channel.hpp index 6cee1a5..4e527bf 100644 --- a/data_tamer_cpp/include/data_tamer/channel.hpp +++ b/data_tamer_cpp/include/data_tamer/channel.hpp @@ -153,6 +153,16 @@ class LogChannel : public std::enable_shared_from_this */ void addDataSink(std::shared_ptr sink); + /** + * @brief removeDataSink remove a sink, i.e. a class collecting our snapshots. + */ + void removeDataSink(std::shared_ptr sink); + + /** + * @brief getNumberOfSink returns the number of registered sinks. + */ + size_t getNumberOfSinks() const; + /** * @brief takeSnapshot copies the current value of all your registered values * and send an instance of Snapshot to all your Sinks. diff --git a/data_tamer_cpp/src/channel.cpp b/data_tamer_cpp/src/channel.cpp index d0de225..18c59f2 100644 --- a/data_tamer_cpp/src/channel.cpp +++ b/data_tamer_cpp/src/channel.cpp @@ -31,6 +31,7 @@ struct LogChannel::Pimpl Schema schema; bool logging_started = false; + mutable Mutex sinks_mutex; std::unordered_set> sinks; }; @@ -160,7 +161,29 @@ void LogChannel::unregister(const RegistrationID& id) void LogChannel::addDataSink(std::shared_ptr sink) { - _p->sinks.insert(sink); + std::lock_guard const lock_sinks(_p->sinks_mutex); + + if(!_p->logging_started) + { + _p->sinks.insert(sink); + } + else + { + sink->addChannel(_p->channel_name, _p->schema); + _p->sinks.insert(sink); + } +} + +void LogChannel::removeDataSink(std::shared_ptr sink) +{ + std::lock_guard const lock(_p->sinks_mutex); + _p->sinks.erase(sink); +} + +size_t LogChannel::getNumberOfSinks() const +{ + std::lock_guard const lock(_p->sinks_mutex); + return _p->sinks.size(); } Schema LogChannel::getSchema() const @@ -183,12 +206,16 @@ void LogChannel::addCustomType(const std::string& custom_type_name, bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp) { { - std::lock_guard const lock(_p->mutex); - + std::lock_guard const lock_sinks(_p->sinks_mutex); if(_p->sinks.empty()) { return false; } + } + + { + std::lock_guard const lock(_p->mutex); + // update the _p->snapshot.active_mask if necessary if(_p->mask_dirty) { @@ -220,6 +247,8 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp) { _p->logging_started = true; _p->snapshot.schema_hash = _p->schema.hash; + + std::lock_guard const lock_sinks(_p->sinks_mutex); for(auto const& sink : _p->sinks) { sink->addChannel(_p->channel_name, _p->schema); @@ -243,9 +272,12 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp) } bool all_pushed = true; - for(auto& sink : _p->sinks) { - all_pushed &= sink->pushSnapshot(_p->snapshot); + std::lock_guard const lock_sinks(_p->sinks_mutex); + for(auto& sink : _p->sinks) + { + all_pushed &= sink->pushSnapshot(_p->snapshot); + } } return all_pushed; } diff --git a/data_tamer_cpp/tests/CMakeLists.txt b/data_tamer_cpp/tests/CMakeLists.txt index 6429aa6..34ec696 100644 --- a/data_tamer_cpp/tests/CMakeLists.txt +++ b/data_tamer_cpp/tests/CMakeLists.txt @@ -28,7 +28,8 @@ else() add_executable(datatamer_test dt_tests.cpp custom_types_tests.cpp - parser_tests.cpp) + parser_tests.cpp + add_remove_sink_tests.cpp) gtest_discover_tests(datatamer_test DISCOVERY_MODE PRE_TEST) target_include_directories(datatamer_test diff --git a/data_tamer_cpp/tests/add_remove_sink_tests.cpp b/data_tamer_cpp/tests/add_remove_sink_tests.cpp new file mode 100644 index 0000000..f33450e --- /dev/null +++ b/data_tamer_cpp/tests/add_remove_sink_tests.cpp @@ -0,0 +1,71 @@ +#include "data_tamer/channel.hpp" +#include "data_tamer/sinks/dummy_sink.hpp" + +#include +#include +#include + +using namespace DataTamer; + +void take_snapshots(std::shared_ptr channel, int count) +{ + for(int i = 0; i < count; i++) + { + channel->takeSnapshot(); + std::this_thread::sleep_for(std::chrono::microseconds(50)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + +TEST(DataTamerSinkRegistry, AddSinkIncreasesCountAndRef) +{ + auto channel = LogChannel::create("chan"); + auto sink = std::make_shared(); + channel->addDataSink(sink); + + std::vector dummyData = { 10, 11, 12 }; + channel->registerValue("valsA", &dummyData); + + ASSERT_EQ(channel->getNumberOfSinks(), 1); +} + +TEST(DataTamerSinkRegistry, SnapshotsAreRecordedWhileSinkPresent) +{ + auto channel = LogChannel::create("chan"); + auto sink = std::make_shared(); + channel->addDataSink(sink); + + std::vector dummyData = { 10, 11, 12 }; + channel->registerValue("valsA", &dummyData); + + const int snapshot_count = 10; + take_snapshots(channel, snapshot_count); + + const auto hash = channel->getSchema().hash; + ASSERT_EQ(sink->snapshots_count[hash], snapshot_count); +} + +TEST(DataTamerSinkRegistry, RemoveSinkStopsRecording) +{ + auto channel = LogChannel::create("chan"); + auto sink = std::make_shared(); + channel->addDataSink(sink); + + std::vector dummyData = { 10, 11, 12 }; + channel->registerValue("valsA", &dummyData); + + const int snapshot_count = 10; + take_snapshots(channel, snapshot_count); + + const auto hash = channel->getSchema().hash; + ASSERT_EQ(sink->snapshots_count[hash], snapshot_count); + + channel->removeDataSink(sink); + + ASSERT_EQ(channel->getNumberOfSinks(), 0); + + // Taking more snapshots, should not be recorded in the sink (i.e does not increase snapshots_count) + take_snapshots(channel, snapshot_count); + + ASSERT_EQ(sink->snapshots_count[hash], snapshot_count); +} \ No newline at end of file