414 lines
12 KiB
Python
414 lines
12 KiB
Python
"""
|
||
文本处理模块
|
||
|
||
负责文本的各种处理功能,包括顺序调换、标点符号替换、错别字处理等。
|
||
"""
|
||
|
||
from typing import Optional
|
||
from config import config
|
||
from error_chars import ErrorCharProcessor
|
||
from replacestr import replace_text
|
||
|
||
|
||
class TextProcessor:
    """Text processor that groups the module's text operations in one class."""

    # Characters that terminate a sentence: ideographic full stop, ASCII
    # "!", "?", ".", plus the full-width "!" (U+FF01) and "?" (U+FF1F) so
    # that mixed-width Chinese text is split consistently.
    _SENTENCE_ENDINGS = frozenset('\u3002!?.\uff01\uff1f')

    # Unambiguous closing quotes/brackets. When one of these directly follows
    # a terminator it still belongs to the sentence that just ended.
    _SENTENCE_CLOSERS = frozenset(
        '\u201d\u2019\u300d\u300f\u3011\u300b\uff09)]'
    )

    def __init__(self) -> None:
        """Create the processor and, if enabled in config, its typo helper."""
        self.error_processor: Optional["ErrorCharProcessor"] = None
        self._init_error_processor()

    def _init_error_processor(self) -> None:
        """Build the ``ErrorCharProcessor`` when ``config.enable_char_errors`` is set.

        Leaves ``self.error_processor`` untouched when the feature is disabled.
        """
        if config.enable_char_errors:
            self.error_processor = ErrorCharProcessor(config.char_error_db_path)

    def replace_periods(self, text: str) -> str:
        """Replace inner ideographic full stops with commas and drop a final one.

        Trailing whitespace is stripped first. If the stripped text ends with
        ``。`` that single character is removed; every remaining ``。`` is
        replaced by a comma.

        Args:
            text: Input text.

        Returns:
            str: The processed text, or ``''`` for empty / whitespace-only input.
        """
        if not text:
            return ''

        stripped = text.rstrip()
        if not stripped:
            return ''

        # Drop one trailing full stop, if present.
        if stripped.endswith('。'):
            stripped = stripped[:-1]

        # NOTE(review): the replacement is the ASCII comma exactly as it
        # appears in this source; confirm a full-width comma was not intended.
        return stripped.replace('。', ',')

    def reverse_text_order(self, content: str) -> str:
        """Reverse the text character by character.

        Args:
            content: Input text.

        Returns:
            str: The reversed text; falsy input is returned unchanged.
        """
        if not content:
            return content
        return content[::-1]

    def reverse_paragraph_order(self, content: str) -> str:
        """Reverse the order of newline-separated paragraphs.

        The text inside each paragraph keeps its original order.

        Args:
            content: Input text.

        Returns:
            str: Text with its paragraphs in reverse order; falsy input is
            returned unchanged.
        """
        if not content:
            return content
        return '\n'.join(reversed(content.split('\n')))

    def apply_char_errors(self, text: str) -> str:
        """Run the typo (wrong-character) step on ``text``.

        Does nothing when ``config.enable_char_errors`` is off or ``text`` is
        empty. Any exception raised by the typo helper is reported with
        ``print`` and the original text is returned, so this step is
        best-effort by design.

        Args:
            text: Input text.

        Returns:
            str: The modified text, or ``text`` unchanged when the step is
            disabled, unavailable, or fails.
        """
        if not config.enable_char_errors or not text:
            return text

        try:
            # The helper may be missing if the feature was disabled when this
            # instance was constructed; try to build it now.
            if not self.error_processor:
                self._init_error_processor()

            if self.error_processor:
                modified_text, replace_count, _, _ = (
                    self.error_processor.introduce_char_errors(
                        text, config.char_error_intensity
                    )
                )
                if replace_count > 0:
                    print(f"已应用错别字处理,替换了 {replace_count} 个字符。")
                return modified_text
        except Exception as e:
            # Deliberate best-effort: fall through and return the original.
            print(f"错别字处理出错: {e}")

        return text

    def apply_text_order_processing(self, text: str) -> str:
        """Run the text-order step on ``text``.

        Active only when ``config.reverse_text_order`` is truthy. The work is
        delegated to ``replace_text`` imported from ``replacestr``.

        NOTE(review): despite the flag name, this calls ``replace_text`` and
        not ``self.reverse_text_order``; what ``replace_text`` does is not
        visible from this file.

        Args:
            text: Input text.

        Returns:
            str: The result of ``replace_text(text)``, or ``text`` unchanged
            when the step is disabled, the text is empty, or the call raises.
        """
        if not config.reverse_text_order or not text:
            return text

        try:
            return replace_text(text)
        except Exception as e:
            print(f"文字顺序处理出错: {e}")
            return text

    def process_text_content(self, text: str) -> str:
        """Run the whole pipeline on ``text``.

        Steps, in order: text-order processing, typo processing, the
        per-paragraph sentence limit (when
        ``config.max_sentences_per_paragraph > 0``) and finally full-stop
        replacement (when ``config.replace_punctuation`` is truthy).

        Args:
            text: Input text.

        Returns:
            str: The processed text; empty or whitespace-only input is
            returned unchanged.
        """
        if not text or not text.strip():
            return text

        processed = self.apply_text_order_processing(text)
        processed = self.apply_char_errors(processed)

        sentence_limit = config.max_sentences_per_paragraph
        if sentence_limit > 0:
            processed = self.limit_sentences_per_paragraph(
                processed, sentence_limit
            )

        # Punctuation replacement runs last.
        if config.replace_punctuation:
            processed = self.replace_periods(processed)

        return processed

    def clean_text(self, text: str) -> str:
        """Normalise line endings and strip trailing whitespace from each line.

        ``\\r\\n`` and bare ``\\r`` are converted to ``\\n``.

        Args:
            text: Input text.

        Returns:
            str: The cleaned text; falsy input is returned unchanged.
        """
        if not text:
            return text

        unified = text.replace("\r\n", "\n").replace("\r", "\n")
        return '\n'.join(line.rstrip() for line in unified.split('\n'))

    def normalize_text(self, text: str) -> str:
        """Clean ``text`` and apply the punctuation replacement table.

        Args:
            text: Input text.

        Returns:
            str: The normalised text; falsy input is returned unchanged.
        """
        if not text:
            return text

        text = self.clean_text(text)

        # NOTE(review): as it appears in this source every entry maps a
        # character to itself, so the loop below changes nothing. The original
        # comments label the keys as full-width forms - confirm the intended
        # key/value pairs against the real file.
        punctuation_map = {
            ',': ',',  # labelled "full-width comma"
            '。': '。',  # labelled "full-width full stop"
            '!': '!',  # labelled "full-width exclamation mark"
            '?': '?',  # labelled "full-width question mark"
            ';': ';',  # labelled "full-width semicolon"
            ':': ':',  # labelled "full-width colon"
        }

        for old, new in punctuation_map.items():
            text = text.replace(old, new)

        return text

    def get_processing_statistics(self, text: str) -> dict:
        """Collect simple statistics about ``text``.

        Args:
            text: Input text.

        Returns:
            dict: Always contains ``total_chars``, ``total_lines``,
            ``non_empty_lines``, ``error_chars_enabled`` and
            ``estimated_error_replacements``. ``replaceable_chars`` is added
            only when the typo helper is enabled and its statistics call
            succeeds.
        """
        errors_enabled = config.enable_char_errors
        lines = text.split('\n') if text else []

        stats = {
            "total_chars": len(text) if text else 0,
            "total_lines": len(lines),
            "non_empty_lines": sum(1 for line in lines if line.strip()),
            "error_chars_enabled": errors_enabled,
            "estimated_error_replacements": 0,
        }

        if not text or not errors_enabled:
            return stats

        # Ask the typo helper for its estimate; failures are reported and the
        # base statistics are still returned.
        try:
            if not self.error_processor:
                self._init_error_processor()

            if self.error_processor:
                error_stats = self.error_processor.get_statistics(
                    text, config.char_error_intensity
                )
                stats["estimated_error_replacements"] = error_stats["estimated_replacements"]
                stats["replaceable_chars"] = error_stats["replaceable_chars"]
        except Exception as e:
            print(f"获取错别字统计失败: {e}")

        return stats

    def preview_processing(self, text: str, max_length: int = 200) -> dict:
        """Preview the pipeline on the first ``max_length`` characters.

        Args:
            text: Input text.
            max_length: Maximum number of characters to preview. Negative
                values are treated as 0 instead of slicing from the end of
                the text.

        Returns:
            dict: ``original`` (the previewed slice), ``processed`` (that
            slice after ``process_text_content``) and ``truncated`` (whether
            ``text`` was longer than the preview).
        """
        if not text:
            return {
                "original": "",
                "processed": "",
                "truncated": False,
            }

        limit = max(max_length, 0)
        preview_text = text[:limit]

        return {
            "original": preview_text,
            "processed": self.process_text_content(preview_text),
            "truncated": len(text) > limit,
        }

    @staticmethod
    def _is_decimal_point(text: str, index: int) -> bool:
        """Return True when ``text[index]`` is a ``.`` between two digits."""
        return (
            text[index] == '.'
            and 0 < index < len(text) - 1
            and text[index - 1].isdigit()
            and text[index + 1].isdigit()
        )

    @classmethod
    def _split_sentences(cls, paragraph: str) -> list:
        """Split one paragraph into sentences, keeping every character.

        A sentence ends at a run of terminator characters together with any
        closing quotes/brackets that directly follow, so ``...`` or ``?!``
        end a single sentence. A ``.`` between two digits is treated as a
        decimal point. A trailing fragment without a terminator is kept
        unless it is only whitespace.
        """
        endings = cls._SENTENCE_ENDINGS
        trailers = endings | cls._SENTENCE_CLOSERS
        length = len(paragraph)

        sentences = []
        start = 0
        pos = 0
        while pos < length:
            if paragraph[pos] not in endings or cls._is_decimal_point(paragraph, pos):
                pos += 1
                continue

            # Absorb the rest of the terminator run and any closers.
            pos += 1
            while pos < length and paragraph[pos] in trailers:
                pos += 1
            sentences.append(paragraph[start:pos])
            start = pos

        tail = paragraph[start:]
        if tail.strip():
            sentences.append(tail)
        return sentences

    def limit_sentences_per_paragraph(self, text: str, max_sentences: int) -> str:
        """Cap the number of sentences in each newline-separated paragraph.

        Paragraphs that are blank or already within the limit are kept
        unchanged. Longer paragraphs are split into consecutive paragraphs of
        at most ``max_sentences`` sentences each; the new paragraphs have
        their surrounding whitespace stripped.

        Args:
            text: Input text.
            max_sentences: Maximum sentences per paragraph; values <= 0
                disable the limit.

        Returns:
            str: The re-paragraphed text.
        """
        if not text or max_sentences <= 0:
            return text

        result = []
        for paragraph in text.split('\n'):
            if not paragraph.strip():
                result.append(paragraph)
                continue

            sentences = self._split_sentences(paragraph)
            if len(sentences) <= max_sentences:
                result.append(paragraph)
                continue

            # Regroup into chunks of max_sentences sentences each.
            for offset in range(0, len(sentences), max_sentences):
                chunk = ''.join(sentences[offset:offset + max_sentences])
                result.append(chunk.strip())

        return '\n'.join(result)
