""" 错别字处理模块 负责错别字的加载、管理和文本错误引入功能。 支持自定义错别字库,可按强度控制错误引入比例。 """ import os import json import random import re from typing import Dict, List, Tuple class ErrorCharProcessor: """错别字处理器类""" def __init__(self, db_path: str = "data/error_chars.json"): """ 初始化错别字处理器 Args: db_path: 错别字库文件路径 """ self.db_path = db_path self.error_chars = self.load_error_chars() def load_error_chars(self) -> Dict[str, str]: """ 加载错别字库 Returns: Dict[str, str]: 错别字映射字典 {正确字: 错误字} """ # 检查文件夹是否存在,不存在则创建 dir_name = os.path.dirname(self.db_path) if dir_name and not os.path.exists(dir_name): os.makedirs(dir_name) # print(f"加载错别字库文件: {self.db_path}") # 检查文件是否存在,不存在则创建默认库 if not os.path.exists(self.db_path): default_chars = self._get_default_error_chars() self.save_error_chars(default_chars) return default_chars # 加载已存在的错别字库 try: with open(self.db_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"加载错别字库失败: {e}") # 如果加载失败,返回默认库 return self._get_default_error_chars() def save_error_chars(self, error_chars: Dict[str, str]) -> bool: """ 保存错别字库到文件 Args: error_chars: 错别字映射字典 Returns: bool: 是否保存成功 """ try: # 确保目录存在 dir_name = os.path.dirname(self.db_path) if dir_name and not os.path.exists(dir_name): os.makedirs(dir_name) with open(self.db_path, 'w', encoding='utf-8') as f: json.dump(error_chars, f, ensure_ascii=False, indent=2) return True except Exception as e: print(f"保存错别字库失败: {e}") return False def _get_default_error_chars(self) -> Dict[str, str]: """ 获取默认错别字库 Returns: Dict[str, str]: 默认错别字映射 """ return { "的": "地", "地": "得", "得": "的", "在": "再", "再": "在", "是": "事", "事": "是", "他": "她", "她": "他", "你": "您", "您": "你", "们": "门", "门": "们", "有": "又", "又": "有", "和": "合", "合": "和", "到": "倒", "倒": "到", "就": "才", "才": "就", "要": "耍", "耍": "要", "会": "汇", "汇": "会", "看": "着", "着": "看", "说": "讲", "讲": "说", "做": "作", "作": "做", "已": "己", "己": "已", "以": "已", "已": "以", "进": "近", "近": "进", "象": "像", "像": "象", "对": "队", "队": "对", "分": "份", "份": "分", } def introduce_char_errors(self, text: str, intensity: float = 1.0) -> Tuple[str, int, List[str], List[str]]: """ 将文本中的正确单字替换为常见错误单字 Args: text: 要处理的文本 intensity: 错误引入强度,0.0-1.0之间,1.0表示替换所有可能的字 Returns: Tuple[str, int, List[str], List[str]]: 处理后的文本、替换的总数量、原句列表、处理后的句子列表 """ if not text or intensity <= 0: return text, 0, [], [] # 句子拆分 original_sentences = self._split_into_sentences(text) modified_sentences = [] total_replace = 0 for sentence in original_sentences: modified, count = self._introduce_errors_to_sentence(sentence, intensity) modified_sentences.append(modified) total_replace += count modified_text = ''.join(modified_sentences) return modified_text, total_replace, original_sentences, modified_sentences def _split_into_sentences(self, text: str) -> List[str]: """ 句子拆分函数 Args: text: 要拆分的文本 Returns: List[str]: 拆分后的句子列表 """ separators = re.compile(r'([。!?;,.!?;])') parts = separators.split(text) sentences = [] for i in range(0, len(parts) - 1, 2): if parts[i] or parts[i + 1]: sentences.append(parts[i] + parts[i + 1]) if len(parts) % 2 == 1 and parts[-1]: sentences.append(parts[-1]) return sentences def _introduce_errors_to_sentence(self, sentence: str, intensity: float) -> Tuple[str, int]: """ 单句错误引入函数 Args: sentence: 要处理的句子 intensity: 错误引入强度 Returns: Tuple[str, int]: 处理后的句子和替换数量 """ modified = list(sentence) replace_count = 0 for i, char in enumerate(modified): if char in self.error_chars and random.random() <= intensity: modified[i] = self.error_chars[char] replace_count += 1 return ''.join(modified), replace_count def add_error_mapping(self, correct_char: str, error_char: str) -> None: """ 添加错别字映射 Args: correct_char: 正确字符 error_char: 错误字符 """ self.error_chars[correct_char] = error_char def remove_error_mapping(self, correct_char: str) -> bool: """ 删除错别字映射 Args: correct_char: 要删除的正确字符 Returns: bool: 是否删除成功 """ if correct_char in self.error_chars: del self.error_chars[correct_char] return True return False def get_error_chars(self) -> Dict[str, str]: """ 获取当前错别字映射 Returns: Dict[str, str]: 错别字映射字典 """ return self.error_chars.copy() def update_error_chars(self, new_error_chars: Dict[str, str]) -> None: """ 更新错别字映射 Args: new_error_chars: 新的错别字映射 """ self.error_chars.update(new_error_chars) def clear_error_chars(self) -> None: """清空所有错别字映射""" self.error_chars.clear() def reset_to_default(self) -> None: """重置为默认错别字库""" self.error_chars = self._get_default_error_chars() def get_statistics(self, text: str, intensity: float = 1.0) -> Dict[str, int]: """ 获取文本错误引入统计信息(不实际修改文本) Args: text: 要统计的文本 intensity: 错误引入强度 Returns: Dict[str, int]: 统计信息 """ if not text: return {"total_chars": 0, "replaceable_chars": 0, "estimated_replacements": 0} total_chars = len(text) replaceable_chars = sum(1 for char in text if char in self.error_chars) estimated_replacements = int(replaceable_chars * intensity) return { "total_chars": total_chars, "replaceable_chars": replaceable_chars, "estimated_replacements": estimated_replacements } def create_error_processor(db_path: str = "data/error_chars.json") -> ErrorCharProcessor: """ 创建错别字处理器实例的工厂函数 Args: db_path: 错别字库文件路径 Returns: ErrorCharProcessor: 错别字处理器实例 """ return ErrorCharProcessor(db_path) # 兼容旧接口的函数 def load_error_chars(db_path: str = "data/error_chars.json") -> Dict[str, str]: """ 加载错别字库(兼容旧接口) Args: db_path: 错别字库文件路径 Returns: Dict[str, str]: 错别字映射字典 """ processor = ErrorCharProcessor(db_path) return processor.get_error_chars() def introduce_char_errors(text: str, intensity: float = 1.0, db_path: str = "data/error_chars.json") -> Tuple[str, int, List[str], List[str]]: """ 将文本中的正确单字替换为常见错误单字(兼容旧接口) Args: text: 要处理的文本 intensity: 错误引入强度,0.0-1.0之间 db_path: 错别字库文件路径 Returns: Tuple[str, int, List[str], List[str]]: 处理后的文本、替换的总数量、原句列表、处理后的句子列表 """ processor = ErrorCharProcessor(db_path) return processor.introduce_char_errors(text, intensity)