From 83688fe3a49c74bb2d8522ba9d275fad8c5fb0b8 Mon Sep 17 00:00:00 2001 From: charles Date: Fri, 8 Aug 2025 20:12:25 -0700 Subject: [PATCH] FG-185 Add statistic usage api --- .gitignore | 3 + app/api/routes/statistic.py | 203 +++++++++++++++++++++++++++++++++++ app/api/schemas/statistic.py | 61 +++++++++++ app/main.py | 2 + 4 files changed, 269 insertions(+) create mode 100644 app/api/routes/statistic.py create mode 100644 app/api/schemas/statistic.py diff --git a/.gitignore b/.gitignore index bf0056c..a7bda6e 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,6 @@ venv.bak/ # Performance test results tests/performance/results/ .coverage* + +# Local test script +tools/local_test_script.py diff --git a/app/api/routes/statistic.py b/app/api/routes/statistic.py new file mode 100644 index 0000000..df9b636 --- /dev/null +++ b/app/api/routes/statistic.py @@ -0,0 +1,203 @@ +from fastapi import APIRouter, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, desc, func +from sqlalchemy.sql.functions import coalesce +from datetime import datetime, timedelta, UTC +from enum import StrEnum + +from app.api.dependencies import get_async_db, get_current_active_user +from app.models.user import User +from app.models.usage_tracker import UsageTracker +from app.models.provider_key import ProviderKey +from app.models.forge_api_key import ForgeApiKey +from app.api.schemas.statistic import ( + UsageRealtimeResponse, + UsageSummaryResponse, + ForgeKeyUsageSummaryResponse, +) + +router = APIRouter() + + +# I want a query parameter called "offset: " and "limit: " +@router.get("/usage/realtime", response_model=list[UsageRealtimeResponse]) +async def get_usage_realtime( + current_user: User = Depends(get_current_active_user), + db: AsyncSession = Depends(get_async_db), + offset: int = Query(0, ge=0), + limit: int = Query(10, ge=1), +): + """ + Get real-time usage statistics for the current user up to the last 7 days. + """ + # Calculate the date 7 days ago + seven_days_ago = datetime.now(UTC) - timedelta(days=7) + + # Build the query + query = ( + select( + UsageTracker.created_at.label("timestamp"), + coalesce(ForgeApiKey.name, ForgeApiKey.key).label("forge_key"), + ProviderKey.provider_name.label("provider_name"), + UsageTracker.model.label("model_name"), + (UsageTracker.input_tokens + UsageTracker.output_tokens).label("tokens"), + func.extract( + "epoch", UsageTracker.updated_at - UsageTracker.created_at + ).label("duration"), + ) + .join(ProviderKey, UsageTracker.provider_key_id == ProviderKey.id) + .join(ForgeApiKey, UsageTracker.forge_key_id == ForgeApiKey.id) + .where( + UsageTracker.user_id == current_user.id, + UsageTracker.created_at >= seven_days_ago, + ) + .order_by(desc(UsageTracker.created_at)) + .offset(offset) + .limit(limit) + ) + + # Execute the query + result = await db.execute(query) + rows = result.fetchall() + + # Convert to list of dictionaries + usage_stats = [] + for row in rows: + usage_stats.append( + { + "timestamp": row.timestamp, + "forge_key": row.forge_key, + "provider_name": row.provider_name, + "model_name": row.model_name, + "tokens": row.tokens, + "duration": round(float(row.duration), 2) + if row.duration is not None + else 0.0, + } + ) + print(usage_stats) + + return [UsageRealtimeResponse(**usage_stat) for usage_stat in usage_stats] + + +class UsageSummaryTimeSpan(StrEnum): + day = "day" + week = "week" + month = "month" + + +@router.get("/usage/summary", response_model=list[UsageSummaryResponse]) +async def get_usage_summary( + current_user: User = Depends(get_current_active_user), + db: AsyncSession = Depends(get_async_db), + span: UsageSummaryTimeSpan = Query(UsageSummaryTimeSpan.week), +): + """ + Get usage summary for the current user for the past day/week/month + """ + start_time = None + if span == UsageSummaryTimeSpan.day: + start_time = datetime.now(UTC) - timedelta(days=1) + elif span == UsageSummaryTimeSpan.week: + start_time = datetime.now(UTC) - timedelta(weeks=1) + elif span == UsageSummaryTimeSpan.month: + start_time = datetime.now(UTC) - timedelta(days=30) + + # Build the query based on time span + if span == UsageSummaryTimeSpan.day: + # For daily span, group by hour + time_group = func.date_trunc("hour", UsageTracker.created_at) + else: + # For weekly/monthly span, group by day + time_group = func.date_trunc("day", UsageTracker.created_at) + + query = ( + select( + time_group.label("time_point"), + coalesce(ForgeApiKey.name, ForgeApiKey.key).label("forge_key"), + func.sum(UsageTracker.input_tokens + UsageTracker.output_tokens).label( + "tokens" + ), + ) + .join(ForgeApiKey, UsageTracker.forge_key_id == ForgeApiKey.id) + .where( + UsageTracker.user_id == current_user.id, + UsageTracker.created_at >= start_time, + ) + .group_by(time_group, ForgeApiKey.name, ForgeApiKey.key) + .order_by(time_group, desc("tokens"), "forge_key") + ) + + # Execute the query + result = await db.execute(query) + rows = result.fetchall() + + data_points = dict() + for row in rows: + if row.time_point not in data_points: + data_points[row.time_point] = {"breakdown": [], "total_tokens": 0} + data_points[row.time_point]["breakdown"].append( + {"forge_key": row.forge_key, "tokens": row.tokens} + ) + data_points[row.time_point]["total_tokens"] += row.tokens + + return [ + UsageSummaryResponse( + time_point=time_point, + breakdown=data_point["breakdown"], + total_tokens=data_point["total_tokens"], + ) + for time_point, data_point in data_points.items() + ] + + +class ForgeKeyUsageTimeSpan(StrEnum): + day = "day" + week = "week" + month = "month" + year = "year" + all = "all" + + +@router.get("/forge-key/usage", response_model=list[ForgeKeyUsageSummaryResponse]) +async def get_forge_key_usage( + current_user: User = Depends(get_current_active_user), + db: AsyncSession = Depends(get_async_db), + span: ForgeKeyUsageTimeSpan = Query(ForgeKeyUsageTimeSpan.week), +): + """ + Get usage summary for all the forge keys for the past day/week/month/year/all + """ + start_time = None + if span == ForgeKeyUsageTimeSpan.day: + start_time = datetime.now(UTC) - timedelta(days=1) + elif span == ForgeKeyUsageTimeSpan.week: + start_time = datetime.now(UTC) - timedelta(weeks=1) + elif span == ForgeKeyUsageTimeSpan.month: + start_time = datetime.now(UTC) - timedelta(days=30) + elif span == ForgeKeyUsageTimeSpan.year: + start_time = datetime.now(UTC) - timedelta(days=365) + + query = ( + select( + coalesce(ForgeApiKey.name, ForgeApiKey.key).label("forge_key"), + func.sum(UsageTracker.input_tokens + UsageTracker.output_tokens).label( + "tokens" + ), + ) + .join(ForgeApiKey, UsageTracker.forge_key_id == ForgeApiKey.id) + .where( + UsageTracker.user_id == current_user.id, + start_time is None or UsageTracker.created_at >= start_time, + ) + .group_by(ForgeApiKey.name, ForgeApiKey.key) + .order_by(desc("tokens"), "forge_key") + ) + + result = await db.execute(query) + rows = result.fetchall() + + return [ + ForgeKeyUsageSummaryResponse(forge_key=row.forge_key, tokens=row.tokens) + for row in rows + ] diff --git a/app/api/schemas/statistic.py b/app/api/schemas/statistic.py new file mode 100644 index 0000000..c90cd4c --- /dev/null +++ b/app/api/schemas/statistic.py @@ -0,0 +1,61 @@ +from pydantic import BaseModel, field_validator +from datetime import datetime +import re + +from app.api.schemas.forge_api_key import ForgeApiKeyMasked + +def mask_forge_name_or_key(v: str) -> str: + # If the forge key is a valid forge key, mask it + if re.match(r"forge-\w{18}", v): + return ForgeApiKeyMasked.mask_api_key(v) + # Otherwise, return the original value (user customized name) + return v + +class UsageRealtimeResponse(BaseModel): + timestamp: datetime + forge_key: str + provider_name: str + model_name: str + tokens: int + duration: float + + @field_validator('forge_key') + @classmethod + def mask_forge_key(cls, v: str) -> str: + return mask_forge_name_or_key(v) + + @field_validator('timestamp') + @classmethod + def convert_timestamp_to_iso(cls, v: datetime) -> str: + return v.isoformat() + + +class UsageSummaryBreakdown(BaseModel): + forge_key: str + tokens: int + + @field_validator('forge_key') + @classmethod + def mask_forge_key(cls, v: str) -> str: + return mask_forge_name_or_key(v) + + +class UsageSummaryResponse(BaseModel): + time_point: datetime + breakdown: list[UsageSummaryBreakdown] + total_tokens: int + + @field_validator('time_point') + @classmethod + def convert_timestamp_to_iso(cls, v: datetime) -> str: + return v.isoformat() + + +class ForgeKeyUsageSummaryResponse(BaseModel): + forge_key: str + tokens: int + + @field_validator('forge_key') + @classmethod + def mask_forge_key(cls, v: str) -> str: + return mask_forge_name_or_key(v) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 12a9f8c..4f1a674 100644 --- a/app/main.py +++ b/app/main.py @@ -15,6 +15,7 @@ claude_code, provider_keys, proxy, + statistic, stats, users, webhooks, @@ -167,6 +168,7 @@ def create_app() -> FastAPI: v1_router.include_router(proxy.router, tags=["proxy"]) v1_router.include_router(stats.router, prefix="/stats", tags=["stats"]) v1_router.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"]) + v1_router.include_router(statistic.router, prefix='/statistic', tags=["statistic"]) # Claude Code compatible API endpoints v1_router.include_router(claude_code.router, tags=["Claude Code API"])