Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 94 additions & 62 deletions llm_web_kit/main_html_parser/parser/layout_batch_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@
import re
from hashlib import sha256

import nltk
from lxml import html
from lxml.html import etree
from nltk.tokenize import word_tokenize
from selectolax.parser import HTMLParser

from llm_web_kit.html_layout.html_layout_cosin import get_feature, similarity
from llm_web_kit.input.pre_data_json import PreDataJson, PreDataJsonKey
from llm_web_kit.libs.html_utils import element_to_html, html_to_element
from llm_web_kit.main_html_parser.parser.parser import BaseMainHtmlParser

nltk.download('punkt', quiet=True) # 静默模式避免日志干扰
# Regexes are compiled once at import time instead of on every call.
# Matches "post-<digits>" / "postid-<digits>" (case-insensitive); group 1 is the prefix.
POST_NUMBER_REGEX = re.compile(r'(post|postid)-(\d+)', re.IGNORECASE)
# Matches any character that is neither a word character nor whitespace.
NATURAL_LANGUAGE_REGEX = re.compile(r'[^\w\s]')
# Matches runs of spaces, tabs and newlines.
WHITESPACE_REGEX = re.compile(r'[ \t\n]+')
# Matches runs of plain spaces only.
SPACES_REGEX = re.compile(r' +')
# Matches maximal runs of word characters.
WORD_REGEX = re.compile(r'\w+')


MAX_LENGTH = 10
SIMILARITY_THRESHOLD = 0.75
# Single definition: the earlier 0.9 assignment was immediately overwritten by 0.85.
DYNAMIC_ID_SIM_THRESHOLD = 0.85


class LayoutBatchParser(BaseMainHtmlParser):
Expand All @@ -31,6 +35,8 @@ def __init__(self, template_data: str | dict):
self.more_noise_enable = False
self.dynamic_classid_similarity_threshold = 0.85
self.ids = dict()
self.normalize_key_cache = {}
self.processed_template_data = None

def parse_tuple_key(self, key_str):
if key_str.startswith('(') and key_str.endswith(')'):
Expand Down Expand Up @@ -110,36 +116,45 @@ def process(self, html_source: str, template_dict_html: str) -> str:
return content, body

def get_tokens(self, content):
    """Split ``content`` into word tokens.

    Tokens are the maximal runs of word characters found by ``WORD_REGEX``;
    punctuation and whitespace act as separators and are dropped.

    :param content: text to tokenize
    :return: list of token strings, in order of appearance
    """
    # The former word_tokenize() call was discarded on the next line, so it
    # only added cost; the regex result is what callers have always received.
    return WORD_REGEX.findall(content)

def normalize_key(self, tup):
    """Normalize a ``(tag, class, id)`` key so nodes can be compared.

    Results are memoized in ``self.normalize_key_cache`` under the raw tuple.

    :param tup: sequence of exactly three items ``(tag, class_attr, id_attr)``
    :return: ``None`` for falsy input, otherwise a normalized 3-tuple:
        ``(tag, None, None)`` for ``body``/``html``;
        ``(tag, None, id)`` when the id is present and marked valid in ``self.ids``;
        ``(tag, class, id)`` otherwise, with post numbers stripped from both.
    """
    if not tup:
        return None
    tup = tuple(tup)
    if tup in self.normalize_key_cache:
        return self.normalize_key_cache[tup]

    tag, class_id, idd = tup
    valid_id = False
    if class_id:
        class_id = WHITESPACE_REGEX.sub(' ', class_id)
    if idd:
        # Validity is looked up with the raw id, before spaces are collapsed.
        valid_id = self.ids.get(idd, True)
        idd = SPACES_REGEX.sub(' ', idd)

    if tag in ('body', 'html'):
        res = (tag, None, None)
    elif idd and valid_id:
        # With a usable id the class is ignored: some pages share the
        # template's id but carry a different class.
        res = (tag, None, self.replace_post_number(idd))
    else:
        res = (tag, self.replace_post_number(class_id), self.replace_post_number(idd))

    # Single exit so every computed key is recorded in the cache.
    self.normalize_key_cache[tup] = res
    return res

