diff --git a/.gitignore b/.gitignore index 19232cf..9b290ea 100644 --- a/.gitignore +++ b/.gitignore @@ -134,3 +134,6 @@ dmypy.json # Pyre type checker .pyre/ + +# IDE +.vscode/ diff --git a/chimerapy/pipelines/__init__.py b/chimerapy/pipelines/__init__.py index 5147277..dc221ea 100644 --- a/chimerapy/pipelines/__init__.py +++ b/chimerapy/pipelines/__init__.py @@ -22,6 +22,7 @@ def register_nodes_metadata(): "chimerapy.pipelines.yolov8.multi_vid_pose:YoloV8Node", "chimerapy.pipelines.yolov8.multi_save:MultiSaveNode", "chimerapy.pipelines.yolov8.display:DisplayNode", + "chimerapy.pipelines.g3.node:G3", ], } diff --git a/chimerapy/pipelines/g3/__init__.py b/chimerapy/pipelines/g3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chimerapy/pipelines/g3/node.py b/chimerapy/pipelines/g3/node.py new file mode 100644 index 0000000..f61be31 --- /dev/null +++ b/chimerapy/pipelines/g3/node.py @@ -0,0 +1,180 @@ +from typing import List, Optional + +import asyncio +import base64 +import cv2 +import logging +import numpy as np + +from g3pylib import connect_to_glasses + +import chimerapy.engine as cpe +from chimerapy.orchestrator import source_node + + +@source_node(name="CPPipelines_G3") +class G3(cpe.Node): + """A node that connects to a Tobii Glasses 3 instance to perform streaming and recording operations. + + Sample `gaze_data` in datachunk: + ```JSON + { + "gaze2d": [0.468, 0.483], + "gaze3d": [37.543, -18.034, 821.265], + "eyeleft": { + "gazeorigin": [28.799, -7.165, -23.945], + "gazedirection": [0.0463, -0.0337, 0.998], + "pupildiameter": 2.633 + }, + "eyeright": { + "gazeorigin": [-28.367, -5.353, -21.426], + "gazedirection": [0.0436, 0.00611, 0.999], + "pupildiameter": 2.782 + } + } + ``` + + Parameters + ---------- + hostname: str, required + The G3 device's serial number (e.g. TG03B-080200004381), used for connections. + name: str, optional (default: "") + The name of the node. If not provided, the `hostname` will be used. + show_gaze: bool, optional (default: False) + Whether to render the gaze circle on video frames + frames_key: str, optional (default: "frame") + The key to use for the video frame in the data chunk + **kwargs + Additional keyword arguments to pass to the Node constructor + """ + + def __init__( + self, + hostname: str, + name: str = "", + show_gaze: bool = False, + frame_key: str = "frame", + **kwargs, + ): + self.hostname = hostname + self.show_gaze = show_gaze + self.frame_key = frame_key + + if not name: + name = hostname + + super().__init__(name=name, **kwargs) + + async def setup(self) -> None: + self.g3 = await connect_to_glasses.with_hostname( + self.hostname, using_zeroconf=True + ) + + ( + self.scene_queue, + self.unsub_to_scene, + ) = await self.g3.rudimentary.subscribe_to_scene() + ( + self.gaze_queue, + self.unsub_to_gaze, + ) = await self.g3.rudimentary.subscribe_to_gaze() + + await self.g3.rudimentary.start_streams() + self.is_recording = False + + async def step(self) -> cpe.DataChunk: + # start on-device recording if node enters RECORDING state + if self.state.fsm == "RECORDING" and not self.is_recording: + if await self.g3.recorder.start(): + self.is_recording = True + print("\n---------------Started Recording On Device---------------") + elif self.state.fsm == "STOPPED" and self.is_recording: + if await self.g3.recorder.stop(): + self.is_recording = False + print("\n---------------Stopped Recording On Device---------------") + + ret_chunk = cpe.DataChunk() + + frame_timestamp, frame_data_b64 = await self.scene_queue.get() + gaze_timestamp, gaze_data = await self.gaze_queue.get() + + # add timestamp to gaze data + gaze_data["timestamp"] = gaze_timestamp + + # convert base64-encoded frame data string to cv2 image + frame_nparr = np.fromstring(base64.b64decode(frame_data_b64), dtype=np.uint8) + frame_data = cv2.imdecode(frame_nparr, cv2.IMREAD_COLOR) + + if self.show_gaze and "gaze2d" in gaze_data: + gaze2d = gaze_data["gaze2d"] + logging.info(f"Gaze2d: {gaze2d[0]:9.4f},{gaze2d[1]:9.4f}") + + # convert rational (x,y) to pixel location (x,y) + h, w = frame_data.shape[:2] + fix = (int(gaze2d[0] * w), int(gaze2d[1] * h)) + + # draw gaze on frame_data + frame_data = cv2.circle(frame_data, fix, 10, (0, 0, 255), 3) + + # save gaze_data to file + self.save_json("gaze-data", gaze_data) + + # self.save_video("stream", frame_data, 25) + + # TODO: figure out why CV2 window isn't showing + ret_chunk.add(self.frame_key, frame_data, "image") + # print("---------------------ADDED FRAME DATA------------------------") + ret_chunk.add("gaze_data", gaze_data) + # print("---------------------ADDED GAZE DATA------------------------") + + return ret_chunk + + @cpe.register # .with_config(style="blocking") + async def calibrate(self) -> bool: + # TODO: test if calibration works separately for rudimentary and regular + await self.g3.rudimentary.calibrate() + return await self.g3.calibrate.run() + + # TODO: check recorder state before making start, stop, cancel calls + # TODO: add visual cue to indicate recorder state, e.g. UI components or greyed out buttons + # @cpe.register + # async def start_recording(self) -> bool: + # return await self.g3.recorder.start() + + @cpe.register + async def cancel_recording(self) -> None: + await self.g3.recorder.cancel() + + # @cpe.register + # async def stop_recording(self) -> bool: + # # TODO: check if recording has started + # return await self.g3.recorder.stop() + + @cpe.register + async def take_snapshot(self) -> bool: + return await self.g3.recorder.snapshot() + + @cpe.register.with_config(params={"download_dir": "str"}) + async def download_latest_recording(self, download_dir: str) -> Optional[str]: + if not await self.g3.recordings: + return None + + return await self.g3.recordings[0].download_files(download_dir) + + @cpe.register.with_config(params={"download_dir": "str"}) + async def download_all_recordings(self, download_dir: str) -> Optional[List[str]]: + if not await self.g3.recordings: + return None + + return await asyncio.gather( + recording.download_files(download_dir) for recording in self.g3.recordings + ) + + async def teardown(self) -> None: + # await self.streams.__aexit__() + await self.g3.recorder.stop() + await self.g3.rudimentary.stop_streams() + await self.unsub_to_scene + await self.unsub_to_gaze + + await self.g3.close() diff --git a/configs/g3/all_local_single_device.json b/configs/g3/all_local_single_device.json new file mode 100644 index 0000000..f9ce495 --- /dev/null +++ b/configs/g3/all_local_single_device.json @@ -0,0 +1,42 @@ +{ + "mode": "record", + "workers": { + "manager_ip": "172.16.80.147", + "manager_port": 9001, + "instances": [ + { + "name": "local", + "id": "local", + "description": "local worker for the MMLA pipeline demo with a G3 node" + } + ] + }, + "nodes": [ + { + "registry_name": "CPPipelines_G3", + "name": "g3-instance", + "kwargs": { + "hostname": "TG03B-080201037351", + "show_gaze": true + }, + "package": "chimerapy-pipelines" + }, + { + "registry_name": "CPPipelines_ShowWindows", + "name": "show", + "package": "chimerapy-pipelines" + } + ], + "adj": [["g3-instance", "show"]], + "manager_config": { + "logdir": "cp-logs", + "port": 9001 + }, + "mappings": { + "local": ["g3-instance", "show"] + }, + "timeouts": { + "commit_timeout": 120, + "shutdown_timeout": 300 + } +} diff --git a/pyproject.toml b/pyproject.toml index c056fde..b4cfc91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,15 +2,13 @@ name = "chimerapy-pipelines" version = "0.0.1" description = "Respository of sharable pipelines for ChimeraPy" -license = {file = "LICENSE.txt"} +license = { file = "LICENSE.txt" } readme = "README.md" requires-python = ">3.6" keywords = ["education", "multimodal", "data", "learning", "analytics"] -classifiers = [ - "Programming Language :: Python :: 3" -] +classifiers = ["Programming Language :: Python :: 3"] dependencies = [ 'numpy', @@ -26,7 +24,7 @@ dependencies = [ 'chimerapy-orchestrator', 'chimerapy-engine', 'pyaudio', - 'pvrecorder' + 'pvrecorder', ] [project.optional-dependencies] @@ -40,16 +38,18 @@ test = [ mfsort = [ 'mf_sort[yolo] @ git+https://github.com/kbvatral/MF-SORT.git@master#egg=mf_sort', - 'ultralytics' + 'ultralytics', ] embodied = [ 'elp @ git+http://github.com/oele-isis-vanderbilt/EmbodiedLearningProject@main#egg=elp', - 'l2cs @ git+https://github.com/edavalosanaya/L2CS-Net.git@main#egg=l2cs' + 'l2cs @ git+https://github.com/edavalosanaya/L2CS-Net.git@main#egg=l2cs', ] -yolov8 = [ - 'ultralytics' +yolov8 = ['ultralytics'] + +g3 = [ + 'g3pylib @ git+https://github.com/albertna/g3pylib.git@feature/download-recording#egg=g3pylib', ] [project.urls] @@ -72,7 +72,7 @@ where = ["."] ignore = ["E501"] select = ["E", "W", "F", "C", "B", "I"] ignore-init-module-imports = true -fixable = ["I001"] # isort fix only +fixable = ["I001"] # isort fix only extend-exclude = ["run.py"] [tool.ruff.per-file-ignores] @@ -80,8 +80,6 @@ extend-exclude = ["run.py"] "chimerapy/pipelines/__version__.py" = ["E402"] - - [project.entry-points."chimerapy.orchestrator.nodes_registry"] get_nodes_registry = "chimerapy.pipelines:register_nodes_metadata" @@ -96,7 +94,7 @@ log_cli_format = "%(asctime)s.%(msecs)03d [%(levelname)8s] %(message)s (%(filena log_cli_date_format = "%Y-%m-%d %H:%M:%S" # Timeout -faulthandler_timeout=300 +faulthandler_timeout = 300 # Ignore warnings filterwarnings = "ignore::DeprecationWarning"