3 changes: 3 additions & 0 deletions .env.example
@@ -9,6 +9,9 @@ OPENAI_API_KEY=
# Shared secret used for inter-service authentication (x-service-secret)
SERVICE_SECRET=

# HMAC secret used for security purposes such as query hashing
HMAC_SECRET=

# ------------------------------
# LLM / request timeout policy
# ------------------------------
1 change: 1 addition & 0 deletions app/core/config.py
@@ -15,6 +15,7 @@ class Settings(BaseSettings):

OPENAI_API_KEY: str
SERVICE_SECRET: str
HMAC_SECRET: str
LLM_MODEL_NAME: str = "gpt-4o-mini"
ENABLE_STAGE_LLM_ROUTING: bool = False
LLM_MODEL_QUALITY: str = "gpt-4o-mini"
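Review note: this PR only wires HMAC_SECRET into Settings; no call site is included. A minimal sketch of the query-hashing use the .env comment describes, assuming a hypothetical hash_query helper that is not part of this PR:

```python
import hashlib
import hmac

from app.core.config import get_settings


def hash_query(query: str) -> str:
    # Keyed HMAC-SHA256 digest of a query string; the helper name and
    # call shape are illustrative assumptions, not code from this PR.
    secret = get_settings().HMAC_SECRET.encode("utf-8")
    return hmac.new(secret, query.encode("utf-8"), hashlib.sha256).hexdigest()
```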
10 changes: 8 additions & 2 deletions app/core/job_log_context.py
@@ -18,7 +18,13 @@ def init_job_log(job_id: str) -> None:
_logs_var.set([])


def append_job_log(stage: str, message: str, extra: dict[str, Any] | None = None) -> None:
def append_job_log(
stage: str,
message: str,
extra: dict[str, Any] | None = None,
*,
level: str = "info",
) -> None:
"""현재 job에 로그 항목을 추가한다. init_job_log 호출 전이면 무시한다."""
logs = _logs_var.get()
if logs is None:
@@ -27,7 +33,7 @@ def append_job_log(stage: str, message: str, extra: dict[str, Any] | None = None
start = _start_time_var.get()
elapsed_ms = round((time.monotonic() - start) * 1000)

entry: dict[str, Any] = {"stage": stage, "message": message, "elapsed_ms": elapsed_ms}
entry: dict[str, Any] = {"stage": stage, "message": message, "elapsed_ms": elapsed_ms, "level": level}
if extra:
entry.update(extra)

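Review note: because level is keyword-only, existing positional callers are untouched and default to "info". A usage sketch (the stage names and message values below are illustrative):

```python
from app.core.job_log_context import append_job_log, init_job_log

init_job_log("job-123")

# Existing call sites keep the default level="info".
append_job_log("mutate", "op=replace day=1")

# Verbose diagnostics opt in explicitly; level lands in the log entry dict.
append_job_log("llm", "stage=chat model=gpt-4o-mini", {"latency_ms": 42}, level="detail")
```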
89 changes: 79 additions & 10 deletions app/core/llm_router.py
@@ -10,6 +10,7 @@
from langchain_openai import ChatOpenAI

from app.core.config import Settings, get_settings
from app.core.job_log_context import append_job_log
from app.core.logger import get_logger
from app.core.timeout_policy import get_timeout_policy

@@ -189,27 +190,41 @@ def invoke(
resolved_settings.OPENAI_API_KEY,
)
response = client.invoke(payload)
latency = round((perf_counter() - started) * 1000)
_log_success(
stage=stage,
tier=tier,
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
)
append_job_log(
"llm",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false",
{"latency_ms": latency},
level="detail",
)
return response
except Exception as exc:
latency = round((perf_counter() - started) * 1000)
if (not routing_enabled) or selected_model == fallback_model:
_log_failure(
stage=stage,
tier=tier,
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
message="LLM call failed",
exc=exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} "
f"fallback=false err={type(exc).__name__}",
{"latency_ms": latency},
)
raise

_log_failure(
@@ -218,10 +233,16 @@
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
message="LLM call failed. Retrying with fallback model.",
exc=exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} "
f"fallback=false err={type(exc).__name__} retrying=true",
{"latency_ms": latency},
)

fallback_started = perf_counter()
fallback_client = _get_chat_openai_client(
Expand All @@ -232,26 +253,40 @@ def invoke(
)
try:
response = fallback_client.invoke(payload)
fb_latency = round((perf_counter() - fallback_started) * 1000)
_log_success(
stage=stage,
tier=tier,
selected_model=fallback_model,
routing_enabled=routing_enabled,
fallback_used=True,
latency_ms=(perf_counter() - fallback_started) * 1000,
latency_ms=fb_latency,
)
append_job_log(
"llm",
f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true",
{"latency_ms": fb_latency},
level="detail",
)
return response
except Exception as fallback_exc:
fb_latency = round((perf_counter() - fallback_started) * 1000)
_log_failure(
stage=stage,
tier=tier,
selected_model=fallback_model,
routing_enabled=routing_enabled,
fallback_used=True,
latency_ms=(perf_counter() - fallback_started) * 1000,
latency_ms=fb_latency,
message="LLM fallback call failed",
exc=fallback_exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} "
f"fallback=true err={type(fallback_exc).__name__}",
{"latency_ms": fb_latency},
)
raise


@@ -279,27 +314,41 @@ async def ainvoke(
resolved_settings.OPENAI_API_KEY,
)
response = await client.ainvoke(payload)
latency = round((perf_counter() - started) * 1000)
_log_success(
stage=stage,
tier=tier,
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
)
append_job_log(
"llm",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} fallback=false",
{"latency_ms": latency},
level="detail",
)
return response
except Exception as exc:
latency = round((perf_counter() - started) * 1000)
if (not routing_enabled) or selected_model == fallback_model:
_log_failure(
stage=stage,
tier=tier,
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
message="LLM async call failed",
exc=exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} "
f"fallback=false err={type(exc).__name__}",
{"latency_ms": latency},
)
raise

_log_failure(
@@ -308,10 +357,16 @@
selected_model=selected_model,
routing_enabled=routing_enabled,
fallback_used=False,
latency_ms=(perf_counter() - started) * 1000,
latency_ms=latency,
message="LLM async call failed. Retrying with fallback model.",
exc=exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={selected_model} tier={tier.value if tier else 'N/A'} "
f"fallback=false err={type(exc).__name__} retrying=true",
{"latency_ms": latency},
)

fallback_started = perf_counter()
fallback_client = _get_chat_openai_client(
Expand All @@ -322,24 +377,38 @@ async def ainvoke(
)
try:
response = await fallback_client.ainvoke(payload)
fb_latency = round((perf_counter() - fallback_started) * 1000)
_log_success(
stage=stage,
tier=tier,
selected_model=fallback_model,
routing_enabled=routing_enabled,
fallback_used=True,
latency_ms=(perf_counter() - fallback_started) * 1000,
latency_ms=fb_latency,
)
append_job_log(
"llm",
f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} fallback=true",
{"latency_ms": fb_latency},
level="detail",
)
return response
except Exception as fallback_exc:
fb_latency = round((perf_counter() - fallback_started) * 1000)
_log_failure(
stage=stage,
tier=tier,
selected_model=fallback_model,
routing_enabled=routing_enabled,
fallback_used=True,
latency_ms=(perf_counter() - fallback_started) * 1000,
latency_ms=fb_latency,
message="LLM async fallback call failed",
exc=fallback_exc,
)
append_job_log(
"llm_error",
f"stage={stage.value} model={fallback_model} tier={tier.value if tier else 'N/A'} "
f"fallback=true err={type(fallback_exc).__name__}",
{"latency_ms": fb_latency},
)
raise
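Review note: the measure-then-log shape above now repeats four times (sync/async × primary/fallback). A condensed sketch of the shared pattern, with a simplified hypothetical signature — the real code keeps the richer _log_success/_log_failure calls:

```python
from time import perf_counter

from app.core.job_log_context import append_job_log


def _timed_invoke(client, payload, stage: str, model: str, fallback: bool):
    # Hypothetical extraction of the repeated timing/logging pattern.
    started = perf_counter()
    try:
        response = client.invoke(payload)
    except Exception as exc:
        latency = round((perf_counter() - started) * 1000)
        append_job_log(
            "llm_error",
            f"stage={stage} model={model} fallback={str(fallback).lower()} err={type(exc).__name__}",
            {"latency_ms": latency},
        )
        raise
    latency = round((perf_counter() - started) * 1000)
    append_job_log(
        "llm",
        f"stage={stage} model={model} fallback={str(fallback).lower()}",
        {"latency_ms": latency},
        level="detail",
    )
    return response
```

Pulling out a helper like this would collapse the four near-identical blocks, at the cost of threading the routing metadata through one more layer.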
7 changes: 7 additions & 0 deletions app/graph/chat/nodes/analyze_intent.py
@@ -601,6 +601,13 @@ def analyze_intent(state: ChatState) -> ChatState:
user_query=user_query,
)
intent_draft = _ensure_search_keyword_contains_region(intent_draft, day_region_hints)
append_job_log(
"intent_parsed",
f"op={intent_draft.op} day={intent_draft.target_day} idx={intent_draft.target_index}"
f" kw={intent_draft.search_keyword[:30] if intent_draft.search_keyword else 'N/A'}"
f" clarify={intent_draft.needs_clarification}",
level="detail",
)
except Exception as exc:
logger.error("의도 분석 LLM 호출 실패: %s", exc)
return {**state, "error": "수정 의도 분석에 실패했습니다."}
10 changes: 9 additions & 1 deletion app/graph/chat/nodes/mutate.py
@@ -181,7 +181,8 @@ async def mutate(state: ChatState) -> ChatState:
diff_keys.append(build_diff_key(dest_day_num, dest_pos + 1))

day["places"] = places
append_job_log("mutate", f"op={op} day={target_day_num} index={target_index} results={len(search_results)}")
diff_str = ",".join(diff_keys)
append_job_log("mutate", f"op={op} day={target_day_num} idx={target_index} n={len(search_results)} diff={diff_str}")

return {
**state,
@@ -282,6 +283,13 @@ async def _search_place(intent: dict, day: dict) -> tuple:
logger.error("Google Places search failed: %s", exc)
return None, [], {"error": "장소 검색에 실패했습니다."}

append_job_log(
"mutate_search",
f"kw={keyword[:40]} bbox={'Y' if day_bbox else 'N'} rerank={rerank_enabled}"
f" fb_unfilt={fallback_to_unfiltered} n={len(results)}",
level="detail",
)

search_results = [r.model_dump() for r in results]
if not results:
suggested = _suggest_alternative_keyword(keyword)
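Review note: the mutate entry now carries the affected diff keys inline. An illustration with made-up values — the key format produced by build_diff_key is an assumption here:

```python
diff_keys = ["2-4", "2-5"]  # hypothetical output of build_diff_key(day, position)
diff_str = ",".join(diff_keys)
print(f"op=replace day=2 idx=3 n=5 diff={diff_str}")
# -> op=replace day=2 idx=3 n=5 diff=2-4,2-5
```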
7 changes: 6 additions & 1 deletion app/graph/roadmap/nodes/finalize.py
@@ -300,10 +300,14 @@ async def synthesize_final_roadmap(state: RoadmapState) -> RoadmapState:
itinerary_context, daily_places = _prepare_final_context(state)
course_request = CourseRequest.model_validate(state["course_request"])
daily_places = await _fill_place_descriptions_with_llm(daily_places)
desc_count = sum(1 for d in daily_places for p in d.get("places", []) if p.get("description"))
append_job_log("finalize_desc", f"place_descriptions_filled={desc_count}")

daily_places = await _apply_visit_time_for_daily_places(
daily_places,
course_request.planning_preference,
)
append_job_log("finalize_time", f"preference={course_request.planning_preference}")

course_request_payload = course_request.model_dump(mode="json")
course_request_payload.pop("budget_range", None)
@@ -366,7 +370,8 @@
"itinerary": daily_places,
}

append_job_log("finalize", f"title={final_roadmap.get('title', '')}")
total_places = sum(len(d.get("places", [])) for d in daily_places)
append_job_log("finalize", f"title={final_roadmap.get('title', '')} places={total_places}")
return {**state, "final_roadmap": final_roadmap}

except Exception as exc:
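Review note: the two new counters are plain generator-expression sums; a toy check of the counting logic (the itinerary data is made up):

```python
daily_places = [
    {"places": [{"description": "Beach walk"}, {"description": None}]},
    {"places": [{"description": "Night market"}]},
]
# Count places that already have a non-empty description.
desc_count = sum(1 for d in daily_places for p in d.get("places", []) if p.get("description"))
# Count all places across all days.
total_places = sum(len(d.get("places", [])) for d in daily_places)
print(desc_count, total_places)  # -> 2 3
```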
25 changes: 20 additions & 5 deletions app/graph/roadmap/nodes/places.py
@@ -149,7 +149,7 @@ async def search_for_slot(
query: str,
price_levels: list[str] | None,
region: Region | str | None,
) -> tuple[str, list]:
) -> tuple[str, list, str]:
geo_filter_scope = "roadmap_region"
geo_missing_region_bbox = False
geo_filter_fallback_unfiltered = False
@@ -262,17 +262,28 @@
bias_used,
unfiltered_used,
)
return slot_key, [place.model_dump() for place in places]
if fallback_stage != "restriction":
append_job_log(
"places_search",
f"slot={slot_key} q={query[:40]} fb={fallback_stage} n={len(places)}",
level="detail",
)
return slot_key, [place.model_dump() for place in places], fallback_stage
except Exception as exc:
logger.warning("Slot place search failed: slot=%s error=%s", slot_key, exc)
return slot_key, []
return slot_key, [], "error"

results = await asyncio.gather(
*[search_for_slot(key, query, levels, region) for key, query, levels, region in tasks]
)

for slot_key, places in results:
empty_count = 0
fb_stats: dict[str, int] = {}
for slot_key, places, fb_stage in results:
fetched_places[slot_key] = places
if not places:
empty_count += 1
fb_stats[fb_stage] = fb_stats.get(fb_stage, 0) + 1

if rerank_enabled:

@@ -341,7 +352,11 @@ async def rerank_for_day(day: dict) -> None:

await asyncio.gather(*[rerank_for_day(day) for day in skeleton_plan])

append_job_log("places_fetch", f"slot_count={len(fetched_places)} rerank={rerank_enabled}")
fb_summary = " ".join(f"{k}={v}" for k, v in sorted(fb_stats.items()))
append_job_log(
"places_fetch",
f"slots={len(fetched_places)} rerank={rerank_enabled} empty={empty_count} {fb_summary}",
)
logger.info("Slot place fetch completed: slot_count=%d", len(fetched_places))

return {
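Review note: a quick illustration of the aggregated places_fetch message, assuming hypothetical fallback-stage names and counts (only "restriction" and "error" are visible in this diff):

```python
fb_stats = {"restriction": 9, "unfiltered": 2, "error": 1}
fb_summary = " ".join(f"{k}={v}" for k, v in sorted(fb_stats.items()))
print(f"slots=12 rerank=True empty=1 {fb_summary}")
# -> slots=12 rerank=True empty=1 error=1 restriction=9 unfiltered=2
```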