-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
306 lines (257 loc) · 7.92 KB
/
main.py
File metadata and controls
306 lines (257 loc) · 7.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import asyncio
import argparse
import sys
from datetime import datetime
from typing import List
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query, Request
from pydantic import BaseModel
from prometheus_client import Counter, Gauge
import uvicorn
from loguru import logger
from proxy_pool.core.fetcher import ProxyFetcher
from proxy_pool.core.validator import ProxyValidator
from proxy_pool.core.storage import RedisProxyClient
from proxy_pool.core.cleaner import ProxyCleaner
# from proxy_pool.models.proxy_model import ProxyModel
from proxy_pool.utils.config import Settings
from proxy_pool.utils.logger import setup_logger
# Global configuration object, created once when the module is imported.
settings = Settings()
# Redis client instance shared by everything in this module.
storage = RedisProxyClient()
class Metrics:
    """Prometheus metrics published by the proxy pool."""

    def __init__(self):
        # Gauges: values that are overwritten on every pool cycle.
        self.proxy_total = Gauge(
            name="proxy_pool_total",
            documentation="Total number of proxies",
        )
        self.proxy_valid = Gauge(
            name="proxy_pool_valid",
            documentation="Number of valid proxies",
        )
        # Counters: values that are only ever incremented.
        self.fetch_counter = Counter(
            name="proxy_pool_fetch_total",
            documentation="Total number of fetch operations",
        )
        # Labelled per request path and HTTP method.
        self.api_requests = Counter(
            name="proxy_pool_api_requests",
            documentation="Total number of API requests",
            labelnames=["endpoint", "method"],
        )
# Single module-level Metrics instance; the pool loop and the API handlers both update it.
metrics = Metrics()
class ProxyResponse(BaseModel):
    """Proxy response model (used as the response schema of ``GET /proxies``)."""

    # Proxy entries returned to the caller.
    proxies: List[str]
    # Number of entries; the /proxies handler sets this to len(proxies).
    count: int
    # Moment the response was built; the /proxies handler uses datetime.now().
    timestamp: datetime
class ProxyPoolApplication:
    """Background worker that repeatedly fetches, validates, stores and cleans proxies.

    The instance owns its own fetcher, validator and cleaner, and reuses the
    module-level Redis client (``storage``) and configuration (``settings``).
    """

    def __init__(self):
        self.config = settings
        self.fetcher = ProxyFetcher()
        self.validator = ProxyValidator()
        self.storage = storage  # shared Redis client
        self.cleaner = ProxyCleaner()
        self._running = True  # loop flag polled by run()
        self._stopped = False  # True once stop() has started releasing resources

    async def stop(self):
        """Stop the run loop and release the storage and fetcher resources.

        Safe to call more than once: ``run()`` calls it from its ``finally``
        clause and ``run_serve_mode`` calls it again, so only the first call
        actually closes anything.
        """
        self._running = False
        if self._stopped:
            return
        self._stopped = True

        # Local import keeps this block self-contained.
        import inspect

        logger.info("正在关闭代理池应用...")
        try:
            # NOTE(review): other call sites await storage.close(), so it is
            # presumably a coroutine; awaiting only when the result is
            # awaitable works for both a sync and an async client.
            closing = self.storage.close()
            if inspect.isawaitable(closing):
                await closing
        finally:
            # Close the fetcher even if closing the storage failed.
            await self.fetcher.close()
        logger.info("代理池应用已关闭")

    async def run(self):
        """Run pool cycles until ``stop()`` is called, then release resources.

        A cycle that raises is logged and followed by a 60 second pause; the
        loop itself keeps going.
        """
        try:
            while self._running:
                try:
                    await self._run_cycle()
                except Exception as e:
                    logger.error(f"代理池运行异常: {e}")
                    await asyncio.sleep(60)
        finally:
            await self.stop()

    async def _run_cycle(self):
        """Execute one fetch -> validate -> store -> clean pass.

        Raises:
            Exception: any error from a step is logged and re-raised so that
                ``run()`` can apply its retry delay.
        """
        try:
            # Count this cycle in the fetch metric.
            metrics.fetch_counter.inc()

            # 1. Fetch raw proxies.
            raw_proxies = await self.fetcher.fetch_all()
            logger.info(f"获取原始代理 {len(raw_proxies)} 个")
            metrics.proxy_total.set(len(raw_proxies))

            # 2. Validate them.
            valid_proxies = await self.validator.validate_proxy(raw_proxies)
            logger.info(f"验证通过代理 {len(valid_proxies)} 个")
            metrics.proxy_valid.set(len(valid_proxies))

            # 3. Store the proxies that passed validation.
            for proxy in valid_proxies:
                await self.storage.add(proxy)

            # 4. Clean up invalid proxies.
            await self.cleaner.clean_invalid_proxies()

            # Wait for the configured interval before the next cycle.
            await asyncio.sleep(self.config.FETCH_INTERVAL)
        except Exception as e:
            logger.error(f"代理池运行循环发生异常: {e}")
            raise
