From a7f33a0660dc8bbbc9c3eb10ec191b4b134271cd Mon Sep 17 00:00:00 2001 From: ranxia Date: Tue, 14 Jan 2025 20:24:57 +0800 Subject: [PATCH] update format --- .../integrations/readers/pai_pdf_reader.py | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/pai_rag/integrations/readers/pai_pdf_reader.py b/src/pai_rag/integrations/readers/pai_pdf_reader.py index 0a0feca1..45f2df65 100644 --- a/src/pai_rag/integrations/readers/pai_pdf_reader.py +++ b/src/pai_rag/integrations/readers/pai_pdf_reader.py @@ -20,7 +20,6 @@ from magic_pdf.config.ocr_content_type import BlockType, ContentType from magic_pdf.libs.commons import join_path from urllib.parse import urlparse -from collections import defaultdict DEFAULT_HEADING_DIFF_THRESHOLD = 2 @@ -71,8 +70,8 @@ def create_markdown( text_height_min = float("inf") text_height_max = 0 title_list = [] - # 存储每个title及其index - title_dict = defaultdict(list) + # 存储每个title的index + md_title_index = [] # 记录index index_count = 0 for page_info in pdf_info_dict: @@ -89,15 +88,17 @@ def create_markdown( paras_of_layout, img_buket_path, title_list, - title_dict, + md_title_index, text_height_min, text_height_max, index_count, ) output_content.extend(page_markdown) - self.post_process_multi_level_headings( - title_list, output_content, title_dict, text_height_min, text_height_max + new_title_list = self.post_process_multi_level_headings( + title_list, text_height_min, text_height_max ) + for idx, content_idx in enumerate(md_title_index): + output_content[content_idx] = new_title_list[idx] markdown_result = "\n\n".join(output_content) return markdown_result @@ -107,7 +108,7 @@ def create_page_markdown( paras_of_layout, img_buket_path, title_list, - title_dict, + md_title_index, text_height_min, text_height_max, index_count, @@ -123,7 +124,7 @@ def create_page_markdown( para_text = merge_para_with_text(para_block) elif para_type == BlockType.Title: para_text = f"# {merge_para_with_text(para_block)}" - title_dict[para_text].append(index_count) + md_title_index.append(index_count) elif para_type == BlockType.InterlineEquation: para_text = merge_para_with_text(para_block) elif para_type == BlockType.Image: @@ -203,14 +204,18 @@ def collect_title_info( return text_height_min, text_height_max def post_process_multi_level_headings( - self, title_list, output_content, title_dict, text_height_min, text_height_max + self, title_list, text_height_min, text_height_max ): logger.info( "*****************************start process headings*****************************" ) - sorted_list = sorted(title_list, key=itemgetter(1), reverse=True) + indexed_title_list = [ + (idx, title_text, title_height) + for idx, (title_text, title_height) in enumerate(title_list) + ] + sorted_list = sorted(indexed_title_list, key=itemgetter(2), reverse=True) diff_list = [ - (sorted_list[i][1] - sorted_list[i + 1][1], i) + (sorted_list[i][2] - sorted_list[i + 1][2], i) for i in range(len(sorted_list) - 1) ] sorted_diff = sorted(diff_list, key=itemgetter(0), reverse=True) @@ -221,31 +226,34 @@ def post_process_multi_level_headings( if diff >= DEFAULT_HEADING_DIFF_THRESHOLD and len(slice_index) <= 5: slice_index.append(index) slice_index.sort(reverse=True) + rank_mapping = {} # idx到rank的映射 rank = 1 cur_index = 0 if len(slice_index) > 0: cur_index = slice_index.pop() - for index, (title_text, title_height) in enumerate(sorted_list): + for index, (idx, title_text, title_height) in enumerate(sorted_list): if index > cur_index: rank += 1 if len(slice_index) > 0: cur_index = slice_index.pop() else: cur_index = len(sorted_list) - 1 - title_level = "#" * rank + " " + rank_mapping[idx] = rank + new_title_list = [] + for original_idx, (title_text, title_height) in enumerate(title_list): + assigned_rank = rank_mapping.get(original_idx, 6) + title_level = "#" * assigned_rank + " " + + # 高度范围在纯文本之间不作为标题 if text_height_min <= text_height_max and int( text_height_min ) <= title_height <= int(text_height_max): title_level = "" - old_title = "# " + title_text + new_title = title_level + title_text - if len(title_dict.get(old_title)) > 0: - md_index = title_dict.get(old_title).pop() - output_content[md_index] = new_title + new_title_list.append(new_title) - logger.info( - "*****************************process headings ended*****************************" - ) + return new_title_list def parse_pdf( self,