Usage Logs

View API call records and statistics.

| Time | Type | Model | Status | Stream | Duration | Token | IP |
|---|---|---|---|---|---|---|---|
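
The columns above correspond to the fields returned by the new `GET /api/v1/admin/logs` endpoint added in the diff below. A minimal query sketch follows; the base URL and the admin-key header name are assumptions (the route is only known to be guarded by `verify_api_key`):

```python
import httpx

BASE = "http://localhost:8000"                         # assumed deployment address
HEADERS = {"Authorization": "Bearer <admin-api-key>"}  # header name assumed

# Filter by type and status; results are newest-first, 20 per page by default.
resp = httpx.get(
    f"{BASE}/api/v1/admin/logs",
    params={"type": "chat", "status": "success", "page": 1, "page_size": 20},
    headers=HEADERS,
)
payload = resp.json()  # {"data": [...], "total": N, "page": 1, "page_size": 20}
for log in payload["data"]:
    # Each entry follows the UsageLog model: created_at (unix seconds), use_time (ms),
    # token_hash (masked to first/last 8 chars), plus type/model/status/is_stream/ip.
    print(log["created_at"], log["model"], log["status"], f"{log['use_time']}ms")
```
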
diff --git a/.gitignore b/.gitignore index 7b897ce0..d990d28a 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ env/ # Logs logs/ +!app/static/logs/ *.log # Data diff --git a/app/api/v1/admin.py b/app/api/v1/admin.py index c27103ad..7c965fd3 100644 --- a/app/api/v1/admin.py +++ b/app/api/v1/admin.py @@ -1781,3 +1781,69 @@ async def _on_item(item: str, res: dict): "task_id": task.id, "total": len(token_list), } + + +# ==================== Usage Logs ==================== + + +@router.get("/admin/logs", response_class=HTMLResponse, include_in_schema=False) +async def admin_logs_page(): + """使用记录页""" + return await render_template("logs/logs.html") + + +@router.get("/api/v1/admin/logs", dependencies=[Depends(verify_api_key)]) +async def get_logs_api( + type: str = Query(default=None), + model: str = Query(default=None), + status: str = Query(default=None), + token_hash: str = Query(default=None), + start_time: int = Query(default=None), + end_time: int = Query(default=None), + page: int = Query(default=1, ge=1), + page_size: int = Query(default=20, ge=1, le=100), +): + """查询使用记录""" + from app.services.usage_log import UsageLogService + + try: + logs, total = await UsageLogService.query( + type=type, + model=model, + status=status, + token_hash=token_hash, + start_time=start_time, + end_time=end_time, + page=page, + page_size=page_size, + ) + return {"data": logs, "total": total, "page": page, "page_size": page_size} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/api/v1/admin/logs", dependencies=[Depends(verify_api_key)]) +async def delete_logs_api(data: dict): + """清理使用记录""" + from app.services.usage_log import UsageLogService + + before = data.get("before_timestamp") + if not before: + raise HTTPException(status_code=400, detail="Missing before_timestamp") + try: + deleted = await UsageLogService.delete(int(before)) + return {"status": "success", "deleted": deleted} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/api/v1/admin/logs/stats", dependencies=[Depends(verify_api_key)]) +async def get_logs_stats_api(): + """使用记录统计""" + from app.services.usage_log import UsageLogService + + try: + stats = await UsageLogService.stats() + return stats + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/api/v1/chat.py b/app/api/v1/chat.py index 6c420567..634090fc 100644 --- a/app/api/v1/chat.py +++ b/app/api/v1/chat.py @@ -4,7 +4,7 @@ from typing import Any, Dict, List, Optional, Union -from fastapi import APIRouter +from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field, field_validator @@ -248,13 +248,18 @@ def validate_request(request: ChatCompletionRequest): @router.post("/chat/completions") -async def chat_completions(request: ChatCompletionRequest): +async def chat_completions(request: ChatCompletionRequest, raw_request: Request): """Chat Completions API - 兼容 OpenAI""" from app.core.logger import logger # 参数验证 validate_request(request) + # 获取客户端 IP + client_ip = raw_request.headers.get("x-forwarded-for", "").split(",")[0].strip() + if not client_ip: + client_ip = raw_request.client.host if raw_request.client else "" + logger.debug(f"Chat request: model={request.model}, stream={request.stream}") # 检测视频模型 @@ -274,6 +279,7 @@ async def chat_completions(request: ChatCompletionRequest): video_length=v_conf.video_length, resolution=v_conf.resolution_name, 
preset=v_conf.preset, + client_ip=client_ip, ) else: result = await ChatService.completions( @@ -281,6 +287,7 @@ async def chat_completions(request: ChatCompletionRequest): messages=[msg.model_dump() for msg in request.messages], stream=request.stream, thinking=request.thinking, + client_ip=client_ip, ) if isinstance(result, dict): diff --git a/app/api/v1/image.py b/app/api/v1/image.py index ec207c18..62ba9628 100644 --- a/app/api/v1/image.py +++ b/app/api/v1/image.py @@ -8,10 +8,11 @@ import random import re import time +import time as _time from pathlib import Path from typing import List, Optional, Union -from fastapi import APIRouter, File, Form, UploadFile +from fastapi import APIRouter, File, Form, Request, UploadFile from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field, ValidationError @@ -289,17 +290,10 @@ async def call_grok( @router.post("/images/generations") -async def create_image(request: ImageGenerationRequest): - """ - Image Generation API +async def create_image(request: ImageGenerationRequest, raw_request: Request): + """Image Generation API""" + start_time = _time.time() - 流式响应格式: - - event: image_generation.partial_image - - event: image_generation.completed - - 非流式响应格式: - - {"created": ..., "data": [{"b64_json": "..."}], "usage": {...}} - """ # stream 默认为 false if request.stream is None: request.stream = False @@ -481,9 +475,23 @@ async def _fetch_batch(call_target: int): "input_tokens_details": {"text_tokens": 0, "image_tokens": 0}, } + # 记录使用日志 + client_ip = raw_request.headers.get("x-forwarded-for", "").split(",")[0].strip() + if not client_ip: + client_ip = raw_request.client.host if raw_request.client else "" + try: + from app.services.usage_log import UsageLogService + use_time = int((_time.time() - start_time) * 1000) + await UsageLogService.record( + type="image", model=request.model, is_stream=False, + use_time=use_time, status="success", ip=client_ip, + ) + except Exception: + pass + return JSONResponse( content={ - "created": int(time.time()), + "created": int(_time.time()), "data": data, "usage": usage, } @@ -492,6 +500,7 @@ async def _fetch_batch(call_target: int): @router.post("/images/edits") async def edit_image( + raw_request: Request, prompt: str = Form(...), image: List[UploadFile] = File(...), model: Optional[str] = Form("grok-imagine-1.0-edit"), @@ -502,11 +511,9 @@ async def edit_image( style: Optional[str] = Form(None), stream: Optional[bool] = Form(False), ): - """ - Image Edits API + """Image Edits API""" + start_time = _time.time() - 同官方 API 格式,仅支持 multipart/form-data 文件上传 - """ if response_format is None: response_format = resolve_response_format(None) @@ -733,9 +740,23 @@ async def _call_edit(): data = [{response_field: img} for img in selected_images] + # 记录使用日志 + client_ip = raw_request.headers.get("x-forwarded-for", "").split(",")[0].strip() + if not client_ip: + client_ip = raw_request.client.host if raw_request.client else "" + try: + from app.services.usage_log import UsageLogService + use_time = int((_time.time() - start_time) * 1000) + await UsageLogService.record( + type="image", model=edit_request.model, is_stream=False, + use_time=use_time, status="success", ip=client_ip, + ) + except Exception: + pass + return JSONResponse( content={ - "created": int(time.time()), + "created": int(_time.time()), "data": data, "usage": { "total_tokens": 0, diff --git a/app/core/storage.py b/app/core/storage.py index 7bca6f31..310c36f6 100644 --- a/app/core/storage.py +++ b/app/core/storage.py @@ -15,7 
+15,7 @@ import hashlib import time import tomllib -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from enum import Enum @@ -37,6 +37,8 @@ CONFIG_FILE = DATA_DIR / "config.toml" TOKEN_FILE = DATA_DIR / "token.json" LOCK_DIR = DATA_DIR / ".locks" +LOG_FILE = DATA_DIR / "usage_logs.json" +LOG_MAX_ITEMS = 10000 # LocalStorage 日志上限 # JSON 序列化优化助手函数 @@ -95,6 +97,26 @@ async def acquire_lock(self, name: str, timeout: int = 10): # 默认空实现,用于 fallback yield + # ---- Usage Logs ---- + + async def save_log(self, log: Dict[str, Any]): + """写入单条使用日志(默认空实现)""" + pass + + async def query_logs( + self, filters: Dict[str, Any], page: int = 1, page_size: int = 20 + ) -> Tuple[List[Dict[str, Any]], int]: + """分页查询使用日志(默认空实现)""" + return [], 0 + + async def delete_logs(self, before_timestamp: int) -> int: + """删除指定时间之前的日志(默认空实现)""" + return 0 + + async def stats_logs(self) -> Dict[str, Any]: + """日志统计摘要(默认空实现)""" + return {"total": 0, "success": 0, "error": 0, "today": 0, "models": {}} + async def verify_connection(self) -> bool: """健康检查""" return True @@ -229,6 +251,94 @@ async def save_tokens(self, data: Dict[str, Any]): async def close(self): pass + # ---- Usage Logs (Local) ---- + + async def _load_logs(self) -> List[Dict[str, Any]]: + if not LOG_FILE.exists(): + return [] + try: + async with aiofiles.open(LOG_FILE, "rb") as f: + content = await f.read() + return json_loads(content) if content.strip() else [] + except Exception as e: + logger.error(f"LocalStorage: 加载日志失败: {e}") + return [] + + async def _save_logs(self, logs: List[Dict[str, Any]]): + try: + LOG_FILE.parent.mkdir(parents=True, exist_ok=True) + temp_path = LOG_FILE.with_suffix(".tmp") + async with aiofiles.open(temp_path, "wb") as f: + await f.write(orjson.dumps(logs)) + os.replace(temp_path, LOG_FILE) + except Exception as e: + logger.error(f"LocalStorage: 保存日志失败: {e}") + + async def save_log(self, log: Dict[str, Any]): + async with self._lock: + logs = await self._load_logs() + logs.insert(0, log) + if len(logs) > LOG_MAX_ITEMS: + logs = logs[:LOG_MAX_ITEMS] + await self._save_logs(logs) + + async def query_logs( + self, filters: Dict[str, Any], page: int = 1, page_size: int = 20 + ) -> Tuple[List[Dict[str, Any]], int]: + logs = await self._load_logs() + filtered = self._filter_logs(logs, filters) + total = len(filtered) + start = (page - 1) * page_size + return filtered[start : start + page_size], total + + @staticmethod + def _filter_logs( + logs: List[Dict[str, Any]], filters: Dict[str, Any] + ) -> List[Dict[str, Any]]: + result = logs + for key in ("type", "model", "status", "token_hash"): + if key in filters: + result = [r for r in result if r.get(key) == filters[key]] + if "start_time" in filters: + result = [r for r in result if r.get("created_at", 0) >= filters["start_time"]] + if "end_time" in filters: + result = [r for r in result if r.get("created_at", 0) <= filters["end_time"]] + return result + + async def delete_logs(self, before_timestamp: int) -> int: + async with self._lock: + logs = await self._load_logs() + original = len(logs) + logs = [l for l in logs if l.get("created_at", 0) >= before_timestamp] + deleted = original - len(logs) + if deleted > 0: + await self._save_logs(logs) + return deleted + + async def stats_logs(self) -> Dict[str, Any]: + logs = await self._load_logs() + import calendar, datetime + today_start = int( + datetime.datetime.now() + .replace(hour=0, minute=0, second=0, microsecond=0) + .timestamp() + ) + total = len(logs) + success 
= sum(1 for l in logs if l.get("status") == "success") + error = total - success + today = sum(1 for l in logs if l.get("created_at", 0) >= today_start) + models: Dict[str, int] = {} + for l in logs: + m = l.get("model", "unknown") + models[m] = models.get(m, 0) + 1 + return { + "total": total, + "success": success, + "error": error, + "today": today, + "models": models, + } + class RedisStorage(BaseStorage): """ @@ -480,6 +590,148 @@ async def save_tokens(self, data: Dict[str, Any]): logger.error(f"RedisStorage: 保存 Token 失败: {e}") raise + # ---- Usage Logs (Redis) ---- + + async def save_log(self, log: Dict[str, Any]): + try: + log_id = log.get("id", "") + created_at = log.get("created_at", 0) + await self.redis.zadd( + "grok2api:logs:timeline", {log_id: created_at} + ) + await self.redis.hset( + f"grok2api:log:{log_id}", mapping={ + k: json_dumps(v) if isinstance(v, (dict, list, bool)) else str(v) + for k, v in log.items() + } + ) + # 超限清理:保留最新 LOG_MAX_ITEMS 条 + count = await self.redis.zcard("grok2api:logs:timeline") + if count > LOG_MAX_ITEMS: + to_remove = await self.redis.zrange( + "grok2api:logs:timeline", 0, count - LOG_MAX_ITEMS - 1 + ) + if to_remove: + pipe = self.redis.pipeline() + for rid in to_remove: + pipe.delete(f"grok2api:log:{rid}") + pipe.zremrangebyrank("grok2api:logs:timeline", 0, count - LOG_MAX_ITEMS - 1) + await pipe.execute() + except Exception as e: + logger.warning(f"RedisStorage: 保存日志失败: {e}") + + async def _get_log_by_id(self, log_id: str) -> Optional[Dict[str, Any]]: + raw = await self.redis.hgetall(f"grok2api:log:{log_id}") + if not raw: + return None + result = {} + for k, v in raw.items(): + try: + result[k] = json_loads(v) + except Exception: + result[k] = v + return result + + async def query_logs( + self, filters: Dict[str, Any], page: int = 1, page_size: int = 20 + ) -> Tuple[List[Dict[str, Any]], int]: + try: + min_score = filters.get("start_time", "-inf") + max_score = filters.get("end_time", "+inf") + # 按时间倒序获取所有 ID + all_ids = await self.redis.zrevrangebyscore( + "grok2api:logs:timeline", max_score, min_score + ) + if not all_ids: + return [], 0 + + # 批量获取日志详情 + logs = [] + pipe = self.redis.pipeline() + for lid in all_ids: + pipe.hgetall(f"grok2api:log:{lid}") + results = await pipe.execute() + + for raw in results: + if not raw: + continue + log = {} + for k, v in raw.items(): + try: + log[k] = json_loads(v) + except Exception: + log[k] = v + logs.append(log) + + # 内存过滤(除时间外的字段) + for key in ("type", "model", "status", "token_hash"): + if key in filters: + logs = [l for l in logs if l.get(key) == filters[key]] + + total = len(logs) + start = (page - 1) * page_size + return logs[start : start + page_size], total + except Exception as e: + logger.error(f"RedisStorage: 查询日志失败: {e}") + return [], 0 + + async def delete_logs(self, before_timestamp: int) -> int: + try: + ids = await self.redis.zrangebyscore( + "grok2api:logs:timeline", "-inf", before_timestamp + ) + if not ids: + return 0 + pipe = self.redis.pipeline() + for lid in ids: + pipe.delete(f"grok2api:log:{lid}") + pipe.zremrangebyscore("grok2api:logs:timeline", "-inf", before_timestamp) + await pipe.execute() + return len(ids) + except Exception as e: + logger.error(f"RedisStorage: 删除日志失败: {e}") + return 0 + + async def stats_logs(self) -> Dict[str, Any]: + try: + import datetime + today_start = int( + datetime.datetime.now() + .replace(hour=0, minute=0, second=0, microsecond=0) + .timestamp() + ) + total = await self.redis.zcard("grok2api:logs:timeline") or 0 + today = await 
self.redis.zcount( + "grok2api:logs:timeline", today_start, "+inf" + ) or 0 + + # 获取所有日志统计 success/error/models + all_ids = await self.redis.zrevrange("grok2api:logs:timeline", 0, -1) + success = 0 + models: Dict[str, int] = {} + if all_ids: + pipe = self.redis.pipeline() + for lid in all_ids: + pipe.hmget(f"grok2api:log:{lid}", "status", "model") + results = await pipe.execute() + for r in results: + if r and r[0] == "success": + success += 1 + if r and r[1]: + m = r[1] + models[m] = models.get(m, 0) + 1 + + return { + "total": total, + "success": success, + "error": total - success, + "today": today, + "models": models, + } + except Exception as e: + logger.error(f"RedisStorage: 统计日志失败: {e}") + return {"total": 0, "success": 0, "error": 0, "today": 0, "models": {}} + async def close(self): try: await self.redis.close() @@ -577,6 +829,40 @@ async def _ensure_schema(self): except Exception: pass + # 使用记录表 + await conn.execute( + text(""" + CREATE TABLE IF NOT EXISTS usage_logs ( + id VARCHAR(32) PRIMARY KEY, + created_at BIGINT NOT NULL, + type VARCHAR(16), + model VARCHAR(64), + is_stream BOOLEAN DEFAULT FALSE, + use_time INT DEFAULT 0, + status VARCHAR(16), + error_message TEXT, + token_hash VARCHAR(64), + pool_name VARCHAR(64), + effort VARCHAR(8), + ip VARCHAR(64), + request_id VARCHAR(64) + ) + """) + ) + + try: + await conn.execute( + text("CREATE INDEX idx_logs_created ON usage_logs (created_at)") + ) + except Exception: + pass + try: + await conn.execute( + text("CREATE INDEX idx_logs_model ON usage_logs (model)") + ) + except Exception: + pass + self._initialized = True except Exception as e: logger.error(f"SQLStorage: Schema 初始化失败: {e}") @@ -754,6 +1040,148 @@ async def save_tokens(self, data: Dict[str, Any]): logger.error(f"SQLStorage: 保存 Token 失败: {e}") raise + # ---- Usage Logs (SQL) ---- + + async def save_log(self, log: Dict[str, Any]): + await self._ensure_schema() + from sqlalchemy import text + try: + async with self.async_session() as session: + await session.execute( + text(""" + INSERT INTO usage_logs + (id, created_at, type, model, is_stream, use_time, + status, error_message, token_hash, pool_name, effort, ip, request_id) + VALUES + (:id, :created_at, :type, :model, :is_stream, :use_time, + :status, :error_message, :token_hash, :pool_name, :effort, :ip, :request_id) + """), + { + "id": log.get("id", ""), + "created_at": log.get("created_at", 0), + "type": log.get("type", "chat"), + "model": log.get("model", ""), + "is_stream": log.get("is_stream", False), + "use_time": log.get("use_time", 0), + "status": log.get("status", "success"), + "error_message": log.get("error_message", ""), + "token_hash": log.get("token_hash", ""), + "pool_name": log.get("pool_name", ""), + "effort": log.get("effort", "low"), + "ip": log.get("ip", ""), + "request_id": log.get("request_id", ""), + }, + ) + await session.commit() + except Exception as e: + logger.warning(f"SQLStorage: 保存日志失败: {e}") + + async def query_logs( + self, filters: Dict[str, Any], page: int = 1, page_size: int = 20 + ) -> Tuple[List[Dict[str, Any]], int]: + await self._ensure_schema() + from sqlalchemy import text + try: + conditions = [] + params: Dict[str, Any] = {} + for key in ("type", "model", "status", "token_hash"): + if key in filters: + conditions.append(f"{key} = :{key}") + params[key] = filters[key] + if "start_time" in filters: + conditions.append("created_at >= :start_time") + params["start_time"] = filters["start_time"] + if "end_time" in filters: + conditions.append("created_at <= :end_time") + 
params["end_time"] = filters["end_time"] + + where = (" WHERE " + " AND ".join(conditions)) if conditions else "" + + async with self.async_session() as session: + count_res = await session.execute( + text(f"SELECT COUNT(*) FROM usage_logs{where}"), params + ) + total = count_res.scalar() or 0 + + offset = (page - 1) * page_size + params["limit"] = page_size + params["offset"] = offset + res = await session.execute( + text( + f"SELECT * FROM usage_logs{where} ORDER BY created_at DESC LIMIT :limit OFFSET :offset" + ), + params, + ) + rows = res.fetchall() + columns = res.keys() + logs = [dict(zip(columns, row)) for row in rows] + return logs, total + except Exception as e: + logger.error(f"SQLStorage: 查询日志失败: {e}") + return [], 0 + + async def delete_logs(self, before_timestamp: int) -> int: + await self._ensure_schema() + from sqlalchemy import text + try: + async with self.async_session() as session: + count_res = await session.execute( + text("SELECT COUNT(*) FROM usage_logs WHERE created_at < :ts"), + {"ts": before_timestamp}, + ) + count = count_res.scalar() or 0 + if count > 0: + await session.execute( + text("DELETE FROM usage_logs WHERE created_at < :ts"), + {"ts": before_timestamp}, + ) + await session.commit() + return count + except Exception as e: + logger.error(f"SQLStorage: 删除日志失败: {e}") + return 0 + + async def stats_logs(self) -> Dict[str, Any]: + await self._ensure_schema() + from sqlalchemy import text + import datetime + try: + today_start = int( + datetime.datetime.now() + .replace(hour=0, minute=0, second=0, microsecond=0) + .timestamp() + ) + async with self.async_session() as session: + total_r = await session.execute(text("SELECT COUNT(*) FROM usage_logs")) + total = total_r.scalar() or 0 + + success_r = await session.execute( + text("SELECT COUNT(*) FROM usage_logs WHERE status = 'success'") + ) + success = success_r.scalar() or 0 + + today_r = await session.execute( + text("SELECT COUNT(*) FROM usage_logs WHERE created_at >= :ts"), + {"ts": today_start}, + ) + today = today_r.scalar() or 0 + + model_r = await session.execute( + text("SELECT model, COUNT(*) as cnt FROM usage_logs GROUP BY model") + ) + models = {row[0]: row[1] for row in model_r.fetchall()} + + return { + "total": total, + "success": success, + "error": total - success, + "today": today, + "models": models, + } + except Exception as e: + logger.error(f"SQLStorage: 统计日志失败: {e}") + return {"total": 0, "success": 0, "error": 0, "today": 0, "models": {}} + async def close(self): await self.engine.dispose() diff --git a/app/services/grok/services/chat.py b/app/services/grok/services/chat.py index 0fee80e8..db017a78 100644 --- a/app/services/grok/services/chat.py +++ b/app/services/grok/services/chat.py @@ -2,6 +2,7 @@ Grok Chat 服务 """ +import time import orjson from typing import Dict, List, Any from dataclasses import dataclass @@ -385,8 +386,11 @@ async def completions( messages: List[Dict[str, Any]], stream: bool = None, thinking: str = None, + client_ip: str = "", ): """Chat Completions 入口""" + start_time = time.time() + # 获取 token token_mgr = await get_token_manager() await token_mgr.reload_if_stale() @@ -408,9 +412,11 @@ async def completions( for attempt in range(max_token_retries): # 选择 token(排除已失败的) token = None - for pool_name in ModelService.pool_candidates_for_model(model): - token = token_mgr.get_token(pool_name, exclude=tried_tokens) + pool_name = "" + for pn in ModelService.pool_candidates_for_model(model): + token = token_mgr.get_token(pn, exclude=tried_tokens) if token: + pool_name = pn 
break if not token and not tried_tokens: @@ -418,9 +424,10 @@ async def completions( logger.info("No available tokens, attempting to refresh cooling tokens...") result = await token_mgr.refresh_cooling_tokens() if result.get("recovered", 0) > 0: - for pool_name in ModelService.pool_candidates_for_model(model): - token = token_mgr.get_token(pool_name) + for pn in ModelService.pool_candidates_for_model(model): + token = token_mgr.get_token(pn) if token: + pool_name = pn break if not token: @@ -445,12 +452,15 @@ async def completions( logger.debug(f"Processing stream response: model={model}") processor = StreamProcessor(model_name, token, think) return wrap_stream_with_usage( - processor.process(response), token_mgr, token, model + processor.process(response), token_mgr, token, model, + start_time=start_time, client_ip=client_ip, + pool_name=pool_name, log_type="chat", ) # 非流式 logger.debug(f"Processing non-stream response: model={model}") result = await CollectProcessor(model_name, token).process(response) + effort_str = "low" try: model_info = ModelService.get(model) effort = ( @@ -458,16 +468,43 @@ async def completions( if (model_info and model_info.cost.value == "high") else EffortType.LOW ) + effort_str = effort.value await token_mgr.consume(token, effort) logger.info(f"Chat completed: model={model}, effort={effort.value}") except Exception as e: logger.warning(f"Failed to record usage: {e}") + + # 记录使用日志 + try: + from app.services.usage_log import UsageLogService + use_time = int((time.time() - start_time) * 1000) + await UsageLogService.record( + type="chat", model=model, is_stream=False, + use_time=use_time, status="success", token=token, + pool_name=pool_name, effort=effort_str, ip=client_ip, + ) + except Exception as e: + logger.warning(f"Failed to record usage log: {e}") + return result except UpstreamException as e: status_code = e.details.get("status") if e.details else None last_error = e + # 记录错误日志 + try: + from app.services.usage_log import UsageLogService + use_time = int((time.time() - start_time) * 1000) + await UsageLogService.record( + type="chat", model=model, is_stream=is_stream, + use_time=use_time, status="error", + error_message=str(e), token=token, + pool_name=pool_name, ip=client_ip, + ) + except Exception: + pass + if status_code == 429: # 配额不足,标记 token 为 cooling 并换 token 重试 await token_mgr.mark_rate_limited(token) diff --git a/app/services/grok/services/media.py b/app/services/grok/services/media.py index 79d9b677..058f2589 100644 --- a/app/services/grok/services/media.py +++ b/app/services/grok/services/media.py @@ -3,6 +3,7 @@ """ import asyncio +import time from typing import AsyncGenerator, Optional import orjson @@ -299,8 +300,11 @@ async def completions( video_length: int = 6, resolution: str = "480p", preset: str = "normal", + client_ip: str = "", ): """视频生成入口""" + start_time = time.time() + # 获取 token(使用智能路由) token_mgr = await get_token_manager() await token_mgr.reload_if_stale() @@ -367,10 +371,13 @@ async def completions( if is_stream: processor = VideoStreamProcessor(model, token, think) return wrap_stream_with_usage( - processor.process(response), token_mgr, token, model + processor.process(response), token_mgr, token, model, + start_time=start_time, client_ip=client_ip, + pool_name="", log_type="video", ) result = await VideoCollectProcessor(model, token).process(response) + effort_str = "low" try: model_info = ModelService.get(model) effort = ( @@ -378,10 +385,24 @@ async def completions( if (model_info and model_info.cost.value == "high") else 
EffortType.LOW ) + effort_str = effort.value await token_mgr.consume(token, effort) logger.debug(f"Video completed, recorded usage (effort={effort.value})") except Exception as e: logger.warning(f"Failed to record video usage: {e}") + + # 记录使用日志 + try: + from app.services.usage_log import UsageLogService + use_time = int((time.time() - start_time) * 1000) + await UsageLogService.record( + type="video", model=model, is_stream=False, + use_time=use_time, status="success", token=token, + pool_name="", effort=effort_str, ip=client_ip, + ) + except Exception as e: + logger.warning(f"Failed to record usage log: {e}") + return result diff --git a/app/services/grok/utils/stream.py b/app/services/grok/utils/stream.py index c9e64dd2..05c1b954 100644 --- a/app/services/grok/utils/stream.py +++ b/app/services/grok/utils/stream.py @@ -2,6 +2,8 @@ 流式响应通用工具 """ +import time + from typing import AsyncGenerator from app.core.logger import logger @@ -10,7 +12,9 @@ async def wrap_stream_with_usage( - stream: AsyncGenerator, token_mgr, token: str, model: str + stream: AsyncGenerator, token_mgr, token: str, model: str, + *, start_time: float = 0, client_ip: str = "", + pool_name: str = "", log_type: str = "chat", ) -> AsyncGenerator: """ 包装流式响应,在完成时记录使用 @@ -20,6 +24,10 @@ async def wrap_stream_with_usage( token_mgr: TokenManager 实例 token: Token 字符串 model: 模型名称 + start_time: 请求开始时间 + client_ip: 客户端 IP + pool_name: Token 池名 + log_type: 日志类型 (chat/video) """ success = False try: @@ -27,6 +35,7 @@ async def wrap_stream_with_usage( yield chunk success = True finally: + effort_str = "low" if success: try: model_info = ModelService.get(model) @@ -35,6 +44,7 @@ async def wrap_stream_with_usage( if (model_info and model_info.cost.value == "high") else EffortType.LOW ) + effort_str = effort.value await token_mgr.consume(token, effort) logger.debug( f"Stream completed, recorded usage for token {token[:10]}... 
(effort={effort.value})" @@ -42,5 +52,20 @@ async def wrap_stream_with_usage( except Exception as e: logger.warning(f"Failed to record stream usage: {e}") + # 记录使用日志 + try: + from app.services.usage_log import UsageLogService + use_time = int((time.time() - start_time) * 1000) if start_time else 0 + await UsageLogService.record( + type=log_type, model=model, is_stream=True, + use_time=use_time, + status="success" if success else "error", + error_message="" if success else "stream interrupted", + token=token, pool_name=pool_name, + effort=effort_str, ip=client_ip, + ) + except Exception as e: + logger.warning(f"Failed to record usage log: {e}") + __all__ = ["wrap_stream_with_usage"] diff --git a/app/services/usage_log.py b/app/services/usage_log.py new file mode 100644 index 00000000..c8a600e7 --- /dev/null +++ b/app/services/usage_log.py @@ -0,0 +1,127 @@ +""" +使用记录服务 (Usage Log Service) +记录每次 API 调用的关键信息,支持异步写入不阻塞请求。 +""" + +import asyncio +import time +import uuid +from typing import Any, Dict, List, Optional, Tuple + +from pydantic import BaseModel, Field + +from app.core.logger import logger +from app.core.storage import get_storage + + +class UsageLog(BaseModel): + """使用记录数据模型""" + + id: str = Field(default_factory=lambda: uuid.uuid4().hex[:16]) + created_at: int = Field(default_factory=lambda: int(time.time())) + type: str = "chat" # chat / image / video + model: str = "" + is_stream: bool = False + use_time: int = 0 # 耗时(毫秒) + status: str = "success" # success / error + error_message: str = "" + token_hash: str = "" # 脱敏 Token + pool_name: str = "" + effort: str = "low" # low / high + ip: str = "" + request_id: str = "" + + +def mask_token(token: str) -> str: + """Token 脱敏:保留前8后8""" + if not token or len(token) <= 20: + return token or "" + return f"{token[:8]}...{token[-8:]}" + + +class UsageLogService: + """使用记录服务""" + + @staticmethod + async def record( + *, + type: str = "chat", + model: str = "", + is_stream: bool = False, + use_time: int = 0, + status: str = "success", + error_message: str = "", + token: str = "", + pool_name: str = "", + effort: str = "low", + ip: str = "", + request_id: str = "", + ): + """ + 异步记录一条使用日志。 + 使用 create_task 确保不阻塞请求响应。 + """ + log = UsageLog( + type=type, + model=model, + is_stream=is_stream, + use_time=use_time, + status=status, + error_message=error_message, + token_hash=mask_token(token), + pool_name=pool_name, + effort=effort, + ip=ip, + request_id=request_id, + ) + asyncio.create_task(UsageLogService._save(log)) + + @staticmethod + async def _save(log: UsageLog): + """实际写入存储""" + try: + storage = get_storage() + await storage.save_log(log.model_dump()) + except Exception as e: + logger.warning(f"Failed to save usage log: {e}") + + @staticmethod + async def query( + *, + type: str = None, + model: str = None, + status: str = None, + start_time: int = None, + end_time: int = None, + token_hash: str = None, + page: int = 1, + page_size: int = 20, + ) -> Tuple[List[Dict[str, Any]], int]: + """查询使用记录""" + storage = get_storage() + filters = {} + if type: + filters["type"] = type + if model: + filters["model"] = model + if status: + filters["status"] = status + if start_time: + filters["start_time"] = start_time + if end_time: + filters["end_time"] = end_time + if token_hash: + filters["token_hash"] = token_hash + return await storage.query_logs(filters, page, page_size) + + @staticmethod + async def delete(before_timestamp: int) -> int: + """删除指定时间之前的记录""" + storage = get_storage() + return await storage.delete_logs(before_timestamp) + + 
@staticmethod + async def stats() -> Dict[str, Any]: + """统计摘要""" + storage = get_storage() + return await storage.stats_logs() diff --git a/app/static/common/header.html b/app/static/common/header.html index 7303639d..5cbb3407 100644 --- a/app/static/common/header.html +++ b/app/static/common/header.html @@ -21,6 +21,7 @@ Token管理 配置管理 缓存管理 + 使用记录
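
For completeness, a companion sketch of the maintenance endpoints introduced above: `GET /api/v1/admin/logs/stats` returns the summary counters, and `DELETE /api/v1/admin/logs` purges entries older than a given timestamp. As before, the base URL and auth header are placeholders; both routes sit behind `verify_api_key`.

```python
import time
import httpx

BASE = "http://localhost:8000"                         # assumed deployment address
HEADERS = {"Authorization": "Bearer <admin-api-key>"}  # header name assumed

# Summary counters: {"total": ..., "success": ..., "error": ..., "today": ..., "models": {...}}
stats = httpx.get(f"{BASE}/api/v1/admin/logs/stats", headers=HEADERS).json()
print(f"{stats['total']} calls total, {stats['error']} errors, {stats['today']} today")

# Purge logs older than 30 days; the endpoint expects a JSON body with "before_timestamp".
cutoff = int(time.time()) - 30 * 24 * 3600
resp = httpx.request(
    "DELETE",
    f"{BASE}/api/v1/admin/logs",
    headers=HEADERS,
    json={"before_timestamp": cutoff},  # httpx.delete() takes no body, so use request()
)
print(resp.json())  # {"status": "success", "deleted": <count>}
```
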