Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ build
*.jpg
.DS_Store
.venv/
venv/
.vscode
*.eim
.act-secrets
Expand Down
141 changes: 141 additions & 0 deletions examples/image/track-objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#!/usr/bin/env python

import device_patches # Device specific patches for Jetson Nano (needs to be before importing cv2)

# To run this example, you will need to install filterpy and lap via `pip install filterpy lap`
try:
import cv2
except ImportError:
print('Missing OpenCV, install via `pip3 install "opencv-python>=4.5.1.48,<5"`')
exit(1)
import os
import sys, getopt
import signal
import time
from edge_impulse_linux.image import ImageImpulseRunner
from utils.sort_tracker import SORTTracker

runner = None
# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
show_camera = False

def now():
return round(time.time() * 1000)

def get_webcams():
port_ids = []
for port in range(5):
print("Looking for a camera in port %s:" %port)
camera = cv2.VideoCapture(port)
if camera.isOpened():
ret = camera.read()[0]
if ret:
backendName =camera.getBackendName()
w = camera.get(3)
h = camera.get(4)
print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port))
port_ids.append(port)
camera.release()
return port_ids

def sigint_handler(sig, frame):
print('Interrupted')
if (runner):
runner.stop()
sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)

def help():
print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>')

def main(argv):
try:
opts, args = getopt.getopt(argv, "h", ["--help"])
except getopt.GetoptError:
help()
sys.exit(2)

for opt, arg in opts:
if opt in ('-h', '--help'):
help()
sys.exit()

if len(args) == 0:
help()
sys.exit(2)

model = args[0]

dir_path = os.path.dirname(os.path.realpath(__file__))
modelfile = os.path.join(dir_path, model)

print('MODEL: ' + modelfile)

with ImageImpulseRunner(modelfile) as runner:
try:
model_info = runner.init()
# model_info = runner.init(debug=True, timeout=60) # to get debug print out and set longer timeout
print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
labels = model_info['model_parameters']['labels']
if len(args)>= 2:
videoCaptureDeviceId = int(args[1])
else:
port_ids = get_webcams()
if len(port_ids) == 0:
raise Exception('Cannot find any webcams')
if len(args)<= 1 and len(port_ids)> 1:
raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script")
videoCaptureDeviceId = int(port_ids[0])

camera = cv2.VideoCapture(videoCaptureDeviceId)
ret = camera.read()[0]
if ret:
backendName = camera.getBackendName()
w = camera.get(3)
h = camera.get(4)
print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
camera.release()
else:
raise Exception("Couldn't initialize selected camera.")

next_frame = 0 # limit to ~10 fps here

# Initialize tracker with your chosen parameters
tracker = SORTTracker(
max_age=5,
min_hits=3,
iou_threshold=0.01
)

for res, img in runner.classifier(videoCaptureDeviceId):
if (next_frame > now()):
time.sleep((next_frame - now()) / 1000)

if "bounding_boxes" in res["result"].keys():
detections = []
for bb in res["result"]["bounding_boxes"]:
x, y, w, h = bb['x'], bb['y'], bb['width'], bb['height']
detections.append([x + w/2, y + h/2, w, h]) # For FOMO, we might want to pass a larger box than the detected one

tracked_objects = tracker.update(detections)
for track_id, (x, y, w, h) in tracked_objects:
x1, y1 = int(x - w/2), int(y - h/2)
x2, y2 = int(x + w/2), int(y + h/2)
cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
cv2.putText(img, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)

if (show_camera):
cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord('q'):
break

next_frame = now() + 100
finally:
if (runner):
runner.stop()

if __name__ == "__main__":
main(sys.argv[1:])
Empty file.
134 changes: 134 additions & 0 deletions examples/image/utils/sort_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# To use this package, you will need to install filterpy and lap via `pip install filterpy lap`

