From 8bb0f7670d1dade827d74ba3adf672d51d30b567 Mon Sep 17 00:00:00 2001 From: Louis MOREAU Date: Fri, 19 Dec 2025 12:09:57 +0100 Subject: [PATCH 1/2] Added example --- .gitignore | 1 + examples/image/track-objects.py | 141 +++++++++++++++++++++++++++ examples/image/utils/__init__.py | 0 examples/image/utils/sort_tracker.py | 134 +++++++++++++++++++++++++ 4 files changed, 276 insertions(+) create mode 100644 examples/image/track-objects.py create mode 100644 examples/image/utils/__init__.py create mode 100644 examples/image/utils/sort_tracker.py diff --git a/.gitignore b/.gitignore index 02df1b9..eca524b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ build *.jpg .DS_Store .venv/ +venv/ .vscode *.eim .act-secrets diff --git a/examples/image/track-objects.py b/examples/image/track-objects.py new file mode 100644 index 0000000..fc6e9e8 --- /dev/null +++ b/examples/image/track-objects.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python + +import device_patches # Device specific patches for Jetson Nano (needs to be before importing cv2) + +# To run this example, you will need to install filterpy and lap via `pip install filterpy lap` +try: + import cv2 +except ImportError: + print('Missing OpenCV, install via `pip3 install "opencv-python>=4.5.1.48,<5"`') + exit(1) +import os +import sys, getopt +import signal +import time +from edge_impulse_linux.image import ImageImpulseRunner +from utils.sort_tracker import SORTTracker + +runner = None +# if you don't want to see a camera preview, set this to False +show_camera = True +if (sys.platform == 'linux' and not os.environ.get('DISPLAY')): + show_camera = False + +def now(): + return round(time.time() * 1000) + +def get_webcams(): + port_ids = [] + for port in range(5): + print("Looking for a camera in port %s:" %port) + camera = cv2.VideoCapture(port) + if camera.isOpened(): + ret = camera.read()[0] + if ret: + backendName =camera.getBackendName() + w = camera.get(3) + h = camera.get(4) + print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port)) + port_ids.append(port) + camera.release() + return port_ids + +def sigint_handler(sig, frame): + print('Interrupted') + if (runner): + runner.stop() + sys.exit(0) + +signal.signal(signal.SIGINT, sigint_handler) + +def help(): + print('python classify.py ') + +def main(argv): + try: + opts, args = getopt.getopt(argv, "h", ["--help"]) + except getopt.GetoptError: + help() + sys.exit(2) + + for opt, arg in opts: + if opt in ('-h', '--help'): + help() + sys.exit() + + if len(args) == 0: + help() + sys.exit(2) + + model = args[0] + + dir_path = os.path.dirname(os.path.realpath(__file__)) + modelfile = os.path.join(dir_path, model) + + print('MODEL: ' + modelfile) + + with ImageImpulseRunner(modelfile) as runner: + try: + model_info = runner.init() + # model_info = runner.init(debug=True, timeout=60) # to get debug print out and set longer timeout + print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"') + labels = model_info['model_parameters']['labels'] + if len(args)>= 2: + videoCaptureDeviceId = int(args[1]) + else: + port_ids = get_webcams() + if len(port_ids) == 0: + raise Exception('Cannot find any webcams') + if len(args)<= 1 and len(port_ids)> 1: + raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script") + videoCaptureDeviceId = int(port_ids[0]) + + camera = cv2.VideoCapture(videoCaptureDeviceId) + ret = camera.read()[0] + if ret: + backendName = camera.getBackendName() + w = camera.get(3) + h = camera.get(4) + print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId)) + camera.release() + else: + raise Exception("Couldn't initialize selected camera.") + + next_frame = 0 # limit to ~10 fps here + + # Initialize tracker with your chosen parameters + tracker = SORTTracker( + max_age=5, + min_hits=3, + iou_threshold=0.01 + ) + + for res, img in runner.classifier(videoCaptureDeviceId): + if (next_frame > now()): + time.sleep((next_frame - now()) / 1000) + + if "bounding_boxes" in res["result"].keys(): + detections = [] + for bb in res["result"]["bounding_boxes"]: + x, y, w, h = bb['x'], bb['y'], bb['width'], bb['height'] + detections.append([x + w/2, y + h/2, w, h]) # For FOMO, we might want to pass a larger box than the detected one + + tracked_objects = tracker.update(detections) + for track_id, (x, y, w, h) in tracked_objects: + x1, y1 = int(x - w/2), int(y - h/2) + x2, y2 = int(x + w/2), int(y + h/2) + cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) + cv2.putText(img, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) + + if (show_camera): + cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) + if cv2.waitKey(1) == ord('q'): + break + + next_frame = now() + 100 + finally: + if (runner): + runner.stop() + +if __name__ == "__main__": + main(sys.argv[1:]) \ No newline at end of file diff --git a/examples/image/utils/__init__.py b/examples/image/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/image/utils/sort_tracker.py b/examples/image/utils/sort_tracker.py new file mode 100644 index 0000000..cbd7235 --- /dev/null +++ b/examples/image/utils/sort_tracker.py @@ -0,0 +1,134 @@ +# To use this package, you will need to install filterpy and lap via `pip install filterpy lap` + +import numpy as np +from filterpy.kalman import KalmanFilter +from scipy.optimize import linear_sum_assignment +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class Track: + def __init__(self, detection, track_id): + self.track_id = track_id + self.history = [detection] + self.hits = 1 + self.no_losses = 0 + self.kf = KalmanFilter(dim_x=4, dim_z=2) + self.kf.F = np.array([ + [1, 0, 1, 0], + [0, 1, 0, 1], + [0, 0, 1, 0], + [0, 0, 0, 1] + ]) + self.kf.H = np.array([ + [1, 0, 0, 0], + [0, 1, 0, 0] + ]) + self.kf.R *= 10. + self.kf.Q[2:, 2:] *= 0.01 + self.kf.x[:2] = np.array(detection[:2]).reshape(2, 1) + +class SORTTracker: + def __init__(self, max_age=5, min_hits=3, iou_threshold=0.1): + """ + Args: + max_age: Maximum number of frames to keep a track alive without detections. + min_hits: Minimum number of detections to initialize a track. + iou_threshold: IoU threshold for matching detections to tracks. + """ + self.next_id = 1 + self.tracks = {} + self.max_age = max_age + self.min_hits = min_hits + self.iou_threshold = iou_threshold + + def update(self, detections): + if not detections: + for track_id in list(self.tracks.keys()): + self.tracks[track_id]['no_losses'] += 1 + if self.tracks[track_id]['no_losses'] > self.max_age: + logger.debug(f"Removing track {track_id} due to max age.") + del self.tracks[track_id] + return [] + + # Predict + for track_id in self.tracks: + self.tracks[track_id]['kf'].predict() + + # Match detections to tracks using IoU + cost_matrix = self._build_cost_matrix(detections) + row_ind, col_ind = linear_sum_assignment(cost_matrix) + + assigned_tracks = set() + for r, c in zip(row_ind, col_ind): + if cost_matrix[r, c] > (1 - self.iou_threshold): + continue + track_id = list(self.tracks.keys())[r] + self.tracks[track_id]['kf'].update(np.array(detections[c][:2]).reshape(2, 1)) + self.tracks[track_id]['history'].append(detections[c]) + self.tracks[track_id]['hits'] += 1 + self.tracks[track_id]['no_losses'] = 0 + assigned_tracks.add(track_id) + + # Create new tracks for unassigned detections + for i, det in enumerate(detections): + if i not in col_ind: + self.tracks[self.next_id] = { + 'kf': KalmanFilter(dim_x=4, dim_z=2), + 'history': [det], + 'hits': 1, + 'no_losses': 0 + } + self.tracks[self.next_id]['kf'].F = np.array([ + [1, 0, 1, 0], + [0, 1, 0, 1], + [0, 0, 1, 0], + [0, 0, 0, 1] + ]) + self.tracks[self.next_id]['kf'].H = np.array([ + [1, 0, 0, 0], + [0, 1, 0, 0] + ]) + self.tracks[self.next_id]['kf'].R *= 10. + self.tracks[self.next_id]['kf'].Q[2:, 2:] *= 0.01 + self.tracks[self.next_id]['kf'].x[:2] = np.array(det[:2]).reshape(2, 1) + self.next_id += 1 + + # Remove lost tracks + for track_id in list(self.tracks.keys()): + if track_id not in assigned_tracks: + self.tracks[track_id]['no_losses'] += 1 + if self.tracks[track_id]['no_losses'] > self.max_age: + logger.debug(f"Removing track {track_id} due to no detections.") + del self.tracks[track_id] + + return self._get_tracked_objects() + + def _build_cost_matrix(self, detections): + cost_matrix = np.zeros((len(self.tracks), len(detections))) + for i, track_id in enumerate(self.tracks): + for j, det in enumerate(detections): + iou = self._calculate_iou(self.tracks[track_id]['history'][-1], det) + cost_matrix[i, j] = 1 - iou + return cost_matrix + + def _calculate_iou(self, box1, box2): + x1, y1, w1, h1 = box1 + x2, y2, w2, h2 = box2 + box1_coords = (x1 - w1/2, y1 - h1/2, x1 + w1/2, y1 + h1/2) + box2_coords = (x2 - w2/2, y2 - h2/2, x2 + w2/2, y2 + h2/2) + xi1, yi1 = max(box1_coords[0], box2_coords[0]), max(box1_coords[1], box2_coords[1]) + xi2, yi2 = min(box1_coords[2], box2_coords[2]), min(box1_coords[3], box2_coords[3]) + inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1) + box1_area = w1 * h1 + box2_area = w2 * h2 + union_area = box1_area + box2_area - inter_area + return inter_area / union_area if union_area > 0 else 0 + + def _get_tracked_objects(self): + tracked_objects = [] + for track_id, track in self.tracks.items(): + if track['hits'] >= self.min_hits: + tracked_objects.append((track_id, track['history'][-1])) + return tracked_objects From 2deb0a21ca52df93a012096754f63bd36881da64 Mon Sep 17 00:00:00 2001 From: Louis MOREAU Date: Fri, 19 Dec 2025 16:54:46 +0100 Subject: [PATCH 2/2] Used the bbox produced by the classifier instead of the one from the tracker output for more fluidity --- examples/image/track-objects.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/image/track-objects.py b/examples/image/track-objects.py index fc6e9e8..1e3d9c6 100644 --- a/examples/image/track-objects.py +++ b/examples/image/track-objects.py @@ -118,14 +118,15 @@ def main(argv): detections = [] for bb in res["result"]["bounding_boxes"]: x, y, w, h = bb['x'], bb['y'], bb['width'], bb['height'] + img = cv2.rectangle(img, (bb['x'], bb['y']), (bb['x'] + bb['width'], bb['y'] + bb['height']), (255, 0, 0), 1) detections.append([x + w/2, y + h/2, w, h]) # For FOMO, we might want to pass a larger box than the detected one tracked_objects = tracker.update(detections) for track_id, (x, y, w, h) in tracked_objects: x1, y1 = int(x - w/2), int(y - h/2) x2, y2 = int(x + w/2), int(y + h/2) - cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) - cv2.putText(img, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) + # cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) + cv2.putText(img, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 1) if (show_camera): cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))