"""Batch processing module.

Coordinates batch conversion of multiple TXT files, delegating file
reading, markdown parsing, DOCX generation and output-path handling to
the collaborating ``FileHandler``, ``MarkdownParser`` and
``DocxGenerator`` components.
"""

import os
from typing import Any, Callable, Dict, List, Optional

from file_handler import FileHandler
from markdown_parser import MarkdownParser
from docx_generator import DocxGenerator


class BatchProcessor:
    """Orchestrates conversion of TXT files (plus matching image folders)
    into DOCX documents, with progress reporting and per-item error
    collection."""

    def __init__(self):
        """Create the collaborating handler, parser and generator."""
        self.file_handler = FileHandler()
        self.markdown_parser = MarkdownParser()
        self.docx_generator = DocxGenerator()

    def process_batch(self, matched_pairs: List[Dict[str, Any]], output_root: str,
                      progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Process a batch of matched TXT/image-folder pairs.

        Args:
            matched_pairs: Matched TXT file / image folder pair dicts.
            output_root: Root directory for generated output.
            progress_callback: Optional ``(progress: int, message: str)``
                callable invoked as the batch advances.

        Returns:
            Dict[str, Any]: Run statistics — totals, failures with error
            text, the main output folder and the generated file paths.
        """
        total = len(matched_pairs)
        success_count = 0
        failed_items = []
        processed_files = []

        for i, pair in enumerate(matched_pairs):
            try:
                if progress_callback:
                    # Coarse batch-level progress before starting this item.
                    overall_progress = int((i / total) * 100)
                    progress_callback(
                        overall_progress,
                        f"处理 {i + 1}/{total}: {pair['txt']['name']}"
                    )

                # Convert one pair; failures are recorded, never raised.
                result = self._process_single_pair(
                    pair, output_root, i, total, progress_callback
                )

                if result['success']:
                    success_count += 1
                    processed_files.append(result['output_path'])
                else:
                    failed_items.append({
                        "name": pair['txt']['name'],
                        "error": result['error']
                    })
            except Exception as e:
                # Unexpected per-item error: keep going, record the cause.
                failed_items.append({
                    "name": pair['txt']['name'],
                    "error": str(e)
                })

        # Derive the main output folder from the first pair's output path.
        main_output_folder = ""
        if matched_pairs and success_count > 0:
            sample_output = self.file_handler.prepare_output_path(
                matched_pairs[0]['txt'], "", output_root
            )
            main_output_folder = os.path.dirname(sample_output)

        return {
            "total": total,
            "success": success_count,
            "failed": len(failed_items),
            "failed_items": failed_items,
            "main_output_folder": main_output_folder,
            "processed_files": processed_files
        }

    def _process_single_pair(self, pair: Dict[str, Any], output_root: str,
                             current_index: int, total_count: int,
                             progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Convert one TXT file (plus optional image folder) to DOCX.

        Args:
            pair: Pair info with ``txt`` and optional ``image_folder`` keys.
            output_root: Root directory for generated output.
            current_index: Zero-based index of this pair within the batch.
            total_count: Total number of pairs in the batch.
            progress_callback: Optional ``(progress: int, message: str)``.

        Returns:
            Dict[str, Any]: ``success`` flag, ``output_path`` and ``error``.
        """
        result = {
            "success": False,
            "output_path": "",
            "error": ""
        }

        try:
            # Resolve where the generated DOCX will be written.
            output_path = self.file_handler.prepare_output_path(
                pair['txt'],
                pair['image_folder']['path'] if pair['image_folder'] else "",
                output_root
            )
            result["output_path"] = output_path

            # Read and validate the TXT source content.
            txt_content = self.file_handler.read_markdown_txt(pair['txt']['path'])
            if not txt_content.strip():
                # ValueError instead of bare Exception; the enclosing
                # handler only consumes str(e), so behavior is unchanged.
                raise ValueError("TXT文件内容为空")

            # Parse the content into structured sections.
            sections = self.markdown_parser.parse(txt_content)
            if not sections:
                raise ValueError("未解析到有效内容")

            # Collect image files when an image folder was matched.
            image_files = []
            if pair['image_folder']:
                image_files = self.file_handler.get_image_files(
                    pair['image_folder']['path']
                )

            def update_file_progress(progress: int, text: str):
                """Map this file's progress onto overall batch progress."""
                if not progress_callback:
                    return
                # Clamp the per-file progress into [0, 100].
                file_progress = max(0, min(100, progress))
                if total_count > 0:
                    # Integer arithmetic avoids float rounding artifacts.
                    overall = (current_index * 100 + file_progress) // total_count
                    overall = max(0, min(100, overall))
                    progress_callback(int(overall),
                                      f"{pair['txt']['name']}: {text}")
                else:
                    progress_callback(file_progress,
                                      f"{pair['txt']['name']}: {text}")

            # Generate the DOCX document.
            success = self.docx_generator.generate(
                sections, image_files, output_path, update_file_progress
            )

            if success:
                result["success"] = True
            else:
                result["error"] = "DOCX生成失败"

        except Exception as e:
            result["error"] = str(e)

        return result

    def validate_batch_input(self, txt_folder: str, images_root: str,
                             output_root: Optional[str] = None) -> Dict[str, Any]:
        """Validate the input parameters for a batch run.

        Args:
            txt_folder: Path of the folder containing TXT files.
            images_root: Path of the root folder containing image folders.
            output_root: Optional output root folder path.

        Returns:
            Dict[str, Any]: ``valid`` flag plus ``errors``, ``warnings``
            and gathered ``statistics``.
        """
        result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "statistics": {}
        }

        try:
            # Delegate raw path validation to the file handler.
            path_validation = self.file_handler.validate_paths(
                txt_folder, images_root, output_root
            )

            if not path_validation["txt_folder_valid"]:
                result["errors"].append("TXT文件夹路径无效")
                result["valid"] = False

            if not path_validation["images_root_valid"]:
                result["errors"].append("图片根文件夹路径无效")
                result["valid"] = False

            if not path_validation["output_root_valid"]:
                result["errors"].append("输出根文件夹路径无效")
                result["valid"] = False

            # Only gather statistics once the basic paths check out.
            if result["valid"]:
                try:
                    txt_files = self.file_handler.scan_txt_files(txt_folder)
                    result["statistics"]["txt_files_count"] = len(txt_files)

                    if not txt_files:
                        result["warnings"].append("未找到任何TXT文件")

                    # Image-folder statistics for the preview/summary UI.
                    img_stats = self.file_handler.get_folder_statistics(images_root)
                    result["statistics"]["image_folders_count"] = img_stats["image_folders"]
                    result["statistics"]["total_images"] = img_stats["total_images"]

                    if img_stats["image_folders"] == 0:
                        result["warnings"].append("未找到任何包含图片的文件夹")

                except Exception as e:
                    # Statistics failure is non-fatal: warn, stay valid.
                    result["warnings"].append(f"获取文件统计信息失败: {str(e)}")

        except Exception as e:
            result["errors"].append(f"验证过程出错: {str(e)}")
            result["valid"] = False

        return result

    def preview_batch_processing(self, txt_folder: str, images_root: str) -> Dict[str, Any]:
        """Preview the outcome of a batch run without processing anything.

        Args:
            txt_folder: Path of the folder containing TXT files.
            images_root: Path of the root folder containing image folders.

        Returns:
            Dict[str, Any]: Discovered TXT files, matched and unmatched
            pairs, and summary statistics.
        """
        preview = {
            "txt_files": [],
            "matched_pairs": [],
            "unmatched_txt_files": [],
            "statistics": {
                "total_txt_files": 0,
                "matched_files": 0,
                "unmatched_files": 0,
                "total_images": 0
            }
        }

        try:
            # Discover candidate TXT files.
            txt_files = self.file_handler.scan_txt_files(txt_folder)
            preview["txt_files"] = txt_files
            preview["statistics"]["total_txt_files"] = len(txt_files)

            # Pair each TXT file with its image folder, where one exists.
            matched_pairs = self.file_handler.find_matching_image_folders(
                txt_files, images_root
            )

            matched_files = []
            unmatched_files = []
            total_images = 0

            for pair in matched_pairs:
                if pair['image_folder']:
                    matched_files.append(pair)
                    # Count images contributed by this folder.
                    image_files = self.file_handler.get_image_files(
                        pair['image_folder']['path']
                    )
                    total_images += len(image_files)
                else:
                    unmatched_files.append(pair['txt'])

            preview["matched_pairs"] = matched_files
            preview["unmatched_txt_files"] = unmatched_files
            preview["statistics"]["matched_files"] = len(matched_files)
            preview["statistics"]["unmatched_files"] = len(unmatched_files)
            preview["statistics"]["total_images"] = total_images

        except Exception as e:
            preview["error"] = str(e)

        return preview

    def get_processing_estimates(self, matched_pairs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Estimate processing time and output size for a batch.

        Args:
            matched_pairs: Matched TXT file / image folder pair dicts.

        Returns:
            Dict[str, Any]: Estimated minutes, output size in MB and any
            capacity warnings.
        """
        estimates = {
            "total_files": len(matched_pairs),
            "estimated_time_minutes": 0,
            "estimated_output_size_mb": 0,
            "warnings": []
        }

        try:
            total_txt_size = 0
            total_image_size = 0
            total_images = 0

            for pair in matched_pairs:
                # Accumulate TXT source sizes.
                txt_path = pair['txt']['path']
                if os.path.exists(txt_path):
                    total_txt_size += os.path.getsize(txt_path)

                # Accumulate image counts and sizes.
                if pair['image_folder']:
                    image_files = self.file_handler.get_image_files(
                        pair['image_folder']['path']
                    )
                    total_images += len(image_files)
                    for img_path in image_files:
                        if os.path.exists(img_path):
                            total_image_size += os.path.getsize(img_path)

            # Empirical estimate: ~10s per file plus ~2s per image.
            base_time = len(matched_pairs) * 10       # seconds
            image_time = total_images * 2             # seconds
            total_time_seconds = base_time + image_time
            estimates["estimated_time_minutes"] = max(1, total_time_seconds // 60)

            # DOCX output is typically larger than its text source; images
            # are assumed to compress to ~80% of their original size.
            # int() fixes the original float leak from the 0.8 factor, so
            # the MB value is an int like estimated_time_minutes.
            estimated_size_bytes = total_txt_size * 2 + total_image_size * 0.8
            estimates["estimated_output_size_mb"] = max(
                1, int(estimated_size_bytes) // (1024 * 1024)
            )

            # Capacity warnings for very large batches.
            if total_images > 1000:
                estimates["warnings"].append("图片数量较多,处理时间可能较长")
            if estimated_size_bytes > 500 * 1024 * 1024:  # 500 MB
                estimates["warnings"].append("预计输出文件较大,请确保有足够的磁盘空间")

        except Exception as e:
            estimates["error"] = str(e)

        return estimates


# Module-level singleton kept for legacy callers.
batch_processor = BatchProcessor()


def process_batch(matched_pairs: List[Dict[str, Any]], output_root: str,
                  progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
    """Batch-process file pairs (legacy wrapper around BatchProcessor)."""
    return batch_processor.process_batch(matched_pairs, output_root, progress_callback)