From e94ced81b49ba19aa08f39d824ecad00db315f74 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 4 Sep 2023 12:17:11 +0000 Subject: [PATCH 01/10] feat(SequentialCompressionWriter): Added option to set compression thread priority Signed-off-by: Janosch Machowinski --- .../compression_options.hpp | 4 ++ .../sequential_compression_writer.cpp | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 3028f2a49d..1ae7e71fa2 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "visibility_control.hpp" @@ -61,6 +62,9 @@ struct CompressionOptions CompressionMode compression_mode; uint64_t compression_queue_size; uint64_t compression_threads; + /// if set, the compression thread(s) will try to set + /// the given priority for itself + std::optional thread_priority; }; } // namespace rosbag2_compression diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index c3d6277ae4..74dbb8f44b 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -33,6 +34,12 @@ #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" #include "logging.hpp" +#ifdef _WIN32 +#include +#else +#include +#include +#endif namespace rosbag2_compression { @@ -62,6 +69,47 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { + if (compression_options_.thread_priority) { + +#ifdef _WIN32 + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Changing thread priority is not implemented for windows"); + + /** + * The implementation should look something like this + + uint8_t nice_value = *compression_options_.thread_nice_value; + + // this must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... + DWORD dwThreadPri = *compression_options_.thread_nice_value; + + if(!SetThreadPriority(GetCurrentThread(), dwThreadPri)) + { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to " << nice_value << " : " << std::strerror(GetLastError())); + } + */ + +#else + int wanted_nice_value = *compression_options_.thread_priority; + + errno = 0; + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + if (cur_nice_value != -1 && errno == 0) { + int new_nice_value = nice(wanted_nice_value - cur_nice_value); + if ((new_nice_value == -1 && errno != 0) || new_nice_value != wanted_nice_value) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to " << wanted_nice_value << " : " << std::strerror( + errno)); + } + } else { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to " << wanted_nice_value << + " : Could not determine cur nice value"); + } +#endif + } + // Every thread needs to have its own compression context for thread safety. auto compressor = compression_factory_->create_compressor( compression_options_.compression_format); From a390c4a79756a5bf3323e29ab69a55baf6f7a655 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 12 Sep 2023 09:21:42 +0000 Subject: [PATCH 02/10] feat: Added compression_threads_priority option Signed-off-by: Janosch Machowinski --- .../test_sequential_compression_writer.cpp | 28 +++++++++++++------ .../rosbag2_transport/record_options.hpp | 1 + .../reader_writer_factory.cpp | 3 +- .../src/rosbag2_transport/record_options.cpp | 4 +++ 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 535ceb35a2..4f2ff587bd 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -143,13 +143,15 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; + const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); EXPECT_THROW( @@ -163,7 +165,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format) { rosbag2_compression::CompressionOptions compression_options{ "bad_format", rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); EXPECT_THROW( @@ -175,7 +178,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; // Set minimum file size greater than max bagfile size option const uint64_t min_split_file_size = 10; @@ -196,7 +200,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; @@ -211,7 +216,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; auto compression_factory = std::make_unique>(); EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1); @@ -235,7 +241,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -277,7 +284,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_ DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -315,7 +323,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -363,7 +372,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, kCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index 50621877f8..cf96933f99 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -42,6 +42,7 @@ struct RecordOptions std::string compression_format = ""; uint64_t compression_queue_size = 1; uint64_t compression_threads = 0; + int8_t compression_threads_priority = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; bool include_unpublished_topics = false; diff --git a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp index b1abeae33c..565337590f 100644 --- a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp +++ b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp @@ -53,7 +53,8 @@ std::unique_ptr ReaderWriterFactory::make_writer( record_options.compression_format, rosbag2_compression::compression_mode_from_string(record_options.compression_mode), record_options.compression_queue_size, - record_options.compression_threads + record_options.compression_threads, + record_options.compression_threads_priority, }; if (compression_options.compression_threads < 1) { compression_options.compression_threads = std::thread::hardware_concurrency(); diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index ec6a0a9813..cb57f1964d 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -39,6 +39,7 @@ Node convert::encode( node["compression_format"] = record_options.compression_format; node["compression_queue_size"] = record_options.compression_queue_size; node["compression_threads"] = record_options.compression_threads; + node["compression_threads_priority"] = record_options.compression_threads_priority; node["topic_qos_profile_overrides"] = convert>::encode( record_options.topic_qos_profile_overrides); @@ -66,6 +67,9 @@ bool convert::decode( optional_assign(node, "compression_format", record_options.compression_format); optional_assign(node, "compression_queue_size", record_options.compression_queue_size); optional_assign(node, "compression_threads", record_options.compression_threads); + optional_assign( + node, "compression_threads_priority", + record_options.compression_threads_priority); std::unordered_map qos_overrides; if (node["topic_qos_profile_overrides"]) { From 6ea1dcf0748bb975e65732b95f9f17d37d41a4b3 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 14 Sep 2023 08:21:35 +0000 Subject: [PATCH 03/10] test: implemented test case for setting priority value Signed-off-by: Janosch Machowinski --- .../test_sequential_compression_writer.cpp | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 4f2ff587bd..60ec4ae065 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -37,6 +37,13 @@ #include "mock_compression_factory.hpp" +#ifdef _WIN32 +#include +#else +#include +#include +#endif + using namespace testing; // NOLINT static constexpr const char * DefaultTestCompressor = "fake_comp"; @@ -394,6 +401,100 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); } + +TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + const uint64_t kCompressionQueueSize = GetParam(); + const int wanted_nice_value = 10; + + class TestCompressor : public rosbag2_compression::BaseCompressorInterface + { + int & detected_nice_value; + +public: + TestCompressor(int & detected_nice_value) + : detected_nice_value(detected_nice_value) {} + virtual std::string compress_uri(const std::string & uri) + { + return uri; + } + + virtual void compress_serialized_bag_message( + const rosbag2_storage::SerializedBagMessage * bag_message, + rosbag2_storage::SerializedBagMessage * compressed_message) + { +#ifdef _WIN32 + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + if (cur_nice_value != -1 && errno == 0) { + + detected_nice_value = cur_nice_value; + } +#else + //FIXME implement windows version +#endif + + *compressed_message = *bag_message; + } + + /** + * Get the identifier of the compression algorithm. + * This is appended to the extension of the compressed file. + */ + virtual std::string get_compression_identifier() const + { + return "niceTest"; + } + }; + + class FakeFactory : public rosbag2_compression::CompressionFactory + { + int & detected_nice_value; + +public: + FakeFactory(int & detected_nice_value) + : detected_nice_value(detected_nice_value) {} + + virtual std::shared_ptr + create_compressor(const std::string & /*compression_format*/) + { + return std::make_shared(detected_nice_value); + } + }; + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + kCompressionQueueSize, + kDefaultCompressionQueueThreads, + wanted_nice_value + }; + + // nice values are in the range from -20 to +19, so this value will never be read + int detected_nice_value = 100; + + initializeFakeFileStorage(); + initializeWriter(compression_options, std::make_unique(detected_nice_value)); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({test_topic_name, test_topic_type, "", "", ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + EXPECT_EQ(detected_nice_value, *compression_options.thread_priority); + + EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); +} + INSTANTIATE_TEST_SUITE_P( SequentialCompressionWriterTestQueueSizes, SequentialCompressionWriterTest, From 32e5002fded7cc63bcaf1fb0059bc4f9038c41df Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 20 Sep 2023 23:21:53 -0700 Subject: [PATCH 04/10] Redesign test for the case when compression writer sets threads priority Co-authored-by: Janosch Machowinski Signed-off-by: Michael Orlov --- rosbag2_compression/CMakeLists.txt | 3 +- .../fake_compression_factory.hpp | 45 +++++++++++ .../rosbag2_compression/fake_compressor.cpp | 18 ++++- .../rosbag2_compression/fake_compressor.hpp | 2 + .../test_sequential_compression_writer.cpp | 78 ++++--------------- 5 files changed, 81 insertions(+), 65 deletions(-) create mode 100644 rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp diff --git a/rosbag2_compression/CMakeLists.txt b/rosbag2_compression/CMakeLists.txt index 1862fa2eb2..9bb9a9e795 100644 --- a/rosbag2_compression/CMakeLists.txt +++ b/rosbag2_compression/CMakeLists.txt @@ -104,7 +104,8 @@ if(BUILD_TESTING) ) ament_add_gmock(test_sequential_compression_writer - test/rosbag2_compression/test_sequential_compression_writer.cpp) + test/rosbag2_compression/test_sequential_compression_writer.cpp + test/rosbag2_compression/fake_compressor.cpp) target_link_libraries(test_sequential_compression_writer ${PROJECT_NAME} rosbag2_cpp::rosbag2_cpp diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp new file mode 100644 index 0000000000..d023789462 --- /dev/null +++ b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp @@ -0,0 +1,45 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ +#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ + +#include +#include + +#include "rosbag2_compression/compression_factory.hpp" +#include "fake_compressor.hpp" + +class FakeCompressionFactory + : public rosbag2_compression::CompressionFactory +{ +public: + FakeCompressionFactory() = delete; + + ~FakeCompressionFactory() override = default; + + explicit FakeCompressionFactory(int & detected_thread_priority) + : detected_thread_priority_(detected_thread_priority) {} + + std::shared_ptr + create_compressor(const std::string & /*compression_format*/) override + { + return std::make_shared(detected_thread_priority_); + } + +private: + int & detected_thread_priority_; +}; + +#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp b/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp index 8c97ccfa23..01a5e9f0fd 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp @@ -13,11 +13,27 @@ // limitations under the License. #include +#ifdef _WIN32 +#include +#else +#include +#endif #include "pluginlib/class_list_macros.hpp" - #include "fake_compressor.hpp" +FakeCompressor::FakeCompressor(int & detected_thread_priority) +{ +#ifndef _WIN32 + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + if (cur_nice_value != -1 && errno == 0) { + detected_thread_priority = cur_nice_value; + } +#else + detected_thread_priority = GetThreadPriority(GetCurrentThread()); +#endif +} + std::string FakeCompressor::compress_uri(const std::string & uri) { return uri + "." + get_compression_identifier(); diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp index 01c6bb5be0..492958c3ec 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp @@ -25,6 +25,8 @@ class FakeCompressor : public rosbag2_compression::BaseCompressorInterface public: FakeCompressor() = default; + explicit FakeCompressor(int & detected_thread_priority); + std::string compress_uri(const std::string & uri) override; void compress_serialized_bag_message( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 60ec4ae065..99207bd3d6 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -36,12 +36,10 @@ #include "mock_storage_factory.hpp" #include "mock_compression_factory.hpp" +#include "fake_compression_factory.hpp" #ifdef _WIN32 #include -#else -#include -#include #endif using namespace testing; // NOLINT @@ -401,82 +399,37 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); } - -TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value) +TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) { const std::string test_topic_name = "test_topic"; const std::string test_topic_type = "test_msgs/BasicTypes"; const uint64_t kCompressionQueueSize = GetParam(); - const int wanted_nice_value = 10; - - class TestCompressor : public rosbag2_compression::BaseCompressorInterface - { - int & detected_nice_value; - -public: - TestCompressor(int & detected_nice_value) - : detected_nice_value(detected_nice_value) {} - virtual std::string compress_uri(const std::string & uri) - { - return uri; - } - - virtual void compress_serialized_bag_message( - const rosbag2_storage::SerializedBagMessage * bag_message, - rosbag2_storage::SerializedBagMessage * compressed_message) - { -#ifdef _WIN32 - int cur_nice_value = getpriority(PRIO_PROCESS, 0); - if (cur_nice_value != -1 && errno == 0) { - - detected_nice_value = cur_nice_value; - } +#ifndef _WIN32 + const int wanted_thread_priority = 10; #else - //FIXME implement windows version + const int wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN; #endif - *compressed_message = *bag_message; - } - - /** - * Get the identifier of the compression algorithm. - * This is appended to the extension of the compressed file. - */ - virtual std::string get_compression_identifier() const - { - return "niceTest"; - } - }; - - class FakeFactory : public rosbag2_compression::CompressionFactory - { - int & detected_nice_value; - -public: - FakeFactory(int & detected_nice_value) - : detected_nice_value(detected_nice_value) {} - - virtual std::shared_ptr - create_compressor(const std::string & /*compression_format*/) - { - return std::make_shared(detected_nice_value); - } - }; - // queue size should be 0 or at least the number of remaining messages to prevent message loss rosbag2_compression::CompressionOptions compression_options { DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, kCompressionQueueSize, kDefaultCompressionQueueThreads, - wanted_nice_value + wanted_thread_priority }; +#ifndef _WIN32 // nice values are in the range from -20 to +19, so this value will never be read - int detected_nice_value = 100; + int detected_thread_priority = 100; +#else + int detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN; +#endif initializeFakeFileStorage(); - initializeWriter(compression_options, std::make_unique(detected_nice_value)); + initializeWriter( + compression_options, + std::make_unique(detected_thread_priority)); writer_->open(tmp_dir_storage_options_); writer_->create_topic({test_topic_name, test_topic_type, "", "", ""}); @@ -490,8 +443,7 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value) } writer_.reset(); // reset will call writer destructor - EXPECT_EQ(detected_nice_value, *compression_options.thread_priority); - + EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority); EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); } From fc749da672bcec5f49ac2542e419a28aaaba3f21 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 22 Sep 2023 15:01:31 +0000 Subject: [PATCH 05/10] fix: Fixed inverted check Signed-off-by: Janosch Machowinski --- .../sequential_compression_writer.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 74dbb8f44b..556e139a22 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -95,17 +95,17 @@ void SequentialCompressionWriter::compression_thread_fn() errno = 0; int cur_nice_value = getpriority(PRIO_PROCESS, 0); - if (cur_nice_value != -1 && errno == 0) { + if (cur_nice_value == -1 && errno != 0) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to " << wanted_nice_value << + " : Could not determine cur nice value"); + } else { int new_nice_value = nice(wanted_nice_value - cur_nice_value); - if ((new_nice_value == -1 && errno != 0) || new_nice_value != wanted_nice_value) { + if ((new_nice_value == -1 && errno != 0)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( "Could not set nice value of compression thread to " << wanted_nice_value << " : " << std::strerror( errno)); } - } else { - ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to " << wanted_nice_value << - " : Could not determine cur nice value"); } #endif } From 497f80659e09f6df08062a2336d5d918481bd72e Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 4 Oct 2023 15:28:49 +0000 Subject: [PATCH 06/10] chore: made compression_threads_priority an int32_t Signed-off-by: Janosch Machowinski --- .../rosbag2_compression/compression_options.hpp | 2 +- .../sequential_compression_writer.cpp | 5 ++--- .../test_sequential_compression_writer.cpp | 10 +++++----- .../src/writer_benchmark.cpp | 2 +- .../include/rosbag2_transport/record_options.hpp | 2 +- .../src/rosbag2_transport/record_options.cpp | 2 +- 6 files changed, 11 insertions(+), 12 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 1ae7e71fa2..24f08eb983 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -64,7 +64,7 @@ struct CompressionOptions uint64_t compression_threads; /// if set, the compression thread(s) will try to set /// the given priority for itself - std::optional thread_priority; + std::optional thread_priority; }; } // namespace rosbag2_compression diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 556e139a22..6879acefee 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -70,7 +70,6 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { - #ifdef _WIN32 ROSBAG2_COMPRESSION_LOG_WARN_STREAM( "Changing thread priority is not implemented for windows"); @@ -103,8 +102,8 @@ void SequentialCompressionWriter::compression_thread_fn() int new_nice_value = nice(wanted_nice_value - cur_nice_value); if ((new_nice_value == -1 && errno != 0)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to " << wanted_nice_value << " : " << std::strerror( - errno)); + "Could not set nice value of compression thread to " << wanted_nice_value << + " : " << std::strerror(errno)); } } #endif diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 99207bd3d6..08a6955284 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -148,7 +148,7 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; - const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; + const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri) @@ -405,9 +405,9 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) const std::string test_topic_type = "test_msgs/BasicTypes"; const uint64_t kCompressionQueueSize = GetParam(); #ifndef _WIN32 - const int wanted_thread_priority = 10; + const int32_t wanted_thread_priority = 10; #else - const int wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN; + const int32_t wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN; #endif // queue size should be 0 or at least the number of remaining messages to prevent message loss @@ -421,9 +421,9 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) #ifndef _WIN32 // nice values are in the range from -20 to +19, so this value will never be read - int detected_thread_priority = 100; + int32_t detected_thread_priority = 100; #else - int detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN; + int32_t detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN; #endif initializeFakeFileStorage(); diff --git a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp index 13c936d322..e5f621faf3 100644 --- a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp +++ b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp @@ -178,7 +178,7 @@ void WriterBenchmark::create_writer() if (!bag_config_.compression_format.empty()) { rosbag2_compression::CompressionOptions compression_options{ bag_config_.compression_format, rosbag2_compression::CompressionMode::MESSAGE, - bag_config_.compression_queue_size, bag_config_.compression_threads}; + bag_config_.compression_queue_size, bag_config_.compression_threads, std::nullopt}; writer_ = std::make_unique( compression_options); diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index cf96933f99..95bdaaf533 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -42,7 +42,7 @@ struct RecordOptions std::string compression_format = ""; uint64_t compression_queue_size = 1; uint64_t compression_threads = 0; - int8_t compression_threads_priority = 0; + int32_t compression_threads_priority = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; bool include_unpublished_topics = false; diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index cb57f1964d..d593a56502 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -67,7 +67,7 @@ bool convert::decode( optional_assign(node, "compression_format", record_options.compression_format); optional_assign(node, "compression_queue_size", record_options.compression_queue_size); optional_assign(node, "compression_threads", record_options.compression_threads); - optional_assign( + optional_assign( node, "compression_threads_priority", record_options.compression_threads_priority); From 99c05a24f9be5321209a4e0e161ed3988651f447 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 15 Nov 2023 19:06:39 -0800 Subject: [PATCH 07/10] Enable thread priority for Windows Signed-off-by: Michael Orlov --- .../compression_options.hpp | 6 ++- .../sequential_compression_writer.cpp | 37 +++++++++---------- .../test_sequential_compression_writer.cpp | 13 ++++++- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 24f08eb983..e326dedf5b 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -61,9 +61,11 @@ struct CompressionOptions std::string compression_format; CompressionMode compression_mode; uint64_t compression_queue_size; + /// \brief // The number of compression threads uint64_t compression_threads; - /// if set, the compression thread(s) will try to set - /// the given priority for itself + /// \brief If set, the compression thread(s) will try to set the given priority for itself + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and + /// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value. std::optional thread_priority; }; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 6879acefee..5eb0b20e04 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -71,24 +71,23 @@ void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { #ifdef _WIN32 - ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Changing thread priority is not implemented for windows"); - - /** - * The implementation should look something like this - - uint8_t nice_value = *compression_options_.thread_nice_value; - - // this must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... - DWORD dwThreadPri = *compression_options_.thread_nice_value; - - if(!SetThreadPriority(GetCurrentThread(), dwThreadPri)) - { + // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... + int wanted_thread_priority = *compression_options_.thread_priority; + if (!SetThreadPriority(GetCurrentThread(), wanted_thread_priority)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to " << nice_value << " : " << std::strerror(GetLastError())); + "Could not set thread priority of compression thread to the: " << wanted_thread_priority << + ". Error code: " << GetLastError()); + } else { + auto detected_thread_priority = GetThreadPriority(GetCurrentThread()); + if (detected_thread_priority == THREAD_PRIORITY_ERROR_RETURN) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Failed to get current thread priority. Error code: " << GetLastError()); + } else if (wanted_thread_priority != detected_thread_priority) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set thread priority of compression thread to the: " << + wanted_thread_priority << ". Detected thread priority: " << detected_thread_priority); + } } - */ - #else int wanted_nice_value = *compression_options_.thread_priority; @@ -96,14 +95,14 @@ void SequentialCompressionWriter::compression_thread_fn() int cur_nice_value = getpriority(PRIO_PROCESS, 0); if (cur_nice_value == -1 && errno != 0) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to " << wanted_nice_value << + "Could not set nice value of compression thread to the: " << wanted_nice_value << " : Could not determine cur nice value"); } else { int new_nice_value = nice(wanted_nice_value - cur_nice_value); if ((new_nice_value == -1 && errno != 0)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to " << wanted_nice_value << - " : " << std::strerror(errno)); + "Could not set nice value of compression thread to the: " << wanted_nice_value << + ". Error : " << std::strerror(errno)); } } #endif diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 08a6955284..da611b5ab8 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -40,6 +40,9 @@ #ifdef _WIN32 #include +#else +#include +#include #endif using namespace testing; // NOLINT @@ -406,8 +409,14 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) const uint64_t kCompressionQueueSize = GetParam(); #ifndef _WIN32 const int32_t wanted_thread_priority = 10; + errno = 0; + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + ASSERT_TRUE(!(cur_nice_value == -1 && errno != 0)); #else - const int32_t wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN; + const int32_t wanted_thread_priority = THREAD_PRIORITY_LOWEST; + auto current_thread_priority = GetThreadPriority(GetCurrentThread()); + ASSERT_NE(current_thread_priority, THREAD_PRIORITY_ERROR_RETURN); + ASSERT_NE(current_thread_priority, wanted_thread_priority); #endif // queue size should be 0 or at least the number of remaining messages to prevent message loss @@ -432,7 +441,7 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) std::make_unique(detected_thread_priority)); writer_->open(tmp_dir_storage_options_); - writer_->create_topic({test_topic_name, test_topic_type, "", "", ""}); + writer_->create_topic({test_topic_name, test_topic_type, "", {}, ""}); auto message = std::make_shared(); message->topic_name = test_topic_name; From 3cc1b5b76d21f0af61b7a38302bb699b5bf78076 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 29 Nov 2023 08:11:51 -0800 Subject: [PATCH 08/10] Address grammar in log messages Signed-off-by: Michael Orlov --- .../rosbag2_compression/sequential_compression_writer.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 5eb0b20e04..54b94d42bb 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -75,7 +75,7 @@ void SequentialCompressionWriter::compression_thread_fn() int wanted_thread_priority = *compression_options_.thread_priority; if (!SetThreadPriority(GetCurrentThread(), wanted_thread_priority)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set thread priority of compression thread to the: " << wanted_thread_priority << + "Could not set thread priority of compression thread to: " << wanted_thread_priority << ". Error code: " << GetLastError()); } else { auto detected_thread_priority = GetThreadPriority(GetCurrentThread()); @@ -84,7 +84,7 @@ void SequentialCompressionWriter::compression_thread_fn() "Failed to get current thread priority. Error code: " << GetLastError()); } else if (wanted_thread_priority != detected_thread_priority) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set thread priority of compression thread to the: " << + "Could not set thread priority of compression thread to: " << wanted_thread_priority << ". Detected thread priority: " << detected_thread_priority); } } @@ -95,13 +95,13 @@ void SequentialCompressionWriter::compression_thread_fn() int cur_nice_value = getpriority(PRIO_PROCESS, 0); if (cur_nice_value == -1 && errno != 0) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to the: " << wanted_nice_value << + "Could not set nice value of compression thread to: " << wanted_nice_value << " : Could not determine cur nice value"); } else { int new_nice_value = nice(wanted_nice_value - cur_nice_value); if ((new_nice_value == -1 && errno != 0)) { ROSBAG2_COMPRESSION_LOG_WARN_STREAM( - "Could not set nice value of compression thread to the: " << wanted_nice_value << + "Could not set nice value of compression thread to: " << wanted_nice_value << ". Error : " << std::strerror(errno)); } } From 22c18d592b14f59938655322f44b9b377d35bfe7 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 5 Dec 2023 08:05:50 +0000 Subject: [PATCH 09/10] fix: fixed namespace collision warning in test Signed-off-by: Janosch Machowinski --- rosbag2_compression/CMakeLists.txt | 4 ++-- .../test/rosbag2_compression/fake_compression_factory.hpp | 2 +- .../test/rosbag2_compression/fake_compressor.hpp | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rosbag2_compression/CMakeLists.txt b/rosbag2_compression/CMakeLists.txt index 9bb9a9e795..0376daa247 100644 --- a/rosbag2_compression/CMakeLists.txt +++ b/rosbag2_compression/CMakeLists.txt @@ -104,10 +104,10 @@ if(BUILD_TESTING) ) ament_add_gmock(test_sequential_compression_writer - test/rosbag2_compression/test_sequential_compression_writer.cpp - test/rosbag2_compression/fake_compressor.cpp) + test/rosbag2_compression/test_sequential_compression_writer.cpp) target_link_libraries(test_sequential_compression_writer ${PROJECT_NAME} + fake_plugin rosbag2_cpp::rosbag2_cpp rosbag2_storage::rosbag2_storage ) diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp index d023789462..fa1b9f3f35 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp @@ -21,7 +21,7 @@ #include "rosbag2_compression/compression_factory.hpp" #include "fake_compressor.hpp" -class FakeCompressionFactory +class ROSBAG2_COMPRESSION_EXPORT FakeCompressionFactory : public rosbag2_compression::CompressionFactory { public: diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp index 492958c3ec..d641c97be0 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp @@ -20,7 +20,8 @@ #include "rosbag2_compression/base_compressor_interface.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" -class FakeCompressor : public rosbag2_compression::BaseCompressorInterface +class ROSBAG2_COMPRESSION_EXPORT FakeCompressor : public rosbag2_compression:: + BaseCompressorInterface { public: FakeCompressor() = default; From 2f5811353696bf61786f8b03706d36d5023e2a2c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 13 Dec 2023 12:54:29 -0800 Subject: [PATCH 10/10] Add compression_threads_priority to the node params parser Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/config_options_from_node_params.cpp | 4 ++++ rosbag2_transport/test/resources/recorder_node_params.yaml | 1 + .../test/rosbag2_transport/test_composable_recorder.cpp | 1 + 3 files changed, 6 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index 49dc587ac5..3af2976c9d 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -235,6 +235,10 @@ RecordOptions get_record_options_from_node_params(rclcpp::Node & node) node, "record.compression_threads", 0, std::numeric_limits::max(), record_options.compression_threads); + record_options.compression_threads_priority = param_utils::declare_integer_node_params( + node, "record.compression_threads_priority", std::numeric_limits::min(), + std::numeric_limits::max(), record_options.compression_threads_priority); + std::string qos_profile_overrides_path = node.declare_parameter("record.qos_profile_overrides_path", ""); diff --git a/rosbag2_transport/test/resources/recorder_node_params.yaml b/rosbag2_transport/test/resources/recorder_node_params.yaml index b5ca93b7b1..8fd6123988 100644 --- a/rosbag2_transport/test/resources/recorder_node_params.yaml +++ b/rosbag2_transport/test/resources/recorder_node_params.yaml @@ -18,6 +18,7 @@ recorder_params_node: compression_format: "h264" compression_queue_size: 10 compression_threads: 2 + compression_threads_priority: -1 qos_profile_overrides_path: "" include_hidden_topics: true include_unpublished_topics: true diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp index 54dbbe055e..3b9d494d6c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp @@ -220,6 +220,7 @@ TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) { EXPECT_EQ(record_options.compression_format, "h264"); EXPECT_EQ(record_options.compression_queue_size, 10); EXPECT_EQ(record_options.compression_threads, 2); + EXPECT_EQ(record_options.compression_threads_priority, -1); std::unordered_map topic_qos_profile_overrides{ std::pair{ "/overrided_topic_qos",