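"""Text file processor that validates, cleans, splits, and analyzes uploaded
text files, caching results by file hash so unchanged files are not reprocessed."""
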
import os
import tempfile
import json
from typing import List, Dict, Tuple, Optional

from utils import get_file_hash, read_text_file, split_into_paragraphs, clean_text
from content_analyzer import analyzer
from exceptions import DocumentProcessingError
from security import security_manager
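
# Note on the imported helpers (assumed behavior, inferred from how they are used
# below; their definitions live elsewhere in the project): get_file_hash returns a
# stable hash string for a file's contents, read_text_file returns the file's text,
# clean_text and split_into_paragraphs normalize and segment that text, analyzer
# provides analyze_content, and security_manager provides validate_file and
# sanitize_text.
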
class TextFileProcessor:
    def __init__(self, upload_folder: str = "uploads"):
        self.upload_folder = upload_folder
        if not os.path.exists(upload_folder):
            os.makedirs(upload_folder)

        # Create a cache file for file hashes
        self.hash_cache_file = os.path.join(upload_folder, "hash_cache.json")
        self.load_hash_cache()

    def load_hash_cache(self):
        """Load the file hash cache from disk"""
        if os.path.exists(self.hash_cache_file):
            with open(self.hash_cache_file, "r", encoding="utf-8") as f:
                self.hash_cache = json.load(f)
        else:
            self.hash_cache = {}

    def save_hash_cache(self):
        """Save the file hash cache to disk"""
        with open(self.hash_cache_file, "w", encoding="utf-8") as f:
            json.dump(self.hash_cache, f, ensure_ascii=False, indent=2)
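
    # Illustrative sketch of the on-disk cache layout (not part of the original
    # code): hash_cache.json maps whatever get_file_hash() returns (assumed to be
    # a content-hash string) to the entry built in mark_file_as_processed, e.g.:
    #
    #   {
    #     "d41d8cd9...": {
    #       "file_name": "notes.txt",
    #       "segments": [{"content": "...", "metadata": {...}}],
    #       "analysis": {...}    # present only when analysis was produced
    #     }
    #   }
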
    def is_file_processed(self, file_path: str) -> bool:
        """Check whether the file has been processed before"""
        file_hash = get_file_hash(file_path)
        return file_hash in self.hash_cache

    def mark_file_as_processed(self, file_path: str, segments: List[Dict], analysis: Optional[Dict] = None):
        """Mark the file as processed and cache its segments under its hash"""
        file_hash = get_file_hash(file_path)
        cache_entry = {
            "file_name": os.path.basename(file_path),
            "segments": segments
        }

        # Attach the content analysis result if one was produced
        if analysis:
            cache_entry["analysis"] = analysis

        self.hash_cache[file_hash] = cache_entry
        self.save_hash_cache()

    def get_cached_segments(self, file_path: str) -> List[Dict]:
        """Return the cached segments for a previously processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache:
            return self.hash_cache[file_hash]["segments"]
        return []

    def get_cached_analysis(self, file_path: str) -> Dict:
        """Return the cached analysis for a previously processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache and "analysis" in self.hash_cache[file_hash]:
            return self.hash_cache[file_hash]["analysis"]
        return {}

    def process_files(self, file_paths: List[str], progress_callback=None) -> Tuple[List[Dict], int, int]:
        """
        Process uploaded text files with progress tracking.

        Returns: (processed_segments, empty_files_count, duplicate_files_count)
        """
        processed_segments = []
        empty_files_count = 0
        duplicate_files_count = 0

        total_files = len(file_paths)

        for i, file_path in enumerate(file_paths):
            # Report progress if a callback was provided
            if progress_callback:
                progress_callback(i, total_files, f"Processing file {i+1}/{total_files}")

            # Validate the file for security issues; files that fail validation
            # are counted with the empty files and skipped
            is_valid, error_msg = security_manager.validate_file(file_path)
            if not is_valid:
                print(f"File validation failed for {file_path}: {error_msg}")
                empty_files_count += 1
                continue

            # Skip empty files
            if os.path.getsize(file_path) == 0:
                empty_files_count += 1
                continue

            # Check whether the file has been processed before (incremental update)
            if self.is_file_processed(file_path):
                # Reuse the cached segments instead of reprocessing
                cached_segments = self.get_cached_segments(file_path)
                processed_segments.extend(cached_segments)
                duplicate_files_count += 1
                continue

            # Process the file
            try:
                # Read the file content
                content = read_text_file(file_path)

                # Sanitize the raw text content
                content = security_manager.sanitize_text(content)

                # Clean the text
                cleaned_content = clean_text(content)

                # Split into paragraphs
                paragraphs = split_into_paragraphs(cleaned_content)

                # Analyze the content for keywords and a summary
                analysis = analyzer.analyze_content(cleaned_content)

                # Create segments with metadata
                file_name = os.path.basename(file_path)
                file_segments = []
                for j, paragraph in enumerate(paragraphs):
                    if paragraph.strip():  # Only add non-empty paragraphs
                        # Sanitize the paragraph content
                        clean_paragraph = security_manager.sanitize_text(paragraph)
                        segment = {
                            "content": clean_paragraph,
                            "metadata": {
                                "file_name": file_name,
                                "paragraph_id": j + 1,
                                "segment_id": f"{file_name}_{j + 1}"
                            }
                        }
                        file_segments.append(segment)
                        processed_segments.append(segment)

                # Cache the processed file together with its analysis
                self.mark_file_as_processed(file_path, file_segments, analysis)

            except Exception as e:
                # Log the error but continue with the remaining files
                print(f"Error processing file {file_path}: {str(e)}")

        return processed_segments, empty_files_count, duplicate_files_count
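
# Illustrative usage sketch (not part of the original module). The file paths and
# the progress callback below are hypothetical; they only assume the files already
# exist and that the imported helpers are available on the import path.
if __name__ == "__main__":
    def print_progress(done: int, total: int, message: str):
        # Matches the callback signature used by process_files:
        # (current_index, total_files, message)
        print(f"[{done}/{total}] {message}")

    processor = TextFileProcessor(upload_folder="uploads")
    segments, empty_count, duplicate_count = processor.process_files(
        ["uploads/example_a.txt", "uploads/example_b.txt"],  # hypothetical paths
        progress_callback=print_progress,
    )
    print(f"Segments: {len(segments)}, skipped/empty: {empty_count}, served from cache: {duplicate_count}")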