"""
File handling module.

Responsible for file-system related operations, including file scanning,
matching, reading, and path handling.
"""
|
||
|
||
import glob
import os
import shutil
from typing import List, Dict, Any, Optional

from config import config
|
||
|
||
|
||
class FileHandler:
    """File handler class responsible for file-related operations.

    Every method is a ``@staticmethod``; the class is used as a namespace and
    holds no instance state.  Several methods read settings from the
    module-level ``config`` object (``match_pattern``, ``image_sort_by``,
    ``txt_encoding``, ``output_location``).
    """

    # Lower-case suffixes treated as image files.  Kept as a tuple so it can
    # be passed directly to ``str.endswith``.
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp", ".gif", ".webp", ".tiff")

    @staticmethod
    def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
        """Recursively collect every ``.txt`` file below ``folder_path``.

        The suffix check is case-insensitive.

        Args:
            folder_path: Folder that contains the TXT files.

        Returns:
            One dict per file with the keys ``path`` (joined path),
            ``name`` (file name without extension), ``relative_path``
            (relative to ``folder_path``) and ``folder`` (containing
            directory), sorted by ``relative_path``.

        Raises:
            FileNotFoundError: If the folder does not exist or contains no
                TXT file.  (A subclass of ``Exception``, so callers that
                catch ``Exception`` keep working.)
        """
        if not folder_path or not os.path.isdir(folder_path):
            raise FileNotFoundError(f"TXT文件夹不存在: {folder_path}")

        txt_files = []
        for root, _dirs, files in os.walk(folder_path):
            for file_name in files:
                if not file_name.lower().endswith(".txt"):
                    continue
                txt_path = os.path.join(root, file_name)
                txt_files.append({
                    "path": txt_path,
                    "name": os.path.splitext(file_name)[0],
                    "relative_path": os.path.relpath(txt_path, folder_path),
                    "folder": root,
                })

        if not txt_files:
            raise FileNotFoundError(f"在 {folder_path} 中未找到任何TXT文件")

        return sorted(txt_files, key=lambda info: info["relative_path"])

    @staticmethod
    def _name_matches(pattern: str, txt_name: str, folder_name: str) -> bool:
        """Return whether ``folder_name`` matches ``txt_name`` under ``pattern``.

        Both names are expected to be lower-cased by the caller.  Patterns
        other than ``"exact"``, ``"prefix"`` and ``"contains"`` never match.
        """
        if pattern == "exact":
            return txt_name == folder_name
        if pattern == "prefix":
            return folder_name.startswith(txt_name)
        if pattern == "contains":
            return txt_name in folder_name
        return False

    @staticmethod
    def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
        """Pair each TXT file with image folders whose name matches it.

        Names are compared case-insensitively according to
        ``config.match_pattern`` (``"exact"``, ``"prefix"`` or
        ``"contains"``).  Every directory at any depth below ``images_root``
        is a candidate.

        Args:
            txt_files: TXT file info dicts; only the ``name`` key is read.
            images_root: Root folder that contains the image folders.

        Returns:
            One dict per TXT file, in input order, with the keys ``txt`` (the
            input dict), ``image_folder`` (the best match, or ``None``) and
            ``all_matches`` (every match, best first).  Each match is a dict
            with ``path``, ``name`` and ``relative_path``.

        Raises:
            FileNotFoundError: If ``images_root`` is not an existing folder.
        """
        if not images_root or not os.path.isdir(images_root):
            raise FileNotFoundError(f"图片根文件夹不存在: {images_root}")

        # Walk the image tree once and reuse the result for every TXT file;
        # the directory listing does not depend on the TXT being matched.
        candidates = []
        for root, dirs, _files in os.walk(images_root):
            for dir_name in dirs:
                folder_path = os.path.join(root, dir_name)
                candidates.append((
                    dir_name.lower(),
                    {
                        "path": folder_path,
                        "name": dir_name,
                        "relative_path": os.path.relpath(folder_path, images_root),
                    },
                ))

        pattern = config.match_pattern
        matched_pairs = []
        for txt in txt_files:
            txt_name = txt["name"].lower()
            # Copy each info dict so results for different TXT files never
            # share (and accidentally co-mutate) the same object.
            matches = [
                dict(info)
                for folder_name, info in candidates
                if FileHandler._name_matches(pattern, txt_name, folder_name)
            ]
            # Prefer the shortest relative path; break ties by the path itself
            # so the choice does not depend on directory listing order.
            matches.sort(key=lambda m: (len(m["relative_path"]), m["relative_path"]))
            matched_pairs.append({
                "txt": txt,
                "image_folder": matches[0] if matches else None,
                "all_matches": matches,
            })

        return matched_pairs

    @staticmethod
    def get_image_files(folder_path: str) -> List[str]:
        """List the image files directly inside ``folder_path``.

        A file counts as an image when its name ends, case-insensitively,
        with one of ``IMAGE_EXTENSIONS``.  Names starting with ``.`` are
        skipped and directories are never returned.  The folder is listed
        directly instead of through glob patterns, so folder names containing
        glob metacharacters such as ``[`` or ``*`` work.

        Args:
            folder_path: Image folder path.

        Returns:
            Image file paths, sorted by path when ``config.image_sort_by`` is
            ``"name"`` and by modification time when it is ``"time"``;
            otherwise in directory listing order.  Empty list when the folder
            is missing.
        """
        if not folder_path or not os.path.isdir(folder_path):
            return []

        with os.scandir(folder_path) as entries:
            image_files = [
                entry.path
                for entry in entries
                if entry.is_file()
                and not entry.name.startswith(".")
                and entry.name.lower().endswith(FileHandler.IMAGE_EXTENSIONS)
            ]

        sort_by = config.image_sort_by
        if sort_by == "name":
            image_files.sort()
        elif sort_by == "time":
            image_files.sort(key=os.path.getmtime)

        return image_files

    @staticmethod
    def read_markdown_txt(file_path: str) -> str:
        """Read a TXT file that contains Markdown text.

        Encodings are tried in order: ``config.txt_encoding``, ``gbk``,
        ``utf-16``, ``iso-8859-1`` (duplicates removed).  The first one that
        decodes the whole file wins.  Line endings are normalized to ``\\n``
        and a leading byte-order mark is removed.

        Args:
            file_path: Path of the TXT file.

        Returns:
            The decoded file content.

        Raises:
            FileNotFoundError: If ``file_path`` is not an existing file.
            OSError: If the file cannot be opened or read; the original error
                is propagated instead of being reported as an encoding issue.
            UnicodeError: If no candidate encoding can decode the file.
        """
        if not file_path or not os.path.isfile(file_path):
            raise FileNotFoundError(f"TXT文件不存在: {file_path}")

        # dict.fromkeys removes duplicates while keeping the order, so an
        # encoding that already failed is not tried a second time.
        encodings = list(dict.fromkeys([config.txt_encoding, "gbk", "utf-16", "iso-8859-1"]))

        for encoding in encodings:
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    content = f.read()
            except (UnicodeError, LookupError):
                # Wrong or unknown encoding: move on to the next candidate.
                continue

            # Normalize line endings.
            content = content.replace("\r\n", "\n").replace("\r", "\n")
            # A plain "utf-8" decode keeps the BOM as U+FEFF; drop it so it
            # does not end up as an invisible first character of the text.
            if content.startswith("\ufeff"):
                content = content[1:]
            return content

        raise UnicodeError(f"无法解析TXT文件(编码问题): {file_path}")

    @staticmethod
    def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
        """Build a non-clashing ``.docx`` output path for a TXT file.

        The output folder is ``txt_info["folder"]`` when
        ``config.output_location`` is ``"txt_folder"``, otherwise
        ``output_root``.  The folder is created if needed.  If
        ``<name>.docx`` already exists, ``<name>_1.docx``, ``<name>_2.docx``
        and so on are tried until an unused name is found.

        Args:
            txt_info: TXT file info; the ``name`` and ``folder`` keys are read.
            images_root: Image root folder (not used by this method).
            output_root: Output root folder.

        Returns:
            Full path of the output file (the file itself is not created).

        Raises:
            ValueError: If the selected output folder is empty or ``None``.
        """
        if config.output_location == "txt_folder":
            base_folder = txt_info["folder"]
        else:
            base_folder = output_root

        if not base_folder:
            # Fail with a clear message instead of the opaque error that
            # os.makedirs raises for an empty or None path.
            raise ValueError("未指定输出文件夹")

        os.makedirs(base_folder, exist_ok=True)

        txt_name = txt_info["name"]
        output_path = os.path.join(base_folder, f"{txt_name}.docx")

        # Append a running number while the candidate name is taken.
        counter = 1
        while os.path.exists(output_path):
            output_path = os.path.join(base_folder, f"{txt_name}_{counter}.docx")
            counter += 1

        return output_path

    @staticmethod
    def validate_paths(txt_folder: str, images_root: str, output_root: Optional[str] = None) -> Dict[str, bool]:
        """Check whether the given paths are usable.

        Args:
            txt_folder: TXT folder path.
            images_root: Image root folder path.
            output_root: Output root folder path (optional).

        Returns:
            Dict with the boolean keys ``txt_folder_valid``,
            ``images_root_valid`` and ``output_root_valid``.  The output root
            is only checked when ``config.output_location`` is ``"custom"``
            and ``output_root`` is given; in that case the folder is created
            if it does not exist yet.  Otherwise it is reported as valid.
        """
        result = {
            "txt_folder_valid": bool(txt_folder and os.path.isdir(txt_folder)),
            "images_root_valid": bool(images_root and os.path.isdir(images_root)),
            "output_root_valid": True,  # valid by default because it can be created
        }

        if config.output_location == "custom" and output_root:
            try:
                os.makedirs(output_root, exist_ok=True)
                result["output_root_valid"] = os.path.isdir(output_root)
            except (OSError, ValueError):
                # Cannot be created (e.g. the path is an existing file or the
                # location is not writable).
                result["output_root_valid"] = False

        return result

    @staticmethod
    def get_folder_statistics(folder_path: str) -> Dict[str, int]:
        """Count TXT files, image files and sub-folders below ``folder_path``.

        Args:
            folder_path: Folder to inspect (walked recursively).

        Returns:
            Dict with ``txt_files`` (files ending in ``.txt``),
            ``image_folders`` (directories, the root included, that directly
            contain at least one image), ``total_images`` and
            ``total_subfolders``.  All zero when the folder does not exist.
            Suffix checks are case-insensitive.
        """
        stats = {
            "txt_files": 0,
            "image_folders": 0,
            "total_images": 0,
            "total_subfolders": 0,
        }

        if not folder_path or not os.path.isdir(folder_path):
            return stats

        try:
            # One walk gathers every counter.
            for _root, dirs, files in os.walk(folder_path):
                stats["total_subfolders"] += len(dirs)

                lowered = [file_name.lower() for file_name in files]
                stats["txt_files"] += sum(name.endswith(".txt") for name in lowered)

                image_count = sum(
                    name.endswith(FileHandler.IMAGE_EXTENSIONS) for name in lowered
                )
                if image_count:
                    stats["image_folders"] += 1
                    stats["total_images"] += image_count
        except OSError as e:
            # Best effort: report the problem and return what was counted.
            print(f"获取文件夹统计信息时出错: {e}")

        return stats

    @staticmethod
    def create_backup(file_path: str) -> str:
        """Copy ``file_path`` to a sibling backup file.

        The backup is named ``<file_path>.backup``; if that already exists,
        ``<file_path>.backup.1``, ``.backup.2`` and so on are tried.

        Args:
            file_path: Path of the file to back up.

        Returns:
            Path of the backup file, or an empty string when the source does
            not exist or the copy fails (the failure is printed).
        """
        if not file_path or not os.path.exists(file_path):
            return ""

        backup_path = f"{file_path}.backup"
        counter = 1
        # Append a running number while the backup name is taken.
        while os.path.exists(backup_path):
            backup_path = f"{file_path}.backup.{counter}"
            counter += 1

        try:
            shutil.copy2(file_path, backup_path)
        except OSError as e:
            print(f"创建备份文件失败: {e}")
            return ""

        return backup_path

    @staticmethod
    def cleanup_temp_files(temp_dir: str) -> None:
        """Remove the temporary directory ``temp_dir`` and everything in it.

        Does nothing when the path is empty or does not exist.  Failures are
        printed instead of raised.

        Args:
            temp_dir: Temporary file directory.
        """
        if not temp_dir or not os.path.exists(temp_dir):
            return

        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            print(f"清理临时文件失败: {e}")