def replace_post_number(self, text):
    """Strip the digits from ``post-<n>`` / ``postid-<n>`` tokens in ``text``.

    Matching is case-insensitive and keeps the matched prefix as written,
    e.g. ``"PostID-42"`` becomes ``"PostID-"``.

    :param text: class or id string; falsy values yield ``None``
    :return: the rewritten string with surrounding whitespace stripped, or ``None``
    """
    if not text:
        return None
    # \1 keeps the "post"/"postid" prefix; only the digits are removed.
    return POST_NUMBER_REGEX.sub(r'\1-', text).strip()

def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_label, template_doc, tree):
# 判断这个tag是否有id
Expand All @@ -153,64 +168,32 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab
is_natural_language = self.__is_natural_language(text) or length_tail >= 10
idd = element.get('id')
tag = element.tag
layer_nodes = element_dict.get(depth, {})
if tag in ['script', 'style', 'meta', 'link']:
return
class_tag = element.get('class')
ori_keyy = (tag, class_tag, idd)
if idd and idd.strip():
try:
idd_ele = tree.xpath(f'//*[@id="{idd}"]')
if len(idd_ele) > 3:
self.ids[idd] = False
else:
self.ids[idd] = True
except Exception:
self.ids[idd] = True
keyy = self.normalize_key(ori_keyy)
# 获取element的当前层的所有节点
element_parent = element.getparent()
current_layer_keys = {}
if element_parent is None:
child_str = html.tostring(element, encoding='utf-8').decode()
current_layer_keys[keyy] = (ori_keyy, child_str)
current_layer_keys[keyy] = (ori_keyy, element)
else:
for child in element_parent:
if isinstance(child, etree._Comment):
continue
child_ori_key = (child.tag, child.get('class'), child.get('id'))
child_key = self.normalize_key(child_ori_key)
child_str = html.tostring(child, encoding='utf-8').decode()
current_layer_keys[child_key] = (child_ori_key, child_str)
current_layer_keys[child_key] = (child_ori_key, child)
# 匹配正文节点
has_red = False
layer_nodes_dict = dict()
layer_nodes_dict_drop_tail = dict()
layer_norm_eles = {}
# 构造当前层的候选映射字典
for ele_keyy, ele_value in layer_nodes.items():
layer_node_idd = ele_keyy[2]
if layer_node_idd and layer_node_idd.strip() and layer_node_idd not in self.ids:
try:
idd_ele = template_doc.xpath(f'//*[@id="{layer_node_idd}"]')
if len(idd_ele) > 3:
self.ids[layer_node_idd] = False
else:
self.ids[layer_node_idd] = self.ids.get(layer_node_idd, True)
except Exception:
self.ids[layer_node_idd] = self.ids.get(layer_node_idd, True)
ele_parent_keyy = self.normalize_key(ele_value[1])
if ele_parent_keyy is not None:
ele_parent_keyy = tuple(ele_parent_keyy)
ele_label = ele_value[0]
is_drop_tail = ele_value[3]
norm_ele_keyy = self.normalize_key(ele_keyy[:3])
if norm_ele_keyy in layer_norm_eles:
layer_norm_eles[norm_ele_keyy].append((ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail))
else:
layer_norm_eles[norm_ele_keyy] = [(ele_label, ele_keyy[:3], ele_parent_keyy, is_drop_tail)]
layer_norm_eles = self.processed_template_data.get(depth, {})
# 尝试匹配当前层每个节点,判断是否存在至少一个红色节点
for current_layer_key, current_layer_value in current_layer_keys.items():
current_layer_ori_key = current_layer_value[0]
node_html = current_layer_value[1]
node_element = current_layer_value[1]
if current_layer_key in layer_norm_eles:
for layer_norm_ele_value in layer_norm_eles[current_layer_key]:
if layer_norm_ele_value[2] != parent_keyy:
Expand All @@ -228,19 +211,18 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab
break
# 动态id匹配逻辑
elif self.dynamic_id_enable and current_layer_key[2]:
node_label, matched_ele_key, is_drop_tail = self.__match_tag_class(layer_nodes, current_layer_ori_key, parent_keyy,
node_html, template_doc)
layer_nodes = element_dict.get(depth, {})
node_html = html.tostring(node_element, encoding='utf-8').decode()
node_label, matched_ele_key, is_drop_tail = self.__match_tag_class(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc)
if node_label is None and self.dynamic_classid_enable:
node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy,
node_html,
template_doc, False, True)
node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc, False, True)
if node_label is None:
continue
# 采用element dict中的key来替换
if current_layer_key == keyy:
keyy = matched_ele_key
element.set('id', matched_ele_key[2])
if current_layer_key in layer_nodes_dict:
if matched_ele_key in layer_nodes_dict:
layer_nodes_dict[matched_ele_key].append(node_label)
layer_nodes_dict_drop_tail[matched_ele_key].append(is_drop_tail)
else:
Expand All @@ -249,16 +231,16 @@ def find_blocks_drop(self, element, depth, element_dict, parent_keyy, parent_lab
if node_label == 'red':
has_red = True
elif self.dynamic_id_enable and self.dynamic_classid_enable and current_layer_key[1]:
node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy,
node_html,
template_doc, True, False)
layer_nodes = element_dict.get(depth, {})
node_html = html.tostring(node_element, encoding='utf-8').decode()
node_label, matched_ele_key, is_drop_tail = self.__match_tag(layer_nodes, current_layer_ori_key, parent_keyy, node_html, template_doc, True, False)
if node_label is None:
continue
# 采用element dict中的key来替换
if current_layer_key == keyy:
keyy = matched_ele_key
element.set('class', matched_ele_key[1])
if current_layer_key in layer_nodes_dict:
if matched_ele_key in layer_nodes_dict:
layer_nodes_dict[matched_ele_key].append(node_label)
layer_nodes_dict_drop_tail[matched_ele_key].append(is_drop_tail)
else:
Expand Down Expand Up @@ -303,6 +285,8 @@ def drop_node_element(self, html_source, element_dict, template_dict_html):
# 解析 HTML 内容
tree = html_to_element(html_source)
doc = html_to_element(template_dict_html)
# 预处理元素字典和有效ids
self._preprocess_template_data(element_dict, doc, tree)
self.find_blocks_drop(tree, 0, element_dict, None, '', doc, tree)
return element_to_html(tree)

