-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.py
More file actions
626 lines (521 loc) · 24.6 KB
/
monitor.py
File metadata and controls
626 lines (521 loc) · 24.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
import logging
import re
from pathlib import Path
from typing import Any, Callable, Optional
from urllib.parse import urljoin
from playwright.sync_api import Locator, Page, TimeoutError as PlaywrightTimeoutError, sync_playwright
from models import Activity
import page_selectors as selectors
class LoginRequiredError(RuntimeError):
    """Raised when the login session is unusable and the user must log in again manually."""
class LectureMonitor:
    """Core workflow for monitoring the activity search page.

    Each call to :meth:`run_once` launches a persistent Chromium context
    backed by ``profile_dir``, checks the login state, applies the filters
    listed in ``page_selectors.FILTER_CONFIG``, clicks the query button and
    parses the result rows into ``Activity`` objects. The browser context is
    closed at the end of every run.
    """

    # A date (YYYY-M-D with ".", "/" or "-" separators), optionally followed by
    # HH:MM, optionally followed by a range separator and a second date/time.
    _TIME_PATTERN = re.compile(
        r"(\d{4}[./-]\d{1,2}[./-]\d{1,2}(?:\s+\d{1,2}:\d{2})?"
        r"(?:\s*[~\-至到]\s*\d{4}[./-]\d{1,2}[./-]\d{1,2}(?:\s+\d{1,2}:\d{2})?)?)"
    )
    # A bare date, used to skip date lines when guessing a title.
    _DATE_PATTERN = re.compile(r"\d{4}[./-]\d{1,2}[./-]\d{1,2}")

    def __init__(self, config: dict[str, Any]):
        """Read settings from *config* and make sure the profile directory exists.

        Keys read:
            ``url`` (required): target page URL.
            ``browser``: ``headless`` (default False), ``slow_mo_ms`` (0),
                ``profile_dir`` ("data/browser_profile").
            ``timeouts``: ``page_load_ms`` (30000), ``ui_wait_ms`` (8000),
                ``login_check_delay_ms`` (3500),
                ``login_recover_retry_count`` (4),
                ``login_recover_retry_wait_ms`` (2000).

        Raises:
            KeyError: if ``url`` is missing.
        """
        self.url = config["url"]
        # Used to rebuild links that consist only of a "#fragment".
        self.url_without_hash = self.url.split("#", 1)[0]

        browser_config = config.get("browser", {})
        self.headless = bool(browser_config.get("headless", False))
        self.slow_mo_ms = int(browser_config.get("slow_mo_ms", 0))
        self.profile_dir = Path(browser_config.get("profile_dir", "data/browser_profile"))

        timeout_config = config.get("timeouts", {})
        self.page_load_ms = int(timeout_config.get("page_load_ms", 30000))
        self.ui_wait_ms = int(timeout_config.get("ui_wait_ms", 8000))
        # Give the page some render time before the login check so that a
        # slow render is not mistaken for an expired login.
        self.login_check_delay_ms = int(timeout_config.get("login_check_delay_ms", 3500))
        # Retry parameters for navigating back to the target page after a
        # manual login.
        self.login_recover_retry_count = int(timeout_config.get("login_recover_retry_count", 4))
        self.login_recover_retry_wait_ms = int(timeout_config.get("login_recover_retry_wait_ms", 2000))

        self.profile_dir.mkdir(parents=True, exist_ok=True)

    def run_once(
        self,
        allow_manual_login: bool = False,
        on_login_invalid: Optional[Callable[[str], None]] = None,
    ) -> list[Activity]:
        """Run one complete query cycle in a fresh browser context.

        Args:
            allow_manual_login: whether the user may be prompted on the
                terminal to log in manually when the session is invalid.
            on_login_invalid: optional callback invoked with a reason string
                when the login is detected as invalid.

        Returns:
            The activities parsed from the result list.

        Raises:
            LoginRequiredError: if the login is invalid and could not be
                recovered.
        """
        try:
            return self._run_once_with_mode(
                headless_mode=self.headless,
                allow_manual_login=allow_manual_login,
                on_login_invalid=on_login_invalid,
            )
        except LoginRequiredError:
            # Only fall back to a headed browser when the configured mode is
            # headless AND manual recovery is allowed.
            if not self.headless or not allow_manual_login:
                raise
            logging.warning("无头模式下检测到登录失效,将临时切换为有头模式进行登录恢复。")
            input("请按回车打开临时浏览器窗口并完成登录恢复... ")
            return self._run_once_with_mode(
                headless_mode=False,
                allow_manual_login=True,
                # The headless attempt already fired the callback; passing
                # None avoids triggering it (e.g. a notification) twice.
                on_login_invalid=None,
                quick_manual_recover=True,
            )

    def close(self) -> None:
        """Compatibility no-op: every run closes its own browser context."""
        return

    def _run_once_with_mode(
        self,
        headless_mode: bool,
        allow_manual_login: bool,
        on_login_invalid: Optional[Callable[[str], None]] = None,
        quick_manual_recover: bool = False,
    ) -> list[Activity]:
        """Run one query cycle with the browser in the given mode.

        Opens the target URL, ensures login, applies filters, clicks query
        and parses results. The context is always closed, even on error.
        """
        with sync_playwright() as playwright:
            context = playwright.chromium.launch_persistent_context(
                user_data_dir=str(self.profile_dir),
                headless=headless_mode,
                slow_mo=self.slow_mo_ms,
            )
            try:
                # Reuse the page the persistent context opens, if any.
                page = context.pages[0] if context.pages else context.new_page()
                logging.info("打开页面: %s", self.url)
                page.goto(self.url, wait_until="domcontentloaded", timeout=self.page_load_ms)
                self._wait_before_login_check(page)
                self._ensure_login(
                    page,
                    allow_manual_login=allow_manual_login,
                    on_login_invalid=on_login_invalid,
                    current_headless=headless_mode,
                    quick_manual_recover=quick_manual_recover,
                )
                self._apply_filters(
                    page,
                    allow_manual_login=allow_manual_login,
                    on_login_invalid=on_login_invalid,
                    current_headless=headless_mode,
                )
                self._click_query(page)
                activities = self._parse_activities(page)
                logging.info("本次解析到 %d 条活动", len(activities))
                return activities
            finally:
                context.close()

    def _ensure_login(
        self,
        page: Page,
        allow_manual_login: bool,
        on_login_invalid: Optional[Callable[[str], None]] = None,
        current_headless: bool = False,
        quick_manual_recover: bool = False,
    ) -> None:
        """Verify the login state; raise or start manual recovery when invalid.

        The login is considered valid when the filter area is detected (see
        ``_has_search_filters``). In normal mode the check is retried after an
        extra wait and after re-opening the target URL before giving up.

        Args:
            page: the page currently showing (or redirected from) the target.
            allow_manual_login: whether prompting for manual login is allowed.
            on_login_invalid: optional callback invoked with the reason when
                the login is judged invalid (not used in quick-recover mode).
            current_headless: whether the current browser is headless; manual
                login is impossible in that case.
            quick_manual_recover: skip the retries and go straight to the
                manual login prompt.

        Raises:
            LoginRequiredError: if the login is invalid and manual recovery is
                not allowed, not possible (headless) or unsuccessful.
        """
        if self._has_search_filters(page):
            logging.info("检测到筛选区域,登录状态可用。")
            return

        if quick_manual_recover:
            logging.info("临时有头恢复模式:直接进入手动登录恢复流程。")
            if not allow_manual_login:
                raise LoginRequiredError("临时有头恢复模式下未允许手动登录。")
            if current_headless:
                raise LoginRequiredError("恢复模式参数异常:当前仍为无头模式。")
            self._manual_login_recover(page)
            return

        # The front end renders asynchronously: wait one more round before
        # deciding.
        logging.info("筛选区域首次未出现,等待页面继续渲染后重试。")
        self._wait_before_login_check(page)
        if self._has_search_filters(page):
            logging.info("重试后检测到筛选区域,登录状态可用。")
            return

        # Still missing: navigate to the target again in case the portal
        # redirected us elsewhere.
        logging.info("仍未检测到筛选区域,尝试主动重新打开目标检索页。")
        page.goto(self.url, wait_until="domcontentloaded", timeout=self.page_load_ms)
        self._wait_before_login_check(page)
        if self._has_search_filters(page):
            logging.info("重新打开目标页后检测到筛选区域,登录状态可用。")
            return

        reason = "未检测到筛选区域,当前登录状态可能失效。"
        logging.warning("暂未检测到筛选区域,可能尚未登录或登录失效。")
        if on_login_invalid is not None:
            try:
                on_login_invalid(reason)
            except Exception as exc:
                # A failing callback must not mask the login problem itself.
                logging.exception("登录失效回调执行失败: %s", exc)

        if not allow_manual_login:
            raise LoginRequiredError(reason)
        if current_headless:
            raise LoginRequiredError("无头模式下登录失效,需要临时切换为有头模式恢复登录。")
        self._manual_login_recover(page)

    def _manual_login_recover(self, page: Page) -> None:
        """Prompt for a manual login, then navigate back to the target page.

        Raises:
            LoginRequiredError: if the filter area is still not detected after
                the retries in ``_recover_to_target_search_page``.
        """
        input("请在打开的浏览器中手动登录,然后回到终端按回车继续... ")
        if self._recover_to_target_search_page(page):
            logging.info("手动登录后已成功回到目标检索页。")
            return
        self._print_login_debug_tips()
        raise LoginRequiredError("手动登录后仍未检测到筛选区域(已尝试自动跳回目标页)。")

    def _has_search_filters(self, page: Page) -> bool:
        """Rough check: are at least two configured filter labels on the page?"""
        hit_count = sum(
            1
            for item in selectors.FILTER_CONFIG
            if page.get_by_text(item["label"], exact=False).count() > 0
        )
        return hit_count >= 2

    def _apply_filters(
        self,
        page: Page,
        allow_manual_login: bool,
        on_login_invalid: Optional[Callable[[str], None]] = None,
        current_headless: bool = False,
    ) -> None:
        """Apply all configured filters, with popup handling and one reload retry.

        On a first failure, a "network issue" popup is dismissed if present
        and the filters are re-applied; if that does not succeed either, the
        target page is reloaded, login is re-checked, and the filters are
        applied one last time.

        Raises:
            RuntimeError: if a filter still cannot be set after the retries.
            LoginRequiredError: propagated from the login re-check.
        """
        failed_item = self._apply_filters_once(page)
        if failed_item is None:
            return
        first_label, first_option = failed_item

        handled_popup = self._recover_from_network_issue_popup(
            page,
            allow_manual_login=allow_manual_login,
            on_login_invalid=on_login_invalid,
            current_headless=current_headless,
        )
        if handled_popup:
            failed_item = self._apply_filters_once(page)
            if failed_item is None:
                logging.info("处理网络异常弹窗后,筛选设置成功。")
                return
            first_label, first_option = failed_item

        logging.warning(
            "首次筛选失败:%s -> %s,尝试刷新目标页并重试一次。",
            first_label,
            first_option,
        )
        page.goto(self.url, wait_until="domcontentloaded", timeout=self.page_load_ms)
        self._wait_before_login_check(page)
        self._ensure_login(
            page,
            allow_manual_login=allow_manual_login,
            on_login_invalid=on_login_invalid,
            current_headless=current_headless,
        )

        failed_item = self._apply_filters_once(page)
        if failed_item is None:
            logging.info("刷新目标页后,筛选设置成功。")
            return
        label, option = failed_item
        self._print_filter_debug_tips(label, option)
        raise RuntimeError(f"筛选失败:{label} -> {option}")

    def _recover_from_network_issue_popup(
        self,
        page: Page,
        allow_manual_login: bool,
        on_login_invalid: Optional[Callable[[str], None]] = None,
        current_headless: bool = False,
    ) -> bool:
        """Detect and dismiss the "network issue" popup.

        Returns:
            True if the popup was found, its confirm button clicked and the
            login re-verified; False if no popup text was found or no confirm
            button could be clicked.
        """
        has_network_issue = any(
            page.get_by_text(keyword, exact=False).count() > 0
            for keyword in selectors.NETWORK_ISSUE_KEYWORDS
        )
        if not has_network_issue:
            return False

        logging.warning("检测到网络异常提示弹窗,尝试点击确认并等待页面恢复。")
        clicked = any(
            self._try_click_confirm_button(page, text)
            for text in selectors.NETWORK_ISSUE_CONFIRM_TEXTS
        )
        if not clicked:
            logging.warning("检测到网络异常提示,但未找到可点击的确认按钮。")
            return False

        try:
            page.wait_for_load_state("domcontentloaded", timeout=min(self.page_load_ms, 10000))
        except PlaywrightTimeoutError:
            # Best effort: the login check below decides whether the page is usable.
            pass
        self._wait_before_login_check(page)
        self._ensure_login(
            page,
            allow_manual_login=allow_manual_login,
            on_login_invalid=on_login_invalid,
            current_headless=current_headless,
        )
        return True

    def _click_first(self, locator: Locator) -> bool:
        """Click the first element matched by *locator*.

        Returns:
            True if an element matched and the click succeeded; False if
            nothing matched or the click timed out after ``ui_wait_ms``.
        """
        try:
            if locator.count() == 0:
                return False
            locator.first.click(timeout=self.ui_wait_ms)
            return True
        except PlaywrightTimeoutError:
            return False

    @staticmethod
    def _exact_text_pattern(text: str) -> re.Pattern[str]:
        """Return a regex matching *text* exactly, allowing surrounding whitespace."""
        return re.compile(rf"^\s*{re.escape(text)}\s*$")

    def _try_click_confirm_button(self, page: Page, button_text: str) -> bool:
        """Click a popup confirm button, first by ARIA role, then by exact text."""
        if self._click_first(page.get_by_role("button", name=button_text)):
            return True
        return self._click_first(page.get_by_text(button_text, exact=True))

    def _apply_filters_once(self, page: Page) -> Optional[tuple[str, str]]:
        """Apply every configured filter once.

        Returns:
            None on success, otherwise the ``(label, option)`` pair of the
            first filter that could not be set.
        """
        for item in selectors.FILTER_CONFIG:
            label = item["label"]
            option = item["option"]
            logging.info("设置筛选:%s -> %s", label, option)
            if not self._set_single_filter(page, label, option):
                return label, option
            # Short pause between filters.
            page.wait_for_timeout(250)
        return None

    def _set_single_filter(self, page: Page, label: str, option: str) -> bool:
        """Set one filter, trying several locating strategies in order."""
        if self._set_filter_in_campus_block(page, label, option):
            return True
        # Each remaining strategy only differs in how the dropdown is opened.
        openers = (
            self._open_dropdown_by_role,
            self._open_dropdown_near_label,
            self._open_dropdown_by_label_text,
        )
        for open_dropdown in openers:
            if open_dropdown(page, label) and self._click_dropdown_option(page, option):
                return True
        return False

    def _set_filter_in_campus_block(self, page: Page, label: str, option: str) -> bool:
        """Strategy 0: locate the filter via the search-block structure.

        Finds the block containing *label*, opens its toggle, then clicks the
        option whose text equals *option*; falls back to a substring match
        only when no exact match exists.
        """
        label_match = page.locator(selectors.CAMPUS_FILTER_LABEL_SELECTOR, has_text=label)
        blocks = page.locator(selectors.CAMPUS_FILTER_BLOCK_SELECTOR).filter(has=label_match)
        if blocks.count() == 0:
            return False
        block = blocks.first

        toggles = block.locator(", ".join(selectors.CAMPUS_FILTER_TOGGLE_SELECTORS))
        if not self._click_first(toggles):
            return False

        options = block.locator(selectors.CAMPUS_FILTER_OPTION_SELECTOR)
        # Prefer an exact match so that an option which is a prefix of another
        # option's text does not click the longer one by accident.
        exact_option = options.filter(has_text=self._exact_text_pattern(option))
        if exact_option.count() > 0:
            return self._click_first(exact_option)
        return self._click_first(options.filter(has_text=option))

    def _open_dropdown_by_role(self, page: Page, label: str) -> bool:
        """Strategy 1: locate the dropdown by ARIA role and accessible name."""
        # Escape the label so regex metacharacters in it are matched literally.
        combobox = page.get_by_role("combobox", name=re.compile(re.escape(label)))
        return self._click_first(combobox)

    def _open_dropdown_near_label(self, page: Page, label: str) -> bool:
        """Strategy 2: find a dropdown trigger inside a form item containing *label*."""
        for form_selector in selectors.FORM_ITEM_SELECTORS:
            form_items = page.locator(form_selector).filter(has_text=label)
            if form_items.count() == 0:
                continue
            form_item = form_items.first
            for trigger_selector in selectors.DROPDOWN_TRIGGER_SELECTORS:
                if self._click_first(form_item.locator(trigger_selector)):
                    return True
        return False

    def _open_dropdown_by_label_text(self, page: Page, label: str) -> bool:
        """Strategy 3: click the label text itself (for some custom UIs)."""
        return self._click_first(page.get_by_text(label, exact=False))

    def _click_dropdown_option(self, page: Page, option: str) -> bool:
        """Pick *option* from an already opened dropdown.

        Tries the ARIA ``option`` role, then each configured option selector,
        then an exact text match.
        """
        if self._click_first(page.get_by_role("option", name=option)):
            return True
        for option_selector in selectors.DROPDOWN_OPTION_SELECTORS:
            if self._click_first(page.locator(option_selector).filter(has_text=option)):
                return True
        return self._click_first(page.get_by_text(option, exact=True))

    def _click_query(self, page: Page) -> None:
        """Click the query button.

        Raises:
            RuntimeError: if no query button is found by role or by text.
        """
        if self._click_first(page.get_by_role("button", name=selectors.QUERY_BUTTON_TEXT)):
            return
        text_button = page.get_by_text(selectors.QUERY_BUTTON_TEXT, exact=True)
        if text_button.count() == 0:
            raise RuntimeError("未找到“查询”按钮。")
        # Last resort: a timeout here propagates to the caller.
        text_button.first.click(timeout=self.ui_wait_ms)

    def _parse_activities(self, page: Page) -> list[Activity]:
        """Parse the result list into activities (title, status, time, link)."""
        # Fixed pause to let the result list render after the query click.
        page.wait_for_timeout(1200)
        rows = self._find_result_rows(page)
        if rows is None:
            self._print_result_debug_tips()
            return []

        activities: list[Activity] = []
        for idx in range(rows.count()):
            activity = self._extract_activity(rows.nth(idx))
            if activity is not None:
                activities.append(activity)
        return activities

    def _find_result_rows(self, page: Page) -> Optional[Locator]:
        """Return the first configured row locator that matches, else None."""
        for row_selector in selectors.RESULT_ROW_SELECTORS:
            rows = page.locator(row_selector)
            if rows.count() > 0:
                logging.info("使用结果选择器: %s", row_selector)
                return rows
        return None

    def _extract_activity(self, row: Locator) -> Optional[Activity]:
        """Build an ``Activity`` from one row; None if it has no text or title."""
        row_text = self._safe_inner_text(row)
        if not row_text:
            return None
        title = self._extract_title(row, row_text)
        if not title:
            return None
        return Activity(
            title=title,
            status=self._extract_status(row_text),
            time_text=self._extract_time(row_text),
            detail_url=self._extract_detail_url(row),
        )

    def _extract_title(self, row: Locator, row_text: str) -> str:
        """Return the row title.

        Prefers the configured title selectors; otherwise uses the first
        non-empty line of *row_text* that holds neither a status keyword nor a
        date. Returns "" when nothing qualifies.
        """
        for selector in selectors.TITLE_CANDIDATE_SELECTORS:
            title_node = row.locator(selector)
            if title_node.count() == 0:
                continue
            candidate = self._safe_inner_text(title_node.first)
            if candidate:
                return candidate

        for raw_line in row_text.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            if any(keyword in line for keyword in selectors.STATUS_KEYWORDS):
                continue
            if self._contains_date(line):
                continue
            return line
        return ""

    def _extract_status(self, row_text: str) -> str:
        """Return the first configured status keyword found in *row_text*."""
        for keyword in selectors.STATUS_KEYWORDS:
            if keyword in row_text:
                return keyword
        return "未知"

    def _extract_time(self, row_text: str) -> str:
        """Return the first date/time (or date range) found in *row_text*."""
        match = self._TIME_PATTERN.search(row_text)
        if match:
            return match.group(1)
        return "未识别"

    def _contains_date(self, text: str) -> bool:
        """Return True if *text* contains a YYYY-M-D style date."""
        return bool(self._DATE_PATTERN.search(text))

    def _safe_inner_text(self, locator: Locator) -> str:
        """Read stripped inner text; return "" if the read times out."""
        try:
            return locator.inner_text(timeout=800).strip()
        except PlaywrightTimeoutError:
            return ""

    def _extract_detail_url(self, row: Locator) -> str:
        """Return an absolute detail link for the row, or "" if it has no links."""
        anchors = row.locator("a[href]")
        count = anchors.count()
        if count == 0:
            return ""

        # Prefer a link that looks like a detail page; inspect at most 8 anchors.
        for idx in range(min(count, 8)):
            normalized = self._normalize_url(anchors.nth(idx).get_attribute("href"))
            if "/activity/" in normalized or "partakedetail" in normalized:
                return normalized

        return self._normalize_url(anchors.first.get_attribute("href"))

    def _normalize_url(self, href: Optional[str]) -> str:
        """Turn a page link into an absolute URL.

        Returns "" for missing, blank or ``javascript:`` links. A bare
        ``#fragment`` is appended to the target URL without its own fragment;
        everything else is resolved against the target URL.
        """
        if href is None:
            return ""
        clean_href = href.strip()
        if not clean_href:
            return ""
        if clean_href.lower().startswith("javascript:"):
            return ""
        if clean_href.startswith("#"):
            return f"{self.url_without_hash}{clean_href}"
        return urljoin(self.url, clean_href)

    def _wait_before_login_check(self, page: Page) -> None:
        """Wait for the page to settle before the login check."""
        try:
            # Returns early once the network is idle; waits at most 5 seconds.
            page.wait_for_load_state("networkidle", timeout=min(self.page_load_ms, 5000))
        except PlaywrightTimeoutError:
            # Some pages keep requests open, so "networkidle" may never fire.
            pass
        if self.login_check_delay_ms > 0:
            page.wait_for_timeout(self.login_check_delay_ms)

    def _recover_to_target_search_page(self, page: Page) -> bool:
        """Navigate back to the target page after login, retrying as configured.

        Returns:
            True as soon as the filter area is detected; False once all
            ``login_recover_retry_count`` attempts are used up.
        """
        for attempt in range(1, self.login_recover_retry_count + 1):
            logging.info(
                "登录恢复:尝试回到目标检索页(%d/%d)",
                attempt,
                self.login_recover_retry_count,
            )
            page.goto(self.url, wait_until="domcontentloaded", timeout=self.page_load_ms)
            self._wait_before_login_check(page)
            if self._has_search_filters(page):
                return True
            if self.login_recover_retry_wait_ms > 0:
                page.wait_for_timeout(self.login_recover_retry_wait_ms)
        return False

    def _print_login_debug_tips(self) -> None:
        """Log troubleshooting hints for a failed login recovery."""
        logging.error("TODO: 登录后仍未识别筛选区。请检查页面是否停留在活动检索页。")
        logging.error("调试建议:确认 URL 是否为 /campus#/search,且页面已完全加载。")

    def _print_filter_debug_tips(self, label: str, option: str) -> None:
        """Log troubleshooting hints for a filter that could not be set."""
        logging.error("TODO: 无法定位筛选项 -> %s / %s", label, option)
        logging.error("调试建议:使用浏览器开发者工具检查该筛选框的标签文本和下拉 DOM 结构。")
        logging.error("调试建议:若页面使用了新组件,请在 page_selectors.py 中补充容器与选项选择器。")

    def _print_result_debug_tips(self) -> None:
        """Log troubleshooting hints when no result row selector matched."""
        logging.warning("TODO: 查询后未匹配到结果行选择器。")
        logging.warning("调试建议:检查列表是 table 还是 card,并在 page_selectors.py 的 RESULT_ROW_SELECTORS 中补充。")