forked from TakshDhabalia/Driving_Optimization
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDetection_Latest.py
More file actions
87 lines (64 loc) · 2.13 KB
/
Detection_Latest.py
File metadata and controls
87 lines (64 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import cv2
import threading
import time
import os
import numpy as np
import MQTT
import Camera as cam
# Shared state used by the detection functions below.
count_lock = threading.Lock()  # Guards `count`
car_count_lock = threading.Lock()  # Guards `car_count`

# 1-based index of the next image file to process (images/1.jpeg, ...).
count = 1
# Running total of cars detected in the current batch. It starts at zero,
# the same value it is reset to after each completed batch, so the first
# batch is not inflated by one.
car_count = 0
def car_detection(image_path, batch_size):
    """Count cars in one image and add the result to the running batch total.

    The image is converted to greyscale, blurred, dilated and morphologically
    closed before the Haar cascade loaded from ``cars.xml`` is run over it.

    Args:
        image_path: Path of the image file to analyse.
        batch_size: Number of images per batch. A batch is treated as
            complete when the module-level ``count`` is a multiple of it.

    Returns:
        The accumulated car total when this call completes a batch (the
        running total is then reset to zero), otherwise ``None``. ``None``
        is also returned, leaving the running total untouched, when the
        image cannot be read.
    """
    global car_count

    # cv2.imread reports a missing or undecodable file by returning None
    # instead of raising; passing that on to cvtColor would raise and take
    # the whole detection thread down, so skip the image instead.
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read {image_path}; skipping")
        return None

    # Pre-processing: greyscale -> blur -> dilate -> close.
    grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(grey, (5, 5), 0)
    dilated = cv2.dilate(blur, np.ones((3, 3)))
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    closing = cv2.morphologyEx(dilated, cv2.MORPH_CLOSE, kernel)

    car_cascade_src = 'cars.xml'
    car_cascade = cv2.CascadeClassifier(car_cascade_src)
    cars = car_cascade.detectMultiScale(closing, scaleFactor=1.1, minNeighbors=1)
    cnt = len(cars)
    print(f"{cnt} cars found for {image_path}")

    # Update the total and, at a batch boundary, read-and-reset it inside a
    # single critical section so no update can slip in between the two steps.
    with car_count_lock:
        car_count += cnt
        if count % batch_size != 0:
            return None
        batch_total = car_count
        car_count = 0
    return batch_total
def car_detection_repeated(batch_size, total_images):
    """Run car detection over consecutive image files and publish the results.

    On each of ``total_images`` iterations the file ``images/<count>.jpeg``
    is passed to ``car_detection``, the module-level ``count`` is advanced,
    any non-``None`` result is published on the ``ESIOT`` topic through the
    module-level ``client``, and the loop then sleeps for 10 seconds.

    Args:
        batch_size: Forwarded unchanged to ``car_detection``.
        total_images: Number of iterations to perform.
    """
    global count
    for _ in range(total_images):
        # Detect on the image that the current index points at.
        batch_total = car_detection(f"images/{count}.jpeg", batch_size)

        # Move on to the next image index.
        with count_lock:
            count += 1

        # car_detection yields None when there is nothing to report.
        if batch_total is not None:
            client.publish("ESIOT", batch_total)

        time.sleep(10)
if __name__ == "__main__":
    # exist_ok=True creates the directory only when it is missing, without
    # the check-then-create race of a separate os.path.exists() test.
    os.makedirs("images", exist_ok=True)

    # Thread that runs the camera capture helper.
    # NOTE(review): the meaning of the second argument (3) is defined in
    # Camera.capture_images, which is not visible here -- confirm.
    t1 = threading.Thread(
        target=cam.capture_images,
        args=("http://192.168.1.14:8080/video", 3),
    )

    # Thread that runs car detection with batch_size=1 over 100 images.
    t2 = threading.Thread(target=car_detection_repeated, args=(1, 100))

    # `client` is a module-level name read by car_detection_repeated, so it
    # is created and connected before the detection thread is started.
    # NOTE(review): arguments are presumably broker host and port -- confirm
    # against the MQTT module.
    client = MQTT.MQTTClient("172.28.66.244", 1993)
    client.connect()

    # Start both threads.
    # NOTE(review): detection is started before capture, so the first image
    # may not exist yet when detection first runs -- confirm this order is
    # intended.
    t2.start()
    print("t2 started")
    t1.start()
    print("t1 started")

    # Wait for both threads to finish.
    t1.join()
    t2.join()