From 398fdca4091d64f09bcc93dbc1c79d062bd5e884 Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Thu, 10 Jul 2025 02:24:41 +0300 Subject: [PATCH 1/9] Refactor SystemDataRecorder for improved parameter handling and multi session management --- CMakeLists.txt | 7 +- README.md | 168 ++++--- config/sdr_example.yaml | 18 + include/sdr/sdr_component.hpp | 208 ++++---- launch/sdr_example.launch.py | 24 + src/sdr_component.cpp | 880 ++++++++++++++++++---------------- src/system_data_recorder.cpp | 17 +- 7 files changed, 695 insertions(+), 627 deletions(-) create mode 100644 config/sdr_example.yaml create mode 100644 launch/sdr_example.launch.py diff --git a/CMakeLists.txt b/CMakeLists.txt index c33e60c..128970e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.8) +cmake_minimum_required(VERSION 3.10...3.27) project(system_data_recorder) if(NOT CMAKE_CXX_STANDARD) @@ -54,6 +54,11 @@ install(TARGETS DESTINATION lib/${PROJECT_NAME} ) +# Install launch, config directories +install(DIRECTORY launch DESTINATION share/${PROJECT_NAME}) +install(DIRECTORY config DESTINATION share/${PROJECT_NAME}) + + if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) # the following line skips the linter which checks for copyrights diff --git a/README.md b/README.md index 2bbc9b5..548fbff 100644 --- a/README.md +++ b/README.md @@ -1,129 +1,159 @@ # System Data Recorder (SDR) - -A lifecycle node and executable for recording topic data to a rosbag2 bag, while simultaneously copying the split bag files to another location as each bag file is completed. +A lifecycle node for recording multiple, distinct rosbag2 bags sessions, while simultaneously copying the split bag files to another location as each bag file is completed. This is useful, for example, to copy bag data files to an external disc during recording as each data file is completed, rather than waiting until all data is recorded before copying the entire bag at once (an operation that can take a significant time if the bag is large). The copying function requires that a maximum file size for bag files be enabled. Otherwise no splitting will be performed and the files will not be copied until recording is terminated. +## Configuration + +The SDR is configured using a YAML parameter file, which is loaded via a ROS 2 launch file. This is the recommended way to run the node. + +#### Parameters -## Compilation +* `bag_name_prefix` (string): The base name for each bag file. A timestamp will be appended to this (e.g., `"recording"` becomes `"recording_2025-07-10_02-30-00"`). +* `copy_destination` (string): The parent directory where all recording session folders will be saved. +* `max_file_size` (int): The maximum size in bytes for an individual `.db3` file before it is split. A value greater than `0` is required for the copy-on-split feature to work. +* `topic_names` (string array): A list of topics to record. +* `topic_types` (string array): A corresponding list of message types for the topics. -The SDR requires two features not yet available in the rosbag2 main branch or binary releases. -You will need to compile rosbag2 from source, applying the following two pull requests to the source prior to compiling it. +#### Example `sdr_example.yaml` -1. [Expose the QoS object wrapper](https://github.com/ros2/rosbag2/pull/910) -2. [Notification of significant events during bag recording and playback](https://github.com/ros2/rosbag2/pull/908) +```yaml +sdr: + ros__parameters: + # The base name for each bag file. A timestamp will be appended to this. + bag_name_prefix: "robot_data" -Create a `colcon` workspace with the [SDR source code](https://github.com/osrf/system_data_recorder) in it, and compile the workspace. + # The parent directory where all recording session folders will be saved. + copy_destination: "/home/user/sdr_bags" + # Maximum size of each individual .db3 file in bytes before splitting. + max_file_size: 268435456 # 256 MiB + + # List of topics and their types to record. + topic_names: [ + "/chatter", + "/diagnostics" + ] + topic_types: [ + "std_msgs/msg/String", + "diagnostic_msgs/msg/DiagnosticArray" + ] +``` ## Use ### Lifecycle management -Run the executable to start the node. +Run the executable using a launch file that loads your parameters. For example: +`ros2 launch system_data_recorder sdr_example.launch.py` + +In a separate terminal, use the lifecycle manager to control the node. - ros2 run system_data_recorder system_data_recorder +1. **Configure the node.** +This sets up the main session directory and prepares the node for recording. This is done only once. -In a separate terminal, use the lifecycle manager to configure and activate the node. +```bash +ros2 lifecycle set /sdr configure +``` - ros2 lifecycle set sdr configure - ros2 lifecycle set sdr activate +2. **Activate to start recording.** +This begins a new, timestamped bag recording. -This will enable recording of data to the bag. -To pause recording, deactivate the node. +```bash +ros2 lifecycle set /sdr activate +``` - ros2 lifecycle set sdr deactivate +3. **Deactivate to stop recording.** +This finalizes the current bag and copies its files to the permanent destination. -From here, recording can be resumed by re-activating the node, or recording can be terminated by cleaning up the node. +```bash +ros2 lifecycle set /sdr deactivate +``` - ros2 lifecycle set sdr cleanup +You can repeat steps 2 and 3 as many times as you like to create multiple, separate bag files within the same run. -Once cleaned up, the node will copy the final files of the bag, as well as any metadata, to the backup destination. +4. **Cleanup the node.** +This stops the background threads and cleans up resources. -### Configuring +```bash +ros2 lifecycle set /sdr cleanup +``` -The SDR is configured by the arguments passed to the node's constructor. -These are not currently exposed as command line arguments or ROS parameters, so they must be changed in the source code. -See the constructor's documentation block for the possible parameters. +5. **Shutdown the node.** -By default, the SDR will: +```bash +ros2 lifecycle set /sdr shutdown +``` -- Record from the `/chatter` topic, expecting `std_msgs/msg/String` data. -- Record to a bag named `test_bag`. -- Copy bag files to the directory `copied_bag/test_bag`. -- Split bag files every 100,000 bytes. +### A Simple Test -### A simple test +1. In one terminal, start publishing data: -In one terminal, start publishing data on the `/chatter` topic. +```bash +ros2 launch system_data_recorder sdr_example.launch.py +``` - ros2 run demo_nodes_cpp talker +2. In another terminal, launch the SDR node with your parameters: -In another terminal, start the SDR node. +```bash +ros2 launch your_package_name your_launch_file.py +``` - ros2 run system_data_recorder system_data_recorder +3. In a third terminal, configure the SDR, then activate it to start the first recording: -In a third terminal, configure and activate the SDR. +```bash +ros2 lifecycle set /sdr configure +ros2 lifecycle set /sdr activate +``` - ros2 lifecycle set sdr configure - ros2 lifecycle set sdr activate +4. After some time, deactivate it to save the first bag: -Wait a minute or two for 100,000 bytes of data to be recorded. -Navigate to the directory `copied_bag` and observe that there is a `test_bag` -directory containing the first data file of the bag. +```bash +ros2 lifecycle set /sdr deactivate +``` -Deactivate and cleanup the SDR. +5. Activate it again to start a second, new recording: - ros2 lifecycle set sdr deactivate - ros2 lifecycle set sdr cleanup +```bash +ros2 lifecycle set /sdr activate +``` -Again navigate to the `copied_bag` directory, and observe that the `test_bag` -directory now contains all the data files of the bag, and a `metadata.yaml` -file. +6. Deactivate and clean up: -This is a complete, usable bag. -This can be demonstrated by stopping the `talker` node, then starting the -`listener` node and playing the bag file. +```bash +ros2 lifecycle set /sdr deactivate +ros2 lifecycle set /sdr cleanup +``` - ros2 run demo_nodes_cpp listener - ros2 bag play copied_bag/test_bag +Now, navigate to your `copy_destination` directory (e.g., `/home/user/sdr_bags`). You will find a main session folder (e.g., `sdr_session_2025-07-10_03-00-00`), and inside it will be two complete, timestamped bag directories from your two recording sessions. -The recorded data will be echoed by the `listener` node. +You can play back one of the bags: +```bash +ros2 bag play /home/user/sdr_bags/sdr_session_.../robot_data_... +``` -## Lifecycle transition behaviours +## Lifecycle Transition Behaviours ### on_configure -In the on_configure state, the node sets up the rosbag2 infrastructure for -recording by creating a writer and opening the storage. It also sets up the -worker thread that will copy files in parallel to the recording of data. A -callback is registered with the writer so that the node will get informed each -time a new file is created in the bag. +Creates the main, timestamped session directory that will contain all bags recorded during this run. It also starts the background file-copying thread. **It does not start any recording.** ### on_activate -In the on_activate transition, the node simply notifies the worker thread that -it is recording. Data will be written to the bag automatically because the -state changes to `Active`. +Begins a **new, unique recording session**. It creates a timestamped bag, sets up a temporary storage location, initializes a `rosbag2::Writer`, and subscribes to the requested topics. Data begins being written to the temporary bag file. ### on_deactivate -In the on_deactivate transition, the node simply notifies the worker thread -that it is paused. Data will not be written to the bag because the state -changes to `Inactive`. +**Finalizes the current recording session**. It unsubscribes from topics, closes the `rosbag2::Writer` (which flushes all data to disk and writes the `metadata.yaml` file), and queues the final bag files to be copied to their permanent destination within the main session folder. ### on_cleanup -When cleaning up, the node needs to stop recording, stop receiving data (i.e. -unsubscribe from topics), and ensure that the final files of the bag (the last -data file and the metadata file, which gets written when the writer object -destructs) are copied to the destination directory. +Ensures the last active recording session is properly deactivated and finalized. It then gracefully shuts down the background copy thread. ### on_shutdown -If not already performed, the shutdown transition performs the same functions -as the cleanup transition. +Performs the same actions as `on_cleanup` before shutting down the node. diff --git a/config/sdr_example.yaml b/config/sdr_example.yaml new file mode 100644 index 0000000..4bf64ed --- /dev/null +++ b/config/sdr_example.yaml @@ -0,0 +1,18 @@ +sdr: + ros__parameters: + # The base name for each bag file. A timestamp will be appended to this. + bag_name_prefix: "robot_data" + + # The parent directory where all recording session folders will be saved. + copy_destination: "/home/user/sdr_bags" + + # Maximum size of each individual .db3 file in bytes before splitting. + max_file_size: 268435456 # 256 MiB + + # List of topics and their types to record. + topic_names: [ + "/chatter", + ] + topic_types: [ + "std_msgs/msg/String", + ] \ No newline at end of file diff --git a/include/sdr/sdr_component.hpp b/include/sdr/sdr_component.hpp index d50d347..bf8b4cd 100644 --- a/include/sdr/sdr_component.hpp +++ b/include/sdr/sdr_component.hpp @@ -19,9 +19,10 @@ #include #include #include +#include // For timestamping +#include // For formatting the timestamp #include "sdr/visibility_control.h" - #include "rclcpp/rclcpp.hpp" #include "rclcpp_lifecycle/lifecycle_node.hpp" #include "rosbag2_cpp/writer.hpp" @@ -30,126 +31,91 @@ namespace sdr { -class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode -{ -public: - /// Constructor - /// - /// @param node_name The graph name to use for the SDR node - /// @param options The options to construct the node with - /// @param bag_uri The relative or full path to where the bag will be stored during recording. - /// Must not exist. - /// @param copy_destination The relative or full path to where the bag files will be copied. Must - /// not exist. - /// @param max_file_size The maximum size of each individual file in the bag, in bytes. This - /// should be tuned based on the time to copy each file, how often the backup files should be - /// copied, how fast data is coming in, etc. - /// @param topics_and_types A map of topics to record and their types. Keys of the map are topic - /// names, values of the map are the topic types, e.g. "example_interfaces/msg/String". - SDR_PUBLIC - SystemDataRecorder( - const std::string & node_name, - const rclcpp::NodeOptions & options, - const std::string & bag_uri, - const std::string & copy_destination, - const unsigned int max_file_size, - const std::unordered_map & topics_and_types); - - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Lifecycle states - ///////////////////////////////////////////////////////////////////////////////////////////////// - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_configure(const rclcpp_lifecycle::State & state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_activate(const rclcpp_lifecycle::State & state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_deactivate(const rclcpp_lifecycle::State & state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_cleanup(const rclcpp_lifecycle::State & state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_shutdown(const rclcpp_lifecycle::State & state); - -private: - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Variables and functions for the record functionality - ///////////////////////////////////////////////////////////////////////////////////////////////// - - // Options for the bag storage - rosbag2_storage::StorageOptions storage_options_; - // The topics that will be recorded, and their types - std::unordered_map topics_and_types_; - // A map from topic names to subscriber entities - std::unordered_map> subscriptions_; - - // This writer does the actual storing of data during recording - std::shared_ptr writer_; - - void start_recording(); - void stop_recording(); - void pause_recording(); - void unpause_recording(); - - void subscribe_to_topics(); - void subscribe_to_topic(const std::string & topic, const std::string & type); - std::string get_serialised_offered_qos_for_topic(const std::string & topic); - rclcpp::QoS get_appropriate_qos_for_topic(const std::string & topic); - void unsubscribe_from_topics(); - - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Variables and functions for the file-copying thread and functionality - ///////////////////////////////////////////////////////////////////////////////////////////////// - - // This is used to pass state-change messages from the node to the file-copying worker thread - enum class SdrStateChange + // A struct to hold information for the file-copying thread + struct FileCopyJob + { + std::string source_path; + std::filesystem::path destination_path; + }; + + class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode { - NO_CHANGE, - PAUSED, - RECORDING, - FINISHED + public: + SDR_PUBLIC + explicit SystemDataRecorder(const rclcpp::NodeOptions &options); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_configure(const rclcpp_lifecycle::State &state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_activate(const rclcpp_lifecycle::State &state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_deactivate(const rclcpp_lifecycle::State &state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_cleanup(const rclcpp_lifecycle::State &state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_shutdown(const rclcpp_lifecycle::State &state); + + private: + // Parameter loading + bool read_parameters(); + + // Helper function to get a formatted timestamp string + std::string generate_timestamp(); + + // Bag recording functionality + void subscribe_to_topics(); + void subscribe_to_topic(const std::string &topic, const std::string &type); + void unsubscribe_from_topics(); + std::string get_serialised_offered_qos_for_topic(const std::string &topic); + rclcpp::QoS get_appropriate_qos_for_topic(const std::string &topic); + + rosbag2_storage::StorageOptions storage_options_; + std::unordered_map topics_and_types_; + std::unordered_map> subscriptions_; + std::shared_ptr writer_; + + // File-copying functionality + enum class SdrStateChange + { + NO_CHANGE, + PAUSED, + RECORDING, + FINISHED + }; + + SdrStateChange state_msg_ = SdrStateChange::NO_CHANGE; + std::queue files_to_copy_; // Queue now holds jobs + std::mutex copy_thread_mutex_; + std::condition_variable copy_thread_wake_cv_; + std::shared_ptr copy_thread_; + + void copy_thread_main(); + bool copy_thread_should_wake(); + void notify_state_change(SdrStateChange new_state); + void notify_new_file_to_copy(const FileCopyJob &job); + void copy_bag_file(const FileCopyJob &job); + + // Base configuration loaded from parameters + std::string base_bag_name_prefix_; + std::filesystem::path base_copy_destination_; + + // Session-specific paths, valid during an active recording + std::filesystem::path session_destination_directory_; // The main folder for this run + std::filesystem::path current_bag_tmp_directory_; // Temp folder for the current bag + std::filesystem::path current_bag_final_destination_; // Final folder for the current bag + std::string last_bag_file_ = ""; + bool cleaned_up = true; }; - // This variable holds the message to send to the worker thread - SdrStateChange state_msg_ = SdrStateChange::NO_CHANGE; - // This queue passes bag files that need to be copied to the worker thread - std::queue files_to_copy_; - // This mutex protects the two above variables - std::mutex copy_thread_mutex_; - // This condition variable is used to wake up the worker thread when there is a new state-change - // message or there are files to copy - std::condition_variable copy_thread_wake_cv_; - // The worker thread - std::shared_ptr copy_thread_; - - void copy_thread_main(); - bool copy_thread_should_wake(); - void notify_state_change(SdrStateChange new_state); - void notify_new_file_to_copy(const std::string & file_uri); - void notify_new_file_to_copy(const std::filesystem::path & file_path); - - // The source directory is the location of the bag, i.e. where bag files are copied from - std::filesystem::path source_directory_; - // The destination directory where bag files will be copied to - std::filesystem::path destination_directory_; - // The name of the final file in the bag; no event notification will be received when recording - // stops because there is no "stop recording" concept in rosbag2, you simply stop passing data to - // the writer. So we need to store the name of the final file and copy it during cleanup. - std::string last_bag_file_ = ""; - // Prevent multiple cleanup - bool cleaned_up = true; - - bool create_copy_destination(); - void copy_bag_file(const std::string & bag_file_name); -}; - -} // namespace sdr - -#endif // SDR__SDR_COMPONENT_HPP__ +} // namespace sdr + +#endif // SDR__SDR_COMPONENT_HPP__ \ No newline at end of file diff --git a/launch/sdr_example.launch.py b/launch/sdr_example.launch.py new file mode 100644 index 0000000..7612c09 --- /dev/null +++ b/launch/sdr_example.launch.py @@ -0,0 +1,24 @@ +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch_ros.actions import Node + +def generate_launch_description(): + # Get the path to the package + sdr_pkg_path = get_package_share_directory('system_data_recorder') # <-- CHANGE 'your_package_name' + + # Get the path to the YAML file + config_file_path = os.path.join(sdr_pkg_path, 'config', 'sdr_example.yaml') + + # Create the node action + sdr_node = Node( + package='system_data_recorder', # <-- CHANGE 'your_package_name' + executable='system_data_recorder', # <-- CHANGE to your C++ executable name from CMakeLists.txt + name='sdr', + output='screen', + parameters=[config_file_path] + ) + + return LaunchDescription([ + sdr_node + ]) \ No newline at end of file diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index a901a5c..62b902c 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -14,6 +14,7 @@ #include "sdr/sdr_component.hpp" +#include #include "lifecycle_msgs/msg/state.hpp" #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_transport/qos.hpp" @@ -22,449 +23,480 @@ namespace sdr { -SystemDataRecorder::SystemDataRecorder( - const std::string & node_name, - const rclcpp::NodeOptions & options, - const std::string & bag_uri, - const std::string & copy_destination, - const unsigned int max_file_size, - const std::unordered_map & topics_and_types) -: rclcpp_lifecycle::LifecycleNode(node_name, options), - topics_and_types_(topics_and_types), - source_directory_(bag_uri) -{ - // Set up the storage options for writing the bag - storage_options_.uri = bag_uri; - // Only the "sqlite3" storage backend is supported; currently this is the only backend provided - // by rosbag2 so this is not a problem at this time - storage_options_.storage_id = "sqlite3"; - // Set the maximum size of each individual file - storage_options_.max_bagfile_size = max_file_size; - // Using a write cache is not required, but it helps avoid disc I/O becoming a bottleneck. - // Set this option to 0 to disable the cache - storage_options_.max_cache_size = 100*1024*1024; - - // Get the name of the bag directory from the bag URI - the bag URI should not be empty - auto bag_directory_name = std::filesystem::path(bag_uri).filename(); - if (bag_directory_name.empty()) { - // There was probably a slash on the end - std::string bag_uri_copy = bag_uri; - bag_directory_name = std::filesystem::path( - bag_uri_copy.erase(bag_uri.size() - 1, 1)).filename(); - } - // A directory named after the bag will be created under the copy_destination directory to put - // the copied files into - destination_directory_ = std::filesystem::path(copy_destination) / bag_directory_name; - - // Initially, the "last" bag file will be the first bag file - last_bag_file_ = std::filesystem::path(bag_uri) / bag_directory_name.concat("_0.db3"); -} - -/////////////////////////////////////////////////////////////////////////////////////////////////// -// Lifecycle states -/////////////////////////////////////////////////////////////////////////////////////////////////// - -// In the on_configure state, we set up the rosbag2 infrastructure for recording by creating a -// writer and opening the storage. We also set up the worker thread that will copy files in -// parallel to the recording of data. A callback is registered with the writer so that we will get -// informed each time a new file is created in the bag. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_configure(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Preparing to begin recording"); + SystemDataRecorder::SystemDataRecorder(const rclcpp::NodeOptions &options) + : rclcpp_lifecycle::LifecycleNode("sdr", options) + { + RCLCPP_INFO(get_logger(), "Initializing SystemDataRecorder node..."); + if (!read_parameters()) + { + throw std::runtime_error("Failed to read parameters for SystemDataRecorder"); + } + } + + bool SystemDataRecorder::read_parameters() + { + // Declare parameters with new names + this->declare_parameter("bag_name_prefix", "bag"); + this->declare_parameter("copy_destination", ""); + this->declare_parameter("max_file_size", 0); + this->declare_parameter>("topic_names", std::vector{}); + this->declare_parameter>("topic_types", std::vector{}); + + std::string copy_destination_str; + int max_file_size; + std::vector topic_names, topic_types; + + if (!this->get_parameter("bag_name_prefix", base_bag_name_prefix_) || base_bag_name_prefix_.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'bag_name_prefix' not set or is empty."); + return false; + } + if (!this->get_parameter("copy_destination", copy_destination_str) || copy_destination_str.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'copy_destination' not set or is empty."); + return false; + } + base_copy_destination_ = copy_destination_str; + + if (!this->get_parameter("max_file_size", max_file_size) || max_file_size <= 0) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'max_file_size' not set or is <= 0."); + return false; + } + this->get_parameter("topic_names", topic_names); + this->get_parameter("topic_types", topic_types); + + if (topic_names.size() != topic_types.size()) + { + RCLCPP_FATAL(get_logger(), "'topic_names' and 'topic_types' must have the same number of entries."); + return false; + } - RCLCPP_INFO(get_logger(), "Copying bag files to %s", destination_directory_.c_str()); + for (size_t i = 0; i < topic_names.size(); ++i) + { + this->topics_and_types_[topic_names[i]] = topic_types[i]; + } + + storage_options_.storage_id = "sqlite3"; + storage_options_.max_bagfile_size = max_file_size; + storage_options_.max_cache_size = 100 * 1024 * 1024; + + RCLCPP_INFO(get_logger(), "--- SystemDataRecorder Base Configuration ---"); + RCLCPP_INFO(get_logger(), "* Bag Name Prefix: %s", base_bag_name_prefix_.c_str()); + RCLCPP_INFO(get_logger(), "* Base Copy Destination: %s", base_copy_destination_.c_str()); + RCLCPP_INFO(get_logger(), "* Max File Size: %d bytes", max_file_size); + RCLCPP_INFO(get_logger(), "* Topics to Record:"); + if (this->topics_and_types_.empty()) + { + RCLCPP_INFO(get_logger(), " - None"); + } + else + { + for (const auto &pair : this->topics_and_types_) + { + RCLCPP_INFO(get_logger(), " - %s (%s)", pair.first.c_str(), pair.second.c_str()); + } + } + RCLCPP_INFO(get_logger(), "-------------------------------------------"); - // Prepare for file-copying by creating the destination directory and setting up a worker thread - try { - if (!create_copy_destination()) { - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; + return true; } - } - catch (std::filesystem::filesystem_error const & ex) { - RCLCPP_ERROR(get_logger(), "Could not create destination directory for bag backup"); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; - } - copy_thread_ = std::make_shared([this]{ copy_thread_main(); }); - - // Notify the copy thread to get it into the correct state - notify_state_change(SdrStateChange::PAUSED); - - // Prepare the rosbag2 objects for recording - // A sequential writer is used as this is the only writer currently available - writer_ = std::make_shared( - std::make_unique()); - // Add a callback for when a split occurs - rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; - callbacks.write_split_callback = - [this] - (rosbag2_cpp::bag_events::BagSplitInfo & info) { - // Record the opened file - this will be the last file remaining to be copied when recording - // is terminated - last_bag_file_ = info.opened_file; - // Notify the worker thread about the closed file being ready to copy - notify_new_file_to_copy(info.closed_file); - }; - writer_->add_event_callbacks(callbacks); - // Open the bag - writer_->open( - storage_options_, - {rmw_get_serialization_format(), rmw_get_serialization_format()}); - - // Start receiving data - it won't be recorded yet because the node's lifecycle is not in the - // Active state - subscribe_to_topics(); - - // Make sure cleanup happens when the on_cleanup transition occurs - this allows the node to be - // cleaned up and then configured again (with the bag files moved away somewhere else) without - // shutting it down - cleaned_up = false; - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} - -// In the on_activate transition, we simply notify the worker thread that we are recording. Data -// will be written to the bag because the state changes to Active. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_activate(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Starting recording"); - // Notify the copy thread - notify_state_change(SdrStateChange::RECORDING); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} + std::string SystemDataRecorder::generate_timestamp() + { + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d_%H-%M-%S"); + return ss.str(); + } -// In the on_deactivate transition, we simply notify the worker thread that we are paused. Data -// will not be written to the bag because the state changes to Inactive. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Pausing recording"); - - // Notify the copy thread - notify_state_change(SdrStateChange::PAUSED); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} - -// When cleaning up, we need to stop recording, stop receiving data (i.e. unsubscribe from topics), -// and ensure that the final files of the bag (the last data file and the metadata file, which gets -// written when the writer object destructs) are copied to the destination directory. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Stopping and finalising recording"); - // Avoid double-cleanup if on_shutdown is called after this - cleaned_up = true; - - // Stop recording by stopping data from arriving - unsubscribe_from_topics(); - // Clean up the writer so that the bag is closed and the metadata file is written - writer_.reset(); - // Copy the final data file - notify_new_file_to_copy(last_bag_file_); - // Copy the metadata file - notify_new_file_to_copy(source_directory_ / "metadata.yaml"); - - // Notify the worker thread that we are cleaning up, so that it exits onces it's finished its - // final work - notify_state_change(SdrStateChange::FINISHED); - copy_thread_->join(); - // Get rid of the worker thread - copy_thread_.reset(); - - RCLCPP_INFO(get_logger(), "Cleanup complete"); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} - -// If not already performed, the shutdown transition performs the same functions as the cleanup -// transition. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Stopping and finalising recording (hard shutdown)"); - if (!cleaned_up) { - // Stop recording - unsubscribe_from_topics(); - // Clean up the writer - writer_.reset(); - // Copy the final file - notify_new_file_to_copy(last_bag_file_); - // Copy the metadata file - notify_new_file_to_copy(source_directory_ / "metadata.yaml"); - } - - // Notify the copy thread - but only if it hasn't already been cleaned up - if (copy_thread_) { - notify_state_change(SdrStateChange::FINISHED); - copy_thread_->join(); - copy_thread_.reset(); - } - RCLCPP_INFO(get_logger(), "Cleanup complete"); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////// -// Bag recording functionality -/////////////////////////////////////////////////////////////////////////////////////////////////// - -void SystemDataRecorder::subscribe_to_topics() -{ - for (const auto & topic_with_type : topics_and_types_) { - subscribe_to_topic(topic_with_type.first, topic_with_type.second); - } -} + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + SystemDataRecorder::on_configure(const rclcpp_lifecycle::State &) + { + RCLCPP_INFO(get_logger(), "Configuring SDR. Creating main session directory."); -void SystemDataRecorder::subscribe_to_topic(const std::string & topic, const std::string & type) -{ - // Get the QoS offered by the topic for saving in the bag - auto offered_qos = get_serialised_offered_qos_for_topic(topic); - // Find out what QoS is most appropriate to use when subscribing to the topic - auto qos = get_appropriate_qos_for_topic(topic); + // Create the main timestamped directory for all bags from this run + session_destination_directory_ = base_copy_destination_ / ("sdr_session_" + generate_timestamp()); - // The metadata to pass to the writer object so it can register the topic in the bag - auto topic_metadata = rosbag2_storage::TopicMetadata( + try + { + std::filesystem::create_directories(session_destination_directory_); + } + catch (const std::filesystem::filesystem_error &ex) + { + RCLCPP_ERROR( + get_logger(), "Could not create session directory '%s': %s", + session_destination_directory_.c_str(), ex.what()); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; + } + RCLCPP_INFO( + get_logger(), "All copied bags for this session will be stored in: %s", + session_destination_directory_.c_str()); + + copy_thread_ = std::make_shared([this] + { this->copy_thread_main(); }); + notify_state_change(SdrStateChange::PAUSED); + cleaned_up = false; + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; + } + + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + SystemDataRecorder::on_activate(const rclcpp_lifecycle::State &) { - topic, // Topic name - type, // Topic type, e.g. "example_interfaces/msg/String" - rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" - offered_qos // The offered QoS profile for the topic in YAML + std::string bag_name = base_bag_name_prefix_ + "_" + generate_timestamp(); + RCLCPP_INFO(get_logger(), "Activating. Starting new recording session: %s", bag_name.c_str()); + + // Set up paths for this specific recording session + current_bag_tmp_directory_ = std::filesystem::temp_directory_path() / bag_name; + current_bag_final_destination_ = session_destination_directory_ / bag_name; + storage_options_.uri = current_bag_tmp_directory_.string(); + + // Pre-emptively set the name of the first bag file. This ensures we always have a valid + // filename to copy, even if no bag split occurs. + last_bag_file_ = (current_bag_tmp_directory_ / (bag_name + "_0.db3")).string(); + + // Ensure the temporary directory is clean + if (std::filesystem::exists(current_bag_tmp_directory_)) + { + std::filesystem::remove_all(current_bag_tmp_directory_); + } + std::filesystem::create_directories(current_bag_final_destination_); + + writer_ = std::make_shared( + std::make_unique()); + + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [this](rosbag2_cpp::bag_events::BagSplitInfo &info) + { + last_bag_file_ = info.opened_file; + notify_new_file_to_copy({info.closed_file, current_bag_final_destination_}); + }; + writer_->add_event_callbacks(callbacks); + writer_->open(storage_options_, {rmw_get_serialization_format(), rmw_get_serialization_format()}); + + subscribe_to_topics(); + notify_state_change(SdrStateChange::RECORDING); + + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; } - ); - // It is a good idea to create the topic in the writer prior to adding the subscription in case - // data arrives after subscribing and before the topic is created. Although we should be ignoring - // any data until the node is set to active, we maintain this good practice here for future - // maintainability. - writer_->create_topic(topic_metadata); - - // Create a generic subscriber. A generic subscriber received message data in serialized form, - // which means that: - // - No de-serialization will take place, saving that processing time, and - // - The data type does not need to be known at compile time, so we don't need a templated - // callback for when message data is received. - auto subscription = create_generic_subscription( - topic, - type, - qos, - [this, topic, type](std::shared_ptr message) { - // When a message is received, it should only be written to the bag if recording is not - // paused (i.e. the node lifecycle state is "active"). If recording is paused, the message is - // thrown away. - if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) { - writer_->write(message, topic, type, rclcpp::Clock(RCL_SYSTEM_TIME).now()); - } - }); - if (subscription) { - subscriptions_.insert({topic, subscription}); - RCLCPP_INFO(get_logger(), "Subscribed to topic '%s'", topic.c_str()); - } else { - writer_->remove_topic(topic_metadata); - RCLCPP_ERROR(get_logger(), "Failed to subscribe to topic '%s'", topic.c_str()); - } -} - -std::string SystemDataRecorder::get_serialised_offered_qos_for_topic(const std::string & topic) -{ - YAML::Node offered_qos_profiles; - auto endpoints = get_publishers_info_by_topic(topic); - for (const auto & endpoint : endpoints) { - offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(endpoint.qos_profile())); - } - return YAML::Dump(offered_qos_profiles); -} - -// Figure out the most appropriate QoS for a given topic. This method tries to decide if -// constrained QoS can be used, or if less-trustworthy QoS needs to be used to catch data from -// every publisher. -// Returns the QoS to use. -rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string & topic) -{ - auto qos = rclcpp::QoS(rmw_qos_profile_default.depth); - - // Get the information about known publishers on this topic - auto endpoints = get_publishers_info_by_topic(topic); - if (endpoints.empty()) { - // There are not yet any publishers on the topic. - // Use the default QoS profile, as we do not know what publishers will use since there are not - // yet any publishers on this topic. - return qos; - } - - // Count the number of reliable and transient-local publishers for the topic - size_t reliability_reliable_endpoints_count = 0; - size_t durability_transient_local_endpoints_count = 0; - for (const auto & endpoint : endpoints) { - const auto & profile = endpoint.qos_profile().get_rmw_qos_profile(); - if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - ++reliability_reliable_endpoints_count; + + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State &) + { + RCLCPP_INFO(get_logger(), "Deactivating. Finalizing current bag."); + notify_state_change(SdrStateChange::PAUSED); + unsubscribe_from_topics(); + + if (writer_) + { + writer_.reset(); // Finalizes bag and writes metadata.yaml + notify_new_file_to_copy({last_bag_file_, current_bag_final_destination_}); + notify_new_file_to_copy( + {current_bag_tmp_directory_ / "metadata.yaml", current_bag_final_destination_}); + } + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; + } + + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State &state) + { + RCLCPP_INFO(get_logger(), "Cleaning up SDR."); + if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) + { + on_deactivate(state); + } + + if (copy_thread_) + { + notify_state_change(SdrStateChange::FINISHED); + copy_thread_->join(); + copy_thread_.reset(); + } + cleaned_up = true; + RCLCPP_INFO(get_logger(), "Cleanup complete."); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; + } + + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State &state) + { + RCLCPP_INFO(get_logger(), "Shutting down SDR."); + if (!cleaned_up) + { + on_cleanup(state); + } + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; + } + + /////////////////////////////////////////////////////////////////////////////////////////////////// + // Bag recording functionality + /////////////////////////////////////////////////////////////////////////////////////////////////// + + void SystemDataRecorder::subscribe_to_topics() + { + for (const auto &topic_with_type : topics_and_types_) + { + subscribe_to_topic(topic_with_type.first, topic_with_type.second); + } } - if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ++durability_transient_local_endpoints_count; + + void SystemDataRecorder::subscribe_to_topic(const std::string &topic, const std::string &type) + { + // Get the QoS offered by the topic for saving in the bag + auto offered_qos = get_serialised_offered_qos_for_topic(topic); + // Find out what QoS is most appropriate to use when subscribing to the topic + auto qos = get_appropriate_qos_for_topic(topic); + + // The metadata to pass to the writer object so it can register the topic in the bag + auto topic_metadata = rosbag2_storage::TopicMetadata( + { + topic, // Topic name + type, // Topic type, e.g. "example_interfaces/msg/String" + rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" + offered_qos // The offered QoS profile for the topic in YAML + }); + // It is a good idea to create the topic in the writer prior to adding the subscription in case + // data arrives after subscribing and before the topic is created. Although we should be ignoring + // any data until the node is set to active, we maintain this good practice here for future + // maintainability. + writer_->create_topic(topic_metadata); + + // Create a generic subscriber. A generic subscriber received message data in serialized form, + // which means that: + // - No de-serialization will take place, saving that processing time, and + // - The data type does not need to be known at compile time, so we don't need a templated + // callback for when message data is received. + auto subscription = create_generic_subscription( + topic, + type, + qos, + [this, topic, type](std::shared_ptr message) + { + // When a message is received, it should only be written to the bag if recording is not + // paused (i.e. the node lifecycle state is "active"). If recording is paused, the message is + // thrown away. + if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) + { + writer_->write(message, topic, type, rclcpp::Clock(RCL_SYSTEM_TIME).now()); + } + }); + if (subscription) + { + subscriptions_.insert({topic, subscription}); + RCLCPP_INFO(get_logger(), "Subscribed to topic '%s'", topic.c_str()); + } + else + { + writer_->remove_topic(topic_metadata); + RCLCPP_ERROR(get_logger(), "Failed to subscribe to topic '%s'", topic.c_str()); + } } - } - - if (reliability_reliable_endpoints_count == endpoints.size()) { - // All publishers are reliable, so we can use the reliable QoS - qos.reliable(); - } else { - if (reliability_reliable_endpoints_count > 0) { - // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures - // all of them - RCLCPP_WARN( - get_logger(), - "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " - "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " - "messages from Reliable publishers could be dropped.", - topic.c_str()); + + std::string SystemDataRecorder::get_serialised_offered_qos_for_topic(const std::string &topic) + { + YAML::Node offered_qos_profiles; + auto endpoints = get_publishers_info_by_topic(topic); + for (const auto &endpoint : endpoints) + { + offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(endpoint.qos_profile())); + } + return YAML::Dump(offered_qos_profiles); } - qos.best_effort(); - } - - if (durability_transient_local_endpoints_count == endpoints.size()) { - // All publishers are transient local, so we can use the transient local QoS - qos.transient_local(); - } else { - if (durability_transient_local_endpoints_count > 0) { - // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures - // all of them - RCLCPP_WARN( - get_logger(), - "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " - "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" - "published latched messages will not be retrieved.", - topic.c_str()); + + // Figure out the most appropriate QoS for a given topic. This method tries to decide if + // constrained QoS can be used, or if less-trustworthy QoS needs to be used to catch data from + // every publisher. + // Returns the QoS to use. + rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string &topic) + { + auto qos = rclcpp::QoS(rmw_qos_profile_default.depth); + + // Get the information about known publishers on this topic + auto endpoints = get_publishers_info_by_topic(topic); + if (endpoints.empty()) + { + // There are not yet any publishers on the topic. + // Use the default QoS profile, as we do not know what publishers will use since there are not + // yet any publishers on this topic. + return qos; + } + + // Count the number of reliable and transient-local publishers for the topic + size_t reliability_reliable_endpoints_count = 0; + size_t durability_transient_local_endpoints_count = 0; + for (const auto &endpoint : endpoints) + { + const auto &profile = endpoint.qos_profile().get_rmw_qos_profile(); + if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + ++reliability_reliable_endpoints_count; + } + if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) + { + ++durability_transient_local_endpoints_count; + } + } + + if (reliability_reliable_endpoints_count == endpoints.size()) + { + // All publishers are reliable, so we can use the reliable QoS + qos.reliable(); + } + else + { + if (reliability_reliable_endpoints_count > 0) + { + // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures + // all of them + RCLCPP_WARN( + get_logger(), + "Some, but not all, publishers on topic \"%s\" are offering " + "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " + "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " + "messages from Reliable publishers could be dropped.", + topic.c_str()); + } + qos.best_effort(); + } + + if (durability_transient_local_endpoints_count == endpoints.size()) + { + // All publishers are transient local, so we can use the transient local QoS + qos.transient_local(); + } + else + { + if (durability_transient_local_endpoints_count > 0) + { + // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures + // all of them + RCLCPP_WARN( + get_logger(), + "Some, but not all, publishers on topic \"%s\" are offering " + "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " + "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" + "published latched messages will not be retrieved.", + topic.c_str()); + } + qos.durability_volatile(); + } + + return qos; } - qos.durability_volatile(); - } - return qos; -} + void SystemDataRecorder::unsubscribe_from_topics() + { + // Make a note of the topics we are subscribed to + std::vector topics; + for (const auto &topic_with_type : topics_and_types_) + { + topics.push_back(rosbag2_storage::TopicMetadata( + {topic_with_type.first, + topic_with_type.second, + rmw_get_serialization_format(), + ""})); + } + // Unsubscribing happens automatically when the subscription objects are destroyed + subscriptions_.clear(); + } -void SystemDataRecorder::unsubscribe_from_topics() -{ - // Make a note of the topics we are subscribed to - std::vector topics; - for (const auto & topic_with_type : topics_and_types_) { - topics.push_back(rosbag2_storage::TopicMetadata( - { - topic_with_type.first, - topic_with_type.second, - rmw_get_serialization_format(), - "" - })); - } - // Unsubscribing happens automatically when the subscription objects are destroyed - subscriptions_.clear(); -} - -/////////////////////////////////////////////////////////////////////////////////////////////////// -// File-copying thread -/////////////////////////////////////////////////////////////////////////////////////////////////// - -// The main function for the file-copying worker thread -void SystemDataRecorder::copy_thread_main() -{ - RCLCPP_INFO(get_logger(), "Copy thread: Starting"); - // Local state storage, to keep the critical section as short as possible - SdrStateChange current_state = SdrStateChange::PAUSED; - std::queue local_files_to_copy; - // Loop until receiving a state-change message indicating it is time to stop - while (current_state != SdrStateChange::FINISHED) - { - { // Critical section start - std::unique_lock lock(copy_thread_mutex_); - if (files_to_copy_.empty()) { - // Only wait if there's nothing to copy, otherwise skip the wait and go on - while (!copy_thread_should_wake()) { - copy_thread_wake_cv_.wait(lock); - } - } - - // If the state has changed, make a note of the new state - if (state_msg_ != SdrStateChange::NO_CHANGE) { - current_state = state_msg_; - // Reset the message-carrying variable so we don't get confused the next time around the - // loop - state_msg_ = SdrStateChange::NO_CHANGE; - } - - // local_files_to_copy_ will be empty here, so we are swapping the received queue of files - // and emptying the message-carrying queue at the same time - local_files_to_copy.swap(files_to_copy_); - } // Critical section end - - // Now copy any files that were sent from the node - while(!local_files_to_copy.empty()) { - std::string uri = local_files_to_copy.front(); - local_files_to_copy.pop(); - copy_bag_file(uri); + /////////////////////////////////////////////////////////////////////////////////////////////////// + // File-copying thread + /////////////////////////////////////////////////////////////////////////////////////////////////// + + void SystemDataRecorder::copy_thread_main() + { + RCLCPP_INFO(get_logger(), "Copy thread: Starting"); + SdrStateChange current_state = SdrStateChange::PAUSED; + std::queue local_files_to_copy; + + while (current_state != SdrStateChange::FINISHED) + { + { + std::unique_lock lock(copy_thread_mutex_); + copy_thread_wake_cv_.wait( + lock, [this] + { return copy_thread_should_wake(); }); + + if (state_msg_ != SdrStateChange::NO_CHANGE) + { + current_state = state_msg_; + state_msg_ = SdrStateChange::NO_CHANGE; + } + local_files_to_copy.swap(files_to_copy_); + } + + while (!local_files_to_copy.empty()) + { + FileCopyJob job = local_files_to_copy.front(); + local_files_to_copy.pop(); + copy_bag_file(job); + } + } + + // After thread is finished, one last check for any remaining files + while (!files_to_copy_.empty()) + { + copy_bag_file(files_to_copy_.front()); + files_to_copy_.pop(); + } + // And clean up the final temporary directory if it exists + if (std::filesystem::exists(current_bag_tmp_directory_)) + { + std::filesystem::remove_all(current_bag_tmp_directory_); + } + RCLCPP_INFO(get_logger(), "Copy thread: Exiting"); } - } - RCLCPP_INFO(get_logger(), "Copy thread: Exiting"); -} -// Only wake up if there are files to copy or a state-change message has been received -bool SystemDataRecorder::copy_thread_should_wake() -{ - return state_msg_ != SdrStateChange::NO_CHANGE || !files_to_copy_.empty(); -} + bool SystemDataRecorder::copy_thread_should_wake() + { + return state_msg_ != SdrStateChange::NO_CHANGE || !files_to_copy_.empty(); + } -// Notify the worker thread of a state change by sending it a message and triggering its condition -// variable -void SystemDataRecorder::notify_state_change(SdrStateChange new_state) -{ - { // Critical section start - std::lock_guard lock(copy_thread_mutex_); - // new_state must not be NO_CHANGE or the copy thread won't wake up - state_msg_ = new_state; - } // Critical section end - copy_thread_wake_cv_.notify_one(); -} - -// Notify the worker thread of a state change by adding the file to the queue and triggering its -// condition variable -void SystemDataRecorder::notify_new_file_to_copy(const std::string & file_uri) -{ - { // Critical section start - std::lock_guard lock(copy_thread_mutex_); - files_to_copy_.push(file_uri); - } // Critical section end - copy_thread_wake_cv_.notify_one(); -} - -// Override that accepts a std::filesystem::path -void SystemDataRecorder::notify_new_file_to_copy(const std::filesystem::path & file_path) -{ - notify_new_file_to_copy(file_path.string()); -} + void SystemDataRecorder::notify_state_change(SdrStateChange new_state) + { + { + std::lock_guard lock(copy_thread_mutex_); + state_msg_ = new_state; + } + copy_thread_wake_cv_.notify_one(); + } -/////////////////////////////////////////////////////////////////////////////////////////////////// -// File-copying functionality -/////////////////////////////////////////////////////////////////////////////////////////////////// + void SystemDataRecorder::notify_new_file_to_copy(const FileCopyJob &job) + { + { + std::lock_guard lock(copy_thread_mutex_); + files_to_copy_.push(job); + } + copy_thread_wake_cv_.notify_one(); + } -// Create the destination directory to copy bag files to -// Returns true if the directory was created, false otherwise -bool SystemDataRecorder::create_copy_destination() -{ - if (std::filesystem::exists(destination_directory_)) { - RCLCPP_ERROR(get_logger(), "Copy destination directory already exists"); - return false; - } - RCLCPP_INFO(get_logger(), "Creating destination directory %s", destination_directory_.c_str()); - return std::filesystem::create_directories(destination_directory_); -} - -// Copy a bag file to the destination directory -void SystemDataRecorder::copy_bag_file(const std::string & bag_file_name) -{ - RCLCPP_INFO( - get_logger(), - "Copying %s to %s", - bag_file_name.c_str(), - destination_directory_.c_str()); + /////////////////////////////////////////////////////////////////////////////////////////////////// + // File-copying functionality + /////////////////////////////////////////////////////////////////////////////////////////////////// - std::filesystem::copy(bag_file_name, destination_directory_); -} + void SystemDataRecorder::copy_bag_file(const FileCopyJob &job) + { + if (job.source_path.empty() || !std::filesystem::exists(job.source_path)) + { + RCLCPP_WARN(get_logger(), "Skipping copy of non-existent source file: %s", job.source_path.c_str()); + return; + } + RCLCPP_INFO( + get_logger(), "Copying %s to %s", + job.source_path.c_str(), job.destination_path.c_str()); + try + { + std::filesystem::copy(job.source_path, job.destination_path); + } + catch (const std::filesystem::filesystem_error &e) + { + RCLCPP_ERROR( + get_logger(), "Failed to copy '%s' to '%s': %s", + job.source_path.c_str(), job.destination_path.c_str(), e.what()); + } + } -} // namespace sdr +} // namespace sdr \ No newline at end of file diff --git a/src/system_data_recorder.cpp b/src/system_data_recorder.cpp index d05f489..1cacefc 100644 --- a/src/system_data_recorder.cpp +++ b/src/system_data_recorder.cpp @@ -22,22 +22,15 @@ #include "rosbag2_cpp/writer.hpp" #include "rosbag2_transport/recorder.hpp" -int main(int argc, char * argv[]) +int main(int argc, char *argv[]) { rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; - auto topics_and_types = std::unordered_map(); - topics_and_types.insert({"/chatter", "std_msgs/msg/String"}); - auto sdr_component = std::make_shared( - "sdr", - rclcpp::NodeOptions(), - "test_bag", - "copied_bag", - 100000, - topics_and_types); + // Create the node. It will load its parameters automatically. + auto sdr_node = std::make_shared(rclcpp::NodeOptions()); - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(sdr_component->get_node_base_interface()); + exec.add_node(sdr_node->get_node_base_interface()); exec.spin(); rclcpp::shutdown(); From 9e56d9473db2a88d47d53ff206286c847e8e65e5 Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Wed, 10 Sep 2025 13:32:44 +0300 Subject: [PATCH 2/9] update cmake version --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c33e60c..3bcd0ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.8) +cmake_minimum_required(VERSION 3.10...3.27) project(system_data_recorder) if(NOT CMAKE_CXX_STANDARD) From d038cee9e30567d259dd8aa54c28dba5e92ed774 Mon Sep 17 00:00:00 2001 From: nimrod Date: Sun, 28 Sep 2025 14:25:43 +0300 Subject: [PATCH 3/9] pkg xmlA --- package.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.xml b/package.xml index 4bd6ff5..03d56c4 100644 --- a/package.xml +++ b/package.xml @@ -18,8 +18,8 @@ rosbag2_storage yaml_cpp_vendor - ament_lint_auto - ament_lint_common + ament_cmake From 93f7b8d209f0661c41ac7a8e44ad7a008397ab7e Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Mon, 20 Oct 2025 09:13:14 +0300 Subject: [PATCH 4/9] Add SDRKeyboardCommander for keyboard control and service interaction - Implemented SDRKeyboardCommander class for handling keyboard input and lifecycle service requests. - Added sdr_commander_main executable to initialize and run the commander node. - Updated CMakeLists.txt to include new executables and dependencies. - Modified sdr_example.yaml to use a placeholder for the copy destination path. --- CMakeLists.txt | 21 +++ config/sdr_example.yaml | 2 +- include/sdr/sdr_keyboard_commander.hpp | 85 +++++++++++ src/sdr_commander_main.cpp | 15 ++ src/sdr_keyboard_commander.cpp | 202 +++++++++++++++++++++++++ 5 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 include/sdr/sdr_keyboard_commander.hpp create mode 100644 src/sdr_commander_main.cpp create mode 100644 src/sdr_keyboard_commander.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 128970e..0d3f594 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,11 +37,23 @@ ament_target_dependencies(sdr_component rclcpp_components_register_nodes(sdr_component "sdr::SystemDataRecorder") add_executable(system_data_recorder src/system_data_recorder.cpp) +add_executable(sdr_commander_main + src/sdr_commander_main.cpp + src/sdr_keyboard_commander.cpp +) + target_link_libraries(system_data_recorder sdr_component) + ament_target_dependencies(system_data_recorder rclcpp ) +ament_target_dependencies(sdr_commander_main + rclcpp + lifecycle_msgs +) + + install(TARGETS sdr_component ARCHIVE DESTINATION lib @@ -54,6 +66,11 @@ install(TARGETS DESTINATION lib/${PROJECT_NAME} ) +install(TARGETS + sdr_commander_main + DESTINATION lib/${PROJECT_NAME} +) + # Install launch, config directories install(DIRECTORY launch DESTINATION share/${PROJECT_NAME}) install(DIRECTORY config DESTINATION share/${PROJECT_NAME}) @@ -71,3 +88,7 @@ if(BUILD_TESTING) endif() ament_package() + + + + diff --git a/config/sdr_example.yaml b/config/sdr_example.yaml index 4bf64ed..fd3d519 100644 --- a/config/sdr_example.yaml +++ b/config/sdr_example.yaml @@ -4,7 +4,7 @@ sdr: bag_name_prefix: "robot_data" # The parent directory where all recording session folders will be saved. - copy_destination: "/home/user/sdr_bags" + copy_destination: "/home//sdr_bags" # Maximum size of each individual .db3 file in bytes before splitting. max_file_size: 268435456 # 256 MiB diff --git a/include/sdr/sdr_keyboard_commander.hpp b/include/sdr/sdr_keyboard_commander.hpp new file mode 100644 index 0000000..6b79362 --- /dev/null +++ b/include/sdr/sdr_keyboard_commander.hpp @@ -0,0 +1,85 @@ +#ifndef SDR_KEYBOARD_COMMANDER_HPP_ +#define SDR_KEYBOARD_COMMANDER_HPP_ + +#include "rclcpp/rclcpp.hpp" +#include "lifecycle_msgs/srv/change_state.hpp" +#include "lifecycle_msgs/srv/get_state.hpp" +#include "lifecycle_msgs/msg/transition.hpp" +#include "lifecycle_msgs/msg/state.hpp" + +#include // For non-blocking keyboard input +#include // For STDIN_FILENO +#include +#include + +// Define aliases for convenience +using ChangeState = lifecycle_msgs::srv::ChangeState; +using GetState = lifecycle_msgs::srv::GetState; +using Transition = lifecycle_msgs::msg::Transition; +using State = lifecycle_msgs::msg::State; + +namespace sdr +{ + + class SDRKeyboardCommander : public rclcpp::Node + { + public: + explicit SDRKeyboardCommander(const rclcpp::NodeOptions &options); + virtual ~SDRKeyboardCommander(); + + private: + // --- Keyboard Handling --- + /** + * @brief Configures the terminal for non-blocking, non-echoing input. + */ + void init_keyboard(); + + /** + * @brief Restores the terminal to its original settings. + */ + void restore_keyboard(); + + /** + * @brief Polls the keyboard for a single character. + * @return The character pressed, or -1 if no key was pressed. + */ + int getch(); + + /** + * @brief Timer callback to check for keyboard input and process it. + */ + void check_keyboard(); + + // --- ROS 2 Lifecycle Client Functions --- + /** + * @brief Sends a request to the /sdr/change_state service. + * @param transition_id The ID of the transition to request (e.g., Transition::TRANSITION_CONFIGURE). + */ + void request_transition(uint8_t transition_id); + + /** + * @brief Sends a request to the /sdr/get_state service. + */ + void get_current_state(); + + /** + * @brief Prints the help menu to the console. + */ + void print_help(); + + // --- Member Variables --- + rclcpp::Client::SharedPtr change_state_client_; + rclcpp::Client::SharedPtr get_state_client_; + rclcpp::TimerBase::SharedPtr timer_; + + // For keyboard input + struct termios old_tio_, new_tio_; + + // For logging + std::map transition_map_; + std::string target_node_name_ = "sdr"; + }; + +} // namespace sdr + +#endif // SDR_KEYBOARD_COMMANDER_HPP_ \ No newline at end of file diff --git a/src/sdr_commander_main.cpp b/src/sdr_commander_main.cpp new file mode 100644 index 0000000..67c7ae6 --- /dev/null +++ b/src/sdr_commander_main.cpp @@ -0,0 +1,15 @@ +#include "sdr/sdr_keyboard_commander.hpp" // Adjust this path +#include "rclcpp/rclcpp.hpp" + +int main(int argc, char *argv[]) +{ + rclcpp::init(argc, argv); + + rclcpp::NodeOptions options; + auto commander_node = std::make_shared(options); + + rclcpp::spin(commander_node); + + rclcpp::shutdown(); + return 0; +} \ No newline at end of file diff --git a/src/sdr_keyboard_commander.cpp b/src/sdr_keyboard_commander.cpp new file mode 100644 index 0000000..cbaeab0 --- /dev/null +++ b/src/sdr_keyboard_commander.cpp @@ -0,0 +1,202 @@ +#include "sdr/sdr_keyboard_commander.hpp" // Adjust this path to match your package + +namespace sdr +{ + + SDRKeyboardCommander::SDRKeyboardCommander(const rclcpp::NodeOptions &options) + : Node("sdr_keyboard_commander", options) + { + // Initialize the service clients + change_state_client_ = this->create_client( + "/" + target_node_name_ + "/change_state"); + get_state_client_ = this->create_client( + "/" + target_node_name_ + "/get_state"); + + // Initialize the transition-to-string map for logging + transition_map_[Transition::TRANSITION_CONFIGURE] = "CONFIGURE"; + transition_map_[Transition::TRANSITION_ACTIVATE] = "ACTIVATE"; + transition_map_[Transition::TRANSITION_DEACTIVATE] = "DEACTIVATE"; + transition_map_[Transition::TRANSITION_CLEANUP] = "CLEANUP"; + transition_map_[Transition::TRANSITION_INACTIVE_SHUTDOWN] = "SHUTDOWN"; + + // Set up non-blocking keyboard + init_keyboard(); + + // Start a timer to poll the keyboard + timer_ = this->create_wall_timer( + std::chrono::milliseconds(100), + std::bind(&SDRKeyboardCommander::check_keyboard, this)); + + RCLCPP_INFO(get_logger(), "SDR Keyboard Commander started."); + RCLCPP_INFO(get_logger(), "Waiting for '/%s' lifecycle services...", target_node_name_.c_str()); + + // Wait for services to be available + while (!change_state_client_->wait_for_service(std::chrono::seconds(1))) + { + if (!rclcpp::ok()) + { + RCLCPP_ERROR(get_logger(), "Interrupted while waiting for service. Exiting."); + return; + } + RCLCPP_INFO(get_logger(), "Service '/%s/change_state' not available, waiting...", target_node_name_.c_str()); + } + + RCLCPP_INFO(get_logger(), "Services found. Ready to command."); + print_help(); + } + + SDRKeyboardCommander::~SDRKeyboardCommander() + { + // Restore terminal settings on exit + restore_keyboard(); + } + + void SDRKeyboardCommander::print_help() + { + printf("\n--- SDR Keyboard Commander ---\n"); + printf("Control the '%s' node:\n", target_node_name_.c_str()); + printf(" [c] -> CONFIGURE\n"); + printf(" [a] -> ACTIVATE (Start Recording)\n"); + printf(" [d] -> DEACTIVATE (Pause Recording)\n"); + printf(" [l] -> CLEANUP\n"); + printf(" [s] -> SHUTDOWN\n"); + printf(" [g] -> GET current state\n"); + printf(" [h] -> Print this HELP menu\n"); + printf(" [q] -> QUIT\n"); + printf("--------------------------------\n"); + } + + void SDRKeyboardCommander::init_keyboard() + { + // Get the current terminal settings + tcgetattr(STDIN_FILENO, &old_tio_); + // Copy settings to a new struct + new_tio_ = old_tio_; + // Disable canonical mode (line buffering) and echo + new_tio_.c_lflag &= ~(ICANON | ECHO); + // Set minimum number of characters for a read to 0 + new_tio_.c_cc[VMIN] = 0; + // Set the read timeout to 0 (non-blocking) + new_tio_.c_cc[VTIME] = 0; + // Apply the new settings + tcsetattr(STDIN_FILENO, TCSANOW, &new_tio_); + } + + void SDRKeyboardCommander::restore_keyboard() + { + // Restore the original terminal settings + tcsetattr(STDIN_FILENO, TCSANOW, &old_tio_); + } + + int SDRKeyboardCommander::getch() + { + char c = -1; + // Read 1 character from stdin + ssize_t n = read(STDIN_FILENO, &c, 1); + if (n == 1) + { + return c; // Return the character if read was successful + } + return -1; // Return -1 if no key was pressed + } + + void SDRKeyboardCommander::check_keyboard() + { + int key = getch(); + if (key == -1) + { + return; // No key pressed + } + + switch (key) + { + case 'c': + RCLCPP_INFO(get_logger(), "Requesting CONFIGURE..."); + request_transition(Transition::TRANSITION_CONFIGURE); + break; + case 'a': + RCLCPP_INFO(get_logger(), "Requesting ACTIVATE..."); + request_transition(Transition::TRANSITION_ACTIVATE); + break; + case 'd': + RCLCPP_INFO(get_logger(), "Requesting DEACTIVATE..."); + request_transition(Transition::TRANSITION_DEACTIVATE); + break; + case 'l': + RCLCPP_INFO(get_logger(), "Requesting CLEANUP..."); + request_transition(Transition::TRANSITION_CLEANUP); + break; + case 's': + RCLCPP_INFO(get_logger(), "Requesting SHUTDOWN..."); + request_transition(Transition::TRANSITION_INACTIVE_SHUTDOWN); + break; + case 'g': + RCLCPP_INFO(get_logger(), "Requesting GET_STATE..."); + get_current_state(); + break; + case 'h': + print_help(); + break; + case 'q': + RCLCPP_INFO(get_logger(), "Quitting commander node..."); + restore_keyboard(); // Restore keyboard before shutting down + rclcpp::shutdown(); + break; + } + } + + void SDRKeyboardCommander::request_transition(uint8_t transition_id) + { + if (!change_state_client_->service_is_ready()) + { + RCLCPP_ERROR(get_logger(), "Service '%s' is not available.", change_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + request->transition.id = transition_id; + + // Send the request asynchronously + change_state_client_->async_send_request( + request, + [this, transition_id](rclcpp::Client::SharedFuture future) + { + auto result = future.get(); + if (result->success) + { + RCLCPP_INFO( + this->get_logger(), "Transition '%s' successful.", + this->transition_map_[transition_id].c_str()); + } + else + { + RCLCPP_WARN( + this->get_logger(), "Transition '%s' failed.", + this->transition_map_[transition_id].c_str()); + } + }); + } + + void SDRKeyboardCommander::get_current_state() + { + if (!get_state_client_->service_is_ready()) + { + RCLCPP_ERROR(get_logger(), "Service '%s' is not available.", get_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + + // Send the request asynchronously + get_state_client_->async_send_request( + request, + [this](rclcpp::Client::SharedFuture future) + { + auto result = future.get(); + RCLCPP_INFO( + this->get_logger(), "Current state of '%s' is: %s", + this->target_node_name_.c_str(), result->current_state.label.c_str()); + }); + } + +} // namespace sdr \ No newline at end of file From e3d75bdda7a9677d4ab5a095ae698659fc2bc9b4 Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Mon, 20 Oct 2025 09:25:00 +0300 Subject: [PATCH 5/9] Refactor system_data_recorder structure: replace main executable, remove old source file, and update README for keyboard commander utility --- CMakeLists.txt | 15 ++++---- README.md | 38 +++++++++++++++++++ ...{system_data_recorder.cpp => sdr_main.cpp} | 0 3 files changed, 46 insertions(+), 7 deletions(-) rename src/{system_data_recorder.cpp => sdr_main.cpp} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d3f594..f3823fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,19 +36,20 @@ ament_target_dependencies(sdr_component ) rclcpp_components_register_nodes(sdr_component "sdr::SystemDataRecorder") -add_executable(system_data_recorder src/system_data_recorder.cpp) -add_executable(sdr_commander_main +add_executable(sdr src/sdr_main.cpp) + +add_executable(sdr_commander src/sdr_commander_main.cpp src/sdr_keyboard_commander.cpp ) -target_link_libraries(system_data_recorder sdr_component) +target_link_libraries(sdr sdr_component) -ament_target_dependencies(system_data_recorder +ament_target_dependencies(sdr rclcpp ) -ament_target_dependencies(sdr_commander_main +ament_target_dependencies(sdr_commander rclcpp lifecycle_msgs ) @@ -62,12 +63,12 @@ install(TARGETS ) install(TARGETS - system_data_recorder + sdr DESTINATION lib/${PROJECT_NAME} ) install(TARGETS - sdr_commander_main + sdr_commander DESTINATION lib/${PROJECT_NAME} ) diff --git a/README.md b/README.md index 548fbff..2d186c9 100644 --- a/README.md +++ b/README.md @@ -157,3 +157,41 @@ Ensures the last active recording session is properly deactivated and finalized. ### on_shutdown Performs the same actions as `on_cleanup` before shutting down the node. + + +--- + +## Keyboard Commander Utility + +For manual control and testing, an `SDRKeyboardCommander` node is available. This node listens for keyboard presses and sends the corresponding lifecycle transition requests to the `/sdr` node. + + +### Running + +1. In one terminal, run your `sdr` node: + + ```bash + ros2 launch system_data_recorder sdr_example.launch.py + ``` + +2. In a second terminal, source your workspace and run the commander: + + ```bash + ros2 run system_data_recorder sdr_commander + ``` + +### Controls + +Once the commander node is running and connected to the `/sdr` services, you can use the following keys to control the recorder: + +| Key | Action | Lifecycle Transition | +| :--- | :--- | :--- | +| **c** | Configure | `CONFIGURE` | +| **a** | Activate | `ACTIVATE` (Starts recording) | +| **d** | Deactivate | `DEACTIVATE` (Pauses recording) | +| **l** | Cleanup | `CLEANUP` | +| **s** | Shutdown | `SHUTDOWN` | +| **g** | Get State | (Queries and prints the current state) | +| **h** | Help | (Prints the help menu) | +| **q** | Quit | (Shuts down the commander node) | + diff --git a/src/system_data_recorder.cpp b/src/sdr_main.cpp similarity index 100% rename from src/system_data_recorder.cpp rename to src/sdr_main.cpp From 02355659a21cc7f95caf4f1d1184c01aad76d48f Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Wed, 22 Oct 2025 13:15:31 +0300 Subject: [PATCH 6/9] Update configuration and enhance parameter handling in SystemDataRecorder - Change copy_destination path in sdr_example.yaml to reflect user directory. - Add parameter querying functionality in sdr_component.cpp to write all node parameters to a YAML file. - Implement logging for parameter retrieval process. --- include/sdr/sdr_component.hpp | 10 ++ src/sdr_component.cpp | 202 +++++++++++++++++++++++++++++++++- 2 files changed, 211 insertions(+), 1 deletion(-) diff --git a/include/sdr/sdr_component.hpp b/include/sdr/sdr_component.hpp index bf8b4cd..2b75f73 100644 --- a/include/sdr/sdr_component.hpp +++ b/include/sdr/sdr_component.hpp @@ -21,6 +21,10 @@ #include #include // For timestamping #include // For formatting the timestamp +#include +#include +#include "rclcpp/parameter_client.hpp" + #include "sdr/visibility_control.h" #include "rclcpp/rclcpp.hpp" @@ -75,6 +79,12 @@ namespace sdr void subscribe_to_topics(); void subscribe_to_topic(const std::string &topic, const std::string &type); void unsubscribe_from_topics(); + + // Parameter querying functionality + void write_all_nodes_parameters_yaml(const std::filesystem::path& out_file); + void write_yaml_value_from_param(YAML::Node& node, const rclcpp::Parameter& p); + + std::string get_serialised_offered_qos_for_topic(const std::string &topic); rclcpp::QoS get_appropriate_qos_for_topic(const std::string &topic); diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index 62b902c..ba0d6f4 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -20,6 +20,8 @@ #include "rosbag2_transport/qos.hpp" #include "yaml-cpp/yaml.h" + + namespace sdr { @@ -111,6 +113,198 @@ namespace sdr return ss.str(); } + + void SystemDataRecorder::write_all_nodes_parameters_yaml(const std::filesystem::path& out_file) +{ + YAML::Node root; + + // Local helper node; not added to any executor + auto query_node = std::make_shared( + std::string(this->get_name()) + "_param_query", + this->get_namespace(), + rclcpp::NodeOptions() + .context(this->get_node_base_interface()->get_context()) + .start_parameter_services(false) + .start_parameter_event_publisher(false) + .use_intra_process_comms(false) + ); + + auto make_fqn = [](const std::string& ns, const std::string& name) { + if (ns.empty() || ns == "/") return std::string("/") + name; + return ns + "/" + name; + }; + const std::string self_fqn = make_fqn(this->get_namespace(), this->get_name()); + + auto name_ns_pairs = this->get_node_graph_interface()->get_node_names_and_namespaces(); + std::sort(name_ns_pairs.begin(), name_ns_pairs.end(), + [](const auto& a, const auto& b) { + if (a.second == b.second) return a.first < b.first; + return a.second < b.second; + }); + + for (const auto& nmns : name_ns_pairs) { + const auto& name = nmns.first; + const auto& ns = nmns.second; + const auto fqn = make_fqn(ns, name); + + if (fqn == self_fqn) continue; // skip our own node + if (!name.empty() && name[0] == '_') continue; // optional: skip hidden nodes + + auto client = std::make_shared(query_node.get(), fqn); + + if (!client->wait_for_service(std::chrono::milliseconds(300))) { + RCLCPP_WARN(get_logger(), "Param service not available for node %s, skipping.", fqn.c_str()); + continue; + } + + // 1) List declared parameter names (depth is generous) + rcl_interfaces::msg::ListParametersResult list_res; + try { + list_res = client->list_parameters( + {}, + rcl_interfaces::srv::ListParameters::Request::DEPTH_RECURSIVE + ); + + // --- ADD THIS LOGGING BLOCK --- + RCLCPP_INFO(get_logger(), "--- DEBUG PARAMS FOR NODE: %s ---", fqn.c_str()); + std::string names_str = "Names: ["; + for (const auto& name : list_res.names) { + names_str += "\"" + name + "\", "; + } + names_str += "]"; + RCLCPP_INFO(get_logger(), "%s", names_str.c_str()); + // --- END LOGGING BLOCK --- + } catch (const std::exception& e) { + RCLCPP_WARN(get_logger(), "list_parameters failed for %s: %s", fqn.c_str(), e.what()); + continue; + } + + const auto& names = list_res.names; + if (names.empty()) continue; + + YAML::Node params_node(YAML::NodeType::Map); + + // 2) Fetch each parameter **individually** + for (const auto& full_name : names) { + // FIX: Changed "qos_overrides." to "qos_overrides" + // Your log showed "qos_overrides./..." so this will catch it + if (full_name.rfind("qos_overrides", 0) == 0) { + continue; // skip QoS noise + } + + rclcpp::Parameter param; + try { + auto vec = client->get_parameters({full_name}); + if (!vec.empty()) { + param = vec.front(); + } else { + // If the server gives nothing back, still create a null entry + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) if (!key.empty()) keys.push_back(key); + + // --- FIX: Use a reference, not a pointer --- + YAML::Node& current_ref = params_node; + for (size_t k = 0; k + 1 < keys.size(); ++k) { + current_ref = current_ref[keys[k]]; // Re-assign reference to child + } + current_ref[keys.back()] = YAML::Node(); // null + // --- END FIX --- + continue; + } + } catch (const std::exception& e) { + RCLCPP_WARN(get_logger(), "get_parameters(%s) failed for %s: %s", + full_name.c_str(), fqn.c_str(), e.what()); + continue; + } + + // 3) Build nested YAML using the **full** dotted name + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) if (!key.empty()) keys.push_back(key); + + YAML::Node value_node; + write_yaml_value_from_param(value_node, param); + + if (keys.empty()) { + params_node[full_name] = value_node; + } else { + // Recursively build nested structure + std::function build_nested = + [&](YAML::Node& node, size_t depth) { + if (depth == keys.size() - 1) { + // Last key, set the value + node[keys[depth]] = value_node; + } else { + // Intermediate key, ensure it's a Map and recurse + if (!node[keys[depth]].IsDefined() || !node[keys[depth]].IsMap()) { + node[keys[depth]] = YAML::Node(YAML::NodeType::Map); + } + YAML::Node child = node[keys[depth]]; + build_nested(child, depth + 1); + node[keys[depth]] = child; // Write back the modified child + } + }; + + build_nested(params_node, 0); + } + } + YAML::Node node_root; + node_root["node_name"] = name; + node_root["node_namespace"] = ns; + node_root["parameters"] = params_node; + + root[fqn] = node_root; + } + + try { + std::ofstream ofs(out_file); + ofs << root; + ofs.close(); + RCLCPP_INFO_STREAM(get_logger(), "Wrote ALL nodes parameters YAML: " << out_file); + } catch (const std::exception& e) { + RCLCPP_ERROR(get_logger(), "Failed writing ALL nodes parameters YAML to '%s': %s", + out_file.c_str(), e.what()); + } + + query_node.reset(); // ensure destruction before return +} + + + + void SystemDataRecorder::write_yaml_value_from_param(YAML::Node& node, const rclcpp::Parameter& p) + { + using rclcpp::ParameterType; + + switch (p.get_type()) { + case ParameterType::PARAMETER_BOOL: node = p.as_bool(); break; + case ParameterType::PARAMETER_INTEGER: node = p.as_int(); break; + case ParameterType::PARAMETER_DOUBLE: node = p.as_double(); break; + case ParameterType::PARAMETER_STRING: node = p.as_string(); break; + case ParameterType::PARAMETER_BYTE_ARRAY: { + YAML::Node seq; for (auto b : p.as_byte_array()) seq.push_back(static_cast(b)); node = seq; break; + } + case ParameterType::PARAMETER_BOOL_ARRAY: { + YAML::Node seq; for (auto v : p.as_bool_array()) seq.push_back(v); node = seq; break; + } + case ParameterType::PARAMETER_INTEGER_ARRAY: { + YAML::Node seq; for (auto v : p.as_integer_array()) seq.push_back(v); node = seq; break; + } + case ParameterType::PARAMETER_DOUBLE_ARRAY: { + YAML::Node seq; for (auto v : p.as_double_array()) seq.push_back(v); node = seq; break; + } + case ParameterType::PARAMETER_STRING_ARRAY: { + YAML::Node seq; for (auto& v : p.as_string_array()) seq.push_back(v); node = seq; break; + } + case ParameterType::PARAMETER_NOT_SET: + default: node = YAML::Node(); break; + } + } + + + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn SystemDataRecorder::on_configure(const rclcpp_lifecycle::State &) { @@ -133,9 +327,11 @@ namespace sdr RCLCPP_INFO( get_logger(), "All copied bags for this session will be stored in: %s", session_destination_directory_.c_str()); + + // Start the file-copying thread copy_thread_ = std::make_shared([this] - { this->copy_thread_main(); }); + { this->copy_thread_main(); }); notify_state_change(SdrStateChange::PAUSED); cleaned_up = false; return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; @@ -163,6 +359,9 @@ namespace sdr } std::filesystem::create_directories(current_bag_final_destination_); + const auto params_yaml_path = current_bag_final_destination_ / "rosparams.yaml"; + write_all_nodes_parameters_yaml(params_yaml_path); + writer_ = std::make_shared( std::make_unique()); @@ -214,6 +413,7 @@ namespace sdr copy_thread_->join(); copy_thread_.reset(); } + cleaned_up = true; RCLCPP_INFO(get_logger(), "Cleanup complete."); return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; From 758e53206d4fb75b21a3e77f53a80933edb0decd Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Wed, 22 Oct 2025 13:39:35 +0300 Subject: [PATCH 7/9] Fix formatting in sdr_component.cpp by removing unnecessary whitespace --- src/sdr_component.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index ba0d6f4..fff2cee 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -24,7 +24,7 @@ namespace sdr { - + SystemDataRecorder::SystemDataRecorder(const rclcpp::NodeOptions &options) : rclcpp_lifecycle::LifecycleNode("sdr", options) { From 493c7d1242e6c98730b90bc2d72cba81d4b118c0 Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Tue, 28 Oct 2025 12:02:06 +0200 Subject: [PATCH 8/9] Refactor SDRKeyboardCommander for improved readability and structure - Reorganized constructor and destructor for clarity. - Enhanced keyboard input handling with clearer function definitions. - Improved logging messages for service availability checks. - Streamlined asynchronous service request handling for state changes. - Updated formatting for consistency and readability in the code. --- include/sdr/sdr_component.hpp | 176 ++-- include/sdr/sdr_keyboard_commander.hpp | 118 +-- include/sdr/visibility_control.h | 1 - src/sdr_commander_main.cpp | 16 +- src/sdr_component.cpp | 1120 ++++++++++++------------ src/sdr_keyboard_commander.cpp | 375 ++++---- src/sdr_main.cpp | 2 +- 7 files changed, 893 insertions(+), 915 deletions(-) diff --git a/include/sdr/sdr_component.hpp b/include/sdr/sdr_component.hpp index 2b75f73..0f436e4 100644 --- a/include/sdr/sdr_component.hpp +++ b/include/sdr/sdr_component.hpp @@ -35,97 +35,97 @@ namespace sdr { - // A struct to hold information for the file-copying thread - struct FileCopyJob - { - std::string source_path; - std::filesystem::path destination_path; - }; +// A struct to hold information for the file-copying thread +struct FileCopyJob +{ + std::string source_path; + std::filesystem::path destination_path; +}; + +class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode +{ +public: + SDR_PUBLIC + explicit SystemDataRecorder(const rclcpp::NodeOptions & options); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_configure(const rclcpp_lifecycle::State & state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_activate(const rclcpp_lifecycle::State & state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_deactivate(const rclcpp_lifecycle::State & state); + + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_cleanup(const rclcpp_lifecycle::State & state); - class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode + SDR_PUBLIC + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn + on_shutdown(const rclcpp_lifecycle::State & state); + +private: + // Parameter loading + bool read_parameters(); + + // Helper function to get a formatted timestamp string + std::string generate_timestamp(); + + // Bag recording functionality + void subscribe_to_topics(); + void subscribe_to_topic(const std::string & topic, const std::string & type); + void unsubscribe_from_topics(); + + // Parameter querying functionality + void write_all_nodes_parameters_yaml(const std::filesystem::path & out_file); + void write_yaml_value_from_param(YAML::Node & node, const rclcpp::Parameter & p); + + + std::string get_serialised_offered_qos_for_topic(const std::string & topic); + rclcpp::QoS get_appropriate_qos_for_topic(const std::string & topic); + + rosbag2_storage::StorageOptions storage_options_; + std::unordered_map topics_and_types_; + std::unordered_map> subscriptions_; + std::shared_ptr writer_; + + // File-copying functionality + enum class SdrStateChange { - public: - SDR_PUBLIC - explicit SystemDataRecorder(const rclcpp::NodeOptions &options); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_configure(const rclcpp_lifecycle::State &state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_activate(const rclcpp_lifecycle::State &state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_deactivate(const rclcpp_lifecycle::State &state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_cleanup(const rclcpp_lifecycle::State &state); - - SDR_PUBLIC - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - on_shutdown(const rclcpp_lifecycle::State &state); - - private: - // Parameter loading - bool read_parameters(); - - // Helper function to get a formatted timestamp string - std::string generate_timestamp(); - - // Bag recording functionality - void subscribe_to_topics(); - void subscribe_to_topic(const std::string &topic, const std::string &type); - void unsubscribe_from_topics(); - - // Parameter querying functionality - void write_all_nodes_parameters_yaml(const std::filesystem::path& out_file); - void write_yaml_value_from_param(YAML::Node& node, const rclcpp::Parameter& p); - - - std::string get_serialised_offered_qos_for_topic(const std::string &topic); - rclcpp::QoS get_appropriate_qos_for_topic(const std::string &topic); - - rosbag2_storage::StorageOptions storage_options_; - std::unordered_map topics_and_types_; - std::unordered_map> subscriptions_; - std::shared_ptr writer_; - - // File-copying functionality - enum class SdrStateChange - { - NO_CHANGE, - PAUSED, - RECORDING, - FINISHED - }; - - SdrStateChange state_msg_ = SdrStateChange::NO_CHANGE; - std::queue files_to_copy_; // Queue now holds jobs - std::mutex copy_thread_mutex_; - std::condition_variable copy_thread_wake_cv_; - std::shared_ptr copy_thread_; - - void copy_thread_main(); - bool copy_thread_should_wake(); - void notify_state_change(SdrStateChange new_state); - void notify_new_file_to_copy(const FileCopyJob &job); - void copy_bag_file(const FileCopyJob &job); - - // Base configuration loaded from parameters - std::string base_bag_name_prefix_; - std::filesystem::path base_copy_destination_; - - // Session-specific paths, valid during an active recording - std::filesystem::path session_destination_directory_; // The main folder for this run - std::filesystem::path current_bag_tmp_directory_; // Temp folder for the current bag - std::filesystem::path current_bag_final_destination_; // Final folder for the current bag - std::string last_bag_file_ = ""; - bool cleaned_up = true; + NO_CHANGE, + PAUSED, + RECORDING, + FINISHED }; + SdrStateChange state_msg_ = SdrStateChange::NO_CHANGE; + std::queue files_to_copy_; // Queue now holds jobs + std::mutex copy_thread_mutex_; + std::condition_variable copy_thread_wake_cv_; + std::shared_ptr copy_thread_; + + void copy_thread_main(); + bool copy_thread_should_wake(); + void notify_state_change(SdrStateChange new_state); + void notify_new_file_to_copy(const FileCopyJob & job); + void copy_bag_file(const FileCopyJob & job); + + // Base configuration loaded from parameters + std::string base_bag_name_prefix_; + std::filesystem::path base_copy_destination_; + + // Session-specific paths, valid during an active recording + std::filesystem::path session_destination_directory_; // The main folder for this run + std::filesystem::path current_bag_tmp_directory_; // Temp folder for the current bag + std::filesystem::path current_bag_final_destination_; // Final folder for the current bag + std::string last_bag_file_ = ""; + bool cleaned_up = true; +}; + } // namespace sdr -#endif // SDR__SDR_COMPONENT_HPP__ \ No newline at end of file +#endif // SDR__SDR_COMPONENT_HPP__ diff --git a/include/sdr/sdr_keyboard_commander.hpp b/include/sdr/sdr_keyboard_commander.hpp index 6b79362..fd94fb2 100644 --- a/include/sdr/sdr_keyboard_commander.hpp +++ b/include/sdr/sdr_keyboard_commander.hpp @@ -21,65 +21,65 @@ using State = lifecycle_msgs::msg::State; namespace sdr { - class SDRKeyboardCommander : public rclcpp::Node - { - public: - explicit SDRKeyboardCommander(const rclcpp::NodeOptions &options); - virtual ~SDRKeyboardCommander(); - - private: - // --- Keyboard Handling --- - /** - * @brief Configures the terminal for non-blocking, non-echoing input. - */ - void init_keyboard(); - - /** - * @brief Restores the terminal to its original settings. - */ - void restore_keyboard(); - - /** - * @brief Polls the keyboard for a single character. - * @return The character pressed, or -1 if no key was pressed. - */ - int getch(); - - /** - * @brief Timer callback to check for keyboard input and process it. - */ - void check_keyboard(); - - // --- ROS 2 Lifecycle Client Functions --- - /** - * @brief Sends a request to the /sdr/change_state service. - * @param transition_id The ID of the transition to request (e.g., Transition::TRANSITION_CONFIGURE). - */ - void request_transition(uint8_t transition_id); - - /** - * @brief Sends a request to the /sdr/get_state service. - */ - void get_current_state(); - - /** - * @brief Prints the help menu to the console. - */ - void print_help(); - - // --- Member Variables --- - rclcpp::Client::SharedPtr change_state_client_; - rclcpp::Client::SharedPtr get_state_client_; - rclcpp::TimerBase::SharedPtr timer_; - - // For keyboard input - struct termios old_tio_, new_tio_; - - // For logging - std::map transition_map_; - std::string target_node_name_ = "sdr"; - }; +class SDRKeyboardCommander : public rclcpp::Node +{ +public: + explicit SDRKeyboardCommander(const rclcpp::NodeOptions & options); + virtual ~SDRKeyboardCommander(); + +private: + // --- Keyboard Handling --- + /** + * @brief Configures the terminal for non-blocking, non-echoing input. + */ + void init_keyboard(); + + /** + * @brief Restores the terminal to its original settings. + */ + void restore_keyboard(); + + /** + * @brief Polls the keyboard for a single character. + * @return The character pressed, or -1 if no key was pressed. + */ + int getch(); + + /** + * @brief Timer callback to check for keyboard input and process it. + */ + void check_keyboard(); + + // --- ROS 2 Lifecycle Client Functions --- + /** + * @brief Sends a request to the /sdr/change_state service. + * @param transition_id The ID of the transition to request (e.g., Transition::TRANSITION_CONFIGURE). + */ + void request_transition(uint8_t transition_id); + + /** + * @brief Sends a request to the /sdr/get_state service. + */ + void get_current_state(); + + /** + * @brief Prints the help menu to the console. + */ + void print_help(); + + // --- Member Variables --- + rclcpp::Client::SharedPtr change_state_client_; + rclcpp::Client::SharedPtr get_state_client_; + rclcpp::TimerBase::SharedPtr timer_; + + // For keyboard input + struct termios old_tio_, new_tio_; + + // For logging + std::map transition_map_; + std::string target_node_name_ = "sdr"; +}; } // namespace sdr -#endif // SDR_KEYBOARD_COMMANDER_HPP_ \ No newline at end of file +#endif // SDR_KEYBOARD_COMMANDER_HPP_ diff --git a/include/sdr/visibility_control.h b/include/sdr/visibility_control.h index d576cfb..b286465 100644 --- a/include/sdr/visibility_control.h +++ b/include/sdr/visibility_control.h @@ -56,4 +56,3 @@ extern "C" #endif #endif // SDR__VISIBILITY_CONTROL_H_ - diff --git a/src/sdr_commander_main.cpp b/src/sdr_commander_main.cpp index 67c7ae6..07efa11 100644 --- a/src/sdr_commander_main.cpp +++ b/src/sdr_commander_main.cpp @@ -1,15 +1,15 @@ #include "sdr/sdr_keyboard_commander.hpp" // Adjust this path #include "rclcpp/rclcpp.hpp" -int main(int argc, char *argv[]) +int main(int argc, char * argv[]) { - rclcpp::init(argc, argv); + rclcpp::init(argc, argv); - rclcpp::NodeOptions options; - auto commander_node = std::make_shared(options); + rclcpp::NodeOptions options; + auto commander_node = std::make_shared(options); - rclcpp::spin(commander_node); + rclcpp::spin(commander_node); - rclcpp::shutdown(); - return 0; -} \ No newline at end of file + rclcpp::shutdown(); + return 0; +} diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index fff2cee..15e2764 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -21,135 +21,136 @@ #include "yaml-cpp/yaml.h" - namespace sdr { - - SystemDataRecorder::SystemDataRecorder(const rclcpp::NodeOptions &options) - : rclcpp_lifecycle::LifecycleNode("sdr", options) - { - RCLCPP_INFO(get_logger(), "Initializing SystemDataRecorder node..."); - if (!read_parameters()) - { - throw std::runtime_error("Failed to read parameters for SystemDataRecorder"); - } - } - bool SystemDataRecorder::read_parameters() - { - // Declare parameters with new names - this->declare_parameter("bag_name_prefix", "bag"); - this->declare_parameter("copy_destination", ""); - this->declare_parameter("max_file_size", 0); - this->declare_parameter>("topic_names", std::vector{}); - this->declare_parameter>("topic_types", std::vector{}); - - std::string copy_destination_str; - int max_file_size; - std::vector topic_names, topic_types; - - if (!this->get_parameter("bag_name_prefix", base_bag_name_prefix_) || base_bag_name_prefix_.empty()) - { - RCLCPP_FATAL(get_logger(), "Required parameter 'bag_name_prefix' not set or is empty."); - return false; - } - if (!this->get_parameter("copy_destination", copy_destination_str) || copy_destination_str.empty()) - { - RCLCPP_FATAL(get_logger(), "Required parameter 'copy_destination' not set or is empty."); - return false; - } - base_copy_destination_ = copy_destination_str; +SystemDataRecorder::SystemDataRecorder(const rclcpp::NodeOptions & options) +: rclcpp_lifecycle::LifecycleNode("sdr", options) +{ + RCLCPP_INFO(get_logger(), "Initializing SystemDataRecorder node..."); + if (!read_parameters()) { + throw std::runtime_error("Failed to read parameters for SystemDataRecorder"); + } +} - if (!this->get_parameter("max_file_size", max_file_size) || max_file_size <= 0) - { - RCLCPP_FATAL(get_logger(), "Required parameter 'max_file_size' not set or is <= 0."); - return false; - } - this->get_parameter("topic_names", topic_names); - this->get_parameter("topic_types", topic_types); +bool SystemDataRecorder::read_parameters() +{ + // Declare parameters with new names + this->declare_parameter("bag_name_prefix", "bag"); + this->declare_parameter("copy_destination", ""); + this->declare_parameter("max_file_size", 0); + this->declare_parameter>("topic_names", std::vector{}); + this->declare_parameter>("topic_types", std::vector{}); + + std::string copy_destination_str; + int max_file_size; + std::vector topic_names, topic_types; + + if (!this->get_parameter( + "bag_name_prefix", + base_bag_name_prefix_) || base_bag_name_prefix_.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'bag_name_prefix' not set or is empty."); + return false; + } + if (!this->get_parameter( + "copy_destination", + copy_destination_str) || copy_destination_str.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'copy_destination' not set or is empty."); + return false; + } + base_copy_destination_ = copy_destination_str; - if (topic_names.size() != topic_types.size()) - { - RCLCPP_FATAL(get_logger(), "'topic_names' and 'topic_types' must have the same number of entries."); - return false; - } + if (!this->get_parameter("max_file_size", max_file_size) || max_file_size <= 0) { + RCLCPP_FATAL(get_logger(), "Required parameter 'max_file_size' not set or is <= 0."); + return false; + } + this->get_parameter("topic_names", topic_names); + this->get_parameter("topic_types", topic_types); - for (size_t i = 0; i < topic_names.size(); ++i) - { - this->topics_and_types_[topic_names[i]] = topic_types[i]; - } + if (topic_names.size() != topic_types.size()) { + RCLCPP_FATAL( + get_logger(), "'topic_names' and 'topic_types' must have the same number of entries."); + return false; + } - storage_options_.storage_id = "sqlite3"; - storage_options_.max_bagfile_size = max_file_size; - storage_options_.max_cache_size = 100 * 1024 * 1024; - - RCLCPP_INFO(get_logger(), "--- SystemDataRecorder Base Configuration ---"); - RCLCPP_INFO(get_logger(), "* Bag Name Prefix: %s", base_bag_name_prefix_.c_str()); - RCLCPP_INFO(get_logger(), "* Base Copy Destination: %s", base_copy_destination_.c_str()); - RCLCPP_INFO(get_logger(), "* Max File Size: %d bytes", max_file_size); - RCLCPP_INFO(get_logger(), "* Topics to Record:"); - if (this->topics_and_types_.empty()) - { - RCLCPP_INFO(get_logger(), " - None"); - } - else - { - for (const auto &pair : this->topics_and_types_) - { - RCLCPP_INFO(get_logger(), " - %s (%s)", pair.first.c_str(), pair.second.c_str()); - } - } - RCLCPP_INFO(get_logger(), "-------------------------------------------"); + for (size_t i = 0; i < topic_names.size(); ++i) { + this->topics_and_types_[topic_names[i]] = topic_types[i]; + } - return true; + storage_options_.storage_id = "sqlite3"; + storage_options_.max_bagfile_size = max_file_size; + storage_options_.max_cache_size = 100 * 1024 * 1024; + + RCLCPP_INFO(get_logger(), "--- SystemDataRecorder Base Configuration ---"); + RCLCPP_INFO(get_logger(), "* Bag Name Prefix: %s", base_bag_name_prefix_.c_str()); + RCLCPP_INFO(get_logger(), "* Base Copy Destination: %s", base_copy_destination_.c_str()); + RCLCPP_INFO(get_logger(), "* Max File Size: %d bytes", max_file_size); + RCLCPP_INFO(get_logger(), "* Topics to Record:"); + if (this->topics_and_types_.empty()) { + RCLCPP_INFO(get_logger(), " - None"); + } else { + for (const auto & pair : this->topics_and_types_) { + RCLCPP_INFO(get_logger(), " - %s (%s)", pair.first.c_str(), pair.second.c_str()); } + } + RCLCPP_INFO(get_logger(), "-------------------------------------------"); - std::string SystemDataRecorder::generate_timestamp() - { - auto now = std::chrono::system_clock::now(); - auto in_time_t = std::chrono::system_clock::to_time_t(now); - std::stringstream ss; - ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d_%H-%M-%S"); - return ss.str(); - } + return true; +} +std::string SystemDataRecorder::generate_timestamp() +{ + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d_%H-%M-%S"); + return ss.str(); +} - void SystemDataRecorder::write_all_nodes_parameters_yaml(const std::filesystem::path& out_file) + +void SystemDataRecorder::write_all_nodes_parameters_yaml(const std::filesystem::path & out_file) { YAML::Node root; // Local helper node; not added to any executor auto query_node = std::make_shared( - std::string(this->get_name()) + "_param_query", - this->get_namespace(), - rclcpp::NodeOptions() - .context(this->get_node_base_interface()->get_context()) - .start_parameter_services(false) - .start_parameter_event_publisher(false) - .use_intra_process_comms(false) + std::string(this->get_name()) + "_param_query", + this->get_namespace(), + rclcpp::NodeOptions() + .context(this->get_node_base_interface()->get_context()) + .start_parameter_services(false) + .start_parameter_event_publisher(false) + .use_intra_process_comms(false) ); - auto make_fqn = [](const std::string& ns, const std::string& name) { - if (ns.empty() || ns == "/") return std::string("/") + name; - return ns + "/" + name; - }; + auto make_fqn = [](const std::string & ns, const std::string & name) { + if (ns.empty() || ns == "/") {return std::string("/") + name;} + return ns + "/" + name; + }; const std::string self_fqn = make_fqn(this->get_namespace(), this->get_name()); auto name_ns_pairs = this->get_node_graph_interface()->get_node_names_and_namespaces(); - std::sort(name_ns_pairs.begin(), name_ns_pairs.end(), - [](const auto& a, const auto& b) { - if (a.second == b.second) return a.first < b.first; - return a.second < b.second; - }); - - for (const auto& nmns : name_ns_pairs) { - const auto& name = nmns.first; - const auto& ns = nmns.second; - const auto fqn = make_fqn(ns, name); - - if (fqn == self_fqn) continue; // skip our own node - if (!name.empty() && name[0] == '_') continue; // optional: skip hidden nodes + std::sort( + name_ns_pairs.begin(), name_ns_pairs.end(), + [](const auto & a, const auto & b) { + if (a.second == b.second) {return a.first < b.first;} + return a.second < b.second; + }); + + for (const auto & nmns : name_ns_pairs) { + const auto & name = nmns.first; + const auto & ns = nmns.second; + const auto fqn = make_fqn(ns, name); + + if (fqn == self_fqn) { + continue; // skip our own node + } + if (!name.empty() && name[0] == '_') { + continue; // optional: skip hidden nodes + } auto client = std::make_shared(query_node.get(), fqn); if (!client->wait_for_service(std::chrono::milliseconds(300))) { @@ -160,96 +161,97 @@ namespace sdr // 1) List declared parameter names (depth is generous) rcl_interfaces::msg::ListParametersResult list_res; try { - list_res = client->list_parameters( - {}, - rcl_interfaces::srv::ListParameters::Request::DEPTH_RECURSIVE - ); - - // --- ADD THIS LOGGING BLOCK --- - RCLCPP_INFO(get_logger(), "--- DEBUG PARAMS FOR NODE: %s ---", fqn.c_str()); - std::string names_str = "Names: ["; - for (const auto& name : list_res.names) { - names_str += "\"" + name + "\", "; - } - names_str += "]"; - RCLCPP_INFO(get_logger(), "%s", names_str.c_str()); - // --- END LOGGING BLOCK --- - } catch (const std::exception& e) { + list_res = client->list_parameters( + {}, + rcl_interfaces::srv::ListParameters::Request::DEPTH_RECURSIVE + ); + + // --- ADD THIS LOGGING BLOCK --- + RCLCPP_INFO(get_logger(), "--- DEBUG PARAMS FOR NODE: %s ---", fqn.c_str()); + std::string names_str = "Names: ["; + for (const auto & name : list_res.names) { + names_str += "\"" + name + "\", "; + } + names_str += "]"; + RCLCPP_INFO(get_logger(), "%s", names_str.c_str()); + // --- END LOGGING BLOCK --- + } catch (const std::exception & e) { RCLCPP_WARN(get_logger(), "list_parameters failed for %s: %s", fqn.c_str(), e.what()); continue; } - const auto& names = list_res.names; - if (names.empty()) continue; + const auto & names = list_res.names; + if (names.empty()) {continue;} YAML::Node params_node(YAML::NodeType::Map); // 2) Fetch each parameter **individually** - for (const auto& full_name : names) { - // FIX: Changed "qos_overrides." to "qos_overrides" - // Your log showed "qos_overrides./..." so this will catch it - if (full_name.rfind("qos_overrides", 0) == 0) { + for (const auto & full_name : names) { + // FIX: Changed "qos_overrides." to "qos_overrides" + // Your log showed "qos_overrides./..." so this will catch it + if (full_name.rfind("qos_overrides", 0) == 0) { continue; // skip QoS noise - } + } - rclcpp::Parameter param; - try { + rclcpp::Parameter param; + try { auto vec = client->get_parameters({full_name}); if (!vec.empty()) { - param = vec.front(); + param = vec.front(); } else { - // If the server gives nothing back, still create a null entry - std::stringstream ss(full_name); - std::string key; - std::vector keys; - while (std::getline(ss, key, '.')) if (!key.empty()) keys.push_back(key); - - // --- FIX: Use a reference, not a pointer --- - YAML::Node& current_ref = params_node; - for (size_t k = 0; k + 1 < keys.size(); ++k) { - current_ref = current_ref[keys[k]]; // Re-assign reference to child - } - current_ref[keys.back()] = YAML::Node(); // null - // --- END FIX --- - continue; - } - } catch (const std::exception& e) { - RCLCPP_WARN(get_logger(), "get_parameters(%s) failed for %s: %s", - full_name.c_str(), fqn.c_str(), e.what()); + // If the server gives nothing back, still create a null entry + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) {if (!key.empty()) {keys.push_back(key);}} + + // --- FIX: Use a reference, not a pointer --- + YAML::Node & current_ref = params_node; + for (size_t k = 0; k + 1 < keys.size(); ++k) { + current_ref = current_ref[keys[k]]; // Re-assign reference to child + } + current_ref[keys.back()] = YAML::Node(); // null + // --- END FIX --- + continue; + } + } catch (const std::exception & e) { + RCLCPP_WARN( + get_logger(), "get_parameters(%s) failed for %s: %s", + full_name.c_str(), fqn.c_str(), e.what()); continue; - } - - // 3) Build nested YAML using the **full** dotted name - std::stringstream ss(full_name); - std::string key; - std::vector keys; - while (std::getline(ss, key, '.')) if (!key.empty()) keys.push_back(key); - - YAML::Node value_node; - write_yaml_value_from_param(value_node, param); + } + + // 3) Build nested YAML using the **full** dotted name + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) {if (!key.empty()) {keys.push_back(key);}} + + YAML::Node value_node; + write_yaml_value_from_param(value_node, param); + + if (keys.empty()) { + params_node[full_name] = value_node; + } else { + // Recursively build nested structure + std::function build_nested = + [&](YAML::Node & node, size_t depth) { + if (depth == keys.size() - 1) { + // Last key, set the value + node[keys[depth]] = value_node; + } else { + // Intermediate key, ensure it's a Map and recurse + if (!node[keys[depth]].IsDefined() || !node[keys[depth]].IsMap()) { + node[keys[depth]] = YAML::Node(YAML::NodeType::Map); + } + YAML::Node child = node[keys[depth]]; + build_nested(child, depth + 1); + node[keys[depth]] = child; // Write back the modified child + } + }; - if (keys.empty()) { - params_node[full_name] = value_node; - } else { - // Recursively build nested structure - std::function build_nested = - [&](YAML::Node& node, size_t depth) { - if (depth == keys.size() - 1) { - // Last key, set the value - node[keys[depth]] = value_node; - } else { - // Intermediate key, ensure it's a Map and recurse - if (!node[keys[depth]].IsDefined() || !node[keys[depth]].IsMap()) { - node[keys[depth]] = YAML::Node(YAML::NodeType::Map); - } - YAML::Node child = node[keys[depth]]; - build_nested(child, depth + 1); - node[keys[depth]] = child; // Write back the modified child - } - }; - - build_nested(params_node, 0); - } + build_nested(params_node, 0); + } } YAML::Node node_root; node_root["node_name"] = name; @@ -264,439 +266,421 @@ namespace sdr ofs << root; ofs.close(); RCLCPP_INFO_STREAM(get_logger(), "Wrote ALL nodes parameters YAML: " << out_file); - } catch (const std::exception& e) { - RCLCPP_ERROR(get_logger(), "Failed writing ALL nodes parameters YAML to '%s': %s", - out_file.c_str(), e.what()); + } catch (const std::exception & e) { + RCLCPP_ERROR( + get_logger(), "Failed writing ALL nodes parameters YAML to '%s': %s", + out_file.c_str(), e.what()); } query_node.reset(); // ensure destruction before return } +void SystemDataRecorder::write_yaml_value_from_param(YAML::Node & node, const rclcpp::Parameter & p) +{ + using rclcpp::ParameterType; + + switch (p.get_type()) { + case ParameterType::PARAMETER_BOOL: node = p.as_bool(); break; + case ParameterType::PARAMETER_INTEGER: node = p.as_int(); break; + case ParameterType::PARAMETER_DOUBLE: node = p.as_double(); break; + case ParameterType::PARAMETER_STRING: node = p.as_string(); break; + case ParameterType::PARAMETER_BYTE_ARRAY: { + YAML::Node seq; for (auto b : p.as_byte_array()) { + seq.push_back(static_cast(b)); + } + node = seq; break; + } + case ParameterType::PARAMETER_BOOL_ARRAY: { + YAML::Node seq; for (auto v : p.as_bool_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_INTEGER_ARRAY: { + YAML::Node seq; for (auto v : p.as_integer_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_DOUBLE_ARRAY: { + YAML::Node seq; for (auto v : p.as_double_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_STRING_ARRAY: { + YAML::Node seq; for (auto & v : p.as_string_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_NOT_SET: + default: node = YAML::Node(); break; + } +} - void SystemDataRecorder::write_yaml_value_from_param(YAML::Node& node, const rclcpp::Parameter& p) - { - using rclcpp::ParameterType; - - switch (p.get_type()) { - case ParameterType::PARAMETER_BOOL: node = p.as_bool(); break; - case ParameterType::PARAMETER_INTEGER: node = p.as_int(); break; - case ParameterType::PARAMETER_DOUBLE: node = p.as_double(); break; - case ParameterType::PARAMETER_STRING: node = p.as_string(); break; - case ParameterType::PARAMETER_BYTE_ARRAY: { - YAML::Node seq; for (auto b : p.as_byte_array()) seq.push_back(static_cast(b)); node = seq; break; - } - case ParameterType::PARAMETER_BOOL_ARRAY: { - YAML::Node seq; for (auto v : p.as_bool_array()) seq.push_back(v); node = seq; break; - } - case ParameterType::PARAMETER_INTEGER_ARRAY: { - YAML::Node seq; for (auto v : p.as_integer_array()) seq.push_back(v); node = seq; break; - } - case ParameterType::PARAMETER_DOUBLE_ARRAY: { - YAML::Node seq; for (auto v : p.as_double_array()) seq.push_back(v); node = seq; break; - } - case ParameterType::PARAMETER_STRING_ARRAY: { - YAML::Node seq; for (auto& v : p.as_string_array()) seq.push_back(v); node = seq; break; - } - case ParameterType::PARAMETER_NOT_SET: - default: node = YAML::Node(); break; - } - } +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_configure(const rclcpp_lifecycle::State &) +{ + RCLCPP_INFO(get_logger(), "Configuring SDR. Creating main session directory."); + // Create the main timestamped directory for all bags from this run + session_destination_directory_ = base_copy_destination_ / ("sdr_session_" + generate_timestamp()); - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - SystemDataRecorder::on_configure(const rclcpp_lifecycle::State &) - { - RCLCPP_INFO(get_logger(), "Configuring SDR. Creating main session directory."); + try { + std::filesystem::create_directories(session_destination_directory_); + } catch (const std::filesystem::filesystem_error & ex) { + RCLCPP_ERROR( + get_logger(), "Could not create session directory '%s': %s", + session_destination_directory_.c_str(), ex.what()); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; + } + RCLCPP_INFO( + get_logger(), "All copied bags for this session will be stored in: %s", + session_destination_directory_.c_str()); + + + // Start the file-copying thread + copy_thread_ = std::make_shared( + [this] + {this->copy_thread_main();}); + notify_state_change(SdrStateChange::PAUSED); + cleaned_up = false; + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} - // Create the main timestamped directory for all bags from this run - session_destination_directory_ = base_copy_destination_ / ("sdr_session_" + generate_timestamp()); +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_activate(const rclcpp_lifecycle::State &) +{ + std::string bag_name = base_bag_name_prefix_ + "_" + generate_timestamp(); + RCLCPP_INFO(get_logger(), "Activating. Starting new recording session: %s", bag_name.c_str()); - try - { - std::filesystem::create_directories(session_destination_directory_); - } - catch (const std::filesystem::filesystem_error &ex) - { - RCLCPP_ERROR( - get_logger(), "Could not create session directory '%s': %s", - session_destination_directory_.c_str(), ex.what()); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; - } - RCLCPP_INFO( - get_logger(), "All copied bags for this session will be stored in: %s", - session_destination_directory_.c_str()); - - - // Start the file-copying thread - copy_thread_ = std::make_shared([this] - { this->copy_thread_main(); }); - notify_state_change(SdrStateChange::PAUSED); - cleaned_up = false; - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; - } + // Set up paths for this specific recording session + current_bag_tmp_directory_ = std::filesystem::temp_directory_path() / bag_name; + current_bag_final_destination_ = session_destination_directory_ / bag_name; + storage_options_.uri = current_bag_tmp_directory_.string(); - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - SystemDataRecorder::on_activate(const rclcpp_lifecycle::State &) - { - std::string bag_name = base_bag_name_prefix_ + "_" + generate_timestamp(); - RCLCPP_INFO(get_logger(), "Activating. Starting new recording session: %s", bag_name.c_str()); - - // Set up paths for this specific recording session - current_bag_tmp_directory_ = std::filesystem::temp_directory_path() / bag_name; - current_bag_final_destination_ = session_destination_directory_ / bag_name; - storage_options_.uri = current_bag_tmp_directory_.string(); - - // Pre-emptively set the name of the first bag file. This ensures we always have a valid - // filename to copy, even if no bag split occurs. - last_bag_file_ = (current_bag_tmp_directory_ / (bag_name + "_0.db3")).string(); - - // Ensure the temporary directory is clean - if (std::filesystem::exists(current_bag_tmp_directory_)) - { - std::filesystem::remove_all(current_bag_tmp_directory_); - } - std::filesystem::create_directories(current_bag_final_destination_); + // Pre-emptively set the name of the first bag file. This ensures we always have a valid + // filename to copy, even if no bag split occurs. + last_bag_file_ = (current_bag_tmp_directory_ / (bag_name + "_0.db3")).string(); - const auto params_yaml_path = current_bag_final_destination_ / "rosparams.yaml"; - write_all_nodes_parameters_yaml(params_yaml_path); + // Ensure the temporary directory is clean + if (std::filesystem::exists(current_bag_tmp_directory_)) { + std::filesystem::remove_all(current_bag_tmp_directory_); + } + std::filesystem::create_directories(current_bag_final_destination_); - writer_ = std::make_shared( - std::make_unique()); + const auto params_yaml_path = current_bag_final_destination_ / "rosparams.yaml"; + write_all_nodes_parameters_yaml(params_yaml_path); - rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; - callbacks.write_split_callback = - [this](rosbag2_cpp::bag_events::BagSplitInfo &info) - { - last_bag_file_ = info.opened_file; - notify_new_file_to_copy({info.closed_file, current_bag_final_destination_}); - }; - writer_->add_event_callbacks(callbacks); - writer_->open(storage_options_, {rmw_get_serialization_format(), rmw_get_serialization_format()}); + writer_ = std::make_shared( + std::make_unique()); - subscribe_to_topics(); - notify_state_change(SdrStateChange::RECORDING); + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [this](rosbag2_cpp::bag_events::BagSplitInfo & info) + { + last_bag_file_ = info.opened_file; + notify_new_file_to_copy({info.closed_file, current_bag_final_destination_}); + }; + writer_->add_event_callbacks(callbacks); + writer_->open(storage_options_, {rmw_get_serialization_format(), rmw_get_serialization_format()}); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; - } + subscribe_to_topics(); + notify_state_change(SdrStateChange::RECORDING); - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State &) - { - RCLCPP_INFO(get_logger(), "Deactivating. Finalizing current bag."); - notify_state_change(SdrStateChange::PAUSED); - unsubscribe_from_topics(); - - if (writer_) - { - writer_.reset(); // Finalizes bag and writes metadata.yaml - notify_new_file_to_copy({last_bag_file_, current_bag_final_destination_}); - notify_new_file_to_copy( - {current_bag_tmp_directory_ / "metadata.yaml", current_bag_final_destination_}); - } - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; - } + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State &state) - { - RCLCPP_INFO(get_logger(), "Cleaning up SDR."); - if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) - { - on_deactivate(state); - } +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State &) +{ + RCLCPP_INFO(get_logger(), "Deactivating. Finalizing current bag."); + notify_state_change(SdrStateChange::PAUSED); + unsubscribe_from_topics(); + + if (writer_) { + writer_.reset(); // Finalizes bag and writes metadata.yaml + notify_new_file_to_copy({last_bag_file_, current_bag_final_destination_}); + notify_new_file_to_copy( + {current_bag_tmp_directory_ / "metadata.yaml", current_bag_final_destination_}); + } + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} - if (copy_thread_) - { - notify_state_change(SdrStateChange::FINISHED); - copy_thread_->join(); - copy_thread_.reset(); - } +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State & state) +{ + RCLCPP_INFO(get_logger(), "Cleaning up SDR."); + if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) { + on_deactivate(state); + } - cleaned_up = true; - RCLCPP_INFO(get_logger(), "Cleanup complete."); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; - } + if (copy_thread_) { + notify_state_change(SdrStateChange::FINISHED); + copy_thread_->join(); + copy_thread_.reset(); + } - rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn - SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State &state) - { - RCLCPP_INFO(get_logger(), "Shutting down SDR."); - if (!cleaned_up) - { - on_cleanup(state); - } - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; - } + cleaned_up = true; + RCLCPP_INFO(get_logger(), "Cleanup complete."); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} - /////////////////////////////////////////////////////////////////////////////////////////////////// - // Bag recording functionality - /////////////////////////////////////////////////////////////////////////////////////////////////// +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State & state) +{ + RCLCPP_INFO(get_logger(), "Shutting down SDR."); + if (!cleaned_up) { + on_cleanup(state); + } + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} - void SystemDataRecorder::subscribe_to_topics() - { - for (const auto &topic_with_type : topics_and_types_) - { - subscribe_to_topic(topic_with_type.first, topic_with_type.second); - } - } +/////////////////////////////////////////////////////////////////////////////////////////////////// +// Bag recording functionality +/////////////////////////////////////////////////////////////////////////////////////////////////// - void SystemDataRecorder::subscribe_to_topic(const std::string &topic, const std::string &type) - { - // Get the QoS offered by the topic for saving in the bag - auto offered_qos = get_serialised_offered_qos_for_topic(topic); - // Find out what QoS is most appropriate to use when subscribing to the topic - auto qos = get_appropriate_qos_for_topic(topic); - - // The metadata to pass to the writer object so it can register the topic in the bag - auto topic_metadata = rosbag2_storage::TopicMetadata( - { - topic, // Topic name - type, // Topic type, e.g. "example_interfaces/msg/String" - rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" - offered_qos // The offered QoS profile for the topic in YAML - }); - // It is a good idea to create the topic in the writer prior to adding the subscription in case - // data arrives after subscribing and before the topic is created. Although we should be ignoring - // any data until the node is set to active, we maintain this good practice here for future - // maintainability. - writer_->create_topic(topic_metadata); - - // Create a generic subscriber. A generic subscriber received message data in serialized form, - // which means that: - // - No de-serialization will take place, saving that processing time, and - // - The data type does not need to be known at compile time, so we don't need a templated - // callback for when message data is received. - auto subscription = create_generic_subscription( - topic, - type, - qos, - [this, topic, type](std::shared_ptr message) - { - // When a message is received, it should only be written to the bag if recording is not - // paused (i.e. the node lifecycle state is "active"). If recording is paused, the message is - // thrown away. - if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) - { - writer_->write(message, topic, type, rclcpp::Clock(RCL_SYSTEM_TIME).now()); - } - }); - if (subscription) - { - subscriptions_.insert({topic, subscription}); - RCLCPP_INFO(get_logger(), "Subscribed to topic '%s'", topic.c_str()); - } - else - { - writer_->remove_topic(topic_metadata); - RCLCPP_ERROR(get_logger(), "Failed to subscribe to topic '%s'", topic.c_str()); - } - } +void SystemDataRecorder::subscribe_to_topics() +{ + for (const auto & topic_with_type : topics_and_types_) { + subscribe_to_topic(topic_with_type.first, topic_with_type.second); + } +} - std::string SystemDataRecorder::get_serialised_offered_qos_for_topic(const std::string &topic) - { - YAML::Node offered_qos_profiles; - auto endpoints = get_publishers_info_by_topic(topic); - for (const auto &endpoint : endpoints) - { - offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(endpoint.qos_profile())); - } - return YAML::Dump(offered_qos_profiles); - } +void SystemDataRecorder::subscribe_to_topic(const std::string & topic, const std::string & type) +{ + // Get the QoS offered by the topic for saving in the bag + auto offered_qos = get_serialised_offered_qos_for_topic(topic); + // Find out what QoS is most appropriate to use when subscribing to the topic + auto qos = get_appropriate_qos_for_topic(topic); - // Figure out the most appropriate QoS for a given topic. This method tries to decide if - // constrained QoS can be used, or if less-trustworthy QoS needs to be used to catch data from - // every publisher. - // Returns the QoS to use. - rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string &topic) + // The metadata to pass to the writer object so it can register the topic in the bag + auto topic_metadata = rosbag2_storage::TopicMetadata( { - auto qos = rclcpp::QoS(rmw_qos_profile_default.depth); - - // Get the information about known publishers on this topic - auto endpoints = get_publishers_info_by_topic(topic); - if (endpoints.empty()) - { - // There are not yet any publishers on the topic. - // Use the default QoS profile, as we do not know what publishers will use since there are not - // yet any publishers on this topic. - return qos; - } + topic, // Topic name + type, // Topic type, e.g. "example_interfaces/msg/String" + rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" + offered_qos // The offered QoS profile for the topic in YAML + }); + // It is a good idea to create the topic in the writer prior to adding the subscription in case + // data arrives after subscribing and before the topic is created. Although we should be ignoring + // any data until the node is set to active, we maintain this good practice here for future + // maintainability. + writer_->create_topic(topic_metadata); + + // Create a generic subscriber. A generic subscriber received message data in serialized form, + // which means that: + // - No de-serialization will take place, saving that processing time, and + // - The data type does not need to be known at compile time, so we don't need a templated + // callback for when message data is received. + auto subscription = create_generic_subscription( + topic, + type, + qos, + [this, topic, type](std::shared_ptr message) + { + // When a message is received, it should only be written to the bag if recording is not + // paused (i.e. the node lifecycle state is "active"). If recording is paused, the message is + // thrown away. + if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) { + writer_->write(message, topic, type, rclcpp::Clock(RCL_SYSTEM_TIME).now()); + } + }); + if (subscription) { + subscriptions_.insert({topic, subscription}); + RCLCPP_INFO(get_logger(), "Subscribed to topic '%s'", topic.c_str()); + } else { + writer_->remove_topic(topic_metadata); + RCLCPP_ERROR(get_logger(), "Failed to subscribe to topic '%s'", topic.c_str()); + } +} - // Count the number of reliable and transient-local publishers for the topic - size_t reliability_reliable_endpoints_count = 0; - size_t durability_transient_local_endpoints_count = 0; - for (const auto &endpoint : endpoints) - { - const auto &profile = endpoint.qos_profile().get_rmw_qos_profile(); - if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) - { - ++reliability_reliable_endpoints_count; - } - if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) - { - ++durability_transient_local_endpoints_count; - } - } +std::string SystemDataRecorder::get_serialised_offered_qos_for_topic(const std::string & topic) +{ + YAML::Node offered_qos_profiles; + auto endpoints = get_publishers_info_by_topic(topic); + for (const auto & endpoint : endpoints) { + offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(endpoint.qos_profile())); + } + return YAML::Dump(offered_qos_profiles); +} - if (reliability_reliable_endpoints_count == endpoints.size()) - { - // All publishers are reliable, so we can use the reliable QoS - qos.reliable(); - } - else - { - if (reliability_reliable_endpoints_count > 0) - { - // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures - // all of them - RCLCPP_WARN( - get_logger(), - "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " - "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " - "messages from Reliable publishers could be dropped.", - topic.c_str()); - } - qos.best_effort(); - } +// Figure out the most appropriate QoS for a given topic. This method tries to decide if +// constrained QoS can be used, or if less-trustworthy QoS needs to be used to catch data from +// every publisher. +// Returns the QoS to use. +rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string & topic) +{ + auto qos = rclcpp::QoS(rmw_qos_profile_default.depth); + + // Get the information about known publishers on this topic + auto endpoints = get_publishers_info_by_topic(topic); + if (endpoints.empty()) { + // There are not yet any publishers on the topic. + // Use the default QoS profile, as we do not know what publishers will use since there are not + // yet any publishers on this topic. + return qos; + } - if (durability_transient_local_endpoints_count == endpoints.size()) - { - // All publishers are transient local, so we can use the transient local QoS - qos.transient_local(); - } - else - { - if (durability_transient_local_endpoints_count > 0) - { - // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures - // all of them - RCLCPP_WARN( - get_logger(), - "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " - "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" - "published latched messages will not be retrieved.", - topic.c_str()); - } - qos.durability_volatile(); - } + // Count the number of reliable and transient-local publishers for the topic + size_t reliability_reliable_endpoints_count = 0; + size_t durability_transient_local_endpoints_count = 0; + for (const auto & endpoint : endpoints) { + const auto & profile = endpoint.qos_profile().get_rmw_qos_profile(); + if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + ++reliability_reliable_endpoints_count; + } + if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ++durability_transient_local_endpoints_count; + } + } - return qos; + if (reliability_reliable_endpoints_count == endpoints.size()) { + // All publishers are reliable, so we can use the reliable QoS + qos.reliable(); + } else { + if (reliability_reliable_endpoints_count > 0) { + // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures + // all of them + RCLCPP_WARN( + get_logger(), + "Some, but not all, publishers on topic \"%s\" are offering " + "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " + "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " + "messages from Reliable publishers could be dropped.", + topic.c_str()); } + qos.best_effort(); + } - void SystemDataRecorder::unsubscribe_from_topics() - { - // Make a note of the topics we are subscribed to - std::vector topics; - for (const auto &topic_with_type : topics_and_types_) - { - topics.push_back(rosbag2_storage::TopicMetadata( - {topic_with_type.first, - topic_with_type.second, - rmw_get_serialization_format(), - ""})); - } - // Unsubscribing happens automatically when the subscription objects are destroyed - subscriptions_.clear(); + if (durability_transient_local_endpoints_count == endpoints.size()) { + // All publishers are transient local, so we can use the transient local QoS + qos.transient_local(); + } else { + if (durability_transient_local_endpoints_count > 0) { + // There is a mix of QoS profiles amongst the publishers, so use the QoS setting that captures + // all of them + RCLCPP_WARN( + get_logger(), + "Some, but not all, publishers on topic \"%s\" are offering " + "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " + "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" + "published latched messages will not be retrieved.", + topic.c_str()); } + qos.durability_volatile(); + } - /////////////////////////////////////////////////////////////////////////////////////////////////// - // File-copying thread - /////////////////////////////////////////////////////////////////////////////////////////////////// + return qos; +} - void SystemDataRecorder::copy_thread_main() - { - RCLCPP_INFO(get_logger(), "Copy thread: Starting"); - SdrStateChange current_state = SdrStateChange::PAUSED; - std::queue local_files_to_copy; - - while (current_state != SdrStateChange::FINISHED) - { - { - std::unique_lock lock(copy_thread_mutex_); - copy_thread_wake_cv_.wait( - lock, [this] - { return copy_thread_should_wake(); }); - - if (state_msg_ != SdrStateChange::NO_CHANGE) - { - current_state = state_msg_; - state_msg_ = SdrStateChange::NO_CHANGE; - } - local_files_to_copy.swap(files_to_copy_); - } +void SystemDataRecorder::unsubscribe_from_topics() +{ + // Make a note of the topics we are subscribed to + std::vector topics; + for (const auto & topic_with_type : topics_and_types_) { + topics.push_back( + rosbag2_storage::TopicMetadata( + {topic_with_type.first, + topic_with_type.second, + rmw_get_serialization_format(), + ""})); + } + // Unsubscribing happens automatically when the subscription objects are destroyed + subscriptions_.clear(); +} - while (!local_files_to_copy.empty()) - { - FileCopyJob job = local_files_to_copy.front(); - local_files_to_copy.pop(); - copy_bag_file(job); - } - } +/////////////////////////////////////////////////////////////////////////////////////////////////// +// File-copying thread +/////////////////////////////////////////////////////////////////////////////////////////////////// - // After thread is finished, one last check for any remaining files - while (!files_to_copy_.empty()) - { - copy_bag_file(files_to_copy_.front()); - files_to_copy_.pop(); - } - // And clean up the final temporary directory if it exists - if (std::filesystem::exists(current_bag_tmp_directory_)) - { - std::filesystem::remove_all(current_bag_tmp_directory_); - } - RCLCPP_INFO(get_logger(), "Copy thread: Exiting"); - } +void SystemDataRecorder::copy_thread_main() +{ + RCLCPP_INFO(get_logger(), "Copy thread: Starting"); + SdrStateChange current_state = SdrStateChange::PAUSED; + std::queue local_files_to_copy; - bool SystemDataRecorder::copy_thread_should_wake() + while (current_state != SdrStateChange::FINISHED) { { - return state_msg_ != SdrStateChange::NO_CHANGE || !files_to_copy_.empty(); + std::unique_lock lock(copy_thread_mutex_); + copy_thread_wake_cv_.wait( + lock, [this] + {return copy_thread_should_wake();}); + + if (state_msg_ != SdrStateChange::NO_CHANGE) { + current_state = state_msg_; + state_msg_ = SdrStateChange::NO_CHANGE; + } + local_files_to_copy.swap(files_to_copy_); } - void SystemDataRecorder::notify_state_change(SdrStateChange new_state) - { - { - std::lock_guard lock(copy_thread_mutex_); - state_msg_ = new_state; - } - copy_thread_wake_cv_.notify_one(); + while (!local_files_to_copy.empty()) { + FileCopyJob job = local_files_to_copy.front(); + local_files_to_copy.pop(); + copy_bag_file(job); } + } - void SystemDataRecorder::notify_new_file_to_copy(const FileCopyJob &job) - { - { - std::lock_guard lock(copy_thread_mutex_); - files_to_copy_.push(job); - } - copy_thread_wake_cv_.notify_one(); - } + // After thread is finished, one last check for any remaining files + while (!files_to_copy_.empty()) { + copy_bag_file(files_to_copy_.front()); + files_to_copy_.pop(); + } + // And clean up the final temporary directory if it exists + if (std::filesystem::exists(current_bag_tmp_directory_)) { + std::filesystem::remove_all(current_bag_tmp_directory_); + } + RCLCPP_INFO(get_logger(), "Copy thread: Exiting"); +} - /////////////////////////////////////////////////////////////////////////////////////////////////// - // File-copying functionality - /////////////////////////////////////////////////////////////////////////////////////////////////// +bool SystemDataRecorder::copy_thread_should_wake() +{ + return state_msg_ != SdrStateChange::NO_CHANGE || !files_to_copy_.empty(); +} - void SystemDataRecorder::copy_bag_file(const FileCopyJob &job) - { - if (job.source_path.empty() || !std::filesystem::exists(job.source_path)) - { - RCLCPP_WARN(get_logger(), "Skipping copy of non-existent source file: %s", job.source_path.c_str()); - return; - } - RCLCPP_INFO( - get_logger(), "Copying %s to %s", - job.source_path.c_str(), job.destination_path.c_str()); - try - { - std::filesystem::copy(job.source_path, job.destination_path); - } - catch (const std::filesystem::filesystem_error &e) - { - RCLCPP_ERROR( - get_logger(), "Failed to copy '%s' to '%s': %s", - job.source_path.c_str(), job.destination_path.c_str(), e.what()); - } - } +void SystemDataRecorder::notify_state_change(SdrStateChange new_state) +{ + { + std::lock_guard lock(copy_thread_mutex_); + state_msg_ = new_state; + } + copy_thread_wake_cv_.notify_one(); +} + +void SystemDataRecorder::notify_new_file_to_copy(const FileCopyJob & job) +{ + { + std::lock_guard lock(copy_thread_mutex_); + files_to_copy_.push(job); + } + copy_thread_wake_cv_.notify_one(); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +// File-copying functionality +/////////////////////////////////////////////////////////////////////////////////////////////////// + +void SystemDataRecorder::copy_bag_file(const FileCopyJob & job) +{ + if (job.source_path.empty() || !std::filesystem::exists(job.source_path)) { + RCLCPP_WARN( + get_logger(), "Skipping copy of non-existent source file: %s", + job.source_path.c_str()); + return; + } + RCLCPP_INFO( + get_logger(), "Copying %s to %s", + job.source_path.c_str(), job.destination_path.c_str()); + try { + std::filesystem::copy(job.source_path, job.destination_path); + } catch (const std::filesystem::filesystem_error & e) { + RCLCPP_ERROR( + get_logger(), "Failed to copy '%s' to '%s': %s", + job.source_path.c_str(), job.destination_path.c_str(), e.what()); + } +} -} // namespace sdr \ No newline at end of file +} // namespace sdr diff --git a/src/sdr_keyboard_commander.cpp b/src/sdr_keyboard_commander.cpp index cbaeab0..189f6cc 100644 --- a/src/sdr_keyboard_commander.cpp +++ b/src/sdr_keyboard_commander.cpp @@ -3,200 +3,195 @@ namespace sdr { - SDRKeyboardCommander::SDRKeyboardCommander(const rclcpp::NodeOptions &options) - : Node("sdr_keyboard_commander", options) - { - // Initialize the service clients - change_state_client_ = this->create_client( - "/" + target_node_name_ + "/change_state"); - get_state_client_ = this->create_client( - "/" + target_node_name_ + "/get_state"); - - // Initialize the transition-to-string map for logging - transition_map_[Transition::TRANSITION_CONFIGURE] = "CONFIGURE"; - transition_map_[Transition::TRANSITION_ACTIVATE] = "ACTIVATE"; - transition_map_[Transition::TRANSITION_DEACTIVATE] = "DEACTIVATE"; - transition_map_[Transition::TRANSITION_CLEANUP] = "CLEANUP"; - transition_map_[Transition::TRANSITION_INACTIVE_SHUTDOWN] = "SHUTDOWN"; - - // Set up non-blocking keyboard - init_keyboard(); - - // Start a timer to poll the keyboard - timer_ = this->create_wall_timer( - std::chrono::milliseconds(100), - std::bind(&SDRKeyboardCommander::check_keyboard, this)); - - RCLCPP_INFO(get_logger(), "SDR Keyboard Commander started."); - RCLCPP_INFO(get_logger(), "Waiting for '/%s' lifecycle services...", target_node_name_.c_str()); - - // Wait for services to be available - while (!change_state_client_->wait_for_service(std::chrono::seconds(1))) - { - if (!rclcpp::ok()) - { - RCLCPP_ERROR(get_logger(), "Interrupted while waiting for service. Exiting."); - return; - } - RCLCPP_INFO(get_logger(), "Service '/%s/change_state' not available, waiting...", target_node_name_.c_str()); - } - - RCLCPP_INFO(get_logger(), "Services found. Ready to command."); - print_help(); - } - - SDRKeyboardCommander::~SDRKeyboardCommander() - { - // Restore terminal settings on exit - restore_keyboard(); - } - - void SDRKeyboardCommander::print_help() - { - printf("\n--- SDR Keyboard Commander ---\n"); - printf("Control the '%s' node:\n", target_node_name_.c_str()); - printf(" [c] -> CONFIGURE\n"); - printf(" [a] -> ACTIVATE (Start Recording)\n"); - printf(" [d] -> DEACTIVATE (Pause Recording)\n"); - printf(" [l] -> CLEANUP\n"); - printf(" [s] -> SHUTDOWN\n"); - printf(" [g] -> GET current state\n"); - printf(" [h] -> Print this HELP menu\n"); - printf(" [q] -> QUIT\n"); - printf("--------------------------------\n"); - } - - void SDRKeyboardCommander::init_keyboard() - { - // Get the current terminal settings - tcgetattr(STDIN_FILENO, &old_tio_); - // Copy settings to a new struct - new_tio_ = old_tio_; - // Disable canonical mode (line buffering) and echo - new_tio_.c_lflag &= ~(ICANON | ECHO); - // Set minimum number of characters for a read to 0 - new_tio_.c_cc[VMIN] = 0; - // Set the read timeout to 0 (non-blocking) - new_tio_.c_cc[VTIME] = 0; - // Apply the new settings - tcsetattr(STDIN_FILENO, TCSANOW, &new_tio_); +SDRKeyboardCommander::SDRKeyboardCommander(const rclcpp::NodeOptions & options) +: Node("sdr_keyboard_commander", options) +{ + // Initialize the service clients + change_state_client_ = this->create_client( + "/" + target_node_name_ + "/change_state"); + get_state_client_ = this->create_client( + "/" + target_node_name_ + "/get_state"); + + // Initialize the transition-to-string map for logging + transition_map_[Transition::TRANSITION_CONFIGURE] = "CONFIGURE"; + transition_map_[Transition::TRANSITION_ACTIVATE] = "ACTIVATE"; + transition_map_[Transition::TRANSITION_DEACTIVATE] = "DEACTIVATE"; + transition_map_[Transition::TRANSITION_CLEANUP] = "CLEANUP"; + transition_map_[Transition::TRANSITION_INACTIVE_SHUTDOWN] = "SHUTDOWN"; + + // Set up non-blocking keyboard + init_keyboard(); + + // Start a timer to poll the keyboard + timer_ = this->create_wall_timer( + std::chrono::milliseconds(100), + std::bind(&SDRKeyboardCommander::check_keyboard, this)); + + RCLCPP_INFO(get_logger(), "SDR Keyboard Commander started."); + RCLCPP_INFO(get_logger(), "Waiting for '/%s' lifecycle services...", target_node_name_.c_str()); + + // Wait for services to be available + while (!change_state_client_->wait_for_service(std::chrono::seconds(1))) { + if (!rclcpp::ok()) { + RCLCPP_ERROR(get_logger(), "Interrupted while waiting for service. Exiting."); + return; } + RCLCPP_INFO( + get_logger(), "Service '/%s/change_state' not available, waiting...", + target_node_name_.c_str()); + } - void SDRKeyboardCommander::restore_keyboard() - { - // Restore the original terminal settings - tcsetattr(STDIN_FILENO, TCSANOW, &old_tio_); - } + RCLCPP_INFO(get_logger(), "Services found. Ready to command."); + print_help(); +} - int SDRKeyboardCommander::getch() - { - char c = -1; - // Read 1 character from stdin - ssize_t n = read(STDIN_FILENO, &c, 1); - if (n == 1) - { - return c; // Return the character if read was successful - } - return -1; // Return -1 if no key was pressed - } +SDRKeyboardCommander::~SDRKeyboardCommander() +{ + // Restore terminal settings on exit + restore_keyboard(); +} - void SDRKeyboardCommander::check_keyboard() - { - int key = getch(); - if (key == -1) - { - return; // No key pressed - } - - switch (key) - { - case 'c': - RCLCPP_INFO(get_logger(), "Requesting CONFIGURE..."); - request_transition(Transition::TRANSITION_CONFIGURE); - break; - case 'a': - RCLCPP_INFO(get_logger(), "Requesting ACTIVATE..."); - request_transition(Transition::TRANSITION_ACTIVATE); - break; - case 'd': - RCLCPP_INFO(get_logger(), "Requesting DEACTIVATE..."); - request_transition(Transition::TRANSITION_DEACTIVATE); - break; - case 'l': - RCLCPP_INFO(get_logger(), "Requesting CLEANUP..."); - request_transition(Transition::TRANSITION_CLEANUP); - break; - case 's': - RCLCPP_INFO(get_logger(), "Requesting SHUTDOWN..."); - request_transition(Transition::TRANSITION_INACTIVE_SHUTDOWN); - break; - case 'g': - RCLCPP_INFO(get_logger(), "Requesting GET_STATE..."); - get_current_state(); - break; - case 'h': - print_help(); - break; - case 'q': - RCLCPP_INFO(get_logger(), "Quitting commander node..."); - restore_keyboard(); // Restore keyboard before shutting down - rclcpp::shutdown(); - break; - } - } +void SDRKeyboardCommander::print_help() +{ + printf("\n--- SDR Keyboard Commander ---\n"); + printf("Control the '%s' node:\n", target_node_name_.c_str()); + printf(" [c] -> CONFIGURE\n"); + printf(" [a] -> ACTIVATE (Start Recording)\n"); + printf(" [d] -> DEACTIVATE (Pause Recording)\n"); + printf(" [l] -> CLEANUP\n"); + printf(" [s] -> SHUTDOWN\n"); + printf(" [g] -> GET current state\n"); + printf(" [h] -> Print this HELP menu\n"); + printf(" [q] -> QUIT\n"); + printf("--------------------------------\n"); +} + +void SDRKeyboardCommander::init_keyboard() +{ + // Get the current terminal settings + tcgetattr(STDIN_FILENO, &old_tio_); + // Copy settings to a new struct + new_tio_ = old_tio_; + // Disable canonical mode (line buffering) and echo + new_tio_.c_lflag &= ~(ICANON | ECHO); + // Set minimum number of characters for a read to 0 + new_tio_.c_cc[VMIN] = 0; + // Set the read timeout to 0 (non-blocking) + new_tio_.c_cc[VTIME] = 0; + // Apply the new settings + tcsetattr(STDIN_FILENO, TCSANOW, &new_tio_); +} + +void SDRKeyboardCommander::restore_keyboard() +{ + // Restore the original terminal settings + tcsetattr(STDIN_FILENO, TCSANOW, &old_tio_); +} - void SDRKeyboardCommander::request_transition(uint8_t transition_id) +int SDRKeyboardCommander::getch() +{ + char c = -1; + // Read 1 character from stdin + ssize_t n = read(STDIN_FILENO, &c, 1); + if (n == 1) { + return c; // Return the character if read was successful + } + return -1; // Return -1 if no key was pressed +} + +void SDRKeyboardCommander::check_keyboard() +{ + int key = getch(); + if (key == -1) { + return; // No key pressed + } + + switch (key) { + case 'c': + RCLCPP_INFO(get_logger(), "Requesting CONFIGURE..."); + request_transition(Transition::TRANSITION_CONFIGURE); + break; + case 'a': + RCLCPP_INFO(get_logger(), "Requesting ACTIVATE..."); + request_transition(Transition::TRANSITION_ACTIVATE); + break; + case 'd': + RCLCPP_INFO(get_logger(), "Requesting DEACTIVATE..."); + request_transition(Transition::TRANSITION_DEACTIVATE); + break; + case 'l': + RCLCPP_INFO(get_logger(), "Requesting CLEANUP..."); + request_transition(Transition::TRANSITION_CLEANUP); + break; + case 's': + RCLCPP_INFO(get_logger(), "Requesting SHUTDOWN..."); + request_transition(Transition::TRANSITION_INACTIVE_SHUTDOWN); + break; + case 'g': + RCLCPP_INFO(get_logger(), "Requesting GET_STATE..."); + get_current_state(); + break; + case 'h': + print_help(); + break; + case 'q': + RCLCPP_INFO(get_logger(), "Quitting commander node..."); + restore_keyboard(); // Restore keyboard before shutting down + rclcpp::shutdown(); + break; + } +} + +void SDRKeyboardCommander::request_transition(uint8_t transition_id) +{ + if (!change_state_client_->service_is_ready()) { + RCLCPP_ERROR( + get_logger(), "Service '%s' is not available.", change_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + request->transition.id = transition_id; + + // Send the request asynchronously + change_state_client_->async_send_request( + request, + [this, transition_id](rclcpp::Client::SharedFuture future) { - if (!change_state_client_->service_is_ready()) - { - RCLCPP_ERROR(get_logger(), "Service '%s' is not available.", change_state_client_->get_service_name()); - return; - } - - auto request = std::make_shared(); - request->transition.id = transition_id; - - // Send the request asynchronously - change_state_client_->async_send_request( - request, - [this, transition_id](rclcpp::Client::SharedFuture future) - { - auto result = future.get(); - if (result->success) - { - RCLCPP_INFO( - this->get_logger(), "Transition '%s' successful.", - this->transition_map_[transition_id].c_str()); - } - else - { - RCLCPP_WARN( - this->get_logger(), "Transition '%s' failed.", - this->transition_map_[transition_id].c_str()); - } - }); - } - - void SDRKeyboardCommander::get_current_state() + auto result = future.get(); + if (result->success) { + RCLCPP_INFO( + this->get_logger(), "Transition '%s' successful.", + this->transition_map_[transition_id].c_str()); + } else { + RCLCPP_WARN( + this->get_logger(), "Transition '%s' failed.", + this->transition_map_[transition_id].c_str()); + } + }); +} + +void SDRKeyboardCommander::get_current_state() +{ + if (!get_state_client_->service_is_ready()) { + RCLCPP_ERROR( + get_logger(), "Service '%s' is not available.", + get_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + + // Send the request asynchronously + get_state_client_->async_send_request( + request, + [this](rclcpp::Client::SharedFuture future) { - if (!get_state_client_->service_is_ready()) - { - RCLCPP_ERROR(get_logger(), "Service '%s' is not available.", get_state_client_->get_service_name()); - return; - } - - auto request = std::make_shared(); - - // Send the request asynchronously - get_state_client_->async_send_request( - request, - [this](rclcpp::Client::SharedFuture future) - { - auto result = future.get(); - RCLCPP_INFO( - this->get_logger(), "Current state of '%s' is: %s", - this->target_node_name_.c_str(), result->current_state.label.c_str()); - }); - } - -} // namespace sdr \ No newline at end of file + auto result = future.get(); + RCLCPP_INFO( + this->get_logger(), "Current state of '%s' is: %s", + this->target_node_name_.c_str(), result->current_state.label.c_str()); + }); +} + +} // namespace sdr diff --git a/src/sdr_main.cpp b/src/sdr_main.cpp index 1cacefc..4678131 100644 --- a/src/sdr_main.cpp +++ b/src/sdr_main.cpp @@ -22,7 +22,7 @@ #include "rosbag2_cpp/writer.hpp" #include "rosbag2_transport/recorder.hpp" -int main(int argc, char *argv[]) +int main(int argc, char * argv[]) { rclcpp::init(argc, argv); rclcpp::executors::SingleThreadedExecutor exec; From 38da64b7c31ada81efb8cd7bd98b02cb50774bca Mon Sep 17 00:00:00 2001 From: nimiCurtis Date: Sun, 9 Nov 2025 14:12:09 +0200 Subject: [PATCH 9/9] Add copyright notice and licensing information to source files --- include/sdr/sdr_keyboard_commander.hpp | 20 ++++++++++++++++++++ src/sdr_commander_main.cpp | 20 ++++++++++++++++++++ src/sdr_component.cpp | 9 --------- src/sdr_keyboard_commander.cpp | 20 ++++++++++++++++++++ 4 files changed, 60 insertions(+), 9 deletions(-) diff --git a/include/sdr/sdr_keyboard_commander.hpp b/include/sdr/sdr_keyboard_commander.hpp index fd94fb2..02aab09 100644 --- a/include/sdr/sdr_keyboard_commander.hpp +++ b/include/sdr/sdr_keyboard_commander.hpp @@ -1,3 +1,23 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + #ifndef SDR_KEYBOARD_COMMANDER_HPP_ #define SDR_KEYBOARD_COMMANDER_HPP_ diff --git a/src/sdr_commander_main.cpp b/src/sdr_commander_main.cpp index 07efa11..55e8405 100644 --- a/src/sdr_commander_main.cpp +++ b/src/sdr_commander_main.cpp @@ -1,3 +1,23 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + #include "sdr/sdr_keyboard_commander.hpp" // Adjust this path #include "rclcpp/rclcpp.hpp" diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index 15e2764..b8c468d 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -166,15 +166,6 @@ void SystemDataRecorder::write_all_nodes_parameters_yaml(const std::filesystem:: rcl_interfaces::srv::ListParameters::Request::DEPTH_RECURSIVE ); - // --- ADD THIS LOGGING BLOCK --- - RCLCPP_INFO(get_logger(), "--- DEBUG PARAMS FOR NODE: %s ---", fqn.c_str()); - std::string names_str = "Names: ["; - for (const auto & name : list_res.names) { - names_str += "\"" + name + "\", "; - } - names_str += "]"; - RCLCPP_INFO(get_logger(), "%s", names_str.c_str()); - // --- END LOGGING BLOCK --- } catch (const std::exception & e) { RCLCPP_WARN(get_logger(), "list_parameters failed for %s: %s", fqn.c_str(), e.what()); continue; diff --git a/src/sdr_keyboard_commander.cpp b/src/sdr_keyboard_commander.cpp index 189f6cc..127ddea 100644 --- a/src/sdr_keyboard_commander.cpp +++ b/src/sdr_keyboard_commander.cpp @@ -1,3 +1,23 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + #include "sdr/sdr_keyboard_commander.hpp" // Adjust this path to match your package namespace sdr