# FastAPI application
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the FastAPI lifecycle: log on startup, close the storage on shutdown.

    The shutdown half runs in a ``finally`` clause so the shared storage
    client is closed even when an exception is thrown in at the ``yield``.
    """
    # Local import keeps this block self-contained.
    import inspect

    # Startup.
    logger.info("API服务启动...")
    try:
        yield
    finally:
        # Shutdown.
        logger.info("API服务关闭...")
        # NOTE(review): run_validate_mode awaits storage.close(), so it is
        # presumably a coroutine; await only when the result is awaitable.
        closing = storage.close()
        if inspect.isawaitable(closing):
            await closing
# ASGI application object; the middleware and route decorators that follow
# register on it, and `lifespan` supplies its startup/shutdown hooks.
app = FastAPI(
    title="代理池服务",
    description="高性能代理池服务",
    version="1.0.0",
    lifespan=lifespan
)
@app.middleware("http")
async def track_requests(request: Request, call_next):
    """Count each incoming request by URL path and HTTP method, then forward it."""
    request_counter = metrics.api_requests.labels(
        endpoint=request.url.path,
        method=request.method,
    )
    request_counter.inc()
    response = await call_next(request)
    return response
@app.get("/proxies", response_model=ProxyResponse)
async def get_proxies(
    count: int = Query(default=10, ge=1, le=100),
    # protocol: Optional[str] = Query(default=None, regex="^(http|https)?$")
):
    """Return up to ``count`` proxies taken from the shared storage.

    Args:
        count: number of proxies requested; validated to the range 1..100.

    Raises:
        HTTPException: 404 when the storage returns no proxies, 500 when
            reading the storage or building the response fails.
    """
    try:
        proxies = await storage.random_proxy(count)
        if not proxies:
            raise HTTPException(
                status_code=404,
                detail="No proxies available"
            )
        return ProxyResponse(
            proxies=proxies,
            count=len(proxies),
            timestamp=datetime.now()
        )
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged
        # instead of turning them into a 500 below.
        raise
    except Exception as e:
        logger.exception("获取代理失败")
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e
@app.get("/stats")
async def get_stats():
    """Return the stored proxy count and the number of fetch cycles run so far.

    Returns:
        dict with ``total`` (value of ``storage.get_proxy_count()``),
        ``fetch_count`` (current value of the fetch counter) and
        ``timestamp`` (``datetime.now()``).

    Raises:
        HTTPException: 500 when any of the lookups fails.
    """
    try:
        total = await storage.get_proxy_count()
        # prometheus_client counters expose no public ``.value`` attribute;
        # read the current value from the collected samples instead. The
        # counter also emits a ``*_created`` sample, hence the suffix filter.
        fetch_count = next(
            (
                sample.value
                for family in metrics.fetch_counter.collect()
                for sample in family.samples
                if sample.name.endswith("_total")
            ),
            0.0,
        )
        return {
            "total": total,
            "fetch_count": fetch_count,
            "timestamp": datetime.now()
        }
    except Exception as e:
        logger.exception("获取统计信息失败")
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e
async def start_api_server(port: int, host: str = "127.0.0.1"):
    """Start the API service with uvicorn and run until the server exits.

    Args:
        port: TCP port to listen on.
        host: interface to bind to. Defaults to ``"127.0.0.1"`` (the value
            that used to be hard-coded), so existing callers are unaffected.
    """
    config = uvicorn.Config(
        app,
        host=host,
        port=port,
        log_level="info"
    )
    server = uvicorn.Server(config)
    await server.serve()
async def run_fetch_mode():
    """Mode ``fetch``: run one ``fetch_all()`` and log how many proxies came back."""
    source = ProxyFetcher()
    try:
        fetched = await source.fetch_all()
        fetched_count = len(fetched)
        logger.info(f"获取代理 {fetched_count} 个")
    finally:
        # Always release the fetcher, even if fetching failed.
        await source.close()
async def run_validate_mode():
    """Mode ``validate``: validate the stored proxies and add the valid ones back."""
    try:
        checker = ProxyValidator()
        stored = await storage.get_all_proxies()
        survivors = await checker.validate_proxy(stored)
        # Write the valid proxies through a storage pipeline.
        async with storage.pipeline() as batch:
            for entry in survivors:
                await batch.add(entry)
        survivor_count = len(survivors)
        logger.info(f"验证有效代理 {survivor_count} 个")
    finally:
        # The shared client is closed whether or not validation succeeded.
        await storage.close()
async def run_serve_mode():
    """Mode ``serve``: run the proxy pool loop and the API server side by side.

    The two run as separate tasks. As soon as either one finishes or fails,
    the other is cancelled, so a shut-down API server no longer leaves the
    endless pool loop keeping the process alive. An exception from the task
    that finished first is re-raised to the caller.
    """
    app_ = ProxyPoolApplication()
    tasks = [
        asyncio.create_task(app_.run()),
        asyncio.create_task(start_api_server(settings.API_PORT)),
    ]
    try:
        done, _pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            # Propagate a failure from whichever task ended first.
            finished.result()
    except asyncio.CancelledError:
        logger.info("服务停止中...")
    finally:
        # Cancel whatever is still running and wait for it to unwind before
        # releasing the pool's resources.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await app_.stop()
async def main():
    """Parse the command line, configure logging and run the selected mode.

    Exits the process with status 1 when the selected mode raises an
    exception other than ``KeyboardInterrupt``.
    """
    # Command-line options.
    cli = argparse.ArgumentParser(description="代理池服务")
    cli.add_argument(
        "--mode",
        choices=["fetch", "validate", "serve"],
        default="serve",
        help="运行模式",
    )
    cli.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="日志级别",
    )
    # NOTE: a "--port" option (int, default 5010) was sketched here but is
    # disabled; serve mode takes the port from settings.API_PORT.
    options = cli.parse_args()

    # Logging setup.
    setup_logger(options.log_level)
    logger.info(f"启动模式: {options.mode}")

    try:
        # Map each mode name to the coroutine function that implements it.
        runners = {
            "fetch": run_fetch_mode,
            "validate": run_validate_mode,
            "serve": run_serve_mode,
        }
        runner = runners.get(options.mode)
        if runner is not None:
            await runner()
    except KeyboardInterrupt:
        logger.info("服务已手动停止")
    except Exception as e:
        logger.exception(f"服务启动失败: {e}")
        sys.exit(1)
# Script entry point: run main() on a fresh event loop.
if __name__ == "__main__":
    try:
        asyncio.run(main())  # start the whole application
    except KeyboardInterrupt:
        logger.info("服务已手动停止")
    except Exception as error:
        logger.error(f"服务启动失败: {error}")
        # Report the failure through the exit status, as main() does for
        # errors raised by the selected mode.
        sys.exit(1)