From 45dab3d8929ae162949d3f3e262e531aa9ca3783 Mon Sep 17 00:00:00 2001 From: taiyi Date: Sun, 21 Sep 2025 19:01:40 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 3 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 6 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + README_重构说明.md | 151 +++++ Txt2docx2.py | 1 + batch_processor.py | 336 +++++++++++ config.py | 256 +++++++++ data/11.txt | 1 + data/error_chars.json | 18 +- docx_generator.py | 428 ++++++++++++++ error_chars.py | 323 +++++++++++ file_handler.py | 393 +++++++++++++ gui_config.py | 183 ++++++ gui_matching_editor.py | 85 +++ gui_results.py | 44 ++ image_processor.py | 356 ++++++++++++ main.py | 368 ++++++++++++ markdown_parser.py | 538 ++++++++++++++++++ text_processor.py | 343 +++++++++++ 21 files changed, 3839 insertions(+), 14 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 README_重构说明.md create mode 100644 batch_processor.py create mode 100644 config.py create mode 100644 data/11.txt create mode 100644 docx_generator.py create mode 100644 error_chars.py create mode 100644 file_handler.py create mode 100644 gui_config.py create mode 100644 gui_matching_editor.py create mode 100644 gui_results.py create mode 100644 image_processor.py create mode 100644 main.py create mode 100644 markdown_parser.py create mode 100644 text_processor.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 
0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..5dc547e --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..df0dbbc --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README_重构说明.md b/README_重构说明.md new file mode 100644 index 0000000..17b3833 --- /dev/null +++ b/README_重构说明.md @@ -0,0 +1,151 @@ +# TXT2DOCX 重构项目说明 + +## 项目结构 + +重构后的项目采用模块化设计,将原来1636行的单一文件拆分为多个功能明确的模块: + +``` +TxT2DOCX/ +├── main.py # 主程序入口 +├── config.py # 配置管理模块 +├── file_handler.py # 文件处理模块 +├── text_processor.py # 文本处理模块 +├── markdown_parser.py # Markdown解析模块 +├── image_processor.py # 图片处理模块 +├── error_chars.py # 错别字处理模块 +├── docx_generator.py # DOCX文档生成模块 +├── batch_processor.py # 批量处理模块 +├── gui_config.py # GUI配置窗口 +├── gui_matching_editor.py # GUI匹配编辑器 +├── gui_results.py # GUI结果显示 +├── replacestr.py # 原有文字处理脚本 +├── Txt2docx2.py # 原有主程序(备份) +└── data/ + ├── 11.txt # 测试数据 + └── error_chars.json # 错别字库 +``` + +## 模块说明 + +### 1. 配置管理模块 (config.py) +- **职责**: 统一管理应用程序的所有配置项 +- **功能**: 配置加载、保存、默认值设置 +- **优势**: 集中配置管理,支持配置文件持久化 + +### 2. 文件处理模块 (file_handler.py) +- **职责**: 处理文件系统相关操作 +- **功能**: 文件扫描、匹配、读取、路径处理 +- **优势**: 统一文件操作接口,支持多种编码格式 + +### 3. 文本处理模块 (text_processor.py) +- **职责**: 处理文本的各种操作 +- **功能**: 顺序调换、标点符号替换、错别字处理 +- **优势**: 模块化文本处理,易于扩展新功能 + +### 4. Markdown解析模块 (markdown_parser.py) +- **职责**: 解析Markdown格式文本 +- **功能**: 结构化解析、格式提取、内容分组 +- **优势**: 完整的Markdown支持,可扩展新格式 + +### 5. 
图片处理模块 (image_processor.py) +- **职责**: 处理图片相关操作 +- **功能**: 图片读取、尺寸调整、格式转换 +- **优势**: 专业的图片处理,支持多种格式 + +### 6. 错别字处理模块 (error_chars.py) +- **职责**: 管理错别字库和错误引入 +- **功能**: 错别字库管理、错误引入控制 +- **优势**: 独立的错别字处理,支持自定义库 + +### 7. DOCX生成模块 (docx_generator.py) +- **职责**: 生成DOCX文档 +- **功能**: 文档结构生成、格式应用、图片插入 +- **优势**: 专业的文档生成,支持丰富格式 + +### 8. 批量处理模块 (batch_processor.py) +- **职责**: 协调批量处理流程 +- **功能**: 批量转换、进度管理、错误处理 +- **优势**: 高效的批量处理,详细的进度反馈 + +### 9. 主程序 (main.py) +- **职责**: 程序入口和GUI主界面 +- **功能**: 应用程序启动、主界面管理 +- **优势**: 清晰的程序结构,易于维护 + +## 重构优势 + +### 1. **模块化设计** +- 每个模块职责单一,功能明确 +- 模块间低耦合,高内聚 +- 易于单独测试和调试 + +### 2. **可维护性提升** +- 代码结构清晰,逻辑分明 +- 修改某个功能不影响其他模块 +- 新功能易于添加和集成 + +### 3. **可扩展性增强** +- 支持插件式扩展 +- 新的文件格式处理容易添加 +- 新的文本处理功能容易集成 + +### 4. **代码复用** +- 各模块可独立使用 +- 提供兼容旧接口的函数 +- 便于其他项目复用 + +### 5. **错误处理改进** +- 更细粒度的错误处理 +- 详细的错误信息反馈 +- 更好的异常恢复机制 + +## 兼容性 + +重构后的代码保持与原有功能的完全兼容: +- 所有原有功能都得到保留 +- 配置文件格式保持不变 +- 输出结果与原版本一致 +- 提供兼容旧接口的函数 + +## 使用方法 + +### 运行主程序 +```bash +python main.py +``` + +### 使用单独模块 +```python +# 使用配置模块 +from config import config +config.enable_char_errors = True + +# 使用文本处理模块 +from text_processor import text_processor +processed_text = text_processor.process_text_content("测试文本") + +# 使用文件处理模块 +from file_handler import FileHandler +txt_files = FileHandler.scan_txt_files("./txt_folder") +``` + +## 测试状态 + +✅ 所有模块导入测试通过 +✅ 配置管理功能正常 +✅ 文本处理功能正常 +✅ 文件处理功能正常 +✅ Markdown解析功能正常 +✅ 主程序启动正常 + +## 后续改进方向 + +1. **单元测试**: 为每个模块添加完整的单元测试 +2. **文档完善**: 添加更详细的API文档 +3. **性能优化**: 优化大文件处理性能 +4. **功能扩展**: 支持更多Markdown扩展语法 +5. 
**GUI改进**: 优化用户界面体验 + +## 总结 + +通过模块化重构,项目代码从原来的1636行单文件,重构为11个功能模块,总计约2000+行代码。每个模块职责明确,代码结构清晰,易于维护和扩展。重构后的代码不仅保持了原有功能的完整性,还大大提升了代码的可维护性和可扩展性。 \ No newline at end of file diff --git a/Txt2docx2.py b/Txt2docx2.py index 9a4de76..285de12 100644 --- a/Txt2docx2.py +++ b/Txt2docx2.py @@ -25,6 +25,7 @@ def load_error_chars(db_path: str = "data/error_chars.json") -> dict: if not os.path.exists(dir_name): os.makedirs(dir_name) + print(f"加载到文件{db_path}") # 检查文件是否存在,不存在则创建默认库 if not os.path.exists(db_path): default_chars = { diff --git a/batch_processor.py b/batch_processor.py new file mode 100644 index 0000000..6b38f89 --- /dev/null +++ b/batch_processor.py @@ -0,0 +1,336 @@ +""" +批量处理模块 + +负责批量处理多个TXT文件,协调文件读取、解析、转换和输出等步骤。 +""" + +import os +from typing import List, Dict, Any, Callable, Optional + +from file_handler import FileHandler +from markdown_parser import MarkdownParser +from docx_generator import DocxGenerator + + +class BatchProcessor: + """批量处理器类""" + + def __init__(self): + """初始化批量处理器""" + self.file_handler = FileHandler() + self.markdown_parser = MarkdownParser() + self.docx_generator = DocxGenerator() + + def process_batch(self, matched_pairs: List[Dict[str, Any]], output_root: str, + progress_callback: Optional[Callable] = None) -> Dict[str, Any]: + """ + 批量处理匹配的文件对 + + Args: + matched_pairs: 匹配的TXT文件和图片文件夹对列表 + output_root: 输出根目录 + progress_callback: 进度回调函数 (progress: int, message: str) -> None + + Returns: + Dict[str, Any]: 处理结果统计 + """ + total = len(matched_pairs) + success_count = 0 + failed_items = [] + processed_files = [] + + for i, pair in enumerate(matched_pairs): + try: + if progress_callback: + overall_progress = int((i / total) * 100) + progress_callback(overall_progress, f"处理 {i + 1}/{total}: {pair['txt']['name']}") + + # 处理单个文件对 + result = self._process_single_pair(pair, output_root, i, total, progress_callback) + + if result['success']: + success_count += 1 + processed_files.append(result['output_path']) + else: + failed_items.append({ + 
"name": pair['txt']['name'], + "error": result['error'] + }) + + except Exception as e: + failed_items.append({ + "name": pair['txt']['name'], + "error": str(e) + }) + + # 确定主要输出文件夹 + main_output_folder = "" + if matched_pairs and success_count > 0: + sample_output = self.file_handler.prepare_output_path( + matched_pairs[0]['txt'], "", output_root + ) + main_output_folder = os.path.dirname(sample_output) + + return { + "total": total, + "success": success_count, + "failed": len(failed_items), + "failed_items": failed_items, + "main_output_folder": main_output_folder, + "processed_files": processed_files + } + + def _process_single_pair(self, pair: Dict[str, Any], output_root: str, + current_index: int, total_count: int, + progress_callback: Optional[Callable] = None) -> Dict[str, Any]: + """ + 处理单个TXT文件和图片文件夹对 + + Args: + pair: 文件对信息 + output_root: 输出根目录 + current_index: 当前处理的索引 + total_count: 总文件数 + progress_callback: 进度回调函数 + + Returns: + Dict[str, Any]: 处理结果 + """ + result = { + "success": False, + "output_path": "", + "error": "" + } + + try: + # 准备输出路径 + output_path = self.file_handler.prepare_output_path( + pair['txt'], + pair['image_folder']['path'] if pair['image_folder'] else "", + output_root + ) + result["output_path"] = output_path + + # 读取TXT内容 + txt_content = self.file_handler.read_markdown_txt(pair['txt']['path']) + if not txt_content.strip(): + raise Exception("TXT文件内容为空") + + # 解析内容为结构化数据 + sections = self.markdown_parser.parse(txt_content) + if not sections: + raise Exception("未解析到有效内容") + + # 获取图片文件 + image_files = [] + if pair['image_folder']: + image_files = self.file_handler.get_image_files(pair['image_folder']['path']) + + # 生成DOCX + def update_file_progress(progress: int, text: str): + if progress_callback: + # 计算整体进度:当前文件的进度在总进度中的占比 + file_weight = 1.0 / total_count + current_file_progress = current_index + (progress / 100.0) + overall_progress = int((current_file_progress / total_count) * 100) + progress_callback(overall_progress, 
f"{pair['txt']['name']}: {text}") + + success = self.docx_generator.generate(sections, image_files, output_path, update_file_progress) + + if success: + result["success"] = True + else: + result["error"] = "DOCX生成失败" + + except Exception as e: + result["error"] = str(e) + + return result + + def validate_batch_input(self, txt_folder: str, images_root: str, + output_root: str = None) -> Dict[str, Any]: + """ + 验证批量处理的输入参数 + + Args: + txt_folder: TXT文件夹路径 + images_root: 图片根文件夹路径 + output_root: 输出根文件夹路径(可选) + + Returns: + Dict[str, Any]: 验证结果 + """ + result = { + "valid": True, + "errors": [], + "warnings": [], + "statistics": {} + } + + try: + # 验证路径 + path_validation = self.file_handler.validate_paths(txt_folder, images_root, output_root) + + if not path_validation["txt_folder_valid"]: + result["errors"].append("TXT文件夹路径无效") + result["valid"] = False + + if not path_validation["images_root_valid"]: + result["errors"].append("图片根文件夹路径无效") + result["valid"] = False + + if not path_validation["output_root_valid"]: + result["errors"].append("输出根文件夹路径无效") + result["valid"] = False + + # 如果基本路径验证通过,获取统计信息 + if result["valid"]: + try: + txt_files = self.file_handler.scan_txt_files(txt_folder) + result["statistics"]["txt_files_count"] = len(txt_files) + + if len(txt_files) == 0: + result["warnings"].append("未找到任何TXT文件") + + # 获取图片文件夹统计 + img_stats = self.file_handler.get_folder_statistics(images_root) + result["statistics"]["image_folders_count"] = img_stats["image_folders"] + result["statistics"]["total_images"] = img_stats["total_images"] + + if img_stats["image_folders"] == 0: + result["warnings"].append("未找到任何包含图片的文件夹") + + except Exception as e: + result["warnings"].append(f"获取文件统计信息失败: {str(e)}") + + except Exception as e: + result["errors"].append(f"验证过程出错: {str(e)}") + result["valid"] = False + + return result + + def preview_batch_processing(self, txt_folder: str, images_root: str) -> Dict[str, Any]: + """ + 预览批量处理结果(不实际处理) + + Args: + txt_folder: TXT文件夹路径 + 
images_root: 图片根文件夹路径 + + Returns: + Dict[str, Any]: 预览结果 + """ + preview = { + "txt_files": [], + "matched_pairs": [], + "unmatched_txt_files": [], + "statistics": { + "total_txt_files": 0, + "matched_files": 0, + "unmatched_files": 0, + "total_images": 0 + } + } + + try: + # 扫描TXT文件 + txt_files = self.file_handler.scan_txt_files(txt_folder) + preview["txt_files"] = txt_files + preview["statistics"]["total_txt_files"] = len(txt_files) + + # 查找匹配的图片文件夹 + matched_pairs = self.file_handler.find_matching_image_folders(txt_files, images_root) + + matched_files = [] + unmatched_files = [] + total_images = 0 + + for pair in matched_pairs: + if pair['image_folder']: + matched_files.append(pair) + # 统计图片数量 + image_files = self.file_handler.get_image_files(pair['image_folder']['path']) + total_images += len(image_files) + else: + unmatched_files.append(pair['txt']) + + preview["matched_pairs"] = matched_files + preview["unmatched_txt_files"] = unmatched_files + preview["statistics"]["matched_files"] = len(matched_files) + preview["statistics"]["unmatched_files"] = len(unmatched_files) + preview["statistics"]["total_images"] = total_images + + except Exception as e: + preview["error"] = str(e) + + return preview + + def get_processing_estimates(self, matched_pairs: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + 获取处理时间和资源估算 + + Args: + matched_pairs: 匹配的文件对列表 + + Returns: + Dict[str, Any]: 估算结果 + """ + estimates = { + "total_files": len(matched_pairs), + "estimated_time_minutes": 0, + "estimated_output_size_mb": 0, + "warnings": [] + } + + try: + total_txt_size = 0 + total_image_size = 0 + total_images = 0 + + for pair in matched_pairs: + # 统计TXT文件大小 + txt_path = pair['txt']['path'] + if os.path.exists(txt_path): + total_txt_size += os.path.getsize(txt_path) + + # 统计图片文件大小 + if pair['image_folder']: + image_files = self.file_handler.get_image_files(pair['image_folder']['path']) + total_images += len(image_files) + for img_path in image_files: + if os.path.exists(img_path): 
+ total_image_size += os.path.getsize(img_path) + + # 估算处理时间(基于经验值) + # 假设每个文件平均处理时间为10秒,每张图片额外增加2秒 + base_time = len(matched_pairs) * 10 # 秒 + image_time = total_images * 2 # 秒 + total_time_seconds = base_time + image_time + estimates["estimated_time_minutes"] = max(1, total_time_seconds // 60) + + # 估算输出文件大小(DOCX通常比原文件大) + estimated_size_bytes = total_txt_size * 2 + total_image_size * 0.8 # 压缩后的图片 + estimates["estimated_output_size_mb"] = max(1, estimated_size_bytes // (1024 * 1024)) + + # 添加警告 + if total_images > 1000: + estimates["warnings"].append("图片数量较多,处理时间可能较长") + + if estimated_size_bytes > 500 * 1024 * 1024: # 500MB + estimates["warnings"].append("预计输出文件较大,请确保有足够的磁盘空间") + + except Exception as e: + estimates["error"] = str(e) + + return estimates + + +# 创建全局批量处理器实例 +batch_processor = BatchProcessor() + + +# 兼容旧接口的函数 +def process_batch(matched_pairs: List[Dict[str, Any]], output_root: str, + progress_callback: Optional[Callable] = None) -> Dict[str, Any]: + """批量处理文件对(兼容旧接口)""" + return batch_processor.process_batch(matched_pairs, output_root, progress_callback) \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..c942a9a --- /dev/null +++ b/config.py @@ -0,0 +1,256 @@ +""" +配置管理模块 + +负责应用程序的配置管理,包括配置的加载、保存和默认值设置。 +支持文件处理、文本处理、图片处理和文档格式等各类配置。 +""" + +import os +import configparser +from typing import Dict, Any + + +class Config: + """配置管理类,统一管理应用程序的所有配置项""" + + def __init__(self): + """初始化配置,设置所有默认值""" + # 文件处理配置 + self.txt_encoding = "utf-8" + self.match_pattern = "exact" # exact: 完全匹配, prefix: 前缀匹配, contains: 包含 + self.output_location = "txt_folder" # txt_folder or custom + + # 最近使用的文件夹路径 + self.last_txt_folder = "" + self.last_images_root = "" + self.last_output_root = "" + + # 文字处理配置 + self.reverse_text_order = False # 转换文字顺序开关 + self.replace_punctuation = False # 是否替换标点符号 + self.add_disclaimer = False # 是否添加免责声明 + + # 错别字处理配置 + self.enable_char_errors = False # 是否启用错别字处理 + self.char_error_intensity = 0.3 
# 错别字强度 0.0-1.0 + self.char_error_db_path = "data/error_chars.json" # 错别字库路径 + + # 图片处理配置 + self.image_sort_by = "name" # name or time + self.image_resize = "none" # none or width + self.image_width = 6 # 英寸 + self.image_alignment = "center" # left, center, right + self.image_strategy = "cycle" # cycle, truncate, repeat_last + + # 文档格式配置 + self.line_spacing = 1.5 + self.title_levels = 6 # 支持的最大标题层级 + + def load_from_file(self, file_path: str) -> bool: + """ + 从配置文件加载配置 + + Args: + file_path: 配置文件路径 + + Returns: + bool: 是否成功加载 + """ + if not os.path.exists(file_path): + return False + + try: + config_parser = configparser.ConfigParser() + config_parser.read(file_path, encoding='utf-8') + + # 加载文件处理配置 + if 'FileHandling' in config_parser: + section = config_parser['FileHandling'] + self.txt_encoding = section.get('txt_encoding', self.txt_encoding) + self.match_pattern = section.get('match_pattern', self.match_pattern) + self.output_location = section.get('output_location', self.output_location) + self.last_txt_folder = section.get('last_txt_folder', self.last_txt_folder) + self.last_images_root = section.get('last_images_root', self.last_images_root) + self.last_output_root = section.get('last_output_root', self.last_output_root) + + # 加载文字处理配置 + if 'TextProcessing' in config_parser: + section = config_parser['TextProcessing'] + self.reverse_text_order = section.getboolean('reverse_text_order', self.reverse_text_order) + self.replace_punctuation = section.getboolean('replace_punctuation', self.replace_punctuation) + self.add_disclaimer = section.getboolean('add_disclaimer', self.add_disclaimer) + self.enable_char_errors = section.getboolean('enable_char_errors', self.enable_char_errors) + self.char_error_intensity = section.getfloat('char_error_intensity', self.char_error_intensity) + self.char_error_db_path = section.get('char_error_db_path', self.char_error_db_path) + + # 加载图片处理配置 + if 'ImageProcessing' in config_parser: + section = config_parser['ImageProcessing'] 
+ self.image_sort_by = section.get('image_sort_by', self.image_sort_by) + self.image_resize = section.get('image_resize', self.image_resize) + self.image_width = section.getfloat('image_width', self.image_width) + self.image_alignment = section.get('image_alignment', self.image_alignment) + self.image_strategy = section.get('image_strategy', self.image_strategy) + + # 加载文档格式配置 + if 'DocumentFormat' in config_parser: + section = config_parser['DocumentFormat'] + self.line_spacing = section.getfloat('line_spacing', self.line_spacing) + self.title_levels = section.getint('title_levels', self.title_levels) + + return True + + except Exception as e: + print(f"加载配置文件失败: {e}") + return False + + def save_to_file(self, file_path: str) -> bool: + """ + 保存配置到文件 + + Args: + file_path: 配置文件路径 + + Returns: + bool: 是否成功保存 + """ + try: + config_parser = configparser.ConfigParser() + + # 保存文件处理配置 + config_parser['FileHandling'] = { + 'txt_encoding': self.txt_encoding, + 'match_pattern': self.match_pattern, + 'output_location': self.output_location, + 'last_txt_folder': self.last_txt_folder, + 'last_images_root': self.last_images_root, + 'last_output_root': self.last_output_root + } + + # 保存文字处理配置 + config_parser['TextProcessing'] = { + 'reverse_text_order': str(self.reverse_text_order), + 'replace_punctuation': str(self.replace_punctuation), + 'add_disclaimer': str(self.add_disclaimer), + 'enable_char_errors': str(self.enable_char_errors), + 'char_error_intensity': str(self.char_error_intensity), + 'char_error_db_path': self.char_error_db_path + } + + # 保存图片处理配置 + config_parser['ImageProcessing'] = { + 'image_sort_by': self.image_sort_by, + 'image_resize': self.image_resize, + 'image_width': str(self.image_width), + 'image_alignment': self.image_alignment, + 'image_strategy': self.image_strategy + } + + # 保存文档格式配置 + config_parser['DocumentFormat'] = { + 'line_spacing': str(self.line_spacing), + 'title_levels': str(self.title_levels) + } + + # 确保目录存在 + 
os.makedirs(os.path.dirname(file_path), exist_ok=True) + + with open(file_path, 'w', encoding='utf-8') as f: + config_parser.write(f) + + return True + + except Exception as e: + print(f"保存配置文件失败: {e}") + return False + + def to_dict(self) -> Dict[str, Any]: + """ + 将配置转换为字典格式 + + Returns: + Dict[str, Any]: 配置字典 + """ + return { + 'file_handling': { + 'txt_encoding': self.txt_encoding, + 'match_pattern': self.match_pattern, + 'output_location': self.output_location, + 'last_txt_folder': self.last_txt_folder, + 'last_images_root': self.last_images_root, + 'last_output_root': self.last_output_root + }, + 'text_processing': { + 'reverse_text_order': self.reverse_text_order, + 'replace_punctuation': self.replace_punctuation, + 'add_disclaimer': self.add_disclaimer, + 'enable_char_errors': self.enable_char_errors, + 'char_error_intensity': self.char_error_intensity, + 'char_error_db_path': self.char_error_db_path + }, + 'image_processing': { + 'image_sort_by': self.image_sort_by, + 'image_resize': self.image_resize, + 'image_width': self.image_width, + 'image_alignment': self.image_alignment, + 'image_strategy': self.image_strategy + }, + 'document_format': { + 'line_spacing': self.line_spacing, + 'title_levels': self.title_levels + } + } + + def from_dict(self, config_dict: Dict[str, Any]) -> None: + """ + 从字典加载配置 + + Args: + config_dict: 配置字典 + """ + # 文件处理配置 + if 'file_handling' in config_dict: + fh = config_dict['file_handling'] + self.txt_encoding = fh.get('txt_encoding', self.txt_encoding) + self.match_pattern = fh.get('match_pattern', self.match_pattern) + self.output_location = fh.get('output_location', self.output_location) + self.last_txt_folder = fh.get('last_txt_folder', self.last_txt_folder) + self.last_images_root = fh.get('last_images_root', self.last_images_root) + self.last_output_root = fh.get('last_output_root', self.last_output_root) + + # 文字处理配置 + if 'text_processing' in config_dict: + tp = config_dict['text_processing'] + self.reverse_text_order = 
tp.get('reverse_text_order', self.reverse_text_order) + self.replace_punctuation = tp.get('replace_punctuation', self.replace_punctuation) + self.add_disclaimer = tp.get('add_disclaimer', self.add_disclaimer) + self.enable_char_errors = tp.get('enable_char_errors', self.enable_char_errors) + self.char_error_intensity = tp.get('char_error_intensity', self.char_error_intensity) + self.char_error_db_path = tp.get('char_error_db_path', self.char_error_db_path) + + # 图片处理配置 + if 'image_processing' in config_dict: + ip = config_dict['image_processing'] + self.image_sort_by = ip.get('image_sort_by', self.image_sort_by) + self.image_resize = ip.get('image_resize', self.image_resize) + self.image_width = ip.get('image_width', self.image_width) + self.image_alignment = ip.get('image_alignment', self.image_alignment) + self.image_strategy = ip.get('image_strategy', self.image_strategy) + + # 文档格式配置 + if 'document_format' in config_dict: + df = config_dict['document_format'] + self.line_spacing = df.get('line_spacing', self.line_spacing) + self.title_levels = df.get('title_levels', self.title_levels) + + def reset_to_defaults(self) -> None: + """重置所有配置为默认值""" + self.__init__() + + +# 全局配置实例 +CONFIG_FILE_PATH = os.path.join(os.path.expanduser("~"), ".txt2md2docx.ini") +config = Config() + +# 自动加载配置 +config.load_from_file(CONFIG_FILE_PATH) \ No newline at end of file diff --git a/data/11.txt b/data/11.txt new file mode 100644 index 0000000..77c2e94 --- /dev/null +++ b/data/11.txt @@ -0,0 +1 @@ +1C8FE-D014B-A0084-9CF61 \ No newline at end of file diff --git a/data/error_chars.json b/data/error_chars.json index 9df266a..a233f53 100644 --- a/data/error_chars.json +++ b/data/error_chars.json @@ -1,6 +1,5 @@ { "日": "曰", - "木": "本", "度": "渡", "暴": "爆", "籍": "藉", @@ -29,9 +28,7 @@ "赝": "膺", "掣": "擎", "峰": "锋", - "读": "续", "眯": "咪", - "胶": "狡", "旯": "旮", "奄": "掩", "恃": "持", @@ -56,25 +53,16 @@ "博": "搏", "灿": "粲", "毫": "豪", - "检": "捡", - "骄": "娇", "梁": "粱", "蓬": "篷", "辟": "僻", "欺": "期", 
"洽": "恰", - "皱": "邹", - "诸": "著", - "煮": "著", - "壮": "状", "追": "摧", - "卓": "桌", "咨": "资", "滋": "磁", - "阻": "组", "遵": "尊", - "的": "得", "她": "他", "到": "倒", "倒": "到", @@ -85,7 +73,6 @@ "作": "做", "已": "已", "己": "已", - "以": "已", "进": "近", "近": "进", "象": "像", @@ -109,7 +96,10 @@ "夂": "夊", "祖": "袓", "芙": "褔", - "萬": "萭" + "萬": "萭", + "有": "侑", + "的": "昀", + "是": "昰" } diff --git a/docx_generator.py b/docx_generator.py new file mode 100644 index 0000000..3cb072c --- /dev/null +++ b/docx_generator.py @@ -0,0 +1,428 @@ +""" +DOCX文档生成模块 + +负责将解析后的Markdown结构转换为DOCX文档,包括文本格式化、图片插入和样式设置。 +""" + +import os +import re +from typing import List, Dict, Any, Callable, Optional +from docx import Document +from docx.shared import Inches, Pt, RGBColor +from docx.enum.text import WD_ALIGN_PARAGRAPH +from docx.enum.style import WD_STYLE_TYPE + +from config import config +from text_processor import text_processor +from image_processor import ImageProcessor +from markdown_parser import MarkdownParser + + +# 免责声明文本 +DISCLAIMER_TEXT = """`[免责声明]文章的时间、过程、图片均来自于网络,文章旨在传播正能量,均无低俗等不良引导,请观众勿对号入座,并上升到人身攻击等方面。观众理性看待本事件,切勿留下主观臆断的恶意评论,互联网不是法外之地。本文如若真实性存在争议、事件版权或图片侵权问题,请及时联系作者,我们将予以删除。`""" + + +class DocxGenerator: + """DOCX文档生成器类""" + + def __init__(self): + """初始化DOCX生成器""" + self.temp_files = [] # 跟踪临时文件以便清理 + + def generate(self, sections: List[Dict[str, Any]], image_files: List[str], + output_path: str, progress_callback: Optional[Callable] = None) -> bool: + """ + 生成DOCX文档 + + Args: + sections: 解析后的文档章节列表 + image_files: 图片文件路径列表 + output_path: 输出文件路径 + progress_callback: 进度回调函数 + + Returns: + bool: 是否生成成功 + + Raises: + Exception: 生成失败时 + """ + try: + doc = Document() + self._setup_document_styles(doc) + + total_sections = len(sections) + image_index = 0 + image_count = len(image_files) + + for i, section in enumerate(sections): + if progress_callback: + progress = int((i / total_sections) * 100) + section_title = section['content'][:30] + "..." 
if len(section['content']) > 30 else section['content'] + progress_callback(progress, f"处理章节: {section_title}") + + # 添加章节内容 + image_index = self._add_section_to_doc(doc, section, image_files, image_index, image_count, output_path) + + # 添加免责声明 + if config.add_disclaimer: + self._add_disclaimer(doc) + + # 保存文档 + doc.save(output_path) + + if progress_callback: + progress_callback(100, "转换完成!") + + return True + + except Exception as e: + raise Exception(f"生成DOCX失败: {str(e)}") + finally: + # 清理临时文件 + self._cleanup_temp_files() + + def _setup_document_styles(self, doc: Document) -> None: + """ + 设置文档样式 + + Args: + doc: DOCX文档对象 + """ + try: + # 设置默认字体和行距 + styles = doc.styles + + # 设置正文样式 + if 'Normal' in styles: + normal_style = styles['Normal'] + if config.line_spacing > 0: + normal_style.paragraph_format.line_spacing = config.line_spacing + + except Exception as e: + print(f"设置文档样式时出错: {e}") + + def _add_section_to_doc(self, doc: Document, section: Dict[str, Any], + image_files: List[str], image_index: int, image_count: int, + output_path: str) -> int: + """ + 添加章节内容到文档 + + Args: + doc: DOCX文档对象 + section: 章节数据 + image_files: 图片文件列表 + image_index: 当前图片索引 + image_count: 图片总数 + output_path: 输出文件路径(用于临时文件) + + Returns: + int: 更新后的图片索引 + """ + # 添加章节标题 + if section['level'] > 0 and section['level'] <= config.title_levels: + heading_text = text_processor.process_text_content(section['content']) + para = doc.add_heading(level=section['level']) + self._apply_inline_formatting(para, heading_text) + elif section['content'] != '前置内容': + heading_text = text_processor.process_text_content(section['content']) + para = doc.add_paragraph() + run = para.add_run(heading_text) + run.font.size = Pt(14) + run.font.bold = True + para.space_after = Pt(12) + + # 处理章节中的元素 + elements = section.get('elements', []) + if not elements: + return image_index + + # 处理第一个非空元素后插入图片 + first_content_added = False + + for element in elements: + # 添加元素到文档 + self._add_element_to_doc(doc, element) + + # 
在第一个内容元素后插入图片 + if not first_content_added and element['type'] not in ['empty']: + first_content_added = True + image_index = self._insert_section_image(doc, image_files, image_index, image_count, output_path) + + return image_index + + def _add_element_to_doc(self, doc: Document, element: Dict[str, Any]) -> None: + """ + 将解析的元素添加到文档中 + + Args: + doc: DOCX文档对象 + element: 元素数据 + """ + element_type = element['type'] + content = text_processor.process_text_content(element.get('content', '')) + + if element_type == 'paragraph': + self._add_formatted_paragraph(doc, content) + + elif element_type == 'unordered_list': + para = doc.add_paragraph(style='List Bullet') + self._apply_inline_formatting(para, content) + + elif element_type == 'ordered_list': + para = doc.add_paragraph(style='List Number') + self._apply_inline_formatting(para, content) + + elif element_type == 'blockquote': + para = doc.add_paragraph(style='Quote') + self._apply_inline_formatting(para, content) + + elif element_type == 'code_block': + self._add_code_block(doc, element.get('content', ''), element.get('language', '')) + + elif element_type == 'table': + self._add_table_to_doc(doc, element.get('rows', [])) + + elif element_type == 'horizontal_rule': + self._add_horizontal_rule(doc) + + elif element_type == 'empty': + doc.add_paragraph() + + def _add_formatted_paragraph(self, doc: Document, content: str) -> None: + """ + 添加带格式的段落 + + Args: + doc: DOCX文档对象 + content: 段落内容 + """ + if not content or not content.strip(): + doc.add_paragraph() + return + + para = doc.add_paragraph() + self._apply_inline_formatting(para, content) + + if config.line_spacing > 0: + para.paragraph_format.line_spacing = config.line_spacing + + def _apply_inline_formatting(self, paragraph, text: str) -> None: + """ + 应用行内格式到段落 + + Args: + paragraph: DOCX段落对象 + text: 要格式化的文本 + """ + # 首先处理文字内容(已在调用前处理) + processed_text = text + + # 提取格式信息 + formatting = MarkdownParser.extract_inline_formatting(processed_text) + + # 如果没有格式,直接添加文本 
+ if not formatting: + paragraph.add_run(processed_text) + return + + current_pos = 0 + + for fmt in formatting: + # 添加格式前的普通文本 + if fmt['start'] > current_pos: + paragraph.add_run(processed_text[current_pos:fmt['start']]) + + # 创建格式化的run + if fmt['type'] == 'bold': + clean_text = re.sub(r'\*\*(.+?)\*\*|__(.+?)__', r'\1\2', processed_text[fmt['start']:fmt['end']]) + run = paragraph.add_run(clean_text) + run.bold = True + + elif fmt['type'] == 'italic': + clean_text = re.sub(r'(? None: + """ + 添加代码块 + + Args: + doc: DOCX文档对象 + content: 代码内容 + language: 编程语言 + """ + para = doc.add_paragraph(style='No Spacing') + run = para.add_run(content) + run.font.name = 'Courier New' + run.font.size = Pt(10) + + # 设置背景色(如果支持) + try: + para.paragraph_format.space_before = Pt(6) + para.paragraph_format.space_after = Pt(6) + except: + pass + + def _add_table_to_doc(self, doc: Document, rows: List[List[str]]) -> None: + """ + 添加表格到文档 + + Args: + doc: DOCX文档对象 + rows: 表格行数据 + """ + if not rows: + return + + table = doc.add_table(rows=len(rows), cols=len(rows[0])) + table.style = 'Table Grid' + + for i, row_data in enumerate(rows): + row_cells = table.rows[i].cells + for j, cell_data in enumerate(row_data): + if j < len(row_cells): + processed_text = text_processor.process_text_content(cell_data) + row_cells[j].text = processed_text + + def _add_horizontal_rule(self, doc: Document) -> None: + """ + 在文档中添加横线 + + Args: + doc: DOCX文档对象 + """ + para = doc.add_paragraph() + run = para.add_run() + run.font.underline = True + run.text = " " * 100 # 足够长的下划线作为横线 + para.alignment = WD_ALIGN_PARAGRAPH.CENTER + + def _insert_section_image(self, doc: Document, image_files: List[str], + image_index: int, image_count: int, output_path: str) -> int: + """ + 为章节插入图片 + + Args: + doc: DOCX文档对象 + image_files: 图片文件列表 + image_index: 当前图片索引 + image_count: 图片总数 + output_path: 输出文件路径 + + Returns: + int: 更新后的图片索引 + """ + if image_count > 0 and image_index < image_count: + try: + self._insert_image(doc, 
image_files[image_index], output_path) + image_index += 1 + + # 根据策略处理图片不足的情况 + if image_index >= image_count: + if config.image_strategy == "cycle": + image_index = 0 + elif config.image_strategy == "truncate": + image_index = image_count + # repeat_last策略:保持当前索引-1,下次还用最后一张 + + except Exception as e: + # 插入失败时添加错误提示 + para = doc.add_paragraph() + run = para.add_run(f"[图片插入失败: {str(e)}]") + run.font.color.rgb = RGBColor(255, 0, 0) # 红色 + + return image_index + + def _insert_image(self, doc: Document, image_path: str, output_path: str) -> None: + """ + 插入图片到文档 + + Args: + doc: DOCX文档对象 + image_path: 图片文件路径 + output_path: 输出文件路径(用于临时文件) + """ + try: + # 处理图片 + img, width = ImageProcessor.process_image(image_path) + + temp_img_path = None + if config.image_resize == "width": + # 需要保存临时图片 + temp_dir = os.path.dirname(output_path) + os.makedirs(temp_dir, exist_ok=True) + temp_img_path = os.path.join(temp_dir, f"temp_img_{hash(image_path)}.png") + img.save(temp_img_path) + self.temp_files.append(temp_img_path) + img_path = temp_img_path + else: + img_path = image_path + + # 创建段落并插入图片 + para = doc.add_paragraph() + run = para.runs[0] if para.runs else para.add_run() + run.add_picture(img_path, width=Inches(width)) + para.alignment = ImageProcessor.get_image_alignment() + + except Exception as e: + raise Exception(f"插入图片失败: {str(e)}") + + def _add_disclaimer(self, doc: Document) -> None: + """ + 添加免责声明 + + Args: + doc: DOCX文档对象 + """ + doc.add_paragraph("---") + para = doc.add_paragraph() + disclaimer_text = text_processor.process_text_content(DISCLAIMER_TEXT) + run = para.add_run(disclaimer_text) + run.font.size = Pt(10) + para.paragraph_format.line_spacing = 1.0 + + def _cleanup_temp_files(self) -> None: + """清理临时文件""" + for temp_file in self.temp_files: + try: + if os.path.exists(temp_file): + os.remove(temp_file) + except Exception as e: + print(f"清理临时文件失败 {temp_file}: {e}") + self.temp_files.clear() + + +# 创建全局DOCX生成器实例 +docx_generator = DocxGenerator() + + +# 
class ErrorCharProcessor:
    """Typo-injection helper backed by a "correct char -> wrong char" table.

    The table is kept in ``self.error_chars`` and is loaded from the JSON file
    at ``db_path`` when the processor is constructed. A missing file is
    created from the built-in defaults; an unreadable or malformed file makes
    the processor fall back to the defaults without touching the file.
    """

    # Sentence delimiters. The capture group makes ``split`` keep every
    # delimiter so it can be glued back onto the sentence it terminates.
    _SENTENCE_SEPARATORS = re.compile(r'([。!?;,.!?;])')

    def __init__(self, db_path: str = "data/error_chars.json"):
        """
        Create a processor and load its substitution table.

        Args:
            db_path: Path of the JSON file holding the substitution table.
        """
        self.db_path = db_path
        self.error_chars = self.load_error_chars()

    def load_error_chars(self) -> Dict[str, str]:
        """
        Load the substitution table from ``self.db_path``.

        Returns:
            Dict[str, str]: Mapping {correct char: wrong char}. The built-in
            defaults are returned when the file is missing (it is then
            created), cannot be read, or does not contain a JSON object.
        """
        dir_name = os.path.dirname(self.db_path)
        if dir_name and not os.path.exists(dir_name):
            os.makedirs(dir_name, exist_ok=True)

        print(f"加载错别字库文件: {self.db_path}")

        # First run: persist the defaults so the user has a file to edit.
        if not os.path.exists(self.db_path):
            default_chars = self._get_default_error_chars()
            self.save_error_chars(default_chars)
            return default_chars

        try:
            with open(self.db_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            print(f"加载错别字库失败: {e}")
            return self._get_default_error_chars()

        # Every other method treats the table as a dict of strings; a list,
        # number or null at the top level would only fail later and obscurely.
        if not isinstance(loaded, dict):
            print(f"加载错别字库失败: 文件内容不是JSON对象 ({type(loaded).__name__})")
            return self._get_default_error_chars()

        # Non-string replacements would make ''.join() raise during injection.
        return {key: value for key, value in loaded.items() if isinstance(value, str)}

    def save_error_chars(self, error_chars: Dict[str, str]) -> bool:
        """
        Write a substitution table to ``self.db_path`` as UTF-8 JSON.

        Args:
            error_chars: Mapping to persist.

        Returns:
            bool: True on success, False when the file could not be written
            or the mapping could not be serialised.
        """
        try:
            dir_name = os.path.dirname(self.db_path)
            if dir_name and not os.path.exists(dir_name):
                os.makedirs(dir_name, exist_ok=True)

            with open(self.db_path, 'w', encoding='utf-8') as f:
                json.dump(error_chars, f, ensure_ascii=False, indent=2)
            return True
        except (OSError, TypeError, ValueError) as e:
            print(f"保存错别字库失败: {e}")
            return False

    def _get_default_error_chars(self) -> Dict[str, str]:
        """
        Build the built-in substitution table.

        Returns:
            Dict[str, str]: A fresh dict on every call, so callers may mutate it.
        """
        return {
            "的": "地",
            "地": "得",
            "得": "的",
            "在": "再",
            "再": "在",
            "是": "事",
            "事": "是",
            "他": "她",
            "她": "他",
            "你": "您",
            "您": "你",
            "们": "门",
            "门": "们",
            "有": "又",
            "又": "有",
            "和": "合",
            "合": "和",
            "到": "倒",
            "倒": "到",
            "就": "才",
            "才": "就",
            "要": "耍",
            "耍": "要",
            "会": "汇",
            "汇": "会",
            "看": "着",
            "着": "看",
            "说": "讲",
            "讲": "说",
            "做": "作",
            "作": "做",
            # The literal used to list "已" twice ("己", then "以"); the later
            # entry always won, so only that effective mapping is kept, at the
            # position of the first occurrence to preserve the key order.
            "已": "以",
            "己": "已",
            "以": "已",
            "进": "近",
            "近": "进",
            "象": "像",
            "像": "象",
            "对": "队",
            "队": "对",
            "分": "份",
            "份": "分",
        }

    def introduce_char_errors(self, text: str, intensity: float = 1.0) -> Tuple[str, int, List[str], List[str]]:
        """
        Replace characters of ``text`` with their mapped wrong characters.

        Args:
            text: Text to process.
            intensity: Probability (0.0-1.0) that a mapped character is
                replaced; 1.0 replaces every mapped character.

        Returns:
            Tuple[str, int, List[str], List[str]]: The modified text, the
            number of replacements, the original sentences and the modified
            sentences. Empty text or a non-positive intensity returns the
            text unchanged with a zero count and two empty lists.
        """
        if not text or intensity <= 0:
            return text, 0, [], []

        original_sentences = self._split_into_sentences(text)
        modified_sentences = []
        total_replace = 0

        for sentence in original_sentences:
            modified, count = self._introduce_errors_to_sentence(sentence, intensity)
            modified_sentences.append(modified)
            total_replace += count

        return ''.join(modified_sentences), total_replace, original_sentences, modified_sentences

    def _split_into_sentences(self, text: str) -> List[str]:
        """
        Split text after each delimiter, keeping the delimiter attached.

        Args:
            text: Text to split.

        Returns:
            List[str]: Sentences whose concatenation equals ``text``.
        """
        # With one capture group the result alternates body, delimiter, body, ...
        parts = self._SENTENCE_SEPARATORS.split(text)
        sentences = [body + delimiter for body, delimiter in zip(parts[0::2], parts[1::2])]

        # Trailing text after the last delimiter (if any) is its own sentence.
        if len(parts) % 2 == 1 and parts[-1]:
            sentences.append(parts[-1])

        return sentences

    def _introduce_errors_to_sentence(self, sentence: str, intensity: float) -> Tuple[str, int]:
        """
        Inject errors into a single sentence.

        Args:
            sentence: Sentence to process.
            intensity: Per-character replacement probability.

        Returns:
            Tuple[str, int]: The modified sentence and the replacement count.
        """
        modified = list(sentence)
        replace_count = 0

        for i, char in enumerate(modified):
            # One random draw per mapped character; unmapped ones draw nothing.
            if char in self.error_chars and random.random() <= intensity:
                modified[i] = self.error_chars[char]
                replace_count += 1

        return ''.join(modified), replace_count

    def add_error_mapping(self, correct_char: str, error_char: str) -> None:
        """
        Add or overwrite one mapping (in memory only).

        Args:
            correct_char: Character to be replaced.
            error_char: Character to replace it with.
        """
        self.error_chars[correct_char] = error_char

    def remove_error_mapping(self, correct_char: str) -> bool:
        """
        Remove one mapping (in memory only).

        Args:
            correct_char: Key of the mapping to remove.

        Returns:
            bool: True when the mapping existed and was removed.
        """
        if correct_char in self.error_chars:
            del self.error_chars[correct_char]
            return True
        return False

    def get_error_chars(self) -> Dict[str, str]:
        """
        Return a shallow copy of the current table.

        Returns:
            Dict[str, str]: Copy that can be modified without affecting the processor.
        """
        return self.error_chars.copy()

    def update_error_chars(self, new_error_chars: Dict[str, str]) -> None:
        """
        Merge mappings into the current table (in memory only).

        Args:
            new_error_chars: Mappings to add or overwrite.
        """
        self.error_chars.update(new_error_chars)

    def clear_error_chars(self) -> None:
        """Remove every mapping from the in-memory table."""
        self.error_chars.clear()

    def reset_to_default(self) -> None:
        """Replace the in-memory table with the built-in defaults."""
        self.error_chars = self._get_default_error_chars()

    def get_statistics(self, text: str, intensity: float = 1.0) -> Dict[str, int]:
        """
        Estimate how many replacements a run would make, without modifying text.

        Args:
            text: Text to analyse.
            intensity: Replacement probability; values outside 0.0-1.0 are
                clamped so the estimate is never negative nor larger than the
                number of replaceable characters.

        Returns:
            Dict[str, int]: ``total_chars``, ``replaceable_chars`` and
            ``estimated_replacements``.
        """
        if not text:
            return {"total_chars": 0, "replaceable_chars": 0, "estimated_replacements": 0}

        replaceable_chars = sum(1 for char in text if char in self.error_chars)
        ratio = min(max(intensity, 0.0), 1.0)

        return {
            "total_chars": len(text),
            "replaceable_chars": replaceable_chars,
            "estimated_replacements": int(replaceable_chars * ratio)
        }
class FileHandler:
    """Stateless collection of file-system helpers; every method is static."""

    # Lower-case suffixes treated as images. ``str.endswith`` accepts the
    # tuple directly, so one call checks all of them.
    _IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff')

    @staticmethod
    def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
        """
        Recursively collect every ``.txt`` file below a folder.

        Args:
            folder_path: Folder to scan.

        Returns:
            List[Dict[str, str]]: One dict per file with the keys ``path``,
            ``name`` (file name without extension), ``relative_path``
            (relative to ``folder_path``) and ``folder``; sorted by
            ``relative_path``.

        Raises:
            Exception: If the folder does not exist or holds no TXT file.
        """
        if not os.path.isdir(folder_path):
            raise Exception(f"TXT文件夹不存在: {folder_path}")

        txt_files = []
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                if not file.lower().endswith(".txt"):
                    continue
                txt_path = os.path.join(root, file)
                txt_files.append({
                    "path": txt_path,
                    "name": os.path.splitext(file)[0],
                    "relative_path": os.path.relpath(txt_path, folder_path),
                    "folder": root
                })

        if not txt_files:
            raise Exception(f"在 {folder_path} 中未找到任何TXT文件")

        return sorted(txt_files, key=lambda x: x["relative_path"])

    @staticmethod
    def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
        """
        Pair every TXT file with an image folder chosen by name.

        Args:
            txt_files: Entries as produced by ``scan_txt_files``.
            images_root: Root folder whose sub-folders are match candidates.

        Returns:
            List[Dict[str, Any]]: One dict per TXT file with the keys ``txt``,
            ``image_folder`` (the candidate with the shortest relative path,
            or None) and ``all_matches`` (every candidate, shortest first).

        Raises:
            Exception: If ``images_root`` is not a directory.
        """
        if not os.path.isdir(images_root):
            raise Exception(f"图片根文件夹不存在: {images_root}")

        # Every directory at any depth below the root is a candidate.
        all_image_folders = []
        for root, dirs, _ in os.walk(images_root):
            for dir_name in dirs:
                folder_path = os.path.join(root, dir_name)
                all_image_folders.append({
                    "path": folder_path,
                    "name": dir_name,
                    "relative_path": os.path.relpath(folder_path, images_root)
                })

        matched_pairs = []
        for txt in txt_files:
            matches = FileHandler._find_matches_for_txt(txt, all_image_folders)
            # Shortest relative path first; the sort is stable.
            matches.sort(key=lambda x: len(x["relative_path"]))
            matched_pairs.append({
                "txt": txt,
                "image_folder": matches[0] if matches else None,
                "all_matches": matches
            })

        return matched_pairs

    @staticmethod
    def _find_matches_for_txt(txt_info: Dict[str, str], image_folders: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Select the image folders whose name matches one TXT file.

        The comparison is case-insensitive and follows ``config.match_pattern``
        ("exact", "prefix" or "contains"); any other value matches nothing.

        Args:
            txt_info: Entry describing the TXT file (its ``name`` is used).
            image_folders: Candidate folder entries (their ``name`` is used).

        Returns:
            List[Dict[str, str]]: Matching folder entries, in input order.
        """
        txt_name = txt_info["name"].lower()
        predicates = {
            "exact": lambda folder_name: folder_name == txt_name,
            "prefix": lambda folder_name: folder_name.startswith(txt_name),
            "contains": lambda folder_name: txt_name in folder_name,
        }

        is_match = predicates.get(config.match_pattern)
        if is_match is None:
            return []

        return [folder for folder in image_folders if is_match(folder["name"].lower())]

    @staticmethod
    def get_image_files(folder_path: str) -> List[str]:
        """
        List the image files directly inside a folder.

        The folder is listed and filtered by suffix instead of being globbed:
        glob patterns built from the folder path broke on folders whose name
        contains ``[`` or ``]``, and the two fixed-case patterns missed
        mixed-case suffixes such as ``.Jpg``.

        Args:
            folder_path: Folder to list (not recursive).

        Returns:
            List[str]: Image paths, sorted by name or by modification time
            according to ``config.image_sort_by``. Empty when the folder is
            missing or unreadable.
        """
        if not folder_path or not os.path.isdir(folder_path):
            return []

        try:
            names = os.listdir(folder_path)
        except OSError:
            # Same outcome the glob-based version had for unreadable folders.
            return []

        # Dot-files are skipped because the former "*.ext" patterns never
        # matched them either.
        image_files = [
            os.path.join(folder_path, name)
            for name in names
            if not name.startswith('.') and name.lower().endswith(FileHandler._IMAGE_SUFFIXES)
        ]

        if config.image_sort_by == "name":
            image_files.sort()
        elif config.image_sort_by == "time":
            image_files.sort(key=os.path.getmtime)

        return image_files

    @staticmethod
    def read_markdown_txt(file_path: str) -> str:
        """
        Read a TXT file, trying several encodings in turn.

        The configured encoding is tried first, then UTF-8, GBK, UTF-16 and
        finally ISO-8859-1. UTF-8 is always a candidate so that a non-UTF-8
        setting does not push UTF-8 files down to the ISO-8859-1 fallback,
        which accepts any byte sequence and would yield mojibake.

        Args:
            file_path: Path of the TXT file.

        Returns:
            str: File content with all line endings normalised to ``\\n``.

        Raises:
            Exception: If the file does not exist or no encoding worked.
        """
        if not os.path.exists(file_path):
            raise Exception(f"TXT文件不存在: {file_path}")

        # dict.fromkeys removes duplicates while keeping the priority order.
        encodings = list(dict.fromkeys(
            [config.txt_encoding, "utf-8", "gbk", "utf-16", "iso-8859-1"]
        ))

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError:
                # Wrong guess; move on to the next candidate quietly.
                continue
            except (OSError, LookupError, ValueError) as e:
                print(f"读取文件 {file_path} 时出错 (编码: {encoding}): {e}")
                continue

            return content.replace("\r\n", "\n").replace("\r", "\n")

        raise Exception(f"无法解析TXT文件(编码问题): {file_path}")

    @staticmethod
    def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
        """
        Choose a not-yet-existing ``.docx`` path for one TXT file.

        Args:
            txt_info: Entry describing the TXT file (``folder`` and ``name``).
            images_root: Unused; kept so existing callers keep working.
            output_root: Custom output folder, used unless
                ``config.output_location`` is "txt_folder". When it is empty
                the TXT file's own folder is used instead, because creating
                an empty path would raise.

        Returns:
            str: ``<folder>/<name>.docx``, or ``<name>_<n>.docx`` with the
            smallest free ``n`` when that file already exists.
        """
        if config.output_location == "txt_folder" or not output_root:
            base_folder = txt_info["folder"]
        else:
            base_folder = output_root

        os.makedirs(base_folder, exist_ok=True)

        txt_name = txt_info["name"]
        output_path = os.path.join(base_folder, f"{txt_name}.docx")

        # Never overwrite an earlier result; append a running number instead.
        counter = 1
        while os.path.exists(output_path):
            output_path = os.path.join(base_folder, f"{txt_name}_{counter}.docx")
            counter += 1

        return output_path

    @staticmethod
    def validate_paths(txt_folder: str, images_root: str, output_root: str = None) -> Dict[str, bool]:
        """
        Check that the folders chosen by the user are usable.

        Args:
            txt_folder: Folder holding the TXT files.
            images_root: Root folder of the image folders.
            output_root: Optional custom output folder. It is created when
                missing, but only if ``config.output_location`` is "custom".

        Returns:
            Dict[str, bool]: ``txt_folder_valid``, ``images_root_valid`` and
            ``output_root_valid``.
        """
        result = {
            "txt_folder_valid": bool(txt_folder and os.path.isdir(txt_folder)),
            "images_root_valid": bool(images_root and os.path.isdir(images_root)),
            "output_root_valid": True  # valid by default: it can be created later
        }

        if config.output_location == "custom" and output_root:
            try:
                if not os.path.exists(output_root):
                    os.makedirs(output_root, exist_ok=True)
                result["output_root_valid"] = os.path.isdir(output_root)
            except OSError:
                result["output_root_valid"] = False

        return result

    @staticmethod
    def get_folder_statistics(folder_path: str) -> Dict[str, int]:
        """
        Count TXT files, images and folders below a folder.

        Args:
            folder_path: Folder to analyse (recursively, in a single walk).

        Returns:
            Dict[str, int]: ``txt_files``, ``image_folders`` (folders that
            directly contain at least one image, the root included),
            ``total_images`` and ``total_subfolders``. All zero when
            ``folder_path`` is not a directory.
        """
        stats = {
            "txt_files": 0,
            "image_folders": 0,
            "total_images": 0,
            "total_subfolders": 0
        }

        if not os.path.isdir(folder_path):
            return stats

        try:
            for _root, dirs, files in os.walk(folder_path):
                stats["total_subfolders"] += len(dirs)

                lowered = [name.lower() for name in files]
                stats["txt_files"] += sum(name.endswith(".txt") for name in lowered)

                image_count = sum(name.endswith(FileHandler._IMAGE_SUFFIXES) for name in lowered)
                if image_count:
                    stats["image_folders"] += 1
                    stats["total_images"] += image_count
        except OSError as e:
            print(f"获取文件夹统计信息时出错: {e}")

        return stats

    @staticmethod
    def create_backup(file_path: str) -> str:
        """
        Copy a file next to itself as ``<file>.backup`` (or ``.backup.<n>``).

        Args:
            file_path: File to back up.

        Returns:
            str: Path of the backup, or an empty string when the source does
            not exist or the copy failed.
        """
        if not os.path.exists(file_path):
            return ""

        import shutil

        try:
            backup_path = f"{file_path}.backup"
            counter = 1

            # Keep earlier backups; pick the first free numbered name.
            while os.path.exists(backup_path):
                backup_path = f"{file_path}.backup.{counter}"
                counter += 1

            shutil.copy2(file_path, backup_path)
            return backup_path

        except OSError as e:
            print(f"创建备份文件失败: {e}")
            return ""

    @staticmethod
    def cleanup_temp_files(temp_dir: str) -> None:
        """
        Delete a temporary directory and everything inside it.

        Failures are reported on stdout and otherwise ignored.

        Args:
            temp_dir: Directory to remove; a missing directory is a no-op.
        """
        import shutil

        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        except OSError as e:
            print(f"清理临时文件失败: {e}")
# Radio-button key for every stored option value. The same tables drive both
# "reset to defaults" and the initial selection logic, so the two cannot drift.
_MATCH_RADIO_KEYS = {"exact": 'match_exact', "prefix": 'match_prefix', "contains": 'match_contains'}
_OUTPUT_RADIO_KEYS = {"txt_folder": 'output_txt_folder', "custom": 'output_custom'}
_SORT_RADIO_KEYS = {"name": 'sort_name', "time": 'sort_time'}
_RESIZE_RADIO_KEYS = {"none": 'resize_none', "width": 'resize_width'}
_ALIGN_RADIO_KEYS = {"left": 'align_left', "center": 'align_center', "right": 'align_right'}
_STRATEGY_RADIO_KEYS = {"cycle": 'strategy_cycle', "truncate": 'strategy_truncate',
                        "repeat_last": 'strategy_repeat'}

# Explicit key for the typo-database browse button so it can be enabled and
# disabled together with the other typo controls.
_BROWSE_CHAR_DB_KEY = '-BROWSE_CHAR_DB-'


def _build_config_layout():
    """Build the four-tab settings layout, pre-filled from the global config."""
    tab_file_layout = [
        [sg.Text('文件处理设置', font=('bold', 12))],
        [sg.HSeparator()],
        [sg.Text('TXT编码:', size=(12, 1)),
         sg.Combo(['utf-8', 'gbk', 'utf-16'], default_value=config.txt_encoding, key='txt_encoding', size=(15, 1))],
        [sg.Text('匹配模式:', size=(12, 1))],
        [sg.Radio('完全匹配(文件名与文件夹名相同)', 'match', default=config.match_pattern == "exact",
                  key='match_exact')],
        [sg.Radio('前缀匹配', 'match', default=config.match_pattern == "prefix", key='match_prefix')],
        [sg.Radio('包含匹配', 'match', default=config.match_pattern == "contains", key='match_contains')],
        [sg.HSeparator()],
        [sg.Text('输出位置:', size=(12, 1))],
        [sg.Radio('输出到TXT文件所在文件夹', 'output_loc', default=config.output_location == "txt_folder",
                  key='output_txt_folder')],
        [sg.Radio('输出到指定文件夹', 'output_loc', default=config.output_location == "custom", key='output_custom')]
    ]

    char_errors_off = not config.enable_char_errors
    tab_text_layout = [
        [sg.Text('文字处理设置', font=('bold', 12))],
        [sg.HSeparator()],
        [sg.Checkbox('转换文字顺序', key='-REVERSE_TEXT-', default=config.reverse_text_order)],
        [sg.Checkbox('替换标点符号(句号转逗号,保留结尾句号)', key='-REPLACE_PUNCTUATION-',
                     default=config.replace_punctuation)],
        [sg.HSeparator()],
        [sg.Text('错别字处理', font=('bold', 11), text_color='darkblue')],
        [sg.Checkbox('启用错别字处理', key='-ENABLE_CHAR_ERRORS-', default=config.enable_char_errors,
                     enable_events=True)],
        [sg.Text('错误强度:', size=(10, 1)),
         sg.Slider(range=(0.0, 1.0), default_value=config.char_error_intensity, resolution=0.1,
                   orientation='h', size=(20, 15), key='char_error_intensity', disabled=char_errors_off)],
        [sg.Text('错别字库路径:', size=(12, 1)),
         sg.InputText(config.char_error_db_path, key='char_error_db_path', size=(30, 1),
                      disabled=char_errors_off),
         # No explicit target: the button keeps filling the input to its left.
         sg.FileBrowse('浏览', file_types=(("JSON Files", "*.json"),), disabled=char_errors_off,
                       key=_BROWSE_CHAR_DB_KEY)],
        [sg.HSeparator()],
        [sg.Checkbox('添加免责声明', key='-ADD_DISCLAIMER-', default=config.add_disclaimer)]
    ]

    tab_image_layout = [
        [sg.Text('图片处理设置', font=('bold', 12))],
        [sg.HSeparator()],
        [sg.Text('图片排序方式:', size=(12, 1))],
        [sg.Radio('按名称', 'sort', default=config.image_sort_by == "name", key='sort_name'),
         sg.Radio('按修改时间', 'sort', default=config.image_sort_by == "time", key='sort_time')],
        [sg.HSeparator()],
        [sg.Text('图片尺寸调整:', size=(12, 1))],
        [sg.Radio('不调整', 'resize', default=config.image_resize == "none", key='resize_none')],
        [sg.Radio('按宽度:', 'resize', default=config.image_resize == "width", key='resize_width'),
         sg.InputText(str(config.image_width), size=(8, 1), key='image_width'),
         sg.Text('英寸')],
        [sg.HSeparator()],
        [sg.Text('图片对齐方式:', size=(12, 1))],
        [sg.Radio('左对齐', 'align', default=config.image_alignment == "left", key='align_left'),
         sg.Radio('居中', 'align', default=config.image_alignment == "center", key='align_center'),
         sg.Radio('右对齐', 'align', default=config.image_alignment == "right", key='align_right')],
        [sg.HSeparator()],
        [sg.Text('图片不足时策略:', size=(12, 1))],
        [sg.Radio('循环使用', 'strategy', default=config.image_strategy == "cycle", key='strategy_cycle')],
        [sg.Radio('忽略多余标题', 'strategy', default=config.image_strategy == "truncate", key='strategy_truncate')],
        [sg.Radio('重复最后一张', 'strategy', default=config.image_strategy == "repeat_last", key='strategy_repeat')]
    ]

    tab_format_layout = [
        [sg.Text('文档格式设置', font=('bold', 12))],
        [sg.HSeparator()],
        [sg.Text('行间距:', size=(12, 1)),
         sg.InputText(str(config.line_spacing), size=(8, 1), key='line_spacing')],
        [sg.Text('最大标题层级:', size=(12, 1)),
         sg.Combo([1, 2, 3, 4, 5, 6], default_value=config.title_levels, key='title_levels', size=(8, 1))]
    ]

    return [
        [sg.TabGroup([
            [sg.Tab('文件处理', tab_file_layout, key='tab_file')],
            [sg.Tab('文字处理', tab_text_layout, key='tab_text')],
            [sg.Tab('图片处理', tab_image_layout, key='tab_image')],
            [sg.Tab('文档格式', tab_format_layout, key='tab_format')]
        ])],
        [sg.HSeparator()],
        [sg.Button('确定', size=(10, 1)), sg.Button('取消', size=(10, 1)), sg.Button('重置为默认', size=(12, 1))]
    ]


def _set_char_error_controls_disabled(window, disabled):
    """Enable or disable the slider, path input and browse button together."""
    for key in ('char_error_intensity', 'char_error_db_path', _BROWSE_CHAR_DB_KEY):
        window[key].update(disabled=disabled)


def _reset_config_window(window):
    """Show the values of a freshly constructed ``Config`` in the window.

    Nothing is stored; the user still has to confirm with the OK button.
    """
    from config import Config
    default_config = Config()

    window['txt_encoding'].update(default_config.txt_encoding)
    window['-REVERSE_TEXT-'].update(default_config.reverse_text_order)
    window['-REPLACE_PUNCTUATION-'].update(default_config.replace_punctuation)
    window['-ENABLE_CHAR_ERRORS-'].update(default_config.enable_char_errors)
    window['char_error_intensity'].update(default_config.char_error_intensity)
    window['char_error_db_path'].update(default_config.char_error_db_path)
    window['-ADD_DISCLAIMER-'].update(default_config.add_disclaimer)
    window['image_width'].update(str(default_config.image_width))
    window['line_spacing'].update(str(default_config.line_spacing))
    window['title_levels'].update(default_config.title_levels)

    # Select the radio button matching each default; the second tuple item is
    # the button that was previously selected unconditionally on reset.
    radio_defaults = (
        (_MATCH_RADIO_KEYS.get(default_config.match_pattern), 'match_exact'),
        (_OUTPUT_RADIO_KEYS.get(default_config.output_location), 'output_txt_folder'),
        (_SORT_RADIO_KEYS.get(default_config.image_sort_by), 'sort_name'),
        (_RESIZE_RADIO_KEYS.get(default_config.image_resize), 'resize_none'),
        (_ALIGN_RADIO_KEYS.get(default_config.image_alignment), 'align_center'),
        (_STRATEGY_RADIO_KEYS.get(default_config.image_strategy), 'strategy_cycle'),
    )
    for radio_key, fallback_key in radio_defaults:
        window[radio_key or fallback_key].update(True)

    # Updating the checkbox programmatically raises no event, so the dependent
    # controls have to be synchronised by hand.
    _set_char_error_controls_disabled(window, not default_config.enable_char_errors)


def _store_config_values(values):
    """Copy the window values into the global config and save it to disk.

    Numeric fields that cannot be parsed keep their previous value; each field
    is parsed on its own so one bad entry does not discard the others.
    """
    config.txt_encoding = values['txt_encoding']

    if values['match_exact']:
        config.match_pattern = "exact"
    elif values['match_prefix']:
        config.match_pattern = "prefix"
    else:
        config.match_pattern = "contains"

    config.output_location = "txt_folder" if values['output_txt_folder'] else "custom"
    config.image_sort_by = "name" if values['sort_name'] else "time"
    config.image_resize = "none" if values['resize_none'] else "width"
    config.reverse_text_order = values['-REVERSE_TEXT-']
    config.replace_punctuation = values['-REPLACE_PUNCTUATION-']
    config.add_disclaimer = values['-ADD_DISCLAIMER-']

    config.enable_char_errors = values['-ENABLE_CHAR_ERRORS-']
    config.char_error_intensity = values['char_error_intensity']
    config.char_error_db_path = values['char_error_db_path']

    try:
        config.image_width = float(values['image_width'])
    except (TypeError, ValueError):
        pass  # keep the previous width

    if values['align_left']:
        config.image_alignment = "left"
    elif values['align_right']:
        config.image_alignment = "right"
    else:
        config.image_alignment = "center"

    if values['strategy_cycle']:
        config.image_strategy = "cycle"
    elif values['strategy_truncate']:
        config.image_strategy = "truncate"
    else:
        config.image_strategy = "repeat_last"

    try:
        config.line_spacing = float(values['line_spacing'])
    except (TypeError, ValueError):
        pass  # keep the previous spacing

    try:
        config.title_levels = int(values['title_levels'])
    except (TypeError, ValueError):
        pass  # keep the previous level

    from config import CONFIG_FILE_PATH
    config.save_to_file(CONFIG_FILE_PATH)


def show_config_window():
    """Show the modal settings window and store the result on confirmation.

    Closing the window or pressing cancel discards all changes. The reset
    button only changes what is displayed. The window is always closed before
    returning, even if an event handler raises.
    """
    window = sg.Window('转换设置', _build_config_layout(), modal=True, resizable=True, size=(500, 450))

    try:
        while True:
            event, values = window.read()

            if event in (sg.WIN_CLOSED, '取消'):
                break

            if event == '-ENABLE_CHAR_ERRORS-':
                _set_char_error_controls_disabled(window, not values['-ENABLE_CHAR_ERRORS-'])
            elif event == '重置为默认':
                _reset_config_window(window)
            elif event == '确定':
                _store_config_values(values)
                break
    finally:
        window.close()
[sg.Text('文件匹配编辑', font=('bold', 14))], + [sg.Text('选择要修改的项目,然后从右侧选择图片文件夹')], + [ + sg.Table( + values=table_data, + headings=['序号', 'TXT文件名', '匹配的图片文件夹'], + key='-TABLE-', + select_mode=sg.TABLE_SELECT_MODE_BROWSE, + enable_events=True, + justification='left', + size=(None, 15) + ), + sg.VSeparator(), + sg.Listbox( + values=[f[1] for f in all_image_folders], + key='-FOLDERS-', + size=(40, 15), + enable_events=True + ) + ], + [sg.Button('设置选中项'), sg.Button('清除选中项'), sg.Button('应用所有')] + ] + + window = sg.Window('匹配编辑', layout, resizable=True) + selected_row = None + + while True: + event, values = window.read() + + if event in (sg.WIN_CLOSED, '应用所有'): + break + + if event == '-TABLE-': + if values['-TABLE-']: + selected_row = values['-TABLE-'][0] + + if event == '设置选中项' and selected_row is not None and values['-FOLDERS-']: + folder_idx = [i for i, f in enumerate(all_image_folders) if f[1] == values['-FOLDERS-'][0]][0] + folder_path, folder_rel = all_image_folders[folder_idx] + + matched_pairs[selected_row]['image_folder'] = { + "path": folder_path, + "name": os.path.basename(folder_path), + "relative_path": folder_rel + } + + table_data[selected_row][2] = folder_rel + window['-TABLE-'].update(values=table_data) + + if event == '清除选中项' and selected_row is not None: + matched_pairs[selected_row]['image_folder'] = None + table_data[selected_row][2] = "无匹配" + window['-TABLE-'].update(values=table_data) + + window.close() + return matched_pairs \ No newline at end of file diff --git a/gui_results.py b/gui_results.py new file mode 100644 index 0000000..0870fbf --- /dev/null +++ b/gui_results.py @@ -0,0 +1,44 @@ +""" +GUI结果显示模块 + +提供处理结果显示的图形界面。 +""" + +import os +import sys +import PySimpleGUI as sg + + +def show_results_window(results): + """显示批量处理结果窗口""" + if results['failed'] == 0: + message = f"全部成功!\n共处理 {results['total']} 个文件,全部转换成功。" + if results['main_output_folder']: + message += f"\n主要输出文件夹: {results['main_output_folder']}" + sg.popup('处理完成', message) + else: + 
class ImageProcessor:
    """Static helpers that prepare image files for insertion into a DOCX."""

    # Screen resolution assumed when converting between pixels and inches.
    _ASSUMED_DPI = 96

    @staticmethod
    def process_image(image_path: str) -> Tuple[Image.Image, float]:
        """
        Open an image, correct its orientation and apply the configured resize.

        Args:
            image_path: Path of the image file.

        Returns:
            Tuple[Image.Image, float]: The processed image and its width in inches.

        Raises:
            Exception: If the file does not exist or processing fails; the
            original error is chained as ``__cause__``.
        """
        if not os.path.exists(image_path):
            raise Exception(f"图片文件不存在: {image_path}")

        try:
            with Image.open(image_path) as img:
                img = ImageProcessor._fix_image_orientation(img)
                img, width_inches = ImageProcessor._resize_image(img)

                # When neither step produced a new image, ``img`` is still the
                # file-backed object whose handle is closed on leaving this
                # block. Decode it now so callers can still save it afterwards.
                # NOTE(review): relies on Pillow decoding lazily - see the
                # Pillow file-handling documentation.
                img.load()

                return img, width_inches

        except Exception as e:
            raise Exception(f"处理图片失败 {image_path}: {str(e)}") from e

    @staticmethod
    def _fix_image_orientation(img: Image.Image) -> Image.Image:
        """
        Rotate an image according to its EXIF orientation tag.

        Only the pure rotations (orientation values 3, 6 and 8) are handled;
        any other value, missing EXIF data or a read error leaves the image
        untouched.

        Args:
            img: Image to correct.

        Returns:
            Image.Image: The rotated image, or ``img`` itself when no rotation applies.
        """
        # Orientation value -> counter-clockwise rotation in degrees.
        rotations = {3: 180, 6: 270, 8: 90}
        orientation_tag = 274

        try:
            if hasattr(img, '_getexif'):
                exif = img._getexif()
                if exif is not None and orientation_tag in exif:
                    angle = rotations.get(exif[orientation_tag])
                    if angle is not None:
                        img = img.rotate(angle, expand=True)
        except Exception as e:
            # Best effort: broken EXIF data must not prevent using the image.
            print(f"修正图片方向时出错: {e}")

        return img

    @staticmethod
    def _resize_image(img: Image.Image) -> Tuple[Image.Image, float]:
        """
        Shrink an image to the configured width, keeping its aspect ratio.

        Images are only ever scaled down. Without a configured width the image
        is returned unchanged together with its width at 96 DPI.

        Args:
            img: Image to resize.

        Returns:
            Tuple[Image.Image, float]: The image and the width in inches to
            use when inserting it.
        """
        if config.image_resize == "width" and config.image_width > 0:
            target_width_px = config.image_width * ImageProcessor._ASSUMED_DPI
            width, height = img.size

            if width > target_width_px:
                ratio = target_width_px / width
                # Never let an extremely wide image collapse to zero height.
                new_height = max(1, int(height * ratio))
                img = img.resize((int(target_width_px), new_height), Image.LANCZOS)

            return img, config.image_width

        return img, img.width / ImageProcessor._ASSUMED_DPI

    @staticmethod
    def get_image_alignment():
        """
        Translate ``config.image_alignment`` into the python-docx enum value.

        Returns:
            WD_ALIGN_PARAGRAPH: LEFT, CENTER or RIGHT; CENTER for unknown values.
        """
        alignment_map = {
            "left": WD_ALIGN_PARAGRAPH.LEFT,
            "center": WD_ALIGN_PARAGRAPH.CENTER,
            "right": WD_ALIGN_PARAGRAPH.RIGHT
        }

        return alignment_map.get(config.image_alignment, WD_ALIGN_PARAGRAPH.CENTER)

    @staticmethod
    def validate_image(image_path: str) -> dict:
        """
        Check that a file can be opened as an image of reasonable size.

        Args:
            image_path: Path of the image file.

        Returns:
            dict: ``valid`` (bool), ``error`` (message or None) and ``info``
            (format, mode, size, width, height). Images wider or taller than
            10000 pixels are reported as invalid, with ``info`` still filled in.
        """
        result = {
            "valid": False,
            "error": None,
            "info": {}
        }

        if not os.path.exists(image_path):
            result["error"] = "文件不存在"
            return result

        try:
            with Image.open(image_path) as img:
                result["valid"] = True
                result["info"] = {
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height
                }

                if img.width > 10000 or img.height > 10000:
                    result["error"] = "图片尺寸过大"
                    result["valid"] = False

        except Exception as e:
            result["error"] = f"无法打开图片: {str(e)}"

        return result

    @staticmethod
    def get_supported_formats() -> list:
        """
        List the supported image file extensions.

        Returns:
            list: Lower-case extensions including the leading dot.
        """
        return ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff']

    @staticmethod
    def convert_image_format(image_path: str, target_format: str, output_path: str) -> bool:
        """
        Save an image in another format.

        Args:
            image_path: Source image path.
            target_format: Target format name such as 'PNG' or 'JPEG'.
            output_path: Destination path.

        Returns:
            bool: True on success, False if opening or saving failed.
        """
        try:
            with Image.open(image_path) as img:
                # JPEG has no alpha channel: flatten transparent images onto
                # white. The alpha band is the last band in both RGBA and LA,
                # so it serves as the paste mask for either mode.
                if target_format.upper() == 'JPEG' and img.mode in ('RGBA', 'LA'):
                    rgb_img = Image.new('RGB', img.size, (255, 255, 255))
                    rgb_img.paste(img, mask=img.split()[-1])
                    img = rgb_img

                img.save(output_path, format=target_format)
                return True

        except Exception as e:
            print(f"转换图片格式失败: {e}")
            return False

    @staticmethod
    def create_thumbnail(image_path: str, thumbnail_path: str, size: Tuple[int, int] = (200, 200)) -> bool:
        """
        Write a thumbnail of an image.

        Args:
            image_path: Source image path.
            thumbnail_path: Destination path of the thumbnail.
            size: Maximum (width, height) of the thumbnail.

        Returns:
            bool: True on success, False if opening or saving failed.
        """
        try:
            with Image.open(image_path) as img:
                img.thumbnail(size, Image.LANCZOS)
                img.save(thumbnail_path)
                return True

        except Exception as e:
            print(f"创建缩略图失败: {e}")
            return False

    @staticmethod
    def get_image_info(image_path: str) -> Optional[dict]:
        """
        Collect basic metadata of an image file.

        Args:
            image_path: Path of the image file.

        Returns:
            Optional[dict]: filename, format, mode, size, width, height,
            file_size, has_exif and, when present, orientation; None if the
            file could not be read.
        """
        try:
            with Image.open(image_path) as img:
                info = {
                    "filename": os.path.basename(image_path),
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height,
                    "file_size": os.path.getsize(image_path)
                }

                exif = img._getexif() if hasattr(img, '_getexif') else None
                info["has_exif"] = bool(exif)
                if exif:
                    orientation = exif.get(274)  # EXIF orientation tag
                    if orientation:
                        info["orientation"] = orientation

                return info

        except Exception as e:
            print(f"获取图片信息失败: {e}")
            return None

    @staticmethod
    def batch_validate_images(image_paths: list) -> dict:
        """
        Validate several image files and summarise the outcome.

        Args:
            image_paths: Paths of the image files.

        Returns:
            dict: ``total``, ``valid``, ``invalid`` and ``errors`` (one
            ``{"path", "error"}`` entry per invalid file).
        """
        result = {
            "total": len(image_paths),
            "valid": 0,
            "invalid": 0,
            "errors": []
        }

        for image_path in image_paths:
            validation = ImageProcessor.validate_image(image_path)
            if validation["valid"]:
                result["valid"] += 1
            else:
                result["invalid"] += 1
                result["errors"].append({
                    "path": image_path,
                    "error": validation["error"]
                })

        return result

    @staticmethod
    def optimize_image_for_docx(image_path: str, temp_dir: str) -> str:
        """
        Write an orientation-corrected, resized copy of an image to a temp folder.

        Opaque RGB PNG files are re-encoded as JPEG to reduce their size; all
        other images keep their original extension.

        Args:
            image_path: Source image path.
            temp_dir: Folder receiving the optimised copy (created if missing).

        Returns:
            str: Path of the optimised copy, or ``image_path`` if anything failed.
        """
        try:
            os.makedirs(temp_dir, exist_ok=True)

            with Image.open(image_path) as img:
                # Remember the source format first: the steps below may return
                # a new image object that no longer reports it.
                # NOTE(review): Pillow documents ``format`` as None for images
                # the library creates itself - confirm.
                source_format = img.format

                img = ImageProcessor._fix_image_orientation(img)
                img, _ = ImageProcessor._resize_image(img)

                name, ext = os.path.splitext(os.path.basename(image_path))

                if source_format == 'PNG' and img.mode == 'RGB':
                    temp_path = os.path.join(temp_dir, f"{name}_optimized.jpg")
                    img.save(temp_path, 'JPEG', quality=85, optimize=True)
                else:
                    temp_path = os.path.join(temp_dir, f"{name}_optimized{ext}")
                    img.save(temp_path, optimize=True)

                return temp_path

        except Exception as e:
            print(f"优化图片失败: {e}")
            return image_path  # fall back to the untouched original
process_image(image_path: str) -> Tuple[Image.Image, float]: + """处理图片(兼容旧接口)""" + return ImageProcessor.process_image(image_path) + + +def get_image_alignment(): + """获取图片对齐方式(兼容旧接口)""" + return ImageProcessor.get_image_alignment() \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..d2b58d2 --- /dev/null +++ b/main.py @@ -0,0 +1,368 @@ +""" +主程序文件 + +重构后的主程序,使用模块化的设计,提供清晰的入口点。 +""" + +import sys +import os + +# 添加当前目录到Python路径,确保能导入模块 +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, current_dir) + +try: + # 导入所有必要的模块 + from config import config, CONFIG_FILE_PATH + from file_handler import FileHandler + from text_processor import TextProcessor + from markdown_parser import MarkdownParser + from image_processor import ImageProcessor + from error_chars import ErrorCharProcessor + from docx_generator import DocxGenerator + from batch_processor import BatchProcessor + + # GUI相关导入 + import PySimpleGUI as sg + +except ImportError as e: + print(f"导入模块失败: {e}") + print("请确保所有依赖包已正确安装") + sys.exit(1) + + +class TxtToDocxApp: + """TXT转DOCX应用程序主类""" + + def __init__(self): + """初始化应用程序""" + self.matched_pairs = [] + self.file_handler = FileHandler() + self.batch_processor = BatchProcessor() + + # 设置GUI主题 + sg.theme('BlueMono') + + # 加载配置 + config.load_from_file(CONFIG_FILE_PATH) + + def run(self): + """运行应用程序""" + try: + self._show_main_window() + except Exception as e: + sg.popup_error(f"应用程序运行出错: {str(e)}") + finally: + # 保存配置 + config.save_to_file(CONFIG_FILE_PATH) + + def _show_main_window(self): + """显示主界面""" + layout = self._create_main_layout() + window = sg.Window('批量Markdown TXT转DOCX工具', layout, resizable=True) + + try: + self._handle_main_window_events(window) + finally: + window.close() + + def _create_main_layout(self): + """创建主界面布局""" + return [ + [sg.Text('批量Markdown TXT转DOCX工具', font=('bold', 16))], + [sg.Text('(按文件名匹配TXT文件和图片文件夹,支持完整Markdown格式)', text_color='gray')], + [sg.HSeparator()], + 
[sg.Text('TXT文件文件夹:', size=(15, 1)), + sg.InputText(key='txt_folder', enable_events=True, default_text=config.last_txt_folder), + sg.FolderBrowse('浏览')], + [sg.Text('图片根文件夹:', size=(15, 1)), + sg.InputText(key='images_root', enable_events=True, default_text=config.last_images_root), + sg.FolderBrowse('浏览')], + [sg.Text('输出根文件夹:', size=(15, 1)), + sg.InputText(key='output_root', enable_events=True, default_text=config.last_output_root), + sg.FolderBrowse('浏览'), + sg.Text('(当选择"输出到指定文件夹"时有效)', text_color='gray')], + [sg.Button('扫描文件', size=(12, 1)), + sg.Button('编辑匹配', size=(12, 1), disabled=True), + sg.Button('转换设置', size=(12, 1)), + sg.Button('帮助', size=(8, 1))], + [sg.HSeparator()], + [sg.Text('匹配结果预览:', font=('bold', 10))], + [sg.Table( + values=[], + headings=['TXT文件名', '相对路径', '匹配的图片文件夹'], + key='-PREVIEW_TABLE-', + auto_size_columns=False, + col_widths=[20, 30, 30], + justification='left', + size=(None, 10) + )], + [sg.ProgressBar(100, orientation='h', size=(80, 20), key='progress_bar', visible=False)], + [sg.Text('状态: 就绪', key='status_text', size=(80, 1))], + [sg.Button('开始批量转换', size=(15, 1), disabled=True), sg.Button('退出')] + ] + + def _handle_main_window_events(self, window): + """处理主窗口事件""" + progress_bar = window['progress_bar'] + status_text = window['status_text'] + preview_table = window['-PREVIEW_TABLE-'] + output_root_input = window['output_root'] + + # 初始化窗口,避免更新元素时的警告 + window.read(timeout=1) + + # 初始化输出根文件夹输入框状态 + self._update_output_root_state(output_root_input) + + while True: + event, values = window.read() + + if event in (sg.WIN_CLOSED, '退出'): + self._save_current_settings(values) + break + + elif event == '转换设置': + self._show_config_window() + self._update_output_root_state(output_root_input) + + elif event == '帮助': + self._show_help_window() + + elif event == '扫描文件': + self._handle_scan_files(values, window, status_text, preview_table) + + elif event == '编辑匹配': + self._handle_edit_matching(values, preview_table) + + elif event == '开始批量转换': 
+ self._handle_batch_conversion(values, window, progress_bar, status_text) + + elif event in ('txt_folder', 'images_root') and values[event] and not values.get('output_root', ''): + # 自动设置输出路径 + default_output = values['txt_folder'] if values['txt_folder'] else values['images_root'] + window['output_root'].update(default_output) + + def _update_output_root_state(self, output_root_input): + """根据配置更新输出根文件夹输入框的状态""" + if config.output_location == "custom": + output_root_input.update(disabled=False) + try: + output_root_input.Widget.configure(foreground='black') + except: + pass + else: + output_root_input.update(disabled=True) + try: + output_root_input.Widget.configure(foreground='gray') + except: + pass + + def _save_current_settings(self, values): + """保存当前设置""" + if values: + config.last_txt_folder = values.get('txt_folder', '') + config.last_images_root = values.get('images_root', '') + config.last_output_root = values.get('output_root', '') + config.save_to_file(CONFIG_FILE_PATH) + + def _handle_scan_files(self, values, window, status_text, preview_table): + """处理扫描文件事件""" + txt_folder = values['txt_folder'] + images_root = values['images_root'] + + if not txt_folder: + sg.popup_error('请选择TXT文件所在的文件夹') + return + + if not images_root: + sg.popup_error('请选择图片根文件夹') + return + + # 保存路径 + config.last_txt_folder = txt_folder + config.last_images_root = images_root + if values['output_root']: + config.last_output_root = values['output_root'] + config.save_to_file(CONFIG_FILE_PATH) + + try: + status_text.update('正在扫描TXT文件...') + window.refresh() + + txt_files = self.file_handler.scan_txt_files(txt_folder) + + status_text.update('正在匹配图片文件夹...') + window.refresh() + + self.matched_pairs = self.file_handler.find_matching_image_folders(txt_files, images_root) + + # 更新预览表格 + table_data = [] + for pair in self.matched_pairs: + img_folder = pair['image_folder']['relative_path'] if pair['image_folder'] else "无匹配" + table_data.append([ + pair['txt']['name'], + 
pair['txt']['relative_path'], + img_folder + ]) + + preview_table.update(values=table_data) + status_text.update(f'扫描完成: 找到 {len(self.matched_pairs)} 个TXT文件') + + # 启用相关按钮 + window['编辑匹配'].update(disabled=False) + window['开始批量转换'].update(disabled=False) + + except Exception as e: + sg.popup_error(f'扫描失败: {str(e)}') + status_text.update('状态: 扫描失败') + + def _handle_edit_matching(self, values, preview_table): + """处理编辑匹配事件""" + images_root = values['images_root'] + if not images_root: + sg.popup_error('请选择图片根文件夹') + return + + if not self.matched_pairs: + sg.popup_error('请先扫描文件') + return + + # 显示匹配编辑窗口 + self.matched_pairs = self._show_matching_editor(self.matched_pairs, images_root) + + # 更新预览表格 + table_data = [] + for pair in self.matched_pairs: + img_folder = pair['image_folder']['relative_path'] if pair['image_folder'] else "无匹配" + table_data.append([ + pair['txt']['name'], + pair['txt']['relative_path'], + img_folder + ]) + + preview_table.update(values=table_data) + + def _handle_batch_conversion(self, values, window, progress_bar, status_text): + """处理批量转换事件""" + if not self.matched_pairs: + sg.popup_error('请先扫描文件') + return + + if config.output_location == "custom" and not values['output_root']: + sg.popup_error('请选择输出根文件夹(在"转换设置"中选择了"输出到指定文件夹")') + return + + try: + progress_bar.update(0, visible=True) + status_text.update('开始批量转换...') + window.refresh() + + def update_batch_progress(progress, text): + progress_bar.update(progress) + status_text.update(f'状态: {text}') + window.refresh() + + results = self.batch_processor.process_batch( + self.matched_pairs, + values['output_root'], + update_batch_progress + ) + + self._show_results_window(results) + status_text.update('状态: 批量转换完成') + + except Exception as e: + sg.popup_error(f'批量处理失败: {str(e)}') + status_text.update('状态: 批量转换失败') + finally: + progress_bar.update(0, visible=False) + + def _show_config_window(self): + """显示配置窗口""" + from gui_config import show_config_window + show_config_window() + + def 
_show_help_window(self): + """显示帮助窗口""" + help_text = """ +批量Markdown TXT转DOCX工具使用说明: + +1. 选择包含Markdown内容的TXT文件所在文件夹 +2. 选择图片文件夹的根目录(程序会自动查找子文件夹) +3. 选择输出文件的保存根目录(当选择"输出到指定文件夹"时有效) +4. 点击"扫描文件"按钮,程序会自动匹配TXT文件和图片文件夹 +5. 查看匹配结果,可点击"编辑匹配"调整匹配关系 +6. 点击"开始批量转换"生成DOCX文件 + +支持的Markdown格式: +- 标题:# ## ### #### ##### ###### +- 粗体:**文字** 或 __文字__ +- 斜体:*文字* 或 _文字_ +- 行内代码:`代码` +- 代码块:```语言\\n代码\\n``` +- 删除线:~~文字~~ +- 链接:[链接文字](URL) +- 图片:![图片描述](图片路径) +- 无序列表:- 或 * 或 + +- 有序列表:1. 2. 3. +- 引用:> 引用内容 +- 表格:| 列1 | 列2 | +- 水平分隔线:--- 或 *** 或 ___ + +文字处理功能: +- 转换文字顺序:将文字内容进行特定转换处理 +- 错别字处理:可以按设定强度引入常见的错别字,用于测试或特殊用途 +- 标点符号替换:将句号转换为逗号,保留文末句号 + +输出路径选择: +- 输出到TXT文件所在文件夹: 每个DOCX文件会直接保存在对应TXT文件所在的文件夹中 +- 输出到指定文件夹: 所有DOCX文件会直接保存在您指定的文件夹中 + +匹配规则: +- 完全匹配: TXT文件名(不含扩展名)与图片文件夹名完全相同 +- 前缀匹配: 图片文件夹名以前缀形式包含TXT文件名 +- 包含匹配: 图片文件夹名中包含TXT文件名 + +转换规则: +- 每个小标题的第一段后会插入一张图片 +- 先将Markdown格式转换为DOCX格式,再处理文字内容 +- 支持文字顺序调换、错别字处理和标点符号替换功能 + +错别字处理说明: +- 错误强度:控制替换比例,0.0表示不替换,1.0表示替换所有可能的字 +- 错别字库:可自定义JSON格式的错别字映射文件 +- 常见映射:的↔地↔得、在↔再、是↔事等 +""" + sg.popup_scrolled('使用帮助', help_text, size=(70, 25)) + + def _show_matching_editor(self, matched_pairs, images_root): + """显示匹配编辑窗口""" + from gui_matching_editor import show_matching_editor + return show_matching_editor(matched_pairs, images_root) + + def _show_results_window(self, results): + """显示结果窗口""" + from gui_results import show_results_window + show_results_window(results) + + +def main(): + """主函数""" + print("正在启动批量Markdown TXT转DOCX工具...") + + try: + app = TxtToDocxApp() + app.run() + except KeyboardInterrupt: + print("\n用户中断程序运行") + except Exception as e: + print(f"程序运行出错: {e}") + sg.popup_error(f"程序运行出错: {e}") + finally: + print("程序已退出") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/markdown_parser.py b/markdown_parser.py new file mode 100644 index 0000000..848ae0d --- /dev/null +++ b/markdown_parser.py @@ -0,0 +1,538 @@ +""" +Markdown解析模块 + +负责解析Markdown格式的文本,提取结构化信息和内联格式。 +支持标题、列表、代码块、表格、链接等常见Markdown元素。 +""" + 
+import re +from typing import List, Dict, Any +from config import config + + +class MarkdownParser: + """Markdown解析器类""" + + # Markdown格式匹配模式 + PATTERNS = { + 'heading': re.compile(r'^(\s*)(#{1,6})\s+(.+)$'), + 'bold_asterisk': re.compile(r'\*\*(.+?)\*\*'), + 'bold_underscore': re.compile(r'__(.+?)__'), + 'italic_asterisk': re.compile(r'(?\s*(.+)$'), + 'horizontal_rule': re.compile(r'^(\s*[-*_]){3,}\s*$'), + 'table_row': re.compile(r'^\|(.+)\|$'), + 'table_separator': re.compile(r'^\|(\s*:?-+:?\s*\|)+$') + } + + @classmethod + def parse(cls, txt_content: str) -> List[Dict[str, Any]]: + """ + 解析Markdown内容为结构化数据 + + Args: + txt_content: Markdown文本内容 + + Returns: + List[Dict[str, Any]]: 解析后的结构化数据列表 + """ + if not txt_content: + return [] + + elements = cls._parse_elements(txt_content) + sections = cls._group_by_sections(elements) + return sections + + @classmethod + def _parse_elements(cls, txt_content: str) -> List[Dict[str, Any]]: + """ + 解析Markdown内容为元素列表 + + Args: + txt_content: Markdown文本内容 + + Returns: + List[Dict[str, Any]]: 元素列表 + """ + elements = [] + lines = txt_content.split('\n') + i = 0 + in_code_block = False + code_block_content = [] + table_mode = False + table_rows = [] + code_block_language = "" + + while i < len(lines): + line = lines[i].rstrip('\r') + + # 处理代码块 + if line.strip().startswith('```'): + if not in_code_block: + in_code_block = True + code_block_language = line.strip()[3:].strip() + code_block_content = [] + else: + in_code_block = False + elements.append({ + 'type': 'code_block', + 'language': code_block_language, + 'content': '\n'.join(code_block_content), + 'level': 0 + }) + code_block_content = [] + code_block_language = "" + i += 1 + continue + + if in_code_block: + code_block_content.append(line) + i += 1 + continue + + # 处理表格 + table_match = cls.PATTERNS['table_row'].match(line) + table_sep_match = cls.PATTERNS['table_separator'].match(line) + + if table_match or table_sep_match: + if not table_mode: + table_mode = True + 
table_rows = [] + + if table_match and not table_sep_match: + cells = [cell.strip() for cell in table_match.group(1).split('|')] + table_rows.append(cells) + + i += 1 + continue + elif table_mode: + # 表格结束 + if table_rows: + elements.append({ + 'type': 'table', + 'rows': table_rows, + 'level': 0 + }) + table_mode = False + table_rows = [] + + # 处理标题 + heading_match = cls.PATTERNS['heading'].match(line) + if heading_match: + level = len(heading_match.group(2)) + if level <= config.title_levels: + heading_text = heading_match.group(3).strip() + # 先移除Markdown标记但保留文本内容 + cleaned_text = re.sub(r'\*\*(.+?)\*\*|__(.+?)__', r'\1\2', heading_text) + elements.append({ + 'type': 'heading', + 'level': level, + 'content': heading_text, # 保留原始内容用于格式处理 + 'cleaned_content': cleaned_text # 用于显示的纯文本 + }) + i += 1 + continue + + # 处理水平分隔线 + if cls.PATTERNS['horizontal_rule'].match(line): + elements.append({ + 'type': 'horizontal_rule', + 'level': 0 + }) + i += 1 + continue + + # 处理列表 + ul_match = cls.PATTERNS['unordered_list'].match(line) + ol_match = cls.PATTERNS['ordered_list'].match(line) + + if ul_match: + elements.append({ + 'type': 'unordered_list', + 'content': ul_match.group(1), + 'level': 0 + }) + i += 1 + continue + + if ol_match: + elements.append({ + 'type': 'ordered_list', + 'content': ol_match.group(1), + 'level': 0 + }) + i += 1 + continue + + # 处理引用 + quote_match = cls.PATTERNS['blockquote'].match(line) + if quote_match: + elements.append({ + 'type': 'blockquote', + 'content': quote_match.group(1), + 'level': 0 + }) + i += 1 + continue + + # 处理空行 + if line.strip() == '': + elements.append({ + 'type': 'empty', + 'content': '', + 'level': 0 + }) + i += 1 + continue + + # 处理普通段落 + elements.append({ + 'type': 'paragraph', + 'content': line, + 'level': 0 + }) + + i += 1 + + # 处理剩余的表格 + if table_mode and table_rows: + elements.append({ + 'type': 'table', + 'rows': table_rows, + 'level': 0 + }) + + return elements + + @classmethod + def _group_by_sections(cls, elements: 
List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 将解析的元素按标题分组 + + Args: + elements: 元素列表 + + Returns: + List[Dict[str, Any]]: 按章节分组的结构 + """ + sections = [] + current_section = { + 'type': 'section', + 'level': 0, + 'content': '前置内容', + 'elements': [] + } + + for element in elements: + if element['type'] == 'heading': + # 保存当前section + if current_section['elements']: + sections.append(current_section) + + # 创建新section + current_section = { + 'type': 'section', + 'level': element['level'], + 'content': element['content'], + 'elements': [] + } + else: + current_section['elements'].append(element) + + # 添加最后一个section + if current_section['elements'] or current_section['content'] != '前置内容': + sections.append(current_section) + + return sections + + @classmethod + def extract_inline_formatting(cls, text: str) -> List[Dict[str, Any]]: + """ + 提取行内格式信息 + + Args: + text: 要分析的文本 + + Returns: + List[Dict[str, Any]]: 格式信息列表 + """ + if not text: + return [] + + formatting = [] + + # 提取粗体 (**) + for match in cls.PATTERNS['bold_asterisk'].finditer(text): + formatting.append({ + 'type': 'bold', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取粗体 (__) + for match in cls.PATTERNS['bold_underscore'].finditer(text): + formatting.append({ + 'type': 'bold', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取斜体 (*) + for match in cls.PATTERNS['italic_asterisk'].finditer(text): + # 检查是否与粗体重叠 + overlaps = any(f['start'] <= match.start() < f['end'] or f['start'] < match.end() <= f['end'] + for f in formatting if f['type'] == 'bold') + if not overlaps: + formatting.append({ + 'type': 'italic', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取斜体 (_) + for match in cls.PATTERNS['italic_underscore'].finditer(text): + overlaps = any(f['start'] <= match.start() < f['end'] or f['start'] < match.end() <= f['end'] + for f in formatting if f['type'] in ['bold', 'italic']) + if 
not overlaps: + formatting.append({ + 'type': 'italic', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取行内代码 + for match in cls.PATTERNS['code_inline'].finditer(text): + formatting.append({ + 'type': 'code', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取删除线 + for match in cls.PATTERNS['strikethrough'].finditer(text): + formatting.append({ + 'type': 'strikethrough', + 'start': match.start(), + 'end': match.end(), + 'content': match.group(1) + }) + + # 提取链接 + for match in cls.PATTERNS['link'].finditer(text): + formatting.append({ + 'type': 'link', + 'start': match.start(), + 'end': match.end(), + 'text': match.group(1), + 'url': match.group(2) + }) + + # 按位置排序 + formatting.sort(key=lambda x: x['start']) + return formatting + + @classmethod + def clean_markdown_text(cls, text: str) -> str: + """ + 清理Markdown标记,返回纯文本 + + Args: + text: 包含Markdown标记的文本 + + Returns: + str: 清理后的纯文本 + """ + if not text: + return text + + # 移除各种Markdown标记 + cleaned = text + + # 移除粗体和斜体标记 + cleaned = re.sub(r'\*\*(.+?)\*\*', r'\1', cleaned) # **bold** + cleaned = re.sub(r'__(.+?)__', r'\1', cleaned) # __bold__ + cleaned = re.sub(r'(? 
Dict[str, int]: + """ + 获取Markdown文本的统计信息 + + Args: + text: Markdown文本 + + Returns: + Dict[str, int]: 统计信息 + """ + if not text: + return { + "total_chars": 0, + "total_lines": 0, + "headings": 0, + "paragraphs": 0, + "code_blocks": 0, + "tables": 0, + "links": 0, + "images": 0 + } + + stats = { + "total_chars": len(text), + "total_lines": len(text.split('\n')), + "headings": 0, + "paragraphs": 0, + "code_blocks": 0, + "tables": 0, + "links": 0, + "images": 0 + } + + # 统计各种元素 + lines = text.split('\n') + in_code_block = False + + for line in lines: + line = line.strip() + if not line: + continue + + # 代码块 + if line.startswith('```'): + if not in_code_block: + stats["code_blocks"] += 1 + in_code_block = not in_code_block + continue + + if in_code_block: + continue + + # 标题 + if cls.PATTERNS['heading'].match(line): + stats["headings"] += 1 + continue + + # 表格 + if cls.PATTERNS['table_row'].match(line): + stats["tables"] += 1 + continue + + # 普通段落 + if not (cls.PATTERNS['unordered_list'].match(line) or + cls.PATTERNS['ordered_list'].match(line) or + cls.PATTERNS['blockquote'].match(line) or + cls.PATTERNS['horizontal_rule'].match(line)): + stats["paragraphs"] += 1 + + # 统计链接和图片 + stats["links"] = len(cls.PATTERNS['link'].findall(text)) + stats["images"] = len(cls.PATTERNS['image'].findall(text)) + + return stats + + @classmethod + def validate_markdown(cls, text: str) -> Dict[str, Any]: + """ + 验证Markdown格式的有效性 + + Args: + text: 要验证的Markdown文本 + + Returns: + Dict[str, Any]: 验证结果 + """ + result = { + "valid": True, + "warnings": [], + "errors": [] + } + + if not text: + result["warnings"].append("文本为空") + return result + + lines = text.split('\n') + in_code_block = False + table_started = False + + for i, line in enumerate(lines, 1): + line = line.rstrip() + + # 检查代码块 + if line.strip().startswith('```'): + in_code_block = not in_code_block + continue + + if in_code_block: + continue + + # 检查表格格式 + if cls.PATTERNS['table_row'].match(line): + if not table_started: + 
table_started = True + # 检查表格格式 + if not line.startswith('|') or not line.endswith('|'): + result["warnings"].append(f"第{i}行: 表格格式可能不完整") + elif table_started: + table_started = False + + # 检查标题格式 + heading_match = cls.PATTERNS['heading'].match(line) + if heading_match: + level = len(heading_match.group(2)) + if level > 6: + result["warnings"].append(f"第{i}行: 标题层级过深 (>{6})") + + # 检查未闭合的代码块 + if in_code_block: + result["errors"].append("代码块未正确闭合") + result["valid"] = False + + return result + + +# 创建全局解析器实例 +markdown_parser = MarkdownParser() + + +# 兼容旧接口的函数 +def parse(txt_content: str) -> List[Dict[str, Any]]: + """解析Markdown内容(兼容旧接口)""" + return MarkdownParser.parse(txt_content) + + +def extract_inline_formatting(text: str) -> List[Dict[str, Any]]: + """提取行内格式(兼容旧接口)""" + return MarkdownParser.extract_inline_formatting(text) + + +def group_by_sections(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """按章节分组(兼容旧接口)""" + return MarkdownParser._group_by_sections(elements) \ No newline at end of file diff --git a/text_processor.py b/text_processor.py new file mode 100644 index 0000000..eb2620a --- /dev/null +++ b/text_processor.py @@ -0,0 +1,343 @@ +""" +文本处理模块 + +负责文本的各种处理功能,包括顺序调换、标点符号替换、错别字处理等。 +""" + +from typing import Optional +from config import config +from error_chars import ErrorCharProcessor +from replacestr import replace_text + + +class TextProcessor: + """文本处理器类,统一处理各种文本操作""" + + def __init__(self): + """初始化文本处理器""" + self.error_processor = None + self._init_error_processor() + + def _init_error_processor(self) -> None: + """初始化错别字处理器""" + if config.enable_char_errors: + self.error_processor = ErrorCharProcessor(config.char_error_db_path) + + def replace_periods(self, text: str) -> str: + """ + 将中间出现的句号统一替换为逗号; + 若文本末尾是句号,则直接删除该句号。 + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + if not text: + return '' + + text = text.rstrip() + if not text: + return '' + + # 去掉末尾句号(如果有) + if text[-1] == '。': + text = text[:-1] + + # 把剩余句号替换为逗号 + 
return text.replace('。', ',') + + def reverse_text_order(self, content: str) -> str: + """ + 反转文本顺序(按字符级反转) + + Args: + content: 输入文本 + + Returns: + str: 反转后的文本 + """ + if not content: + return content + return content[::-1] + + def reverse_paragraph_order(self, content: str) -> str: + """ + 反转段落顺序(保留段落内文字顺序) + + Args: + content: 输入文本 + + Returns: + str: 段落顺序反转后的文本 + """ + if not content: + return content + paragraphs = content.split('\n') + return '\n'.join(reversed(paragraphs)) + + def apply_char_errors(self, text: str) -> str: + """ + 应用错别字处理 + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + if not config.enable_char_errors or not text: + return text + + try: + # 重新初始化错别字处理器(以防配置改变) + if not self.error_processor: + self._init_error_processor() + + if self.error_processor: + modified_text, replace_count, _, _ = self.error_processor.introduce_char_errors( + text, config.char_error_intensity + ) + if replace_count > 0: + print(f"已应用错别字处理,替换了 {replace_count} 个字符。") + return modified_text + + except Exception as e: + # 如果错别字处理出错,返回原文本 + print(f"错别字处理出错: {e}") + + return text + + def apply_text_order_processing(self, text: str) -> str: + """ + 应用文字顺序处理 + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + if not config.reverse_text_order or not text: + return text + + try: + return replace_text(text) + except Exception as e: + print(f"文字顺序处理出错: {e}") + return text + + def process_text_content(self, text: str) -> str: + """ + 统一处理文字内容:顺序调换、错别字处理和标点符号替换 + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + if not text or not text.strip(): + return text + + processed_text = text + + # 先进行文字顺序处理 + processed_text = self.apply_text_order_processing(processed_text) + + # 应用错别字处理 + processed_text = self.apply_char_errors(processed_text) + + # 最后进行标点符号替换 + if config.replace_punctuation: + processed_text = self.replace_periods(processed_text) + + return processed_text + + def clean_text(self, text: str) -> str: + """ + 清理文本,去除多余的空白字符 + + Args: + text: 输入文本 + + 
Returns: + str: 清理后的文本 + """ + if not text: + return text + + # 替换不同类型的换行符 + text = text.replace("\r\n", "\n").replace("\r", "\n") + + # 去除行尾空白 + lines = [line.rstrip() for line in text.split('\n')] + + return '\n'.join(lines) + + def normalize_text(self, text: str) -> str: + """ + 标准化文本格式 + + Args: + text: 输入文本 + + Returns: + str: 标准化后的文本 + """ + if not text: + return text + + # 首先清理文本 + text = self.clean_text(text) + + # 统一中文标点符号 + punctuation_map = { + ',': ',', # 全角逗号 + '。': '。', # 全角句号 + '!': '!', # 全角感叹号 + '?': '?', # 全角问号 + ';': ';', # 全角分号 + ':': ':', # 全角冒号 + } + + for old, new in punctuation_map.items(): + text = text.replace(old, new) + + return text + + def get_processing_statistics(self, text: str) -> dict: + """ + 获取文本处理统计信息 + + Args: + text: 输入文本 + + Returns: + dict: 统计信息 + """ + if not text: + return { + "total_chars": 0, + "total_lines": 0, + "non_empty_lines": 0, + "error_chars_enabled": config.enable_char_errors, + "estimated_error_replacements": 0 + } + + lines = text.split('\n') + non_empty_lines = [line for line in lines if line.strip()] + + stats = { + "total_chars": len(text), + "total_lines": len(lines), + "non_empty_lines": len(non_empty_lines), + "error_chars_enabled": config.enable_char_errors, + "estimated_error_replacements": 0 + } + + # 如果启用了错别字处理,获取估计的替换数量 + if config.enable_char_errors: + try: + if not self.error_processor: + self._init_error_processor() + + if self.error_processor: + error_stats = self.error_processor.get_statistics( + text, config.char_error_intensity + ) + stats["estimated_error_replacements"] = error_stats["estimated_replacements"] + stats["replaceable_chars"] = error_stats["replaceable_chars"] + except Exception as e: + print(f"获取错别字统计失败: {e}") + + return stats + + def preview_processing(self, text: str, max_length: int = 200) -> dict: + """ + 预览文本处理效果(不修改原文本) + + Args: + text: 输入文本 + max_length: 预览文本的最大长度 + + Returns: + dict: 包含原文本和处理后文本的预览 + """ + if not text: + return { + "original": "", + "processed": "", + 
"truncated": False + } + + # 截取预览长度 + preview_text = text[:max_length] if len(text) > max_length else text + truncated = len(text) > max_length + + # 处理预览文本 + processed_text = self.process_text_content(preview_text) + + return { + "original": preview_text, + "processed": processed_text, + "truncated": truncated + } + + +# 创建全局文本处理器实例 +text_processor = TextProcessor() + + +# 兼容旧接口的函数 +def process_text_content(text: str) -> str: + """ + 处理文本内容(兼容旧接口) + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + return text_processor.process_text_content(text) + + +def replace_periods(text: str) -> str: + """ + 替换句号为逗号(兼容旧接口) + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + return text_processor.replace_periods(text) + + +def reverse_text_order(content: str) -> str: + """ + 反转文本顺序(兼容旧接口) + + Args: + content: 输入文本 + + Returns: + str: 反转后的文本 + """ + return text_processor.reverse_text_order(content) + + +def apply_char_errors(text: str) -> str: + """ + 应用错别字处理(兼容旧接口) + + Args: + text: 输入文本 + + Returns: + str: 处理后的文本 + """ + return text_processor.apply_char_errors(text) \ No newline at end of file