# TxT2Docx/file_handler.py
#
# Repository viewer metadata captured with this copy: 393 lines, 13 KiB, Python.
# Last modified: 2025-09-21 19:01:40 +08:00
"""
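File handling module.

Handles file-system operations: scanning for files, matching TXT files to
image folders, reading file contents, and preparing paths.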
"""
import os
import glob
from typing import List, Dict, Any
from config import config
class FileHandler:
    """File handler: a collection of static helpers for file-system work."""

    # Lower-case suffixes treated as image files (tuple so it can be passed
    # straight to str.endswith).
    _IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff')

    @staticmethod
    def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
        """
        Recursively scan a folder for TXT files.

        Args:
            folder_path: Folder that contains the TXT files.

        Returns:
            List[Dict[str, str]]: One dict per TXT file with the keys
            "path", "name" (file name without extension), "relative_path"
            (relative to folder_path) and "folder" (containing directory),
            sorted by "relative_path".

        Raises:
            Exception: If the folder does not exist or contains no TXT file.
        """
        if not os.path.isdir(folder_path):
            raise Exception(f"TXT文件夹不存在: {folder_path}")

        txt_files = []
        for root, _dirs, files in os.walk(folder_path):
            for file_name in files:
                # Case-insensitive so ".TXT" files are picked up as well.
                if not file_name.lower().endswith(".txt"):
                    continue
                txt_path = os.path.join(root, file_name)
                txt_files.append({
                    "path": txt_path,
                    "name": os.path.splitext(file_name)[0],
                    "relative_path": os.path.relpath(txt_path, folder_path),
                    "folder": root,
                })

        if not txt_files:
            raise Exception(f"{folder_path} 中未找到任何TXT文件")
        return sorted(txt_files, key=lambda info: info["relative_path"])

    @staticmethod
    def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
        """
        Match every TXT file to image folders by name.

        Args:
            txt_files: TXT file info dicts (each must provide "name").
            images_root: Root folder whose sub-folders are match candidates.

        Returns:
            List[Dict[str, Any]]: One entry per TXT file with the keys "txt",
            "image_folder" (the preferred match, or None) and "all_matches"
            (every match, shortest relative path first).

        Raises:
            Exception: If images_root does not exist.
        """
        if not os.path.isdir(images_root):
            raise Exception(f"图片根文件夹不存在: {images_root}")

        # Collect every sub-folder under images_root, at any depth.
        all_image_folders = []
        for root, dirs, _ in os.walk(images_root):
            for dir_name in dirs:
                folder_path = os.path.join(root, dir_name)
                all_image_folders.append({
                    "path": folder_path,
                    "name": dir_name,
                    "relative_path": os.path.relpath(folder_path, images_root),
                })

        matched_pairs = []
        for txt in txt_files:
            matches = FileHandler._find_matches_for_txt(txt, all_image_folders)
            # Prefer the shortest relative path; the path itself breaks ties
            # so the chosen folder does not depend on os.walk ordering.
            matches.sort(key=lambda m: (len(m["relative_path"]), m["relative_path"]))
            matched_pairs.append({
                "txt": txt,
                "image_folder": matches[0] if matches else None,
                "all_matches": matches,
            })
        return matched_pairs

    @staticmethod
    def _find_matches_for_txt(txt_info: Dict[str, str], image_folders: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Find the image folders that match a single TXT file.

        The comparison is case-insensitive and its mode comes from
        config.match_pattern: "exact", "prefix" (folder name starts with the
        TXT name) or "contains" (TXT name occurs in the folder name). Any
        other value yields no matches.

        Args:
            txt_info: TXT file info dict (must provide "name").
            image_folders: Candidate image folder dicts (must provide "name").

        Returns:
            List[Dict[str, str]]: The matching folder dicts, in input order.
        """
        txt_name = txt_info["name"].lower()
        matchers = {
            "exact": lambda folder_name: folder_name == txt_name,
            "prefix": lambda folder_name: folder_name.startswith(txt_name),
            "contains": lambda folder_name: txt_name in folder_name,
        }
        # Read the configured mode once instead of once per folder.
        is_match = matchers.get(config.match_pattern)
        if is_match is None:
            return []
        return [folder for folder in image_folders if is_match(folder["name"].lower())]

    @staticmethod
    def get_image_files(folder_path: str) -> List[str]:
        """
        List the image files directly inside a folder (not recursive).

        Args:
            folder_path: Image folder path.

        Returns:
            List[str]: Image file paths. Always sorted by path; when
            config.image_sort_by is "time" they are then ordered by
            modification time (ties keep the path order). Empty when the
            folder is missing or unreadable.
        """
        if not folder_path or not os.path.isdir(folder_path):
            return []

        # os.listdir instead of glob: glob treats "[", "]", "*" and "?" in
        # folder_path as pattern syntax, and trying only lower/upper-case
        # patterns misses mixed-case extensions such as ".Jpg".
        try:
            entries = os.listdir(folder_path)
        except OSError as e:
            print(f"读取图片文件夹失败: {e}")
            return []

        image_files = []
        for entry in entries:
            if not entry.lower().endswith(FileHandler._IMAGE_EXTENSIONS):
                continue
            full_path = os.path.join(folder_path, entry)
            # Skip directories that merely have an image-like name.
            if os.path.isfile(full_path):
                image_files.append(full_path)

        # Path order is the deterministic baseline for every sort mode.
        image_files.sort()
        if config.image_sort_by == "time":
            image_files.sort(key=os.path.getmtime)
        return image_files

    @staticmethod
    def read_markdown_txt(file_path: str) -> str:
        """
        Read a TXT file that holds Markdown content.

        Encodings are tried in order: config.txt_encoding, "gbk", "utf-16",
        "iso-8859-1". Line endings are normalised to "\\n" and a leading
        byte-order mark is removed.

        Args:
            file_path: TXT file path.

        Returns:
            str: The file content.

        Raises:
            Exception: If the file does not exist or no encoding could read it.
        """
        if not os.path.exists(file_path):
            raise Exception(f"TXT文件不存在: {file_path}")

        # dict.fromkeys de-duplicates while keeping order, so a configured
        # encoding that is already in the fallback list is not tried twice.
        encodings = list(dict.fromkeys([config.txt_encoding, "gbk", "utf-16", "iso-8859-1"]))
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError:
                continue
            except Exception as e:
                print(f"读取文件 {file_path} 时出错 (编码: {encoding}): {e}")
                continue

            # A BOM left in the text would become part of the first line.
            if content.startswith("\ufeff"):
                content = content[1:]
            # Normalise line endings.
            return content.replace("\r\n", "\n").replace("\r", "\n")

        raise Exception(f"无法解析TXT文件编码问题: {file_path}")

    @staticmethod
    def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
        """
        Build the output .docx path for a TXT file, creating its folder.

        Args:
            txt_info: TXT file info dict (must provide "name" and "folder").
            images_root: Image root directory (not used by this method).
            output_root: Output root directory, used unless
                config.output_location is "txt_folder".

        Returns:
            str: A path "<base>/<name>.docx" that does not exist yet; a
            numeric suffix ("_1", "_2", ...) is appended when needed.
        """
        if config.output_location == "txt_folder":
            base_folder = txt_info["folder"]
        else:
            base_folder = output_root

        # Make sure the output folder exists.
        os.makedirs(base_folder, exist_ok=True)

        txt_name = txt_info["name"]
        output_path = os.path.join(base_folder, f"{txt_name}.docx")

        # Never overwrite an existing document: append a counter instead.
        counter = 1
        while os.path.exists(output_path):
            output_path = os.path.join(base_folder, f"{txt_name}_{counter}.docx")
            counter += 1
        return output_path

    @staticmethod
    def validate_paths(txt_folder: str, images_root: str, output_root: str = None) -> Dict[str, bool]:
        """
        Check whether the given paths are usable.

        Args:
            txt_folder: TXT folder path.
            images_root: Image root folder path.
            output_root: Output root folder path (optional).

        Returns:
            Dict[str, bool]: Flags "txt_folder_valid", "images_root_valid"
            and "output_root_valid".
        """
        result = {
            "txt_folder_valid": bool(txt_folder and os.path.isdir(txt_folder)),
            "images_root_valid": bool(images_root and os.path.isdir(images_root)),
            "output_root_valid": True,  # valid by default: it can be created
        }

        # The output folder is only checked when a custom location is
        # configured and a path was actually given.
        if config.output_location == "custom" and output_root:
            try:
                # Side effect: creates the folder when it is missing. If the
                # path exists but is a file, makedirs raises and the flag
                # ends up False.
                os.makedirs(output_root, exist_ok=True)
                result["output_root_valid"] = os.path.isdir(output_root)
            except (OSError, ValueError):
                result["output_root_valid"] = False
        return result

    @staticmethod
    def get_folder_statistics(folder_path: str) -> Dict[str, int]:
        """
        Collect counts for a folder tree.

        Args:
            folder_path: Folder path.

        Returns:
            Dict[str, int]: "txt_files" (TXT files at any depth),
            "image_folders" (directories, the root included, that directly
            contain at least one image), "total_images" and
            "total_subfolders". All zero when folder_path is not a directory.
        """
        stats = {
            "txt_files": 0,
            "image_folders": 0,
            "total_images": 0,
            "total_subfolders": 0,
        }
        if not os.path.isdir(folder_path):
            return stats

        try:
            # A single walk gathers every counter.
            for _root, dirs, files in os.walk(folder_path):
                stats["total_subfolders"] += len(dirs)
                lowered = [file_name.lower() for file_name in files]
                stats["txt_files"] += sum(
                    1 for file_name in lowered if file_name.endswith(".txt")
                )
                image_count = sum(
                    1 for file_name in lowered
                    if file_name.endswith(FileHandler._IMAGE_EXTENSIONS)
                )
                if image_count:
                    stats["image_folders"] += 1
                    stats["total_images"] += image_count
        except Exception as e:
            # Best effort: report the problem and return what was counted.
            print(f"获取文件夹统计信息时出错: {e}")
        return stats

    @staticmethod
    def create_backup(file_path: str) -> str:
        """
        Create a backup copy of a file next to the original.

        Args:
            file_path: Path of the file to back up.

        Returns:
            str: The backup path ("<file>.backup", or "<file>.backup.<n>"
            when that name is taken); an empty string if the file does not
            exist or the copy fails.
        """
        if not os.path.exists(file_path):
            return ""

        try:
            backup_path = f"{file_path}.backup"
            counter = 1
            # Pick the first backup name that is not taken yet.
            while os.path.exists(backup_path):
                backup_path = f"{file_path}.backup.{counter}"
                counter += 1

            import shutil
            shutil.copy2(file_path, backup_path)
            return backup_path
        except Exception as e:
            print(f"创建备份文件失败: {e}")
            return ""

    @staticmethod
    def cleanup_temp_files(temp_dir: str) -> None:
        """
        Delete a temporary directory and everything in it.

        Failures are printed, not raised.

        Args:
            temp_dir: Temporary directory path.
        """
        try:
            if os.path.exists(temp_dir):
                import shutil
                shutil.rmtree(temp_dir)
        except Exception as e:
            print(f"清理临时文件失败: {e}")
# Create the global file handler instance.
file_handler = FileHandler()
# Functions kept for compatibility with the old interface.
def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
    """Scan for TXT files (old-interface wrapper for FileHandler.scan_txt_files)."""
    txt_files = FileHandler.scan_txt_files(folder_path)
    return txt_files
def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
    """Find matching image folders (old-interface wrapper for FileHandler.find_matching_image_folders)."""
    matched_pairs = FileHandler.find_matching_image_folders(txt_files, images_root)
    return matched_pairs
def get_image_files(folder_path: str) -> List[str]:
    """Get image files (old-interface wrapper for FileHandler.get_image_files)."""
    image_files = FileHandler.get_image_files(folder_path)
    return image_files
def read_markdown_txt(file_path: str) -> str:
    """Read a Markdown TXT file (old-interface wrapper for FileHandler.read_markdown_txt)."""
    content = FileHandler.read_markdown_txt(file_path)
    return content
def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
    """Prepare the output path (old-interface wrapper for FileHandler.prepare_output_path)."""
    output_path = FileHandler.prepare_output_path(txt_info, images_root, output_root)
    return output_path