fix: 优化文档分割处理 (#814)

This commit is contained in:
gcalgoz 2024-07-19 17:33:18 +08:00 committed by GitHub
parent d3d09b10ec
commit 4d8ac28674
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -19,6 +19,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor):
:param text: 文本 :param text: 文本
:param level_content_list: 拆分的title数组 :param level_content_list: 拆分的title数组
:param level_content_index: 指定的下标 :param level_content_index: 指定的下标
:param cursor: 开始的下标位置
:return: 拆分后的文本数据 :return: 拆分后的文本数据
""" """
start_content: str = level_content_list[level_content_index].get('content') start_content: str = level_content_list[level_content_index].get('content')
@ -26,7 +27,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor):
level_content_list) else None level_content_list) else None
start_index = text.index(start_content, cursor) start_index = text.index(start_content, cursor)
end_index = text.index(next_content, start_index + 1) if next_content is not None else len(text) end_index = text.index(next_content, start_index + 1) if next_content is not None else len(text)
return text[start_index:end_index].lstrip(level_content_list[level_content_index]['content']), end_index return text[start_index+len(start_content):end_index], end_index
def to_tree_obj(content, state='title'): def to_tree_obj(content, state='title'):
@ -148,10 +149,10 @@ def to_block_paragraph(tree_data_list: List[dict]):
def parse_title_level(text, content_level_pattern: List, index): def parse_title_level(text, content_level_pattern: List, index):
if len(content_level_pattern) == index: if index >= len(content_level_pattern):
return [] return []
result = parse_level(text, content_level_pattern[index]) result = parse_level(text, content_level_pattern[index])
if len(result) == 0 and len(content_level_pattern) > index + 1: if len(result) == 0 and len(content_level_pattern) > index:
return parse_title_level(text, content_level_pattern, index + 1) return parse_title_level(text, content_level_pattern, index + 1)
return result return result
@ -213,46 +214,48 @@ def group_by(list_source: List, key):
return result return result
def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain): def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain, with_filter: bool):
""" """
转换为分段对象 转换为分段对象
:param result_tree: 解析文本的树 :param result_tree: 解析文本的树
:param result: [] 用于递归 :param result: [] 用于递归
:param parent_chain: [] 用户递归存储数据 :param parent_chain: [] 用户递归存储数据
:param with_filter: 是否过滤block
:return: List[{'problem':'xx','content':'xx'}] :return: List[{'problem':'xx','content':'xx'}]
""" """
for item in result_tree: for item in result_tree:
if item.get('state') == 'block': if item.get('state') == 'block':
result.append({'title': " ".join(parent_chain), 'content': item.get("content")}) result.append({'title': " ".join(parent_chain),
'content': filter_special_char(item.get("content")) if with_filter else item.get("content")})
children = item.get("children") children = item.get("children")
if children is not None and len(children) > 0: if children is not None and len(children) > 0:
result_tree_to_paragraph(children, result, [*parent_chain, item.get('content')]) result_tree_to_paragraph(children, result,
[*parent_chain, remove_special_symbol(item.get('content'))], with_filter)
return result return result
def post_handler_paragraph(content: str, limit: int, with_filter: bool): def post_handler_paragraph(content: str, limit: int):
"""
根据文本的最大字符分段
:param content: 需要分段的文本字段
:param limit: 最大分段字符
:return: 分段后数据
""" """
根据文本的最大字符分段
:param with_filter: 是否过滤特殊字符
:param content: 需要分段的文本字段
:param limit: 最大分段字符
:return: 分段后数据
"""
split_list = content.split('\n')
result = [] result = []
temp_char = '' temp_char, start = '', 0
for split in split_list: while (pos := content.find("\n", start)) != -1:
split, start = content[start:pos + 1], pos + 1
if len(temp_char + split) > limit: if len(temp_char + split) > limit:
result.append(temp_char) result.append(temp_char)
temp_char = '' temp_char = ''
temp_char = temp_char + split + '\n' temp_char = temp_char + split
temp_char = temp_char + content[start:]
if len(temp_char) > 0: if len(temp_char) > 0:
result.append(temp_char) result.append(temp_char)
pattern = "[\\S\\s]{1," + str(limit) + '}' pattern = "[\\S\\s]{1," + str(limit) + '}'
# 如果\n 单段超过限制,则继续拆分 # 如果\n 单段超过限制,则继续拆分
s = list(map(lambda row: filter_special_char(row) if with_filter else row, list( return reduce(lambda x, y: [*x, *y], map(lambda row: re.findall(pattern, row), result), [])
reduce(lambda x, y: [*x, *y], list(map(lambda row: list(re.findall(pattern, row)), result)), []))))
return s
replace_map = { replace_map = {
@ -293,40 +296,24 @@ class SplitModel:
:param index: 从那个正则开始解析 :param index: 从那个正则开始解析
:return: 解析后的树形结果数据 :return: 解析后的树形结果数据
""" """
if len(self.content_level_pattern) == index: level_content_list = parse_title_level(text, self.content_level_pattern, index)
return if len(level_content_list) == 0:
level_content_list = parse_title_level(text, self.content_level_pattern, 0) return list(map(lambda row: to_tree_obj(row, 'block'), post_handler_paragraph(text, limit=self.limit)))
if index == 0 and text.lstrip().index(level_content_list[0]["content"].lstrip()) != 0:
level_content_list.insert(0, to_tree_obj(""))
cursor = 0 cursor = 0
for i in range(len(level_content_list)): for i in range(len(level_content_list)):
block, cursor = get_level_block(text, level_content_list, i, cursor) block, cursor = get_level_block(text, level_content_list, i, cursor)
children = self.parse_to_tree(text=block, if len(block) == 0:
index=index + 1) level_content_list[i]['children'] = [to_tree_obj("", "block")]
if children is not None and len(children) > 0: continue
level_content_list[i]['children'] = children children = self.parse_to_tree(text=block, index=index + 1)
else: level_content_list[i]['children'] = children
if len(block) > 0: first_child_idx_in_block = block.lstrip().index(children[0]["content"].lstrip())
level_content_list[i]['children'] = list( if first_child_idx_in_block != 0:
map(lambda row: to_tree_obj(row, 'block'), inner_children = self.parse_to_tree(block[:first_child_idx_in_block], index + 1)
post_handler_paragraph(block, with_filter=self.with_filter, limit=self.limit))) level_content_list[i]['children'].extend(inner_children)
if len(level_content_list) > 0:
end_index = text.index(level_content_list[0].get('content'))
if end_index == 0:
return level_content_list
other_content = text[0:end_index]
children = self.parse_to_tree(text=other_content,
index=index)
if len(children) > 0:
level_content_list = [*level_content_list, *children]
else:
if len(other_content.strip()) > 0:
level_content_list = [*level_content_list, *list(
map(lambda row: to_tree_obj(row, 'block'),
post_handler_paragraph(other_content, with_filter=self.with_filter, limit=self.limit)))]
else:
if len(text.strip()) > 0:
level_content_list = [*level_content_list, *list(
map(lambda row: to_tree_obj(row, 'block'),
post_handler_paragraph(text, with_filter=self.with_filter, limit=self.limit)))]
return level_content_list return level_content_list
def parse(self, text: str): def parse(self, text: str):
@ -339,7 +326,7 @@ class SplitModel:
text = text.replace('\r', '\n') text = text.replace('\r', '\n')
text = text.replace("\0", '') text = text.replace("\0", '')
result_tree = self.parse_to_tree(text, 0) result_tree = self.parse_to_tree(text, 0)
result = result_tree_to_paragraph(result_tree, [], []) result = result_tree_to_paragraph(result_tree, [], [], self.with_filter)
return [item for item in [self.post_reset_paragraph(row) for row in result] if return [item for item in [self.post_reset_paragraph(row) for row in result] if
'content' in item and len(item.get('content').strip()) > 0] 'content' in item and len(item.get('content').strip()) > 0]