-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdemo_main.py
More file actions
473 lines (393 loc) · 15.2 KB
/
demo_main.py
File metadata and controls
473 lines (393 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
import os
import threading
import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor
import asyncio
import requests
import cv2
import pytesseract
import uvicorn
from fastapi import FastAPI
from fastapi import Form, File, UploadFile
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from ultralytics import YOLO
import supervision as sv
from PIL import Image
import pyautogui
import time
import fitz
import re
# Initialize Tesseract OCR
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
# Lock for managing priority between YOLO and verification tasks
from threading import Lock
yolo_lock = Lock()
# Add logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
app = FastAPI()
# Constants
CORSMiddleware
origins = [
"*" # 필요한 도메인들로 교체 가능
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
VIDEO_FEED_PATH = "./assets/final_seat_vid.mp4" # Path to your MP4 video file
MODEL_PATH = "./models/best.pt" # YOLO model file
INTERVAL = 5 # Time interval in seconds
SEGMENTS = [
(160, 270, 310, 520), # Seat 1
(340, 270, 490, 520), # Seat 2
(520, 270, 670, 520), # Seat 3
(700, 270, 850, 520), # Seat 4
(880, 270, 1030, 520), # Seat 5
(1060, 270, 1210, 520), # Seat 6
]
CHROME_PATH = "./assets/chrome-win64/chrome.exe"
CHROMEDRIVER_PATH = "./assets/chromedriver.exe"
VERIFICATION_URL = "https://icert.skku.edu/etc/verification"
# Global seat states
INITIAL_SEAT_STATES = {i: "Free" for i in range(1, 7)}
seats = INITIAL_SEAT_STATES.copy()
# YOLO model setup
model = YOLO(MODEL_PATH)
box_annotator = sv.BoxAnnotator(thickness=2)
# ThreadPoolExecutor for running blocking code
executor = ThreadPoolExecutor(max_workers=5)
# Models for requests
class SeatState(BaseModel):
seat_number: int
seat_status: str
class VerifyPayload(BaseModel):
student_name: str
student_code: str
# Helper Functions for OCR-based PDF Verification
def extract_text_from_pdf(pdf_path: str) -> str:
"""
Extract text from a PDF file using PyMuPDF.
Args:
pdf_path (str): Path to the PDF file.
Returns:
str: Extracted text from the PDF.
"""
text = ""
try:
pdf_document = fitz.open(pdf_path)
for page_num in range(len(pdf_document)):
page = pdf_document[page_num]
text += page.get_text()
pdf_document.close()
except Exception as e:
logging.error(f"Error extracting text from PDF: {e}")
return text.strip()
def extract_name_from_text(text: str) -> str:
"""
Extract the student's name from the PDF text.
Args:
text (str): Extracted text.
Returns:
str: Extracted name.
"""
match = re.search(r"성\s*명\s*:\s*([가-힣\s]+)", text)
if match:
name = "".join(match.group(1).split())
if "생년월일" in name:
name = name.replace("생년월일", "")
return name
return None
def extract_student_code_from_text(text: str) -> str:
"""
Extract the student code from the PDF text.
Args:
text (str): Extracted text.
Returns:
str: Extracted student code.
"""
match = re.search(r"문서확인번호\s*▣\s*([A-Z0-9\-]+)\s*▣", text)
if match:
return match.group(1)
return None
# Utility functions
def intersects(segment, bbox):
"""
Determine if a bounding box intersects with a segment.
Args:
segment (tuple): The segment coordinates (x1, y1, x2, y2).
bbox (tuple): The bounding box coordinates (bx1, by1, bx2, by2).
Returns:
bool: True if the bounding box intersects with the segment, False otherwise.
"""
x1, y1, x2, y2 = segment
bx1, by1, bx2, by2 = bbox
return not (x2 < bx1 or bx2 < x1 or y2 < by1 or by2 < y1)
def update_seat_states(detections, segments):
"""
Update the seat states based on detections and predefined segments.
Args:
detections (sv.Detections): Detections from the YOLO model.
segments (list): List of seat segment coordinates.
Returns:
dict: Updated seat states with seat numbers as keys and statuses as values.
"""
# Map class IDs to status strings expected by the backend
class_id_to_status = {
0: "Missing", # Missing class
1: "Obstructed", # Object class
2: "Occupied", # Person class
}
states = {i + 1: "Free" for i in range(len(segments))}
for i, segment in enumerate(segments):
for bbox, class_id in zip(detections.xyxy, detections.class_id):
if intersects(segment, bbox):
status = class_id_to_status.get(class_id, "Free")
if status == "Occupied":
states[i + 1] = "Occupied"
break
elif status == "Missing":
if states[i + 1] == "Free":
states[i + 1] = "Missing"
elif status == "Obstructed":
if states[i + 1] == "Free":
states[i + 1] = "Obstructed"
return states
def broadcast_to_backends(seat_states):
"""
Broadcast seat states to backend URLs synchronously.
Args:
seat_states (dict): Dictionary containing seat numbers and their statuses.
"""
payload = {"seats": [{"seat_number": seat_id, "seat_status": state} for seat_id, state in seat_states.items()]}
for url in origins:
try:
response = requests.post(url, json=payload)
if response.status_code == 200:
logging.info(f"Successfully sent to backend {url}")
else:
logging.error(f"Failed to send to backend {url}. Status code: {response.status_code}")
except Exception as e:
logging.error(f"Error sending data to backend {url}: {e}")
def capture_screen_area(left, top, width, height, save_path="screenshot.bmp"):
"""
Capture a specific area of the screen and save it as an image.
Args:
left (int): The x-coordinate of the top-left corner.
top (int): The y-coordinate of the top-left corner.
width (int): The width of the area to capture.
height (int): The height of the area to capture.
save_path (str): The file path to save the screenshot.
"""
screenshot = pyautogui.screenshot(region=(left, top, width, height))
screenshot.save(save_path)
def extract_text_from_image(image_path):
"""
Extract text from an image using Tesseract OCR.
Args:
image_path (str): The path to the image file.
Returns:
str: The extracted text.
"""
img = Image.open(image_path)
text = pytesseract.image_to_string(img, lang="kor+eng").strip()
return text
def process_and_broadcast_seat_states_sync():
"""
Process video feed synchronously to update seat states and broadcast them to backends.
Displays the video in a window with bounding boxes and labels.
"""
cap = cv2.VideoCapture(VIDEO_FEED_PATH)
if not cap.isOpened():
logging.error("Error: Could not open video feed")
return
fps = cap.get(cv2.CAP_PROP_FPS)
fps = fps if 1 <= fps <= 120 else 30 # Default to 30 FPS if invalid
frame_delay = int(1000 / fps)
last_detection_time = 0
while True:
ret, frame = cap.read()
if not ret:
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
current_time = time.time()
if current_time - last_detection_time >= INTERVAL:
with yolo_lock:
results = model.predict(frame)
detections = sv.Detections.from_ultralytics(results[0])
detections = detections[detections.confidence > 0.5]
global seats
seats = update_seat_states(detections, SEGMENTS)
broadcast_to_backends(seats)
# Annotate the frame without using labels
annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
# Add custom labels manually
for bbox, class_id in zip(detections.xyxy, detections.class_id):
x1, y1, x2, y2 = map(int, bbox)
label = f"{int(class_id)}: {model.names[int(class_id)]}"
cv2.putText(
annotated_frame,
label,
(x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(255, 255, 255),
2,
)
cv2.imshow("Annotated Video Feed", annotated_frame)
last_detection_time = current_time
if cv2.waitKey(frame_delay) & 0xFF == ord("q"):
break
cap.release()
cv2.destroyAllWindows()
logging.info("Video processing stopped.")
def perform_verification(payload: VerifyPayload) -> bool:
"""
Perform student verification by automating a browser interaction.
Args:
payload (VerifyPayload): The verification payload containing student details.
Returns:
bool: True if verification was successful, False otherwise.
"""
logging.info(f"Starting verification for {payload.student_name}")
student_name = payload.student_name
student_code = payload.student_code
code_parts = student_code.split("-")
if len(code_parts) != 4:
logging.error(f"Invalid code format for {student_name}")
return False
options = Options()
options.binary_location = CHROME_PATH
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-data-dir=/chrome_profile")
options.add_argument("--disable-software-rasterizer")
options.add_argument("--disable-extensions")
# options.add_argument("--headless") # Uncomment to run in headless mode
driver = None
try:
service = Service(CHROMEDRIVER_PATH)
driver = webdriver.Chrome(service=service, options=options)
driver.get(VERIFICATION_URL)
# Input the code parts without explicit waits
textfield_ids = ["textfield", "textfield2", "textfield3", "textfield4"]
for i, part in enumerate(code_parts):
textfield = driver.find_element(By.ID, textfield_ids[i])
textfield.send_keys(part)
# Click the submit button
driver.find_element(By.ID, "submitBtn").click()
# Optionally, wait for the page to load or the result to be displayed
# time.sleep(5) # Uncomment if needed
# Capture screenshot of the verification result
capture_screen_area(0, 0, 1341, 1080, "verification_result.bmp")
result_text = extract_text_from_image("verification_result.bmp")
logging.info(f"Extracted text: {result_text}")
if student_name in result_text:
logging.info(f"Verification successful for {student_name}")
return True
else:
logging.error(f"Verification failed for {student_name}")
return False
except Exception as e:
logging.error(f"Error during verification: {e}")
return False
finally:
if driver:
driver.quit()
subprocess.run("taskkill /f /im ePageSaferRT.exe", shell=True)
logging.info(f"Finished verification for {student_name}")
@app.post("/verify")
async def verify_student(payload: VerifyPayload):
"""
Endpoint to perform student verification and return the result.
Args:
payload (VerifyPayload): The verification payload containing student details.
Returns:
dict: A dictionary containing 'success': True or False.
"""
logging.info(f"Received verification request:")
logging.info(f"Student Name: {payload.student_name}")
logging.info(f"Student Code: {payload.student_code}")
try:
# Acquire the lock to prevent concurrency issues
with yolo_lock:
loop = asyncio.get_running_loop()
# Run the blocking verification in a separate thread
success = await loop.run_in_executor(executor, perform_verification, payload)
logging.info(f"Verification result for {payload.student_name}: {'Success' if success else 'Failed'}")
return {"content": success}
except Exception as e:
logging.error(f"Verification failed for {payload.student_name}: {str(e)}")
return {"content": False}
@app.post("/auth/verify")
async def auth_verify_student(
student_name: str = Form(...),
student_number: str = Form(...),
pdf_file: UploadFile = File(...),
):
"""
Endpoint for student verification using PDF files.
Args:
student_name (str): Name of the student.
student_number (str): Student number.
pdf_file (UploadFile): Uploaded PDF file.
Returns:
dict: Verification result.
"""
try:
# Save the uploaded PDF
file_location = f"./uploaded_files/{pdf_file.filename}"
os.makedirs(os.path.dirname(file_location), exist_ok=True)
with open(file_location, "wb") as f:
f.write(await pdf_file.read())
# Extract text from the PDF
extracted_text = extract_text_from_pdf(file_location)
extracted_name = extract_name_from_text(extracted_text)
extracted_code = extract_student_code_from_text(extracted_text)
if not extracted_name or not extracted_code:
return JSONResponse(
content={"content": False, "error": "Failed to extract required data from the PDF."},
status_code=400,
)
# Validate the provided name with the extracted name
if student_name.strip() != extracted_name.strip():
return JSONResponse(
content={"content": False, "error": "Provided name does not match the extracted name."},
status_code=400,
)
# Perform the verification with the extracted code
payload = VerifyPayload(student_name=student_name, student_code=extracted_code)
is_verified = perform_verification(payload)
return JSONResponse(content={"content": is_verified}, status_code=200)
except Exception as e:
logging.error(f"Error in /auth/verify: {e}")
return JSONResponse(content={"content": False, "error": str(e)}, status_code=500)
@app.get("/seat/states", response_model=list[SeatState])
def get_seat_states():
"""
Endpoint to retrieve the current seat states.
Returns:
list[SeatState]: A list of seat states.
"""
return [{"seat_number": seat_id, "seat_status": state} for seat_id, state in seats.items()]
if __name__ == "__main__":
# Start the FastAPI app in a separate thread
def start_server():
uvicorn.run(app, host="0.0.0.0", port=5000)
server_thread = threading.Thread(target=start_server)
server_thread.start()
# Run the video processing function in the main thread
process_and_broadcast_seat_states_sync()