Skip to content

Commit d1d932a

Browse files
authored
Merge pull request #20 from SynergyX-AI-Pattern/bug/#18_prediction_batch_upsert
[FIX] 예측 배치, 배포 오류 해결 #18
2 parents 50d6b6c + d26d96e commit d1d932a

File tree

5 files changed

+83
-61
lines changed

5 files changed

+83
-61
lines changed

.github/workflows/fastapi-dev-ci-cd.yml

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ jobs:
4343
rsync -avz -e "ssh -i private_key.pem -o StrictHostKeyChecking=no" \
4444
--exclude venv --exclude .git ./ $EC2_USERNAME@$EC2_HOST:/home/$EC2_USERNAME/fastapi-app/
4545
46-
# .env 파일 전송
46+
# .env 전송
4747
scp -i private_key.pem -o StrictHostKeyChecking=no .env $EC2_USERNAME@$EC2_HOST:/home/$EC2_USERNAME/fastapi-app/.env
4848
49-
# 서버 실행 스크립트
49+
# 서버 설정 및 재시작
5050
ssh -i private_key.pem -o StrictHostKeyChecking=no $EC2_USERNAME@$EC2_HOST "
5151
set -e
5252
cd /home/$EC2_USERNAME/fastapi-app
@@ -62,14 +62,11 @@ jobs:
6262
pip install --upgrade pip
6363
pip install -r requirements.txt
6464
65-
# fast api 서버 재시작
66-
pkill -f 'uvicorn' || true
67-
export PYTHONPATH=$(pwd)
68-
69-
echo '[INFO] FastAPI 실행 중...'
70-
nohup ./venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 > fastapi.log 2>&1 &
71-
sleep 8
72-
tail -n 50 fastapi.log
65+
# systemd 서비스 재시작
66+
echo '[INFO] FastAPI 서비스 재시작...'
67+
sudo systemctl daemon-reload
68+
sudo systemctl restart fastapi
69+
sudo systemctl status fastapi --no-pager -l
7370
"
7471
7572
rm -f private_key.pem

app/main.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from fastapi import FastAPI
44
import logging
55
from dotenv import load_dotenv
6-
from app.core.config import settings
76
from app.core.logging_config import setup_logging
87
from app.exceptions.base import APIException
98
from app.exceptions.exception_handlers import api_exception_handler
@@ -16,6 +15,7 @@
1615
from starlette.exceptions import HTTPException as StarletteHTTPException
1716
from app.schedulers.predict_batch_runner import start_batch_scheduler
1817
from app.schedulers.pattern_detection_runner import start_pattern_detection_scheduler
18+
from app.core.config import settings
1919

2020
load_dotenv()
2121
setup_logging()
@@ -27,17 +27,23 @@
2727
@asynccontextmanager
2828
async def lifespan(app: FastAPI):
2929
if settings.ENV == "prod":
30-
try:
31-
logger.info("prod - 배치 스케줄러 실행 시작")
32-
start_batch_scheduler()
33-
except Exception as e:
34-
logger.exception(f"배치 스케줄러 시작 실패: {e}")
35-
36-
try:
37-
logger.info("prod - 패턴 감지 스케줄러 실행 시작")
38-
start_pattern_detection_scheduler()
39-
except Exception as e:
40-
logger.exception(f"패턴 감지 스케줄러 시작 실패: {e}")
30+
# 중복 실행 방지
31+
if not getattr(app.state, "schedulers_started", False):
32+
try:
33+
logger.info("prod - 배치 스케줄러 실행 시작")
34+
start_batch_scheduler()
35+
except Exception as e:
36+
logger.exception(f"배치 스케줄러 시작 실패: {e}")
37+
38+
try:
39+
logger.info("prod - 패턴 감지 스케줄러 실행 시작")
40+
start_pattern_detection_scheduler()
41+
except Exception as e:
42+
logger.exception(f"패턴 감지 스케줄러 시작 실패: {e}")
43+
44+
app.state.schedulers_started = True
45+
else:
46+
logger.info("스케줄러가 이미 시작되어 있어 생략")
4147
yield
4248

4349

app/schedulers/predict_batch_runner.py

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import logging
33
import time
44
import asyncio
5-
from concurrent.futures import ThreadPoolExecutor, as_completed
65
from app.services.batch_service import BatchService
76
from app.utils.discord_notifier import notify_discord_async
87

@@ -12,48 +11,42 @@
1211
def start_batch_scheduler():
1312
scheduler = BackgroundScheduler(timezone="Asia/Seoul")
1413

