Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions app/modules/proxy/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def __init__(self, repo_factory: ProxyRepoFactory) -> None:
self._http_bridge_sessions: dict[_HTTPBridgeSessionKey, _HTTPBridgeSession] = {}
self._http_bridge_inflight_sessions: dict[_HTTPBridgeSessionKey, asyncio.Future[_HTTPBridgeSession]] = {}
self._http_bridge_turn_state_index: dict[tuple[str, str | None], _HTTPBridgeSessionKey] = {}
self._http_bridge_previous_response_index: dict[tuple[str, str | None], _HTTPBridgeSessionKey] = {}
self._http_bridge_lock = anyio.Lock()

def stream_responses(
Expand Down Expand Up @@ -1490,6 +1491,35 @@ async def _get_or_create_http_bridge_session(
] = alias_session.key
key = alias_session.key
elif incoming_turn_state.startswith("http_turn_"):
if previous_response_id is not None:
previous_alias_key = _http_bridge_previous_response_alias_key(
previous_response_id,
api_key_id,
)
previous_key = self._http_bridge_previous_response_index.get(previous_alias_key)
if previous_key is not None:
previous_session = self._http_bridge_sessions.get(previous_key)
if (
previous_session is not None
and not previous_session.closed
and previous_session.account.status == AccountStatus.ACTIVE
):
key = previous_session.key
self._promote_http_bridge_session_to_codex_affinity(
previous_session,
turn_state=incoming_turn_state,
settings=settings,
)
previous_session.downstream_turn_state_aliases.add(incoming_turn_state)
for alias in previous_session.downstream_turn_state_aliases:
self._http_bridge_turn_state_index[
_http_bridge_turn_state_alias_key(
alias,
previous_session.key.api_key_id,
)
] = previous_session.key
continue
self._http_bridge_previous_response_index.pop(previous_alias_key, None)
key = _HTTPBridgeSessionKey("turn_state_header", incoming_turn_state, api_key_id)
if self._http_bridge_inflight_sessions.get(key) is not None:
pass
Expand Down Expand Up @@ -1585,6 +1615,52 @@ async def _get_or_create_http_bridge_session(
sessions_to_close.append(existing)

inflight_future = self._http_bridge_inflight_sessions.get(key)
if previous_response_id is not None and inflight_future is None and (
existing is None or existing.closed or existing.account.status != AccountStatus.ACTIVE
Comment on lines +1618 to +1619
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Recover by previous_response_id despite inflight key creation

This guard blocks previous-response recovery whenever an inflight session future exists for the request-derived key, and the method then sets a continuity error for any request with previous_response_id. That means a valid mapped live session can be ignored during concurrent traffic (for example, another request is currently creating a session for the same prompt-cache key), causing a false 400 continuity failure instead of reusing the known session.

Useful? React with 👍 / 👎.

):
previous_alias_key = _http_bridge_previous_response_alias_key(previous_response_id, api_key_id)
previous_key = self._http_bridge_previous_response_index.get(previous_alias_key)
if previous_key is not None:
previous_session = self._http_bridge_sessions.get(previous_key)
if (
previous_session is not None
and not previous_session.closed
and previous_session.account.status == AccountStatus.ACTIVE
):
key = previous_session.key
existing = previous_session
inflight_future = self._http_bridge_inflight_sessions.get(previous_key)
if incoming_turn_state:
self._promote_http_bridge_session_to_codex_affinity(
previous_session,
turn_state=incoming_turn_state,
settings=settings,
)
previous_session.downstream_turn_state_aliases.add(incoming_turn_state)
for alias in previous_session.downstream_turn_state_aliases:
self._http_bridge_turn_state_index[
_http_bridge_turn_state_alias_key(
alias,
previous_session.key.api_key_id,
)
] = previous_session.key
if inflight_future is None:
previous_session.request_model = request_model
previous_session.last_used_at = time.monotonic()
_log_http_bridge_event(
"reuse",
key,
account_id=previous_session.account.id,
model=previous_session.request_model,
pending_count=await self._http_bridge_pending_count(previous_session),
cache_key_family=key.affinity_kind,
model_class=_extract_model_class(previous_session.request_model)
if previous_session.request_model
else None,
)
return previous_session
else:
self._http_bridge_previous_response_index.pop(previous_alias_key, None)
if previous_response_id is not None:
continuity_error = ProxyResponseError(
400,
Expand Down Expand Up @@ -1750,6 +1826,7 @@ async def close_all_http_bridge_sessions(self) -> None:
sessions_to_close = list(self._http_bridge_sessions.values())
self._http_bridge_sessions.clear()
self._http_bridge_inflight_sessions.clear()
self._http_bridge_previous_response_index.clear()

for session in sessions_to_close:
await self._close_http_bridge_session(session)
Expand Down Expand Up @@ -1789,8 +1866,10 @@ async def _close_http_bridge_session(
session.closed = True
if turn_state_lock_held:
self._unregister_http_bridge_turn_states_locked(session)
self._unregister_http_bridge_previous_response_ids_locked(session)
else:
await self._unregister_http_bridge_turn_states(session)
await self._unregister_http_bridge_previous_response_ids(session)
if session.upstream_reader is not None:
await _await_cancelled_task(session.upstream_reader, label="http bridge upstream reader")
try:
Expand Down Expand Up @@ -1818,10 +1897,29 @@ async def _register_http_bridge_turn_state(self, session: "_HTTPBridgeSession",
session.key
)

async def _register_http_bridge_previous_response_id(
self,
session: "_HTTPBridgeSession",
response_id: str,
) -> None:
stripped_response_id = response_id.strip()
if not stripped_response_id:
return
async with self._http_bridge_lock:
if session.closed:
return
alias_key = _http_bridge_previous_response_alias_key(stripped_response_id, session.key.api_key_id)
self._http_bridge_previous_response_index[alias_key] = session.key
session.previous_response_ids.add(stripped_response_id)

async def _unregister_http_bridge_turn_states(self, session: "_HTTPBridgeSession") -> None:
async with self._http_bridge_lock:
self._unregister_http_bridge_turn_states_locked(session)

async def _unregister_http_bridge_previous_response_ids(self, session: "_HTTPBridgeSession") -> None:
async with self._http_bridge_lock:
self._unregister_http_bridge_previous_response_ids_locked(session)

def _unregister_http_bridge_turn_states_locked(self, session: "_HTTPBridgeSession") -> None:
aliases = tuple(session.downstream_turn_state_aliases)
for alias in aliases:
Expand All @@ -1831,6 +1929,18 @@ def _unregister_http_bridge_turn_states_locked(self, session: "_HTTPBridgeSessio
)
session.downstream_turn_state_aliases.clear()

def _unregister_http_bridge_previous_response_ids_locked(self, session: "_HTTPBridgeSession") -> None:
response_ids_set = getattr(session, "previous_response_ids", None)
if not isinstance(response_ids_set, set):
return
response_ids = tuple(response_ids_set)
for response_id in response_ids:
self._http_bridge_previous_response_index.pop(
_http_bridge_previous_response_alias_key(response_id, session.key.api_key_id),
None,
)
response_ids_set.clear()

def _promote_http_bridge_session_to_codex_affinity(
self,
session: "_HTTPBridgeSession",
Expand Down Expand Up @@ -2444,6 +2554,9 @@ async def _process_http_bridge_upstream_text(
if event_type == "response.created" and release_create_gate and created_request_state is not None:
_release_websocket_response_create_gate(created_request_state, session.response_create_gate)

if response_id is not None and matched_request_state is not None:
await self._register_http_bridge_previous_response_id(session, response_id)
Comment on lines +2557 to +2558
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Index response IDs for terminal events without created

_process_http_bridge_upstream_text only writes to _http_bridge_previous_response_index when matched_request_state is set, but the function explicitly supports a fallback where response.created is missing and _pop_terminal_websocket_request_state(...) resolves the request at terminal time. In that fallback, matched_request_state stays None, so successful responses with a terminal response.id are never indexed, and a follow-up request using that previous_response_id can incorrectly fail with previous_response_not_found even though continuity is still live.

Useful? React with 👍 / 👎.


if matched_request_state is not None and matched_request_state.event_queue is not None:
await matched_request_state.event_queue.put(event_block)

Expand Down Expand Up @@ -4410,6 +4523,7 @@ class _HTTPBridgeSession:
upstream_turn_state: str | None = None
downstream_turn_state: str | None = None
downstream_turn_state_aliases: set[str] = field(default_factory=set)
previous_response_ids: set[str] = field(default_factory=set)
upstream_reader: asyncio.Task[None] | None = None
closed: bool = False

Expand Down Expand Up @@ -5037,6 +5151,10 @@ def _http_bridge_turn_state_alias_key(turn_state: str, api_key_id: str | None) -
return (turn_state, api_key_id)


def _http_bridge_previous_response_alias_key(response_id: str, api_key_id: str | None) -> tuple[str, str | None]:
return (response_id.strip(), api_key_id)


def _resolve_prompt_cache_key(
payload: ResponsesRequest | ResponsesCompactRequest,
*,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
## Why
Intermittent HTTP bridge continuity glitches can return `previous_response_not_found` even when the client is still on the same logical conversation. This commonly happens when `x-codex-turn-state` alias continuity is briefly unavailable or prompt-cache affinity drifts between adjacent turns.

## What Changes
- Add a bridge-local recovery index that maps completed/created `response.id` values to live HTTP bridge sessions.
- Use `previous_response_id` to recover a live bridge session before fail-closed continuity errors are emitted.
- Keep fail-closed behavior when no live session can be recovered.
- Add integration coverage for recovery when the request key changes between turns.

## Impact
- Reduces intermittent `previous_response_not_found` responses for valid sequential turns.
- Preserves strict fail-closed behavior when continuity is truly gone.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### ADDED Requirements

### Requirement: HTTP bridge recovers continuity from previous_response_id when live session is known
When handling HTTP `/v1/responses` and `/backend-api/codex/responses`, the service MUST maintain an in-memory mapping from emitted upstream `response.id` values to the owning live HTTP bridge session (scoped by API key identity). For requests that include `previous_response_id`, the service MUST attempt to recover and reuse that mapped live bridge session before returning a continuity loss error.

#### Scenario: previous_response_id recovers continuity when request affinity key drifts
- **WHEN** a prior HTTP bridged request emitted a `response.id`
- **AND** a follow-up HTTP request includes that value as `previous_response_id`
- **AND** the follow-up request's bridge affinity key differs from the prior request
- **THEN** the service reuses the mapped live bridge session for the follow-up request
- **AND** the request succeeds without returning `previous_response_not_found`

#### Scenario: stale previous_response_id mapping does not bypass fail-closed behavior
- **WHEN** a follow-up HTTP request includes `previous_response_id`
- **AND** the mapped bridge session is closed, missing, or otherwise inactive
- **THEN** the service removes the stale mapping
- **AND** the service continues the existing fail-closed behavior by returning `previous_response_not_found`
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- [x] Add HTTP bridge previous-response recovery index keyed by `response.id` + API key scope.
- [x] Reuse recovered live bridge sessions for requests carrying `previous_response_id` before fail-closed continuity errors.
- [x] Clean up previous-response recovery mappings when bridge sessions close.
- [x] Add integration regression coverage for recovery without turn-state alias continuity.
99 changes: 99 additions & 0 deletions tests/integration/test_http_responses_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -5561,6 +5561,105 @@ async def fake_connect_responses_websocket(
assert connect_count == 1


@pytest.mark.asyncio
async def test_v1_responses_http_bridge_recovers_previous_response_without_turn_state_alias(
async_client,
monkeypatch,
):
_install_bridge_settings(monkeypatch, enabled=True)
account_id = await _import_account(
async_client,
"acc_http_bridge_previous_response_recover",
"http-bridge-previous-response-recover@example.com",
)
account = await _get_account(account_id)
fake_upstream = _FakeBridgeUpstreamWebSocket()
connect_count = 0

async def fake_select_account_with_budget(
self,
deadline,
*,
request_id,
kind,
sticky_key,
sticky_kind,
reallocate_sticky,
sticky_max_age_seconds,
prefer_earlier_reset_accounts,
routing_strategy,
model,
exclude_account_ids=None,
additional_limit_name=None,
):
del (
self,
deadline,
request_id,
kind,
sticky_key,
sticky_kind,
reallocate_sticky,
sticky_max_age_seconds,
prefer_earlier_reset_accounts,
routing_strategy,
model,
exclude_account_ids,
additional_limit_name,
)
return AccountSelection(account=account, error_message=None, error_code=None)

async def fake_ensure_fresh_with_budget(self, target, *, force=False, timeout_seconds):
del self, force, timeout_seconds
return target

async def fake_connect_responses_websocket(
headers,
access_token,
account_id_header,
*,
base_url=None,
session=None,
):
del headers, access_token, account_id_header, base_url, session
nonlocal connect_count
connect_count += 1
return fake_upstream

monkeypatch.setattr(proxy_module.ProxyService, "_select_account_with_budget", fake_select_account_with_budget)
monkeypatch.setattr(proxy_module.ProxyService, "_ensure_fresh_with_budget", fake_ensure_fresh_with_budget)
monkeypatch.setattr(proxy_module, "connect_responses_websocket", fake_connect_responses_websocket)

first = await async_client.post(
"/v1/responses",
json={
"model": "gpt-5.1",
"instructions": "Return exactly OK.",
"input": "hello",
"prompt_cache_key": "recover-previous-response-a",
},
)
assert first.status_code == 200
first_body = first.json()

second = await async_client.post(
"/v1/responses",
json={
"model": "gpt-5.1",
"instructions": "Return exactly OK.",
"input": "hello-again",
# Intentionally rotate the cache key so continuity has to recover from previous_response_id.
"prompt_cache_key": "recover-previous-response-b",
"previous_response_id": first_body["id"],
},
)

assert second.status_code == 200
second_body = second.json()
assert second_body["id"] != first_body["id"]
assert connect_count == 1


@pytest.mark.asyncio
async def test_v1_responses_http_bridge_precreated_disconnect_returns_previous_response_not_found(
async_client,
Expand Down
Loading