Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,6 @@ dmypy.json

# Pyre type checker
.pyre/

# IDE
.vscode/
1 change: 1 addition & 0 deletions chimerapy/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def register_nodes_metadata():
"chimerapy.pipelines.yolov8.multi_vid_pose:YoloV8Node",
"chimerapy.pipelines.yolov8.multi_save:MultiSaveNode",
"chimerapy.pipelines.yolov8.display:DisplayNode",
"chimerapy.pipelines.g3.node:G3",
],
}

Expand Down
Empty file.
180 changes: 180 additions & 0 deletions chimerapy/pipelines/g3/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
from typing import List, Optional

import asyncio
import base64
import cv2
import logging
import numpy as np

from g3pylib import connect_to_glasses

import chimerapy.engine as cpe
from chimerapy.orchestrator import source_node


@source_node(name="CPPipelines_G3")
class G3(cpe.Node):
"""A node that connects to a Tobii Glasses 3 instance to perform streaming and recording operations.

Sample `gaze_data` in datachunk:
```JSON
{
"gaze2d": [0.468, 0.483],
"gaze3d": [37.543, -18.034, 821.265],
"eyeleft": {
"gazeorigin": [28.799, -7.165, -23.945],
"gazedirection": [0.0463, -0.0337, 0.998],
"pupildiameter": 2.633
},
"eyeright": {
"gazeorigin": [-28.367, -5.353, -21.426],
"gazedirection": [0.0436, 0.00611, 0.999],
"pupildiameter": 2.782
}
}
```

Parameters
----------
hostname: str, required
The G3 device's serial number (e.g. TG03B-080200004381), used for connections.
name: str, optional (default: "")
The name of the node. If not provided, the `hostname` will be used.
show_gaze: bool, optional (default: False)
Whether to render the gaze circle on video frames
frames_key: str, optional (default: "frame")
The key to use for the video frame in the data chunk
**kwargs
Additional keyword arguments to pass to the Node constructor
"""

def __init__(
self,
hostname: str,
name: str = "",
show_gaze: bool = False,
frame_key: str = "frame",
**kwargs,
):
self.hostname = hostname
self.show_gaze = show_gaze
self.frame_key = frame_key

if not name:
name = hostname

super().__init__(name=name, **kwargs)

async def setup(self) -> None:
self.g3 = await connect_to_glasses.with_hostname(
self.hostname, using_zeroconf=True
)

(
self.scene_queue,
self.unsub_to_scene,
) = await self.g3.rudimentary.subscribe_to_scene()
(
self.gaze_queue,
self.unsub_to_gaze,
) = await self.g3.rudimentary.subscribe_to_gaze()

await self.g3.rudimentary.start_streams()
self.is_recording = False

async def step(self) -> cpe.DataChunk:
# start on-device recording if node enters RECORDING state
if self.state.fsm == "RECORDING" and not self.is_recording:
if await self.g3.recorder.start():
self.is_recording = True
print("\n---------------Started Recording On Device---------------")
elif self.state.fsm == "STOPPED" and self.is_recording:
if await self.g3.recorder.stop():
self.is_recording = False
print("\n---------------Stopped Recording On Device---------------")

ret_chunk = cpe.DataChunk()

frame_timestamp, frame_data_b64 = await self.scene_queue.get()
gaze_timestamp, gaze_data = await self.gaze_queue.get()

# add timestamp to gaze data
gaze_data["timestamp"] = gaze_timestamp

# convert base64-encoded frame data string to cv2 image
frame_nparr = np.fromstring(base64.b64decode(frame_data_b64), dtype=np.uint8)
frame_data = cv2.imdecode(frame_nparr, cv2.IMREAD_COLOR)

if self.show_gaze and "gaze2d" in gaze_data:
gaze2d = gaze_data["gaze2d"]
logging.info(f"Gaze2d: {gaze2d[0]:9.4f},{gaze2d[1]:9.4f}")

# convert rational (x,y) to pixel location (x,y)
h, w = frame_data.shape[:2]
fix = (int(gaze2d[0] * w), int(gaze2d[1] * h))

# draw gaze on frame_data
frame_data = cv2.circle(frame_data, fix, 10, (0, 0, 255), 3)

# save gaze_data to file
self.save_json("gaze-data", gaze_data)

# self.save_video("stream", frame_data, 25)

# TODO: figure out why CV2 window isn't showing
ret_chunk.add(self.frame_key, frame_data, "image")
# print("---------------------ADDED FRAME DATA------------------------")
ret_chunk.add("gaze_data", gaze_data)
# print("---------------------ADDED GAZE DATA------------------------")

return ret_chunk

@cpe.register # .with_config(style="blocking")
async def calibrate(self) -> bool:
# TODO: test if calibration works separately for rudimentary and regular
await self.g3.rudimentary.calibrate()
return await self.g3.calibrate.run()