import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Track:
def __init__(self, detection, track_id):
self.track_id = track_id
self.history = [detection]
self.hits = 1
self.no_losses = 0
self.kf = KalmanFilter(dim_x=4, dim_z=2)
self.kf.F = np.array([
[1, 0, 1, 0],
[0, 1, 0, 1],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
self.kf.H = np.array([
[1, 0, 0, 0],
[0, 1, 0, 0]
])
self.kf.R *= 10.
self.kf.Q[2:, 2:] *= 0.01
self.kf.x[:2] = np.array(detection[:2]).reshape(2, 1)

class SORTTracker:
def __init__(self, max_age=5, min_hits=3, iou_threshold=0.1):
"""
Args:
max_age: Maximum number of frames to keep a track alive without detections.
min_hits: Minimum number of detections to initialize a track.
iou_threshold: IoU threshold for matching detections to tracks.
"""
self.next_id = 1
self.tracks = {}
self.max_age = max_age
self.min_hits = min_hits
self.iou_threshold = iou_threshold

def update(self, detections):
if not detections:
for track_id in list(self.tracks.keys()):
self.tracks[track_id]['no_losses'] += 1
if self.tracks[track_id]['no_losses'] > self.max_age:
logger.debug(f"Removing track {track_id} due to max age.")
del self.tracks[track_id]
return []

# Predict
for track_id in self.tracks:
self.tracks[track_id]['kf'].predict()

# Match detections to tracks using IoU
cost_matrix = self._build_cost_matrix(detections)
row_ind, col_ind = linear_sum_assignment(cost_matrix)

assigned_tracks = set()
for r, c in zip(row_ind, col_ind):
if cost_matrix[r, c] > (1 - self.iou_threshold):
continue
track_id = list(self.tracks.keys())[r]
self.tracks[track_id]['kf'].update(np.array(detections[c][:2]).reshape(2, 1))
self.tracks[track_id]['history'].append(detections[c])
self.tracks[track_id]['hits'] += 1
self.tracks[track_id]['no_losses'] = 0
assigned_tracks.add(track_id)

# Create new tracks for unassigned detections
for i, det in enumerate(detections):
if i not in col_ind:
self.tracks[self.next_id] = {
'kf': KalmanFilter(dim_x=4, dim_z=2),
'history': [det],
'hits': 1,
'no_losses': 0
}
self.tracks[self.next_id]['kf'].F = np.array([
[1, 0, 1, 0],
[0, 1, 0, 1],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
self.tracks[self.next_id]['kf'].H = np.array([
[1, 0, 0, 0],
[0, 1, 0, 0]
])
self.tracks[self.next_id]['kf'].R *= 10.
self.tracks[self.next_id]['kf'].Q[2:, 2:] *= 0.01
self.tracks[self.next_id]['kf'].x[:2] = np.array(det[:2]).reshape(2, 1)
self.next_id += 1

# Remove lost tracks
for track_id in list(self.tracks.keys()):
if track_id not in assigned_tracks:
self.tracks[track_id]['no_losses'] += 1
if self.tracks[track_id]['no_losses'] > self.max_age:
logger.debug(f"Removing track {track_id} due to no detections.")
del self.tracks[track_id]

return self._get_tracked_objects()

def _build_cost_matrix(self, detections):
cost_matrix = np.zeros((len(self.tracks), len(detections)))
for i, track_id in enumerate(self.tracks):
for j, det in enumerate(detections):
iou = self._calculate_iou(self.tracks[track_id]['history'][-1], det)
cost_matrix[i, j] = 1 - iou
return cost_matrix

def _calculate_iou(self, box1, box2):
x1, y1, w1, h1 = box1
x2, y2, w2, h2 = box2
box1_coords = (x1 - w1/2, y1 - h1/2, x1 + w1/2, y1 + h1/2)
box2_coords = (x2 - w2/2, y2 - h2/2, x2 + w2/2, y2 + h2/2)
xi1, yi1 = max(box1_coords[0], box2_coords[0]), max(box1_coords[1], box2_coords[1])
xi2, yi2 = min(box1_coords[2], box2_coords[2]), min(box1_coords[3], box2_coords[3])
inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
box1_area = w1 * h1
box2_area = w2 * h2
union_area = box1_area + box2_area - inter_area
return inter_area / union_area if union_area > 0 else 0

def _get_tracked_objects(self):
tracked_objects = []
for track_id, track in self.tracks.items():
if track['hits'] >= self.min_hits:
tracked_objects.append((track_id, track['history'][-1]))
return tracked_objects