diff --git a/CMakeLists.txt b/CMakeLists.txt index c33e60c..f3823fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.8) +cmake_minimum_required(VERSION 3.10...3.27) project(system_data_recorder) if(NOT CMAKE_CXX_STANDARD) @@ -36,12 +36,25 @@ ament_target_dependencies(sdr_component ) rclcpp_components_register_nodes(sdr_component "sdr::SystemDataRecorder") -add_executable(system_data_recorder src/system_data_recorder.cpp) -target_link_libraries(system_data_recorder sdr_component) -ament_target_dependencies(system_data_recorder +add_executable(sdr src/sdr_main.cpp) + +add_executable(sdr_commander + src/sdr_commander_main.cpp + src/sdr_keyboard_commander.cpp +) + +target_link_libraries(sdr sdr_component) + +ament_target_dependencies(sdr + rclcpp +) + +ament_target_dependencies(sdr_commander rclcpp + lifecycle_msgs ) + install(TARGETS sdr_component ARCHIVE DESTINATION lib @@ -50,10 +63,20 @@ install(TARGETS ) install(TARGETS - system_data_recorder + sdr DESTINATION lib/${PROJECT_NAME} ) +install(TARGETS + sdr_commander + DESTINATION lib/${PROJECT_NAME} +) + +# Install launch, config directories +install(DIRECTORY launch DESTINATION share/${PROJECT_NAME}) +install(DIRECTORY config DESTINATION share/${PROJECT_NAME}) + + if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) # the following line skips the linter which checks for copyrights @@ -66,3 +89,7 @@ if(BUILD_TESTING) endif() ament_package() + + + + diff --git a/README.md b/README.md index 2bbc9b5..2d186c9 100644 --- a/README.md +++ b/README.md @@ -1,129 +1,197 @@ # System Data Recorder (SDR) - -A lifecycle node and executable for recording topic data to a rosbag2 bag, while simultaneously copying the split bag files to another location as each bag file is completed. +A lifecycle node for recording multiple, distinct rosbag2 bags sessions, while simultaneously copying the split bag files to another location as each bag file is completed. This is useful, for example, to copy bag data files to an external disc during recording as each data file is completed, rather than waiting until all data is recorded before copying the entire bag at once (an operation that can take a significant time if the bag is large). The copying function requires that a maximum file size for bag files be enabled. Otherwise no splitting will be performed and the files will not be copied until recording is terminated. +## Configuration + +The SDR is configured using a YAML parameter file, which is loaded via a ROS 2 launch file. This is the recommended way to run the node. + +#### Parameters -## Compilation +* `bag_name_prefix` (string): The base name for each bag file. A timestamp will be appended to this (e.g., `"recording"` becomes `"recording_2025-07-10_02-30-00"`). +* `copy_destination` (string): The parent directory where all recording session folders will be saved. +* `max_file_size` (int): The maximum size in bytes for an individual `.db3` file before it is split. A value greater than `0` is required for the copy-on-split feature to work. +* `topic_names` (string array): A list of topics to record. +* `topic_types` (string array): A corresponding list of message types for the topics. -The SDR requires two features not yet available in the rosbag2 main branch or binary releases. -You will need to compile rosbag2 from source, applying the following two pull requests to the source prior to compiling it. +#### Example `sdr_example.yaml` -1. [Expose the QoS object wrapper](https://github.com/ros2/rosbag2/pull/910) -2. [Notification of significant events during bag recording and playback](https://github.com/ros2/rosbag2/pull/908) +```yaml +sdr: + ros__parameters: + # The base name for each bag file. A timestamp will be appended to this. + bag_name_prefix: "robot_data" -Create a `colcon` workspace with the [SDR source code](https://github.com/osrf/system_data_recorder) in it, and compile the workspace. + # The parent directory where all recording session folders will be saved. + copy_destination: "/home/user/sdr_bags" + # Maximum size of each individual .db3 file in bytes before splitting. + max_file_size: 268435456 # 256 MiB + + # List of topics and their types to record. + topic_names: [ + "/chatter", + "/diagnostics" + ] + topic_types: [ + "std_msgs/msg/String", + "diagnostic_msgs/msg/DiagnosticArray" + ] +``` ## Use ### Lifecycle management -Run the executable to start the node. +Run the executable using a launch file that loads your parameters. For example: +`ros2 launch system_data_recorder sdr_example.launch.py` + +In a separate terminal, use the lifecycle manager to control the node. - ros2 run system_data_recorder system_data_recorder +1. **Configure the node.** +This sets up the main session directory and prepares the node for recording. This is done only once. -In a separate terminal, use the lifecycle manager to configure and activate the node. +```bash +ros2 lifecycle set /sdr configure +``` - ros2 lifecycle set sdr configure - ros2 lifecycle set sdr activate +2. **Activate to start recording.** +This begins a new, timestamped bag recording. -This will enable recording of data to the bag. -To pause recording, deactivate the node. +```bash +ros2 lifecycle set /sdr activate +``` - ros2 lifecycle set sdr deactivate +3. **Deactivate to stop recording.** +This finalizes the current bag and copies its files to the permanent destination. -From here, recording can be resumed by re-activating the node, or recording can be terminated by cleaning up the node. +```bash +ros2 lifecycle set /sdr deactivate +``` - ros2 lifecycle set sdr cleanup +You can repeat steps 2 and 3 as many times as you like to create multiple, separate bag files within the same run. -Once cleaned up, the node will copy the final files of the bag, as well as any metadata, to the backup destination. +4. **Cleanup the node.** +This stops the background threads and cleans up resources. -### Configuring +```bash +ros2 lifecycle set /sdr cleanup +``` -The SDR is configured by the arguments passed to the node's constructor. -These are not currently exposed as command line arguments or ROS parameters, so they must be changed in the source code. -See the constructor's documentation block for the possible parameters. +5. **Shutdown the node.** -By default, the SDR will: +```bash +ros2 lifecycle set /sdr shutdown +``` -- Record from the `/chatter` topic, expecting `std_msgs/msg/String` data. -- Record to a bag named `test_bag`. -- Copy bag files to the directory `copied_bag/test_bag`. -- Split bag files every 100,000 bytes. +### A Simple Test -### A simple test +1. In one terminal, start publishing data: -In one terminal, start publishing data on the `/chatter` topic. +```bash +ros2 launch system_data_recorder sdr_example.launch.py +``` - ros2 run demo_nodes_cpp talker +2. In another terminal, launch the SDR node with your parameters: -In another terminal, start the SDR node. +```bash +ros2 launch your_package_name your_launch_file.py +``` - ros2 run system_data_recorder system_data_recorder +3. In a third terminal, configure the SDR, then activate it to start the first recording: -In a third terminal, configure and activate the SDR. +```bash +ros2 lifecycle set /sdr configure +ros2 lifecycle set /sdr activate +``` - ros2 lifecycle set sdr configure - ros2 lifecycle set sdr activate +4. After some time, deactivate it to save the first bag: -Wait a minute or two for 100,000 bytes of data to be recorded. -Navigate to the directory `copied_bag` and observe that there is a `test_bag` -directory containing the first data file of the bag. +```bash +ros2 lifecycle set /sdr deactivate +``` -Deactivate and cleanup the SDR. +5. Activate it again to start a second, new recording: - ros2 lifecycle set sdr deactivate - ros2 lifecycle set sdr cleanup +```bash +ros2 lifecycle set /sdr activate +``` -Again navigate to the `copied_bag` directory, and observe that the `test_bag` -directory now contains all the data files of the bag, and a `metadata.yaml` -file. +6. Deactivate and clean up: -This is a complete, usable bag. -This can be demonstrated by stopping the `talker` node, then starting the -`listener` node and playing the bag file. +```bash +ros2 lifecycle set /sdr deactivate +ros2 lifecycle set /sdr cleanup +``` - ros2 run demo_nodes_cpp listener - ros2 bag play copied_bag/test_bag +Now, navigate to your `copy_destination` directory (e.g., `/home/user/sdr_bags`). You will find a main session folder (e.g., `sdr_session_2025-07-10_03-00-00`), and inside it will be two complete, timestamped bag directories from your two recording sessions. -The recorded data will be echoed by the `listener` node. +You can play back one of the bags: +```bash +ros2 bag play /home/user/sdr_bags/sdr_session_.../robot_data_... +``` -## Lifecycle transition behaviours +## Lifecycle Transition Behaviours ### on_configure -In the on_configure state, the node sets up the rosbag2 infrastructure for -recording by creating a writer and opening the storage. It also sets up the -worker thread that will copy files in parallel to the recording of data. A -callback is registered with the writer so that the node will get informed each -time a new file is created in the bag. +Creates the main, timestamped session directory that will contain all bags recorded during this run. It also starts the background file-copying thread. **It does not start any recording.** ### on_activate -In the on_activate transition, the node simply notifies the worker thread that -it is recording. Data will be written to the bag automatically because the -state changes to `Active`. +Begins a **new, unique recording session**. It creates a timestamped bag, sets up a temporary storage location, initializes a `rosbag2::Writer`, and subscribes to the requested topics. Data begins being written to the temporary bag file. ### on_deactivate -In the on_deactivate transition, the node simply notifies the worker thread -that it is paused. Data will not be written to the bag because the state -changes to `Inactive`. +**Finalizes the current recording session**. It unsubscribes from topics, closes the `rosbag2::Writer` (which flushes all data to disk and writes the `metadata.yaml` file), and queues the final bag files to be copied to their permanent destination within the main session folder. ### on_cleanup -When cleaning up, the node needs to stop recording, stop receiving data (i.e. -unsubscribe from topics), and ensure that the final files of the bag (the last -data file and the metadata file, which gets written when the writer object -destructs) are copied to the destination directory. +Ensures the last active recording session is properly deactivated and finalized. It then gracefully shuts down the background copy thread. ### on_shutdown -If not already performed, the shutdown transition performs the same functions -as the cleanup transition. +Performs the same actions as `on_cleanup` before shutting down the node. + + +--- + +## Keyboard Commander Utility + +For manual control and testing, an `SDRKeyboardCommander` node is available. This node listens for keyboard presses and sends the corresponding lifecycle transition requests to the `/sdr` node. + + +### Running + +1. In one terminal, run your `sdr` node: + + ```bash + ros2 launch system_data_recorder sdr_example.launch.py + ``` + +2. In a second terminal, source your workspace and run the commander: + + ```bash + ros2 run system_data_recorder sdr_commander + ``` + +### Controls + +Once the commander node is running and connected to the `/sdr` services, you can use the following keys to control the recorder: + +| Key | Action | Lifecycle Transition | +| :--- | :--- | :--- | +| **c** | Configure | `CONFIGURE` | +| **a** | Activate | `ACTIVATE` (Starts recording) | +| **d** | Deactivate | `DEACTIVATE` (Pauses recording) | +| **l** | Cleanup | `CLEANUP` | +| **s** | Shutdown | `SHUTDOWN` | +| **g** | Get State | (Queries and prints the current state) | +| **h** | Help | (Prints the help menu) | +| **q** | Quit | (Shuts down the commander node) | + diff --git a/config/sdr_example.yaml b/config/sdr_example.yaml new file mode 100644 index 0000000..fd3d519 --- /dev/null +++ b/config/sdr_example.yaml @@ -0,0 +1,18 @@ +sdr: + ros__parameters: + # The base name for each bag file. A timestamp will be appended to this. + bag_name_prefix: "robot_data" + + # The parent directory where all recording session folders will be saved. + copy_destination: "/home//sdr_bags" + + # Maximum size of each individual .db3 file in bytes before splitting. + max_file_size: 268435456 # 256 MiB + + # List of topics and their types to record. + topic_names: [ + "/chatter", + ] + topic_types: [ + "std_msgs/msg/String", + ] \ No newline at end of file diff --git a/include/sdr/sdr_component.hpp b/include/sdr/sdr_component.hpp index d50d347..0f436e4 100644 --- a/include/sdr/sdr_component.hpp +++ b/include/sdr/sdr_component.hpp @@ -19,9 +19,14 @@ #include #include #include +#include // For timestamping +#include // For formatting the timestamp +#include +#include +#include "rclcpp/parameter_client.hpp" -#include "sdr/visibility_control.h" +#include "sdr/visibility_control.h" #include "rclcpp/rclcpp.hpp" #include "rclcpp_lifecycle/lifecycle_node.hpp" #include "rosbag2_cpp/writer.hpp" @@ -30,34 +35,19 @@ namespace sdr { +// A struct to hold information for the file-copying thread +struct FileCopyJob +{ + std::string source_path; + std::filesystem::path destination_path; +}; + class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode { public: - /// Constructor - /// - /// @param node_name The graph name to use for the SDR node - /// @param options The options to construct the node with - /// @param bag_uri The relative or full path to where the bag will be stored during recording. - /// Must not exist. - /// @param copy_destination The relative or full path to where the bag files will be copied. Must - /// not exist. - /// @param max_file_size The maximum size of each individual file in the bag, in bytes. This - /// should be tuned based on the time to copy each file, how often the backup files should be - /// copied, how fast data is coming in, etc. - /// @param topics_and_types A map of topics to record and their types. Keys of the map are topic - /// names, values of the map are the topic types, e.g. "example_interfaces/msg/String". SDR_PUBLIC - SystemDataRecorder( - const std::string & node_name, - const rclcpp::NodeOptions & options, - const std::string & bag_uri, - const std::string & copy_destination, - const unsigned int max_file_size, - const std::unordered_map & topics_and_types); - - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Lifecycle states - ///////////////////////////////////////////////////////////////////////////////////////////////// + explicit SystemDataRecorder(const rclcpp::NodeOptions & options); + SDR_PUBLIC rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn on_configure(const rclcpp_lifecycle::State & state); @@ -79,36 +69,31 @@ class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode on_shutdown(const rclcpp_lifecycle::State & state); private: - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Variables and functions for the record functionality - ///////////////////////////////////////////////////////////////////////////////////////////////// - - // Options for the bag storage - rosbag2_storage::StorageOptions storage_options_; - // The topics that will be recorded, and their types - std::unordered_map topics_and_types_; - // A map from topic names to subscriber entities - std::unordered_map> subscriptions_; - - // This writer does the actual storing of data during recording - std::shared_ptr writer_; + // Parameter loading + bool read_parameters(); - void start_recording(); - void stop_recording(); - void pause_recording(); - void unpause_recording(); + // Helper function to get a formatted timestamp string + std::string generate_timestamp(); + // Bag recording functionality void subscribe_to_topics(); void subscribe_to_topic(const std::string & topic, const std::string & type); + void unsubscribe_from_topics(); + + // Parameter querying functionality + void write_all_nodes_parameters_yaml(const std::filesystem::path & out_file); + void write_yaml_value_from_param(YAML::Node & node, const rclcpp::Parameter & p); + + std::string get_serialised_offered_qos_for_topic(const std::string & topic); rclcpp::QoS get_appropriate_qos_for_topic(const std::string & topic); - void unsubscribe_from_topics(); - ///////////////////////////////////////////////////////////////////////////////////////////////// - // Variables and functions for the file-copying thread and functionality - ///////////////////////////////////////////////////////////////////////////////////////////////// + rosbag2_storage::StorageOptions storage_options_; + std::unordered_map topics_and_types_; + std::unordered_map> subscriptions_; + std::shared_ptr writer_; - // This is used to pass state-change messages from the node to the file-copying worker thread + // File-copying functionality enum class SdrStateChange { NO_CHANGE, @@ -117,39 +102,30 @@ class SystemDataRecorder : public rclcpp_lifecycle::LifecycleNode FINISHED }; - // This variable holds the message to send to the worker thread SdrStateChange state_msg_ = SdrStateChange::NO_CHANGE; - // This queue passes bag files that need to be copied to the worker thread - std::queue files_to_copy_; - // This mutex protects the two above variables + std::queue files_to_copy_; // Queue now holds jobs std::mutex copy_thread_mutex_; - // This condition variable is used to wake up the worker thread when there is a new state-change - // message or there are files to copy std::condition_variable copy_thread_wake_cv_; - // The worker thread std::shared_ptr copy_thread_; void copy_thread_main(); bool copy_thread_should_wake(); void notify_state_change(SdrStateChange new_state); - void notify_new_file_to_copy(const std::string & file_uri); - void notify_new_file_to_copy(const std::filesystem::path & file_path); - - // The source directory is the location of the bag, i.e. where bag files are copied from - std::filesystem::path source_directory_; - // The destination directory where bag files will be copied to - std::filesystem::path destination_directory_; - // The name of the final file in the bag; no event notification will be received when recording - // stops because there is no "stop recording" concept in rosbag2, you simply stop passing data to - // the writer. So we need to store the name of the final file and copy it during cleanup. + void notify_new_file_to_copy(const FileCopyJob & job); + void copy_bag_file(const FileCopyJob & job); + + // Base configuration loaded from parameters + std::string base_bag_name_prefix_; + std::filesystem::path base_copy_destination_; + + // Session-specific paths, valid during an active recording + std::filesystem::path session_destination_directory_; // The main folder for this run + std::filesystem::path current_bag_tmp_directory_; // Temp folder for the current bag + std::filesystem::path current_bag_final_destination_; // Final folder for the current bag std::string last_bag_file_ = ""; - // Prevent multiple cleanup bool cleaned_up = true; - - bool create_copy_destination(); - void copy_bag_file(const std::string & bag_file_name); }; -} // namespace sdr +} // namespace sdr -#endif // SDR__SDR_COMPONENT_HPP__ +#endif // SDR__SDR_COMPONENT_HPP__ diff --git a/include/sdr/sdr_keyboard_commander.hpp b/include/sdr/sdr_keyboard_commander.hpp new file mode 100644 index 0000000..02aab09 --- /dev/null +++ b/include/sdr/sdr_keyboard_commander.hpp @@ -0,0 +1,105 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#ifndef SDR_KEYBOARD_COMMANDER_HPP_ +#define SDR_KEYBOARD_COMMANDER_HPP_ + +#include "rclcpp/rclcpp.hpp" +#include "lifecycle_msgs/srv/change_state.hpp" +#include "lifecycle_msgs/srv/get_state.hpp" +#include "lifecycle_msgs/msg/transition.hpp" +#include "lifecycle_msgs/msg/state.hpp" + +#include // For non-blocking keyboard input +#include // For STDIN_FILENO +#include +#include + +// Define aliases for convenience +using ChangeState = lifecycle_msgs::srv::ChangeState; +using GetState = lifecycle_msgs::srv::GetState; +using Transition = lifecycle_msgs::msg::Transition; +using State = lifecycle_msgs::msg::State; + +namespace sdr +{ + +class SDRKeyboardCommander : public rclcpp::Node +{ +public: + explicit SDRKeyboardCommander(const rclcpp::NodeOptions & options); + virtual ~SDRKeyboardCommander(); + +private: + // --- Keyboard Handling --- + /** + * @brief Configures the terminal for non-blocking, non-echoing input. + */ + void init_keyboard(); + + /** + * @brief Restores the terminal to its original settings. + */ + void restore_keyboard(); + + /** + * @brief Polls the keyboard for a single character. + * @return The character pressed, or -1 if no key was pressed. + */ + int getch(); + + /** + * @brief Timer callback to check for keyboard input and process it. + */ + void check_keyboard(); + + // --- ROS 2 Lifecycle Client Functions --- + /** + * @brief Sends a request to the /sdr/change_state service. + * @param transition_id The ID of the transition to request (e.g., Transition::TRANSITION_CONFIGURE). + */ + void request_transition(uint8_t transition_id); + + /** + * @brief Sends a request to the /sdr/get_state service. + */ + void get_current_state(); + + /** + * @brief Prints the help menu to the console. + */ + void print_help(); + + // --- Member Variables --- + rclcpp::Client::SharedPtr change_state_client_; + rclcpp::Client::SharedPtr get_state_client_; + rclcpp::TimerBase::SharedPtr timer_; + + // For keyboard input + struct termios old_tio_, new_tio_; + + // For logging + std::map transition_map_; + std::string target_node_name_ = "sdr"; +}; + +} // namespace sdr + +#endif // SDR_KEYBOARD_COMMANDER_HPP_ diff --git a/include/sdr/visibility_control.h b/include/sdr/visibility_control.h index d576cfb..b286465 100644 --- a/include/sdr/visibility_control.h +++ b/include/sdr/visibility_control.h @@ -56,4 +56,3 @@ extern "C" #endif #endif // SDR__VISIBILITY_CONTROL_H_ - diff --git a/launch/sdr_example.launch.py b/launch/sdr_example.launch.py new file mode 100644 index 0000000..7612c09 --- /dev/null +++ b/launch/sdr_example.launch.py @@ -0,0 +1,24 @@ +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch_ros.actions import Node + +def generate_launch_description(): + # Get the path to the package + sdr_pkg_path = get_package_share_directory('system_data_recorder') # <-- CHANGE 'your_package_name' + + # Get the path to the YAML file + config_file_path = os.path.join(sdr_pkg_path, 'config', 'sdr_example.yaml') + + # Create the node action + sdr_node = Node( + package='system_data_recorder', # <-- CHANGE 'your_package_name' + executable='system_data_recorder', # <-- CHANGE to your C++ executable name from CMakeLists.txt + name='sdr', + output='screen', + parameters=[config_file_path] + ) + + return LaunchDescription([ + sdr_node + ]) \ No newline at end of file diff --git a/package.xml b/package.xml index 4bd6ff5..03d56c4 100644 --- a/package.xml +++ b/package.xml @@ -18,8 +18,8 @@ rosbag2_storage yaml_cpp_vendor - ament_lint_auto - ament_lint_common + ament_cmake diff --git a/src/sdr_commander_main.cpp b/src/sdr_commander_main.cpp new file mode 100644 index 0000000..55e8405 --- /dev/null +++ b/src/sdr_commander_main.cpp @@ -0,0 +1,35 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#include "sdr/sdr_keyboard_commander.hpp" // Adjust this path +#include "rclcpp/rclcpp.hpp" + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + + rclcpp::NodeOptions options; + auto commander_node = std::make_shared(options); + + rclcpp::spin(commander_node); + + rclcpp::shutdown(); + return 0; +} diff --git a/src/sdr_component.cpp b/src/sdr_component.cpp index a901a5c..b8c468d 100644 --- a/src/sdr_component.cpp +++ b/src/sdr_component.cpp @@ -14,192 +14,419 @@ #include "sdr/sdr_component.hpp" +#include #include "lifecycle_msgs/msg/state.hpp" #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_transport/qos.hpp" #include "yaml-cpp/yaml.h" + namespace sdr { -SystemDataRecorder::SystemDataRecorder( - const std::string & node_name, - const rclcpp::NodeOptions & options, - const std::string & bag_uri, - const std::string & copy_destination, - const unsigned int max_file_size, - const std::unordered_map & topics_and_types) -: rclcpp_lifecycle::LifecycleNode(node_name, options), - topics_and_types_(topics_and_types), - source_directory_(bag_uri) +SystemDataRecorder::SystemDataRecorder(const rclcpp::NodeOptions & options) +: rclcpp_lifecycle::LifecycleNode("sdr", options) +{ + RCLCPP_INFO(get_logger(), "Initializing SystemDataRecorder node..."); + if (!read_parameters()) { + throw std::runtime_error("Failed to read parameters for SystemDataRecorder"); + } +} + +bool SystemDataRecorder::read_parameters() { - // Set up the storage options for writing the bag - storage_options_.uri = bag_uri; - // Only the "sqlite3" storage backend is supported; currently this is the only backend provided - // by rosbag2 so this is not a problem at this time + // Declare parameters with new names + this->declare_parameter("bag_name_prefix", "bag"); + this->declare_parameter("copy_destination", ""); + this->declare_parameter("max_file_size", 0); + this->declare_parameter>("topic_names", std::vector{}); + this->declare_parameter>("topic_types", std::vector{}); + + std::string copy_destination_str; + int max_file_size; + std::vector topic_names, topic_types; + + if (!this->get_parameter( + "bag_name_prefix", + base_bag_name_prefix_) || base_bag_name_prefix_.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'bag_name_prefix' not set or is empty."); + return false; + } + if (!this->get_parameter( + "copy_destination", + copy_destination_str) || copy_destination_str.empty()) + { + RCLCPP_FATAL(get_logger(), "Required parameter 'copy_destination' not set or is empty."); + return false; + } + base_copy_destination_ = copy_destination_str; + + if (!this->get_parameter("max_file_size", max_file_size) || max_file_size <= 0) { + RCLCPP_FATAL(get_logger(), "Required parameter 'max_file_size' not set or is <= 0."); + return false; + } + this->get_parameter("topic_names", topic_names); + this->get_parameter("topic_types", topic_types); + + if (topic_names.size() != topic_types.size()) { + RCLCPP_FATAL( + get_logger(), "'topic_names' and 'topic_types' must have the same number of entries."); + return false; + } + + for (size_t i = 0; i < topic_names.size(); ++i) { + this->topics_and_types_[topic_names[i]] = topic_types[i]; + } + storage_options_.storage_id = "sqlite3"; - // Set the maximum size of each individual file storage_options_.max_bagfile_size = max_file_size; - // Using a write cache is not required, but it helps avoid disc I/O becoming a bottleneck. - // Set this option to 0 to disable the cache - storage_options_.max_cache_size = 100*1024*1024; - - // Get the name of the bag directory from the bag URI - the bag URI should not be empty - auto bag_directory_name = std::filesystem::path(bag_uri).filename(); - if (bag_directory_name.empty()) { - // There was probably a slash on the end - std::string bag_uri_copy = bag_uri; - bag_directory_name = std::filesystem::path( - bag_uri_copy.erase(bag_uri.size() - 1, 1)).filename(); + storage_options_.max_cache_size = 100 * 1024 * 1024; + + RCLCPP_INFO(get_logger(), "--- SystemDataRecorder Base Configuration ---"); + RCLCPP_INFO(get_logger(), "* Bag Name Prefix: %s", base_bag_name_prefix_.c_str()); + RCLCPP_INFO(get_logger(), "* Base Copy Destination: %s", base_copy_destination_.c_str()); + RCLCPP_INFO(get_logger(), "* Max File Size: %d bytes", max_file_size); + RCLCPP_INFO(get_logger(), "* Topics to Record:"); + if (this->topics_and_types_.empty()) { + RCLCPP_INFO(get_logger(), " - None"); + } else { + for (const auto & pair : this->topics_and_types_) { + RCLCPP_INFO(get_logger(), " - %s (%s)", pair.first.c_str(), pair.second.c_str()); + } } - // A directory named after the bag will be created under the copy_destination directory to put - // the copied files into - destination_directory_ = std::filesystem::path(copy_destination) / bag_directory_name; + RCLCPP_INFO(get_logger(), "-------------------------------------------"); - // Initially, the "last" bag file will be the first bag file - last_bag_file_ = std::filesystem::path(bag_uri) / bag_directory_name.concat("_0.db3"); + return true; } -/////////////////////////////////////////////////////////////////////////////////////////////////// -// Lifecycle states -/////////////////////////////////////////////////////////////////////////////////////////////////// +std::string SystemDataRecorder::generate_timestamp() +{ + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d_%H-%M-%S"); + return ss.str(); +} -// In the on_configure state, we set up the rosbag2 infrastructure for recording by creating a -// writer and opening the storage. We also set up the worker thread that will copy files in -// parallel to the recording of data. A callback is registered with the writer so that we will get -// informed each time a new file is created in the bag. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_configure(const rclcpp_lifecycle::State & /* state */) + +void SystemDataRecorder::write_all_nodes_parameters_yaml(const std::filesystem::path & out_file) { - RCLCPP_INFO(get_logger(), "Preparing to begin recording"); + YAML::Node root; + + // Local helper node; not added to any executor + auto query_node = std::make_shared( + std::string(this->get_name()) + "_param_query", + this->get_namespace(), + rclcpp::NodeOptions() + .context(this->get_node_base_interface()->get_context()) + .start_parameter_services(false) + .start_parameter_event_publisher(false) + .use_intra_process_comms(false) + ); - RCLCPP_INFO(get_logger(), "Copying bag files to %s", destination_directory_.c_str()); + auto make_fqn = [](const std::string & ns, const std::string & name) { + if (ns.empty() || ns == "/") {return std::string("/") + name;} + return ns + "/" + name; + }; + const std::string self_fqn = make_fqn(this->get_namespace(), this->get_name()); + + auto name_ns_pairs = this->get_node_graph_interface()->get_node_names_and_namespaces(); + std::sort( + name_ns_pairs.begin(), name_ns_pairs.end(), + [](const auto & a, const auto & b) { + if (a.second == b.second) {return a.first < b.first;} + return a.second < b.second; + }); - // Prepare for file-copying by creating the destination directory and setting up a worker thread - try { - if (!create_copy_destination()) { - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; + for (const auto & nmns : name_ns_pairs) { + const auto & name = nmns.first; + const auto & ns = nmns.second; + const auto fqn = make_fqn(ns, name); + + if (fqn == self_fqn) { + continue; // skip our own node + } + if (!name.empty() && name[0] == '_') { + continue; // optional: skip hidden nodes + + } + auto client = std::make_shared(query_node.get(), fqn); + + if (!client->wait_for_service(std::chrono::milliseconds(300))) { + RCLCPP_WARN(get_logger(), "Param service not available for node %s, skipping.", fqn.c_str()); + continue; } + + // 1) List declared parameter names (depth is generous) + rcl_interfaces::msg::ListParametersResult list_res; + try { + list_res = client->list_parameters( + {}, + rcl_interfaces::srv::ListParameters::Request::DEPTH_RECURSIVE + ); + + } catch (const std::exception & e) { + RCLCPP_WARN(get_logger(), "list_parameters failed for %s: %s", fqn.c_str(), e.what()); + continue; + } + + const auto & names = list_res.names; + if (names.empty()) {continue;} + + YAML::Node params_node(YAML::NodeType::Map); + + // 2) Fetch each parameter **individually** + for (const auto & full_name : names) { + // FIX: Changed "qos_overrides." to "qos_overrides" + // Your log showed "qos_overrides./..." so this will catch it + if (full_name.rfind("qos_overrides", 0) == 0) { + continue; // skip QoS noise + } + + rclcpp::Parameter param; + try { + auto vec = client->get_parameters({full_name}); + if (!vec.empty()) { + param = vec.front(); + } else { + // If the server gives nothing back, still create a null entry + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) {if (!key.empty()) {keys.push_back(key);}} + + // --- FIX: Use a reference, not a pointer --- + YAML::Node & current_ref = params_node; + for (size_t k = 0; k + 1 < keys.size(); ++k) { + current_ref = current_ref[keys[k]]; // Re-assign reference to child + } + current_ref[keys.back()] = YAML::Node(); // null + // --- END FIX --- + continue; + } + } catch (const std::exception & e) { + RCLCPP_WARN( + get_logger(), "get_parameters(%s) failed for %s: %s", + full_name.c_str(), fqn.c_str(), e.what()); + continue; + } + + // 3) Build nested YAML using the **full** dotted name + std::stringstream ss(full_name); + std::string key; + std::vector keys; + while (std::getline(ss, key, '.')) {if (!key.empty()) {keys.push_back(key);}} + + YAML::Node value_node; + write_yaml_value_from_param(value_node, param); + + if (keys.empty()) { + params_node[full_name] = value_node; + } else { + // Recursively build nested structure + std::function build_nested = + [&](YAML::Node & node, size_t depth) { + if (depth == keys.size() - 1) { + // Last key, set the value + node[keys[depth]] = value_node; + } else { + // Intermediate key, ensure it's a Map and recurse + if (!node[keys[depth]].IsDefined() || !node[keys[depth]].IsMap()) { + node[keys[depth]] = YAML::Node(YAML::NodeType::Map); + } + YAML::Node child = node[keys[depth]]; + build_nested(child, depth + 1); + node[keys[depth]] = child; // Write back the modified child + } + }; + + build_nested(params_node, 0); + } + } + YAML::Node node_root; + node_root["node_name"] = name; + node_root["node_namespace"] = ns; + node_root["parameters"] = params_node; + + root[fqn] = node_root; } - catch (std::filesystem::filesystem_error const & ex) { - RCLCPP_ERROR(get_logger(), "Could not create destination directory for bag backup"); + + try { + std::ofstream ofs(out_file); + ofs << root; + ofs.close(); + RCLCPP_INFO_STREAM(get_logger(), "Wrote ALL nodes parameters YAML: " << out_file); + } catch (const std::exception & e) { + RCLCPP_ERROR( + get_logger(), "Failed writing ALL nodes parameters YAML to '%s': %s", + out_file.c_str(), e.what()); + } + + query_node.reset(); // ensure destruction before return +} + + +void SystemDataRecorder::write_yaml_value_from_param(YAML::Node & node, const rclcpp::Parameter & p) +{ + using rclcpp::ParameterType; + + switch (p.get_type()) { + case ParameterType::PARAMETER_BOOL: node = p.as_bool(); break; + case ParameterType::PARAMETER_INTEGER: node = p.as_int(); break; + case ParameterType::PARAMETER_DOUBLE: node = p.as_double(); break; + case ParameterType::PARAMETER_STRING: node = p.as_string(); break; + case ParameterType::PARAMETER_BYTE_ARRAY: { + YAML::Node seq; for (auto b : p.as_byte_array()) { + seq.push_back(static_cast(b)); + } + node = seq; break; + } + case ParameterType::PARAMETER_BOOL_ARRAY: { + YAML::Node seq; for (auto v : p.as_bool_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_INTEGER_ARRAY: { + YAML::Node seq; for (auto v : p.as_integer_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_DOUBLE_ARRAY: { + YAML::Node seq; for (auto v : p.as_double_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_STRING_ARRAY: { + YAML::Node seq; for (auto & v : p.as_string_array()) { + seq.push_back(v); + } + node = seq; break; + } + case ParameterType::PARAMETER_NOT_SET: + default: node = YAML::Node(); break; + } +} + + +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_configure(const rclcpp_lifecycle::State &) +{ + RCLCPP_INFO(get_logger(), "Configuring SDR. Creating main session directory."); + + // Create the main timestamped directory for all bags from this run + session_destination_directory_ = base_copy_destination_ / ("sdr_session_" + generate_timestamp()); + + try { + std::filesystem::create_directories(session_destination_directory_); + } catch (const std::filesystem::filesystem_error & ex) { + RCLCPP_ERROR( + get_logger(), "Could not create session directory '%s': %s", + session_destination_directory_.c_str(), ex.what()); return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::FAILURE; } - copy_thread_ = std::make_shared([this]{ copy_thread_main(); }); + RCLCPP_INFO( + get_logger(), "All copied bags for this session will be stored in: %s", + session_destination_directory_.c_str()); + - // Notify the copy thread to get it into the correct state + // Start the file-copying thread + copy_thread_ = std::make_shared( + [this] + {this->copy_thread_main();}); notify_state_change(SdrStateChange::PAUSED); + cleaned_up = false; + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; +} + +rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn +SystemDataRecorder::on_activate(const rclcpp_lifecycle::State &) +{ + std::string bag_name = base_bag_name_prefix_ + "_" + generate_timestamp(); + RCLCPP_INFO(get_logger(), "Activating. Starting new recording session: %s", bag_name.c_str()); + + // Set up paths for this specific recording session + current_bag_tmp_directory_ = std::filesystem::temp_directory_path() / bag_name; + current_bag_final_destination_ = session_destination_directory_ / bag_name; + storage_options_.uri = current_bag_tmp_directory_.string(); + + // Pre-emptively set the name of the first bag file. This ensures we always have a valid + // filename to copy, even if no bag split occurs. + last_bag_file_ = (current_bag_tmp_directory_ / (bag_name + "_0.db3")).string(); + + // Ensure the temporary directory is clean + if (std::filesystem::exists(current_bag_tmp_directory_)) { + std::filesystem::remove_all(current_bag_tmp_directory_); + } + std::filesystem::create_directories(current_bag_final_destination_); + + const auto params_yaml_path = current_bag_final_destination_ / "rosparams.yaml"; + write_all_nodes_parameters_yaml(params_yaml_path); - // Prepare the rosbag2 objects for recording - // A sequential writer is used as this is the only writer currently available writer_ = std::make_shared( std::make_unique()); - // Add a callback for when a split occurs + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = - [this] - (rosbag2_cpp::bag_events::BagSplitInfo & info) { - // Record the opened file - this will be the last file remaining to be copied when recording - // is terminated + [this](rosbag2_cpp::bag_events::BagSplitInfo & info) + { last_bag_file_ = info.opened_file; - // Notify the worker thread about the closed file being ready to copy - notify_new_file_to_copy(info.closed_file); + notify_new_file_to_copy({info.closed_file, current_bag_final_destination_}); }; writer_->add_event_callbacks(callbacks); - // Open the bag - writer_->open( - storage_options_, - {rmw_get_serialization_format(), rmw_get_serialization_format()}); + writer_->open(storage_options_, {rmw_get_serialization_format(), rmw_get_serialization_format()}); - // Start receiving data - it won't be recorded yet because the node's lifecycle is not in the - // Active state subscribe_to_topics(); + notify_state_change(SdrStateChange::RECORDING); - // Make sure cleanup happens when the on_cleanup transition occurs - this allows the node to be - // cleaned up and then configured again (with the bag files moved away somewhere else) without - // shutting it down - cleaned_up = false; return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; } -// In the on_activate transition, we simply notify the worker thread that we are recording. Data -// will be written to the bag because the state changes to Active. rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_activate(const rclcpp_lifecycle::State & /* state */) +SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State &) { - RCLCPP_INFO(get_logger(), "Starting recording"); + RCLCPP_INFO(get_logger(), "Deactivating. Finalizing current bag."); + notify_state_change(SdrStateChange::PAUSED); + unsubscribe_from_topics(); - // Notify the copy thread - notify_state_change(SdrStateChange::RECORDING); + if (writer_) { + writer_.reset(); // Finalizes bag and writes metadata.yaml + notify_new_file_to_copy({last_bag_file_, current_bag_final_destination_}); + notify_new_file_to_copy( + {current_bag_tmp_directory_ / "metadata.yaml", current_bag_final_destination_}); + } return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; } -// In the on_deactivate transition, we simply notify the worker thread that we are paused. Data -// will not be written to the bag because the state changes to Inactive. rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_deactivate(const rclcpp_lifecycle::State & /* state */) +SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State & state) { - RCLCPP_INFO(get_logger(), "Pausing recording"); + RCLCPP_INFO(get_logger(), "Cleaning up SDR."); + if (get_current_state().id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) { + on_deactivate(state); + } - // Notify the copy thread - notify_state_change(SdrStateChange::PAUSED); - return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; -} + if (copy_thread_) { + notify_state_change(SdrStateChange::FINISHED); + copy_thread_->join(); + copy_thread_.reset(); + } -// When cleaning up, we need to stop recording, stop receiving data (i.e. unsubscribe from topics), -// and ensure that the final files of the bag (the last data file and the metadata file, which gets -// written when the writer object destructs) are copied to the destination directory. -rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_cleanup(const rclcpp_lifecycle::State & /* state */) -{ - RCLCPP_INFO(get_logger(), "Stopping and finalising recording"); - // Avoid double-cleanup if on_shutdown is called after this cleaned_up = true; - - // Stop recording by stopping data from arriving - unsubscribe_from_topics(); - // Clean up the writer so that the bag is closed and the metadata file is written - writer_.reset(); - // Copy the final data file - notify_new_file_to_copy(last_bag_file_); - // Copy the metadata file - notify_new_file_to_copy(source_directory_ / "metadata.yaml"); - - // Notify the worker thread that we are cleaning up, so that it exits onces it's finished its - // final work - notify_state_change(SdrStateChange::FINISHED); - copy_thread_->join(); - // Get rid of the worker thread - copy_thread_.reset(); - - RCLCPP_INFO(get_logger(), "Cleanup complete"); + RCLCPP_INFO(get_logger(), "Cleanup complete."); return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; } -// If not already performed, the shutdown transition performs the same functions as the cleanup -// transition. rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn -SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State & /* state */) +SystemDataRecorder::on_shutdown(const rclcpp_lifecycle::State & state) { - RCLCPP_INFO(get_logger(), "Stopping and finalising recording (hard shutdown)"); + RCLCPP_INFO(get_logger(), "Shutting down SDR."); if (!cleaned_up) { - // Stop recording - unsubscribe_from_topics(); - // Clean up the writer - writer_.reset(); - // Copy the final file - notify_new_file_to_copy(last_bag_file_); - // Copy the metadata file - notify_new_file_to_copy(source_directory_ / "metadata.yaml"); + on_cleanup(state); } - - // Notify the copy thread - but only if it hasn't already been cleaned up - if (copy_thread_) { - notify_state_change(SdrStateChange::FINISHED); - copy_thread_->join(); - copy_thread_.reset(); - } - RCLCPP_INFO(get_logger(), "Cleanup complete"); return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS; } @@ -224,12 +451,11 @@ void SystemDataRecorder::subscribe_to_topic(const std::string & topic, const std // The metadata to pass to the writer object so it can register the topic in the bag auto topic_metadata = rosbag2_storage::TopicMetadata( { - topic, // Topic name - type, // Topic type, e.g. "example_interfaces/msg/String" - rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" - offered_qos // The offered QoS profile for the topic in YAML - } - ); + topic, // Topic name + type, // Topic type, e.g. "example_interfaces/msg/String" + rmw_get_serialization_format(), // The serialization format, most likely to be "CDR" + offered_qos // The offered QoS profile for the topic in YAML + }); // It is a good idea to create the topic in the writer prior to adding the subscription in case // data arrives after subscribing and before the topic is created. Although we should be ignoring // any data until the node is set to active, we maintain this good practice here for future @@ -245,7 +471,8 @@ void SystemDataRecorder::subscribe_to_topic(const std::string & topic, const std topic, type, qos, - [this, topic, type](std::shared_ptr message) { + [this, topic, type](std::shared_ptr message) + { // When a message is received, it should only be written to the bag if recording is not // paused (i.e. the node lifecycle state is "active"). If recording is paused, the message is // thrown away. @@ -312,9 +539,9 @@ rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string RCLCPP_WARN( get_logger(), "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " - "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " - "messages from Reliable publishers could be dropped.", + "RMW_QOS_POLICY_RELIABILITY_RELIABLE. Falling back to " + "RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT as it will connect to all publishers. Some " + "messages from Reliable publishers could be dropped.", topic.c_str()); } qos.best_effort(); @@ -330,9 +557,9 @@ rclcpp::QoS SystemDataRecorder::get_appropriate_qos_for_topic(const std::string RCLCPP_WARN( get_logger(), "Some, but not all, publishers on topic \"%s\" are offering " - "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " - "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" - "published latched messages will not be retrieved.", + "RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. Falling back to " + "RMW_QOS_POLICY_DURABILITY_VOLATILE as it will connect to all publishers. Previously-" + "published latched messages will not be retrieved.", topic.c_str()); } qos.durability_volatile(); @@ -346,13 +573,12 @@ void SystemDataRecorder::unsubscribe_from_topics() // Make a note of the topics we are subscribed to std::vector topics; for (const auto & topic_with_type : topics_and_types_) { - topics.push_back(rosbag2_storage::TopicMetadata( - { - topic_with_type.first, - topic_with_type.second, - rmw_get_serialization_format(), - "" - })); + topics.push_back( + rosbag2_storage::TopicMetadata( + {topic_with_type.first, + topic_with_type.second, + rmw_get_serialization_format(), + ""})); } // Unsubscribing happens automatically when the subscription objects are destroyed subscriptions_.clear(); @@ -362,109 +588,90 @@ void SystemDataRecorder::unsubscribe_from_topics() // File-copying thread /////////////////////////////////////////////////////////////////////////////////////////////////// -// The main function for the file-copying worker thread void SystemDataRecorder::copy_thread_main() { RCLCPP_INFO(get_logger(), "Copy thread: Starting"); - // Local state storage, to keep the critical section as short as possible SdrStateChange current_state = SdrStateChange::PAUSED; - std::queue local_files_to_copy; - // Loop until receiving a state-change message indicating it is time to stop - while (current_state != SdrStateChange::FINISHED) - { - { // Critical section start + std::queue local_files_to_copy; + + while (current_state != SdrStateChange::FINISHED) { + { std::unique_lock lock(copy_thread_mutex_); - if (files_to_copy_.empty()) { - // Only wait if there's nothing to copy, otherwise skip the wait and go on - while (!copy_thread_should_wake()) { - copy_thread_wake_cv_.wait(lock); - } - } + copy_thread_wake_cv_.wait( + lock, [this] + {return copy_thread_should_wake();}); - // If the state has changed, make a note of the new state if (state_msg_ != SdrStateChange::NO_CHANGE) { current_state = state_msg_; - // Reset the message-carrying variable so we don't get confused the next time around the - // loop state_msg_ = SdrStateChange::NO_CHANGE; } - - // local_files_to_copy_ will be empty here, so we are swapping the received queue of files - // and emptying the message-carrying queue at the same time local_files_to_copy.swap(files_to_copy_); - } // Critical section end + } - // Now copy any files that were sent from the node - while(!local_files_to_copy.empty()) { - std::string uri = local_files_to_copy.front(); + while (!local_files_to_copy.empty()) { + FileCopyJob job = local_files_to_copy.front(); local_files_to_copy.pop(); - copy_bag_file(uri); + copy_bag_file(job); } } + + // After thread is finished, one last check for any remaining files + while (!files_to_copy_.empty()) { + copy_bag_file(files_to_copy_.front()); + files_to_copy_.pop(); + } + // And clean up the final temporary directory if it exists + if (std::filesystem::exists(current_bag_tmp_directory_)) { + std::filesystem::remove_all(current_bag_tmp_directory_); + } RCLCPP_INFO(get_logger(), "Copy thread: Exiting"); } -// Only wake up if there are files to copy or a state-change message has been received bool SystemDataRecorder::copy_thread_should_wake() { return state_msg_ != SdrStateChange::NO_CHANGE || !files_to_copy_.empty(); } -// Notify the worker thread of a state change by sending it a message and triggering its condition -// variable void SystemDataRecorder::notify_state_change(SdrStateChange new_state) { - { // Critical section start + { std::lock_guard lock(copy_thread_mutex_); - // new_state must not be NO_CHANGE or the copy thread won't wake up state_msg_ = new_state; - } // Critical section end + } copy_thread_wake_cv_.notify_one(); } -// Notify the worker thread of a state change by adding the file to the queue and triggering its -// condition variable -void SystemDataRecorder::notify_new_file_to_copy(const std::string & file_uri) +void SystemDataRecorder::notify_new_file_to_copy(const FileCopyJob & job) { - { // Critical section start + { std::lock_guard lock(copy_thread_mutex_); - files_to_copy_.push(file_uri); - } // Critical section end + files_to_copy_.push(job); + } copy_thread_wake_cv_.notify_one(); } -// Override that accepts a std::filesystem::path -void SystemDataRecorder::notify_new_file_to_copy(const std::filesystem::path & file_path) -{ - notify_new_file_to_copy(file_path.string()); -} - /////////////////////////////////////////////////////////////////////////////////////////////////// // File-copying functionality /////////////////////////////////////////////////////////////////////////////////////////////////// -// Create the destination directory to copy bag files to -// Returns true if the directory was created, false otherwise -bool SystemDataRecorder::create_copy_destination() +void SystemDataRecorder::copy_bag_file(const FileCopyJob & job) { - if (std::filesystem::exists(destination_directory_)) { - RCLCPP_ERROR(get_logger(), "Copy destination directory already exists"); - return false; + if (job.source_path.empty() || !std::filesystem::exists(job.source_path)) { + RCLCPP_WARN( + get_logger(), "Skipping copy of non-existent source file: %s", + job.source_path.c_str()); + return; } - RCLCPP_INFO(get_logger(), "Creating destination directory %s", destination_directory_.c_str()); - return std::filesystem::create_directories(destination_directory_); -} - -// Copy a bag file to the destination directory -void SystemDataRecorder::copy_bag_file(const std::string & bag_file_name) -{ RCLCPP_INFO( - get_logger(), - "Copying %s to %s", - bag_file_name.c_str(), - destination_directory_.c_str()); - - std::filesystem::copy(bag_file_name, destination_directory_); + get_logger(), "Copying %s to %s", + job.source_path.c_str(), job.destination_path.c_str()); + try { + std::filesystem::copy(job.source_path, job.destination_path); + } catch (const std::filesystem::filesystem_error & e) { + RCLCPP_ERROR( + get_logger(), "Failed to copy '%s' to '%s': %s", + job.source_path.c_str(), job.destination_path.c_str(), e.what()); + } } -} // namespace sdr +} // namespace sdr diff --git a/src/sdr_keyboard_commander.cpp b/src/sdr_keyboard_commander.cpp new file mode 100644 index 0000000..127ddea --- /dev/null +++ b/src/sdr_keyboard_commander.cpp @@ -0,0 +1,217 @@ +// Copyright 2025 nimiCurtis +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#include "sdr/sdr_keyboard_commander.hpp" // Adjust this path to match your package + +namespace sdr +{ + +SDRKeyboardCommander::SDRKeyboardCommander(const rclcpp::NodeOptions & options) +: Node("sdr_keyboard_commander", options) +{ + // Initialize the service clients + change_state_client_ = this->create_client( + "/" + target_node_name_ + "/change_state"); + get_state_client_ = this->create_client( + "/" + target_node_name_ + "/get_state"); + + // Initialize the transition-to-string map for logging + transition_map_[Transition::TRANSITION_CONFIGURE] = "CONFIGURE"; + transition_map_[Transition::TRANSITION_ACTIVATE] = "ACTIVATE"; + transition_map_[Transition::TRANSITION_DEACTIVATE] = "DEACTIVATE"; + transition_map_[Transition::TRANSITION_CLEANUP] = "CLEANUP"; + transition_map_[Transition::TRANSITION_INACTIVE_SHUTDOWN] = "SHUTDOWN"; + + // Set up non-blocking keyboard + init_keyboard(); + + // Start a timer to poll the keyboard + timer_ = this->create_wall_timer( + std::chrono::milliseconds(100), + std::bind(&SDRKeyboardCommander::check_keyboard, this)); + + RCLCPP_INFO(get_logger(), "SDR Keyboard Commander started."); + RCLCPP_INFO(get_logger(), "Waiting for '/%s' lifecycle services...", target_node_name_.c_str()); + + // Wait for services to be available + while (!change_state_client_->wait_for_service(std::chrono::seconds(1))) { + if (!rclcpp::ok()) { + RCLCPP_ERROR(get_logger(), "Interrupted while waiting for service. Exiting."); + return; + } + RCLCPP_INFO( + get_logger(), "Service '/%s/change_state' not available, waiting...", + target_node_name_.c_str()); + } + + RCLCPP_INFO(get_logger(), "Services found. Ready to command."); + print_help(); +} + +SDRKeyboardCommander::~SDRKeyboardCommander() +{ + // Restore terminal settings on exit + restore_keyboard(); +} + +void SDRKeyboardCommander::print_help() +{ + printf("\n--- SDR Keyboard Commander ---\n"); + printf("Control the '%s' node:\n", target_node_name_.c_str()); + printf(" [c] -> CONFIGURE\n"); + printf(" [a] -> ACTIVATE (Start Recording)\n"); + printf(" [d] -> DEACTIVATE (Pause Recording)\n"); + printf(" [l] -> CLEANUP\n"); + printf(" [s] -> SHUTDOWN\n"); + printf(" [g] -> GET current state\n"); + printf(" [h] -> Print this HELP menu\n"); + printf(" [q] -> QUIT\n"); + printf("--------------------------------\n"); +} + +void SDRKeyboardCommander::init_keyboard() +{ + // Get the current terminal settings + tcgetattr(STDIN_FILENO, &old_tio_); + // Copy settings to a new struct + new_tio_ = old_tio_; + // Disable canonical mode (line buffering) and echo + new_tio_.c_lflag &= ~(ICANON | ECHO); + // Set minimum number of characters for a read to 0 + new_tio_.c_cc[VMIN] = 0; + // Set the read timeout to 0 (non-blocking) + new_tio_.c_cc[VTIME] = 0; + // Apply the new settings + tcsetattr(STDIN_FILENO, TCSANOW, &new_tio_); +} + +void SDRKeyboardCommander::restore_keyboard() +{ + // Restore the original terminal settings + tcsetattr(STDIN_FILENO, TCSANOW, &old_tio_); +} + +int SDRKeyboardCommander::getch() +{ + char c = -1; + // Read 1 character from stdin + ssize_t n = read(STDIN_FILENO, &c, 1); + if (n == 1) { + return c; // Return the character if read was successful + } + return -1; // Return -1 if no key was pressed +} + +void SDRKeyboardCommander::check_keyboard() +{ + int key = getch(); + if (key == -1) { + return; // No key pressed + } + + switch (key) { + case 'c': + RCLCPP_INFO(get_logger(), "Requesting CONFIGURE..."); + request_transition(Transition::TRANSITION_CONFIGURE); + break; + case 'a': + RCLCPP_INFO(get_logger(), "Requesting ACTIVATE..."); + request_transition(Transition::TRANSITION_ACTIVATE); + break; + case 'd': + RCLCPP_INFO(get_logger(), "Requesting DEACTIVATE..."); + request_transition(Transition::TRANSITION_DEACTIVATE); + break; + case 'l': + RCLCPP_INFO(get_logger(), "Requesting CLEANUP..."); + request_transition(Transition::TRANSITION_CLEANUP); + break; + case 's': + RCLCPP_INFO(get_logger(), "Requesting SHUTDOWN..."); + request_transition(Transition::TRANSITION_INACTIVE_SHUTDOWN); + break; + case 'g': + RCLCPP_INFO(get_logger(), "Requesting GET_STATE..."); + get_current_state(); + break; + case 'h': + print_help(); + break; + case 'q': + RCLCPP_INFO(get_logger(), "Quitting commander node..."); + restore_keyboard(); // Restore keyboard before shutting down + rclcpp::shutdown(); + break; + } +} + +void SDRKeyboardCommander::request_transition(uint8_t transition_id) +{ + if (!change_state_client_->service_is_ready()) { + RCLCPP_ERROR( + get_logger(), "Service '%s' is not available.", change_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + request->transition.id = transition_id; + + // Send the request asynchronously + change_state_client_->async_send_request( + request, + [this, transition_id](rclcpp::Client::SharedFuture future) + { + auto result = future.get(); + if (result->success) { + RCLCPP_INFO( + this->get_logger(), "Transition '%s' successful.", + this->transition_map_[transition_id].c_str()); + } else { + RCLCPP_WARN( + this->get_logger(), "Transition '%s' failed.", + this->transition_map_[transition_id].c_str()); + } + }); +} + +void SDRKeyboardCommander::get_current_state() +{ + if (!get_state_client_->service_is_ready()) { + RCLCPP_ERROR( + get_logger(), "Service '%s' is not available.", + get_state_client_->get_service_name()); + return; + } + + auto request = std::make_shared(); + + // Send the request asynchronously + get_state_client_->async_send_request( + request, + [this](rclcpp::Client::SharedFuture future) + { + auto result = future.get(); + RCLCPP_INFO( + this->get_logger(), "Current state of '%s' is: %s", + this->target_node_name_.c_str(), result->current_state.label.c_str()); + }); +} + +} // namespace sdr diff --git a/src/system_data_recorder.cpp b/src/sdr_main.cpp similarity index 72% rename from src/system_data_recorder.cpp rename to src/sdr_main.cpp index d05f489..4678131 100644 --- a/src/system_data_recorder.cpp +++ b/src/sdr_main.cpp @@ -25,19 +25,12 @@ int main(int argc, char * argv[]) { rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; - auto topics_and_types = std::unordered_map(); - topics_and_types.insert({"/chatter", "std_msgs/msg/String"}); - auto sdr_component = std::make_shared( - "sdr", - rclcpp::NodeOptions(), - "test_bag", - "copied_bag", - 100000, - topics_and_types); + // Create the node. It will load its parameters automatically. + auto sdr_node = std::make_shared(rclcpp::NodeOptions()); - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(sdr_component->get_node_base_interface()); + exec.add_node(sdr_node->get_node_base_interface()); exec.spin(); rclcpp::shutdown();