diff --git a/app/core/job_log_context.py b/app/core/job_log_context.py index b1042e5..06031ec 100644 --- a/app/core/job_log_context.py +++ b/app/core/job_log_context.py @@ -18,6 +18,11 @@ def init_job_log(job_id: str) -> None: _logs_var.set([]) +def get_current_job_id() -> str | None: + """현재 job id를 반환한다.""" + return _job_id_var.get() + + def append_job_log( stage: str, message: str, diff --git a/app/core/llm_router.py b/app/core/llm_router.py index 6e8c58e..ffe8096 100644 --- a/app/core/llm_router.py +++ b/app/core/llm_router.py @@ -10,9 +10,10 @@ from langchain_openai import ChatOpenAI from app.core.config import Settings, get_settings -from app.core.job_log_context import append_job_log +from app.core.job_log_context import append_job_log, get_current_job_id from app.core.logger import get_logger from app.core.timeout_policy import get_timeout_policy +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -166,6 +167,64 @@ def _log_failure( ) +def _queue_llm_event( + *, + event_type: str, + severity: str, + stage: Stage, + status: str, + title: str, + message: str, + elapsed_ms: int, + model: str, + fallback_used: bool, + error: str | None = None, +) -> None: + schedule_webhook( + notify_pipeline_event( + event_type=event_type, + severity=severity, + stage=stage.value, + status=status, + title=title, + message=message, + job_id=get_current_job_id(), + elapsed_ms=elapsed_ms, + model=model, + fallback_used=fallback_used, + error=error, + ) + ) + + +async def _send_llm_event( + *, + event_type: str, + severity: str, + stage: Stage, + status: str, + title: str, + message: str, + elapsed_ms: int, + model: str, + fallback_used: bool, + error: str | None = None, +) -> None: + await notify_pipeline_event( + event_type=event_type, + severity=severity, + stage=stage.value, + status=status, + title=title, + message=message, + job_id=get_current_job_id(), + elapsed_ms=elapsed_ms, + model=model, + fallback_used=fallback_used, + error=error, + ) + + def invoke( stage: Stage, payload: Any, @@ -199,6 +258,17 @@ def invoke( fallback_used=False, latency_ms=latency, ) + _queue_llm_event( + event_type="llm_call_success", + severity="success", + stage=stage, + status="SUCCESS", + title="🤖 LLM Call Succeeded", + message="LLM 호출이 정상적으로 완료되었습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + ) append_job_log( "llm", f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false", @@ -225,6 +295,18 @@ def invoke( f"fallback=false err={type(exc).__name__}", {"latency_ms": latency}, ) + _queue_llm_event( + event_type="llm_call_failed", + severity="error", + stage=stage, + status="FAILED", + title="🤖 LLM Call Failed", + message="LLM 호출이 실패했습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) raise _log_failure( @@ -243,6 +325,18 @@ def invoke( f"fallback=false err={type(exc).__name__} retrying=true", {"latency_ms": latency}, ) + _queue_llm_event( + event_type="llm_call_retry", + severity="warning", + stage=stage, + status="RETRYING", + title="🔁 LLM Call Retrying", + message="1차 LLM 호출이 실패해 fallback 모델로 재시도합니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) fallback_started = perf_counter() fallback_client = _get_chat_openai_client( @@ -262,6 +356,17 @@ def invoke( fallback_used=True, latency_ms=fb_latency, ) + _queue_llm_event( + event_type="llm_fallback_success", + severity="warning", + stage=stage, + status="SUCCESS", + title="🔁 LLM Fallback Succeeded", + message="fallback 모델 호출이 성공했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + ) append_job_log( "llm", f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true", @@ -287,6 +392,18 @@ def invoke( f"fallback=true err={type(fallback_exc).__name__}", {"latency_ms": fb_latency}, ) + _queue_llm_event( + event_type="llm_fallback_failed", + severity="error", + stage=stage, + status="FAILED", + title="🔁 LLM Fallback Failed", + message="fallback 모델 호출도 실패했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + error=type(fallback_exc).__name__, + ) raise @@ -323,6 +440,17 @@ async def ainvoke( fallback_used=False, latency_ms=latency, ) + await _send_llm_event( + event_type="llm_call_success", + severity="success", + stage=stage, + status="SUCCESS", + title="🤖 LLM Call Succeeded", + message="LLM 호출이 정상적으로 완료되었습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + ) append_job_log( "llm", f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false", @@ -349,6 +477,18 @@ async def ainvoke( f"fallback=false err={type(exc).__name__}", {"latency_ms": latency}, ) + await _send_llm_event( + event_type="llm_call_failed", + severity="error", + stage=stage, + status="FAILED", + title="🤖 LLM Call Failed", + message="LLM 호출이 실패했습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) raise _log_failure( @@ -367,6 +507,18 @@ async def ainvoke( f"fallback=false err={type(exc).__name__} retrying=true", {"latency_ms": latency}, ) + await _send_llm_event( + event_type="llm_call_retry", + severity="warning", + stage=stage, + status="RETRYING", + title="🔁 LLM Call Retrying", + message="1차 LLM 호출이 실패해 fallback 모델로 재시도합니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) fallback_started = perf_counter() fallback_client = _get_chat_openai_client( @@ -386,6 +538,17 @@ async def ainvoke( fallback_used=True, latency_ms=fb_latency, ) + await _send_llm_event( + event_type="llm_fallback_success", + severity="warning", + stage=stage, + status="SUCCESS", + title="🔁 LLM Fallback Succeeded", + message="fallback 모델 호출이 성공했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + ) append_job_log( "llm", f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true", @@ -411,4 +574,16 @@ async def ainvoke( f"fallback=true err={type(fallback_exc).__name__}", {"latency_ms": fb_latency}, ) + await _send_llm_event( + event_type="llm_fallback_failed", + severity="error", + stage=stage, + status="FAILED", + title="🔁 LLM Fallback Failed", + message="fallback 모델 호출도 실패했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + error=type(fallback_exc).__name__, + ) raise diff --git a/app/graph/chat/nodes/analyze_intent.py b/app/graph/chat/nodes/analyze_intent.py index c893ae0..86d4efa 100644 --- a/app/graph/chat/nodes/analyze_intent.py +++ b/app/graph/chat/nodes/analyze_intent.py @@ -17,6 +17,7 @@ from app.graph.roadmap.utils import strip_code_fence from app.schemas.chat import ChatIntent from app.schemas.enums import ChatOperation, ChatStatus +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -544,6 +545,20 @@ def _parse_modification_intent( raise ValueError("수정 의도 응답 파싱에 실패했습니다.") from exc +def _emit_intent_event(event_type: str, severity: str, status: str, message: str, extra_fields: list[dict]) -> None: + schedule_webhook( + notify_pipeline_event( + event_type=event_type, + severity=severity, + stage="chat.intent", + status=status, + title="🧭 Intent Analysis Stage", + message=message, + extra_fields=extra_fields, + ) + ) + + def analyze_intent(state: ChatState) -> ChatState: """사용자 요청을 GENERAL_CHAT 또는 MODIFICATION으로 분류합니다.""" current_itinerary = state.get("current_itinerary") @@ -566,9 +581,26 @@ def analyze_intent(state: ChatState) -> ChatState: f"type={route.intent_type} action={route.requested_action} scope={route.target_scope}", ) if route.intent_type == "GENERAL_CHAT": + _emit_intent_event( + "chat_intent_routed", + "info", + "GENERAL_CHAT", + "일반 대화로 분류되었습니다.", + [ + {"name": "Action", "value": route.requested_action or "N/A", "inline": True}, + {"name": "Scope", "value": route.target_scope or "N/A", "inline": True}, + ], + ) return {**state, "intent_type": "GENERAL_CHAT"} if route.requested_action == "DELETE" and route.target_scope == "DAY_LEVEL": + _emit_intent_event( + "chat_intent_rejected", + "warning", + "REJECTED", + "일차 삭제 요청이 거부되었습니다.", + [{"name": "Reason", "value": "일차 삭제는 지원하지 않습니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -577,6 +609,13 @@ def analyze_intent(state: ChatState) -> ChatState: } if route.requested_action == "DELETE" and route.target_scope == "UNKNOWN": + _emit_intent_event( + "chat_intent_clarification", + "warning", + "ASK_CLARIFICATION", + "삭제 대상이 모호해 확인이 필요합니다.", + [{"name": "Reason", "value": "삭제할 일차와 장소 순서가 모두 필요합니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -585,6 +624,13 @@ def analyze_intent(state: ChatState) -> ChatState: } if route.target_scope == "DAY_LEVEL" or _is_day_or_date_change_request(user_query): + _emit_intent_event( + "chat_intent_rejected", + "warning", + "REJECTED", + "일차 변경 요청이 거부되었습니다.", + [{"name": "Reason", "value": "일차 자체는 변경할 수 없습니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -608,11 +654,36 @@ def analyze_intent(state: ChatState) -> ChatState: f" clarify={intent_draft.needs_clarification}", level="detail", ) + _emit_intent_event( + "chat_intent_parsed", + "info", + "MODIFICATION", + "수정 의도가 파싱되었습니다.", + [ + {"name": "Op", "value": str(intent_draft.op), "inline": True}, + {"name": "Day", "value": str(intent_draft.target_day), "inline": True}, + {"name": "Index", "value": str(intent_draft.target_index), "inline": True}, + ], + ) except Exception as exc: logger.error("의도 분석 LLM 호출 실패: %s", exc) + _emit_intent_event( + "chat_intent_failed", + "error", + "FAILED", + "수정 의도 분석에 실패했습니다.", + [{"name": "Error", "value": type(exc).__name__, "inline": False}], + ) return {**state, "error": "수정 의도 분석에 실패했습니다."} if intent_draft.needs_clarification: + _emit_intent_event( + "chat_intent_clarification", + "warning", + "ASK_CLARIFICATION", + "추가 확인이 필요한 수정 요청입니다.", + [{"name": "Keyword", "value": intent_draft.search_keyword or "N/A", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", diff --git a/app/graph/chat/nodes/mutate.py b/app/graph/chat/nodes/mutate.py index 6d90376..c9f3454 100644 --- a/app/graph/chat/nodes/mutate.py +++ b/app/graph/chat/nodes/mutate.py @@ -16,6 +16,7 @@ from app.schemas.enums import ChatOperation, ChatStatus from app.services.google_places_service import get_google_places_service from app.services.place_rerank_service import select_place_id_for_chat +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -183,6 +184,20 @@ async def mutate(state: ChatState) -> ChatState: day["places"] = places diff_str = ",".join(diff_keys) append_job_log("mutate", f"op={op} day={target_day_num} idx={target_index} n={len(search_results)} diff={diff_str}") + await notify_pipeline_event( + event_type="chat_mutate_completed", + severity="success", + stage="chat.mutate", + status=str(state.get("status") or ChatStatus.SUCCESS), + title="🛠️ Mutate Stage Completed", + message="수정 작업이 적용되었습니다.", + extra_fields=[ + {"name": "Op", "value": str(op), "inline": True}, + {"name": "Day", "value": str(target_day_num), "inline": True}, + {"name": "Index", "value": str(target_index), "inline": True}, + {"name": "Diff", "value": diff_str or "none", "inline": False}, + ], + ) return { **state, diff --git a/app/graph/chat/nodes/respond.py b/app/graph/chat/nodes/respond.py index 23095e3..8132ea7 100644 --- a/app/graph/chat/nodes/respond.py +++ b/app/graph/chat/nodes/respond.py @@ -9,6 +9,7 @@ from app.core.logger import get_logger from app.graph.chat.state import ChatState from app.schemas.enums import ChatStatus +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -92,4 +93,15 @@ def respond(state: ChatState) -> ChatState: final_status = status if status else ChatStatus.SUCCESS append_job_log("respond", f"status={final_status}") + schedule_webhook( + notify_pipeline_event( + event_type="chat_response_completed", + severity="success" if str(final_status) == ChatStatus.SUCCESS.value else "warning", + stage="chat.respond", + status=str(final_status), + title="💬 Respond Stage Completed", + message="최종 응답 생성을 완료했습니다.", + extra_fields=[{"name": "Status", "value": str(final_status), "inline": True}], + ) + ) return {**state, "status": final_status, "message": generated} diff --git a/app/graph/roadmap/nodes/finalize.py b/app/graph/roadmap/nodes/finalize.py index 6750ae0..277fcf5 100644 --- a/app/graph/roadmap/nodes/finalize.py +++ b/app/graph/roadmap/nodes/finalize.py @@ -25,6 +25,7 @@ from app.graph.roadmap.state import RoadmapState from app.graph.roadmap.utils import build_slot_key, strip_code_fence from app.schemas.course import CourseRequest, CourseResponseLLMOutput, PlanningPreference +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -302,12 +303,30 @@ async def synthesize_final_roadmap(state: RoadmapState) -> RoadmapState: daily_places = await _fill_place_descriptions_with_llm(daily_places) desc_count = sum(1 for d in daily_places for p in d.get("places", []) if p.get("description")) append_job_log("finalize_desc", f"place_descriptions_filled={desc_count}") + await notify_pipeline_event( + event_type="roadmap_finalize_desc", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="🧩 Finalize Description Completed", + message="장소 설명 보강이 완료되었습니다.", + extra_fields=[{"name": "Descriptions", "value": str(desc_count), "inline": True}], + ) daily_places = await _apply_visit_time_for_daily_places( daily_places, course_request.planning_preference, ) append_job_log("finalize_time", f"preference={course_request.planning_preference}") + await notify_pipeline_event( + event_type="roadmap_finalize_time", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="⏰ Finalize Time Completed", + message="visit time 보정이 완료되었습니다.", + extra_fields=[{"name": "Preference", "value": str(course_request.planning_preference), "inline": True}], + ) course_request_payload = course_request.model_dump(mode="json") course_request_payload.pop("budget_range", None) @@ -372,9 +391,30 @@ async def synthesize_final_roadmap(state: RoadmapState) -> RoadmapState: total_places = sum(len(d.get("places", [])) for d in daily_places) append_job_log("finalize", f"title={final_roadmap.get('title', '')} places={total_places}") + await notify_pipeline_event( + event_type="roadmap_finalize_completed", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="✅ Finalize Stage Completed", + message="최종 로드맵 생성이 완료되었습니다.", + extra_fields=[ + {"name": "Title", "value": final_roadmap.get("title", ""), "inline": False}, + {"name": "Places", "value": str(total_places), "inline": True}, + ], + ) return {**state, "final_roadmap": final_roadmap} except Exception as exc: append_job_log("finalize", f"error: {type(exc).__name__} (see server logs)") logger.error("최종 로드맵 생성 실패: %s", exc, exc_info=True) + await notify_pipeline_event( + event_type="roadmap_finalize_failed", + severity="error", + stage="roadmap.finalize", + status="FAILED", + title="❌ Finalize Stage Failed", + message="최종 로드맵 생성 중 오류가 발생했습니다.", + error=type(exc).__name__, + ) return {**state, "error": f"최종 로드맵 생성에 실패했습니다: {exc}"} diff --git a/app/graph/roadmap/nodes/places.py b/app/graph/roadmap/nodes/places.py index b291488..03cb339 100644 --- a/app/graph/roadmap/nodes/places.py +++ b/app/graph/roadmap/nodes/places.py @@ -18,6 +18,7 @@ from app.services.google_places_service import get_google_places_service from app.services.place_rerank_service import select_place_ids_for_day from app.services.places_service import PlacesServiceProtocol +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -357,6 +358,20 @@ async def rerank_for_day(day: dict) -> None: "places_fetch", f"slots={len(fetched_places)} rerank={rerank_enabled} empty={empty_count} {fb_summary}", ) + await notify_pipeline_event( + event_type="roadmap_places_completed", + severity="success", + stage="roadmap.places", + status="SUCCESS", + title="📍 Places Fetch Completed", + message="슬롯별 장소 후보 수집이 완료되었습니다.", + extra_fields=[ + {"name": "Slots", "value": str(len(fetched_places)), "inline": True}, + {"name": "Rerank", "value": str(rerank_enabled), "inline": True}, + {"name": "Empty", "value": str(empty_count), "inline": True}, + {"name": "Fallbacks", "value": fb_summary or "none", "inline": False}, + ], + ) logger.info("Slot place fetch completed: slot_count=%d", len(fetched_places)) return { diff --git a/app/graph/roadmap/nodes/skeleton.py b/app/graph/roadmap/nodes/skeleton.py index e2fd753..c19b86d 100644 --- a/app/graph/roadmap/nodes/skeleton.py +++ b/app/graph/roadmap/nodes/skeleton.py @@ -17,6 +17,7 @@ from app.graph.roadmap.utils import strip_code_fence from app.schemas.course import CourseRequest, PacePreference, RegionDateRange from app.schemas.skeleton import SkeletonPlan +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -779,6 +780,22 @@ def generate_skeleton(state: RoadmapState) -> RoadmapState: region_str = ",".join(regions) warn_count = len(warnings) append_job_log("skeleton_done", f"days={total_days} slots={total_slots} regions={region_str} warnings={warn_count}") + schedule_webhook( + notify_pipeline_event( + event_type="roadmap_skeleton_completed", + severity="success", + stage="roadmap.skeleton", + status="SUCCESS", + title="🧱 Skeleton Stage Completed", + message="로드맵 스켈레톤 생성이 완료되었습니다.", + extra_fields=[ + {"name": "Days", "value": str(total_days), "inline": True}, + {"name": "Slots", "value": str(total_slots), "inline": True}, + {"name": "Regions", "value": region_str, "inline": False}, + {"name": "Warnings", "value": str(warn_count), "inline": True}, + ], + ) + ) return { **state, diff --git a/app/services/chat_service.py b/app/services/chat_service.py index 885317d..a7555bb 100644 --- a/app/services/chat_service.py +++ b/app/services/chat_service.py @@ -17,7 +17,7 @@ from app.schemas.enums import ChatStatus from app.services.callback_delivery import post_callback_with_retry from app.services.callback_url import build_callback_url -from app.services.webhook_notification import notify_job_completed, notify_timeout +from app.services.webhook_notification import notify_job_completed, notify_pipeline_event, notify_timeout logger = get_logger(__name__) @@ -136,6 +136,15 @@ async def process_chat_request(request: ChatRequest) -> None: append_job_log( "job_start", f"type=chat job_id={request.job_id} query_len={len(request.user_query)} query_hash={query_hash}" ) + await notify_pipeline_event( + event_type="chat_started", + severity="info", + stage="chat", + status="STARTED", + title="💬 Chat Job Started", + message="로드맵 수정 작업이 시작되었습니다.", + job_id=request.job_id, + ) status = "SUCCESS" try: @@ -165,6 +174,16 @@ async def process_chat_request(request: ChatRequest) -> None: _, logs, elapsed = collect_job_logs() await notify_job_completed(request.job_id, "chat", elapsed, status, logs) + await notify_pipeline_event( + event_type="chat_completed", + severity="success" if status == "SUCCESS" else "error", + stage="chat", + status=status, + title="✅ Chat Job Finished", + message="로드맵 수정 작업이 종료되었습니다.", + job_id=request.job_id, + elapsed_ms=round(elapsed * 1000), + ) callback_endpoint = build_callback_url( str(request.callback_url), request.job_id, "itineraries/{job_id}/chat-result" diff --git a/app/services/generate_service.py b/app/services/generate_service.py index 81fa9f8..be56e58 100644 --- a/app/services/generate_service.py +++ b/app/services/generate_service.py @@ -14,7 +14,7 @@ from app.services.callback_delivery import post_callback_with_retry from app.services.callback_url import build_callback_url from app.services.google_places_service import get_google_places_service -from app.services.webhook_notification import notify_job_completed, notify_timeout +from app.services.webhook_notification import notify_job_completed, notify_pipeline_event, notify_timeout logger = get_logger(__name__) @@ -61,6 +61,15 @@ async def process_generate_request(job_id: str, callback_url: str, payload: Cour timeout_policy = get_timeout_policy(settings) init_job_log(job_id) append_job_log("job_start", f"type=generate job_id={job_id}") + await notify_pipeline_event( + event_type="generate_started", + severity="info", + stage="generate", + status="STARTED", + title="🚀 Generate Job Started", + message="로드맵 생성 작업이 시작되었습니다.", + job_id=job_id, + ) status = "SUCCESS" try: @@ -90,6 +99,16 @@ async def process_generate_request(job_id: str, callback_url: str, payload: Cour _, logs, elapsed = collect_job_logs() await notify_job_completed(job_id, "generate", elapsed, status, logs) + await notify_pipeline_event( + event_type="generate_completed", + severity="success" if status == "SUCCESS" else "error", + stage="generate", + status=status, + title="✅ Generate Job Finished", + message="로드맵 생성 작업이 종료되었습니다.", + job_id=job_id, + elapsed_ms=round(elapsed * 1000), + ) callback_endpoint = build_callback_url(callback_url, job_id, "itineraries/{job_id}/result") await _post_callback( diff --git a/app/services/webhook_notification.py b/app/services/webhook_notification.py index 4d324c6..3bb224b 100644 --- a/app/services/webhook_notification.py +++ b/app/services/webhook_notification.py @@ -2,9 +2,10 @@ from __future__ import annotations +import asyncio import platform from datetime import datetime, timezone -from typing import Any +from typing import Any, Awaitable import httpx @@ -22,6 +23,16 @@ _COLOR_GRAY = 0x95A5A6 +def schedule_webhook(coro: Awaitable[Any]) -> None: + """현재 이벤트 루프에 웹훅 전송 작업을 예약한다.""" + try: + loop = asyncio.get_running_loop() + except RuntimeError: + asyncio.run(coro) + return + loop.create_task(coro) + + def _get_webhook_url() -> str | None: settings = get_settings() url = settings.DISCORD_WEBHOOK_URL @@ -48,70 +59,146 @@ async def _send_embed(embed: dict[str, Any]) -> None: logger.warning("Discord webhook send failed: %s", exc) +def _severity_color(severity: str) -> int: + normalized = severity.strip().lower() + if normalized == "error": + return _COLOR_RED + if normalized == "warning": + return _COLOR_ORANGE + if normalized == "success": + return _COLOR_GREEN + return _COLOR_BLUE + + +def _append_field(fields: list[dict[str, Any]], name: str, value: Any, inline: bool = True) -> None: + if value is None: + return + text = str(value) + if not text: + return + fields.append({"name": name, "value": text, "inline": inline}) + + +def _format_event_title(event_type: str, title: str | None) -> str: + if title: + return title + return event_type.replace("_", " ").title() + + +async def notify_pipeline_event( + *, + event_type: str, + severity: str, + stage: str, + status: str, + message: str, + title: str | None = None, + description: str | None = None, + job_id: str | None = None, + elapsed_ms: int | None = None, + model: str | None = None, + fallback_used: bool | None = None, + error: str | None = None, + extra_fields: list[dict[str, Any]] | None = None, +) -> None: + """공통 단계 이벤트를 Discord 웹훅으로 전송한다.""" + fields: list[dict[str, Any]] = [] + _append_field(fields, "event_type", event_type) + _append_field(fields, "severity", severity) + _append_field(fields, "stage", stage) + _append_field(fields, "status", status) + _append_field(fields, "job_id", f"`{job_id}`" if job_id else None) + _append_field(fields, "message", message, inline=False) + _append_field(fields, "elapsed_ms", f"{elapsed_ms}ms" if elapsed_ms is not None else None) + _append_field(fields, "model", model) + _append_field(fields, "fallback_used", fallback_used) + _append_field(fields, "error", f"```{error[:1000]}```" if error else None, inline=False) + if extra_fields: + fields.extend(extra_fields) + + payload: dict[str, Any] = { + "title": _format_event_title(event_type, title), + "color": _severity_color(severity), + "fields": fields, + } + if description is not None: + payload["description"] = description + + await _send_embed(payload) + + async def notify_server_start() -> None: settings = get_settings() - await _send_embed( - { - "title": "🟢 Server Started", - "color": _COLOR_GREEN, - "fields": [ - {"name": "Env", "value": settings.APP_ENV, "inline": True}, - {"name": "Python", "value": platform.python_version(), "inline": True}, - {"name": "Host", "value": platform.node(), "inline": True}, - ], - } + await notify_pipeline_event( + event_type="server_start", + severity="success", + stage="server", + status="READY", + title="🟢 Server Started", + message="FastAPI 애플리케이션이 시작되었습니다.", + extra_fields=[ + {"name": "Env", "value": settings.APP_ENV, "inline": True}, + {"name": "Python", "value": platform.python_version(), "inline": True}, + {"name": "Host", "value": platform.node(), "inline": True}, + ], ) async def notify_server_shutdown() -> None: - await _send_embed( - { - "title": "🔴 Server Shutdown", - "color": _COLOR_RED, - "fields": [ - {"name": "Host", "value": platform.node(), "inline": True}, - ], - } + await notify_pipeline_event( + event_type="server_shutdown", + severity="error", + stage="server", + status="STOPPED", + title="🔴 Server Shutdown", + message="FastAPI 애플리케이션이 종료되었습니다.", + extra_fields=[{"name": "Host", "value": platform.node(), "inline": True}], ) async def notify_error_500(method: str, path: str, error: str) -> None: - await _send_embed( - { - "title": "🔥 500 Internal Server Error", - "color": _COLOR_RED, - "fields": [ - {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, - {"name": "Error", "value": f"```{error[:1000]}```", "inline": False}, - ], - } + await notify_pipeline_event( + event_type="http_500", + severity="error", + stage="http", + status="FAILED", + title="🔥 500 Internal Server Error", + message="처리되지 않은 예외가 전역 예외 처리기에 도달했습니다.", + extra_fields=[ + {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, + {"name": "Error", "value": f"```{error[:1000]}```", "inline": False}, + ], ) async def notify_timeout(job_id: str, job_type: str, elapsed_seconds: float) -> None: - await _send_embed( - { - "title": "⏱️ Pipeline Timeout", - "color": _COLOR_ORANGE, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Type", "value": job_type, "inline": True}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + await notify_pipeline_event( + event_type="pipeline_timeout", + severity="warning", + stage=job_type, + status="TIMEOUT", + title="⏱️ Pipeline Timeout", + message=f"{job_type} 작업이 제한 시간을 초과했습니다.", + job_id=job_id, + extra_fields=[ + {"name": "Type", "value": job_type, "inline": True}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) async def notify_request_timeout(method: str, path: str, elapsed_seconds: float) -> None: - await _send_embed( - { - "title": "⏱️ Request Timeout", - "color": _COLOR_ORANGE, - "fields": [ - {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + await notify_pipeline_event( + event_type="request_timeout", + severity="warning", + stage="http", + status="TIMEOUT", + title="⏱️ Request Timeout", + message="HTTP 요청이 설정된 타임아웃을 초과했습니다.", + extra_fields=[ + {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) @@ -160,19 +247,22 @@ async def notify_job_completed( logs: list[dict[str, Any]], ) -> None: description = _format_log_description(logs) - color = _COLOR_GREEN if status == "SUCCESS" else _COLOR_RED - - await _send_embed( - { - "title": f"📋 {job_type} Job Completed", - "color": color, - "description": description, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Status", "value": status, "inline": True}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + severity = "success" if status == "SUCCESS" else "error" + + await notify_pipeline_event( + event_type=f"{job_type}_job_completed", + severity=severity, + stage=job_type, + status=status, + title=f"📋 {job_type} Job Completed", + message="수집된 작업 로그를 요약해서 전송합니다.", + description=description, + job_id=job_id, + elapsed_ms=round(elapsed_seconds * 1000), + extra_fields=[ + {"name": "Status", "value": status, "inline": True}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) @@ -182,15 +272,17 @@ async def notify_callback_failure( callback_url: str, error: str, ) -> None: - await _send_embed( - { - "title": "🚨 Callback Delivery Failed", - "color": _COLOR_RED, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Type", "value": callback_type, "inline": True}, - {"name": "URL", "value": f"`{callback_url[:200]}`", "inline": False}, - {"name": "Error", "value": f"```{error[:500]}```", "inline": False}, - ], - } + await notify_pipeline_event( + event_type="callback_delivery_failed", + severity="error", + stage=callback_type, + status="FAILED", + title="🚨 Callback Delivery Failed", + message="콜백 전송이 재시도 후에도 실패했습니다.", + job_id=job_id, + error=error, + extra_fields=[ + {"name": "Type", "value": callback_type, "inline": True}, + {"name": "URL", "value": f"`{callback_url[:200]}`", "inline": False}, + ], ) diff --git a/tests/test_llm_router_webhooks.py b/tests/test_llm_router_webhooks.py new file mode 100644 index 0000000..d0ac668 --- /dev/null +++ b/tests/test_llm_router_webhooks.py @@ -0,0 +1,114 @@ +"""LLM router webhook event tests.""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace + +import pytest + +from app.core.config import get_settings +from app.core.llm_router import Stage + + +def _set_required_env(monkeypatch, **overrides: str) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + monkeypatch.setenv("SERVICE_SECRET", "test-service-secret") + monkeypatch.setenv("HMAC_SECRET", "test-hmac-secret") + monkeypatch.setenv("DISCORD_WEBHOOK_URL", "https://discord.com/api/webhooks/123/abc") + for key, value in overrides.items(): + monkeypatch.setenv(key, value) + get_settings.cache_clear() + + +def _event_names(payloads: list[dict]) -> list[str]: + event_names: list[str] = [] + for payload in payloads: + event_type = next(field["value"] for field in payload["fields"] if field["name"] == "event_type") + event_names.append(event_type) + return event_names + + +def test_invoke_emits_success_webhook(monkeypatch) -> None: + _set_required_env(monkeypatch, ENABLE_STAGE_LLM_ROUTING="false", LLM_MODEL_NAME="fallback-model") + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def invoke(self, payload): + return SimpleNamespace(content="ok") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda *args, **kwargs: _Client()) + monkeypatch.setattr(llm_router, "schedule_webhook", lambda coro: asyncio.run(coro)) + + response = llm_router.invoke(Stage.CHAT_RESPONSE, "prompt") + + assert response.content == "ok" + assert _event_names(payloads) == ["llm_call_success"] + + +def test_ainvoke_emits_retry_and_fallback_success(monkeypatch) -> None: + _set_required_env( + monkeypatch, + ENABLE_STAGE_LLM_ROUTING="true", + LLM_MODEL_QUALITY="primary-model", + LLM_MODEL_NAME="fallback-model", + ) + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def __init__(self, model: str) -> None: + self.model = model + + async def ainvoke(self, payload): + if self.model == "primary-model": + raise RuntimeError("primary-fail") + return SimpleNamespace(content="fallback-ok") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda model, *args, **kwargs: _Client(model)) + + response = asyncio.run(llm_router.ainvoke(Stage.ROADMAP_SKELETON, "prompt")) + + assert response.content == "fallback-ok" + assert _event_names(payloads) == ["llm_call_retry", "llm_fallback_success"] + + +def test_ainvoke_emits_fallback_failure(monkeypatch) -> None: + _set_required_env( + monkeypatch, + ENABLE_STAGE_LLM_ROUTING="true", + LLM_MODEL_QUALITY="primary-model", + LLM_MODEL_NAME="fallback-model", + ) + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def __init__(self, model: str) -> None: + self.model = model + + async def ainvoke(self, payload): + raise RuntimeError(f"{self.model}-fail") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda model, *args, **kwargs: _Client(model)) + + with pytest.raises(RuntimeError, match="fallback-model-fail"): + asyncio.run(llm_router.ainvoke(Stage.ROADMAP_SKELETON, "prompt")) + + assert _event_names(payloads) == ["llm_call_retry", "llm_fallback_failed"] diff --git a/tests/test_webhook_notification.py b/tests/test_webhook_notification.py new file mode 100644 index 0000000..4e5af1e --- /dev/null +++ b/tests/test_webhook_notification.py @@ -0,0 +1,74 @@ +"""Discord webhook payload formatting tests.""" + +from __future__ import annotations + +import asyncio + +from app.services import webhook_notification as webhook + + +def test_notify_pipeline_event_builds_standard_fields(monkeypatch) -> None: + captured: dict = {} + + async def _fake_send(embed: dict) -> None: + captured.update(embed) + + monkeypatch.setattr(webhook, "_send_embed", _fake_send) + + asyncio.run( + webhook.notify_pipeline_event( + event_type="demo_event", + severity="warning", + stage="demo.stage", + status="RUNNING", + title="Demo Event", + message="stage event", + job_id="job-1", + elapsed_ms=123, + model="gpt-demo", + fallback_used=False, + error="boom", + extra_fields=[{"name": "Custom", "value": "value", "inline": True}], + ) + ) + + fields = {field["name"]: field["value"] for field in captured["fields"]} + assert captured["title"] == "Demo Event" + assert fields["event_type"] == "demo_event" + assert fields["severity"] == "warning" + assert fields["stage"] == "demo.stage" + assert fields["status"] == "RUNNING" + assert fields["job_id"] == "`job-1`" + assert fields["message"] == "stage event" + assert fields["elapsed_ms"] == "123ms" + assert fields["model"] == "gpt-demo" + assert fields["fallback_used"] == "False" + assert "boom" in fields["error"] + assert fields["Custom"] == "value" + + +def test_notify_job_completed_includes_log_description(monkeypatch) -> None: + captured: dict = {} + + async def _fake_send(embed: dict) -> None: + captured.update(embed) + + monkeypatch.setattr(webhook, "_send_embed", _fake_send) + + asyncio.run( + webhook.notify_job_completed( + "job-2", + "generate", + 12.3, + "SUCCESS", + [{"stage": "skeleton", "message": "done", "elapsed_ms": 42}], + ) + ) + + fields = {field["name"]: field["value"] for field in captured["fields"]} + assert captured["title"] == "📋 generate Job Completed" + assert "**skeleton**" in captured["description"] + assert "done" in captured["description"] + assert fields["job_id"] == "`job-2`" + assert fields["Status"] == "SUCCESS" + assert fields["Elapsed"] == "12.3s"