15-
@scheduler.scheduled_job('cron', hour=19, minute=15)
14+
@scheduler.scheduled_job(
15+
'cron',
16+
hour=16, minute=5, day_of_week='mon-fri',
17+
coalesce=False, # 밀린 작업 버림
18+
max_instances=1, # 중복 실행 방지
19+
misfire_grace_time=600 # 지연 허용 10분
20+
)
1621
def run_batch():
17-
logger.info("[Scheduler] 18:05 배치 예측 시작")
18-
start = time.time()
19-
20-
success_count = 0
21-
fail_count = 0
22-
failed_symbols = [] # 실패한 종목
23-
24-
start_id = 1
25-
end_id = 100
26-
max_workers = 3
27-
28-
with ThreadPoolExecutor(max_workers=max_workers) as executor:
29-
futures = {
30-
executor.submit(BatchService.process_single_stock, stock_id): stock_id
31-
for stock_id in range(start_id, end_id + 1)
32-
}
33-
34-
for future in as_completed(futures):
35-
stock_id = futures[future]
36-
try:
37-
stock_id, success, symbol_or_msg = future.result()
38-
if success:
39-
success_count += 1
40-
else:
41-
fail_count += 1
42-
failed_symbols.append(symbol_or_msg) # symbol 저장
43-
except Exception as e:
44-
logger.exception(f"[Batch] [{stock_id}] 처리 중 예외 발생: {str(e)}")
45-
fail_count += 1
22+
logger.info("[Scheduler] 16:05 배치 예측 시작")
23+
start_time = time.time()
24+
25+
start_id, end_id = 1, 100
26+
chunk_size = 20
27+
max_workers = 6
28+
cooldown_sec = 2
29+
30+
success_count, fail_count, failed_symbols = BatchService.run_batch_in_chunks(
31+
start_id=start_id,
32+
end_id=end_id,
33+
chunk_size=chunk_size,
34+
max_workers=max_workers,
35+
cooldown_sec=cooldown_sec
36+
)
4637

47-
duration = time.time() - start
38+
duration = time.time() - start_time
4839
logger.info(
49-
f"[Scheduler] 배치 예측 완료 - 성공: {success_count}, 실패: {fail_count}, 소요 시간: {duration:.2f}s"
40+
f"[Scheduler] 배치 예측 완료 - 성공: {success_count}, 실패: {fail_count}, 소요: {duration:.2f}s"
5041
)
5142

43+
# Discord 알림
5244
# 스레드에서 비동기 함수 실행
5345
try:
5446
import threading
5547
def run_notification():
56-
asyncio.run(notify_discord_async(success_count, fail_count, duration, failed_symbols))
48+
asyncio.run(
49+
notify_discord_async(success_count, fail_count, duration, failed_symbols, 1404342496221462539))
5750

5851
notification_thread = threading.Thread(target=run_notification)
5952
notification_thread.start()

app/services/batch_service.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,37 @@ def run_batch_in_chunks(start_id=1, end_id=100, chunk_size=20, max_workers=6, co
2929
"""
3030
logger.info(f"분할 배치 시작: {start_id} ~ {end_id} (chunk: {chunk_size})")
3131

32+
success_count = 0
33+
fail_count = 0
34+
failed_symbols = []
35+
3236
for start in range(start_id, end_id + 1, chunk_size):
3337
end = min(start + chunk_size - 1, end_id)
3438

3539
logger.info(f"예측 구간: {start} ~ {end}")
36-
BatchService.batch_predict_and_save(start_id=start, end_id=end, max_workers=max_workers)
40+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
41+
futures = {
42+
executor.submit(BatchService.process_single_stock, stock_id): stock_id
43+
for stock_id in range(start, end + 1)
44+
}
45+
for future in as_completed(futures):
46+
stock_id = futures[future]
47+
try:
48+
stock_id, success, symbol_or_msg = future.result()
49+
if success:
50+
success_count += 1
51+
else:
52+
fail_count += 1
53+
failed_symbols.append(symbol_or_msg)
54+
except Exception as e:
55+
logger.exception(f"[{stock_id}] 처리 중 예외 발생: {str(e)}")
56+
fail_count += 1
3757

3858
if cooldown_sec > 0:
3959
time.sleep(cooldown_sec)
4060

4161
logger.info("전체 분할 배치 완료")
62+
return success_count, fail_count, failed_symbols
4263

4364
@staticmethod
4465
def process_single_stock(stock_id: int) -> tuple[int, bool, str]:

app/utils/discord_notifier.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
logger = logging.getLogger(__name__)
77

8-
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
9-
108

119
def format_duration(seconds: float) -> str:
1210
seconds = int(seconds)
@@ -29,11 +27,15 @@ async def notify_discord_async(
2927
success_count: int,
3028
fail_count: int,
3129
duration: float,
32-
failed_symbols: list[str] = None
30+
failed_symbols: list[str] = None,
31+
mention_role_id: str = None
3332
) -> None:
3433
"""
3534
배치 예측 결과를 디스코드로 전송하는 비동기 함수
3635
"""
36+
import os
37+
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
38+
3739
if not DISCORD_WEBHOOK_URL:
3840
logger.warning("DISCORD_WEBHOOK_URL이 설정되지 않았습니다.")
3941
return
@@ -42,8 +44,11 @@ async def notify_discord_async(
4244
formatted_duration = format_duration(duration)
4345
today_str = datetime.now().strftime('%Y-%m-%d')
4446

47+
# 역할 멘션 문자열 생성
48+
mention_str = f"<@&{mention_role_id}> " if mention_role_id else ""
49+
4550
content = f"""
46-
📊 배치 예측 완료 ({today_str})
51+
{mention_str}📊 배치 예측 완료 ({today_str})
4752
- 종목 수: {total}
4853
- 성공: {success_count}
4954
- 실패: {fail_count}

0 commit comments

Comments
 (0)