From 72e985e81b64c8ea6982e680de19f051f090cca6 Mon Sep 17 00:00:00 2001 From: Roman Sokolkov Date: Wed, 31 Jul 2024 13:55:25 +0300 Subject: [PATCH 1/5] Add cli option compression-threads-priority Signed-off-by: Roman Sokolkov --- ros2bag/ros2bag/verb/record.py | 5 ++ ...record_with_compression_thread_priority.py | 54 +++++++++++++++++++ .../sequential_compression_writer.cpp | 2 + rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + 4 files changed, 62 insertions(+) create mode 100644 ros2bag/test/test_record_with_compression_thread_priority.py diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 733ff360af..20afc89f77 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -194,6 +194,10 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: '--compression-threads', type=int, default=0, help='Number of files or messages that may be compressed in parallel. ' 'Default is %(default)d, which will be interpreted as the number of CPU cores.') + parser.add_argument( + '--compression-threads-priority', type=int, default=0, + help='Thread scheduling priority. Use nice value for Linux and ' + 'priority level for Windows. Default is %(default)d.') parser.add_argument( '--compression-mode', type=str, default='none', choices=['none', 'file', 'message'], @@ -338,6 +342,7 @@ def main(self, *, args): # noqa: D102 record_options.compression_format = args.compression_format record_options.compression_queue_size = args.compression_queue_size record_options.compression_threads = args.compression_threads + record_options.compression_threads_priority = args.compression_threads_priority record_options.topic_qos_profile_overrides = qos_profile_overrides record_options.include_hidden_topics = args.include_hidden_topics record_options.include_unpublished_topics = args.include_unpublished_topics diff --git a/ros2bag/test/test_record_with_compression_thread_priority.py b/ros2bag/test/test_record_with_compression_thread_priority.py new file mode 100644 index 0000000000..31f59fd0dd --- /dev/null +++ b/ros2bag/test/test_record_with_compression_thread_priority.py @@ -0,0 +1,54 @@ +# Copyright 2020 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +import tempfile + +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess + +import launch_testing +import launch_testing.actions + +import pytest + + +@pytest.mark.launch_test +def generate_test_description(): + tmp_dir_name = tempfile.mkdtemp() + output_path = Path(tmp_dir_name) / 'ros2bag_test_record_with_compression_thread_priority' + record_process = ExecuteProcess( + cmd=['ros2', 'bag', 'record', '-a', '--output', output_path.as_posix(), + '--log-level', 'debug', '--compression-threads-priority', '1', + '--compression-mode', 'file', '--compression-format', 'zstd'], + name='ros2bag-cli', + output='screen', + ) + + return LaunchDescription([ + record_process, + launch_testing.actions.ReadyToTest() + ]), locals() + + +class TestRecordWithCompressionThreadPriority(unittest.TestCase): + + def test_priority_propagated_into_compression_thread( + self, record_process, proc_output): + proc_output.assertWaitFor( + 'Setting compression thread priority to 1', + process=record_process + ) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8f5a89c619..d1d7f2e233 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -71,6 +71,8 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { + ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Setting compression thread priority to " + << *compression_options_.thread_priority); #ifdef _WIN32 // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... int wanted_thread_priority = *compression_options_.thread_priority; diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a92dd36842..9d15e82b15 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -573,6 +573,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("compression_format", &RecordOptions::compression_format) .def_readwrite("compression_queue_size", &RecordOptions::compression_queue_size) .def_readwrite("compression_threads", &RecordOptions::compression_threads) + .def_readwrite("compression_threads_priority", &RecordOptions::compression_threads_priority) .def_property( "topic_qos_profile_overrides", &RecordOptions::getTopicQoSProfileOverrides, From eacc1f2121aa1c5bfe4b3cd2cd0489f5fa0b8597 Mon Sep 17 00:00:00 2001 From: Roman Sokolkov Date: Mon, 5 Aug 2024 10:38:03 +0300 Subject: [PATCH 2/5] Fix CI issues Signed-off-by: Roman Sokolkov --- ros2bag/test/test_record_with_compression_thread_priority.py | 2 +- .../rosbag2_compression/sequential_compression_writer.cpp | 5 +++-- rosbag2_py/rosbag2_py/_transport.pyi | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ros2bag/test/test_record_with_compression_thread_priority.py b/ros2bag/test/test_record_with_compression_thread_priority.py index 31f59fd0dd..5fe002036c 100644 --- a/ros2bag/test/test_record_with_compression_thread_priority.py +++ b/ros2bag/test/test_record_with_compression_thread_priority.py @@ -1,4 +1,4 @@ -# Copyright 2020 Open Source Robotics Foundation, Inc. +# Copyright 2024 Open Source Robotics Foundation, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index d1d7f2e233..d4674e2eb0 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -71,8 +71,9 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { if (compression_options_.thread_priority) { - ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Setting compression thread priority to " - << *compression_options_.thread_priority); + ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM( + "Setting compression thread priority to " + << *compression_options_.thread_priority); #ifdef _WIN32 // This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST... int wanted_thread_priority = *compression_options_.thread_priority; diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index 0b67ca5752..dbab6f6464 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -48,6 +48,7 @@ class RecordOptions: compression_mode: str compression_queue_size: int compression_threads: int + compression_threads_priority: int disable_keyboard_controls: bool exclude_regex: str exclude_service_events: List[str] From 7a33afd17e81e3dc81ec523f2d00b02116432051 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 8 Aug 2024 18:19:20 -0700 Subject: [PATCH 3/5] Add timeout for the test_priority_propagated_into_compression_thread Signed-off-by: Michael Orlov --- ros2bag/test/test_record_with_compression_thread_priority.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ros2bag/test/test_record_with_compression_thread_priority.py b/ros2bag/test/test_record_with_compression_thread_priority.py index 5fe002036c..a6e58086c6 100644 --- a/ros2bag/test/test_record_with_compression_thread_priority.py +++ b/ros2bag/test/test_record_with_compression_thread_priority.py @@ -50,5 +50,6 @@ def test_priority_propagated_into_compression_thread( self, record_process, proc_output): proc_output.assertWaitFor( 'Setting compression thread priority to 1', + timeout=45, process=record_process ) From 0903d7b754f778d72195508845b0af3b66a6e906 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 8 Aug 2024 18:31:08 -0700 Subject: [PATCH 4/5] Update help section and doxygen comments for thread priority parameters Signed-off-by: Michael Orlov --- ros2bag/ros2bag/verb/record.py | 11 +++++++++-- .../rosbag2_compression/compression_options.hpp | 5 +++-- .../include/rosbag2_transport/record_options.hpp | 5 +++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 20afc89f77..dadb6807ba 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -196,8 +196,15 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: 'Default is %(default)d, which will be interpreted as the number of CPU cores.') parser.add_argument( '--compression-threads-priority', type=int, default=0, - help='Thread scheduling priority. Use nice value for Linux and ' - 'priority level for Windows. Default is %(default)d.') + help='Compression threads scheduling priority. \nFor Windows the valid values are:' + ' THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 and' + ' THREAD_PRIORITY_NORMAL=0. Please refer to' + ' https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority' # noqa E501 + ' for details.\n' + 'For POSIX compatible OSes this is the "nice" value. The nice value range is' + ' -20 to +19 where -20 is highest, 0 default and +19 is lowest.' + ' Please refer to https://man7.org/linux/man-pages/man2/nice.2.html for details.\n' + 'Default is %(default)d.') parser.add_argument( '--compression-mode', type=str, default='none', choices=['none', 'file', 'message'], diff --git a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp index 3b8445de04..b40ca4aef4 100644 --- a/rosbag2_compression/include/rosbag2_compression/compression_options.hpp +++ b/rosbag2_compression/include/rosbag2_compression/compression_options.hpp @@ -64,8 +64,9 @@ struct CompressionOptions /// \brief // The number of compression threads uint64_t compression_threads{0}; /// \brief If set, the compression thread(s) will try to set the given priority for itself - /// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and - /// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value. + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 + /// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value. + /// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest. std::optional thread_priority; }; diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index 695dc531bd..5d6d22bb38 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -47,7 +47,12 @@ struct RecordOptions std::string compression_mode = ""; std::string compression_format = ""; uint64_t compression_queue_size = 1; + /// \brief // The number of compression threads uint64_t compression_threads = 0; + /// \brief Compression threads scheduling priority. + /// For Windows the valid values are: THREAD_PRIORITY_LOWEST=-2, THREAD_PRIORITY_BELOW_NORMAL=-1 + /// and THREAD_PRIORITY_NORMAL=0. For POSIX compatible OSes this is the "nice" value. + /// The nice value range is -20 to +19 where -20 is highest, 0 default and +19 is lowest. int32_t compression_threads_priority = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; From ef55ba17e5ebace049847cc3cfe619964e56c6f6 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 8 Aug 2024 18:34:26 -0700 Subject: [PATCH 5/5] Use integer type for compression threads priority default value in tests - Rationale: To test the same behavior as in the writer factory class Signed-off-by: Michael Orlov --- .../rosbag2_compression/test_sequential_compression_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 25f913756f..2b50719520 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -154,7 +154,7 @@ class SequentialCompressionWriterTest : public TestWithParam const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; - const std::optional kDefaultCompressionQueueThreadsPriority = std::nullopt; + const int32_t kDefaultCompressionQueueThreadsPriority = 0; }; TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)