Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/db/alembic/versions/20260213_000000_base_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def upgrade() -> None:
server_default=sa.text("CURRENT_TIMESTAMP"),
),
sa.Column("model", sa.String(), nullable=False),
sa.Column("request_kind", sa.String(), nullable=True),
sa.Column("session_id_hash", sa.String(), nullable=True),
sa.Column("transport", sa.String(), nullable=True),
sa.Column("input_tokens", sa.Integer(), nullable=True),
sa.Column("output_tokens", sa.Integer(), nullable=True),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""add request kind and session hash to request_logs

Revision ID: 20260311_000000_add_request_logs_kind_and_session_hash
Revises: 20260312_120000_add_dashboard_upstream_stream_transport
Create Date: 2026-03-11
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine import Connection

# revision identifiers, used by Alembic.
revision = "20260311_000000_add_request_logs_kind_and_session_hash"
down_revision = "20260312_120000_add_dashboard_upstream_stream_transport"
branch_labels = None
depends_on = None


def _columns(connection: Connection, table_name: str) -> set[str]:
    """Return the set of column names on *table_name*, or an empty set if the table is absent."""
    inspector = sa.inspect(connection)
    if inspector.has_table(table_name):
        return {col["name"] for col in inspector.get_columns(table_name)}
    return set()


def upgrade() -> None:
    """Add the nullable request_kind / session_id_hash columns, skipping any that exist.

    Idempotent: installs whose base schema already ships these columns are left untouched,
    and a missing request_logs table (fresh database) is a no-op.
    """
    bind = op.get_bind()
    existing = _columns(bind, "request_logs")
    if not existing:
        return

    with op.batch_alter_table("request_logs") as batch_op:
        for column_name in ("request_kind", "session_id_hash"):
            if column_name not in existing:
                batch_op.add_column(sa.Column(column_name, sa.String(), nullable=True))


