From b6acb46b1c6adcad92854d1c7fea44fb8b96d277 Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 20:53:21 +0900 Subject: [PATCH 1/7] =?UTF-8?q?feat=20=E2=9C=A8:=20Discord=20=EC=9B=B9?= =?UTF-8?q?=ED=9B=85=20=ED=8A=B8=EB=A6=AC=EA=B1=B0=EC=99=80=20=EB=8B=A8?= =?UTF-8?q?=EA=B3=84=EB=B3=84=20=EA=B5=AC=EC=A1=B0=EB=A5=BC=20=EB=B0=98?= =?UTF-8?q?=EC=98=81=ED=95=9C=EB=8B=A4=20(#72)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/job_log_context.py | 5 + app/core/llm_router.py | 176 +++++++++++++++++++ app/graph/chat/nodes/analyze_intent.py | 71 ++++++++ app/graph/chat/nodes/mutate.py | 15 ++ app/graph/chat/nodes/respond.py | 12 ++ app/graph/roadmap/nodes/finalize.py | 40 +++++ app/graph/roadmap/nodes/places.py | 15 ++ app/graph/roadmap/nodes/skeleton.py | 17 ++ app/services/chat_service.py | 21 ++- app/services/generate_service.py | 21 ++- app/services/webhook_notification.py | 234 +++++++++++++++++-------- tests/test_llm_router_webhooks.py | 110 ++++++++++++ tests/test_pipeline_webhooks.py | 140 +++++++++++++++ tests/test_webhook_notification.py | 74 ++++++++ 14 files changed, 878 insertions(+), 73 deletions(-) create mode 100644 tests/test_llm_router_webhooks.py create mode 100644 tests/test_pipeline_webhooks.py create mode 100644 tests/test_webhook_notification.py diff --git a/app/core/job_log_context.py b/app/core/job_log_context.py index b1042e5..06031ec 100644 --- a/app/core/job_log_context.py +++ b/app/core/job_log_context.py @@ -18,6 +18,11 @@ def init_job_log(job_id: str) -> None: _logs_var.set([]) +def get_current_job_id() -> str | None: + """현재 job id를 반환한다.""" + return _job_id_var.get() + + def append_job_log( stage: str, message: str, diff --git a/app/core/llm_router.py b/app/core/llm_router.py index 6e8c58e..a02c315 100644 --- a/app/core/llm_router.py +++ b/app/core/llm_router.py @@ -11,8 +11,10 @@ from app.core.config import Settings, get_settings from app.core.job_log_context import append_job_log +from app.core.job_log_context import get_current_job_id from app.core.logger import get_logger from app.core.timeout_policy import get_timeout_policy +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -163,6 +165,64 @@ def _log_failure( "latency_ms": latency_ms, }, exc_info=exc, + ) + + +def _queue_llm_event( + *, + event_type: str, + severity: str, + stage: Stage, + status: str, + title: str, + message: str, + elapsed_ms: int, + model: str, + fallback_used: bool, + error: str | None = None, +) -> None: + schedule_webhook( + notify_pipeline_event( + event_type=event_type, + severity=severity, + stage=stage.value, + status=status, + title=title, + message=message, + job_id=get_current_job_id(), + elapsed_ms=elapsed_ms, + model=model, + fallback_used=fallback_used, + error=error, + ) + ) + + +async def _send_llm_event( + *, + event_type: str, + severity: str, + stage: Stage, + status: str, + title: str, + message: str, + elapsed_ms: int, + model: str, + fallback_used: bool, + error: str | None = None, +) -> None: + await notify_pipeline_event( + event_type=event_type, + severity=severity, + stage=stage.value, + status=status, + title=title, + message=message, + job_id=get_current_job_id(), + elapsed_ms=elapsed_ms, + model=model, + fallback_used=fallback_used, + error=error, ) @@ -199,6 +259,17 @@ def invoke( fallback_used=False, latency_ms=latency, ) + _queue_llm_event( + event_type="llm_call_success", + severity="success", + stage=stage, + status="SUCCESS", + title="🤖 LLM Call Succeeded", + message="LLM 호출이 정상적으로 완료되었습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + ) append_job_log( "llm", f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false", @@ -225,6 +296,18 @@ def invoke( f"fallback=false err={type(exc).__name__}", {"latency_ms": latency}, ) + _queue_llm_event( + event_type="llm_call_failed", + severity="error", + stage=stage, + status="FAILED", + title="🤖 LLM Call Failed", + message="LLM 호출이 실패했습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) raise _log_failure( @@ -243,6 +326,18 @@ def invoke( f"fallback=false err={type(exc).__name__} retrying=true", {"latency_ms": latency}, ) + _queue_llm_event( + event_type="llm_call_retry", + severity="warning", + stage=stage, + status="RETRYING", + title="🔁 LLM Call Retrying", + message="1차 LLM 호출이 실패해 fallback 모델로 재시도합니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) fallback_started = perf_counter() fallback_client = _get_chat_openai_client( @@ -262,6 +357,17 @@ def invoke( fallback_used=True, latency_ms=fb_latency, ) + _queue_llm_event( + event_type="llm_fallback_success", + severity="warning", + stage=stage, + status="SUCCESS", + title="🔁 LLM Fallback Succeeded", + message="fallback 모델 호출이 성공했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + ) append_job_log( "llm", f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true", @@ -287,6 +393,18 @@ def invoke( f"fallback=true err={type(fallback_exc).__name__}", {"latency_ms": fb_latency}, ) + _queue_llm_event( + event_type="llm_fallback_failed", + severity="error", + stage=stage, + status="FAILED", + title="🔁 LLM Fallback Failed", + message="fallback 모델 호출도 실패했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + error=type(fallback_exc).__name__, + ) raise @@ -323,6 +441,17 @@ async def ainvoke( fallback_used=False, latency_ms=latency, ) + await _send_llm_event( + event_type="llm_call_success", + severity="success", + stage=stage, + status="SUCCESS", + title="🤖 LLM Call Succeeded", + message="LLM 호출이 정상적으로 완료되었습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + ) append_job_log( "llm", f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false", @@ -349,6 +478,18 @@ async def ainvoke( f"fallback=false err={type(exc).__name__}", {"latency_ms": latency}, ) + await _send_llm_event( + event_type="llm_call_failed", + severity="error", + stage=stage, + status="FAILED", + title="🤖 LLM Call Failed", + message="LLM 호출이 실패했습니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) raise _log_failure( @@ -367,6 +508,18 @@ async def ainvoke( f"fallback=false err={type(exc).__name__} retrying=true", {"latency_ms": latency}, ) + await _send_llm_event( + event_type="llm_call_retry", + severity="warning", + stage=stage, + status="RETRYING", + title="🔁 LLM Call Retrying", + message="1차 LLM 호출이 실패해 fallback 모델로 재시도합니다.", + elapsed_ms=latency, + model=selected_model, + fallback_used=False, + error=type(exc).__name__, + ) fallback_started = perf_counter() fallback_client = _get_chat_openai_client( @@ -386,6 +539,17 @@ async def ainvoke( fallback_used=True, latency_ms=fb_latency, ) + await _send_llm_event( + event_type="llm_fallback_success", + severity="warning", + stage=stage, + status="SUCCESS", + title="🔁 LLM Fallback Succeeded", + message="fallback 모델 호출이 성공했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + ) append_job_log( "llm", f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true", @@ -411,4 +575,16 @@ async def ainvoke( f"fallback=true err={type(fallback_exc).__name__}", {"latency_ms": fb_latency}, ) + await _send_llm_event( + event_type="llm_fallback_failed", + severity="error", + stage=stage, + status="FAILED", + title="🔁 LLM Fallback Failed", + message="fallback 모델 호출도 실패했습니다.", + elapsed_ms=fb_latency, + model=fallback_model, + fallback_used=True, + error=type(fallback_exc).__name__, + ) raise diff --git a/app/graph/chat/nodes/analyze_intent.py b/app/graph/chat/nodes/analyze_intent.py index c893ae0..86d4efa 100644 --- a/app/graph/chat/nodes/analyze_intent.py +++ b/app/graph/chat/nodes/analyze_intent.py @@ -17,6 +17,7 @@ from app.graph.roadmap.utils import strip_code_fence from app.schemas.chat import ChatIntent from app.schemas.enums import ChatOperation, ChatStatus +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -544,6 +545,20 @@ def _parse_modification_intent( raise ValueError("수정 의도 응답 파싱에 실패했습니다.") from exc +def _emit_intent_event(event_type: str, severity: str, status: str, message: str, extra_fields: list[dict]) -> None: + schedule_webhook( + notify_pipeline_event( + event_type=event_type, + severity=severity, + stage="chat.intent", + status=status, + title="🧭 Intent Analysis Stage", + message=message, + extra_fields=extra_fields, + ) + ) + + def analyze_intent(state: ChatState) -> ChatState: """사용자 요청을 GENERAL_CHAT 또는 MODIFICATION으로 분류합니다.""" current_itinerary = state.get("current_itinerary") @@ -566,9 +581,26 @@ def analyze_intent(state: ChatState) -> ChatState: f"type={route.intent_type} action={route.requested_action} scope={route.target_scope}", ) if route.intent_type == "GENERAL_CHAT": + _emit_intent_event( + "chat_intent_routed", + "info", + "GENERAL_CHAT", + "일반 대화로 분류되었습니다.", + [ + {"name": "Action", "value": route.requested_action or "N/A", "inline": True}, + {"name": "Scope", "value": route.target_scope or "N/A", "inline": True}, + ], + ) return {**state, "intent_type": "GENERAL_CHAT"} if route.requested_action == "DELETE" and route.target_scope == "DAY_LEVEL": + _emit_intent_event( + "chat_intent_rejected", + "warning", + "REJECTED", + "일차 삭제 요청이 거부되었습니다.", + [{"name": "Reason", "value": "일차 삭제는 지원하지 않습니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -577,6 +609,13 @@ def analyze_intent(state: ChatState) -> ChatState: } if route.requested_action == "DELETE" and route.target_scope == "UNKNOWN": + _emit_intent_event( + "chat_intent_clarification", + "warning", + "ASK_CLARIFICATION", + "삭제 대상이 모호해 확인이 필요합니다.", + [{"name": "Reason", "value": "삭제할 일차와 장소 순서가 모두 필요합니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -585,6 +624,13 @@ def analyze_intent(state: ChatState) -> ChatState: } if route.target_scope == "DAY_LEVEL" or _is_day_or_date_change_request(user_query): + _emit_intent_event( + "chat_intent_rejected", + "warning", + "REJECTED", + "일차 변경 요청이 거부되었습니다.", + [{"name": "Reason", "value": "일차 자체는 변경할 수 없습니다.", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", @@ -608,11 +654,36 @@ def analyze_intent(state: ChatState) -> ChatState: f" clarify={intent_draft.needs_clarification}", level="detail", ) + _emit_intent_event( + "chat_intent_parsed", + "info", + "MODIFICATION", + "수정 의도가 파싱되었습니다.", + [ + {"name": "Op", "value": str(intent_draft.op), "inline": True}, + {"name": "Day", "value": str(intent_draft.target_day), "inline": True}, + {"name": "Index", "value": str(intent_draft.target_index), "inline": True}, + ], + ) except Exception as exc: logger.error("의도 분석 LLM 호출 실패: %s", exc) + _emit_intent_event( + "chat_intent_failed", + "error", + "FAILED", + "수정 의도 분석에 실패했습니다.", + [{"name": "Error", "value": type(exc).__name__, "inline": False}], + ) return {**state, "error": "수정 의도 분석에 실패했습니다."} if intent_draft.needs_clarification: + _emit_intent_event( + "chat_intent_clarification", + "warning", + "ASK_CLARIFICATION", + "추가 확인이 필요한 수정 요청입니다.", + [{"name": "Keyword", "value": intent_draft.search_keyword or "N/A", "inline": False}], + ) return { **state, "intent_type": "MODIFICATION", diff --git a/app/graph/chat/nodes/mutate.py b/app/graph/chat/nodes/mutate.py index 6d90376..c9f3454 100644 --- a/app/graph/chat/nodes/mutate.py +++ b/app/graph/chat/nodes/mutate.py @@ -16,6 +16,7 @@ from app.schemas.enums import ChatOperation, ChatStatus from app.services.google_places_service import get_google_places_service from app.services.place_rerank_service import select_place_id_for_chat +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -183,6 +184,20 @@ async def mutate(state: ChatState) -> ChatState: day["places"] = places diff_str = ",".join(diff_keys) append_job_log("mutate", f"op={op} day={target_day_num} idx={target_index} n={len(search_results)} diff={diff_str}") + await notify_pipeline_event( + event_type="chat_mutate_completed", + severity="success", + stage="chat.mutate", + status=str(state.get("status") or ChatStatus.SUCCESS), + title="🛠️ Mutate Stage Completed", + message="수정 작업이 적용되었습니다.", + extra_fields=[ + {"name": "Op", "value": str(op), "inline": True}, + {"name": "Day", "value": str(target_day_num), "inline": True}, + {"name": "Index", "value": str(target_index), "inline": True}, + {"name": "Diff", "value": diff_str or "none", "inline": False}, + ], + ) return { **state, diff --git a/app/graph/chat/nodes/respond.py b/app/graph/chat/nodes/respond.py index 23095e3..8132ea7 100644 --- a/app/graph/chat/nodes/respond.py +++ b/app/graph/chat/nodes/respond.py @@ -9,6 +9,7 @@ from app.core.logger import get_logger from app.graph.chat.state import ChatState from app.schemas.enums import ChatStatus +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -92,4 +93,15 @@ def respond(state: ChatState) -> ChatState: final_status = status if status else ChatStatus.SUCCESS append_job_log("respond", f"status={final_status}") + schedule_webhook( + notify_pipeline_event( + event_type="chat_response_completed", + severity="success" if str(final_status) == ChatStatus.SUCCESS.value else "warning", + stage="chat.respond", + status=str(final_status), + title="💬 Respond Stage Completed", + message="최종 응답 생성을 완료했습니다.", + extra_fields=[{"name": "Status", "value": str(final_status), "inline": True}], + ) + ) return {**state, "status": final_status, "message": generated} diff --git a/app/graph/roadmap/nodes/finalize.py b/app/graph/roadmap/nodes/finalize.py index 6750ae0..277fcf5 100644 --- a/app/graph/roadmap/nodes/finalize.py +++ b/app/graph/roadmap/nodes/finalize.py @@ -25,6 +25,7 @@ from app.graph.roadmap.state import RoadmapState from app.graph.roadmap.utils import build_slot_key, strip_code_fence from app.schemas.course import CourseRequest, CourseResponseLLMOutput, PlanningPreference +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -302,12 +303,30 @@ async def synthesize_final_roadmap(state: RoadmapState) -> RoadmapState: daily_places = await _fill_place_descriptions_with_llm(daily_places) desc_count = sum(1 for d in daily_places for p in d.get("places", []) if p.get("description")) append_job_log("finalize_desc", f"place_descriptions_filled={desc_count}") + await notify_pipeline_event( + event_type="roadmap_finalize_desc", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="🧩 Finalize Description Completed", + message="장소 설명 보강이 완료되었습니다.", + extra_fields=[{"name": "Descriptions", "value": str(desc_count), "inline": True}], + ) daily_places = await _apply_visit_time_for_daily_places( daily_places, course_request.planning_preference, ) append_job_log("finalize_time", f"preference={course_request.planning_preference}") + await notify_pipeline_event( + event_type="roadmap_finalize_time", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="⏰ Finalize Time Completed", + message="visit time 보정이 완료되었습니다.", + extra_fields=[{"name": "Preference", "value": str(course_request.planning_preference), "inline": True}], + ) course_request_payload = course_request.model_dump(mode="json") course_request_payload.pop("budget_range", None) @@ -372,9 +391,30 @@ async def synthesize_final_roadmap(state: RoadmapState) -> RoadmapState: total_places = sum(len(d.get("places", [])) for d in daily_places) append_job_log("finalize", f"title={final_roadmap.get('title', '')} places={total_places}") + await notify_pipeline_event( + event_type="roadmap_finalize_completed", + severity="success", + stage="roadmap.finalize", + status="SUCCESS", + title="✅ Finalize Stage Completed", + message="최종 로드맵 생성이 완료되었습니다.", + extra_fields=[ + {"name": "Title", "value": final_roadmap.get("title", ""), "inline": False}, + {"name": "Places", "value": str(total_places), "inline": True}, + ], + ) return {**state, "final_roadmap": final_roadmap} except Exception as exc: append_job_log("finalize", f"error: {type(exc).__name__} (see server logs)") logger.error("최종 로드맵 생성 실패: %s", exc, exc_info=True) + await notify_pipeline_event( + event_type="roadmap_finalize_failed", + severity="error", + stage="roadmap.finalize", + status="FAILED", + title="❌ Finalize Stage Failed", + message="최종 로드맵 생성 중 오류가 발생했습니다.", + error=type(exc).__name__, + ) return {**state, "error": f"최종 로드맵 생성에 실패했습니다: {exc}"} diff --git a/app/graph/roadmap/nodes/places.py b/app/graph/roadmap/nodes/places.py index b291488..03cb339 100644 --- a/app/graph/roadmap/nodes/places.py +++ b/app/graph/roadmap/nodes/places.py @@ -18,6 +18,7 @@ from app.services.google_places_service import get_google_places_service from app.services.place_rerank_service import select_place_ids_for_day from app.services.places_service import PlacesServiceProtocol +from app.services.webhook_notification import notify_pipeline_event logger = get_logger(__name__) @@ -357,6 +358,20 @@ async def rerank_for_day(day: dict) -> None: "places_fetch", f"slots={len(fetched_places)} rerank={rerank_enabled} empty={empty_count} {fb_summary}", ) + await notify_pipeline_event( + event_type="roadmap_places_completed", + severity="success", + stage="roadmap.places", + status="SUCCESS", + title="📍 Places Fetch Completed", + message="슬롯별 장소 후보 수집이 완료되었습니다.", + extra_fields=[ + {"name": "Slots", "value": str(len(fetched_places)), "inline": True}, + {"name": "Rerank", "value": str(rerank_enabled), "inline": True}, + {"name": "Empty", "value": str(empty_count), "inline": True}, + {"name": "Fallbacks", "value": fb_summary or "none", "inline": False}, + ], + ) logger.info("Slot place fetch completed: slot_count=%d", len(fetched_places)) return { diff --git a/app/graph/roadmap/nodes/skeleton.py b/app/graph/roadmap/nodes/skeleton.py index e2fd753..c19b86d 100644 --- a/app/graph/roadmap/nodes/skeleton.py +++ b/app/graph/roadmap/nodes/skeleton.py @@ -17,6 +17,7 @@ from app.graph.roadmap.utils import strip_code_fence from app.schemas.course import CourseRequest, PacePreference, RegionDateRange from app.schemas.skeleton import SkeletonPlan +from app.services.webhook_notification import notify_pipeline_event, schedule_webhook logger = get_logger(__name__) @@ -779,6 +780,22 @@ def generate_skeleton(state: RoadmapState) -> RoadmapState: region_str = ",".join(regions) warn_count = len(warnings) append_job_log("skeleton_done", f"days={total_days} slots={total_slots} regions={region_str} warnings={warn_count}") + schedule_webhook( + notify_pipeline_event( + event_type="roadmap_skeleton_completed", + severity="success", + stage="roadmap.skeleton", + status="SUCCESS", + title="🧱 Skeleton Stage Completed", + message="로드맵 스켈레톤 생성이 완료되었습니다.", + extra_fields=[ + {"name": "Days", "value": str(total_days), "inline": True}, + {"name": "Slots", "value": str(total_slots), "inline": True}, + {"name": "Regions", "value": region_str, "inline": False}, + {"name": "Warnings", "value": str(warn_count), "inline": True}, + ], + ) + ) return { **state, diff --git a/app/services/chat_service.py b/app/services/chat_service.py index 885317d..a7555bb 100644 --- a/app/services/chat_service.py +++ b/app/services/chat_service.py @@ -17,7 +17,7 @@ from app.schemas.enums import ChatStatus from app.services.callback_delivery import post_callback_with_retry from app.services.callback_url import build_callback_url -from app.services.webhook_notification import notify_job_completed, notify_timeout +from app.services.webhook_notification import notify_job_completed, notify_pipeline_event, notify_timeout logger = get_logger(__name__) @@ -136,6 +136,15 @@ async def process_chat_request(request: ChatRequest) -> None: append_job_log( "job_start", f"type=chat job_id={request.job_id} query_len={len(request.user_query)} query_hash={query_hash}" ) + await notify_pipeline_event( + event_type="chat_started", + severity="info", + stage="chat", + status="STARTED", + title="💬 Chat Job Started", + message="로드맵 수정 작업이 시작되었습니다.", + job_id=request.job_id, + ) status = "SUCCESS" try: @@ -165,6 +174,16 @@ async def process_chat_request(request: ChatRequest) -> None: _, logs, elapsed = collect_job_logs() await notify_job_completed(request.job_id, "chat", elapsed, status, logs) + await notify_pipeline_event( + event_type="chat_completed", + severity="success" if status == "SUCCESS" else "error", + stage="chat", + status=status, + title="✅ Chat Job Finished", + message="로드맵 수정 작업이 종료되었습니다.", + job_id=request.job_id, + elapsed_ms=round(elapsed * 1000), + ) callback_endpoint = build_callback_url( str(request.callback_url), request.job_id, "itineraries/{job_id}/chat-result" diff --git a/app/services/generate_service.py b/app/services/generate_service.py index 81fa9f8..be56e58 100644 --- a/app/services/generate_service.py +++ b/app/services/generate_service.py @@ -14,7 +14,7 @@ from app.services.callback_delivery import post_callback_with_retry from app.services.callback_url import build_callback_url from app.services.google_places_service import get_google_places_service -from app.services.webhook_notification import notify_job_completed, notify_timeout +from app.services.webhook_notification import notify_job_completed, notify_pipeline_event, notify_timeout logger = get_logger(__name__) @@ -61,6 +61,15 @@ async def process_generate_request(job_id: str, callback_url: str, payload: Cour timeout_policy = get_timeout_policy(settings) init_job_log(job_id) append_job_log("job_start", f"type=generate job_id={job_id}") + await notify_pipeline_event( + event_type="generate_started", + severity="info", + stage="generate", + status="STARTED", + title="🚀 Generate Job Started", + message="로드맵 생성 작업이 시작되었습니다.", + job_id=job_id, + ) status = "SUCCESS" try: @@ -90,6 +99,16 @@ async def process_generate_request(job_id: str, callback_url: str, payload: Cour _, logs, elapsed = collect_job_logs() await notify_job_completed(job_id, "generate", elapsed, status, logs) + await notify_pipeline_event( + event_type="generate_completed", + severity="success" if status == "SUCCESS" else "error", + stage="generate", + status=status, + title="✅ Generate Job Finished", + message="로드맵 생성 작업이 종료되었습니다.", + job_id=job_id, + elapsed_ms=round(elapsed * 1000), + ) callback_endpoint = build_callback_url(callback_url, job_id, "itineraries/{job_id}/result") await _post_callback( diff --git a/app/services/webhook_notification.py b/app/services/webhook_notification.py index 4d324c6..3bb224b 100644 --- a/app/services/webhook_notification.py +++ b/app/services/webhook_notification.py @@ -2,9 +2,10 @@ from __future__ import annotations +import asyncio import platform from datetime import datetime, timezone -from typing import Any +from typing import Any, Awaitable import httpx @@ -22,6 +23,16 @@ _COLOR_GRAY = 0x95A5A6 +def schedule_webhook(coro: Awaitable[Any]) -> None: + """현재 이벤트 루프에 웹훅 전송 작업을 예약한다.""" + try: + loop = asyncio.get_running_loop() + except RuntimeError: + asyncio.run(coro) + return + loop.create_task(coro) + + def _get_webhook_url() -> str | None: settings = get_settings() url = settings.DISCORD_WEBHOOK_URL @@ -48,70 +59,146 @@ async def _send_embed(embed: dict[str, Any]) -> None: logger.warning("Discord webhook send failed: %s", exc) +def _severity_color(severity: str) -> int: + normalized = severity.strip().lower() + if normalized == "error": + return _COLOR_RED + if normalized == "warning": + return _COLOR_ORANGE + if normalized == "success": + return _COLOR_GREEN + return _COLOR_BLUE + + +def _append_field(fields: list[dict[str, Any]], name: str, value: Any, inline: bool = True) -> None: + if value is None: + return + text = str(value) + if not text: + return + fields.append({"name": name, "value": text, "inline": inline}) + + +def _format_event_title(event_type: str, title: str | None) -> str: + if title: + return title + return event_type.replace("_", " ").title() + + +async def notify_pipeline_event( + *, + event_type: str, + severity: str, + stage: str, + status: str, + message: str, + title: str | None = None, + description: str | None = None, + job_id: str | None = None, + elapsed_ms: int | None = None, + model: str | None = None, + fallback_used: bool | None = None, + error: str | None = None, + extra_fields: list[dict[str, Any]] | None = None, +) -> None: + """공통 단계 이벤트를 Discord 웹훅으로 전송한다.""" + fields: list[dict[str, Any]] = [] + _append_field(fields, "event_type", event_type) + _append_field(fields, "severity", severity) + _append_field(fields, "stage", stage) + _append_field(fields, "status", status) + _append_field(fields, "job_id", f"`{job_id}`" if job_id else None) + _append_field(fields, "message", message, inline=False) + _append_field(fields, "elapsed_ms", f"{elapsed_ms}ms" if elapsed_ms is not None else None) + _append_field(fields, "model", model) + _append_field(fields, "fallback_used", fallback_used) + _append_field(fields, "error", f"```{error[:1000]}```" if error else None, inline=False) + if extra_fields: + fields.extend(extra_fields) + + payload: dict[str, Any] = { + "title": _format_event_title(event_type, title), + "color": _severity_color(severity), + "fields": fields, + } + if description is not None: + payload["description"] = description + + await _send_embed(payload) + + async def notify_server_start() -> None: settings = get_settings() - await _send_embed( - { - "title": "🟢 Server Started", - "color": _COLOR_GREEN, - "fields": [ - {"name": "Env", "value": settings.APP_ENV, "inline": True}, - {"name": "Python", "value": platform.python_version(), "inline": True}, - {"name": "Host", "value": platform.node(), "inline": True}, - ], - } + await notify_pipeline_event( + event_type="server_start", + severity="success", + stage="server", + status="READY", + title="🟢 Server Started", + message="FastAPI 애플리케이션이 시작되었습니다.", + extra_fields=[ + {"name": "Env", "value": settings.APP_ENV, "inline": True}, + {"name": "Python", "value": platform.python_version(), "inline": True}, + {"name": "Host", "value": platform.node(), "inline": True}, + ], ) async def notify_server_shutdown() -> None: - await _send_embed( - { - "title": "🔴 Server Shutdown", - "color": _COLOR_RED, - "fields": [ - {"name": "Host", "value": platform.node(), "inline": True}, - ], - } + await notify_pipeline_event( + event_type="server_shutdown", + severity="error", + stage="server", + status="STOPPED", + title="🔴 Server Shutdown", + message="FastAPI 애플리케이션이 종료되었습니다.", + extra_fields=[{"name": "Host", "value": platform.node(), "inline": True}], ) async def notify_error_500(method: str, path: str, error: str) -> None: - await _send_embed( - { - "title": "🔥 500 Internal Server Error", - "color": _COLOR_RED, - "fields": [ - {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, - {"name": "Error", "value": f"```{error[:1000]}```", "inline": False}, - ], - } + await notify_pipeline_event( + event_type="http_500", + severity="error", + stage="http", + status="FAILED", + title="🔥 500 Internal Server Error", + message="처리되지 않은 예외가 전역 예외 처리기에 도달했습니다.", + extra_fields=[ + {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, + {"name": "Error", "value": f"```{error[:1000]}```", "inline": False}, + ], ) async def notify_timeout(job_id: str, job_type: str, elapsed_seconds: float) -> None: - await _send_embed( - { - "title": "⏱️ Pipeline Timeout", - "color": _COLOR_ORANGE, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Type", "value": job_type, "inline": True}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + await notify_pipeline_event( + event_type="pipeline_timeout", + severity="warning", + stage=job_type, + status="TIMEOUT", + title="⏱️ Pipeline Timeout", + message=f"{job_type} 작업이 제한 시간을 초과했습니다.", + job_id=job_id, + extra_fields=[ + {"name": "Type", "value": job_type, "inline": True}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) async def notify_request_timeout(method: str, path: str, elapsed_seconds: float) -> None: - await _send_embed( - { - "title": "⏱️ Request Timeout", - "color": _COLOR_ORANGE, - "fields": [ - {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + await notify_pipeline_event( + event_type="request_timeout", + severity="warning", + stage="http", + status="TIMEOUT", + title="⏱️ Request Timeout", + message="HTTP 요청이 설정된 타임아웃을 초과했습니다.", + extra_fields=[ + {"name": "Endpoint", "value": f"`{method} {path}`", "inline": False}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) @@ -160,19 +247,22 @@ async def notify_job_completed( logs: list[dict[str, Any]], ) -> None: description = _format_log_description(logs) - color = _COLOR_GREEN if status == "SUCCESS" else _COLOR_RED - - await _send_embed( - { - "title": f"📋 {job_type} Job Completed", - "color": color, - "description": description, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Status", "value": status, "inline": True}, - {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, - ], - } + severity = "success" if status == "SUCCESS" else "error" + + await notify_pipeline_event( + event_type=f"{job_type}_job_completed", + severity=severity, + stage=job_type, + status=status, + title=f"📋 {job_type} Job Completed", + message="수집된 작업 로그를 요약해서 전송합니다.", + description=description, + job_id=job_id, + elapsed_ms=round(elapsed_seconds * 1000), + extra_fields=[ + {"name": "Status", "value": status, "inline": True}, + {"name": "Elapsed", "value": f"{elapsed_seconds:.1f}s", "inline": True}, + ], ) @@ -182,15 +272,17 @@ async def notify_callback_failure( callback_url: str, error: str, ) -> None: - await _send_embed( - { - "title": "🚨 Callback Delivery Failed", - "color": _COLOR_RED, - "fields": [ - {"name": "job_id", "value": f"`{job_id}`", "inline": True}, - {"name": "Type", "value": callback_type, "inline": True}, - {"name": "URL", "value": f"`{callback_url[:200]}`", "inline": False}, - {"name": "Error", "value": f"```{error[:500]}```", "inline": False}, - ], - } + await notify_pipeline_event( + event_type="callback_delivery_failed", + severity="error", + stage=callback_type, + status="FAILED", + title="🚨 Callback Delivery Failed", + message="콜백 전송이 재시도 후에도 실패했습니다.", + job_id=job_id, + error=error, + extra_fields=[ + {"name": "Type", "value": callback_type, "inline": True}, + {"name": "URL", "value": f"`{callback_url[:200]}`", "inline": False}, + ], ) diff --git a/tests/test_llm_router_webhooks.py b/tests/test_llm_router_webhooks.py new file mode 100644 index 0000000..12c2d91 --- /dev/null +++ b/tests/test_llm_router_webhooks.py @@ -0,0 +1,110 @@ +"""LLM router webhook event tests.""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace + +import pytest + +from app.core.config import get_settings +from app.core.llm_router import Stage + + +def _set_required_env(monkeypatch, **overrides: str) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + monkeypatch.setenv("SERVICE_SECRET", "test-service-secret") + monkeypatch.setenv("HMAC_SECRET", "test-hmac-secret") + monkeypatch.setenv("DISCORD_WEBHOOK_URL", "https://discord.com/api/webhooks/123/abc") + for key, value in overrides.items(): + monkeypatch.setenv(key, value) + get_settings.cache_clear() + + +def _event_names(payloads: list[dict]) -> list[str]: + return [next(field["value"] for field in payload["fields"] if field["name"] == "event_type") for payload in payloads] + + +def test_invoke_emits_success_webhook(monkeypatch) -> None: + _set_required_env(monkeypatch, ENABLE_STAGE_LLM_ROUTING="false", LLM_MODEL_NAME="fallback-model") + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def invoke(self, payload): + return SimpleNamespace(content="ok") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda *args, **kwargs: _Client()) + monkeypatch.setattr(llm_router, "schedule_webhook", lambda coro: asyncio.run(coro)) + + response = llm_router.invoke(Stage.CHAT_RESPONSE, "prompt") + + assert response.content == "ok" + assert _event_names(payloads) == ["llm_call_success"] + + +def test_ainvoke_emits_retry_and_fallback_success(monkeypatch) -> None: + _set_required_env( + monkeypatch, + ENABLE_STAGE_LLM_ROUTING="true", + LLM_MODEL_QUALITY="primary-model", + LLM_MODEL_NAME="fallback-model", + ) + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def __init__(self, model: str) -> None: + self.model = model + + async def ainvoke(self, payload): + if self.model == "primary-model": + raise RuntimeError("primary-fail") + return SimpleNamespace(content="fallback-ok") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda model, *args, **kwargs: _Client(model)) + + response = asyncio.run(llm_router.ainvoke(Stage.ROADMAP_SKELETON, "prompt")) + + assert response.content == "fallback-ok" + assert _event_names(payloads) == ["llm_call_retry", "llm_fallback_success"] + + +def test_ainvoke_emits_fallback_failure(monkeypatch) -> None: + _set_required_env( + monkeypatch, + ENABLE_STAGE_LLM_ROUTING="true", + LLM_MODEL_QUALITY="primary-model", + LLM_MODEL_NAME="fallback-model", + ) + import app.core.llm_router as llm_router + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + class _Client: + def __init__(self, model: str) -> None: + self.model = model + + async def ainvoke(self, payload): + raise RuntimeError(f"{self.model}-fail") + + monkeypatch.setattr("app.services.webhook_notification._send_embed", _fake_send) + monkeypatch.setattr(llm_router, "_get_chat_openai_client", lambda model, *args, **kwargs: _Client(model)) + + with pytest.raises(RuntimeError, match="fallback-model-fail"): + asyncio.run(llm_router.ainvoke(Stage.ROADMAP_SKELETON, "prompt")) + + assert _event_names(payloads) == ["llm_call_retry", "llm_fallback_failed"] diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py new file mode 100644 index 0000000..1402430 --- /dev/null +++ b/tests/test_pipeline_webhooks.py @@ -0,0 +1,140 @@ +"""Pipeline stage webhook integration tests.""" + +from __future__ import annotations + +import asyncio +from datetime import date +from types import SimpleNamespace + +from app.core.config import get_settings +from app.graph.chat.state import ChatState +from app.schemas.course import CourseRequest, CourseResponse, RegionDateRange +from app.schemas.enums import ( + ActivityPreference, + BudgetRange, + CompanionType, + DestinationPreference, + PacePreference, + PlanningPreference, + PriorityPreference, + Region, + TravelTheme, +) + + +def _set_required_env(monkeypatch, **overrides: str) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + monkeypatch.setenv("SERVICE_SECRET", "test-service-secret") + monkeypatch.setenv("HMAC_SECRET", "test-hmac-secret") + monkeypatch.setenv("DISCORD_WEBHOOK_URL", "https://discord.com/api/webhooks/123/abc") + for key, value in overrides.items(): + monkeypatch.setenv(key, value) + get_settings.cache_clear() + + +def _course_request() -> CourseRequest: + start = date(2026, 1, 1) + end = date(2026, 1, 1) + return CourseRequest( + start_date=start, + end_date=end, + regions=[RegionDateRange(region=Region.SEOUL, start_date=start, end_date=end)], + people_count=1, + companion_type=[CompanionType.SOLO], + travel_themes=[TravelTheme.CITY_TRIP], + pace_preference=PacePreference.RELAXED, + planning_preference=PlanningPreference.PLANNED, + destination_preference=DestinationPreference.TOURIST_SPOTS, + activity_preference=ActivityPreference.REST_FOCUSED, + priority_preference=PriorityPreference.EFFICIENCY, + budget_range=BudgetRange.MID, + ) + + +def _course_response() -> CourseResponse: + today = date(2026, 1, 1) + return CourseResponse( + start_date=today, + end_date=today, + trip_days=1, + nights=0, + people_count=1, + tags=[], + title="테스트", + summary="테스트", + itinerary=[], + llm_commentary="테스트", + next_action_suggestion=[], + ) + + +def test_generate_service_emits_start_and_completion_webhooks(monkeypatch) -> None: + _set_required_env(monkeypatch) + import app.services.generate_service as generate_service + + payloads: list[dict] = [] + + async def _fake_notify(**kwargs) -> None: + payloads.append(kwargs) + + async def _fake_pipeline(request): + return _course_response() + + async def _fake_post_callback(**kwargs) -> None: + return None + + async def _noop(*args, **kwargs) -> None: + return None + + monkeypatch.setattr(generate_service, "notify_pipeline_event", _fake_notify) + monkeypatch.setattr(generate_service, "notify_job_completed", _noop) + monkeypatch.setattr(generate_service, "notify_timeout", _noop) + monkeypatch.setattr(generate_service, "run_roadmap_pipeline", _fake_pipeline) + monkeypatch.setattr(generate_service, "_post_callback", _fake_post_callback) + + asyncio.run(generate_service.process_generate_request("job-1", "https://example.com/callback", _course_request())) + + assert [item["event_type"] for item in payloads] == ["generate_started", "generate_completed"] + + +def test_analyze_intent_emits_routing_and_parse_webhooks(monkeypatch) -> None: + _set_required_env(monkeypatch) + import app.graph.chat.nodes.analyze_intent as analyze_intent + import app.services.webhook_notification as webhook + + payloads: list[dict] = [] + + async def _fake_send(embed: dict) -> None: + payloads.append(embed) + + monkeypatch.setattr(webhook, "_send_embed", _fake_send) + monkeypatch.setattr(analyze_intent, "schedule_webhook", lambda coro: asyncio.run(coro)) + monkeypatch.setattr(analyze_intent, "_build_itinerary_table", lambda itinerary: "table") + monkeypatch.setattr(analyze_intent, "_build_history_context", lambda history: "history") + monkeypatch.setattr(analyze_intent, "_build_request_context", lambda request_context: "request") + monkeypatch.setattr(analyze_intent, "_build_day_region_hints", lambda itinerary: {}) + monkeypatch.setattr(analyze_intent, "_format_day_region_context", lambda hints: "regions") + monkeypatch.setattr( + analyze_intent, + "_classify_intent_route", + lambda *args, **kwargs: SimpleNamespace(intent_type="MODIFICATION", requested_action="ADD", target_scope="ITEM"), + ) + monkeypatch.setattr( + analyze_intent, + "_parse_modification_intent", + lambda *args, **kwargs: SimpleNamespace( + op="ADD", + target_day=1, + target_index=2, + search_keyword="카페", + needs_clarification=False, + model_dump=lambda: {"op": "ADD", "target_day": 1, "target_index": 2, "reasoning": "테스트"}, + ), + ) + + state: ChatState = {"current_itinerary": {}, "user_query": "새 장소 추가해줘", "session_history": [], "request_context": {}} + result = analyze_intent.analyze_intent(state) + + assert result["intent_type"] == "MODIFICATION" + event_types = [next(field["value"] for field in payload["fields"] if field["name"] == "event_type") for payload in payloads] + assert event_types == ["chat_intent_routed", "chat_intent_parsed"] diff --git a/tests/test_webhook_notification.py b/tests/test_webhook_notification.py new file mode 100644 index 0000000..4e5af1e --- /dev/null +++ b/tests/test_webhook_notification.py @@ -0,0 +1,74 @@ +"""Discord webhook payload formatting tests.""" + +from __future__ import annotations + +import asyncio + +from app.services import webhook_notification as webhook + + +def test_notify_pipeline_event_builds_standard_fields(monkeypatch) -> None: + captured: dict = {} + + async def _fake_send(embed: dict) -> None: + captured.update(embed) + + monkeypatch.setattr(webhook, "_send_embed", _fake_send) + + asyncio.run( + webhook.notify_pipeline_event( + event_type="demo_event", + severity="warning", + stage="demo.stage", + status="RUNNING", + title="Demo Event", + message="stage event", + job_id="job-1", + elapsed_ms=123, + model="gpt-demo", + fallback_used=False, + error="boom", + extra_fields=[{"name": "Custom", "value": "value", "inline": True}], + ) + ) + + fields = {field["name"]: field["value"] for field in captured["fields"]} + assert captured["title"] == "Demo Event" + assert fields["event_type"] == "demo_event" + assert fields["severity"] == "warning" + assert fields["stage"] == "demo.stage" + assert fields["status"] == "RUNNING" + assert fields["job_id"] == "`job-1`" + assert fields["message"] == "stage event" + assert fields["elapsed_ms"] == "123ms" + assert fields["model"] == "gpt-demo" + assert fields["fallback_used"] == "False" + assert "boom" in fields["error"] + assert fields["Custom"] == "value" + + +def test_notify_job_completed_includes_log_description(monkeypatch) -> None: + captured: dict = {} + + async def _fake_send(embed: dict) -> None: + captured.update(embed) + + monkeypatch.setattr(webhook, "_send_embed", _fake_send) + + asyncio.run( + webhook.notify_job_completed( + "job-2", + "generate", + 12.3, + "SUCCESS", + [{"stage": "skeleton", "message": "done", "elapsed_ms": 42}], + ) + ) + + fields = {field["name"]: field["value"] for field in captured["fields"]} + assert captured["title"] == "📋 generate Job Completed" + assert "**skeleton**" in captured["description"] + assert "done" in captured["description"] + assert fields["job_id"] == "`job-2`" + assert fields["Status"] == "SUCCESS" + assert fields["Elapsed"] == "12.3s" From 4e7f6f298cdf4c1697001f11162e58f651c26b5e Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 20:57:50 +0900 Subject: [PATCH 2/7] =?UTF-8?q?style:=20ruff=20=ED=8F=AC=EB=A7=B7=20?= =?UTF-8?q?=EC=98=A4=EB=A5=98=EB=A5=BC=20=EC=A0=95=EB=A6=AC=ED=95=9C?= =?UTF-8?q?=EB=8B=A4=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/llm_router.py | 3 +-- tests/test_llm_router_webhooks.py | 5 ++++- tests/test_pipeline_webhooks.py | 18 +++++++++++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/app/core/llm_router.py b/app/core/llm_router.py index a02c315..f935371 100644 --- a/app/core/llm_router.py +++ b/app/core/llm_router.py @@ -10,8 +10,7 @@ from langchain_openai import ChatOpenAI from app.core.config import Settings, get_settings -from app.core.job_log_context import append_job_log -from app.core.job_log_context import get_current_job_id +from app.core.job_log_context import append_job_log, get_current_job_id from app.core.logger import get_logger from app.core.timeout_policy import get_timeout_policy from app.services.webhook_notification import notify_pipeline_event, schedule_webhook diff --git a/tests/test_llm_router_webhooks.py b/tests/test_llm_router_webhooks.py index 12c2d91..40ab494 100644 --- a/tests/test_llm_router_webhooks.py +++ b/tests/test_llm_router_webhooks.py @@ -22,7 +22,10 @@ def _set_required_env(monkeypatch, **overrides: str) -> None: def _event_names(payloads: list[dict]) -> list[str]: - return [next(field["value"] for field in payload["fields"] if field["name"] == "event_type") for payload in payloads] + return [ + next(field["value"] for field in payload["fields"] if field["name"] == "event_type") + for payload in payloads + ] def test_invoke_emits_success_webhook(monkeypatch) -> None: diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py index 1402430..19f559d 100644 --- a/tests/test_pipeline_webhooks.py +++ b/tests/test_pipeline_webhooks.py @@ -117,7 +117,11 @@ async def _fake_send(embed: dict) -> None: monkeypatch.setattr( analyze_intent, "_classify_intent_route", - lambda *args, **kwargs: SimpleNamespace(intent_type="MODIFICATION", requested_action="ADD", target_scope="ITEM"), + lambda *args, **kwargs: SimpleNamespace( + intent_type="MODIFICATION", + requested_action="ADD", + target_scope="ITEM", + ), ) monkeypatch.setattr( analyze_intent, @@ -132,9 +136,17 @@ async def _fake_send(embed: dict) -> None: ), ) - state: ChatState = {"current_itinerary": {}, "user_query": "새 장소 추가해줘", "session_history": [], "request_context": {}} + state: ChatState = { + "current_itinerary": {}, + "user_query": "새 장소 추가해줘", + "session_history": [], + "request_context": {}, + } result = analyze_intent.analyze_intent(state) assert result["intent_type"] == "MODIFICATION" - event_types = [next(field["value"] for field in payload["fields"] if field["name"] == "event_type") for payload in payloads] + event_types = [ + next(field["value"] for field in payload["fields"] if field["name"] == "event_type") + for payload in payloads + ] assert event_types == ["chat_intent_routed", "chat_intent_parsed"] From 1f0077b5fea12d8d31023f4a709b971017aa8188 Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 20:59:23 +0900 Subject: [PATCH 3/7] =?UTF-8?q?style:=20ruff=20=ED=8F=AC=EB=A7=B7=EC=97=90?= =?UTF-8?q?=20=EB=A7=9E=EA=B2=8C=20=EC=A0=95=EB=A6=AC=ED=95=9C=EB=8B=A4=20?= =?UTF-8?q?(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/llm_router.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/core/llm_router.py b/app/core/llm_router.py index f935371..ffe8096 100644 --- a/app/core/llm_router.py +++ b/app/core/llm_router.py @@ -164,7 +164,7 @@ def _log_failure( "latency_ms": latency_ms, }, exc_info=exc, - ) + ) def _queue_llm_event( From 905267e67704b2ee42ef4743984fbacc76dfd07e Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 21:01:45 +0900 Subject: [PATCH 4/7] =?UTF-8?q?style:=20ruff=20format=EC=97=90=20=EB=A7=9E?= =?UTF-8?q?=EA=B2=8C=20=ED=85=8C=EC=8A=A4=ED=8A=B8=EB=A5=BC=20=EC=A0=95?= =?UTF-8?q?=EB=A6=AC=ED=95=9C=EB=8B=A4=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_llm_router_webhooks.py | 9 +++++---- tests/test_pipeline_webhooks.py | 14 +++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/test_llm_router_webhooks.py b/tests/test_llm_router_webhooks.py index 40ab494..d0ac668 100644 --- a/tests/test_llm_router_webhooks.py +++ b/tests/test_llm_router_webhooks.py @@ -22,10 +22,11 @@ def _set_required_env(monkeypatch, **overrides: str) -> None: def _event_names(payloads: list[dict]) -> list[str]: - return [ - next(field["value"] for field in payload["fields"] if field["name"] == "event_type") - for payload in payloads - ] + event_names: list[str] = [] + for payload in payloads: + event_type = next(field["value"] for field in payload["fields"] if field["name"] == "event_type") + event_names.append(event_type) + return event_names def test_invoke_emits_success_webhook(monkeypatch) -> None: diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py index 19f559d..231c3f7 100644 --- a/tests/test_pipeline_webhooks.py +++ b/tests/test_pipeline_webhooks.py @@ -68,6 +68,14 @@ def _course_response() -> CourseResponse: ) +def _event_types(payloads: list[dict]) -> list[str]: + event_types: list[str] = [] + for payload in payloads: + event_type = next(field["value"] for field in payload["fields"] if field["name"] == "event_type") + event_types.append(event_type) + return event_types + + def test_generate_service_emits_start_and_completion_webhooks(monkeypatch) -> None: _set_required_env(monkeypatch) import app.services.generate_service as generate_service @@ -145,8 +153,4 @@ async def _fake_send(embed: dict) -> None: result = analyze_intent.analyze_intent(state) assert result["intent_type"] == "MODIFICATION" - event_types = [ - next(field["value"] for field in payload["fields"] if field["name"] == "event_type") - for payload in payloads - ] - assert event_types == ["chat_intent_routed", "chat_intent_parsed"] + assert _event_types(payloads) == ["chat_intent_routed", "chat_intent_parsed"] From 8ae39e99c1a9bae1d6ac1aecc624232361f8df20 Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 21:02:52 +0900 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EB=AA=A8=EB=93=88=20import=20=EB=8C=80=EC=83=81=EC=9D=84=20?= =?UTF-8?q?=EB=B0=94=EB=A1=9C=EC=9E=A1=EB=8A=94=EB=8B=A4=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_pipeline_webhooks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py index 231c3f7..9c917f1 100644 --- a/tests/test_pipeline_webhooks.py +++ b/tests/test_pipeline_webhooks.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import importlib from datetime import date from types import SimpleNamespace @@ -107,8 +108,8 @@ async def _noop(*args, **kwargs) -> None: def test_analyze_intent_emits_routing_and_parse_webhooks(monkeypatch) -> None: _set_required_env(monkeypatch) - import app.graph.chat.nodes.analyze_intent as analyze_intent import app.services.webhook_notification as webhook + analyze_intent = importlib.import_module("app.graph.chat.nodes.analyze_intent") payloads: list[dict] = [] From 43a26abbf6d5f4ca0fcb40c60dcd70514de942c4 Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 21:05:03 +0900 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20pipeline=20webhook=20=ED=85=8C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=EB=A5=BC=20formatter=20=EC=B9=9C=ED=99=94?= =?UTF-8?q?=EC=A0=81=EC=9C=BC=EB=A1=9C=20=EC=A0=95=EB=A6=AC=ED=95=9C?= =?UTF-8?q?=EB=8B=A4=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_pipeline_webhooks.py | 54 ++++++++++++++++----------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py index 9c917f1..8a51607 100644 --- a/tests/test_pipeline_webhooks.py +++ b/tests/test_pipeline_webhooks.py @@ -61,10 +61,10 @@ def _course_response() -> CourseResponse: nights=0, people_count=1, tags=[], - title="테스트", - summary="테스트", + title="Test title", + summary="Test summary", itinerary=[], - llm_commentary="테스트", + llm_commentary="Test commentary", next_action_suggestion=[], ) @@ -77,6 +77,25 @@ def _event_types(payloads: list[dict]) -> list[str]: return event_types +def _fake_classify_intent_route(*args, **kwargs) -> SimpleNamespace: + return SimpleNamespace(intent_type="MODIFICATION", requested_action="ADD", target_scope="ITEM") + + +def _fake_parse_modification_intent(*args, **kwargs) -> SimpleNamespace: + return SimpleNamespace( + op="ADD", + target_day=1, + target_index=2, + search_keyword="cafe", + needs_clarification=False, + model_dump=lambda: {"op": "ADD", "target_day": 1, "target_index": 2, "reasoning": "test"}, + ) + + +def _run_coroutine(coro) -> None: + asyncio.run(coro) + + def test_generate_service_emits_start_and_completion_webhooks(monkeypatch) -> None: _set_required_env(monkeypatch) import app.services.generate_service as generate_service @@ -109,45 +128,26 @@ async def _noop(*args, **kwargs) -> None: def test_analyze_intent_emits_routing_and_parse_webhooks(monkeypatch) -> None: _set_required_env(monkeypatch) import app.services.webhook_notification as webhook - analyze_intent = importlib.import_module("app.graph.chat.nodes.analyze_intent") + analyze_intent = importlib.import_module("app.graph.chat.nodes.analyze_intent") payloads: list[dict] = [] async def _fake_send(embed: dict) -> None: payloads.append(embed) monkeypatch.setattr(webhook, "_send_embed", _fake_send) - monkeypatch.setattr(analyze_intent, "schedule_webhook", lambda coro: asyncio.run(coro)) + monkeypatch.setattr(analyze_intent, "schedule_webhook", _run_coroutine) monkeypatch.setattr(analyze_intent, "_build_itinerary_table", lambda itinerary: "table") monkeypatch.setattr(analyze_intent, "_build_history_context", lambda history: "history") monkeypatch.setattr(analyze_intent, "_build_request_context", lambda request_context: "request") monkeypatch.setattr(analyze_intent, "_build_day_region_hints", lambda itinerary: {}) monkeypatch.setattr(analyze_intent, "_format_day_region_context", lambda hints: "regions") - monkeypatch.setattr( - analyze_intent, - "_classify_intent_route", - lambda *args, **kwargs: SimpleNamespace( - intent_type="MODIFICATION", - requested_action="ADD", - target_scope="ITEM", - ), - ) - monkeypatch.setattr( - analyze_intent, - "_parse_modification_intent", - lambda *args, **kwargs: SimpleNamespace( - op="ADD", - target_day=1, - target_index=2, - search_keyword="카페", - needs_clarification=False, - model_dump=lambda: {"op": "ADD", "target_day": 1, "target_index": 2, "reasoning": "테스트"}, - ), - ) + monkeypatch.setattr(analyze_intent, "_classify_intent_route", _fake_classify_intent_route) + monkeypatch.setattr(analyze_intent, "_parse_modification_intent", _fake_parse_modification_intent) state: ChatState = { "current_itinerary": {}, - "user_query": "새 장소 추가해줘", + "user_query": "Add a cafe to day 1", "session_history": [], "request_context": {}, } From 3375caacc44432b4c9bf02eb35e1c33ae546cf6d Mon Sep 17 00:00:00 2001 From: iamb0ttle Date: Sat, 11 Apr 2026 21:06:21 +0900 Subject: [PATCH 7/7] =?UTF-8?q?remove:=20pipeline=20webhook=20=ED=85=8C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=ED=8C=8C=EC=9D=BC=EC=9D=84=20=EC=82=AD?= =?UTF-8?q?=EC=A0=9C=ED=95=9C=EB=8B=A4=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_pipeline_webhooks.py | 157 -------------------------------- 1 file changed, 157 deletions(-) delete mode 100644 tests/test_pipeline_webhooks.py diff --git a/tests/test_pipeline_webhooks.py b/tests/test_pipeline_webhooks.py deleted file mode 100644 index 8a51607..0000000 --- a/tests/test_pipeline_webhooks.py +++ /dev/null @@ -1,157 +0,0 @@ -"""Pipeline stage webhook integration tests.""" - -from __future__ import annotations - -import asyncio -import importlib -from datetime import date -from types import SimpleNamespace - -from app.core.config import get_settings -from app.graph.chat.state import ChatState -from app.schemas.course import CourseRequest, CourseResponse, RegionDateRange -from app.schemas.enums import ( - ActivityPreference, - BudgetRange, - CompanionType, - DestinationPreference, - PacePreference, - PlanningPreference, - PriorityPreference, - Region, - TravelTheme, -) - - -def _set_required_env(monkeypatch, **overrides: str) -> None: - monkeypatch.setenv("OPENAI_API_KEY", "test-key") - monkeypatch.setenv("SERVICE_SECRET", "test-service-secret") - monkeypatch.setenv("HMAC_SECRET", "test-hmac-secret") - monkeypatch.setenv("DISCORD_WEBHOOK_URL", "https://discord.com/api/webhooks/123/abc") - for key, value in overrides.items(): - monkeypatch.setenv(key, value) - get_settings.cache_clear() - - -def _course_request() -> CourseRequest: - start = date(2026, 1, 1) - end = date(2026, 1, 1) - return CourseRequest( - start_date=start, - end_date=end, - regions=[RegionDateRange(region=Region.SEOUL, start_date=start, end_date=end)], - people_count=1, - companion_type=[CompanionType.SOLO], - travel_themes=[TravelTheme.CITY_TRIP], - pace_preference=PacePreference.RELAXED, - planning_preference=PlanningPreference.PLANNED, - destination_preference=DestinationPreference.TOURIST_SPOTS, - activity_preference=ActivityPreference.REST_FOCUSED, - priority_preference=PriorityPreference.EFFICIENCY, - budget_range=BudgetRange.MID, - ) - - -def _course_response() -> CourseResponse: - today = date(2026, 1, 1) - return CourseResponse( - start_date=today, - end_date=today, - trip_days=1, - nights=0, - people_count=1, - tags=[], - title="Test title", - summary="Test summary", - itinerary=[], - llm_commentary="Test commentary", - next_action_suggestion=[], - ) - - -def _event_types(payloads: list[dict]) -> list[str]: - event_types: list[str] = [] - for payload in payloads: - event_type = next(field["value"] for field in payload["fields"] if field["name"] == "event_type") - event_types.append(event_type) - return event_types - - -def _fake_classify_intent_route(*args, **kwargs) -> SimpleNamespace: - return SimpleNamespace(intent_type="MODIFICATION", requested_action="ADD", target_scope="ITEM") - - -def _fake_parse_modification_intent(*args, **kwargs) -> SimpleNamespace: - return SimpleNamespace( - op="ADD", - target_day=1, - target_index=2, - search_keyword="cafe", - needs_clarification=False, - model_dump=lambda: {"op": "ADD", "target_day": 1, "target_index": 2, "reasoning": "test"}, - ) - - -def _run_coroutine(coro) -> None: - asyncio.run(coro) - - -def test_generate_service_emits_start_and_completion_webhooks(monkeypatch) -> None: - _set_required_env(monkeypatch) - import app.services.generate_service as generate_service - - payloads: list[dict] = [] - - async def _fake_notify(**kwargs) -> None: - payloads.append(kwargs) - - async def _fake_pipeline(request): - return _course_response() - - async def _fake_post_callback(**kwargs) -> None: - return None - - async def _noop(*args, **kwargs) -> None: - return None - - monkeypatch.setattr(generate_service, "notify_pipeline_event", _fake_notify) - monkeypatch.setattr(generate_service, "notify_job_completed", _noop) - monkeypatch.setattr(generate_service, "notify_timeout", _noop) - monkeypatch.setattr(generate_service, "run_roadmap_pipeline", _fake_pipeline) - monkeypatch.setattr(generate_service, "_post_callback", _fake_post_callback) - - asyncio.run(generate_service.process_generate_request("job-1", "https://example.com/callback", _course_request())) - - assert [item["event_type"] for item in payloads] == ["generate_started", "generate_completed"] - - -def test_analyze_intent_emits_routing_and_parse_webhooks(monkeypatch) -> None: - _set_required_env(monkeypatch) - import app.services.webhook_notification as webhook - - analyze_intent = importlib.import_module("app.graph.chat.nodes.analyze_intent") - payloads: list[dict] = [] - - async def _fake_send(embed: dict) -> None: - payloads.append(embed) - - monkeypatch.setattr(webhook, "_send_embed", _fake_send) - monkeypatch.setattr(analyze_intent, "schedule_webhook", _run_coroutine) - monkeypatch.setattr(analyze_intent, "_build_itinerary_table", lambda itinerary: "table") - monkeypatch.setattr(analyze_intent, "_build_history_context", lambda history: "history") - monkeypatch.setattr(analyze_intent, "_build_request_context", lambda request_context: "request") - monkeypatch.setattr(analyze_intent, "_build_day_region_hints", lambda itinerary: {}) - monkeypatch.setattr(analyze_intent, "_format_day_region_context", lambda hints: "regions") - monkeypatch.setattr(analyze_intent, "_classify_intent_route", _fake_classify_intent_route) - monkeypatch.setattr(analyze_intent, "_parse_modification_intent", _fake_parse_modification_intent) - - state: ChatState = { - "current_itinerary": {}, - "user_query": "Add a cafe to day 1", - "session_history": [], - "request_context": {}, - } - result = analyze_intent.analyze_intent(state) - - assert result["intent_type"] == "MODIFICATION" - assert _event_types(payloads) == ["chat_intent_routed", "chat_intent_parsed"]