diff --git a/.clang-format b/.clang-format new file mode 100644 index 000000000..7e1e93055 --- /dev/null +++ b/.clang-format @@ -0,0 +1,23 @@ +BasedOnStyle: Google +AllowShortBlocksOnASingleLine: true +AllowShortIfStatementsOnASingleLine: true +ColumnLimit: 80 +DerivePointerAlignment: false +PointerAlignment: Left +IndentWidth: 4 +TabWidth: 4 +UseTab: Never +BreakBeforeBraces: Stroustrup +IncludeBlocks: Regroup +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 2 + CaseSensitive: true + - Regex: '^((<|")(gtest|gmock|isl|json)/)' + Priority: 3 + - Regex: '<[[:alnum:].]+>' + Priority: 4 + - Regex: '.*' + Priority: 1 + SortPriority: 0 diff --git a/.gitignore b/.gitignore index e63416c28..2a694a670 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ build/ .lock* .waf* deps/ +.vscode/ diff --git a/examples/cpp/transport/SerialCobsTransportTest.cpp b/examples/cpp/transport/SerialCobsTransportTest.cpp new file mode 100644 index 000000000..d1ada6e0a --- /dev/null +++ b/examples/cpp/transport/SerialCobsTransportTest.cpp @@ -0,0 +1,94 @@ +#include "types/example_t.hpp" +#include "zcm/transport/cobs_serial/generic_serial_cobs_transport.h" +#include "zcm/zcm-cpp.hpp" +#include + +#include +#include +#include + +using namespace std; +using namespace zcm; + +#define PUBLISH_DT (1e6) / (5) +#define MIN(A, B) ((A) < (B)) ? (A) : (B) +#define MAX_FIFO 12 + +queue fifo; + +static size_t get(uint8_t* data, size_t nData, void* usr) +{ + size_t n = MIN(MAX_FIFO, nData); + n = MIN(fifo.size(), n); + + for (size_t i = 0; i < n; ++i) { + data[i] = fifo.front(); + fifo.pop(); + } + + return n; +} + +static size_t put(const uint8_t* data, size_t nData, void* usr) +{ + size_t n = MIN(MAX_FIFO - fifo.size(), nData); + // cout << "Put " << n << " bytes" << endl; + + for (size_t i = 0; i < n; ++i) fifo.push(data[i]); + + return n; +} + +static uint64_t utime(void* usr) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec; +} + +class Handler { + int64_t lastHost = 0; + + public: + void handle(const ReceiveBuffer* rbuf, const string& chan, + const example_t* msg) + { + cout << "Message received" << endl; + + if (msg->timestamp <= lastHost || rbuf->recv_utime < lastHost) { + assert("ERROR: utime mismatch. This should never happen"); + } + lastHost = msg->timestamp; + } +}; + +int main(int argc, const char* argv[]) +{ + ZCM zcmLocal(zcm_trans_generic_serial_cobs_create(&get, &put, NULL, &utime, + NULL, 1024, 1024 * 5)); + + example_t example = {}; + example.num_ranges = 1; + example.ranges.resize(1); + example.ranges.at(0) = 1; + + Handler handler; + auto sub = zcmLocal.subscribe("EXAMPLE", &Handler::handle, &handler); + + uint64_t nextPublish = 0; + while (true) { + uint64_t now = utime(NULL); + if (now > nextPublish) { + cout << "Publishing" << endl; + example.timestamp = now; + zcmLocal.publish("EXAMPLE", &example); + nextPublish = now + PUBLISH_DT; + } + + zcmLocal.handleNonblock(); + } + + zcmLocal.unsubscribe(sub); + + return 0; +} diff --git a/examples/cpp/transport/wscript b/examples/cpp/transport/wscript index 238764b5e..82c242c49 100644 --- a/examples/cpp/transport/wscript +++ b/examples/cpp/transport/wscript @@ -5,3 +5,7 @@ def build(ctx): ctx.program(target = 'generic-serial', use = 'default zcm examplezcmtypes_cpp', source = 'SerialTransportTest.cpp') + + ctx.program(target = 'generic-cobs-serial', + use = 'default zcm examplezcmtypes_cpp', + source = 'SerialCobsTransportTest.cpp') diff --git a/zcm/transport/cobs_serial/generic_serial_cobs_transport.c b/zcm/transport/cobs_serial/generic_serial_cobs_transport.c new file mode 100644 index 000000000..d6c5abd73 --- /dev/null +++ b/zcm/transport/cobs_serial/generic_serial_cobs_transport.c @@ -0,0 +1,368 @@ +#include "zcm/transport/cobs_serial/generic_serial_cobs_transport.h" + +#include "zcm/transport.h" +#include "zcm/transport/generic_serial_circ_buff.h" +#include "zcm/transport/generic_serial_fletcher.h" +#include "zcm/util/cobs.h" +#include "zcm/zcm.h" + +#include +#include +#include +#include + +#ifndef ZCM_COBS_SERIAL_TERM_CHAR +#define ZCM_COBS_SERIAL_TERM_CHAR (0x00) +#endif + +#define ASSERT(x) + +// Framing (size = 8 + chanLen + data_len) +// chanLen +// data_len (4 bytes) +// *chan +// *data +// sum1(*chan, *data) +// sum2(*chan, *data) +// 0x00 -- termination char +#define FRAME_BYTES 8 +#define COBS_MAX_FRAME_OVERHEAD 1 +static const size_t minMessageSize = FRAME_BYTES + COBS_MAX_FRAME_OVERHEAD; + +typedef struct zcm_trans_cobs_serial_t { + zcm_trans_t trans; // This must be first to preserve pointer casting + + circBuffer_t sendBuffer; + circBuffer_t recvBuffer; + size_t mtu; + uint8_t recvChanName[ZCM_CHANNEL_MAXLEN + 1]; + uint8_t* sendMsgData; + uint8_t* recvMsgData; + + size_t (*get)(uint8_t* data, size_t nData, void* usr); + size_t (*put)(const uint8_t* data, size_t nData, void* usr); + void* put_get_usr; + + uint64_t (*time)(void* usr); + void* time_usr; +} zcm_trans_cobs_serial_t; + +static zcm_trans_cobs_serial_t* cast(zcm_trans_t* zt); + +size_t serial_cobs_get_mtu(zcm_trans_cobs_serial_t* zt) { return zt->mtu; } + +/** + * @brief COBS encode \p length bytes from \p src to \p dest + * + * @param dest + * @param src + * @param length + * + * @return Encoded buffer length in bytes + * + * @note Does not encode termination character + */ +static size_t cobs_encode_zcm(circBuffer_t* dest, const uint8_t* src, + size_t length) +{ + uint8_t code = 1; + size_t stuffBytes = 1; + size_t bytesWritten = 0; + + for (const uint8_t* byte = src; length--; ++byte) { + if (*byte) { ++code; } + + if (!*byte || code == 0xff) { // zero or end of block, restart + cb_push_back(dest, code); + ++bytesWritten; + while (--code) { + cb_push_back(dest, src[bytesWritten - stuffBytes]); + ++bytesWritten; + } + if (*byte) { stuffBytes++; } + code = 1; + } + } + cb_push_back(dest, code); + ++bytesWritten; + while (--code) { + cb_push_back(dest, src[bytesWritten - stuffBytes]); + ++bytesWritten; + } + + return bytesWritten; +} + +/** + * @brief COBS decode \p length bytes from \p src to \p dest + * + * @param dest + * @param src + * @param length + * + * @return Decoded buffer length in bytes, excluding delimeter + * + * @note Stops decoding if delimiter byte is found + * @note Pops values out of \p src if message is complete or malformed, + * does not pop values if message is incomplete + */ +static size_t cobs_decode_zcm(uint8_t* dest, circBuffer_t* src, size_t length) +{ + size_t bytesRead = 0; + uint8_t byte = cb_front(src, bytesRead++); + if (byte == 0x00) { + cb_pop_front(src, bytesRead); + return 0; + } + + size_t stuffBytes = 0; + uint8_t* decode = dest; + for (uint8_t code = 0xff, block = 0; bytesRead < length; --block) { + if (block) { + if (byte == 0x00) { // packet malformed + cb_pop_front(src, bytesRead); + return 0; + } + + *decode = byte; + decode++; + } + else { + if (code == 0xff) { stuffBytes++; } + else { + *decode = 0; + decode++; + } + + block = code = byte; + if (code == 0x00) break; + } + + byte = cb_front(src, bytesRead++); + } + + if (byte == 0x00) { // ended on terminator (complete) + stuffBytes++; + cb_pop_front(src, bytesRead); + return bytesRead - stuffBytes; + } + + // ended on non-terminator (incomplete) + return 0; +} + +int serial_cobs_sendmsg(zcm_trans_cobs_serial_t* zt, zcm_msg_t msg) +{ + size_t chanLen = strlen(msg.channel); + size_t payloadLen = FRAME_BYTES + chanLen + msg.len; + + if (msg.len > zt->mtu || chanLen > ZCM_CHANNEL_MAXLEN) { + return ZCM_EINVALID; + } + if (payloadLen + cobsMaxOverhead(payloadLen) > cb_room(&zt->sendBuffer)) { + return ZCM_EAGAIN; + } + + uint8_t* pMsgData = zt->sendMsgData; + + // Copy channel and message length + *pMsgData = (uint8_t)chanLen; + pMsgData++; + + pMsgData[0] = (uint8_t)(msg.len & 0xFF); + pMsgData[1] = (uint8_t)((msg.len >> 8) & 0xFF); + pMsgData[2] = (uint8_t)((msg.len >> 16) & 0xFF); + pMsgData[3] = (uint8_t)((msg.len >> 24) & 0xFF); + pMsgData += 4; + + // Copy channel name into msgData + memcpy(pMsgData, msg.channel, chanLen); + pMsgData += chanLen; + + // Copy message contents into msgData + memcpy(pMsgData, msg.buf, msg.len); + pMsgData += msg.len; + + // Calculate Fletcher-16 checksum and complementary bytes + uint16_t checksum = fletcher16(zt->sendMsgData, payloadLen - 3); + uint8_t sumLow = (uint8_t)(checksum & 0xFF); + uint8_t sumHigh = (uint8_t)((checksum >> 8) & 0xFF); + pMsgData[0] = 255 - ((sumLow + sumHigh) % 255); + pMsgData[1] = 255 - ((sumLow + pMsgData[0]) % 255); + + size_t bytesEncoded = + cobs_encode_zcm(&zt->sendBuffer, zt->sendMsgData, payloadLen - 1); + if (bytesEncoded <= payloadLen - 1) { + return ZCM_EAGAIN; // encoding failed + } + cb_push_back(&zt->sendBuffer, ZCM_COBS_SERIAL_TERM_CHAR); + + return ZCM_EOK; +} + +int serial_cobs_recvmsg_enable(zcm_trans_cobs_serial_t* zt, const char* channel, + bool enable) +{ + return ZCM_EOK; // not implemented +} + +int serial_cobs_recvmsg(zcm_trans_cobs_serial_t* zt, zcm_msg_t* msg, + int timeout) +{ + uint64_t utime = zt->time(zt->time_usr); + size_t incomingSize = cb_size(&zt->recvBuffer); + if (incomingSize < minMessageSize) return ZCM_EAGAIN; + + // COBS decode + size_t bytesDecoded = + cobs_decode_zcm(zt->recvMsgData, &zt->recvBuffer, incomingSize); + if (bytesDecoded < minMessageSize) return ZCM_EAGAIN; + + // Extract channel and message sizes + uint8_t* pMsgData = zt->recvMsgData; + uint8_t chanLen = *pMsgData; + pMsgData++; + + msg->len = pMsgData[0]; + msg->len |= pMsgData[1] << 8; + msg->len |= pMsgData[2] << 16; + msg->len |= pMsgData[3] << 24; + pMsgData += 4; + + // Value rationality checks + if (chanLen > ZCM_CHANNEL_MAXLEN || msg->len > zt->mtu) return ZCM_EINVALID; + + // Calculate Fletcher-16 checksum for entire payload (including checkbytes) + uint16_t checksum = 0; + checksum = fletcher16(zt->recvMsgData, bytesDecoded); + if (checksum != 0x0000) return ZCM_EINVALID; + + // Copy channel name + memset(&zt->recvChanName, '\0', ZCM_CHANNEL_MAXLEN); + memcpy(zt->recvChanName, pMsgData, chanLen); + pMsgData += chanLen; + + // Copy values into msg + msg->channel = (char*)zt->recvChanName; + msg->buf = pMsgData; + msg->utime = utime; + + return ZCM_EOK; +} + +int serial_cobs_update_rx(zcm_trans_t* _zt) +{ + zcm_trans_cobs_serial_t* zt = cast(_zt); + cb_flush_in(&zt->recvBuffer, zt->get, zt->put_get_usr); + return ZCM_EOK; +} + +int serial_cobs_update_tx(zcm_trans_t* _zt) +{ + zcm_trans_cobs_serial_t* zt = cast(_zt); + cb_flush_out(&zt->sendBuffer, zt->put, zt->put_get_usr); + return ZCM_EOK; +} + +/********************** STATICS **********************/ +static size_t _serial_get_mtu(zcm_trans_t* zt) +{ + return serial_cobs_get_mtu(cast(zt)); +} + +static int _serial_sendmsg(zcm_trans_t* zt, zcm_msg_t msg) +{ + return serial_cobs_sendmsg(cast(zt), msg); +} + +static int _serial_recvmsg_enable(zcm_trans_t* zt, const char* channel, + bool enable) +{ + return serial_cobs_recvmsg_enable(cast(zt), channel, enable); +} + +static int _serial_recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout) +{ + return serial_cobs_recvmsg(cast(zt), msg, timeout); +} + +static int _serial_update(zcm_trans_t* zt) +{ + int rxRet = serial_cobs_update_rx(zt); + int txRet = serial_cobs_update_tx(zt); + return rxRet == ZCM_EOK ? txRet : rxRet; +} + +static zcm_trans_methods_t methods = { + &_serial_get_mtu, + &_serial_sendmsg, + &_serial_recvmsg_enable, + &_serial_recvmsg, + NULL, // drops + &_serial_update, + &zcm_trans_generic_serial_cobs_destroy, +}; + +static zcm_trans_cobs_serial_t* cast(zcm_trans_t* zt) +{ + assert(zt->vtbl == &methods); + return (zcm_trans_cobs_serial_t*)zt; +} + +zcm_trans_t* zcm_trans_generic_serial_cobs_create( + size_t (*get)(uint8_t* data, size_t nData, void* usr), + size_t (*put)(const uint8_t* data, size_t nData, void* usr), + void* put_get_usr, uint64_t (*timestamp_now)(void* usr), void* time_usr, + size_t MTU, size_t bufSize) +{ + if (MTU == 0 || bufSize < FRAME_BYTES + MTU) return NULL; + zcm_trans_cobs_serial_t* zt = malloc(sizeof(zcm_trans_cobs_serial_t)); + if (zt == NULL) return NULL; + zt->mtu = MTU; + + zt->sendMsgData = NULL; + zt->recvMsgData = NULL; + zt->recvBuffer.data = NULL; + zt->sendBuffer.data = NULL; + + size_t maxPayloadSize = FRAME_BYTES + ZCM_CHANNEL_MAXLEN + zt->mtu; + zt->recvMsgData = malloc(maxPayloadSize * sizeof(uint8_t)); + if (zt->recvMsgData == NULL) goto fail; + + zt->sendMsgData = malloc(maxPayloadSize * sizeof(uint8_t)); + if (zt->sendMsgData == NULL) goto fail; + + if (!cb_init(&zt->recvBuffer, bufSize)) goto fail; + if (!cb_init(&zt->sendBuffer, bufSize)) goto fail; + + zt->trans.trans_type = ZCM_NONBLOCKING; + zt->trans.vtbl = &methods; + + zt->get = get; + zt->put = put; + zt->put_get_usr = put_get_usr; + + zt->time = timestamp_now; + zt->time_usr = time_usr; + + return (zcm_trans_t*)zt; + +fail: + if (zt->recvBuffer.data != NULL) cb_deinit(&zt->recvBuffer); + if (zt->sendBuffer.data != NULL) cb_deinit(&zt->sendBuffer); + if (zt->recvMsgData != NULL) free(zt->recvMsgData); + if (zt->sendMsgData != NULL) free(zt->sendMsgData); + + free(zt); + return NULL; +} + +void zcm_trans_generic_serial_cobs_destroy(zcm_trans_t* _zt) +{ + zcm_trans_cobs_serial_t* zt = cast(_zt); + cb_deinit(&zt->recvBuffer); + cb_deinit(&zt->sendBuffer); + free(zt->recvMsgData); + free(zt->sendMsgData); + free(zt); +} diff --git a/zcm/transport/cobs_serial/generic_serial_cobs_transport.h b/zcm/transport/cobs_serial/generic_serial_cobs_transport.h new file mode 100644 index 000000000..4b8547556 --- /dev/null +++ b/zcm/transport/cobs_serial/generic_serial_cobs_transport.h @@ -0,0 +1,29 @@ +#ifndef _ZCM_TRANS_NONBLOCKING_SERIAL_COBS_H +#define _ZCM_TRANS_NONBLOCKING_SERIAL_COBS_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "zcm/transport.h" +#include "zcm/zcm.h" + +#include + +zcm_trans_t* zcm_trans_generic_serial_cobs_create( + size_t (*get)(uint8_t* data, size_t nData, void* usr), + size_t (*put)(const uint8_t* data, size_t nData, void* usr), + void* put_get_usr, uint64_t (*timestamp_now)(void* usr), void* time_usr, + size_t MTU, size_t bufSize); + +// frees all resources inside of zt and frees zt itself +void zcm_trans_generic_serial_cobs_destroy(zcm_trans_t* zt); + +int serial_cobs_update_rx(zcm_trans_t* _zt); +int serial_cobs_update_tx(zcm_trans_t* _zt); + +#ifdef __cplusplus +} +#endif + +#endif /* _ZCM_TRANS_NONBLOCKING_SERIAL_COBS_H */ diff --git a/zcm/transport/generic_serial_fletcher.h b/zcm/transport/generic_serial_fletcher.h index f06e7de8a..472e5c4b0 100644 --- a/zcm/transport/generic_serial_fletcher.h +++ b/zcm/transport/generic_serial_fletcher.h @@ -2,6 +2,7 @@ #define _ZCM_TRANS_NONBLOCKING_SERIAL_FLETCHER_H #include +#include static inline uint16_t fletcherUpdate(uint8_t b, uint16_t prevSum) { @@ -19,4 +20,34 @@ static inline uint16_t fletcherUpdate(uint8_t b, uint16_t prevSum) return (sumHigh << 8) | sumLow; } +/** + * @brief Calculate Fletcher-16 checksum + * + * @param data + * @param len + * + * @return Fletcher16 checksum + */ +// RRR (Bendes): why is this not deferring to the above? DRY principle :) +// RRR (Griff): because this one is more efficient +static inline uint16_t fletcher16(const uint8_t* data, size_t len) +{ + uint32_t c0, c1; + + /* Found by solving for c1 overflow: */ + /* n > 0 and n * (n+1) / 2 * (2^8-1) < (2^32-1). */ + for (c0 = c1 = 0; len > 0;) { + size_t blocklen = len; + if (blocklen > 5802) blocklen = 5802; + len -= blocklen; + do { + c0 = c0 + *data++; + c1 = c1 + c0; + } while (--blocklen); + c0 = c0 % 255; + c1 = c1 % 255; + } + return (c1 << 8 | c0); +} + #endif /* _ZCM_TRANS_NONBLOCKING_FLETCHER_H */ diff --git a/zcm/transport/transport_cobs_serial.cpp b/zcm/transport/transport_cobs_serial.cpp new file mode 100644 index 000000000..619fffab6 --- /dev/null +++ b/zcm/transport/transport_cobs_serial.cpp @@ -0,0 +1,302 @@ +#include "generic_serial_transport.h" +#include "util/TimeUtil.hpp" +#include "zcm/transport.h" +#include "zcm/transport/cobs_serial/generic_serial_cobs_transport.h" +#include "zcm/transport/transport_serial.hpp" +#include "zcm/transport_register.hpp" +#include "zcm/transport_registrar.h" +#include "zcm/util/debug.h" +#include "zcm/util/lockfile.h" +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +// TODO: This transport layer needs to be "hardened" to handle +// all of the possible errors and corner cases. Currently, it +// should work fine in most cases, but it might fail on some +// rare cases... + +// Define this the class name you want +#define ZCM_TRANS_CLASSNAME TransportCobsSerial +#define MTU (1 << 14) +#define ESCAPE_CHAR (0xcc) + +#define SERIAL_TIMEOUT_US 1e5 // u-seconds + +#define US_TO_MS(a) (a) / 1e3 + +using u8 = uint8_t; +using u16 = uint16_t; +using u32 = uint32_t; +using u64 = uint64_t; + +struct ZCM_TRANS_CLASSNAME : public zcm_trans_t { + Serial ser; + + int baud; + bool hwFlowControl; + + bool raw; + string rawChan; + int rawSize; + std::unique_ptr rawBuf; + + string address; + + unordered_map options; + + zcm_trans_t* gst; + + uint64_t timeoutLeft; + + string* findOption(const string& s) + { + auto it = options.find(s); + if (it == options.end()) return nullptr; + return &it->second; + } + + ZCM_TRANS_CLASSNAME(zcm_url_t* url) + { + trans_type = ZCM_BLOCKING; + vtbl = &methods; + + // build 'options' + auto* opts = zcm_url_opts(url); + for (size_t i = 0; i < opts->numopts; ++i) + options[opts->name[i]] = opts->value[i]; + + baud = 0; + auto* baudStr = findOption("baud"); + if (!baudStr) { + fprintf(stderr, "Baud unspecified. Bypassing serial baud setup.\n"); + } + else { + baud = atoi(baudStr->c_str()); + if (baud == 0) { + ZCM_DEBUG("expected integer argument for 'baud'"); + return; + } + } + + hwFlowControl = false; + auto* hwFlowControlStr = findOption("hw_flow_control"); + if (hwFlowControlStr) { + if (*hwFlowControlStr == "true") { hwFlowControl = true; } + else if (*hwFlowControlStr == "false") { + hwFlowControl = false; + } + else { + ZCM_DEBUG("expected boolean argument for 'hw_flow_control'"); + return; + } + } + + raw = false; + auto* rawStr = findOption("raw"); + if (rawStr) { + if (*rawStr == "true") { raw = true; } + else if (*rawStr == "false") { + raw = false; + } + else { + ZCM_DEBUG("expected boolean argument for 'raw'"); + return; + } + } + + rawChan = ""; + auto* rawChanStr = findOption("raw_channel"); + if (rawChanStr) { rawChan = *rawChanStr; } + + rawSize = 1024; + auto* rawSizeStr = findOption("raw_size"); + if (rawSizeStr) { + rawSize = atoi(rawSizeStr->c_str()); + if (rawSize <= 0) { + ZCM_DEBUG("expected positive integer argument for 'raw_size'"); + return; + } + } + + address = zcm_url_address(url); + ser.open(address, baud, hwFlowControl); + + if (raw) { + rawBuf.reset(new uint8_t[rawSize]); + gst = nullptr; + } + else { + gst = zcm_trans_generic_serial_cobs_create( + &ZCM_TRANS_CLASSNAME::get, &ZCM_TRANS_CLASSNAME::put, this, + &ZCM_TRANS_CLASSNAME::timestamp_now, nullptr, MTU, MTU * 10); + } + } + + ~ZCM_TRANS_CLASSNAME() + { + ser.close(); + if (gst) zcm_trans_generic_serial_cobs_destroy(gst); + } + + bool good() { return ser.isOpen(); } + + static size_t get(uint8_t* data, size_t nData, void* usr) + { + ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*)usr); + uint64_t startUtime = TimeUtil::utime(); + int ret = me->ser.read(data, nData, me->timeoutLeft); + uint64_t diff = TimeUtil::utime() - startUtime; + me->timeoutLeft = me->timeoutLeft > diff ? me->timeoutLeft - diff : 0; + return ret < 0 ? 0 : ret; + } + + static size_t put(const uint8_t* data, size_t nData, void* usr) + { + ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*)usr); + int ret = me->ser.write(data, nData); + return ret < 0 ? 0 : ret; + } + + static uint64_t timestamp_now(void* usr) { return TimeUtil::utime(); } + + /********************** METHODS **********************/ + size_t getMtu() { return raw ? MTU : zcm_trans_get_mtu(this->gst); } + + int sendmsg(zcm_msg_t msg) + { + if (raw) { + if (put(msg.buf, msg.len, this) != 0) return ZCM_EOK; + return ZCM_EAGAIN; + } + else { + // Note: No need to lock here ONLY because the internals of + // generic serial transport sendmsg only use the sendBuffer + // and touch no variables related to receiving + int ret = zcm_trans_sendmsg(this->gst, msg); + if (ret != ZCM_EOK) return ret; + return serial_cobs_update_tx(this->gst); + } + } + + int recvmsgEnable(const char* channel, bool enable) + { + return raw ? ZCM_EOK + : zcm_trans_recvmsg_enable(this->gst, channel, enable); + } + + int recvmsg(zcm_msg_t* msg, int timeoutMs) + { + timeoutLeft = + timeoutMs > 0 ? timeoutMs * 1e3 : numeric_limits::max(); + + if (raw) { + size_t sz = get(rawBuf.get(), rawSize, this); + if (sz == 0 || rawChan.empty()) return ZCM_EAGAIN; + + msg->utime = timestamp_now(this); + msg->channel = rawChan.c_str(); + msg->len = sz; + msg->buf = rawBuf.get(); + + return ZCM_EOK; + } + else { + do { + uint64_t startUtime = TimeUtil::utime(); + + // Note: No need to lock here ONLY because the internals of + // generic serial transport recvmsg only use the recv + // related data members and touch no variables related to + // sending + int ret = zcm_trans_recvmsg(this->gst, msg, timeoutLeft); + if (ret == ZCM_EOK) return ret; + + uint64_t diff = TimeUtil::utime() - startUtime; + startUtime = TimeUtil::utime(); + // Note: timeoutLeft is calculated here because serial_update_rx + // needs it to be set properly so that the blocking read + // in `get` knows how long it has to exit + timeoutLeft = timeoutLeft > diff ? timeoutLeft - diff : 0; + + serial_cobs_update_rx(this->gst); + + diff = TimeUtil::utime() - startUtime; + timeoutLeft = timeoutLeft > diff ? timeoutLeft - diff : 0; + + } while (timeoutLeft > 0); + + return ZCM_EAGAIN; + } + } + + /********************** STATICS **********************/ + static zcm_trans_methods_t methods; + static ZCM_TRANS_CLASSNAME* cast(zcm_trans_t* zt) + { + assert(zt->vtbl == &methods); + return (ZCM_TRANS_CLASSNAME*)zt; + } + + static size_t _getMtu(zcm_trans_t* zt) { return cast(zt)->getMtu(); } + + static int _sendmsg(zcm_trans_t* zt, zcm_msg_t msg) + { + return cast(zt)->sendmsg(msg); + } + + static int _recvmsgEnable(zcm_trans_t* zt, const char* channel, bool enable) + { + return cast(zt)->recvmsgEnable(channel, enable); + } + + static int _recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout) + { + return cast(zt)->recvmsg(msg, timeout); + } + + static void _destroy(zcm_trans_t* zt) { delete cast(zt); } + + static const TransportRegister reg; +}; + +zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { + &ZCM_TRANS_CLASSNAME::_getMtu, + &ZCM_TRANS_CLASSNAME::_sendmsg, + &ZCM_TRANS_CLASSNAME::_recvmsgEnable, + &ZCM_TRANS_CLASSNAME::_recvmsg, + NULL, // drops + NULL, // update + &ZCM_TRANS_CLASSNAME::_destroy, +}; + +static zcm_trans_t* create(zcm_url_t* url, char** opt_errmsg) +{ + if (opt_errmsg) *opt_errmsg = NULL; // Feature unused in this transport + auto* trans = new ZCM_TRANS_CLASSNAME(url); + if (trans->good()) return trans; + + delete trans; + return nullptr; +} + +#ifdef USING_TRANS_SERIAL +// Register this transport with ZCM +const TransportRegister ZCM_TRANS_CLASSNAME::reg( + "serial-cobs", + "Transfer data via a serial connection using COBS encoding " + "(e.g. 'serial-cobs:///dev/ttyUSB0?baud=115200&hw_flow_control=true' or " + "'serial-cobs:///dev/pts/10?raw=true&raw_channel=RAW_SERIAL')", + create); +#endif diff --git a/zcm/transport/transport_serial.cpp b/zcm/transport/transport_serial.cpp index b0cb8e060..175648a29 100644 --- a/zcm/transport/transport_serial.cpp +++ b/zcm/transport/transport_serial.cpp @@ -1,27 +1,23 @@ +#include "generic_serial_transport.h" +#include "util/TimeUtil.hpp" #include "zcm/transport.h" -#include "zcm/transport_registrar.h" #include "zcm/transport_register.hpp" -#include "zcm/util/lockfile.h" +#include "zcm/transport_registrar.h" #include "zcm/util/debug.h" - -#include "generic_serial_transport.h" - -#include "util/TimeUtil.hpp" - -#include -#include -#include -#include -#include +#include "zcm/util/lockfile.h" #include +#include +#include #include #include - +#include +#include #include #include #include -#include +#include +#include using namespace std; // TODO: This transport layer needs to be "hardened" to handle @@ -31,21 +27,20 @@ using namespace std; // Define this the class name you want #define ZCM_TRANS_CLASSNAME TransportSerial -#define MTU (1<<14) +#define MTU (1 << 14) #define ESCAPE_CHAR (0xcc) -#define SERIAL_TIMEOUT_US 1e5 // u-seconds +#define SERIAL_TIMEOUT_US 1e5 // u-seconds -#define US_TO_MS(a) (a)/1e3 +#define US_TO_MS(a) (a) / 1e3 -using u8 = uint8_t; +using u8 = uint8_t; using u16 = uint16_t; using u32 = uint32_t; using u64 = uint64_t; -struct Serial -{ - Serial(){} +struct Serial { + Serial() {} ~Serial() { close(); } bool open(const string& port, int baud, bool hwFlowControl); @@ -54,7 +49,8 @@ struct Serial int write(const u8* buf, size_t sz); int read(u8* buf, size_t sz, u64 timeoutUs); - // Returns 0 on invalid input baud otherwise returns termios constant baud value + // Returns 0 on invalid input baud otherwise returns termios constant baud + // value static int convertBaud(int baud); Serial(const Serial&) = delete; @@ -62,7 +58,7 @@ struct Serial Serial& operator=(const Serial&) = delete; Serial& operator=(Serial&&) = delete; - private: + private: string port; int fd = -1; lockfile_t* lf; @@ -71,17 +67,21 @@ struct Serial bool Serial::open(const string& port_, int baud, bool hwFlowControl) { if (baud == 0) { - fprintf(stderr, "Serial baud rate not specified in url. " - "Proceeding without setting baud\n"); - } else if (!(baud = convertBaud(baud))) { - fprintf(stderr, "Unrecognized baudrate. Failed to open serial device.\n "); + fprintf(stderr, + "Serial baud rate not specified in url. " + "Proceeding without setting baud\n"); + } + else if (!(baud = convertBaud(baud))) { + fprintf(stderr, + "Unrecognized baudrate. Failed to open serial device.\n "); return false; } lf = lockfile_trylock(port_.c_str()); if (!lf) { - ZCM_DEBUG("failed to create lock file, refusing to open serial device (%s)", - port_.c_str()); + ZCM_DEBUG( + "failed to create lock file, refusing to open serial device (%s)", + port_.c_str()); return false; } this->port = port_; @@ -89,7 +89,8 @@ bool Serial::open(const string& port_, int baud, bool hwFlowControl) int flags = O_RDWR | O_NOCTTY | O_SYNC; fd = ::open(port.c_str(), flags, 0); if (fd < 0) { - ZCM_DEBUG("failed to open serial device (%s): %s", port.c_str(), strerror(errno)); + ZCM_DEBUG("failed to open serial device (%s): %s", port.c_str(), + strerror(errno)); goto fail; } @@ -117,8 +118,8 @@ bool Serial::open(const string& port_, int baud, bool hwFlowControl) opts.c_cflag |= CS8; opts.c_cflag &= ~PARENB; if (hwFlowControl) opts.c_cflag |= CRTSCTS; - opts.c_cc[VTIME] = 1; - opts.c_cc[VMIN] = 30; + opts.c_cc[VTIME] = 1; + opts.c_cc[VMIN] = 30; // set the new termios config if (tcsetattr(fd, TCSANOW, &opts)) { @@ -130,7 +131,7 @@ bool Serial::open(const string& port_, int baud, bool hwFlowControl) return true; - fail: +fail: // Close the port if it was opened if (fd > 0) { const int saved_errno = errno; @@ -194,7 +195,8 @@ int Serial::read(u8* buf, size_t sz, u64 timeoutUs) int ret = ::read(fd, buf, sz); if (ret == -1) { ZCM_DEBUG("ERR: serial read failed: %s", strerror(errno)); - } else if (ret == 0) { + } + else if (ret == 0) { ZCM_DEBUG("ERR: serial device unplugged"); close(); assert(false && "ERR: serial device unplugged\n" && @@ -202,11 +204,13 @@ int Serial::read(u8* buf, size_t sz, u64 timeoutUs) return -3; } return ret; - } else { + } + else { ZCM_DEBUG("ERR: serial bytes not ready"); return -1; } - } else { + } + else { ZCM_DEBUG("ERR: serial read timed out"); return -2; } @@ -236,8 +240,7 @@ int Serial::convertBaud(int baud) } } -struct ZCM_TRANS_CLASSNAME : public zcm_trans_t -{ +struct ZCM_TRANS_CLASSNAME : public zcm_trans_t { Serial ser; int baud; @@ -277,7 +280,8 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t auto* baudStr = findOption("baud"); if (!baudStr) { fprintf(stderr, "Baud unspecified. Bypassing serial baud setup.\n"); - } else { + } + else { baud = atoi(baudStr->c_str()); if (baud == 0) { ZCM_DEBUG("expected integer argument for 'baud'"); @@ -288,11 +292,11 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t hwFlowControl = false; auto* hwFlowControlStr = findOption("hw_flow_control"); if (hwFlowControlStr) { - if (*hwFlowControlStr == "true") { - hwFlowControl = true; - } else if (*hwFlowControlStr == "false") { + if (*hwFlowControlStr == "true") { hwFlowControl = true; } + else if (*hwFlowControlStr == "false") { hwFlowControl = false; - } else { + } + else { ZCM_DEBUG("expected boolean argument for 'hw_flow_control'"); return; } @@ -301,11 +305,11 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t raw = false; auto* rawStr = findOption("raw"); if (rawStr) { - if (*rawStr == "true") { - raw = true; - } else if (*rawStr == "false") { + if (*rawStr == "true") { raw = true; } + else if (*rawStr == "false") { raw = false; - } else { + } + else { ZCM_DEBUG("expected boolean argument for 'raw'"); return; } @@ -313,9 +317,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t rawChan = ""; auto* rawChanStr = findOption("raw_channel"); - if (rawChanStr) { - rawChan = *rawChanStr; - } + if (rawChanStr) { rawChan = *rawChanStr; } rawSize = 1024; auto* rawSizeStr = findOption("raw_size"); @@ -333,13 +335,11 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t if (raw) { rawBuf.reset(new uint8_t[rawSize]); gst = nullptr; - } else { - gst = zcm_trans_generic_serial_create(&ZCM_TRANS_CLASSNAME::get, - &ZCM_TRANS_CLASSNAME::put, - this, - &ZCM_TRANS_CLASSNAME::timestamp_now, - nullptr, - MTU, MTU * 10); + } + else { + gst = zcm_trans_generic_serial_create( + &ZCM_TRANS_CLASSNAME::get, &ZCM_TRANS_CLASSNAME::put, this, + &ZCM_TRANS_CLASSNAME::timestamp_now, nullptr, MTU, MTU * 10); } } @@ -349,41 +349,38 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t if (gst) zcm_trans_generic_serial_destroy(gst); } - bool good() - { - return ser.isOpen(); - } + bool good() { return ser.isOpen(); } static size_t get(uint8_t* data, size_t nData, void* usr) { - ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*) usr); + ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*)usr); uint64_t startUtime = TimeUtil::utime(); int ret = me->ser.read(data, nData, me->timeoutLeftUs); uint64_t diff = TimeUtil::utime() - startUtime; - me->timeoutLeftUs = me->timeoutLeftUs > diff ? me->timeoutLeftUs - diff : 0; + me->timeoutLeftUs = + me->timeoutLeftUs > diff ? me->timeoutLeftUs - diff : 0; return ret < 0 ? 0 : ret; } static size_t put(const uint8_t* data, size_t nData, void* usr) { - ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*) usr); + ZCM_TRANS_CLASSNAME* me = cast((zcm_trans_t*)usr); int ret = me->ser.write(data, nData); return ret < 0 ? 0 : ret; } - static uint64_t timestamp_now(void* usr) - { return TimeUtil::utime(); } + static uint64_t timestamp_now(void* usr) { return TimeUtil::utime(); } /********************** METHODS **********************/ - size_t getMtu() - { return raw ? MTU : zcm_trans_get_mtu(this->gst); } + size_t getMtu() { return raw ? MTU : zcm_trans_get_mtu(this->gst); } int sendmsg(zcm_msg_t msg) { if (raw) { if (put(msg.buf, msg.len, this) != 0) return ZCM_EOK; return ZCM_EAGAIN; - } else { + } + else { // Note: No need to lock here ONLY because the internals of // generic serial transport sendmsg only use the sendBuffer // and touch no variables related to receiving @@ -394,7 +391,10 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t } int recvmsgEnable(const char* channel, bool enable) - { return raw ? ZCM_EOK : zcm_trans_recvmsg_enable(this->gst, channel, enable); } + { + return raw ? ZCM_EOK + : zcm_trans_recvmsg_enable(this->gst, channel, enable); + } int recvmsg(zcm_msg_t* msg, unsigned timeoutMs) { @@ -404,27 +404,30 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t size_t sz = get(rawBuf.get(), rawSize, this); if (sz == 0 || rawChan.empty()) return ZCM_EAGAIN; - msg->utime = timestamp_now(this); + msg->utime = timestamp_now(this); msg->channel = rawChan.c_str(); - msg->len = sz; - msg->buf = rawBuf.get(); + msg->len = sz; + msg->buf = rawBuf.get(); return ZCM_EOK; - } else { + } + else { do { uint64_t startUtime = TimeUtil::utime(); // Note: No need to lock here ONLY because the internals of - // generic serial transport recvmsg only use the recv related - // data members and touch no variables related to sending + // generic serial transport recvmsg only use the recv + // related data members and touch no variables related to + // sending int ret = zcm_trans_recvmsg(this->gst, msg, 0); if (ret == ZCM_EOK) return ret; uint64_t diff = TimeUtil::utime() - startUtime; startUtime = TimeUtil::utime(); - // Note: timeoutLeftUs is calculated here because serial_update_rx - // needs it to be set properly so that the blocking read in - // `get` knows how long it has to exit + // Note: timeoutLeftUs is calculated here because + // serial_update_rx + // needs it to be set properly so that the blocking read + // in `get` knows how long it has to exit timeoutLeftUs = timeoutLeftUs > diff ? timeoutLeftUs - diff : 0; serial_update_rx(this->gst); @@ -446,20 +449,24 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t return (ZCM_TRANS_CLASSNAME*)zt; } - static size_t _getMtu(zcm_trans_t* zt) - { return cast(zt)->getMtu(); } + static size_t _getMtu(zcm_trans_t* zt) { return cast(zt)->getMtu(); } static int _sendmsg(zcm_trans_t* zt, zcm_msg_t msg) - { return cast(zt)->sendmsg(msg); } + { + return cast(zt)->sendmsg(msg); + } static int _recvmsgEnable(zcm_trans_t* zt, const char* channel, bool enable) - { return cast(zt)->recvmsgEnable(channel, enable); } + { + return cast(zt)->recvmsgEnable(channel, enable); + } static int _recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeout) - { return cast(zt)->recvmsg(msg, timeout); } + { + return cast(zt)->recvmsg(msg, timeout); + } - static void _destroy(zcm_trans_t* zt) - { delete cast(zt); } + static void _destroy(zcm_trans_t* zt) { delete cast(zt); } static const TransportRegister reg; }; @@ -469,17 +476,16 @@ zcm_trans_methods_t ZCM_TRANS_CLASSNAME::methods = { &ZCM_TRANS_CLASSNAME::_sendmsg, &ZCM_TRANS_CLASSNAME::_recvmsgEnable, &ZCM_TRANS_CLASSNAME::_recvmsg, - NULL, // drops - NULL, // update + NULL, // drops + NULL, // update &ZCM_TRANS_CLASSNAME::_destroy, }; -static zcm_trans_t* create(zcm_url_t* url, char **opt_errmsg) +static zcm_trans_t* create(zcm_url_t* url, char** opt_errmsg) { - if (opt_errmsg) *opt_errmsg = NULL; // Feature unused in this transport + if (opt_errmsg) *opt_errmsg = NULL; // Feature unused in this transport auto* trans = new ZCM_TRANS_CLASSNAME(url); - if (trans->good()) - return trans; + if (trans->good()) return trans; delete trans; return nullptr; @@ -488,8 +494,9 @@ static zcm_trans_t* create(zcm_url_t* url, char **opt_errmsg) #ifdef USING_TRANS_SERIAL // Register this transport with ZCM const TransportRegister ZCM_TRANS_CLASSNAME::reg( - "serial", "Transfer data via a serial connection " - "(e.g. 'serial:///dev/ttyUSB0?baud=115200&hw_flow_control=true' or " - "'serial:///dev/pts/10?raw=true&raw_channel=RAW_SERIAL')", + "serial", + "Transfer data via a serial connection " + "(e.g. 'serial:///dev/ttyUSB0?baud=115200&hw_flow_control=true' or " + "'serial:///dev/pts/10?raw=true&raw_channel=RAW_SERIAL')", create); #endif diff --git a/zcm/transport/transport_serial.hpp b/zcm/transport/transport_serial.hpp new file mode 100644 index 000000000..5e9dea9d1 --- /dev/null +++ b/zcm/transport/transport_serial.hpp @@ -0,0 +1,33 @@ +#include "zcm/util/lockfile.h" + +#include + +using u8 = uint8_t; +using u16 = uint16_t; +using u32 = uint32_t; +using u64 = uint64_t; + +struct Serial { + Serial() {} + ~Serial() { close(); } + + bool open(const std::string& port, int baud, bool hwFlowControl); + bool isOpen() { return fd > 0; }; + void close(); + + int write(const u8* buf, size_t sz); + int read(u8* buf, size_t sz, u64 timeoutMs); + // Returns 0 on invalid input baud otherwise returns termios constant baud + // value + static int convertBaud(int baud); + + Serial(const Serial&) = delete; + Serial(Serial&&) = delete; + Serial& operator=(const Serial&) = delete; + Serial& operator=(Serial&&) = delete; + + private: + std::string port; + int fd = -1; + lockfile_t* lf; +}; diff --git a/zcm/util/cobs.h b/zcm/util/cobs.h new file mode 100644 index 000000000..bb0b3ae37 --- /dev/null +++ b/zcm/util/cobs.h @@ -0,0 +1,102 @@ +#ifndef _ZCM_TRANS_NONBLOCKING_COBS_H +#define _ZCM_TRANS_NONBLOCKING_COBS_H + +#include +#include + +/* Only define inline for C99 builds or better */ +#if (__STDC_VERSION__ >= 199901L) || (__cplusplus) +#define INLINE inline +#else +#define INLINE +#endif + +/** + * @brief COBS encode \p length bytes from \p src to \p dest + * + * @param dest + * @param src + * @param length + * + * @return Encoded buffer length in bytes + * + * @note Does not output delimiter byte + * @note \p dest must point to output buffer of length + * greater than or equal to \p src + */ +static INLINE size_t cobs_encode(uint8_t* dest, const uint8_t* src, + size_t length) +{ + uint8_t* encode = dest; + uint8_t* codep = encode++; + uint8_t code = 1; + + for (const uint8_t* byte = src; length--; ++byte) { + if (*byte) // not zero, write it + *encode++ = *byte, ++code; + + if (!*byte || code == 0xff) { // zero or end of block, restart + *codep = code; + code = 1; + codep = encode; + + if (!*byte || length) ++encode; + } + } + + *codep = code; + + return (size_t)(encode - dest); +} + +/** + * @brief COBS decode \p length bytes from \p src to \p dest + * + * @param dest + * @param src + * @param length + * + * @return Number of bytes successfully decoded + * + * @note Stops decoding if delimiter byte is found + */ +static INLINE size_t cobs_decode(uint8_t* dest, const uint8_t* src, + size_t length) +{ + const uint8_t* byte = src; + uint8_t* decode = dest; + + for (uint8_t code = 0xff, block = 0; byte < src + length; --block) { + if (block) { + *decode = *byte; + decode++; + byte++; + continue; + } + + if (code != 0xff) { + *decode = 0; + decode++; + } + block = code = *byte; + byte++; + + if (!code) break; // found delimeter + } + + return (size_t)(decode - dest); +} + +/** + * @brief Calculate maximum COBS encoding overhead + * + * @param msgSize Size of the pre-encoded message + * + * @return Maximum number of overhead bytes + */ +static INLINE size_t cobsMaxOverhead(size_t msgSize) +{ + return (msgSize + 253) / 254; +} + +#endif /* _ZCM_TRANS_NONBLOCKING_COBS_H */ diff --git a/zcm/wscript b/zcm/wscript index d79484ce8..9c091679d 100644 --- a/zcm/wscript +++ b/zcm/wscript @@ -39,6 +39,7 @@ def build(ctx): 'util/*.c', 'util/*.cpp', 'tools/*.c', 'tools/*.cpp', 'transport/*.c', 'transport/*.cpp', + 'transport/cobs_serial/*.c', 'transport/udp/*.cpp', 'transport/lockfree/lf_*.c'], excl=srcExcludes)) @@ -50,7 +51,9 @@ def build(ctx): 'transport/generic_serial_transport.c', 'transport/generic_serial_circ_buff.h', 'transport/generic_serial_circ_buff.c', - 'transport/generic_serial_fletcher.h'] + 'transport/generic_serial_fletcher.h', + 'transport/cobs_serial/generic_serial_cobs_transport.h', + 'transport/cobs_serial/generic_serial_cobs_transport.c'] if ctx.env.USING_THIRD_PARTY: embedSource.append('transport/third-party/embedded/**') @@ -104,7 +107,8 @@ def build(ctx): ctx.install_files('${PREFIX}/include/zcm/transport', ['transport/generic_serial_transport.h', 'transport/generic_serial_circ_buff.h', - 'transport/generic_serial_fletcher.h']) + 'transport/generic_serial_fletcher.h', + 'transport/cobs_serial/generic_serial_cobs_transport.h']) ctx.install_files('${PREFIX}/share/embedded', ['zcm-embed.tar.gz']) @@ -113,10 +117,10 @@ def build(ctx): ctx.recurse('json') if ctx.env.USING_JAVA: - ctx.recurse('java'); + ctx.recurse('java') if ctx.env.USING_NODEJS: - ctx.recurse('js'); + ctx.recurse('js') if ctx.env.USING_PYTHON: ctx.recurse('python')