""" 文件处理模块 负责文件系统相关的操作,包括文件扫描、匹配、读取和路径处理等功能。 """ import os import glob from typing import List, Dict, Any from config import config class FileHandler: """文件处理器类,负责文件相关的操作""" @staticmethod def scan_txt_files(folder_path: str) -> List[Dict[str, str]]: """ 扫描文件夹中的所有TXT文件 Args: folder_path: TXT文件所在的文件夹路径 Returns: List[Dict[str, str]]: TXT文件信息列表,每个元素包含path、name、relative_path、folder Raises: Exception: 当文件夹不存在或没有找到TXT文件时 """ if not os.path.isdir(folder_path): raise Exception(f"TXT文件夹不存在: {folder_path}") txt_files = [] for root, dirs, files in os.walk(folder_path): for file in files: if file.lower().endswith(".txt"): txt_path = os.path.join(root, file) file_name = os.path.splitext(file)[0] txt_files.append({ "path": txt_path, "name": file_name, "relative_path": os.path.relpath(txt_path, folder_path), "folder": root }) if not txt_files: raise Exception(f"在 {folder_path} 中未找到任何TXT文件") return sorted(txt_files, key=lambda x: x["relative_path"]) @staticmethod def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]: """ 根据TXT文件名匹配图片文件夹 Args: txt_files: TXT文件信息列表 images_root: 图片根文件夹路径 Returns: List[Dict[str, Any]]: 匹配的文件对列表,每个元素包含txt、image_folder、all_matches Raises: Exception: 当图片根文件夹不存在时 """ if not os.path.isdir(images_root): raise Exception(f"图片根文件夹不存在: {images_root}") # 获取所有图片文件夹 all_image_folders = [] for root, dirs, _ in os.walk(images_root): for dir_name in dirs: folder_path = os.path.join(root, dir_name) all_image_folders.append({ "path": folder_path, "name": dir_name, "relative_path": os.path.relpath(folder_path, images_root) }) matched_pairs = [] for txt in txt_files: matches = FileHandler._find_matches_for_txt(txt, all_image_folders) if matches: # 选择最短路径的匹配项 matches.sort(key=lambda x: len(x["relative_path"])) matched_pairs.append({ "txt": txt, "image_folder": matches[0], "all_matches": matches }) else: matched_pairs.append({ "txt": txt, "image_folder": None, "all_matches": [] }) return matched_pairs @staticmethod def _find_matches_for_txt(txt_info: Dict[str, str], image_folders: List[Dict[str, str]]) -> List[Dict[str, str]]: """ 为单个TXT文件查找匹配的图片文件夹 Args: txt_info: TXT文件信息 image_folders: 所有图片文件夹信息列表 Returns: List[Dict[str, str]]: 匹配的图片文件夹列表 """ matches = [] txt_name = txt_info["name"].lower() for img_folder in image_folders: folder_name = img_folder["name"].lower() if config.match_pattern == "exact" and txt_name == folder_name: matches.append(img_folder) elif config.match_pattern == "prefix" and folder_name.startswith(txt_name): matches.append(img_folder) elif config.match_pattern == "contains" and txt_name in folder_name: matches.append(img_folder) return matches @staticmethod def get_image_files(folder_path: str) -> List[str]: """ 获取文件夹中的所有图片文件 Args: folder_path: 图片文件夹路径 Returns: List[str]: 图片文件路径列表,按配置的排序方式排序 """ if not folder_path or not os.path.isdir(folder_path): return [] image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.gif', '*.webp', '*.tiff'] image_files = [] for ext in image_extensions: pattern = os.path.join(folder_path, ext) image_files.extend(glob.glob(pattern)) # 也检查大写扩展名 pattern_upper = os.path.join(folder_path, ext.upper()) image_files.extend(glob.glob(pattern_upper)) # 去重(防止大小写扩展名重复) image_files = list(set(image_files)) # 根据配置排序 if config.image_sort_by == "name": image_files.sort() elif config.image_sort_by == "time": image_files.sort(key=lambda x: os.path.getmtime(x)) return image_files @staticmethod def read_markdown_txt(file_path: str) -> str: """ 读取含Markdown内容的TXT文件 Args: file_path: TXT文件路径 Returns: str: 文件内容 Raises: Exception: 当文件不存在或无法解析时 """ if not os.path.exists(file_path): raise Exception(f"TXT文件不存在: {file_path}") # 尝试多种编码 encodings = [config.txt_encoding, "gbk", "utf-16", "iso-8859-1"] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() # 统一换行符 content = content.replace("\r\n", "\n").replace("\r", "\n") return content except UnicodeDecodeError: continue except Exception as e: print(f"读取文件 {file_path} 时出错 (编码: {encoding}): {e}") continue raise Exception(f"无法解析TXT文件(编码问题): {file_path}") @staticmethod def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str: """ 准备输出文件路径 Args: txt_info: TXT文件信息 images_root: 图片根目录(备用) output_root: 输出根目录 Returns: str: 输出文件的完整路径 """ # 根据配置决定输出位置 if config.output_location == "txt_folder": base_folder = txt_info["folder"] else: base_folder = output_root # 确保输出文件夹存在 os.makedirs(base_folder, exist_ok=True) # 生成输出文件名 txt_name = txt_info["name"] output_path = os.path.join(base_folder, f"{txt_name}.docx") # 如果文件已存在,添加序号 counter = 1 while os.path.exists(output_path): output_path = os.path.join(base_folder, f"{txt_name}_{counter}.docx") counter += 1 return output_path @staticmethod def validate_paths(txt_folder: str, images_root: str, output_root: str = None) -> Dict[str, bool]: """ 验证路径的有效性 Args: txt_folder: TXT文件夹路径 images_root: 图片根文件夹路径 output_root: 输出根文件夹路径(可选) Returns: Dict[str, bool]: 路径验证结果 """ result = { "txt_folder_valid": bool(txt_folder and os.path.isdir(txt_folder)), "images_root_valid": bool(images_root and os.path.isdir(images_root)), "output_root_valid": True # 默认有效,因为可以创建 } # 如果指定了输出路径且配置要求使用自定义路径,则验证输出路径 if config.output_location == "custom" and output_root: try: # 尝试创建输出目录(如果不存在) if not os.path.exists(output_root): os.makedirs(output_root, exist_ok=True) result["output_root_valid"] = os.path.isdir(output_root) except Exception: result["output_root_valid"] = False return result @staticmethod def get_folder_statistics(folder_path: str) -> Dict[str, int]: """ 获取文件夹统计信息 Args: folder_path: 文件夹路径 Returns: Dict[str, int]: 统计信息,包含txt_files、image_folders、total_images等 """ stats = { "txt_files": 0, "image_folders": 0, "total_images": 0, "total_subfolders": 0 } if not os.path.isdir(folder_path): return stats try: # 统计TXT文件 for root, dirs, files in os.walk(folder_path): for file in files: if file.lower().endswith(".txt"): stats["txt_files"] += 1 # 统计子文件夹(可能包含图片) for root, dirs, files in os.walk(folder_path): stats["total_subfolders"] += len(dirs) # 检查是否包含图片 image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff'] has_images = any( file.lower().endswith(ext) for file in files for ext in image_extensions ) if has_images: stats["image_folders"] += 1 # 统计图片数量 for file in files: if any(file.lower().endswith(ext) for ext in image_extensions): stats["total_images"] += 1 except Exception as e: print(f"获取文件夹统计信息时出错: {e}") return stats @staticmethod def create_backup(file_path: str) -> str: """ 创建文件备份 Args: file_path: 要备份的文件路径 Returns: str: 备份文件路径,如果备份失败则返回空字符串 """ if not os.path.exists(file_path): return "" try: backup_path = f"{file_path}.backup" counter = 1 # 如果备份文件已存在,添加序号 while os.path.exists(backup_path): backup_path = f"{file_path}.backup.{counter}" counter += 1 # 复制文件 import shutil shutil.copy2(file_path, backup_path) return backup_path except Exception as e: print(f"创建备份文件失败: {e}") return "" @staticmethod def cleanup_temp_files(temp_dir: str) -> None: """ 清理临时文件 Args: temp_dir: 临时文件目录 """ try: if os.path.exists(temp_dir): import shutil shutil.rmtree(temp_dir) except Exception as e: print(f"清理临时文件失败: {e}") # 创建全局文件处理器实例 file_handler = FileHandler() # 兼容旧接口的函数 def scan_txt_files(folder_path: str) -> List[Dict[str, str]]: """扫描TXT文件(兼容旧接口)""" return FileHandler.scan_txt_files(folder_path) def find_matching_image_folders(txt_files: List[Dict[str, str]], images_root: str) -> List[Dict[str, Any]]: """查找匹配的图片文件夹(兼容旧接口)""" return FileHandler.find_matching_image_folders(txt_files, images_root) def get_image_files(folder_path: str) -> List[str]: """获取图片文件(兼容旧接口)""" return FileHandler.get_image_files(folder_path) def read_markdown_txt(file_path: str) -> str: """读取Markdown TXT文件(兼容旧接口)""" return FileHandler.read_markdown_txt(file_path) def prepare_output_path(txt_info: Dict[str, str], images_root: str, output_root: str) -> str: """准备输出路径(兼容旧接口)""" return FileHandler.prepare_output_path(txt_info, images_root, output_root)