# TODO: check recorder state before making start, stop, cancel calls
# TODO: add visual cue to indicate recorder state, e.g. UI components or greyed out buttons
# @cpe.register
# async def start_recording(self) -> bool:
# return await self.g3.recorder.start()

@cpe.register
async def cancel_recording(self) -> None:
await self.g3.recorder.cancel()

# @cpe.register
# async def stop_recording(self) -> bool:
# # TODO: check if recording has started
# return await self.g3.recorder.stop()

@cpe.register
async def take_snapshot(self) -> bool:
return await self.g3.recorder.snapshot()

@cpe.register.with_config(params={"download_dir": "str"})
async def download_latest_recording(self, download_dir: str) -> Optional[str]:
if not await self.g3.recordings:
return None

return await self.g3.recordings[0].download_files(download_dir)

@cpe.register.with_config(params={"download_dir": "str"})
async def download_all_recordings(self, download_dir: str) -> Optional[List[str]]:
if not await self.g3.recordings:
return None

return await asyncio.gather(
recording.download_files(download_dir) for recording in self.g3.recordings
)

async def teardown(self) -> None:
# await self.streams.__aexit__()
await self.g3.recorder.stop()
await self.g3.rudimentary.stop_streams()
await self.unsub_to_scene
await self.unsub_to_gaze

await self.g3.close()
42 changes: 42 additions & 0 deletions configs/g3/all_local_single_device.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"mode": "record",
"workers": {
"manager_ip": "172.16.80.147",
"manager_port": 9001,
"instances": [
{
"name": "local",
"id": "local",
"description": "local worker for the MMLA pipeline demo with a G3 node"
}
]
},
"nodes": [
{
"registry_name": "CPPipelines_G3",
"name": "g3-instance",
"kwargs": {
"hostname": "TG03B-080201037351",
"show_gaze": true
},
"package": "chimerapy-pipelines"
},
{
"registry_name": "CPPipelines_ShowWindows",
"name": "show",
"package": "chimerapy-pipelines"
}
],
"adj": [["g3-instance", "show"]],
"manager_config": {
"logdir": "cp-logs",
"port": 9001
},
"mappings": {
"local": ["g3-instance", "show"]
},
"timeouts": {
"commit_timeout": 120,
"shutdown_timeout": 300
}
}
24 changes: 11 additions & 13 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
name = "chimerapy-pipelines"
version = "0.0.1"
description = "Respository of sharable pipelines for ChimeraPy"
license = {file = "LICENSE.txt"}
license = { file = "LICENSE.txt" }
readme = "README.md"
requires-python = ">3.6"

keywords = ["education", "multimodal", "data", "learning", "analytics"]

classifiers = [
"Programming Language :: Python :: 3"
]
classifiers = ["Programming Language :: Python :: 3"]

dependencies = [
'numpy',
Expand All @@ -26,7 +24,7 @@ dependencies = [
'chimerapy-orchestrator',
'chimerapy-engine',
'pyaudio',
'pvrecorder'
'pvrecorder',
]

[project.optional-dependencies]
Expand All @@ -40,16 +38,18 @@ test = [

mfsort = [
'mf_sort[yolo] @ git+https://github.com/kbvatral/MF-SORT.git@master#egg=mf_sort',
'ultralytics'
'ultralytics',
]

embodied = [
'elp @ git+http://github.com/oele-isis-vanderbilt/EmbodiedLearningProject@main#egg=elp',
'l2cs @ git+https://github.com/edavalosanaya/L2CS-Net.git@main#egg=l2cs'
'l2cs @ git+https://github.com/edavalosanaya/L2CS-Net.git@main#egg=l2cs',
]

yolov8 = [
'ultralytics'
yolov8 = ['ultralytics']

g3 = [
'g3pylib @ git+https://github.com/albertna/g3pylib.git@feature/download-recording#egg=g3pylib',
]

[project.urls]
Expand All @@ -72,16 +72,14 @@ where = ["."]
ignore = ["E501"]
select = ["E", "W", "F", "C", "B", "I"]
ignore-init-module-imports = true
fixable = ["I001"] # isort fix only
fixable = ["I001"] # isort fix only
extend-exclude = ["run.py"]

[tool.ruff.per-file-ignores]
"__init__.py" = ["E402", "F401"]
"chimerapy/pipelines/__version__.py" = ["E402"]




[project.entry-points."chimerapy.orchestrator.nodes_registry"]
get_nodes_registry = "chimerapy.pipelines:register_nodes_metadata"

Expand All @@ -96,7 +94,7 @@ log_cli_format = "%(asctime)s.%(msecs)03d [%(levelname)8s] %(message)s (%(filena
log_cli_date_format = "%Y-%m-%d %H:%M:%S"

# Timeout
faulthandler_timeout=300
faulthandler_timeout = 300

# Ignore warnings
filterwarnings = "ignore::DeprecationWarning"