diff --git a/llm_web_kit/main_html_parser/parser/layout_batch_parser.py b/llm_web_kit/main_html_parser/parser/layout_batch_parser.py index 48a9f5c5..c439690b 100644 --- a/llm_web_kit/main_html_parser/parser/layout_batch_parser.py +++ b/llm_web_kit/main_html_parser/parser/layout_batch_parser.py @@ -2,10 +2,8 @@ import re from hashlib import sha256 -import nltk from lxml import html from lxml.html import etree -from nltk.tokenize import word_tokenize from selectolax.parser import HTMLParser from llm_web_kit.html_layout.html_layout_cosin import get_feature, similarity @@ -13,11 +11,17 @@ from llm_web_kit.libs.html_utils import element_to_html, html_to_element from llm_web_kit.main_html_parser.parser.parser import BaseMainHtmlParser -nltk.download('punkt', quiet=True) # 静默模式避免日志干扰 +# Pre-compile regexes for performance +POST_NUMBER_REGEX = re.compile(r'(post|postid)-(\d+)', re.IGNORECASE) +NATURAL_LANGUAGE_REGEX = re.compile(r'[^\w\s]') +WHITESPACE_REGEX = re.compile(r'[ \t\n]+') +SPACES_REGEX = re.compile(r' +') +WORD_REGEX = re.compile(r'\w+') + MAX_LENGTH = 10 SIMILARITY_THRESHOLD = 0.75 -DYNAMIC_ID_SIM_THRESHOLD = 0.9 +DYNAMIC_ID_SIM_THRESHOLD = 0.85 class LayoutBatchParser(BaseMainHtmlParser): @@ -31,6 +35,8 @@ def __init__(self, template_data: str | dict): self.more_noise_enable = False self.dynamic_classid_similarity_threshold = 0.85 self.ids = dict() + self.normalize_key_cache = {} + self.processed_template_data = None def parse_tuple_key(self, key_str): if key_str.startswith('(') and key_str.endswith(')'): @@ -110,36 +116,45 @@ def process(self, html_source: str, template_dict_html: str) -> str: return content, body def get_tokens(self, content): - tokens = word_tokenize(content) + tokens = WORD_REGEX.findall(content) return tokens def normalize_key(self, tup): if not tup: return None + tup = tuple(tup) + if tup in self.normalize_key_cache: + return self.normalize_key_cache[tup] + tag, class_id, idd = tup if class_id: - class_id = re.sub(r'[ \t\n]+', ' ', class_id) + class_id = WHITESPACE_REGEX.sub(' ', class_id) if idd: valid_id = self.ids.get(idd, True) - idd = re.sub(r' +', ' ', idd) + idd = SPACES_REGEX.sub(' ', idd) # 如果有id,则无需判断class,因为有的网页和模版id相同,但是class不同 if tag in ['body', 'html']: - return (tag, None, None) + res = (tag, None, None) + self.normalize_key_cache[tup] = res + return res if idd and valid_id: idd_norm = self.replace_post_number(idd) - return (tag, None, idd_norm) + res = (tag, None, idd_norm) + self.normalize_key_cache[tup] = res + return res - return (tag, self.replace_post_number(class_id), self.replace_post_number(idd)) + res = (tag, self.replace_post_number(class_id), self.replace_post_number(idd)) + self.normalize_key_cache[tup] = res + return res def replace_post_number(self, text): if not text: return None # 匹配 "post-数字" 或 "postid-数字"(不区分大小写),并替换数字部分为空 - pattern = r'(post|postid)-(\d+)' # 使用 \1 保留前面的 "post" 或 "postid",但替换数字部分 - return re.sub(pattern, lambda m: f'{m.group(1)}-', text, flags=re.IGNORECASE).strip() + return POST_NUMBER_REGEX.sub(lambda m: f'{m.group(1)}-', text).strip() def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_label, template_doc, tree): # 判断这个tag是否有id @@ -153,64 +168,32 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab is_natural_language = self.__is_natural_language(text) or length_tail >= 10 idd = element.get('id') tag = element.tag - layer_nodes = element_dict.get(depth, {}) + if tag in ['script', 'style', 'meta', 'link']: + return class_tag = element.get('class') ori_keyy = (tag, class_tag, idd) - if idd and idd.strip(): - try: - idd_ele = tree.xpath(f'//*[@id="{idd}"]') - if len(idd_ele) > 3: - self.ids[idd] = False - else: - self.ids[idd] = True - except Exception: - self.ids[idd] = True keyy = self.normalize_key(ori_keyy) # 获取element的当前层的所有节点 element_parent = element.getparent() current_layer_keys = {} if element_parent is None: - child_str = html.tostring(element, encoding='utf-8').decode() - current_layer_keys[keyy] = (ori_keyy, child_str) + current_layer_keys[keyy] = (ori_keyy, element) else: for child in element_parent: if isinstance(child, etree._Comment): continue child_ori_key = (child.tag, child.get('class'), child.get('id')) child_key = self.normalize_key(child_ori_key) - child_str = html.tostring(child, encoding='utf-8').decode() - current_layer_keys[child_key] = (child_ori_key, child_str) + current_layer_keys[child_key] = (child_ori_key, child) # 匹配正文节点 has_red = False layer_nodes_dict = dict() layer_nodes_dict_drop_tail = dict() - layer_norm_eles = {} - # 构造当前层的候选映射字典 - for ele_keyy, ele_value in layer_nodes.items(): - layer_node_idd = ele_keyy[2] - if layer_node_idd and layer_node_idd.strip() and layer_node_idd not in self.ids: - try: - idd_ele = template_doc.xpath(f'//*[@id="{layer_node_idd}"]') - if len(idd_ele) > 3: - self.ids[layer_node_idd] = False - else: - self.ids[layer_node_idd] = self.ids.get(layer_node_idd, True) - except Exception: - self.ids[layer_node_idd] = self.ids.get(layer_node_idd, True) - ele_parent_keyy = self.normalize_key(ele_value[1]) - if ele_parent_keyy is not None: - ele_parent_keyy = tuple(ele_parent_keyy) - ele_label = ele_value[0] - is_drop_tail = ele_value[3] - norm_ele_keyy = self.normalize_key(ele_keyy[:3]) - if norm_ele_keyy in layer_norm_eles: - layer_norm_eles[norm_ele_keyy].append((ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail)) - else: - layer_norm_eles[norm_ele_keyy] = [(ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail)] + layer_norm_eles = self.processed_template_data.get(depth, {}) # 尝试匹配当前层每个节点,判断是否存在至少一个红色节点 for current_layer_key, current_layer_value in current_layer_keys.items(): current_layer_ori_key = current_layer_value[0] - node_html = current_layer_value[1] + node_element = current_layer_value[1] if current_layer_key in layer_norm_eles: for layer_norm_ele_value in layer_norm_eles[current_layer_key]: if layer_norm_ele_value[2] != parent_keyy: @@ -228,19 +211,18 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab break # 动态id匹配逻辑 elif self.dynamic_id_enable and current_layer_key[2]: - node_label, matched_ele_key, is_drop_tail = self.__match_tag_class(layer_nodes, current_layer_ori_key, parent_keyy, - node_html, template_doc) + layer_nodes = element_dict.get(depth, {}) + node_html = html.tostring(node_element, encoding='utf-8').decode() + node_label, matched_ele_key, is_drop_tail = self.__match_tag_class(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc) if node_label is None and self.dynamic_classid_enable: - node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, - node_html, - template_doc, False, True) + node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc, False, True) if node_label is None: continue # 采用element dict中的key来替换 if current_layer_key == keyy: keyy = matched_ele_key element.set('id', matched_ele_key[2]) - if current_layer_key in layer_nodes_dict: + if matched_ele_key in layer_nodes_dict: layer_nodes_dict[matched_ele_key].append(node_label) layer_nodes_dict_drop_tail[matched_ele_key].append(is_drop_tail) else: @@ -249,16 +231,16 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab if node_label == 'red': has_red = True elif self.dynamic_id_enable and self.dynamic_classid_enable and current_layer_key[1]: - node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, - node_html, - template_doc, True, False) + layer_nodes = element_dict.get(depth, {}) + node_html = html.tostring(node_element, encoding='utf-8').decode() + node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc, True, False) if node_label is None: continue # 采用element dict中的key来替换 if current_layer_key == keyy: keyy = matched_ele_key element.set('class', matched_ele_key[1]) - if current_layer_key in layer_nodes_dict: + if matched_ele_key in layer_nodes_dict: layer_nodes_dict[matched_ele_key].append(node_label) layer_nodes_dict_drop_tail[matched_ele_key].append(is_drop_tail) else: @@ -303,6 +285,8 @@ def drop_node_element(self, html_source, element_dict, template_dict_html): # 解析 HTML 内容 tree = html_to_element(html_source) doc = html_to_element(template_dict_html) + # 预处理元素字典和有效ids + self._preprocess_template_data(element_dict, doc, tree) self.find_blocks_drop(tree, 0, element_dict, None, '', doc, tree) return element_to_html(tree) @@ -363,6 +347,47 @@ def __get_max_width_layer(self, element_dict): return max_width_layer - 2 if max_width_layer > 4 else 3 + def _preprocess_template_data(self, element_dict, template_doc, tree): + elements_with_id = tree.xpath('//*[@id]') + elements_with_id_dict = template_doc.xpath('//*[@id]') + ids_count = {} + ids_count_dict = {} + # 提取所有 id 的值 + all_ids = [element.get('id') for element in elements_with_id] + for id in all_ids: + ids_count[id] = ids_count.get(id, 0) + 1 + for id, count in ids_count.items(): + if count > 3: + self.ids[id] = False + else: + self.ids[id] = True + + # 提取所有element dict id 的值 + all_ids_dict = [element.get('id') for element in elements_with_id_dict] + for id in all_ids_dict: + ids_count_dict[id] = ids_count_dict.get(id, 0) + 1 + for id, count in ids_count_dict.items(): + if count > 3: + self.ids[id] = False + else: + self.ids[id] = self.ids.get(id, True) + self.processed_template_data = {} + + for depth, layer_nodes in element_dict.items(): + layer_norm_eles = {} + for ele_keyy, ele_value in layer_nodes.items(): + ele_parent_keyy = self.normalize_key(ele_value[1]) + if ele_parent_keyy is not None: + ele_parent_keyy = tuple(ele_parent_keyy) + ele_label = ele_value[0] + is_drop_tail = ele_value[3] + norm_ele_keyy = self.normalize_key(ele_keyy[:3]) + if norm_ele_keyy in layer_norm_eles: + layer_norm_eles[norm_ele_keyy].append((ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail)) + else: + layer_norm_eles[norm_ele_keyy] = [(ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail)] + self.processed_template_data[depth] = layer_norm_eles + def __match_tag_class(self, layer_nodes, current_layer_key, parent_key, node_html, template_doc): # 构建主键和父键的key current_norm_key = (self.normalize_key((current_layer_key[0], current_layer_key[1], None)), parent_key) @@ -427,8 +452,14 @@ def __match_tag(self, layer_nodes, current_layer_key, parent_key, node_html, tem template_sim = similarity(feature1, feature2, layer_n=3) if template_sim >= self.dynamic_classid_similarity_threshold: return ele_label, self.normalize_key(ele_keyy[0:3]), is_drop_tail + for ele_keyy, ele_value in layer_nodes.items(): # first class方案 if ele_keyy[1] is not None and current_layer_key[1] is not None: + ele_parent_keyy = self.normalize_key(ele_value[1]) + ele_label = ele_value[0] + is_drop_tail = ele_value[3] + if ele_parent_keyy is not None: + ele_parent_keyy = tuple(ele_parent_keyy) current_norm_key_with_first_class = ( self.normalize_key((current_layer_key[0], current_layer_key[1].strip().split(' ')[0], None)), parent_key) @@ -436,7 +467,8 @@ def __match_tag(self, layer_nodes, current_layer_key, parent_key, node_html, tem norm_ele_keyy_parent_with_first_class = (norm_ele_keyy_with_first_class, ele_parent_keyy) if current_norm_key_with_first_class == norm_ele_keyy_parent_with_first_class: first_class_res = ele_label, self.normalize_key(ele_keyy[0:3]), is_drop_tail - return first_class_res + return first_class_res + return None, None, None def __is_natural_language(self, text, min_words=10): """判断文本是否像自然语言. @@ -446,5 +478,5 @@ def __is_natural_language(self, text, min_words=10): :return: bool """ # 移除标点符号和多余空格 - cleaned_text = re.sub(r'[^\w\s]', '', text.strip()) + cleaned_text = NATURAL_LANGUAGE_REGEX.sub('', text.strip()) return len(cleaned_text) >= min_words diff --git a/tests/llm_web_kit/main_html_parser/parser/test_layout_parser.py b/tests/llm_web_kit/main_html_parser/parser/test_layout_parser.py index 3eaf8471..606bb7e4 100644 --- a/tests/llm_web_kit/main_html_parser/parser/test_layout_parser.py +++ b/tests/llm_web_kit/main_html_parser/parser/test_layout_parser.py @@ -419,7 +419,7 @@ def test_all_ids(self): parser = LayoutBatchParser({}) parts = parser.parse(pre_data) main_html_body = parts[PreDataJsonKey.MAIN_HTML_BODY] - assert '全部按定尺或倍尺供應,提高材料的利用率' in main_html_body and '在線留言' not in main_html_body and '批發兼零售' not in main_html_body + assert '全部按定尺或倍尺供應,提高材料的利用率' in main_html_body and '批發兼零售' not in main_html_body and '在線留言' not in main_html_body def test_multi_same_first_class_id(self): # 构造测试html @@ -445,5 +445,4 @@ def test_multi_same_first_class_id(self): parser = LayoutBatchParser({}) parts = parser.parse(pre_data) main_html_body = parts[PreDataJsonKey.MAIN_HTML_BODY] - print(main_html_body) assert 'Spredfast wanted to follow' in main_html_body and 'Photography' not in main_html_body