def downgrade() -> None:
    """Drop session_id_hash / request_kind if present; no-op when the table is missing."""
    bind = op.get_bind()
    existing = _columns(bind, "request_logs")
    if not existing:
        return

    with op.batch_alter_table("request_logs") as batch_op:
        # Reverse of upgrade(): drop in the opposite order the columns were added.
        for column_name in ("session_id_hash", "request_kind"):
            if column_name in existing:
                batch_op.drop_column(column_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""merge request-log kind and account-status heads

Revision ID: 20260319_191500_merge_request_log_kind_and_account_status_heads
Revises: 20260311_000000_add_request_logs_kind_and_session_hash, 20260319_183000_normalize_sqlite_account_status_casing
Create Date: 2026-03-19
"""

from __future__ import annotations

# revision identifiers, used by Alembic.
revision = "20260319_191500_merge_request_log_kind_and_account_status_heads"
down_revision = (
"20260311_000000_add_request_logs_kind_and_session_hash",
"20260319_183000_normalize_sqlite_account_status_casing",
)
branch_labels = None
depends_on = None


def upgrade() -> None:
    """No-op: this revision exists only to merge two Alembic migration heads."""
    pass


def downgrade() -> None:
    """No-op: merge revisions carry no schema changes to revert."""
    pass
2 changes: 2 additions & 0 deletions app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class RequestLog(Base):
request_id: Mapped[str] = mapped_column(String, nullable=False)
requested_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), nullable=False)
model: Mapped[str] = mapped_column(String, nullable=False)
request_kind: Mapped[str | None] = mapped_column(String, nullable=True)
session_id_hash: Mapped[str | None] = mapped_column(String, nullable=True)
transport: Mapped[str | None] = mapped_column(String, nullable=True)
service_tier: Mapped[str | None] = mapped_column(String, nullable=True)
input_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
Expand Down
64 changes: 64 additions & 0 deletions app/modules/proxy/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
_TEXT_DONE_CONTENT_PART_TYPES = frozenset({"output_text", "refusal"})
_REQUEST_TRANSPORT_HTTP = "http"
_REQUEST_TRANSPORT_WEBSOCKET = "websocket"
_REQUEST_KIND_RESPONSES = "responses"
_REQUEST_KIND_COMPACT = "compact"
_REQUEST_KIND_TRANSCRIPTION = "transcription"
_COMPACT_UPSTREAM_ENDPOINT = "/codex/responses/compact"
_COMPACT_SAME_CONTRACT_RETRY_BUDGET = 1
_ACCOUNT_RECOVERY_RETRY_CODES = frozenset(
{
Expand Down Expand Up @@ -317,6 +321,7 @@ async def _stream_via_http_bridge(
api_key=api_key,
api_key_reservation=api_key_reservation,
)
request_state.session_id_hash = _session_id_hash_from_headers(headers)
request_state.transport = _REQUEST_TRANSPORT_HTTP

await self._submit_http_bridge_request(
Expand Down Expand Up @@ -367,6 +372,7 @@ async def compact_responses(

settings = await get_settings_cache().get()
prefer_earlier_reset = settings.prefer_earlier_reset_accounts
session_id_hash = _session_id_hash_from_headers(headers)
had_prompt_cache_key = _prompt_cache_key_from_request_model(payload) is not None
affinity = _sticky_key_for_compact_request(
payload,
Expand Down Expand Up @@ -614,6 +620,8 @@ async def _call_compact(target: Account) -> CompactResponsePayload:
usage.output_tokens_details.reasoning_tokens if usage and usage.output_tokens_details else None
),
reasoning_effort=reasoning_effort,
request_kind=_REQUEST_KIND_COMPACT,
session_id_hash=session_id_hash,
transport=_REQUEST_TRANSPORT_HTTP,
service_tier=_service_tier_from_response(response) or _service_tier_from_compact_payload(payload),
)
Expand Down Expand Up @@ -647,6 +655,7 @@ async def transcribe(
settings = await get_settings_cache().get()
prefer_earlier_reset = settings.prefer_earlier_reset_accounts
routing_strategy = _routing_strategy(settings)
session_id_hash = _session_id_hash_from_headers(headers)
try:
selection = await self._select_account_with_budget(
deadline,
Expand Down Expand Up @@ -781,6 +790,8 @@ async def _call_transcribe(target: Account) -> dict[str, JsonValue]:
status=log_status,
error_code=log_error_code,
error_message=log_error_message,
request_kind=_REQUEST_KIND_TRANSCRIPTION,
session_id_hash=session_id_hash,
transport=_REQUEST_TRANSPORT_HTTP,
)

Expand Down Expand Up @@ -1055,6 +1066,7 @@ async def _prepare_websocket_response_create_request(
include_type_field=True,
attach_event_queue=False,
)
request_state.session_id_hash = _session_id_hash_from_headers(headers)
had_prompt_cache_key = _prompt_cache_key_from_request_model(responses_payload) is not None
affinity_policy = _sticky_key_for_responses_request(
responses_payload,
Expand Down Expand Up @@ -2548,6 +2560,8 @@ async def _finalize_websocket_request_state(
cached_input_tokens=cached_input_tokens,
reasoning_tokens=reasoning_tokens,
reasoning_effort=request_state.reasoning_effort,
request_kind=request_state.request_kind,
session_id_hash=request_state.session_id_hash,
transport=request_state.transport,
service_tier=response_service_tier,
)
Expand All @@ -2573,6 +2587,8 @@ async def _write_websocket_connect_failure(
error_code=error_code,
error_message=error_message,
reasoning_effort=request_state.reasoning_effort,
request_kind=request_state.request_kind,
session_id_hash=request_state.session_id_hash,
transport=request_state.transport,
service_tier=request_state.service_tier,
)
Expand Down Expand Up @@ -2681,6 +2697,8 @@ async def _fail_pending_websocket_requests(
error_code=error_code,
error_message=error_message,
reasoning_effort=request_state.reasoning_effort,
request_kind=request_state.request_kind,
session_id_hash=request_state.session_id_hash,
transport=request_state.transport,
service_tier=request_state.service_tier,
)
Expand Down Expand Up @@ -2929,6 +2947,7 @@ async def _stream_with_retry(
sticky_threads_enabled=settings.sticky_threads_enabled,
api_key=api_key,
)
session_id_hash = _session_id_hash_from_headers(headers)
sticky_key_source = "none"
if affinity.kind == StickySessionKind.CODEX_SESSION:
sticky_key_source = "session_header"
Expand Down Expand Up @@ -2966,6 +2985,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -2998,6 +3019,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -3037,6 +3060,8 @@ async def _stream_with_retry(
error_code=error_code,
error_message=no_accounts_msg,
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
transport=request_transport,
service_tier=payload.service_tier,
)
Expand All @@ -3062,6 +3087,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand All @@ -3087,6 +3114,8 @@ async def _stream_with_retry(
error_code="upstream_unavailable",
error_message=message,
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -3117,6 +3146,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -3144,6 +3175,8 @@ async def _stream_with_retry(
suppress_text_done_events=suppress_text_done_events,
upstream_stream_transport=upstream_stream_transport,
request_transport=request_transport,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
):
yield line
except (_TransientStreamError, ProxyResponseError) as tex:
Expand Down Expand Up @@ -3236,6 +3269,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -3269,6 +3304,8 @@ async def _stream_with_retry(
error_code="upstream_unavailable",
error_message=message,
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand Down Expand Up @@ -3298,6 +3335,8 @@ async def _stream_with_retry(
error_code="upstream_request_timeout",
error_message="Proxy request budget exhausted",
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
service_tier=payload.service_tier,
transport=request_transport,
)
Expand All @@ -3316,6 +3355,8 @@ async def _stream_with_retry(
suppress_text_done_events=suppress_text_done_events,
upstream_stream_transport=upstream_stream_transport,
request_transport=request_transport,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
):
yield line
finally:
Expand Down Expand Up @@ -3398,6 +3439,8 @@ async def _stream_with_retry(
error_code="no_accounts",
error_message=retries_exhausted_msg,
reasoning_effort=payload.reasoning.effort if payload.reasoning else None,
request_kind=_REQUEST_KIND_RESPONSES,
session_id_hash=session_id_hash,
transport=request_transport,
service_tier=payload.service_tier,
)
Expand Down Expand Up @@ -3432,6 +3475,8 @@ async def _stream_once(
suppress_text_done_events: bool,
upstream_stream_transport: str | None,
request_transport: str,
request_kind: str,
session_id_hash: str | None,
) -> AsyncIterator[str]:
account_id_value = account.id
access_token = self._encryptor.decrypt(account.access_token_encrypted)
Expand Down Expand Up @@ -3608,6 +3653,8 @@ async def _stream_once(
cached_input_tokens=cached_input_tokens,
reasoning_tokens=reasoning_tokens,
reasoning_effort=reasoning_effort,
request_kind=request_kind,
session_id_hash=session_id_hash,
transport=request_transport,
service_tier=service_tier,
)
Expand All @@ -3633,6 +3680,8 @@ async def _write_request_log(
cached_input_tokens: int | None = None,
reasoning_tokens: int | None = None,
reasoning_effort: str | None = None,
request_kind: str | None = None,
session_id_hash: str | None = None,
transport: str | None = None,
service_tier: str | None = None,
) -> None:
Expand All @@ -3649,6 +3698,8 @@ async def _write_request_log(
cached_input_tokens=cached_input_tokens,
reasoning_tokens=reasoning_tokens,
reasoning_effort=reasoning_effort,
request_kind=request_kind,
session_id_hash=session_id_hash,
transport=transport,
service_tier=service_tier,
latency_ms=latency_ms,
Expand Down Expand Up @@ -3676,6 +3727,8 @@ async def _write_stream_preflight_error(
error_message: str,
reasoning_effort: str | None,
service_tier: str | None,
request_kind: str | None = None,
session_id_hash: str | None = None,
transport: str = _REQUEST_TRANSPORT_HTTP,
) -> None:
await self._write_request_log(
Expand All @@ -3688,6 +3741,8 @@ async def _write_stream_preflight_error(
error_code=error_code,
error_message=error_message,
reasoning_effort=reasoning_effort,
request_kind=request_kind,
session_id_hash=session_id_hash,
transport=transport,
service_tier=service_tier,
)
Expand Down Expand Up @@ -4021,6 +4076,8 @@ class _WebSocketRequestState:
started_at: float
response_id: str | None = None
awaiting_response_created: bool = False
request_kind: str = _REQUEST_KIND_RESPONSES
session_id_hash: str | None = None
event_queue: asyncio.Queue[str | None] | None = None
transport: str = _REQUEST_TRANSPORT_WEBSOCKET
api_key: ApiKeyData | None = None
Expand Down Expand Up @@ -4565,6 +4622,13 @@ def _sticky_key_from_session_header(headers: Mapping[str, str]) -> str | None:
return None


def _session_id_hash_from_headers(headers: Mapping[str, str]) -> str | None:
    """Derive a hashed session identifier from request headers, or None when absent.

    The raw session id comes from the sticky-session header; only its hash is
    returned so the plain identifier is never persisted.
    """
    session_id = _sticky_key_from_session_header(headers)
    return None if session_id is None else _hash_identifier(session_id)


def _sticky_key_from_turn_state_header(headers: Mapping[str, str]) -> str | None:
normalized = {key.lower(): value for key, value in headers.items()}
value = normalized.get("x-codex-turn-state")
Expand Down
Loading
Loading