diff --git a/app/modules/proxy/service.py b/app/modules/proxy/service.py index bdd8652a..622652fc 100644 --- a/app/modules/proxy/service.py +++ b/app/modules/proxy/service.py @@ -172,6 +172,7 @@ def __init__(self, repo_factory: ProxyRepoFactory) -> None: self._http_bridge_sessions: dict[_HTTPBridgeSessionKey, _HTTPBridgeSession] = {} self._http_bridge_inflight_sessions: dict[_HTTPBridgeSessionKey, asyncio.Future[_HTTPBridgeSession]] = {} self._http_bridge_turn_state_index: dict[tuple[str, str | None], _HTTPBridgeSessionKey] = {} + self._http_bridge_previous_response_index: dict[tuple[str, str | None], _HTTPBridgeSessionKey] = {} self._http_bridge_lock = anyio.Lock() def stream_responses( @@ -1490,6 +1491,35 @@ async def _get_or_create_http_bridge_session( ] = alias_session.key key = alias_session.key elif incoming_turn_state.startswith("http_turn_"): + if previous_response_id is not None: + previous_alias_key = _http_bridge_previous_response_alias_key( + previous_response_id, + api_key_id, + ) + previous_key = self._http_bridge_previous_response_index.get(previous_alias_key) + if previous_key is not None: + previous_session = self._http_bridge_sessions.get(previous_key) + if ( + previous_session is not None + and not previous_session.closed + and previous_session.account.status == AccountStatus.ACTIVE + ): + key = previous_session.key + self._promote_http_bridge_session_to_codex_affinity( + previous_session, + turn_state=incoming_turn_state, + settings=settings, + ) + previous_session.downstream_turn_state_aliases.add(incoming_turn_state) + for alias in previous_session.downstream_turn_state_aliases: + self._http_bridge_turn_state_index[ + _http_bridge_turn_state_alias_key( + alias, + previous_session.key.api_key_id, + ) + ] = previous_session.key + continue + self._http_bridge_previous_response_index.pop(previous_alias_key, None) key = _HTTPBridgeSessionKey("turn_state_header", incoming_turn_state, api_key_id) if self._http_bridge_inflight_sessions.get(key) is not None: pass @@ -1585,6 +1615,52 @@ async def _get_or_create_http_bridge_session( sessions_to_close.append(existing) inflight_future = self._http_bridge_inflight_sessions.get(key) + if previous_response_id is not None and inflight_future is None and ( + existing is None or existing.closed or existing.account.status != AccountStatus.ACTIVE + ): + previous_alias_key = _http_bridge_previous_response_alias_key(previous_response_id, api_key_id) + previous_key = self._http_bridge_previous_response_index.get(previous_alias_key) + if previous_key is not None: + previous_session = self._http_bridge_sessions.get(previous_key) + if ( + previous_session is not None + and not previous_session.closed + and previous_session.account.status == AccountStatus.ACTIVE + ): + key = previous_session.key + existing = previous_session + inflight_future = self._http_bridge_inflight_sessions.get(previous_key) + if incoming_turn_state: + self._promote_http_bridge_session_to_codex_affinity( + previous_session, + turn_state=incoming_turn_state, + settings=settings, + ) + previous_session.downstream_turn_state_aliases.add(incoming_turn_state) + for alias in previous_session.downstream_turn_state_aliases: + self._http_bridge_turn_state_index[ + _http_bridge_turn_state_alias_key( + alias, + previous_session.key.api_key_id, + ) + ] = previous_session.key + if inflight_future is None: + previous_session.request_model = request_model + previous_session.last_used_at = time.monotonic() + _log_http_bridge_event( + "reuse", + key, + account_id=previous_session.account.id, + model=previous_session.request_model, + pending_count=await self._http_bridge_pending_count(previous_session), + cache_key_family=key.affinity_kind, + model_class=_extract_model_class(previous_session.request_model) + if previous_session.request_model + else None, + ) + return previous_session + else: + self._http_bridge_previous_response_index.pop(previous_alias_key, None) if previous_response_id is not None: continuity_error = ProxyResponseError( 400, @@ -1750,6 +1826,7 @@ async def close_all_http_bridge_sessions(self) -> None: sessions_to_close = list(self._http_bridge_sessions.values()) self._http_bridge_sessions.clear() self._http_bridge_inflight_sessions.clear() + self._http_bridge_previous_response_index.clear() for session in sessions_to_close: await self._close_http_bridge_session(session) @@ -1789,8 +1866,10 @@ async def _close_http_bridge_session( session.closed = True if turn_state_lock_held: self._unregister_http_bridge_turn_states_locked(session) + self._unregister_http_bridge_previous_response_ids_locked(session) else: await self._unregister_http_bridge_turn_states(session) + await self._unregister_http_bridge_previous_response_ids(session) if session.upstream_reader is not None: await _await_cancelled_task(session.upstream_reader, label="http bridge upstream reader") try: @@ -1818,10 +1897,29 @@ async def _register_http_bridge_turn_state(self, session: "_HTTPBridgeSession", session.key ) + async def _register_http_bridge_previous_response_id( + self, + session: "_HTTPBridgeSession", + response_id: str, + ) -> None: + stripped_response_id = response_id.strip() + if not stripped_response_id: + return + async with self._http_bridge_lock: + if session.closed: + return + alias_key = _http_bridge_previous_response_alias_key(stripped_response_id, session.key.api_key_id) + self._http_bridge_previous_response_index[alias_key] = session.key + session.previous_response_ids.add(stripped_response_id) + async def _unregister_http_bridge_turn_states(self, session: "_HTTPBridgeSession") -> None: async with self._http_bridge_lock: self._unregister_http_bridge_turn_states_locked(session) + async def _unregister_http_bridge_previous_response_ids(self, session: "_HTTPBridgeSession") -> None: + async with self._http_bridge_lock: + self._unregister_http_bridge_previous_response_ids_locked(session) + def _unregister_http_bridge_turn_states_locked(self, session: "_HTTPBridgeSession") -> None: aliases = tuple(session.downstream_turn_state_aliases) for alias in aliases: @@ -1831,6 +1929,18 @@ def _unregister_http_bridge_turn_states_locked(self, session: "_HTTPBridgeSessio ) session.downstream_turn_state_aliases.clear() + def _unregister_http_bridge_previous_response_ids_locked(self, session: "_HTTPBridgeSession") -> None: + response_ids_set = getattr(session, "previous_response_ids", None) + if not isinstance(response_ids_set, set): + return + response_ids = tuple(response_ids_set) + for response_id in response_ids: + self._http_bridge_previous_response_index.pop( + _http_bridge_previous_response_alias_key(response_id, session.key.api_key_id), + None, + ) + response_ids_set.clear() + def _promote_http_bridge_session_to_codex_affinity( self, session: "_HTTPBridgeSession", @@ -2444,6 +2554,9 @@ async def _process_http_bridge_upstream_text( if event_type == "response.created" and release_create_gate and created_request_state is not None: _release_websocket_response_create_gate(created_request_state, session.response_create_gate) + if response_id is not None and matched_request_state is not None: + await self._register_http_bridge_previous_response_id(session, response_id) + if matched_request_state is not None and matched_request_state.event_queue is not None: await matched_request_state.event_queue.put(event_block) @@ -4410,6 +4523,7 @@ class _HTTPBridgeSession: upstream_turn_state: str | None = None downstream_turn_state: str | None = None downstream_turn_state_aliases: set[str] = field(default_factory=set) + previous_response_ids: set[str] = field(default_factory=set) upstream_reader: asyncio.Task[None] | None = None closed: bool = False @@ -5037,6 +5151,10 @@ def _http_bridge_turn_state_alias_key(turn_state: str, api_key_id: str | None) - return (turn_state, api_key_id) +def _http_bridge_previous_response_alias_key(response_id: str, api_key_id: str | None) -> tuple[str, str | None]: + return (response_id.strip(), api_key_id) + + def _resolve_prompt_cache_key( payload: ResponsesRequest | ResponsesCompactRequest, *, diff --git a/openspec/changes/harden-http-bridge-previous-response-recovery/proposal.md b/openspec/changes/harden-http-bridge-previous-response-recovery/proposal.md new file mode 100644 index 00000000..c6e656a8 --- /dev/null +++ b/openspec/changes/harden-http-bridge-previous-response-recovery/proposal.md @@ -0,0 +1,12 @@ +## Why +Intermittent HTTP bridge continuity glitches can return `previous_response_not_found` even when the client is still on the same logical conversation. This commonly happens when `x-codex-turn-state` alias continuity is briefly unavailable or prompt-cache affinity drifts between adjacent turns. + +## What Changes +- Add a bridge-local recovery index that maps completed/created `response.id` values to live HTTP bridge sessions. +- Use `previous_response_id` to recover a live bridge session before fail-closed continuity errors are emitted. +- Keep fail-closed behavior when no live session can be recovered. +- Add integration coverage for recovery when the request key changes between turns. + +## Impact +- Reduces intermittent `previous_response_not_found` responses for valid sequential turns. +- Preserves strict fail-closed behavior when continuity is truly gone. diff --git a/openspec/changes/harden-http-bridge-previous-response-recovery/specs/responses-api-compat/spec.md b/openspec/changes/harden-http-bridge-previous-response-recovery/specs/responses-api-compat/spec.md new file mode 100644 index 00000000..06c7c060 --- /dev/null +++ b/openspec/changes/harden-http-bridge-previous-response-recovery/specs/responses-api-compat/spec.md @@ -0,0 +1,17 @@ +### ADDED Requirements + +### Requirement: HTTP bridge recovers continuity from previous_response_id when live session is known +When handling HTTP `/v1/responses` and `/backend-api/codex/responses`, the service MUST maintain an in-memory mapping from emitted upstream `response.id` values to the owning live HTTP bridge session (scoped by API key identity). For requests that include `previous_response_id`, the service MUST attempt to recover and reuse that mapped live bridge session before returning a continuity loss error. + +#### Scenario: previous_response_id recovers continuity when request affinity key drifts +- **WHEN** a prior HTTP bridged request emitted a `response.id` +- **AND** a follow-up HTTP request includes that value as `previous_response_id` +- **AND** the follow-up request's bridge affinity key differs from the prior request +- **THEN** the service reuses the mapped live bridge session for the follow-up request +- **AND** the request succeeds without returning `previous_response_not_found` + +#### Scenario: stale previous_response_id mapping does not bypass fail-closed behavior +- **WHEN** a follow-up HTTP request includes `previous_response_id` +- **AND** the mapped bridge session is closed, missing, or otherwise inactive +- **THEN** the service removes the stale mapping +- **AND** the service continues the existing fail-closed behavior by returning `previous_response_not_found` diff --git a/openspec/changes/harden-http-bridge-previous-response-recovery/tasks.md b/openspec/changes/harden-http-bridge-previous-response-recovery/tasks.md new file mode 100644 index 00000000..b70f4d1b --- /dev/null +++ b/openspec/changes/harden-http-bridge-previous-response-recovery/tasks.md @@ -0,0 +1,4 @@ +- [x] Add HTTP bridge previous-response recovery index keyed by `response.id` + API key scope. +- [x] Reuse recovered live bridge sessions for requests carrying `previous_response_id` before fail-closed continuity errors. +- [x] Clean up previous-response recovery mappings when bridge sessions close. +- [x] Add integration regression coverage for recovery without turn-state alias continuity. diff --git a/tests/integration/test_http_responses_bridge.py b/tests/integration/test_http_responses_bridge.py index f6df026b..1f290b20 100644 --- a/tests/integration/test_http_responses_bridge.py +++ b/tests/integration/test_http_responses_bridge.py @@ -5561,6 +5561,105 @@ async def fake_connect_responses_websocket( assert connect_count == 1 +@pytest.mark.asyncio +async def test_v1_responses_http_bridge_recovers_previous_response_without_turn_state_alias( + async_client, + monkeypatch, +): + _install_bridge_settings(monkeypatch, enabled=True) + account_id = await _import_account( + async_client, + "acc_http_bridge_previous_response_recover", + "http-bridge-previous-response-recover@example.com", + ) + account = await _get_account(account_id) + fake_upstream = _FakeBridgeUpstreamWebSocket() + connect_count = 0 + + async def fake_select_account_with_budget( + self, + deadline, + *, + request_id, + kind, + sticky_key, + sticky_kind, + reallocate_sticky, + sticky_max_age_seconds, + prefer_earlier_reset_accounts, + routing_strategy, + model, + exclude_account_ids=None, + additional_limit_name=None, + ): + del ( + self, + deadline, + request_id, + kind, + sticky_key, + sticky_kind, + reallocate_sticky, + sticky_max_age_seconds, + prefer_earlier_reset_accounts, + routing_strategy, + model, + exclude_account_ids, + additional_limit_name, + ) + return AccountSelection(account=account, error_message=None, error_code=None) + + async def fake_ensure_fresh_with_budget(self, target, *, force=False, timeout_seconds): + del self, force, timeout_seconds + return target + + async def fake_connect_responses_websocket( + headers, + access_token, + account_id_header, + *, + base_url=None, + session=None, + ): + del headers, access_token, account_id_header, base_url, session + nonlocal connect_count + connect_count += 1 + return fake_upstream + + monkeypatch.setattr(proxy_module.ProxyService, "_select_account_with_budget", fake_select_account_with_budget) + monkeypatch.setattr(proxy_module.ProxyService, "_ensure_fresh_with_budget", fake_ensure_fresh_with_budget) + monkeypatch.setattr(proxy_module, "connect_responses_websocket", fake_connect_responses_websocket) + + first = await async_client.post( + "/v1/responses", + json={ + "model": "gpt-5.1", + "instructions": "Return exactly OK.", + "input": "hello", + "prompt_cache_key": "recover-previous-response-a", + }, + ) + assert first.status_code == 200 + first_body = first.json() + + second = await async_client.post( + "/v1/responses", + json={ + "model": "gpt-5.1", + "instructions": "Return exactly OK.", + "input": "hello-again", + # Intentionally rotate the cache key so continuity has to recover from previous_response_id. + "prompt_cache_key": "recover-previous-response-b", + "previous_response_id": first_body["id"], + }, + ) + + assert second.status_code == 200 + second_body = second.json() + assert second_body["id"] != first_body["id"] + assert connect_count == 1 + + @pytest.mark.asyncio async def test_v1_responses_http_bridge_precreated_disconnect_returns_previous_response_not_found( async_client,