"""
|
||
批量处理模块
|
||
|
||
负责批量处理多个TXT文件,协调文件读取、解析、转换和输出等步骤。
|
||
"""
import os
from typing import List, Dict, Any, Callable, Optional

from file_handler import FileHandler
from markdown_parser import MarkdownParser
from docx_generator import DocxGenerator


class BatchProcessor:
    """Batch processor.

    Drives the TXT -> DOCX conversion pipeline for many files at once:
    pairs each TXT file with its image folder, reads and parses the text,
    and delegates document generation to :class:`DocxGenerator`.
    """

    def __init__(self):
        """Create the collaborating handler, parser and generator instances."""
        self.file_handler = FileHandler()
        self.markdown_parser = MarkdownParser()
        self.docx_generator = DocxGenerator()

    def process_batch(self, matched_pairs: List[Dict[str, Any]], output_root: str,
                      progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Process every matched (TXT file, image folder) pair.

        Args:
            matched_pairs: list of pair dicts; each has a ``'txt'`` file-info
                dict (with ``'name'`` and ``'path'``) and an optional
                ``'image_folder'`` dict.
            output_root: root directory for the generated DOCX files.
            progress_callback: optional ``(progress: int, message: str)`` hook.

        Returns:
            Dict[str, Any]: summary with ``total``, ``success``, ``failed``,
            ``failed_items``, ``main_output_folder`` and ``processed_files``.
        """
        total = len(matched_pairs)
        success_count = 0
        failed_items = []
        processed_files = []

        for i, pair in enumerate(matched_pairs):
            try:
                if progress_callback:
                    overall_progress = int((i / total) * 100)
                    progress_callback(overall_progress, f"处理 {i + 1}/{total}: {pair['txt']['name']}")

                # Convert this single TXT/image-folder pair to a DOCX file.
                result = self._process_single_pair(pair, output_root, i, total, progress_callback)

                if result['success']:
                    success_count += 1
                    processed_files.append(result['output_path'])
                else:
                    failed_items.append({
                        "name": pair['txt']['name'],
                        "error": result['error'],
                    })

            except Exception as e:
                # Bug fix: the original re-read pair['txt']['name'] here, which
                # could raise again on a malformed pair dict and abort the whole
                # batch instead of recording the failure. Look it up defensively.
                if isinstance(pair, dict) and isinstance(pair.get('txt'), dict):
                    name = pair['txt'].get('name', '<unknown>')
                else:
                    name = '<unknown>'
                failed_items.append({
                    "name": name,
                    "error": str(e),
                })

        # Derive the main output folder from where the first pair would land.
        main_output_folder = ""
        if matched_pairs and success_count > 0:
            sample_output = self.file_handler.prepare_output_path(
                matched_pairs[0]['txt'], "", output_root
            )
            main_output_folder = os.path.dirname(sample_output)

        return {
            "total": total,
            "success": success_count,
            "failed": len(failed_items),
            "failed_items": failed_items,
            "main_output_folder": main_output_folder,
            "processed_files": processed_files,
        }

    def _process_single_pair(self, pair: Dict[str, Any], output_root: str,
                             current_index: int, total_count: int,
                             progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Process one TXT file together with its (optional) image folder.

        Args:
            pair: pair info (``'txt'`` file-info dict, optional ``'image_folder'``).
            output_root: root directory for the output.
            current_index: zero-based index of this pair in the batch.
            total_count: total number of pairs in the batch.
            progress_callback: optional ``(progress, message)`` hook.

        Returns:
            Dict[str, Any]: ``{"success": bool, "output_path": str, "error": str}``.
            Never raises; any failure is captured in ``"error"``.
        """
        result = {
            "success": False,
            "output_path": "",
            "error": "",
        }

        try:
            # Compute where the generated DOCX should be written.
            output_path = self.file_handler.prepare_output_path(
                pair['txt'],
                pair['image_folder']['path'] if pair['image_folder'] else "",
                output_root
            )
            result["output_path"] = output_path

            # Read the TXT source; an empty file is a hard error for this pair.
            txt_content = self.file_handler.read_markdown_txt(pair['txt']['path'])
            if not txt_content.strip():
                # ValueError (was a bare Exception) — still caught below, so the
                # external contract (error recorded in result) is unchanged.
                raise ValueError("TXT文件内容为空")

            # Parse the raw text into structured sections.
            sections = self.markdown_parser.parse(txt_content)
            if not sections:
                raise ValueError("未解析到有效内容")

            # Collect the image files, if an image folder was matched.
            image_files = []
            if pair['image_folder']:
                image_files = self.file_handler.get_image_files(pair['image_folder']['path'])

            def update_file_progress(progress: int, text: str):
                # Map this file's 0-100 progress into the overall batch progress.
                if progress_callback:
                    # Clamp to [0, 100] to guard against misbehaving generators.
                    file_progress = max(0, min(100, progress))

                    if total_count > 0:
                        # Integer arithmetic avoids float rounding artifacts.
                        overall_progress = (current_index * 100 + file_progress) // total_count
                        overall_progress = max(0, min(100, overall_progress))
                        progress_callback(int(overall_progress), f"{pair['txt']['name']}: {text}")
                    else:
                        progress_callback(file_progress, f"{pair['txt']['name']}: {text}")

            # Generate the DOCX document.
            success = self.docx_generator.generate(sections, image_files, output_path, update_file_progress)

            if success:
                result["success"] = True
            else:
                result["error"] = "DOCX生成失败"

        except Exception as e:
            result["error"] = str(e)

        return result

    def validate_batch_input(self, txt_folder: str, images_root: str,
                             output_root: Optional[str] = None) -> Dict[str, Any]:
        """Validate the input parameters for a batch run.

        Args:
            txt_folder: folder containing the TXT files.
            images_root: root folder containing the image sub-folders.
            output_root: output root folder (optional).

        Returns:
            Dict[str, Any]: ``{"valid": bool, "errors": [...], "warnings": [...],
            "statistics": {...}}``.
        """
        result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "statistics": {},
        }

        try:
            # Basic path validation first.
            path_validation = self.file_handler.validate_paths(txt_folder, images_root, output_root)

            if not path_validation["txt_folder_valid"]:
                result["errors"].append("TXT文件夹路径无效")
                result["valid"] = False

            if not path_validation["images_root_valid"]:
                result["errors"].append("图片根文件夹路径无效")
                result["valid"] = False

            if not path_validation["output_root_valid"]:
                result["errors"].append("输出根文件夹路径无效")
                result["valid"] = False

            # Only gather statistics when the paths themselves are usable.
            if result["valid"]:
                try:
                    txt_files = self.file_handler.scan_txt_files(txt_folder)
                    result["statistics"]["txt_files_count"] = len(txt_files)

                    if len(txt_files) == 0:
                        result["warnings"].append("未找到任何TXT文件")

                    # Image folder statistics.
                    img_stats = self.file_handler.get_folder_statistics(images_root)
                    result["statistics"]["image_folders_count"] = img_stats["image_folders"]
                    result["statistics"]["total_images"] = img_stats["total_images"]

                    if img_stats["image_folders"] == 0:
                        result["warnings"].append("未找到任何包含图片的文件夹")

                except Exception as e:
                    # Statistics failures are advisory, not fatal.
                    result["warnings"].append(f"获取文件统计信息失败: {str(e)}")

        except Exception as e:
            result["errors"].append(f"验证过程出错: {str(e)}")
            result["valid"] = False

        return result

    def preview_batch_processing(self, txt_folder: str, images_root: str) -> Dict[str, Any]:
        """Preview the batch-processing plan without actually processing.

        Args:
            txt_folder: folder containing the TXT files.
            images_root: root folder containing the image sub-folders.

        Returns:
            Dict[str, Any]: matched/unmatched files and aggregate statistics;
            on failure an ``"error"`` key is added instead of raising.
        """
        preview = {
            "txt_files": [],
            "matched_pairs": [],
            "unmatched_txt_files": [],
            "statistics": {
                "total_txt_files": 0,
                "matched_files": 0,
                "unmatched_files": 0,
                "total_images": 0,
            },
        }

        try:
            # Discover the TXT files.
            txt_files = self.file_handler.scan_txt_files(txt_folder)
            preview["txt_files"] = txt_files
            preview["statistics"]["total_txt_files"] = len(txt_files)

            # Pair each TXT file with its image folder (if any).
            matched_pairs = self.file_handler.find_matching_image_folders(txt_files, images_root)

            matched_files = []
            unmatched_files = []
            total_images = 0

            for pair in matched_pairs:
                if pair['image_folder']:
                    matched_files.append(pair)
                    # Count the images that would be embedded for this pair.
                    image_files = self.file_handler.get_image_files(pair['image_folder']['path'])
                    total_images += len(image_files)
                else:
                    unmatched_files.append(pair['txt'])

            preview["matched_pairs"] = matched_files
            preview["unmatched_txt_files"] = unmatched_files
            preview["statistics"]["matched_files"] = len(matched_files)
            preview["statistics"]["unmatched_files"] = len(unmatched_files)
            preview["statistics"]["total_images"] = total_images

        except Exception as e:
            preview["error"] = str(e)

        return preview

    def get_processing_estimates(self, matched_pairs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Estimate processing time and output size for a batch.

        Args:
            matched_pairs: list of matched (TXT, image folder) pairs.

        Returns:
            Dict[str, Any]: ``total_files``, ``estimated_time_minutes``,
            ``estimated_output_size_mb`` (ints) and ``warnings``; on failure an
            ``"error"`` key is added instead of raising.
        """
        estimates = {
            "total_files": len(matched_pairs),
            "estimated_time_minutes": 0,
            "estimated_output_size_mb": 0,
            "warnings": [],
        }

        try:
            total_txt_size = 0
            total_image_size = 0
            total_images = 0

            for pair in matched_pairs:
                # Accumulate TXT file sizes.
                txt_path = pair['txt']['path']
                if os.path.exists(txt_path):
                    total_txt_size += os.path.getsize(txt_path)

                # Accumulate image file sizes.
                if pair['image_folder']:
                    image_files = self.file_handler.get_image_files(pair['image_folder']['path'])
                    total_images += len(image_files)
                    for img_path in image_files:
                        if os.path.exists(img_path):
                            total_image_size += os.path.getsize(img_path)

            # Time estimate from empirical constants:
            # ~10 s base per file plus ~2 s per image.
            base_time = len(matched_pairs) * 10  # seconds
            image_time = total_images * 2  # seconds
            total_time_seconds = base_time + image_time
            estimates["estimated_time_minutes"] = max(1, total_time_seconds // 60)

            # Output size estimate: DOCX text roughly doubles, images compress
            # to ~80%. Bug fix: the float multiplication made `//` return a
            # float (e.g. 5.0); coerce to int so the reported MB is integral.
            estimated_size_bytes = total_txt_size * 2 + total_image_size * 0.8
            estimates["estimated_output_size_mb"] = max(1, int(estimated_size_bytes // (1024 * 1024)))

            # Advisory warnings for large batches.
            if total_images > 1000:
                estimates["warnings"].append("图片数量较多,处理时间可能较长")

            if estimated_size_bytes > 500 * 1024 * 1024:  # 500 MB
                estimates["warnings"].append("预计输出文件较大,请确保有足够的磁盘空间")

        except Exception as e:
            estimates["error"] = str(e)

        return estimates
# Module-level singleton so all callers share one processor instance.
batch_processor = BatchProcessor()


def process_batch(matched_pairs: List[Dict[str, Any]], output_root: str,
                  progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
    """Batch-process file pairs (backward-compatible wrapper).

    Delegates to the shared :data:`batch_processor` instance's
    :meth:`BatchProcessor.process_batch`.
    """
    return batch_processor.process_batch(matched_pairs, output_root, progress_callback)