Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/FlotationAnalytics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
Данная работа представляет автоматизацию анализа флотации. Вы можете загрузить видеозапись, выбрать интересующий трекер и на выходе будет детальный анализ трекинга пузырей.
=====================

Выполните следующий код в терминале, чтобы создать и перейти в виртуальное окружение:

`python3 -m venv .venv`

`source ./.venv/bin/activate`

Установите все необходимые зависимости:

`pip install -r requirements.txt`

В моем коде используются вызовы следующих моделей: **DeepSORT** и **CounTR**. Их сначала необходимо склонировать себе в проект:

DeepSORT: `git clone https://github.com/nwojke/deep_sort.git`

CounTR: `git clone https://github.com/Verg-Avesta/CounTR.git`

Также в репозитории CounTR нужно внести некоторые изменения, чтобы корректно запустить проект:
- в файле **CounTR/util/misc.py**:

закомментировать строчку`from torch._six import inf` и прописать `inf = float('inf')`

- в файле **CounTR/util/pos_embed.py**:

`omega = np.arange(embed_dim // 2, dtype=np.float)` на `omega = np.arange(embed_dim // 2, dtype=float)`

В файле **CounTR/models_mae_cross.py** нужно написать правильный путь, а именно добавить название папки

`from CounTR.models_crossvit import CrossAttentionBlock`

`from CounTR.util.pos_embed import get_2d_sincos_pos_embed`


Скачайте веса и добавьте в папку **model**:

https://drive.google.com/file/d/1CzYyiYqLshMdqJ9ZPFJyIzXBa7uFUIYZ/view?usp=sharing

В данной работе используются следующие существующие методы трекинга:
- SORT
- DeepSORT
- Bot-SORT
- ByteTrack

Для детекции объектов ограничивающими рамками:
- YOLOv11

Для детекции центров объектов:
- CounTR
- PseCO

### Запуск основного сервиса:

`PYTHONPATH=абсолютный_путь_до_проекта HYDRA_FULL_ERROR=1 python app/main.py tracker=botsort output_dir="resuts_end"`,

где tracker - выбор трекера, output_dir - папка для сохранения результатов

### Запуск тестов:

`PYTHONPATH=абсолютный_путь_до_проекта pytest tests/test_metrics.py -v`

Также представлены скрипты, которые не связаны с сервисом, но позволяют провести анализ.
-----------------------------------

**Конвертация из YOLO формата в CVAT аннотацию:** `fromYOLOtoCVAT.py`

**Подсчет контролируемых метрик: MOTP и MOTA.** А также визуализация сопоставлений истинных ограничивающих рамок с предсказанными лежит в `metrics.py`

В коде используются `cvat_annotations.xml` и `output_botsort`.

`cvat_annotations.xml` - разметка в формате CVAT

`output_botsort` - разметка в расширенном формате YOLO

Код, который по сегментации делает разметку, представлен в `fromMaskToYolo.py`.

Запустить код можно с помощью команды:

`!python fromMaskToYolo.py --image_folder '/path/to/images' --mask_folder '/path/to/masks' --output_folder '/path/to/output' --class_id 0`, где

- **image_folder** - путь к папке с изображениями,

- **mask_folder** - путь к папке с масками,

- **output_folder** - путь для сохранения YOLO разметки

### Проведенное исследование

Для оценки качества работы разных методов использовался Google Colab. Код из него был экспортирован в формате `.py` и находится в папке **research**.

Он включает в себя обучение детектора, запуск моделей, замер FPS, подсчет среднего перемещения и других метрик. Также в нем реализована визуализация трекинга.
4 changes: 4 additions & 0 deletions src/FlotationAnalytics/app/classes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .deepSortTracker import DeepSortTracker
from .trackingQualityAnalyzer import TrackingQualityAnalyzer
from .objectTracker import ObjectTracker
from .videoTracker import VideoTracker
54 changes: 54 additions & 0 deletions src/FlotationAnalytics/app/classes/deepSortTracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from deep_sort.application_util import preprocessing
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
import numpy as np


class DeepSortTracker:
def __init__(
self,
img_size,
nms_max_overlap=0.6,
max_cosine_distance=0.5,
nn_budget=None,
max_age=30,
min_hits=3,
iou_threshold=0.3,
):
self.img_size = img_size
self.nms_max_overlap = nms_max_overlap
self.iou_threshold = iou_threshold
metric = nn_matching.NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget
)
self.tracker = Tracker(metric, max_age=max_age, n_init=min_hits)

def prepare_detections(self, yolo_detections):
detections = []
for det in yolo_detections:
x1, y1, x2, y2, conf, _ = det
bbox = (x1, y1, x2 - x1, y2 - y1) # Конвертация в формат (x,y,w,h)
feature = []
detections.append(Detection(bbox, conf, feature))
return detections

def update(self, yolo_detections):
detections = self.prepare_detections(yolo_detections)
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
indices = preprocessing.non_max_suppression(boxes, self.nms_max_overlap, scores)
detections = [detections[i] for i in indices]

self.tracker.predict()
self.tracker.update(detections)

results = []
for track in self.tracker.tracks:
print("track", track, track.is_confirmed(), track.time_since_update)
if not track.is_confirmed() or track.time_since_update > 1:
continue
bbox = track.to_tlwh()
results.append([track.track_id, bbox[0], bbox[1], bbox[2], bbox[3]])

