Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
06aa47a
feat(proxy): durable http bridge ownership
aaiyer Mar 21, 2026
b30b937
fix(review): address durable bridge regressions
aaiyer Mar 22, 2026
bf5ed8d
fix(proxy): harden durable http bridge leases
aaiyer Mar 22, 2026
2245e2d
fix(test): remove stale bridge inflight cleanup
aaiyer Mar 22, 2026
58d1c92
fix(proxy): preserve recovered bridge ownership
aaiyer Mar 22, 2026
9e45f8b
fix(proxy): handle stale bridge owners and legacy replays
aaiyer Mar 23, 2026
fb3b9ca
fix(proxy): rebind recovered bridge affinity
aaiyer Mar 23, 2026
1d2c5ce
fix(proxy): restore turn-state creation compatibility
aaiyer Mar 25, 2026
a062430
fix(proxy): harden bridge turn-state recovery
aaiyer Mar 25, 2026
ac0ecc4
fix(proxy): stabilize bridge ownership recovery
aaiyer Mar 25, 2026
ba1a846
fix(proxy): preserve bridge turn-state rollout compatibility
aaiyer Mar 25, 2026
f5230aa
fix(proxy): preserve bridge lease handoff on replay
aaiyer Mar 25, 2026
e5ca193
fix(proxy): keep bridge leases alive per worker
aaiyer Mar 25, 2026
e02297d
fix(proxy): recover signed bridge replays after restart
aaiyer Mar 25, 2026
d4844aa
fix(proxy): preserve recovered bridge aliases
aaiyer Mar 25, 2026
527fa75
fix(proxy): invalidate bridge sessions on lease loss
aaiyer Mar 25, 2026
437dda2
fix(proxy): align bridge recovery and lease state
aaiyer Mar 25, 2026
8a29fca
fix(proxy): unify bridge invalidation lifecycle
aaiyer Mar 25, 2026
107b5e8
fix(proxy): restore repo bundle compatibility
aaiyer Mar 25, 2026
7c40c44
fix(proxy): guard bridge lease expiry and reconnect failure
aaiyer Mar 25, 2026
7a41983
fix(proxy): scope bridge ownership to replicas
aaiyer Mar 25, 2026
d819475
fix(proxy): split bridge replay and worker ownership
aaiyer Mar 25, 2026
fb293ad
fix(proxy): fail closed on unreadable bridge owners
aaiyer Mar 25, 2026
c46418a
fix(proxy): preserve bridge continuity across replay races
aaiyer Mar 25, 2026
ddcd1e9
fix(proxy): fail turn-state registration on lease loss
aaiyer Mar 25, 2026
c4eeff1
fix(proxy): canonicalize signed bridge turn states
aaiyer Mar 25, 2026
add9652
fix(proxy): claim bridge leases by stable affinity
aaiyer Mar 25, 2026
cf6b726
fix(proxy): preserve bridge reconnect lease handoff
aaiyer Mar 25, 2026
9624c4d
fix(proxy): restore bridge lease persist hook
aaiyer Mar 25, 2026
fc2b819
fix(proxy): expire recovered stale turn states
aaiyer Mar 25, 2026
8fb2e2c
style(proxy): format bridge service and tests
aaiyer Mar 25, 2026
b901fa5
fix(ci): satisfy bridge type checks
aaiyer Mar 25, 2026
7412ba6
fix(proxy): serialize http bridge lease cleanup
aaiyer Mar 25, 2026
2def1f8
fix(proxy): avoid http bridge cleanup deadlock
aaiyer Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions app/db/alembic/versions/20260321_120000_add_http_bridge_leases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""add http_bridge_leases table

Revision ID: 20260321_120000_add_http_bridge_leases
Revises: 20260320_000000_add_request_log_requested_actual_tiers
Create Date: 2026-03-21
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine import Connection

# Revision identifiers, used by Alembic.
revision = "20260321_120000_add_http_bridge_leases"
# Parent revision (the "Revises" entry in the module docstring).
down_revision = "20260320_000000_add_request_log_requested_actual_tiers"
branch_labels = None
depends_on = None


def _table_exists(connection: Connection, table_name: str) -> bool:
    """Return True when ``table_name`` is reported by the connection's inspector."""
    return sa.inspect(connection).has_table(table_name)


