-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathext_page.py
More file actions
328 lines (264 loc) · 9.08 KB
/
ext_page.py
File metadata and controls
328 lines (264 loc) · 9.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import logging
import random
import time
from typing import Callable, Optional, Union

from playwright.sync_api import Locator, Page, Response
from playwright_stealth.stealth import Stealth

from ext_browser_context import ExtBrowserContext
from ext_locator import ExtLocator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Timeout, in milliseconds, passed to the selector and response waits below.
DEFAULT_TIMEOUT_MILLIS = 10_000.0
class ExtPage:
    """Wrapper around a Playwright ``Page`` tied to its ``ExtBrowserContext``.

    Adds a suspend/resume gate, randomized waits, logging around navigation
    and waits, and best-effort re-creation of the page after a
    "target closed" failure. Any attribute not defined on this class is
    looked up on the wrapped page.
    """

    def __init__(self, page: "Page", ext_bc: "ExtBrowserContext", locked: bool = False):
        """Store the wrapped page and its owning context.

        Args:
            page: Playwright page to wrap.
            ext_bc: Extended browser context this page belongs to.
            locked: Initial value of the ``locked`` flag (reset by
                ``release``; presumably consulted by the context when
                handing out pages -- confirm in ``ExtBrowserContext``).
        """
        self.page = page
        self.ext_bc = ext_bc
        self.locked = locked
        # While True, gated operations block inside check_suspend().
        self.suspended = False
        self._stealth = None
        self.stealth_enabled = False

    def enable_stealth(self):
        """Apply stealth to the wrapped page; a no-op if already enabled."""
        if self.stealth_enabled:
            return
        self._stealth = Stealth()
        self._stealth.apply_stealth_sync(self.page)
        self.stealth_enabled = True
        logger.info("Stealth 模式已启用")

    @property
    def is_closed(self) -> bool:
        """Whether the wrapped page reports itself as closed."""
        return self.page.is_closed()

    def new_ext_page(self) -> 'ExtPage':
        """Create a new extended page through the owning context.

        Returns:
            The ``ExtPage`` produced by ``ext_bc.new_ext_page()``.
        """
        return self.ext_bc.new_ext_page()

    def expect_ext_page(self, cb: Callable) -> 'ExtPage':
        """Run ``cb`` and wrap the popup page it opens.

        Args:
            cb: Zero-argument callable that triggers the popup.

        Returns:
            The popup wrapped via ``ext_bc.build_ext_page``.

        Raises:
            RuntimeError: If no popup page was obtained.
        """
        self.check_suspend()
        with self.page.expect_popup() as popup_info:
            cb()
        # Read the value only after the ``with`` block has finished waiting.
        page = popup_info.value
        if not page:
            raise RuntimeError("Failed to get new page")
        return self.ext_bc.build_ext_page(page)

    def ext_context(self) -> "ExtBrowserContext":
        """Return the extended browser context this page belongs to."""
        return self.ext_bc

    def release(self):
        """Clear the ``locked`` flag if this page is registered in the context.

        The context's ``_lock`` is held while the flag is changed.
        """
        with self.ext_bc._lock:
            if self in self.ext_bc.ext_pages:
                self.locked = False

    def close(self):
        """Close the wrapped page."""
        self.page.close()

    def close_all(self):
        """Close the page and then the browser context.

        The context is closed even when closing the page raises, so a failing
        page cannot leak the context; the page error still propagates.
        """
        try:
            self.close()
        finally:
            self.ext_bc.close()

    def re_new_page_by_error(self, err: Exception):
        """Replace the wrapped page if ``err`` says its target was closed.

        Best-effort: up to three attempts, one second apart. On success
        ``self.page`` points at the new page (with stealth re-applied when it
        had been enabled); on failure the old page is kept and the failure is
        logged. This method does not raise for recovery failures because its
        callers re-raise the original error themselves.

        Args:
            err: The exception raised by the failed page operation.
        """
        # Match case-insensitively: Playwright for Python is understood to
        # word these errors with a capital "T" ("Target closed", "Target
        # page, context or browser has been closed"), so the exact lowercase
        # test would not match them.
        message = str(err).lower()
        if "target closed" not in message and "has been closed" not in message:
            return

        try:
            self.close()
        except Exception as ce:
            # Already in error recovery; a failing close must not abort it.
            logger.warning(f"Close page before re-creation failed: {ce}")

        for i in range(3):
            try:
                new_page = self.ext_bc.browser_context.new_page()
            except Exception as ne:
                logger.error(f"Retry {i + 1} failed: {ne}")
                time.sleep(1)
                continue
            self.page = new_page
            # Keep stealth_enabled truthful for the replacement page.
            if self.stealth_enabled and self._stealth is not None:
                try:
                    self._stealth.apply_stealth_sync(new_page)
                except Exception as se:
                    logger.warning(f"Re-apply stealth to new page failed: {se}")
            return
        logger.error("Failed to re-create page after 3 retries")

    def navigate_with_loaded_state(self, url: str):
        """Navigate to ``url`` and wait for the ``load`` state.

        Equivalent to ``navigate(url, wait_until="load")``.

        Args:
            url: Target URL.
        """
        self.navigate(url, wait_until="load")

    def navigate(self, url: str, wait_until: str = "load"):
        """Navigate to ``url``.

        On failure the error is logged, page re-creation is attempted, and the
        original exception is re-raised.

        Args:
            url: Target URL.
            wait_until: Load state passed to ``page.goto``.
        """
        try:
            self.check_suspend()
            self.page.goto(url, wait_until=wait_until)
        except Exception as e:
            logger.error(f"Page.Goto url[{url}] error: {e}")
            self.re_new_page_by_error(e)
            raise

    def reload_with_loaded_state(self):
        """Reload the page and wait for the ``load`` state.

        On failure the error is logged, page re-creation is attempted, and the
        original exception is re-raised.
        """
        try:
            self.check_suspend()
            self.page.reload(wait_until="load")
        except Exception as e:
            logger.error(f"Page.Reload url[{self.page.url}] error: {e}")
            self.re_new_page_by_error(e)
            raise

    def wait_for_load_state_load(self):
        """Wait until the page reaches the ``load`` state.

        On failure the error is logged, page re-creation is attempted, and the
        original exception is re-raised.
        """
        try:
            self.page.wait_for_load_state("load")
        except Exception as e:
            logger.error(f"Page.WaitForLoadStateLoad error: {e}")
            self.re_new_page_by_error(e)
            raise

    def wait_for_dom_content_loaded(self):
        """Wait until the page reaches the ``domcontentloaded`` state.

        On failure the error is logged, page re-creation is attempted, and the
        original exception is re-raised.
        """
        try:
            self.page.wait_for_load_state("domcontentloaded")
        except Exception as e:
            logger.error(f"Page.WaitForDomContentLoaded error: {e}")
            self.re_new_page_by_error(e)
            raise

    def wait_for_selector_state_visible(self, selector: str):
        """Wait (up to ``DEFAULT_TIMEOUT_MILLIS``) for ``selector`` to be visible.

        Args:
            selector: Selector to wait for.
        """
        try:
            self.page.wait_for_selector(
                selector, state="visible", timeout=DEFAULT_TIMEOUT_MILLIS
            )
        except Exception as e:
            logger.error(f"Page.WaitForSelector error: {e}")
            raise

    def random_wait_short(self):
        """Sleep for a random 100-1000 ms."""
        self.random_wait_range(100, 1000)

    def random_wait_middle(self):
        """Sleep for a random 3000-6000 ms."""
        self.random_wait_range(3000, 6000)

    def random_wait_long(self):
        """Sleep for a random 10000-20000 ms."""
        self.random_wait_range(10000, 20000)

    def random_wait_range(self, min_ms: int, max_ms: int):
        """Sleep for a random whole number of milliseconds in the given range.

        Reversed bounds are swapped and negative bounds are clamped to 0, so
        the call never fails on ``random.randint`` or ``time.sleep``.

        Args:
            min_ms: Lower bound in milliseconds (inclusive).
            max_ms: Upper bound in milliseconds (inclusive).
        """
        low, high = sorted((min_ms, max_ms))
        millisecond = random.randint(max(low, 0), max(high, 0))
        logger.info(f"等待 {millisecond} 毫秒")
        time.sleep(millisecond / 1000.0)

    def expect_response_text(self, url_or_predicate: Union[str, Callable[..., bool]], cb: Callable) -> str:
        """Run ``cb``, wait for a matching response, and return its body text.

        Args:
            url_or_predicate: Either a substring that must occur in the
                response URL, or a callable taking the response and returning
                whether it matches.
            cb: Zero-argument callable that triggers the request.

        Returns:
            The non-empty response text.

        Raises:
            RuntimeError: If the response is not OK or its text is empty.
            Exception: Anything raised while waiting (logged and re-raised).
        """
        try:
            self.check_suspend()

            if callable(url_or_predicate):
                predicate = url_or_predicate
            else:
                def predicate(response: "Response") -> bool:
                    return url_or_predicate in response.url

            with self.page.expect_response(
                predicate, timeout=DEFAULT_TIMEOUT_MILLIS
            ) as response_info:
                cb()
            response = response_info.value
            if not response.ok:
                raise RuntimeError("Response not OK")
            text = response.text()
            if not text:
                logger.error(f"get response text empty: {url_or_predicate}")
                raise RuntimeError("Response text is empty")
            return text
        except Exception as e:
            logger.error(f"Page.ExpectResponseText error: {e}")
            raise

    def html_content(self) -> str:
        """Return the page HTML, or ``""`` if it cannot be read.

        Failures are logged and deliberately not raised.
        """
        try:
            return self.page.content()
        except Exception as e:
            logger.error(f"Page.Content error: {e}")
            return ""

    def ext_locator(self, *selectors: str) -> "ExtLocator":
        """Build an ``ExtLocator`` by chaining ``selectors`` from the page.

        The first selector is resolved on the page; each following selector is
        resolved inside the previous locator.

        Args:
            *selectors: One or more selectors, outermost first.

        Returns:
            An ``ExtLocator`` for the innermost locator.

        Raises:
            ValueError: If no selector is given.
        """
        if not selectors:
            raise ValueError("ext_locator requires at least one selector")
        first, *rest = selectors
        locator = self.page.locator(first)
        for selector in rest:
            locator = locator.locator(selector)
        return ExtLocator(self, locator, list(selectors))

    def must_inner_text(self, *selectors: str) -> str:
        """Return ``must_inner_text()`` of the locator for ``selectors``."""
        return self.ext_locator(*selectors).must_inner_text()

    def must_text_content(self, *selectors: str) -> str:
        """Return ``must_text_content()`` of the locator for ``selectors``."""
        return self.ext_locator(*selectors).must_text_content()

    def exists(self, selector: str) -> bool:
        """Return what ``ExtLocator.exists()`` reports for ``selector``."""
        return self.ext_locator(selector).exists()

    def click(self, selector: str):
        """Click ``selector`` if its locator reports that it exists.

        Nothing happens (and nothing is raised) when ``exists()`` is false.
        Errors are logged and re-raised.

        Args:
            selector: Selector of the element to click.
        """
        try:
            self.check_suspend()
            locator = self.ext_locator(selector)
            if locator.exists():
                locator.click()
        except Exception as e:
            logger.error(f"Page.Click[{selector}] error: {e}")
            raise

    def suspend(self):
        """Mark the page as suspended; gated operations will block."""
        self.suspended = True

    def continue_page(self):
        """Clear the suspended flag so gated operations proceed."""
        self.suspended = False

    def check_suspend(self):
        """Block while ``suspended`` is set, polling every 3-6 seconds."""
        while self.suspended:
            self.random_wait_middle()

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped Playwright page.

        Python calls this only after normal lookup has failed. ``page`` is read
        straight from the instance dict: if it is missing (an instance created
        without ``__init__``, e.g. during copy or unpickling), going through
        ``self.page`` would re-enter this method without end.

        Raises:
            AttributeError: If the page is not set or lacks the attribute.
        """
        try:
            page = self.__dict__["page"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(page, name)