TxT2Docx/text_processor.py
wsb1224 d3ac3238ed 更新功能:
段落控制功能,可自定义控制每个段落有多少句话
2025-10-15 17:54:51 +08:00

414 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文本处理模块
负责文本的各种处理功能,包括顺序调换、标点符号替换、错别字处理等。
"""
from typing import Optional
from config import config
from error_chars import ErrorCharProcessor
from replacestr import replace_text
class TextProcessor:
    """Text processor that groups the module's text operations in one place.

    Covers order reversal, period-to-comma replacement, typo injection
    (delegated to ``ErrorCharProcessor``), whitespace clean-up, paragraph
    re-flowing and simple statistics. Behaviour switches are read from the
    project-level ``config`` object at call time.
    """

    # Characters that close a sentence when re-flowing paragraphs.
    # NOTE(review): the three CJK marks were blank in the reviewed copy of
    # this file (only '.', '!' and '?' survived); they are reconstructed
    # here as the CJK counterparts -- confirm against the repository.
    _SENTENCE_ENDINGS = frozenset('。!?.!?')

    def __init__(self):
        """Create the processor and try to set up the typo injector."""
        # Stays None while typo injection is disabled in config.
        self.error_processor: Optional[ErrorCharProcessor] = None
        self._init_error_processor()

    def _init_error_processor(self) -> None:
        """Build the ``ErrorCharProcessor`` if typo injection is enabled."""
        if config.enable_char_errors:
            self.error_processor = ErrorCharProcessor(config.char_error_db_path)

    def replace_periods(self, text: str) -> str:
        """Turn inner periods into commas and drop a trailing period.

        Trailing whitespace is stripped first; if nothing is left, an
        empty string is returned.

        Args:
            text: Input text.

        Returns:
            str: The processed text.
        """
        if not text:
            return ''
        text = text.rstrip()
        if not text:
            return ''
        # NOTE(review): both literals were empty strings in the reviewed
        # copy, which made this method a no-op. They are reconstructed from
        # the documented intent (period -> comma) -- confirm the exact
        # code points against the repository.
        if text.endswith('。'):
            text = text[:-1]
        return text.replace('。', ',')

    def reverse_text_order(self, content: str) -> str:
        """Reverse the text character by character.

        Args:
            content: Input text.

        Returns:
            str: The reversed text (falsy input is returned unchanged).
        """
        if not content:
            return content
        return content[::-1]

    def reverse_paragraph_order(self, content: str) -> str:
        """Reverse the order of lines while keeping each line intact.

        Paragraphs are delimited by ``'\\n'``.

        Args:
            content: Input text.

        Returns:
            str: Text with its paragraphs in reverse order.
        """
        if not content:
            return content
        return '\n'.join(reversed(content.split('\n')))

    def apply_char_errors(self, text: str) -> str:
        """Inject typos into the text when enabled in config.

        Any exception raised by the underlying processor is reported with
        ``print`` and the original text is returned (best effort).

        Args:
            text: Input text.

        Returns:
            str: The modified text, or the input unchanged when the feature
            is disabled, the text is empty, or processing failed.
        """
        if not config.enable_char_errors or not text:
            return text
        try:
            # The feature may have been switched on after construction,
            # so retry the lazy initialisation here.
            if not self.error_processor:
                self._init_error_processor()
            if self.error_processor:
                modified_text, replace_count, _, _ = (
                    self.error_processor.introduce_char_errors(
                        text, config.char_error_intensity
                    )
                )
                if replace_count > 0:
                    print(f"已应用错别字处理,替换了 {replace_count} 个字符。")
                return modified_text
        except Exception as e:
            # Deliberate best effort: fall through to return the input.
            print(f"错别字处理出错: {e}")
        return text

    def apply_text_order_processing(self, text: str) -> str:
        """Apply the word-order transformation when enabled in config.

        Delegates to ``replace_text``; on any exception the error is
        printed and the input is returned unchanged.

        Args:
            text: Input text.

        Returns:
            str: The processed text.
        """
        if not config.reverse_text_order or not text:
            return text
        try:
            return replace_text(text)
        except Exception as e:
            print(f"文字顺序处理出错: {e}")
            return text

    def process_text_content(self, text: str) -> str:
        """Run the full pipeline over the text.

        Steps, in order: word-order processing, typo injection, optional
        sentence-per-paragraph limiting, optional period replacement.

        Args:
            text: Input text.

        Returns:
            str: The processed text (empty or blank input is returned
            unchanged).
        """
        if not text or not text.strip():
            return text
        processed_text = self.apply_text_order_processing(text)
        processed_text = self.apply_char_errors(processed_text)
        if config.max_sentences_per_paragraph > 0:
            processed_text = self.limit_sentences_per_paragraph(
                processed_text, config.max_sentences_per_paragraph
            )
        # Punctuation replacement goes last so sentence splitting above
        # still sees the original terminators.
        if config.replace_punctuation:
            processed_text = self.replace_periods(processed_text)
        return processed_text

    def clean_text(self, text: str) -> str:
        """Normalise line endings to ``'\\n'`` and strip trailing whitespace
        from every line.

        Args:
            text: Input text.

        Returns:
            str: The cleaned text.
        """
        if not text:
            return text
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        return '\n'.join(line.rstrip() for line in text.split('\n'))

    def normalize_text(self, text: str) -> str:
        """Clean the text, then apply the punctuation mapping.

        Args:
            text: Input text.

        Returns:
            str: The normalised text.
        """
        if not text:
            return text
        text = self.clean_text(text)
        # NOTE(review): every key and value of this table was an empty
        # string in the reviewed copy, so the step had no effect. Keys are
        # rebuilt from the per-entry comments; the replacement values could
        # not be recovered, so each mark maps to itself and the step stays
        # a no-op until the intended targets are confirmed.
        punctuation_map = {
            ',': ',',  # full-width comma
            '。': '。',  # full-width period
            '!': '!',  # full-width exclamation mark
            '?': '?',  # full-width question mark
            ';': ';',  # full-width semicolon
            ':': ':',  # full-width colon
        }
        for old, new in punctuation_map.items():
            text = text.replace(old, new)
        return text

    def get_processing_statistics(self, text: str) -> dict:
        """Collect simple statistics about the text.

        Args:
            text: Input text.

        Returns:
            dict: ``total_chars``, ``total_lines``, ``non_empty_lines``,
            ``error_chars_enabled`` and ``estimated_error_replacements``;
            ``replaceable_chars`` is added when typo statistics could be
            obtained.
        """
        stats = {
            "total_chars": 0,
            "total_lines": 0,
            "non_empty_lines": 0,
            "error_chars_enabled": config.enable_char_errors,
            "estimated_error_replacements": 0
        }
        if not text:
            return stats

        lines = text.split('\n')
        stats["total_chars"] = len(text)
        stats["total_lines"] = len(lines)
        stats["non_empty_lines"] = sum(1 for line in lines if line.strip())

        if config.enable_char_errors:
            try:
                if not self.error_processor:
                    self._init_error_processor()
                if self.error_processor:
                    error_stats = self.error_processor.get_statistics(
                        text, config.char_error_intensity
                    )
                    stats["estimated_error_replacements"] = error_stats["estimated_replacements"]
                    stats["replaceable_chars"] = error_stats["replaceable_chars"]
            except Exception as e:
                # Statistics are informational; report and keep the basics.
                print(f"获取错别字统计失败: {e}")
        return stats

    def preview_processing(self, text: str, max_length: int = 200) -> dict:
        """Preview the pipeline on a prefix of the text.

        Args:
            text: Input text.
            max_length: Maximum number of characters to preview.

        Returns:
            dict: ``original`` (the prefix), ``processed`` (the prefix after
            ``process_text_content``) and ``truncated`` (whether the input
            was longer than ``max_length``).
        """
        if not text:
            return {
                "original": "",
                "processed": "",
                "truncated": False
            }
        preview_text = text[:max_length]
        return {
            "original": preview_text,
            "processed": self.process_text_content(preview_text),
            "truncated": len(text) > max_length
        }

    def limit_sentences_per_paragraph(self, text: str, max_sentences: int) -> str:
        """Split paragraphs so that none holds more than ``max_sentences``.

        Paragraphs are ``'\\n'``-separated lines. A paragraph within the
        limit (or blank) is kept verbatim; a longer one is re-flowed into
        chunks of ``max_sentences`` sentences, each chunk stripped of
        surrounding whitespace.

        Args:
            text: Input text.
            max_sentences: Maximum sentences per paragraph; values <= 0
                disable the limit.

        Returns:
            str: The re-flowed text.
        """
        if not text or max_sentences <= 0:
            return text

        processed_paragraphs = []
        for paragraph in text.split('\n'):
            if not paragraph.strip():
                processed_paragraphs.append(paragraph)
                continue
            sentences = self._split_sentences(paragraph)
            if len(sentences) <= max_sentences:
                processed_paragraphs.append(paragraph)
                continue
            processed_paragraphs.extend(
                ''.join(sentences[start:start + max_sentences]).strip()
                for start in range(0, len(sentences), max_sentences)
            )
        return '\n'.join(processed_paragraphs)

    def _split_sentences(self, paragraph: str) -> list:
        """Cut a paragraph after every sentence terminator.

        A non-blank remainder without a terminator counts as a sentence;
        a whitespace-only remainder is dropped.
        """
        sentences = []
        start = 0
        for index, char in enumerate(paragraph):
            if char in self._SENTENCE_ENDINGS:
                sentences.append(paragraph[start:index + 1])
                start = index + 1
        tail = paragraph[start:]
        if tail.strip():
            sentences.append(tail)
        return sentences
# Module-level TextProcessor instance used by the wrapper functions below.
text_processor = TextProcessor()
# Function wrappers kept for compatibility with the old interface.
def process_text_content(text: str) -> str:
    """Process text content (legacy interface).

    Delegates to ``text_processor.process_text_content``.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    return text_processor.process_text_content(text)
def replace_periods(text: str) -> str:
    """Replace periods with commas (legacy interface).

    Delegates to ``text_processor.replace_periods``.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    return text_processor.replace_periods(text)
def reverse_text_order(content: str) -> str:
    """Reverse the text order (legacy interface).

    Delegates to ``text_processor.reverse_text_order``.

    Args:
        content: Input text.

    Returns:
        str: The reversed text.
    """
    return text_processor.reverse_text_order(content)
def apply_char_errors(text: str) -> str:
    """Apply typo injection (legacy interface).

    Delegates to ``text_processor.apply_char_errors``.

    Args:
        text: Input text.

    Returns:
        str: The processed text.
    """
    return text_processor.apply_char_errors(text)