5 changes: 3 additions & 2 deletions .gitignore
@@ -34,9 +34,10 @@ venv.bak/
*.idea
.DS_Store

*.h5

__pycache__/
*.pyc
audio/
emotion_diary/
#emotion_diary/
emotion_png/
pyvenv.cfg
97 changes: 97 additions & 0 deletions app/ML/ModelService.py
@@ -0,0 +1,97 @@
# import numpy as np
# from dotenv import load_dotenv
# from fastapi import Request, UploadFile, File, APIRouter
# from typing import List
# from tensorflow.keras.models import load_model
# from sentence_transformers import SentenceTransformer
# import io
# import requests
#
# from app.ML.audio_extractor_utils import get_features
# from app.ML.loss import boundary_enhanced_focal_loss
# from app.ML.plot_utils import save_plot, get_s3_png_url
# from app.ML.speech_to_text import speech_to_text
#
# import os
#
# from app.service.gpt import EmotionReportGPT
# from app.utils.convertFileExtension import convert_to_wav
#
# router = APIRouter(
# prefix="/api/fastapi",
# )
# load_dotenv()
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
#
#
#
#
# @router.post("/predict")
# async def predict(request: Request, files: List[UploadFile] = File(...)):
# # token = request.headers.get("Authorization").split(" ")[1]
# print(files)
# # 1) Save files temporarily or process them in memory
# wav_data_list = []
# for file in files:
# raw = await file.read()
# ext = file.filename.split('.')[-1] # e.g. 'm4a', 'mp3'
# wav_bytes = convert_to_wav(raw, ext) # convert to a WAV BytesIO
# wav_data_list.append(wav_bytes)
#
# # 2) Extract audio features
# all_feats = []
# for idx, wav_bytes in enumerate(wav_data_list):
# # get_features expects a file path, so write the in-memory WAV to a temp file
# # and pass its path (or adapt get_features to accept a file-like object)
#
# temp_path = f"temp_{idx}.wav"
# with open(temp_path, "wb") as f:
# f.write(wav_bytes)
# feats = get_features(temp_path)
# os.remove(temp_path)
# all_feats.append(feats)
#
# all_feats = np.stack(all_feats, axis=0)
# pooled_feats = all_feats.mean(axis=0)
# audio_input = pooled_feats[np.newaxis, :, np.newaxis]
#
# # 3) STT & text embedding
# texts = []
# for wav_bytes in wav_data_list:
# temp_path = f"temp_stt.wav"
# with open(temp_path, "wb") as f:
# f.write(wav_bytes)
# text = speech_to_text(temp_path)
# os.remove(temp_path)
# texts.append(text)
#
# full_text = " . ".join(texts)
# text_vec = embedding_model.encode([full_text])[0]
# text_input = text_vec[np.newaxis, :]
#
# # 4) Predict
# prediction = model.predict([audio_input, text_input])
# pred_percent = (prediction[0] * 100).tolist()
#
# # 5) JSON response
# result = {label: round(p, 2) for label, p in zip(emotion_labels, pred_percent)}
# top_idx = np.argmax(pred_percent)
# result['predicted_emotion'] = emotion_labels[top_idx]
#
# local_path = save_plot(pred_percent)
# s3_path = get_s3_png_url(local_path)
# reporter = EmotionReportGPT(full_text, pred_percent)
# report_text = reporter.get_report_text()
#
# print(s3_path)
#
# # send_emotion_report_to_spring(s3_path, report_text)
#
# data = {
# "imageUrl": s3_path,
# "report_text": report_text
# }
# return data
#
#
#
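The ModelService.py module is added entirely commented out, and the commented /predict handler references model, embedding_model and emotion_labels that are never defined in the file, so they would have to be created at import time before the router could be re-enabled. A minimal sketch of that wiring, assuming the .h5 filename and label order from predict_colab.py and a hypothetical app/main.py entrypoint:

# Sketch only — the model path and label order are assumptions taken from predict_colab.py / plot_utils.py.
from tensorflow.keras.models import load_model
from sentence_transformers import SentenceTransformer
from app.ML.loss import boundary_enhanced_focal_loss

emotion_labels = ['angry', 'sadness', 'happiness', 'fear', 'disgust', 'surprise', 'neutral']
embedding_model = SentenceTransformer('jhgan/ko-sbert-multitask')
model = load_model(
    "app/ML/ko-sbert_multimodal_0501_3_resnet_augment_h.h5",  # assumed relative path
    custom_objects={'boundary_enhanced_focal_loss': boundary_enhanced_focal_loss},
)

# In a hypothetical app/main.py the router would then be mounted with:
#   from app.ML.ModelService import router
#   app.include_router(router)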
69 changes: 69 additions & 0 deletions app/ML/audio_extractor_utils.py
@@ -0,0 +1,69 @@
import librosa
import librosa.display
import numpy as np


def noise(data):
noise_amp = 0.035 * np.random.uniform() * np.amax(data)
data = data + noise_amp * np.random.normal(size=data.shape[0])
return data


def stretch(data, rate=0.8):
return librosa.effects.time_stretch(y=data, rate=rate)


def shift(data):
shift_range = int(np.random.uniform(low=-5, high=5) * 1000)
return np.roll(data, shift_range)


def pitch(data, sampling_rate, pitch_factor=0.7):
return librosa.effects.pitch_shift(y=data, sr=sampling_rate, n_steps=pitch_factor)


def extract_features(data, sample_rate):
# ZCR
result = np.array([])
zcr = np.mean(librosa.feature.zero_crossing_rate(y=data).T, axis=0)
result = np.hstack((result, zcr)) # stacking horizontally

# Chroma_stft
stft = np.abs(librosa.stft(data))
chroma_stft = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0)
result = np.hstack((result, chroma_stft)) # stacking horizontally

# MFCC
mfcc = np.mean(librosa.feature.mfcc(y=data, sr=sample_rate).T, axis=0)
result = np.hstack((result, mfcc)) # stacking horizontally

# Root Mean Square Value
rms = np.mean(librosa.feature.rms(y=data).T, axis=0)
result = np.hstack((result, rms)) # stacking horizontally

# Mel spectrogram
mel = np.mean(librosa.feature.melspectrogram(y=data, sr=sample_rate).T, axis=0)
result = np.hstack((result, mel)) # stacking horizontally

return result


def get_features(path):
data, sample_rate = librosa.load(path, duration=2.5, offset=0.0)

# without augmentation
res1 = extract_features(data, sample_rate)
result = np.array(res1)

# data with noise
noise_data = noise(data)
res2 = extract_features(noise_data, sample_rate)
result = np.concatenate((result, res2), axis=0)

# data with stretching and pitching
new_data = stretch(data)
data_stretch_pitch = pitch(new_data, sample_rate)
res3 = extract_features(data_stretch_pitch, sample_rate)
result = np.concatenate((result, res3), axis=0)

return result
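With librosa's default parameters the five blocks in extract_features contribute 1 (ZCR) + 12 (chroma) + 20 (MFCC) + 1 (RMS) + 128 (mel) = 162 values, and get_features concatenates three passes (clean, noisy, stretched+pitched), so the result is a 486-dimensional vector — the shape predict_colab.py assumes. A quick sanity check, assuming some local sample.wav exists:

from app.ML.audio_extractor_utils import get_features

feats = get_features("sample.wav")  # hypothetical local recording
print(feats.shape)                  # expected: (486,)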
Binary file not shown.
29 changes: 29 additions & 0 deletions app/ML/loss.py
@@ -0,0 +1,29 @@
import tensorflow as tf


# 1. Boundary-Enhanced Focal Loss (strengthens identification of minority classes)
def boundary_enhanced_focal_loss(y_true, y_pred, gamma=2.0, margin=0.3):
y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)

# Hard-sample mining (flag samples whose true class is predicted with low probability)
correct_prob = tf.reduce_sum(y_true * y_pred, axis=-1)
hard_mask = tf.cast(tf.less(correct_prob, margin), tf.float32)

# Per-class weights (higher weight for minority classes)
effective_counts = tf.reduce_sum(y_true, axis=0)
alpha = 1.0 / (effective_counts + 1e-7)
alpha = alpha / tf.reduce_sum(alpha)

# Extra boost for minority classes (surprise, neutral)
class_boost = tf.constant([1.0, 0.5, 1.0, 1.0, 1.0, 2.5, 5.0], dtype=tf.float32)
alpha = alpha * class_boost

# Focal loss term
cross_entropy = -y_true * tf.math.log(y_pred)
focal_weight = tf.pow(1.0 - y_pred, gamma)

# Extra weight on hard samples
sample_weight = 1.0 + hard_mask * 2.0
loss = sample_weight[:, tf.newaxis] * alpha * focal_weight * cross_entropy

return tf.reduce_sum(loss)
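A minimal sketch of how this custom loss could be plugged into a Keras model at training time; the model variable and optimizer choice are placeholders, not part of this PR. When reloading a saved model it has to be passed back in through custom_objects, exactly as predict_colab.py does:

from app.ML.loss import boundary_enhanced_focal_loss

model.compile(                              # 'model' is a placeholder Keras model
    optimizer='adam',                       # placeholder optimizer
    loss=boundary_enhanced_focal_loss,      # Keras calls it as loss(y_true, y_pred); gamma/margin keep their defaults
    metrics=['accuracy'],
)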
40 changes: 40 additions & 0 deletions app/ML/plot_utils.py
@@ -0,0 +1,40 @@
# Plotting utilities
import os
from datetime import datetime

from matplotlib import pyplot as plt
from app.service.s3Service import upload_to_s3_png

colors = ['#e74c3c', '#3498db', '#f1c40f', '#e67e22', '#9b59b6', '#1abc9c', '#95a5a6']
emotion_labels = ['angry', 'sadness', 'happiness', 'fear', 'disgust', 'surprise', 'neutral']


def save_plot(predictions_percent):
plt.figure(figsize=(10, 6))
bars = plt.barh(emotion_labels, predictions_percent, color=colors, alpha=0.85)

plt.title('Emotion Probability Distribution', fontsize=20, weight='bold', pad=15)
plt.xlabel('Probability (%)', fontsize=14)
plt.xlim(0, max(predictions_percent) + 10)
plt.grid(axis='x', linestyle='--', alpha=0.6)

for bar, percent in zip(bars, predictions_percent):
width = bar.get_width()
plt.text(width + 0.8, bar.get_y() + bar.get_height() / 2, f'{percent:.1f}%', va='center', fontsize=13,
weight='bold', color='#333')

plt.yticks(fontsize=14, weight='bold')
plt.tight_layout()

date_str = datetime.now().strftime("%Y%m%d")
filename = f"{date_str}"
local_path = os.getcwd() + f"/app/emotion_png/{filename}_emotion_distribution.png"
# Save as an image file
plt.savefig(local_path, dpi=300, bbox_inches='tight')
plt.show()  # mainly useful for local debugging; can be dropped when running as a service

return local_path


def get_s3_png_url(local_path):
return upload_to_s3_png(local_path)
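A short usage sketch of these two helpers; the percentages are made up for illustration and follow the emotion_labels order, and save_plot expects an app/emotion_png/ directory under the working directory:

from app.ML.plot_utils import save_plot, get_s3_png_url

pred_percent = [5.0, 62.5, 10.0, 8.0, 4.5, 3.0, 7.0]  # hypothetical probabilities in %
local_path = save_plot(pred_percent)
png_url = get_s3_png_url(local_path)  # uploads via app.service.s3Service.upload_to_s3_png
print(png_url)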
95 changes: 95 additions & 0 deletions app/ML/predict_colab.py
@@ -0,0 +1,95 @@
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from sentence_transformers import SentenceTransformer
import glob
import os

from app.ML.audio_extractor_utils import get_features
from app.ML.loss import boundary_enhanced_focal_loss
from app.ML.speech_to_text import speech_to_text
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'  # note: only takes effect if set before TensorFlow is imported


BASE_DIR_resp = "/home/team4/Desktop/capstone/AI/app/emotion_diary"
BASE_DIR_win = "C:/Users/YJG/Desktop/2025_1_capstone_2/AI/app/emotion_diary"
emotion_labels = ['angry', 'sadness', 'happiness', 'fear', 'disgust', 'surprise', 'neutral']
model_path_resp = "/home/team4/Desktop/capstone/AI/app/ML/ko-sbert_multimodal_0501_3_resnet_augment_h.h5"
model_path_win = "C:/Users/YJG/Desktop/2025_1_capstone_2/AI/app/ML/ko-sbert_multimodal_0501_3_resnet_augment_h.h5"


def predict():
BASE_DIR = BASE_DIR_win
model_path = model_path_win
# (Assumptions) predefined helpers/variables:
# get_features(path): returns a (486,) vector
# speech_to_text(path): STT → string
# boundary_enhanced_focal_loss: custom loss
# emotion_labels: ['angry','sadness','happiness','fear','disgust','surprise','neutral']
# model_path, sample_path: path strings

# 1) List of WAV files
# sample_wav_list = [
# sample_path + "/jg_sadness_1.wav",
# sample_path + "/jg_sadness_2.wav",
# sample_path + "/jg_sadness_3.wav",
# sample_path + "/jg_sadness_4.wav",
# sample_path + "/jg_sadness_5.wav"
# ]
sample_wav_list = glob.glob(os.path.join(BASE_DIR, "**", "*.wav"), recursive=True)

# 2) Mean-pool the audio features over all clips
all_feats = np.stack([get_features(p) for p in sample_wav_list], axis=0) # (N, 486)
pooled_feats = all_feats.mean(axis=0) # (486,)

# 3) Match the model's expected input shape
audio_input = pooled_feats[np.newaxis, :, np.newaxis] # (1,486,1)

# 4) STT for every clip → join into a single text
texts = [speech_to_text(p) for p in sample_wav_list]
full_text = " . ".join(texts)

# 5) Text embedding
embedding_model = SentenceTransformer('jhgan/ko-sbert-multitask')
text_vec = embedding_model.encode([full_text])[0] # (768,)
text_input = text_vec[np.newaxis, :] # (1,768)

# 6) Load the model and predict
model = load_model(model_path, custom_objects={
'boundary_enhanced_focal_loss': boundary_enhanced_focal_loss
})
prediction = model.predict([audio_input, text_input]) # (1,7)
pred_percent = prediction[0] * 100 # (7,)

# 7) Print to the console
for lbl, p in zip(emotion_labels, pred_percent):
print(f"{lbl}: {p:.2f}%")
top_idx = np.argmax(pred_percent)
print(f"\n최종 예측 감정: {emotion_labels[top_idx]}")

# 8) Horizontal bar chart visualization
colors = ['#e74c3c', '#3498db', '#f1c40f', '#e67e22', '#9b59b6', '#1abc9c', '#95a5a6']

plt.figure(figsize=(10, 6))
bars = plt.barh(emotion_labels, pred_percent, color=colors, alpha=0.85)

plt.title('Emotion Probability Distribution', fontsize=18, weight='bold', pad=15)
plt.xlabel('Probability (%)', fontsize=14)
plt.xlim(0, pred_percent.max() + 10)
plt.grid(axis='x', linestyle='--', alpha=0.6)

for bar, p in zip(bars, pred_percent):
plt.text(p + 1, bar.get_y() + bar.get_height() / 2,
f'{p:.1f}%', va='center', fontsize=12, weight='bold', color='#333')

plt.yticks(fontsize=13, weight='bold')
plt.tight_layout()

# Save as an image file
plt.savefig('emotion_distribution.png', dpi=300, bbox_inches='tight')
plt.show()


if __name__ == "__main__":
predict()
26 changes: 26 additions & 0 deletions app/ML/speech_to_text.py
@@ -0,0 +1,26 @@
import speech_recognition as sr

# sample_wav_path = sample_path + "/sh_sadness_2.wav"


# STT conversion function
def speech_to_text(audio_path):
recognizer = sr.Recognizer()

# Load the audio file
with sr.AudioFile(audio_path) as source:
audio_data = recognizer.record(source) # read the audio data

try:
# Use the Google Web Speech API (free)
text = recognizer.recognize_google(audio_data, language="ko-KR")
return text
except sr.UnknownValueError:
return "음성을 인식할 수 없습니다."
except sr.RequestError:
return "STT 요청 실패"

#
# # Example: feed in a WAV file converted from MP3
# sample_text = speech_to_text(sample_wav_path)
# print("Transcribed text:", sample_text)