-
Notifications
You must be signed in to change notification settings - Fork 87
feat(v1-usage): add credit-based Codex override windows #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| """add credit-based api key limit enum values | ||
|
|
||
| Revision ID: 20260403_000000_add_credit_api_key_limit_values | ||
| Revises: 20260402_000000_switch_dashboard_routing_default_to_capacity_weighted | ||
| Create Date: 2026-04-03 | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op | ||
| from sqlalchemy.engine import Connection | ||
|
|
||
| revision = "20260403_000000_add_credit_api_key_limit_values" | ||
| down_revision = "20260402_000000_switch_dashboard_routing_default_to_capacity_weighted" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
def _table_exists(connection: Connection, table_name: str) -> bool:
    """Return True when *table_name* is present in the connected database."""
    return sa.inspect(connection).has_table(table_name)
|
|
||
|
|
||
def upgrade() -> None:
    """Add credit-based values to the api_key_limits enum types.

    Adds ``'credits'`` to ``limit_type`` and ``'5h'`` / ``'7d'`` to
    ``limit_window``. ``ADD VALUE IF NOT EXISTS`` keeps the migration
    idempotent on re-runs.
    """
    bind = op.get_bind()
    # Check the dialect first: the statements below are PostgreSQL-only
    # DDL, and bailing out early avoids a needless schema reflection on
    # other backends.
    if bind.dialect.name != "postgresql":
        return
    if not _table_exists(bind, "api_key_limits"):
        return

    # NOTE(review): ALTER TYPE ... ADD VALUE runs inside Alembic's
    # transaction, which requires PostgreSQL 12+ — confirm the minimum
    # supported server version.
    for statement in (
        "ALTER TYPE limit_type ADD VALUE IF NOT EXISTS 'credits'",
        "ALTER TYPE limit_window ADD VALUE IF NOT EXISTS '5h'",
        "ALTER TYPE limit_window ADD VALUE IF NOT EXISTS '7d'",
    ):
        op.execute(sa.text(statement))
