Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions eyeGestures/Fixation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
class Fixation:
"""Class performing Fixation"""

def __init__(self, x, y, radius=100):
def __init__(self, x: float, y: float, radius: int = 100) -> None:
self.radius = radius
self.fixation = 0.0
self.x = x
self.y = y

def process(self, x, y):
def process(self, x: float, y: float) -> float:
"""Function processing x and y estimated points for fixation detection"""

if (x - self.x) ** 2 + (y - self.y) ** 2 < self.radius**2:
self.fixation = min(self.fixation + 0.02, 1.0)
else:
self.x = x
self.y = y
self.fixation = 0
self.fixation = 0.0

return self.fixation
22 changes: 12 additions & 10 deletions eyeGestures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, calibration_radius: int = 1000) -> None:
self.clb: Dict[str, Calibrator_v2] = dict() # Calibrator_v2()
self.cap = None
self.calibration: Dict[str, bool] = dict()
self.average_points: Dict[str, npt.NDArray] = dict()
self.average_points: Dict[str, npt.NDArray[np.float64]] = dict()
self.filled_points: Dict[str, int] = dict()
self.enable_CN = False
self.calibrate_gestures = False
Expand All @@ -37,12 +37,12 @@ def __init__(self, calibration_radius: int = 1000) -> None:

# this has to be contexted
self.prev_timestamp: Dict[str, float] = dict()
self.prev_point: Dict[str, npt.NDArray] = dict()
self.prev_point: Dict[str, npt.NDArray[np.float64]] = dict()
self.fix: Optional[float] = None
self.velocity_max: Dict[str, int] = dict()
self.velocity_min: Dict[str, int] = dict()
self.fixationTracker: Dict[str, Fixation] = dict()
self.key_points_buffer: Dict[str, List[npt.NDArray]] = dict()
self.key_points_buffer: Dict[str, List[npt.NDArray[np.float64]]] = dict()

self.starting_head_position = np.zeros((1, 2))
self.starting_size = np.zeros((1, 2))
Expand All @@ -55,11 +55,11 @@ def saveModel(self, context: str = "main") -> Optional[bytes]:
def loadModel(self, model: Any, context: str = "main") -> None:
self.clb[context] = pickle.loads(model)

def uploadCalibrationMap(self, points: npt.NDArray, context: str = "main") -> None:
def uploadCalibrationMap(self, points: npt.NDArray[np.float64], context: str = "main") -> None:
self.addContext(context)
self.clb[context].updMatrix(np.array(points))

def getLandmarks(self, frame: cv2.typing.MatLike) -> Tuple[npt.NDArray, bool, cv2.typing.MatLike]:
def getLandmarks(self, frame: cv2.typing.MatLike) -> Tuple[npt.NDArray[np.float64], bool, cv2.typing.MatLike]:

frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = cv2.flip(frame, 1)
Expand All @@ -68,6 +68,7 @@ def getLandmarks(self, frame: cv2.typing.MatLike) -> Tuple[npt.NDArray, bool, cv
self.face.process(frame, self.finder.find(frame))

face_landmarks = self.face.get_landmarks()
assert face_landmarks is not None
l_eye = self.face.get_left_eye()
r_eye = self.face.get_right_eye()
l_eye_landmarks = l_eye.getLandmarks()
Expand All @@ -93,7 +94,9 @@ def getLandmarks(self, frame: cv2.typing.MatLike) -> Tuple[npt.NDArray, bool, cv
scale_y = self.starting_size[0, 1] / y_width

# eye_events = np.array([event.blink,event.fixation]).reshape(1, 2)
key_points = np.concatenate(
assert l_eye_landmarks is not None
assert r_eye_landmarks is not None
key_points: npt.NDArray[np.float64] = np.concatenate(
(
l_eye_landmarks,
r_eye_landmarks,
Expand Down Expand Up @@ -172,7 +175,6 @@ def step(
self.filled_points[context] = 1

averaged_point = np.sum(self.average_points[context][:, :], axis=0) / (self.filled_points[context])

fixation = self.fixationTracker[context].process(averaged_point[0], averaged_point[1])

duration = time.time() - self.prev_timestamp[context]
Expand Down Expand Up @@ -229,7 +231,7 @@ def __init__(self, calibration_radius: int = 1000) -> None:

self.CN: int = 5

self.average_points: Dict[str, npt.NDArray] = dict()
self.average_points: Dict[str, npt.NDArray[np.float64]] = dict()
self.filled_points: Dict[str, int] = dict()
self.enable_CN = False
self.calibrate_gestures = False
Expand All @@ -244,13 +246,13 @@ def saveModel(self, context: str = "main") -> Optional[bytes]:
def loadModel(self, model: Any, context: str = "main") -> None:
self.clb[context] = pickle.loads(model)

def uploadCalibrationMap(self, points: npt.NDArray, context="main") -> None:
def uploadCalibrationMap(self, points: npt.NDArray[np.float64], context="main") -> None:
self.addContext(context)
self.clb[context].updMatrix(np.array(points))

def getLandmarks(
self, frame: cv2.typing.MatLike, calibrate: bool = False, context: str = "main"
) -> Tuple[npt.NDArray, npt.NDArray, bool, bool, Optional[Cevent]]:
) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], bool, bool, Optional[Cevent]]:

frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = cv2.flip(frame, 1)
Expand Down
64 changes: 33 additions & 31 deletions eyeGestures/calibration_v2.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import threading
from typing import List

import numpy as np
import numpy.typing as npt
import sklearn.linear_model as scireg


def euclidean_distance(point1, point2):
def euclidean_distance(point1: npt.NDArray[np.float64], point2: npt.NDArray[np.float64]) -> np.float64:
return np.linalg.norm(point1 - point2)


Expand All @@ -15,13 +17,13 @@ class Calibrator:
ACCEPTANCE_RADIUS = 500
CALIBRATION_RADIUS = 1000

def __init__(self, calibration_radius=1000):
self.X = []
self.Y_y = []
self.Y_x = []
self.__tmp_X = []
self.__tmp_Y_y = []
self.__tmp_Y_x = []
def __init__(self, calibration_radius: int = 1000) -> None:
self.X: List[npt.NDArray[np.float64]] = []
self.Y_y: List[npt.NDArray[np.float64]] = []
self.Y_x: List[npt.NDArray[np.float64]] = []
self.__tmp_X: List[npt.NDArray[np.float64]] = []
self.__tmp_Y_y: List[npt.NDArray[np.float64]] = []
self.__tmp_Y_x: List[npt.NDArray[np.float64]] = []
self.reg = None
self.reg_x = scireg.Ridge(alpha=0.5)
self.reg_y = scireg.Ridge(alpha=0.5)
Expand All @@ -40,28 +42,28 @@ def __init__(self, calibration_radius=1000):

self.lock = threading.Lock()
self.calcualtion_coroutine = threading.Thread(target=self.__async_post_fit)
self.fit_coroutines = []
self.fit_coroutines: List[threading.Thread] = []

def __launch_fit(self):
def __launch_fit(self) -> None:
coroutine = threading.Thread(target=self.__async_fit)
self.fit_coroutines.append(coroutine)
coroutine.start()
self.__join_finished()

def __join_finished(self):
def __join_finished(self) -> None:
for coroutine in self.fit_coroutines:
if not coroutine.is_alive():
coroutine.join()

def add(self, x, y):
def add(self, x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None:
with self.lock:
self.__tmp_X.append(x.flatten())
self.__tmp_Y_y.append(y[1])
self.__tmp_Y_x.append(y[0])
self.__launch_fit()

# This coroutine helps to asynchronously recalculate results
def __async_fit(self):
def __async_fit(self) -> None:
try:
with self.lock:
__fit_tmp_X = np.array(self.__tmp_X + self.X, dtype=object)
Expand All @@ -74,7 +76,7 @@ def __async_fit(self):
print(f"Exception as {e}")

# This coroutine helps to asynchronously recalculate results
def __async_post_fit(self):
def __async_post_fit(self) -> None:
try:
tmp_fixations_x = scireg.LassoCV(cv=50, max_iter=10000)
tmp_fixations_y = scireg.LassoCV(cv=50, max_iter=10000)
Expand All @@ -95,16 +97,16 @@ def __async_post_fit(self):
print(f"Exception as {e}")
self.cv_not_set = True

def post_fit(self):
def post_fit(self) -> None:
if self.cv_not_set:
# self.calcualtion_coroutine.start()
self.cv_not_set = False

def whichAlgorithm(self):
def whichAlgorithm(self) -> str:
with self.lock:
return self.current_algorithm

def predict(self, x):
def predict(self, x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
with self.lock:
if self.fitted:
x = x.flatten()
Expand All @@ -114,7 +116,7 @@ def predict(self, x):
return np.array([y_x, y_y])
return np.array([0.0, 0.0])

def movePoint(self):
def movePoint(self) -> None:
with self.lock:
self.X = self.X + self.__tmp_X
self.Y_y = self.Y_y + self.__tmp_Y_y
Expand All @@ -124,39 +126,39 @@ def movePoint(self):
self.__tmp_Y_y = []
self.__tmp_Y_x = []

def isReadyToMove(self):
def isReadyToMove(self) -> int:
return len(self.__tmp_X) > 30 # magic number - collect 30 points

def getCurrentPoint(self, width, heigth):
def getCurrentPoint(self, width: int, heigth: int) -> npt.NDArray[np.float64]:
return self.matrix.getCurrentPoint(width, heigth)

def updMatrix(self, points):
return self.matrix.updMatrix(points)
def updMatrix(self, points: npt.NDArray[np.float64]) -> None:
self.matrix.updMatrix(points)

def unfit(self):
def unfit(self) -> None:
self.acceptance_radius = self.ACCEPTANCE_RADIUS
self.calibration_radius = self.CALIBRATION_RADIUS
self.fitted = False

def increase_precision(self):
def increase_precision(self) -> None:
if self.acceptance_radius > self.precision_limit:
self.acceptance_radius -= self.precision_step
if self.calibration_radius > self.precision_limit and self.acceptance_radius < self.calibration_radius:
self.calibration_radius -= self.precision_step

def insideClbRadius(self, point, width, height):
def insideClbRadius(self, point: npt.NDArray[np.float64], width: int, height: int) -> np.bool_:
return euclidean_distance(point, self.getCurrentPoint(width, height)) < self.calibration_radius

def insideAcptcRadius(self, point, width, height):
def insideAcptcRadius(self, point: npt.NDArray[np.float64], width: int, height: int) -> np.bool_:
return euclidean_distance(point, self.getCurrentPoint(width, height)) < self.acceptance_radius


class CalibrationMatrix:

def __init__(self):
def __init__(self) -> None:

self.iterator = 0
self.points = np.array(
self.points: npt.NDArray[np.float64] = np.array(
[
[1, 0.5],
[0.75, 0.5],
Expand Down Expand Up @@ -186,14 +188,14 @@ def __init__(self):
]
)

def updMatrix(self, points):
def updMatrix(self, points: npt.NDArray[np.float64]) -> None:
self.points = points
self.iterator = 0

def movePoint(self):
def movePoint(self) -> None:
self.iterator += 1
self.iterator %= len(self.points)

def getCurrentPoint(self, width=1.0, height=1.0):
def getCurrentPoint(self, width: int = 1, height: int = 1) -> npt.NDArray[np.float64]:
it = self.iterator
return np.array([self.points[it, 0] * width, self.points[it, 1] * height])
Loading