|
||
|
||
|
||
# Create the global file handler instance
file_handler = FileHandler()
|
||
|
||
|
||
# Functions kept for compatibility with the old interface
|
||
def scan_txt_files(folder_path: str) -> List[Dict[str, str]]:
    """Scan for TXT files (legacy module-level interface).

    Delegates to ``FileHandler.scan_txt_files`` and returns its result
    unchanged.
    """
    txt_entries = FileHandler.scan_txt_files(folder_path)
    return txt_entries
|
||
|
||
|
||
def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]:
    """Find matching image folders (legacy module-level interface).

    Delegates to ``FileHandler.find_matching_image_folders`` and returns its
    result unchanged.
    """
    pairs = FileHandler.find_matching_image_folders(txt_files, images_root)
    return pairs
|
||
|
||
|
||
def get_image_files(folder_path: str) -> List[str]:
    """Get image files (legacy module-level interface).

    Delegates to ``FileHandler.get_image_files`` and returns its result
    unchanged.
    """
    images = FileHandler.get_image_files(folder_path)
    return images
|
||
|
||
|
||
def read_markdown_txt(file_path: str) -> str:
    """Read a Markdown TXT file (legacy module-level interface).

    Delegates to ``FileHandler.read_markdown_txt`` and returns its result
    unchanged.
    """
    text = FileHandler.read_markdown_txt(file_path)
    return text
|
||
|
||
|
||
def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str:
    """Prepare the output path (legacy module-level interface).

    Delegates to ``FileHandler.prepare_output_path`` and returns its result
    unchanged.
    """
    destination = FileHandler.prepare_output_path(txt_info, images_root, output_root)
    return destination