-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
110 lines (85 loc) · 3.83 KB
/
main.py
File metadata and controls
110 lines (85 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Entry point for the real-time object detection system."""
import argparse
import sys
import cv2
from src.alert_system import AlertManager, AlertRule
from src.detector import ObjectDetector
from src.tracker import CentroidTracker
from src.utils import draw_tracks, draw_zone, load_config
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Real-Time Object Detection System")
parser.add_argument("--source", default="0", help="Video source (0=webcam, path, or RTSP URL)")
parser.add_argument("--config", default="config/default_config.yaml", help="Config file path")
parser.add_argument("--confidence", type=float, default=None, help="Detection confidence threshold")
parser.add_argument("--model", default=None, help="YOLOv8 model path")
parser.add_argument("--save", default=None, help="Save output video to path")
parser.add_argument("--no-display", action="store_true", help="Disable display window")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config(args.config)
model_cfg = config.get("model", {})
detector = ObjectDetector(
model_path=args.model or model_cfg.get("path", "yolov8n.pt"),
confidence=args.confidence or model_cfg.get("confidence", 0.5),
device=model_cfg.get("device", "auto"),
classes=model_cfg.get("classes"),
)
tracker_cfg = config.get("tracker", {})
tracker = CentroidTracker(
max_disappeared=tracker_cfg.get("max_disappeared", 30),
max_distance=tracker_cfg.get("max_distance", 80),
)
alert_cfg = config.get("alerts", {})
rules = [AlertRule(**r) for r in alert_cfg.get("rules", [])]
alert_manager = AlertManager(rules=rules, log_file=alert_cfg.get("log_file", "alerts.log"))
zones = [(z["name"], tuple(z["coords"])) for z in config.get("zones", [])]
source = int(args.source) if args.source.isdigit() else args.source
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Error: Cannot open video source: {source}")
sys.exit(1)
writer = None
if args.save:
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = cap.get(cv2.CAP_PROP_FPS) or 30
w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = cv2.VideoWriter(args.save, fourcc, fps, (w, h))
print(f"Starting detection on source: {source}")
print("Press 'q' to quit")
try:
while True:
ret, frame = cap.read()
if not ret:
break
detections = detector.detect_frame(frame)
tracked = tracker.update(detections)
alerts = alert_manager.check(detections)
annotated = detector.draw_detections(frame, detections)
annotated = draw_tracks(annotated, tracked)
for zone_name, zone_coords in zones:
annotated = draw_zone(annotated, zone_coords, label=zone_name)
zone_info = tracker.check_zone(zone_coords)
count = len(zone_info["inside"])
cv2.putText(
annotated, f"{zone_name}: {count}",
(zone_coords[0], zone_coords[1] - 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2,
)
for alert in alerts:
print(f"[ALERT] {alert['message']} (conf={alert['confidence']:.2f})")
if not args.no_display:
cv2.imshow("Object Detection", annotated)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
if writer:
writer.write(annotated)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()