diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..a7f71b5 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +*.dat binary +*.rec binary diff --git a/Makefile b/Makefile index 3946ed5..96d0162 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,10 @@ VERSION := $(shell git describe --always HEAD) CHECKSUM_SIGNING_CERT ?= ./keys/checksumsign.private.pem -### Simple build ########################################## +### Simple debug build #################################### .PHONY: build build: - @./build.sh -i $(SRC) -o $(OUT) -v $(VERSION) + @./build.sh -i $(SRC) -o $(OUT) -v $(VERSION) -t debug ### Test suite ############################################ diff --git a/Readme.md b/Readme.md index c3cc9ad..d5e71a9 100644 --- a/Readme.md +++ b/Readme.md @@ -14,12 +14,30 @@ The default nix shell (defined in `nix/devShell.nix`) provides all necessary dep - Enter the nix development shell: `nix develop` - Build the driver: `make` -- Run the driver: `./bin/dividat-driver` +- Run the driver: `make run` or `./bin/dividat-driver` + + +### Build tags + +By default, all the development-related make targets (`make build`, and those +that depend on it, e.g. `make test`, `make run`) build the driver with `debug` +tag. + +The `debug` tag currently enables mock device registration for the Flex backend. + +Cross-build targets and plain `go build` invocations do not pass the `debug` +tag and produce "release" builds. + +If you are using IDEs/other tools to build the Driver, make sure they pass the +`debug` tag for dev builds. ### Tests Run the test suite with: `make test`. +Make sure actual Senso/Flex devices are unplugged, since their presence can +cause test assumptions about available devices to fail. + ### Go modules To install a module, use `go get github.com/owner/repo`. @@ -120,7 +138,20 @@ Data from Senso can be recorded using the [`recorder`](src/dividat-driver/record #### Senso Flex data -Like Senso data, but with `make record-flex`. +On Linux, you can record the raw serial data using: + + ./tools/record-flex-serial -o recording.serial.dat + +On macOS, you can only record the WebSocket binary data from the `/flex` +endpoint as output by the Driver, using: + + make record-flex > recording.ws.dat + +For Flex V6 (Sensitronics) devices, the two methods should be equivalent. + +The Driver must be running and connected to the device in both cases. + +For more details, see the [Flex recording and replay](tools/flex-recording-and-replay.md) docs. ### Data replayer @@ -140,8 +171,16 @@ The Senso replayer will appear as a Senso network device, so both driver and rep #### Senso Flex replay -The Senso Flex replayer (`npm run replay-flex`) supports the same parameters as the Senso replayer. +The Senso Flex replayer (`npm run replay-flex`) supports the same parameters as the Senso replayer and also allows to fake device metadata. + +Driver must be running and built with the `debug` tag, which is the default if +you run `make build` and/or `make run`. + +You can then replay a recording using: + + node tools/replay-flex -d recording.dat -It mocks the driver with respect to the `/flex` WebSocket resource and the `/` metadata HTTP route, so the real driver can not be running at the same time. +If you are using a WebSocket binary stream recording, you should also specify +`--passthru` mode. -You can control the mocked driver version via the `--driver-version` flag. +For more details, see the [Flex recording and replay](tools/flex-recording-and-replay.md) docs. diff --git a/build.sh b/build.sh index ecdeb1b..084ac18 100755 --- a/build.sh +++ b/build.sh @@ -32,8 +32,9 @@ set -euo pipefail IN="" OUT="" VERSION="" +BUILD_TAGS="" -while getopts "i:o:v:" opt; do +while getopts "i:o:v:t:" opt; do case $opt in i) IN="$OPTARG" ;; @@ -41,6 +42,8 @@ while getopts "i:o:v:" opt; do ;; v) VERSION="$OPTARG" ;; + t) BUILD_TAGS="$OPTARG" + ;; \?) echo "Invalid option: -$OPTARG" >&2; exit 1 ;; esac done @@ -60,7 +63,7 @@ ensure_flag_set "-i" "$IN" ensure_flag_set "-o" "$OUT" if [ "$all_flags_set" = false ]; then - echo "Usage: build-driver -v -i -o " + echo "Usage: build-driver -v -i -o [-t ]" exit 1 fi @@ -81,7 +84,9 @@ if [ $VERBOSE = "1" ]; then echo "GCO_ENABLED=${CGO_ENABLED:=}" echo "CC=${CC:=}" echo "LD_FLAGS=$LD_FLAGS" + echo "BUILD_TAGS=$BUILD_TAGS" fi -go build -ldflags "$LD_FLAGS" -o "$OUT" "$IN" +go build -tags "$BUILD_TAGS" -ldflags "$LD_FLAGS" -o "$OUT" "$IN" + echo "Built $OUT" diff --git a/go.mod b/go.mod index d1f69b1..5bcd391 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/dividat/driver -go 1.12 +go 1.18 require ( github.com/cenkalti/backoff v2.2.1+incompatible @@ -17,8 +17,16 @@ require ( // Both projects are dormant at the moment, but we might want to re-evaluate this // dependency choice as these projects evolve in the future. github.com/libp2p/zeroconf/v2 v2.2.0 - github.com/pin/tftp v2.1.0+incompatible github.com/sirupsen/logrus v1.8.1 go.bug.st/serial v1.6.1 ) + +require ( + github.com/creack/goselect v0.1.2 // indirect + github.com/miekg/dns v1.1.43 // indirect + golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 // indirect + golang.org/x/sys v0.19.0 // indirect +) + +replace go.bug.st/serial => github.com/dividat/go-serial v1.6.4-usbmetadata diff --git a/go.sum b/go.sum index 22d9bd1..e27f169 100644 --- a/go.sum +++ b/go.sum @@ -4,11 +4,12 @@ github.com/creack/goselect v0.1.2 h1:2DNy14+JPjRBgPzAd1thbQp4BSIihxcBf0IXhQXDRa0 github.com/creack/goselect v0.1.2/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ= github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI= +github.com/dividat/go-serial v1.6.4-usbmetadata h1:U8b1GaZ1roUtf1Jve6sg+yuHdLe7FsBvZxnm1W7bipo= +github.com/dividat/go-serial v1.6.4-usbmetadata/go.mod h1:nofMJxTeNVny/m6+KaafC6vJGj3miwQZ6vW4BZUGJPI= github.com/ebfe/scard v0.0.0-20190212122703-c3d1b1916a95 h1:OM0MnUcXBysj7ZtXvThVWHMoahuKQ8FuwIdeSLcNdP4= github.com/ebfe/scard v0.0.0-20190212122703-c3d1b1916a95/go.mod h1:8hHvF8DlEq5kE3KWOsZQezdWq1OTOVxZArZMscS954E= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= @@ -25,12 +26,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.bug.st/serial v1.6.1 h1:VSSWmUxlj1T/YlRo2J104Zv3wJFrjHIl/T3NeruWAHY= -go.bug.st/serial v1.6.1/go.mod h1:UABfsluHAiaNI+La2iESysd9Vetq7VRdpxvjx7CmmOE= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 h1:0PC75Fz/kyMGhL0e1QnypqK2kQMqKt9csD1GnMJR+Zk= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= @@ -42,13 +39,10 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/nix/devShell.nix b/nix/devShell.nix index 78047ce..238b102 100644 --- a/nix/devShell.nix +++ b/nix/devShell.nix @@ -6,8 +6,9 @@ mkShell go gcc - # node for tests + # test dependencies nodejs + socat # Required for building go dependencies autoconf diff --git a/package-lock.json b/package-lock.json index f86cd8c..aa1e519 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,11 +12,11 @@ "bonjour": "^3.5.0", "chai": "^4.1.1", "mocha": "^10.8.2", - "ws": "^5.2.4" + "ws": "^8.18.3" }, "devDependencies": { "binary-split": "^1.0.5", - "minimist": "^1.2.8" + "commander": "^12.1.0" } }, "node_modules/ansi-colors": { @@ -80,11 +80,6 @@ "node": "*" } }, - "node_modules/async-limiter": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz", - "integrity": "sha1-ePrtjD0HSrgfIrTphdeehzj3IPg=" - }, "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", @@ -286,6 +281,15 @@ "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" }, + "node_modules/commander": { + "version": "12.1.0", + "resolved": "https://registry.npmjs.org/commander/-/commander-12.1.0.tgz", + "integrity": "sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA==", + "dev": true, + "engines": { + "node": ">=18" + } + }, "node_modules/core-util-is": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", @@ -739,15 +743,6 @@ "node": ">=10" } }, - "node_modules/minimist": { - "version": "1.2.8", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", - "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", - "dev": true, - "funding": { - "url": "https://github.com/sponsors/ljharb" - } - }, "node_modules/mocha": { "version": "10.8.2", "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.8.2.tgz", @@ -1130,11 +1125,23 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==" }, "node_modules/ws": { - "version": "5.2.4", - "resolved": "https://registry.npmjs.org/ws/-/ws-5.2.4.tgz", - "integrity": "sha512-fFCejsuC8f9kOSu9FYaOw8CdO68O3h5v0lg4p74o8JqWpwTf9tniOD+nOB78aWoVSS6WptVUmDrp/KPsMVBWFQ==", - "dependencies": { - "async-limiter": "~1.0.0" + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } } }, "node_modules/xtend": { diff --git a/package.json b/package.json index 9d7d0f7..e959ea5 100644 --- a/package.json +++ b/package.json @@ -12,10 +12,10 @@ "bonjour": "^3.5.0", "chai": "^4.1.1", "mocha": "^10.8.2", - "ws": "^5.2.4" + "ws": "^8.18.3" }, "devDependencies": { "binary-split": "^1.0.5", - "minimist": "^1.2.8" + "commander": "^12.1.0" } } diff --git a/src/dividat-driver/flex/device/passthru/main.go b/src/dividat-driver/flex/device/passthru/main.go new file mode 100644 index 0000000..1615c2c --- /dev/null +++ b/src/dividat-driver/flex/device/passthru/main.go @@ -0,0 +1,86 @@ +// Passthru device for sending chunked serial bytes "as is" +// Useful for: +// - recording raw serial data from a device +// - replaying recoreded raw serial data +// +// Currently used in unit tests. +package passthru + +import ( + "bufio" + "context" + "io" + + "github.com/sirupsen/logrus" + "go.bug.st/serial" +) + +// Serial communication +type PassthruHandler struct{} + +func (PassthruHandler) Run(ctx context.Context, logger *logrus.Entry, port serial.Port, tx chan interface{}, onReceive func([]byte)) { + logger.Info("PassthruReader started") + readerCtx := context.WithoutCancel(ctx) + + // Channel to receive ack that reader is done + readerDoneChan := make(chan struct{}) + + // Start the initial reader goroutine + go readFromPort(readerCtx, logger, port, onReceive, readerDoneChan) + + // Forward WebSocket commands to device + for { + select { + case <-ctx.Done(): + return + + case <-readerDoneChan: + return + + case i := <-tx: + data, _ := i.([]byte) + _, err := port.Write(data) + if err != nil { + logger.WithField("error", err).Info("Failed to write command to serial port.") + return + } + logger.WithField("bytes", data).Debug("Wrote binary command to serial out: " + string(data)) + } + } +} + +// Infinite loop for requesting and reading serial data. +// Stops (returns) upon any error or ctx cancel. +func readFromPort( + ctx context.Context, + logger *logrus.Entry, + port serial.Port, + onReceive func([]byte), + doneChan chan<- struct{}, +) { + defer func() { + // Signal that the reader has completed + close(doneChan) + }() + + // max estimated frame size in bytes: + // 10 (header) + 24*24*4 (samples) + 20 (misc extras) = 2380 bytes, so ~2 kilobytes + reader := bufio.NewReaderSize(port, 2048) + + var message []byte = make([]byte, 2048) + // Start signal acquisition + for { + // Terminate if we were cancelled + if ctx.Err() != nil { + logger.Debug("Stopping reader: context cancelled") + return + } + + readBytes, err := io.ReadAtLeast(reader, message, 1) + if err != nil { + logger.WithField("err", err).Error("Error reading from serial port") + return + } + onReceive(message[:readBytes]) + } +} diff --git a/src/dividat-driver/flex/device/sensingtex/main.go b/src/dividat-driver/flex/device/sensingtex/main.go new file mode 100644 index 0000000..e3bbdf9 --- /dev/null +++ b/src/dividat-driver/flex/device/sensingtex/main.go @@ -0,0 +1,236 @@ +// Serial communication +package sensingtex + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + + "github.com/sirupsen/logrus" + "go.bug.st/serial" +) + +type ReaderState int + +const ( + WAITING_FOR_HEADER ReaderState = iota + HEADER_START + HEADER_READ_LENGTH_MSB + HEADER_READ_LENGTH_LSB + WAITING_FOR_BODY + BODY_START + BODY_READ_SAMPLE + UNEXPECTED_BYTE +) + +const ( + HEADER_START_MARKER = 'N' + BODY_START_MARKER = 'P' +) + +const ( + // row, column and pressure value, one uint8 each + BYTES_PER_SAMPLE_8BIT = 3 + + // same as above, but pressure value is 2 bytes (uint16), big-endian + // Note: Sensing Tex docs state value max is 2^12-1 (hence "12bit"), + // but in practice they seem to send values up to ~9000, so more like + // "14 bit". + BYTES_PER_SAMPLE_12BIT = 4 +) + +var BITDEPTH_8_CMD = []byte{'U', 'L', '\n'} + +var BITDEPTH_12_CMD = []byte{'U', 'M', '\n'} + +func isBitdepthCommand(cmd []byte) bool { + return bytes.Equal(cmd, BITDEPTH_8_CMD) || bytes.Equal(cmd, BITDEPTH_12_CMD) +} + +func bitdepthCommandToBytesPerSample(cmd []byte) int { + if bytes.Equal(cmd, BITDEPTH_8_CMD) { + return BYTES_PER_SAMPLE_8BIT + } else { + return BYTES_PER_SAMPLE_12BIT + } +} + +type SensingTexHandler struct{} + +func (SensingTexHandler) Run(ctx context.Context, logger *logrus.Entry, port serial.Port, tx chan interface{}, onReceive func([]byte)) { + readerCtx, readerCtxCancel := context.WithCancel(ctx) + + port.ResetInputBuffer() // flush any unread data buffered by the OS + + // For backwards compatibility, we set 8bit depth by default + // TODO: use UsbDeviceInfo.BcdDevice to identify V4 vs V5 and get rid of this + _, err := port.Write(BITDEPTH_8_CMD) + if err != nil { + logger.WithField("error", err).Info("Failed to set bitdepth of 8.") + return + } + configuredBytesPerSample := BYTES_PER_SAMPLE_8BIT + + // Channel to receive ack that reader is done + readerDoneChan := make(chan struct{}) + + // Start the initial reader goroutine + go readFromPort(readerCtx, logger, port, configuredBytesPerSample, onReceive, readerDoneChan) + + // Forward WebSocket commands to device + for { + select { + case <-ctx.Done(): + return + + case <-readerDoneChan: + return + + case i := <-tx: + data, _ := i.([]byte) + + if isBitdepthCommand(data) { + newBytesPerSample := bitdepthCommandToBytesPerSample(data) + + // If bytes per sample has changed, we need to restart the reader + if newBytesPerSample != configuredBytesPerSample { + logger.WithFields(logrus.Fields{ + "old": configuredBytesPerSample, + "new": newBytesPerSample, + }).Info("Bytes per sample changed, will restart reader") + + logger.Debug("Sending stop to reader and waiting for ack") + readerCtxCancel() + <-readerDoneChan + logger.Debug("Ack received, reader stopped") + + _, err = port.Write(data) + if err != nil { + logger.WithField("error", err).Info("Failed to write new bitdepth.") + return + } + + port.ResetInputBuffer() // flush any data that was not yet read + + configuredBytesPerSample = newBytesPerSample + + readerCtx, readerCtxCancel = context.WithCancel(ctx) + readerDoneChan = make(chan struct{}) + + // Start a new reader goroutine with updated bytesPerSample + go readFromPort(readerCtx, logger, port, configuredBytesPerSample, onReceive, readerDoneChan) + } + } else { + // For non-bitdepth commands, just forward them + _, err = port.Write(data) + if err != nil { + logger.WithField("error", err).Info("Failed to write command to serial port.") + return + } + logger.WithField("bytes", data).Debug("Wrote binary command to serial out: " + string(data)) + } + } + } +} + +// Infinite loop for requesting and reading serial data. +// Stops (returns) upon any error or ctx cancel. +func readFromPort( + ctx context.Context, + logger *logrus.Entry, + port serial.Port, + bytesPerSample int, + onReceive func([]byte), + doneChan chan<- struct{}, +) { + defer func() { + // Signal that the reader has completed + close(doneChan) + }() + + reader := bufio.NewReader(port) + state := WAITING_FOR_HEADER + var samplesLeftInSet int + var bytesLeftInSample int + + // Note: for Flex v4 this command seems to cause the firmware to push + // data as fast as we can consume it, whereas for Flex v5 it merely + // requests a single frame. + START_MEASUREMENT_CMD := []byte{'S', '\n'} + _, err := port.Write(START_MEASUREMENT_CMD) + if err != nil { + logger.WithField("error", err).Info("Failed to write start message to serial port.") + return + } + + // Start signal acquisition + var buff []byte + for { + // Terminate if we were cancelled + if ctx.Err() != nil { + logger.Debug("Stopping reader: context cancelled") + return + } + + input, err := reader.ReadByte() + if err != nil { + logger.WithField("err", err).Error("Error reading from serial port") + return + } + + var length_msb byte + + // Finite State Machine for parsing byte stream + switch { + case state == WAITING_FOR_HEADER && input == HEADER_START_MARKER: + state = HEADER_START + case state == HEADER_START && input == '\n': + state = HEADER_READ_LENGTH_MSB + case state == HEADER_READ_LENGTH_MSB: + // The number of measurements in each set is given as two + // consecutive bytes (big-endian). + length_msb = input + state = HEADER_READ_LENGTH_LSB + case state == HEADER_READ_LENGTH_LSB: + length_lsb := input + samplesLeftInSet = int(binary.BigEndian.Uint16([]byte{length_msb, length_lsb})) + state = WAITING_FOR_BODY + case state == WAITING_FOR_BODY && input == BODY_START_MARKER: + state = BODY_START + case state == BODY_START && input == '\n': + state = BODY_READ_SAMPLE + buff = []byte{} + bytesLeftInSample = bytesPerSample + case state == BODY_READ_SAMPLE: + buff = append(buff, input) + bytesLeftInSample = bytesLeftInSample - 1 + + if bytesLeftInSample <= 0 { + samplesLeftInSet = samplesLeftInSet - 1 + + if samplesLeftInSet <= 0 { + // Finish and send set + onReceive(buff) + + // Get ready for next set and request it + state = WAITING_FOR_HEADER + // Optional for Flex v4, mandatory for v5 + _, err = port.Write(START_MEASUREMENT_CMD) + if err != nil { + logger.WithField("error", err).Info("Failed to write poll message to serial port.") + return + } + } else { + // Start next point + bytesLeftInSample = bytesPerSample + } + } + case state == UNEXPECTED_BYTE && input == HEADER_START_MARKER: + // Recover from error state when a new header is seen + state = HEADER_START + default: + state = UNEXPECTED_BYTE + } + } +} diff --git a/src/dividat-driver/flex/device/sensitronics/main.go b/src/dividat-driver/flex/device/sensitronics/main.go new file mode 100644 index 0000000..2ec2ec0 --- /dev/null +++ b/src/dividat-driver/flex/device/sensitronics/main.go @@ -0,0 +1,130 @@ +package sensitronics + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "io" + + "github.com/sirupsen/logrus" + "go.bug.st/serial" +) + +const ( + HEADER_START_MARKER = 0xFF + HEADER_SIZE = 4 +) + +type SensitronicsHandler struct{} + +func (SensitronicsHandler) Run(ctx context.Context, logger *logrus.Entry, port serial.Port, tx chan interface{}, onReceive func([]byte)) { + readerCtx := context.WithoutCancel(ctx) + + port.ResetInputBuffer() // flush any unread data buffered by the OS + + // Channel to receive ack that reader is done + readerDoneChan := make(chan struct{}) + + // Start the initial reader goroutine + go readFromPort(readerCtx, logger, port, onReceive, readerDoneChan) + + // Forward WebSocket commands to device + for { + select { + case <-ctx.Done(): + return + + case <-readerDoneChan: + return + + case i := <-tx: + data, _ := i.([]byte) + _, err := port.Write(data) + if err != nil { + logger.WithField("error", err).Info("Failed to write command to serial port.") + return + } + logger.WithField("bytes", data).Debug("Wrote binary command to serial out: " + string(data)) + } + } +} + +func readMessage(reader *bufio.Reader) ([]byte, error) { + // Read start marker + marker, err := reader.ReadByte() + if err != nil { + return nil, err + } + if marker != HEADER_START_MARKER { + return nil, fmt.Errorf("expected header start marker 0x%02X, got 0x%02X", HEADER_START_MARKER, marker) + } + + // Read message type + messageType, err := reader.ReadByte() + if err != nil { + return nil, err + } + + // Read message length + var lengthBytes [2]byte + if _, err := io.ReadFull(reader, lengthBytes[:]); err != nil { + return nil, err + } + bodyLength := int(binary.LittleEndian.Uint16(lengthBytes[:])) + + // Allocate full message buffer and reassemble header + message := make([]byte, HEADER_SIZE+bodyLength) + message[0] = marker + message[1] = messageType + message[2] = lengthBytes[0] + message[3] = lengthBytes[1] + + // Read body directly into message buffer + if _, err := io.ReadFull(reader, message[HEADER_SIZE:]); err != nil { + return nil, err + } + + return message, nil +} + +// Infinite loop for requesting and reading serial data. +// Stops (returns) upon any error or ctx cancel. +func readFromPort( + ctx context.Context, + logger *logrus.Entry, + port serial.Port, + onReceive func([]byte), + doneChan chan<- struct{}, +) { + defer func() { + // Signal that the reader has completed + close(doneChan) + }() + + reader := bufio.NewReader(port) + + START_MEASUREMENT_CMD := []byte{'S', '\n'} + _, err := port.Write(START_MEASUREMENT_CMD) + if err != nil { + logger.WithField("error", err).Info("Failed to write start message to serial port.") + return + } + + // Start signal acquisition + for { + // Terminate if we were cancelled + if ctx.Err() != nil { + logger.Debug("Stopping reader: context cancelled") + return + } + + message, err := readMessage(reader) + if err != nil { + logger.WithField("err", err).Error("Error reading from serial port") + return + } + + onReceive(message) + } +} diff --git a/src/dividat-driver/flex/enumerator/main.go b/src/dividat-driver/flex/enumerator/main.go new file mode 100644 index 0000000..439bfdf --- /dev/null +++ b/src/dividat-driver/flex/enumerator/main.go @@ -0,0 +1,134 @@ +package enumerator + +import ( + "context" + "strconv" + "strings" + + "github.com/sirupsen/logrus" + serialenum "go.bug.st/serial/enumerator" + + "github.com/dividat/driver/src/dividat-driver/flex/enumerator/mockdev" + "github.com/dividat/driver/src/dividat-driver/protocol" + "github.com/dividat/driver/src/dividat-driver/util" +) + +type DeviceFamily int + +const ( + DeviceFamilyPassthru DeviceFamily = iota + DeviceFamilySensingTex + DeviceFamilySensitronics +) + +type DeviceEnumerator struct { + ctx context.Context + log *logrus.Entry + mockDeviceRegistry *mockdev.MockDeviceRegistry +} + +func New(ctx context.Context, log *logrus.Entry, mockDeviceRegistry *mockdev.MockDeviceRegistry) *DeviceEnumerator { + return &DeviceEnumerator{ + ctx: ctx, + log: log, + mockDeviceRegistry: mockDeviceRegistry, + } +} + +func (handle *DeviceEnumerator) getSerialPortList() ([]*serialenum.PortDetails, error) { + realDevices, err := serialenum.GetDetailedPortsList() + if err != nil { + return nil, err + } + + mockDevices := handle.mockDeviceRegistry.ListMockDevices() + + allDevices := append(realDevices, mockDevices...) + + return allDevices, nil +} + +// Check whether a port looks like a potential Flex device. +// +// Vendor IDs: +// +// 16C0 - Van Ooijen Technische Informatica (Teensy) +func isTeensyDevice(device protocol.UsbDeviceInfo) bool { + return device.IdVendor == 0x16C0 +} + +func findMatchingDeviceFamily(device protocol.UsbDeviceInfo) *DeviceFamily { + if !isTeensyDevice(device) { + return nil + } + + if strings.HasPrefix(device.Product, "PASSTHRU") { + return util.PointerTo(DeviceFamilyPassthru) + } + + if device.Manufacturer == "Teensyduino" { + return util.PointerTo(DeviceFamilySensingTex) + } else if device.Manufacturer == "Sensitronics" || device.Manufacturer == "Dividat" { + return util.PointerTo(DeviceFamilySensitronics) + } + + return nil +} + +type MatchedDevice struct { + Family DeviceFamily + Info protocol.UsbDeviceInfo +} + +func (handle *DeviceEnumerator) ListMatchingDevices() []MatchedDevice { + ports, err := handle.getSerialPortList() + if err != nil { + handle.log.WithField("error", err).Info("Could not list serial devices.") + return nil + } + var matching []MatchedDevice + for _, port := range ports { + handle.log.WithField("name", port.Name).WithField("vendor", port.VID).Debug("Considering serial port.") + + device, err := portDetailsToDeviceInfo(*port) + if err != nil { + handle.log.WithField("port", port).WithField("err", err).Error("Failed to convert serial port details to device info!") + continue + } + + family := findMatchingDeviceFamily(*device) + + if family != nil { + handle.log.WithField("name", port.Name).WithField("family", *family).Debug("Serial port matches a Flex device.") + matchedDevice := MatchedDevice{Family: *family, Info: *device} + matching = append(matching, matchedDevice) + } + } + return matching +} + +func portDetailsToDeviceInfo(port serialenum.PortDetails) (*protocol.UsbDeviceInfo, error) { + idVendor, err := strconv.ParseUint(port.VID, 16, 16) // hex, uint16 + if err != nil { + return nil, err + } + idProduct, err := strconv.ParseUint(port.PID, 16, 16) // hex, uint16 + if err != nil { + return nil, err + } + bcdDevice, err := strconv.ParseUint(port.BcdDevice, 16, 16) // hex, uint16 + if err != nil { + return nil, err + } + + deviceInfo := protocol.UsbDeviceInfo{ + Path: port.Name, + IdVendor: uint16(idVendor), + IdProduct: uint16(idProduct), + BcdDevice: uint16(bcdDevice), + SerialNumber: port.SerialNumber, + Manufacturer: port.Manufacturer, + Product: port.Product, + } + return &deviceInfo, nil +} diff --git a/src/dividat-driver/flex/enumerator/mockdev/disabled.go b/src/dividat-driver/flex/enumerator/mockdev/disabled.go new file mode 100644 index 0000000..5c92afd --- /dev/null +++ b/src/dividat-driver/flex/enumerator/mockdev/disabled.go @@ -0,0 +1,26 @@ +//go:build !debug + +package mockdev + +import ( + "net/http" + + "github.com/sirupsen/logrus" + serialenum "go.bug.st/serial/enumerator" +) + +type MockDeviceRegistry struct { +} + +func New(log *logrus.Entry) *MockDeviceRegistry { + return &MockDeviceRegistry{} +} + +func (h *MockDeviceRegistry) ListMockDevices() []*serialenum.PortDetails { + return nil +} + +// Mock device registration is only available in debug builds. +func (h *MockDeviceRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Mock device registration is not available in production builds", http.StatusForbidden) +} diff --git a/src/dividat-driver/flex/enumerator/mockdev/enabled.go b/src/dividat-driver/flex/enumerator/mockdev/enabled.go new file mode 100644 index 0000000..c445483 --- /dev/null +++ b/src/dividat-driver/flex/enumerator/mockdev/enabled.go @@ -0,0 +1,113 @@ +//go:build debug + +package mockdev + +import ( + "encoding/json" + "errors" + "net/http" + "strconv" + "strings" + + "github.com/sirupsen/logrus" + serialenum "go.bug.st/serial/enumerator" +) + +type MockDeviceId int + +var ErrDeviceNotFound = errors.New("mock device id not found") +var ErrDeviceExists = errors.New("mock device id is already defined") + +type MockDeviceRegistry struct { + log *logrus.Entry + registeredMockDevices map[MockDeviceId]*serialenum.PortDetails +} + +func New(log *logrus.Entry) *MockDeviceRegistry { + log.Info("Mock device registry enabled (debug build)") + return &MockDeviceRegistry{ + log: log, + registeredMockDevices: make(map[MockDeviceId]*serialenum.PortDetails), + } +} + +func (h *MockDeviceRegistry) handlePost(w http.ResponseWriter, r *http.Request) { + var portDetails serialenum.PortDetails + if err := json.NewDecoder(r.Body).Decode(&portDetails); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + mockDeviceId := h.registerMockDevice(portDetails) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]int{"id": int(mockDeviceId)}) +} + +func (h *MockDeviceRegistry) handleDelete(w http.ResponseWriter, r *http.Request) { + pathParts := strings.Split(r.URL.Path, "/") + if len(pathParts) != 2 || pathParts[1] == "" { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + idStr := pathParts[1] + id, err := strconv.Atoi(idStr) + if err != nil { + http.Error(w, "Invalid ID", http.StatusBadRequest) + return + } + + if err := h.unregisterMockDevice(MockDeviceId(id)); err != nil { + if err == ErrDeviceNotFound { + http.Error(w, "Device not found", http.StatusNotFound) + } else { + http.Error(w, "Internal error", http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusOK) +} + +func (h *MockDeviceRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + h.handlePost(w, r) + return + case http.MethodDelete: + h.handleDelete(w, r) + return + } + + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) +} + +func (h *MockDeviceRegistry) ListMockDevices() []*serialenum.PortDetails { + ports := make([]*serialenum.PortDetails, 0, len(h.registeredMockDevices)) + for _, port := range h.registeredMockDevices { + ports = append(ports, port) + } + return ports +} + +func (h *MockDeviceRegistry) nextMockDeviceId() MockDeviceId { + maxId := MockDeviceId(-1) + for id := range h.registeredMockDevices { + if id > maxId { + maxId = id + } + } + return maxId + 1 +} + +func (h *MockDeviceRegistry) registerMockDevice(portDetails serialenum.PortDetails) MockDeviceId { + mockDeviceId := h.nextMockDeviceId() + h.registeredMockDevices[mockDeviceId] = &portDetails + + return mockDeviceId +} + +func (h *MockDeviceRegistry) unregisterMockDevice(mockDeviceId MockDeviceId) error { + if _, ok := h.registeredMockDevices[mockDeviceId]; !ok { + return ErrDeviceNotFound + } + delete(h.registeredMockDevices, mockDeviceId) + return nil +} diff --git a/src/dividat-driver/flex/main.go b/src/dividat-driver/flex/main.go index 78f3775..9ca4ca9 100644 --- a/src/dividat-driver/flex/main.go +++ b/src/dividat-driver/flex/main.go @@ -14,365 +14,375 @@ The functionality of this module is as follows: */ import ( - "bufio" - "bytes" "context" - "encoding/binary" + "net/http" + "reflect" "strings" + "sync" "time" "github.com/cskr/pubsub" + gorilla "github.com/gorilla/websocket" "github.com/sirupsen/logrus" "go.bug.st/serial" - "go.bug.st/serial/enumerator" + + "github.com/dividat/driver/src/dividat-driver/flex/device/passthru" + "github.com/dividat/driver/src/dividat-driver/flex/device/sensingtex" + "github.com/dividat/driver/src/dividat-driver/flex/device/sensitronics" + "github.com/dividat/driver/src/dividat-driver/flex/enumerator" + "github.com/dividat/driver/src/dividat-driver/flex/enumerator/mockdev" + "github.com/dividat/driver/src/dividat-driver/protocol" + "github.com/dividat/driver/src/dividat-driver/util" + "github.com/dividat/driver/src/dividat-driver/websocket" ) -// Handle for managing SensingTex connection +// how often to look for Flex devices while there are clients and no devices are +// connected +const backgroundScanIntervalSeconds = 2 + +// pubsub topic names, must be unique +const brokerTopicTx = "tx" +const brokerTopicRx = "rx" +const brokerTopicRxBroadcast = "rx-broadcast" + +// Handle for managing Flex type Handle struct { - broker *pubsub.PubSub + websocket.Handle +} +type DeviceBackend struct { ctx context.Context + log *logrus.Entry + + currentDevice *protocol.UsbDeviceInfo + + enumerator *enumerator.DeviceEnumerator + + broker *pubsub.PubSub cancelCurrentConnection context.CancelFunc - subscriberCount int + connectionChangeMutex *sync.Mutex - log *logrus.Entry + backgroundScanCancel context.CancelFunc + + subscriberCount int } // New returns an initialized handler -func New(ctx context.Context, log *logrus.Entry) *Handle { - handle := Handle{ +func New(ctx context.Context, log *logrus.Entry, mockDeviceRegistry *mockdev.MockDeviceRegistry) *Handle { + backend := DeviceBackend{ + ctx: ctx, + log: log, + + enumerator: enumerator.New(ctx, log.WithField("package", "flex.enumerator"), mockDeviceRegistry), + broker: pubsub.New(32), - ctx: ctx, - log: log, + + connectionChangeMutex: &sync.Mutex{}, + + subscriberCount: 0, } + websocketHandle := websocket.Handle{ + DeviceBackend: &backend, + Broker: backend.broker, + BrokerRx: brokerTopicRx, + BrokerTx: brokerTopicTx, + BrokerRxBroadcast: util.PointerTo(brokerTopicRxBroadcast), + Log: log, + } + + handle := Handle{Handle: websocketHandle} + // Clean up go func() { <-ctx.Done() - handle.broker.Shutdown() + backend.broker.Shutdown() }() return &handle } -// Connect to device -func (handle *Handle) Connect() { - handle.subscriberCount++ - - // If there is no existing connection, create it - if handle.cancelCurrentConnection == nil { - ctx, cancel := context.WithCancel(handle.ctx) +func (backend *DeviceBackend) broadcastMessage(msg protocol.Message) { + broadcast := protocol.Broadcast{Message: msg} + backend.broker.TryPub(broadcast, brokerTopicRxBroadcast) +} - onReceive := func(data []byte) { - handle.broker.TryPub(data, "flex-rx") - } +func (backend *DeviceBackend) broadcastStatusUpdate() { + status := backend.GetStatus() + backend.broadcastMessage(protocol.Message{Status: &status}) +} - go listeningLoop(ctx, handle.log, handle.broker.Sub("flex-tx"), onReceive) +type SerialDeviceHandler interface { + // Read from the serial port and pipe its signal into the callback, summarizing + // package units into a buffer. Forward commands from client. + Run(ctx context.Context, logger *logrus.Entry, port serial.Port, tx chan interface{}, onReceive func([]byte)) +} - handle.cancelCurrentConnection = cancel +// Pick the appropriate handler for the device +func deviceFamilyToHandler(family enumerator.DeviceFamily) SerialDeviceHandler { + switch family { + case enumerator.DeviceFamilyPassthru: + return &passthru.PassthruHandler{} + case enumerator.DeviceFamilySensingTex: + return &sensingtex.SensingTexHandler{} + case enumerator.DeviceFamilySensitronics: + return &sensitronics.SensitronicsHandler{} + default: + return nil } } -// Deregister subscribers and disconnect when none left -func (handle *Handle) DeregisterSubscriber() { - handle.subscriberCount-- +// concealPassthruDevice returns a copy of the UsbDeviceInfo with the +// "PASSTHRU-" prefix stripped from the Product field, if present. +// +// Allows to mock arbitrary device metadata while using the PassthruReader. Used +// in tools/replay-flex. +func concealPassthruDevice(deviceInfo protocol.UsbDeviceInfo) protocol.UsbDeviceInfo { + const prefix = "PASSTHRU-" + deviceInfo.Product = strings.TrimPrefix(deviceInfo.Product, prefix) + return deviceInfo +} + +// connect to a "validated" device +func (backend *DeviceBackend) connectInternal(matchedDevice enumerator.MatchedDevice) error { + // Only allow one connection change at a time + backend.connectionChangeMutex.Lock() + defer backend.connectionChangeMutex.Unlock() + + device := matchedDevice.Info - if handle.subscriberCount == 0 && handle.cancelCurrentConnection != nil { - handle.cancelCurrentConnection() - handle.cancelCurrentConnection = nil + // in theory we could just look at UsbDeviceInfo.Path, but being defensive + if reflect.DeepEqual(&device, backend.currentDevice) { + backend.log.Info("Ignoring connect request since we are already connected to the same device.") + return nil } -} -// Keep looking for serial devices and connect to them when found, sending signals into the -// callback. -func listeningLoop(ctx context.Context, logger *logrus.Entry, tx chan interface{}, onReceive func([]byte)) { - for { - scanAndConnectSerial(ctx, logger, tx, onReceive) + // disconnect current connection first + backend.Disconnect() - // Terminate if we were cancelled - if ctx.Err() != nil { - return - } + backend.log.WithField("path", device.Path).Info("Attempting to connect with device.") - time.Sleep(2 * time.Second) + ctx, cancel := context.WithCancel(backend.ctx) + + onReceive := func(data []byte) { + backend.broker.TryPub(data, brokerTopicRx) } -} -// One pass of browsing for serial devices and trying to connect to them turn by turn, first -// successful connection wins. -func scanAndConnectSerial(ctx context.Context, logger *logrus.Entry, tx chan interface{}, onReceive func([]byte)) { - ports, err := enumerator.GetDetailedPortsList() + port, err := backend.openSerial(device.Path) if err != nil { - logger.WithField("error", err).Info("Could not list serial devices.") - return + backend.log.WithField("path", device.Path).WithField("error", err).Info("Failed to open connection to serial port.") + return err + } + backend.log.WithField("path", device.Path).Info("Opened serial port.") + reader := deviceFamilyToHandler(matchedDevice.Family) + // should not happen + if reader == nil { + backend.log.WithField("device", matchedDevice).Error("Could not find reader for device!") + port.Close() + return err } + backend.currentDevice = &device - for _, port := range ports { - // Terminate if we have been cancelled - if ctx.Err() != nil { - return - } + _ = context.AfterFunc(ctx, func() { + backend.log.Debug("Cancelling the current connection.") + port.Close() + backend.currentDevice = nil + backend.cancelCurrentConnection = nil + backend.broadcastStatusUpdate() + }) + backend.cancelCurrentConnection = cancel - logger.WithField("name", port.Name).WithField("vendor", port.VID).Debug("Considering serial port.") + backend.broadcastStatusUpdate() - if isFlexLike(port) { - connectSerial(ctx, logger, port.Name, tx, onReceive) - } - } -} + tx := backend.broker.Sub(brokerTopicTx) -// Check whether a port looks like a potential Flex device. -// -// Vendor IDs: -// -// 16C0 - Van Ooijen Technische Informatica (Teensy) -func isFlexLike(port *enumerator.PortDetails) bool { - vendorId := strings.ToUpper(port.VID) + go func() { + defer cancel() + reader.Run(ctx, backend.log, port, tx, onReceive) + }() - return vendorId == "16C0" + return nil } -// Serial communication +func (backend *DeviceBackend) connectToFirstIfNotConnected() { + if backend.cancelCurrentConnection != nil { + // already connected, nothing to do + return + } -type ReaderState int + devices := backend.enumerator.ListMatchingDevices() -const ( - WAITING_FOR_HEADER ReaderState = iota - HEADER_START - HEADER_READ_LENGTH_MSB - HEADER_READ_LENGTH_LSB - WAITING_FOR_BODY - BODY_START - BODY_READ_SAMPLE - UNEXPECTED_BYTE -) + // try devices until the first success + for _, device := range devices { + err := backend.connectInternal(device) + if err == nil { + return + } + } +} -const ( - HEADER_START_MARKER = 'N' - BODY_START_MARKER = 'P' -) +func (backend *DeviceBackend) disableAutoConnect() { + if backend.backgroundScanCancel != nil { + backend.backgroundScanCancel() + backend.backgroundScanCancel = nil + } +} -const ( - // row, column and pressure value, one uint8 each - BYTES_PER_SAMPLE_8BIT = 3 +func (backend *DeviceBackend) enableAutoConnect() { + if backend.backgroundScanCancel == nil { + ctx, cancel := context.WithCancel(backend.ctx) + go backend.backgroundScan(ctx) + backend.backgroundScanCancel = cancel + } +} - // same as above, but pressure value is 2 bytes (uint16), big-endian - // Note: Sensing Tex docs state value max is 2^12-1 (hence "12bit"), - // but in practice they seem to send values up to ~9000, so more like - // "14 bit". - BYTES_PER_SAMPLE_12BIT = 4 -) +func (backend *DeviceBackend) backgroundScan(ctx context.Context) { + ticker := time.NewTicker(backgroundScanIntervalSeconds * time.Second) + defer func() { + backend.log.Info("Stopping background scan and auto-connect") + ticker.Stop() + }() + + backend.log.Info("Background scan and auto-connect started") + + for { + select { + case <-ticker.C: + backend.connectToFirstIfNotConnected() -var BITDEPTH_8_CMD = []byte{'U', 'L', '\n'} + case <-ctx.Done(): + return + } + } -var BITDEPTH_12_CMD = []byte{'U', 'M', '\n'} +} -func isBitdepthCommand(cmd []byte) bool { - return bytes.Equal(cmd, BITDEPTH_8_CMD) || bytes.Equal(cmd, BITDEPTH_12_CMD) +// Check if client has requested manual-connect via a Sec-WebSocket-Protocol +func wantsManualConnect(req *http.Request) bool { + for _, protocol := range gorilla.Subprotocols(req) { + if protocol == "manual-connect" { + return true + } + } + return false } -func bitdepthCommandToBytesPerSample(cmd []byte) int { - if bytes.Equal(cmd, BITDEPTH_8_CMD) { - return BYTES_PER_SAMPLE_8BIT +func (backend *DeviceBackend) RegisterSubscriber(req *http.Request) { + backend.subscriberCount++ + + // If a client has specified manual-connect in WebSocket sub-protocols, + // we disable auto-connect globally. Last-client-wins, meaning that + // if another client connects later without `manual-connect`, then + // auto-connect will be re-enabled. + if wantsManualConnect(req) { + backend.disableAutoConnect() } else { - return BYTES_PER_SAMPLE_12BIT + // backwards compatible setup: auto-connect by default + backend.connectToFirstIfNotConnected() + backend.enableAutoConnect() } } -// Actually attempt to connect to an individual serial port and pipe its signal into the callback, summarizing -// package units into a buffer. -func connectSerial(ctx context.Context, logger *logrus.Entry, serialName string, tx chan interface{}, onReceive func([]byte)) { - mode := &serial.Mode{ - BaudRate: 115200, - Parity: serial.NoParity, - DataBits: 8, - StopBits: serial.OneStopBit, +// Deregister subscribers and disconnect when none left +func (backend *DeviceBackend) DeregisterSubscriber(req *http.Request) { + backend.subscriberCount-- + + if backend.subscriberCount == 0 { + backend.disableAutoConnect() + backend.Disconnect() } +} - logger.WithField("name", serialName).Info("Attempting to connect with serial port.") - port, err := serial.Open(serialName, mode) - if err != nil { - logger.WithField("config", mode).WithField("error", err).Info("Failed to open connection to serial port.") - return +func (backend *DeviceBackend) GetStatus() protocol.Status { + status := protocol.Status{} + + if backend.currentDevice != nil { + status.Address = &backend.currentDevice.Path + newDeviceInfo := protocol.MakeDeviceInfoUsb(concealPassthruDevice(*backend.currentDevice)) + status.DeviceInfo = &newDeviceInfo } - port.ResetInputBuffer() // flush any unread data buffered by the OS + return status +} - readerCtx, readerCtxCancel := context.WithCancel(ctx) +// NOTE: The remaining Driver commands are not currently used in Play for Flex - defer func() { - logger.WithField("name", serialName).Info("Disconnecting from serial port.") - port.Close() - readerCtxCancel() - }() +func (backend *DeviceBackend) lookupDeviceInfo(portName string) *enumerator.MatchedDevice { + devices := backend.enumerator.ListMatchingDevices() + for _, device := range devices { + if device.Info.Path == portName { + return &device + } + } + return nil +} - // For backwards compatibility, we set 8bit depth by default - _, err = port.Write(BITDEPTH_8_CMD) - if err != nil { - logger.WithField("error", err).Info("Failed to set bitdepth of 8.") +// Connect to device using only the address (path, e.g. "/dev/ttyACM0") +// Currently not used in Play +func (backend *DeviceBackend) Connect(address string) { + port := backend.lookupDeviceInfo(address) + if port == nil { + backend.log.WithField("address", address).Error("Could not look up device, aborting Connect.") return + } else { + backend.connectInternal(*port) } - configuredBytesPerSample := BYTES_PER_SAMPLE_8BIT - - // Channel to receive ack that reader is done - readerDoneChan := make(chan struct{}) - // Start the initial reader goroutine - go readFromPort(readerCtx, logger, port, configuredBytesPerSample, onReceive, readerDoneChan) +} - // Forward WebSocket commands to device - for { - select { - case <-ctx.Done(): - return +// Currently not used in Play +func (backend *DeviceBackend) Disconnect() { + if backend.cancelCurrentConnection != nil { + backend.cancelCurrentConnection() + } +} - case <-readerDoneChan: - return +// Currently not used in Play +func (backend *DeviceBackend) Discover(duration int, ctx context.Context) chan protocol.DeviceInfo { + matching := backend.enumerator.ListMatchingDevices() + devices := make(chan protocol.DeviceInfo) - case i := <-tx: - data, _ := i.([]byte) - - if isBitdepthCommand(data) { - newBytesPerSample := bitdepthCommandToBytesPerSample(data) - - // If bytes per sample has changed, we need to restart the reader - if newBytesPerSample != configuredBytesPerSample { - logger.WithFields(logrus.Fields{ - "old": configuredBytesPerSample, - "new": newBytesPerSample, - }).Info("Bytes per sample changed, will restart reader") - - logger.Debug("Sending stop to reader and waiting for ack") - readerCtxCancel() - <-readerDoneChan - logger.Debug("Ack received, reader stopped") - - _, err = port.Write(data) - if err != nil { - logger.WithField("error", err).Info("Failed to write new bitdepth.") - return - } - - port.ResetInputBuffer() // flush any data that was not yet read - - configuredBytesPerSample = newBytesPerSample - - readerCtx, readerCtxCancel = context.WithCancel(ctx) - readerDoneChan = make(chan struct{}) - - // Start a new reader goroutine with updated bytesPerSample - go readFromPort(readerCtx, logger, port, configuredBytesPerSample, onReceive, readerDoneChan) - } - } else { - // For non-bitdepth commands, just forward them - _, err = port.Write(data) - if err != nil { - logger.WithField("error", err).Info("Failed to write command to serial port.") - return - } - logger.WithField("bytes", data).Debug("Wrote binary command to serial out: " + string(data)) + go func(matchedDevices []enumerator.MatchedDevice) { + for _, matchedDevice := range matchedDevices { + // Terminate if we have been cancelled + if ctx.Err() != nil { + break } + + usbDevice := concealPassthruDevice(matchedDevice.Info) + device := protocol.MakeDeviceInfoUsb(usbDevice) + + devices <- device } - } + + close(devices) + }(matching) + return devices } -// Infinite loop for requesting and reading serial data. -// Stops (returns) upon any error or ctx cancel. -func readFromPort( - ctx context.Context, - logger *logrus.Entry, - port serial.Port, - bytesPerSample int, - onReceive func([]byte), - doneChan chan<- struct{}, -) { - defer func() { - // Signal that the reader has completed - close(doneChan) - }() +// not supported +func (backend *DeviceBackend) IsUpdatingFirmware() bool { + return false +} - reader := bufio.NewReader(port) - state := WAITING_FOR_HEADER - var samplesLeftInSet int - var bytesLeftInSample int +// not supported +func (backend *DeviceBackend) ProcessFirmwareUpdateRequest(command protocol.UpdateFirmware, send websocket.SendMsg) { + // noop + return +} - // Note: for Flex v4 this command seems to cause the firmware to push - // data as fast as we can consume it, whereas for Flex v5 it merely - // requests a single frame. - START_MEASUREMENT_CMD := []byte{'S', '\n'} - _, err := port.Write(START_MEASUREMENT_CMD) - if err != nil { - logger.WithField("error", err).Info("Failed to write start message to serial port.") - return +func (backend *DeviceBackend) openSerial(serialName string) (serial.Port, error) { + mode := &serial.Mode{ + BaudRate: 115200, + Parity: serial.NoParity, + DataBits: 8, + StopBits: serial.OneStopBit, } - // Start signal acquisition - var buff []byte - for { - // Terminate if we were cancelled - if ctx.Err() != nil { - logger.Debug("Stopping reader: context cancelled") - return - } - - input, err := reader.ReadByte() - if err != nil { - logger.WithField("err", err).Error("Error reading from serial port") - return - } - - var length_msb byte - - // Finite State Machine for parsing byte stream - switch { - case state == WAITING_FOR_HEADER && input == HEADER_START_MARKER: - state = HEADER_START - case state == HEADER_START && input == '\n': - state = HEADER_READ_LENGTH_MSB - case state == HEADER_READ_LENGTH_MSB: - // The number of measurements in each set is given as two - // consecutive bytes (big-endian). - length_msb = input - state = HEADER_READ_LENGTH_LSB - case state == HEADER_READ_LENGTH_LSB: - length_lsb := input - samplesLeftInSet = int(binary.BigEndian.Uint16([]byte{length_msb, length_lsb})) - state = WAITING_FOR_BODY - case state == WAITING_FOR_BODY && input == BODY_START_MARKER: - state = BODY_START - case state == BODY_START && input == '\n': - state = BODY_READ_SAMPLE - buff = []byte{} - bytesLeftInSample = bytesPerSample - case state == BODY_READ_SAMPLE: - buff = append(buff, input) - bytesLeftInSample = bytesLeftInSample - 1 - - if bytesLeftInSample <= 0 { - samplesLeftInSet = samplesLeftInSet - 1 - - if samplesLeftInSet <= 0 { - // Finish and send set - onReceive(buff) - - // Get ready for next set and request it - state = WAITING_FOR_HEADER - // Optional for Flex v4, mandatory for v5 - _, err = port.Write(START_MEASUREMENT_CMD) - if err != nil { - logger.WithField("error", err).Info("Failed to write poll message to serial port.") - return - } - } else { - // Start next point - bytesLeftInSample = bytesPerSample - } - } - case state == UNEXPECTED_BYTE && input == HEADER_START_MARKER: - // Recover from error state when a new header is seen - state = HEADER_START - default: - state = UNEXPECTED_BYTE - } + port, err := serial.Open(serialName, mode) + if err != nil { + return nil, err } + + return port, nil } diff --git a/src/dividat-driver/flex/websocket.go b/src/dividat-driver/flex/websocket.go deleted file mode 100644 index bbec7c8..0000000 --- a/src/dividat-driver/flex/websocket.go +++ /dev/null @@ -1,130 +0,0 @@ -package flex - -import ( - "context" - "net/http" - "sync" - "time" - - "github.com/gorilla/websocket" - "github.com/sirupsen/logrus" -) - -// WEBSOCKET PROTOCOL - -// Implement net/http Handler interface -func (handle *Handle) ServeHTTP(w http.ResponseWriter, r *http.Request) { - - // Set up logger - var log = handle.log.WithFields(logrus.Fields{ - "clientAddress": r.RemoteAddr, - "userAgent": r.UserAgent(), - }) - - // Update to WebSocket - conn, err := webSocketUpgrader.Upgrade(w, r, nil) - if err != nil { - log.WithError(err).Error("Could not upgrade connection to WebSocket.") - http.Error(w, "WebSocket upgrade error", http.StatusBadRequest) - return - } - - log.Info("WebSocket connection opened") - - // Create a mutex for writing to WebSocket (connection supports only one concurrent reader and one concurrent writer (https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency)) - writeMutex := sync.Mutex{} - - // Create a context for this WebSocket connection - ctx, cancel := context.WithCancel(context.Background()) - - // Send binary data up the WebSocket - sendBinary := func(data []byte) error { - writeMutex.Lock() - conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) - err := conn.WriteMessage(websocket.BinaryMessage, data) - writeMutex.Unlock() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - log.WithError(err).Error("WebSocket error") - } - return err - } - return nil - } - - // Create channels with data received from SensingTex controller - rx := handle.broker.Sub("flex-rx") - - // send data from device - go rx_data_loop(ctx, rx, sendBinary) - - // Helper function to close the connection - close := func() { - handle.broker.Unsub(rx) - - handle.DeregisterSubscriber() - - // Cancel the context - cancel() - - // Close websocket connection - conn.Close() - - log.Info("Websocket connection closed") - } - - // Start connecting to devices - handle.Connect() - - // Main loop for the WebSocket connection - go func() { - defer close() - for { - - messageType, msg, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - log.WithError(err).Error("WebSocket error") - } - return - } - if messageType == websocket.BinaryMessage { - handle.broker.TryPub(msg, "flex-tx") - } - } - }() - -} - -// HELPERS - -// rx_data_loop reads data from SensingTex and forwards it up the WebSocket -func rx_data_loop(ctx context.Context, rx chan interface{}, send func([]byte) error) { - var err error - for { - select { - case <-ctx.Done(): - return - - case i := <-rx: - data, ok := i.([]byte) - if ok { - err = send(data) - } - } - - if err != nil { - return - } - } -} - -// Helper to upgrade http to WebSocket -var webSocketUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // Check is performed by top-level HTTP middleware, and not repeated here. - return true - }, -} diff --git a/src/dividat-driver/main.go b/src/dividat-driver/main.go index b0ba827..be53a9e 100644 --- a/src/dividat-driver/main.go +++ b/src/dividat-driver/main.go @@ -45,6 +45,7 @@ func (p *program) Start(s service.Service) error { var permissibleOrigins stringList flag.Var(&permissibleOrigins, "permissible-origin", "Permissible origin to make requests to the driver's HTTP endpoints, may be repeated. Default is a list of common Dividat origins.") flag.Parse() + if len(permissibleOrigins) == 0 { permissibleOrigins = defaultOrigins } diff --git a/src/dividat-driver/protocol/main.go b/src/dividat-driver/protocol/main.go new file mode 100644 index 0000000..2f44f7f --- /dev/null +++ b/src/dividat-driver/protocol/main.go @@ -0,0 +1,256 @@ +package protocol + +import ( + "encoding/json" + "errors" + "net" + + "github.com/libp2p/zeroconf/v2" +) + +// DRIVER COMMAND PROTOCOL + +// Command sent by Play +type Command struct { + *GetStatus + + *Connect + *Disconnect + + *Discover + *UpdateFirmware +} + +func PrettyPrintCommand(command Command) string { + if command.GetStatus != nil { + return "GetStatus" + } else if command.Connect != nil { + return "Connect" + } else if command.Disconnect != nil { + return "Disconnect" + } else if command.Discover != nil { + return "Discover" + } else if command.UpdateFirmware != nil { + return "UpdateFirmware" + } + return "Unknown" +} + +// GetStatus command +type GetStatus struct{} + +// Connect command +type Connect struct { + Address string `json:"address"` +} + +// Disconnect command +type Disconnect struct{} + +// Discover command +type Discover struct { + Duration int `json:"duration"` +} + +type UpdateFirmware struct { + SerialNumber string `json:"serialNumber"` + Image string `json:"image"` +} + +// UnmarshalJSON implements encoding/json Unmarshaler interface +func (command *Command) UnmarshalJSON(data []byte) error { + + // Helper struct to get type + temp := struct { + Type string `json:"type"` + }{} + if err := json.Unmarshal(data, &temp); err != nil { + return err + } + + if temp.Type == "GetStatus" { + command.GetStatus = &GetStatus{} + + } else if temp.Type == "Connect" { + err := json.Unmarshal(data, &command.Connect) + if err != nil { + return err + } + + } else if temp.Type == "Disconnect" { + command.Disconnect = &Disconnect{} + + } else if temp.Type == "Discover" { + + err := json.Unmarshal(data, &command.Discover) + if err != nil { + return err + } + + } else if temp.Type == "UpdateFirmware" { + err := json.Unmarshal(data, &command.UpdateFirmware) + if err != nil { + return err + } + } else { + return errors.New("can not decode unknown command") + } + + return nil +} + +// A broadcast is a Message that it sent to all connected clients +type Broadcast struct { + Message Message +} + +func (broadcast *Broadcast) MarshalJSON() ([]byte, error) { + temp := struct { + Type string `json:"type"` + Message Message `json:"message"` + }{} + temp.Type = "Broadcast" + temp.Message = broadcast.Message + + return json.Marshal(&temp) +} + +// Driver Message sent to Play in response to a Command (hence, to a single client) +type Message struct { + *Status + Discovered *DeviceInfo + FirmwareUpdateMessage *FirmwareUpdateMessage +} + +type DeviceType string + +const ( + DeviceTypeSenso DeviceType = "senso" + DeviceTypeFlex DeviceType = "flex" +) + +type DeviceInfo struct { + deviceType DeviceType + tcpDeviceInfo *zeroconf.ServiceEntry // present if DeviceType == senso + usbDeviceInfo *UsbDeviceInfo // present if DeviceType == flex +} + +func MakeDeviceInfoUsb(usbInfo UsbDeviceInfo) DeviceInfo { + return DeviceInfo{ + deviceType: DeviceTypeFlex, + usbDeviceInfo: &usbInfo, + } +} + +func MakeDeviceInfoTcp(tcpInfo zeroconf.ServiceEntry) DeviceInfo { + return DeviceInfo{ + deviceType: DeviceTypeSenso, + tcpDeviceInfo: &tcpInfo, + } +} + +// hand-rolled marshalling, because encode/json does not deal with unexported fields +func (d DeviceInfo) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + DeviceType DeviceType `json:"deviceType"` + TcpDeviceInfo *zeroconf.ServiceEntry `json:"tcpDevice,omitempty"` + UsbDeviceInfo *UsbDeviceInfo `json:"usbDevice,omitempty"` + }{ + DeviceType: d.deviceType, + TcpDeviceInfo: d.tcpDeviceInfo, + UsbDeviceInfo: d.usbDeviceInfo, + }) +} + +type UsbDeviceInfo struct { + Path string `json:"path"` + + IdVendor uint16 `json:"idVendor"` + IdProduct uint16 `json:"idProduct"` + BcdDevice uint16 `json:"bcdDevice"` + + SerialNumber string `json:"serialNumber"` + Manufacturer string `json:"manufacturer"` + Product string `json:"product"` +} + +// Status is a message containing status information +type Status struct { + // ip for Senso, /dev/* path for Flex + Address *string + // optional, currently only used in Flex + DeviceInfo *DeviceInfo +} + +type FirmwareUpdateMessage struct { + FirmwareUpdateProgress *string + FirmwareUpdateSuccess *string + FirmwareUpdateFailure *string +} + +// MarshalJSON ipmlements JSON encoder for messages +func (message *Message) MarshalJSON() ([]byte, error) { + if message.Status != nil { + status := struct { + Type string `json:"type"` + Address *string `json:"address"` + DeviceInfo *DeviceInfo `json:"device"` + }{ + Type: "Status", + Address: message.Status.Address, + DeviceInfo: message.Status.DeviceInfo, + } + return json.Marshal(&status) + + } else if message.Discovered != nil { + serviceEntry := message.Discovered.tcpDeviceInfo + msg := struct { + Type string `json:"type"` + // Senso only, duplicated for backwards compat + ServiceEntry *zeroconf.ServiceEntry `json:"service"` + IP []net.IP `json:"ip"` + // New protocol + DeviceInfo *DeviceInfo `json:"device"` + }{ + Type: "Discovered", + ServiceEntry: serviceEntry, + DeviceInfo: message.Discovered, + } + if serviceEntry != nil { + msg.IP = append(serviceEntry.AddrIPv4, serviceEntry.AddrIPv6...) + } + return json.Marshal(&msg) + + } else if message.FirmwareUpdateMessage != nil { + fwUpdate := struct { + Type string `json:"type"` + Message string `json:"message"` + }{} + + firmwareUpdateMessage := *message.FirmwareUpdateMessage + + if firmwareUpdateMessage.FirmwareUpdateProgress != nil { + + fwUpdate.Type = "FirmwareUpdateProgress" + fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateProgress + + } else if firmwareUpdateMessage.FirmwareUpdateFailure != nil { + + fwUpdate.Type = "FirmwareUpdateFailure" + fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateFailure + + } else if firmwareUpdateMessage.FirmwareUpdateSuccess != nil { + + fwUpdate.Type = "FirmwareUpdateSuccess" + fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateSuccess + + } else { + return nil, errors.New("could not marshal firmware update message") + } + + return json.Marshal(fwUpdate) + } + + return nil, errors.New("could not marshal message") + +} diff --git a/src/dividat-driver/senso/main.go b/src/dividat-driver/senso/main.go index 2b0ce4f..865e3ee 100644 --- a/src/dividat-driver/senso/main.go +++ b/src/dividat-driver/senso/main.go @@ -2,6 +2,7 @@ package senso import ( "context" + "net/http" "sync" "time" @@ -9,81 +10,130 @@ import ( "github.com/sirupsen/logrus" "github.com/dividat/driver/src/dividat-driver/firmware" + "github.com/dividat/driver/src/dividat-driver/protocol" + "github.com/dividat/driver/src/dividat-driver/service" + "github.com/dividat/driver/src/dividat-driver/websocket" ) +// pubsub topic names, must be unique +const brokerTopicRx = "rx" +const brokerTopicTx = "tx" + // Handle for managing Senso type Handle struct { - broker *pubsub.PubSub - - Address *string + websocket.Handle +} +type DeviceBackend struct { ctx context.Context + log *logrus.Entry + + address *string + firmwareUpdate *firmware.Update + + broker *pubsub.PubSub cancelCurrentConnection context.CancelFunc connectionChangeMutex *sync.Mutex +} - firmwareUpdate *firmware.Update +func (backend *DeviceBackend) RegisterSubscriber(r *http.Request) { + // noop + return +} - log *logrus.Entry +func (backend *DeviceBackend) Discover(duration int, ctx context.Context) chan protocol.DeviceInfo { + discoveryCtx, _ := context.WithTimeout(ctx, time.Duration(duration)*time.Second) + // map over the channel to wrap ServiceEntry into DeviceInfo + deviceChan := make(chan protocol.DeviceInfo) + go func() { + for service := range service.Scan(discoveryCtx) { + device := protocol.MakeDeviceInfoTcp(service.ServiceEntry) + deviceChan <- device + } + close(deviceChan) + }() + return deviceChan +} + +func (backend *DeviceBackend) GetStatus() protocol.Status { + return protocol.Status{ + Address: backend.address, + } +} + +func (backend *DeviceBackend) IsUpdatingFirmware() bool { + return backend.firmwareUpdate.IsUpdating() } // New returns an initialized Senso handler func New(ctx context.Context, log *logrus.Entry) *Handle { - handle := Handle{} - - handle.ctx = ctx + backend := DeviceBackend{ + ctx: ctx, + log: log, - handle.log = log + broker: pubsub.New(32), - handle.connectionChangeMutex = &sync.Mutex{} - handle.firmwareUpdate = firmware.InitialUpdateState() + connectionChangeMutex: &sync.Mutex{}, + firmwareUpdate: firmware.InitialUpdateState(), + } - // PubSub broker - handle.broker = pubsub.New(32) + websocketHandle := websocket.Handle{ + DeviceBackend: &backend, + Broker: backend.broker, + BrokerRx: brokerTopicRx, + BrokerTx: brokerTopicTx, + Log: log, + } + handle := Handle{Handle: websocketHandle} // Clean up go func() { <-ctx.Done() - handle.broker.Shutdown() + backend.broker.Shutdown() }() return &handle } +func (backend *DeviceBackend) DeregisterSubscriber(req *http.Request) { + // noop +} + // Connect to a Senso, will create TCP connections to control and data ports -func (handle *Handle) Connect(address string) { +func (backend *DeviceBackend) Connect(address string) { // Only allow one connection change at a time - handle.connectionChangeMutex.Lock() - defer handle.connectionChangeMutex.Unlock() + backend.connectionChangeMutex.Lock() + defer backend.connectionChangeMutex.Unlock() // disconnect current connection first - handle.Disconnect() + backend.Disconnect() - // set address in handle - handle.Address = &address + // set address in backend + backend.address = &address - // Create a child context for a new connection. This allows an individual connection (attempt) to be cancelled without restarting the whole Senso handler - ctx, cancel := context.WithCancel(handle.ctx) + // Create a child context for a new connection. This allows an individual connection (attempt) to be cancelled without restarting the whole Senso backendr + ctx, cancel := context.WithCancel(backend.ctx) - handle.log.WithField("address", address).Info("Attempting to connect with Senso.") + backend.log.WithField("address", address).Info("Attempting to connect with Senso.") onReceive := func(data []byte) { - handle.broker.TryPub(data, "rx") + backend.broker.TryPub(data, brokerTopicRx) } - go connectTCP(ctx, handle.log.WithField("channel", "data"), address+":55568", handle.broker.Sub("noTx"), onReceive) + go connectTCP(ctx, backend.log.WithField("channel", "data"), address+":55568", backend.broker.Sub("noTx"), onReceive) time.Sleep(1000 * time.Millisecond) - go connectTCP(ctx, handle.log.WithField("channel", "control"), address+":55567", handle.broker.Sub("tx"), onReceive) + go connectTCP(ctx, backend.log.WithField("channel", "control"), address+":55567", backend.broker.Sub(brokerTopicTx), onReceive) - handle.cancelCurrentConnection = cancel + backend.cancelCurrentConnection = cancel } // Disconnect from current connection -func (handle *Handle) Disconnect() { - if handle.cancelCurrentConnection != nil { - handle.log.Info("Disconnecting from Senso.") - handle.cancelCurrentConnection() - handle.Address = nil +func (backend *DeviceBackend) Disconnect() { + if backend.cancelCurrentConnection != nil { + backend.log.Info("Disconnecting from Senso.") + backend.cancelCurrentConnection() + backend.address = nil } } diff --git a/src/dividat-driver/senso/update_firmware.go b/src/dividat-driver/senso/update_firmware.go index b68bddd..7ce6584 100644 --- a/src/dividat-driver/senso/update_firmware.go +++ b/src/dividat-driver/senso/update_firmware.go @@ -8,41 +8,37 @@ import ( "io" "github.com/dividat/driver/src/dividat-driver/firmware" + "github.com/dividat/driver/src/dividat-driver/protocol" + "github.com/dividat/driver/src/dividat-driver/websocket" ) -type SendMsg struct { - progress func(string) - failure func(string) - success func(string) -} - // Disconnect from current connection -func (handle *Handle) ProcessFirmwareUpdateRequest(command UpdateFirmware, send SendMsg) { - handle.log.Info("Processing firmware update request.") - handle.firmwareUpdate.SetUpdating(true) +func (backend *DeviceBackend) ProcessFirmwareUpdateRequest(command protocol.UpdateFirmware, send websocket.SendMsg) { + backend.log.Info("Processing firmware update request.") + backend.firmwareUpdate.SetUpdating(true) - if handle.cancelCurrentConnection != nil { - send.progress("Disconnecting from the Senso") - handle.cancelCurrentConnection() + if backend.cancelCurrentConnection != nil { + send.Progress("Disconnecting from the Senso") + backend.cancelCurrentConnection() } image, err := decodeImage(command.Image) if err != nil { msg := fmt.Sprintf("Error decoding base64 string: %v", err) - send.failure(msg) - handle.log.Error(msg) + send.Failure(msg) + backend.log.Error(msg) return } - err = firmware.UpdateBySerial(context.Background(), command.SerialNumber, image, send.progress) + err = firmware.UpdateBySerial(context.Background(), command.SerialNumber, image, send.Progress) if err != nil { failureMsg := fmt.Sprintf("Failed to update firmware: %v", err) - send.failure(failureMsg) - handle.log.Error(failureMsg) + send.Failure(failureMsg) + backend.log.Error(failureMsg) } else { - send.success("Firmware successfully transmitted") + send.Success("Firmware successfully transmitted") } - handle.firmwareUpdate.SetUpdating(false) + backend.firmwareUpdate.SetUpdating(false) } func decodeImage(base64Str string) (io.Reader, error) { diff --git a/src/dividat-driver/senso/websocket.go b/src/dividat-driver/senso/websocket.go deleted file mode 100644 index 98b4710..0000000 --- a/src/dividat-driver/senso/websocket.go +++ /dev/null @@ -1,417 +0,0 @@ -package senso - -import ( - "context" - "encoding/json" - "errors" - "net" - "net/http" - "sync" - "time" - - "github.com/gorilla/websocket" - "github.com/libp2p/zeroconf/v2" - "github.com/sirupsen/logrus" - - "github.com/dividat/driver/src/dividat-driver/service" -) - -// WEBSOCKET PROTOCOL - -// Command sent by Play -type Command struct { - *GetStatus - - *Connect - *Disconnect - - *Discover - *UpdateFirmware -} - -func prettyPrintCommand(command Command) string { - if command.GetStatus != nil { - return "GetStatus" - } else if command.Connect != nil { - return "Connect" - } else if command.Disconnect != nil { - return "Disconnect" - } else if command.Discover != nil { - return "Discover" - } else if command.UpdateFirmware != nil { - return "UpdateFirmware" - } - return "Unknown" -} - -// GetStatus command -type GetStatus struct{} - -// Connect command -type Connect struct { - Address string `json:"address"` -} - -// Disconnect command -type Disconnect struct{} - -// Discover command -type Discover struct { - Duration int `json:"duration"` -} - -type UpdateFirmware struct { - SerialNumber string `json:"serialNumber"` - Image string `json:"image"` -} - -// UnmarshalJSON implements encoding/json Unmarshaler interface -func (command *Command) UnmarshalJSON(data []byte) error { - - // Helper struct to get type - temp := struct { - Type string `json:"type"` - }{} - if err := json.Unmarshal(data, &temp); err != nil { - return err - } - - if temp.Type == "GetStatus" { - command.GetStatus = &GetStatus{} - - } else if temp.Type == "Connect" { - err := json.Unmarshal(data, &command.Connect) - if err != nil { - return err - } - - } else if temp.Type == "Disconnect" { - command.Disconnect = &Disconnect{} - - } else if temp.Type == "Discover" { - - err := json.Unmarshal(data, &command.Discover) - if err != nil { - return err - } - - } else if temp.Type == "UpdateFirmware" { - err := json.Unmarshal(data, &command.UpdateFirmware) - if err != nil { - return err - } - } else { - return errors.New("can not decode unknown command") - } - - return nil -} - -// Message that can be sent to Play -type Message struct { - *Status - Discovered *zeroconf.ServiceEntry - FirmwareUpdateMessage *FirmwareUpdateMessage -} - -// Status is a message containing status information -type Status struct { - Address *string -} - -type FirmwareUpdateMessage struct { - FirmwareUpdateProgress *string - FirmwareUpdateSuccess *string - FirmwareUpdateFailure *string -} - -// MarshalJSON ipmlements JSON encoder for messages -func (message *Message) MarshalJSON() ([]byte, error) { - if message.Status != nil { - return json.Marshal(&struct { - Type string `json:"type"` - Address *string `json:"address"` - }{ - Type: "Status", - Address: message.Status.Address, - }) - - } else if message.Discovered != nil { - return json.Marshal(&struct { - Type string `json:"type"` - ServiceEntry *zeroconf.ServiceEntry `json:"service"` - IP []net.IP `json:"ip"` - }{ - Type: "Discovered", - ServiceEntry: message.Discovered, - IP: append(message.Discovered.AddrIPv4, message.Discovered.AddrIPv6...), - }) - - } else if message.FirmwareUpdateMessage != nil { - fwUpdate := struct { - Type string `json:"type"` - Message string `json:"message"` - }{} - - firmwareUpdateMessage := *message.FirmwareUpdateMessage - - if firmwareUpdateMessage.FirmwareUpdateProgress != nil { - - fwUpdate.Type = "FirmwareUpdateProgress" - fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateProgress - - } else if firmwareUpdateMessage.FirmwareUpdateFailure != nil { - - fwUpdate.Type = "FirmwareUpdateFailure" - fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateFailure - - } else if firmwareUpdateMessage.FirmwareUpdateSuccess != nil { - - fwUpdate.Type = "FirmwareUpdateSuccess" - fwUpdate.Message = *firmwareUpdateMessage.FirmwareUpdateSuccess - - } else { - return nil, errors.New("could not marshal firmware update message") - } - - return json.Marshal(fwUpdate) - } - - return nil, errors.New("could not marshal message") - -} - -// Implement net/http Handler interface -func (handle *Handle) ServeHTTP(w http.ResponseWriter, r *http.Request) { - - // Set up logger - var log = handle.log.WithFields(logrus.Fields{ - "clientAddress": r.RemoteAddr, - "userAgent": r.UserAgent(), - }) - - // Update to WebSocket - conn, err := webSocketUpgrader.Upgrade(w, r, nil) - if err != nil { - log.WithError(err).Error("Could not upgrade connection to WebSocket.") - http.Error(w, "WebSocket upgrade error", http.StatusBadRequest) - return - } - - log.Info("WebSocket connection opened") - - // Create a mutex for writing to WebSocket (connection supports only one concurrent reader and one concurrent writer (https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency)) - writeMutex := sync.Mutex{} - - // Create a context for this WebSocket connection - ctx, cancel := context.WithCancel(context.Background()) - - // Send binary data up the WebSocket - sendBinary := func(data []byte) error { - writeMutex.Lock() - conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) - err := conn.WriteMessage(websocket.BinaryMessage, data) - writeMutex.Unlock() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - log.WithError(err).Error("WebSocket error") - } - return err - } - return nil - } - - // send messgae up the WebSocket - sendMessage := func(message Message) error { - writeMutex.Lock() - conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) - err := conn.WriteJSON(&message) - writeMutex.Unlock() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - log.WithError(err).Error("WebSocket error") - } - return err - } - return nil - } - - // Create channels with data received from Senso - rx := handle.broker.Sub("rx") - - // send data from Control and Data channel - go rx_data_loop(ctx, rx, sendBinary) - - // Helper function to close the connection - close := func() { - // Unsubscribe from broker - handle.broker.Unsub(rx) - - // Cancel the context - cancel() - - // Close websocket connection - conn.Close() - - log.Info("Websocket connection closed") - } - - // Main loop for the WebSocket connection - go func() { - defer close() - for { - - messageType, msg, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - log.WithError(err).Error("WebSocket error") - } - return - } - - if messageType == websocket.BinaryMessage { - - if handle.firmwareUpdate.IsUpdating() { - handle.log.Debug("Ignoring Senso command during firmware update.") - continue - } - - handle.broker.TryPub(msg, "tx") - - } else if messageType == websocket.TextMessage { - - var command Command - decodeErr := json.Unmarshal(msg, &command) - if decodeErr != nil { - log.WithField("rawCommand", msg).WithError(decodeErr).Warning("Can not decode command.") - continue - } - log.WithField("command", prettyPrintCommand(command)).Debug("Received command.") - - if handle.firmwareUpdate.IsUpdating() && (command.GetStatus == nil || command.Discover == nil) { - log.WithField("command", prettyPrintCommand(command)).Debug("Ignoring command during firmware update.") - continue - } - - err := handle.dispatchCommand(ctx, log, command, sendMessage) - if err != nil { - return - } - } - - } - }() - -} - -// HELPERS - -// dispatchCommand handles incomming commands and sends responses back up the WebSocket -func (handle *Handle) dispatchCommand(ctx context.Context, log *logrus.Entry, command Command, sendMessage func(Message) error) error { - - if command.GetStatus != nil { - - var message Message - - message.Status = &Status{Address: handle.Address} - - err := sendMessage(message) - - if err != nil { - return err - } - - } else if command.Connect != nil { - handle.Connect(command.Connect.Address) - return nil - - } else if command.Disconnect != nil { - handle.Disconnect() - return nil - - } else if command.Discover != nil { - - discoveryCtx, _ := context.WithTimeout(ctx, time.Duration(command.Discover.Duration)*time.Second) - - entries := service.Scan(discoveryCtx) - - go func(entries chan service.Service) { - for entry := range entries { - log.WithField("service", entry).Debug("Discovered service.") - - var message Message - message.Discovered = &entry.ServiceEntry - - err := sendMessage(message) - if err != nil { - return - } - - } - log.Debug("Discovery finished.") - }(entries) - - return nil - - } else if command.UpdateFirmware != nil { - go handle.ProcessFirmwareUpdateRequest(*command.UpdateFirmware, SendMsg{ - progress: func(msg string) { - sendMessage(firmwareUpdateProgress(msg)) - }, - failure: func(msg string) { - sendMessage(firmwareUpdateFailure(msg)) - }, - success: func(msg string) { - sendMessage(firmwareUpdateSuccess(msg)) - }, - }) - } - return nil -} - -func firmwareUpdateSuccess(msg string) Message { - return firmwareUpdateMessage(FirmwareUpdateMessage{FirmwareUpdateSuccess: &msg}) -} - -func firmwareUpdateFailure(msg string) Message { - return firmwareUpdateMessage(FirmwareUpdateMessage{FirmwareUpdateFailure: &msg}) -} - -func firmwareUpdateProgress(msg string) Message { - return firmwareUpdateMessage(FirmwareUpdateMessage{FirmwareUpdateProgress: &msg}) -} - -func firmwareUpdateMessage(msg FirmwareUpdateMessage) Message { - return Message{FirmwareUpdateMessage: &msg} -} - -// rx_data_loop reads data from Senso and forwards it up the WebSocket -func rx_data_loop(ctx context.Context, rx chan interface{}, send func([]byte) error) { - var err error - for { - select { - case <-ctx.Done(): - return - - case i := <-rx: - data, ok := i.([]byte) - if ok { - err = send(data) - } - } - - if err != nil { - return - } - } -} - -// Helper to upgrade http to WebSocket -var webSocketUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // Check is performed by top-level HTTP middleware, and not repeated here. - return true - }, -} diff --git a/src/dividat-driver/server/main.go b/src/dividat-driver/server/main.go index 94f5bfe..51bcfeb 100644 --- a/src/dividat-driver/server/main.go +++ b/src/dividat-driver/server/main.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" "github.com/dividat/driver/src/dividat-driver/flex" + "github.com/dividat/driver/src/dividat-driver/flex/enumerator/mockdev" "github.com/dividat/driver/src/dividat-driver/logging" "github.com/dividat/driver/src/dividat-driver/rfid" "github.com/dividat/driver/src/dividat-driver/senso" @@ -55,8 +56,11 @@ func Start(logger *logrus.Logger, origins []string) context.CancelFunc { sensoHandle := senso.New(ctx, baseLog.WithField("package", "senso")) http.Handle("/senso", originMiddleware(origins, baseLog, sensoHandle)) - // Setup SensingTex reader - flexHandle := flex.New(ctx, baseLog.WithField("package", "flex")) + // Setup Flex reader + mockDeviceRegistry := mockdev.New(baseLog.WithField("package", "flex.enumerator.mockdev")) + http.Handle("/flex/mock", http.RedirectHandler("/flex/mock/", http.StatusMovedPermanently)) + http.Handle("/flex/mock/", http.StripPrefix("/flex/mock", mockDeviceRegistry)) + flexHandle := flex.New(ctx, baseLog.WithField("package", "flex"), mockDeviceRegistry) http.Handle("/flex", originMiddleware(origins, baseLog, flexHandle)) // Setup RFID scanner diff --git a/src/dividat-driver/util/main.go b/src/dividat-driver/util/main.go new file mode 100644 index 0000000..6a2b38b --- /dev/null +++ b/src/dividat-driver/util/main.go @@ -0,0 +1,6 @@ +package util + +// return pointer to value, useful when handling const +func PointerTo[T any](val T) *T { + return &val +} diff --git a/src/dividat-driver/websocket/main.go b/src/dividat-driver/websocket/main.go new file mode 100644 index 0000000..fef0a96 --- /dev/null +++ b/src/dividat-driver/websocket/main.go @@ -0,0 +1,325 @@ +// common parts to senso.websocket and flex.websocket +package websocket + +import ( + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/cskr/pubsub" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" + + "github.com/dividat/driver/src/dividat-driver/protocol" +) + +type SendMsg struct { + Progress func(string) + Failure func(string) + Success func(string) +} + +type DeviceBackend interface { + GetStatus() protocol.Status + Discover(duration int, ctx context.Context) chan protocol.DeviceInfo + Connect(address string) + Disconnect() + RegisterSubscriber(r *http.Request) + DeregisterSubscriber(r *http.Request) + ProcessFirmwareUpdateRequest(command protocol.UpdateFirmware, send SendMsg) + IsUpdatingFirmware() bool +} + +type Handle struct { + Broker *pubsub.PubSub + BrokerRx string + BrokerTx string + // Flex-only for now + BrokerRxBroadcast *string + + Log *logrus.Entry + + DeviceBackend DeviceBackend +} + +func (handle *Handle) ServeHTTP(w http.ResponseWriter, r *http.Request) { + + // Set up logger + var log = handle.Log.WithFields(logrus.Fields{ + "clientAddress": r.RemoteAddr, + "userAgent": r.UserAgent(), + }) + + // Update to WebSocket + conn, err := webSocketUpgrader.Upgrade(w, r, nil) + if err != nil { + log.WithError(err).Error("Could not upgrade connection to WebSocket.") + http.Error(w, "WebSocket upgrade error", http.StatusBadRequest) + return + } + + log.Info("WebSocket connection opened") + + // Create a mutex for writing to WebSocket (connection supports only one concurrent reader and one concurrent writer (https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency)) + writeMutex := sync.Mutex{} + + // Create a context for this WebSocket connection + ctx, cancel := context.WithCancel(context.Background()) + + // Send binary data up the WebSocket + sendBinary := func(data []byte) error { + writeMutex.Lock() + conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) + err := conn.WriteMessage(websocket.BinaryMessage, data) + writeMutex.Unlock() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.WithError(err).Error("WebSocket error") + } + return err + } + return nil + } + + // send serialized JSON data up the WebSocket as a text message + sendText := func(message []byte) error { + writeMutex.Lock() + conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)) + err := conn.WriteMessage(websocket.TextMessage, message) + writeMutex.Unlock() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.WithError(err).Error("WebSocket error") + } + return err + } + return nil + } + + sendMessage := func(message protocol.Message) error { + msg, err := message.MarshalJSON() + if err != nil { + return err + } + return sendText(msg) + } + + sendBroadcast := func(broadcast protocol.Broadcast) error { + msg, err := broadcast.MarshalJSON() + if err != nil { + return err + } + return sendText(msg) + } + + handle.DeviceBackend.RegisterSubscriber(r) + + // Create channels with data received from device + rx := handle.Broker.Sub(handle.BrokerRx) + + // forward data and controls from device to client + go rx_data_loop(ctx, rx, sendBinary) + + // forward DriverBackend broadcast events to client + var rxBroadcast chan interface{} + if handle.BrokerRxBroadcast != nil { + rxBroadcast = handle.Broker.Sub(*handle.BrokerRxBroadcast) + go rx_broadcast_loop(ctx, rxBroadcast, sendBroadcast) + } + + // Helper function to close the connection + close := func() { + // Unsubscribe from broker + handle.Broker.Unsub(rx) + if rxBroadcast != nil { + handle.Broker.Unsub(rxBroadcast) + } + + handle.DeviceBackend.DeregisterSubscriber(r) + + // Cancel the context + cancel() + + // Close websocket connection + conn.Close() + + log.Info("Websocket connection closed") + } + + // Main loop for the WebSocket connection + go func() { + defer close() + for { + + messageType, msg, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.WithError(err).Error("WebSocket error") + } + return + } + + if messageType == websocket.BinaryMessage { + + if handle.DeviceBackend.IsUpdatingFirmware() { + handle.Log.Debug("Ignoring device command during firmware update.") + continue + } + + handle.Broker.TryPub(msg, handle.BrokerTx) + + } else if messageType == websocket.TextMessage { + + var command protocol.Command + decodeErr := json.Unmarshal(msg, &command) + if decodeErr != nil { + log.WithField("rawCommand", msg).WithError(decodeErr).Warning("Can not decode command.") + continue + } + log.WithField("command", protocol.PrettyPrintCommand(command)).Debug("Received command.") + + if handle.DeviceBackend.IsUpdatingFirmware() && (command.GetStatus == nil || command.Discover == nil) { + log.WithField("command", protocol.PrettyPrintCommand(command)).Debug("Ignoring command during firmware update.") + continue + } + + err := handle.dispatchCommand(ctx, log, command, sendMessage) + if err != nil { + return + } + } + + } + }() + +} + +// HELPERS + +// dispatchCommand handles incomming commands and sends responses back up the WebSocket +func (handle *Handle) dispatchCommand(ctx context.Context, log *logrus.Entry, command protocol.Command, sendMessage func(protocol.Message) error) error { + + if command.GetStatus != nil { + status := handle.DeviceBackend.GetStatus() + message := protocol.Message{Status: &status} + err := sendMessage(message) + + if err != nil { + return err + } + + } else if command.Connect != nil { + handle.DeviceBackend.Connect(command.Connect.Address) + return nil + + } else if command.Disconnect != nil { + handle.DeviceBackend.Disconnect() + return nil + + } else if command.Discover != nil { + entries := handle.DeviceBackend.Discover(command.Discover.Duration, ctx) + + go func(entries chan protocol.DeviceInfo) { + for entry := range entries { + log.WithField("service", entry).Debug("Discovered service.") + + var message protocol.Message + message.Discovered = &entry + + err := sendMessage(message) + if err != nil { + return + } + + } + log.Debug("Discovery finished.") + }(entries) + + return nil + + } else if command.UpdateFirmware != nil { + go handle.DeviceBackend.ProcessFirmwareUpdateRequest(*command.UpdateFirmware, SendMsg{ + Progress: func(msg string) { + sendMessage(firmwareUpdateProgress(msg)) + }, + Failure: func(msg string) { + sendMessage(firmwareUpdateFailure(msg)) + }, + Success: func(msg string) { + sendMessage(firmwareUpdateSuccess(msg)) + }, + }) + } + return nil +} + +func firmwareUpdateSuccess(msg string) protocol.Message { + return firmwareUpdateMessage(protocol.FirmwareUpdateMessage{FirmwareUpdateSuccess: &msg}) +} + +func firmwareUpdateFailure(msg string) protocol.Message { + return firmwareUpdateMessage(protocol.FirmwareUpdateMessage{FirmwareUpdateFailure: &msg}) +} + +func firmwareUpdateProgress(msg string) protocol.Message { + return firmwareUpdateMessage(protocol.FirmwareUpdateMessage{FirmwareUpdateProgress: &msg}) +} + +func firmwareUpdateMessage(msg protocol.FirmwareUpdateMessage) protocol.Message { + return protocol.Message{FirmwareUpdateMessage: &msg} +} + +// rx_broadcast_loop reads events from DeviceBackend and forwards them to the WebSocket +func rx_broadcast_loop(ctx context.Context, rx chan interface{}, send func(protocol.Broadcast) error) { + var err error + for { + select { + case <-ctx.Done(): + return + + case msg := <-rx: + data, ok := msg.(protocol.Broadcast) + if ok { + err = send(data) + } + } + + if err != nil { + return + } + } +} + +// rx_data_loop reads data from device and forwards it up the WebSocket +func rx_data_loop(ctx context.Context, rx chan interface{}, send func([]byte) error) { + var err error + for { + select { + case <-ctx.Done(): + return + + case i := <-rx: + data, ok := i.([]byte) + if ok { + err = send(data) + } + } + + if err != nil { + return + } + } +} + +// Helper to upgrade http to WebSocket +var webSocketUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Subprotocols: []string{"manual-connect"}, // for Flex + CheckOrigin: func(r *http.Request) bool { + // Check is performed by top-level HTTP middleware, and not repeated here. + return true + }, +} diff --git a/test/flex/helpers.js b/test/flex/helpers.js new file mode 100644 index 0000000..cf798fa --- /dev/null +++ b/test/flex/helpers.js @@ -0,0 +1,143 @@ +const { wait } = require("../utils"); + +async function waitForEndpoint(url, maxAttempts = 5, delay = 100) { + let lastError = null; + for (let i = 0; i < maxAttempts; i++) { + try { + // note: not connecting to websocket to avoid messing with test state by having a client + const res = await fetch(url); + return; + } catch (e) { + lastError = e; + await wait(delay); + } + } + throw new Error(`Endpoint ${url} not available after ${maxAttempts} attempts, last error: ${e}`); +} + +/** + * Generate a single synthetic Flex serial data frame. + * + * Protocol format: + * - Header: 'N' + '\n' + length_msb + length_lsb (big-endian uint16) + * - Body: 'P' + '\n' + samples... + * - Each sample (8-bit): row (1 byte) + col (1 byte) + pressure (1 byte) + * - Each sample (12-bit): row (1 byte) + col (1 byte) + pressure (2 bytes big-endian) + * + * Each frame contains two points: + * Point 1: (n, 1, n*2+1) - row=n, col=1, pressure=n*2+1 + * Point 2: (1, n, n*3+1) - row=1, col=n, pressure=n*3+1 + * + * @param {number} n - Frame index (0-23) + * @param {number} [bitDepth=12] - Bit depth: 8 for v4, 12 for v5 + * @returns {Buffer} - The serial data for one frame + */ +function generateFlexSerialFrame(n, bitDepth = 12) { + const numSamples = 2; + + // Header: 'N' + '\n' + length (2 bytes big-endian) + const header = Buffer.from("N\n"); + const length = Buffer.alloc(2); + length.writeUInt16BE(numSamples); + + const bodyStart = Buffer.from("P\n"); + + const bytesPerSample = bitDepth === 8 ? 3 : 4; + + // Sample 1: (n, 1, n*2+1) + const sample1 = Buffer.alloc(bytesPerSample); + sample1[0] = n; // row + sample1[1] = 1; // col + const pressure1 = n * 2 + 1; + if (bitDepth === 8) { + sample1[2] = pressure1 & 0xff; + } else { + sample1.writeUInt16BE(pressure1, 2); + } + + // Sample 2: (1, n, n*3+1) + const sample2 = Buffer.alloc(bytesPerSample); + sample2[0] = 1; // row + sample2[1] = n; // col + const pressure2 = n * 3 + 1; + if (bitDepth === 8) { + sample2[2] = pressure2 & 0xff; + } else { + sample2.writeUInt16BE(pressure2, 2); + } + + return Buffer.concat([header, length, bodyStart, sample1, sample2]); +} + +/** + * Generate a single synthetic Sensitronics serial data frame. + * + * Protocol format (TLV): + * - 0xFF: frame start marker (1 byte) + * - MESSAGE_TYPE: uint8 (1 byte) + * - MESSAGE_LENGTH: uint16 little-endian (2 bytes) + * - MESSAGE_VALUE: variable-length data (MESSAGE_LENGTH bytes) + * + * @param {number} messageType - The message type (0-255) + * @param {Buffer} data - The message payload + * @returns {Buffer} - The complete frame + */ +function generateSensitronicsFrame(messageType, data) { + const header = Buffer.alloc(4); + header[0] = 0xff; // start marker + header[1] = messageType & 0xff; // message type + header.writeUInt16LE(data.length, 2); // message length + + return Buffer.concat([header, data]); +} + +/** + * Generate a Sensitronics frame with random message type and random data. + * + * @param {number} [maxDataLength=100] - Maximum length of random data + * @returns {Buffer} - The complete frame + */ +function generateRandomSensitronicsFrame(maxDataLength = 100) { + const messageType = Math.floor(Math.random() * 256); + const dataLength = Math.floor(Math.random() * maxDataLength); + const data = Buffer.alloc(dataLength); + for (let i = 0; i < dataLength; i++) { + data[i] = Math.floor(Math.random() * 256); + } + + return generateSensitronicsFrame(messageType, data); +} + +/** + * Split a buffer into random-sized chunks. + * Useful for simulating fragmented serial transmission. + * + * @param {Buffer} buffer - The buffer to split + * @param {number} [minChunkSize=1] - Minimum chunk size + * @param {number} [maxChunkSize=10] - Maximum chunk size + * @returns {Buffer[]} - Array of buffer chunks + */ +function splitBufferRandomly(buffer, minChunkSize = 1, maxChunkSize = 10) { + const chunks = []; + let offset = 0; + + while (offset < buffer.length) { + const remaining = buffer.length - offset; + const chunkSize = Math.min( + remaining, + Math.floor(Math.random() * (maxChunkSize - minChunkSize + 1)) + minChunkSize + ); + chunks.push(buffer.subarray(offset, offset + chunkSize)); + offset += chunkSize; + } + + return chunks; +} + +module.exports = { + waitForEndpoint, + generateFlexSerialFrame, + generateSensitronicsFrame, + generateRandomSensitronicsFrame, + splitBufferRandomly, +}; diff --git a/test/flex/index.js b/test/flex/index.js new file mode 100644 index 0000000..0c55618 --- /dev/null +++ b/test/flex/index.js @@ -0,0 +1,596 @@ +const { wait, startDriver, connectWS, expectEvent } = require("../utils"); +const expect = require("chai").expect; +const VirtualDevice = require("./mock/VirtualDevice"); +const path = require("path"); +const { + waitForEndpoint, + generateFlexSerialFrame, + generateRandomSensitronicsFrame, + splitBufferRandomly, +} = require("./helpers"); + +function expectMessageType(ws, msgType) { + return expectEvent(ws, "message", (s) => { + const msg = JSON.parse(s); + return msg.type === msgType; + }).then(JSON.parse); +}; + +function sendCmd(ws, cmd) { + return ws.send(JSON.stringify(cmd)); +} + +function expectCmdReply(ws, cmd, replyType, replyCheck) { + const replyPromise = expectMessageType(ws, replyType); + sendCmd(ws, cmd); + + return replyPromise.then(replyCheck) +} + +function expectStatusReply(ws, replyCheck) { + return expectCmdReply(ws, { type: "GetStatus" }, "Status", replyCheck); +} + +function expectBroadcast(ws, check) { + return expectMessageType(ws, "Broadcast").then(check) +} + +// Connects to the WebSocket and verifies driver is connected to the device. +async function connectAndVerifyWS(device) { + const flexWS = await connectWS("ws://127.0.0.1:8382/flex"); + + await wait(30); + + expectStatusReply(flexWS, (status) => { + expect(status.address).to.be.equal(device.address); + }); + + return flexWS; +} + +async function withDeviceAndClient(deviceConfig) { + const virtualDevice = new VirtualDevice(deviceConfig); + await virtualDevice.initialize(); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + expect(virtualDevice.isRegistered()).to.be.true; + + const flexWS = await connectAndVerifyWS(virtualDevice); + + return [virtualDevice, flexWS]; +} + +describe("Flex functionality", () => { + var driver; + + beforeEach(async () => { + var code = 0; + driver = startDriver().on("exit", (c) => { + code = c; + }); + + await waitForEndpoint("http://127.0.0.1:8382/flex"); + expect(code).to.be.equal(0); + driver.removeAllListeners(); + }); + + afterEach(async () => { + driver.kill(); + }); + + describe("Generic features (PASSTHRU device/reader)", () => { + var virtualDevice; + + beforeEach(async () => { + virtualDevice = new VirtualDevice({ + idVendor: "16c0", + product: "PASSTHRU", + }); + await virtualDevice.initialize(); + }); + + it("Commands: Connect and GetStatus", async function () { + this.timeout(3000); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + expect(virtualDevice.isRegistered()).to.be.true; + + await wait(20); + + // Connect flex endpoint client + const flexWS = await connectWS("ws://127.0.0.1:8382/flex", { }, [ "manual-connect" ]); + + // Drive should not auto-connect since manual-connect is specified + await expectStatusReply(flexWS, (statusAfterRegistration) => { + expect(statusAfterRegistration.address).to.be.null; + expect(statusAfterRegistration.device).to.be.null; + }); + + // Send command to connect to the virtual device + const cmd = { + type: "Connect", + address: virtualDevice.address, + }; + sendCmd(flexWS, cmd); + await expectStatusReply(flexWS, (statusAfterConnect) => { + expect(statusAfterConnect.address).to.be.equal(virtualDevice.address); + expect(statusAfterConnect.device.deviceType).to.be.equal("flex"); + expect(statusAfterConnect.device.usbDevice.serialNumber).to.be.equal(virtualDevice.serialNumber); + }); + }); + + it("Commands: Discover", async function () { + const virtualDevice1 = virtualDevice; + + // Second virtual Flex device + const virtualDevice2 = new VirtualDevice({ + idVendor: "16c0", + manufacturer: "SecondVendor", + product: "PASSTHRU", + }); + await virtualDevice2.initialize(); + + // + const virtualDeviceIgnored = new VirtualDevice({ + idVendor: "14f2", + manufacturer: "IgnoreMe", + product: "NotAFlex", + }); + await virtualDeviceIgnored.initialize(); + + // Connect flex endpoint client + const flexWS = await connectWS("ws://127.0.0.1:8382/flex"); + + const discovered = new Promise((resolve, reject) => { + const values = []; + flexWS.on("message", (a) => { + const msg = JSON.parse(a); + expect(msg.type).to.equal("Discovered") + values.push(msg); + if (values.length === 2) { + resolve(values); + } + }); + }); + + await virtualDevice1.registerWithDriver("http://127.0.0.1:8382"); + await virtualDevice2.registerWithDriver("http://127.0.0.1:8382"); + await virtualDeviceIgnored.registerWithDriver("http://127.0.0.1:8382"); + + sendCmd(flexWS, { + type: 'Discover', + duration: 5 + }); + + const devices = await discovered; + + expect(devices).to.have.length(2); + + const receivedFields = devices.map((d) => { + return { + path: d.device.usbDevice.path, + manufacturer: d.device.usbDevice.manufacturer, + product: d.device.usbDevice.product + } + }); + const actualFields = [virtualDevice1, virtualDevice2].map((d) => { + return { + path: d.address, + manufacturer: d.manufacturer, + product: d.product + } + }); + expect(receivedFields).to.have.deep.members(actualFields); + }); + + it("Multiple clients can Connect and GetStatus at the same time", async function () { + // obviously not a complete test, but sufficient to detect certain hiccups in the driver + this.timeout(2000); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + + const clients = [...Array(5).keys()].map((_) => { + return connectWS("ws://127.0.0.1:8382/flex").then((ws) => { + return expectStatusReply(ws, (status) => { + expect(status.address).to.be.equal(virtualDevice.address); + }); + }); + }); + + await Promise.all(clients); + }); + + it("Broadcasts: Status on Connect and Disconnect ", async function () { + this.timeout(10000); + + // Connect to flex endpoint with multiple clients + const flexWS1 = await connectWS("ws://127.0.0.1:8382/flex"); + const flexWS2 = await connectWS("ws://127.0.0.1:8382/flex"); + const clients = [ flexWS1, flexWS2 ]; + + // Initial status is null + for (const ws of clients) { + await expectStatusReply(ws, (statusInitial) => { + expect(statusInitial.address).to.be.null; + expect(statusInitial.device).to.be.null; + }); + }; + + const broadcast1 = expectBroadcast(flexWS1, (broadcast) => { + expect(broadcast.message.type).to.be.equal("Status"); + expect(broadcast.message.address).to.be.equal(virtualDevice.address); + expect(broadcast.message.device.usbDevice.serialNumber).to.be.equal(virtualDevice.serialNumber); + return broadcast + }); + const broadcast2 = expectBroadcast(flexWS2, (b) => { return b }); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + expect(virtualDevice.isRegistered()).to.be.true; + + // this will await for Flex backgroundScanIntervalSeconds, which is 2 seconds currently + expect(await broadcast1).to.deep.equal(await broadcast2); + + for (const ws of clients) { + // Reply to GetStatus should match the Status Broadcast + expectStatusReply(ws, (status) => { + expect(status).to.deep.equal(broadcastChecked.message); + }); + }; + + const disconnectBroadcast1 = expectBroadcast(flexWS1, (broadcast) => { + expect(broadcast.message.type).to.be.equal("Status"); + expect(broadcast.message.address).to.be.null; + expect(broadcast.message.device).to.be.null; + return broadcast + }); + const disconnectBroadcast2 = expectBroadcast(flexWS2, (b) => { return b }); + + await virtualDevice.serialPort.close(); + expect(await disconnectBroadcast1).to.deep.equal(await disconnectBroadcast2); + }); + + it("Can receive binary data verbatim with passthru", async function () { + this.timeout(10000); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + const flexWS = await connectAndVerifyWS(virtualDevice); + + // generate some random data + const binaryData = Buffer.alloc(2048); + for (let i = 0; i < binaryData.length; i++) { + binaryData[i] = Math.floor(Math.random() * 256); + } + const binaryDataChunks = splitBufferRandomly(binaryData, 64, 256); + + // Set up promise to collect WebSocket data + let receivedData = Buffer.from(""); + const expectData = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + if (receivedData.length === 0) { + reject(new Error("No data received within timeout")); + } else { + reject( + new Error( + "Not all bytes received in time: " + + `${receivedData.length} out of ${binaryData.length}`, + ), + ); + } + }, 8000); + + flexWS.on("message", function message(data, isBinary) { + if (isBinary) { + receivedData = Buffer.concat([receivedData, data]); + } + if (receivedData.length === binaryData.length) { + clearTimeout(timeout); + resolve(); + return; + } + }); + }); + + for (const chunk of binaryDataChunks) { + virtualDevice.serialPort.write(chunk); + await wait(10) + } + + await expectData; + + expect(receivedData.length).to.be.equal(binaryData.length); + expect(receivedData).to.deep.equal(binaryData); + }); + }); + + describe("PASSTHRU-PretendFlex device", () => { + var virtualDevice; + var flexWS; + + beforeEach(async function() { + this.timeout(3000); + [virtualDevice, flexWS] = await withDeviceAndClient({ + idVendor: "16c0", + product: "PASSTHRU-PretendFlex", + }); + }); + + it("present PASSTHRU- as device to client", async function () { + await expectStatusReply(flexWS, (status) => { + expect(status.address).to.be.equal(virtualDevice.address); + expect(status.device.usbDevice.product).to.be.equal("PretendFlex"); + }); + }); + }); + + describe("Flex v4 device", () => { + var virtualDevice; + var flexWS; + + beforeEach(async function() { + this.timeout(3000); + [virtualDevice, flexWS] = await withDeviceAndClient({ + idVendor: "16c0", + manufacturer: "Teensyduino", + bcdDevice: "0277", + }); + }); + + it("can receive 8-bit synthetic data via WebSocket", async function () { + this.timeout(10000); + + const numFrames = 24; + + // Set up promise to collect WebSocket data + const receivedFrames = []; + const expectData = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + if (receivedFrames.length === 0) { + reject(new Error("No data received within timeout")); + } else if (receivedFrames.length < numFrames) { + reject( + new Error( + `Expected ${numFrames} frames, got: ${receivedFrames.length}` + ) + ); + } + }, 8000); + + flexWS.on("message", function message(data, isBinary) { + if (isBinary) { + receivedFrames.push(Buffer.from(data)); + } + if (receivedFrames.length === numFrames) { + clearTimeout(timeout); + resolve(); + } + }); + }); + + // Send the synthetic serial data to the device + for (let i = 0; i < numFrames; i++) { + virtualDevice.serialPort.write(generateFlexSerialFrame(i, 8)); + } + + // Wait for data to be received + await expectData; + + // Verify we received the correct number of frames + expect(receivedFrames.length).to.be.equal(numFrames); + + // Check each frame's content + // The driver forwards the sample data (without headers) in 8-bit mode: + // 3 bytes per sample: row, col, pressure + for (let frameIdx = 0; frameIdx < numFrames; frameIdx++) { + const frame = receivedFrames[frameIdx]; + + // Each frame should have 2 samples * 3 bytes = 6 bytes + expect(frame.length).to.be.equal(6); + + // Sample 1: (frameIdx, 1, frameIdx*2+1) + expect(frame[0]).to.be.equal(frameIdx); // row + expect(frame[1]).to.be.equal(1); // col + expect(frame[2]).to.be.equal(frameIdx * 2 + 1); // pressure + + // Sample 2: (1, frameIdx, frameIdx*3+1) + expect(frame[3]).to.be.equal(1); // row + expect(frame[4]).to.be.equal(frameIdx); // col + expect(frame[5]).to.be.equal(frameIdx * 3 + 1); // pressure + } + }); + }); + + describe("Flex v5 device", () => { + var virtualDevice; + var flexWS; + + beforeEach(async function() { + this.timeout(3000); + [virtualDevice, flexWS] = await withDeviceAndClient({ + idVendor: "16c0", + manufacturer: "Teensyduino", + bcdDevice: "0278", + }); + }); + + it("can receive 12-bit synthetic data via WebSocket", async function () { + this.timeout(10000); + + // Track commands received from driver to know when mode switch is complete + const modeSwitchDone = new Promise((resolve) => { + let modeSwitchComplete = false; + let seenUM = false; + virtualDevice.serialPort.on("data", (data) => { + const str = data.toString(); + // After mode switch, driver sends UM\n then S\n + if (str.includes("UM")) { + seenUM = true; + } + // When we see S\n after UM, the mode switch is complete + if (seenUM && str.includes("S\n") && !modeSwitchComplete) { + modeSwitchComplete = true; + resolve(); + } + }); + }); + + // Switch to 12-bit mode by sending UM\n command + const switchTo12BitCmd = Buffer.from("UM\n"); + flexWS.send(switchTo12BitCmd); + + // Send dummy data to unblock the old reader (it's blocking on ReadByte) + // NOTE: this is a theoretical bug in the driver, but this happens only in + // the synthetic setup and there's no point patching to-be-legacy device + // corner-cases at this point + await wait(50); + virtualDevice.serialPort.write(Buffer.from([0x00])); + + await modeSwitchDone; + + const numFrames = 24; + + // Set up promise to collect WebSocket data + const receivedFrames = []; + const expectData = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + if (receivedFrames.length === 0) { + reject(new Error("No data received within timeout")); + } else if (receivedFrames.length < numFrames) { + reject( + new Error( + `Expected ${numFrames} frames, got: ${receivedFrames.length}` + ) + ); + } + }, 8000); + + flexWS.on("message", function message(data, isBinary) { + if (isBinary) { + receivedFrames.push(Buffer.from(data)); + } + if (receivedFrames.length === numFrames) { + clearTimeout(timeout); + resolve(); + } + }); + }); + + for (let i = 0; i < numFrames; i++) { + virtualDevice.serialPort.write(generateFlexSerialFrame(i, 12)); + } + + await expectData; + + expect(receivedFrames.length).to.be.equal(numFrames); + + // Check each frame's content + for (let frameIdx = 0; frameIdx < numFrames; frameIdx++) { + const frame = receivedFrames[frameIdx]; + + // Each frame should have 2 samples * 4 bytes = 8 bytes + expect(frame.length).to.be.equal(8); + + // Sample 1: (frameIdx, 1, frameIdx*2+1) + expect(frame[0]).to.be.equal(frameIdx); // row + expect(frame[1]).to.be.equal(1); // col + const pressure1 = frame.readUInt16BE(2); + expect(pressure1).to.be.equal(frameIdx * 2 + 1); + + // Sample 2: (1, frameIdx, frameIdx*3+1) + expect(frame[4]).to.be.equal(1); // row + expect(frame[5]).to.be.equal(frameIdx); // col + const pressure2 = frame.readUInt16BE(6); + expect(pressure2).to.be.equal(frameIdx * 3 + 1); + } + }); + }); + + describe("Sensitronics device", () => { + var virtualDevice; + var flexWS; + + beforeEach(async function() { + this.timeout(3000); + + virtualDevice = new VirtualDevice({ + idVendor: "16c0", + idProduct: "0483", + manufacturer: "Dividat", + product: "FlexV6", + }); + await virtualDevice.initialize(); + }); + + it("driver sends start command, chunks frames", async function () { + this.timeout(10000); + + // Set up listener for start command before connecting + const startCmdReceived = new Promise((resolve) => { + virtualDevice.serialPort.on("data", (data) => { + const str = data.toString(); + if (str.includes("S\n")) { + resolve(); + } + }); + }); + + await virtualDevice.registerWithDriver("http://127.0.0.1:8382"); + expect(virtualDevice.isRegistered()).to.be.true; + + flexWS = await connectAndVerifyWS(virtualDevice); + + // Generate random frames + const numFrames = 30; + const generatedFrames = []; + for (let i = 0; i < numFrames; i++) { + generatedFrames.push(generateRandomSensitronicsFrame(50)); + } + + const allFramesBuffer = Buffer.concat(generatedFrames); + + // Split the buffer into random chunks to simulate fragmented transmission + const chunks = splitBufferRandomly(allFramesBuffer, 1, 15); + + const receivedFrames = []; + const expectData = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + if (receivedFrames.length === 0) { + reject(new Error("No data received within timeout")); + } else if (receivedFrames.length < numFrames) { + reject( + new Error( + `Expected ${numFrames} frames, got: ${receivedFrames.length}` + ) + ); + } + }, 8000); + + flexWS.on("message", function message(data, isBinary) { + if (isBinary) { + receivedFrames.push(Buffer.from(data)); + } + if (receivedFrames.length === numFrames) { + clearTimeout(timeout); + resolve(); + } + }); + }); + + // wait for start command before producing data + await startCmdReceived; + for (const chunk of chunks) { + virtualDevice.serialPort.write(chunk); + } + + await expectData; + + expect(receivedFrames.length).to.be.equal(numFrames); + + for (let i = 0; i < numFrames; i++) { + expect( + receivedFrames[i].equals(generatedFrames[i]), + `Frame ${i} mismatch` + ).to.be.true; + } + }); + }); +}); diff --git a/test/flex/mock/VirtualDevice.js b/test/flex/mock/VirtualDevice.js new file mode 100644 index 0000000..80baae1 --- /dev/null +++ b/test/flex/mock/VirtualDevice.js @@ -0,0 +1,144 @@ +const VirtualSerialPort = require("./VirtualSerialPort"); +const fs = require("fs"); +const { promisify } = require("util"); +const sleep = promisify(setTimeout); + +class VirtualDevice { + constructor(usbInfo = {}) { + // Note: on Linux, go.bug.st/serial/enumerator reads directly from sysFS + // files and treats the data as strings. So to test that our code converts + // them to uint16 correctly, we have to specify them as strings too. + // + // sysFS stores USB descriptors as fixed-length (4 char) hex strings WITHOUT + // the 0x prefix, so e.g. + this.idVendor = usbInfo.idVendor || "F0FA"; + this.idProduct = usbInfo.idProduct || "0001"; + this.bcdDevice = usbInfo.bcdDevice || "0001"; + this.serialNumber = usbInfo.serialNumber || "9090909"; + this.manufacturer = usbInfo.manufacturer || "Mockfactory"; + this.product = usbInfo.product || "Mockdevice"; + + this.serialPort = new VirtualSerialPort(); + this.registeredId = null; + this.replayStopRequested = false; + this.address = null; + } + + async initialize() { + this.address = await this.serialPort.open(); + } + + async registerWithDriver(url) { + if (this.registeredId !== null) { + throw new Error("Device is already registered"); + } + + if (this.serialPort.getPortPath() === null) { + throw new Error("Serial port is not created"); + } + + const payload = { + Name: this.serialPort.getPortPath(), + VID: this.idVendor, + PID: this.idProduct, + BcdDevice: this.bcdDevice, + SerialNumber: this.serialNumber, + Manufacturer: this.manufacturer, + Product: this.product, + }; + + const response = await fetch(`${url}/flex/mock/`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(payload), + }); + + if (!response.ok) { + throw new Error( + `Failed to register device: ${response.status} ${response.statusText}`, + ); + } + + const result = await response.json(); + this.registeredId = result.id; + + return this.registeredId; + } + + async unregisterFromDriver(url) { + if (this.registeredId === null) { + throw new Error("Device is not registered"); + } + + const response = await fetch(`${url}/flex/mock/${this.registeredId}`, { + method: "DELETE", + }); + + if (!response.ok) { + throw new Error( + `Failed to unregister device: ${response.status} ${response.statusText}`, + ); + } + + this.registeredId = null; + } + + isRegistered() { + return this.registeredId !== null; + } + + stopReplay() { + this.replayStopRequested = true; + } + + async replayRecording(filePath, loop = true, speedFactor = 1) { + this.replayStopRequested = false; + if (!this.serialPort || !this.serialPort.isOpen) { + throw new Error("Serial port is not open"); + } + + do { + try { + const fileContent = fs.readFileSync(filePath, "utf8"); + const lines = fileContent.trim().split("\n"); + + for (const line of lines) { + if (this.replayStopRequested) { + return; + } + + if (line.trim() === "") continue; + + const [sleepDurationStr, base64Data] = line.split(","); + const sleepDuration = parseInt(sleepDurationStr.trim(), 10); + + if (isNaN(sleepDuration) || !base64Data) { + console.warn(`Skipping invalid line: ${line}`); + continue; + } + + // Sleep for specified duration (adjusted by speed factor) + // Note: sleeping before writing the data to simulate the amount + // of time it took the device to produce the sample. + const adjustedSleepDuration = sleepDuration / speedFactor; + await sleep(adjustedSleepDuration); + + // Convert base64 to binary data + const binaryData = Buffer.from(base64Data.trim(), "base64"); + + // Write data to serial port + const writeSuccess = this.serialPort.write(binaryData); + if (!writeSuccess) { + console.warn(`Failed to write frame data to serial port`); + } + } + } catch (error) { + throw new Error(`Failed to replay recording: ${error.message}`); + } + } while (loop); + } +} + +module.exports = VirtualDevice; diff --git a/test/flex/mock/VirtualSerialPort.js b/test/flex/mock/VirtualSerialPort.js new file mode 100644 index 0000000..bc07f62 --- /dev/null +++ b/test/flex/mock/VirtualSerialPort.js @@ -0,0 +1,210 @@ +const { spawn } = require("child_process"); +const { EventEmitter } = require("events"); +const fs = require("fs"); +const path = require("path"); +const os = require("os"); +// enable debug logs by passing an setting env DEBUG=VirtualSerialPort +const debug = require("debug")("VirtualSerialPort") + +class VirtualSerialPort extends EventEmitter { + constructor() { + super(); + this.socatProcess = null; + this.ttyFd = null; + this.readStream = null; + this.writeStream = null; + this.ttyPath = null; + this.subsidiaryPath = null; + this.isOpen = false; + } + + /** + * @async + * @returns {Promise} subsidiary TTY path + */ + async open() { + debug("Opening virtual serial port"); + return new Promise((resolve, reject) => { + const tmpDir = os.tmpdir(); + const timestamp = Date.now(); + this.subsidiaryPath = path.join(tmpDir, `vtty_subsidiary_${timestamp}`); + debug("Creating subsidiary path:", this.subsidiaryPath); + + this.socatProcess = spawn( + "socat", + ["-", `pty,raw,echo=0,link=${this.subsidiaryPath}`], + { + stdio: ["pipe", "pipe", "pipe"], + }, + ); + + this.socatProcess.on("error", (error) => { + console.error("socat process error:", error.message); + this.emit( + "error", + new Error(`Failed to spawn socat: ${error.message}`), + ); + reject(error); + }); + + const waitForSubsidiary = () => { + try { + fs.accessSync(this.subsidiaryPath, fs.constants.F_OK); + debug( + "Subsidiary path accessible, setting up TTY", + ); + this.setupTTY(); + this.isOpen = true; + debug("Port opened successfully"); + this.emit("open"); + resolve(this.subsidiaryPath); + } catch (e) { + setTimeout(waitForSubsidiary, 10); + } + }; + + setTimeout(waitForSubsidiary, 10); + + this.socatProcess.on("exit", (code, signal) => { + debug( + "socat process exited with code:", + code, + "signal:", + signal, + ); + this.isOpen = false; + this.emit("close"); + if (code !== 0 && code !== null && signal !== "SIGTERM") { + console.error( + "socat exited with error code:", + code, + ); + this.emit( + "error", + new Error(`socat process exited with code ${code}`), + ); + } + }); + }); + } + + setupTTY() { + debug("Setting up TTY streams"); + try { + this.writeStream = this.socatProcess.stdin; + this.readStream = this.socatProcess.stdout; + + this.readStream.on("data", (data) => { + debug("Received data:", data.length, "bytes"); + this.emit("data", data); + }); + + this.readStream.on("error", (error) => { + console.error("Read stream error:", error.message); + this.emit("error", error); + }); + + this.writeStream.on("error", (error) => { + console.error("Write stream error:", error.message); + this.emit("error", error); + }); + + this.readStream.on("end", () => { + debug("Read stream ended"); + if (this.isOpen) { + debug( + "Emitting close event due to read stream end", + ); + this.emit("close"); + this.isOpen = false; + } + }); + } catch (error) { + console.error("Error setting up TTY:", error.message); + this.emit("error", error); + } + } + + /** + * @param {Buffer|string} data + * @returns {boolean} success + */ + write(data) { + if (!this.isOpen || !this.writeStream) { + cosnole.error( + "Write failed: TTY is not open (isOpen:", + this.isOpen, + "writeStream:", + !!this.writeStream, + ")", + ); + this.emit("error", new Error("TTY is not open")); + return false; + } + + try { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data); + debug("Writing data:", buffer.length, "bytes"); + return this.writeStream.write(buffer); + } catch (error) { + console.error("Write error:", error.message); + this.emit("error", error); + return false; + } + } + + close() { + if (!this.isOpen) { + debug("Close called but port already closed"); + return; + } + + debug("Closing virtual serial port"); + this.isOpen = false; + + if (this.writeStream && !this.writeStream.destroyed) { + debug("Ending write stream"); + this.writeStream.end(); + } + + this.readStream = null; + this.writeStream = null; + + if (this.socatProcess && !this.socatProcess.killed) { + debug("Killing socat process"); + // Remove all listeners to prevent errors during cleanup + this.socatProcess.removeAllListeners(); + this.socatProcess.kill("SIGTERM"); + + setTimeout(() => { + if (this.socatProcess && !this.socatProcess.killed) { + debug("Force killing socat process"); + this.socatProcess.kill("SIGKILL"); + } + }, 1000); + } + + setTimeout(() => { + this.cleanupPaths(); + }, 500); + } + + cleanupPaths() { + try { + if (this.subsidiaryPath && fs.existsSync(this.subsidiaryPath)) { + fs.unlinkSync(this.subsidiaryPath); + } + } catch (error) { + // Ignore cleanup errors + } + } + + /** + * @returns {string|null} subsidiary TTY path + */ + getPortPath() { + return this.subsidiaryPath; + } +} + +module.exports = VirtualSerialPort; diff --git a/test/index.js b/test/index.js index 9302ff0..ed5accf 100644 --- a/test/index.js +++ b/test/index.js @@ -11,6 +11,10 @@ describe('Senso', () => { require('./senso') }) -describe('RFID', () => { - require('./rfid') +describe("RFID", () => { + require("./rfid") +}) + +describe("Flex", () => { + require("./flex") }) diff --git a/test/utils.js b/test/utils.js index 4ab0c3c..7774cba 100644 --- a/test/utils.js +++ b/test/utils.js @@ -9,14 +9,15 @@ module.exports = { }, startDriver: function (...args) { - return spawn('bin/dividat-driver', args) - // useful for debugging: - // return spawn('bin/dividat-driver', [], {stdio: 'inherit'}) + return spawn("bin/dividat-driver", args, { + // uncomment for Driver logs when debugging: + // stdio: "inherit", + }) }, - connectWS: function (url, opts) { + connectWS: function (url, opts, protocols) { return new Promise((resolve, reject) => { - const ws = new WebSocket(url, opts) + const ws = new WebSocket(url, protocols, opts) ws.on('open', () => { ws.removeAllListeners() resolve(ws) diff --git a/tools/flex-recording-and-replay.md b/tools/flex-recording-and-replay.md new file mode 100644 index 0000000..04e27e8 --- /dev/null +++ b/tools/flex-recording-and-replay.md @@ -0,0 +1,103 @@ +# Recording and replaying Flex data + +This document provides additional details on how you can record and replay Flex +data. + +## Recording Flex data + +There are 3 ways to record Senso Flex data: + +1. Reading data directly from the serial device, e.g. + + socat stdio /dev/ttyACM0 > recording.dat + + However, this means you cannot run the driver and thus cannot interact with + the device. In particular, any setup commands would have to be either + executed manually or prior to starting the recording. + + Not recommended and not supported for replay. timestamping+base64 left as an + exercise. + +2. Recording serial data by spying on driver's reads using `strace`: + + ./tools/record-flex-serial -o recording.serial.dat + + By default, the script will attach to `pidof dividat-driver` and spy on reads + from `/dev/ttyACM0`, but you can override it with `-p` and `-d` flags. See + `./tools/record-flex-serial --help` for details. + + This records serial data that can be then be used to do end-to-end replays + (that involve the driver's processing). + + This recording method only works on Linux. You can still replay these + recordings on macOS. + + To replay the data, use + + node tools/replay-flex -d recording.serial.dat + + By convention, such recordings are suffixed with ..serial.dat + +3. Recording the WebSocket binary stream from `/flex`: + + This records the processed output as produced by the driver, using the same + mechanism as for the Senso. + + make record-flex > recording.dat + + This data can be replayed using a special "passthru" mock device that + pretends to be a different device to the client: + + node tools/replay-flex -d passthru- recording.ws.dat + + This method can be useful if: + - You need to capture the exact output of the driver instead of the device + (e.g. for diff'ing) + - You cannot use `strace` for recording (e.g. on macOS) + + Note: for `v6` (Sensitronics) devices, the driver outputs identical bytes to the + serial data read, just chunked/framed (i.e. `concat(serial out) == + concat(WS binary stream)`). This means you can record the websocket + stream, but still replay as if it was recorded directly from the serial + output (using `-d v6`). + + By convention, such recordings are suffixed with ..ws.dat + +## Replaying Flex recordings + +The Senso Flex replayer (`npm run replay-flex`) supports the same parameters as +the Senso replayer and also allows to fake device metadata. + +Flex replay works by creating a mock serial device (using +`test/flex/mock/VirtualDevice.js`) and registering it in the Driver. + +Driver must be running and built with the `debug` tag, which is the default if +you run `make build` and/or `make run`. + +You can then replay a recording using: + + node tools/replay-flex -d recording.dat + +If you are using a serial data recording (i.e. recorded using +`tools/record-flex-serial`), the Driver will parse it as if it was reading the +serial data from a real device. This can be used to e.g. do regression testing +of the parsing logic in the Driver. + +If you are using a WebSocket stream recording (i.e. recorded using `make +record-flex`), you must also specify `--passthru` mode. This will instruct the +Driver to bypass device-specific serial data parsing and instead to simply +forward the serial data over the WebSocket verbatim. + +Note: if you are using `passthru` and the recorded data is not chunked into +frames (e.g. it is a recording of raw serial data), the client will receive +incomplete/split frames as separate WebSocket messages. This will not work with +Play, since it expects to receive complete frames. TL;DR: do not use +`passthru` with recordings obtained using `tools/record-flex-serial`. + +See +[src/dividat-driver/flex/devices/passthru/main.go](src/dividat-driver/flex/devices/passthru/main.go) +for the implementation details of `passthru`. + +CLI help is available with: + + node tools/replay-flex --help diff --git a/tools/record-flex-serial b/tools/record-flex-serial new file mode 100755 index 0000000..659a312 --- /dev/null +++ b/tools/record-flex-serial @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + echo "Usage: $0 [-p PID] [-d DEVICE] [-o OUTPUT]" >&2 + echo " -p PID Process ID to trace (default: pidof dividat-driver)" >&2 + echo " -d DEVICE Device path to trace (default: /dev/ttyACM0)" >&2 + echo " -o OUTPUT Output file (default: stdout)" >&2 + exit 1 +} + +while getopts "p:d:o:h" opt; do + case $opt in + p) PID="$OPTARG" ;; + d) DEVICE="$OPTARG" ;; + o) OUTPUT="$OPTARG" ;; + h) usage ;; + *) usage ;; + esac +done + +if [[ -z "${PID:-}" ]]; then + if ! PID=$(pidof dividat-driver 2>/dev/null); then + echo "Error: dividat-driver is not running" >&2 + exit 1 + fi +fi + +if [[ -z "${DEVICE:-}" ]]; then + DEVICE="/dev/ttyACM0" +fi + +if [[ ! -e "$DEVICE" ]]; then + echo "Error: Device $DEVICE does not exist" >&2 + exit 1 +fi + +if [[ -z "${OUTPUT:-}" ]]; then + OUTPUT="/dev/stdout" +fi + +echo "Tracing PID=$PID, device=$DEVICE, following all threads and forks..." >&2 + +# strace output will look like this: +# [pid 166687] 1767784040.380 read(8, "\x01\x01\x00", 4096) = 3 +# [pid 166687] 1767784040.387 read(8, "\x4e\x0a\x00\x04\x50\x0a\x02\x02\x00\x01\x01\x00\x02\x02\x00\x01\x01\x00", 4096) = 18 +# [pid 166687] 1767784040.395 read(8, "\x4e\x0a\x00\x04\x50\x0a\x02\x02\x00\x01\x01\x00\x02\x02\x00\x01\x01\x00", 4096) = 18 +strace \ + --timestamps=format:unix,precision:ms \ + -s 65536 \ + -xx \ + -e trace=read,readv \ + -P "$DEVICE" \ + --follow-forks -p "$PID" 2>&1 | \ +awk ' +BEGIN { prev = 0 } +match($0, /^[pid [0-9]+\] ([0-9.]+) readv?\([0-9]+, "([^"]+)",/, m) { + ts = m[1] + hex = m[2] + gsub(/\\x/, "", hex) + + if (prev == 0) delta = 0 + else delta = int((ts - prev) * 1000) + prev = ts + + printf "%d,", delta + print hex +} +' | while IFS=, read -r delta hex; do + printf "%d, " "$delta" + echo "$hex" | xxd -r -p | base64 --wrap=0 + echo "" +done >> "$OUTPUT" diff --git a/tools/replay-flex/index.js b/tools/replay-flex/index.js index 76f11d9..905e8dd 100644 --- a/tools/replay-flex/index.js +++ b/tools/replay-flex/index.js @@ -1,85 +1,164 @@ -// Mock the driver at localhost:8382 to replay Senso Flex package recordings - -const argv = require('minimist')(process.argv.slice(2)) -const fs = require('fs') -const split = require('binary-split') -const websocket = require('ws') -const http = require('http') -const EventEmitter = require('events') - -var recFile = argv['_'].pop() || 'rec/flex/zero.dat' -let speedFactor = 1/(parseFloat(argv['speed']) || 1) -let driverVersion = argv['driver-version'] || "9.9.9-REPLAY" -let loop = !argv['once'] - -// Create a never ending stream of data -function Replayer (recFile) { - var emitter = new EventEmitter() - - function createStream () { - var stream = new fs.createReadStream(recFile).pipe(split()) - - stream.on('data', (data) => { - stream.pause() - - var items = data.toString().split(',') - var msg - var timeout - if (items.length === 2) { - msg = items[1] - timeout = items[0] - } else { - msg = items[0] - timeout = 20 - } - var buf = Buffer.from(msg, 'base64') - emitter.emit('data', buf) - - setTimeout(() => { - stream.resume() - }, timeout * speedFactor) - }).on('end', () => { - if (loop) { - console.log('End of the record stream, looping.') - createStream() - } else { - console.log('End of the record stream, exiting.') - process.exit(0) - } - }) - } - createStream() - return emitter +#!/usr/bin/env node +// Replay Senso Flex serial data recordings to a running Driver via mock device + +const { program } = require("commander"); + +// Import VirtualDevice from test utilities +const VirtualDevice = require("../../test/flex/mock/VirtualDevice"); + +// USB descriptors for different device types +// Note: Values are 4-char hex strings without 0x prefix (sysFS format) +const DEVICE_USB_INFO = { + // V4 is Teensy 3.x with bcdDevice <= 0x0277 + v4: { + idVendor: "16c0", + idProduct: "0483", + manufacturer: "Teensyduino", + product: "Teensy", + bcdDevice: "0277", + }, + // V5 is Teensy 4.0 with bcdDevice > 0x0277 + v5: { + idVendor: "16c0", + idProduct: "0483", + manufacturer: "Teensyduino", + product: "Teensy", + bcdDevice: "0278", + }, + // V6 (Sensitronics) + v6: { + idVendor: "16c0", + idProduct: "0483", + manufacturer: "Dividat", + product: "FlexV6", + }, +}; + +// Define CLI using commander +program + .description("Replay Senso Flex serial data recordings to a running Driver via a mock device.\n\nNote: The Driver must be running with test mode enabled for mock device registration.") + .argument("[recording-file]", "path to the recording file", "rec/flex/zero.dat") + .option("-s, --speed ", "replay speed multiplier (>1 faster, <1 slower)", parseFloat, 1) + .option("--once", "play recording once and exit instead of looping") + .option("-u, --driver-url ", "URL of the running Driver", "http://127.0.0.1:8382") + .requiredOption("-d, --device ", "device type to emulate (v4, v5, v6)") + .option("--passthru", "Replay the recording verbatim. For use with recordings of /flex WS stream.") + .parse(); + +const options = program.opts(); +const recFile = program.args[0] || "rec/flex/zero.dat"; +const speed = options.speed; +const loop = !options.once; +const driverUrl = options.driverUrl; +const deviceType = options.device.toLowerCase(); + +// Validate device type +const validDeviceTypes = Object.keys(DEVICE_USB_INFO); +if (!validDeviceTypes.includes(deviceType)) { + console.error(`Error: Invalid device type '${deviceType}'`); + console.error(`Valid options: ${validDeviceTypes.join(", ")}`); + process.exit(1); } -const driverMetadata = { - "message": "Dividat Driver", - "version": driverVersion, - "machineId": "b58f4aa6e34227c2d0517c924c9060bc8a25d8de677bb42d9dd3d9d2a7eb128d", - "os": "linux", - "arch": "amd64", +// Get USB info and optionally apply passthru prefix +const usbInfo = { ...DEVICE_USB_INFO[deviceType] }; +if (options.passthru) { + usbInfo.product = `PASSTHRU-${deviceType}`; } -const server = http.createServer((req, res) => { - if (req.method === 'GET' && req.url === '/') { - res.writeHead(200, - {'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*'} - ); - res.end(JSON.stringify(driverMetadata)); - } else { - res.writeHead(404); - res.end(); +async function main() { + console.log(`Replay Flex Recording Tool`); + console.log(`--------------------------`); + console.log(`Recording file: ${recFile}`); + console.log(`Speed: ${speed}x`); + console.log(`Loop: ${loop}`); + console.log(`Driver URL: ${driverUrl}`); + console.log(`Device type: ${deviceType}`); + console.log(); + + // Check if recording file exists + const fs = require("fs"); + if (!fs.existsSync(recFile)) { + console.error(`Error: Recording file not found: ${recFile}`); + process.exit(1); } -}); -const wss = new websocket.Server({ server }); + // Create virtual device with selected USB descriptors + const virtualDevice = new VirtualDevice(usbInfo); -wss.on('connection', function connection(ws) { - const dataStream = Replayer(recFile) - dataStream.on('data', (data) => ws.send(data)) -}); + // Initialize the virtual serial port + console.log("Initializing virtual device..."); + try { + await virtualDevice.initialize(); + console.log(`Virtual serial port created at: ${virtualDevice.address}`); + } catch (error) { + console.error(`Failed to initialize virtual device: ${error.message}`); + process.exit(1); + } + + // Register mock device with the Driver + console.log(`Registering mock device with Driver at ${driverUrl}...`); + try { + await virtualDevice.registerWithDriver(driverUrl); + console.log(`Mock device registered with ID: ${virtualDevice.registeredId}`); + } catch (error) { + console.error(`Failed to register mock device with Driver: ${error.message}`); + console.error(`Make sure the Driver is running with test mode enabled.`); + virtualDevice.serialPort.close(); + process.exit(1); + } + + // Track if we're shutting down to suppress expected errors + let isShuttingDown = false; + + // Handle errors from the serial port (e.g., socat exit on SIGINT) + virtualDevice.serialPort.on("error", (error) => { + if (!isShuttingDown) { + console.error(`Serial port error: ${error.message}`); + } + }); + + // Handle graceful shutdown + const cleanup = async () => { + if (isShuttingDown) return; + isShuttingDown = true; + + console.log("\nShutting down..."); + virtualDevice.stopReplay(); + + // Close serial port first to prevent error events from socat + if (virtualDevice.serialPort) { + virtualDevice.serialPort.close(); + } + + try { + await virtualDevice.unregisterFromDriver(driverUrl); + console.log("Unregistered mock device from Driver."); + } catch (error) { + console.warn(`Warning: Failed to unregister device: ${error.message}`); + } + + process.exit(0); + }; + + process.on("SIGINT", cleanup); + process.on("SIGTERM", cleanup); + + // Start replaying the recording + console.log(`\nStarting replay of ${recFile}...`); + try { + await virtualDevice.replayRecording(recFile, loop, speed); + if (!loop) { + console.log("End of recording reached, exiting."); + await cleanup(); + } + } catch (error) { + console.error(`Replay error: ${error.message}`); + await cleanup(); + } +} -server.listen(8382, () => { - console.log('Mock Driver running at http://localhost:8382/'); +main().catch((error) => { + console.error(`Unexpected error: ${error.message}`); + process.exit(1); }); diff --git a/tools/replay/index.js b/tools/replay/index.js index 1402759..e6535b4 100644 --- a/tools/replay/index.js +++ b/tools/replay/index.js @@ -1,6 +1,6 @@ // Mock up a Senso data and control server -const argv = require('minimist')(process.argv.slice(2)) +const { program } = require('commander') const fs = require('fs') const split = require('binary-split') const net = require('net') @@ -9,9 +9,18 @@ const EventEmitter = require('events') const control = require('./control') -var recFile = argv['_'].pop() || 'rec/senso/zero.dat' -let speedFactor = 1/(parseFloat(argv['speed']) || 1) -let loop = !argv['once'] +// Define CLI using commander +program + .description('Mock up a Senso data and control server by replaying recorded data.') + .argument('[recording-file]', 'path to the recording file', 'rec/senso/zero.dat') + .option('-s, --speed ', 'replay speed multiplier (>1 faster, <1 slower)', parseFloat, 1) + .option('--once', 'play recording once and exit instead of looping') + .parse() + +const options = program.opts() +const recFile = program.args[0] || 'rec/senso/zero.dat' +let speedFactor = 1 / (options.speed || 1) +let loop = !options.once async function mockSenso (profile, data) { var socket = await listenForConnection('0.0.0.0', 55567)