Expand Down Expand Up @@ -363,6 +347,47 @@ def __get_max_width_layer(self, element_dict):

return max_width_layer - 2 if max_width_layer > 4 else 3

def _preprocess_template_data(self, element_dict, template_doc, tree):
    """Precompute id validity and the normalized template lookup table.

    Populates ``self.ids`` (id -> bool) and ``self.processed_template_data``
    (depth -> normalized key -> list of template entries).

    :param element_dict: mapping ``depth -> {key: value}``. The first three
        items of each key are read as the raw ``(tag, class, id)``; each value
        is indexed at 0 (label), 1 (parent key) and 3 (drop-tail flag).
    :param template_doc: parsed template document; must support ``xpath``
    :param tree: parsed page document; must support ``xpath``
    """
    def count_ids(doc):
        # Number of elements carrying each id value in ``doc``.
        counts = {}
        for node in doc.xpath('//*[@id]'):
            node_id = node.get('id')
            counts[node_id] = counts.get(node_id, 0) + 1
        return counts

    # An id used by more than 3 elements is marked invalid (False).
    for node_id, count in count_ids(tree).items():
        self.ids[node_id] = count <= 3

    # Template ids can invalidate an id but never re-validate one that the
    # page already marked invalid.
    for node_id, count in count_ids(template_doc).items():
        if count > 3:
            self.ids[node_id] = False
        else:
            self.ids.setdefault(node_id, True)

    # normalize_key() results depend on self.ids, so keys memoized before
    # this refresh may be stale; start from an empty cache.
    self.normalize_key_cache = {}

    processed = {}
    for depth, layer_nodes in element_dict.items():
        layer_norm_eles = {}
        for ele_keyy, ele_value in layer_nodes.items():
            ele_parent_keyy = self.normalize_key(ele_value[1])
            if ele_parent_keyy is not None:
                ele_parent_keyy = tuple(ele_parent_keyy)
            raw_key = ele_keyy[:3]
            entry = (ele_value[0], raw_key, ele_parent_keyy, ele_value[3])
            # Several raw template keys may collapse to one normalized key.
            layer_norm_eles.setdefault(self.normalize_key(raw_key), []).append(entry)
        processed[depth] = layer_norm_eles
    self.processed_template_data = processed