|
||
|
||
|
||
# 创建全局文本处理器实例
|
||
# Built at import time: TextProcessor.__init__ runs here, so `config` is
# consulted as soon as this module is loaded. The module-level wrapper
# functions below delegate to this shared instance.
text_processor = TextProcessor()
|
||
|
||
|
||
# 兼容旧接口的函数
|
||
def process_text_content(text: str) -> str:
    """Process text content (kept for compatibility with the old interface).

    Delegates to ``process_text_content`` on the module-level
    ``text_processor`` instance.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    processed = text_processor.process_text_content(text)
    return processed
|
||
|
||
|
||
def replace_periods(text: str) -> str:
    """Replace full stops with commas (kept for compatibility with the old interface).

    Delegates to ``replace_periods`` on the module-level ``text_processor``
    instance.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    replaced = text_processor.replace_periods(text)
    return replaced
|
||
|
||
|
||
def reverse_text_order(content: str) -> str:
    """Reverse the text order (kept for compatibility with the old interface).

    Delegates to ``reverse_text_order`` on the module-level ``text_processor``
    instance.

    Args:
        content: Input text.

    Returns:
        str: The reversed text.
    """
    reversed_content = text_processor.reverse_text_order(content)
    return reversed_content
|
||
|
||
|
||
def apply_char_errors(text: str) -> str:
    """Apply typo processing (kept for compatibility with the old interface).

    Delegates to ``apply_char_errors`` on the module-level ``text_processor``
    instance.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    with_errors = text_processor.apply_char_errors(text)
    return with_errors