-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfastApi.py
More file actions
166 lines (138 loc) · 6.04 KB
/
fastApi.py
File metadata and controls
166 lines (138 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# FastApi.py 이거 실행하고 인텔리제이 실행!! (얼굴인식 최종코드)
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
import torch
import json
from scipy.spatial.distance import cosine
import mysql.connector
import time
from torchvision import transforms
import uvicorn
app = FastAPI()

# CORS setup
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # allow requests from any origin
    # NOTE(review): browsers reject credentialed requests when the allowed
    # origin is the wildcard "*" (CORS spec) — confirm whether cookies/auth
    # headers are actually needed, or pin allow_origins to the frontend URL.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def connect_database():
    """Open and return a new connection to the local `my_face` MySQL database.

    The caller owns the connection and is responsible for closing it.
    """
    db_config = {
        "host": "localhost",
        "user": "face",
        "password": "1234",
        "database": "my_face",
    }
    return mysql.connector.connect(**db_config)
# Face-recognition helper: detects the largest face in a frame, embeds it with
# FaceNet, and matches the embedding against users stored in MySQL. A name must
# be seen continuously for 5 seconds before it is confirmed as current_name.
class FaceRecognition:
    def __init__(self, device='cpu'):
        """Load the MTCNN face detector and the FaceNet embedder on *device*."""
        self.device = torch.device(device)
        self.mtcnn = MTCNN(keep_all=True, device=self.device)
        self.facenet = InceptionResnetV1(pretrained='vggface2').eval().to(self.device)
        self.previous_name = None           # name matched on the previous frame
        self.current_name = None            # name confirmed after the 5 s hold
        self.recognition_start_time = None  # when previous_name was first seen

    def detect_faces(self, frame):
        """Run MTCNN on *frame*; returns (boxes, probs), boxes is None if no face."""
        boxes, probs = self.mtcnn.detect(frame)
        return boxes, probs

    def get_embedding(self, face_image):
        """Return a flat numpy FaceNet embedding for a PIL face crop."""
        transform = transforms.Compose([
            transforms.Resize((160, 160)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
        face_tensor = transform(face_image).unsqueeze(0).to(self.device)
        with torch.no_grad():
            return self.facenet(face_tensor).cpu().numpy().flatten()

    def find_closest_match(self, embedding_vector, threshold=0.35):
        """Compare *embedding_vector* against every registered user.

        Returns (name, similarity): the best-matching user's name — or None
        when the closest cosine distance exceeds *threshold* (or the table is
        empty) — plus the best similarity score (1 - cosine distance, 0 when
        there are no rows).
        """
        # FIX: close cursor/connection even if the query raises, so failed
        # requests no longer leak MySQL connections.
        conn = connect_database()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT name, embedding_vector FROM users")
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        closest_name = None
        min_distance = float('inf')
        best_score = 0  # best similarity seen so far, regardless of threshold
        for row in rows:
            db_embedding = np.array(json.loads(row["embedding_vector"])).flatten()
            distance = cosine(embedding_vector, db_embedding)
            similarity = 1 - distance
            if similarity > best_score:
                best_score = similarity
                min_distance = distance
                closest_name = row["name"]
        # The threshold only decides whether the best match counts as
        # "recognized"; the similarity score is always reported.
        if min_distance > threshold:
            closest_name = None
        return closest_name, best_score

    def process_frame(self, image_rgb):
        """Process one RGB frame (numpy HxWx3) and return the API payload.

        Returns {"currentName": str, "remainingTime": int}. currentName is
        "" while nothing is recognized, "Unknown" during the hold period
        before the first confirmation, and the confirmed name afterwards.
        """
        boxes, _ = self.detect_faces(image_rgb)
        if boxes is not None and len(boxes) > 0:
            # Track only the largest detected face.
            largest_box = max(boxes, key=lambda box: (box[2] - box[0]) * (box[3] - box[1]))
            x1, y1, x2, y2 = map(int, largest_box)
            # FIX: MTCNN boxes can extend past the frame (even negative);
            # clamp so the crop is always a valid, non-empty array.
            h, w = image_rgb.shape[:2]
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)
            if x2 > x1 and y2 > y1:
                face_crop = image_rgb[y1:y2, x1:x2]
                face_image = Image.fromarray(face_crop)
                embedding = self.get_embedding(face_image)
                closest_name, accuracy = self.find_closest_match(embedding)
                print(f"인식된 사용자: {closest_name}, 정확도: {accuracy:.2f}")
                # A different name (or loss of match) restarts the hold timer.
                if closest_name != self.previous_name:
                    self.previous_name = closest_name
                    self.recognition_start_time = time.time()
                # Same name held: confirm it once it has been seen for 5 s.
                if closest_name == self.previous_name and closest_name is not None:
                    elapsed_time = time.time() - self.recognition_start_time
                    remaining_time = max(0, 5 - elapsed_time)
                    if elapsed_time >= 5:
                        self.current_name = closest_name
                    return {"currentName": self.current_name or "Unknown", "remainingTime": int(remaining_time)}
        # No usable face (or no DB match): reset the confirmation timer.
        self.previous_name = None
        self.recognition_start_time = None
        return {"currentName": "", "remainingTime": 5}
# Single shared recognizer so the 5-second confirmation timer persists
# across requests (module-level state; one recognition session at a time).
recognition = FaceRecognition()


@app.post("/recognize")
async def recognize_face(file: UploadFile = File(...)):
    """Accept an uploaded image frame and run face recognition on it.

    Returns {"currentName": str, "remainingTime": int} as produced by
    FaceRecognition.process_frame.
    """
    image_bytes = await file.read()
    nparr = np.frombuffer(image_bytes, np.uint8)
    image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    # FIX: imdecode returns None for corrupt/unsupported data; previously
    # cvtColor then raised and the request failed with a 500. Report the
    # same "nothing recognized" payload the pipeline uses instead.
    if image is None:
        return {"currentName": "", "remainingTime": 5}
    # OpenCV decodes to BGR; the recognizer expects RGB.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    result = recognition.process_frame(image_rgb)
    return result
# Start a development server only when run directly (not when imported).
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)