def _index_exists(connection: Connection, index_name: str, table_name: str) -> bool:
    """Return True when ``table_name`` exists and carries an index named ``index_name``.

    A missing table is reported as "index absent" (False) and its indexes are
    never queried.
    """
    inspector = sa.inspect(connection)
    if inspector.has_table(table_name):
        for index in inspector.get_indexes(table_name):
            if index["name"] == index_name:
                return True
    return False


def upgrade() -> None:
    """Create the ``http_bridge_leases`` table and its two lookup indexes.

    Every step is guarded by an existence check, so objects that are already
    present in the database are left untouched.
    """
    table_name = "http_bridge_leases"
    connection = op.get_bind()

    if not _table_exists(connection, table_name):
        lease_columns = [
            sa.Column("session_id", sa.String(), primary_key=True),
            sa.Column("affinity_kind", sa.String(), nullable=False),
            sa.Column("affinity_key", sa.String(), nullable=False),
            sa.Column("api_key_scope", sa.String(), nullable=False, server_default=sa.text("''")),
            sa.Column("owner_instance_id", sa.String(), nullable=False),
            sa.Column("lease_expires_at", sa.DateTime(), nullable=False),
            sa.Column("account_id", sa.String(), nullable=True),
            sa.Column("request_model", sa.String(), nullable=True),
            sa.Column("codex_session", sa.Boolean(), nullable=False, server_default=sa.false()),
            sa.Column("idle_ttl_seconds", sa.Float(), nullable=False),
            sa.Column("upstream_turn_state", sa.String(), nullable=True),
            sa.Column("downstream_turn_state", sa.String(), nullable=True),
            sa.Column("created_at", sa.DateTime(), server_default=sa.func.now(), nullable=False),
            sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now(), nullable=False),
        ]
        op.create_table(table_name, *lease_columns)

    # (index name, indexed columns), created in this order when missing.
    index_specs = (
        ("ix_http_bridge_leases_owner_expires", ["owner_instance_id", "lease_expires_at"]),
        ("ix_http_bridge_leases_expires", ["lease_expires_at"]),
    )
    for index_name, index_columns in index_specs:
        if not _index_exists(connection, index_name, table_name):
            op.create_index(index_name, table_name, index_columns)