def __match_tag_class(self, layer_nodes, current_layer_key, parent_key, node_html, template_doc):
# 构建主键和父键的key
current_norm_key = (self.normalize_key((current_layer_key[0], current_layer_key[1], None)), parent_key)
Expand Down Expand Up @@ -427,16 +452,23 @@ def __match_tag(self, layer_nodes, current_layer_key, parent_key, node_html, tem
template_sim = similarity(feature1, feature2, layer_n=3)
if template_sim >= self.dynamic_classid_similarity_threshold:
return ele_label, self.normalize_key(ele_keyy[0:3]), is_drop_tail
for ele_keyy, ele_value in layer_nodes.items():
# first class方案
if ele_keyy[1] is not None and current_layer_key[1] is not None:
ele_parent_keyy = self.normalize_key(ele_value[1])
ele_label = ele_value[0]
is_drop_tail = ele_value[3]
if ele_parent_keyy is not None:
ele_parent_keyy = tuple(ele_parent_keyy)
current_norm_key_with_first_class = (
self.normalize_key((current_layer_key[0], current_layer_key[1].strip().split(' ')[0], None)),
parent_key)
norm_ele_keyy_with_first_class = self.normalize_key((ele_keyy[0], ele_keyy[1].strip().split(' ')[0], None))
norm_ele_keyy_parent_with_first_class = (norm_ele_keyy_with_first_class, ele_parent_keyy)
if current_norm_key_with_first_class == norm_ele_keyy_parent_with_first_class:
first_class_res = ele_label, self.normalize_key(ele_keyy[0:3]), is_drop_tail
return first_class_res
return first_class_res
return None, None, None

def __is_natural_language(self, text, min_words=10):
"""判断文本是否像自然语言.
Expand All @@ -446,5 +478,5 @@ def __is_natural_language(self, text, min_words=10):
:return: bool
"""
# 移除标点符号和多余空格
cleaned_text = re.sub(r'[^\w\s]', '', text.strip())
cleaned_text = NATURAL_LANGUAGE_REGEX.sub('', text.strip())
return len(cleaned_text) >= min_words
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ def test_all_ids(self):
parser = LayoutBatchParser({})
parts = parser.parse(pre_data)
main_html_body = parts[PreDataJsonKey.MAIN_HTML_BODY]
assert '全部按定尺或倍尺供應,提高材料的利用率' in main_html_body and '在線留言' not in main_html_body and '批發兼零售' not in main_html_body
assert '全部按定尺或倍尺供應,提高材料的利用率' in main_html_body and '批發兼零售' not in main_html_body and '在線留言' not in main_html_body

def test_multi_same_first_class_id(self):
# 构造测试html
Expand All @@ -445,5 +445,4 @@ def test_multi_same_first_class_id(self):
parser = LayoutBatchParser({})
parts = parser.parse(pre_data)
main_html_body = parts[PreDataJsonKey.MAIN_HTML_BODY]
print(main_html_body)
assert 'Spredfast wanted to follow' in main_html_body and 'Photography' not in main_html_body