return results
157 changes: 157 additions & 0 deletions src/FlotationAnalytics/app/classes/objectTracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import numpy as np
from .trackingQualityAnalyzer import TrackingQualityAnalyzer
from .utils import get_center

MAX_DISAPPEARED = 3
MAX_POSITIONS_HISTORY = 5
IOU_DIST_WEIGHTS = (0.4, 0.6)
MIN_OBJECT_SIZE = 5
MIN_DISTANCE = 15
MAX_COST_THRESHOLD = 0.6
BORDER_DISAPPEAR_MULTIPLIER = 2


class ObjectTracker:
def __init__(self, max_disappeared=1, frame_size=(1920, 1080)):
self.next_id = 1
self.objects = {}
self.max_disappeared = max_disappeared
self.frame_width, self.frame_height = frame_size
self.metrics = TrackingQualityAnalyzer()

def update(self, detections, current_frame=None):
for obj_id in self.objects:
if len(self.objects[obj_id]["positions"]) > 1:
last_pos = self.objects[obj_id]["positions"][-1]
prev_pos = self.objects[obj_id]["positions"][-2]
velocity = last_pos - prev_pos
predicted_pos = last_pos + velocity
self.objects[obj_id]["predicted_pos"] = predicted_pos
else:
self.objects[obj_id]["predicted_pos"] = get_center(
self.objects[obj_id]["bbox"]
)

for obj_id in list(self.objects.keys()):
bbox = self.objects[obj_id]["bbox"]
if (
bbox[0] <= 5
or bbox[1] <= 5
or bbox[2] >= self.frame_width - 5
or bbox[3] >= self.frame_height - 5
):
self.objects[obj_id]["disappeared"] += BORDER_DISAPPEAR_MULTIPLIER
else:
self.objects[obj_id]["disappeared"] += 1

if self.objects[obj_id]["disappeared"] > self.max_disappeared:
del self.objects[obj_id]

if len(detections) == 0:
return self._prepare_output(current_frame)

if len(self.objects) == 0:
for det in detections:
det_center = get_center(det)
self.objects[self.next_id] = {
"bbox": det,
"disappeared": 0,
"positions": [det_center],
"predicted_pos": det_center,
}
self.next_id += 1
return self._prepare_output(current_frame)

obj_ids = list(self.objects.keys())
obj_bboxes = [self.objects[obj_id]["bbox"] for obj_id in obj_ids]

cost_matrix = np.zeros((len(obj_bboxes), len(detections)))
for i, obj_bbox in enumerate(obj_bboxes):
obj_predicted = self.objects[obj_ids[i]]["predicted_pos"]
for j, det_bbox in enumerate(detections):
det_center = get_center(det_bbox)
iou_score = 1 - self.calculate_iou(obj_bbox, det_bbox)
dist_score = np.linalg.norm(obj_predicted - det_center) / 100
cost_matrix[i, j] = (
IOU_DIST_WEIGHTS[0] * iou_score + IOU_DIST_WEIGHTS[1] * dist_score
)

matched_obj_indices = set()
matched_det_indices = set()

while True:
min_cost = np.min(cost_matrix)
if min_cost > MAX_COST_THRESHOLD:
break

i, j = np.unravel_index(np.argmin(cost_matrix), cost_matrix.shape)
obj_id = obj_ids[i]

self.objects[obj_id]["bbox"] = detections[j]
self.objects[obj_id]["disappeared"] = 0
det_center = get_center(detections[j])
self.objects[obj_id]["positions"].append(det_center)
if len(self.objects[obj_id]["positions"]) > MAX_POSITIONS_HISTORY:
self.objects[obj_id]["positions"].pop(0)

matched_obj_indices.add(i)
matched_det_indices.add(j)

cost_matrix[i, :] = float("inf")
cost_matrix[:, j] = float("inf")

for j in set(range(len(detections))) - matched_det_indices:
det = detections[j]
det_center = get_center(det)

width = det[2] - det[0]
height = det[3] - det[1]
if width < MIN_OBJECT_SIZE or height < MIN_OBJECT_SIZE:
continue

too_close = False
for obj_id in self.objects:
obj_center = get_center(self.objects[obj_id]["bbox"])
distance = np.linalg.norm(obj_center - det_center)
if distance < MIN_DISTANCE:
too_close = True
break

if not too_close:
self.objects[self.next_id] = {
"bbox": det,
"disappeared": 0,
"positions": [det_center],
"predicted_pos": det_center,
}
self.next_id += 1

return self._prepare_output(current_frame)

@staticmethod
def calculate_iou(bbox1, bbox2):
x1 = max(bbox1[0], bbox2[0])
y1 = max(bbox1[1], bbox2[1])
x2 = min(bbox1[2], bbox2[2])
y2 = min(bbox1[3], bbox2[3])

inter_area = max(0, x2 - x1) * max(0, y2 - y1)
if inter_area == 0:
return 0.0

bbox1_area = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
bbox2_area = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])

return inter_area / float(bbox1_area + bbox2_area - inter_area)

def _prepare_output(self, current_frame):
current_tracks = [
[obj_id, obj["bbox"][0], obj["bbox"][1], obj["bbox"][2], obj["bbox"][3]]
for obj_id, obj in self.objects.items()
]

if current_frame is not None:
self.metrics.update_metrics(
len(self.metrics.metrics["frame"]), current_tracks, current_frame
)
return {obj_id: obj["bbox"] for obj_id, obj in self.objects.items()}
Loading