TxT2Docx/file_handler.py

381 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文件处理模块
负责文件系统相关的操作,包括文件扫描、匹配、读取和路径处理等功能。
"""
import os
import glob
from typing import List, Dict, Any, Optional
from config import config
class FileHandler:
    """Filesystem helpers: scanning, matching, reading and path handling.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    Several methods read settings from the module-level ``config`` object
    (``match_pattern``, ``image_sort_by``, ``txt_encoding``,
    ``output_location``).
    """

    # Lower-case file suffixes that are treated as images.
    _IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff')

    @staticmethod
    def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
        """
        Recursively collect every ``.txt`` file below a folder.

        The suffix check is case-insensitive.

        Args:
            folder_path: Folder that contains the TXT files.

        Returns:
            List[Dict[str, str]]: One dict per file with the keys ``path``,
            ``name`` (file name without extension), ``relative_path``
            (relative to ``folder_path``) and ``folder`` (containing
            directory), sorted by ``relative_path``.

        Raises:
            Exception: If the folder does not exist or contains no TXT file.
        """
        if not os.path.isdir(folder_path):
            raise Exception(f"TXT文件夹不存在: {folder_path}")

        txt_files = []
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                if not file.lower().endswith(".txt"):
                    continue
                txt_path = os.path.join(root, file)
                txt_files.append({
                    "path": txt_path,
                    "name": os.path.splitext(file)[0],
                    "relative_path": os.path.relpath(txt_path, folder_path),
                    "folder": root,
                })

        if not txt_files:
            raise Exception(f"{folder_path} 中未找到任何TXT文件")
        return sorted(txt_files, key=lambda x: x["relative_path"])

    @staticmethod
    def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
        """
        Pair each TXT file with image folders whose name matches its name.

        Names are compared case-insensitively according to
        ``config.match_pattern``: ``"exact"`` (equal), ``"prefix"`` (folder
        name starts with the TXT name) or ``"contains"`` (TXT name is a
        substring of the folder name). Any other value matches nothing.

        Args:
            txt_files: TXT file info dicts; the ``name`` key is used.
            images_root: Root folder whose sub-directories are candidates.

        Returns:
            List[Dict[str, Any]]: One dict per TXT file with the keys ``txt``,
            ``image_folder`` (the match with the shortest relative path, or
            ``None``) and ``all_matches`` (all matches, shortest first).

        Raises:
            Exception: If ``images_root`` is not an existing directory.
        """
        if not os.path.isdir(images_root):
            raise Exception(f"图片根文件夹不存在: {images_root}")
        if not txt_files:
            return []

        # Walk the image tree once instead of once per TXT file; the
        # directory listing does not depend on the TXT file being matched.
        candidates = []
        for root, dirs, _ in os.walk(images_root):
            for dir_name in dirs:
                folder_path = os.path.join(root, dir_name)
                candidates.append((dir_name.lower(), {
                    "path": folder_path,
                    "name": dir_name,
                    "relative_path": os.path.relpath(folder_path, images_root),
                }))

        matchers = {
            "exact": lambda folder, txt: folder == txt,
            "prefix": lambda folder, txt: folder.startswith(txt),
            "contains": lambda folder, txt: txt in folder,
        }
        is_match = matchers.get(config.match_pattern)

        matched_pairs = []
        for txt in txt_files:
            txt_name = txt["name"].lower()
            if is_match is None:
                matches = []
            else:
                # Copy each info dict so results for different TXT files
                # never share mutable state.
                matches = [
                    dict(info)
                    for folder_name, info in candidates
                    if is_match(folder_name, txt_name)
                ]
            # Stable sort: the shortest relative path wins, ties keep walk order.
            matches.sort(key=lambda x: len(x["relative_path"]))
            matched_pairs.append({
                "txt": txt,
                "image_folder": matches[0] if matches else None,
                "all_matches": matches,
            })
        return matched_pairs

    @staticmethod
    def get_image_files(folder_path: str) -> List[str]:
        """
        List the image files located directly inside a folder.

        A file counts as an image when its name ends, case-insensitively,
        with one of ``_IMAGE_SUFFIXES``. Names starting with ``.`` are
        skipped, and sub-directories are never returned.

        Args:
            folder_path: Image folder path.

        Returns:
            List[str]: Image paths (``folder_path`` joined with the file
            name). Sorted by modification time when
            ``config.image_sort_by == "time"``, otherwise sorted by path.
            Empty if the folder is missing or unreadable.
        """
        if not folder_path or not os.path.isdir(folder_path):
            return []

        # List the directory instead of globbing: glob would interpret
        # characters such as "[" or "*" in folder_path as pattern syntax and
        # would miss mixed-case suffixes on case-sensitive filesystems.
        try:
            names = os.listdir(folder_path)
        except OSError:
            return []

        image_files = []
        for name in names:
            if name.startswith("."):
                continue
            if not name.lower().endswith(FileHandler._IMAGE_SUFFIXES):
                continue
            full_path = os.path.join(folder_path, name)
            if os.path.isfile(full_path):
                image_files.append(full_path)

        # Name order is the deterministic baseline; the stable time sort
        # below keeps it for files with equal modification times.
        image_files.sort()
        if config.image_sort_by == "time":
            image_files.sort(key=os.path.getmtime)
        return image_files

    @staticmethod
    def read_markdown_txt(file_path: str) -> str:
        """
        Read a TXT file (containing Markdown) as text.

        Decoding is attempted with ``config.txt_encoding`` first, then
        ``gbk``, ``utf-16`` and ``iso-8859-1``. Line endings are normalised
        to ``\\n`` and a leading byte-order mark is removed.

        Args:
            file_path: TXT file path.

        Returns:
            str: File content.

        Raises:
            Exception: If the file does not exist, cannot be read, or cannot
                be decoded with any candidate encoding.
        """
        if not os.path.exists(file_path):
            raise Exception(f"TXT文件不存在: {file_path}")

        # dict.fromkeys drops duplicates while keeping the priority order.
        encodings = list(dict.fromkeys(
            [config.txt_encoding, "gbk", "utf-16", "iso-8859-1"]
        ))
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeError:
                # Wrong encoding guess: try the next candidate.
                continue
            except LookupError as e:
                # Unknown encoding name (e.g. a typo in the configuration).
                print(f"读取文件 {file_path} 时出错 (编码: {encoding}): {e}")
                continue
            except OSError as e:
                # An I/O failure is not an encoding problem; retrying with
                # another encoding cannot help, so report it as it is.
                raise Exception(f"读取文件 {file_path} 时出错: {e}") from e

            # The plain "utf-8" codec keeps a BOM as U+FEFF; drop it so the
            # first line is not polluted by an invisible character.
            if content.startswith("\ufeff"):
                content = content[1:]
            return content.replace("\r\n", "\n").replace("\r", "\n")

        raise Exception(f"无法解析TXT文件编码问题: {file_path}")

    @staticmethod
    def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
        """
        Build a not-yet-existing ``.docx`` output path for a TXT file.

        The target folder is ``txt_info["folder"]`` when
        ``config.output_location == "txt_folder"``, otherwise
        ``output_root``; it is created if missing. If ``<name>.docx``
        already exists, ``_1``, ``_2``, ... is appended until a free name
        is found.

        Args:
            txt_info: TXT file info; the ``folder`` and ``name`` keys are used.
            images_root: Image root folder (unused, kept for compatibility).
            output_root: Output root folder.

        Returns:
            str: Full path of the output file.
        """
        if config.output_location == "txt_folder":
            base_folder = txt_info["folder"]
        else:
            base_folder = output_root
        os.makedirs(base_folder, exist_ok=True)

        txt_name = txt_info["name"]
        output_path = os.path.join(base_folder, f"{txt_name}.docx")
        counter = 1
        while os.path.exists(output_path):
            output_path = os.path.join(base_folder, f"{txt_name}_{counter}.docx")
            counter += 1
        return output_path

    @staticmethod
    def validate_paths(txt_folder: str, images_root: str, output_root: Optional[str] = None) -> Dict[str, bool]:
        """
        Check whether the given folders are usable.

        Args:
            txt_folder: TXT folder path.
            images_root: Image root folder path.
            output_root: Output root folder path (optional).

        Returns:
            Dict[str, bool]: Flags ``txt_folder_valid``, ``images_root_valid``
            and ``output_root_valid``. The output flag is only actually
            checked when ``config.output_location == "custom"`` and
            ``output_root`` is given; in that case the directory is created
            if it does not exist yet.
        """
        result = {
            "txt_folder_valid": bool(txt_folder and os.path.isdir(txt_folder)),
            "images_root_valid": bool(images_root and os.path.isdir(images_root)),
            "output_root_valid": True,  # valid by default: it can be created later
        }

        if config.output_location == "custom" and output_root:
            try:
                if not os.path.exists(output_root):
                    os.makedirs(output_root, exist_ok=True)
                result["output_root_valid"] = os.path.isdir(output_root)
            except (OSError, ValueError):
                result["output_root_valid"] = False
        return result

    @staticmethod
    def get_folder_statistics(folder_path: str) -> Dict[str, int]:
        """
        Count TXT files, images and sub-folders below a folder.

        Args:
            folder_path: Folder path.

        Returns:
            Dict[str, int]: ``txt_files`` (number of ``.txt`` files),
            ``image_folders`` (directories, the root included, that directly
            contain at least one image), ``total_images`` and
            ``total_subfolders``. All zero if the folder does not exist.
        """
        stats = {
            "txt_files": 0,
            "image_folders": 0,
            "total_images": 0,
            "total_subfolders": 0,
        }
        if not os.path.isdir(folder_path):
            return stats

        try:
            # A single walk gathers every counter.
            for _root, dirs, files in os.walk(folder_path):
                stats["total_subfolders"] += len(dirs)
                lowered = [file.lower() for file in files]
                stats["txt_files"] += sum(
                    1 for name in lowered if name.endswith(".txt")
                )
                image_count = sum(
                    1 for name in lowered
                    if name.endswith(FileHandler._IMAGE_SUFFIXES)
                )
                if image_count:
                    stats["image_folders"] += 1
                    stats["total_images"] += image_count
        except OSError as e:
            print(f"获取文件夹统计信息时出错: {e}")
        return stats

    @staticmethod
    def create_backup(file_path: str) -> str:
        """
        Copy a file to ``<file_path>.backup``.

        If that name is taken, ``.backup.1``, ``.backup.2``, ... is used.

        Args:
            file_path: Path of the file to back up.

        Returns:
            str: Backup file path, or an empty string if the source does not
            exist or the copy fails.
        """
        import shutil

        if not os.path.exists(file_path):
            return ""

        backup_path = f"{file_path}.backup"
        counter = 1
        while os.path.exists(backup_path):
            backup_path = f"{file_path}.backup.{counter}"
            counter += 1

        try:
            shutil.copy2(file_path, backup_path)
        except (OSError, shutil.Error) as e:
            print(f"创建备份文件失败: {e}")
            return ""
        return backup_path

    @staticmethod
    def cleanup_temp_files(temp_dir: str) -> None:
        """
        Remove a temporary directory tree; failures are printed, not raised.

        Args:
            temp_dir: Temporary directory.
        """
        import shutil

        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        except OSError as e:
            print(f"清理临时文件失败: {e}")
# Create the module-level FileHandler instance
file_handler = FileHandler()
# Module-level functions kept for compatibility with the old interface
def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
    """Scan a folder for TXT files (wrapper kept for the old interface)."""
    txt_files = FileHandler.scan_txt_files(folder_path)
    return txt_files
def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
    """Find the matching image folders (wrapper kept for the old interface)."""
    matched_pairs = FileHandler.find_matching_image_folders(txt_files, images_root)
    return matched_pairs
def get_image_files(folder_path: str) -> List[str]:
    """Get the image files of a folder (wrapper kept for the old interface)."""
    image_files = FileHandler.get_image_files(folder_path)
    return image_files
def read_markdown_txt(file_path: str) -> str:
    """Read a Markdown TXT file (wrapper kept for the old interface)."""
    content = FileHandler.read_markdown_txt(file_path)
    return content
def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
    """Prepare the output path (wrapper kept for the old interface)."""
    output_path = FileHandler.prepare_output_path(txt_info, images_root, output_root)
    return output_path