Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#include <nebula_msgs/msg/nebula_packet.hpp>
#include <nebula_msgs/msg/nebula_packets.hpp>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -114,17 +116,31 @@ class ContinentalSRR520HwInterface
/// @param debug Target string
void print_debug(std::string debug);

/// @brief Main loop of the CAN receiver thread
/// @brief Main loop of the classic CAN receiver thread
void receive_loop();

/// @brief Main loop of the CAN FD receiver thread
void receive_fd_loop();

/// @brief Main loop of the packet callback processing thread
void callback_processing_loop();

std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_receiver_ptr_;
std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_fd_receiver_ptr_;
std::unique_ptr<::drivers::socketcan::SocketCanSender> can_sender_ptr_;
std::unique_ptr<std::thread> receiver_thread_ptr_;
std::unique_ptr<std::thread> receiver_fd_thread_ptr_;
std::unique_ptr<std::thread> callback_thread_ptr_;

std::shared_ptr<const ContinentalSRR520SensorConfiguration> config_ptr_;
std::function<void(std::unique_ptr<nebula_msgs::msg::NebulaPacket> buffer)>
nebula_packet_callback_;

// Thread-safe packet queue
std::queue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
std::mutex packet_queue_mutex_;
std::condition_variable packet_queue_cv_;

std::mutex receiver_mutex_;
bool sensor_interface_active_{};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <nebula_common/util/crc.hpp>
#include <rclcpp/time.hpp>

#include <condition_variable>
#include <iostream>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -51,16 +52,34 @@ Status ContinentalSRR520HwInterface::sensor_interface_start()
try {
can_sender_ptr_ =
std::make_unique<::drivers::socketcan::SocketCanSender>(config_ptr_->interface, true);

// Create classic CAN receiver (enable_fd = false)
can_receiver_ptr_ =
std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, false);

// Create CAN FD receiver (enable_fd = true)
can_fd_receiver_ptr_ =
std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, true);

can_receiver_ptr_->SetCanFilters(
::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters));
can_fd_receiver_ptr_->SetCanFilters(
::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters));
logger_->info(std::string("applied filters: ") + config_ptr_->filters);

sensor_interface_active_ = true;

// Start classic CAN receiver thread
receiver_thread_ptr_ =
std::make_unique<std::thread>(&ContinentalSRR520HwInterface::receive_loop, this);

// Start CAN FD receiver thread
receiver_fd_thread_ptr_ =
std::make_unique<std::thread>(&ContinentalSRR520HwInterface::receive_fd_loop, this);

// Start callback processing thread
callback_thread_ptr_ =
std::make_unique<std::thread>(&ContinentalSRR520HwInterface::callback_processing_loop, this);
} catch (const std::exception & ex) {
Status status = Status::CAN_CONNECTION_ERROR;
logger_->error(std::string("Error connecting to CAN interface: ") + ex.what());
Expand Down Expand Up @@ -108,8 +127,68 @@ void ContinentalSRR520HwInterface::receive_loop()
}

try {
packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID
receive_id = can_receiver_ptr_->receive_fd(
packet_msg_ptr->data.resize(12); // 8 bytes of data + 4 bytes of ID for classic CAN
receive_id = can_receiver_ptr_->receive(
packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec);
} catch (const std::exception & ex) {
logger_->error(std::string("Error receiving classic CAN message: ") + ex.what());
continue;
}

packet_msg_ptr->data.resize(receive_id.length() + 4);

uint32_t id = receive_id.identifier();
packet_msg_ptr->data[0] = (id & 0xFF000000) >> 24;
packet_msg_ptr->data[1] = (id & 0x00FF0000) >> 16;
packet_msg_ptr->data[2] = (id & 0x0000FF00) >> 8;
packet_msg_ptr->data[3] = (id & 0x000000FF) >> 0;

int64_t stamp = use_bus_time
? static_cast<int64_t>(receive_id.get_bus_time() * 1000U)
: static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());

packet_msg_ptr->stamp.sec = stamp / 1'000'000'000;
packet_msg_ptr->stamp.nanosec = stamp % 1'000'000'000;

if (receive_id.frame_type() == ::drivers::socketcan::FrameType::ERROR) {
logger_->error("Classic CAN message is an error frame");
continue;
}

// Push packet to thread-safe queue instead of calling callback directly
{
std::lock_guard<std::mutex> queue_lock(packet_queue_mutex_);
packet_queue_.push(std::move(packet_msg_ptr));
}
packet_queue_cv_.notify_one();
}
}

void ContinentalSRR520HwInterface::receive_fd_loop()
{
::drivers::socketcan::CanId receive_id{};
std::chrono::nanoseconds receiver_timeout_nsec;
bool use_bus_time = false;

while (true) {
auto packet_msg_ptr = std::make_unique<nebula_msgs::msg::NebulaPacket>();

{
std::lock_guard lock(receiver_mutex_);
receiver_timeout_nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(config_ptr_->receiver_timeout_sec));
use_bus_time = config_ptr_->use_bus_time;

if (!sensor_interface_active_) {
break;
}
}

try {
packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID for CAN FD
receive_id = can_fd_receiver_ptr_->receive_fd(
packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec);
} catch (const std::exception & ex) {
logger_->error(std::string("Error receiving CAN FD message: ") + ex.what());
Expand Down Expand Up @@ -138,7 +217,47 @@ void ContinentalSRR520HwInterface::receive_loop()
continue;
}

nebula_packet_callback_(std::move(packet_msg_ptr));
// Push packet to thread-safe queue instead of calling callback directly
{
std::lock_guard<std::mutex> queue_lock(packet_queue_mutex_);
packet_queue_.push(std::move(packet_msg_ptr));
}
packet_queue_cv_.notify_one();
}
}

void ContinentalSRR520HwInterface::callback_processing_loop()
{
while (true) {
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_ptr;

// Wait for packets in the queue or check if we should exit
{
std::unique_lock<std::mutex> queue_lock(packet_queue_mutex_);
packet_queue_cv_.wait(queue_lock, [this]() {
std::lock_guard receiver_lock(receiver_mutex_);
return !packet_queue_.empty() || !sensor_interface_active_;
});

// Check if we should exit
{
std::lock_guard receiver_lock(receiver_mutex_);
if (!sensor_interface_active_ && packet_queue_.empty()) {
break;
}
}

// Get packet from queue if available
if (!packet_queue_.empty()) {
packet_ptr = std::move(packet_queue_.front());
packet_queue_.pop();
}
}

// Process the packet outside the lock
if (packet_ptr && nebula_packet_callback_) {
nebula_packet_callback_(std::move(packet_ptr));
}
}
}

Expand Down Expand Up @@ -257,7 +376,21 @@ Status ContinentalSRR520HwInterface::sensor_interface_stop()
sensor_interface_active_ = false;
}

receiver_thread_ptr_->join();
// Notify the callback processing thread to wake up and check the stop condition
packet_queue_cv_.notify_all();

if (receiver_thread_ptr_ && receiver_thread_ptr_->joinable()) {
receiver_thread_ptr_->join();
}

if (receiver_fd_thread_ptr_ && receiver_fd_thread_ptr_->joinable()) {
receiver_fd_thread_ptr_->join();
}

if (callback_thread_ptr_ && callback_thread_ptr_->joinable()) {
callback_thread_ptr_->join();
}

return Status::ERROR_1;
}

Expand Down
Loading