538 lines
17 KiB
Python
538 lines
17 KiB
Python
"""
|
||
Markdown解析模块
|
||
|
||
负责解析Markdown格式的文本,提取结构化信息和内联格式。
|
||
支持标题、列表、代码块、表格、链接等常见Markdown元素。
|
||
"""
|
||
|
||
import re
|
||
from typing import List, Dict, Any
|
||
from config import config
|
||
|
||
|
||
class MarkdownParser:
    """Line-oriented parser for a subset of Markdown.

    Every method is a classmethod and the class keeps no per-instance state.
    ``parse`` is the main entry point; the other public helpers extract
    inline formatting spans, strip markup, collect simple statistics and run
    basic validation.
    """

    # Compiled patterns for the supported Markdown constructs.
    PATTERNS = {
        'heading': re.compile(r'^(\s*)(#{1,6})\s+(.+)$'),
        'bold_asterisk': re.compile(r'\*\*(.+?)\*\*'),
        'bold_underscore': re.compile(r'__(.+?)__'),
        'italic_asterisk': re.compile(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)'),
        'italic_underscore': re.compile(r'_(.+?)_'),
        'code_inline': re.compile(r'`([^`]+)`'),
        'code_block': re.compile(r'^```(\w+)?\s*\n(.*?)\n```', re.MULTILINE | re.DOTALL),
        'strikethrough': re.compile(r'~~(.+?)~~'),
        'link': re.compile(r'\[([^\]]+)\]\(([^)]+)\)'),
        'image': re.compile(r'!\[([^\]]*)\]\(([^)]+)\)'),
        'unordered_list': re.compile(r'^\s*[-*+]\s+(.+)$'),
        'ordered_list': re.compile(r'^\s*\d+\.\s+(.+)$'),
        'blockquote': re.compile(r'^\s*>\s*(.+)$'),
        'horizontal_rule': re.compile(r'^(\s*[-*_]){3,}\s*$'),
        'table_row': re.compile(r'^\|(.+)\|$'),
        'table_separator': re.compile(r'^\|(\s*:?-+:?\s*\|)+$')
    }

    @classmethod
    def parse(cls, txt_content: str) -> List[Dict[str, Any]]:
        """Parse Markdown text into a list of sections.

        Args:
            txt_content: Markdown source text.

        Returns:
            List[Dict[str, Any]]: One dict per section as produced by
            ``_group_by_sections``; an empty list for empty input.
        """
        if not txt_content:
            return []

        return cls._group_by_sections(cls._parse_elements(txt_content))

    @classmethod
    def _parse_elements(cls, txt_content: str) -> List[Dict[str, Any]]:
        """Split Markdown text into a flat list of block-level elements.

        Each element is a dict with a ``type`` key (``code_block``,
        ``table``, ``heading``, ``horizontal_rule``, ``unordered_list``,
        ``ordered_list``, ``blockquote``, ``empty`` or ``paragraph``) and a
        ``level`` key, plus type-specific payload keys (``content``,
        ``language``, ``rows``, ``cleaned_content``).

        Args:
            txt_content: Markdown source text.

        Returns:
            List[Dict[str, Any]]: Elements in document order.
        """
        elements: List[Dict[str, Any]] = []
        in_code_block = False
        code_block_language = ""
        code_block_content: List[str] = []
        table_rows: List[List[str]] = []

        def flush_table() -> None:
            """Emit the rows collected so far as one table element."""
            nonlocal table_rows
            if table_rows:
                elements.append({
                    'type': 'table',
                    'rows': table_rows,
                    'level': 0
                })
            table_rows = []

        def code_block_element() -> Dict[str, Any]:
            """Build a code_block element from the lines collected so far."""
            return {
                'type': 'code_block',
                'language': code_block_language,
                'content': '\n'.join(code_block_content),
                'level': 0
            }

        for raw_line in txt_content.split('\n'):
            line = raw_line.rstrip('\r')
            stripped = line.strip()

            # Fenced code blocks: a fence line toggles code-block mode.
            if stripped.startswith('```'):
                if in_code_block:
                    elements.append(code_block_element())
                    in_code_block = False
                    code_block_content = []
                    code_block_language = ""
                else:
                    # A fence ends any table in progress; flushing here keeps
                    # the table ahead of the code block in document order.
                    flush_table()
                    in_code_block = True
                    code_block_language = stripped[3:].strip()
                    code_block_content = []
                continue

            if in_code_block:
                code_block_content.append(line)
                continue

            # Tables: trailing whitespace is ignored so that "| a | b | "
            # is still recognised as a row.
            table_line = line.rstrip()
            row_match = cls.PATTERNS['table_row'].match(table_line)
            separator_match = cls.PATTERNS['table_separator'].match(table_line)
            if row_match or separator_match:
                # Separator lines (|---|---|) belong to the table but carry
                # no cell data.
                if row_match and not separator_match:
                    table_rows.append(
                        [cell.strip() for cell in row_match.group(1).split('|')]
                    )
                continue

            # Any other line terminates a table in progress.
            flush_table()

            # Headings. Headings deeper than config.title_levels are not
            # emitted as headings; they fall through to the handlers below.
            heading_match = cls.PATTERNS['heading'].match(line)
            if heading_match:
                level = len(heading_match.group(2))
                if level <= config.title_levels:
                    heading_text = heading_match.group(3).strip()
                    # Strip bold markers only; the raw text is kept as well
                    # so inline formatting can still be applied later.
                    cleaned_text = re.sub(
                        r'\*\*(.+?)\*\*|__(.+?)__', r'\1\2', heading_text
                    )
                    elements.append({
                        'type': 'heading',
                        'level': level,
                        'content': heading_text,
                        'cleaned_content': cleaned_text
                    })
                    continue

            # Horizontal rule (checked before lists so "- - -" is a rule).
            if cls.PATTERNS['horizontal_rule'].match(line):
                elements.append({
                    'type': 'horizontal_rule',
                    'level': 0
                })
                continue

            # List items.
            ul_match = cls.PATTERNS['unordered_list'].match(line)
            if ul_match:
                elements.append({
                    'type': 'unordered_list',
                    'content': ul_match.group(1),
                    'level': 0
                })
                continue

            ol_match = cls.PATTERNS['ordered_list'].match(line)
            if ol_match:
                elements.append({
                    'type': 'ordered_list',
                    'content': ol_match.group(1),
                    'level': 0
                })
                continue

            # Block quotes.
            quote_match = cls.PATTERNS['blockquote'].match(line)
            if quote_match:
                elements.append({
                    'type': 'blockquote',
                    'content': quote_match.group(1),
                    'level': 0
                })
                continue

            # Blank lines.
            if stripped == '':
                elements.append({
                    'type': 'empty',
                    'content': '',
                    'level': 0
                })
                continue

            # Everything else is a plain paragraph line.
            elements.append({
                'type': 'paragraph',
                'content': line,
                'level': 0
            })

        # An unterminated fence still yields its collected lines instead of
        # silently discarding the rest of the document.
        if in_code_block:
            elements.append(code_block_element())

        # Emit a table that runs to the end of the input.
        flush_table()

        return elements

    @classmethod
    def _group_by_sections(cls, elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Group a flat element list into sections delimited by headings.

        Elements that appear before the first heading are collected in a
        level-0 preamble section titled '前置内容'; that section is omitted
        when it has no elements. Every heading starts a new section, and a
        heading section is kept even when it contains no elements.

        Args:
            elements: Elements as produced by ``_parse_elements``.

        Returns:
            List[Dict[str, Any]]: Section dicts with the keys ``type``,
            ``level``, ``content`` (heading text) and ``elements``.
        """
        sections: List[Dict[str, Any]] = []
        current_section: Dict[str, Any] = {
            'type': 'section',
            'level': 0,
            'content': '前置内容',
            'elements': []
        }
        # Tracked explicitly so a real heading that happens to be titled
        # '前置内容' is not mistaken for the implicit preamble.
        is_preamble = True

        for element in elements:
            if element['type'] == 'heading':
                if current_section['elements'] or not is_preamble:
                    sections.append(current_section)
                current_section = {
                    'type': 'section',
                    'level': element['level'],
                    'content': element['content'],
                    'elements': []
                }
                is_preamble = False
            else:
                current_section['elements'].append(element)

        if current_section['elements'] or not is_preamble:
            sections.append(current_section)

        return sections

    @classmethod
    def extract_inline_formatting(cls, text: str) -> List[Dict[str, Any]]:
        """Locate inline formatting spans in a piece of text.

        Args:
            text: Text to analyse.

        Returns:
            List[Dict[str, Any]]: Span dicts sorted by ``start``. Each has
            ``type`` (``bold``, ``italic``, ``code``, ``strikethrough`` or
            ``link``), ``start`` and ``end`` offsets covering the markers,
            and either ``content`` or, for links, ``text`` and ``url``.
        """
        if not text:
            return []

        formatting: List[Dict[str, Any]] = []

        def add_span(kind: str, match) -> None:
            """Record a span whose payload is the first capture group."""
            formatting.append({
                'type': kind,
                'start': match.start(),
                'end': match.end(),
                'content': match.group(1)
            })

        def collides(match, kinds) -> bool:
            """True if either end of the match lies inside a span of *kinds*."""
            return any(
                span['start'] <= match.start() < span['end']
                or span['start'] < match.end() <= span['end']
                for span in formatting
                if span['type'] in kinds
            )

        # Bold is collected first so italic candidates can be checked
        # against it.
        for key in ('bold_asterisk', 'bold_underscore'):
            for match in cls.PATTERNS[key].finditer(text):
                add_span('bold', match)

        # Italic (*): skipped when it starts or ends inside a bold span.
        for match in cls.PATTERNS['italic_asterisk'].finditer(text):
            if not collides(match, ('bold',)):
                add_span('italic', match)

        # Italic (_): additionally checked against already found italics.
        for match in cls.PATTERNS['italic_underscore'].finditer(text):
            if not collides(match, ('bold', 'italic')):
                add_span('italic', match)

        for match in cls.PATTERNS['code_inline'].finditer(text):
            add_span('code', match)

        for match in cls.PATTERNS['strikethrough'].finditer(text):
            add_span('strikethrough', match)

        for match in cls.PATTERNS['link'].finditer(text):
            formatting.append({
                'type': 'link',
                'start': match.start(),
                'end': match.end(),
                'text': match.group(1),
                'url': match.group(2)
            })

        # list.sort is stable, so spans with equal starts keep the order in
        # which they were collected.
        formatting.sort(key=lambda span: span['start'])
        return formatting

    @classmethod
    def clean_markdown_text(cls, text: str) -> str:
        """Strip inline Markdown markers and return the remaining text.

        Bold, italic, inline code and strikethrough markers are removed;
        images are reduced to their alt text and links to their link text.

        Args:
            text: Text that may contain Markdown markers.

        Returns:
            str: The cleaned text; falsy input is returned unchanged.
        """
        if not text:
            return text

        cleaned = text

        # Emphasis markers; bold goes first so the italic patterns do not
        # consume half of a bold marker.
        cleaned = cls.PATTERNS['bold_asterisk'].sub(r'\1', cleaned)      # **bold**
        cleaned = cls.PATTERNS['bold_underscore'].sub(r'\1', cleaned)    # __bold__
        cleaned = cls.PATTERNS['italic_asterisk'].sub(r'\1', cleaned)    # *italic*
        cleaned = cls.PATTERNS['italic_underscore'].sub(r'\1', cleaned)  # _italic_

        cleaned = cls.PATTERNS['code_inline'].sub(r'\1', cleaned)        # `code`
        cleaned = cls.PATTERNS['strikethrough'].sub(r'\1', cleaned)      # ~~strike~~

        # Images must be handled before links: the link pattern also matches
        # the "[alt](url)" tail of an image and would leave a stray "!".
        cleaned = cls.PATTERNS['image'].sub(r'\1', cleaned)              # ![alt](url)
        cleaned = cls.PATTERNS['link'].sub(r'\1', cleaned)               # [text](url)

        return cleaned

    @classmethod
    def get_text_statistics(cls, text: str) -> Dict[str, int]:
        """Collect simple counts for a Markdown document.

        Args:
            text: Markdown source text.

        Returns:
            Dict[str, int]: Counts under the keys ``total_chars``,
            ``total_lines``, ``headings``, ``paragraphs`` (non-blank lines
            that are not another recognised block), ``code_blocks``,
            ``tables`` (runs of consecutive table lines containing at least
            one data row), ``links`` (images excluded) and ``images``.
            All counts are zero for empty input.
        """
        stats = {
            "total_chars": 0,
            "total_lines": 0,
            "headings": 0,
            "paragraphs": 0,
            "code_blocks": 0,
            "tables": 0,
            "links": 0,
            "images": 0
        }
        if not text:
            return stats

        lines = text.split('\n')
        stats["total_chars"] = len(text)
        stats["total_lines"] = len(lines)

        patterns = cls.PATTERNS
        non_paragraph_keys = (
            'unordered_list', 'ordered_list', 'blockquote', 'horizontal_rule'
        )
        in_code_block = False
        in_table = False       # the previous line was a table line
        table_counted = False  # the current run of table lines was counted

        for raw_line in lines:
            line = raw_line.strip()
            # Anything other than a table line (blank lines included) ends
            # the current table run.
            was_in_table, in_table = in_table, False

            if not line:
                continue

            # Only the opening fence of a code block is counted.
            if line.startswith('```'):
                if not in_code_block:
                    stats["code_blocks"] += 1
                in_code_block = not in_code_block
                continue

            if in_code_block:
                continue

            if patterns['heading'].match(line):
                stats["headings"] += 1
                continue

            if patterns['table_row'].match(line):
                if not was_in_table:
                    table_counted = False
                # Count one table per run, and only once it has a data row,
                # matching what the parser emits as a table element.
                if not table_counted and not patterns['table_separator'].match(line):
                    stats["tables"] += 1
                    table_counted = True
                in_table = True
                continue

            if not any(patterns[key].match(line) for key in non_paragraph_keys):
                stats["paragraphs"] += 1

        stats["images"] = len(patterns['image'].findall(text))
        # The link pattern also matches the tail of an image, so matches
        # directly preceded by "!" are not counted as links.
        stats["links"] = sum(
            1
            for match in patterns['link'].finditer(text)
            if match.start() == 0 or text[match.start() - 1] != '!'
        )

        return stats

    @classmethod
    def validate_markdown(cls, text: str) -> Dict[str, Any]:
        """Run basic well-formedness checks on Markdown text.

        Warnings are produced for empty text, for lines that start with
        ``|`` but are not a complete ``|...|`` table row, and for headings
        with more than six ``#`` characters. An unterminated fenced code
        block is an error and makes the result invalid. Lines inside code
        blocks are not checked.

        Args:
            text: Markdown text to validate.

        Returns:
            Dict[str, Any]: ``{"valid": bool, "warnings": [...],
            "errors": [...]}``.
        """
        result: Dict[str, Any] = {
            "valid": True,
            "warnings": [],
            "errors": []
        }

        if not text:
            result["warnings"].append("文本为空")
            return result

        max_heading_level = 6
        # The regular heading pattern caps at six '#', so over-deep headings
        # need their own pattern to be detected at all.
        too_deep_heading = re.compile(r'^\s*#{7,}\s+\S')
        in_code_block = False

        for line_no, raw_line in enumerate(text.split('\n'), 1):
            line = raw_line.rstrip()

            if line.strip().startswith('```'):
                in_code_block = not in_code_block
                continue

            if in_code_block:
                continue

            # A line that opens with a pipe but is not a full row is most
            # likely a table row missing its closing pipe.
            if line.startswith('|') and not cls.PATTERNS['table_row'].match(line):
                result["warnings"].append(f"第{line_no}行: 表格格式可能不完整")

            if too_deep_heading.match(line):
                result["warnings"].append(
                    f"第{line_no}行: 标题层级过深 (>{max_heading_level})"
                )

        # A fence that was opened but never closed.
        if in_code_block:
            result["errors"].append("代码块未正确闭合")
            result["valid"] = False

        return result
|
||
|
||
|
||
# Create the global parser instance
|
||
# Shared module-level instance. MarkdownParser defines only classmethods and
# no __init__, so this object carries no state of its own.
markdown_parser = MarkdownParser()
|
||
|
||
|
||
# Functions kept for compatibility with the old module-level interface
|
||
def parse(txt_content: str) -> List[Dict[str, Any]]:
    """Parse Markdown text into sections (legacy module-level entry point).

    Args:
        txt_content: Markdown source text.

    Returns:
        List[Dict[str, Any]]: Whatever ``MarkdownParser.parse`` returns for
        the same input.
    """
    sections = MarkdownParser.parse(txt_content)
    return sections
|
||
|
||
|
||
def extract_inline_formatting(text: str) -> List[Dict[str, Any]]:
    """Extract inline formatting spans (legacy module-level entry point).

    Args:
        text: Text to analyse.

    Returns:
        List[Dict[str, Any]]: Whatever
        ``MarkdownParser.extract_inline_formatting`` returns for the same
        input.
    """
    spans = MarkdownParser.extract_inline_formatting(text)
    return spans
|
||
|
||
|
||
def group_by_sections(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group elements into sections (legacy module-level entry point).

    Args:
        elements: Flat list of element dicts.

    Returns:
        List[Dict[str, Any]]: Whatever
        ``MarkdownParser._group_by_sections`` returns for the same input.
    """
    grouped = MarkdownParser._group_by_sections(elements)
    return grouped