|
|
||
|
|
||
def downgrade() -> None:
    """Intentional no-op: PostgreSQL enum values cannot be removed safely.

    Dropping an enum value would require rebuilding the type and every
    column that references it, so the values added by :func:`upgrade`
    ('credits', '5h', '7d') are deliberately left in place. The original
    table/dialect guards were dead code — every path returned ``None``
    without performing any action — so they are dropped here.
    """
    return
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,12 +3,16 @@ | |
| import logging | ||
| import time | ||
| from collections.abc import AsyncIterator, Mapping | ||
| from datetime import datetime, timezone | ||
| from typing import cast | ||
|
|
||
| from fastapi import APIRouter, Body, Depends, File, Form, Request, Response, Security, UploadFile, WebSocket | ||
| from fastapi.responses import JSONResponse, StreamingResponse | ||
| from pydantic import ValidationError | ||
| from sqlalchemy import select | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
|
|
||
| from app.core import usage as usage_core | ||
| from app.core.auth.dependencies import ( | ||
| set_openai_error_format, | ||
| validate_codex_usage_identity, | ||
|
|
@@ -39,15 +43,19 @@ | |
| from app.core.openai.v1_requests import V1ResponsesCompactRequest, V1ResponsesRequest | ||
| from app.core.runtime_logging import log_error_response | ||
| from app.core.types import JsonValue | ||
| from app.core.usage.types import UsageWindowRow | ||
| from app.core.utils.json_guards import is_json_mapping | ||
| from app.core.utils.sse import parse_sse_data_json | ||
| from app.db.models import Account, UsageHistory | ||
| from app.db.session import get_background_session | ||
| from app.dependencies import ProxyContext, get_proxy_context, get_proxy_websocket_context | ||
| from app.modules.api_keys.repository import ApiKeysRepository | ||
| from app.modules.api_keys.service import ( | ||
| ApiKeyData, | ||
| ApiKeyInvalidError, | ||
| ApiKeyRateLimitExceededError, | ||
| ApiKeySelfLimitData, | ||
| ApiKeySelfUsageData, | ||
| ApiKeysService, | ||
| ApiKeyUsageReservationData, | ||
| ) | ||
|
|
@@ -69,6 +77,7 @@ | |
| V1UsageLimitResponse, | ||
| V1UsageResponse, | ||
| ) | ||
| from app.modules.usage.repository import UsageRepository | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -251,6 +260,7 @@ async def v1_usage( | |
| async with get_background_session() as session: | ||
| service = ApiKeysService(ApiKeysRepository(session)) | ||
| usage = await service.get_key_usage_summary_for_self(api_key.id) | ||
| aggregate_limits = await _build_aggregate_credit_limits(session) | ||
|
|
||
| if usage is None: | ||
| raise ProxyAuthError("Invalid API key") | ||
|
|
@@ -260,18 +270,124 @@ async def v1_usage( | |
| total_tokens=usage.total_tokens, | ||
| cached_input_tokens=usage.cached_input_tokens, | ||
| total_cost_usd=usage.total_cost_usd, | ||
| limits=[ | ||
| V1UsageLimitResponse( | ||
| limit_type=limit.limit_type, | ||
| limit_window=limit.limit_window, | ||
| max_value=limit.max_value, | ||
| current_value=limit.current_value, | ||
| remaining_value=limit.remaining_value, | ||
| model_filter=limit.model_filter, | ||
| reset_at=limit.reset_at.isoformat() + "Z", | ||
| ) | ||
| for limit in usage.limits | ||
| ], | ||
| limits=_build_v1_usage_limits(usage, aggregate_limits), | ||
| ) | ||
|
|
||
|
|
||
def _build_v1_usage_limits(
    usage: ApiKeySelfUsageData,
    aggregate_limits: dict[str, V1UsageLimitResponse],
) -> list[V1UsageLimitResponse]:
    """Build the ``limits`` payload for the /v1/usage response.

    When aggregate credit limits cover BOTH windows ("5h" and "7d"),
    return those, applying any key-level credit override per window.
    Otherwise fall back to the key's own limits converted verbatim.
    """
    # Key-level credit overrides keyed by window; only global credit
    # limits (no model filter) participate in the override.
    credit_overrides = {
        limit.limit_window: limit
        for limit in usage.limits
        if limit.limit_type == "credits" and limit.model_filter is None
    }

    merged = [
        _apply_credit_override(aggregate_limits[window], credit_overrides.get(window))
        for window in ("5h", "7d")
        if window in aggregate_limits
    ]
    # Only trust the aggregate view when it covers both windows; a
    # partial view would silently hide the key's configured limits.
    if {item.limit_window for item in merged} == {"5h", "7d"}:
        return merged

    # Fallback path — built lazily; the original computed this list
    # eagerly even when the merged aggregate view was returned.
    return [_to_v1_usage_limit_response(limit) for limit in usage.limits]
|
|
||
|
|
||
def _to_v1_usage_limit_response(limit: ApiKeySelfLimitData) -> V1UsageLimitResponse:
    """Convert a key-level limit into the /v1/usage response shape.

    ``current_value`` is clamped into ``[0, max_value]`` so the remaining
    amount is never negative, even if stored usage overshoots the cap.
    """
    current_value = max(0, min(limit.current_value, limit.max_value))
    # Emit a UTC "Z"-suffixed ISO timestamp. Blindly appending "Z" to
    # isoformat() (the previous behavior) produced a malformed value like
    # "...+00:00Z" for timezone-aware datetimes; normalize to UTC instead.
    # Naive datetimes keep the original assumed-UTC behavior.
    reset = limit.reset_at
    if reset.tzinfo is not None:
        reset_at = reset.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    else:
        reset_at = reset.isoformat() + "Z"
    return V1UsageLimitResponse(
        limit_type=limit.limit_type,
        limit_window=limit.limit_window,
        max_value=limit.max_value,
        current_value=current_value,
        remaining_value=max(0, limit.max_value - current_value),
        model_filter=limit.model_filter,
        reset_at=reset_at,
        source=limit.source,
    )
|
|
||
|
|
||
def _apply_credit_override(
    aggregate_limit: V1UsageLimitResponse,
    override_limit: ApiKeySelfLimitData | None,
) -> V1UsageLimitResponse:
    """Cap an aggregate credit window with a key-level override, if any.

    Without an override the aggregate limit passes through untouched.
    With one, the override's cap replaces the aggregate cap and the
    aggregate usage is clamped into the new ``[0, cap]`` range.
    """
    if override_limit is None:
        return aggregate_limit

    capped_max = max(0, override_limit.max_value)
    capped_current = max(0, min(aggregate_limit.current_value, capped_max))
    return V1UsageLimitResponse(
        limit_type="credits",
        limit_window=aggregate_limit.limit_window,
        max_value=capped_max,
        current_value=capped_current,
        remaining_value=max(0, capped_max - capped_current),
        model_filter=None,
        reset_at=aggregate_limit.reset_at,
        source="api_key_override",
    )
|
|
||
|
|
||
async def _build_aggregate_credit_limits(session: AsyncSession) -> dict[str, V1UsageLimitResponse]:
    """Summarize account-level usage into per-window credit limits.

    Returns a mapping of window label ("5h" / "7d") to an aggregate
    credit limit built from the latest usage rows across accounts.
    Windows without usable data (no rows, zero capacity, or an unknown
    reset time) are omitted so callers can fall back to key-level limits.
    """
    usage_repository = UsageRepository(session)
    primary_latest = await usage_repository.latest_by_account(window="primary")
    secondary_latest = await usage_repository.latest_by_account(window="secondary")

    primary_rows = [_usage_entry_to_window_row(entry) for entry in primary_latest.values()]
    secondary_rows = [_usage_entry_to_window_row(entry) for entry in secondary_latest.values()]
    primary_rows, secondary_rows = usage_core.normalize_weekly_only_rows(primary_rows, secondary_rows)

    account_ids = {row.account_id for row in primary_rows} | {row.account_id for row in secondary_rows}
    if not account_ids:
        return {}

    # NOTE(review): accounts are loaded by ID with no status filter, so
    # paused/deactivated accounts with historical usage may still
    # contribute capacity and usage here — confirm intended behavior.
    account_map = {account.id: account for account in await _load_accounts_by_id(session, account_ids)}
    limits: dict[str, V1UsageLimitResponse] = {}

    for window_key, rows, label in (("primary", primary_rows, "5h"), ("secondary", secondary_rows, "7d")):
        if not rows:
            continue
        summary = usage_core.summarize_usage_window(rows, account_map, window_key)
        max_value = max(0, int(round(summary.capacity_credits or 0.0)))
        if max_value <= 0:
            continue
        if summary.reset_at is None:
            # Previously this emitted reset_at="", which breaks consumers
            # that parse reset_at as an ISO timestamp. Omit the window
            # instead; _build_v1_usage_limits then falls back to the
            # key-level limits.
            continue
        current_value = max(0, min(int(round(summary.used_credits or 0.0)), max_value))
        reset_at = datetime.fromtimestamp(summary.reset_at, tz=timezone.utc).isoformat().replace("+00:00", "Z")
        limits[label] = V1UsageLimitResponse(
            limit_type="credits",
            limit_window=label,
            max_value=max_value,
            current_value=current_value,
            remaining_value=max(0, max_value - current_value),
            model_filter=None,
            reset_at=reset_at,
            source="aggregate",
        )

    return limits
|
|
||
|
|
||
async def _load_accounts_by_id(session: AsyncSession, account_ids: set[str]) -> list[Account]:
    """Fetch Account rows for the given IDs; empty input short-circuits to []."""
    if not account_ids:
        return []
    # NOTE(review): this loads accounts purely by ID without filtering on
    # status, so paused/deactivated accounts with historical usage still
    # contribute capacity and usage to the aggregate windows — confirm
    # whether a status filter is wanted here.
    result = await session.execute(select(Account).where(Account.id.in_(account_ids)))
    return list(result.scalars().all())
|
|
||
|
|
||
def _usage_entry_to_window_row(entry: UsageHistory) -> UsageWindowRow:
    """Project a persisted UsageHistory record onto a UsageWindowRow."""
    # The row carries exactly these five fields, copied 1:1 from the ORM
    # entity into the plain usage-window value object.
    fields = ("account_id", "used_percent", "reset_at", "window_minutes", "recorded_at")
    return UsageWindowRow(**{name: getattr(entry, name) for name in fields})
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an upstream usage row has `reset_at=None` (which is a valid persisted state), `_build_aggregate_credit_limits` still emits a `V1UsageLimitResponse` with `reset_at=""`. Consumers of `/v1/usage` that parse `reset_at` as an ISO timestamp can fail or mis-handle the window countdown for those rows. This should return a valid timestamp or omit/fall back the aggregate row when the reset time is unavailable. Useful? React with 👍 / 👎.