def downgrade() -> None:
    """Drop the lease indexes that exist, then the ``http_bridge_leases`` table.

    Does nothing when the table is not present.
    """
    table_name = "http_bridge_leases"
    connection = op.get_bind()
    if not _table_exists(connection, table_name):
        return

    # Same drop order as before: the expiry index first, then owner+expiry.
    for index_name in (
        "ix_http_bridge_leases_expires",
        "ix_http_bridge_leases_owner_expires",
    ):
        if _index_exists(connection, index_name, table_name):
            op.drop_index(index_name, table_name=table_name)
    op.drop_table(table_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""merge http bridge lease head

Revision ID: 20260322_000000_merge_http_bridge_lease_head
Revises: 20260321_120000_add_http_bridge_leases, 20260321_210000_merge_request_log_tiers_and_dashboard_index_heads
Create Date: 2026-03-22
"""

from __future__ import annotations

# Revision identifiers, used by Alembic.
revision = "20260322_000000_merge_http_bridge_lease_head"
# Two parent revisions: this migration joins the bridge-lease revision with the
# request-log/dashboard merge head (both listed under "Revises" in the docstring).
down_revision = (
    "20260321_120000_add_http_bridge_leases",
    "20260321_210000_merge_request_log_tiers_and_dashboard_index_heads",
)
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Do nothing: this revision only joins its two parent revisions."""
    return None


def downgrade() -> None:
    """Do nothing: this revision made no schema changes to revert."""
    return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""enforce http bridge lease affinity uniqueness

Revision ID: 20260325_120000_enforce_http_bridge_lease_affinity_uniqueness
Revises: 20260322_000000_merge_http_bridge_lease_head
Create Date: 2026-03-25
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine import Connection

# Revision identifiers, used by Alembic.
revision = "20260325_120000_enforce_http_bridge_lease_affinity_uniqueness"
# Parent revision (the "Revises" entry in the module docstring).
down_revision = "20260322_000000_merge_http_bridge_lease_head"
branch_labels = None
depends_on = None


def _table_exists(connection: Connection, table_name: str) -> bool:
    """Return True when ``table_name`` is reported by the connection's inspector."""
    return sa.inspect(connection).has_table(table_name)


def _index_exists(connection: Connection, index_name: str, table_name: str) -> bool:
    """Return True when ``table_name`` exists and carries an index named ``index_name``.

    A missing table is reported as "index absent" (False) and its indexes are
    never queried.
    """
    inspector = sa.inspect(connection)
    if inspector.has_table(table_name):
        for index in inspector.get_indexes(table_name):
            if index["name"] == index_name:
                return True
    return False


def upgrade() -> None:
    """Deduplicate lease rows per affinity scope, then add the unique index.

    Returns immediately when ``http_bridge_leases`` does not exist. Otherwise
    rows are ranked within each (affinity_kind, affinity_key, api_key_scope)
    group by lease_expires_at, updated_at, created_at and session_id, all
    descending; every row except the first-ranked one is deleted. The unique
    index over those three columns is then created unless it already exists.
    """
    table_name = "http_bridge_leases"
    unique_index_name = "ux_http_bridge_leases_affinity_scope"
    connection = op.get_bind()
    if not _table_exists(connection, table_name):
        return

    # Duplicates must be removed first, otherwise creating the unique index
    # below would fail on existing rows that share an affinity scope.
    dedupe_statement = sa.text(
        """
DELETE FROM http_bridge_leases
WHERE session_id IN (
SELECT session_id
FROM (
SELECT
session_id,
ROW_NUMBER() OVER (
PARTITION BY affinity_kind, affinity_key, api_key_scope
ORDER BY lease_expires_at DESC, updated_at DESC, created_at DESC, session_id DESC
) AS row_num
FROM http_bridge_leases
) ranked_leases
WHERE ranked_leases.row_num > 1
)
"""
    )
    op.execute(dedupe_statement)

    if _index_exists(connection, unique_index_name, table_name):
        return
    op.create_index(
        unique_index_name,
        table_name,
        ["affinity_kind", "affinity_key", "api_key_scope"],
        unique=True,
    )


def downgrade() -> None:
    """Drop the unique affinity-scope index when both the table and index exist.

    Rows deleted by the upgrade are not restored.
    """
    table_name = "http_bridge_leases"
    unique_index_name = "ux_http_bridge_leases_affinity_scope"
    connection = op.get_bind()
    if not _table_exists(connection, table_name):
        return
    if not _index_exists(connection, unique_index_name, table_name):
        return
    op.drop_index(unique_index_name, table_name=table_name)
35 changes: 35 additions & 0 deletions app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,41 @@ class StickySession(Base):
)


class HttpBridgeLease(Base):
    """ORM mapping for the ``http_bridge_leases`` table.

    One row per ``session_id`` (the primary key). The table args declare a
    unique index over (affinity_kind, affinity_key, api_key_scope) plus two
    non-unique indexes on the owner/expiry columns.
    """

    __tablename__ = "http_bridge_leases"
    __table_args__ = (
        # At most one lease row per (affinity_kind, affinity_key, api_key_scope).
        Index(
            "ux_http_bridge_leases_affinity_scope",
            "affinity_kind",
            "affinity_key",
            "api_key_scope",
            unique=True,
        ),
        # Composite index on owner and expiry, and a standalone expiry index.
        Index("ix_http_bridge_leases_owner_expires", "owner_instance_id", "lease_expires_at"),
        Index("ix_http_bridge_leases_expires", "lease_expires_at"),
    )

    session_id: Mapped[str] = mapped_column(String, primary_key=True)
    affinity_kind: Mapped[str] = mapped_column(String, nullable=False)
    affinity_key: Mapped[str] = mapped_column(String, nullable=False)
    # Defaults to the empty string both in Python (default) and in the database (server_default).
    api_key_scope: Mapped[str] = mapped_column(String, nullable=False, default="", server_default=text("''"))
    # NOTE(review): presumably identifies the instance currently holding the lease — confirm against the repository code.
    owner_instance_id: Mapped[str] = mapped_column(String, nullable=False)
    # Declared with a plain DateTime (no timezone flag is passed here).
    lease_expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    account_id: Mapped[str | None] = mapped_column(String, nullable=True)
    request_model: Mapped[str | None] = mapped_column(String, nullable=True)
    # False by default, both client-side and server-side.
    codex_session: Mapped[bool] = mapped_column(Boolean, default=False, server_default=false(), nullable=False)
    idle_ttl_seconds: Mapped[float] = mapped_column(Float, nullable=False)
    upstream_turn_state: Mapped[str | None] = mapped_column(String, nullable=True)
    downstream_turn_state: Mapped[str | None] = mapped_column(String, nullable=True)
    # Populated by the database via server_default=func.now().
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), nullable=False)
    # Same server default as created_at; onupdate=func.now() refreshes it on ORM-issued UPDATEs.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )


class DashboardSettings(Base):
__tablename__ = "dashboard_settings"

Expand Down
2 changes: 2 additions & 0 deletions app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from app.modules.firewall.repository import FirewallRepository
from app.modules.firewall.service import FirewallService
from app.modules.oauth.service import OauthService
from app.modules.proxy.bridge_repository import HttpBridgeLeasesRepository
from app.modules.proxy.repo_bundle import ProxyRepositories
from app.modules.proxy.service import ProxyService
from app.modules.proxy.sticky_repository import StickySessionsRepository
Expand Down Expand Up @@ -151,6 +152,7 @@ async def _proxy_repo_context() -> AsyncIterator[ProxyRepositories]:
usage=UsageRepository(session),
request_logs=RequestLogsRepository(session),
sticky_sessions=StickySessionsRepository(session),
http_bridge_leases=HttpBridgeLeasesRepository(session),
api_keys=ApiKeysRepository(session),
additional_usage=AdditionalUsageRepository(session),
)
Expand Down
24 changes: 9 additions & 15 deletions app/modules/proxy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,9 @@ async def _stream_responses(
rate_limit_headers = await context.service.rate_limit_headers()
bridge_active = prefer_http_bridge and proxy_service_module.get_settings().http_responses_session_bridge_enabled
downstream_turn_state = (
proxy_service_module.ensure_http_downstream_turn_state(request.headers) if bridge_active else None
)
turn_state_headers = (
proxy_service_module.build_downstream_turn_state_response_headers(downstream_turn_state)
if downstream_turn_state is not None
else {}
proxy_service_module.requested_http_downstream_turn_state(request.headers) if bridge_active else None
)
turn_state_headers: dict[str, str] = {}
payload.stream = True
if prefer_http_bridge:
stream = context.service.stream_http_responses(
Expand All @@ -464,6 +460,7 @@ async def _stream_responses(
api_key_reservation=reservation,
suppress_text_done_events=suppress_text_done_events,
downstream_turn_state=downstream_turn_state,
response_headers_out=turn_state_headers,
)
else:
stream = context.service.stream_responses(
Expand All @@ -482,15 +479,15 @@ async def _stream_responses(
return StreamingResponse(
_prepend_first(None, stream),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", **rate_limit_headers},
headers={"Cache-Control": "no-cache", **turn_state_headers, **rate_limit_headers},
)
except ProxyResponseError as exc:
await _release_reservation(reservation)
return _logged_error_json_response(
request,
exc.status_code,
exc.payload,
headers=rate_limit_headers,
headers={**turn_state_headers, **rate_limit_headers},
)
return StreamingResponse(
_prepend_first(first, stream),
Expand Down Expand Up @@ -521,13 +518,9 @@ async def _collect_responses(
rate_limit_headers = await context.service.rate_limit_headers()
bridge_active = prefer_http_bridge and proxy_service_module.get_settings().http_responses_session_bridge_enabled
downstream_turn_state = (
proxy_service_module.ensure_http_downstream_turn_state(request.headers) if bridge_active else None
)
turn_state_headers = (
proxy_service_module.build_downstream_turn_state_response_headers(downstream_turn_state)
if downstream_turn_state is not None
else {}
proxy_service_module.requested_http_downstream_turn_state(request.headers) if bridge_active else None
)
turn_state_headers: dict[str, str] = {}
payload.stream = True
if prefer_http_bridge:
stream = context.service.stream_http_responses(
Expand All @@ -540,6 +533,7 @@ async def _collect_responses(
api_key_reservation=reservation,
suppress_text_done_events=suppress_text_done_events,
downstream_turn_state=downstream_turn_state,
response_headers_out=turn_state_headers,
)
else:
stream = context.service.stream_responses(
Expand All @@ -561,7 +555,7 @@ async def _collect_responses(
request,
exc.status_code,
error.model_dump(mode="json", exclude_none=True),
headers=rate_limit_headers,
headers={**turn_state_headers, **rate_limit_headers},
)
if isinstance(response_payload, OpenAIResponsePayload):
if response_payload.status == "failed":
Expand Down
Loading
Loading