diff --git a/rosbag2_compression/CMakeLists.txt b/rosbag2_compression/CMakeLists.txt index 1862fa2eb2..0376daa247 100644 --- a/rosbag2_compression/CMakeLists.txt +++ b/rosbag2_compression/CMakeLists.txt @@ -107,6 +107,7 @@ if(BUILD_TESTING) test/rosbag2_compression/test_sequential_compression_writer.cpp) target_link_libraries(test_sequential_compression_writer ${PROJECT_NAME} + fake_plugin rosbag2_cpp::rosbag2_cpp rosbag2_storage::rosbag2_storage ) diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 3028f2a49d..e326dedf5b 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "visibility_control.hpp" @@ -60,7 +61,12 @@ struct CompressionOptions std::string compression_format; CompressionMode compression_mode; uint64_t compression_queue_size; + /// \brief // The number of compression threads uint64_t compression_threads; + /// \brief If set, the compression thread(s) will try to set the given priority for itself + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and + /// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value. + std::optional thread_priority; }; } // namespace rosbag2_compression diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index c3d6277ae4..54b94d42bb 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -33,6 +34,12 @@ #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" #include "logging.hpp" +#ifdef _WIN32 +#include +#else +#include +#include +#endif namespace rosbag2_compression { @@ -62,6 +69,45 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { + if (compression_options_.thread_priority) { +#ifdef _WIN32 + // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... + int wanted_thread_priority = *compression_options_.thread_priority; + if (!SetThreadPriority(GetCurrentThread(), wanted_thread_priority)) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set thread priority of compression thread to: " << wanted_thread_priority << + ". Error code: " << GetLastError()); + } else { + auto detected_thread_priority = GetThreadPriority(GetCurrentThread()); + if (detected_thread_priority == THREAD_PRIORITY_ERROR_RETURN) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Failed to get current thread priority. Error code: " << GetLastError()); + } else if (wanted_thread_priority != detected_thread_priority) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set thread priority of compression thread to: " << + wanted_thread_priority << ". Detected thread priority: " << detected_thread_priority); + } + } +#else + int wanted_nice_value = *compression_options_.thread_priority; + + errno = 0; + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + if (cur_nice_value == -1 && errno != 0) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to: " << wanted_nice_value << + " : Could not determine cur nice value"); + } else { + int new_nice_value = nice(wanted_nice_value - cur_nice_value); + if ((new_nice_value == -1 && errno != 0)) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Could not set nice value of compression thread to: " << wanted_nice_value << + ". Error : " << std::strerror(errno)); + } + } +#endif + } + // Every thread needs to have its own compression context for thread safety. auto compressor = compression_factory_->create_compressor( compression_options_.compression_format); diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp new file mode 100644 index 0000000000..fa1b9f3f35 --- /dev/null +++ b/rosbag2_compression/test/rosbag2_compression/fake_compression_factory.hpp @@ -0,0 +1,45 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ +#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ + +#include +#include + +#include "rosbag2_compression/compression_factory.hpp" +#include "fake_compressor.hpp" + +class ROSBAG2_COMPRESSION_EXPORT FakeCompressionFactory + : public rosbag2_compression::CompressionFactory +{ +public: + FakeCompressionFactory() = delete; + + ~FakeCompressionFactory() override = default; + + explicit FakeCompressionFactory(int & detected_thread_priority) + : detected_thread_priority_(detected_thread_priority) {} + + std::shared_ptr + create_compressor(const std::string & /*compression_format*/) override + { + return std::make_shared(detected_thread_priority_); + } + +private: + int & detected_thread_priority_; +}; + +#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_ diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp b/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp index 8c97ccfa23..01a5e9f0fd 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp @@ -13,11 +13,27 @@ // limitations under the License. #include +#ifdef _WIN32 +#include +#else +#include +#endif #include "pluginlib/class_list_macros.hpp" - #include "fake_compressor.hpp" +FakeCompressor::FakeCompressor(int & detected_thread_priority) +{ +#ifndef _WIN32 + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + if (cur_nice_value != -1 && errno == 0) { + detected_thread_priority = cur_nice_value; + } +#else + detected_thread_priority = GetThreadPriority(GetCurrentThread()); +#endif +} + std::string FakeCompressor::compress_uri(const std::string & uri) { return uri + "." + get_compression_identifier(); diff --git a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp index 01c6bb5be0..d641c97be0 100644 --- a/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp +++ b/rosbag2_compression/test/rosbag2_compression/fake_compressor.hpp @@ -20,11 +20,14 @@ #include "rosbag2_compression/base_compressor_interface.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" -class FakeCompressor : public rosbag2_compression::BaseCompressorInterface +class ROSBAG2_COMPRESSION_EXPORT FakeCompressor : public rosbag2_compression:: + BaseCompressorInterface { public: FakeCompressor() = default; + explicit FakeCompressor(int & detected_thread_priority); + std::string compress_uri(const std::string & uri) override; void compress_serialized_bag_message( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 535ceb35a2..da611b5ab8 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -36,6 +36,14 @@ #include "mock_storage_factory.hpp" #include "mock_compression_factory.hpp" +#include "fake_compression_factory.hpp" + +#ifdef _WIN32 +#include +#else +#include +#include +#endif using namespace testing; // NOLINT @@ -143,13 +151,15 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; + const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); EXPECT_THROW( @@ -163,7 +173,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format) { rosbag2_compression::CompressionOptions compression_options{ "bad_format", rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); EXPECT_THROW( @@ -175,7 +186,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; // Set minimum file size greater than max bagfile size option const uint64_t min_split_file_size = 10; @@ -196,7 +208,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; initializeWriter(compression_options); auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; @@ -211,7 +224,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, - kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; auto compression_factory = std::make_unique>(); EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1); @@ -235,7 +249,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -277,7 +292,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_ DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -315,7 +331,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, 0, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -363,7 +380,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz DefaultTestCompressor, rosbag2_compression::CompressionMode::MESSAGE, kCompressionQueueSize, - kDefaultCompressionQueueThreads + kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority }; initializeFakeFileStorage(); @@ -384,6 +402,60 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); } +TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority) +{ + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + const uint64_t kCompressionQueueSize = GetParam(); +#ifndef _WIN32 + const int32_t wanted_thread_priority = 10; + errno = 0; + int cur_nice_value = getpriority(PRIO_PROCESS, 0); + ASSERT_TRUE(!(cur_nice_value == -1 && errno != 0)); +#else + const int32_t wanted_thread_priority = THREAD_PRIORITY_LOWEST; + auto current_thread_priority = GetThreadPriority(GetCurrentThread()); + ASSERT_NE(current_thread_priority, THREAD_PRIORITY_ERROR_RETURN); + ASSERT_NE(current_thread_priority, wanted_thread_priority); +#endif + + // queue size should be 0 or at least the number of remaining messages to prevent message loss + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::MESSAGE, + kCompressionQueueSize, + kDefaultCompressionQueueThreads, + wanted_thread_priority + }; + +#ifndef _WIN32 + // nice values are in the range from -20 to +19, so this value will never be read + int32_t detected_thread_priority = 100; +#else + int32_t detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN; +#endif + + initializeFakeFileStorage(); + initializeWriter( + compression_options, + std::make_unique(detected_thread_priority)); + + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({test_topic_name, test_topic_type, "", {}, ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + const size_t kNumMessagesToWrite = 5; + for (size_t i = 0; i < kNumMessagesToWrite; i++) { + writer_->write(message); + } + writer_.reset(); // reset will call writer destructor + + EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority); + EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite); +} + INSTANTIATE_TEST_SUITE_P( SequentialCompressionWriterTestQueueSizes, SequentialCompressionWriterTest, diff --git a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp index 13c936d322..e5f621faf3 100644 --- a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp +++ b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp @@ -178,7 +178,7 @@ void WriterBenchmark::create_writer() if (!bag_config_.compression_format.empty()) { rosbag2_compression::CompressionOptions compression_options{ bag_config_.compression_format, rosbag2_compression::CompressionMode::MESSAGE, - bag_config_.compression_queue_size, bag_config_.compression_threads}; + bag_config_.compression_queue_size, bag_config_.compression_threads, std::nullopt}; writer_ = std::make_unique( compression_options); diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index 50621877f8..95bdaaf533 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -42,6 +42,7 @@ struct RecordOptions std::string compression_format = ""; uint64_t compression_queue_size = 1; uint64_t compression_threads = 0; + int32_t compression_threads_priority = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; bool include_unpublished_topics = false; diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index 49dc587ac5..3af2976c9d 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -235,6 +235,10 @@ RecordOptions get_record_options_from_node_params(rclcpp::Node & node) node, "record.compression_threads", 0, std::numeric_limits::max(), record_options.compression_threads); + record_options.compression_threads_priority = param_utils::declare_integer_node_params( + node, "record.compression_threads_priority", std::numeric_limits::min(), + std::numeric_limits::max(), record_options.compression_threads_priority); + std::string qos_profile_overrides_path = node.declare_parameter("record.qos_profile_overrides_path", ""); diff --git a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp index b1abeae33c..565337590f 100644 --- a/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp +++ b/rosbag2_transport/src/rosbag2_transport/reader_writer_factory.cpp @@ -53,7 +53,8 @@ std::unique_ptr ReaderWriterFactory::make_writer( record_options.compression_format, rosbag2_compression::compression_mode_from_string(record_options.compression_mode), record_options.compression_queue_size, - record_options.compression_threads + record_options.compression_threads, + record_options.compression_threads_priority, }; if (compression_options.compression_threads < 1) { compression_options.compression_threads = std::thread::hardware_concurrency(); diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index ec6a0a9813..d593a56502 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -39,6 +39,7 @@ Node convert::encode( node["compression_format"] = record_options.compression_format; node["compression_queue_size"] = record_options.compression_queue_size; node["compression_threads"] = record_options.compression_threads; + node["compression_threads_priority"] = record_options.compression_threads_priority; node["topic_qos_profile_overrides"] = convert>::encode( record_options.topic_qos_profile_overrides); @@ -66,6 +67,9 @@ bool convert::decode( optional_assign(node, "compression_format", record_options.compression_format); optional_assign(node, "compression_queue_size", record_options.compression_queue_size); optional_assign(node, "compression_threads", record_options.compression_threads); + optional_assign( + node, "compression_threads_priority", + record_options.compression_threads_priority); std::unordered_map qos_overrides; if (node["topic_qos_profile_overrides"]) { diff --git a/rosbag2_transport/test/resources/recorder_node_params.yaml b/rosbag2_transport/test/resources/recorder_node_params.yaml index b5ca93b7b1..8fd6123988 100644 --- a/rosbag2_transport/test/resources/recorder_node_params.yaml +++ b/rosbag2_transport/test/resources/recorder_node_params.yaml @@ -18,6 +18,7 @@ recorder_params_node: compression_format: "h264" compression_queue_size: 10 compression_threads: 2 + compression_threads_priority: -1 qos_profile_overrides_path: "" include_hidden_topics: true include_unpublished_topics: true diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp index 54dbbe055e..3b9d494d6c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp @@ -220,6 +220,7 @@ TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) { EXPECT_EQ(record_options.compression_format, "h264"); EXPECT_EQ(record_options.compression_queue_size, 10); EXPECT_EQ(record_options.compression_threads, 2); + EXPECT_EQ(record_options.compression_threads_priority, -1); std::unordered_map topic_qos_profile_overrides{ std::pair{ "/overrided_topic_qos",