# TxT2Docx/batch_processor.py

"""
批量处理模块
负责批量处理多个TXT文件协调文件读取、解析、转换和输出等步骤。
"""
import os
from typing import List, Dict, Any, Callable, Optional
from file_handler import FileHandler
from markdown_parser import MarkdownParser
from docx_generator import DocxGenerator
class BatchProcessor:
    """Batch processor.

    Drives the full pipeline (read TXT -> parse Markdown -> generate DOCX)
    for a list of matched TXT-file / image-folder pairs, and offers
    validation, preview and estimation helpers for the same inputs.
    """

    def __init__(self):
        """Create the collaborating helpers used by every batch run."""
        self.file_handler = FileHandler()
        self.markdown_parser = MarkdownParser()
        self.docx_generator = DocxGenerator()

    def process_batch(self, matched_pairs: List[Dict[str, Any]], output_root: str,
                      progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Process every matched TXT/image pair.

        Args:
            matched_pairs: matched TXT-file / image-folder pair dicts.
            output_root: root directory for the generated DOCX files.
            progress_callback: optional ``(progress: int, message: str)`` hook.

        Returns:
            Dict[str, Any]: summary statistics for the whole batch
            (totals, failures with reasons, produced file paths).
        """
        total = len(matched_pairs)
        success_count = 0
        failed_items = []
        processed_files = []
        for i, pair in enumerate(matched_pairs):
            try:
                if progress_callback:
                    overall_progress = int((i / total) * 100)
                    progress_callback(overall_progress, f"处理 {i + 1}/{total}: {pair['txt']['name']}")
                # Convert one TXT/image pair to a DOCX file.
                result = self._process_single_pair(pair, output_root, i, total, progress_callback)
                if result['success']:
                    success_count += 1
                    processed_files.append(result['output_path'])
                else:
                    failed_items.append({
                        "name": pair['txt']['name'],
                        "error": result['error']
                    })
            except Exception as e:
                # Record the failure but keep processing the remaining pairs.
                failed_items.append({
                    "name": pair['txt']['name'],
                    "error": str(e)
                })
        # Derive the main output folder from the first pair's output path.
        main_output_folder = ""
        if matched_pairs and success_count > 0:
            sample_output = self.file_handler.prepare_output_path(
                matched_pairs[0]['txt'], "", output_root
            )
            main_output_folder = os.path.dirname(sample_output)
        return {
            "total": total,
            "success": success_count,
            "failed": len(failed_items),
            "failed_items": failed_items,
            "main_output_folder": main_output_folder,
            "processed_files": processed_files
        }

    def _process_single_pair(self, pair: Dict[str, Any], output_root: str,
                             current_index: int, total_count: int,
                             progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """Process a single TXT-file / image-folder pair.

        Args:
            pair: pair info (``pair['txt']`` and optional ``pair['image_folder']``).
            output_root: root directory for the output file.
            current_index: zero-based index of this pair within the batch.
            total_count: total number of pairs in the batch.
            progress_callback: optional progress hook.

        Returns:
            Dict[str, Any]: ``{"success", "output_path", "error"}`` — errors
            are captured here instead of propagating.
        """
        result = {
            "success": False,
            "output_path": "",
            "error": ""
        }
        try:
            # Work out where the DOCX should be written.
            output_path = self.file_handler.prepare_output_path(
                pair['txt'],
                pair['image_folder']['path'] if pair['image_folder'] else "",
                output_root
            )
            result["output_path"] = output_path
            # Read the TXT content.
            txt_content = self.file_handler.read_markdown_txt(pair['txt']['path'])
            if not txt_content.strip():
                raise Exception("TXT文件内容为空")
            # Parse the content into structured sections.
            sections = self.markdown_parser.parse(txt_content)
            if not sections:
                raise Exception("未解析到有效内容")
            # Collect the image files, if a folder was matched.
            image_files = []
            if pair['image_folder']:
                image_files = self.file_handler.get_image_files(pair['image_folder']['path'])

            # Map per-file progress onto overall batch progress.
            def update_file_progress(progress: int, text: str):
                if progress_callback:
                    # Clamp the per-file progress to 0-100.
                    file_progress = max(0, min(100, progress))
                    if total_count > 0:
                        # Integer arithmetic avoids float-precision drift:
                        # each file owns a 100/total_count slice of the bar.
                        overall_progress = (current_index * 100 + file_progress) // total_count
                        overall_progress = max(0, min(100, overall_progress))
                        progress_callback(int(overall_progress), f"{pair['txt']['name']}: {text}")
                    else:
                        progress_callback(file_progress, f"{pair['txt']['name']}: {text}")

            success = self.docx_generator.generate(sections, image_files, output_path, update_file_progress)
            if success:
                result["success"] = True
            else:
                result["error"] = "DOCX生成失败"
        except Exception as e:
            result["error"] = str(e)
        return result

    def validate_batch_input(self, txt_folder: str, images_root: str,
                             output_root: Optional[str] = None) -> Dict[str, Any]:
        """Validate the input parameters for a batch run.

        Args:
            txt_folder: folder containing the TXT files.
            images_root: root folder containing the image sub-folders.
            output_root: output root folder (optional).

        Returns:
            Dict[str, Any]: ``{"valid", "errors", "warnings", "statistics"}``.
        """
        result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "statistics": {}
        }
        try:
            # Check that each supplied path is usable.
            path_validation = self.file_handler.validate_paths(txt_folder, images_root, output_root)
            if not path_validation["txt_folder_valid"]:
                result["errors"].append("TXT文件夹路径无效")
                result["valid"] = False
            if not path_validation["images_root_valid"]:
                result["errors"].append("图片根文件夹路径无效")
                result["valid"] = False
            if not path_validation["output_root_valid"]:
                result["errors"].append("输出根文件夹路径无效")
                result["valid"] = False
            # Only gather statistics once the basic paths check out.
            if result["valid"]:
                try:
                    txt_files = self.file_handler.scan_txt_files(txt_folder)
                    result["statistics"]["txt_files_count"] = len(txt_files)
                    if len(txt_files) == 0:
                        result["warnings"].append("未找到任何TXT文件")
                    # Image-folder statistics.
                    img_stats = self.file_handler.get_folder_statistics(images_root)
                    result["statistics"]["image_folders_count"] = img_stats["image_folders"]
                    result["statistics"]["total_images"] = img_stats["total_images"]
                    if img_stats["image_folders"] == 0:
                        result["warnings"].append("未找到任何包含图片的文件夹")
                except Exception as e:
                    # Statistics are best-effort; report but do not fail.
                    result["warnings"].append(f"获取文件统计信息失败: {str(e)}")
        except Exception as e:
            result["errors"].append(f"验证过程出错: {str(e)}")
            result["valid"] = False
        return result

    def preview_batch_processing(self, txt_folder: str, images_root: str) -> Dict[str, Any]:
        """Preview what a batch run would do, without processing anything.

        Args:
            txt_folder: folder containing the TXT files.
            images_root: root folder containing the image sub-folders.

        Returns:
            Dict[str, Any]: matched/unmatched files plus summary statistics;
            an ``"error"`` key is added if scanning fails.
        """
        preview = {
            "txt_files": [],
            "matched_pairs": [],
            "unmatched_txt_files": [],
            "statistics": {
                "total_txt_files": 0,
                "matched_files": 0,
                "unmatched_files": 0,
                "total_images": 0
            }
        }
        try:
            # Scan for TXT files.
            txt_files = self.file_handler.scan_txt_files(txt_folder)
            preview["txt_files"] = txt_files
            preview["statistics"]["total_txt_files"] = len(txt_files)
            # Pair each TXT file with its image folder, where one exists.
            matched_pairs = self.file_handler.find_matching_image_folders(txt_files, images_root)
            matched_files = []
            unmatched_files = []
            total_images = 0
            for pair in matched_pairs:
                if pair['image_folder']:
                    matched_files.append(pair)
                    # Count the images belonging to this pair.
                    image_files = self.file_handler.get_image_files(pair['image_folder']['path'])
                    total_images += len(image_files)
                else:
                    unmatched_files.append(pair['txt'])
            preview["matched_pairs"] = matched_files
            preview["unmatched_txt_files"] = unmatched_files
            preview["statistics"]["matched_files"] = len(matched_files)
            preview["statistics"]["unmatched_files"] = len(unmatched_files)
            preview["statistics"]["total_images"] = total_images
        except Exception as e:
            preview["error"] = str(e)
        return preview

    def get_processing_estimates(self, matched_pairs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Estimate processing time and output size for a batch.

        Args:
            matched_pairs: matched TXT-file / image-folder pair dicts.

        Returns:
            Dict[str, Any]: estimated minutes, output size in MB (integer,
            minimum 1) and any capacity warnings.
        """
        estimates = {
            "total_files": len(matched_pairs),
            "estimated_time_minutes": 0,
            "estimated_output_size_mb": 0,
            "warnings": []
        }
        try:
            total_txt_size = 0
            total_image_size = 0
            total_images = 0
            for pair in matched_pairs:
                # Accumulate TXT file sizes.
                txt_path = pair['txt']['path']
                if os.path.exists(txt_path):
                    total_txt_size += os.path.getsize(txt_path)
                # Accumulate image file sizes.
                if pair['image_folder']:
                    image_files = self.file_handler.get_image_files(pair['image_folder']['path'])
                    total_images += len(image_files)
                    for img_path in image_files:
                        if os.path.exists(img_path):
                            total_image_size += os.path.getsize(img_path)
            # Empirical timing model: ~10 s per file plus ~2 s per image.
            base_time = len(matched_pairs) * 10  # seconds
            image_time = total_images * 2  # seconds
            total_time_seconds = base_time + image_time
            estimates["estimated_time_minutes"] = max(1, total_time_seconds // 60)
            # DOCX output is usually larger than the source text; images
            # are assumed to compress to ~80% of their original size.
            estimated_size_bytes = total_txt_size * 2 + total_image_size * 0.8
            # int(...) before the floor division so callers always receive an
            # integer MB count (the 0.8 factor would otherwise leak a float).
            estimates["estimated_output_size_mb"] = max(1, int(estimated_size_bytes) // (1024 * 1024))
            # Capacity warnings.
            if total_images > 1000:
                estimates["warnings"].append("图片数量较多,处理时间可能较长")
            if estimated_size_bytes > 500 * 1024 * 1024:  # 500MB
                estimates["warnings"].append("预计输出文件较大,请确保有足够的磁盘空间")
        except Exception as e:
            estimates["error"] = str(e)
        return estimates
# Module-level singleton instance shared by the legacy function API below.
batch_processor = BatchProcessor()
# Backward-compatible module-level function API
def process_batch(matched_pairs: List[Dict[str, Any]], output_root: str,
                  progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
    """Batch-process matched file pairs (backward-compatible wrapper).

    Delegates to the shared module-level ``batch_processor`` instance so
    existing callers of the old function API keep working.
    """
    return batch_processor.process_batch(matched_pairs, output_root, progress_callback)