forked from BiFangKNT/mtga
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtemp_old_app.py
More file actions
461 lines (397 loc) · 37.4 KB
/
temp_old_app.py
File metadata and controls
461 lines (397 loc) · 37.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
from __future__ import annotations
import contextlib
import json
import logging
import time
import uuid
import requests
from flask import Flask, Response, jsonify, request
from modules.proxy.proxy_auth import ProxyAuth
from modules.proxy.proxy_config import DEFAULT_MIDDLE_ROUTE, ProxyConfig, build_proxy_config
from modules.proxy.proxy_transport import ProxyTransport
from modules.runtime.resource_manager import ResourceManager
class ProxyApp:
    """Proxy-service domain layer: config parsing + Flask routing + upstream forwarding.

    Wraps a Flask app exposing OpenAI-compatible ``/models`` and
    ``/chat/completions`` endpoints, authenticates callers via ``ProxyAuth``,
    rewrites the model name / stream flag per configuration, and forwards to
    the configured upstream API through ``ProxyTransport``.

    NOTE(review): the original Chinese log/docstring strings in this file were
    mojibake-corrupted and several literals had lost their closing quote
    (syntax errors); those messages were rewritten in English to restore
    valid syntax with equivalent intent.
    """

    def __init__(self, config=None, log_func=print, *, resource_manager: ResourceManager):
        """Build proxy state from *config*.

        On invalid configuration ``self.valid`` is set to ``False`` and the
        instance is left inert (no Flask app, no transport).
        """
        self.config = config or {}
        self.log_func = log_func
        self.resource_manager = resource_manager
        self.app: Flask | None = None
        self.valid = True
        self.proxy_config: ProxyConfig | None = None
        self.auth: ProxyAuth | None = None
        self.transport: ProxyTransport | None = None
        self.http_client: requests.Session | None = None
        self.target_api_base_url = ""
        self.middle_route = ""
        self.inbound_route = DEFAULT_MIDDLE_ROUTE
        self.custom_model_id = ""
        self.target_model_id = ""
        self.stream_mode = None  # None, 'true', 'false'
        self.debug_mode = False
        self.disable_ssl_strict_mode = False
        proxy_config = build_proxy_config(
            self.config,
            resource_manager=self.resource_manager,
            log_func=self.log_func,
        )
        if not proxy_config:
            # Invalid configuration: mark as unusable but leave the object constructed.
            self.valid = False
            return
        self.proxy_config = proxy_config
        self.target_api_base_url = proxy_config.target_api_base_url
        self.middle_route = proxy_config.middle_route
        self.custom_model_id = proxy_config.custom_model_id
        self.target_model_id = proxy_config.target_model_id
        self.stream_mode = proxy_config.stream_mode  # None, 'true', 'false'
        self.debug_mode = proxy_config.debug_mode
        self.disable_ssl_strict_mode = proxy_config.disable_ssl_strict_mode
        self.auth = ProxyAuth(proxy_config.mtga_auth_key)
        self.transport = ProxyTransport(
            resource_manager=self.resource_manager,
            disable_ssl_strict_mode=self.disable_ssl_strict_mode,
            log_func=self.log_func,
        )
        self.http_client = self.transport.session
        self._create_app()

    def close(self) -> None:
        """Release the transport's underlying HTTP session, if any."""
        if self.transport:
            self.transport.close()

    @staticmethod
    def _new_request_id() -> str:
        """Return a short (6 hex chars) per-request correlation id."""
        return uuid.uuid4().hex[:6]

    @staticmethod
    def _timestamp_ms() -> str:
        """Return local wall-clock time formatted as ``HH:MM:SS.mmm``."""
        now = time.time()
        base = time.strftime("%H:%M:%S", time.localtime(now))
        ms = int((now % 1) * 1000)
        return f"{base}.{ms:03d}"

    def _log_request(self, request_id: str, message: str):
        """Log *message* prefixed with a millisecond timestamp and the request id."""
        self.log_func(f"{self._timestamp_ms()} [{request_id}] {message}")

    def _get_mapped_model_id(self):
        """Return the model id advertised to clients (the custom id, not upstream's)."""
        return self.custom_model_id

    def _build_route(self, base_route: str, suffix: str) -> str:
        """Join *base_route* and *suffix* into a normalized ``/a/b``-style path."""
        middle_route = base_route or ""
        if not middle_route.startswith("/"):
            middle_route = f"/{middle_route}"
        if middle_route == "/":
            return f"/{suffix.lstrip('/')}"
        return f"{middle_route.rstrip('/')}/{suffix.lstrip('/')}"

    def _create_app(self):
        """Create the Flask app and register the two proxy endpoints."""
        self.app = Flask(__name__)
        if self.debug_mode:
            # Debug mode: surface Flask/werkzeug INFO logs as well.
            logging.getLogger().setLevel(logging.INFO)
            self.app.logger.setLevel(logging.INFO)
        models_route = self._build_route(self.inbound_route, "models")
        chat_route = self._build_route(self.inbound_route, "chat/completions")
        self.app.add_url_rule(models_route, "get_models", self._get_models, methods=["GET"])
        self.app.add_url_rule(
            chat_route,
            "chat_completions",
            self._chat_completions,
            methods=["POST"],
        )

    def _get_models(self):
        """GET handler: advertise the single mapped model in OpenAI list format."""
        self.log_func(f"Received model list request {self._build_route(self.inbound_route, 'models')}")
        auth = self.auth
        if not auth:
            self.log_func("Proxy authentication not ready")
            return jsonify(
                {"error": {"message": "Proxy not ready", "type": "server_error"}}
            ), 500
        auth_header = request.headers.get("Authorization")
        if not auth.verify(auth_header):
            self.log_func("Model list request failed authentication")
            return jsonify(
                {"error": {"message": "Invalid authentication", "type": "authentication_error"}}
            ), 401
        mapped_model_id = self._get_mapped_model_id()
        model_data = {
            "object": "list",
            "data": [
                {
                    "id": mapped_model_id,
                    "object": "model",
                    "owned_by": "openai",
                    "created": int(time.time()),
                    "permission": [
                        {
                            "id": f"modelperm-{mapped_model_id}",
                            "object": "model_permission",
                            "created": int(time.time()),
                            "allow_create_engine": False,
                            "allow_sampling": True,
                            "allow_logprobs": True,
                            "allow_search_indices": False,
                            "allow_view": True,
                            "allow_fine_tuning": False,
                            "organization": "*",
                            "group": None,
                            "is_blocking": False,
                        }
                    ],
                }
            ],
        }
        self.log_func(f"Returning mapped model: {mapped_model_id}")
        return jsonify(model_data)

    def _chat_completions(self):  # noqa: PLR0911, PLR0912, PLR0915
        """POST handler: authenticate, rewrite model/stream flags, forward upstream.

        Produces one of three response shapes:
        * true streaming  — SSE pass-through with per-event normalization;
        * simulated streaming — upstream forced non-stream but client asked
          for a stream, so the JSON answer is re-chunked as SSE;
        * plain JSON pass-through.
        """
        request_id = self._new_request_id()

        def log(message: str):
            self._log_request(request_id, message)

        log(f"Received chat completion request {self._build_route(self.inbound_route, 'chat/completions')}")
        auth = self.auth
        transport = self.transport
        http_client = self.http_client
        if not (auth and transport and http_client):
            log("Proxy service not ready")
            return jsonify({"error": "Proxy not ready"}), 500
        if self.debug_mode:
            headers_str = "\n".join(f"{k}: {v}" for k, v in request.headers.items())
            log_message = (
                f"--- Request headers (debug) ---\n{headers_str}\n"
                "--------------------------------------"
            )
            try:
                body_str = request.get_data(as_text=True)
                log_message += (
                    f"--- Request body (debug) ---\n{body_str}\n"
                    "--------------------------------------"
                )
            except Exception as body_exc:
                error_msg = f"Error reading request body: {body_exc}\n"
                log(error_msg)
                log_message += error_msg
            log(log_message)
        request_data = request.get_json(silent=True)
        # isinstance guard (not just `is None`): a valid JSON array/string body
        # would otherwise crash on .get() further down.
        if not isinstance(request_data, dict):
            log("Failed to parse JSON, or request body is not a JSON object")
            log(f"Content-Type: {request.headers.get('Content-Type')}")
            return jsonify(
                {
                    "error": "Invalid JSON or Content-Type",
                    "message": (
                        "The request body must be valid JSON and the Content-Type header "
                        "must be 'application/json'."
                    ),
                }
            ), 400
        client_requested_stream = request_data.get("stream", False)
        log(f"Client requested stream mode: {client_requested_stream}")
        if "model" in request_data:
            log(f"Replacing model name: {request_data['model']} -> {self.target_model_id}")
        else:
            log(f"No 'model' field in request; adding model: {self.target_model_id}")
        request_data["model"] = self.target_model_id
        if self.stream_mode is not None:
            # Configuration force-overrides the client's stream flag.
            stream_value = self.stream_mode == "true"
            if "stream" in request_data:
                log(f"Forcing stream mode: {request_data['stream']} -> {stream_value}")
            else:
                log(f"No 'stream' parameter in request; setting to {stream_value}")
            request_data["stream"] = stream_value
        auth_header = request.headers.get("Authorization")
        if not auth.verify(auth_header):
            log("Chat completion request failed MTGA authentication")
            return jsonify(
                {"error": {"message": "Invalid authentication", "type": "authentication_error"}}
            ), 401
        target_api_key = self.proxy_config.api_key if self.proxy_config else ""
        forward_headers = auth.build_forward_headers(
            auth_header,
            target_api_key,
            log_func=log,
        )
        try:
            target_url = (
                f"{self.target_api_base_url.rstrip('/')}"
                f"{self._build_route(self.middle_route, 'chat/completions')}"
            )
            log(f"Forwarding request to: {target_url}")
            is_stream = request_data.get("stream", False)
            log(f"Stream mode: {is_stream}")
            response_from_target = http_client.post(
                target_url,
                json=request_data,
                headers=forward_headers,
                stream=is_stream,
                timeout=300,
            )
            response_from_target.raise_for_status()
            if self.debug_mode:
                log(f"Upstream response status: {response_from_target.status_code}")
                log(f"Upstream Content-Type: {response_from_target.headers.get('content-type')}")
            if is_stream:
                log("Returning streaming response")
                log_file = None
                log_file_stack = None
                log_path = None
                if self.debug_mode:
                    # Best effort: record the raw SSE bytes for post-mortem debugging.
                    try:
                        log_path = transport.prepare_sse_log_path()
                        log_file_stack = contextlib.ExitStack()
                        log_file = log_file_stack.enter_context(open(log_path, "wb"))  # noqa: SIM115
                        log(f"Raw SSE data will be recorded to: {log_path}")
                    except Exception as log_exc:  # noqa: BLE001
                        log(f"Failed to create SSE log file: {log_exc}")

                def generate_stream():  # noqa: PLR0915, PLR0912
                    # Pass upstream SSE events downstream, normalizing each one
                    # and guaranteeing a terminal [DONE] event.
                    nonlocal log_file, log_file_stack
                    event_index = 0
                    done_sent = False
                    finish_reason_seen = None
                    try:
                        for upstream_chunk_index, raw_event in transport.extract_sse_events(
                            response_from_target, log_file=log_file, log=log
                        ):
                            event_index += 1
                            event_text = raw_event.decode("utf-8", errors="replace")
                            data_lines = [
                                line[len("data:"):].lstrip()
                                for line in event_text.splitlines()
                                if line.startswith("data:")
                            ]
                            if not data_lines:
                                log(f"evt#{event_index} skipping event with no data lines: {event_text!r}")
                                continue
                            data_str = "\n".join(data_lines)
                            if self.debug_mode:
                                log(
                                    f"UP<< evt#{event_index} src_chunk#{upstream_chunk_index} "
                                    f"bytes={len(raw_event)} | {data_str.strip()}"
                                )
                            if data_str.strip() == "[DONE]":
                                done_sent = True
                                done_bytes = b"data: [DONE]\n\n"
                                try:
                                    yield done_bytes
                                except GeneratorExit:
                                    log(
                                        f"Downstream connection closed early; upstream read up to evt#{event_index} (DONE)"
                                    )
                                    raise
                                except Exception as downstream_exc:  # noqa: BLE001
                                    log(f"Downstream write error (DONE); stop sending: {downstream_exc}")
                                    break
                                log("Forwarded [DONE]")
                                break
                            normalized_bytes, finish_reason = transport.normalize_openai_event(
                                data_str,
                                event_index,
                                model_name=self.target_model_id,
                                log=log,
                            )
                            if finish_reason:
                                finish_reason_seen = finish_reason
                            try:
                                yield normalized_bytes
                            except GeneratorExit:
                                log(
                                    f"Downstream connection closed early; upstream read up to evt#{event_index} "
                                    f"finish={finish_reason_seen}"
                                )
                                raise
                            except Exception as downstream_exc:  # noqa: BLE001
                                log(f"Downstream write error; stop sending: {downstream_exc}")
                                break
                        if not done_sent:
                            # Upstream never sent [DONE]: synthesize the terminal event.
                            tail_bytes = b"data: [DONE]\n\n"
                            with contextlib.suppress(Exception):
                                yield tail_bytes
                            if self.debug_mode:
                                extra = (
                                    f", finish_reason={finish_reason_seen}"
                                    if finish_reason_seen
                                    else ""
                                )
                                log(f"Upstream sent no [DONE]; synthesized terminal event{extra}")
                    finally:
                        if log_file_stack:
                            with contextlib.suppress(Exception):
                                log_file_stack.close()
                            if log_path:
                                log(f"SSE recording complete: {log_path}")
                        with contextlib.suppress(Exception):
                            response_from_target.close()
                        if self.debug_mode:
                            log(f"Upstream stream ended; {event_index} events total")

                downstream_content_type = response_from_target.headers.get(
                    "content-type", "text/event-stream"
                )
                if self.debug_mode:
                    log(f"Downstream response Content-Type: {downstream_content_type}")
                return Response(
                    generate_stream(),
                    content_type=downstream_content_type,
                )
            response_json = response_from_target.json()
            if client_requested_stream and self.stream_mode == "false":
                log("Converting non-stream response to stream format for the client")

                def simulate_stream():
                    # Re-chunk the complete JSON answer as OpenAI-style SSE deltas.
                    choices = response_json.get("choices", [])
                    if not choices:
                        log("No 'choices' field found in response")
                        yield f"data: {json.dumps({'error': 'No choices in response'})}\n\n"
                        return
                    first_choice = choices[0]
                    message = first_choice.get("message", {})
                    content = message.get("content", "")
                    if not content:
                        log("No content found in response")
                        yield f"data: {json.dumps({'error': 'No content in response'})}\n\n"
                        return
                    model = response_json.get("model", "")
                    id_value = response_json.get("id", "")
                    created = response_json.get("created", 0)
                    chunk_size = 10
                    total_chars = len(content)
                    for i in range(0, total_chars, chunk_size):
                        chunk = content[i:i + chunk_size]
                        chunk_data = {
                            "id": id_value,
                            "object": "chat.completion.chunk",
                            "created": created,
                            "model": model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {"content": chunk},
                                    "finish_reason": None
                                    if i + chunk_size < total_chars
                                    else first_choice.get("finish_reason", "stop"),
                                }
                            ],
                        }
                        yield f"data: {json.dumps(chunk_data)}\n\n"
                        # Small pacing delay so the simulated stream feels incremental.
                        time.sleep(0.01)
                    yield "data: [DONE]\n\n"

                return Response(simulate_stream(), content_type="text/event-stream")
            if self.debug_mode:
                response_str = json.dumps(response_json, indent=2, ensure_ascii=False)
                log(
                    f"--- Full response body (debug) ---\n{response_str}\n"
                    "--------------------------------------"
                )
            else:
                log("Returning non-stream JSON response")
            return jsonify(response_json), response_from_target.status_code
        except requests.exceptions.HTTPError as e:
            error_msg = f"Target API HTTP error: {e.response.status_code} - {e.response.text}"
            log(error_msg)
            return jsonify(
                {"error": f"Target API error: {e.response.status_code}", "details": e.response.text}
            ), e.response.status_code
        except requests.exceptions.RequestException as e:
            error_msg = f"Error contacting target API: {e}"
            log(error_msg)
            return jsonify({"error": f"Error contacting target API: {str(e)}"}), 503
        except Exception as e:
            error_msg = f"Unexpected error: {e}"
            log(error_msg)
            return jsonify({"error": "An internal server error occurred"}), 500
# Explicit public API of this module.
__all__ = ["ProxyApp"]