1 change: 1 addition & 0 deletions .gitignore
@@ -166,3 +166,4 @@ cython_debug/
*.jpeg

skellytracker/utilities/quine_directory_printer/output/*
recorded_objects.npy
16 changes: 11 additions & 5 deletions skellytracker/process_folder_of_videos.py
@@ -1,16 +1,17 @@
import logging
from multiprocessing import Pool, cpu_count
from pathlib import Path
import sys
from typing import Optional
from typing import Optional, Union
import numpy as np
from pydantic import BaseModel


from skellytracker.trackers.base_tracker.base_tracker import BaseTracker
from skellytracker.trackers.base_tracker.base_tracking_params import BaseTrackingParams
from skellytracker.trackers.bright_point_tracker.brightest_point_tracker import (
BrightestPointTracker,
)

from skellytracker.utilities.get_video_paths import get_video_paths
try:
from skellytracker.trackers.yolo_mediapipe_combo_tracker.yolo_mediapipe_combo_tracker import (
@@ -139,12 +140,17 @@ def process_single_video(
return output_array


def get_tracker(tracker_name: str, tracking_params: BaseModel) -> BaseTracker:
def get_tracker(
tracker_name: str,
tracking_params: Union[
BaseTrackingParams, MediapipeTrackingParams, YOLOTrackingParams
],
) -> BaseTracker:
"""
Returns a tracker object based on the given tracker_type and tracking_params.

:param tracker_type (str): The type of tracker to be created.
:param tracking_params (BaseModel): The tracking parameters to be used for creating the tracker.
:param tracking_params (Union[BaseTrackingParams, MediapipeTrackingParams, YOLOTrackingParams]): The tracking parameters to be used for creating the tracker.
:return BaseTracker: The tracker object based on the given tracker_type and tracking_params.
:raise ValueError: If an invalid tracker_type is provided.
"""
@@ -169,7 +175,7 @@ def get_tracker(tracker_name: str, tracking_params: BaseModel) -> BaseTracker:

elif tracker_name == "YOLOPoseTracker":
tracker = YOLOPoseTracker(
model_size="medium",
model_size=tracking_params.model_size,
)

elif tracker_name == "BrightestPointTracker":
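
For context, a minimal usage sketch (not part of the diff) showing how a typed params object now feeds get_tracker; the import paths are assumed from the file names in this PR, and model_size is no longer hard-coded to "medium".

# Hypothetical usage sketch; paths assumed from this PR's layout.
from skellytracker.process_folder_of_videos import get_tracker
from skellytracker.trackers.yolo_tracker.yolo_model_info import YOLOTrackingParams

# model_size is now read from the params object instead of being fixed to "medium".
tracker = get_tracker(
    tracker_name="YOLOPoseTracker",
    tracking_params=YOLOTrackingParams(model_size="nano", use_gpu=False),
)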
19 changes: 10 additions & 9 deletions skellytracker/trackers/charuco_tracker/charuco_tracker.py
@@ -12,19 +12,20 @@ def __init__(self,
tracked_object_names: List[str],
squares_x: int,
squares_y: int,
dictionary: cv2.aruco_Dictionary,
dictionary: cv2.aruco.Dictionary,
squareLength: float = 1,
markerLength: float = .8,
):
super().__init__(tracked_object_names=tracked_object_names)
self.board = cv2.aruco.CharucoBoard_create(squares_x, squares_y, squareLength, markerLength, dictionary)
self.board = cv2.aruco.CharucoBoard([squares_x, squares_y], squareLength, markerLength, dictionary)
self.dictionary = dictionary

def process_image(self, image: np.ndarray, **kwargs) -> Dict[str, TrackedObject]:
# Convert the image to grayscale
gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

# Detect Aruco markers
corners, ids, _ = cv2.aruco.detectMarkers(gray_image, self.board.dictionary)
corners, ids, _ = cv2.aruco.detectMarkers(gray_image, self.dictionary)

# If any markers were found
if len(corners) > 0:
@@ -66,13 +67,13 @@ def annotate_image(self, image: np.ndarray, tracked_objects: Dict[str, TrackedOb


if __name__ == "__main__":
charuco_squares_x_in = 7
charuco_squares_y_in = 5
number_of_charuco_markers = charuco_squares_x_in - 1 * charuco_squares_y_in - 1
charuco_squares_x = 7
charuco_squares_y = 5
number_of_charuco_markers = (charuco_squares_x - 1) * (charuco_squares_y - 1)
charuco_ids = [str(index) for index in range(number_of_charuco_markers)]

CharucoTracker(tracked_object_names=charuco_ids,
squares_x=charuco_squares_x_in,
squares_y=charuco_squares_y_in,
dictionary=cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_250)
squares_x=charuco_squares_x,
squares_y=charuco_squares_y,
dictionary=cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)
).demo()
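
The constructor and dictionary calls above target the OpenCV 4.7+ ArUco API, which replaced CharucoBoard_create and Dictionary_get. A standalone sketch of that API (not part of the diff), with illustrative board dimensions:

# Standalone sketch of the OpenCV >= 4.7 ArUco API used above; values are illustrative.
import cv2

dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)
board = cv2.aruco.CharucoBoard((7, 5), 1.0, 0.8, dictionary)  # (squares_x, squares_y), square length, marker length
board_image = board.generateImage((700, 500))                 # render the board to test detection
corners, ids, _ = cv2.aruco.detectMarkers(board_image, dictionary)
print(f"detected {0 if ids is None else len(ids)} markers")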
@@ -14,3 +14,4 @@ class YOLOObjectTrackingParams(BaseTrackingParams):
model_size: str = "medium"
person_only: bool = True
confidence_threshold: float = 0.5
use_gpu: bool = True
26 changes: 24 additions & 2 deletions skellytracker/trackers/yolo_object_tracker/yolo_object_tracker.py
@@ -1,4 +1,7 @@
import platform
import numpy as np

from torch import Tensor, cuda, backends
from typing import Dict
from ultralytics import YOLO

@@ -18,6 +21,7 @@ def __init__(
model_size: str = "nano",
person_only: bool = True,
confidence_threshold: float = 0.5,
use_gpu: bool = True,
):
super().__init__(tracked_object_names=["object"], recorder=YOLOObjectRecorder())

@@ -29,10 +33,28 @@ def __init__(
else:
self.classes = None

if cuda.is_available() and use_gpu:
self.device = "0"
cuda.set_device(int(self.device))
elif backends.mps.is_available() and use_gpu:
self.device = "mps"
else:
self.device = "cpu"

def process_image(self, image, **kwargs) -> Dict[str, TrackedObject]:
results = self.model(image, classes=self.classes, max_det=1, verbose=False, conf=self.confidence_threshold)
results = self.model(
image,
classes=self.classes,
max_det=1,
verbose=False,
conf=self.confidence_threshold,
device=self.device,
)

box_xyxy = np.asarray(results[0].boxes.xyxy).flatten()
if self.device == "mps": # numpy cannot handle mps tensors currently
box_xyxy = np.asarray(Tensor.cpu(results[0].boxes.xyxy)).flatten()
else:
box_xyxy = np.asarray(results[0].boxes.xyxy).flatten()

if box_xyxy.size > 0:
self.tracked_objects["object"].pixel_x = (box_xyxy[0] + box_xyxy[2]) / 0.5
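
A standalone sketch (not part of the diff) of the device selection and MPS handling added above: "0" is the Ultralytics string for the first CUDA device, and MPS-backed tensors must be copied to the CPU before NumPy can read them.

# Sketch mirroring the device logic added in this PR.
import numpy as np
from torch import backends, cuda, rand

def pick_device(use_gpu: bool = True) -> str:
    # Prefer CUDA, then Apple MPS, then CPU, as in the trackers' __init__ above.
    if cuda.is_available() and use_gpu:
        return "0"
    if backends.mps.is_available() and use_gpu:
        return "mps"
    return "cpu"

device = pick_device()
tensor = rand(4, device="mps" if device == "mps" else "cpu")
# Copy MPS tensors to the CPU before handing them to NumPy.
array = np.asarray(tensor.cpu()) if device == "mps" else np.asarray(tensor)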
1 change: 1 addition & 0 deletions skellytracker/trackers/yolo_tracker/yolo_model_info.py
@@ -82,3 +82,4 @@ class YOLOModelInfo:

class YOLOTrackingParams(BaseTrackingParams):
model_size: str = "medium"
use_gpu: bool = True
17 changes: 14 additions & 3 deletions skellytracker/trackers/yolo_tracker/yolo_tracker.py
@@ -1,3 +1,4 @@
from torch import cuda, backends, Tensor
import numpy as np
from typing import Dict
from ultralytics import YOLO
@@ -9,15 +10,22 @@


class YOLOPoseTracker(BaseTracker):
def __init__(self, model_size: str = "nano"):
def __init__(self, model_size: str = "nano", use_gpu: bool = True):
super().__init__(tracked_object_names=[], recorder=YOLORecorder())

pytorch_model = YOLOModelInfo.model_dictionary[model_size]
self.model = YOLO(pytorch_model)
if cuda.is_available() and use_gpu:
self.device = "0"
cuda.set_device(int(self.device))
elif backends.mps.is_available() and use_gpu:
self.device = "mps"
else:
self.device = "cpu"

def process_image(self, image: np.ndarray, **kwargs) -> Dict[str, TrackedObject]:
# "max_det=1" argument to limit to single person tracking for now
results = self.model(image, max_det=1, verbose=False)
results = self.model(image, max_det=1, verbose=True, device=self.device)

self.unpack_results(results)

@@ -29,7 +37,10 @@ def annotate_image(self, image: np.ndarray, results, **kwargs) -> np.ndarray:
return results[-1].plot()

def unpack_results(self, results):
tracked_person = np.asarray(results[-1].keypoints.xy)
if self.device == "mps":
tracked_person = np.asarray(Tensor.cpu(results[-1].keypoints.xy))
else:
tracked_person = np.asarray(results[-1].keypoints.xy)
self.tracked_objects["tracked_person"] = TrackedObject(
object_id="tracked_person"
)
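
A hedged end-to-end sketch (not part of the diff) of exercising the updated pose tracker; the blank frame is a stand-in, and process_image is assumed to return the tracker's tracked_objects dict, as its signature indicates.

# Hypothetical usage of the updated tracker.
import numpy as np
from skellytracker.trackers.yolo_tracker.yolo_tracker import YOLOPoseTracker

tracker = YOLOPoseTracker(model_size="nano", use_gpu=True)  # falls back to CPU if neither CUDA nor MPS is available
frame = np.zeros((480, 640, 3), dtype=np.uint8)             # stand-in frame; real use passes video frames
tracked = tracker.process_image(frame)                      # Dict[str, TrackedObject]
print(tracked.keys())                                       # "tracked_person" is created by unpack_results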