diff --git a/include/csp/csp_id.h b/include/csp/csp_id.h index 5bdbab9cf..5ba07958d 100644 --- a/include/csp/csp_id.h +++ b/include/csp/csp_id.h @@ -15,6 +15,14 @@ extern "C" { */ void csp_id_prepend(csp_packet_t * packet); +/** + * Extract CSP header from a 4 (CSP1) or 6 (CSP2) byte array. + * + * @param The first bytes of a CSP packet, representing the CSP header structure. + * @return The extracted header ID structure. + */ +csp_id_t csp_id_extract(const uint8_t * data); + /** * Strip CSP header fields from the packet's data buffer. * @@ -86,6 +94,14 @@ int csp_id_get_header_size(void); */ void csp_id_prepend_fixup_cspv1(csp_packet_t * packet); +/** + * Extract CSP header from a 4 (CSP1) or 6 (CSP2) byte array (ZMQ fixup). + * + * @param The first bytes of a CSP packet, representing the CSP header structure. + * @return The extracted header ID structure. + */ +csp_id_t csp_id_extract_fixup_cspv1(const uint8_t * data); + /** * Strip CSPv1-compatible ID header (ZMQ fixup). * @@ -105,6 +121,13 @@ static inline void csp_id_prepend_fixup_cspv1(csp_packet_t * packet) { csp_id_prepend(packet); } +/** + * Wrapper for csp_id_extract when no fixup is required. + */ +static inline csp_id_t csp_id_extract_fixup_cspv1(const uint8_t * data) { + return csp_id_extract(data); +} + /** * Wrapper for csp_id_strip when no fixup is required. */ diff --git a/src/csp_id.c b/src/csp_id.c index 5e7dc7d78..47d755bfe 100644 --- a/src/csp_id.c +++ b/src/csp_id.c @@ -63,16 +63,11 @@ static void csp_id1_prepend(csp_packet_t * packet, bool cspv1_fixup) { memcpy(packet->frame_begin, &id1, CSP_ID1_HEADER_SIZE); } -static int csp_id1_strip(csp_packet_t * packet, bool cspv1_fixup) { - - if (packet->frame_length < CSP_ID1_HEADER_SIZE) { - return -1; - } +static csp_id_t csp_id1_extract(const uint8_t * data, bool cspv1_fixup) { /* Get 32 bit in network byte order */ uint32_t id1_raw = 0; - memcpy(&id1_raw, packet->frame_begin, CSP_ID1_HEADER_SIZE); - packet->length = packet->frame_length - CSP_ID1_HEADER_SIZE; + memcpy(&id1_raw, data, CSP_ID1_HEADER_SIZE); /* Convert to host order */ uint32_t id1 = be32toh(id1_raw); @@ -83,14 +78,15 @@ static int csp_id1_strip(csp_packet_t * packet, bool cspv1_fixup) { /* Parse header: * Now in easy to work with in 32 bit register */ - packet->id.pri = (id1 >> CSP_ID1_PRIO_OFFSET) & CSP_ID1_PRIO_MASK; - packet->id.dst = (id1 >> CSP_ID1_DST_OFFSET) & CSP_ID1_DST_MASK; - packet->id.src = (id1 >> CSP_ID1_SRC_OFFSET) & CSP_ID1_SRC_MASK; - packet->id.dport = (id1 >> CSP_ID1_DPORT_OFFSET) & CSP_ID1_DPORT_MASK; - packet->id.sport = (id1 >> CSP_ID1_SPORT_OFFSET) & CSP_ID1_SPORT_MASK; - packet->id.flags = (id1 >> CSP_ID1_FLAGS_OFFSET) & CSP_ID1_FLAGS_MASK; - - return 0; + csp_id_t id; + id.pri = (id1 >> CSP_ID1_PRIO_OFFSET) & CSP_ID1_PRIO_MASK; + id.dst = (id1 >> CSP_ID1_DST_OFFSET) & CSP_ID1_DST_MASK; + id.src = (id1 >> CSP_ID1_SRC_OFFSET) & CSP_ID1_SRC_MASK; + id.dport = (id1 >> CSP_ID1_DPORT_OFFSET) & CSP_ID1_DPORT_MASK; + id.sport = (id1 >> CSP_ID1_SPORT_OFFSET) & CSP_ID1_SPORT_MASK; + id.flags = (id1 >> CSP_ID1_FLAGS_OFFSET) & CSP_ID1_FLAGS_MASK; + + return id; } static void csp_id1_setup_rx(csp_packet_t * packet) { @@ -147,17 +143,12 @@ static void csp_id2_prepend(csp_packet_t * packet) { memcpy(packet->frame_begin, &id2, CSP_ID2_HEADER_SIZE); } -static int csp_id2_strip(csp_packet_t * packet) { - - if (packet->frame_length < CSP_ID2_HEADER_SIZE) { - return -1; - } +static csp_id_t csp_id2_extract(const uint8_t* data) { /* Get 48 bit in network byte order: * Most significant byte ends in byte 0 */ uint64_t id2 = 0; - memcpy(&id2, packet->frame_begin, CSP_ID2_HEADER_SIZE); - packet->length = packet->frame_length - CSP_ID2_HEADER_SIZE; + memcpy(&id2, data, CSP_ID2_HEADER_SIZE); /* Convert to host order: * Most significant byte ends in byte 7, we then shift down @@ -166,14 +157,15 @@ static int csp_id2_strip(csp_packet_t * packet) { /* Parse header: * Now in easy to work with in 32 bit register */ - packet->id.pri = (id2 >> CSP_ID2_PRIO_OFFSET) & CSP_ID2_PRIO_MASK; - packet->id.dst = (id2 >> CSP_ID2_DST_OFFSET) & CSP_ID2_DST_MASK; - packet->id.src = (id2 >> CSP_ID2_SRC_OFFSET) & CSP_ID2_SRC_MASK; - packet->id.dport = (id2 >> CSP_ID2_DPORT_OFFSET) & CSP_ID2_DPORT_MASK; - packet->id.sport = (id2 >> CSP_ID2_SPORT_OFFSET) & CSP_ID2_SPORT_MASK; - packet->id.flags = (id2 >> CSP_ID2_FLAGS_OFFSET) & CSP_ID2_FLAGS_MASK; - - return 0; + csp_id_t id; + id.pri = (id2 >> CSP_ID2_PRIO_OFFSET) & CSP_ID2_PRIO_MASK; + id.dst = (id2 >> CSP_ID2_DST_OFFSET) & CSP_ID2_DST_MASK; + id.src = (id2 >> CSP_ID2_SRC_OFFSET) & CSP_ID2_SRC_MASK; + id.dport = (id2 >> CSP_ID2_DPORT_OFFSET) & CSP_ID2_DPORT_MASK; + id.sport = (id2 >> CSP_ID2_SPORT_OFFSET) & CSP_ID2_SPORT_MASK; + id.flags = (id2 >> CSP_ID2_FLAGS_OFFSET) & CSP_ID2_FLAGS_MASK; + + return id; } static void csp_id2_setup_rx(csp_packet_t * packet) { @@ -197,12 +189,22 @@ void csp_id_prepend(csp_packet_t * packet) { } } -int csp_id_strip(csp_packet_t * packet) { +csp_id_t csp_id_extract(const uint8_t * data) { if (csp_conf.version == 2) { - return csp_id2_strip(packet); + return csp_id2_extract(data); } else { - return csp_id1_strip(packet, false); + return csp_id1_extract(data, false); + } +} + +int csp_id_strip(csp_packet_t * packet) { + if (packet->frame_length < csp_id_get_header_size()) { + return -1; } + + packet->id = csp_id_extract(packet->frame_begin); + packet->length = packet->frame_length - csp_id_get_header_size(); + return 0; } #if (CSP_FIXUP_V1_ZMQ_LITTLE_ENDIAN) @@ -216,14 +218,24 @@ void csp_id_prepend_fixup_cspv1(csp_packet_t * packet) { } } -int csp_id_strip_fixup_cspv1(csp_packet_t * packet) { +csp_id_t csp_id_extract_fixup_cspv1(const uint8_t * data) { if (csp_conf.version == 2) { - return csp_id2_strip(packet); + return csp_id2_extract(data); } else { - return csp_id1_strip(packet, true); + return csp_id1_extract(data, true); } } +int csp_id_strip_fixup_cspv1(csp_packet_t * packet) { + if (packet->frame_length < csp_id_get_header_size()) { + return -1; + } + + packet->id = csp_id_extract_fixup_cspv1(packet->frame_begin); + packet->length = packet->frame_length - csp_id_get_header_size(); + return 0; +} + #endif int csp_id_setup_rx(csp_packet_t * packet) { diff --git a/src/drivers/eth/eth_linux.c b/src/drivers/eth/eth_linux.c index 449d116cd..fdbf17d6b 100644 --- a/src/drivers/eth/eth_linux.c +++ b/src/drivers/eth/eth_linux.c @@ -7,12 +7,15 @@ #include #include +#include +#include +#include +#include #include #include #include #include -#include #include #include diff --git a/src/interfaces/csp_if_can.c b/src/interfaces/csp_if_can.c index f50897e60..cd2eab2ef 100644 --- a/src/interfaces/csp_if_can.c +++ b/src/interfaces/csp_if_can.c @@ -49,7 +49,21 @@ static int csp_can1_rx(csp_iface_t * iface, uint32_t id, const uint8_t * data, u csp_packet_t * packet = csp_can_pbuf_find(ifdata, id, CFP_ID_CONN_MASK, task_woken); if (packet == NULL) { if (CFP_TYPE(id) == CFP_BEGIN) { - packet = csp_can_pbuf_new(ifdata, id, task_woken); + uint8_t header[CFP1_CSP_HEADER_SIZE]; + /* Copy first 4 from data as they represent the CSP header, the data field is in network order */ + memcpy(header, data, CFP1_CSP_HEADER_SIZE); + csp_id_t csp_id = csp_id_extract(header); + packet = csp_can_pbuf_new(ifdata, id, csp_id, task_woken); + if (packet == NULL) { + iface->drop++; + return CSP_ERR_NOBUFS; + } + + csp_id_setup_rx(packet); + packet->id = csp_id; + + memcpy(packet->frame_begin, data, CFP1_CSP_HEADER_SIZE); + packet->frame_length += CFP1_CSP_HEADER_SIZE; } else { iface->frame++; return CSP_ERR_INVAL; @@ -64,21 +78,15 @@ static int csp_can1_rx(csp_iface_t * iface, uint32_t id, const uint8_t * data, u case CFP_BEGIN: /* Discard packet if DLC is less than CSP id + CSP length fields */ - if (dlc < (sizeof(uint32_t) + sizeof(uint16_t))) { + if (dlc < CFP1_DATA_OFFSET) { csp_dbg_can_errno = CSP_DBG_CAN_ERR_SHORT_BEGIN; iface->frame++; csp_can_pbuf_free(ifdata, packet, 1, task_woken); break; } - csp_id_setup_rx(packet); - - /* Copy CSP identifier (header) */ - memcpy(packet->frame_begin, data, sizeof(uint32_t)); - packet->frame_length += sizeof(uint32_t); - /* Copy CSP length (of data) */ - memcpy(&(packet->length), data + sizeof(uint32_t), sizeof(packet->length)); + memcpy(&(packet->length), data + CFP1_CSP_HEADER_SIZE, CFP1_DATA_LEN_SIZE); packet->length = be16toh(packet->length); /* Overflow: check if incoming frame data length is larger than buffer length */ @@ -92,7 +100,7 @@ static int csp_can1_rx(csp_iface_t * iface, uint32_t id, const uint8_t * data, u packet->rx_count = 0; /* Set offset to prevent CSP header from being copied to CSP data */ - offset = sizeof(uint32_t) + sizeof(uint16_t); + offset = CFP1_DATA_OFFSET; /* Set remain field - increment to include begin packet */ packet->remain = CFP_REMAIN(id) + 1; @@ -260,50 +268,56 @@ static int csp_can2_rx(csp_iface_t * iface, uint32_t id, const uint8_t * data, u csp_packet_t * packet = csp_can_pbuf_find(ifdata, id, CFP2_ID_CONN_MASK, task_woken); if (packet == NULL) { if (id & (CFP2_BEGIN_MASK << CFP2_BEGIN_OFFSET)) { - packet = csp_can_pbuf_new(ifdata, id, task_woken); - } else { - iface->frame++; - return CSP_ERR_INVAL; - } - } + /* Discard packet if DLC is less than CSP id + CSP length fields */ + if (dlc < 4) { + csp_dbg_can_errno = CSP_DBG_CAN_ERR_SHORT_BEGIN; + iface->frame++; + return CSP_ERR_INVAL; + } + + /* Copy first 2 bytes from CFP 2.0 header: + * Because the id field has already been converted in memory to a 32-bit + * host-order field, extract the first two bytes and convert back to + * network order */ + uint8_t header[6]; + uint16_t first_two = id >> CFP2_DST_OFFSET; + first_two = htobe16(first_two); + memcpy(header, &first_two, 2); + + /* Copy next 4 from data, the data field is in network order */ + memcpy(&header[2], data, 4); + + /* Move RX offset for incoming data */ + data += 4; + dlc -= 4; + + /* Create CSP header info from the first bytes received */ + csp_id_t csp_id = csp_id_extract(header); + packet = csp_can_pbuf_new(ifdata, id, csp_id, task_woken); + if (packet == NULL) { + iface->drop++; + return CSP_ERR_NOBUFS; + } - /* BEGIN */ - if (id & (CFP2_BEGIN_MASK << CFP2_BEGIN_OFFSET)) { + /* Prepare new CSP packet by adding header as extracted */ + csp_id_setup_rx(packet); + packet->id = csp_id; + memcpy(packet->frame_begin, header, csp_id_get_header_size()); + packet->frame_length = csp_id_get_header_size(); + packet->length = 0; - /* Discard packet if DLC is less than CSP id + CSP length fields */ - if (dlc < 4) { - csp_dbg_can_errno = CSP_DBG_CAN_ERR_SHORT_BEGIN; + /* Set next expected fragment counter to be 1 */ + packet->rx_count = 1; + } else { iface->frame++; - csp_can_pbuf_free(ifdata, packet, 1, task_woken); return CSP_ERR_INVAL; } + } - csp_id_setup_rx(packet); - - /* Copy first 2 bytes from CFP 2.0 header: - * Because the id field has already been converted in memory to a 32-bit - * host-order field, extract the first two bytes and convert back to - * network order */ - uint16_t first_two = id >> CFP2_DST_OFFSET; - first_two = htobe16(first_two); - memcpy(packet->frame_begin, &first_two, 2); - - /* Copy next 4 from data, the data field is in network order */ - memcpy(&packet->frame_begin[2], data, 4); - - packet->frame_length = 6; - packet->length = 0; - - /* Move RX offset for incoming data */ - data += 4; - dlc -= 4; - - /* Set next expected fragment counter to be 1 */ - packet->rx_count = 1; - /* FRAGMENT */ - } else { + /* FRAGMENT */ + if (!(id & (CFP2_BEGIN_MASK << CFP2_BEGIN_OFFSET))) { int fragment_counter = (id >> CFP2_FC_OFFSET) & CFP2_FC_MASK; @@ -337,8 +351,8 @@ static int csp_can2_rx(csp_iface_t * iface, uint32_t id, const uint8_t * data, u /* END */ if (id & (CFP2_END_MASK << CFP2_END_OFFSET)) { - /* Parse CSP header into csp_id type */ - csp_id_strip(packet); + /* Extract data length */ + packet->length = packet->frame_length - csp_id_get_header_size(); /* Rewrite incoming L2 broadcast to local node */ if (packet->id.dst == 0x3FFF) { diff --git a/src/interfaces/csp_if_can_pbuf.c b/src/interfaces/csp_if_can_pbuf.c index faca94e1e..4fc761bd8 100644 --- a/src/interfaces/csp_if_can_pbuf.c +++ b/src/interfaces/csp_if_can_pbuf.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "../csp_buffer_private.h" @@ -45,13 +46,24 @@ void csp_can_pbuf_free(csp_can_interface_data_t * ifdata, csp_packet_t * buffer, } -csp_packet_t * csp_can_pbuf_new(csp_can_interface_data_t * ifdata, uint32_t id, int * task_woken) { +csp_packet_t * csp_can_pbuf_new(csp_can_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, int * task_woken) { csp_can_pbuf_cleanup(ifdata, task_woken); uint32_t now = (task_woken) ? csp_get_ms_isr() : csp_get_ms(); - csp_packet_t * packet = (task_woken) ? csp_buffer_get_always_isr() : csp_buffer_get_always(); + csp_packet_t * packet = NULL; + if (csp_iflist_get_by_addr(csp_id.dst) != NULL) { + /* The packet is for us, make sure we don't silently ignore the situation if we can't process it */ + packet = (task_woken) ? csp_buffer_get_always_isr() : csp_buffer_get_always(); + } else { + /* The packet is not for us, it is ok to drop it if we don't have enough buffers*/ + packet = (task_woken) ? csp_buffer_get_isr(0) : csp_buffer_get(0); + } + + if (packet == NULL) { + return packet; + } packet->last_used = now; packet->cfpid = id; diff --git a/src/interfaces/csp_if_can_pbuf.h b/src/interfaces/csp_if_can_pbuf.h index 08e53b70a..737fad1f4 100644 --- a/src/interfaces/csp_if_can_pbuf.h +++ b/src/interfaces/csp_if_can_pbuf.h @@ -11,6 +11,6 @@ typedef struct { } csp_can_pbuf_element_t; void csp_can_pbuf_free(csp_can_interface_data_t * ifdata, csp_packet_t * buffer, int buf_free, int * task_woken); -csp_packet_t * csp_can_pbuf_new(csp_can_interface_data_t * ifdata, uint32_t id, int * task_woken); +csp_packet_t * csp_can_pbuf_new(csp_can_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, int * task_woken); csp_packet_t * csp_can_pbuf_find(csp_can_interface_data_t * ifdata, uint32_t id, uint32_t mask, int * task_woken); void csp_can_pbuf_cleanup(csp_can_interface_data_t * ifdata, int * task_woken); diff --git a/src/interfaces/csp_if_eth.c b/src/interfaces/csp_if_eth.c index 0d81d6246..569695805 100644 --- a/src/interfaces/csp_if_eth.c +++ b/src/interfaces/csp_if_eth.c @@ -1,6 +1,6 @@ #include -#include +#include "csp_if_eth_pbuf.h" #include #include @@ -174,13 +174,15 @@ int csp_eth_rx(csp_iface_t * iface, csp_eth_header_t * eth_frame, uint32_t recei return CSP_ERR_INVAL; } - if (frame_length == 0 || frame_length > (CSP_BUFFER_SIZE + csp_id_get_header_size())) { + if (frame_length < csp_id_get_header_size() || frame_length > (CSP_BUFFER_SIZE + csp_id_get_header_size())) { iface->frame++; csp_print("eth rx frame_length of %u is invalid\n", frame_length); return CSP_ERR_INVAL; } - csp_packet_t * packet = csp_eth_pbuf_find(ifdata, packet_id, task_woken); + csp_id_t csp_id = csp_id_extract(eth_frame->frame_begin); + + csp_packet_t * packet = csp_eth_pbuf_find(ifdata, packet_id, csp_id, task_woken); if (packet == NULL) { iface->drop++; @@ -191,6 +193,7 @@ int csp_eth_rx(csp_iface_t * iface, csp_eth_header_t * eth_frame, uint32_t recei if (packet->frame_length == 0) { /* First segment */ csp_id_setup_rx(packet); + packet->id = csp_id; packet->frame_length = frame_length; packet->rx_count = 0; } @@ -217,14 +220,9 @@ int csp_eth_rx(csp_iface_t * iface, csp_eth_header_t * eth_frame, uint32_t recei return CSP_ERR_NONE; } - csp_eth_pbuf_free(ifdata, packet, false, task_woken); + packet->length = packet->frame_length - csp_id_get_header_size(); - if (csp_id_strip(packet) != 0) { - csp_print("eth rx packet discarded due to error in ID field\n"); - iface->frame++; - (task_woken) ? csp_buffer_free_isr(packet) : csp_buffer_free(packet); - return CSP_ERR_INVAL; - } + csp_eth_pbuf_free(ifdata, packet, false, task_woken); /* Record CSP and MAC addresses of source */ csp_eth_arp_set_addr(eth_frame->ether_shost, packet->id.src); diff --git a/src/interfaces/csp_if_eth_pbuf.c b/src/interfaces/csp_if_eth_pbuf.c index 3d7473645..3f22973a8 100644 --- a/src/interfaces/csp_if_eth_pbuf.c +++ b/src/interfaces/csp_if_eth_pbuf.c @@ -4,7 +4,9 @@ #include #include -#include +#include "csp_if_eth_pbuf.h" + +#include "../csp_buffer_private.h" /* Buffer element timeout in ms */ #define PBUF_TIMEOUT_MS 1000 @@ -41,11 +43,18 @@ void csp_eth_pbuf_free(csp_eth_interface_data_t * ifdata, csp_packet_t * buffer, } -csp_packet_t * csp_eth_pbuf_new(csp_eth_interface_data_t * ifdata, uint32_t id, uint32_t now, int * task_woken) { +csp_packet_t * csp_eth_pbuf_new(csp_eth_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, uint32_t now, int * task_woken) { csp_eth_pbuf_cleanup(ifdata, now, task_woken); - csp_packet_t * packet = (task_woken) ? csp_buffer_get_isr(0) : csp_buffer_get(0); + csp_packet_t * packet = NULL; + if (csp_iflist_get_by_addr(csp_id.dst) != NULL) { + /* The packet is for us, make sure we don't silently ignore the situation if we can't process it */ + packet = (task_woken) ? csp_buffer_get_always_isr() : csp_buffer_get_always(); + } else { + /* The packet is not for us, it is ok to drop it if we don't have enough buffers*/ + packet = (task_woken) ? csp_buffer_get_isr(0) : csp_buffer_get(0); + } if (packet == NULL) { return NULL; } @@ -92,7 +101,7 @@ void csp_eth_pbuf_cleanup(csp_eth_interface_data_t * ifdata, uint32_t now, int * } -csp_packet_t * csp_eth_pbuf_find(csp_eth_interface_data_t * ifdata, uint32_t id, int * task_woken) { +csp_packet_t * csp_eth_pbuf_find(csp_eth_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, int * task_woken) { uint32_t now = (task_woken) ? csp_get_ms_isr() : csp_get_ms(); @@ -106,6 +115,6 @@ csp_packet_t * csp_eth_pbuf_find(csp_eth_interface_data_t * ifdata, uint32_t id, packet = packet->next; } - return csp_eth_pbuf_new(ifdata, id, now, task_woken); + return csp_eth_pbuf_new(ifdata, id, csp_id, now, task_woken); } diff --git a/include/csp/interfaces/csp_if_eth_pbuf.h b/src/interfaces/csp_if_eth_pbuf.h similarity index 93% rename from include/csp/interfaces/csp_if_eth_pbuf.h rename to src/interfaces/csp_if_eth_pbuf.h index 01e1e0e94..f41538919 100644 --- a/include/csp/interfaces/csp_if_eth_pbuf.h +++ b/src/interfaces/csp_if_eth_pbuf.h @@ -27,11 +27,7 @@ #pragma once #include -#include #include -#include -#include -#include #include @@ -47,8 +43,8 @@ typedef struct { } csp_eth_pbuf_element_t; void csp_eth_pbuf_free(csp_eth_interface_data_t * ifdata, csp_packet_t * buffer, int buf_free, int * task_woken); -csp_packet_t * csp_eth_pbuf_new(csp_eth_interface_data_t * ifdata, uint32_t id, uint32_t now, int * task_woken); -csp_packet_t * csp_eth_pbuf_find(csp_eth_interface_data_t * ifdata, uint32_t id, int * task_woken); +csp_packet_t * csp_eth_pbuf_new(csp_eth_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, uint32_t now, int * task_woken); +csp_packet_t * csp_eth_pbuf_find(csp_eth_interface_data_t * ifdata, uint32_t id, csp_id_t csp_id, int * task_woken); void csp_eth_pbuf_cleanup(csp_eth_interface_data_t * ifdata, uint32_t now, int * task_woken); #ifdef __cplusplus diff --git a/src/interfaces/csp_if_zmqhub.c b/src/interfaces/csp_if_zmqhub.c index b726decae..061f72032 100644 --- a/src/interfaces/csp_if_zmqhub.c +++ b/src/interfaces/csp_if_zmqhub.c @@ -14,6 +14,7 @@ #include #include "../csp_macro.h" +#include "../csp_buffer_private.h" /** * ZMQ destination size (for libcsp1 backwards compatibility) @@ -129,30 +130,35 @@ static void * csp_zmqhub_task(void * param) { continue; } + // Copy the data from zmq to csp + uint8_t * rx_data = zmq_msg_data(&msg); + rx_data = csp_zmqhub_fixup_cspv1_del_dest_addr(rx_data, &datalen); + + csp_id_t csp_id = csp_id_extract_fixup_cspv1(rx_data); + // Create new csp packet - packet = csp_buffer_get(0); + if (csp_iflist_get_by_addr(csp_id.dst) != NULL) { + /* The packet is for us, make sure we don't silently ignore the situation if we can't process it */ + packet = csp_buffer_get_always(); + } else { + /* The packet is not for us, it is ok to drop it if we don't have enough buffers*/ + packet = csp_buffer_get(0); + } + if (packet == NULL) { csp_print("RX %s: Failed to get csp_buffer(%u)\n", drv->iface.name, datalen); zmq_msg_close(&msg); continue; } - // Copy the data from zmq to csp - uint8_t * rx_data = zmq_msg_data(&msg); - rx_data = csp_zmqhub_fixup_cspv1_del_dest_addr(rx_data, &datalen); - csp_id_setup_rx(packet); + packet->id = csp_id; memcpy(packet->frame_begin, rx_data, datalen); packet->frame_length = datalen; + /* Extract data length */ + packet->length = packet->frame_length - csp_id_get_header_size(); - /* Parse the frame and strip the ID field */ - if (csp_id_strip_fixup_cspv1(packet) != 0) { - drv->iface.rx_error++; - csp_buffer_free(packet); - zmq_msg_close(&msg); - continue; - } // Route packet csp_qfifo_write(packet, &drv->iface, NULL);