import os
import tempfile
import json
from typing import List, Dict, Tuple, Optional

from utils import get_file_hash, read_text_file, split_into_paragraphs, clean_text
from content_analyzer import analyzer
from exceptions import DocumentProcessingError
from security import security_manager


class TextFileProcessor:
    def __init__(self, upload_folder: str = "uploads"):
        self.upload_folder = upload_folder
        if not os.path.exists(upload_folder):
            os.makedirs(upload_folder)
        # Create a cache file for file hashes
        self.hash_cache_file = os.path.join(upload_folder, "hash_cache.json")
        self.load_hash_cache()

    def load_hash_cache(self):
        """Load file hash cache from disk"""
        if os.path.exists(self.hash_cache_file):
            with open(self.hash_cache_file, "r", encoding="utf-8") as f:
                self.hash_cache = json.load(f)
        else:
            self.hash_cache = {}

    def save_hash_cache(self):
        """Save file hash cache to disk"""
        with open(self.hash_cache_file, "w", encoding="utf-8") as f:
            json.dump(self.hash_cache, f, ensure_ascii=False, indent=2)

    def is_file_processed(self, file_path: str) -> bool:
        """Check if file has been processed before"""
        file_hash = get_file_hash(file_path)
        return file_hash in self.hash_cache

    def mark_file_as_processed(self, file_path: str, segments: List[Dict], analysis: Optional[Dict] = None):
        """Mark file as processed and cache its hash"""
        file_hash = get_file_hash(file_path)
        cache_entry = {
            "file_name": os.path.basename(file_path),
            "segments": segments
        }
        # Attach the content analysis result if one was produced
        if analysis:
            cache_entry["analysis"] = analysis
        self.hash_cache[file_hash] = cache_entry
        self.save_hash_cache()

    def get_cached_segments(self, file_path: str) -> List[Dict]:
        """Get cached segments for a processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache:
            return self.hash_cache[file_hash]["segments"]
        return []

    def get_cached_analysis(self, file_path: str) -> Dict:
        """Get cached analysis for a processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache and "analysis" in self.hash_cache[file_hash]:
            return self.hash_cache[file_hash]["analysis"]
        return {}

    def process_files(self, file_paths: List[str], progress_callback=None) -> Tuple[List[Dict], int, int]:
        """
        Process uploaded text files with progress tracking

        Returns: (processed_segments, empty_files_count, duplicate_files_count)
        """
        processed_segments = []
        empty_files_count = 0
        duplicate_files_count = 0
        total_files = len(file_paths)

        for i, file_path in enumerate(file_paths):
            # Call progress callback if provided
            if progress_callback:
                progress_callback(i, total_files, f"Processing file {i+1}/{total_files}")

            # Validate the file for safety before reading it
            is_valid, error_msg = security_manager.validate_file(file_path)
            if not is_valid:
                print(f"File validation failed for {file_path}: {error_msg}")
                empty_files_count += 1
                continue

            # Check if file is empty
            if os.path.getsize(file_path) == 0:
                empty_files_count += 1
                continue

            # Check if file has been processed before (for incremental update)
            if self.is_file_processed(file_path):
                # Use cached segments
                cached_segments = self.get_cached_segments(file_path)
                processed_segments.extend(cached_segments)
                duplicate_files_count += 1
                continue

            # Process file
            try:
                # Read file content
                content = read_text_file(file_path)

                # Sanitize the raw text content
                content = security_manager.sanitize_text(content)

                # Clean text
                cleaned_content = clean_text(content)

                # Split into paragraphs
                paragraphs = split_into_paragraphs(cleaned_content)

                # Analyze content for keywords and summary
                analysis = analyzer.analyze_content(cleaned_content)

                # Create segments with metadata
                file_name = os.path.basename(file_path)
                file_segments = []
                for j, paragraph in enumerate(paragraphs):
                    if paragraph.strip():  # Only add non-empty paragraphs
                        # Sanitize the paragraph content
                        clean_paragraph = security_manager.sanitize_text(paragraph)
                        segment = {
                            "content": clean_paragraph,
                            "metadata": {
                                "file_name": file_name,
                                "paragraph_id": j + 1,
                                "segment_id": f"{file_name}_{j + 1}"
                            }
                        }
                        file_segments.append(segment)
                        processed_segments.append(segment)

                # Cache processed file with analysis
                self.mark_file_as_processed(file_path, file_segments, analysis)
            except Exception as e:
                error_msg = f"Error processing file {file_path}: {str(e)}"
                print(error_msg)
                # Log error but continue with other files

        return processed_segments, empty_files_count, duplicate_files_count
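

# Minimal usage sketch (illustrative only). It assumes the sibling modules
# imported above (utils, content_analyzer, security) are importable and that
# the listed paths point to real text files; the input paths and the
# print_progress callback below are hypothetical, not part of this module's API.
if __name__ == "__main__":
    def print_progress(done: int, total: int, message: str) -> None:
        # Simple console reporter matching the progress_callback signature
        print(f"[{done}/{total}] {message}")

    processor = TextFileProcessor(upload_folder="uploads")
    segments, empty_count, duplicate_count = processor.process_files(
        ["docs/sample_1.txt", "docs/sample_2.txt"],  # hypothetical input files
        progress_callback=print_progress,
    )
    print(f"Segments: {len(segments)}, empty files: {empty_count}, duplicates: {duplicate_count}")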